当前位置: 首页 > news >正文

spring响应式编程系列:总体流程

目录

示例

程序流程

just

subscribe

new LambdaMonoSubscriber

​​​​​​​MonoJust.subscribe

​​​​​​​new Operators.ScalarSubscription  

​​​​​​​onSubscribe

​​​​​​​request

​​​​​​​onNext

时序图

类图

数据发布者

MonoJust

数据订阅者

LambdaSubscriber

订阅的消息体

ScalarSubscription


       

        想要了解响应式编程的总体流程,只要做到真正吃透一个简单的示例即可。

        如下所示:

示例

        首先,通过调用Mono.just创建一个单元素的数据发布者(Publisher);

        然后,通过调用mono.subscribe订阅数据发布者(Publisher)发布的数据。

        如下所示:

// 创建一个包含数据的 Mono
Mono<String> mono = Mono.just("Hello, Reactive World!");
// 订阅并消费 Mono
mono.subscribe(System.out::println);

程序流程

        点击Mono.just,如下所示:

​​​​​​​just

public static <T> Mono<T> just(T data) {

        return onAssembly(new MonoJust(data));

    }

        在这里,直接new一个MonoJust对象并返回。

        点击示例里的mono.subscribe,如下所示:

subscribe

public abstract class Mono<T> implements CorePublisher<T> {

    ... ...

    public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Context initialContext) {

        return (Disposable)this.subscribeWith(new LambdaMonoSubscriber(consumer, errorConsumer, completeConsumer, (Consumer)null, initialContext));

    }

      在这里,将示例里subscribe的参数作为LambdaMonoSubscriber的构造参数,然后new一个LambdaMonoSubscriber对象。

        LambdaMonoSubscriber对象的初始化参数,如下所示:

​​​​​​​new LambdaMonoSubscriber

final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {

    final Consumer<? super T> consumer;

    final Consumer<? super Throwable> errorConsumer;

    final Runnable completeConsumer;

    final Consumer<? super Subscription> subscriptionConsumer;

    final Context initialContext;

    volatile Subscription subscription;

    ... ...

    LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer, @Nullable Context initialContext) {

        this.consumer = consumer;

        this.errorConsumer = errorConsumer;

        this.completeConsumer = completeConsumer;

        this.subscriptionConsumer = subscriptionConsumer;

        this.initialContext = initialContext == null ? Context.empty() : initialContext;

    }

​​​​​​​MonoJust.subscribe

final class MonoJust<T> extends Mono<T> implements ScalarCallable<T>, Fuseable, SourceProducer<T> {

    ... ...

public void subscribe(CoreSubscriber<? super T> actual) {

        actual.onSubscribe(Operators.scalarSubscription(actual, this.value));

    }

       在这里,来到了MonoJust对象的subscribe方法,该方法调用了LambdaMonoSubscriber对象的onSubscribe方法;

        同时,new一个Operators.ScalarSubscription对象,该对象封装了LambdaMonoSubscriber对象和数据发布者MonoJust发布的数据。

        如下所示:

​​​​​​​new Operators.ScalarSubscription  

public static <T> Subscription scalarSubscription(CoreSubscriber<? super T> subscriber, T value, String stepName) {

        return new Operators.ScalarSubscription(subscriber, value, stepName);

    }

