侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130562 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

聊聊chronos的cancelMessage

2022-06-12 星期日 / 0 评论 / 0 点赞 / 85 阅读 / 9313 字

序本文主要研究一下chronos的cancelMessageMqPullServiceDDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos

本文主要研究一下chronos的cancelMessage

MqPullService

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPullService.java

public class MqPullService implements Runnable {    private static final Logger LOGGER = LoggerFactory.getLogger(MqPullService.class);    private static final PullConfig PULL_CONFIG = ConfigManager.getConfig().getPullConfig();    private static final Batcher BATCHER = Batcher.getInstance();    private volatile boolean shouldStop = false;    private CountDownLatch cdl;    private final List<Long> succOffsets = new ArrayList<>();    private final List<Long> failOffsets = new ArrayList<>();    private SimpleCarreraConsumer carreraConsumer;    private String mqPullServiceName;    private final int INTERNAL_PAIR_COUNT = 5000;    private final BlockingQueue<InternalPair> blockingQueue = new ArrayBlockingQueue<>(INTERNAL_PAIR_COUNT);    //......    private void cancelMessage(final InternalKey internalKey, final String topic, final int action) {        InternalKey tombStoneInternalKey = internalKey.cloneTombstoneInternalKey();        if (internalKey.getType() == MsgTypes.DELAY.getValue()) {            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.DELAY);            BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(),                    new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);        } else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) {            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_DELAY);            BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action);        } else if (internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_EXPONENT_DELAY);            BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action);        } else {            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.UNKNOWN);            LOGGER.error("should not go here, invalid message type: {}, internalKey: {}", internalKey.getType(),                    internalKey.genUniqDelayMsgId());        }    }    //......}
  • cancelMessage方法首先通过internalKey.cloneTombstoneInternalKey()构造tombStoneInternalKey,之后对于MsgTypes.DELAY类型的执行BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);对于MsgTypes.LOOP_DELAY及MsgTypes.LOOP_EXPONENT_DELAY的执行BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action)

InternalKey

DDMQ/carrera-common/src/main/java/com/xiaojukeji/carrera/chronos/model/InternalKey.java

public class InternalKey {    private static final String SEPARATOR = "-";    private static final int LEN_UUID = 36;    private static final long ONE_DAY_SECONDS = 24 * 60 * 60;    private long timestamp;    private int type;    private long expire;    private long times;    private long timed;    private long interval;    private int innerTopicSeq;    private String uuid;    private int segmentNum;    private int segmentIndex;    //......    public InternalKey cloneTombstoneInternalKey() {        InternalKey tombstoneInternalKey = new InternalKey(this);        tombstoneInternalKey.setType(MsgTypes.TOMBSTONE.getValue());        return tombstoneInternalKey;    }    //......}
  • cloneTombstoneInternalKey方法设置type为MsgTypes.TOMBSTONE.getValue()

CancelWrap

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/model/CancelWrap.java

public class CancelWrap {    private String uniqDelayMsgId;    private String topic;    public CancelWrap() {    }    public CancelWrap(String uniqDelayMsgId, String topic) {        this.uniqDelayMsgId = uniqDelayMsgId;        this.topic = topic;    }    public String getUniqDelayMsgId() {        return uniqDelayMsgId;    }    public void setUniqDelayMsgId(String uniqDelayMsgId) {        this.uniqDelayMsgId = uniqDelayMsgId;    }    public String getTopic() {        return topic;    }    public void setTopic(String topic) {        this.topic = topic;    }    public String toJsonString() {        return JsonUtils.toJsonString(this);    }    @Override    public String toString() {        return "CancelWrap{" +                "uniqDelayMsgId='" + uniqDelayMsgId + '/'' +                ", topic='" + topic + '/'' +                '}';    }}
  • CancelWrap定义了uniqDelayMsgId及topic两个属性

Batcher

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/autobatcher/Batcher.java

public class Batcher {    private static final Logger LOGGER = LoggerFactory.getLogger(Batcher.class);    private static final int PULL_BATCH_ITEM_NUM = ConfigManager.getConfig().getPullConfig().getPullBatchItemNum();    private static final int MSG_BYTE_BASE_LEN = ConfigManager.getConfig().getPullConfig().getMsgByteBaseLen();    private WriteBatch wb = new WriteBatch();    private volatile int itemNum = 0;    private static volatile Batcher instance = null;    public static volatile ReentrantLock lock = new ReentrantLock();    //......    public void putLoopTombstoneKey(final InternalKey tombstoneInternalKey, InternalKey internalKey, final String topic, final int action) {        lock.lock();        try {            // 指数循环            // 1536811267-4-1536911267-3-0-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5            // 1536811567-4-1536911267-3-1-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5            // 1536897967-4-1536911267-3-2-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5            // 普通循环            // 1536811267-3-1536911267-3-0-10-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5            while (!KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())) {                internalKey = internalKey.nextUniqDelayMsgId();            }            tombstoneInternalKey.setTimestamp(internalKey.getTimestamp());            tombstoneInternalKey.setTimes(internalKey.getTimed() + 2);            tombstoneInternalKey.setTimed(internalKey.getTimed());            if (!KeyUtils.isInvalidMsg(tombstoneInternalKey)) {                putToDefaultCF(tombstoneInternalKey.genUniqDelayMsgId(),                        new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, internalKey, action);            }        } finally {            lock.unlock();        }    }    //......}
  • putLoopTombstoneKey方法通过KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())寻找internalKey,然后通过putToDefaultCF添加一条CancelWrap记录

小结

cancelMessage方法首先通过internalKey.cloneTombstoneInternalKey()构造tombStoneInternalKey,之后对于MsgTypes.DELAY类型的执行BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);对于MsgTypes.LOOP_DELAY及MsgTypes.LOOP_EXPONENT_DELAY的执行BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action)

doc

.
.

广告 广告

评论区