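Kafka is a distributed stream-processing platform that handles data at very high throughput. In Kafka, producers send messages to the Kafka cluster and consumers read messages from it. This article examines the details of the Kafka producer's send path, including message serialization, partition assignment, and the buffering and submission of records.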
First, a code sample of a producer sending a message:
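import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class MyProducer1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        // Broker address used for the initial connection
        configs.put("bootstrap.servers", "node164:9092");
        // Serializer class for the key
        configs.put("key.serializer", IntegerSerializer.class);
        // Serializer class for the value
        configs.put("value.serializer", StringSerializer.class);
        // Durability on the broker side: acknowledge only after all in-sync replicas have the record
        configs.put("acks", "all");
        // Number of retries for failed sends
        configs.put("retries", "3");
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);
        // User-defined message headers
        List<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader("biz.name", "producer.demo".getBytes(StandardCharsets.UTF_8)));
        // topic, partition, key, value, headers
        ProducerRecord<Integer, String> record = new ProducerRecord<>(
                "test_topic",
                0,
                0,
                "hello world 0",
                headers
        );
        // Asynchronous send: the result arrives in the callback
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println("Topic: " + metadata.topic());
                    System.out.println("Partition: " + metadata.partition());
                    System.out.println("Offset: " + metadata.offset());
                } else {
                    System.out.println("Send failed: " + exception.getMessage());
                }
            }
        });
        // Close the producer; this waits for previously sent records to complete
        producer.close();
    }
}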
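Tracing the source of producer.send gives the overall send flow shown in the figure below. Delivering the messages held in RecordAccumulator to the brokers is actually the job of the Sender thread, which the figure leaves out for now. First, we look at some details of what the producer's main thread does.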
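- The KafkaProducer constructor initializes the interceptors, serializers and partitioner from the client configuration and creates the Sender daemon thread.
- send() works asynchronously internally: the message passes through the interceptors, the serializers and the partitioner and is then cached in the buffer (RecordAccumulator).
- A batch is sent when its buffered data reaches batch.size or when linger.ms has elapsed.
- Buffered messages are sent to their target partition and persisted on the broker. If a send fails, the client retries according to the configured retry parameters. Once the retries are exhausted, the send fails with an exception, which is delivered through the returned Future or the Callback.
- On success, RecordMetadata is returned to the client. A synchronous call (blocking on the Future returned by send()) waits for the metadata. An asynchronous call receives the metadata through the Callback interface.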
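The batching condition in the list above can be illustrated with a small readiness check. This is a minimal sketch, not RecordAccumulator.ready(). The class BatchReadiness and its parameters are hypothetical, and the sketch only models the two triggers named in the list: the batch has reached batch.size, or it has waited at least linger.ms. The real check also treats batches as sendable when threads are waiting for buffer memory or when flush() or close() is in progress, and it applies retry.backoff.ms instead of linger.ms to batches that are being retried.

```java
/**
 * Simplified model of when a partition's oldest batch becomes sendable.
 * Hypothetical helper, not part of the Kafka client API.
 */
final class BatchReadiness {
    private BatchReadiness() {}

    /**
     * @param batchBytes bytes already appended to the batch
     * @param batchSize  the batch.size setting, in bytes
     * @param createdMs  time the batch was created, in ms
     * @param nowMs      current time, in ms
     * @param lingerMs   the linger.ms setting
     */
    static boolean isReady(int batchBytes, int batchSize, long createdMs, long nowMs, long lingerMs) {
        boolean full = batchBytes >= batchSize;                // batch.size reached
        boolean lingerExpired = nowMs - createdMs >= lingerMs; // waited at least linger.ms
        return full || lingerExpired;
    }
}
```

With linger.ms=0, isReady is true as soon as a batch exists. Batching then happens mostly under load, when records pile up while the Sender is still busy with earlier requests.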
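Producer interceptors
After KafkaProducer.send() is called, the record first passes through the interceptors, if any are configured. By default no interceptor runs. Interceptors are only applied when the client sets the interceptor.classes parameter. The send method: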
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
So the interceptor list runs first. The interceptors themselves are created in the KafkaProducer constructor. Part of that source:
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
So interceptors are instantiated from the client setting ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, and every interceptor must implement the ProducerInterceptor interface:
public interface ProducerInterceptor<K, V> extends Configurable {
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
}
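The interface has three methods:
- onSend receives the ProducerRecord directly, so an implementation can process every outgoing message uniformly, for example by adding business-related headers.
- onAcknowledgement is called when the broker has acknowledged the record or the send has failed. In the failure case, the exception argument is non-null. This method can be used for follow-up work after a send. It generally runs on the producer's background I/O thread, so it should be fast.
- close cleans up resources when the interceptor is closed.
To use a custom interceptor, implement ProducerInterceptor and its methods, then register it in the client configuration, for example: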
// For multiple interceptors, list their fully qualified class names separated by commas
configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.xxx.CustomInterceptorOne,com.xxx.CustomInterceptorTwo");
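The chain semantics deserve a short illustration. Interceptors run in the order listed in interceptor.classes, and each one receives the record returned by the previous one. As the comment in send() says, an exception from an interceptor does not fail the send. As far as we understand ProducerInterceptors, the failing interceptor is logged and skipped.

The sketch below imitates this behaviour in plain Java. SimpleRecord and InterceptorChain are hypothetical stand-ins for ProducerRecord and Kafka's internal ProducerInterceptors class. Only onSend is modelled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical, simplified stand-in for ProducerRecord: a value plus headers. */
final class SimpleRecord {
    final String value;
    final Map<String, String> headers;

    SimpleRecord(String value, Map<String, String> headers) {
        this.value = value;
        this.headers = new LinkedHashMap<>(headers);
    }

    /** Returns a copy with one more header, leaving this record unchanged. */
    SimpleRecord withHeader(String key, String val) {
        SimpleRecord copy = new SimpleRecord(value, headers);
        copy.headers.put(key, val);
        return copy;
    }
}

/** Runs onSend-style interceptors in order; a failing interceptor is skipped. */
final class InterceptorChain {
    private final List<UnaryOperator<SimpleRecord>> interceptors = new ArrayList<>();

    InterceptorChain add(UnaryOperator<SimpleRecord> interceptor) {
        interceptors.add(interceptor);
        return this;
    }

    SimpleRecord onSend(SimpleRecord record) {
        SimpleRecord current = record;
        for (UnaryOperator<SimpleRecord> interceptor : interceptors) {
            try {
                current = interceptor.apply(current); // next interceptor sees this output
            } catch (RuntimeException e) {
                // Do not propagate: log and continue with the record as it was.
                System.err.println("interceptor failed, skipping: " + e.getMessage());
            }
        }
        return current;
    }
}
```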
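Producer serializers
After the interceptors, the record enters doSend(). Before anything is sent, the key and value are serialized with the serializers configured on the client.
The serializer interface: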
public interface Serializer<T> extends Closeable {
/**
* Configure this class.
* @param configs configs in key/value pairs
* @param isKey whether is for key or value
*/
void configure(Map<String, ?> configs, boolean isKey);
/**
* Convert {@code data} into a byte array.
*
* @param topic topic associated with data
* @param data typed data
* @return serialized bytes
*/
byte[] serialize(String topic, T data);
/**
* Close this serializer.
*
* This method must be idempotent as it may be called multiple times.
*/
@Override
void close();
}
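In Kafka, a message can carry any kind of data, such as strings, JSON or binary data. Kafka must serialize this data before it can be stored in the cluster. Kafka ships several serializers, such as StringSerializer, IntegerSerializer and ByteArraySerializer, and the producer picks the one that suits its data. If none of the built-in serializers fits, implement the Serializer interface above and configure your class on the client. As the interface shows, a serializer ultimately turns the key or value into a byte array.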
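The following is a hand-written serializer for a small domain object, as an illustration of implementing the interface. To stay self-contained, it implements a hypothetical SimpleSerializer interface that copies only the two-argument serialize(topic, data) shape of Kafka's Serializer. In a real project, you would implement org.apache.kafka.common.serialization.Serializer instead and register the class under key.serializer or value.serializer. The User type and its byte layout are assumptions made for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical mirror of the two-argument shape of Kafka's Serializer#serialize. */
interface SimpleSerializer<T> {
    byte[] serialize(String topic, T data);
}

/** Hypothetical domain object used only for this example. */
final class User {
    final int id;
    final String name;

    User(int id, String name) {
        this.id = id;
        this.name = name;
    }
}

/** Encodes a User as: 4-byte id, 4-byte name length, then the UTF-8 name bytes. */
final class UserSerializer implements SimpleSerializer<User> {
    @Override
    public byte[] serialize(String topic, User data) {
        if (data == null) {
            return null; // same convention as Kafka's built-in serializers for null data
        }
        byte[] nameBytes = data.name.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + 4 + nameBytes.length)
                .putInt(data.id)
                .putInt(nameBytes.length)
                .put(nameBytes)
                .array();
    }
}
```

A matching deserializer on the consumer side would read the fields back in the same order. Any layout works as long as both sides agree on it.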
The part of doSend() that uses the serializers:
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
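In this source, keySerializer and valueSerializer are the serializers we configured. They are instantiated in the KafkaProducer constructor. A ClassCastException means the key or value does not match the type the configured serializer expects, and doSend() rethrows it as a SerializationException. The three-argument serialize(topic, headers, data) called here is the header-aware variant. In newer clients, it is a default method of Serializer that delegates to the two-argument form.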
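Serializers are chosen from the configuration at runtime, which is why doSend() catches ClassCastException at all. The compiler cannot check that a record's key type matches the configured serializer. Because of type erasure, a mismatch only surfaces when the serializer is invoked.

The sketch below reproduces this in plain Java. SerializerMismatchDemo is hypothetical, and IllegalArgumentException stands in for Kafka's SerializationException.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

final class SerializerMismatchDemo {
    private SerializerMismatchDemo() {}

    /** Stand-in for IntegerSerializer: encodes an Integer as 4 big-endian bytes. */
    static byte[] serializeInt(Integer value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    /** Mirrors the try/catch around keySerializer.serialize(...) in doSend(). */
    @SuppressWarnings({"rawtypes", "unchecked"})
    static byte[] serializeKey(Object key) {
        // The serializer comes from configuration at runtime, so the caller holds only an
        // unchecked reference; the compiler cannot compare it with the key's type.
        Function serializer = (Function<Integer, byte[]>) SerializerMismatchDemo::serializeInt;
        try {
            return (byte[]) serializer.apply(key); // casts key to Integer here
        } catch (ClassCastException cce) {
            // Kafka throws SerializationException here; IllegalArgumentException stands in.
            throw new IllegalArgumentException("Can't convert key of class "
                    + key.getClass().getName() + " to the configured key serializer", cce);
        }
    }
}
```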
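Producer partitioner
Next comes partition assignment. Kafka uses partitions to manage how messages are stored and consumed. Each topic can be split into several partitions, and each partition is an ordered, immutable sequence of messages. When sending a message, the producer has to decide which partition the message goes to.
Kafka offers several partitioning strategies. DefaultPartitioner hashes the message key to pick a partition and spreads keyless messages across partitions. RoundRobinPartitioner distributes messages evenly over the partitions regardless of their keys. Once the partition is determined, the producer sends the message to that partition.
Within a partition, each message is identified by its offset. When idempotence is enabled, the broker also uses the producer ID together with a per-partition sequence number to detect duplicate writes.
For reliability, an application can send synchronously or asynchronously. In synchronous mode, it blocks on the Future returned by send() until the write is acknowledged. In asynchronous mode, send() returns immediately, and the outcome (success or failure) arrives later through the Callback.
The partitioner interface: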
public interface Partitioner extends Configurable, Closeable {
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes The serialized key to partition on( or null if no key)
* @param value The value to partition on or null
* @param valueBytes The serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
/**
* This is called when partitioner is closed.
*/
public void close();
}
The partition of the current message is computed from the topic and the message's key and value. Part of KafkaProducer's partition method:
private final Partitioner partitioner;
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
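If the ProducerRecord specifies a partition, that partition is used directly. Otherwise, the configured Partitioner computes one.
The partitioner can also be set in the client configuration (partitioner.class). If it is not set, DefaultPartitioner is used. This is the default strategy for deciding which partition a message is sent to.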
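The core partitioning source of DefaultPartitioner follows. It comes from an older client: since Kafka 2.4, keyless messages use sticky partitioning instead of per-message round robin.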
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
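The main rules are:
- Key-based hash partitioning: when a message has a key (key != null), DefaultPartitioner hashes the key's serialized bytes (keyBytes) with MurmurHash2. MurmurHash2 is a fast, widely used hash function that maps input of any length to a 32-bit value. Utils.toPositive clears the sign bit so that the hash is non-negative. The result modulo the partition count (numPartitions) gives the partition number.
- Round-robin partitioning for keyless messages: when a message has no key (key == null), DefaultPartitioner falls back to round robin. nextValue returns a per-topic counter, initialized to a random value and incremented on every call. The counter modulo the number of available partitions picks one of the currently available partitions. If no partition is available, the counter is applied to all partitions in the same way, so a partition number is still returned even though that partition is not currently available.
- Consistent partition choice: DefaultPartitioner always sends keyed messages with the same key to the same partition, as long as the partition count does not change. This is essential for per-key ordering. Round robin spreads keyless messages over different partitions, so there is no ordering between them, which may not suit every use case.
- Cluster state: the partitioner reads the current cluster metadata (cluster), including the partition count and which partitions currently have a leader. Only keyless messages are restricted to available partitions. Keyed messages are hashed over all partitions, so a key keeps its partition even while that partition is temporarily unavailable.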
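The rules above can be reproduced in a self-contained sketch. PartitionSketch is hypothetical. It takes the partition count and the list of available partition ids instead of a Cluster object. Its murmur2 method is a re-implementation modeled on Kafka's Utils.murmur2, written for this example, so compare it with Utils before relying on identical hash values. The sketch also includes the explicit-partition check from KafkaProducer.partition().

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified model of the older DefaultPartitioner logic; hypothetical class. */
final class PartitionSketch {
    private final ConcurrentHashMap<String, AtomicInteger> topicCounters = new ConcurrentHashMap<>();

    /** Like Utils.toPositive: clear the sign bit (Math.abs(MIN_VALUE) would stay negative). */
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    /** MurmurHash2 variant modeled on Kafka's Utils.murmur2. */
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        switch (length % 4) { // mix in the last 1-3 bytes (intentional fall-through)
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    int partition(String topic, Integer explicitPartition, byte[] keyBytes,
                  int numPartitions, List<Integer> availablePartitions) {
        if (explicitPartition != null) {
            return explicitPartition;                              // record names its partition
        }
        if (keyBytes != null) {
            return toPositive(murmur2(keyBytes)) % numPartitions;  // keyed: hash over ALL partitions
        }
        int next = topicCounters
                .computeIfAbsent(topic, t -> new AtomicInteger(ThreadLocalRandom.current().nextInt()))
                .getAndIncrement();                                // keyless: per-topic counter
        if (!availablePartitions.isEmpty()) {
            return availablePartitions.get(toPositive(next) % availablePartitions.size());
        }
        return toPositive(next) % numPartitions;                   // none available: pick any
    }
}
```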
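RecordAccumulator: the batching buffer
RecordAccumulator, a concept introduced in Kafka 0.8.2, is a key component of the Kafka producer API. It temporarily holds messages before the producer sends them to the brokers. Its main roles are:
- Batching: RecordAccumulator lets the producer accumulate several messages and send them to a broker as one batch. This cuts the number of network requests and their per-request overhead, which raises throughput. RecordAccumulator keeps one double-ended queue of type Deque<ProducerBatch> per partition.
- Memory bound and backpressure: the total memory used for buffering is capped by buffer.memory. When that memory is used up, send() blocks for at most max.block.ms and then fails with a timeout. This slows down a producer that generates messages faster than they can be sent.
- Retries: when a broker cannot handle a batch because of a retriable error, such as a transient network problem or a leader change, the Sender puts the batch back into RecordAccumulator. The batch is sent again after retry.backoff.ms, so a temporary failure does not lose the messages.
- Ordering: batches for a partition are kept in a FIFO deque, so they are sent in the order the messages were appended. Keeping that order when retries happen additionally requires either max.in.flight.requests.per.connection=1 or the idempotent producer. With max.in.flight.requests.per.connection=1, the Sender mutes a partition while a request for it is in flight.
- Memory only: RecordAccumulator keeps batches in memory, in ByteBuffers managed by a BufferPool, and does not spill to local disk. A batch's memory is returned to the pool once the batch has been acknowledged or has failed.
- No built-in dead letter queue: a batch that finally fails is reported to the application through the Future or Callback of send(). Routing such messages to a dead-letter topic is up to the application. Kafka Connect offers a DLQ for sink connectors, but the producer does not.
- Idempotence: together with Kafka's producer id and sequence number mechanism, the batches in RecordAccumulator let the broker recognize retried batches. As a result, the same message is not written twice, even on an unreliable network.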
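The memory bound described above can be modeled with a small buffer pool. SimpleBufferPool is a hypothetical, simplified version of the producer's BufferPool:
- Total memory is capped, playing the role of buffer.memory.
- Buffers of exactly poolableSize (batch.size) are recycled. Buffers of other sizes are simply returned as free memory.
- allocate() blocks for up to maxBlockMs (max.block.ms) before throwing.

The real class also serves waiting threads in FIFO order and records metrics.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified model of the producer's BufferPool. */
final class SimpleBufferPool {
    private final long totalMemory;                   // like buffer.memory
    private final int poolableSize;                   // like batch.size
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private long nonPooledAvailable;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition memoryFreed = lock.newCondition();

    SimpleBufferPool(long totalMemory, int poolableSize) {
        this.totalMemory = totalMemory;
        this.poolableSize = poolableSize;
        this.nonPooledAvailable = totalMemory;
    }

    ByteBuffer allocate(int size, long maxBlockMs) throws InterruptedException, TimeoutException {
        if (size > totalMemory) {
            throw new IllegalArgumentException("batch larger than total buffer memory");
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxBlockMs);
        lock.lock();
        try {
            while (true) {
                if (size == poolableSize && !free.isEmpty()) {
                    ByteBuffer reused = free.pollFirst(); // recycle a batch.size buffer
                    reused.clear();
                    return reused;
                }
                // Release pooled buffers into plain free memory if that is what it takes.
                while (nonPooledAvailable < size && !free.isEmpty()) {
                    nonPooledAvailable += free.pollLast().capacity();
                }
                if (nonPooledAvailable >= size) {
                    nonPooledAvailable -= size;
                    return ByteBuffer.allocate(size);
                }
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    throw new TimeoutException("Failed to allocate " + size + " bytes within " + maxBlockMs + " ms");
                }
                memoryFreed.awaitNanos(remaining);        // the calling (producer) thread blocks here
            }
        } finally {
            lock.unlock();
        }
    }

    void deallocate(ByteBuffer buffer) {
        lock.lock();
        try {
            if (buffer.capacity() == poolableSize) {
                free.addFirst(buffer);                   // keep it for reuse
            } else {
                nonPooledAvailable += buffer.capacity(); // other sizes just become free memory
            }
            memoryFreed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    long availableMemory() {
        lock.lock();
        try {
            long pooled = 0;
            for (ByteBuffer b : free) {
                pooled += b.capacity();
            }
            return nonPooledAvailable + pooled;
        } finally {
            lock.unlock();
        }
    }
}
```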
Part of the RecordAccumulator source:
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// check if we have an in-progress batch
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
}
// we don't have an in-progress record batch try to allocate a new batch
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch);
// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}
//Get the deque for the given topic-partition, creating it if necessary.
private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
Deque<ProducerBatch> d = this.batches.get(tp);
if (d != null)
return d;
d = new ArrayDeque<>();
Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
if (previous == null)
return d;
else
return previous;
}
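Notes on the source:
- A ProducerBatch can be seen as a collection of ProducerRecords. Sending in batches increases throughput and reduces network overhead.
- Before sending, the producer client stores messages in java.nio.ByteBuffer instances and maintains a BufferPool so that ByteBuffers can be reused. The pool only manages ByteBuffers of one specific size (set by batch.size). Buffers for larger messages cannot be reused.
- Each time a ProducerRecord is appended, the accumulator finds or creates the deque for its partition and takes the ProducerBatch at its tail. It then checks whether the record fits in that batch:
  - If it fits, the record is written into that batch.
  - If it does not fit and the record is no larger than batch.size, a new ProducerBatch of size batch.size is created, so that its buffer can later be reused.
  - If it does not fit and the record is larger than batch.size, a new ProducerBatch is created with the record's computed size. The drawback is that this memory cannot be reused.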
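The append path in the notes above can be sketched without Kafka. MiniAccumulator is hypothetical and simplifies the real code in three ways:
- Records are represented only by their size in bytes.
- computeIfAbsent replaces the putIfAbsent pattern of getOrCreateDeque.
- The real code allocates the buffer outside the deque lock; here everything is folded into one synchronized block.

The returned flags mirror batchIsFull and newBatchCreated, which decide whether send() wakes the Sender.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical, simplified model of RecordAccumulator.append(). */
final class MiniAccumulator {
    static final class Batch {
        final int capacity;
        int used;

        Batch(int capacity) {
            this.capacity = capacity;
        }

        boolean tryAppend(int recordSize) {
            if (used + recordSize > capacity) {
                return false;
            }
            used += recordSize;
            return true;
        }
    }

    /** Mirrors the two flags send() checks before waking the Sender. */
    static final class AppendResult {
        final boolean batchIsFull;
        final boolean newBatchCreated;

        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    private final int batchSize;
    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

    MiniAccumulator(int batchSize) {
        this.batchSize = batchSize;
    }

    AppendResult append(String topicPartition, int recordSize) {
        Deque<Batch> dq = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        synchronized (dq) {
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(recordSize)) {   // room in the tail batch
                return new AppendResult(dq.size() > 1 || last.used == last.capacity, false);
            }
            // New batch: batch.size-sized (poolable) unless the record itself is bigger.
            Batch batch = new Batch(Math.max(batchSize, recordSize));
            batch.tryAppend(recordSize);
            dq.addLast(batch);
            return new AppendResult(dq.size() > 1 || batch.used == batch.capacity, true);
        }
    }

    int batchCount(String topicPartition) {
        Deque<Batch> dq = batches.get(topicPartition);
        if (dq == null) {
            return 0;
        }
        synchronized (dq) {
            return dq.size();
        }
    }
}
```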
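What the Sender thread does and how messages are sent
The main thread's core flow was covered above. Next, we look at how the data in RecordAccumulator is sent to the Kafka cluster.
As the figure above shows, KafkaProducer relies on two cooperating threads to deliver messages to the brokers.
The main thread creates messages, runs the interceptors, serializers and partitioner, and appends the messages to the message buffer (RecordAccumulator).
The Sender thread plays a crucial role: it is the bridge between the producer client and the Kafka cluster, and it transfers the buffered message batches to the corresponding partitions. Sending is asynchronous. The Sender thread works in the background, while the producer (application) thread concentrates on creating messages and issuing send requests.
The Sender thread works as follows:
- Thread initialization: the KafkaProducer constructor starts a background Sender daemon thread.
Sender class definition: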
public class Sender implements Runnable {..}
Part of the KafkaProducer source:
private final Sender sender;
// All the message batches are held in here
private final RecordAccumulator accumulator;
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
time,
apiVersions,
transactionManager);
// Note: the accumulator is passed into the Sender constructor; the Sender thread later takes the batches to send from it
this.sender = new Sender(logContext,
client,
this.metadata,
this.accumulator,
maxInflightRequests == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
retries,
metricsRegistry.senderMetrics,
Time.SYSTEM,
this.requestTimeoutMs,
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager,
apiVersions);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
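- Message hand-off:
The producer thread does not pass messages to the Sender thread directly. It appends them to RecordAccumulator and wakes up the Sender thread when a batch is full or a new batch has been created.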
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
// If, after the append, the RecordBatch has reached batch.size (or has no room left for the next record), or a new batch was created, wake up the Sender thread to send data
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
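- Network connection management:
The Sender thread drives the producer's NetworkClient. The NetworkClient opens connections to the brokers, detects broken connections, and reconnects after a backoff (reconnect.backoff.ms). Unlike consumers, producers do not send heartbeats. Connections that stay idle longer than connections.max.idle.ms are closed. The Sender also keeps cluster metadata, such as partition leaders, up to date and requests a metadata update when a leader is unknown.
- Message transmission:
The Sender thread sends the buffered batches to the corresponding partitions in the cluster. The delivery guarantee depends on configuration; the two main modes are at-least-once and exactly-once delivery.
With acks=all and retries enabled, delivery is at least once. The broker responds after the write has been acknowledged as acks requires. If a request fails or times out, the batch is sent again, so a message that was in fact written may be written twice. The Sender does not wait for one batch's acknowledgement before sending the next; up to max.in.flight.requests.per.connection requests may be outstanding per connection.
Exactly-once delivery per partition relies on the idempotent producer (enable.idempotence): the broker discards duplicates using the producer id and sequence numbers. Exactly-once delivery across partitions additionally requires transactions.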
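The delivery modes above are selected through producer configuration. The following sketch builds java.util.Properties with standard producer config keys: acks, retries, enable.idempotence, max.in.flight.requests.per.connection and transactional.id. The DeliveryConfigs helper and the transactional id are hypothetical. Before passing the properties to new KafkaProducer<>(props), you would still need to add bootstrap.servers and the serializers.

```java
import java.util.Properties;

/** Hypothetical helper collecting producer settings for each delivery mode. */
final class DeliveryConfigs {
    private DeliveryConfigs() {}

    /** At least once: wait for all in-sync replicas and retry; duplicates are possible. */
    static Properties atLeastOnce() {
        Properties p = new Properties();
        p.put("acks", "all");
        p.put("retries", String.valueOf(Integer.MAX_VALUE));
        p.put("enable.idempotence", "false"); // explicit, since newer clients default to true
        return p;
    }

    /**
     * Idempotent producer: the broker drops retried duplicates per partition using the
     * producer id and sequence numbers. Needs acks=all, retries > 0 and at most 5 in flight.
     */
    static Properties idempotent() {
        Properties p = new Properties();
        p.put("acks", "all");
        p.put("retries", String.valueOf(Integer.MAX_VALUE));
        p.put("enable.idempotence", "true");
        p.put("max.in.flight.requests.per.connection", "5");
        return p;
    }

    /** Exactly once across partitions additionally needs a transactional id. */
    static Properties transactional(String transactionalId) {
        Properties p = idempotent();
        p.put("transactional.id", transactionalId);
        return p;
    }
}
```

With the idempotent settings, a retried batch carries the same producer id and sequence number, so the broker can discard the duplicate. The at-least-once settings may write a retried batch twice. With a transactional.id, the application also calls initTransactions(), beginTransaction() and commitTransaction() on the producer.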
Part of Sender.run():
// main loop, runs until close is called
while (running) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
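So the Sender thread runs in a loop, much like a scheduled task, and sends messages to the brokers either periodically or when the program triggers it. Here is part of the core run(long now) method: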
/**
* Run a single iteration of sending
*
* @param now The current POSIX time in milliseconds
*/
void run(long now) {
// ... much of the source omitted ...
long pollTimeout = sendProducerData(now);
client.poll(pollTimeout, now);
}
So the core sending logic lives in sendProducerData. Its source:
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
this.maxRequestSize, now);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
// we need to reset the producer id here.
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
}
}
sensors.updateProduceRequestMetrics(batches);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
pollTimeout = 0;
}
sendProduceRequests(batches, now);
return pollTimeout;
}
This method finds the batches in the accumulator that are ready to be sent, packs them into produce requests, and sends them to the brokers:
// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
this.maxRequestSize, now);
....
sendProduceRequests(batches, now);
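The drain step can be pictured with a simplified model. DrainSketch is hypothetical: batches are just sizes in bytes, and leadership is a plain map from partition to node id. For each ready node, it takes at most the head batch of every partition that node leads. It stops once adding another batch would exceed maxRequestSize. The first batch is always taken, so a single oversized batch can still go out. The real drain() also skips muted partitions and batches still in retry backoff, and it rotates its starting partition for fairness.

The pollTimeout helper restates the rule at the end of sendProducerData(): poll without waiting when some node has data ready. Otherwise, wait until the earlier of the next readiness deadline and the connection delay.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of RecordAccumulator.drain() and the Sender's poll timeout. */
final class DrainSketch {
    private DrainSketch() {}

    static Map<Integer, List<Integer>> drain(Map<String, Integer> leaderOf,
                                             Map<String, Deque<Integer>> batches,
                                             Set<Integer> readyNodes,
                                             int maxRequestSize) {
        Map<Integer, List<Integer>> byNode = new LinkedHashMap<>();
        for (int node : readyNodes) {
            List<Integer> ready = new ArrayList<>();
            int size = 0;
            for (Map.Entry<String, Integer> e : leaderOf.entrySet()) {
                if (e.getValue() != node) {
                    continue;                        // partition led by another node
                }
                Deque<Integer> dq = batches.get(e.getKey());
                if (dq == null || dq.isEmpty()) {
                    continue;
                }
                int batchBytes = dq.peekFirst();     // only the head batch of each partition
                if (size + batchBytes > maxRequestSize && !ready.isEmpty()) {
                    break;                           // request is full; the rest waits
                }
                ready.add(dq.pollFirst());
                size += batchBytes;
            }
            byNode.put(node, ready);
        }
        return byNode;
    }

    /** 0 if anything is sendable now, else the earliest deadline. */
    static long pollTimeout(long nextReadyCheckDelayMs, long notReadyTimeoutMs, boolean hasReadyNodes) {
        return hasReadyNodes ? 0 : Math.min(nextReadyCheckDelayMs, notReadyTimeoutMs);
    }
}
```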
The call that finally sends each request:
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
if (batches.isEmpty())
return;
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
// find the minimum magic version used when creating the record sets
byte minUsedMagic = apiVersions.maxUsableProduceMagic();
for (ProducerBatch batch : batches) {
if (batch.magic() < minUsedMagic)
minUsedMagic = batch.magic();
}
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
MemoryRecords records = batch.records();
// down convert if necessary to the minimum magic used. In general, there can be a delay between the time
// that the producer starts building the batch and the time that we send the request, and we may have
// chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
// the new message format, but found that the broker didn't support it, so we need to down-convert on the
// client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
// not all support the same message format version. For example, if a partition migrates from a broker
// which is supporting the new magic version to one which doesn't, then we will need to convert.
if (!records.hasMatchingMagic(minUsedMagic))
records = batch.records().downConvert(minUsedMagic, 0, time).records();
produceRecordsByPartition.put(tp, records);
recordsByPartition.put(tp, batch);
}
String transactionalId = null;
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
}
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
produceRecordsByPartition, transactionalId);
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
String nodeId = Integer.toString(destination);
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
client.send(clientRequest, now);
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
In the send method at the start, we saw this code:
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
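If, after the append, the corresponding RecordBatch has reached batch.size (or has no room for the next record), or a new batch has been created, the Sender thread is woken up. This wakes a Sender thread that may be blocked on I/O in client.poll(), so that it carries on sending. The actual sending is always done by the Sender thread, in its loop.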
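The interplay between send() and the Sender can be modeled with a daemon thread with three parts:
- It loops until closed.
- In each pass, it sends whatever is buffered.
- It then waits in a poll step that a wakeup cuts short.

MiniSender is a toy, hypothetical model. A queue of strings replaces RecordAccumulator, "sending" means moving a string to a list, and a lock/condition pair replaces the selector wakeup used by the real NetworkClient.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Toy model of the Sender loop and its wakeup; hypothetical, no networking. */
final class MiniSender implements Runnable {
    private final ConcurrentLinkedQueue<String> accumulator = new ConcurrentLinkedQueue<>();
    final List<String> sent = new CopyOnWriteArrayList<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition woken = lock.newCondition();
    private final long pollTimeoutMs;
    private boolean wakeupRequested;           // guarded by lock
    private volatile boolean running = true;

    MiniSender(long pollTimeoutMs) {
        this.pollTimeoutMs = pollTimeoutMs;
    }

    /** Main-thread side: buffer a batch, then wake the sender (like sender.wakeup()). */
    void append(String batch) {
        accumulator.add(batch);
        wakeup();
    }

    void wakeup() {
        lock.lock();
        try {
            wakeupRequested = true;            // remembered even if the sender is not waiting yet
            woken.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void run() {
        try {
            while (running) {                  // main loop, runs until close() is called
                drainAndSend();                // stands in for sendProducerData(now)
                poll(pollTimeoutMs);           // stands in for client.poll(pollTimeout, now)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        drainAndSend();                        // send what is left before exiting
    }

    private void drainAndSend() {
        String batch;
        while ((batch = accumulator.poll()) != null) {
            sent.add(batch);
        }
    }

    /** Waits until the timeout expires or wakeup() is called, whichever comes first. */
    private void poll(long timeoutMs) throws InterruptedException {
        lock.lock();
        try {
            long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (!wakeupRequested && nanos > 0) {
                nanos = woken.awaitNanos(nanos);
            }
            wakeupRequested = false;
        } finally {
            lock.unlock();
        }
    }

    void close() {
        running = false;
        wakeup();
    }

    /** Starts the loop on a daemon thread, as KafkaThread(name, sender, true) does. */
    static Thread startDaemon(MiniSender sender) {
        Thread t = new Thread(sender, "mini-producer-network-thread");
        t.setDaemon(true);
        t.start();
        return t;
    }
}
```

wakeup() sets a flag under the lock, so a wakeup that arrives before the Sender reaches poll() is not lost: the next poll returns immediately.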
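- Acknowledgement: once a batch has been written to the partition log, and replicated as acks requires, the broker returns a produce response to the Sender thread. The Sender then completes the batch: it completes the Future returned by send(), invokes the user's Callback on the Sender's I/O thread, and returns the batch's memory to the BufferPool. With acks=0, the broker sends no response, and the batch is completed as soon as the request has been sent.
- Error handling: if a send fails, the Sender handles it according to the error type and the retry settings. Retriable errors, such as transient network failures or a leader change, cause the batch to be re-enqueued and retried. A non-retriable error (an authorization failure, for example) or exhausted retries cause the batch to fail, and the exception is delivered to the Future and the Callback. The producer does not redirect the messages to another partition.
- Resource management: in each loop iteration, the Sender derives its poll timeout from the batches' linger and retry-backoff deadlines and from connection delays, so it neither busy-loops nor sleeps past a deadline. It also returns the memory of completed batches to the BufferPool.
- Summary: the Sender thread is central to the Kafka producer. It transmits messages and also manages the connections to the Kafka cluster, which makes delivery reliable and efficient. The producer thread concentrates on creating messages and issuing send requests. Working closely together, the two threads carry out Kafka's data production.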
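The error-handling rule can be summarized as a small decision function. RetryPolicySketch is hypothetical. The real check lives in the Sender and relies on Kafka's RetriableException hierarchy. Newer clients also cap the total time with delivery.timeout.ms, represented here by deliveryTimeoutMs. The spacing between attempts (retry.backoff.ms) is modeled as a fixed delay.

```java
/** Hypothetical sketch of the Sender's decision after a failed produce response. */
final class RetryPolicySketch {
    enum ErrorKind { RETRIABLE, FATAL }   // e.g. leader moved vs. authorization failure
    enum Outcome { REENQUEUE, FAIL }

    private RetryPolicySketch() {}

    static Outcome onError(ErrorKind error, int attempts, int retries,
                           long elapsedMs, long deliveryTimeoutMs) {
        boolean budgetLeft = attempts < retries && elapsedMs < deliveryTimeoutMs;
        // Only retriable errors with budget left go back into the accumulator;
        // everything else completes the batch exceptionally (Future + Callback).
        return (error == ErrorKind.RETRIABLE && budgetLeft) ? Outcome.REENQUEUE : Outcome.FAIL;
    }

    /** When a re-enqueued batch becomes sendable again, with a fixed retry.backoff.ms. */
    static long nextAttemptAtMs(long lastAttemptMs, long retryBackoffMs) {
        return lastAttemptMs + retryBackoffMs;
    }
}
```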
Final note: the Kafka producer's send path has many more details, and the above is only a rough overview. To learn more, dig into the source code.