百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

RocketMQ源码分析之文件内存映射对象层MappedFile核心方法

toyiye 2024-09-16 06:02 3 浏览 0 评论

一、前言

RocketMQ的存储都基于MappedFile实现,如CommitLog、Index索引、ConsumeQueue等,本文则主要介绍的实现机制,包括MappedFileQueue类介绍、MappedFile类介绍、预热、MappedFile预分配服务AllocateMappedFileService、MappedFile刷盘等内容。

父类ReferenceResource主要是进行并发控制的;

二、源码导读

  1. MappedFile成员变量;
  2. 父类ReferenceResource成员变量;
  3. MappedFile构造方法;
  4. 清理内存映射区域;
  5. 消息追加;
  6. 消息MessageExtBrokerInner数据模型;
  7. 追加消息回调接口;
  8. 刷盘;
  9. commit提交暂存池数据;
  10. 从指定的位置开始读取一段内存数据片段;
  11. 销毁一个mappedfile
  12. 磁盘预热;

1、MappedFile成员变量

// 磁盘内存映射文件
public class MappedFile extends ReferenceResource {

    // 日志组件
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(
            LoggerName.STORE_LOGGER_NAME
    );

    // os内存页大小,默认是4kb
    public static final int OS_PAGE_SIZE = 1024 * 4;
    // mapped虚拟内存总大小,所有磁盘文件映射到内存的数据量总大小
    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
    // mappedfile总数
    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);

    // 写入位置
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    // 提交位置
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // 已经刷盘的位置
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    // 文件大小
    protected int fileSize;
    // 该MappedFile文件对应的channel
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     * 如果启用了TransientStorePool,则writeBuffer为从暂时存储池中借用buffer
     * 消息会先写入这个写缓冲区域里面去,然后再进行reput写入到文件通道里去,写入映射内存区域里去
     * 最后对fileChannel进行flush刷盘传播到物理磁盘里去
     */
    protected ByteBuffer writeBuffer = null;
    // 瞬时存储池化组件:一个内存ByteBuffer池实现,如果如果启用了TransientStorePool则不为空
    protected TransientStorePool transientStorePool = null;
    // 文件名:其实就是该文件内容默认其实位置
    private String fileName;
    // 文件从哪个偏移量开始:该文件中内容相对于整个文件的偏移,其实和文件名相同
    private long fileFromOffset;
    // 该MappedFile对应的实际文件
    private File file;
    // 磁盘文件映射到内存区域:通过fileChannel.map得到的可读写的内存映射buffer
    // TransientStorePool则写数据时会写到该缓冲中,刷盘时直接调用该映射buffer的force函数,而不需要进行commit操作
    private MappedByteBuffer mappedByteBuffer;
    // 存储时间戳
    private volatile long storeTimestamp = 0;
    // 是否再队列里第一次创建标记
    private boolean firstCreateInQueue = false;
}

2、父类ReferenceResource成员变量

// 引用资源组件
public abstract class ReferenceResource {

    // 资源引用计数,如果有人用这个资源,就可以对这个资源进行引用,默认初始值是1
    // 如果说没有人引用他,他引用计数就是1,所以此时如果说去做shutdown应该是递减为0,就可以释放
    protected final AtomicLong refCount = new AtomicLong(1);
    // 当前资源是否可用的标记,默认true
    protected volatile boolean available = true;
    // 当前资源是否完成清理的标记,默认false
    protected volatile boolean cleanupOver = false;
    // 资源第一次关闭的时间戳
    private volatile long firstShutdownTimestamp = 0;
}

3、MappedFile构造方法

构造方法最核心的是调用了init初始化方法;

