响应式编程库Reactor
- 一、官方文档
- 二、什么是响应式编程
- 2.1. 阻塞是对资源的浪费
- 2.2. 异步可以解决问题吗?
- 2.3. 从命令式编程到响应式编程
- 2.3.1. 可编排性与可读性
- 2.3.2. 就像装配流水线
- 2.3.3. 操作符(Operators)
- 2.3.4. subscribe() 之前什么都不会发生
- 2.3.5. 背压
- 2.3.6. 热(Hot) vs 冷(Cold)
- 三、 Reactor 核心特性
- 反应式流的背景
- Reactor基础
- 创建序列
- **序列同步创建 generate()**
- 复杂序列创建create()
- **简单例子**
- **一个并不完善的解决方案**
- **官网例子**
- 简化方案
- 中间操作
- 事件触发
- 流的连接 concat merge
- 流的压缩 zip
- 超时与重试 timeout retry
- 背压(Backpressure )和请求重塑(Reshape Requests)
- handle
- 自定义线程调度、线程上下文
- 异常处理
- sinks 单播|多播
- **基于 lambda 的对 Flux 的订阅(subscribe)**
一、官方文档
查看反应式流规范。https://www.reactive-streams.org/
官方参考文档地址:http://projectreactor.io/docs/core/release/reference/
中文翻译文档地址:http://htmlpreview.github.io/?https://github.com/get-set/reactor-core/blob/master-zh/src/docs/index.html
中文翻译源码地址:https://github.com/get-set/reactor-core/tree/master-zh/src/docs/asciidoc
可以基于源码自行编译文档:
git clone https://github.com/get-set/reactor-core.git -b master-zh
cd reactor-core
./gradlew asciidoctor`
本文档的一些典型的名词如下:
-
Publisher(发布者)、Subscriber(订阅者)、Subscription(订阅 n.)、subscribe(订阅 v.)。
-
event/signal(事件/信号,原文常甚至在一个句子将两个词来回用,但表示的意思是基本相同的, 因此如果你看到本文翻译有时候用事件,有时候用信号,在本文档内基本可以认为一个意思)。
-
sequence/stream(序列/流,两个词意思相似,本文介绍的是响应式流的内容,但是出现比较多的是 sequence这个词,主要翻译为“序列”,有些地方为了更加契合且方便理解翻译为“流序列”)。
-
element/item(主要指序列中的元素,文中两个词基本翻译为“元素”)。
-
emit/produce/generate(发出/产生/生成,文中这三个英文词也有相似之处,对于 emit 多翻译为 “发出”,对于后两个多翻译为“生成”)、consume(消费)。
-
Processor(未做翻译,保留英文)。
-
operator(译作操作符,声明式的可组装的响应式方法,其组装成的链译作“操作链”)
二、什么是响应式编程
响应式编程是一种关注于
数据流(data streams)和变化传递(propagation of change)
的异步编程方式。 这意味着它可以用既有的编程语言表达静态(如数组)或动态(如事件源)的数据流。
了解历史:
● 在响应式编程方面,微软跨出了第一步,它在 .NET 生态中创建了响应式扩展库(Reactive Extensions library, Rx)。接着 RxJava 在JVM上实现了响应式编程。后来,在 JVM 平台出现了一套标准的响应式 编程规范,它定义了一系列标准接口和交互规范。并整合到 Java 9
中(使用 Flow
类)。
● 响应式编程通常作为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展。 响应式流(reactive streams)与“迭代子模式”(Iterator design pattern
)也有相通之处, 因为其中也有 Iterable-Iterator
这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的。
● 使用 iterator 是一种“命令式”(imperative)编程范式,即使访问元素的方法是 Iterable
的唯一职责。关键在于,什么时候执行 next() 获取元素取决于开发者。在响应式流中,相对应的 角色是 Publisher-Subscriber
,但是 当有新的值到来的时候 ,却反过来由发布者(Publisher) 通知订阅者(Subscriber),这种“推送”模式是响应式的关键。此外,对推送来的数据的操作 是通过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者通过 描述“控制流程”来定义对数据流的处理逻辑。
● 除了数据推送,对错误处理(error handling)和完成(completion)信号的定义也很完善。 一个 Publisher 可以推送新的值到它的 Subscriber
(调用 onNext
方法), 同样也可以推送错误(调用 onError 方法)和完成(调用 onComplete 方法)信号。 错误和完成信号都可以终止响应式流。可以用下边的表达式描述:
onNext x 0..N [onError | onComplete]
这种方式非常灵活,无论是有/没有值,还是 n 个值(包括有无限个值的流,比如时钟的持续读秒),都可处理。
那么我们为什么需要这样的异步响应式开发库呢?
2.1. 阻塞是对资源的浪费
现代应用需要应对大量的并发用户,而且即使现代硬件的处理能力飞速发展,软件性能仍然是关键因素。
广义来说我们有两种思路来提升程序性能:
-
并行化(parallelize) :使用更多的线程和硬件资源。
-
基于现有的资源来 提高执行效率 。
通常,Java开发者使用阻塞式(blocking)编写代码。这没有问题,在出现性能瓶颈后, 我们可以增加处理线程,线程中同样是阻塞的代码。但是这种使用资源的方式会迅速面临 资源竞争和并发问题。
更糟糕的是,阻塞会浪费资源。具体来说,比如当一个程序面临延迟(通常是I/O方面, 比如数据库读写请求或网络调用),所在线程需要进入 idle 状态等待数据,从而浪费资源。
所以,并行化方式并非银弹。这是挖掘硬件潜力的方式,但是却带来了复杂性,而且容易造成浪费。
2.2. 异步可以解决问题吗?
第二种思路——提高执行效率——可以解决资源浪费问题。通过编写 异步非阻塞 的代码, (任务发起异步调用后)执行过程会切换到另一个 使用同样底层资源
的活跃任务,然后等 异步调用返回结果再去处理。
但是在 JVM 上如何编写异步代码呢?Java 提供了两种异步编程方式:
-
回调(Callbacks) :异步方法没有返回值,而是采用一个 callback 作为参数(lambda 或匿名类),当结果出来后回调这个 callback。常见的例子比如 Swings 的 EventListener。
-
Futures :异步方法 立即 返回一个 Future,该异步方法要返回结果的是 T 类型,通过 Future
封装。这个结果并不是 *立刻* 可以拿到,而是等实际处理结束才可用。比如,
ExecutorService 执行 Callable 任务时会返回 Future 对象。
这些技术够用吗?并非对于每个用例都是如此,两种方式都有局限性。
回调很难组合起来,因为很快就会导致代码难以理解和维护(即所谓的“回调地狱(callback hell)”)。
官方文档例子
考虑这样一种情景:在用户界面上显示用户的5个收藏,或者如果没有任何收藏提供5个建议。这需要3个 服务(一个提供收藏的ID列表,第二个服务获取收藏内容,第三个提供建议内容):
回调地狱(Callback Hell)的例子
userService.getFavorites(userId, new Callback<List<String>>() { // 1public void onSuccess(List<String> list) { // 2if (list.isEmpty()) { // 3suggestionService.getSuggestions(new Callback<List<Favorite>>() {public void onSuccess(List<Favorite> list) { // 4UiUtils.submitOnUiThread(() -> { // 5list.stream().limit(5).forEach(uiList::show); // 6});}public void onError(Throwable error) { // 7UiUtils.errorPopup(error);}});} else {list.stream() // 8.limit(5).forEach(favId -> favoriteService.getDetails(favId, // 9new Callback<Favorite>() {public void onSuccess(Favorite details) {UiUtils.submitOnUiThread(() -> uiList.show(details));}public void onError(Throwable error) {UiUtils.errorPopup(error);}}));}}public void onError(Throwable error) {UiUtils.errorPopup(error);}
});
- 基于回调的服务使用一个匿名 Callback 作为参数。后者的两个方法分别在异步执行成功 或异常时被调用。
- 获取到收藏ID的list后调用第一个服务的回调方法 onSuccess。
- 如果 list 为空, 调用 suggestionService。
- 服务 suggestionService 传递 List 给第二个回调。
- 既然是处理 UI,我们需要确保消费代码运行在 UI 线程。
- 使用 Java 8 Stream 来限制建议数量为5,然后在 UI 中显示。
- 在每一层,我们都以同样的方式处理错误:在一个 popup 中显示错误信息。
- 回到收藏 ID 这一层,如果返回 list,我们需要使用 favoriteService 来获取 Favorite 对象。由于只想要5个,因此使用 stream 。
- 再一次回调。这次对每个ID,获取 Favorite 对象在 UI 线程中推送到前端显示。
使用 Reactor 实现以上回调方式同样功能的例子
userService.getFavorites(userId) // 1.flatMap(favoriteService::getDetails) // 2.switchIfEmpty(suggestionService.getSuggestions()) // 3.take(5) // 4.publishOn(UiUtils.uiThreadScheduler()) // 5.subscribe(uiList::show, UiUtils::errorPopup); // 6
- 我们获取到收藏ID的流
- 我们 异步地转换 它们(ID) 为 Favorite 对象(使用 flatMap),现在我们有了
Favorite
流。 - 一旦 Favorite 为空,切换到 suggestionService。
- 我们只关注流中的最多5个元素。
- 最后,我们希望在 UI 线程中进行处理。
- 通过描述对数据的最终处理(在 UI 中显示)和对错误的处理(显示在 popup 中)来触发(subscribe)。
如果你想确保“收藏的ID”的数据在800ms内获得(如果超时,从缓存中获取)呢?在基于回调的代码中, 会比较复杂。但 Reactor 中就很简单,在处理链中增加一个 timeout 的操作符即可。
userService.getFavorites(userId).timeout(Duration.ofMillis(800)) // 1.onErrorResume(cacheService.cachedFavoritesFor(userId)) // 2.flatMap(favoriteService::getDetails) // 3.switchIfEmpty(suggestionService.getSuggestions()).take(5).publishOn(UiUtils.uiThreadScheduler()).subscribe(uiList::show, UiUtils::errorPopup);
- 如果流在超时时限没有发出(emit)任何值,则发出错误(error)。
- 一旦收到错误,交由 cacheService 处理。
- 处理链后边的内容与上例类似。
2.3. 从命令式编程到响应式编程
类似 Reactor 这样的响应式库的目标就是要弥补上述 “经典” 的 JVM 异步方式所带来的不足, 此外还会关注一下几个方面:
可编排性(Composability)
以及可读性(Readability)
- 使用丰富的
操作符
来处理形如流
的数据 - 在 订阅(
subscribe
) 之前什么都不会发生 - 背压(
backpressure
) 具体来说即 消费者能够反向告知生产者生产内容的速度的能力 高层次
(同时也是有高价值的)的抽象,从而达到 并发无关 的效果
2.3.1. 可编排性与可读性
可编排性,指的是编排多个异步任务的能力。比如我们将前一个任务的结果传递给后一个任务作为输入, 或者将多个任务以分解再汇总(fork-join)的形式执行,或者将异步的任务作为离散的组件在系统中 进行重用。
这种编排任务的能力与代码的可读性和可维护性是紧密相关的。随着异步处理任务数量和复杂度 的提高,编写和阅读代码都变得越来越困难。就像我们刚才看到的,回调模式是简单的,但是缺点 是在复杂的处理逻辑中,回调中会层层嵌入回调,导致 回调地狱(Callback Hell) 。你能猜到 (或有过这种痛苦经历),这样的代码是难以阅读和分析的。
Reactor 提供了丰富的编排操作,从而代码直观反映了处理流程,并且所有的操作保持在同一层次 (尽量避免了嵌套)。
2.3.2. 就像装配流水线
你可以想象数据在响应式应用中的处理,就像流过一条装配流水线。Reactor 既是传送带, 又是一个个的装配工或机器人。原材料从源头(最初的 Publisher)流出,最终被加工为成品, 等待被推送到消费者(或者说 Subscriber)。
原材料会经过不同的中间处理过程,或者作为半成品与其他半成品进行组装。如果某处有齿轮卡住, 或者某件产品的包装过程花费了太久时间,相应的工位就可以向上游发出信号来限制或停止发出原材料。
2.3.3. 操作符(Operators)
在 Reactor 中,操作符(operator)就像装配线中的工位(操作员或装配机器人)。每一个操作符 对Publisher
进行相应的处理,然后将 Publisher
包装为一个新的 Publisher。就像一个链条, 数据源自第一个 Publisher
,然后顺链条而下,在每个环节进行相应的处理。最终,一个订阅者 (Subscriber
)终结这个过程。请记住,在订阅者(Subscriber)订阅(subscribe)到一个 发布者(Publisher)之前,什么都不会发生。
理解了操作符会创建新的 Publisher 实例这一点,能够帮助你避免一个常见的问题, 这种问题会让你觉得处理链上的某个操作符没有起作用。
虽然响应式流规范(Reactive Streams specification)没有规定任何操作符, 类似 Reactor 这样的响应式库所带来的最大附加价值之一就是提供丰富的操作符。包括基础的转换操作, 到过滤操作,甚至复杂的编排和错误处理操作。
2.3.4. subscribe() 之前什么都不会发生
在 Reactor 中,当你创建了一条 Publisher 处理链,数据还不会开始生成。事实上,你是创建了 一种抽象的对于异步处理流程的描述(从而方便重用和组装)。
当真正 “订阅(subscrib)” 的时候,你需要将 Publisher 关联到一个 Subscriber 上,然后 才会触发整个链的流动。这时候,Subscriber 会向上游发送一个 request 信号,一直到达源头 的 Publisher。
2.3.5. 背压
向上游传递信号这一点也被用于实现 背压 ,就像在装配线上,某个工位的处理速度如果慢于流水线 速度,会对上游发送反馈信号一样。
在响应式流规范中实际定义的机制同刚才的类比非常接近:订阅者可以无限接受数据并让它的源头 “满负荷” 推送所有的数据,也可以通过使用 request
机制来告知源头它一次最多能够处理 n
个元素。
中间环节的操作也可以影响 request。想象一个能够将每 10 个元素分批打包的缓存(buffer
)操作。 如果订阅者请求一个元素,那么对于源头来说可以生成 10 个元素。此外预取策略也可以使用了, 比如在订阅前预先生成元素。
这样能够将 “推送” 模式转换为 “推送 + 拉取” 混合的模式,如果下游准备好了,可以从上游拉取 n 个元素;但是如果上游元素还没有准备好,下游还是要等待上游的推送。
2.3.6. 热(Hot) vs 冷(Cold)
在 Rx 家族的响应式库中,响应式流分为 “热” 和“冷”两种类型,区别主要在于响应式流如何 对订阅者进行响应:
- 一个 “冷” 的序列,指对于每一个 Subscriber,都会收到从头开始所有的数据。如果源头 生成了一个 HTTP 请求,对于每一个订阅都会创建一个新的 HTTP 请求。
- 一个 “热” 的序列,指对于一个 Subscriber,只能获取从它开始 订阅 之后 发出的数据。不过注意,有些 “热” 的响应式流可以缓存部分或全部历史数据。 通常意义上来说,一个 “热” 的响应式流,甚至在即使没有订阅者接收数据的情况下,也可以 发出数据(这一点同 “Subscribe() 之前什么都不会发生” 的规则有冲突)。
三、 Reactor 核心特性
Reactor 核心特性
反应式流的背景
Reactor遵循Reactive Streams规范,反应式流(Reactive Streams)是一种处理异步数据流的标准和规范,它定义了一套API,旨在以非阻塞的方式处理数据流的发布与订阅,从而实现流的背压(Backpressure)管理。背压是指在流处理中,消费者(Subscriber)能够告知生产者(Publisher)自己能够处理的数据的速度,以避免因为生产者发送数据过快而导致消费者处理不过来,最终可能导致OOM(内存溢出)或其他性能问题。
反应式流规范定义了以下四个主要的接口:
-
Publisher:发布者,负责发布数据流。它可以被订阅(Subscription)。
-
Subscriber:订阅者,订阅并处理来自Publisher的数据流。它定义了处理数据、完成信号和错误信号的方法。
-
Subscription:订阅关系,是Publisher和Subscriber之间的一座桥梁。提供了
请求数据
和取消订阅
的方法,以实现背压管理
。 -
Processor:处理器,充当了
Publisher
和Subscriber
的角色,可以用于在数据流中添加处理逻辑。可以一个或多个链式连接
Reactor基础
-
Mono:表示0或1个元素的异步序列。常用于单个结果的异步操作,如异步的数据库查询或远程服务调用。
-
Flux:表示0到N个元素的异步序列。适用于多个元素的操作,如处理集合、流式数据处理。
创建序列
- just():
可以指定序列中包含的全部元素。创建出来的Flux序列在发布这些元素之后会自动结束
- fromArray(),fromIterable(),fromStream():
可以从一个数组,Iterable对象或Stream对象中穿件Flux对象
- empty():
创建一个不包含任何元素,只发布结束消息的序列
- error(Throwable error):
创建一个只包含错误消息的序列
- never():
传建一个不包含任务消息通知的序列
- range(int start, int count):
创建包含从start起始的count个数量的Integer对象的序列
- interval(Duration period)和interval(Duration delay, Duration period):
创建一个包含了从0开始递增的Long对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间
- intervalMillis(long period)和intervalMillis(long delay, long period):
与interval()方法相同,但该方法通过毫秒数来指定时间间隔和延迟时间
Flux.just(1, 2, 3, 4, 5, 6, 7, 0, 5, 6);
Flux<String> stringFlux = Flux.just("hello", "world");//字符串//fromArray(),fromIterable()和fromStream():可以从一个数组、Iterable 对象或Stream 对象中创建Flux序列Integer[] array = {1,2,3,4};Flux.fromArray(array).subscribe(System.out::println);List<Integer> integers = Arrays.asList(array);Flux.fromIterable(integers).subscribe(System.out::println);Stream<Integer> stream = integers.stream();Flux.fromStream(stream).subscribe(System.out::println);Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscirbe(System.out::println);
序列同步创建 generate()
generate()方法通过同步和逐一的方式来产生Flux序列。
序列的产生
是通过调用所提供的的SynchronousSink对象的next()
,complete()和error(Throwable)方法来完成的。
逐一生成的含义是在具体的生成逻辑中,next()方法只能最多被调用一次。
在某些情况下,序列的生成可能是有状态的
,需要用到某些状态对象,此时可以使用
/*** 同步地, 逐个地 产生值的方法* 你需要提供一个 Supplier<S> 来初始化状态值,而生成器需要 在每一“回合”生成元素后返回新的状态值(供下一回合使用)Callable<S> stateSupplier* sink 接收器,水槽 sink.next() 将元素放入水槽(流). BiFunction<S, SynchronousSink<T>, S> generator*/public void generate(){Flux<String> flux = Flux.generate(//也可以使用 可变(mutable)类型,AtomicLong() -> 0,(state, sink) -> {sink.next("3 x " + state + " = " + 3*state);System.out.println("generate thread:"+Thread.currentThread().getName());if (state == 7) sink.error(new RuntimeException("7 被拒了哈"));if (state == 10) sink.complete();return state + 1;});System.out.println("main thread:"+Thread.currentThread().getName());// 所有操纵作其实都是对 发布者的!!flux.onErrorReturn("被拒了哈").doOnError(e-> System.out.println("error :"+e)).subscribe(System.out::println);}
复杂序列创建create()
作为一个更高级的创建 Flux 的方式, create 方法的生成方式既可以是同步, 也可以是异步的,并且还可以每次发出多个元素。
简单例子
Flux.create(sink -> {for (int i = 0; i < 10; i++) {sink.next(i);}sink.complete();
}).subscribe(System.out::println);
上面的例子很容易看懂,在一个for循环中使用sink来下发元素然后结束。
很多文章甚至书籍都是拿这样的例子来介绍Flux.create(),这样的用法和Flux.just其实没有太大的区别,在创建的时候就已经确定了怎么去下发元素。这样例子其实并不能让我们真正的使用Flux.create去解决实际开发中的问题,意义不大。
一个并不完善的解决方案
FluxSink<String> outSink;@Testpublic void testFluxCreate() {Flux<String> f = Flux.create(sink -> {outSink = sink;});f.subscribe(e -> System.out.println(e))//do something//下发元素outSink.next("我来了");outSink.next("我来了2");}
- 声明一个FluxSink类型成员变量outSink,用来接收Flux.create中的sink。
- 在
Flux.create
函数式方式中将sink
赋值给成员变量 - 在外部通过
outSink
随时下发元素。
这样确实能解决上面的抛出的问题,但是也引发了其他的问题。我们只是想进行元素的下发,但是将FluxSink开放出来它可不止能进行元素下发,还有其他的方法。这样做破坏了封装性,如果其他人使用不当,比如提前结束FluxSink等等会引发异常的Bug。
官网例子
有一个监听者的接口,在create的lambda表达式中创建一个监听者并将它注册到监听器中。而这个监听者在事件触发的时候就会调用sink的下发元素的方法。它并没有将FluxSink的直接暴露出去,而是使用一个订阅者对它进行了一层封装
,只暴露一些需要的方法。
interface MyEventListener<T> {void onDataChunk(List<T> chunk);void processComplete();
}
Flux<String> bridge = Flux.create(sink -> {myEventProcessor.register( new MyEventListener<String>() { public void onDataChunk(List<String> chunk) {for(String s : chunk) {sink.next(s); }}public void processComplete() {sink.complete(); }});
});
- 桥接 MyEventListener。
- 每一个 chunk 的数据转化为 Flux 中的一个元素。
- processComplete 事件转换为 onComplete。
- 所有这些都是在 myEventProcessor 执行时异步执行的。
此外,既然 create 可以是异步地,并且能够控制背压
,你可以通过提供一个 OverflowStrategy
来定义背压行为。
-
IGNORE
: 完全忽略下游背压请求,这可能会在下游队列积满的时候导致 IllegalStateException。 -
ERROR
: 当下游跟不上节奏的时候发出一个 IllegalStateException 的错误信号。 -
DROP
:当下游没有准备好接收新的元素的时候抛弃这个元素。 -
LATEST
:让下游只得到上游最新的元素。 -
BUFFER
:(默认的)缓存所有下游没有来得及处理的元素(这个不限大小的缓存可能导致 OutOfMemoryError)。
由于官网这个例子的 myEventProcessor 并没有定义,所以,我大概猜了一下,他的功能大概是这样。有可能是错的!!
class MyEventProcessor<T>{private MyEventListener<T> myEventListener;public void register(MyEventListener<T> stringMyEventListener) { this.myEventListener = stringMyEventListener;}public void putData(List<T> chunk){//or you can do something to get a new listmyEventListener.onDataChunk(chunk);}// ....}
完整代码
MyEventProcessor myEventProcessor = new MyEventProcessor();Flux<String> bridge = Flux.create(sink -> {myEventProcessor.register(new MyEventListener<String>() {public void onDataChunk(List<String> chunk) {for(String s : chunk) {sink.next(s);}}public void processComplete() {sink.complete();}});});bridge.log().subscribe(System.out::println);List<String> data = new ArrayList<>();for (int i = 0; i < 10; i++) {data.add("value"+i);}myEventProcessor.putData(data);
简化方案
上面我们看到虽然能解决问题,但是需要一个监听者接口并实现它,还需要一个监听器,如果是一个简单的问题,只有一个监听者需要这么步骤就显得太复杂了。那我们可以结合第一个Bad方案和官方的方案得到一个简化的方案:
Consumer<String> producer;@Testpublic void testFluxCreate() {Flux.create(sink -> {producer = nextData -> sink.next(nextData);}).subscribe(e -> System.out.println(e));//do something//下发元素producer.accept("我来了");producer.accept("我来了2");}
- 声明一个Consumer类型成员变量producer,在Flux.create中进行初始化,定义如何使用sink。
- 在外部通过producer随时下发元素。
上面的例子我们即简单的解决了上面抛出的问题,也没有破话封装性,只暴露关键的功能出来。
中间操作
事件触发
/*** 响应式编程核心:看懂文档弹珠图;* 信号: 正常/异常(取消)* SignalType:* SUBSCRIBE: 被订阅* REQUEST: 请求了N个元素* CANCEL: 流被取消* ON_SUBSCRIBE:在订阅时候* ON_NEXT: 在元素到达* ON_ERROR: 在流错误* ON_COMPLETE:在流正常完成时* AFTER_TERMINATE:中断以后* CURRENT_CONTEXT:当前上下文* ON_CONTEXT:感知上下文* <p>* doOnXxx API触发时机* 1、doOnNext:每个数据(流的数据)到达的时候触发* 2、doOnEach:每个元素(流的数据和 信号 )到达的时候触发* 3、doOnRequest: 消费者请求流元素的时候* 4、doOnError:流发生错误* 5、doOnSubscribe: 流被订阅的时候* 6、doOnTerminate: 发送取消/异常信号中断了流* 7、doOnCancle: 流被取消* 8、doOnDiscard:流中元素被忽略的时候** @param args*/public void doOnXxxx(String[] args) {// 关键:doOnNext:表示流中某个元素到达以后触发我一个回调// doOnXxx要感知某个流的事件,写在这个流的后面,新流的前面Flux.just(1, 2, 3, 4, 5, 6, 7, 0, 5, 6).doOnNext(integer -> System.out.println("元素到达:" + integer)) //元素到达得到时候触发.doOnEach(integerSignal -> { //each封装的详细System.out.println("doOnEach.." + integerSignal);})//1,2,3,4,5,6,7,0.map(integer -> 10 / integer) //10,5,3,// 这个 doOnError 只能感知上一个 map(integer -> 10 / integer) 的错误事件.doOnError(throwable -> {System.out.println("数据库已经保存了异常:" + throwable.getMessage());}).map(integer -> 100 / integer)// .doOnError.doOnNext(integer -> System.out.println("元素到哈:" + integer)).subscribe(System.out::println);}
流的连接 concat merge
/**** concatMap: 一个元素可以 变很多单个; 对于元素类型无限制* concat: Flux.concat; 静态调用。 元素类型无限制* concatWith: 连接的流和老流中的。 元素类型要一样**/@Testvoid concatMap() {System.out.println("concatWith");Flux.just(1, 2).concatWith(Flux.just(4, 5, 6)).log().subscribe();//连接System.out.println("concat");Flux.concat(Flux.just(1,2),Flux.just("h","j"),Flux.just("haha","hehe")).log().subscribe();//Mono、FLux:发布者System.out.println("concatMap");Flux.just(1,2).concatMap(s-> Flux.just(s+"->a",1)).log().subscribe();}/*** concat: 连接; A流 所有元素和 B流所有元素拼接* merge:合并; A流 所有元素和 B流所有元素 按照时间序列合并* mergeWith:* mergeSequential: 按照哪个流先发元素排队*/@Testvoid merge() throws IOException {Flux.mergeSequential();Flux.merge(Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(1)),Flux.just("a", "b").delayElements(Duration.ofMillis(1500)),Flux.just("haha", "hehe", "heihei", "xixi").delayElements(Duration.ofMillis(500))).log().subscribe();Flux.just(1, 2, 3).mergeWith(Flux.just(4, 5, 6));System.in.read();}
流的压缩 zip
/*** zip: 无法结对的元素会被忽略;* 最多支持8流压缩;*/@Testvoid zip() {//Tuple:元组;// Flux< Tuple2:<Integer,String> >Flux.zip(Flux.just(1,2),Flux.just(1,2),Flux.just(2,3),Flux.just(1,3,99)).log().subscribe(System.out::println);// Flux.just(1,2,3)
// .zipWith(Flux.just("a","b","c","d"))
// .map(tuple -> {
// Integer t1 = tuple.getT1(); //元组中的第一个元素
// String t2 = tuple.getT2();// 元组中的第二个元素
// return t1 + "==>" + t2;
// })
// .log()
// .subscribe(v-> System.out.println("v = " + v));}
超时与重试 timeout retry
/*** 重试与超时* @throws IOException*/@Testvoid retryAndTimeout() throws IOException {Flux.just(1).delayElements(Duration.ofSeconds(3)).log().(Duration.ofSeconds(2)).retry(2) // 把流从头到尾重新请求一次.onErrorReturn(2).map(i-> i+"haha").subscribe(v-> System.out.println("v = " + v));System.in.read();}
背压(Backpressure )和请求重塑(Reshape Requests)
Flux<List<Integer>> flux = Flux.range(1, 10) //原始流10个.buffer(3).log();//缓冲区:缓冲3个元素: 消费一次最多可以拿到三个元素; 凑满数批量发给消费者
//
// //一次发一个,一个一个发;
// 10元素,buffer(3);消费者请求4次,数据消费完成
Flux.range(1, 1000).log()//限流触发,看上游是怎么限流获取数据的.limitRate(100) //一次预取30个元素; 第一次 request(100),以后request(75).subscribe();
void TestCache() throws IOException {Flux<Integer> cache = Flux.range(1, 10).delayElements(Duration.ofSeconds(1)) //不调缓存默认就是缓存所有;
// .cache(1); //缓存两个元素; 默认全部缓存cache.subscribe();//缓存元素;// 最定义订阅者new Thread(()->{try {Thread.sleep(9000);} catch (InterruptedException e) {throw new RuntimeException(e);}cache.subscribe(v-> System.out.println("v = " + v));}).start();System.in.read();}
handle
自定义流中元素处理规则
//Flux.range(1,10).handle((value,sink)->{System.out.println("拿到的值:"+value);sink.next("张三:"+value); //可以向下发送数据的通道}).log() //日志.subscribe();
自定义线程调度、线程上下文
响应式:响应式编程: 全异步、消息、事件回调
默认还是用当前线程,生成整个流、发布流、流操作
/*** 线程调度*/public void thread1(){Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);final Flux<String> flux = Flux.range(1, 3).map(i -> 10 + i).log()// 改变发布者的线程,后面的线程开始切换.publishOn(s).log()// 改变订阅者的线程
// .subscribeOn(Schedulers.single()).map(i -> "value " + i);Schedulers.immediate();// 默认: 当前线程执行所有操纵Schedulers.single();//使用一个固定的单线程Schedulers.boundedElastic();//使用有界的、弹性调度的线程池。非无线扩张
// Schedulers.fromExecutor(Executors.newSingleThreadExecutor()); // 自定义线程池//只要不指定线程池,默认发布者用的线程就是订阅者的线程;new Thread(() -> flux.subscribe(System.out::println)).start();}
需求
:100万数据,分8个线程处理,每个线程一次处理100个数据
// 百万数据,8个线程,每个线程处理100,进行分批处理一直处理结束Flux.range(1,1000000).buffer(100).parallel(8).runOn(Schedulers.newParallel("yy")).log().flatMap(list->Flux.fromIterable(list)).collectSortedList(Integer::compareTo).subscribe();System.in.read();
缩写至 100 的数据量打印结果
20:00:46.851 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
20:00:46.851 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)
20:00:46.851 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
20:00:46.851 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)
20:00:46.851 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
20:00:46.851 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)
20:00:46.851 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
20:00:46.851 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)
20:00:46.851 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
20:00:46.851 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)
20:00:46.851 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
20:00:46.851 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)
20:00:46.851 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
20:00:46.851 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)
20:00:46.851 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
20:00:46.851 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)
20:00:46.851 [yy-5] INFO reactor.Parallel.RunOn.1 - onNext([41, 42, 43, 44, 45, 46, 47, 48, 49, 50])
20:00:46.851 [yy-2] INFO reactor.Parallel.RunOn.1 - onNext([11, 12, 13, 14, 15, 16, 17, 18, 19, 20])
20:00:46.851 [yy-1] INFO reactor.Parallel.RunOn.1 - onNext([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
20:00:46.851 [yy-7] INFO reactor.Parallel.RunOn.1 - onNext([61, 62, 63, 64, 65, 66, 67, 68, 69, 70])
20:00:46.851 [yy-8] INFO reactor.Parallel.RunOn.1 - onNext([71, 72, 73, 74, 75, 76, 77, 78, 79, 80])
20:00:46.851 [yy-4] INFO reactor.Parallel.RunOn.1 - onNext([31, 32, 33, 34, 35, 36, 37, 38, 39, 40])
20:00:46.851 [yy-3] INFO reactor.Parallel.RunOn.1 - onNext([21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
20:00:46.851 [yy-6] INFO reactor.Parallel.RunOn.1 - onNext([51, 52, 53, 54, 55, 56, 57, 58, 59, 60])
20:00:46.867 [yy-4] INFO reactor.Parallel.RunOn.1 - request(1)
20:00:46.867 [yy-7] INFO reactor.Parallel.RunOn.1 - request(1)
20:00:46.867 [yy-1] INFO reactor.Parallel.RunOn.1 - request(1)
20:00:46.867 [yy-5] INFO reactor.Parallel.RunOn.1 - request(1)
20:00:46.867 [yy-1] INFO reactor.Parallel.RunOn.1 - onNext([81, 82, 83, 84, 85, 86, 87, 88, 89, 90])
20:00:46.867 [yy-3] INFO reactor.Parallel.RunOn.1 - request(1)
20:00:46.867 [yy-1] INFO reactor.Parallel.RunOn.1 - request(1)
20:00:46.867 [yy-6] INFO reactor.Parallel.RunOn.1 - request(1)
20:00:46.867 [yy-8] INFO reactor.Parallel.RunOn.1 - request(1)
20:00:46.867 [yy-5] INFO reactor.Parallel.RunOn.1 - onComplete()
20:00:46.867 [yy-1] INFO reactor.Parallel.RunOn.1 - onComplete()
20:00:46.867 [yy-2] INFO reactor.Parallel.RunOn.1 - request(1)
20:00:46.867 [yy-4] INFO reactor.Parallel.RunOn.1 - onComplete()
20:00:46.867 [yy-2] INFO reactor.Parallel.RunOn.1 - onNext([91, 92, 93, 94, 95, 96, 97, 98, 99, 100])
20:00:46.867 [yy-3] INFO reactor.Parallel.RunOn.1 - onComplete()
20:00:46.867 [yy-6] INFO reactor.Parallel.RunOn.1 - onComplete()
20:00:46.867 [yy-2] INFO reactor.Parallel.RunOn.1 - request(1)
20:00:46.867 [yy-8] INFO reactor.Parallel.RunOn.1 - onComplete()
20:00:46.867 [yy-7] INFO reactor.Parallel.RunOn.1 - onComplete()
20:00:46.867 [yy-2] INFO reactor.Parallel.RunOn.1 - onComplete()
线程切换 本地线程变量问题
@Test //ThreadLocal在响应式编程中无法使用。//响应式中,数据流期间共享数据,Context API: Context:读写 ContextView:只读;void threadlocal(){//支持Context的中间操作
// Flux.just(1,2,3)
// .transformDeferredContextual((flux,context)->{
// System.out.println("flux = " + flux);
// System.out.println("context = " + context);
// return flux.map(i->i+"==>"+context.get("prefix"));
// })
// //上游能拿到下游的最近一次数据
// .contextWrite(Context.of("prefix","哈哈"))
// //ThreadLocal共享了数据,上游的所有人能看到; Context由下游传播给上游
// .subscribe(v-> System.out.println("v = " + v));// 写入 与 读取 Context 的 相对位置 很重要:因为 Context 是不可变的,它的内容只能被上游的操作符看到String key = "message";Mono<String> r = Mono.just("Hello")// unUsable.contextWrite(ctx -> ctx.put(key, "World")).transformDeferredContextual((flux,context)->{System.out.println("flux = " + flux);System.out.println("context = " + context);return flux.map(ctx->ctx+" "+context.getOrDefault(key,""));})// 作用于上面context.contextWrite(ctx -> ctx.put(key, "reactor")).transformDeferredContextual((flux,context)->{System.out.println("flux = " + flux);System.out.println("context = " + context);return flux.map(ctx->ctx+" "+context.getOrDefault(key,""));})// 作用于上面context.contextWrite(ctx -> ctx.put(key, "world"))// unUsable.contextWrite(ctx -> ctx.put(key, "world!"));StepVerifier.create(r).expectNext("Hello reactor world").verifyComplete();}
异常处理
- Catch and return a static default value. 捕获异常返回一个静态默认值
onErrorReturn: 实现上面效果,错误的时候返回一个值
● 1、吃掉异常,消费者无异常感知
● 2、返回一个兜底默认值
● 3、流正常完成;
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).onErrorReturn(NullPointerException.class,"哈哈-6666").subscribe(v-> System.out.println("v = " + v),err -> System.out.println("err = " + err),()-> System.out.println("流结束")); // error handling example
- Catch and execute an alternative path with a fallback method. 吃掉异常,执行一个兜底方法;
onErrorResume
● 1、吃掉异常,消费者无异常感知
● 2、调用一个兜底方法
● 3、流正常完成
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).onErrorResume(err -> Mono.just("哈哈-777")).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));// like
try {return doSomethingDangerous(10);
}
catch (Throwable error) {return doOtherthing(10);
}
- Catch and dynamically compute a fallback value. 捕获并动态计算一个返回值
根据错误返回一个新值
.onErrorResume(err -> Flux.error(new BusinessException(err.getMessage()+":炸了")))// 类似
try {Value v = erroringMethod();return MyWrapper.fromValue(v);
}
catch (Throwable error) {return MyWrapper.fromError(error);
}
● 1、吃掉异常,消费者有感知
● 2、调用一个自定义方法
● 3、流异常完成
- Catch, wrap to a BusinessException, and re-throw.
捕获并包装成一个业务异常,并重新抛出
包装重新抛出异常: 推荐用 .onErrorMap
● 1、吃掉异常,消费者有感知
● 2、抛新异常
● 3、流异常完成
try {return callExternalService(k);
}
catch (Throwable error) {throw new BusinessException("oops, SLA exceeded", error);
}.onErrorResume(err -> Flux.error(new BusinessException(err.getMessage()+":炸了")))Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).onErrorMap(err-> new BusinessException(err.getMessage()+": 又炸了...")).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
- Catch, log an error-specific message, and re-throw.
捕获异常,记录特殊的错误日志,重新抛出
try {return callExternalService(k);
}
catch (RuntimeException error) {//make a record of the errorlog("uh oh, falling back, service failed for key " + k);throw error;
}Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).doOnError(err -> {System.out.println("err已被记录 = " + err);}).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
● 异常被捕获、做自己的事情
● 不影响异常继续顺着流水线传播
(后续不处理,会终止执行)
● 不吃掉异常
,只在异常发生的时候做一件事,消费者有感知
- Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.
Flux.just(1, 2, 3, 4).map(i -> "100 / " + i + " = " + (100 / i)).doOnError(err -> {System.out.println("err已被记录 = " + err);}).doFinally(signalType -> {System.out.println("流信号:"+signalType);})
- 忽略当前异常,仅通知记录,继续推进
Flux.just(1,2,3,0,5).map(i->10/i).onErrorContinue((err,val)->{System.out.println("err = " + err);System.out.println("val = " + val);System.out.println("发现"+val+"有问题了,继续执行其他的,我会记录这个问题");}) //发生.subscribe(v-> System.out.println("v = " + v),err-> System.out.println("err = " + err));
sinks 单播|多播
Sinks
: 接受器,数据管道,所有数据顺着这个管道往下走的
void sinks() throws InterruptedException, IOException {// Flux.create(fluxSink -> {
// fluxSink.next("111")
// })// Sinks.many(); //发送Flux数据。
// Sinks.one(); //发送Mono数据// Sinks: 接受器,数据管道,所有数据顺着这个管道往下走的//Sinks.many().unicast(); //单播: 这个管道只能绑定单个订阅者(消费者)//Sinks.many().multicast();//多播: 这个管道能绑定多个订阅者//Sinks.many().replay();//重放: 这个管道能重放元素。 是否给后来的订阅者把之前的元素依然发给它;// 从头消费还是从订阅的那一刻消费;Sinks.Many<Object> many = Sinks.many().multicast() //多播.onBackpressureBuffer(); //背压队列//默认订阅者,从订阅的那一刻开始接元素//发布者数据重放; 底层利用队列进行缓存之前数据
// Sinks.Many<Object> many = Sinks.many().replay().limit(3);new Thread(()->{for (int i = 0; i < 10; i++) {many.tryEmitNext("a-"+i);try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();
//
// //订阅many.asFlux().subscribe(v-> System.out.println("v1 = " + v));new Thread(()->{try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}many.asFlux().subscribe(v-> System.out.println("v2 = " + v));}).start();System.in.read();}
基于 lambda 的对 Flux 的订阅(subscribe)
流在被订阅之前不会触发任何操作!!!
subscribe(); // 1subscribe(Consumer<? super T> consumer); // 2subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer); // 3subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer,Runnable completeConsumer); // 4subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer,Runnable completeConsumer,Consumer<? super Subscription> subscriptionConsumer); // 5
- 订阅并触发序列。
- 对每一个生成的元素进行消费。
- 对正常元素进行消费,也对错误进行响应。
- 对正常元素和错误均有响应,还定义了序列正常完成后的回调。
- 对正常元素、错误和完成信号均有响应, 同时也定义了对该
subscribe
方法返回的Subscription
执行的回调。
前4个好理解
Flux<Integer> ints = Flux.range(1, 4);ints.subscribe(i -> {System.out.println(i / (i - 2));},error -> System.err.println("Error " + error),() -> {System.out.println("Done");});
三个 lambda 表达式:一个是用来处理正常数据,一个用来处理错误,一个完成后的处理。
在 subscribe 方法中,最后一个参数 Consumer<? super Subscription> subscriptionConsumer
用于处理订阅时的 Subscription 对象。这个 Subscription
对象允许你控制数据流的行为,尤其是请求的数量。这在响应式编程中是非常重要的,因为它涉及到背压(backpressure)
机制。
用途
- 请求元素:
你可以使用 Subscription 对象请求一定数量的元素。这是控制数据流的基本方式。 - 取消订阅:
通过 Subscription 对象,可以随时取消订阅,这对于管理资源和避免内存泄漏非常重要。 - 动态请求:
在处理流时,可以根据当前的处理能力动态调整请求的数量,确保不会因为请求过多而造成处理瓶颈。
示例
以下是三个例子,展示如何使用 Consumer<? super Subscription> subscriptionConsumer
。
- 控制请求元素
Flux<Integer> flux = Flux.range(1, 5);
flux.subscribe(i -> System.out.println("Received: " + i),error -> System.err.println("Error: " + error),() -> System.out.println("Completed"),subscription -> {subscription.request(3); // 先请求3个元素}
);
如果改成
Flux.range(1, 3)
,那么Completed
就会被打印出来!
- 取消订阅
// 这个方法返回一个 Flux,它会在每 100 毫秒发出一个递增的 Long 值(从 0 开始)。例如,发出的值依次为 0, 1, 2, 3, 等等。take(10) 这个操作符限制了 Flux 只发出前 10 个值Flux<Long> flux = Flux.interval(Duration.ofMillis(100)).take(10);flux.subscribe(i -> System.out.println("Received: " + i),error -> System.err.println("Error: " + error),() -> System.out.println("Completed"),subscription -> {subscription.request(5); // 请求5个元素// 2秒后取消订阅Executors.newSingleThreadScheduledExecutor().schedule(subscription::cancel, 200, TimeUnit.MILLISECONDS);});Thread.sleep(250);System.out.println("hh");
3. 动态请求(背压
)
{private static Subscription subscription; // 类成员变量public static void main(String[] args) {Flux<Integer> flux = Flux.range(1, 20);flux.subscribe(value -> {System.out.println("Received: " + value);if (value % 5 == 0) {System.out.println("Requesting more...");subscription.request(5); // 请求5个元素}},error -> System.err.println("Error: " + error),() -> System.out.println("Completed"),sub -> {subscription = sub; // 存储 Subscriptionsubscription.request(5); // 初始请求5个元素});}
}
仅个人理解:仅供参考
最后这个lamdba 表达式仅代表一个发起的动作。如pub-sub
一般(Flux
和flux.subscribe()
),在订阅者 subscribe() 之前什么都不会发生。在subscription.request(5)之后,才会走流水线。
而如果自定义了一个Subscriber
,上面的内容甚至不会执行!
例如:
public static void main(String[] args) {SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();Flux<Integer> ints = Flux.range(1, 4);ints.subscribe(i -> System.out.println(" lambda:"+i),error -> System.err.println("Error " + error),() -> {System.out.println("Done");},s -> ss.request(10));ints.subscribe(ss);}
public class SampleSubscriber<T> extends BaseSubscriber<T> {@Overridepublic void hookOnSubscribe(Subscription subscription) {// 订阅时触发,只执行一次System.out.println("Subscribed");request(1);}@Overridepublic void hookOnNext(T value) {System.out.println("SampleSubscriber:"+value);// 再次请求下一个!!request(1);}}
其实就等于
SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();Flux<Integer> ints = Flux.range(1, 4);ints.subscribe(ss);
对最后这个参数,还是不太懂。。。。