欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 焦点 > 响应式编程RxJava

响应式编程RxJava

2025/3/14 20:37:58 来源:https://blog.csdn.net/weixin_51020083/article/details/145607665  浏览:    关键词:响应式编程RxJava

        了解回调的通常都听过回调地狱。所谓回调指的是将一个 方法(或代码块、函数)作为 参数 传递给另一个方法。当某个操作完成时,后者会调用这个传递过来的方法。常用于 异步编程事件驱动编程,用来处理异步操作的结果。

        看下面的代码,模拟网络连接请求,请求需要消耗一定时间,因此同步运行会造成明显的顿感。

public class NetworkRequest {public CompletableFuture<String> makeRequest(String url) {return CompletableFuture.supplyAsync(() -> {System.out.println("Sending request to " + url);try {Thread.sleep(1000); // 模拟网络请求延迟} catch (InterruptedException e) {throw new RuntimeException(e);}return "Response from " + url;}, Executors.newCachedThreadPool());}
}

 链式回调。每次异步请求完成后,会触发下一个请求,并在每个请求完成时调用相应的回调方法。

public class Main {public static void main(String[] args) {NetworkRequest networkRequest = new NetworkRequest();// 使用异步操作链式调用请求networkRequest.makeRequest("http://example.com").thenAccept(message -> {System.out.println("First request completed: " + message);// 第二个异步请求networkRequest.makeRequest("http://example.com/next").thenAccept(message2 -> {System.out.println("Second request completed: " + message2);// 第三个异步请求networkRequest.makeRequest("http://example.com/final").thenAccept(message3 -> {System.out.println("Third request completed: " + message3);}).exceptionally(ex -> {System.err.println("Error in third request: " + ex.getMessage());return null;});}).exceptionally(ex -> {System.err.println("Error in second request: " + ex.getMessage());return null;});}).exceptionally(ex -> {System.err.println("Error in first request: " + ex.getMessage());return null;});}
}

输出结果:

Sending request to http://example.com
First request completed: Request completed successfully!
Sending request to http://example.com/next
Second request completed: Request completed successfully!
Sending request to http://example.com/final
Third request completed: Request completed successfully!

 

  • networkRequest.makeRequest("http://example.com") 会异步发起一个请求,并在请求完成后执行 thenAccept(message -> { ... }),处理返回的 message
  • 在第一个请求的回调中,第二个请求会被发起(networkRequest.makeRequest("http://example.com/next")),并且在第二个请求完成时,执行第二个请求的回调。
  • 同样,第三个请求在第二个请求的回调中被发起,直到所有请求完成。

可以看出这种 回调函数 层层嵌套,可读性差,难以维护,因此需要用到RxJava。

RxJava 的核心组成部分主要有以下两个:

Observable(事件发布)

Observable 是 RxJava 的核心组成部分之一,它代表了一个可以被观察的对象。它可以用来发送数据、事件、通知等。你可以将 Observable 看作一个生产者,它会发出事件流,供观察者来消费。

  • Observable 通过发射数据来表示一个流。
  • 观察者可以订阅(subscribe)这个流,接收其中发出的事件。

Observer(观察者)

Observer 是一个监听 Observable 的对象,它用于接收事件流发出的数据。Observer 会订阅 Observable,并且能够接收 Observable 发出的三种事件:nexterrorcomplete

  • onNext():接收到数据项。
  • onError():发生错误时调用。
  • onComplete():数据发送完成时调用。
public class RxBus {private static final PublishRelay<Object> bus = PublishRelay.create();public static void post(Object event) {bus.accept(event);}public static <T> Observable<T> toObservable(Class<T> eventType) {return bus.ofType(eventType);}
}

这段代码实现了一个简单的 RxBus 类,它的作用是用来发送和接收事件。这里的 RxBus 采用了 RxJava 中的 PublishRelay 来作为事件总线的核心机制。下面逐行解释这段代码:

1. private static final PublishRelay<Object> bus = PublishRelay.create();

这一行创建了一个 PublishRelay 实例,并将其赋值给 bus 变量。

  • PublishRelay 是 RxRelay 的一种类型,它是 PublishSubject 的一种形式,可以用来发射事件。PublishRelay 在 RxJava 中的作用类似于 ObservableSubject 的结合体,能够接收并广播事件给所有订阅者。
  • PublishRelay 具有以下特点:
    • 它可以接受外部事件 (accept() 方法)。
    • 它不会缓存事件,每个订阅者都会接收到它订阅时发出的最新事件。

使用 PublishRelay.create() 创建实例,意味着 bus 是一个能够发射 Object 类型事件的事件总线。

2. public static void post(Object event) { bus.accept(event); }

这个 post 方法是用来向 RxBus 发送事件的。

  • accept(event) 会把传入的 event 发布到 bus 中。所有订阅了这个 bus 的观察者都会接收到这个事件。
  • event 参数可以是任何类型的数据(这里是 Object 类型),这使得它能够发送任何类型的事件。

3. public static <T> Observable<T> toObservable(Class<T> eventType) { return bus.ofType(eventType); }

这个方法用于将 RxBus 中发布的事件转换为 Observable,并允许订阅者根据事件类型进行过滤。

  • ofType(eventType) 是一个 RxJava 操作符,用来过滤 bus 中的事件,只允许特定类型的事件通过。例如,如果你希望只接收 String 类型的事件,可以这样写:RxBus.toObservable(String.class)
  • eventType 是一个 Class 类型的参数,用来指定事件的类型。
  • Observable<T> 返回的是一个可以订阅的 Observable 对象,订阅者可以通过 subscribe() 方法来接收该类型的事件。

总结:

  1. RxBus 类的核心作用是管理事件的发布和订阅,允许不同的组件之间进行解耦。
  2. post(Object event):向 RxBus 发布事件。
  3. toObservable(Class<T> eventType):将 RxBus 中发布的事件转换成 Observable,并且根据事件类型进行过滤。订阅者可以订阅特定类型的事件。

定义一个事件:搜索

public class SearchEvent {private String query;public SearchEvent(String query) {this.query = query;}public String getQuery() {return query;}}

定义一个主界面: 

public class MainActivity extends AppCompatActivity {private EditText etSearch;private Button btnSearch;private CompositeDisposable disposables = new CompositeDisposable(); // 订阅管理器@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main7);etSearch = findViewById(R.id.etSearch);btnSearch = findViewById(R.id.btnSearch);// 按钮点击事件:发送 RxBus 事件btnSearch.setOnClickListener(v -> {String query = etSearch.getText().toString();RxBus.post(new SearchEvent(query)); // 发送事件});// 订阅 RxBus 事件subscribeToSearchEvent();}private void subscribeToSearchEvent() {//任何使用 RxBus.post(new SearchEvent(...)) 发送的事件,都会被这个订阅者接收到。disposables.add(RxBus.toObservable(SearchEvent.class)//一般情况下,订阅事件流的操作可能会有一些 耗时操作(比如网络请求、数据库操作等),这些耗时操作不能在 主线程 执行,否则会导致应用界面卡顿,用户体验差。.subscribeOn(Schedulers.io()) // 事件流的订阅行为会在子线程(IO线程)中执行。//事件的处理行为会在主线程(UI线程)中执行。//由于 Android 不允许在子线程直接操作 UI,因此我们需要 将事件的处理回调切换到主线程,这样才能安全地更新 UI。.observeOn(AndroidSchedulers.mainThread()) // 观察线程(主线程).subscribe(new Consumer<SearchEvent>() {@Overridepublic void accept(SearchEvent event) {handleSearchEvent(event);}}));}private void handleSearchEvent(SearchEvent event) {Toast.makeText(this, "搜索内容:" + event.getQuery(), Toast.LENGTH_SHORT).show();}@Overrideprotected void onDestroy() {super.onDestroy();disposables.clear(); // 避免 RxJava 订阅导致内存泄漏}
}

CompositeDisposableRxJava 中的一个类,用来管理多个订阅(Disposable)的生命周期。它的作用是帮助你统一管理多个订阅,方便你在合适的时机取消订阅,防止内存泄漏。

RxBus.toObservable(SearchEvent.class)

  • 这是 RxBus 提供的方法,作用是监听 SearchEvent 类型的事件
  • 任何使用 RxBus.post(new SearchEvent(...)) 发送的事件,都会被这个订阅者接收到。

 

subscribeOn(Schedulers.io())

含义:

  • .subscribeOn(Schedulers.io()) 用来指定 订阅事件流的线程
  • 具体来说,这句代码表示 事件流的订阅行为会在子线程(IO线程)中执行

为什么要这么做:

  • 一般情况下,订阅事件流的操作可能会有一些 耗时操作(比如网络请求、数据库操作等),这些耗时操作不能在 主线程 执行,否则会导致应用界面卡顿,用户体验差。
  • Schedulers.io() 是一个专门用于 IO操作的线程池,适合进行 网络请求、文件读写 等任务。

observeOn(AndroidSchedulers.mainThread())

含义:

  • .observeOn(AndroidSchedulers.mainThread()) 用来指定 观察事件的线程
  • 具体来说,这句代码表示 事件的处理行为会在主线程(UI线程)中执行

为什么要这么做:

  • 很多情况下,我们接收到的事件需要在 UI线程 上更新界面,比如显示网络请求的结果。
  • 由于 Android 不允许在子线程直接操作 UI,因此我们需要 将事件的处理回调切换到主线程,这样才能安全地更新 UI。
  • AndroidSchedulers.mainThread() 就是专门为 Android 提供的,用来指定在 主线程 执行代码的调度器。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词