百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

Kafka生产者消息发送流程原理及源码分析

toyiye 2024-06-21 19:17 11 浏览 0 评论

Kafka是一个分布式流处理平台,它能够以极高的吞吐量处理数据。在Kafka中,生产者负责将消息发送到Kafka集群,而消费者则负责从Kafka集群中读取消息。本文将探讨Kafka生产者消息发送流程的细节,包括消息的序列化、分区分配、记录提交等关键步骤。

先看一个生产者发送消息的代码样例

public class MyProducer1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Map<String, Object> configs = new HashMap<>();
        // 指定初始连接用到的broker地址
        configs.put("bootstrap.servers", "node164:9092");
        // 指定key的序列化类
        configs.put("key.serializer", IntegerSerializer.class);
        // 指定value的序列化类
        configs.put("value.serializer", StringSerializer.class);
        //borker集群消息持久化控制
        configs.put("acks", "all");
        //重试次数
        configs.put("reties", "3");
        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
        
        // 用于设置用户自定义的消息头字段
        List<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));

        ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
                "test_topic",
                0,
                0,
                "hello world 0",
                headers
        );

        // 消息异步确认
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println("消息的主题:" + metadata.topic());
                    System.out.println("消息的分区号:" + metadata.partition());
                    System.out.println("消息的偏移量:" + metadata.offset());
                } else {
                    System.out.println("异常消息:" + exception.getMessage());
                }
            }
        });

        // 关闭生产者
        producer.close();
    }
}

通过跟踪producer.send源码可知生产者发送消息的大体流程如下图,RecordAccumulator的消息发送到brokers实际上由Sender线程处理,下图暂时忽略,先看producer主线程处理的一些细节。


  • KafkaProducer构造函数根据客户端参数初始化拦截器、序列化器、分区器,创建Sender守护线程。
  • 调用send函数发送消息时,其内部使用异步消息发送,消息经过拦截器、序列化器、分区器后缓存到缓冲区。
  • 批次发送的条件为:缓冲区数据??达到batch.size或者linger.ms达到上限。
  • 缓冲区消息发送到指定分区,落盘到broker。如果发送失败,客户端将根据设置的重试参数进行重试,如果超过了重试次数,抛出异常。
  • 发送成功,返回RecordMetadata元数据到客户端。如果是同步调用将阻塞等待元数据返回,如果是异步调用将通过Callback接口进行回调返回元数据

生产者拦截器

KafkaProducer调用send方法后,如果有设置拦截器,会先经过拦截器,默认是不会经过任何拦截器的,除非客户端配置了拦截器(interceptor.classes参数),send函数如下

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

可见,拦截器列表会被首先执行,而拦截器的初始化则是在KafkaProducer的 构造函数中,部分源码如下

List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

可见,拦截器是通过客户端配置的ProducerConfig.INTERCEPTOR_CLASSES_CONFIG来初始化的,拦截器必须实现ProducerInterceptor接口。

public interface ProducerInterceptor<K, V> extends Configurable {

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);


    public void onAcknowledgement(RecordMetadata metadata, Exception exception);


    public void close();
}

拦截器接口共有三个接口,第一个onSend接口把ProducerRecord直接传了进来,我们可以在实现接口时,对原消息进行统一处理,比如添加一些业务相关的头部信息等。onAcknowledgement接口则可以在确认消息发送成功后做一些操作,最后close接口则可以在拦截器关闭时清理一些资源。

如需要自定义拦截器则直接实现ProducerInterceptor接口,实现相关方法,在客户端进行配置即可,客户端配置示例:

 // 如果有多个拦截器,则设置为多个拦截器类的全限定类名,中间用逗号隔开
  configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.xxx.CustomInterceptorOne,com.xxx.CustomInterceptorTwo");

生产者序列化器

拦截器处理完后,将进入到doSend方法,在发送消息前,首先会根据客户端配置的序列化器对key和value进行序列化。

序列化接口如下:

public interface Serializer<T> extends Closeable {

    /**
     * Configure this class.
     * @param configs configs in key/value pairs
     * @param isKey whether is for key or value
     */
    void configure(Map<String, ?> configs, boolean isKey);

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param data typed data
     * @return serialized bytes
     */
    byte[] serialize(String topic, T data);

    /**
     * Close this serializer.
     *
     * This method must be idempotent as it may be called multiple times.
     */
    @Override
    void close();
}

在Kafka中,消息可以是任何类型的数据,如字符串、JSON、二进制数据等。为了将这些数据存储到Kafka集群中,Kafka需要对它们进行序列化。Kafka提供了多种序列化器,如StringSerializer、JsonSerializer等。生产者可以根据自己的需求选择合适的序列化器来序列化消息。如果默认提供的序列化器仍未满足需求,实现上面的Serializer接口,然后在客户端配置自己的序列化器即可。通过接口可以看出,序列化器最终将key和value序列化成字节数组。

