百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

快速了解 Kafka 生产者的使用和原理

toyiye 2024-06-21 12:38 9 浏览 0 评论

作者 | 草捏子

整理 | 杨碧玉

出品 | 草捏子(ID:chaycao)

头图 | CSDN 下载自视觉中国

本文将学习 Kafka 生产者的使用和原理,文中使用的 kafka-clients 版本号为2.6.0。下面进入正文,先通过一个示例看下如何使用生产者 API 发送消息。

public class Producer {

public static void main(String[] args) {

// 1. 配置参数

Properties properties = new Properties;

properties.put("bootstrap.servers", "localhost:9092");

properties.put("key.serializer",

"org.apache.kafka.common.serialization.StringSerializer");

properties.put("value.serializer",

"org.apache.kafka.common.serialization.StringSerializer");

// 2. 根据参数创建KafkaProducer实例(生产者)

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

// 3. 创建ProducerRecord实例(消息)

ProducerRecord<String, String> record = new ProducerRecord<>("topic-demo", "hello kafka");

// 4. 发送消息

producer.send(record);

// 5. 关闭生产者示例

producer.close;

}

}

关于配置的三个必填参数

首先创建一个 Properties 实例,设置了三个必填参数:

  • bootstrap.servers: broker 的地址清单;

  • key.serializer:消息的键的序列化器;

  • value.serializer:消息的值的序列化器。

由于 broker 希望接受的是字节数组,所以需要将消息中的键值序列化成字节数组。在设置好参数后,根据参数创建 KafkaProducer 实例,也就是用于发送消息的生产者,接着再创建准备发送的消息 ProducerRecord 实例,然后使用 KafkaProducer 的 send 方法发送消息,最后再关闭生产者。

关于 KafkaProducer ,我们先记住两点:

  • 在创建实例的时候,需要指定配置;

  • send 方法可发送消息。

send方法

关于配置我们先只了解这三个必填参数,下面我们看下 send 方法,关于发送消息的方式有三种

1、发送并忘记(fire-and-forget)

在发送消息给 Kafka 时,不关心消息是否正常到达,只负责成功发送,存在丢失消息的可能。上面给出的示例就是这种方式。

2、同步发送(sync)

send 方法的返回值是一个 Future 对象,当调用其 get 方法时将阻塞等待 Kafka 的响应。如下:

Future<RecordMetadata> recordMetadataFuture = producer.send(record);

RecordMetadata recordMetadata = recordMetadataFuture.get;

RecordMetadata 对象中包含有消息的一些元数据,如消息的主题、分区号、分区中的偏移量、时间戳等。

3、异步发送(async)

在调用 send 方法时,指定回调函数,在 Kafka 返回响应时,将调用该函数。如下:

producer.send(record, new Callback {

@Override

public void onCompletion(RecordMetadata recordMetadata, Exception e) {

if (e != ) {

e.printStackTrace;

} else {

System.out.println(recordMetadata.topic + "-"

+ recordMetadata.partition + ":" + recordMetadata.offset);

}

}

});

onCompletion 有两个参数,其类型分别是 RecordMetadata 和 Exception 。当消息发送成功时, recordMetadata 为非 ,而 e 将为 。当消息发送失败时,则反之。

消息对象ProducerRecord

下面我们认识下消息对象 ProducerRecord ,封装了发送的消息,其定义如下:

public class ProducerRecord<K, V> {

private final String topic; // 主题

private final Integer partition; // 分区号

private final Headers headers; // 消息头部

private final K key; // 键

private final V value; // 值

private final Long timestamp; // 时间戳

// ...其他构造方法和成员方法

}

其中主题和值为必填,其余非必填。例如当给出了分区号,则相当于指定了分区,而当未给出分区号时,若给出了键,则可用于计算分区号。关于消息头部和时间戳,暂不讲述。

发送消息时用到的组件

在对生产者对象 KafkaProducer 和消息对象 ProducerRecord 有了认识后,下面我们看下在使用生产者发送消息时,会使用到的组件有生产者拦截器、序列化器和分区器。其架构(部分)如下:

1、生产者拦截器:ProducerInterceptor 接口,主要用于在消息发送前做一些准备工作,比如对消息做过滤,或者修改消息内容,也可以用于在发送回调逻辑前做一些定制化的需求,例如统计类工作。

2、序列化器,Serializer 接口,用于将数据转换为字节数组。

3、分区器,Partitioner 接口,若未指定分区号,且提供 key 。

处理过程

下面结合代码来看下处理过程,加深印象。

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {

// 拦截器,拦截消息进行处理

ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);

return doSend(interceptedRecord, callback);

}