public MappedFile(final String fileName, final int fileSize) throws IOException {
    init(fileName, fileSize);
}
// 写入缓冲区,仅仅是在启用了瞬时存储池化技术之后,才会使用写缓冲区
public void init(
        final String fileName,
        final int fileSize,
        final TransientStorePool transientStorePool) throws IOException {
    init(fileName, fileSize);
    this.writeBuffer = transientStorePool.borrowBuffer(); // buffer是从临时buffer池中获取到的一个buffer
    // transient store pool,池子,里面分配一堆的临时buffer,拿到一个一个buffer
    // 每个mappedfile都必须对应一个临时buffer
    this.transientStorePool = transientStorePool;
}

TransientStorePool类比较简单,采用双端队列Deque维护了一些列的预分配的ByteBuffer,这些ByteBuffer都是在堆外分配的直接内存,DefaultMessageStore会持有TransientStorePool对象实例,如果启动时配置了启用transientStorePoolEnable,那么在DefaultMessageStore构造函数中会调用TransientStorePool.init方法,预分配ByteBuffer并放入队列中,如果启动时没有启用TransientStorePool功能,则不会调用TransientStorePool.init方法,那么从队列中获取ByteBuffer会返回null。

public class TransientStorePool {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    // 池中预分配ByteBuffer数量
    private final int poolSize;
    // 每个ByteBuffer大小
    private final int fileSize;
    // 采用双端队列维护预分配的ByteBuffer
    private final Deque<ByteBuffer> availableBuffers;
    private final MessageStoreConfig storeConfig;

    public TransientStorePool(final MessageStoreConfig storeConfig) {
        this.storeConfig = storeConfig;
        this.poolSize = storeConfig.getTransientStorePoolSize();
        this.fileSize = storeConfig.getMappedFileSizeCommitLog();
        this.availableBuffers = new ConcurrentLinkedDeque<>();
    }

    /**
     * It's a heavy init method.
     */
    public void init() {
        // 默认会搞5个写入缓冲区,缓冲区大小就是一个CommitLog大小,就是1GB
        for (int i = 0; i < poolSize; i++) {
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);

            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

            availableBuffers.offer(byteBuffer);
        }
    }

    // 销毁内存池
    public void destroy() {
        // 取消对内存的锁定
        for (ByteBuffer byteBuffer : availableBuffers) {
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
        }
    }

    // 使用完毕之后归还ByteBuffer
    public void returnBuffer(ByteBuffer byteBuffer) {
        // ByteBuffer各下标复位
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        // 放入队头,等待下次重新被分配
        this.availableBuffers.offerFirst(byteBuffer);
    }

    // 从池中获取ByteBuffer
    public ByteBuffer borrowBuffer() {
        // 非阻塞弹出队头元素,如果没有启用暂存池
        // 则不会调用init方法,队列中就没有元素,这里返回null
        // 其次,如果队列中所有元素都被借用出去,队列也为空此时也会返回null
        ByteBuffer buffer = availableBuffers.pollFirst();
        // 如果队列中剩余元素数量小于配置个数的0.4,则写日志提示
        if (availableBuffers.size() < poolSize * 0.4) {
            log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
        }
        return buffer;
    }

    // 剩下可借出的ByteBuffer数量
    public int availableBufferNums() {
        // 如果启用了暂存池,则返回队列中元素个数
        if (storeConfig.isTransientStorePoolEnable()) {
            return availableBuffers.size();
        }
        // 否则返会Integer.MAX_VALUE
        return Integer.MAX_VALUE;
    }
}

核心功能就是通过fileChannel.map得到的可读写的内存映射buffer;

MapMode 表示打开模式,为枚举值,其值可以为 READ_ONLY, READ_WRITE, PRIVATE。

  • 模式为 READ_ONLY 时,不能对 buf 进行写操作;
  • 模式为 READ_WRITE 时,通道 fileChannel 必须具有读写文件的权限;对 buf 进行的写操作将对文件生效,但不保证立即同步到 I/O 设备;
  • 模式为 PRIVATE 时,通道 fileChannle 必须对文件有读写权限;但是对文件的修改操作不会传播到 I/O 设备,而是会在内存复制一份数据。此时对文件的修改对其它线程和进程不可见。

