
A Look at RocketMQ's HAClient

2022-07-03, Sunday

Preface

This article takes a look at RocketMQ's HAClient.

HAClient

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java

    class HAClient extends ServiceThread {
        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
        private final AtomicReference<String> masterAddress = new AtomicReference<>();
        private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
        private SocketChannel socketChannel;
        private Selector selector;
        private long lastWriteTimestamp = System.currentTimeMillis();

        private long currentReportedOffset = 0;
        private int dispatchPosition = 0;
        private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
        private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

        public HAClient() throws IOException {
            this.selector = RemotingUtil.openSelector();
        }

        //......

        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    if (this.connectMaster()) {

                        if (this.isTimeToReportOffset()) {
                            boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                            if (!result) {
                                this.closeMaster();
                            }
                        }

                        this.selector.select(1000);

                        boolean ok = this.processReadEvent();
                        if (!ok) {
                            this.closeMaster();
                        }

                        if (!reportSlaveMaxOffsetPlus()) {
                            continue;
                        }

                        long interval =
                            HAService.this.getDefaultMessageStore().getSystemClock().now()
                                - this.lastWriteTimestamp;
                        if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                            .getHaHousekeepingInterval()) {
                            log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                                + "] expired, " + interval);
                            this.closeMaster();
                            log.warn("HAClient, master not response some time, so close connection");
                        }
                    } else {
                        this.waitForRunning(1000 * 5);
                    }
                } catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                    this.waitForRunning(1000 * 5);
                }
            }

            log.info(this.getServiceName() + " service end");
        }

        @Override
        public String getServiceName() {
            return HAClient.class.getSimpleName();
        }

        //......

        private boolean processReadEvent() {
            int readSizeZeroTimes = 0;
            while (this.byteBufferRead.hasRemaining()) {
                try {
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    if (readSize > 0) {
                        readSizeZeroTimes = 0;
                        boolean result = this.dispatchReadRequest();
                        if (!result) {
                            log.error("HAClient, dispatchReadRequest error");
                            return false;
                        }
                    } else if (readSize == 0) {
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        log.info("HAClient, processReadEvent read socket < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.info("HAClient, processReadEvent read socket exception", e);
                    return false;
                }
            }

            return true;
        }

        //......

    }
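  • HAClient extends ServiceThread. Its run method loops while isStopped() returns false. On each pass it calls connectMaster to check whether it is connected to masterAddress; if not, it calls waitForRunning(1000 * 5). Once connected, it checks isTimeToReportOffset, which returns true when the difference between the current time and lastWriteTimestamp exceeds defaultMessageStore.getMessageStoreConfig().getHaSendHeartbeatInterval(); in that case it calls reportSlaveMaxOffset(currentReportedOffset) and closes the master connection if the report fails. It then waits on selector.select(1000) and calls processReadEvent, closing the connection if that returns false. Finally, if the time elapsed since lastWriteTimestamp exceeds getHaHousekeepingInterval(), the connection is treated as expired and closed.
  • processReadEvent reads from socketChannel while byteBufferRead.hasRemaining(). Every read that returns data triggers dispatchReadRequest; three consecutive zero-length reads end the loop; a negative read size or an IOException makes the method return false.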
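
The two timing checks in the run loop (heartbeat reporting and housekeeping expiry) can be sketched in isolation. The following is a minimal illustration, not RocketMQ code: the class name HaTimingSketch, the method isConnectionExpired, and the method markWritten are hypothetical, and the clock value is passed in explicitly so the logic stays deterministic. Where the real HAClient refreshes lastWriteTimestamp is not shown in the excerpt above, so markWritten is only an assumption about how the timestamp gets updated.

```java
/** Hypothetical stand-alone sketch of the HAClient timing checks; not RocketMQ code. */
class HaTimingSketch {
    private final long heartbeatIntervalMs;    // plays the role of getHaSendHeartbeatInterval()
    private final long housekeepingIntervalMs; // plays the role of getHaHousekeepingInterval()
    private long lastWriteTimestamp;

    HaTimingSketch(long heartbeatIntervalMs, long housekeepingIntervalMs, long now) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.housekeepingIntervalMs = housekeepingIntervalMs;
        this.lastWriteTimestamp = now;
    }

    /** True when the time since the last write strictly exceeds the heartbeat interval. */
    boolean isTimeToReportOffset(long now) {
        return now - lastWriteTimestamp > heartbeatIntervalMs;
    }

    /** True when the time since the last write strictly exceeds the housekeeping interval. */
    boolean isConnectionExpired(long now) {
        return now - lastWriteTimestamp > housekeepingIntervalMs;
    }

    /** Assumed hook: record that data was just exchanged with the master. */
    void markWritten(long now) {
        this.lastWriteTimestamp = now;
    }
}
```

For example, with illustrative intervals of 5000 ms and 20000 ms and a start time of 0, isTimeToReportOffset(5000) is false and isTimeToReportOffset(5001) is true, because both comparisons are strict.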

dispatchReadRequest

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java

    class HAClient extends ServiceThread {

        //......

        private boolean dispatchReadRequest() {
            final int msgHeaderSize = 8 + 4; // phyoffset + size
            int readSocketPos = this.byteBufferRead.position();

            while (true) {
                int diff = this.byteBufferRead.position() - this.dispatchPosition;
                if (diff >= msgHeaderSize) {
                    long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                    int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);

                    long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

                    if (slavePhyOffset != 0) {
                        if (slavePhyOffset != masterPhyOffset) {
                            log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                                + slavePhyOffset + " MASTER: " + masterPhyOffset);
                            return false;
                        }
                    }

                    if (diff >= (msgHeaderSize + bodySize)) {
                        byte[] bodyData = new byte[bodySize];
                        this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
                        this.byteBufferRead.get(bodyData);

                        HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                        this.byteBufferRead.position(readSocketPos);
                        this.dispatchPosition += msgHeaderSize + bodySize;

                        if (!reportSlaveMaxOffsetPlus()) {
                            return false;
                        }

                        continue;
                    }
                }

                if (!this.byteBufferRead.hasRemaining()) {
                    this.reallocateByteBuffer();
                }

                break;
            }

            return true;
        }

        //......

    }
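  • dispatchReadRequest parses frames that start with a 12-byte header: an 8-byte master physical offset followed by a 4-byte body size. If the slave's max physical offset is non-zero and differs from the master's offset, it logs an error and returns false. When diff >= (msgHeaderSize + bodySize), meaning a complete frame is already buffered, it copies the body, calls defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData), advances dispatchPosition, and calls reportSlaveMaxOffsetPlus. When byteBufferRead has no space remaining, it calls reallocateByteBuffer before leaving the loop.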
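
The framing logic described above can be tried out without a broker. The sketch below is a simplified, hypothetical stand-in (the names FrameDispatchSketch, Frame, and dispatch are not part of RocketMQ): it keeps the 8 + 4 byte header layout and the dispatchPosition bookkeeping, but returns decoded frames in a list instead of appending to a commit log, and it leaves out the slave offset check, the offset report, and buffer reallocation.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone sketch of header + body frame dispatch; not RocketMQ code. */
class FrameDispatchSketch {
    static final int MSG_HEADER_SIZE = 8 + 4; // phyOffset (long) + bodySize (int)

    /** One decoded frame: the master's physical offset and the raw body bytes. */
    static final class Frame {
        final long masterPhyOffset;
        final byte[] body;

        Frame(long masterPhyOffset, byte[] body) {
            this.masterPhyOffset = masterPhyOffset;
            this.body = body;
        }
    }

    // Index of the first byte that has not been dispatched yet.
    private int dispatchPosition = 0;

    /**
     * Decodes every complete frame between dispatchPosition and buffer.position().
     * The buffer stays in write mode: its position is restored after each frame
     * so that later reads from the socket keep appending after the existing bytes.
     */
    List<Frame> dispatch(ByteBuffer buffer) {
        List<Frame> frames = new ArrayList<>();
        int writePos = buffer.position();
        while (true) {
            int diff = writePos - dispatchPosition;
            if (diff < MSG_HEADER_SIZE) {
                break; // header not fully received yet
            }
            // Absolute gets do not move the buffer position.
            long masterPhyOffset = buffer.getLong(dispatchPosition);
            int bodySize = buffer.getInt(dispatchPosition + 8);
            if (diff < MSG_HEADER_SIZE + bodySize) {
                break; // body not fully received yet
            }
            byte[] body = new byte[bodySize];
            buffer.position(dispatchPosition + MSG_HEADER_SIZE);
            buffer.get(body);
            buffer.position(writePos); // restore the write position
            dispatchPosition += MSG_HEADER_SIZE + bodySize;
            frames.add(new Frame(masterPhyOffset, body));
        }
        return frames;
    }

    int getDispatchPosition() {
        return dispatchPosition;
    }
}
```

A partially received frame simply stays in the buffer: a later call to dispatch, made after more bytes have been written, picks it up from the same dispatchPosition.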

Summary

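  • HAClient extends ServiceThread, and its run method loops while isStopped() returns false. It uses connectMaster to check the connection to masterAddress and calls waitForRunning(1000 * 5) when no connection is available. Once connected, it checks isTimeToReportOffset, which returns true when the difference between the current time and lastWriteTimestamp exceeds defaultMessageStore.getMessageStoreConfig().getHaSendHeartbeatInterval(). It then runs processReadEvent, which reads from the socket while byteBufferRead.hasRemaining() and calls dispatchReadRequest whenever a read returns data.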
