生产者消息对象
public class ProducerRecord<K, V> {private final String topic; // 主题private final Integer partition; //分区号private final Headers headers; //消息头部private final K key; //键private final V value; //值private final Long timestamp; //消息的时间戳
}
其中key是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号,进而让消息发往特定的分区,一般同一个key的消息会被划分到同一个分区中。
timestamp是指消息的时间戳,它有CreateTime和LogAppendTime两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间。
创建生产者实例
public static Properties initConfig() {Properties props = new Properties();props.put(ProducerConfig.KEY_SERIALZER_CLASS_CONFIG,StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALZER_CLASS_CONFIG,StringSerializer.class.getName());
}KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
消息的发送
创建生产者实例
创建生产者实例的方法有很多种,其中最简单的是下面的构造方于除了topic和value外的属性,其他都置为null。
public ProducerRecord(String topic, V value);
发送消息主要有三种模式:发完即忘(fire-and-forget),同步(sync)及异步(async)。
KafkaProducer的sand()方法返回值并非是void类型,而是Future类型,send()方法有两个重载方法,具体定义如下:
public Future<RecordMetadata> send(ProducerRecord<K,V> record);
public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback);
-
发完即忘
它只管往Kafka中发送消息而并不关心消息是否正确到达。
在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时),会造成消息的丢失。这种发送方式性能最高,但可靠性也最差。 -
同步发送
try {producer.send(record).get();
} catch (ExecutionException | InterruptedException e) {e.printStackTrace();
}
通过feature对象中的get()方法,来阻塞等待kafka的响应,直到发送成功,或者发生异常。
同步发送的可靠性高,但性能会差很多,因为需要阻塞等到一条消息发送完之后,才能发送下一条。
- 异步发送
producer.send(record, new Callback()) {@overridepublic void onCompletion(RecordMetadata metadata, Excetion exception){if (excetion != null) {exception.printStackTrace();} else {...}}
}
当Kafka有响应时候,就会有回调,要么发送成功,要么抛出异常。
序列化器
生产者需要用序列化器把对象转换成字节数组,才能通过网络发送给Kafka。而消费者需要用反序列器把从Kafka中收到的字节数组转换成相应的对象。
分区器
分区器的作用是为消息分配分区。
消息经过序列化后,就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器,因为patition代表的就是要发往的分区号。如果没有指定partition,则需要依赖分区器,根据key字段来计算partition的值。
拦截器
生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前,做一些定制化的需求,比如统计类工作。
原理分析
整体架构
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用后,缓存到消息收集器中(RecordAccumulator)。Sender现成负责从消息收集器中获取消息,并将其发送到kafka中。
RecordAccumulator
该收集器主要用来缓存消息,以便Sender线程可以批量发送,进而减少网络传输的资源消耗,以提高性能。
RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容为ProducerBatch,即Deque。消息写入缓存时候,追加到双端队列的尾部,读取消息时,从双端队列的头部读取。
Sender
Seender从RecordAccumulator中获取缓存的消息后,会进一步将原来<分区,Deque>的保存形式转变为<Node, List>的形式,其中Node表示集群的broken节点。
对于网络连接来说,生产者客户端是与具体的broken节点建立的连接,就是向具体的broken节点发送消息,而不关心消息属于哪一个分区;而对于KafkaProducer
的应用逻辑而言,我们只关注向哪个分区中发送消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。
在转换成<Node, List>后,Sender还会进一步封装成<Node, Request>的形式,这样就可以将Request发往各个Node了。
请求在从Sender线程发往Kafka之前还会保存到InFlightRequest中,InFlightRequest存对象的具体形式为Map<NodeId, Deque>,它的主要作用是缓存了已经发送出去但还是没有收到响应的请求。
元数据的更新
元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上follower副本又分配在哪些节点上等等信息。
假设我们通过如下的方式创建了一条消息ProducerRecord,
ProducerRecord<String, String> record = new ProducerRecord<>(topic, “xxx”);
这里的发送指令,我们只知道主题名称,和需要发送的内容,对其他信息却一无所知。例如要将此消息追加到指定主题的某个分区所对应的leader副本之前,首先需要知道主题的分区数量,然后计算出目标分区,还需要知道leader副本所在的broken节点的地址、端口等信息才能建立链接,这些都属于元信息。
元数据的更新是在客户端进行的,对客户端的外部使用者不可见。更新操作是由Sender线程发起的,主线程也需要读取这些信息,这里的数据同步,是通过Synchronized和final关键字来保障。