注意:调用 map() 方法之后,返回的 MappedByteBuffer 就于 fileChannel 脱离了关系,关闭 fileChannel 对 buf 没有影响。同时,如果要确保对 buf 修改的数据能够同步到文件 I/O 设备中,需要调用 MappedByteBuffer 中的无参数的 force() 方法,而调用 FileChannel 中的 force(metaData) 方法无效。

private void init(final String fileName, final int fileSize) throws IOException {
  	// 记录文件名、大小等信息
    this.fileName = fileName;
    this.fileSize = fileSize;
  	// 新建File对象
    this.file = new File(fileName);
    // 每个磁盘文件的文件名称,都是CommitLog里面某一个位置的偏移量
    // 完整的一个CommitLog来说他是会拆分为多个mappedfile,你往里面不断的写入消息,偏移量会不断的增长
    // 不断的拆分出来一个一个mappedfile,每个file的名字就是当前的CommitLog整体的一个偏移量
    // 用写入到的一个偏移量位置作为一个新的mappedfile文件名称
    // 所以说在这里是对文件名称进行long型数据的转化,fileFromOffset,这个mappedfile文件是从哪个总偏移量开始写入的
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;

    // 确认文件所在的目录是ok的
    ensureDirOK(this.file.getParent());

    try {
        // 构建一个RandomAccessFile,随机读写文件组件,nio机制里面的
        // 注意注意注意,是针对磁盘文件,构建了随机读写文件,然后拿到了一个nio文件通道
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        // 是通过nio文件通道执行了map函数,这个函数就会把你的磁盘文件做一个内存映射,磁盘文件映射到内存区域里去
        // 映射完毕了以后,拿到的磁盘文件映射的内存区域,是由nio里面的ByteBuffer来代表的
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        // 当前映射总内存大小可以加入文件size
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        // 当前映射总文件数量可以加1
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("Failed to create file " + this.fileName, e);
        throw e;
    } catch (IOException e) {
        log.error("Failed to map file " + this.fileName, e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}

4、清理内存映射区域

// 对内存映射区域进行清理
public static void clean(final ByteBuffer buffer) {
    // 如果说内存映射区域是null,或者是内存映射区域不是堆外内存,或者是内存映射区域容量是0,此时就不能清理了
    if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0)
        return;
    // 否则正常可以清理
    // 获取到了原生buffer以后可以通过反射调用buffer里面的cleaner函数,此时可以获取到原生buffer绑定的清理组件
    // 获取到了原生buffer的清理组件以后,再次对清理组件反射调用clean函数,正式对原生buffer去做一个清理
    invoke(invoke(viewed(buffer), "cleaner"), "clean");
}

// 安全反射
// target反射目标对象,methodName是反射方法名称,args是反射方法入参
private static Object invoke(final Object target, final String methodName, final Class<?>... args) {
    // 这个东西是属于jdk提供的安全调用机制
    return AccessController.doPrivileged(new PrivilegedAction<Object>() {
        public Object run() {
            try {
                Method method = method(target, methodName, args);
                method.setAccessible(true);
                return method.invoke(target);
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    });
}

private static Method method(Object target, String methodName, Class<?>[] args)
    throws NoSuchMethodException {
    try {
        return target.getClass().getMethod(methodName, args);
    } catch (NoSuchMethodException e) {
        return target.getClass().getDeclaredMethod(methodName, args);
    }
}

// 对内存映射区域需要去获取一个视图
// 他怕的是你原生buffer被一些包装性的一些buffer进行包裹,你就需要一层一层的剥开这个buffer
// 到里面去进行真正的一个原生buffer获取
private static ByteBuffer viewed(ByteBuffer buffer) {
    // 默认要反射获取viewedBuffer方法
    String methodName = "viewedBuffer";

    // 如果说对ByteBuffer反射获取method发现了一个attachment这个方法,就用这个方法名字
    Method[] methods = buffer.getClass().getMethods();
    for (int i = 0; i < methods.length; i++) {
        if (methods[i].getName().equals("attachment")) {
            methodName = "attachment";
            break;
        }
    }

    // 对这个ByteBuffer需要去反射调用这个viewedBuffer / attachment这个方法
    // 对原生的bytebuffer反射调用里面的方法,获取到了一个内部的视图buffer,或者是附件buffer
    ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName);
    // 如果说这个视图buffer是null,此时就返回原生buffer就可以了
    if (viewedBuffer == null)
        return buffer;
    // 反之如果说这个视图buffer不是null,此时就还需要再次递归调用视图buffer获取函数
    else
        return viewed(viewedBuffer);
}

5、消息追加

// 可以把消息追加到mappedfile里去
public AppendMessageResult appendMessage(
        final MessageExtBrokerInner msg, // 消息
        final AppendMessageCallback cb, // 消息追加回调函数
        PutMessageContext putMessageContext) { // 写入消息上下文
    return appendMessagesInner(msg, cb, putMessageContext);
}

// 追加消息
public AppendMessageResult appendMessages(
        final MessageExtBatch messageExtBatch,
        final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
    return appendMessagesInner(messageExtBatch, cb, putMessageContext);
}

// 追加消息内部函数
public AppendMessageResult appendMessagesInner(
        final MessageExt messageExt,
        final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
    assert messageExt != null;
    assert cb != null;

    int currentPos = this.wrotePosition.get();

    if (currentPos < this.fileSize) {
        // 常规情况下,没开启存储池化技术,直接用磁盘映射内存区域来写入就可以了
        // 如果writeBuffer不为空,则优先写入writeBuffer,否则写入mappedByteBuffer,
        // 通过前面的介绍可以知道,如果启用了暂存池TransientStorePool则writeBuffer会被初始化
        // 否则writeBuffer为空,slice方法返回一个新的byteBuffer
        // 但是这里新的byteBuffer和原先的ByteBuffer共用一个存储空间
        // 只是自己维护的相关下标
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        // 然后将编码后的数据写入这里得到的byteBuffer等待刷盘
        byteBuffer.position(currentPos);
        AppendMessageResult result;

        // 如果是单条消息,也可以是批量消息
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(
                    this.getFileFromOffset(),
                    byteBuffer,
                    this.fileSize - currentPos,
                    (MessageExtBrokerInner) messageExt,
                    putMessageContext
            );
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(
                    this.getFileFromOffset(),
                    byteBuffer,
                    this.fileSize - currentPos,
                    (MessageExtBatch) messageExt,
                    putMessageContext
            );
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        // 更新一下写入位置
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
public long getFileFromOffset() {
    return this.fileFromOffset;
}
static class PutMessageContext {

    // topic-queueId
    private String topicQueueTableKey;
    // 消息物理位置字节数组
    private long[] phyPos;
    // 批量消息大小
    private int batchSize;
}

6、消息MessageExtBrokerInner数据模型

// 消息扩展在broker内部的一些东西
public class MessageExtBrokerInner extends MessageExt {

    private static final long serialVersionUID = 7256001576878700634L;

    // 消息属性字符串
    private String propertiesString;
    // tags code
    private long tagsCode;
    // 编码后的消息
    private ByteBuffer encodedBuff;
}
// 消息扩展
public class MessageExt extends Message {

    private static final long serialVersionUID = 5720810158625748049L;

    // 消息投递到了哪个broker组里面去
    private String brokerName;
    // 消息投递到了topic里面的哪个queueId里面去,这个queue一定是在一个broker组里某台机器里面
    private int queueId;
    // 消息存储大小
    private int storeSize;
    // 消息队列偏移量
    private long queueOffset;
    // sysflag
    private int sysFlag;
    // 消息诞生时间戳
    private long bornTimestamp;
    // 消息诞生机器地址
    private SocketAddress bornHost;
    // 消息存储时间戳
    private long storeTimestamp;
    // 消息存储机器地址
    private SocketAddress storeHost;
    // 消息id
    private String msgId;
    // 消息在commitlog里面的物理偏移量
    private long commitLogOffset;
    // 消息内容crc校验和
    private int bodyCRC;
    // 消息重新消费次数
    private int reconsumeTimes;
    // prepared事务消息偏移量
    private long preparedTransactionOffset;
}
// 消息
public class Message implements Serializable {

    private static final long serialVersionUID = 8445773977080406428L;

    // 消息投递到哪个topic里去
    private String topic;
    // flag
    private int flag;
    // 消息属性
    private Map<String, String> properties;
    // 消息内容
    private byte[] body;
    // 事务消息id
    private String transactionId;
}

7、追加消息回调接口

// 默认追加消息回调接口
class DefaultAppendMessageCallback implements AppendMessageCallback {

    // File at the end of the minimum fixed length empty
    private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
    private final ByteBuffer msgIdMemory; // msgId内存区域
    private final ByteBuffer msgIdV6Memory; // v6版本的msgId内存区域
    // Store the message content
    private final ByteBuffer msgStoreItemMemory; // 消息存储条目内存区域
    // The maximum length of the message
    private final int maxMessageSize; // 最大消息大小

    DefaultAppendMessageCallback(final int size) {
        this.msgIdMemory = ByteBuffer.allocate(4 + 4 + 8);
        this.msgIdV6Memory = ByteBuffer.allocate(16 + 4 + 8);
        this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
        this.maxMessageSize = size;
    }

    public AppendMessageResult doAppend(
            // 在文件里的哪个偏移量开始进行写入
            final long fileFromOffset,
            // 要写入消息的内存映射区域
            final ByteBuffer byteBuffer,
            // 最大空白区域
            final int maxBlank,
            // 消息内容
            final MessageExtBrokerInner msgInner,
            // 写入消息上下文
            PutMessageContext putMessageContext) {
        // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

        // PHY OFFSET
        // 从文件的哪个偏移量开始,加上内存映射区域的起始位置,定位到磁盘文件物理偏移量位置,从这个位置开始写入就可以了
        // 物理磁盘文件->映射一块内存区域,这块内存区域可能是从物理磁盘文件的某个偏移量开始来进行映射的
        // 当前mappedfile是从哪个偏移量开始写入的,此时再加上内存映射区域的可以写入位置
        // byteBuffer.position(),一个mappedfile内存映射区域里面写入位置,是属于针对单个mappedfile来的
        // 把内存映射区域写入位置 + 当前这个文件物理全局上的开始偏移量,此时就是一个真正的物理上的偏移量
        long wroteOffset = fileFromOffset + byteBuffer.position();

        // 处理好一个消息id
        // 消息Id是由存储机器socket地址+port端口号+物理偏移量
        Supplier<String> msgIdSupplier = () -> {
            // 首先是根据sysFlag是否是存储机器地址v6标识,如果不是的话msgId就是4+4+8,如果是的话msgId就是16+4+8
            // 根据我们判断出来的msgId大小分配一块内存空间
            int sysflag = msgInner.getSysFlag();
            int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
            ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);

            // 他会把存储机器地址写入到msgIdBuffer里去
            MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);

            msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
            // 从msgIdLen倒推回去8个位置,此时写入一个long类型的wroteOffset
            msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);

            return UtilAll.bytes2string(msgIdBuffer.array());
        };

        // Record ConsumeQueue information
        // 记录一下topic队列偏移量映射表里面的一个记录信息,这个消息是属于topic->queueId
        String key = putMessageContext.getTopicQueueTableKey();
        // 把这个topic-queueId里面的偏移量获取出来
        Long queueOffset = CommitLog.this.topicQueueTable.get(key);
        // 如果说这个消息对应的topic-queueId的偏移量是null,此时就可以做一个重置
        if (null == queueOffset) {
            queueOffset = 0L;
            CommitLog.this.topicQueueTable.put(key, queueOffset);
        }

        // 对这个消息做一个多路分发wrap
        boolean multiDispatchWrapResult = CommitLog.this.multiDispatch.wrapMultiDispatch(msgInner);
        if (!multiDispatchWrapResult) {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }

        // Transaction messages that require special handling
        final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
        switch (tranType) {
            // Prepared and Rollback message is not consumed, will not enter the
            // consumer queuec
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                queueOffset = 0L;
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            default:
                break;
        }

        // 消息内容,preEncodeBuffer就已经是消息内容了
        ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
        final int msgLen = preEncodeBuffer.getInt(0); // 消息开始4个字节,就是消息长度

        // Determines whether there is sufficient free space
        if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
            this.msgStoreItemMemory.clear();
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(maxBlank);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
            // 3 The remaining space may be any value
            // Here the length of the specially set maxBlank
            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);

            return new AppendMessageResult(
                    AppendMessageStatus.END_OF_FILE,
                    wroteOffset,
                    maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
                    msgIdSupplier, msgInner.getStoreTimestamp(),
                    queueOffset,
                    CommitLog.this.defaultMessageStore.now() - beginTimeMills
            );
        }

        // 在这里一大段的东西都是在对已经编码后的消息进行一些信息补充
        int pos = 4 + 4 + 4 + 4 + 4;
        // 6 QUEUEOFFSET,写入消息在topic->queueId里面的偏移量
        preEncodeBuffer.putLong(pos, queueOffset);
        pos += 8;
        // 7 PHYSICALOFFSET,写入消息在整个CommitLog里面的物理偏移量
        preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
        int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
        // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
        pos += 8 + 4 + 8 + ipLen;
        // refresh store time stamp in lock,写入存储时间戳
        preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());

        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        // Write messages to the queue buffer
        byteBuffer.put(preEncodeBuffer); // 把编码后的完整的消息字节数据写入到mappedfile内存映射区域里去
        msgInner.setEncodedBuff(null);

        AppendMessageResult result = new AppendMessageResult(
                AppendMessageStatus.PUT_OK,
                wroteOffset,
                msgLen,
                msgIdSupplier,
                msgInner.getStoreTimestamp(),
                queueOffset,
                CommitLog.this.defaultMessageStore.now() - beginTimeMills
        );

        switch (tranType) {
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // The next update ConsumeQueue information
                CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
                break;
            default:
                break;
        }

        return result;
    }
}
public static class MessageExtEncoder {

