深入了解kafka系列-生产者

基本概念

整个Kafka体系结构中引入了以下3个术语。

(1)Producer:生产者,也就是发送消息的一方。生产者负责创建消息,然后将其投递到Kafka中。

(2)Consumer:消费者,也就是接收消息的一方。消费者连接到Kafka上并接收消息,进而进行相应的业务逻辑处理。

(3)Broker:服务代理节点。对于Kafka而言,Broker可以简单地看作一个独立的Kafka服务节点或Kafka服务实例。大多数情况下也可以将Broker看作一台Kafka服务器,前提是这台服务器上只部署了一个Kafka实例。一个或多个Broker组成了一个Kafka集群。一般而言,我们更习惯使用首字母小写的broker来表示服务代理节点。

在Kafka中还有两个特别重要的概念—主题(Topic)与分区(Partition)。Kafka中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到Kafka集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。

主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset是消息在分区中的唯一标识,Kafka通过它来保证消息在分区内的顺序性,不过offset并不跨越分区,也就是说,Kafka保证的是分区有序而不是主题有序。如图所示,主题中有 4 个分区,消息被顺序追加到每个分区日志文件的尾部。Kafka中的分区可以分布在不同的服务器(broker)上,也就是说,一个主题可以横跨多个broker,以此来提供比单个broker更强大的性能。

生产者

消息在真正发往Kafka之前,有可能需要经历拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)等一系列的作用,那么在此之后又会发生什么呢?下面我们来看一下生产者客户端的整体架构.

producer 流程:

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。

在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。

Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

主线程

生产者拦截器 可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。生产者拦截器的使用也很方便,主要是自定义实现 org.apache.kafka.clients.producer.ProducerInterceptor接口。ProducerInterceptor接口中包含3个方法

KafkaProducer() 在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作。

KafkaProducer() 在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的onAcknowledgement()方法,优先于用户设定的 Callback 之前执行。

close() 主要用于在关闭拦截器时执行一些资源的清理工作。在这 3 个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。

序列化器 生产者需要用序列化器(Serializer) 把对象转换成字节数组才能通过网络发送给Kafka,消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象,Serializer接口中包含3个方法

configure() 方法用来配置当前类

serialize() 方法用来执行序列化操作

close() 方法用来关闭当前的序列化器,一般情况下 close()是一个空方法,如果实现了此方法,则必须确保此方法的幂等性,因为这个方法很可能会被KafkaProducer调用多次。

分区器 分区器的作用就是为消息分配分区。序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。

消息累加器 RecordAccumulator

RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认值为 32MB。
如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。

主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即 Deque<ProducerBatch>。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多个 ProducerRecord。

通俗地说,ProducerRecord 是生产者中创建的消息,而ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProducerBatch中,这样可以使字节的使用更加紧凑。与此同时,将较小的ProducerRecord拼凑成一个较大的ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量

sender 线程

Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node,List< ProducerBatch>的形式,其中Node表示Kafka集群的broker节点,Sender 还会进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map<NodeId,Deque<Request>>
它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。

原创不易,如果觉得有点用的话,请毫不留情点个赞,转发一下,这将是我持续输出优质文章的最强动力。

请我吃🍗