当前位置: 首页 > news >正文

spring响应式编程系列:异步消费数据

目录

示例

大致流程

parallel

cache

PARALLEL_SUPPLIER

newParallel

init

publishOn

new MonoSubscribeOnValue

​​​​​​​subscribe

​​​​​​​new LambdaMonoSubscriber

​​​​​​​MonoSubscribeOnValue.subscribe

​​​​​​​onSubscribe

​​​​​​​request

​​​​​​​schedule

​​​​​​​directSchedule

​​​​​​​run

​​​​​​​onNext

时序图

类图

数据发布者

MonoSubscribeOnValue

调度器

ParallelScheduler

数据订阅者

LambdaMonoSubscriber

订阅的消息体

ScheduledScalar


       本篇文章我们来研究如何控制数据流在特定线程池上的执行。即将操作符的执行切换到指定调度器(Scheduler)的线程。在Project Reactor框架中,Mono.publishOn()就可以实现该功能,示例如下所示:

示例

CountDownLatch countDownLatch = new CountDownLatch(1);

// 创建一个包含数据的 Mono

Mono<String> mono = Mono.just("Hello, Reactive World!");

mono.publishOn(Schedulers.parallel())

    .subscribe(x -> {

        log.info("Sub thread, subscribe: {}", x);

        countDownLatch.countDown();

    });

log.info("Main thread, blocking");

countDownLatch.await();

       首先,通过mono.publishOn方法指定线程池;

       其次,通过subscribe指定的消费者处理逻辑会在前一步指定的线程池里执行。

大致流程

       点击Schedulers.parallel(),如下所示:

parallel

public static Scheduler parallel() {

return cache(CACHED_PARALLEL, PARALLEL, PARALLEL_SUPPLIER);

}

       在这里,调用cache方法并且PARALLEL_SUPPLIER参数,返回Scheduler调度器。cache方法如下所示:

cache

static CachedScheduler cache(AtomicReference<CachedScheduler> reference, String key, Supplier<Scheduler> supplier) {

CachedScheduler s = reference.get();

if (s != null) {

return s;

}

s = new CachedScheduler(key, supplier.get());

if (reference.compareAndSet(null, s)) {

return s;

}

... ...

}

       在这里,调用supplier.get()方法来生成CachedScheduler调度器。supplier.get()方法的定义如下所示:

PARALLEL_SUPPLIER

static final Supplier<Scheduler> PARALLEL_SUPPLIER =
      () -> newParallel(PARALLEL, DEFAULT_POOL_SIZE, true);

newParallel

public static Scheduler newParallel(int parallelism, ThreadFactory threadFactory) {

final Scheduler fromFactory = factory.newParallel(parallelism, threadFactory);

fromFactory.init();

return fromFactory;

}

       在这里,通过工厂方法模式创建Scheduler对象,并调用init()初始化方法,如下所示:

init

@Override

public void init() {

... ...

SchedulerState<ScheduledExecutorService[]> b =

SchedulerState.init(new ScheduledExecutorService[n]);

for (int i = 0; i < n; i++) {

b.currentResource[i] = Schedulers.decorateExecutorService(this, this.get());

}

if (!STATE.compareAndSet(this, null, b)) {

for (ScheduledExecutorService exec : b.currentResource) {

exec.shutdownNow();

}

if (isDisposed()) {

throw new IllegalStateException(

"Initializing a disposed scheduler is not permitted"

);

}

}}

       在这里,new了一个ScheduledExecutorService线程池对象作为调度器的底导线程池实现。

       点击示例里的mono.publishOn()方法,如下所示:

​​​​​​​publishOn

public final Mono<T> publishOn(Scheduler scheduler) {
   if(this instanceof Callable) {
      if (this instanceof Fuseable.ScalarCallable) {
         try {
            T value = block();
            return onAssembly(new MonoSubscribeOnValue<>(value, scheduler));
         }
         catch (Throwable t) {
            //leave MonoSubscribeOnCallable defer error
         }
      }
      @SuppressWarnings("unchecked")
      Callable<T> c = (Callable<T>)this;
      return onAssembly(new MonoSubscribeOnCallable<>(c, scheduler));
   }
   return onAssembly(new MonoPublishOn<>(this, scheduler));
}

       在这里,new 了一个MonoSubscribeOnValue对象,并且传递了scheduler对象作为构造参数。

       如下所示:

​​​​​​​new MonoSubscribeOnValue

final class MonoSubscribeOnValue<T> extends Mono<T> implements Scannable {
   final T value;
   final Scheduler scheduler;
   MonoSubscribeOnValue(@Nullable T value, Scheduler scheduler) {
      this.value = value;
      this.scheduler = Objects.requireNonNull(scheduler, "scheduler");
   }