    // Store the message content
    // 编码组件对应的一个内存缓冲区
    private final ByteBuffer encoderBuffer;
    // The maximum length of the message
    private final int maxMessageSize;
}

8、刷盘

public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) { // 在flush的时候其实会对mappedfile引用资源去做一个挂起
            int value = getReadPosition();

            try {
                //We only append data to fileChannel or mappedByteBuffer, never both.
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force(); // 触发force,其实就是在进行flush
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }

            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}
// 当前到底能不能做flush,以及至少要flush多少个os page
private boolean isAbleToFlush(final int flushLeastPages) {
    int flush = this.flushedPosition.get();
    int write = getReadPosition();

    if (this.isFull()) {
        return true;
    }

    if (flushLeastPages > 0) {
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
    }

    return write > flush;
}

父类ReferenceResource#hold方法

// 如果有人要使用这个资源,此时就可以调用这个资源的hold函数
public synchronized boolean hold() {
    // 如果说这个资源是可用的
    if (this.isAvailable()) {
        // 此时如果说引用计数是大于0的,默认就是大于0的,此时就返回一个true就可以了
        // 所以会对引用计数先获取的时候是大于0就可以了,然后获取完了以后再进行累加和递增这样子
        if (this.refCount.getAndIncrement() > 0) {
            return true;
        }
        // 如果说当前引用计数已经是<=0了以后,此时可以对引用计数进行递减
        else {
            this.refCount.getAndDecrement();
        }
    }

    return false;
}
public int getReadPosition() {
    return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}

