目录
示例
大致流程
parallel
cache
PARALLEL_SUPPLIER
newParallel
init
publishOn
new MonoSubscribeOnValue
subscribe
new LambdaMonoSubscriber
MonoSubscribeOnValue.subscribe
onSubscribe
request
schedule
directSchedule
run
onNext
时序图
类图
数据发布者
MonoSubscribeOnValue
调度器
ParallelScheduler
数据订阅者
LambdaMonoSubscriber
订阅的消息体
ScheduledScalar
本篇文章我们来研究如何控制数据流在特定线程池上的执行。即将操作符的执行切换到指定调度器(Scheduler)的线程。在Project Reactor框架中,Mono.publishOn()就可以实现该功能,示例如下所示:
示例
CountDownLatch countDownLatch = new CountDownLatch(1); // 创建一个包含数据的 Mono Mono<String> mono = Mono.just("Hello, Reactive World!"); mono.publishOn(Schedulers.parallel()) .subscribe(x -> { log.info("Sub thread, subscribe: {}", x); countDownLatch.countDown(); }); log.info("Main thread, blocking"); countDownLatch.await(); |
首先,通过mono.publishOn方法指定线程池;
其次,通过subscribe指定的消费者处理逻辑会在前一步指定的线程池里执行。
大致流程
点击Schedulers.parallel(),如下所示:
parallel
public static Scheduler parallel() { return cache(CACHED_PARALLEL, PARALLEL, PARALLEL_SUPPLIER); } |
在这里,调用cache方法并且PARALLEL_SUPPLIER参数,返回Scheduler调度器。cache方法如下所示:
cache
static CachedScheduler cache(AtomicReference<CachedScheduler> reference, String key, Supplier<Scheduler> supplier) { CachedScheduler s = reference.get(); if (s != null) { return s; } s = new CachedScheduler(key, supplier.get()); if (reference.compareAndSet(null, s)) { return s; } ... ... } |
在这里,调用supplier.get()方法来生成CachedScheduler调度器。supplier.get()方法的定义如下所示:
PARALLEL_SUPPLIER
static final Supplier<Scheduler> PARALLEL_SUPPLIER = () -> newParallel(PARALLEL, DEFAULT_POOL_SIZE, true); |
newParallel
public static Scheduler newParallel(int parallelism, ThreadFactory threadFactory) { final Scheduler fromFactory = factory.newParallel(parallelism, threadFactory); fromFactory.init(); return fromFactory; } |
在这里,通过工厂方法模式创建Scheduler对象,并调用init()初始化方法,如下所示:
init
@Override public void init() { ... ... SchedulerState<ScheduledExecutorService[]> b = SchedulerState.init(new ScheduledExecutorService[n]); for (int i = 0; i < n; i++) { b.currentResource[i] = Schedulers.decorateExecutorService(this, this.get()); } if (!STATE.compareAndSet(this, null, b)) { for (ScheduledExecutorService exec : b.currentResource) { exec.shutdownNow(); } if (isDisposed()) { throw new IllegalStateException( "Initializing a disposed scheduler is not permitted" ); } }} |
在这里,new了一个ScheduledExecutorService线程池对象作为调度器的底导线程池实现。
点击示例里的mono.publishOn()方法,如下所示:
publishOn
public final Mono<T> publishOn(Scheduler scheduler) { if(this instanceof Callable) { if (this instanceof Fuseable.ScalarCallable) { try { T value = block(); return onAssembly(new MonoSubscribeOnValue<>(value, scheduler)); } catch (Throwable t) { //leave MonoSubscribeOnCallable defer error } } @SuppressWarnings("unchecked") Callable<T> c = (Callable<T>)this; return onAssembly(new MonoSubscribeOnCallable<>(c, scheduler)); } return onAssembly(new MonoPublishOn<>(this, scheduler)); } |
在这里,new 了一个MonoSubscribeOnValue对象,并且传递了scheduler对象作为构造参数。
如下所示:
new MonoSubscribeOnValue
final class MonoSubscribeOnValue<T> extends Mono<T> implements Scannable { final T value; final Scheduler scheduler; MonoSubscribeOnValue(@Nullable T value, Scheduler scheduler) { this.value = value; this.scheduler = Objects.requireNonNull(scheduler, "scheduler"); } |
在这里,将入参scheduler作为MonoSubscribeOnValue对象的属性scheduler保留下来。
点击示例里的mono.subscribe()方法,如下所示:
subscribe
public final Disposable subscribe( @Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Context initialContext) { return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer, completeConsumer, null, initialContext)); } |
在这里,new了一个LambdaMonoSubscriber对象,这里与《spring响应式编程系列:总体流程》类似。LambdaMonoSubscriber对象的构造函数如下所示:
new LambdaMonoSubscriber
LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer, @Nullable Context initialContext) { this.consumer = consumer; this.errorConsumer = errorConsumer; this.completeConsumer = completeConsumer; this.subscriptionConsumer = subscriptionConsumer; this.initialContext = initialContext == null ? Context.empty() : initialContext; } |
MonoSubscribeOnValue.subscribe
@Override public void subscribe(CoreSubscriber<? super T> actual) { T v = value; if (v == null) { ScheduledEmpty parent = new ScheduledEmpty(actual); actual.onSubscribe(parent); try { parent.setFuture(scheduler.schedule(parent)); } catch (RejectedExecutionException ree) { if (parent.future != OperatorDisposables.DISPOSED) { actual.onError(Operators.onRejectedExecution(ree, actual.currentContext())); } } } else { actual.onSubscribe(new ScheduledScalar<>(actual, v, scheduler)); } } |
在这里,new一个ScheduledScalar对象,传入消费者和scheduler对象,然后同样是调用消费者的onSubscribe()方法。
如下所示:
onSubscribe
public final void onSubscribe(Subscription s) { if (Operators.validate(subscription, s)) { this.subscription = s; if (subscriptionConsumer != null) { try { subscriptionConsumer.accept(s); } catch (Throwable t) { Exceptions.throwIfFatal(t); s.cancel(); onError(t); } } else { s.request(Long.MAX_VALUE); } } } |
在这里,与《spring响应式编程系列:总体流程》类似,调用订阅消费体的request()方法,如下所示:
request
@Override public void request(long n) { if (Operators.validate(n)) { if (ONCE.compareAndSet(this, 0, 1)) { try { Disposable f = scheduler.schedule(this); if (!FUTURE.compareAndSet(this, null, f) && future != FINISHED && future != OperatorDisposables.DISPOSED) { f.dispose(); } } catch (RejectedExecutionException ree) { if (future != FINISHED && future != OperatorDisposables.DISPOSED) { actual.onError(Operators.onRejectedExecution(ree, this, null, value, actual.currentContext())); } } } } } |
在这里,调用scheduler.schedule(this)方法,并将订阅的消息体作为参数传入。如下所示:
schedule
public Disposable schedule(Runnable task) { return Schedulers.directSchedule(pick(), task, null, 0L, TimeUnit.MILLISECONDS); } |
在这里,首先,调用pick()方法获取当前调度器的线程池对象,与订阅的消息体一起作为参数传入directSchedule()方法,如下所示:
directSchedule
static Disposable directSchedule(ScheduledExecutorService exec, Runnable task, @Nullable Disposable parent, long delay, TimeUnit unit) { task = onSchedule(task); SchedulerTask sr = new SchedulerTask(task, parent); Future<?> f; if (delay <= 0L) { f = exec.submit((Callable<?>) sr); } else { f = exec.schedule((Callable<?>) sr, delay, unit); } sr.setFuture(f); return sr; } |
在这里,将订阅的消息体任务提交到线程池去执行。接下来,我们看看为什么订阅的消息体是一个线程池可以执行的任务呢?如下所示:
run
static final class ScheduledScalar<T> implements QueueSubscription<T>, InnerProducer<T>, Runnable { ... ... @Override public void run() { try { if (fusionState == NO_VALUE) { fusionState = HAS_VALUE; } actual.onNext(value); actual.onComplete(); } finally { FUTURE.lazySet(this, FINISHED); } } |
在这里,ScheduledScalar实现了Runnable 接口,并且实现了run()方法,所以,订阅的消息体就是一个线程池可以执行的任务了。该线程池任务的执行逻辑如下所示:
onNext
public final void onNext(T x) { Subscription s = S.getAndSet(this, Operators.cancelledSubscription()); if (s == Operators.cancelledSubscription()) { Operators.onNextDropped(x, this.initialContext); return; } if (consumer != null) { try { consumer.accept(x); } catch (Throwable t) { Exceptions.throwIfFatal(t); s.cancel(); doError(t); } } if (completeConsumer != null) { try { completeConsumer.run(); } catch (Throwable t) { Operators.onErrorDropped(t, this.initialContext); } } } |
在这里,调用数据消费者的onNext()方法执行相关的消费逻辑。
至此,大致流程就结束了。
时序图

- 类关系的设计,与《spring响应式编程系列:总体流程》类似,主要包括数据发布者对象、数据订阅者对象及订阅的消息体对象;
- Mono和MonoSubscribeOnValue是数据发布者,LambdaMonoSubscriber是数据订阅者,ScheduledScalar是订阅的消息体;
- 不同点在于,多了Schedulers(暂且叫着调度器工厂)和ParallelScheduler调度器;以及ScheduledScalar在执行request方法时,需要将任务交由调度器来处理。
- Schedulers类里有一个Factory接口,该接口可以默认创建各种Scheduler调度器对象,如(ElasticScheduler、BoundedElasticScheduler、ParallelScheduler、SingleScheduler),这就是典型的工厂方法设计模式。
类图
数据发布者
MonoSubscribeOnValue

MonoSubscribeOnValue与《spring响应式编程系列:总体流程》介绍的类似,都是继承于Mono类,并且实现了CorePublisher和Publisher接口。
不同点在于,该数据发布者多了一个属性,如下所示:
final Scheduler scheduler;
该属性带有线程池ScheduledExecutorService信息,可以为数据订阅者提供异步执行的功能。
调度器
ParallelScheduler

- Scheduler
提供了接口:Disposable schedule(Runnable task);
- ParallelScheduler
该类封装了线程池信息;实现了接口schedule(Runnable task),用于提供对所封装的线程池的调度。
数据订阅者
LambdaMonoSubscriber
LambdaMonoSubscriber与《spring响应式编程系列:总体流程》介绍的一样。
订阅的消息体
ScheduledScalar

ScheduledScalar与《spring响应式编程系列:总体流程》介绍的类似,都实现了Subscription接口。
不同点在于,ScheduledScalar实现了Runnable接口,从而可以提供给线程池执行。