百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

RocketMQ4源码(一)NameServer

toyiye 2024-06-27 00:42 9 浏览 0 评论


前言

本章基于rocketmq4.6.0分析nameserver的实现。

nameserver的基础概念不再赘述,可以参考官网。

本章将分析以下内容:

  • 借nameserver,分析rocketmq-remoting通讯层通用模块
  • nameserver的kv配置管理
  • nameserver的路由信息管理

后续分析不会再看nameserver侧,有个概念即可。

一、概览

nameserver侧比较简单,一共不超过10个类。

NamesrvStartup:启动类,读取配置,构造NamesrvController启动;

NamesrvController:管理nameserver所有组件,比如:通讯server、kv配置管理、路由信息管理等;

KVConfigManager:kv配置管理,kv配置的增删改查;

RouteInfoManager:路由信息管理,路由信息的增删改查;

BrokerHousekeepingService:当通讯层长连接发生变更,调用RouteInfoManager处理;

DefaultRequestProcessor:业务请求处理器,处理所有客户端的请求,包括broker和client;

二、Server端通讯层

所有组件的通讯层都会使用rocketmq-remoting模块,无论客户端还是服务端,无论是producer还是consumer还是broker还是nameserver。

这里简单分析一下服务端的实现,即NettyRemotingServer。

1、协议

绕过底层bytebuffer的读写,业务上都是RemotingCommand模型,无论请求报文还是响应报文。

  • code:业务报文编码,对于请求来说,区分请求类型,对于响应来说,0代表成功,非0代表异常
  • language:语言类型
  • version:rocketmq的版本号
  • opaque:请求id,由请求方生成,内存自增
  • flag:标志位,目前只有两个二进制位被使用,一位用于区分是请求还是响应报文,一位用于区分是否是oneway请求
  • remark:提示信息
  • extFields:自定义头部
  • serializeTypeCurrentRPC:序列化方式,默认json
  • body:请求体

2、线程模型

Netty

NettyRemotingServer#start:

Netty线程分为三组

  • eventLoopGroupBoss:接收连接,1个线程;
  • eventLoopGroupSelector:io线程组,3个线程,可配置serverSelectorThreads;
  • defaultEventExecutorGroup:worker线程组,处理编解码等业务,8个线程,可配置serverWorkerThreads;

业务线程

无论客户端还是服务端,当netty的worker线程处理完编解码后,区分请求还是响应,走不同逻辑。

NettyRemotingAbstract#processMessageReceived

NettyRemotingAbstract#processRequestCommand

对于请求来说,根据请求业务编码,找对应NettyRequestProcessor处理器,及其业务线程池处理。

NettyRemotingAbstract.processorTable:

NettyRemotingAbstract#defaultRequestProcessor:

对于没有特殊配置的业务请求,使用默认的Processor处理。

NamesrvController#registerProcessor:nameserver就只有一个Processor。

nameserver的业务线程池大小默认也是8,通过serverWorkerThreads配置。

NettyRemotingAbstract#processResponseCommand

对于响应来说,根据请求id找到future,

rpc都差不多,发送请求的时候把id和future全局存一下,收到响应根据id找future

如果future是注册了callback回调方法的,使用future的callback线程池执行回调,

否则netty的worker线程,将response写入future,唤醒阻塞等待响应的线程。

超时请求检测

NettyRemotingServer#start:这里是服务端,但是客户端也类似,开启一个线程扫描超时请求。

NettyRemotingAbstract#responseTable:当前已经发出,但未收到相应的请求。

三、kv配置

1、KVConfigManager

内存中维护一份Map,存储Namespace-Key-Value。

KVConfigManager#putKVConfig:写kv配置,先拿写锁写内存,后拿读锁写磁盘。

KVConfigManager#getKVConfig:拿读锁读配置。

默认kv配置会存储在user.home下的namesrv/kvConfig.json中。

json复制代码{"configTable":{"namespace":{"key":"value"}}}

2、用途

目前只有一个地方在用这个kv配置,针对生产者,场景是顺序消息。

假设有两个broker,每个broker有8个queue。

对于生产者发现路由来说,当一个broker下线后,原来是16个队列hash,现在变成8个队列hash,那么有可能乱序。

通过在nameserver侧写死路由表,可以保证不发生上面的情况,问题在于消息投递到下线的broker会报错。

比如updateTopic命令,设置TopicA为顺序topic。

arduino复制代码mqadmin updateTopic -n localhost:9876 -b 169.254.246.163:10911 -t topicA -r 8 -w 8 -o true

则会在namespace=ORDER_TOPIC_CONFIG,新增kv配置,key=topic名,value=broker名1:队列数量1;broker名2:队列数量2。见UpdateTopicSubCommand#execute。

MQClientInstance#topicRouteData2TopicPublishInfo:

生产者获取路由表时,如果topic在nameserver被配置为顺序消息topic,将采用静态的路由表给生产者。

