百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

阅读代码深入原理22——RocketMQ之Producer

toyiye 2024-04-07 14:12 18 浏览 0 评论

RocketMQ由NameServer、Broker、Producer、Consumer构成,NameServer类似于RPC框架的注册中心,Broker负责存储消息,Producer和Consumer之间没有直接关系,通过Broker交互。整体架构如下图:

MQProducer(和MQConsumer)继承接口MQAdmin(定义创建topic、查看指定MessageQueue的offset、通过offsetId查看消息),支持同步(或异步、单向)发送单条(或批量)的(事务型或非事务性)消息。

除此之外,MQProducer还支持类似RPC的同步调用,即通过request方法发送消息,并在Consumer消费后返回消息。

先看Producer的构造方法和启动:

# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.producer.DefaultMQProducer
	// 0. 启动时(或构造此对象前)设置系统属性"rocketmq.namesrv.addr"(或环境变量"NAMESRV_ADDR")
    public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
        boolean enableMsgTrace, final String customizedTraceTopic) {
        this.namespace = namespace; // 1. 若使用不含namespace的构造函数,则尝试从namesrv地址获取(符合"{schema}://MQ_INST_{1}_{2}.{domain}"则取”MQ_INST_{1}_{2}“)。topic在内部处理时,除系统topic(以"rmq_sys_"开头或个别特定topic如"TBW102",见TopicValidator)外都会添加namespace前缀(如果是重试或死信,则添加到中间位置)
        this.producerGroup = producerGroup; // 2. 若使用不含producerGroup的构造函数,则使用默认的”DEFAULT_PRODUCER“
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); // 3. RPCHook用于消息发送请求前后的处理,可能为null。构造DefaultMQProducerImpl时会创建异步发送消息的线程池,队列长度为50000
        //if client open the message trace feature
        if (enableMsgTrace) {
            try {
                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook); // 4.customizedTraceTopic为null时,使用默认的topic:RMQ_SYS_TRACE_TOPIC
                dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
                traceDispatcher = dispatcher;
                this.getDefaultMQProducerImpl().registerSendMessageHook( // 5. 注册发送消息钩子
                    new SendMessageTraceHookImpl(traceDispatcher));
                this.defaultMQProducerImpl.registerEndTransactionHook( // 6. 注册事务结束钩子
                    new EndTransactionTraceHookImpl(traceDispatcher));
            } catch (Throwable e) {
                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
            }
        }
    }
	
    @Override
    public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup)); // 7. 重置producerGroup:如果有namespace,或namesrv能推测出namespace,则拼接成"{namespace}%{producerGroup}"格式
        this.defaultMQProducerImpl.start(); // 8. 启动消息生产者
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); // 9. 启动消息跟踪
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }	

接着我们看下DefaultMQProducerImpl的启动过程:

# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
    public void start() throws MQClientException {
        this.start(true);
    }
    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST: // 1. 初始状态
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig(); // 2. 检查producerGroup(不能为空,长度应小于255,字符应属于”%|a-zA-Z0-9_-“,且不能是”DEFAULT_PRODUCER“)
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID(); // 3. 实例名取自系统属性”rocketmq.client.name“,若为空则取默认值”DEFAULT“,当producerGroup不是”CLIENT_INNER_PRODUCER“,且默认实例名时重置实例名为进程号与当前时间({pid}#{nanoTime})
                }
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); // 4. 以IP、实例名作为clientId(如果设置了unitName则添加unitName),创建MQClientInstance,并保存映射关系到MQClientManager.factoryTable
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); // 5. 将producerGroup与DefaultMQProducerImpl绑定,放到MQClientInstance.producerTable
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); // 6. 将”TBW102“这个topic放入topicPublishInfoTable
                if (startFactory) {
                    mQClientFactory.start(); // 7. 启动MQClientInstance
                }
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); // 8. 给所有broker发送心跳,心跳包内容为{clientId:,producerGroup:[]},当brokerAddrTable不为空时,给broker发送心跳,并记录broker对应的版本
        this.startScheduledTask(); // 9. 使用单线程的定时任务线程池,周期性(间隔1秒)扫描requestFutureTable,将超时的RequestResponseFuture移出,设置异常原因,并执行回调
    }

再接着看下MQClientInstance的启动:

# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.factory.MQClientInstance
    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr(); // 1. 如果未设置namesrv,则使用HTTP GET方式从指定URL获取。此处应该是淘宝内部使用方式,默认地址为”http://jmenv.tbsite.net:8080/rocketmq/nsaddr?nofix=1“。可以通过系统属性修改(参考TopAddressing, MixAll)
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start(); // 2. 启动netty,使用rocketmq的NettyEncoder、NettyDecoder来编解码,使用rocketmq的NettyRemotingClient.NettyClientHandler来处理
                    // Start various schedule tasks
					// 3. 如果未设置namesrv,周期性(每2分钟)获取地址(见步骤1:mQClientAPIImpl.fetchNameServerAddr)。
					// 周期性从producerTable获取topic,然后根据topic从namesrv获取TopicRouteData,如果信息变化,则更新brokerAddrTable,以及更新topic相关的TopicPublishInfo
					// 周期性(每个心跳间隔)清除brokerAddrTable中下线的broker信息,然后给所有broker发送心跳
					// 周期性从consumerTable遍历MQConsumerInner,持久化其offset。(广播模式使用本地文件,集群模式使用远程的broker)
					// 周期性(每隔1分钟)调整线程池:遍历consumerTable,根据处理队列里消息数量,增加或减少核心线程数
                    this.startScheduledTask(); 
                    // Start pull service
                    this.pullMessageService.start(); // 4. 创建一个线程,Runnable对象为PullMessageService,启动线程后会阻塞式获取PullRequest,根据Consumer的consumerGroup,从consumerTable获取MQConsumerInner,拉取消息
                    // Start rebalance service
                    this.rebalanceService.start(); // 5. 创建线程,Runnable对象为RebalanceService,它会隔一定时间,遍历consumerTable获取MQConsumerInner,根据topic和当前消费者情况以一定策略,调整消费队列,以心跳形式通知到broker
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false); // 6. 再次执行DefaultMQProducerImpl.start,但不会循环调用MQClientInstance.start
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

最后,我们看下消息发送。

先介绍同步发送:

# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.producer.DefaultMQProducer
    @Override
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        Validators.checkMessage(msg, this);
        msg.setTopic(withNamespace(msg.getTopic())); // 1. 如果有namespace,重置topic
        return this.defaultMQProducerImpl.send(msg);
    }
# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
    }
	
    public SendResult send(Message msg,
        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }
	private SendResult sendDefaultImpl( // 1. 异步send最终也是调用此方法,只是在线程池中执行任务,不关心返回值,而参数值CommunicationMode为CommunicationMode.ASYNC。单向发送也是调用此方法,参数值CommunicationMode为CommunicationMode.ONEWAY。
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK(); // 2. 检查serviceState是否为RUNNING
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // 3. 从topicPublishInfoTable根据topic得到TopicPublishInfo(为空则先从namesrv获取TopicRouteData,转换成TopicPublishInfo)
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 4. 使用mqFaultStrategy根据topic、最近一次使用的broker信息挑选MessageQueue。(一些send方法支持入参传MessageQueue或MessageQueueSelector)
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); // 5. 核心的消息发送代码,见后面分析
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // 6. 发送状态不是成功时,根据属性决定是否在另一个broker重试(默认值不重试)
                                        continue;
                                    }
                                }
                                return sendResult; // 7. 仅同步返回结果
                            default:
                                break;
                        }
                    } catch (RemotingException e) { // 8. 指定条件(异常类型,及某些类型的返回码)重试
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }
                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }
            if (sendResult != null) {
                return sendResult;
            }
            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));
            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
            MQClientException mqClientException = new MQClientException(info, exception);
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }
            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }
            throw mqClientException;
        }
        validateNameServerSetting();
        throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }	
	
    private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); // 5.1 从MQClientInstance.brokerAddrTable根据brokerName获取主broker地址
        if (null == brokerAddr) { // 5.2 重试
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }
        SendMessageContext context = null;
        if (brokerAddr != null) {
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); // 5.3 可能使用VIP的处理(设置系统属性”com.rocketmq.sendMessageWithVIPChannel“才能启用,VIP地址为原端口-2)
            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg); // 5.4 非MessageBatch类型的Message,设置属性”UNIQ_KEY“值(包含ip、进程号、ClassLoader哈希值、类加载至今时间、自增计数)
                }
                boolean topicWithNamespace = false;
                if (null != this.mQClientFactory.getClientConfig().getNamespace()) { // 5.5 namespace存在时,设置Message属性”INSTANCE_ID“为namespace
                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                    topicWithNamespace = true;
                }
                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                if (this.tryToCompressMessage(msg)) { // 5.6 当消息类型不是MessageBatch,且消息体长度过大时(默认4k),按一定压缩比(默认5级)压缩消息体,再重设消息体
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG; // 5.7 压缩消息后标记sysFlag
                    msgBodyCompressed = true;
                }
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; // 5.8 (使用TransactionMQProducer发送的事务型消息)消息属性”TRAN_MSG“被解析成true时,标记sysFlag
                }
                if (hasCheckForbiddenHook()) { // 5.9 如果存在CheckForbiddenHook(需用户注册,默认不存在),执行CheckForbiddenHook.checkForbidden
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }
                if (this.hasSendMessageHook()) { // 5.10 SendMessageHook存在时(如果允许trace,则会有SendMessageTraceHookImpl),执行SendMessageHook.sendMessageBefore
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    context.setNamespace(this.defaultMQProducer.getNamespace());
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }
                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // 5.11 topic以”%RETRY%“开头,重置SendMessageRequestHeader的消费次数,清除消息的”RECONSUME_TIME“属性
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }
                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }
                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC: // 5.12 异步消息topic去除namespace,发送的消息体使用可能压缩的内容,但原Message.body需还原成压缩前
                        Message tmpMessage = msg;
                        boolean messageCloned = false;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                            msg.setBody(prevBody);
                        }
                        if (topicWithNamespace) {
                            if (!messageCloned) {
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                messageCloned = true;
                            }
                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                        }
                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( // 5.13 同步发送消息
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }
                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context); // 5.14 SendMessageHook存在时(如果允许trace,则会有SendMessageTraceHookImpl),执行SendMessageHook.sendMessageAfter
                }
                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally { // 5.15 还原body为未压缩状态,重置topic为不含namespace
                msg.setBody(prevBody);
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
            }
        }
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }	

