Spark: SparkListenerBus

2025/2/21 3:26:00


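A bus is presumably a one-to-many mechanism: one bus serves multiple listeners.
The ListenerBus runs on the driver side; the messages it carries originate on either the driver or the executors.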

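CopyOnWriteArrayList is a thread-safe implementation of the List interface, suited to scenarios that need thread safety with frequent reads and only occasional modifications. Its basic principle: whenever the list is written to (adding, removing, or updating an element), it creates a copy of the underlying array and performs the write on the new array. This "copy-on-write" mechanism ensures that a write does not affect reads already in progress, which is how thread safety is achieved.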
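To make the copy-on-write behaviour concrete, here is a minimal sketch. The object name CowDemo and the listener strings are made up for illustration and do not come from Spark. It iterates over a list while appending to it, which would throw ConcurrentModificationException on a plain ArrayList.

```scala
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical demo object, not part of Spark.
object CowDemo {
  /** Iterates over the list while appending to it; returns (items the iterator saw, final size). */
  def snapshotIteration(): (List[String], Int) = {
    val listeners = new CopyOnWriteArrayList[String]()
    listeners.add("listenerA")
    listeners.add("listenerB")

    // The iterator is bound to the array as it existed when iterator() was called.
    val it = listeners.iterator()
    var seen = List.empty[String]
    while (it.hasNext) {
      val current = it.next()
      seen = seen :+ current
      // Each write copies the underlying array, so the running iterator is unaffected.
      listeners.add(current + "-copy")
    }
    (seen, listeners.size())
  }
}
```

The iterator only sees the two elements present when it was created, while the list itself ends up with four. The trade-off is that every write pays for a full array copy, which is why this structure fits a rarely changing set of listeners or queues.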


/**
 * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners
 */
private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {

The class below stores the events and the listener objects, and starts a separate thread that delivers the events to the listeners so that they can be processed.

/**
 * An asynchronous queue for events. All events posted to this queue will be delivered to the child
 * listeners in a separate thread.
 *
 * Delivery will only begin when the `start()` method is called. The `stop()` method should be
 * called when no more events need to be delivered.
 */
private class AsyncEventQueue(
    val name: String,
    conf: SparkConf,
    metrics: LiveListenerBusMetrics,
    bus: LiveListenerBus)
  extends SparkListenerBus
  with Logging {

  import AsyncEventQueue._

  private[scheduler] def capacity: Int = {
    val queueSize = conf.getInt(s"$LISTENER_BUS_EVENT_QUEUE_PREFIX.$name.capacity",
      conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
    assert(queueSize > 0, s"capacity for event queue $name must be greater than 0, " +
      s"but $queueSize is configured.")
    queueSize
  }

  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](capacity)
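
The pattern described above (a bounded queue, a list of listeners, and a dedicated delivery thread) can be sketched as follows. This is a simplified stand-in, not Spark's AsyncEventQueue: the names MiniAsyncQueue and MiniAsyncQueueDemo, the use of String events, and the None stop marker are all assumptions made for illustration.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue}
import java.util.concurrent.atomic.AtomicLong

// Hypothetical, simplified async event queue; events are plain Strings here.
class MiniAsyncQueue(capacity: Int) {
  require(capacity > 0, s"capacity must be greater than 0, but $capacity is configured.")

  // Some(event) carries an event; None is an assumed marker telling the dispatcher to exit.
  private val queue = new LinkedBlockingQueue[Option[String]](capacity)
  private val listeners = new CopyOnWriteArrayList[String => Unit]()
  val dropped = new AtomicLong(0)

  private val dispatcher = new Thread(new Runnable {
    override def run(): Unit = {
      var next = queue.take() // blocks until something is available
      while (next.isDefined) {
        val it = listeners.iterator()
        while (it.hasNext) it.next().apply(next.get)
        next = queue.take()
      }
    }
  }, "mini-dispatcher")
  dispatcher.setDaemon(true)

  def addListener(listener: String => Unit): Unit = listeners.add(listener)

  /** Delivery only begins once start() is called. */
  def start(): Unit = dispatcher.start()

  /** Never blocks the caller: when the queue is full the event is counted as dropped. */
  def post(event: String): Unit =
    if (!queue.offer(Some(event))) dropped.incrementAndGet()

  /** Assumes start() was called: enqueues the stop marker and waits for the thread to drain. */
  def stop(): Unit = {
    queue.put(None)
    dispatcher.join()
  }
}

object MiniAsyncQueueDemo {
  /** Posts three events into a queue of capacity 2; returns (delivered events, dropped count). */
  def run(): (List[String], Long) = {
    val received = new CopyOnWriteArrayList[String]()
    val q = new MiniAsyncQueue(capacity = 2)
    q.addListener(e => { received.add(e); () })
    q.post("e1"); q.post("e2"); q.post("e3") // the third post finds the queue full
    q.start()
    q.stop()
    (List(received.toArray(new Array[String](0)): _*), q.dropped.get())
  }
}
```

Using offer rather than put in post keeps a slow listener from stalling whoever posts the event, at the cost of losing events once the configured capacity is exceeded.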

/**
 * A SparkListenerBus that can be used to replay events from serialized event data.
 */
private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {


/**
 * Asynchronously passes SparkListenerEvents to registered SparkListeners.
 *
 * Until `start()` is called, all posted events are only buffered. Only after this listener bus
 * has started will events be actually propagated to all attached listeners. This listener bus
 * is stopped when `stop()` is called, and it will drop further events after stopping.
 */
private[spark] class LiveListenerBus(conf: SparkConf) {

  import LiveListenerBus._

  private var sparkContext: SparkContext = _

  private[spark] val metrics = new LiveListenerBusMetrics(conf)

  // Indicate if `start()` is called
  private val started = new AtomicBoolean(false)
  // Indicate if `stop()` is called
  private val stopped = new AtomicBoolean(false)

  private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
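
The lifecycle stated in the comment above (buffer before start(), deliver after it, drop after stop()) can be sketched with the same two AtomicBoolean flags. The class MiniBus and its fields are hypothetical and greatly simplified; in particular, "delivery" here just appends to a list instead of handing events to per-queue threads.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer

// Hypothetical lifecycle sketch; not Spark's LiveListenerBus.
class MiniBus {
  private val started = new AtomicBoolean(false)
  private val stopped = new AtomicBoolean(false)
  private val buffered = ListBuffer.empty[String]  // events posted before start()
  private val delivered = ListBuffer.empty[String] // stands in for real listener delivery

  def post(event: String): Unit = synchronized {
    if (stopped.get()) {
      () // after stop(): further events are dropped
    } else if (started.get()) {
      delivered += event
    } else {
      buffered += event // before start(): only buffered
    }
  }

  def start(): Unit = synchronized {
    if (!started.compareAndSet(false, true)) {
      throw new IllegalStateException("bus already started")
    }
    // Flush everything that was posted while the bus was not yet running.
    delivered ++= buffered
    buffered.clear()
  }

  def stop(): Unit = synchronized {
    stopped.set(true)
  }

  def deliveredEvents: List[String] = synchronized(delivered.toList)
}
```

For example, an event posted before start() shows up in deliveredEvents only after start() is called, and an event posted after stop() never shows up.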


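SQLAppStatusListener is also responsible for writing execution (exec) information into LevelDB (through the `kvstore` in the code below).
The Spark UI merely reads that data back out of LevelDB.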

  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
    val SparkListenerSQLExecutionEnd(executionId, time, errorMessage) = event
    Option(liveExecutions.get(executionId)).foreach { exec =>
      exec.completionTime = Some(new Date(time))
      exec.errorMessage = errorMessage
      if (exec.errorMessage.isDefined && (!errorMessage.get.isEmpty)) {
        println(s"Private def onExecutionEnd Error message: ${exec.errorMessage.get}")
      }
      update(exec)

      // Aggregating metrics can be expensive for large queries, so do it asynchronously. The end
      // event count is updated after the metrics have been aggregated, to prevent a job end event
      // arriving during aggregation from cleaning up the metrics data.
      kvstore.doAsync {
        exec.metricsValues = aggregateMetrics(exec)
        removeStaleMetricsData(exec)
        exec.endEvents.incrementAndGet()
        update(exec, force = true)
      }
    }
  }

  private def update(exec: LiveExecutionData, force: Boolean = false): Unit = {
    val now = System.nanoTime()
    if (exec.endEvents.get() >= exec.jobs.size + 1) {
      exec.write(kvstore, now)
      removeStaleMetricsData(exec)
      liveExecutions.remove(exec.executionId)
    } else if (force) {
      exec.write(kvstore, now)
    } else if (liveUpdatePeriodNs >= 0) {
      if (now - exec.lastWriteTime > liveUpdatePeriodNs) {
        exec.write(kvstore, now)
      }
    }
  }
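
The branching in `update` above decides when an execution is written to the store. As a reading aid, the same decision can be restated as a pure function. The object WriteDecision, the Action type, and the parameter names are hypothetical; only the conditions are taken from the method above.

```scala
// Hypothetical restatement of the branch conditions in update(); not Spark code.
object WriteDecision {
  sealed trait Action
  case object WriteAndRemove extends Action // final write, then drop the live entry
  case object Write extends Action          // write and keep tracking the execution
  case object Skip extends Action           // too soon since the last write

  def decide(
      endEvents: Int,
      jobCount: Int,
      force: Boolean,
      now: Long,
      lastWriteTime: Long,
      liveUpdatePeriodNs: Long): Action = {
    if (endEvents >= jobCount + 1) {
      // All expected end events have been counted (jobCount + 1 in the original condition).
      WriteAndRemove
    } else if (force) {
      Write
    } else if (liveUpdatePeriodNs >= 0 && now - lastWriteTime > liveUpdatePeriodNs) {
      // Periodic live update: only write if enough time has passed.
      Write
    } else {
      Skip
    }
  }
}
```

Read this way, a negative liveUpdatePeriodNs disables the periodic writes entirely, leaving only the forced write and the final write.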