        点击actual.onSubscribe,如下所示:

​​​​​​​onSubscribe

final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {

    ... ...

    public final void onSubscribe(Subscription s) {

        if (Operators.validate(this.subscription, s)) {

            this.subscription = s;

            if (this.subscriptionConsumer != null) {

                try {

                    this.subscriptionConsumer.accept(s);

                } catch (Throwable var3) {

                    Exceptions.throwIfFatal(var3);

                    s.cancel();

                    this.onError(var3);

                }

            } else {

                s.request(9223372036854775807L);

            }

        }

    }

      在这里,LambdaMonoSubscriber对象调用了Operators.ScalarSubscription对象的request方法。

        如下所示:

​​​​​​​request

static final class ScalarSubscription<T> implements SynchronousSubscription<T>, InnerProducer<T> {

public void request(long n) {

            if (Operators.validate(n) && ONCE.compareAndSet(this, 0, 1)) {

                Subscriber<? super T> a = this.actual;

                a.onNext(this.value);

                if (this.once != 2) {

                    a.onComplete();

                }

            }

        }

        在这里,Operators.ScalarSubscription对象又调用了LambdaMonoSubscriber对象的onNext方法。

        LambdaMonoSubscriber对象的onNext方法如下所示:

​​​​​​​onNext

final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {

public final void onNext(T x) {

        Subscription s = (Subscription)S.getAndSet(this, Operators.cancelledSubscription());

        if (s == Operators.cancelledSubscription()) {

            Operators.onNextDropped(x, this.initialContext);

        } else {

            if (this.consumer != null) {

                try {

                    this.consumer.accept(x);

                } catch (Throwable var5) {

                    Exceptions.throwIfFatal(var5);

                    s.cancel();

                    this.doError(var5);

                }

            }

            if (this.completeConsumer != null) {

                try {

                    this.completeConsumer.run();

                } catch (Throwable var4) {

                    Operators.onErrorDropped(var4, this.initialContext);

                }

            }

        }

}

        终于,在这里,调用了示例里subscribe()方法的回调函数了。

时序图

【说明】

  1. Mono和MonoJust是数据发布者,LambdaMonoSubscriber是数据消费者,ScalarSubscription是订阅的消息;
  2. 类的设计还是比较清晰的,就是方法的调用显示有点绕。
  3. 数据发布者,提供了just方法来生成数据发布者(Publisher);
  4. 数据订阅者,提供了onSubscribe和onNext方法来响应订阅事件和读取数据;
  5. 订阅的消息体,封装了数据订阅者和数据发布发布的数据,并且提供了request方法用来处理数据。
  6. 使用了观察者设计模式:LambdaMonoSubscriber是观察者模式中的观察者(Observer),它订阅(subscribe)一个发布者(MonoJust),MonoJust是观察者模式中的主题(Subject),它负责通知所有的 Subscriber。

类图

数据发布者

MonoJust

【说明】

  • Publisher

    定义了接口:void subscribe(Subscriber<? super T> var1)。

  • CorePublisher

    定义了接口:void subscribe(CoreSubscriber<? super T> subscriber)。

  • Mono

    是一个抽象类,实现了数据发布者通用的各种功能。

比如:使用了工厂方法设计模式来创建诸如MonoJust、MonoCreate、MonoDefer、MonoError等各种具体的数据发布者。

  • MonoJust

    是一个特定的数据发布者(Publisher),实现了接口void subscribe(CoreSubscriber<? super T> actual)。

数据订阅者

LambdaSubscriber

【说明】

  • Subscriber

    定义了如下接口:onSubscribe、onNext、onError、onComplete。

  • CoreSubscriber

    定义了如下接口:onSubscribe

  • LambdaMonoSubscriber

    关联了consumer、errorConsumer、completeConsumer、subscriptionConsumer这些对象,以完成订阅相关的各种操作。

订阅的消息体

ScalarSubscription

【说明】

  • Subscription

    提供了如下接口:void request(long var1)、void cancel();

  • ScalarSubscription

    封装了数据订阅者和数据发布者发布的数据。

http://www.xdnf.cn/news/6499.html

相关文章:

  • Ubuntu18.04安装Qt5.12
  • 在PyCharm中部署AI模型的完整指南
  • 鸿蒙-跨设备互通,设备互通提供跨设备的相机、扫描、图库访问能力,平板或2in1设备可以调用手机的相机、扫描、图库等功能。
  • 【VSCode】在 VSCode 中运行 HTML 页面并通过 HTTPS 访问
  • 在pycharm中搭建yolo11分类检测系统--PyQt5学习(二)
  • 发现“横”字手写有难度,对比两个“横”字
  • CSS3笔记
  • 小知识合集 慢慢更新
  • vue,uniapp解决h5跨域问题
  • uniapp打包IOS私钥证书过期了,如何在非mac系统操作
  • PDK中technology file从tf格式转换为lef格式
  • 【AI插件开发】Notepad++ AI插件开发实践:支持配置界面
  • 双轮驱动能源革命:能源互联网与分布式能源赋能工厂能效跃迁
  • 在Ubuntu系统中安装和升级RabbitVCS
  • 【教程】无视硬件限制强制升级Windows 11
  • 《数据结构之美--链表oj练习》
  • 2026《数据结构》考研复习笔记三(C++高级教程)
  • 「数据可视化 D3系列」入门第十章:饼图绘制详解与实现
  • 《实战AI智能体》——邮件转工单的AI自动化
  • 「数据可视化 D3系列」入门第十一章:力导向图深度解析与实现
  • 设计模式 --- 装饰器模式
  • 通过 Zotero 的样式编辑器(Style Editor)自定义文献引用和参考文献列表的格式
  • 在阿里云虚拟主机上启用WordPress伪静态
  • Redis 的指令执行方式:Pipeline、事务与 Lua 脚本的对比
  • HTTP:九.WEB机器人
  • 探索 HumanoidBench:类人机器人学习的新平台
  • 甘果桌面tv版下载-甘果桌面安卓电视版使用教程
  • OpenAI 34页最佳构建Agent实践
  • Python(23)Python异常处理完全指南:从防御到调试的工程实践
  • 使用 Vue 开发登录页面的完整指南