Swift Combine 学习(二):发布者 Publisher

2025/2/25
在上一篇文章中,初步简单的介绍了 Combine 框架的基本概念,大概有了一个初印象。本文将开始介绍 Combine 中的发布者(Publisher)。Publisher 是 Combine 框架的核心组件之一,负责生成和传递数据流。通过理解 Publisher 的类型、特性和使用方法,可以更好地在 Combine 中生成和管理数据流。

发布者 (Publisher)

Declares that a type can transmit a sequence of values over time.


发布者(Publisher)是 Combine 框架中的核心概念之一,它定义如何生成并传递一系列值。像是观察者模式中的 Observable。它可以使用 operator 来组合变换,生成新的 Publisher。发布者会随时间的推移将一系列值发送给一个或多个订阅者 Subscriber。发布者遵循 Publisher 协议,该协议定义了两个关联类型:

  1. Output:发布者发送出的值。
  2. Failure:发布者可能产生的错误,遵循 Error 协议。
public protocol Publisher<Output, Failure> {/// The kind of values published by this publisher./// 此发布者发布的值的类型associatedtype Output/// The kind of errors this publisher might publish./// 此发布者可能发布的错误类型/// Use `Never` if this `Publisher` does not publish errors./// 如果这个发布者不会发错误,用 Neverassociatedtype Failure : Error/// Attaches the specified subscriber to this publisher./// 将指定的订阅者附加给此发布者/// Implementations of ``Publisher`` must implement this method./// /// The provided implementation of ``Publisher/subscribe(_:)-4u8kn``calls this method.////// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values.func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}extension Publisher {/// Attaches the specified subject to this publisher./// 将指定的 Subject 订阅到一个 Publisher/// - Parameter subject: The subject to attach to this publisher.public func subscribe<S>(_ subject: S) -> AnyCancellable where S : Subject, Self.Failure == S.Failure, Self.Output == S.Output
}extension Publisher {/// Specifies the scheduler on which to receive elements from the publisher.指定用于接收发布者元素的调度器。////// You use the ``Publisher/receive(on:options:)`` operator to receive results and completion on a specific scheduler, such as performing UI work on the main run loop.你可以使用 ``Publisher/receive(on:options:)`` 操作符在特定的调度器上接收结果和完成信号,比如在主运行循环上执行 UI 工作。////// In contrast with ``Publisher/subscribe(on:options:)``, which affects upstream messages, ``Publisher/receive(on:options:)`` changes the execution context of downstream messages. 与影响上游消息的 ``Publisher/subscribe(on:options:)`` 相比,``Publisher/receive(on:options:)`` 改变的是下游消息的执行上下文。////// - Parameters:///   - scheduler: The scheduler the publisher uses for element delivery.发布者用于传递元素的调度器。///   - options: Scheduler options used to customize element delivery.用于自定义元素传递的调度器选项。/// - Returns: A publisher that delivers elements using the specified scheduler.返回一个使用指定调度器传递元素的发布者。public func receive<S>(on scheduler: S, options: S.SchedulerOptions? = nil) -> Publishers.ReceiveOn<Self, S> where S : Scheduler
}... 其他很多 Publisherextension ...

Publisher 通过 receive<S>(subscriber: S) 来接受订阅。

一个发布者可以发布多个值,可以有两种可能的状态:成功或失败。在成功状态下,发布者会发送 Output 类型的值;在失败状态下,发布者则会发送 Failure 类型的错误。如果发布者不会失败,Failure 类型通常会被设置为 Never,表示不会产生错误。

Combine 框架内置了很多发布者,包括 JustFutureDeferredEmptyFailRecord 以及 PassthroughSubjectcurValueSubject