父类ReferenceResource#release方法

// 释放资源
public void release() {
    // 对引用计数做一个递减
    long value = this.refCount.decrementAndGet();
    // 如果说引用计数递减完了以后还是大于0,此时还是有人在引用我们的这个资源,此时就直接返回
    // 就是不能立即来释放这个资源
    if (value > 0)
        return;

    // 除非是此时没有人引用这个资源,此时refCount<=0,此时就可以对当前资源做一个清理
    synchronized (this) {
        // 对资源释放具体如何释放是走你的子类,就可以去走mappedfile
        this.cleanupOver = this.cleanup(value);
    }
}
// 对mappedfile进行销毁的时候,如果引用资源计数是0,此时就可以进行清理了
@Override
public boolean cleanup(final long currentRef) {
    if (this.isAvailable()) {
        log.error("this file[REF:" + currentRef + "] " + this.fileName
                + " have not shutdown, stop unmapping.");
        return false;
    }

    if (this.isCleanupOver()) {
        log.error("this file[REF:" + currentRef + "] " + this.fileName
                + " have cleanup, do not do it again.");
        return true;
    }

    // 此时需要对mappedfile内存映射区域进行清理,通过层层反射,最终是基于cleaner组件进行清理
    clean(this.mappedByteBuffer);
    // 总计的mappedfile虚拟内存量扣减掉这个mappedfile数据量
    TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));
    // 总计的mappedfiles数量去做一个扣减
    TOTAL_MAPPED_FILES.decrementAndGet();

    log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");

    return true;
}
public int getFlushedPosition() {
    return flushedPosition.get();
}

