侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130562 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

RocketMQ消息的发送与接收

2022-07-14 星期四 / 0 评论 / 0 点赞 / 32 阅读 / 91078 字

1、概述Producer 发送消息。主要是同步发送消息源码,涉及到 异步/Oneway发送消息,事务消息会跳过。Broker 接收消息。Producer 发送消息DefaultMQProducer#s

1、概述

Producer 发送消息。主要是同步发送消息源码,涉及到 异步/Oneway发送消息,事务消息会跳过。

Broker 接收消息。

Producer 发送消息

DefaultMQProducer#send(Message)

    /**     * Send message in synchronous mode. This method returns only when the sending procedure totally completes. </p>     *     * <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry     * {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially     * delivered to broker(s). It's up to the application developers to resolve potential duplication issue.     *     * @param msg Message to send.     * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,     * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.     * @throws MQClientException if there is any client error.     * @throws RemotingException if there is any network-tier error.     * @throws MQBrokerException if there is any error with broker.     * @throws InterruptedException if the sending thread is interrupted.     */    @Override    public SendResult send(        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        Validators.checkMessage(msg, this);        msg.setTopic(withNamespace(msg.getTopic()));        return this.defaultMQProducerImpl.send(msg);    }

DefaultMQProducerImpl#sendDefaultImpl()