上面是 KafkaProducer 的 send 方法,首先会将消息传给拦截器的 onSend 方法,然后进入 doSend 方法。其中 doSend 方法较长,但内容并不复杂,下面给出了主要步骤的注释。

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {

TopicPartition tp = ;

try {

throwIfProducerClosed;

// 1.确认数据发送到的topic的metadata可用

long nowMs = time.milliseconds;

ClusterAndWaitTime clusterAndWaitTime;

try {

clusterAndWaitTime = waitOnMetadata(record.topic, record.partition, nowMs, maxBlockTimeMs);

} catch (KafkaException e) {

if (metadata.isClosed)

throw new KafkaException("Producer closed while send in progress", e);

throw e;

}

nowMs += clusterAndWaitTime.waitedOnMetadataMs;

long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);

Cluster cluster = clusterAndWaitTime.cluster;

// 2.序列化器,序列化消息的key和value

byte serializedKey;

try {

serializedKey = keySerializer.serialize(record.topic, record.headers, record.key);

} catch (ClassCastException cce) {

throw new SerializationException("Can't convert key of class " + record.key.getClass.getName +

" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName +

" specified in key.serializer", cce);

}

byte serializedValue;

try {

serializedValue = valueSerializer.serialize(record.topic, record.headers, record.value);

} catch (ClassCastException cce) {

throw new SerializationException("Can't convert value of class " + record.value.getClass.getName +

" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName +

" specified in value.serializer", cce);

}

// 3.分区器,获取或计算分区号

int partition = partition(record, serializedKey, serializedValue, cluster);

tp = new TopicPartition(record.topic, partition);

setReadOnly(record.headers);

Header headers = record.headers.toArray;

int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic,

compressionType, serializedKey, serializedValue, headers);

ensureValidRecordSize(serializedSize);

long timestamp = record.timestamp == ? nowMs : record.timestamp;

if (log.isTraceEnabled) {

log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic, partition);

}

Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

if (transactionManager != && transactionManager.isTransactional) {

transactionManager.failIfNotReadyForSend;

}

// 4.消息累加器,缓存消息

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,

serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

if (result.abortForNewBatch) {

int prevPartition = partition;

partitioner.onNewBatch(record.topic, cluster, prevPartition);

partition = partition(record, serializedKey, serializedValue, cluster);

tp = new TopicPartition(record.topic, partition);

if (log.isTraceEnabled) {

log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic, partition, prevPartition);

}

// producer callback will make sure to call both 'callback' and interceptor callback

interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

result = accumulator.append(tp, timestamp, serializedKey,

serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);

}

if (transactionManager != && transactionManager.isTransactional)

transactionManager.maybeAddPartitionToTransaction(tp);

// 5.如果batch满了或者消息大小超过了batch的剩余空间需要创建新的batch

// 将唤醒sender线程发送消息

if (result.batchIsFull || result.newBatchCreated) {

log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic, partition);

this.sender.wakeup;

}

return result.future;

} catch (ApiException e) {

log.debug("Exception occurred during message send:", e);

if (callback != )

callback.onCompletion(, e);

this.errors.record;

this.interceptors.onSendError(record, tp, e);

return new FutureFailure(e);

} catch (InterruptedException e) {

this.errors.record;

this.interceptors.onSendError(record, tp, e);

throw new InterruptException(e);

} catch (KafkaException e) {

this.errors.record;

this.interceptors.onSendError(record, tp, e);

throw e;

} catch (Exception e) {

this.interceptors.onSendError(record, tp, e);

throw e;

}

}

doSend方法

doSend 方法主要分为5个步骤

  • 在发送数据前,先确认数据发送的 topic 的 metadata 是可用的( partition 的 leader 存在即为可用,如果开启了权限控制,则还要求 client 具有相应的权限);

  • 序列化器,序列化消息的 key 和 value ;

  • 分区器,获取或计算分区号;

  • 消息累加器,缓存消息;

  • 在消息累加器中,消息会被放在一个 batch 中,用于批量发送,当 batch 满了或者消息大小超过了 batch 的剩余空间需要创建新的 batch ,则将唤醒 sender 线程发送消息。

关于 meatadata 本文将不深究,序列化器、分区器前文也给出了介绍。下面我们主要看下消息累加器。

消息累加器

消息累加器,其作用是用于缓存消息,以便批量发送消息。在 RecordAccumulator 中用一个 ConcurrentMap < TopicPartition , Deque < ProducerBatch >> batches 的 map 变量保存消息。作为 key 的 TopicPartition 封装了 topic 和分区号,而对应的 value 为 ProducerBatch 的双端队列,也就是将发往同一个分区的消息缓存在 ProducerBatch 中。在发送消息时, Record 会被追加在队列的尾部,即加入到尾部的 ProducerBatch 中,如果 ProducerBatch 的空间不足或队列为空,则将创建新的 ProducerBatch ,然后追加。当 ProducerBatch 满了或创建新的 ProducerBatch 时,将唤醒 Sender 线程从队列的头部获取 ProducerBatch 进行发送。

Sender 线程中会将待发送的 ProducerBatch 将转换成< Integer , List < ProducerBatch >>的形式,按 Kafka 节点的 ID 进行分组,然后将同一个 node 的 ProducerBatch 放在一个请求中发送。

Kafak 生产者的内容就先了解到这,下面通过思维导图对本文内容做一个简单的回顾:

参考

  • 《深入理解Kafka核心设计与实践原理》

  • 《Kafka权威指南》

  • Kafka 源码解析之 Producer 发送模型(一): http://matt33.com/2017/06/25/kafka-producer-send-module/

点分享

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码