侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130562 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

用mappedbytebuffer实现一个持久化队列

2022-07-10 星期日 / 0 评论 / 0 点赞 / 81 阅读 / 56970 字

自从前段时间的一个事故让队列里缓存的大量关键数据丢失后,一直琢磨着弄一个能持久化到本地文件的队列,这样即使系统再次发生意外,我也不至于再苦逼的修数据了。选定使用mappedbytebuffer来实现,

    自从前段时间的一个事故让队列里缓存的大量关键数据丢失后,一直琢磨着弄一个能持久化到本地文件的队列,这样即使系统再次发生意外,我也不至于再苦逼的修数据了。选定使用mappedbytebuffer来实现,但做出来的原型不够理想。《高性能队列Fqueue的设计和使用实践》这篇文章给了我莫大的帮助。 当然只是借鉴了大致的文件系统结构,绝大部分还是按自己的想法来的。

    上图就是队列的文件系统,index文件记录了队列当前的读写文件号,读写位置和读写计数。队列的size是通过读写计数writeCounter-readCounter的方式记录的,这样做的好处是可以做到读写分离。运行时size用一个AtomicInteger变量记录,系统初始化加载队列时才用到读写计数差。block文件记录了实际的入队数据,每个block必须要有足够的空间写入4(len)+data.length+4(EOF)长度的数据,否则写入一个EOF标记,换一个新的block开始写入数据,而当读取到这个EOF时,表示这个block读取完毕,载入下一个block,如果有的话,释放并准备删除当前block。现在规定的block大小32MB,按我的使用场景,每个block可以写入100W数据(PS:protostuff-runtime挺不错的,我用它做的序列化)。

    最后附上代码:

