A Look at carrera's BrokerMonitor

2022-06-11 (Saturday)

Preface

This article takes a look at carrera's BrokerMonitor.

BrokerMonitor

DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/broker/BrokerMonitor.java

public class BrokerMonitor extends BaseConfigMonitor {

    private final static Logger LOGGER = LoggerFactory.getLogger(BrokerMonitor.class);

    private ExecutorService executor = ExecutorUtils.newFixedThreadPool(100, "BrokerMonitor", 200);

    private BrokerMonitorItem monitorItem = null;

    @Override
    protected void initMonitor(String broker, BrokerConfig brokerConfig) throws Exception {
        doMonitor(broker, brokerConfig);
    }

    public BrokerMonitor(MonitorConfig monitorConfig) {
        super("Broker", monitorConfig);
    }

    private void doMonitor(String broker, BrokerConfig config) throws InterruptedException {
        if (monitorItem != null) {
            // stop first.
            LOGGER.info("Stop old monitor broker: {}", broker);
            monitorItem.stop();
        }

        BrokerMonitorItem item = new BrokerMonitorItem(broker, config);
        try {
            item.start();
        } catch (Exception e) {
            LOGGER.error("broker monitor start exception, broker=" + broker, e);
        }
    }

    @Override
    public void shutdown() {
        ExecutorUtils.shutdown(executor);
        monitorItem.stop();
        super.shutdown();
    }

    //......
}
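  • BrokerMonitor extends BaseConfigMonitor. Its initMonitor method delegates to doMonitor, and its shutdown method shuts down the executor, calls monitorItem.stop(), and then calls super.shutdown(). doMonitor stops the existing monitorItem if it is not null, then creates a new BrokerMonitorItem and calls its start method. Note that in the excerpt as quoted the new item is never assigned to the monitorItem field; unless that happens in the elided code, the null check in doMonitor never fires and shutdown would throw a NullPointerException at monitorItem.stop().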
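The stop-old, start-new lifecycle described above can be sketched as follows. This is a minimal illustration rather than the DDMQ code: `SketchItem` and `MonitorHolder` are hypothetical names, and, unlike the excerpt as quoted, the sketch assumes the newly started item should be stored in the field so that a later replacement or shutdown can stop it.

```java
// Hypothetical stand-in for BrokerMonitorItem: it only tracks whether it is running.
class SketchItem {
    private volatile boolean running;

    void start() {
        running = true;
    }

    void stop() {
        running = false;
    }

    boolean isRunning() {
        return running;
    }
}

// Hypothetical holder showing the stop-old / start-new / remember-new sequence.
class MonitorHolder {
    private SketchItem current; // plays the role of the monitorItem field

    synchronized SketchItem replace() {
        if (current != null) {
            current.stop(); // stop the previous item first
        }
        SketchItem item = new SketchItem();
        item.start();
        current = item; // assumption: keep the reference so it can be stopped later
        return item;
    }

    synchronized void shutdown() {
        if (current != null) { // null-safe: nothing to stop if replace() never ran
            current.stop();
            current = null;
        }
    }
}
```

With this shape, calling `shutdown()` before any item was started is a no-op, and each `replace()` leaves exactly one running item.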

BrokerMonitorItem

DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/broker/BrokerMonitor.java

    class BrokerMonitorItem {
        private String broker;
        private BrokerConfig config;
        private volatile boolean isRunning = false;
        private ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

        public BrokerMonitorItem(String broker, BrokerConfig config) {
            this.broker = broker;
            this.config = config;
        }

        public void start() {
            isRunning = true;
            scheduledExecutor.submit(() -> {
                while (isRunning) {
                    monitorNamesvr();
                    monitorBroker();
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    LOGGER.info("broker<{}> [Active]", broker);
                }
            });
        }

        public void stop() {
            isRunning = false;
            ExecutorUtils.shutdown(scheduledExecutor);
        }

        private void monitorBroker() {
            if (MapUtils.isEmpty(config.getBrokers()) || StringUtils.isBlank(config.getBrokerClusterAddrs())) {
                return;
            }

            String nameSvr = config.getBrokerClusterAddrs().split(";")[0]; // use first namesvr.
            for (Map.Entry<String, Set<String>> entry : config.getBrokers().entrySet()) {
                String master = entry.getKey();
                Set<String> slaves = entry.getValue();
                executor.execute(() -> {
                    int j = 0;
                    for (; j < 2; ++j) {
                        try {
                            long masterOffset = Utils.checkReceive(broker, nameSvr, master);
                            if (masterOffset <= 0) {
                                continue;
                            }
                            Utils.checkSend(broker, nameSvr, master);
                            if (CollectionUtils.isNotEmpty(slaves)) {
                                for (String slave : slaves) {
                                    long slaveOffset = Utils.checkReceive(broker, nameSvr, slave);
                                    LOGGER.info("ReplicaDelayCheck broker={}, address={}->{}, masterOffset={}, slaveOffset={}, delayNum={}", broker, master.split(":")[0], slave.split(":")[0], masterOffset, slaveOffset, (masterOffset - slaveOffset));
                                    if (slaveOffset <= 0) {
                                        continue;
                                    }
                                    if (masterOffset - slaveOffset > 60) {
                                        LOGGER.error(String.format("[AlarmReplicaDelayErr] broker=%s, address=%s->%s, delayNum=%s", broker, master.split(":")[0], slave.split(":")[0], (masterOffset - slaveOffset)));
                                    }
                                }
                            }
                            break;
                        } catch (Throwable e) {
                            LOGGER.error("broker check broker exception, broker=" + broker, e);
                        }
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                        }
                    }
                    if (j == 2) {
                        LOGGER.error(String.format("[AlarmCheckBrokerErr] broker=%s, namesvr=%s", broker, nameSvr));
                    }
                });
            }
        }

        private void monitorNamesvr() {
            if (StringUtils.isBlank(config.getBrokerClusterAddrs())) {
                LOGGER.info("broker:{}, brokerClusterAddrs is empty", config.getBrokerCluster());
                return;
            }

            for (String nameSvr : config.getBrokerClusterAddrs().split(";")) {
                executor.execute(() -> {
                    int j = 0;
                    for (; j < 2; ++j) {
                        try {
                            Utils.checkNameSvr(nameSvr, broker);
                            LOGGER.info(String.format("[NameSvrCheck] broker=%s, namesvr=%s", broker, nameSvr));
                            break;
                        } catch (Throwable e) {
                            LOGGER.error("broker checkNameSvr exception, broker=" + broker + ", namesvr=" + nameSvr, e);
                        }
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            LOGGER.error("broker checkNameSvr Thread.sleep exception, broker=" + broker, e);
                        }
                    }
                    if (j == 2) {
                        LOGGER.error(String.format("[AlarmNameSvrErr] broker=%s, namesvr=%s", broker, nameSvr));
                    }
                });
            }
        }
    }
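  • The start method of BrokerMonitorItem asynchronously submits a runnable that, while isRunning is true, repeatedly calls monitorNamesvr and monitorBroker and then sleeps for 10 seconds. monitorNamesvr iterates over the name servers in brokerClusterAddrs (separated by ";") and calls Utils.checkNameSvr(nameSvr, broker) for each. monitorBroker uses only the first name server; it iterates over config.getBrokers().entrySet() and, for each master, calls Utils.checkReceive(broker, nameSvr, master) and Utils.checkSend(broker, nameSvr, master), then calls Utils.checkReceive(broker, nameSvr, slave) for each slave. Every check runs on the shared executor and gets at most two attempts; if both fail, an [AlarmNameSvrErr] or [AlarmCheckBrokerErr] error is logged. When a slave's offset trails the master's by more than 60, an [AlarmReplicaDelayErr] error is logged.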
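The two-attempt loop and the replica-delay threshold described above can be sketched in isolation. This is a simplified illustration under stated assumptions, not the carrera code: `RetrySketch`, `runWithRetry`, and `isReplicaLagging` are hypothetical names, the check is passed in as a `Callable<Boolean>`, and an interrupt ends the loop instead of being swallowed.

```java
import java.util.concurrent.Callable;

class RetrySketch {

    // Runs the check up to maxAttempts times, pausing between failed attempts.
    // Returns true as soon as one attempt succeeds, false if all attempts fail.
    static boolean runWithRetry(Callable<Boolean> check, int maxAttempts, long pauseMillis) {
        int attempt = 0;
        for (; attempt < maxAttempts; ++attempt) {
            try {
                if (check.call()) {
                    break; // success: stop retrying
                }
            } catch (Exception e) {
                // a failed attempt: fall through to the pause and retry
            }
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // assumption: preserve the interrupt and give up
                return false;
            }
        }
        // attempt == maxAttempts means every attempt failed (the "j == 2" alarm case)
        return attempt < maxAttempts;
    }

    // Mirrors the threshold in the excerpt: a slave with a valid offset that
    // trails the master by more than 60 counts as lagging.
    static boolean isReplicaLagging(long masterOffset, long slaveOffset) {
        return slaveOffset > 0 && masterOffset - slaveOffset > 60;
    }
}
```

A caller would log the alarm line when `runWithRetry` returns false, which corresponds to the `j == 2` branch in the excerpt.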

Utils

DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/broker/Utils.java

public class Utils {

    private static final Logger logger = LoggerFactory.getLogger(Utils.class);

    private static final Map<String, DefaultMQAdminExt> mqAdminExtMap = new ConcurrentHashMap<>();
    private static final Map<String, DefaultMQPullConsumer> nameSvrCheckMap = new ConcurrentHashMap<>();
    private static final Map<String, DefaultMQPullConsumer> brokerReceiveCheckMap = new ConcurrentHashMap<>();
    private static final Map<String, DefaultMQProducer> brokerSendCheckMap = new ConcurrentHashMap<>();

    //......

    public static void checkNameSvr(String nameSvr, String cluster) throws MQClientException, InterruptedException {
        getNameSvrCheckConsumer(nameSvr, cluster).getDefaultMQPullConsumerImpl().fetchPublishMessageQueues("SELF_TEST_TOPIC");
    }

    public static long checkReceive(String cluster, String nameSvr, String address)
            throws MQClientException, NoSuchFieldException, SecurityException, IllegalArgumentException,
            IllegalAccessException, InterruptedException, RemotingException, MQBrokerException {
        DefaultMQPullConsumer consumer = getReceiveCheckConsumer(nameSvr, cluster, address);
        Field f1 = DefaultMQPullConsumerImpl.class.getDeclaredField("mQClientFactory");
        f1.setAccessible(true);
        MQClientInstance instance = (MQClientInstance) f1.get(consumer.getDefaultMQPullConsumerImpl());

        Field f = MQClientInstance.class.getDeclaredField("brokerAddrTable");
        f.setAccessible(true);

        Field f2 = MQClientInstance.class.getDeclaredField("scheduledExecutorService");
        f2.setAccessible(true);
        ScheduledExecutorService service = (ScheduledExecutorService) f2.get(instance);
        service.shutdown();
        service.awaitTermination(1000, TimeUnit.SECONDS);

        ConcurrentHashMap<String, HashMap<Long, String>> map = (ConcurrentHashMap<String, HashMap<Long, String>>) f.get(instance);
        HashMap<Long, String> addresses = new HashMap<>();
        addresses.put(0L, address);
        map.put("rmqmonitor_" + address, addresses);

        MessageQueue queue = new MessageQueue("SELF_TEST_TOPIC", "rmqmonitor_" + address, 0);
        boolean pullOk = false;
        long maxOffset = -1;
        for (int i = 0; i < 2; ++i) {
            try {
                maxOffset = consumer.getDefaultMQPullConsumerImpl().maxOffset(queue);
                PullResult result = consumer.pull(queue, "*", maxOffset > 100 ? maxOffset - 10 : 0, 1);
                if (result.getPullStatus() == PullStatus.FOUND) {
                    pullOk = true;
                    break;
                } else if(result.getPullStatus() == PullStatus.NO_NEW_MSG) {
                    checkSend(cluster, nameSvr, address);
                    continue;
                }
                logger.warn("pull result failed, PullResult={}, cluster={}, namesvr={}, address={}", result, cluster, nameSvr, address);
            } catch (Throwable e) {
                logger.error("pull exception, cluster={}, namesvr={}, address={}", cluster, nameSvr, address, e);
            }
            Thread.sleep(1000);
        }
        if (!pullOk) {
            logger.error(String.format("[AlarmPullErr] cluster=%s, broker=%s", cluster, address));
        } else {
            logger.info("AlarmPullCheck cluster={}, broker={}", cluster, address);
        }
        return maxOffset;
    }

    public static void checkSend(String cluster, String nameSvr, String address) throws MQClientException, NoSuchFieldException,
            SecurityException, InterruptedException, IllegalArgumentException, IllegalAccessException, UnsupportedEncodingException, MQBrokerException, RemotingException {
        if (!isBrokerTopicWritable(cluster, nameSvr, address)) {
            return;
        }
        DefaultMQProducer producer = getSendCheckProducer(nameSvr, cluster, address);
        MQClientInstance instance = producer.getDefaultMQProducerImpl().getmQClientFactory();

        Field f = MQClientInstance.class.getDeclaredField("brokerAddrTable");
        f.setAccessible(true);

        Field f2 = MQClientInstance.class.getDeclaredField("scheduledExecutorService");
        f2.setAccessible(true);
        ScheduledExecutorService service = (ScheduledExecutorService) f2.get(instance);
        service.shutdown();
        service.awaitTermination(1000, TimeUnit.SECONDS);

        ConcurrentHashMap<String, HashMap<Long, String>> map = (ConcurrentHashMap<String, HashMap<Long, String>>) f.get(instance);
        HashMap<Long, String> addresses = new HashMap<>();
        addresses.put(0L, address);
        map.put("rmqmonitor_" + address, addresses);

        MessageQueue queue = new MessageQueue("SELF_TEST_TOPIC", "rmqmonitor_" + address, 0);
        boolean sendOk = false;
        SendResult sendResult = null;
        for (int i = 0; i < 2; i++) {
            try {
                Message msg = new Message("SELF_TEST_TOPIC", // topic
                        "TagA", // tag
                        ("Hello RocketMQ " + i).getBytes()// body
                );
                sendResult = producer.send(msg, queue);
                if (sendResult.getSendStatus() == SendStatus.SEND_OK || sendResult.getSendStatus() == SLAVE_NOT_AVAILABLE) {
                    sendOk = true;
                    break;
                }
                logger.warn("send result failed, SendResult={}, cluster={}, namesvr={}, address={}", sendResult, cluster, nameSvr, address);
            } catch (Exception e) {
                logger.error("send exception, cluster={}, namesvr={}, address={}", cluster, nameSvr, address, e);
            }
            Thread.sleep(1000);
        }

        // raise an alarm
        if (!sendOk) {
            logger.error(String.format("[AlarmSendErr] cluster=%s, broker=%s, result=%s", cluster, address, sendResult == null ? "null" : sendResult.toString()));
        } else {
            logger.info("AlarmSendCheck cluster={}, broker={}, result={}", cluster, address, sendResult.toString());
        }
    }

    //......
}
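  • checkNameSvr calls getNameSvrCheckConsumer(nameSvr, cluster).getDefaultMQPullConsumerImpl().fetchPublishMessageQueues("SELF_TEST_TOPIC"). checkReceive mainly pulls a message from SELF_TEST_TOPIC: it uses reflection to reach the consumer's MQClientInstance, shuts down its scheduledExecutorService, registers the target address in brokerAddrTable under the name "rmqmonitor_" + address, and then pulls one message from queue 0, starting at maxOffset - 10 when maxOffset is greater than 100 and at 0 otherwise. On NO_NEW_MSG it calls checkSend and retries; it logs [AlarmPullErr] if no message is found and returns maxOffset. checkSend mainly sends a message to SELF_TEST_TOPIC: it returns early if isBrokerTopicWritable is false, registers the address in the same way, and treats SEND_OK or SLAVE_NOT_AVAILABLE as success, logging [AlarmSendErr] otherwise.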
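The reflection step and the pull start offset described above can be illustrated without RocketMQ on the classpath. In this sketch `FakeClient` is a hypothetical stand-in for a client object with a private routing table, and `PinBrokerSketch`, `pin`, and `startOffset` are illustrative names; only the field name brokerAddrTable, the "rmqmonitor_" prefix, and the offset rule are taken from the excerpt.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a client that keeps its routing table private.
class FakeClient {
    private final ConcurrentHashMap<String, HashMap<Long, String>> brokerAddrTable =
            new ConcurrentHashMap<>();

    // Resolves a broker name to the address registered under broker id 0.
    String lookup(String brokerName) {
        HashMap<Long, String> addrs = brokerAddrTable.get(brokerName);
        return addrs == null ? null : addrs.get(0L);
    }
}

class PinBrokerSketch {

    // Registers the address under a synthetic broker name via reflection,
    // so that later lookups by that name resolve to exactly this address.
    @SuppressWarnings("unchecked")
    static String pin(FakeClient client, String address) {
        try {
            Field f = FakeClient.class.getDeclaredField("brokerAddrTable");
            f.setAccessible(true);
            ConcurrentHashMap<String, HashMap<Long, String>> table =
                    (ConcurrentHashMap<String, HashMap<Long, String>>) f.get(client);
            HashMap<Long, String> addresses = new HashMap<>();
            addresses.put(0L, address); // id 0, as in the excerpt
            String name = "rmqmonitor_" + address;
            table.put(name, addresses);
            return name;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("unexpected field layout", e);
        }
    }

    // Same rule as the pull call in the excerpt: start 10 before the end
    // once the queue holds more than 100 entries, otherwise start at 0.
    static long startOffset(long maxOffset) {
        return maxOffset > 100 ? maxOffset - 10 : 0;
    }
}
```

Reaching into private fields this way ties the monitor to the internal layout of the client class, so a field rename in a library upgrade would surface as a NoSuchFieldException at run time rather than at compile time.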

Summary

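BrokerMonitor extends BaseConfigMonitor. Its initMonitor method delegates to doMonitor, and its shutdown method shuts down the executor and calls monitorItem.stop(). doMonitor stops the existing monitorItem if it is not null, then creates a BrokerMonitorItem and calls its start method, which periodically checks the name servers and brokers through Utils.checkNameSvr, Utils.checkReceive, and Utils.checkSend.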