再跟进去的发送代码:

# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.MQClientAPIImpl
    public SendResult sendMessage( // 1. 同步发送
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
    }
    public SendResult sendMessage( // 2. 同步/异步发送
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE); // 3. 默认未设置,模拟RPC时可通过MessageUtil.createReplyMessage来设置
        boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
        if (isReply) {
            if (sendSmartMsg) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
            }
        } else {
            if (sendSmartMsg || msg instanceof MessageBatch) { // 4. 创建RemotingCommand(sendSmartMsg根据系统属性""org.apache.rocketmq.client.sendSmartMsg"来定,默认为true)
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
            }
        }
        request.setBody(msg.getBody());
        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); // 5. 调用NettyRemotingClient的invokeSync方法(执行前调RPCHook.doBeforeRequest,执行后调用RPCHook.doAfterResponse),然后处理结果
            default:
                assert false;
                break;
        }
        return null;
    }

单向只会在执行前调RPCHook.doBeforeRequest,然后获取单向的信号量才能写消息。

异步调用只会在执行前调RPCHook.doBeforeRequest,也需要获取信号量,还有InvokeCallback,在此时触发SendMessageHook和SendCallback。

然后看下事务型消息的处理:

# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }
        // ignore DelayTimeLevel parameter
        if (msg.getDelayTimeLevel() != 0) {
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        }
        Validators.checkMessage(msg, this.defaultMQProducer);
        SendResult sendResult = null;
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); // 1. 设置消息属性”TRAN_MSG“、”PGROUP“
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            sendResult = this.send(msg); // 2. 发送消息(见之前普通的发送代码)
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    if (null != localTransactionExecuter) { // 2. 使用LocalTransactionExecuter或TransactionListener来执行本地事务
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }
                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
        try {
            this.endTransaction(msg, sendResult, localTransactionState, localException); // 3. 结束事务:(EndTransactionHook如果存在)则先执行EndTransactionHook.endTransaction,然后发送单向消息进行提交(半消息发送成功且本地事务成功)或回滚
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

再看下同步RPC如何模拟:

# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
    public Message request(Message msg,
        long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginTimestamp = System.currentTimeMillis();
        prepareSendRequest(msg, timeout); // 1. 以UUID作为关联id给消息属性”CORRELATION_ID“,以clientId给消息属性”REPLY_TO_CLIENT“,以超时时间作为消息属性”TTL“
        final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
        try {
            final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
            RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
            long cost = System.currentTimeMillis() - beginTimestamp;
            this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { // 2. 发送消息
                @Override
                public void onSuccess(SendResult sendResult) {
                    requestResponseFuture.setSendRequestOk(true);
                }
                @Override
                public void onException(Throwable e) {
                    requestResponseFuture.setSendRequestOk(false);
                    requestResponseFuture.putResponseMessage(null);
                    requestResponseFuture.setCause(e);
                }
            }, timeout - cost);
            return waitResponse(msg, timeout, requestResponseFuture, cost); // 3. 等待Future结果
        } finally {
            RequestFutureTable.getRequestFutureTable().remove(correlationId);
        }
    }

