当前位置: 首页 > java >正文

【Java】RxJava解析

一,概述

RxJava 是 ReactiveX(响应式扩展)在 Java 语言上的实现,它是一种基于异步数据流的编程模式。响应式编程的核心理念是以数据为中心,强调数据的流动与变化。通过 RxJava,开发者可以基于观察者模式实现异步编程和事件驱动操作,从而快速响应外部事件。

响应式编程可以将数据流视为一系列按时间排序的事件流,这些流可以被观测、过滤、操作或组合,RxJava 提供了一组丰富的工具和操作符来支持这种编程范式。

此框架采用了观察者模式、责任链模式、装饰器模式、策略模式,实现数据流操作。本文,笔者以几个核心例子说明此框架用途及其实现原理。

以下是笔者浅析的类图,仅供参考

二,实例

1,Observable.create

1,通过create方法传入一个ObservableOnSubscribe接口的实现,返回一个Observable<T>实例。

重写ObservableOnSubscribe#subscribe方法,此方法即触发订阅的源头。

2,当返回的Observable开始subscribe时,通过封装Observer的ObservableEmiter对象,回调onNext或onError或onComplete方法,实现数据流订阅

3,callback此流,

2,Observable.just

从1可知,create方法即数据源创建的核心,此框架封装的just,fromIterable等方法均是对create的封装,

3,filter、map等操作符

类同Stream,filter等操作费同样使用责任链模式实现,详见三。

4,interval

interval返回一个定时Observable,subscribe在一个守护线程中执行,

1,设置1s一次定时触发

2,解除主线程的挂起

3,挂起主线程

输出如下

5,线程切换

通过subScribeOn和observeOn设定Scheduler,可以指定subscribe和observer所执行线程,

三,原理

几乎Rxjava所有操作,均是实现Observable,在子类中实现特定操作,属于策略模式的典型,以下策略参考

1,Observab.create

采用ObserableCreate策略

传入ObservableOnSubscribe接口实现类,被封装至ObservableCreate中,此为装饰器模式的典型实现,重点看下ObservableCreate

1,继承Observable,而Observable即本框架的第一门面API,其内部封装了很多工厂方法,

Observable实现ObservableSource接口,重写subscribe方法,

创建模版方法subscribeActual并在subscribe调用,因此ObservableCreate开始订阅。

回到ObservarCreate#subscribeActual

以上可知,CreateEmiter是对Observer的封装,当此被观察者已经disposed时,则不会触发onNext。

2,Observable.just

采用ObserableFromArray策略

just是对array的封装,跟进,

这里返回ObservableFromArray,是对Observable的子类,

这里就将传入的array遍历迭代,不赘述

3,filter、map等操作符

采用ObservableFilter等操作符策略

以filter和map为例,filter是Observable方法,会将自己this传入到下一个Observable,如下,

1,基础AbstractObserableWithUpstream,表示责任链

2,上一个Observable作为source传入,

3,保存过滤操作predicate

跟进subscribeActual

通过filter操作,在onNext会吊钟判断是否触发下一个Observable的onNext,downstream即下一个Observer,

同理,看下map

从以上可知,Rxjava是通过将每个item依次发送给各个Observable,最后在subscribe接收到结果,触发onNext回调,而非stream那样先在每个操作费完成操作,再将数据流向下一个操作。

4,interval

Schedulers默认传入computation

策略实现是ObserableInterval,

封装了一个IntervalObserver,作为is传入schedulePeriodicallyDirect方法,IntervalObserver实现了Runnable接口,跟进run方法

只要没有disposed,触发下游onNext,count自增一次,默认从0开始自增,

因此,定时调度逻辑落在了Scheduler.schedulePeriodicallyDirect方法,此处就不继续跟进了,感选取的读者自行了解。

5,线程切换

重点看下subscribeOn和observeOn方法

将上一个Stream的Observable作为source参数传入,其重写了subscribeActual方法,稍许不一样,

保存传入的scheduler,并且通过schedule开启一个线程,SubscribeTask中通过source在指定线程中再次触发subscribe,这个source就是ObservableSubscribeOn上游的一个Observable。

因此,以上即完成subscribe切换线程逻辑,那么对于下游的Observable呢?很简单,直接调用即可,

以上,上游的observer在指定线程中执行到subscribeOn时,直接通过onNext调用到下游downstream.onNext即可,其它callback类似。

接下来看下observeOn,即观察者回调线程设定。

ObserverableOnbserveOn封装了一层observer,并且使用上游Observable触发subscribe,封装的Observer保存了一个Scheduler.Work,即指定的线程环境,

看下onNext回调实现,

将上游传递的值t保存进队列,供切线程后从队列中获取值,

随后调用schedule方法切线程

ObserveOnObserver实现了Runnable接口,看下实现,

跟进drainNormal

1,通过队列获取到上游保存的Value

2,拿到下游downstream,

3,从队列中取值,并触发下游Observer#onNext等回调,

于是乎,读者思考下以下问题,当map在observerOn方法后执行,那么map在哪个线程环境呢?

从上述可知,已经切换至observer的线程环境了,

因此,onberveOn一般放在subscribe前面调用。

http://www.xdnf.cn/news/12259.html

相关文章:

  • 麒麟信安系统下修改系统默认记录日志大小
  • 上传、下载功能 巧实现
  • 如何修改项目在浏览器中的小图标
  • 【MATLAB去噪算法】基于CEEMDAN联合小波阈值去噪算法(第四期)
  • 轨道交通可视化,赋能智慧车站运维
  • C++034(一维数组)
  • 基于WSL搭建Ubnutu 20.04.6 LTS(二)-部署Docker环境
  • LoRA:大模型高效微调的低秩之道——原理解析与技术实现
  • 检测到 #include 错误。请更新 includePath。已为此翻译单元(D:\软件\vscode\test.c)禁用波形曲线
  • 力扣面试150题--被围绕的区域
  • std__map,std__unordered_map,protobuf__map之间的性能比较
  • 网页显示:嗯…无法访问此页面,的解决办法和原理
  • 大模型学习
  • 家政维修平台实战14登录验证
  • 如何用 SD-WAN 打破 ERP 内网限制,实现随时随地高效访问?
  • 总结HTML中的文本标签
  • 危化工厂部署人员定位系统的重要性
  • 算法性能分析
  • 智慧赋能:移动充电桩的能源供给革命与便捷服务升级
  • TripGenie:畅游济南旅行规划助手:个人工作纪实(九)
  • Linux 下生成动态库时 -fPIC的作用详解
  • 一些常用的JavaScript简写技巧
  • 如何利用Facebook优化TikTok的跨境商品推广效果
  • STM32 NVIC中断控制器
  • 【Algorithm】Union-Find简单介绍
  • 【Docker管理工具】部署Docker可视化管理面板Dpanel
  • [Java 基础]数组
  • 8086的简化版8088
  • PublishSubject、ReplaySubject、BehaviorSubject、AsyncSubject的区别
  • B+树知识点总结