A look at RocketMQ's DLedgerRoleChangeHandler

2022-06-18 (Saturday)

Preface

This article takes a look at RocketMQ's DLedgerRoleChangeHandler.

DLedgerRoleChangeHandler

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java

public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {

    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLegerRoleChangeHandler_"));
    private BrokerController brokerController;
    private DefaultMessageStore messageStore;
    private DLedgerCommitLog dLedgerCommitLog;
    private DLedgerServer dLegerServer;

    public DLedgerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) {
        this.brokerController = brokerController;
        this.messageStore = messageStore;
        this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
        this.dLegerServer = dLedgerCommitLog.getdLedgerServer();
    }

    @Override public void handle(long term, MemberState.Role role) {
        Runnable runnable = new Runnable() {
            @Override public void run() {
                long start = System.currentTimeMillis();
                try {
                    boolean succ = true;
                    log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
                    switch (role) {
                        case CANDIDATE:
                            if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
                                brokerController.changeToSlave(dLedgerCommitLog.getId());
                            }
                            break;
                        case FOLLOWER:
                            brokerController.changeToSlave(dLedgerCommitLog.getId());
                            break;
                        case LEADER:
                            while (true) {
                                if (!dLegerServer.getMemberState().isLeader()) {
                                    succ = false;
                                    break;
                                }
                                if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) {
                                    break;
                                }
                                if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()
                                    && messageStore.dispatchBehindBytes() == 0) {
                                    break;
                                }
                                Thread.sleep(100);
                            }
                            if (succ) {
                                messageStore.recoverTopicQueueTable();
                                brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
                            }
                            break;
                        default:
                            break;
                    }
                    log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start));
                } catch (Throwable t) {
                    log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start), t);
                }
            }
        };
        executorService.submit(runnable);
    }

    @Override public void startup() {
    }

    @Override public void shutdown() {
        executorService.shutdown();
    }
}
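  • DLedgerRoleChangeHandler implements the DLedgerLeaderElector.RoleChangeHandler interface. Its handle method submits a Runnable to executorService, a single-thread executor, so role changes are processed one at a time in submission order; its shutdown method calls executorService.shutdown(). The Runnable branches on MemberState.Role: for CANDIDATE it calls brokerController.changeToSlave(dLedgerCommitLog.getId()) only when messageStore.getMessageStoreConfig().getBrokerRole() is not SLAVE; for FOLLOWER it always calls brokerController.changeToSlave(dLedgerCommitLog.getId()); for LEADER it polls every 100 ms until the node is no longer leader (succ becomes false), or the ledger is empty (getLedgerEndIndex() == -1), or getLedgerEndIndex() equals getCommittedIndex() and messageStore.dispatchBehindBytes() is 0. Only if succ is still true does it then call messageStore.recoverTopicQueueTable() and brokerController.changeToMaster(BrokerRole.SYNC_MASTER).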
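To make the branching and the LEADER wait loop easier to follow, here is a minimal, self-contained sketch of the same decision logic. It is not RocketMQ code: RoleChangeSketch, LedgerView, fixedView, decide and handleAll are hypothetical names invented for illustration, and the broker calls are replaced by string labels. It only assumes what the excerpt above shows: a single-thread executor, the three role branches, and the three exit conditions of the loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for DLedgerRoleChangeHandler; none of these names are RocketMQ APIs.
class RoleChangeSketch {
    enum Role { CANDIDATE, FOLLOWER, LEADER }

    // The state the LEADER branch polls, reduced to four read-only values.
    interface LedgerView {
        boolean isLeader();
        long ledgerEndIndex();
        long committedIndex();
        long dispatchBehindBytes();
    }

    // Builds a LedgerView whose values never change (enough for a demo).
    static LedgerView fixedView(boolean leader, long end, long committed, long behind) {
        return new LedgerView() {
            public boolean isLeader() { return leader; }
            public long ledgerEndIndex() { return end; }
            public long committedIndex() { return committed; }
            public long dispatchBehindBytes() { return behind; }
        };
    }

    // Same exit conditions as the LEADER loop: false if leadership is lost,
    // true once the ledger is empty or fully committed and dispatched.
    static boolean waitUntilCaughtUp(LedgerView view, long sleepMillis) {
        try {
            while (true) {
                if (!view.isLeader()) {
                    return false;
                }
                if (view.ledgerEndIndex() == -1) {
                    return true;
                }
                if (view.ledgerEndIndex() == view.committedIndex() && view.dispatchBehindBytes() == 0) {
                    return true;
                }
                Thread.sleep(sleepMillis);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Maps a role change to a label for the action the broker would take.
    static String decide(Role role, boolean currentlySlave, LedgerView view) {
        switch (role) {
            case CANDIDATE:
                return currentlySlave ? "none" : "changeToSlave";
            case FOLLOWER:
                return "changeToSlave";
            case LEADER:
                return waitUntilCaughtUp(view, 100) ? "changeToMaster" : "none";
            default:
                return "none";
        }
    }

    // Submits every role change to a single-thread executor, so they run in submission order.
    static List<String> handleAll(List<Role> roles, LedgerView view) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> actions = Collections.synchronizedList(new ArrayList<>());
        AtomicBoolean slave = new AtomicBoolean(false);
        for (Role role : roles) {
            executor.submit(() -> {
                String action = decide(role, slave.get(), view);
                if (action.equals("changeToSlave")) {
                    slave.set(true);
                } else if (action.equals("changeToMaster")) {
                    slave.set(false);
                }
                actions.add(action);
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(actions);
    }

    public static void main(String[] args) {
        List<Role> roles = Arrays.asList(Role.FOLLOWER, Role.CANDIDATE, Role.LEADER);
        System.out.println(handleAll(roles, fixedView(true, 10, 10, 0)));
    }
}
```

With a view that is leader and fully caught up, the three events above yield changeToSlave, none, changeToMaster: the CANDIDATE event is a no-op because the preceding FOLLOWER event already switched the sketch to slave.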

changeToSlave

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

public class BrokerController {

    //......

    public void changeToSlave(int brokerId) {
        log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);

        //change the role
        brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check
        messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);

        //handle the scheduled service
        try {
            this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);
        } catch (Throwable t) {
            log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);
        }

        //handle the transactional service
        try {
            this.shutdownProcessorByHa();
        } catch (Throwable t) {
            log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);
        }

        //handle the slave synchronise
        handleSlaveSynchronize(BrokerRole.SLAVE);

        try {
            this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
        } catch (Throwable ignored) {
        }
        log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
    }

    //......
}
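  • changeToSlave first sets the broker id with brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId) and the role with messageStoreConfig.setBrokerRole(BrokerRole.SLAVE). It then calls messageStore.handleScheduleMessageService(BrokerRole.SLAVE), shutdownProcessorByHa(), handleSlaveSynchronize(BrokerRole.SLAVE), and finally registerBrokerAll(true, true, brokerConfig.isForceRegister()).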
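Two details of changeToSlave are easy to miss in the excerpt: the broker id 0 is never kept for a slave (in RocketMQ, broker id 0 denotes the master), and handleScheduleMessageService, shutdownProcessorByHa and registerBrokerAll are each wrapped in their own try/catch, so a failure in one does not stop the ones after it. The sketch below illustrates both under stated assumptions; ChangeToSlaveSketch, slaveBrokerId and runGuarded are hypothetical names, not part of BrokerController.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of two details of changeToSlave; not RocketMQ code.
class ChangeToSlaveSketch {
    // Mirrors "brokerId == 0 ? 1 : brokerId": a slave never keeps id 0, which denotes the master.
    static long slaveBrokerId(int brokerId) {
        return brokerId == 0 ? 1 : brokerId;
    }

    // Runs each step in its own try/catch and returns the names of the steps that failed.
    static List<String> runGuarded(Map<String, Runnable> steps) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Runnable> step : steps.entrySet()) {
            try {
                step.getValue().run();
            } catch (Throwable t) {
                // The real method logs (or ignores) the error and carries on.
                failed.add(step.getKey());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Map<String, Runnable> steps = new LinkedHashMap<>();
        steps.put("handleScheduleMessageService", () -> { throw new IllegalStateException("simulated failure"); });
        steps.put("shutdownProcessorByHa", () -> { });
        steps.put("registerBrokerAll", () -> { });
        System.out.println("brokerId=" + slaveBrokerId(0) + " failed=" + runGuarded(steps));
    }
}
```

Note that in the original method handleSlaveSynchronize(BrokerRole.SLAVE) is called outside any try/catch, so an exception there would propagate to the caller; the sketch only models the guarded steps.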

changeToMaster

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

public class BrokerController {

    //......

    public void changeToMaster(BrokerRole role) {
        if (role == BrokerRole.SLAVE) {
            return;
        }
        log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());

        //handle the slave synchronise
        handleSlaveSynchronize(role);

        //handle the scheduled service
        try {
            this.messageStore.handleScheduleMessageService(role);
        } catch (Throwable t) {
            log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);
        }

        //handle the transactional service
        try {
            this.startProcessorByHa(BrokerRole.SYNC_MASTER);
        } catch (Throwable t) {
            log.error("[MONITOR] startProcessorByHa failed when changing to master", t);
        }

        //if the operations above are totally successful, we change to master
        brokerConfig.setBrokerId(0); //TO DO check
        messageStoreConfig.setBrokerRole(role);

        try {
            this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
        } catch (Throwable ignored) {
        }
        log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());
    }

    //......
}
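  • changeToMaster returns immediately if the requested role is SLAVE. Otherwise it calls handleSlaveSynchronize(role), messageStore.handleScheduleMessageService(role) and startProcessorByHa(BrokerRole.SYNC_MASTER), then sets the broker id to 0 with brokerConfig.setBrokerId(0) and the role with messageStoreConfig.setBrokerRole(role), and finally calls registerBrokerAll(true, true, brokerConfig.isForceRegister()).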
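The ordering in changeToMaster is the reverse of changeToSlave: the services are switched first and the broker id and role are updated only afterwards, just before re-registering. A minimal sketch of that call order follows. It records step names instead of calling anything; ChangeToMasterSketch and its local BrokerRole enum are hypothetical stand-ins introduced here, not the RocketMQ classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace of the call order in changeToMaster; not RocketMQ code.
class ChangeToMasterSketch {
    // Local stand-in for the broker role enum used in the excerpt.
    enum BrokerRole { ASYNC_MASTER, SYNC_MASTER, SLAVE }

    // Returns the steps that would run, in order, for the requested role.
    static List<String> changeToMaster(BrokerRole role) {
        List<String> trace = new ArrayList<>();
        if (role == BrokerRole.SLAVE) {
            return trace; // early return: nothing happens for SLAVE
        }
        trace.add("handleSlaveSynchronize(" + role + ")");
        trace.add("handleScheduleMessageService(" + role + ")");
        // The excerpt passes SYNC_MASTER here regardless of the requested role.
        trace.add("startProcessorByHa(SYNC_MASTER)");
        // State is flipped only after the services have been handled.
        trace.add("setBrokerId(0)");
        trace.add("setBrokerRole(" + role + ")");
        trace.add("registerBrokerAll");
        return trace;
    }

    public static void main(String[] args) {
        for (String step : changeToMaster(BrokerRole.SYNC_MASTER)) {
            System.out.println(step);
        }
    }
}
```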

Summary

DLedgerRoleChangeHandler implements the DLedgerLeaderElector.RoleChangeHandler interface. Its handle method submits a Runnable to executorService, and its shutdown method calls executorService.shutdown(). The Runnable branches on MemberState.Role: for CANDIDATE, it calls brokerController.changeToSlave(dLedgerCommitLog.getId()) when messageStore.getMessageStoreConfig().getBrokerRole() is not SLAVE; for FOLLOWER, it calls brokerController.changeToSlave(dLedgerCommitLog.getId()); for LEADER, once the node is still leader and the store has caught up, it calls messageStore.recoverTopicQueueTable() and brokerController.changeToMaster(BrokerRole.SYNC_MASTER).