9、commit提交暂存池数据

// 提交暂存池数据到fileChannel
public int commit(final int commitLeastPages) {
    // 没有使用暂存池
    if (writeBuffer == null) {
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }
    // 达到commit条件
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            commit0();
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        }
    }

    // All dirty data has been committed to FileChannel.
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }

    return this.committedPosition.get();
}

protected void commit0() {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos - lastCommittedPosition > 0) {
        try {
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}

10、从指定的位置开始读取一段内存数据片段

// 从指定的位置开始读取一段内存数据片段
public SelectMappedBufferResult selectMappedBuffer(int pos) {
    int readPosition = getReadPosition();

    // 如果说你要查找的文件内部的相对一个位置,是小于了最大写入位置,同时还大于等于0
    if (pos < readPosition && pos >= 0) {
        // 如果说有人在读取你的这个mappedfile数据的话,此时会调用你的引用资源的挂起函数
        if (this.hold()) {
            //  对内存映射区域调用slice函数,得到了当前可以读取一段内存数据
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            // 在这里设置我们要读取的位置
            byteBuffer.position(pos);
            // 用我们最大可读取位置减去开始要读取的位置,拿到一个要读取数据大小
            int size = readPosition - pos;
            // 再次对我们内存区域调用slice,是属于从起始位置开始进行截取,然后最大截取多少数据,拿到一个内存片段
            ByteBuffer byteBufferNew = byteBuffer.slice();
            byteBufferNew.limit(size);

            // 最终会返回你从内存片段里读取到的一段数据
            return new SelectMappedBufferResult(
                    this.fileFromOffset + pos, // mappedfile起始物理偏移量 + 文件内读取相对位置,就是一个绝对物理位置
                    byteBufferNew, // 读取出来的内存片段
                    size, // 这段数据大小
                    this
            );
        }
    }

    return null;
}

11、销毁一个mappedfile

// 销毁一个mappedfile
public boolean destroy(final long intervalForcibly) {
    // 对当前的这个mappedfile调用关闭函数
    // 此处调用会触发引用资源的关闭和释放,以及时触发mappedfile自己的清理函数,完成内存区域的清理和释放
    this.shutdown(intervalForcibly);

    // 如果说已经清理释放完毕了
    if (this.isCleanupOver()) {
        try {
            // 此时就对这个mappedfile绑定的一个nio文件通道做一个关闭函数的调用
            this.fileChannel.close();
            log.info("close file channel " + this.fileName + " OK");

            long beginTime = System.currentTimeMillis();
            // 同时再次对这个mappedfile进行一个文件上的删除
            boolean result = this.file.delete();
            log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
                + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
                + this.getFlushedPosition() + ", "
                + UtilAll.computeElapsedTimeMilliseconds(beginTime));
        } catch (Exception e) {
            log.warn("close file channel " + this.fileName + " Failed. ", e);
        }

        return true;
    } else {
        log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
            + " Failed. cleanupOver: " + this.cleanupOver);
    }

    return false;
}

