百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

源码分析Kafka之Producer

toyiye 2024-06-21 19:16 10 浏览 0 评论

Kafka是一款很棒的消息系统,可以看看我之前写的 后端好书阅读与推荐来了解一下它的整体设计。今天我们就来深入了解一下它的实现细节(我fork了一份代码),首先关注Producer这一方。

要使用kafka首先要实例化一个KafkaProducer,需要有brokerIP、序列化器必要Properties以及acks(0、1、n)、compression、retries、batch.size非必要Properties,通过这个简单的接口可以控制Producer大部分行为,实例化后就可以调用send方法发送消息了。

核心实现是这个方法:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {

// intercept the record, which can be potentially modified; this method does not throw exceptions

ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);//①

return doSend(interceptedRecord, callback);//②

}

通过不同的模式可以实现发送即忘(忽略返回结果)、同步发送(获取返回的future对象,回调函数置为null)、异步发送(设置回调函数)三种消息模式。

我们来看看消息类ProducerRecord有哪些属性:

private final String topic;//主题

private final Integer partition;//分区

private final Headers headers;//头

private final K key;//键

private final V value;//值

private final Long timestamp;//时间戳

它有多个构造函数,可以适应不同的消息类型:比如有无分区、有无key等。

①中ProducerInterceptors(有0 ~ 无穷多个,形成一个拦截链)对ProducerRecord进行拦截处理(比如打上时间戳,进行审计与统计等操作)

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {

ProducerRecord<K, V> interceptRecord = record;

for (ProducerInterceptor<K, V> interceptor : this.interceptors) {

try {

interceptRecord = interceptor.onSend(interceptRecord);

} catch (Exception e) {

// 不抛出异常,继续执行下一个拦截器

if (record != null)

log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);

else

log.warn("Error executing interceptor onSend callback", e);

}

}

return interceptRecord;

}

如果用户有定义就进行处理并返回处理后的ProducerRecord,否则直接返回本身。

然后②中doSend真正发送消息,并且是异步的(源码太长只保留关键):

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {

TopicPartition tp = null;

try {

// 序列化 key 和 value

byte[] serializedKey;

try {

serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());

} catch (ClassCastException cce) {

}

byte[] serializedValue;

try {

serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());

} catch (ClassCastException cce) {

}

// 计算分区获得主题与分区

int partition = partition(record, serializedKey, serializedValue, cluster);

tp = new TopicPartition(record.topic(), partition);

// 回调与事务处理省略。

Header[] headers = record.headers().toArray();

// 消息追加到RecordAccumulator中

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,

serializedValue, headers, interceptCallback, remainingWaitMs);

// 该批次满了或者创建了新的批次就要唤醒IO线程发送该批次了,也就是sender的wakeup方法

if (result.batchIsFull || result.newBatchCreated) {

log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);

this.sender.wakeup();

}

return result.future;

} catch (Exception e) {

// 拦截异常并抛出

this.interceptors.onSendError(record, tp, e);

throw e;

}

}

下面是计算分区的方法:

private int partition(ProducerRecord<K, V> record,

byte[] serializedKey, byte[] serializedValue, Cluster cluster) {

Integer partition = record.partition();

// 消息有分区就直接使用,否则就使用分区器计算

return partition != null ?

partition :

partitioner.partition(

record.topic(), record.key(), serializedKey,

record.value(), serializedValue, cluster);

}

默认的分区器DefaultPartitioner实现方式是如果partition存在就直接使用,否则根据key计算partition,如果key也不存在就使用round robin算法分配partition。

/**

* The default partitioning strategy:

* <ul>

* <li>If a partition is specified in the record, use it

* <li>If no partition is specified but a key is present choose a partition based on a hash of the key

* <li>If no partition or key is present choose a partition in a round-robin fashion

*/

public class DefaultPartitioner implements Partitioner {

private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

int numPartitions = partitions.size();

if (keyBytes == null) {//key为空

int nextValue = nextValue(topic);

List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);//可用的分区

if (availablePartitions.size() > 0) {//有分区,取模就行

int part = Utils.toPositive(nextValue) % availablePartitions.size();

return availablePartitions.get(part).partition();

} else {// 无分区,

return Utils.toPositive(nextValue) % numPartitions;

}

} else {// key 不为空,计算key的hash并取模获得分区

return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

}

}

private int nextValue(String topic) {

AtomicInteger counter = topicCounterMap.get(topic);

if (null == counter) {

counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());

AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);

if (currentCounter != null) {

counter = currentCounter;

}

}

return counter.getAndIncrement();//返回并加一,在取模的配合下就是round robin

}

}

以上就是发送消息的逻辑处理,接下来我们再看看消息发送的物理处理。

Sender(是一个Runnable,被包含在一个IO线程ioThread中,该线程不断从RecordAccumulator队列中的读取消息并通过Selector将数据发送给Broker)的wakeup方法,实际上是KafkaClient接口的wakeup方法,由NetworkClient类实现,采用了NIO,也就是java.nio.channels.Selector.wakeup()方法实现。