doSend方法使用序列化器的部分源码:

byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }

如上源码,keySerializervalueSerializer实际就是我们配置的序列化器,在KafkaProducer的构造函数中已经初始化。

生产者分区器

接下来是分区分配过程。Kafka采用分区机制来管理消息的存储和消费。每个主题(Topic)可以被划分为多个分区,每个分区都是一个有序的、不可变的消息序列。在发送消息时,生产者需要确定消息应该发送到哪个分区。

Kafka提供了几种分区策略,如轮询分区(RoundRobinPartitioner)、范围分区(RangePartitioner)等。轮询分区策略将消息均匀地分配到各个分区,而范围分区策略则根据消息的键或值的哈希值来分配分区。 在确定了消息的分区之后,生产者就会将消息发送到相应的分区。在发送过程中,Kafka采用了生产者ID和分区ID的组合来唯一标识每条消息。此外,为了提高消息发送的可靠性,Kafka提供了同步和异步两种发送模式。在同步模式下,生产者会等待消息被成功写入到分区中才能返回成功响应;而在异步模式下,生产者会立即返回成功响应,而不管消息是否已经被写入到分区中。

分区接口:

public interface Partitioner extends Configurable, Closeable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();

}

通过主题、消息的key和value计算当前消息的分区,KafkaProducer分区方法部分源码如下:

private final Partitioner partitioner;   


private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

如果在ProducerRecord对象中指定了分区则直接使用此分区,如果没指定则使用配置的Partitioner进行计算得到分区。

partitioner也可以在客户端配置中指定,如不指定,将使用DefaultPartitionerDefaultPartitioner是一个默认的分区策略实现,用于在发送消息时决定消息应该被分配到哪个分区。

DefaultPartitioner分区核心源码如下:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
}

分区主要策略如下:

  1. 基于键的哈希分区: 当消息包含键(key != null)时,DefaultPartitioner会使用键的字节表示(keyBytes)进行哈希处理。这里使用了MurmurHash2算法,这是一种快速且广泛使用的哈希函数,它能将任意长度的输入数据映射到固定长度的哈希值。 计算出的哈希值通过模运算(%)与分区总数(numPartitions)相结合,来确定消息应该发送到的分区编号。 如果哈希值是负数,Utils.toPositive方法会将其转换为正数。
  2. 基于轮询的无键分区: 当消息没有键(key == null)时,DefaultPartitioner会回退到使用轮询分区策略。这种情况下,它会遍历所有分区,并选择一个可用的分区来发送消息。 使用nextValue方法来生成一个基于时间或序列的值,该值用于确定轮询的顺序。 然后,通过取模运算选择一个分区,确保消息能够被发送到某个分区,即使所有分区都不可用,也会选择一个“非可用”的分区编号。
  3. 分区选择的一致性: 对于有键的消息,DefaultPartitioner提供了一致性的分区选择,即相同键的消息总是被发送到同一个分区,这对于确保消息顺序和一致性至关重要。 对于无键的消息,由于使用了轮询策略,相同消息可能会被发送到不同的分区,这在某些应用场景中可能不是最佳选择。
  4. 集群状态的考虑: 分区策略还考虑了集群的当前状态(cluster),包括分区的数量和每个分区的健康状况。这确保了消息只能被发送到当前可用的分区。

RecordAccumulator批消息缓冲

RecordAccumulator 是 Kafka 0.8.2 版本引入的一个概念,它是 Kafka 生产者 API 中的一个关键组件,主要用于在生产者发送消息到服务器之前暂存这些消息。RecordAccumulator 的主要作用包括以下几点:

  1. 批量发送优化RecordAccumulator 允许生产者将多条消息累积到一起,然后作为一个批次发送给 Kafka broker。这样可以减少网络请求的次数,降低网络延迟,并提高吞吐量。RecoderAccumulator为每个分区都维护了?个 Deque<ProducerBatch> 类型的双端 队列。
  2. 流量控制: RecordAccumulator 通过控制累积的消息数量或等待的时间,实现对生产者发送速率的控制。这有助于防止生产者在网络拥塞时发送过多的消息,从而导致消息丢失或重试次数过多。
  3. 消息重试机制: 当 Kafka broker 由于某些原因(如网络问题)无法成功处理生产者发送的消息时,RecordAccumulator 会保存这些消息,并在后续的发送机会中重试。这确保了消息不会因为临时的网络问题而丢失。
  4. 顺序保证: RecordAccumulator 在内部维护了消息的顺序信息,确保即使在批量发送的情况下,消息在 Kafka broker 端也能按照它们被发送的顺序进行处理。
  5. 内存和磁盘管理: RecordAccumulator 在内存中维护消息的累积队列,当达到一定的阈值时,会将消息序列化并写入本地磁盘,以避免内存溢出。当消息成功发送到 Kafka broker 后,相应的磁盘文件会被删除。
  6. 死信队列(DLQ)支持: 在消息发送失败后,RecordAccumulator 可以配置为将消息发送到死信队列,以便后续的处理。
  7. 幂等性支持: RecordAccumulator 配合 Kafka 的 producer id 和 sequence number 机制,确保即使在网络不稳定的情况下,相同的消息不会被重复发送到 Kafka broker。