       在这里,将入参scheduler作为MonoSubscribeOnValue对象的属性scheduler保留下来。

       点击示例里的mono.subscribe()方法,如下所示:

​​​​​​​subscribe

public final Disposable subscribe(
      @Nullable Consumer<? super T> consumer,
      @Nullable Consumer<? super Throwable> errorConsumer,
      @Nullable Runnable completeConsumer,
      @Nullable Context initialContext) {
   return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
         completeConsumer, null, initialContext));
}

      在这里,new了一个LambdaMonoSubscriber对象,这里与《spring响应式编程系列:总体流程》类似。LambdaMonoSubscriber对象的构造函数如下所示:

​​​​​​​new LambdaMonoSubscriber

LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer,
      @Nullable Consumer<? super Throwable> errorConsumer,
      @Nullable Runnable completeConsumer,
      @Nullable Consumer<? super Subscription> subscriptionConsumer,
      @Nullable Context initialContext) {
   this.consumer = consumer;
   this.errorConsumer = errorConsumer;
   this.completeConsumer = completeConsumer;
   this.subscriptionConsumer = subscriptionConsumer;
   this.initialContext = initialContext == null ? Context.empty() : initialContext;
}

​​​​​​​MonoSubscribeOnValue.subscribe

@Override

public void subscribe(CoreSubscriber<? super T> actual) {

T v = value;

if (v == null) {

ScheduledEmpty parent = new ScheduledEmpty(actual);

actual.onSubscribe(parent);

try {

parent.setFuture(scheduler.schedule(parent));

}

catch (RejectedExecutionException ree) {

if (parent.future != OperatorDisposables.DISPOSED) {

actual.onError(Operators.onRejectedExecution(ree,

actual.currentContext()));

}

}

}

else {

actual.onSubscribe(new ScheduledScalar<>(actual, v, scheduler));

}

}

       在这里,new一个ScheduledScalar对象,传入消费者和scheduler对象,然后同样是调用消费者的onSubscribe()方法。

       如下所示:

​​​​​​​onSubscribe

public final void onSubscribe(Subscription s) {
   if (Operators.validate(subscription, s)) {
      this.subscription = s;
      if (subscriptionConsumer != null) {
         try {
            subscriptionConsumer.accept(s);
         }
         catch (Throwable t) {
            Exceptions.throwIfFatal(t);
            s.cancel();
            onError(t);
         }
      }
      else {
         s.request(Long.MAX_VALUE);
      }
   }
}

       在这里,与《spring响应式编程系列:总体流程》类似,调用订阅消费体的request()方法,如下所示:

​​​​​​​request

@Override
public void request(long n) {
   if (Operators.validate(n)) {
      if (ONCE.compareAndSet(this, 0, 1)) {
         try {
            Disposable f = scheduler.schedule(this);
            if (!FUTURE.compareAndSet(this,
                  null,
                  f) && future != FINISHED && future != OperatorDisposables.DISPOSED) {
               f.dispose();
            }
         }
         catch (RejectedExecutionException ree) {
            if (future != FINISHED && future != OperatorDisposables.DISPOSED) {
               actual.onError(Operators.onRejectedExecution(ree,
                     this,
                     null,
                     value, actual.currentContext()));
            }
         }
      }
   }
}

       在这里,调用scheduler.schedule(this)方法,并将订阅的消息体作为参数传入。如下所示:

​​​​​​​schedule

public Disposable schedule(Runnable task) {
 return Schedulers.directSchedule(pick(), task, null, 0L, TimeUnit.MILLISECONDS);
}

       在这里,首先,调用pick()方法获取当前调度器的线程池对象,与订阅的消息体一起作为参数传入directSchedule()方法,如下所示:

​​​​​​​directSchedule

static Disposable directSchedule(ScheduledExecutorService exec,
      Runnable task,
      @Nullable Disposable parent,
      long delay,
      TimeUnit unit) {
   task = onSchedule(task);
   SchedulerTask sr = new SchedulerTask(task, parent);
   Future<?> f;
   if (delay <= 0L) {
      f = exec.submit((Callable<?>) sr);
   }
   else {
      f = exec.schedule((Callable<?>) sr, delay, unit);
   }
   sr.setFuture(f);
   return sr;
}

       在这里,将订阅的消息体任务提交到线程池去执行。接下来,我们看看为什么订阅的消息体是一个线程池可以执行的任务呢?如下所示:

​​​​​​​run

static final class ScheduledScalar<T>