一些 Publisher 举例:

  1. Future

    • 用于表示一个异步操作,该操作最终会产生一个值或一个错误。

    • 在以下例子中模拟一个异步操作,随机决定成功或失败。

      let futureP = Future<Int, Error> { promise inDispatchQueue.main.asyncAfter(deadline: .now() + 1) {let success = Bool.random()if success {promise(.success(2))} else {promise(.failure(NSError(domain: "ExampleError", code: 0, userInfo: nil)))}}
  2. Just

    • 创建一个只发出单个值然后立即完成的 Publisher。

      let justP = Just("Hello, World!")
  3. Deferred

    • 延迟创建 Publisher 直到有订阅者订阅。

      let deferredPublisher = Deferred {Just("Hello, World!")


ConnectablePublisher 可以让开发者控制数据流的开始发送时机。通常情况下,Publisher 在有订阅者时会立即开始发送数据,但 ConnectablePublisher 可以通过调用 connect() 来显式启动数据流。这在需要同步多个订阅者的场景中非常有用,例如网络请求或数据库查询。

比如当多个订阅者订阅了同一个非 ConnectablePublisherPublisher,有可能会出现其中一个订阅者收到了订阅内容,而另外一个订阅者却没收到的情况。这时候就可以使用 使用 makeConnectable()connect() 控制发布。

import Combineclass PublisherExample {private var cancellables = Set<AnyCancellable>()func demonstratePublishers() {// 1. Publisherprint("🍎普通 Publisher🍎")let numbers = PassthroughSubject<Int, Never>()// 第一个订阅者numbers.map { value -> Int inprint("处理数据: \(value)")return value * 2}.sink { value inprint("订阅者A: \(value)")}.store(in: &cancellables)// 第二个订阅者numbers.map { value -> Int inprint("处理数据: \(value)")return value * 2}.sink { value inprint("订阅者B: \(value)")}.store(in: &cancellables)// 发送数据numbers.send(1)numbers.send(2)// 2. ConnectablePublisherprint("\n🍎ConnectablePublisher🍎")let subject = PassthroughSubject<Int, Never>()let connectablePublisher = subject.map { value -> Int inprint("处理数据: \(value)")return value * 2}.makeConnectable()// 第一个订阅者connectablePublisher.sink { value inprint("订阅者A: \(value)")}.store(in: &cancellables)// 第二个订阅者connectablePublisher.sink { value inprint("订阅者B: \(value)")}.store(in: &cancellables)// 连接发布者connectablePublisher.connect().store(in: &cancellables)// 发送数据subject.send(1)subject.send(2)}
}let exp = PublisherExample()
exp.demonstratePublishers()/* 输出:
🍎普通 Publisher🍎
处理数据: 1
订阅者A: 2
处理数据: 1
订阅者B: 2
处理数据: 2
订阅者A: 4
处理数据: 2
订阅者B: 4🍎ConnectablePublisher🍎
处理数据: 1
订阅者B: 2
订阅者A: 2
处理数据: 2
订阅者B: 4
订阅者A: 4
  • 普通 Publisher:
    • 每个订阅者订阅时都会触发一次完整的数据流
    • 耗时操作会被执行多次
    • 订阅者获得的是独立的数据流
  • ConnectablePublisher:
    • 在调用 connect() 之前不会开始发送数据
    • 所有订阅者共享同一个数据流
    • 耗时操作只执行一次
    • 适合需要等待所有订阅者准备就绪才开始发送数据的场景


在 Combine 中,引用共享通常是指多个订阅者(Subscriber)共享同一个发布者(Publisher)的输出,而不是每个订阅者都触发一次数据生成。这可以通过 .share() 操作符实现。

关于 ConnectablePublisher 和引用共享所使用的 share 操作符号,在此先不展开讲解,后面讲到操作符的时候再举例说明。


A publisher that exposes a method for outside callers to publish elements.

Subject 是一个特殊的 Publisher。它的特殊之处在于:

  1. 主动发送能力:

    1. 普通的 Publisher 只能在创建时定义数据
    2. Subject 通过 send(_😃 方法可以在外部主动发送值到数据流中
  2. 控制权的不同:

    1. 普通 Publisher 的数据流是由内部逻辑控制的(比如 Just、Future)
    2. Subject 允许外部代码通过 send 方法控制数据流
    // 普通 Publisher:只能在创建时定义数据
    let publisher = [1, 2, 3].publisher  // 数据流固定// Subject:可以随时发送新数据
    let subject = PassthroughSubject<Int, Never>()
    subject.send(completion: .finished)  // 还可以主动结束
public protocol Subject<Output, Failure> : AnyObject, Publisher {/// Sends a value to the subscriber.////// - Parameter value: The value to send.func send(_ value: Self.Output)/// Sends a completion signal to the subscriber.////// - Parameter completion: A `Completion` instance which indicates whether publishing has finished normally or failed with an error.func send(completion: Subscribers.Completion<Self.Failure>)/// Sends a subscription to the subscriber.////// This call provides the ``Subject`` an opportunity to establish demand for any new upstream subscriptions.////// - Parameter subscription: The subscription instance through which the subscriber can request elements.func send(subscription: any Subscription)

Combine 内置了两种 Subject,分别是 PassthrougSubjectCurrentValueSubject


  • 无初始值或当前值存储
  • 只转发接收到的值给订阅者
  • 适用于:
    • 按钮点击等事件触发的场景
    • 不需要保存状态的场景
    • 一次性通知


  • 必须提供初始值
  • 维护一个当前值
  • 新订阅者立即收到当前值
  • 可通过 .value 属性读写当前值
  • 适用于:
    • 状态管理(如开关状态)
    • 需要缓存最新值的场景
    • 需要立即知道当前状态的场景
值访问无法访问当前值可通过 value 属性访问
设置值只能通过 send() 发送值可用 send() 或 value 属性
import Combine
import Foundation// PassthroughSubject
let passthroughSubject = PassthroughSubject<String, Never>()// curValueSubject
let curValueSubject = CurrentValueSubject<String, Never>("初始值")// 订阅 PassthroughSubject
passthroughSubject.sink { value inprint("PassthroughSubject 接收到: \(value)")}// 发送俩值
passthroughSubject.send("第二个值")// 订阅 curValueSubject
curValueSubject.sink { value inprint("curValueSubject 接收到: \(value)")}// 发送新值
print("curValueSubject 当前值: \(curValueSubject.value)")// 发送另一个新值
curValueSubject.send("再次更新的值")// 打印当前值
print("curValueSubject 当前值: \(curValueSubject.value)")/* 
PassthroughSubject 接收到: 第一个值
PassthroughSubject 接收到: 第二个值
curValueSubject 接收到: 初始值
curValueSubject 接收到: 更新后的值
curValueSubject 当前值: 更新后的值
curValueSubject 接收到: 再次更新的值
curValueSubject 当前值: 再次更新的值


Publisher 是 Combine 框架的基础组件之一,它为数据流的生成和传递提供了灵活而强大的工具。通过学习 Publisher 的工作原理和使用方式,开发者可以更有效地管理应用中的数据流。在下一篇文章中,将继续探讨介绍 Combine 中的订阅者(Subscriber)机制,进一步完善 Combine 知识体系。

