侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130562 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

聊聊rocketmq的ScheduleMessageService

2022-06-23 星期四 / 0 评论 / 0 点赞 / 58 阅读 / 19388 字

序本文主要研究一下rocketmq的ScheduleMessageServiceScheduleMessageServicerocketmq-all-4.6.0-source-release/stor

本文主要研究一下rocketmq的ScheduleMessageService

ScheduleMessageService

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java

public class ScheduleMessageService extends ConfigManager {    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);    public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";    private static final long FIRST_DELAY_TIME = 1000L;    private static final long DELAY_FOR_A_WHILE = 100L;    private static final long DELAY_FOR_A_PERIOD = 10000L;    private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =        new ConcurrentHashMap<Integer, Long>(32);    private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =        new ConcurrentHashMap<Integer, Long>(32);    private final DefaultMessageStore defaultMessageStore;    private final AtomicBoolean started = new AtomicBoolean(false);    private Timer timer;    private MessageStore writeMessageStore;    private int maxDelayLevel;    public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) {        this.defaultMessageStore = defaultMessageStore;        this.writeMessageStore = defaultMessageStore;    }    public static int queueId2DelayLevel(final int queueId) {        return queueId + 1;    }    public static int delayLevel2QueueId(final int delayLevel) {        return delayLevel - 1;    }    //......    private void updateOffset(int delayLevel, long offset) {        this.offsetTable.put(delayLevel, offset);    }    public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {        Long time = this.delayLevelTable.get(delayLevel);        if (time != null) {            return time + storeTimestamp;        }        return storeTimestamp + 1000;    }    public void start() {        if (started.compareAndSet(false, true)) {            this.timer = new Timer("ScheduleMessageTimerThread", true);            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {                Integer level = entry.getKey();                Long timeDelay = entry.getValue();                Long offset = this.offsetTable.get(level);                if (null == offset) {                    offset = 0L;                }                if (timeDelay != null) {                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);                }            }            this.timer.scheduleAtFixedRate(new TimerTask() {                @Override                public void run() {                    try {                        if (started.get()) ScheduleMessageService.this.persist();                    } catch (Throwable e) {                        log.error("scheduleAtFixedRate flush exception", e);                    }                }            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());        }    }    public void shutdown() {        if (this.started.compareAndSet(true, false)) {            if (null != this.timer)                this.timer.cancel();        }    }    //......    public boolean load() {        boolean result = super.load();        result = result && this.parseDelayLevel();        return result;    }    @Override    public String configFilePath() {        return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig()            .getStorePathRootDir());    }    @Override    public void decode(String jsonString) {        if (jsonString != null) {            DelayOffsetSerializeWrapper delayOffsetSerializeWrapper =                DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class);            if (delayOffsetSerializeWrapper != null) {                this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable());            }        }    }    public String encode(final boolean prettyFormat) {        DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = new DelayOffsetSerializeWrapper();        delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable);        return delayOffsetSerializeWrapper.toJson(prettyFormat);    }    public boolean parseDelayLevel() {        HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();        timeUnitTable.put("s", 1000L);        timeUnitTable.put("m", 1000L * 60);        timeUnitTable.put("h", 1000L * 60 * 60);        timeUnitTable.put("d", 1000L * 60 * 60 * 24);        String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();        try {            String[] levelArray = levelString.split(" ");            for (int i = 0; i < levelArray.length; i++) {                String value = levelArray[i];                String ch = value.substring(value.length() - 1);                Long tu = timeUnitTable.get(ch);                int level = i + 1;                if (level > this.maxDelayLevel) {                    this.maxDelayLevel = level;                }                long num = Long.parseLong(value.substring(0, value.length() - 1));                long delayTimeMillis = tu * num;                this.delayLevelTable.put(level, delayTimeMillis);            }        } catch (Exception e) {            log.error("parseDelayLevel exception", e);            log.info("levelString String = {}", levelString);            return false;        }        return true;    }    //......}
  • ScheduleMessageService继承了ConfigManager;定义了delayLevelTable,其key为level,value为delay timeMillis;其start方法会先延时FIRST_DELAY_TIME调度DeliverDelayedMessageTimerTask;之后注册了另一个定时任务,每隔flushDelayOffsetInterval执行persist方法(ConfigManager.persist)