implements QueueSubscription<T>, InnerProducer<T>, Runnable {

        ... ...

@Override

public void run() {

try {

if (fusionState == NO_VALUE) {

fusionState = HAS_VALUE;

}

actual.onNext(value);

actual.onComplete();

}

finally {

FUTURE.lazySet(this, FINISHED);

}

}

       在这里,ScheduledScalar实现了Runnable 接口,并且实现了run()方法,所以,订阅的消息体就是一个线程池可以执行的任务了。该线程池任务的执行逻辑如下所示:

​​​​​​​onNext

public final void onNext(T x) {
   Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
   if (s == Operators.cancelledSubscription()) {
      Operators.onNextDropped(x, this.initialContext);
      return;
   }
   if (consumer != null) {
      try {
         consumer.accept(x);
      }
      catch (Throwable t) {
         Exceptions.throwIfFatal(t);
         s.cancel();
         doError(t);
      }
   }
   if (completeConsumer != null) {
      try {
         completeConsumer.run();
      }
      catch (Throwable t) {
         Operators.onErrorDropped(t, this.initialContext);
      }
   }
}

       在这里,调用数据消费者的onNext()方法执行相关的消费逻辑。

       至此,大致流程就结束了。

时序图

  1. 类关系的设计,与《spring响应式编程系列:总体流程》类似,主要包括数据发布者对象、数据订阅者对象及订阅的消息体对象;
  2. Mono和MonoSubscribeOnValue是数据发布者,LambdaMonoSubscriber是数据订阅者,ScheduledScalar是订阅的消息体;
  3. 不同点在于,多了Schedulers(暂且叫着调度器工厂)和ParallelScheduler调度器;以及ScheduledScalar在执行request方法时,需要将任务交由调度器来处理。
  4. Schedulers类里有一个Factory接口,该接口可以默认创建各种Scheduler调度器对象,如(ElasticScheduler、BoundedElasticScheduler、ParallelScheduler、SingleScheduler),这就是典型的工厂方法设计模式。

类图

数据发布者

MonoSubscribeOnValue

MonoSubscribeOnValue与《spring响应式编程系列:总体流程》介绍的类似,都是继承于Mono类,并且实现了CorePublisher和Publisher接口。

不同点在于,该数据发布者多了一个属性,如下所示:

final Scheduler scheduler;

该属性带有线程池ScheduledExecutorService信息,可以为数据订阅者提供异步执行的功能。

调度器

ParallelScheduler

  1. Scheduler

    提供了接口:Disposable schedule(Runnable task);

  1. ParallelScheduler

    该类封装了线程池信息;实现了接口schedule(Runnable task),用于提供对所封装的线程池的调度。

数据订阅者

LambdaMonoSubscriber

LambdaMonoSubscriber与《spring响应式编程系列:总体流程》介绍的一样。

订阅的消息体

ScheduledScalar

       ScheduledScalar与《spring响应式编程系列:总体流程》介绍的类似,都实现了Subscription接口。

       不同点在于,ScheduledScalar实现了Runnable接口,从而可以提供给线程池执行。

http://www.xdnf.cn/news/380017.html

相关文章:

  • springboot3+vue3融合项目实战-大事件文章管理系统-更新用户信息
  • MGP-STR:用于场景文本识别的多粒度预测
  • 【Vulkan 入门系列】创建和配置描述符集,创建同步对象(九)
  • 跟我学C++中级篇——STL中的删除对比
  • C++ learning day 02
  • 常见的算法介绍
  • 人脸真假检测:SVM 与 ResNet18 的实战对比
  • Java单例模式总结
  • 【Linux 系统调试】系统内存越界调试利器Electric Fence详解
  • waterfall与Bidding的请求机制
  • Day20打卡-奇异值SVD分解
  • Python序列化的学习笔记
  • 基于PE环境搭建及调试S32K312
  • Lua—元表(Metatable)
  • 怎样使自己处于高能量状态
  • Discriminative and domain invariant subspace alignment for visual tasks
  • JVM——即时编译器的中间表达形式
  • MYSQL 索引与数据结构笔记
  • 【大数据技术-HBase-关于Hmaster、RegionServer、Region等组件功能和读写流程总结】
  • 【Linux】线程POSIX信号量
  • JDBC工具类
  • c#建筑行业财务流水账系统软件可上传记账凭证财务管理系统签核功能
  • 代码随想录算法训练营第三十七天
  • win10-启动django项目时报错
  • ndk.symlinkdir - 在 Android Studio 3.5 及更高版本中,创建指向 NDK 的符号链接
  • 关于数据库查询速度优化
  • vue3使用tailwindcss报错问题
  • C.循环函数基础
  • 远程调试---在电脑上devtools调试运行在手机上的应用
  • PyTorch API 3 - mps、xpu、backends、导出