一、什么是“事件流”?
简单来说,事件流 就是数据从一个地方(发布者)传递到另一个地方(订阅者)的过程,像是一个“流水线”。发布者发布事件(数据),订阅者则在“流水线”上接收并处理这些事件。
生活中的类比:
假设你和你朋友玩一个游戏,你是发布者,你的朋友是订阅者。每当你发布一个游戏事件(比如发一个提示、一个消息或者数字),你的朋友会立刻接收到并作出反应。这就是一个事件流的例子。
具体例子:在应用中输入搜索框,实时搜索并显示结果
假设你在手机上使用一个搜索框来查找信息。当你输入每个字母时,系统会实时搜索并显示相应的结果。每输入一个字母,系统都会发出一个“事件”,而这个“事件”会被系统接收到并处理,显示出搜索结果。
这里的输入每个字母就是事件的发射,搜索结果的显示就是事件的接收和处理。
事件流的基本概念:
- 事件:是从发布者(
Observable
)发出的数据项,可以是任何数据(如数字、字符串、对象等)。 - 发布者(Observable):发布事件的源,负责发射数据。它会创建一个数据流,将数据逐个传递给订阅者。
- 订阅者(Observer):接收事件并处理事件的消费者。订阅者通过
subscribe()
方法订阅发布者,从而开始接收事件。 - 操作符:在事件流中,常常使用不同的操作符来转换、过滤、合并事件等。比如,
map
、filter
、flatMap
等。
事件流的三个关键步骤:
- 发布事件:
Observable
将数据逐个发布(发射)给订阅者。 - 接收事件:
Observer
(订阅者)接收到Observable
发布的事件,并对事件进行处理。 - 处理事件完成:在所有事件被处理完后,
Observable
可以发射onComplete()
来表示数据流结束,或者通过onError()
来报告错误。
事件流的流程示意:
Observable
创建了一个事件流,它开始发射数据。Observer
订阅了这个事件流,并在事件流中接收数据。Observer
处理每一个事件,直到流结束(onComplete()
)或发生错误(onError()
)。
二、事件流的发布和触发代码先览
Observable
是 RxJava 中的核心类,用来表示一个可以发射数据(事件流)的对象。可以把它理解为 数据的源,它产生一系列的数据项(事件),并将它们逐一发送给 订阅者(Observer)。RxJava 中的事件流是异步的,意味着它们会在后台线程中发射,而订阅者可以在主线程中处理这些数据。
通俗的类比:
想象一下,你在看一场电影,电影的播放过程就是事件流,而你(观众)则是订阅者(Observer)。电影播放(Observable)会发射不同的场景(数据项/事件),你(Observer)则逐一观看每个场景。
Observable
的核心概念:
- 事件流:
Observable
发射的数据项是事件流的基础。事件可以是任意类型的对象。 - 观察者模式:
Observable
和Observer
之间是典型的观察者模式。Observable
负责发送事件,而Observer
负责接收事件并处理它们。 - 异步和反应式:RxJava 强调反应式编程,
Observable
使得你可以异步地处理数据流。在数据产生时,Observer
会做出响应。
Observable的基本使用
Observable
可以通过多种方式创建,例如:
Observable.just()
:发射一组固定的事件。Observable.create()
:手动控制事件流的发射。Observable.fromIterable()
:从一个集合中逐一发射数据项。Observable.interval()
:定期发射事件
Observable.create
和 Observable.just
都是用来创建 Observable
对象的,但它们之间有很大的区别。我们可以从创建方式、灵活性和用途上来进行比较:
1. Observable.create()
说明:
Observable.create()
是一种最灵活、最基础的方式,用来手动定义Observable
的行为。- 你可以在
create()
中自由控制事件的发射过程,包括发射多个事件、处理异常、控制事件的完成等。
用法:
你需要通过 Emitter
对象手动定义发射事件,直到调用 onComplete()
或 onError()
来结束事件流。
在 Observable.create(emitter -> {...})
代码执行时,Observable
还没有真正开始发射事件。它只是定义了如何发射事件,但不会立即执行。只有当有订阅者(Observer)订阅它时,才会触发事件的发射。
下面是一个事件被发布的代码,此时还没有出现订阅者,所以没有事件发射:
// 模拟一个事件发布的地方
Observable<String> searchEvent = Observable.create(emitter -> {// 假设用户输入的每个字母都是一个事件emitter.onNext("Hello");emitter.onNext("Hello R");emitter.onNext("Hello Rx");emitter.onNext("Hello RxJava");emitter.onComplete(); // 事件流结束
});
Observable 是事件的发布者,而 emitter
就是发布事件的工具。在这个例子中,你创建了一个 Observable
,但还没有让它开始发射事件,直到有订阅者 订阅 这个 Observable
。
事件只有在通过 .subscribe()
订阅了这个 Observable
后,才会触发并发射事件。
searchEvent.subscribe(new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {System.out.println("开始订阅...");}@Overridepublic void onNext(String result) {System.out.println("接收到事件:" + result);}@Overridepublic void onError(Throwable e) {System.out.println("发生错误:" + e.getMessage());}@Overridepublic void onComplete() {System.out.println("事件流完成!");}
});
subscribe()
:这里才是触发事件流的时刻。订阅者通过 subscribe()
方法订阅了 Observable
后,事件开始发射(onNext
会被调用,依次输出事件内容),直到 onComplete()
被调用,表示事件流结束。
2. Observable.just()
说明:
Observable.just()
用于创建一个简单的Observable
,它会发射传入的固定数据项。- 适用于当你只需要发射固定的几个值时,简化了
Observable
的创建过程。
用法:
你只需要将数据传递给 just()
,它就会自动将这些数据依次发射。
Observable<String> observable = Observable.just("Hello", "Hello Rx", "Hello RxJava");
不能像 Observable.create()
那样自定义事件发射过程。例如,你不能在中间插入复杂的逻辑或者控制流。