RecordAccumulator部分源码

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;


public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
            // check if we have an in-progress batch
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }

            // we don't have an in-progress record batch try to allocate a new batch
            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    return appendResult;
                }

                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch);

                // Don't deallocate this buffer in the finally block as it's being used in the record batch
                buffer = null;

                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
            }
        } finally {
            if (buffer != null)
                free.deallocate(buffer);
            appendsInProgress.decrementAndGet();
        }
}

//Get the deque for the given topic-partition, creating it if necessary.
 private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
        Deque<ProducerBatch> d = this.batches.get(tp);
        if (d != null)
            return d;
        d = new ArrayDeque<>();
        Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
        if (previous == null)
            return d;
        else
            return previous;
}

源码说明:

  • ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低?络 影响。
  • 由于?产者客户端使? java.io.ByteBuffer 在发送消息之前进?消息保存,并维护了?个 BufferPool 实现 ByteBuffer 的复?;该缓存池只针对特定??( batch.size 指定)的 ByteBuffer 进?管理,对于消息过?的缓存,不能做到重复利?。
  • 每次追加?条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取?个 ProducerBatch,判断当前消息的??是否可以写?该批次中。若可以写?则写?;若不可以写 ?,则新建?个ProducerBatch,判断该消息??是否超过客户端参数配置 batch.size 的值,不超 过,则以 batch.size建?新的ProducerBatch,这样?便进?缓存重复利?;若超过,则以计算的 消息??建?对应的 ProducerBatch ,缺点就是该内存不能被复?了。

Sender线程作用及消息发送原理

主线程的核心流程上面已经作了介绍,下面探讨RecordAccumulator 的数据是如何被发送到Kafka集群的。



由上图可以看出,KafkaProducer由两个线程相互配合,最终将消息发送到broker中。

主线程主要负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息缓冲区(RecordAccumulator)。

Sender线程扮演着至关重要的角色,它是生产者客户端与Kafka集群之间通信的桥梁。Sender线程负责将生产者发送过来的消息批量传输到Kafka集群的相应分区。为了确保消息的可靠传输,Kafka采用了异步发送机制,其中Sender线程在后台默默地工作,而生产者线程则专注于消息的生成和发送请求。

Sender线程的工作原理如下:

  • 线程初始化:在KafkaProducer的构造函数中,将启动一条Sender daemon后台线程

Sender类定义

public class Sender implements Runnable {..}

KafkaProducer部分源码

private final Sender sender;
//批次消息都封装在里面了
private final RecordAccumulator accumulator;

this.accumulator = new RecordAccumulator(logContext,
                                         config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                                         this.totalMemorySize,
                                         this.compressionType,
                                         config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                                         retryBackoffMs,
                                         metrics,
                                         time,
                                         apiVersions,
                                         transactionManager);
//注意,这里的构造函数把accumulator传进去了,sender线程后续发送消息就从accumulator中获取
this.sender = new Sender(logContext,
                         client,
                         this.metadata,
                         this.accumulator,
                         maxInflightRequests == 1,
                         config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                         acks,
                         retries,
                         metricsRegistry.senderMetrics,
                         Time.SYSTEM,
                         this.requestTimeoutMs,
                         config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                         this.transactionManager,
                         apiVersions);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
  • 消息接收:

当生产者线程生成消息后,它会将消息发送给对应的Sender线程。

            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
  //如果追加完数据后,对应的 RecordBatch 已经达到了 batch.size 的??(或者batch的剩余空间不?以添加下?条 Record),则唤醒 sender 线程发送数据
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
  • 网络连接管理:

Sender线程维护与Kafka集群的网络连接,包括建立连接、心跳检测、断开连接重连等。 它会定期向Kafka集群发送心跳消息,以保持连接的活跃状态。 如果连接断开,Sender线程会尝试重连,并根据重连策略进行适当的等待。

  • 消息传输:

Sender线程将缓存的消息块发送到Kafka集群的相应分区。 为了确保消息的可靠传输,Kafka提供了多种确认模式,如至少一次送达(at least once delivery)和恰好一次送达(exactly once delivery)。 在至少一次送达模式下,Sender线程会在消息成功写入分区日志后收到一个确认(ACK),然后继续发送下一批消息。如果在发送过程中发生故障,已发送的消息可能会被重新发送。 在恰好一次送达模式下,Sender线程会使用更复杂的机制来确保每个消息只被发送一次,即使在网络故障或其他异常情况下也能保证消息的一致性。