DeliverDelayedMessageTimerTask

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java

    class DeliverDelayedMessageTimerTask extends TimerTask {        private final int delayLevel;        private final long offset;        public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {            this.delayLevel = delayLevel;            this.offset = offset;        }        @Override        public void run() {            try {                if (isStarted()) {                    this.executeOnTimeup();                }            } catch (Exception e) {                // XXX: warn and notify me                log.error("ScheduleMessageService, executeOnTimeup exception", e);                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(                    this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);            }        }        /**         * @return         */        private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {            long result = deliverTimestamp;            long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);            if (deliverTimestamp > maxTimestamp) {                result = now;            }            return result;        }        public void executeOnTimeup() {            ConsumeQueue cq =                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,                    delayLevel2QueueId(delayLevel));            long failScheduleOffset = offset;            if (cq != null) {                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);                if (bufferCQ != null) {                    try {                        long nextOffset = offset;                        int i = 0;                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {                            long offsetPy = bufferCQ.getByteBuffer().getLong();                            int sizePy = bufferCQ.getByteBuffer().getInt();                            long tagsCode = bufferCQ.getByteBuffer().getLong();                            if (cq.isExtAddr(tagsCode)) {                                if (cq.getExt(tagsCode, cqExtUnit)) {                                    tagsCode = cqExtUnit.getTagsCode();                                } else {                                    //can't find ext content.So re compute tags code.                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",                                        tagsCode, offsetPy, sizePy);                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);                                }                            }                            long now = System.currentTimeMillis();                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);                            long countdown = deliverTimestamp - now;                            if (countdown <= 0) {                                MessageExt msgExt =                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(                                        offsetPy, sizePy);                                if (msgExt != null) {                                    try {                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);                                        PutMessageResult putMessageResult =                                            ScheduleMessageService.this.writeMessageStore                                                .putMessage(msgInner);                                        if (putMessageResult != null                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {                                            continue;                                        } else {                                            // XXX: warn and notify me                                            log.error(                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",                                                msgExt.getTopic(), msgExt.getMsgId());                                            ScheduleMessageService.this.timer.schedule(                                                new DeliverDelayedMessageTimerTask(this.delayLevel,                                                    nextOffset), DELAY_FOR_A_PERIOD);                                            ScheduleMessageService.this.updateOffset(this.delayLevel,                                                nextOffset);                                            return;                                        }                                    } catch (Exception e) {                                        /*                                         * XXX: warn and notify me                                         */                                        log.error(                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="                                                + offsetPy + ",sizePy=" + sizePy, e);                                    }                                }                            } else {                                ScheduleMessageService.this.timer.schedule(                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),                                    countdown);                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);                                return;                            }                        } // end of for                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);                        return;                    } finally {                        bufferCQ.release();                    }                } // end of if (bufferCQ != null)                else {                    long cqMinOffset = cq.getMinOffsetInQueue();                    if (offset < cqMinOffset) {                        failScheduleOffset = cqMinOffset;                        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="                            + cqMinOffset + ", queueId=" + cq.getQueueId());                    }                }            } // end of if (cq != null)            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,                failScheduleOffset), DELAY_FOR_A_WHILE);        }        private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();            msgInner.setBody(msgExt.getBody());            msgInner.setFlag(msgExt.getFlag());            MessageAccessor.setProperties(msgInner, msgExt.getProperties());            TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());            long tagsCodeValue =                MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());            msgInner.setTagsCode(tagsCodeValue);            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));            msgInner.setSysFlag(msgExt.getSysFlag());            msgInner.setBornTimestamp(msgExt.getBornTimestamp());            msgInner.setBornHost(msgExt.getBornHost());            msgInner.setStoreHost(msgExt.getStoreHost());            msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());            msgInner.setWaitStoreMsgOK(false);            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);            msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));            String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);            int queueId = Integer.parseInt(queueIdStr);            msgInner.setQueueId(queueId);            return msgInner;        }    }
  • DeliverDelayedMessageTimerTask继承了TimerTask,其run方法执行executeOnTimeup,若出现异常则使用timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, this.offset), DELAY_FOR_A_PERIOD)重新调度
  • executeOnTimeup方法首先通过defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel))方法找到ConsumeQueue,然后取出SelectMappedBufferResult,进行遍历计算tagsCode,从而通过correctDeliverTimestamp方法计算deliverTimestamp
  • 若deliverTimestamp小于等于当前时间则构造MessageExtBrokerInner然后执行writeMessageStore.putMessage(msgInner);没有put成功则重新调度DeliverDelayedMessageTimerTask;如果deliverTimestamp大于当前时间也会重新调度DeliverDelayedMessageTimerTask

小结

ScheduleMessageService继承了ConfigManager;定义了delayLevelTable,其key为level,value为delay timeMillis;其start方法会先延时FIRST_DELAY_TIME调度DeliverDelayedMessageTimerTask;之后注册了另一个定时任务,每隔flushDelayOffsetInterval执行persist方法(ConfigManager.persist)

doc

.
.

广告 广告

评论区