Sender的run中主要逻辑是不停执行准备消息和等待消息:

long pollTimeout = sendProducerData(now);//③

client.poll(pollTimeout, now);//④

③完成消息设置并保存到信道中,然后监听感兴趣的key,由KafkaChannel实现。

public void setSend(Send send) {

if (this.send != null)

throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);

this.send = send;

this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);

}

// transportLayer的一种实现中的相关方法

public void addInterestOps(int ops) {

key.interestOps(key.interestOps() | ops);

}

④主要是Selector的poll,其select被wakeup唤醒:

public void poll(long timeout) throws IOException {

/* check ready keys */

long startSelect = time.nanoseconds();

int numReadyKeys = select(timeout);//wakeup使其停止阻塞

long endSelect = time.nanoseconds();

this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {

Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

// Poll from channels that have buffered data (but nothing more from the underlying socket)

if (dataInBuffers) {

keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice

Set<SelectionKey> toPoll = keysWithBufferedRead;

keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed

pollSelectionKeys(toPoll, false, endSelect);

}

// Poll from channels where the underlying socket has more data

pollSelectionKeys(readyKeys, false, endSelect);

// Clear all selected keys so that they are included in the ready count for the next select

readyKeys.clear();

pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);

immediatelyConnectedKeys.clear();

} else {

madeReadProgressLastPoll = true; //no work is also "progress"

}

long endIo = time.nanoseconds();

this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

}

其中pollSelectionKeys方法会调用如下方法完成消息发送:

public Send write() throws IOException {

Send result = null;

if (send != null && send(send)) {

result = send;

send = null;

}

return result;

}

private boolean send(Send send) throws IOException {

send.writeTo(transportLayer);

if (send.completed())

transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

return send.completed();

}

Send是一次数据发包,一般由ByteBufferSend或者MultiRecordsSend实现,其writeTo调用transportLayer的write方法,一般由PlaintextTransportLayer或者SslTransportLayer实现,区分是否使用ssl

public long writeTo(GatheringByteChannel channel) throws IOException {

long written = channel.write(buffers);

if (written < 0)

throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");

remaining -= written;

pending = TransportLayers.hasPendingWrites(channel);

return written;

}

public int write(ByteBuffer src) throws IOException {

return socketChannel.write(src);

}

到此就把Producer业务相关逻辑处理非业务相关的网络 2方面的主要流程梳理清楚了。其他额外的功能是通过一些配置保证的。

比如顺序保证就是max.in.flight.requests.per.connection,InFlightRequests的doSend会进行判断(由NetworkClient的canSendRequest调用),只要该参数设为1即可保证当前包未确认就不能发送下一个包从而实现有序性

public boolean canSendMore(String node) {

Deque<NetworkClient.InFlightRequest> queue = requests.get(node);

return queue == null || queue.isEmpty() ||

(queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);

}

再比如可靠性,通过设置acks,Sender中sendProduceRequest的clientRequest加入了回调函数:

RequestCompletionHandler callback = new RequestCompletionHandler() {

public void onComplete(ClientResponse response) {

handleProduceResponse(response, recordsByPartition, time.milliseconds());//调用completeBatch

}

};

/**

* 完成或者重试投递,这里如果acks不对就会重试

*

* @param batch The record batch

* @param response The produce response

* @param correlationId The correlation id for the request

* @param now The current POSIX timestamp in milliseconds

*/

private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,

long now, long throttleUntilTimeMs) {

}

public class ProduceResponse extends AbstractResponse {

/**

* Possible error code:

* INVALID_REQUIRED_ACKS (21)

*/

}

kafka源码一层一层包装很多,错综复杂,如有错误请大家不吝赐教。

相关推荐

幸运角色过去了,谈一谈DNF起源的元素

总的来说伤害比上个版本强太多了,打卢克每日和团本明显能感觉的到。目前打团B套+圣耀稍微打造下应该都能随便二拖了。组队基本上都是秒秒秒(以前得强力辅助,现在随便带个毒奶都行)。单刷除了王座和顶能源阿斯兰...

DNF元素超大凉打桩测试(把括号的伤害加起来好像比较正常)

最近修练场的二觉老是很奇怪,发现以前都是习惯性先减抗然后丢二觉,结果伤害。。。直接丢二觉就正常了下面是其他技能伤害,没达到BUG线,估计问题不大。装备打造方面:全身红字加起来353(41*5+74*2...

ANSYS接触和出图技巧(ansys rough接触)

1.ANSYS后处理时如何按灰度输出云图?1)你可以到utilitymenu-plotctrls-style-colors-windowcolors试试2)直接utilitymenu-plotctr...

ANSYS有限元使用经验总结-后处理(4)

28.求塑性极限荷载时,结构的变形应该较大,建议把大变形打开。...

CFopen21.1、CFopen21.2都来了(cfile open)

[呲牙][赞][加油]

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

取消回复欢迎 发表评论:

请填写验证码