1、概述Producer 发送消息。主要是同步发送消息源码,涉及到 异步/Oneway发送消息,事务消息会跳过。Broker 接收消息。Producer 发送消息DefaultMQProducer#s
1、概述
Producer 发送消息。主要是同步发送消息源码,涉及到 异步/Oneway发送消息,事务消息会跳过。
Broker 接收消息。
Producer 发送消息
DefaultMQProducer#send(Message)
/** * Send message in synchronous mode. This method returns only when the sending procedure totally completes. </p> * * <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry * {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially * delivered to broker(s). It's up to the application developers to resolve potential duplication issue. * * @param msg Message to send. * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. * @throws MQClientException if there is any client error. * @throws RemotingException if there is any network-tier error. * @throws MQBrokerException if there is any error with broker. * @throws InterruptedException if the sending thread is interrupted. */ @Override public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { Validators.checkMessage(msg, this); msg.setTopic(withNamespace(msg.getTopic())); return this.defaultMQProducerImpl.send(msg); }
DefaultMQProducerImpl#sendDefaultImpl()
说明 :发送消息。步骤:获取消息路由信息,选择要发送到的消息队列,执行消息发送核心方法,并对发送结果进行封装返回。
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 校验Producer处于运行状态 this.makeSureStateOK(); // 校验消息格式 Validators.checkMessage(msg, this.defaultMQProducer); // 调用编号;用于下面打印日志,标记为同一次发送消息 final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; // 获取 Topic路由信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; // 最后选择消息要发送到的队列 MessageQueue mq = null; Exception exception = null; // 最后一次发送结果 SendResult sendResult = null; // 同步多次调用 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); // 选择消息要发送到的队列 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } // 调用发送消息核心方法 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); // 更新Broker可用性信息 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { // 同步发送成功但存储有问题时 && 配置存储异常时重新发送开关 时,进行重试 if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } catch (RemotingException e) { // 打印异常,更新Broker可用性信息,更新继续循环 endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQClientException e) { // 打印异常,更新Broker可用性信息,更新继续循环 endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQBrokerException e) { // 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环 endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; switch (e.getResponseCode()) { // 如下异常continue,进行发送消息重试 case ResponseCode.TOPIC_NOT_EXIST: case ResponseCode.SERVICE_NOT_AVAILABLE: case ResponseCode.SYSTEM_ERROR: case ResponseCode.NO_PERMISSION: case ResponseCode.NO_BUYER_ID: case ResponseCode.NOT_IN_CURRENT_UNIT: continue; // 如果有发送结果,进行返回,否则,抛出异常; default: if (sendResult != null) { return sendResult; } throw e; } } catch (InterruptedException e) { // 打印异常,更新Broker可用性信息,更新继续循环 endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); throw e; } } else { break; } } // 返回发送结果 if (sendResult != null) { return sendResult; } // 根据不同情况,抛出不同的异常 String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent)); info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); MQClientException mqClientException = new MQClientException(info, exception); if (callTimeout) { throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout"); } if (exception instanceof MQBrokerException) { mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); } else if (exception instanceof RemotingConnectException) { mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); } else if (exception instanceof RemotingTimeoutException) { mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); } else if (exception instanceof MQClientException) { mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); } throw mqClientException; } // Namesrv找不到异常 validateNameServerSetting(); // 消息路由找不到异常 throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); }
DefaultMQProducerImpl#tryToFindTopicPublishInfo()
说明 :获得 Topic发布信息。优先从缓存topicPublishInfoTable,其次从Namesrv中获得。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { // 缓存中获取 Topic发布信息 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); // 当无可用的 Topic发布信息时,从Namesrv获取一次 if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } // 若获取的 Topic发布信息时候可用,则返回 if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { // 使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。用于 Topic发布信息不存在 && Broker支持自动创建Topic this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
MQFaultStrategy
说明 :Producer消息发送容错策略。默认情况下容错策略关闭,即sendLatencyFaultEnable=false。
| Producer发送消息消耗时长 | Broker不可用时长 || — | — || >= 15000 ms | 600 1000 ms || >= 3000 ms | 180 1000 ms || >= 2000 ms | 120 1000 ms || >= 1000 ms | 60 1000 ms || >= 550 ms | 30 * 1000 ms || >= 100 ms | 0 ms || >= 50 ms | 0 ms |
public class MQFaultStrategy { private final static InternalLogger log = ClientLogger.getLog(); /** * 延迟故障容错,维护每个Broker的发送消息的延迟 * key:brokerName */ private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); /** * 发送消息延迟容错开关 */ private boolean sendLatencyFaultEnable = false; /** * 延迟级别数组 */ private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; /** * 不可用时长数组 */ private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; public long[] getNotAvailableDuration() { return notAvailableDuration; } public void setNotAvailableDuration(final long[] notAvailableDuration) { this.notAvailableDuration = notAvailableDuration; } public long[] getLatencyMax() { return latencyMax; } public void setLatencyMax(final long[] latencyMax) { this.latencyMax = latencyMax; } public boolean isSendLatencyFaultEnable() { return sendLatencyFaultEnable; } public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { this.sendLatencyFaultEnable = sendLatencyFaultEnable; } /** * 根据 Topic发布信息 选择一个消息队列 * * @param tpInfo Topic发布信息 * @param lastBrokerName brokerName * @return 消息队列 */ public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { try { // 获取 brokerName=lastBrokerName && 可用的一个消息队列 int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) { pos = 0; } MessageQueue mq = tpInfo.getMessageQueueList().get(pos); if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) { return mq; } } } // 选择一个相对好的broker,并获得其对应的一个消息队列,不考虑该队列的可用性 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } // 选择一个消息队列,不考虑队列的可用性 return tpInfo.selectOneMessageQueue(); } // 获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性 return tpInfo.selectOneMessageQueue(lastBrokerName); } /** * 更新延迟容错信息 * @param brokerName brokerName * @param currentLatency 延迟 * @param isolation 是否隔离。当开启隔离时,默认延迟为30000。目前主要用于发送消息异常时 */ public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } } /** * 计算延迟对应的不可用时间 * @param currentLatency 延迟 * @return 不可用时间 */ private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { if (currentLatency >= latencyMax[i]) { return this.notAvailableDuration[i]; } } return 0; }}
LatencyFaultTolerance
延迟故障容错接口
public interface LatencyFaultTolerance<T> { /** * 更新对象的延迟和不可用时长 * * @param name 对象 * @param currentLatency 延迟时间 * @param notAvailableDuration 不可用时长 */ void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration); /** * 判断对象是否可用 * * @param name broker名称 * @return 是否可用 */ boolean isAvailable(final T name); /** * 移除对象 * * @param name 移除对象 */ void remove(final T name); /** * 获取一个对象 * * @return 对象 */ T pickOneAtLeast();}
LatencyFaultToleranceImpl
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> { /** * 对象故障信息Table */ private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16); /** * 对象选择Index */ private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(); @Override public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { FaultItem old = this.faultItemTable.get(name); if (null == old) { final FaultItem faultItem = new FaultItem(name); faultItem.setCurrentLatency(currentLatency); faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); old = this.faultItemTable.putIfAbsent(name, faultItem); if (old != null) { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } @Override public boolean isAvailable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { return faultItem.isAvailable(); } return true; } @Override public void remove(final String name) { this.faultItemTable.remove(name); } @Override public String pickOneAtLeast() { final Enumeration<FaultItem> elements = this.faultItemTable.elements(); List<FaultItem> tmpList = new LinkedList<FaultItem>(); while (elements.hasMoreElements()) { final FaultItem faultItem = elements.nextElement(); tmpList.add(faultItem); } if (!tmpList.isEmpty()) { // 打乱 + 排序。TODO 疑问:应该只能二选一。猜测Collections.shuffle(tmpList)去掉。 Collections.shuffle(tmpList); Collections.sort(tmpList); // 选择顺序在前一半的对象 final int half = tmpList.size() / 2; if (half <= 0) { return tmpList.get(0).getName(); } else { final int i = this.whichItemWorst.getAndIncrement() % half; return tmpList.get(i).getName(); } } return null; } @Override public String toString() { return "LatencyFaultToleranceImpl{" + "faultItemTable=" + faultItemTable + ", whichItemWorst=" + whichItemWorst + '}'; } class FaultItem implements Comparable<FaultItem> { private final String name; private volatile long currentLatency; private volatile long startTimestamp; public FaultItem(final String name) { this.name = name; } @Override public int compareTo(final FaultItem other) { if (this.isAvailable() != other.isAvailable()) { if (this.isAvailable()) { return -1; } if (other.isAvailable()) { return 1; } } if (this.currentLatency < other.currentLatency) { return -1; } else if (this.currentLatency > other.currentLatency) { return 1; } if (this.startTimestamp < other.startTimestamp) { return -1; } else if (this.startTimestamp > other.startTimestamp) { return 1; } return 0; } public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; } @Override public int hashCode() { int result = getName() != null ? getName().hashCode() : 0; result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32)); result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32)); return result; } @Override public boolean equals(final Object o) { if (this == o) { return true; } if (!(o instanceof FaultItem)) { return false; } final FaultItem faultItem = (FaultItem) o; if (getCurrentLatency() != faultItem.getCurrentLatency()) { return false; } if (getStartTimestamp() != faultItem.getStartTimestamp()) { return false; } return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null; } @Override public String toString() { return "FaultItem{" + "name='" + name + '/'' + ", currentLatency=" + currentLatency + ", startTimestamp=" + startTimestamp + '}'; } public String getName() { return name; } public long getCurrentLatency() { return currentLatency; } public void setCurrentLatency(final long currentLatency) { this.currentLatency = currentLatency; } public long getStartTimestamp() { return startTimestamp; } public void setStartTimestamp(final long startTimestamp) { this.startTimestamp = startTimestamp; } }}
FaultItem
说明 :对象故障信息。维护对象的名字、延迟、开始可用的时间。
class FaultItem implements Comparable<FaultItem> { /** * 对象名 */ private final String name; /** * 延迟 */ private volatile long currentLatency; /** * 开始可用时间 */ private volatile long startTimestamp; public FaultItem(final String name) { this.name = name; } /** * 可用性 > 延迟 > 开始可用时间 * * @param other 另一对象 * @return 升序 */ @Override public int compareTo(final FaultItem other) { if (this.isAvailable() != other.isAvailable()) { if (this.isAvailable()) { return -1; } if (other.isAvailable()) { return 1; } } if (this.currentLatency < other.currentLatency) { return -1; } else if (this.currentLatency > other.currentLatency) { return 1; } if (this.startTimestamp < other.startTimestamp) { return -1; } else if (this.startTimestamp > other.startTimestamp) { return 1; } return 0; } public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; } @Override public int hashCode() { int result = getName() != null ? getName().hashCode() : 0; result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32)); result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32)); return result; } @Override public boolean equals(final Object o) { if (this == o) { return true; } if (!(o instanceof FaultItem)) { return false; } final FaultItem faultItem = (FaultItem) o; if (getCurrentLatency() != faultItem.getCurrentLatency()) { return false; } if (getStartTimestamp() != faultItem.getStartTimestamp()) { return false; } return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null; } @Override public String toString() { return "FaultItem{" + "name='" + name + '/'' + ", currentLatency=" + currentLatency + ", startTimestamp=" + startTimestamp + '}'; } public String getName() { return name; } public long getCurrentLatency() { return currentLatency; } public void setCurrentLatency(final long currentLatency) { this.currentLatency = currentLatency; } public long getStartTimestamp() { return startTimestamp; } public void setStartTimestamp(final long startTimestamp) { this.startTimestamp = startTimestamp; } }
DefaultMQProducerImpl#sendKernelImpl()
说明 :发送消息核心方法。该方法真正发起网络请求,发送消息给 Broker。
private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); // 获取 broker地址 String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } SendMessageContext context = null; if (brokerAddr != null) { // 是否使用broker vip通道。broker会开启两个端口对外服务。 brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); byte[] prevBody = msg.getBody(); try { // for MessageBatch,ID has been set in the generating process // 设置唯一编号 if (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg); } boolean topicWithNamespace = false; if (null != this.mQClientFactory.getClientConfig().getNamespace()) { msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace()); topicWithNamespace = true; } int sysFlag = 0; boolean msgBodyCompressed = false; // 消息压缩 if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; msgBodyCompressed = true; } // 事务 final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } // hook:发送消息校验 if (hasCheckForbiddenHook()) { CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this.isUnitMode()); this.executeCheckForbiddenHook(checkForbiddenContext); } // hook:发送消息前逻辑 if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); context.setNamespace(this.defaultMQProducer.getNamespace()); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } this.executeSendMessageHookBefore(context); } // 构建发送消息请求 SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } } // 发送消息 SendResult sendResult = null; switch (communicationMode) { case ASYNC: Message tmpMessage = msg; boolean messageCloned = false; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. //Clone new message using commpressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; msg.setBody(prevBody); } if (topicWithNamespace) { if (!messageCloned) { tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; } msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); break; case ONEWAY: case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; default: assert false; break; } // hook:发送消息后逻辑 if (this.hasSendMessageHook()) { context.setSendResult(sendResult); this.executeSendMessageHookAfter(context); } // 返回发送结果 return sendResult; } catch (RemotingException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } catch (MQBrokerException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } catch (InterruptedException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } finally { msg.setBody(prevBody); msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } } // broker为空抛出异常 throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }
3、Broker 接收消息
SendMessageProcessor#sendMessage
#processRequest() 说明 :处理消息请求。
#sendMessage() 说明 :发送消息,并返回发送消息结果。
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.rocketmq.broker.processor;import java.net.SocketAddress;import java.util.List;import java.util.Map;import io.netty.channel.ChannelHandlerContext;import org.apache.rocketmq.broker.BrokerController;import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;import org.apache.rocketmq.broker.mqtrace.SendMessageContext;import org.apache.rocketmq.common.MQVersion;import org.apache.rocketmq.common.MixAll;import org.apache.rocketmq.common.TopicConfig;import org.apache.rocketmq.common.TopicFilterType;import org.apache.rocketmq.common.UtilAll;import org.apache.rocketmq.common.constant.PermName;import org.apache.rocketmq.common.help.FAQUrl;import org.apache.rocketmq.common.message.MessageAccessor;import org.apache.rocketmq.common.message.MessageConst;import org.apache.rocketmq.common.message.MessageDecoder;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.common.message.MessageExtBatch;import org.apache.rocketmq.common.protocol.NamespaceUtil;import org.apache.rocketmq.common.protocol.RequestCode;import org.apache.rocketmq.common.protocol.ResponseCode;import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;import org.apache.rocketmq.common.sysflag.MessageSysFlag;import org.apache.rocketmq.common.sysflag.TopicSysFlag;import org.apache.rocketmq.remoting.exception.RemotingCommandException;import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;import org.apache.rocketmq.remoting.protocol.RemotingCommand;import org.apache.rocketmq.store.MessageExtBrokerInner;import org.apache.rocketmq.store.PutMessageResult;import org.apache.rocketmq.store.config.StorePathConfigHelper;import org.apache.rocketmq.store.stats.BrokerStatsManager;public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { private List<ConsumeMessageHook> consumeMessageHookList; public SendMessageProcessor(final BrokerController brokerController) { super(brokerController); } @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { SendMessageContext mqtraceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: return this.consumerSendMsgBack(ctx, request); default: // 解析请求 SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { return null; } // 发送请求Context。在 hook 场景下使用 mqtraceContext = buildMsgContext(ctx, requestHeader); // hook:处理发送消息前逻辑 this.executeSendMessageHookBefore(ctx, request, mqtraceContext); RemotingCommand response; // 处理发送消息逻辑 if (requestHeader.isBatch()) { response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader); } else { response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); } // hook:处理发送消息后逻辑 this.executeSendMessageHookAfter(response, mqtraceContext); return response; } } @Override public boolean rejectRequest() { return this.brokerController.getMessageStore().isOSPageCacheBusy() || this.brokerController.getMessageStore().isTransientStorePoolDeficient(); } private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final ConsumerSendMsgBackRequestHeader requestHeader = (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup()); if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) { ConsumeMessageContext context = new ConsumeMessageContext(); context.setNamespace(namespace); context.setConsumerGroup(requestHeader.getGroup()); context.setTopic(requestHeader.getOriginTopic()); context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK); context.setCommercialRcvTimes(1); context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER)); this.executeConsumeMessageHookAfter(context); } SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup()); if (null == subscriptionGroupConfig) { response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); return response; } if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden"); return response; } if (subscriptionGroupConfig.getRetryQueueNums() <= 0) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } String newTopic = MixAll.getRetryTopic(requestHeader.getGroup()); int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums(); int topicSysFlag = 0; if (requestHeader.isUnitMode()) { topicSysFlag = TopicSysFlag.buildSysFlag(false, true); } TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( newTopic, subscriptionGroupConfig.getRetryQueueNums(), PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); return response; } if (!PermName.isWriteable(topicConfig.getPerm())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic)); return response; } MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset()); if (null == msgExt) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("look message by offset failed, " + requestHeader.getOffset()); return response; } final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); if (null == retryTopic) { MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic()); } msgExt.setWaitStoreMsgOK(false); int delayLevel = requestHeader.getDelayLevel(); int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); } if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) { newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0 ); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); return response; } } else { if (0 == delayLevel) { delayLevel = 3 + msgExt.getReconsumeTimes(); } msgExt.setDelayTimeLevel(delayLevel); } MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(newTopic); msgInner.setBody(msgExt.getBody()); msgInner.setFlag(msgExt.getFlag()); MessageAccessor.setProperties(msgInner, msgExt.getProperties()); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags())); msgInner.setQueueId(queueIdInt); msgInner.setSysFlag(msgExt.getSysFlag()); msgInner.setBornTimestamp(msgExt.getBornTimestamp()); msgInner.setBornHost(msgExt.getBornHost()); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1); String originMsgId = MessageAccessor.getOriginMessageId(msgExt); MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId); PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); if (putMessageResult != null) { switch (putMessageResult.getPutMessageStatus()) { case PUT_OK: String backTopic = msgExt.getTopic(); String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); if (correctTopic != null) { backTopic = correctTopic; } this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; default: break; } response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(putMessageResult.getPutMessageStatus().name()); return response; } response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("putMessageResult is null"); return response; } private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, RemotingCommand request, MessageExt msg, TopicConfig topicConfig) { String newTopic = requestHeader.getTopic(); if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // 获取订阅分组配置 String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); if (null == subscriptionGroupConfig) { response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setRemark( "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); return false; } // 计算最大可消费次数 int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); } int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); // 超过最大消费次数 if (reconsumeTimes >= maxReconsumeTimes) { newTopic = MixAll.getDLQTopic(groupName); int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0 ); msg.setTopic(newTopic); msg.setQueueId(queueIdInt); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); return false; } } } int sysFlag = requestHeader.getSysFlag(); if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; } msg.setSysFlag(sysFlag); return true; } private RemotingCommand sendMessage(final ChannelHandlerContext ctx, final RemotingCommand request, final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader) throws RemotingCommandException { // 初始化响应 final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); response.setOpaque(request.getOpaque()); response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); log.debug("receive SendMessage request command, {}", request); // 如果未开始接收消息,抛出系统异常 final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); if (this.brokerController.getMessageStore().now() < startTimstamp) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); return response; } // 消息配置(Topic配置)校验 response.setCode(-1); super.msgCheck(ctx, requestHeader, response); if (response.getCode() != -1) { return response; } final byte[] body = request.getBody(); // 如果队列小于0,从可用队列随机选择 int queueIdInt = requestHeader.getQueueId(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (queueIdInt < 0) { queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); } MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(requestHeader.getTopic()); msgInner.setQueueId(queueIdInt); // 对RETRY类型的消息处理。如果超过最大消费次数,则topic修改成"%DLQ%" + 分组名,即加入 死信队列(Dead Letter Queue) if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) { return response; } // 创建MessageExtBrokerInner msgInner.setBody(body); msgInner.setFlag(requestHeader.getFlag()); MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); msgInner.setBornHost(ctx.channel().remoteAddress()); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName(); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); PutMessageResult putMessageResult = null; Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); // 校验是否不允许发送事务消息 if (traFlag != null && Boolean.parseBoolean(traFlag)) { if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); } return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); } private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response, RemotingCommand request, MessageExt msg, SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, int queueIdInt) { if (putMessageResult == null) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("store putMessage return null"); return response; } boolean sendOK = false; switch (putMessageResult.getPutMessageStatus()) { // Success case PUT_OK: sendOK = true; response.setCode(ResponseCode.SUCCESS); break; case FLUSH_DISK_TIMEOUT: response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT); sendOK = true; break; case FLUSH_SLAVE_TIMEOUT: response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT); sendOK = true; break; case SLAVE_NOT_AVAILABLE: response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE); sendOK = true; break; // Failed case CREATE_MAPEDFILE_FAILED: response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("create mapped file failed, server is busy or broken."); break; case MESSAGE_ILLEGAL: case PROPERTIES_SIZE_EXCEEDED: response.setCode(ResponseCode.MESSAGE_ILLEGAL); response.setRemark( "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."); break; case SERVICE_NOT_AVAILABLE: response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); response.setRemark( "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small."); break; case OS_PAGECACHE_BUSY: response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while"); break; case UNKNOWN_ERROR: response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UNKNOWN_ERROR"); break; default: response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UNKNOWN_ERROR DEFAULT"); break; } String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER); if (sendOK) { // 统计 this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes()); this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum()); // 响应 response.setRemark(null); responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); responseHeader.setQueueId(queueIdInt); responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); doResponse(ctx, request, response); // hook:设置发送成功到context if (hasSendMessageHook()) { sendMessageContext.setMsgId(responseHeader.getMsgId()); sendMessageContext.setQueueId(responseHeader.getQueueId()); sendMessageContext.setQueueOffset(responseHeader.getQueueOffset()); int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount(); int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes(); int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount; sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS); sendMessageContext.setCommercialSendTimes(incValue); sendMessageContext.setCommercialSendSize(wroteSize); sendMessageContext.setCommercialOwner(owner); } return null; } else { // hook:设置发送失败到context if (hasSendMessageHook()) { int wroteSize = request.getBody().length; int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT); sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE); sendMessageContext.setCommercialSendTimes(incValue); sendMessageContext.setCommercialSendSize(wroteSize); sendMessageContext.setCommercialOwner(owner); } } return response; } private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, final RemotingCommand request, final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); response.setOpaque(request.getOpaque()); response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); log.debug("Receive SendMessage request command {}", request); final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); if (this.brokerController.getMessageStore().now() < startTimstamp) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); return response; } response.setCode(-1); super.msgCheck(ctx, requestHeader, response); if (response.getCode() != -1) { return response; } int queueIdInt = requestHeader.getQueueId(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (queueIdInt < 0) { queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); } if (requestHeader.getTopic().length() > Byte.MAX_VALUE) { response.setCode(ResponseCode.MESSAGE_ILLEGAL); response.setRemark("message topic length too long " + requestHeader.getTopic().length()); return response; } if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { response.setCode(ResponseCode.MESSAGE_ILLEGAL); response.setRemark("batch request does not support retry group " + requestHeader.getTopic()); return response; } MessageExtBatch messageExtBatch = new MessageExtBatch(); messageExtBatch.setTopic(requestHeader.getTopic()); messageExtBatch.setQueueId(queueIdInt); int sysFlag = requestHeader.getSysFlag(); if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; } messageExtBatch.setSysFlag(sysFlag); messageExtBatch.setFlag(requestHeader.getFlag()); MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties())); messageExtBatch.setBody(request.getBody()); messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp()); messageExtBatch.setBornHost(ctx.channel().remoteAddress()); messageExtBatch.setStoreHost(this.getStoreHost()); messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName(); MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName); PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch); return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt); } public boolean hasConsumeMessageHook() { return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty(); } public void executeConsumeMessageHookAfter(final ConsumeMessageContext context) { if (hasConsumeMessageHook()) { for (ConsumeMessageHook hook : this.consumeMessageHookList) { try { hook.consumeMessageAfter(context); } catch (Throwable e) { // Ignore } } } } public SocketAddress getStoreHost() { return storeHost; } private String diskUtil() { String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog(); double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); String storePathLogis = StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis); String storePathIndex = StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex); return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio); } public void registerConsumeMessageHook(List<ConsumeMessageHook> consumeMessageHookList) { this.consumeMessageHookList = consumeMessageHookList; }}
AbstractSendMessageProcessor#msgCheck
说明:校验消息是否正确,主要是Topic配置方面,例如:Broker 是否有写入权限,topic配置是否存在,队列编号是否正确。
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, final SendMessageRequestHeader requestHeader, final RemotingCommand response) { // 检查 broker 是否有写入权限 if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission()) && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden"); return response; } // 检查topic是否可以被发送。目前是{@link MixAll.DEFAULT_TOPIC}不被允许发送 if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) { String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; log.warn(errorMsg); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(errorMsg); return response; } TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); // 不存在topicConfig,则进行创建 if (null == topicConfig) { int topicSysFlag = 0; if (requestHeader.isUnitMode()) { if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { topicSysFlag = TopicSysFlag.buildSysFlag(false, true); } else { topicSysFlag = TopicSysFlag.buildSysFlag(true, false); } } // 创建topic配置 log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress()); topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod( requestHeader.getTopic(), requestHeader.getDefaultTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getDefaultTopicQueueNums(), topicSysFlag); // 如果没配置 if (null == topicConfig) { if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); } } if (null == topicConfig) { response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; } } // 队列编号是否正确 int queueIdInt = requestHeader.getQueueId(); int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums()); if (queueIdInt >= idValid) { String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s", queueIdInt, topicConfig.toString(), RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(errorInfo); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(errorInfo); return response; } return response; }
DefaultMessageStore#putMessage
说明:存储消息封装,最终存储需要 CommitLog 实现。
@Override public PutMessageResult putMessage(MessageExtBrokerInner msg) { if (this.shutdown) { log.warn("message store has shutdown, so putMessage is forbidden"); return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } // // 从节点不允许写入 if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("message store is slave mode, so putMessage is forbidden "); } return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } // store是否允许写入 if (!this.runningFlags.isWriteable()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits()); } return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } else { this.printTimes.set(0); } // 消息过长 if (msg.getTopic().length() > Byte.MAX_VALUE) { log.warn("putMessage message topic length too long " + msg.getTopic().length()); return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); } // 消息附加属性过长 if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); } if (this.isOSPageCacheBusy()) { return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); } long beginTime = this.getSystemClock().now(); // 添加消息到commitLog PutMessageResult result = this.commitLog.putMessage(msg); long elapsedTime = this.getSystemClock().now() - beginTime; if (elapsedTime > 500) { log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); } this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); if (null == result || !result.isOk()) { this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); } return result; }
. .

- 0