public class MFQueuePool {    private static final Logger LOGGER = LoggerFactory.getLogger(MFQueuePool.class);    private static final BlockingQueue<String> DELETING_QUEUE = new LinkedBlockingQueue<>();    private static MFQueuePool instance = null;    private String fileBackupPath;    private Map<String, MFQueue> fQueueMap;    private ScheduledExecutorService syncService;    private MFQueuePool(String fileBackupPath) {        this.fileBackupPath = fileBackupPath;        File fileBackupDir = new File(fileBackupPath);        if (!fileBackupDir.exists() && !fileBackupDir.mkdir()) {            throw new IllegalArgumentException("can not create directory");        }        this.fQueueMap = scanDir(fileBackupDir);        this.syncService = Executors.newSingleThreadScheduledExecutor();        this.syncService.scheduleWithFixedDelay(new Runnable() {            @Override            public void run() {                for (MFQueue MFQueue : fQueueMap.values()) {                    MFQueue.sync();                }                deleteBlockFile();            }        }, 10L, 10L, TimeUnit.SECONDS);    }    private void deleteBlockFile() {        String blockFilePath = DELETING_QUEUE.poll();        if (StringUtils.isNotBlank(blockFilePath)) {            File delFile = new File(blockFilePath);            try {                if (!delFile.delete()) {                    LOGGER.warn("block file:{} delete failed", blockFilePath);                }            } catch (SecurityException e) {                LOGGER.error("security manager exists, delete denied");            }        }    }    private static void toClear(String filePath) {        DELETING_QUEUE.add(filePath);    }    private Map<String, MFQueue> scanDir(File fileBackupDir) {        if (!fileBackupDir.isDirectory()) {            throw new IllegalArgumentException("it is not a directory");        }        Map<String, MFQueue> exitsFQueues = new HashMap<>();        File[] indexFiles = fileBackupDir.listFiles(new FilenameFilter() {            @Override            public boolean accept(File dir, String name) {                return MFQueueIndex.isIndexFile(name);            }        });        if (ArrayUtils.isNotEmpty(indexFiles)) {            for (File indexFile : indexFiles) {                String queueName = MFQueueIndex.parseQueueName(indexFile.getName());                exitsFQueues.put(queueName, new MFQueue(queueName, fileBackupPath));            }        }        return exitsFQueues;    }    public synchronized static void init(String deployPath) {        if (instance == null) {            instance = new MFQueuePool(deployPath);        }    }    private void disposal() {        this.syncService.shutdown();        for (MFQueue MFQueue : fQueueMap.values()) {            MFQueue.close();        }        while (!DELETING_QUEUE.isEmpty()) {            deleteBlockFile();        }    }    public synchronized static void destory() {        if (instance != null) {            instance.disposal();            instance = null;        }    }    private MFQueue getQueueFromPool(String queueName) {        if (fQueueMap.containsKey(queueName)) {            return fQueueMap.get(queueName);        }        MFQueue MFQueue = new MFQueue(queueName, fileBackupPath);        fQueueMap.put(queueName, MFQueue);        return MFQueue;    }    public synchronized static MFQueue getFQueue(String queueName) {        if (StringUtils.isBlank(queueName)) {            throw new IllegalArgumentException("empty queue name");        }        return instance.getQueueFromPool(queueName);    }    public static class MFQueue extends AbstractQueue<byte[]> {        private String queueName;        private String fileBackupDir;        private MFQueueIndex index;        private MFQueueBlock readBlock;        private MFQueueBlock writeBlock;        private ReentrantLock readLock;        private ReentrantLock writeLock;        private AtomicInteger size;        public MFQueue(String queueName, String fileBackupDir) {            this.queueName = queueName;            this.fileBackupDir = fileBackupDir;            this.readLock = new ReentrantLock();            this.writeLock = new ReentrantLock();            this.index = new MFQueueIndex(MFQueueIndex.formatIndexFilePath(queueName, fileBackupDir));            this.size = new AtomicInteger(index.getWriteCounter() - index.getReadCounter());            this.writeBlock = new MFQueueBlock(index, MFQueueBlock.formatBlockFilePath(queueName,                    index.getWriteNum(), fileBackupDir));            if (index.getReadNum() == index.getWriteNum()) {                this.readBlock = this.writeBlock.duplicate();            } else {                this.readBlock = new MFQueueBlock(index, MFQueueBlock.formatBlockFilePath(queueName,                        index.getReadNum(), fileBackupDir));            }        }        @Override        public Iterator<byte[]> iterator() {            throw new UnsupportedOperationException();        }        @Override        public int size() {            return this.size.get();        }        private void rotateNextWriteBlock() {            int nextWriteBlockNum = index.getWriteNum() + 1;            nextWriteBlockNum = (nextWriteBlockNum < 0) ? 0 : nextWriteBlockNum;            writeBlock.putEOF();            if (index.getReadNum() == index.getWriteNum()) {                writeBlock.sync();            } else {                writeBlock.close();            }            writeBlock = new MFQueueBlock(index, MFQueueBlock.formatBlockFilePath(queueName,                    nextWriteBlockNum, fileBackupDir));            index.putWriteNum(nextWriteBlockNum);            index.putWritePosition(0);        }        @Override        public boolean offer(byte[] bytes) {            if (ArrayUtils.isEmpty(bytes)) {                return true;            }            writeLock.lock();            try {                if (!writeBlock.isSpaceAvailable(bytes.length)) {                    rotateNextWriteBlock();                }                writeBlock.write(bytes);                size.incrementAndGet();                return true;            } finally {                writeLock.unlock();            }        }        private void rotateNextReadBlock() {            if (index.getReadNum() == index.getWriteNum()) {                // 读缓存块的滑动必须发生在写缓存块滑动之后                return;            }            int nextReadBlockNum = index.getReadNum() + 1;            nextReadBlockNum = (nextReadBlockNum < 0) ? 0 : nextReadBlockNum;            readBlock.close();            String blockPath = readBlock.getBlockFilePath();            if (nextReadBlockNum == index.getWriteNum()) {                readBlock = writeBlock.duplicate();            } else {                readBlock = new MFQueueBlock(index, MFQueueBlock.formatBlockFilePath(queueName,                        nextReadBlockNum, fileBackupDir));            }            index.putReadNum(nextReadBlockNum);            index.putReadPosition(0);            MFQueuePool.toClear(blockPath);        }        @Override        public byte[] poll() {            readLock.lock();            try {                if (readBlock.eof()) {                    rotateNextReadBlock();                }                byte[] bytes = readBlock.read();                if (bytes != null) {                    size.decrementAndGet();                }                return bytes;            } finally {                readLock.unlock();            }        }        @Override        public byte[] peek() {            throw new UnsupportedOperationException();        }        public void sync() {            index.sync();            // read block只读,不用同步            writeBlock.sync();        }        public void close() {            writeBlock.close();            if (index.getReadNum() != index.getWriteNum()) {                readBlock.close();            }            index.reset();            index.close();        }    }    @SuppressWarnings("UnusedDeclaration")    private static class MFQueueIndex {        private static final String MAGIC = "v.1.0000";        private static final String INDEX_FILE_SUFFIX = ".idx";        private static final int INDEX_SIZE = 32;        private static final int READ_NUM_OFFSET = 8;        private static final int READ_POS_OFFSET = 12;        private static final int READ_CNT_OFFSET = 16;        private static final int WRITE_NUM_OFFSET = 20;        private static final int WRITE_POS_OFFSET = 24;        private static final int WRITE_CNT_OFFSET = 28;        private int p11, p12, p13, p14, p15, p16, p17, p18; // 缓存行填充 32B        private volatile int readPosition;   // 12   读索引位置        private volatile int readNum;        // 8   读索引文件号        private volatile int readCounter;    // 16   总读取数量        private int p21, p22, p23, p24, p25, p26, p27, p28; // 缓存行填充 32B        private volatile int writePosition;  // 24  写索引位置        private volatile int writeNum;       // 20  写索引文件号        private volatile int writeCounter;   // 28 总写入数量        private int p31, p32, p33, p34, p35, p36, p37, p38; // 缓存行填充 32B        private RandomAccessFile indexFile;        private FileChannel fileChannel;        // 读写分离        private MappedByteBuffer writeIndex;        private MappedByteBuffer readIndex;        public MFQueueIndex(String indexFilePath) {            File file = new File(indexFilePath);            try {                if (file.exists()) {                    this.indexFile = new RandomAccessFile(file, "rw");                    byte[] bytes = new byte[8];                    this.indexFile.read(bytes, 0, 8);                    if (!MAGIC.equals(new String(bytes))) {                        throw new IllegalArgumentException("version mismatch");                    }                    this.readNum = indexFile.readInt();                    this.readPosition = indexFile.readInt();                    this.readCounter = indexFile.readInt();                    this.writeNum = indexFile.readInt();                    this.writePosition = indexFile.readInt();                    this.writeCounter = indexFile.readInt();                    this.fileChannel = indexFile.getChannel();                    this.writeIndex = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, INDEX_SIZE);                    this.writeIndex = writeIndex.load();                    this.readIndex = (MappedByteBuffer) writeIndex.duplicate();                } else {                    this.indexFile = new RandomAccessFile(file, "rw");                    this.fileChannel = indexFile.getChannel();                    this.writeIndex = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, INDEX_SIZE);                    this.readIndex = (MappedByteBuffer) writeIndex.duplicate();                    putMagic();                    putReadNum(0);                    putReadPosition(0);                    putReadCounter(0);                    putWriteNum(0);                    putWritePosition(0);                    putWriteCounter(0);                }            } catch (Exception e) {                throw new IllegalArgumentException(e);            }        }        public static boolean isIndexFile(String fileName) {            return fileName.endsWith(INDEX_FILE_SUFFIX);        }        public static String parseQueueName(String indexFileName) {            String fileName = indexFileName.substring(0, indexFileName.lastIndexOf('.'));            return fileName.split("_")[1];        }        public static String formatIndexFilePath(String queueName, String fileBackupDir) {            return fileBackupDir + File.separator + String.format("findex_%s%s", queueName, INDEX_FILE_SUFFIX);        }        public int getReadNum() {            return this.readNum;        }        public int getReadPosition() {            return this.readPosition;        }        public int getReadCounter() {            return this.readCounter;        }        public int getWriteNum() {            return this.writeNum;        }        public int getWritePosition() {            return this.writePosition;        }        public int getWriteCounter() {            return this.writeCounter;        }        public void putMagic() {            this.writeIndex.position(0);            this.writeIndex.put(MAGIC.getBytes());        }        public void putWritePosition(int writePosition) {            this.writeIndex.position(WRITE_POS_OFFSET);            this.writeIndex.putInt(writePosition);            this.writePosition = writePosition;        }        public void putWriteNum(int writeNum) {            this.writeIndex.position(WRITE_NUM_OFFSET);            this.writeIndex.putInt(writeNum);            this.writeNum = writeNum;        }        public void putWriteCounter(int writeCounter) {            this.writeIndex.position(WRITE_CNT_OFFSET);            this.writeIndex.putInt(writeCounter);            this.writeCounter = writeCounter;        }        public void putReadNum(int readNum) {            this.readIndex.position(READ_NUM_OFFSET);            this.readIndex.putInt(readNum);            this.readNum = readNum;        }        public void putReadPosition(int readPosition) {            this.readIndex.position(READ_POS_OFFSET);            this.readIndex.putInt(readPosition);            this.readPosition = readPosition;        }        public void putReadCounter(int readCounter) {            this.readIndex.position(READ_CNT_OFFSET);            this.readIndex.putInt(readCounter);            this.readCounter = readCounter;        }        public void reset() {            int size = writeCounter - readCounter;            putReadCounter(0);            putWriteCounter(size);            if (size == 0 && readNum == writeNum) {                putReadPosition(0);                putWritePosition(0);            }        }        public void sync() {            if (writeIndex != null) {                writeIndex.force();            }        }        public void close() {            try {                if (writeIndex == null) {                    return;                }                sync();                AccessController.doPrivileged(new PrivilegedAction<Object>() {                    public Object run() {                        try {                            Method getCleanerMethod = writeIndex.getClass().getMethod("cleaner");                            getCleanerMethod.setAccessible(true);                            sun.misc.Cleaner cleaner = (sun.misc.Cleaner) getCleanerMethod.invoke(writeIndex);                            cleaner.clean();                        } catch (Exception e) {                            LOGGER.error("close fqueue index file failed", e);                        }                        return null;                    }                });                writeIndex = null;                readIndex = null;                fileChannel.close();                indexFile.close();            } catch (IOException e) {                LOGGER.error("close fqueue index file failed", e);            }        }    }    private static class MFQueueBlock {        private static final String BLOCK_FILE_SUFFIX = ".blk"; // 数据文件        private static final int BLOCK_SIZE = 32 * 1024 * 1024; // 32MB        private final int EOF = -1;        private String blockFilePath;        private MFQueueIndex index;        private RandomAccessFile blockFile;        private FileChannel fileChannel;        private ByteBuffer byteBuffer;        private MappedByteBuffer mappedBlock;        public MFQueueBlock(String blockFilePath, MFQueueIndex index, RandomAccessFile blockFile, FileChannel fileChannel,                            ByteBuffer byteBuffer, MappedByteBuffer mappedBlock) {            this.blockFilePath = blockFilePath;            this.index = index;            this.blockFile = blockFile;            this.fileChannel = fileChannel;            this.byteBuffer = byteBuffer;            this.mappedBlock = mappedBlock;        }        public MFQueueBlock(MFQueueIndex index, String blockFilePath) {            this.index = index;            this.blockFilePath = blockFilePath;            try {                File file = new File(blockFilePath);                this.blockFile = new RandomAccessFile(file, "rw");                this.fileChannel = blockFile.getChannel();                this.mappedBlock = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, BLOCK_SIZE);                this.byteBuffer = mappedBlock.load();            } catch (Exception e) {                throw new IllegalArgumentException(e);            }        }        public MFQueueBlock duplicate() {            return new MFQueueBlock(this.blockFilePath, this.index, this.blockFile, this.fileChannel,                    this.byteBuffer.duplicate(), this.mappedBlock);        }        public static String formatBlockFilePath(String queueName, int fileNum, String fileBackupDir) {            return fileBackupDir + File.separator + String.format("fblock_%s_%d%s", queueName, fileNum, BLOCK_FILE_SUFFIX);        }        public String getBlockFilePath() {            return blockFilePath;        }        public void putEOF() {            this.byteBuffer.position(index.getWritePosition());            this.byteBuffer.putInt(EOF);        }        public boolean isSpaceAvailable(int len) {            int increment = len + 4;            int writePosition = index.getWritePosition();            return BLOCK_SIZE >= increment + writePosition + 4; // 保证最后有4字节的空间可以写入EOF        }        public boolean eof() {            int readPosition = index.getReadPosition();            return readPosition > 0 && byteBuffer.getInt(readPosition) == EOF;        }        public int write(byte[] bytes) {            int len = bytes.length;            int increment = len + 4;            int writePosition = index.getWritePosition();            byteBuffer.position(writePosition);            byteBuffer.putInt(len);            byteBuffer.put(bytes);            index.putWritePosition(increment + writePosition);            index.putWriteCounter(index.getWriteCounter() + 1);            return increment;        }        public byte[] read() {            byte[] bytes;            int readNum = index.getReadNum();            int readPosition = index.getReadPosition();            int writeNum = index.getWriteNum();            int writePosition = index.getWritePosition();            if (readNum == writeNum && readPosition >= writePosition) {                return null;            }            byteBuffer.position(readPosition);            int dataLength = byteBuffer.getInt();            if (dataLength <= 0) {                return null;            }            bytes = new byte[dataLength];            byteBuffer.get(bytes);            index.putReadPosition(readPosition + bytes.length + 4);            index.putReadCounter(index.getReadCounter() + 1);            return bytes;        }        public void sync() {            if (mappedBlock != null) {                mappedBlock.force();            }        }        public void close() {            try {                if (mappedBlock == null) {                    return;                }                sync();                AccessController.doPrivileged(new PrivilegedAction<Object>() {                    public Object run() {                        try {                            Method getCleanerMethod = mappedBlock.getClass().getMethod("cleaner");                            getCleanerMethod.setAccessible(true);                            sun.misc.Cleaner cleaner = (sun.misc.Cleaner) getCleanerMethod.invoke(mappedBlock);                            cleaner.clean();                        } catch (Exception e) {                            LOGGER.error("close fqueue block file failed", e);                        }                        return null;                    }                });                mappedBlock = null;                byteBuffer = null;                fileChannel.close();                blockFile.close();            } catch (IOException e) {                LOGGER.error("close fqueue block file failed", e);            }        }    }}



广告 广告

评论区