说明 :发送消息。步骤:获取消息路由信息,选择要发送到的消息队列,执行消息发送核心方法,并对发送结果进行封装返回。

    private SendResult sendDefaultImpl(        Message msg,        final CommunicationMode communicationMode,        final SendCallback sendCallback,        final long timeout    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        // 校验Producer处于运行状态        this.makeSureStateOK();        // 校验消息格式        Validators.checkMessage(msg, this.defaultMQProducer);        // 调用编号;用于下面打印日志,标记为同一次发送消息        final long invokeID = random.nextLong();        long beginTimestampFirst = System.currentTimeMillis();        long beginTimestampPrev = beginTimestampFirst;        long endTimestamp = beginTimestampFirst;        // 获取 Topic路由信息        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());        if (topicPublishInfo != null && topicPublishInfo.ok()) {            boolean callTimeout = false;            // 最后选择消息要发送到的队列            MessageQueue mq = null;            Exception exception = null;            // 最后一次发送结果            SendResult sendResult = null;            // 同步多次调用            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;            int times = 0;            String[] brokersSent = new String[timesTotal];            for (; times < timesTotal; times++) {                String lastBrokerName = null == mq ? null : mq.getBrokerName();                // 选择消息要发送到的队列                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);                if (mqSelected != null) {                    mq = mqSelected;                    brokersSent[times] = mq.getBrokerName();                    try {                        beginTimestampPrev = System.currentTimeMillis();                        if (times > 0) {                            //Reset topic with namespace during resend.                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));                        }                        long costTime = beginTimestampPrev - beginTimestampFirst;                        if (timeout < costTime) {                            callTimeout = true;                            break;                        }                        // 调用发送消息核心方法                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);                        endTimestamp = System.currentTimeMillis();                        // 更新Broker可用性信息                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);                        switch (communicationMode) {                            case ASYNC:                                return null;                            case ONEWAY:                                return null;                            case SYNC:                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {                                    // 同步发送成功但存储有问题时 && 配置存储异常时重新发送开关 时,进行重试                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {                                        continue;                                    }                                }                                return sendResult;                            default:                                break;                        }                    } catch (RemotingException e) {                        // 打印异常,更新Broker可用性信息,更新继续循环                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                        log.warn(msg.toString());                        exception = e;                        continue;                    } catch (MQClientException e) {                        // 打印异常,更新Broker可用性信息,更新继续循环                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                        log.warn(msg.toString());                        exception = e;                        continue;                    } catch (MQBrokerException e) {                        // 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                        log.warn(msg.toString());                        exception = e;                        switch (e.getResponseCode()) {                            // 如下异常continue,进行发送消息重试                            case ResponseCode.TOPIC_NOT_EXIST:                            case ResponseCode.SERVICE_NOT_AVAILABLE:                            case ResponseCode.SYSTEM_ERROR:                            case ResponseCode.NO_PERMISSION:                            case ResponseCode.NO_BUYER_ID:                            case ResponseCode.NOT_IN_CURRENT_UNIT:                                continue;                                // 如果有发送结果,进行返回,否则,抛出异常;                            default:                                if (sendResult != null) {                                    return sendResult;                                }                                throw e;                        }                    } catch (InterruptedException e) {                        // 打印异常,更新Broker可用性信息,更新继续循环                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                        log.warn(msg.toString());                        log.warn("sendKernelImpl exception", e);                        log.warn(msg.toString());                        throw e;                    }                } else {                    break;                }            }            // 返回发送结果            if (sendResult != null) {                return sendResult;            }            // 根据不同情况,抛出不同的异常            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",                times,                System.currentTimeMillis() - beginTimestampFirst,                msg.getTopic(),                Arrays.toString(brokersSent));            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);            MQClientException mqClientException = new MQClientException(info, exception);            if (callTimeout) {                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");            }            if (exception instanceof MQBrokerException) {                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());            } else if (exception instanceof RemotingConnectException) {                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);            } else if (exception instanceof RemotingTimeoutException) {                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);            } else if (exception instanceof MQClientException) {                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);            }            throw mqClientException;        }        // Namesrv找不到异常        validateNameServerSetting();        // 消息路由找不到异常        throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);    }

DefaultMQProducerImpl#tryToFindTopicPublishInfo()

说明 :获得 Topic发布信息。优先从缓存topicPublishInfoTable,其次从Namesrv中获得。

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {        // 缓存中获取 Topic发布信息        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);        // 当无可用的 Topic发布信息时,从Namesrv获取一次        if (null == topicPublishInfo || !topicPublishInfo.ok()) {            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);            topicPublishInfo = this.topicPublishInfoTable.get(topic);        }        // 若获取的 Topic发布信息时候可用,则返回        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {            return topicPublishInfo;        } else {            // 使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。用于 Topic发布信息不存在 && Broker支持自动创建Topic            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);            topicPublishInfo = this.topicPublishInfoTable.get(topic);            return topicPublishInfo;        }    }

MQFaultStrategy

说明 :Producer消息发送容错策略。默认情况下容错策略关闭,即sendLatencyFaultEnable=false。

| Producer发送消息消耗时长 | Broker不可用时长 || — | — || >= 15000 ms | 600 1000 ms || >= 3000 ms | 180 1000 ms || >= 2000 ms | 120 1000 ms || >= 1000 ms | 60 1000 ms || >= 550 ms | 30 * 1000 ms || >= 100 ms | 0 ms || >= 50 ms | 0 ms |

public class MQFaultStrategy {    private final static InternalLogger log = ClientLogger.getLog();    /**     * 延迟故障容错,维护每个Broker的发送消息的延迟     * key:brokerName     */    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();    /**     * 发送消息延迟容错开关     */    private boolean sendLatencyFaultEnable = false;    /**     * 延迟级别数组     */    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};    /**     * 不可用时长数组     */    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};    public long[] getNotAvailableDuration() {        return notAvailableDuration;    }    public void setNotAvailableDuration(final long[] notAvailableDuration) {        this.notAvailableDuration = notAvailableDuration;    }    public long[] getLatencyMax() {        return latencyMax;    }    public void setLatencyMax(final long[] latencyMax) {        this.latencyMax = latencyMax;    }    public boolean isSendLatencyFaultEnable() {        return sendLatencyFaultEnable;    }    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {        this.sendLatencyFaultEnable = sendLatencyFaultEnable;    }    /**     * 根据 Topic发布信息 选择一个消息队列     *     * @param tpInfo         Topic发布信息     * @param lastBrokerName brokerName     * @return 消息队列     */    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {        if (this.sendLatencyFaultEnable) {            try {                // 获取 brokerName=lastBrokerName && 可用的一个消息队列                int index = tpInfo.getSendWhichQueue().getAndIncrement();                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();                    if (pos < 0) {                        pos = 0;                    }                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) {                            return mq;                        }                    }                }                // 选择一个相对好的broker,并获得其对应的一个消息队列,不考虑该队列的可用性                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);                if (writeQueueNums > 0) {                    final MessageQueue mq = tpInfo.selectOneMessageQueue();                    if (notBestBroker != null) {                        mq.setBrokerName(notBestBroker);                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);                    }                    return mq;                } else {                    latencyFaultTolerance.remove(notBestBroker);                }            } catch (Exception e) {                log.error("Error occurred when selecting message queue", e);            }            // 选择一个消息队列,不考虑队列的可用性            return tpInfo.selectOneMessageQueue();        }        // 获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性        return tpInfo.selectOneMessageQueue(lastBrokerName);    }    /**     * 更新延迟容错信息     * @param brokerName brokerName     * @param currentLatency 延迟     * @param isolation 是否隔离。当开启隔离时,默认延迟为30000。目前主要用于发送消息异常时     */    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {        if (this.sendLatencyFaultEnable) {            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);        }    }    /**     * 计算延迟对应的不可用时间     * @param currentLatency 延迟     * @return 不可用时间     */    private long computeNotAvailableDuration(final long currentLatency) {        for (int i = latencyMax.length - 1; i >= 0; i--) {            if (currentLatency >= latencyMax[i]) {                return this.notAvailableDuration[i];            }        }        return 0;    }}

LatencyFaultTolerance

延迟故障容错接口

public interface LatencyFaultTolerance<T> {    /**     * 更新对象的延迟和不可用时长     *     * @param name                 对象     * @param currentLatency       延迟时间     * @param notAvailableDuration 不可用时长     */    void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);    /**     * 判断对象是否可用     *     * @param name broker名称     * @return 是否可用     */    boolean isAvailable(final T name);    /**     * 移除对象     *     * @param name 移除对象     */    void remove(final T name);    /**     * 获取一个对象     *     * @return 对象     */    T pickOneAtLeast();}

LatencyFaultToleranceImpl

public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {    /**     * 对象故障信息Table     */    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);    /**     * 对象选择Index     */    private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();    @Override    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {        FaultItem old = this.faultItemTable.get(name);        if (null == old) {            final FaultItem faultItem = new FaultItem(name);            faultItem.setCurrentLatency(currentLatency);            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);            old = this.faultItemTable.putIfAbsent(name, faultItem);            if (old != null) {                old.setCurrentLatency(currentLatency);                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);            }        } else {            old.setCurrentLatency(currentLatency);            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);        }    }    @Override    public boolean isAvailable(final String name) {        final FaultItem faultItem = this.faultItemTable.get(name);        if (faultItem != null) {            return faultItem.isAvailable();        }        return true;    }    @Override    public void remove(final String name) {        this.faultItemTable.remove(name);    }    @Override    public String pickOneAtLeast() {        final Enumeration<FaultItem> elements = this.faultItemTable.elements();        List<FaultItem> tmpList = new LinkedList<FaultItem>();        while (elements.hasMoreElements()) {            final FaultItem faultItem = elements.nextElement();            tmpList.add(faultItem);        }        if (!tmpList.isEmpty()) {            // 打乱 + 排序。TODO 疑问:应该只能二选一。猜测Collections.shuffle(tmpList)去掉。            Collections.shuffle(tmpList);            Collections.sort(tmpList);            // 选择顺序在前一半的对象            final int half = tmpList.size() / 2;            if (half <= 0) {                return tmpList.get(0).getName();            } else {                final int i = this.whichItemWorst.getAndIncrement() % half;                return tmpList.get(i).getName();            }        }        return null;    }    @Override    public String toString() {        return "LatencyFaultToleranceImpl{" +            "faultItemTable=" + faultItemTable +            ", whichItemWorst=" + whichItemWorst +            '}';    }    class FaultItem implements Comparable<FaultItem> {        private final String name;        private volatile long currentLatency;        private volatile long startTimestamp;        public FaultItem(final String name) {            this.name = name;        }        @Override        public int compareTo(final FaultItem other) {            if (this.isAvailable() != other.isAvailable()) {                if (this.isAvailable()) {                    return -1;                }                if (other.isAvailable()) {                    return 1;                }            }            if (this.currentLatency < other.currentLatency) {                return -1;            } else if (this.currentLatency > other.currentLatency) {                return 1;            }            if (this.startTimestamp < other.startTimestamp) {                return -1;            } else if (this.startTimestamp > other.startTimestamp) {                return 1;            }            return 0;        }        public boolean isAvailable() {            return (System.currentTimeMillis() - startTimestamp) >= 0;        }        @Override        public int hashCode() {            int result = getName() != null ? getName().hashCode() : 0;            result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));            result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));            return result;        }        @Override        public boolean equals(final Object o) {            if (this == o) {                return true;            }            if (!(o instanceof FaultItem)) {                return false;            }            final FaultItem faultItem = (FaultItem) o;            if (getCurrentLatency() != faultItem.getCurrentLatency()) {                return false;            }            if (getStartTimestamp() != faultItem.getStartTimestamp()) {                return false;            }            return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;        }        @Override        public String toString() {            return "FaultItem{" +                "name='" + name + '/'' +                ", currentLatency=" + currentLatency +                ", startTimestamp=" + startTimestamp +                '}';        }        public String getName() {            return name;        }        public long getCurrentLatency() {            return currentLatency;        }        public void setCurrentLatency(final long currentLatency) {            this.currentLatency = currentLatency;        }        public long getStartTimestamp() {            return startTimestamp;        }        public void setStartTimestamp(final long startTimestamp) {            this.startTimestamp = startTimestamp;        }    }}

FaultItem

说明 :对象故障信息。维护对象的名字、延迟、开始可用的时间。

    class FaultItem implements Comparable<FaultItem> {        /**         * 对象名         */        private final String name;        /**         * 延迟         */        private volatile long currentLatency;        /**         * 开始可用时间         */        private volatile long startTimestamp;        public FaultItem(final String name) {            this.name = name;        }        /**         * 可用性 > 延迟 > 开始可用时间         *         * @param other 另一对象         * @return 升序         */        @Override        public int compareTo(final FaultItem other) {            if (this.isAvailable() != other.isAvailable()) {                if (this.isAvailable()) {                    return -1;                }                if (other.isAvailable()) {                    return 1;                }            }            if (this.currentLatency < other.currentLatency) {                return -1;            } else if (this.currentLatency > other.currentLatency) {                return 1;            }            if (this.startTimestamp < other.startTimestamp) {                return -1;            } else if (this.startTimestamp > other.startTimestamp) {                return 1;            }            return 0;        }        public boolean isAvailable() {            return (System.currentTimeMillis() - startTimestamp) >= 0;        }        @Override        public int hashCode() {            int result = getName() != null ? getName().hashCode() : 0;            result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));            result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));            return result;        }        @Override        public boolean equals(final Object o) {            if (this == o) {                return true;            }            if (!(o instanceof FaultItem)) {                return false;            }            final FaultItem faultItem = (FaultItem) o;            if (getCurrentLatency() != faultItem.getCurrentLatency()) {                return false;            }            if (getStartTimestamp() != faultItem.getStartTimestamp()) {                return false;            }            return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;        }        @Override        public String toString() {            return "FaultItem{" +                    "name='" + name + '/'' +                    ", currentLatency=" + currentLatency +                    ", startTimestamp=" + startTimestamp +                    '}';        }        public String getName() {            return name;        }        public long getCurrentLatency() {            return currentLatency;        }        public void setCurrentLatency(final long currentLatency) {            this.currentLatency = currentLatency;        }        public long getStartTimestamp() {            return startTimestamp;        }        public void setStartTimestamp(final long startTimestamp) {            this.startTimestamp = startTimestamp;        }    }