但是,默认情况下nameserver并没有开启这个功能。

四、路由信息

1、RouteInfoManager


RouteInfoManager基于内存存储路由信息,一共5个table。

2、broker注册

DefaultRequestProcessor#registerBrokerWithFilterServer:

broker注册请求包含broker配置信息和topic配置信息;

broker响应请求包含master地址和所有topic的静态路由信息(kv配置);

写入5个table

RouteInfoManager#registerBroker:先获取写锁,处理内存5个table

step1,clusterAddrTable,注册cluster到brokerName的映射关系。

step2,brokerAddrTable,注册brokerName到BrokerData的关系。

BrokerData包含同名broker的拓扑关系。

step3,topicQueueTable,注册topic到QueueData(队列)的映射关系,其中QueueData包含brokerName。

Step4,brokerLiveTable,注册broker地址到存活broker信息的映射关系。

Step5,filterServerTable,注册broker地址到filterServer的映射关系。sql92过滤相关。

Step6,如果注册broker非master,从brokerAddrTable获取masterBroker地址,从brokerLiveTable获取masterBroker的ha地址,返回broker。

触发条件

broker注册分为两大类。

一个类是broker启动或故障恢复之后注册。

举个例子,BrokerController#start:

启动后注册,定时注册,默认30s一次。

另一类是topic变更触发broker注册。

举个例子,TopicConfigManager#createTopicInSendMessageMethod:

broker自动创建topic,触发broker注册,实际是为了刷新topic配置。

3、broker注销

主动注销

broker正常关闭,主动调用nameserver注销。

DefaultRequestProcessor#unregisterBroker:

RouteInfoManager#unregisterBroker:

注销broker也是先拿写锁,然后操作5个table。

需要注意的是,除了两个纯broker纬度的table,其他table的处理是有条件的。

  • brokerAddrTable:当同名broker全注销后,才移除BrokerData;
  • clusterAddrTable:当同名broker全注销后,才移除cluster对应broker关系,如果cluster下broker为空,移除cluster;
  • topicQueueTable:当同名broker全注销后,才移除brokerName对应的所有QueueData;

通讯层注销

BrokerHousekeepingService:

当通讯层连接关闭、异常、空闲,都会被动触发broker注销。

RouteInfoManager#onChannelDestroy:

对于RouteInfoManager来说,和主动注销的区别在于,

需要先获取读锁,通过通讯层的netty channel,找到存活brokerTable中的broker信息,执行上述5个table的remove操作。

定时心跳检测注销

NamesrvController#initialize:

nameserver后台每隔10秒会扫描非活跃broker执行注销。

RouteInfoManager#scanNotActiveBroker:

如果broker2分钟内未发送注册请求,同样会执行注销。

4、查询路由

最常用的方法是根据topic查询路由TopicRouteData,无论生产消费都需要。

DefaultRequestProcessor#getRouteInfoByTopic:

  • 根据topic查询TopicRouteData;
  • 如果nameserver开启orderMessage,默认false,查询topic静态路由配置;

TopicRouteData:包含topic下所有queue,以及broker信息。

RouteInfoManager#pickupTopicRouteData:从几个table中根据topic查路由。

  • 根据topic找QueueData;
  • 根据QueueData上的brokerName找BrokerData;
  • 根据BrokerData上的brokerAddr找filter server;

结合broker注销,只要有存活的同名broker,都能拿到QueueData。

总结

RocketMQ通讯协议是个私有协议,主要包含几部分:

  • code:业务报文编码,对于请求来说,区分请求类型,对于响应来说,0代表成功,非0代表异常
  • language:语言类型
  • version:rocketmq的版本号
  • opaque:请求id,由请求方生成,内存自增
  • flag:标志位,一位用于区分是请求还是响应报文,一位用于区分是否是oneway请求
  • remark:提示信息
  • extFields:自定义头部
  • serializeTypeCurrentRPC:序列化方式,默认json
  • body:请求体

Server端线程模型如下:

Client端线程模型如下(本文没提,见NettyRemotingClient):

nameserver提供动态kv配置能力,目前只有一个用途,针对顺序消费场景,生产者可以走nameserver提供的静态路由配置。但是这个能力默认nameserver是禁用的,orderMessageEnable=false。

broker注册和心跳,在nameserver侧维护了5张table。

对于producer和consumer来说,最重要的是其中两张table提供的路由数据,topicQueueTable和brokerAddrTable。

BrokerLiveInfo主要存储了broker的通讯层channel、心跳时间、ha地址等信息。

有三种方式导致broker从nameserver注销:broker主动注销、通讯层注销、nameserver定时检测心跳。

无论是通讯层空闲还是心跳检测,超时时间都是120秒。

BrokerLiveInfo移除后,只有所有同名broker下线,才会导致topicQueueTable和brokerAddrTable移除路由数据。

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码