了解回调的通常都听过回调地狱。所谓回调指的是将一个 方法(或代码块、函数)作为 参数 传递给另一个方法。当某个操作完成时,后者会调用这个传递过来的方法。常用于 异步编程 或 事件驱动编程,用来处理异步操作的结果。
看下面的代码,模拟网络连接请求,请求需要消耗一定时间,因此同步运行会造成明显的顿感。
public class NetworkRequest {public CompletableFuture<String> makeRequest(String url) {return CompletableFuture.supplyAsync(() -> {System.out.println("Sending request to " + url);try {Thread.sleep(1000); // 模拟网络请求延迟} catch (InterruptedException e) {throw new RuntimeException(e);}return "Response from " + url;}, Executors.newCachedThreadPool());}
}
链式回调。每次异步请求完成后,会触发下一个请求,并在每个请求完成时调用相应的回调方法。
public class Main {public static void main(String[] args) {NetworkRequest networkRequest = new NetworkRequest();// 使用异步操作链式调用请求networkRequest.makeRequest("http://example.com").thenAccept(message -> {System.out.println("First request completed: " + message);// 第二个异步请求networkRequest.makeRequest("http://example.com/next").thenAccept(message2 -> {System.out.println("Second request completed: " + message2);// 第三个异步请求networkRequest.makeRequest("http://example.com/final").thenAccept(message3 -> {System.out.println("Third request completed: " + message3);}).exceptionally(ex -> {System.err.println("Error in third request: " + ex.getMessage());return null;});}).exceptionally(ex -> {System.err.println("Error in second request: " + ex.getMessage());return null;});}).exceptionally(ex -> {System.err.println("Error in first request: " + ex.getMessage());return null;});}
}
输出结果:
Sending request to http://example.com
First request completed: Request completed successfully!
Sending request to http://example.com/next
Second request completed: Request completed successfully!
Sending request to http://example.com/final
Third request completed: Request completed successfully!
networkRequest.makeRequest("http://example.com")
会异步发起一个请求,并在请求完成后执行thenAccept(message -> { ... })
,处理返回的message
。- 在第一个请求的回调中,第二个请求会被发起(
networkRequest.makeRequest("http://example.com/next")
),并且在第二个请求完成时,执行第二个请求的回调。 - 同样,第三个请求在第二个请求的回调中被发起,直到所有请求完成。
可以看出这种 回调函数 层层嵌套,可读性差,难以维护,因此需要用到RxJava。
RxJava 的核心组成部分主要有以下两个:
Observable(事件发布)
Observable
是 RxJava 的核心组成部分之一,它代表了一个可以被观察的对象。它可以用来发送数据、事件、通知等。你可以将 Observable
看作一个生产者,它会发出事件流,供观察者来消费。
- Observable 通过发射数据来表示一个流。
- 观察者可以订阅(subscribe)这个流,接收其中发出的事件。
Observer(观察者)
Observer
是一个监听 Observable
的对象,它用于接收事件流发出的数据。Observer
会订阅 Observable
,并且能够接收 Observable
发出的三种事件:next、error 和 complete。
- onNext():接收到数据项。
- onError():发生错误时调用。
- onComplete():数据发送完成时调用。
public class RxBus {private static final PublishRelay<Object> bus = PublishRelay.create();public static void post(Object event) {bus.accept(event);}public static <T> Observable<T> toObservable(Class<T> eventType) {return bus.ofType(eventType);}
}
这段代码实现了一个简单的 RxBus 类,它的作用是用来发送和接收事件。这里的 RxBus
采用了 RxJava 中的 PublishRelay
来作为事件总线的核心机制。下面逐行解释这段代码:
1. private static final PublishRelay<Object> bus = PublishRelay.create();
这一行创建了一个 PublishRelay
实例,并将其赋值给 bus
变量。
PublishRelay
是 RxRelay 的一种类型,它是 PublishSubject 的一种形式,可以用来发射事件。PublishRelay
在 RxJava 中的作用类似于Observable
和Subject
的结合体,能够接收并广播事件给所有订阅者。PublishRelay
具有以下特点:- 它可以接受外部事件 (
accept()
方法)。 - 它不会缓存事件,每个订阅者都会接收到它订阅时发出的最新事件。
- 它可以接受外部事件 (
使用 PublishRelay.create()
创建实例,意味着 bus
是一个能够发射 Object
类型事件的事件总线。
2. public static void post(Object event) { bus.accept(event); }
这个 post
方法是用来向 RxBus
发送事件的。
accept(event)
会把传入的event
发布到bus
中。所有订阅了这个bus
的观察者都会接收到这个事件。event
参数可以是任何类型的数据(这里是Object
类型),这使得它能够发送任何类型的事件。
3. public static <T> Observable<T> toObservable(Class<T> eventType) { return bus.ofType(eventType); }
这个方法用于将 RxBus
中发布的事件转换为 Observable
,并允许订阅者根据事件类型进行过滤。
ofType(eventType)
是一个 RxJava 操作符,用来过滤bus
中的事件,只允许特定类型的事件通过。例如,如果你希望只接收String
类型的事件,可以这样写:RxBus.toObservable(String.class)
。eventType
是一个Class
类型的参数,用来指定事件的类型。Observable<T>
返回的是一个可以订阅的Observable
对象,订阅者可以通过subscribe()
方法来接收该类型的事件。
总结:
RxBus
类的核心作用是管理事件的发布和订阅,允许不同的组件之间进行解耦。post(Object event)
:向RxBus
发布事件。toObservable(Class<T> eventType)
:将RxBus
中发布的事件转换成Observable
,并且根据事件类型进行过滤。订阅者可以订阅特定类型的事件。
定义一个事件:搜索
public class SearchEvent {private String query;public SearchEvent(String query) {this.query = query;}public String getQuery() {return query;}}
定义一个主界面:
public class MainActivity extends AppCompatActivity {private EditText etSearch;private Button btnSearch;private CompositeDisposable disposables = new CompositeDisposable(); // 订阅管理器@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main7);etSearch = findViewById(R.id.etSearch);btnSearch = findViewById(R.id.btnSearch);// 按钮点击事件:发送 RxBus 事件btnSearch.setOnClickListener(v -> {String query = etSearch.getText().toString();RxBus.post(new SearchEvent(query)); // 发送事件});// 订阅 RxBus 事件subscribeToSearchEvent();}private void subscribeToSearchEvent() {//任何使用 RxBus.post(new SearchEvent(...)) 发送的事件,都会被这个订阅者接收到。disposables.add(RxBus.toObservable(SearchEvent.class)//一般情况下,订阅事件流的操作可能会有一些 耗时操作(比如网络请求、数据库操作等),这些耗时操作不能在 主线程 执行,否则会导致应用界面卡顿,用户体验差。.subscribeOn(Schedulers.io()) // 事件流的订阅行为会在子线程(IO线程)中执行。//事件的处理行为会在主线程(UI线程)中执行。//由于 Android 不允许在子线程直接操作 UI,因此我们需要 将事件的处理回调切换到主线程,这样才能安全地更新 UI。.observeOn(AndroidSchedulers.mainThread()) // 观察线程(主线程).subscribe(new Consumer<SearchEvent>() {@Overridepublic void accept(SearchEvent event) {handleSearchEvent(event);}}));}private void handleSearchEvent(SearchEvent event) {Toast.makeText(this, "搜索内容:" + event.getQuery(), Toast.LENGTH_SHORT).show();}@Overrideprotected void onDestroy() {super.onDestroy();disposables.clear(); // 避免 RxJava 订阅导致内存泄漏}
}
CompositeDisposable
是 RxJava 中的一个类,用来管理多个订阅(Disposable
)的生命周期。它的作用是帮助你统一管理多个订阅,方便你在合适的时机取消订阅,防止内存泄漏。
RxBus.toObservable(SearchEvent.class)
- 这是
RxBus
提供的方法,作用是监听SearchEvent
类型的事件。 - 任何使用
RxBus.post(new SearchEvent(...))
发送的事件,都会被这个订阅者接收到。
subscribeOn(Schedulers.io())
含义:
.subscribeOn(Schedulers.io())
用来指定 订阅事件流的线程。- 具体来说,这句代码表示 事件流的订阅行为会在子线程(IO线程)中执行。
为什么要这么做:
- 一般情况下,订阅事件流的操作可能会有一些 耗时操作(比如网络请求、数据库操作等),这些耗时操作不能在 主线程 执行,否则会导致应用界面卡顿,用户体验差。
Schedulers.io()
是一个专门用于 IO操作的线程池,适合进行 网络请求、文件读写 等任务。
observeOn(AndroidSchedulers.mainThread())
含义:
.observeOn(AndroidSchedulers.mainThread())
用来指定 观察事件的线程。- 具体来说,这句代码表示 事件的处理行为会在主线程(UI线程)中执行。
为什么要这么做:
- 很多情况下,我们接收到的事件需要在 UI线程 上更新界面,比如显示网络请求的结果。
- 由于 Android 不允许在子线程直接操作 UI,因此我们需要 将事件的处理回调切换到主线程,这样才能安全地更新 UI。
AndroidSchedulers.mainThread()
就是专门为 Android 提供的,用来指定在 主线程 执行代码的调度器。