Sender run部分源码:

// main loop, runs until close is called
while (running) {
    try {
        run(time.milliseconds());
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

可见Sender线程实际处于一个类似于定时任务中,定时或由程序触发去将消息发送到broker。下面看看核心发送消息的run方法部分源码

    /**
     * Run a single iteration of sending
     *
     * @param now The current POSIX time in milliseconds
     */
    void run(long now) {
  // ...省略多处源码......
        long pollTimeout = sendProducerData(now);
        client.poll(pollTimeout, now);
    }

因此核心发送消息的方法在sendProducerData方法中,接着看源码

    private long sendProducerData(long now) {
        Cluster cluster = metadata.fetch();

        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }

        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // create produce requests
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                this.maxRequestSize, now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
        // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
        // we need to reset the producer id here.
        if (!expiredBatches.isEmpty())
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {
            failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
            if (transactionManager != null && expiredBatch.inRetry()) {
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
            }
        }

        sensors.updateProduceRequestMetrics(batches);

        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            // if some partitions are already ready to be sent, the select time would be 0;
            // otherwise if some partition already has some data accumulated but not ready yet,
            // the select time will be the time difference between now and its linger expiry time;
            // otherwise the select time will be the time difference between now and the metadata expiry time;
            pollTimeout = 0;
        }
        sendProduceRequests(batches, now);

        return pollTimeout;
    }

这个方法主要是从accumulator中找出需要发送的消息,封装到produce requests,然后发送到broker

// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
this.maxRequestSize, now);
....
sendProduceRequests(batches, now);

最终的发送调用如下

    private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
        if (batches.isEmpty())
            return;

        Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
        final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

        // find the minimum magic version used when creating the record sets
        byte minUsedMagic = apiVersions.maxUsableProduceMagic();
        for (ProducerBatch batch : batches) {
            if (batch.magic() < minUsedMagic)
                minUsedMagic = batch.magic();
        }

        for (ProducerBatch batch : batches) {
            TopicPartition tp = batch.topicPartition;
            MemoryRecords records = batch.records();

            // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
            // that the producer starts building the batch and the time that we send the request, and we may have
            // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
            // the new message format, but found that the broker didn't support it, so we need to down-convert on the
            // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
            // not all support the same message format version. For example, if a partition migrates from a broker
            // which is supporting the new magic version to one which doesn't, then we will need to convert.
            if (!records.hasMatchingMagic(minUsedMagic))
                records = batch.records().downConvert(minUsedMagic, 0, time).records();
            produceRecordsByPartition.put(tp, records);
            recordsByPartition.put(tp, batch);
        }

        String transactionalId = null;
        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionalId = transactionManager.transactionalId();
        }
        ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
                produceRecordsByPartition, transactionalId);
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };

        String nodeId = Integer.toString(destination);
        ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
        client.send(clientRequest, now);
        log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
    }

在开始的send方法中,我们曾看到如下的代码

            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }

如果追加完数据后,对应的 RecordBatch 已经达到了 batch.size 的??(或者batch的剩余空间不?以添加下?条 Record),则唤醒 sender 线程发送数据。看到这里我们就明白了,这里是将可能处于IO阻塞的sener线程唤醒继续执行消息发送操作,真正的消息发送是Sender线程定时执行处理的。

  • 消息确认: 当消息成功发送到Kafka集群并被写入分区日志后,Kafka集群会向Sender线程发送一个确认消息。 Sender线程收到确认后,会更新本地缓存的消息状态,并通知生产者线程消息已成功发送。
  • 错误处理: 如果在发送过程中遇到错误,Sender线程会根据错误类型和重试策略进行相应的处理。 对于可恢复的错误,如短暂的网络故障,Sender线程会进行重试。 对于不可恢复的错误,如分区不可达,Sender线程可能会将消息发送到备用分区或记录到错误日志中。
  • 资源管理: Sender线程会根据当前的网络状况和集群负载动态调整发送策略,如调整发送频率和重试间隔。 它还会管理本地资源,如内存和网络连接,以确保高效稳定地运行。
  • 通过以上工作原理,我们可以看到Sender线程在Kafka中的核心地位。它不仅负责消息的传输,还管理着与Kafka集群的连接,确保消息的可靠性和高效传输。而生产者线程则专注于消息的生成和发送请求,与Sender线程紧密配合,共同完成Kafka的数据生产任务。

写在最后:kafka消息发送细节还是挺多,上面只是粗略探讨,如需了解更多细节,深入源码探索吧。

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码