DefaultMQProducerImpl#sendKernelImpl()

说明 :发送消息核心方法。该方法真正发起网络请求,发送消息给 Broker。

    private SendResult sendKernelImpl(final Message msg,        final MessageQueue mq,        final CommunicationMode communicationMode,        final SendCallback sendCallback,        final TopicPublishInfo topicPublishInfo,        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        long beginStartTime = System.currentTimeMillis();        // 获取 broker地址        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());        if (null == brokerAddr) {            tryToFindTopicPublishInfo(mq.getTopic());            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());        }        SendMessageContext context = null;        if (brokerAddr != null) {            // 是否使用broker vip通道。broker会开启两个端口对外服务。            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);            byte[] prevBody = msg.getBody();            try {                // for MessageBatch,ID has been set in the generating process                // 设置唯一编号                if (!(msg instanceof MessageBatch)) {                    MessageClientIDSetter.setUniqID(msg);                }                boolean topicWithNamespace = false;                if (null != this.mQClientFactory.getClientConfig().getNamespace()) {                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());                    topicWithNamespace = true;                }                int sysFlag = 0;                boolean msgBodyCompressed = false;                // 消息压缩                if (this.tryToCompressMessage(msg)) {                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;                    msgBodyCompressed = true;                }                // 事务                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;                }                // hook:发送消息校验                if (hasCheckForbiddenHook()) {                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());                    checkForbiddenContext.setCommunicationMode(communicationMode);                    checkForbiddenContext.setBrokerAddr(brokerAddr);                    checkForbiddenContext.setMessage(msg);                    checkForbiddenContext.setMq(mq);                    checkForbiddenContext.setUnitMode(this.isUnitMode());                    this.executeCheckForbiddenHook(checkForbiddenContext);                }                // hook:发送消息前逻辑                if (this.hasSendMessageHook()) {                    context = new SendMessageContext();                    context.setProducer(this);                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());                    context.setCommunicationMode(communicationMode);                    context.setBornHost(this.defaultMQProducer.getClientIP());                    context.setBrokerAddr(brokerAddr);                    context.setMessage(msg);                    context.setMq(mq);                    context.setNamespace(this.defaultMQProducer.getNamespace());                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);                    if (isTrans != null && isTrans.equals("true")) {                        context.setMsgType(MessageType.Trans_Msg_Half);                    }                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {                        context.setMsgType(MessageType.Delay_Msg);                    }                    this.executeSendMessageHookBefore(context);                }                // 构建发送消息请求                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());                requestHeader.setTopic(msg.getTopic());                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());                requestHeader.setQueueId(mq.getQueueId());                requestHeader.setSysFlag(sysFlag);                requestHeader.setBornTimestamp(System.currentTimeMillis());                requestHeader.setFlag(msg.getFlag());                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));                requestHeader.setReconsumeTimes(0);                requestHeader.setUnitMode(this.isUnitMode());                requestHeader.setBatch(msg instanceof MessageBatch);                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);                    if (reconsumeTimes != null) {                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);                    }                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);                    if (maxReconsumeTimes != null) {                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);                    }                }                // 发送消息                SendResult sendResult = null;                switch (communicationMode) {                    case ASYNC:                        Message tmpMessage = msg;                        boolean messageCloned = false;                        if (msgBodyCompressed) {                            //If msg body was compressed, msgbody should be reset using prevBody.                            //Clone new message using commpressed message body and recover origin massage.                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66                            tmpMessage = MessageAccessor.cloneMessage(msg);                            messageCloned = true;                            msg.setBody(prevBody);                        }                        if (topicWithNamespace) {                            if (!messageCloned) {                                tmpMessage = MessageAccessor.cloneMessage(msg);                                messageCloned = true;                            }                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));                        }                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;                        if (timeout < costTimeAsync) {                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");                        }                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(                            brokerAddr,                            mq.getBrokerName(),                            tmpMessage,                            requestHeader,                            timeout - costTimeAsync,                            communicationMode,                            sendCallback,                            topicPublishInfo,                            this.mQClientFactory,                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),                            context,                            this);                        break;                    case ONEWAY:                    case SYNC:                        long costTimeSync = System.currentTimeMillis() - beginStartTime;                        if (timeout < costTimeSync) {                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");                        }                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(                            brokerAddr,                            mq.getBrokerName(),                            msg,                            requestHeader,                            timeout - costTimeSync,                            communicationMode,                            context,                            this);                        break;                    default:                        assert false;                        break;                }                // hook:发送消息后逻辑                if (this.hasSendMessageHook()) {                    context.setSendResult(sendResult);                    this.executeSendMessageHookAfter(context);                }                // 返回发送结果                return sendResult;            } catch (RemotingException e) {                if (this.hasSendMessageHook()) {                    context.setException(e);                    this.executeSendMessageHookAfter(context);                }                throw e;            } catch (MQBrokerException e) {                if (this.hasSendMessageHook()) {                    context.setException(e);                    this.executeSendMessageHookAfter(context);                }                throw e;            } catch (InterruptedException e) {                if (this.hasSendMessageHook()) {                    context.setException(e);                    this.executeSendMessageHookAfter(context);                }                throw e;            } finally {                msg.setBody(prevBody);                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));            }        }        // broker为空抛出异常        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);    }

3、Broker 接收消息

SendMessageProcessor#sendMessage

#processRequest() 说明 :处理消息请求。

#sendMessage() 说明 :发送消息,并返回发送消息结果。

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements.  See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License.  You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.rocketmq.broker.processor;import java.net.SocketAddress;import java.util.List;import java.util.Map;import io.netty.channel.ChannelHandlerContext;import org.apache.rocketmq.broker.BrokerController;import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;import org.apache.rocketmq.broker.mqtrace.SendMessageContext;import org.apache.rocketmq.common.MQVersion;import org.apache.rocketmq.common.MixAll;import org.apache.rocketmq.common.TopicConfig;import org.apache.rocketmq.common.TopicFilterType;import org.apache.rocketmq.common.UtilAll;import org.apache.rocketmq.common.constant.PermName;import org.apache.rocketmq.common.help.FAQUrl;import org.apache.rocketmq.common.message.MessageAccessor;import org.apache.rocketmq.common.message.MessageConst;import org.apache.rocketmq.common.message.MessageDecoder;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.common.message.MessageExtBatch;import org.apache.rocketmq.common.protocol.NamespaceUtil;import org.apache.rocketmq.common.protocol.RequestCode;import org.apache.rocketmq.common.protocol.ResponseCode;import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;import org.apache.rocketmq.common.sysflag.MessageSysFlag;import org.apache.rocketmq.common.sysflag.TopicSysFlag;import org.apache.rocketmq.remoting.exception.RemotingCommandException;import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;import org.apache.rocketmq.remoting.protocol.RemotingCommand;import org.apache.rocketmq.store.MessageExtBrokerInner;import org.apache.rocketmq.store.PutMessageResult;import org.apache.rocketmq.store.config.StorePathConfigHelper;import org.apache.rocketmq.store.stats.BrokerStatsManager;public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {    private List<ConsumeMessageHook> consumeMessageHookList;    public SendMessageProcessor(final BrokerController brokerController) {        super(brokerController);    }    @Override    public RemotingCommand processRequest(ChannelHandlerContext ctx,                                          RemotingCommand request) throws RemotingCommandException {        SendMessageContext mqtraceContext;        switch (request.getCode()) {            case RequestCode.CONSUMER_SEND_MSG_BACK:                return this.consumerSendMsgBack(ctx, request);            default:                // 解析请求                SendMessageRequestHeader requestHeader = parseRequestHeader(request);                if (requestHeader == null) {                    return null;                }                // 发送请求Context。在 hook 场景下使用                mqtraceContext = buildMsgContext(ctx, requestHeader);                // hook:处理发送消息前逻辑                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);                RemotingCommand response;                // 处理发送消息逻辑                if (requestHeader.isBatch()) {                    response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);                } else {                    response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);                }                // hook:处理发送消息后逻辑                this.executeSendMessageHookAfter(response, mqtraceContext);                return response;        }    }    @Override    public boolean rejectRequest() {        return this.brokerController.getMessageStore().isOSPageCacheBusy() ||            this.brokerController.getMessageStore().isTransientStorePoolDeficient();    }    private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)        throws RemotingCommandException {        final RemotingCommand response = RemotingCommand.createResponseCommand(null);        final ConsumerSendMsgBackRequestHeader requestHeader =            (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);        String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());        if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {            ConsumeMessageContext context = new ConsumeMessageContext();            context.setNamespace(namespace);            context.setConsumerGroup(requestHeader.getGroup());            context.setTopic(requestHeader.getOriginTopic());            context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);            context.setCommercialRcvTimes(1);            context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));            this.executeConsumeMessageHookAfter(context);        }        SubscriptionGroupConfig subscriptionGroupConfig =            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());        if (null == subscriptionGroupConfig) {            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);            response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "                + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));            return response;        }        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {            response.setCode(ResponseCode.NO_PERMISSION);            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");            return response;        }        if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {            response.setCode(ResponseCode.SUCCESS);            response.setRemark(null);            return response;        }        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();        int topicSysFlag = 0;        if (requestHeader.isUnitMode()) {            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);        }        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(            newTopic,            subscriptionGroupConfig.getRetryQueueNums(),            PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);        if (null == topicConfig) {            response.setCode(ResponseCode.SYSTEM_ERROR);            response.setRemark("topic[" + newTopic + "] not exist");            return response;        }        if (!PermName.isWriteable(topicConfig.getPerm())) {            response.setCode(ResponseCode.NO_PERMISSION);            response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));            return response;        }        MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());        if (null == msgExt) {            response.setCode(ResponseCode.SYSTEM_ERROR);            response.setRemark("look message by offset failed, " + requestHeader.getOffset());            return response;        }        final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);        if (null == retryTopic) {            MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());        }        msgExt.setWaitStoreMsgOK(false);        int delayLevel = requestHeader.getDelayLevel();        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();        }        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes            || delayLevel < 0) {            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,                DLQ_NUMS_PER_GROUP,                PermName.PERM_WRITE, 0            );            if (null == topicConfig) {                response.setCode(ResponseCode.SYSTEM_ERROR);                response.setRemark("topic[" + newTopic + "] not exist");                return response;            }        } else {            if (0 == delayLevel) {                delayLevel = 3 + msgExt.getReconsumeTimes();            }            msgExt.setDelayTimeLevel(delayLevel);        }        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();        msgInner.setTopic(newTopic);        msgInner.setBody(msgExt.getBody());        msgInner.setFlag(msgExt.getFlag());        MessageAccessor.setProperties(msgInner, msgExt.getProperties());        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));        msgInner.setQueueId(queueIdInt);        msgInner.setSysFlag(msgExt.getSysFlag());        msgInner.setBornTimestamp(msgExt.getBornTimestamp());        msgInner.setBornHost(msgExt.getBornHost());        msgInner.setStoreHost(this.getStoreHost());        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);        if (putMessageResult != null) {            switch (putMessageResult.getPutMessageStatus()) {                case PUT_OK:                    String backTopic = msgExt.getTopic();                    String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);                    if (correctTopic != null) {                        backTopic = correctTopic;                    }                    this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);                    response.setCode(ResponseCode.SUCCESS);                    response.setRemark(null);                    return response;                default:                    break;            }            response.setCode(ResponseCode.SYSTEM_ERROR);            response.setRemark(putMessageResult.getPutMessageStatus().name());            return response;        }        response.setCode(ResponseCode.SYSTEM_ERROR);        response.setRemark("putMessageResult is null");        return response;    }    private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,                                      RemotingCommand request,                                      MessageExt msg, TopicConfig topicConfig) {        String newTopic = requestHeader.getTopic();        if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {            // 获取订阅分组配置            String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());            SubscriptionGroupConfig subscriptionGroupConfig =                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);            if (null == subscriptionGroupConfig) {                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);                response.setRemark(                    "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));                return false;            }            // 计算最大可消费次数            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();            }            int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();            // 超过最大消费次数            if (reconsumeTimes >= maxReconsumeTimes) {                newTopic = MixAll.getDLQTopic(groupName);                int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,                    DLQ_NUMS_PER_GROUP,                    PermName.PERM_WRITE, 0                );                msg.setTopic(newTopic);                msg.setQueueId(queueIdInt);                if (null == topicConfig) {                    response.setCode(ResponseCode.SYSTEM_ERROR);                    response.setRemark("topic[" + newTopic + "] not exist");                    return false;                }            }        }        int sysFlag = requestHeader.getSysFlag();        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;        }        msg.setSysFlag(sysFlag);        return true;    }    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,                                        final RemotingCommand request,                                        final SendMessageContext sendMessageContext,                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {        // 初始化响应        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();        response.setOpaque(request.getOpaque());        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));        log.debug("receive SendMessage request command, {}", request);        // 如果未开始接收消息,抛出系统异常        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();        if (this.brokerController.getMessageStore().now() < startTimstamp) {            response.setCode(ResponseCode.SYSTEM_ERROR);            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));            return response;        }        // 消息配置(Topic配置)校验        response.setCode(-1);        super.msgCheck(ctx, requestHeader, response);        if (response.getCode() != -1) {            return response;        }        final byte[] body = request.getBody();        // 如果队列小于0,从可用队列随机选择        int queueIdInt = requestHeader.getQueueId();        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());        if (queueIdInt < 0) {            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();        }        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();        msgInner.setTopic(requestHeader.getTopic());        msgInner.setQueueId(queueIdInt);        // 对RETRY类型的消息处理。如果超过最大消费次数,则topic修改成"%DLQ%" + 分组名,即加入 死信队列(Dead Letter Queue)        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {            return response;        }        // 创建MessageExtBrokerInner        msgInner.setBody(body);        msgInner.setFlag(requestHeader.getFlag());        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());        msgInner.setBornHost(ctx.channel().remoteAddress());        msgInner.setStoreHost(this.getStoreHost());        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());        String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));        PutMessageResult putMessageResult = null;        Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());        String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);        // 校验是否不允许发送事务消息        if (traFlag != null && Boolean.parseBoolean(traFlag)) {            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {                response.setCode(ResponseCode.NO_PERMISSION);                response.setRemark(                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()                        + "] sending transaction message is forbidden");                return response;            }            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);        } else {            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);        }        return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);    }    private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,                                                   RemotingCommand request, MessageExt msg,                                                   SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,                                                   int queueIdInt) {        if (putMessageResult == null) {            response.setCode(ResponseCode.SYSTEM_ERROR);            response.setRemark("store putMessage return null");            return response;        }        boolean sendOK = false;        switch (putMessageResult.getPutMessageStatus()) {            // Success            case PUT_OK:                sendOK = true;                response.setCode(ResponseCode.SUCCESS);                break;            case FLUSH_DISK_TIMEOUT:                response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);                sendOK = true;                break;            case FLUSH_SLAVE_TIMEOUT:                response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);                sendOK = true;                break;            case SLAVE_NOT_AVAILABLE:                response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);                sendOK = true;                break;            // Failed            case CREATE_MAPEDFILE_FAILED:                response.setCode(ResponseCode.SYSTEM_ERROR);                response.setRemark("create mapped file failed, server is busy or broken.");                break;            case MESSAGE_ILLEGAL:            case PROPERTIES_SIZE_EXCEEDED:                response.setCode(ResponseCode.MESSAGE_ILLEGAL);                response.setRemark(                    "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");                break;            case SERVICE_NOT_AVAILABLE:                response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);                response.setRemark(                    "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");                break;            case OS_PAGECACHE_BUSY:                response.setCode(ResponseCode.SYSTEM_ERROR);                response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");                break;            case UNKNOWN_ERROR:                response.setCode(ResponseCode.SYSTEM_ERROR);                response.setRemark("UNKNOWN_ERROR");                break;            default:                response.setCode(ResponseCode.SYSTEM_ERROR);                response.setRemark("UNKNOWN_ERROR DEFAULT");                break;        }        String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);        if (sendOK) {            // 统计            this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);            this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),                putMessageResult.getAppendMessageResult().getWroteBytes());            this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());            // 响应            response.setRemark(null);            responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());            responseHeader.setQueueId(queueIdInt);            responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());            doResponse(ctx, request, response);            // hook:设置发送成功到context            if (hasSendMessageHook()) {                sendMessageContext.setMsgId(responseHeader.getMsgId());                sendMessageContext.setQueueId(responseHeader.getQueueId());                sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());                int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();                int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();                int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);                sendMessageContext.setCommercialSendTimes(incValue);                sendMessageContext.setCommercialSendSize(wroteSize);                sendMessageContext.setCommercialOwner(owner);            }            return null;        } else {            // hook:设置发送失败到context            if (hasSendMessageHook()) {                int wroteSize = request.getBody().length;                int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);                sendMessageContext.setCommercialSendTimes(incValue);                sendMessageContext.setCommercialSendSize(wroteSize);                sendMessageContext.setCommercialOwner(owner);            }        }        return response;    }    private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,                                             final RemotingCommand request,                                             final SendMessageContext sendMessageContext,                                             final SendMessageRequestHeader requestHeader) throws RemotingCommandException {        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();        response.setOpaque(request.getOpaque());        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));        log.debug("Receive SendMessage request command {}", request);        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();        if (this.brokerController.getMessageStore().now() < startTimstamp) {            response.setCode(ResponseCode.SYSTEM_ERROR);            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));            return response;        }        response.setCode(-1);        super.msgCheck(ctx, requestHeader, response);        if (response.getCode() != -1) {            return response;        }        int queueIdInt = requestHeader.getQueueId();        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());        if (queueIdInt < 0) {            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();        }        if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {            response.setCode(ResponseCode.MESSAGE_ILLEGAL);            response.setRemark("message topic length too long " + requestHeader.getTopic().length());            return response;        }        if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {            response.setCode(ResponseCode.MESSAGE_ILLEGAL);            response.setRemark("batch request does not support retry group " + requestHeader.getTopic());            return response;        }        MessageExtBatch messageExtBatch = new MessageExtBatch();        messageExtBatch.setTopic(requestHeader.getTopic());        messageExtBatch.setQueueId(queueIdInt);        int sysFlag = requestHeader.getSysFlag();        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;        }        messageExtBatch.setSysFlag(sysFlag);        messageExtBatch.setFlag(requestHeader.getFlag());        MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties()));        messageExtBatch.setBody(request.getBody());        messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());        messageExtBatch.setBornHost(ctx.channel().remoteAddress());        messageExtBatch.setStoreHost(this.getStoreHost());        messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());        String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();        MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName);        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);        return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt);    }    public boolean hasConsumeMessageHook() {        return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();    }    public void executeConsumeMessageHookAfter(final ConsumeMessageContext context) {        if (hasConsumeMessageHook()) {            for (ConsumeMessageHook hook : this.consumeMessageHookList) {                try {                    hook.consumeMessageAfter(context);                } catch (Throwable e) {                    // Ignore                }            }        }    }    public SocketAddress getStoreHost() {        return storeHost;    }    private String diskUtil() {        String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);        String storePathLogis =            StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());        double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis);        String storePathIndex =            StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir());        double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex);        return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio);    }    public void registerConsumeMessageHook(List<ConsumeMessageHook> consumeMessageHookList) {        this.consumeMessageHookList = consumeMessageHookList;    }}

AbstractSendMessageProcessor#msgCheck

说明:校验消息是否正确,主要是Topic配置方面,例如:Broker 是否有写入权限,topic配置是否存在,队列编号是否正确。

  protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,        final SendMessageRequestHeader requestHeader, final RemotingCommand response) {        // 检查 broker 是否有写入权限        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())            && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {            response.setCode(ResponseCode.NO_PERMISSION);            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()                + "] sending message is forbidden");            return response;        }        // 检查topic是否可以被发送。目前是{@link MixAll.DEFAULT_TOPIC}不被允许发送        if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {            String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";            log.warn(errorMsg);            response.setCode(ResponseCode.SYSTEM_ERROR);            response.setRemark(errorMsg);            return response;        }        TopicConfig topicConfig =            this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());        // 不存在topicConfig,则进行创建        if (null == topicConfig) {            int topicSysFlag = 0;            if (requestHeader.isUnitMode()) {                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);                } else {                    topicSysFlag = TopicSysFlag.buildSysFlag(true, false);                }            }            // 创建topic配置            log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(                requestHeader.getTopic(),                requestHeader.getDefaultTopic(),                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),                requestHeader.getDefaultTopicQueueNums(), topicSysFlag);            // 如果没配置            if (null == topicConfig) {                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                    topicConfig =                        this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(                            requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,                            topicSysFlag);                }            }            if (null == topicConfig) {                response.setCode(ResponseCode.TOPIC_NOT_EXIST);                response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"                    + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));                return response;            }        }        // 队列编号是否正确        int queueIdInt = requestHeader.getQueueId();        int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());        if (queueIdInt >= idValid) {            String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",                queueIdInt,                topicConfig.toString(),                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));            log.warn(errorInfo);            response.setCode(ResponseCode.SYSTEM_ERROR);            response.setRemark(errorInfo);            return response;        }        return response;    }

DefaultMessageStore#putMessage

说明:存储消息封装,最终存储需要 CommitLog 实现。

    @Override    public PutMessageResult putMessage(MessageExtBrokerInner msg) {        if (this.shutdown) {            log.warn("message store has shutdown, so putMessage is forbidden");            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);        }        // // 从节点不允许写入        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {            long value = this.printTimes.getAndIncrement();            if ((value % 50000) == 0) {                log.warn("message store is slave mode, so putMessage is forbidden ");            }            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);        }        // store是否允许写入        if (!this.runningFlags.isWriteable()) {            long value = this.printTimes.getAndIncrement();            if ((value % 50000) == 0) {                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());            }            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);        } else {            this.printTimes.set(0);        }        // 消息过长        if (msg.getTopic().length() > Byte.MAX_VALUE) {            log.warn("putMessage message topic length too long " + msg.getTopic().length());            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);        }        // 消息附加属性过长        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());            return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);        }        if (this.isOSPageCacheBusy()) {            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);        }        long beginTime = this.getSystemClock().now();        // 添加消息到commitLog        PutMessageResult result = this.commitLog.putMessage(msg);        long elapsedTime = this.getSystemClock().now() - beginTime;        if (elapsedTime > 500) {            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);        }        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);        if (null == result || !result.isOk()) {            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();        }        return result;    }
.
.

广告 广告

评论区