父类ReferenceResource#shutdown方法

// 关闭组件
public void shutdown(final long intervalForcibly) {
    // 如果说当前资源是可用的,默认就是可用的
    if (this.available) {
        // 把当前资源可用标记设置为false,当前资源已经不可用了
        this.available = false;
        //  当前资源首次关闭时间戳设置一下
        this.firstShutdownTimestamp = System.currentTimeMillis();
        // 调用当前资源的释放函数
        this.release();
    }
    // 上一次来关闭没能关闭的了,因为之前可能还有人在引用这个资源,但是这一次过来再次关闭和释放
    else if (this.getRefCount() > 0) {
        // 第一次关闭时间戳到现在为止经过的时间,如果已经超过了强制关闭间隔了,此时就触发强制关闭
        if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
            // refCount直接用一个-1000强制把refCount标记为一个负数
            this.refCount.set(-1000 - this.getRefCount());
            // 强制标记为了负数了之后,此时再去进行资源释放,100%可以是进行释放了
            this.release();
        }
    }
}
// 释放资源
public void release() {
    // 对引用计数做一个递减
    long value = this.refCount.decrementAndGet();
    // 如果说引用计数递减完了以后还是大于0,此时还是有人在引用我们的这个资源,此时就直接返回
    // 就是不能立即来释放这个资源
    if (value > 0)
        return;

    // 除非是此时没有人引用这个资源,此时refCount<=0,此时就可以对当前资源做一个清理
    synchronized (this) {
        // 对资源释放具体如何释放是走你的子类,就可以去走mappedfile
        this.cleanupOver = this.cleanup(value);
    }
}

