Notes on chronos's MasterElection

2022-06-11 (Saturday)

Preface

This article takes a look at MasterElection in chronos.

MasterElection

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/ha/MasterElection.java

    public class MasterElection {
        private static final Logger SWITCH_LOGGER = LogUtils.SWITCH_LOGGER;

        private static volatile ServerState state = ServerState.BACKUPING;

        public static void election(final CountDownLatch cdl) {
            final CuratorFramework client = ZkUtils.getCuratorClient();
            final LeaderSelector selector = new LeaderSelector(client, Constants.MASTER_PATH, new LeaderSelectorListenerAdapter() {
                @Override
                public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                    SWITCH_LOGGER.info("take master leadership");

                    long seekTimestamp = MetaService.getSeekTimestamp();
                    long zkSeekTimestamp = MetaService.getZkSeekTimestamp();
                    final long sleepMs = 200;
                    long sleepCount = 0;
                    // If the data on zk has been lost, zkSeekTimestamp is 0; in that case chronos gets blocked
                    while (seekTimestamp < zkSeekTimestamp && zkSeekTimestamp > 0) {
                        SWITCH_LOGGER.info("sleep {}ms to wait seekTimestamp:{} to catch up with zkSeekTimestamp:{}",
                                sleepMs, seekTimestamp, zkSeekTimestamp);
                        TimeUnit.MILLISECONDS.sleep(sleepMs);
                        seekTimestamp = MetaService.getSeekTimestamp();
                        zkSeekTimestamp = MetaService.getZkSeekTimestamp();
                        sleepCount++;
                    }

                    state = ServerState.MASTERING;
                    SWITCH_LOGGER.info("change server state to {}, totalSleepMs:{}ms", state, sleepCount * sleepMs);
                    cdl.await();
                    state = ServerState.BACKUPING;
                    SWITCH_LOGGER.info("release master leadership");
                }
            });
            selector.autoRequeue();
            selector.start();
        }

        public static boolean isMaster() {
            return state == ServerState.MASTERING;
        }

        public static boolean isBackup() {
            return state == ServerState.BACKUPING;
        }

        public static void standAlone() {
            state = ServerState.MASTERING;
        }

        public static ServerState getState() {
            return state;
        }
    }
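  • MasterElection exposes the election, isMaster, isBackup, standAlone, and getState methods. election uses LeaderSelector from the Curator recipes. The takeLeadership method of its LeaderSelectorListenerAdapter first reads seekTimestamp and zkSeekTimestamp, then loops, sleeping 200 ms per iteration, until seekTimestamp is greater than or equal to zkSeekTimestamp (the loop condition also requires zkSeekTimestamp > 0). It then sets state to ServerState.MASTERING and calls await on the CountDownLatch; once the latch opens, it sets state back to ServerState.BACKUPING and returns, which releases leadership. After creating the LeaderSelector, election calls its autoRequeue and start methods.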
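The control flow inside takeLeadership can be tried without ZooKeeper. The following is only a sketch under assumptions: the class TakeLeadershipSketch and its LongSupplier parameters are made-up stand-ins for MetaService.getSeekTimestamp() and MetaService.getZkSeekTimestamp(), and the sleep interval is shortened to 1 ms so the demo finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Illustrative stand-in for the body of takeLeadership; not the chronos source.
final class TakeLeadershipSketch {
    enum ServerState { BACKUPING, MASTERING }

    static volatile ServerState state = ServerState.BACKUPING;

    // Polls both timestamps until the local one has caught up (or the zk one
    // is not positive) and returns how many times it slept.
    static long waitForCatchUp(LongSupplier localSeek, LongSupplier zkSeek, long sleepMs)
            throws InterruptedException {
        long seekTimestamp = localSeek.getAsLong();
        long zkSeekTimestamp = zkSeek.getAsLong();
        long sleepCount = 0;
        while (seekTimestamp < zkSeekTimestamp && zkSeekTimestamp > 0) {
            TimeUnit.MILLISECONDS.sleep(sleepMs);
            seekTimestamp = localSeek.getAsLong();
            zkSeekTimestamp = zkSeek.getAsLong();
            sleepCount++;
        }
        return sleepCount;
    }

    // Same shape as the listener: catch up, act as master until the latch
    // opens, then fall back to backup. Returning is what gives up leadership.
    static long takeLeadership(LongSupplier localSeek, LongSupplier zkSeek, CountDownLatch cdl)
            throws InterruptedException {
        long sleepCount = waitForCatchUp(localSeek, zkSeek, 1); // 1 ms is a demo value
        state = ServerState.MASTERING;
        cdl.await();
        state = ServerState.BACKUPING;
        return sleepCount;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong local = new AtomicLong(100);              // simulated local progress
        CountDownLatch alreadyOpen = new CountDownLatch(0);  // so the demo does not block
        long sleeps = takeLeadership(() -> local.getAndAdd(10), () -> 130L, alreadyOpen);
        System.out.println("sleeps=" + sleeps + ", state=" + state);
    }
}
```

In the demo the simulated local timestamp is read as 100, 110, 120, and 130, so the loop sleeps three times before the state switches. With a zk timestamp of 0 the loop body never runs.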

ChronosStartup

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/ChronosStartup.java

    public class ChronosStartup {
        private static final Logger LOGGER = LoggerFactory.getLogger(ChronosStartup.class);

        private CountDownLatch waitForShutdown;
        private String configFilePath = "chronos.yaml";
        private PullWorker pullWorker;
        private PushWorker pushWorker;
        private DeleteBgWorker deleteBgWorker;
        private NettyHttpServer nettyHttpServer;

        ChronosStartup(final String configFilePath) {
            if (StringUtils.isNotBlank(configFilePath)) {
                this.configFilePath = configFilePath;
            }
        }

        public void start() throws Exception {
            LOGGER.info("start to launch chronos...");
            final long start = System.currentTimeMillis();

            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    try {
                        LOGGER.info("start to stop chronos...");
                        final long start = System.currentTimeMillis();
                        ChronosStartup.this.stop();
                        final long cost = System.currentTimeMillis() - start;
                        LOGGER.info("succ stop chronos, cost:{}ms", cost);
                    } catch (Exception e) {
                        LOGGER.error("error while shutdown chronos, err:{}", e.getMessage(), e);
                    } finally {
                        /* shutdown log4j2 */
                        LogManager.shutdown();
                    }
                }
            });

            /* Note: the initialization steps below are order-dependent */

            /* init config */
            ConfigManager.initConfig(configFilePath);

            /* init metrics */
            if (!MetricService.init()) {
                System.exit(-1);
            }

            /* init rocksdb */
            RDB.init(ConfigManager.getConfig().getDbConfig().getDbPath());

            /* init zk */
            ZkUtils.init();

            /* init seektimestamp */
            MetaService.load();

            waitForShutdown = new CountDownLatch(1);

            if (ConfigManager.getConfig().isStandAlone()) {
                /* standalone */
                MasterElection.standAlone();
            } else {
                /* cluster mode: master election */
                MasterElection.election(waitForShutdown);
            }

            /* init pull worker */
            if (ConfigManager.getConfig().isPullOn()) {
                pullWorker = PullWorker.getInstance();
                pullWorker.start();
            }

            /* init push worker */
            if (ConfigManager.getConfig().isPushOn()) {
                pushWorker = PushWorker.getInstance();
                pushWorker.start();
            }

            /* init delete worker */
            if (ConfigManager.getConfig().isDeleteOn()) {
                deleteBgWorker = DeleteBgWorker.getInstance();
                deleteBgWorker.start();
            }

            final long cost = System.currentTimeMillis() - start;
            LOGGER.info("succ start chronos, cost:{}ms", cost);

            /* init http server */
            nettyHttpServer = NettyHttpServer.getInstance();
            nettyHttpServer.start();

            waitForShutdown.await();
        }

        void stop() {
            /* shutdown netty http server */
            if (nettyHttpServer != null) {
                nettyHttpServer.shutdown();
            }

            /* stop pull from MQ */
            if (pullWorker != null) {
                pullWorker.stop();
            }

            /* stop push to MQ */
            if (pushWorker != null) {
                pushWorker.stop();
            }

            /* stop delete */
            if (deleteBgWorker != null) {
                deleteBgWorker.stop();
            }

            MqConsumeStatService.getInstance().stop();

            /* close zk client */
            ZkUtils.close();

            /* close rocksdb */
            RDB.close();

            if (waitForShutdown != null) {
                waitForShutdown.countDown();
                waitForShutdown = null;
            }
        }
    }
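  • The start method of ChronosStartup creates the waitForShutdown latch. In standalone mode it calls MasterElection.standAlone(); otherwise it calls MasterElection.election(waitForShutdown). start then blocks on waitForShutdown.await() until stop, invoked from the shutdown hook, calls waitForShutdown.countDown().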
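Because the same latch is passed to election and awaited at the end of start, a single countDown in stop releases both the blocked start call and the thread parked inside takeLeadership. Here is a minimal sketch of that coordination using plain threads. ShutdownLatchSketch, the leaderReady latch, and the event strings are all invented for the demo and are not part of chronos.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Illustrative only: three threads sharing one latch, as start/stop/takeLeadership do.
final class ShutdownLatchSketch {
    static List<String> run() throws InterruptedException {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch waitForShutdown = new CountDownLatch(1);
        CountDownLatch leaderReady = new CountDownLatch(1); // demo-only ordering signal

        // Plays the role of the thread running takeLeadership.
        Thread leader = new Thread(() -> {
            events.add("MASTERING");
            leaderReady.countDown();
            try {
                waitForShutdown.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            events.add("BACKUPING");
        });

        // Plays the role of the shutdown hook calling stop().
        Thread hook = new Thread(() -> {
            try {
                leaderReady.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            events.add("stop");
            waitForShutdown.countDown();
        });

        leader.start();
        hook.start();
        waitForShutdown.await(); // the calling thread blocks here, like start()
        leader.join();
        hook.join();
        events.add("start returns");
        return new ArrayList<>(events);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

The extra leaderReady latch exists only to make the event order deterministic in the demo; the real code has no such signal.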

Summary

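MasterElection exposes the election, isMaster, isBackup, standAlone, and getState methods. election uses LeaderSelector from the Curator recipes: the takeLeadership method of its LeaderSelectorListenerAdapter reads seekTimestamp and zkSeekTimestamp and loops until seekTimestamp is greater than or equal to zkSeekTimestamp, then sets state to ServerState.MASTERING and calls await on the CountDownLatch. When the latch opens, it sets state back to ServerState.BACKUPING and releases leadership. After creating the LeaderSelector, election calls its autoRequeue and start methods.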
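The excerpts above do not show how the workers consume isMaster and isBackup. The sketch below is a guess at the general pattern rather than chronos code: StateGateSketch and its drain method are hypothetical, and the state is an instance field here (the original uses a static one) so that each run starts from BACKUPING.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical example of gating work on a volatile role flag; not chronos code.
final class StateGateSketch {
    enum ServerState { BACKUPING, MASTERING }

    // volatile so a role change made by one thread is visible to worker threads
    private volatile ServerState state = ServerState.BACKUPING;

    void standAlone() { state = ServerState.MASTERING; }
    boolean isMaster() { return state == ServerState.MASTERING; }
    boolean isBackup() { return state == ServerState.BACKUPING; }

    // Hypothetical worker step: moves items only while this node is master
    // and returns how many items were moved.
    int drain(Queue<String> pending, List<String> delivered) {
        int count = 0;
        while (isMaster() && !pending.isEmpty()) {
            delivered.add(pending.poll());
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        StateGateSketch node = new StateGateSketch();
        Queue<String> pending = new ArrayDeque<>(List.of("a", "b"));
        List<String> delivered = new ArrayList<>();
        System.out.println("as backup: " + node.drain(pending, delivered));
        node.standAlone();
        System.out.println("as master: " + node.drain(pending, delivered));
    }
}
```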