从客户端代码无法了解发送消息的客户端如何接收响应的消息,即一个ProducerGroup下有多个实例,在request模式下,只能是发起request请求的Producer能收到Message,具体如何处理,需要后续分析Broker相关代码了。

相关推荐

如何用 coco 数据集训练 Detectron2 模型?

随着最新的Pythorc1.3版本的发布,下一代完全重写了它以前的目标检测框架,新的目标检测框架被称为Detectron2。本教程将通过使用自定义coco数据集训练实例分割模型,帮助你开始使...

CICD联动阿里云容器服务Kubernetes实践之Bamboo篇

本文档以构建一个Java软件项目并部署到阿里云容器服务的Kubernetes集群为例说明如何使用Bamboo在阿里云Kubernetes服务上运行RemoteAgents并在agents上...

Open3D-ML点云语义分割实验【RandLA-Net】

作为点云Open3D-ML实验的一部分,我撰写了文章解释如何使用Tensorflow和PyTorch支持安装此库。为了测试安装,我解释了如何运行一个简单的Python脚本来可视化名为...

清理系统不用第三方工具(系统自带清理软件效果好不?)

清理优化系统一定要借助于优化工具吗?其实,手动优化系统也没有那么神秘,掌握了方法和技巧,系统清理也是一件简单和随心的事。一方面要为每一个可能产生累赘的文件找到清理的方法,另一方面要寻找能够提高工作效率...

【信创】联想开先终端开机不显示grub界面的修改方法

原文链接:【信创】联想开先终端开机不显示grub界面的修改方法...

如意玲珑成熟度再提升,三大发行版支持教程来啦!

前期,我们已分别发布如意玲珑在deepinV23与UOSV20、openEuler24.03发行版的操作指南,本文,我们将为大家详细介绍Ubuntu24.04、Debian12、op...

118种常见的多媒体文件格式(英文简写)

MP4[?mpi?f??]-MPEG-4Part14(MPEG-4第14部分)AVI[e?vi??a?]-AudioVideoInterleave(音视频交错)MOV[m...

密码丢了急上火?码住7种console密码紧急恢复方式!

身为攻城狮的你,...

CSGO丨CS2的cfg指令代码分享(csgo自己的cfg在哪里?config文件位置在哪?)

?...

使用open SSL生成局域网IP地址证书

某些特殊情况下,用户内网访问多可文档管理系统时需要启用SSL传输加密功能,但只有IP,没有域名和证书。这种情况下多可提供了一种免费可行的方式,通过openSSL生成免费证书。此方法生成证书浏览器会提示...

Python中加载配置文件(python怎么加载程序包)

我们在做开发的时候经常要使用配置文件,那么配置文件的加载就需要我们提前考虑,再不使用任何框架的情况下,我们通常会有两种解决办法:完整加载将所有配置信息一次性写入单一配置文件.部分加载将常用配置信息写...

python开发项目,不得不了解的.cfg配置文件

安装软件时,经常会见到后缀为.cfg、.ini的文件,一般我们不用管,只要不删就行。因为这些是程序安装、运行时需要用到的配置文件。但对开发者来说,这种文件是怎么回事就必须搞清了。本文从.cfg文件的创...

瑞芯微RK3568鸿蒙开发板OpenHarmony系统修改cfg文件权限方法

本文适用OpenHarmony开源鸿蒙系统,本次使用的是开源鸿蒙主板,搭载瑞芯微RK3568芯片。深圳触觉智能专注研发生产OpenHarmony开源鸿蒙硬件,包括核心板、开发板、嵌入式主板,工控整机等...

Python9:图像风格迁移-使用阿里的接口

先不多说,直接上结果图。#!/usr/bin/envpython#coding=utf-8importosfromaliyunsdkcore.clientimportAcsClient...

Python带你打造个性化的图片文字识别

我们的目标:从CSV文件读取用户的文件信息,并将文件名称修改为姓名格式的中文名称,进行规范资料整理,从而实现快速对多个文件进行重命名。最终效果:将原来无规律的文件名重命名为以姓名为名称的文件。技术点:...

取消回复欢迎 发表评论:

请填写验证码