12、磁盘预热

加载分配空间的每个内存页进行写入,使分配的ByteBuffer加载到内存中,并和暂存池一样,避免其被操作系统换出;

// 磁盘预热
public void warmMappedFile(FlushDiskType type, int pages) {
    long beginTime = System.currentTimeMillis();
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    int flush = 0;
    long time = System.currentTimeMillis();

    for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
        byteBuffer.put(i, (byte) 0);
        // force flush when flush disk type is sync
        if (type == FlushDiskType.SYNC_FLUSH) {
            if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                flush = i;
                mappedByteBuffer.force();
            }
        }

        // prevent gc
        if (j % 1000 == 0) {
            log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
            time = System.currentTimeMillis();
            try {
                Thread.sleep(0);
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
            }
        }
    }

    // force flush when prepare load finished
    if (type == FlushDiskType.SYNC_FLUSH) {
        log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
            this.getFileName(), System.currentTimeMillis() - beginTime);
        mappedByteBuffer.force();
    }
    log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
        System.currentTimeMillis() - beginTime);

    this.mlock();
}

mlock就是将进程的虚拟内存锁定到物理内存中,防止交换到swap区域。

具体这样做的原因是:通过mmap建立起的内存文件在刚开始并没有将文件内容映射进来,而是只建立一个映射关系,而读相对应区域的时候,第一次还是会去读磁盘,通过mlock和madvise使内存映射后对应的文件数据尽可能多的预加载至内存中,从而达到内存预热的效果。

public void mlock() {
    final long beginTime = System.currentTimeMillis();
    final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
    Pointer pointer = new Pointer(address);
    {
        int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
        log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }

    {
        int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
        log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }
}

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码