消息读写与存储结构
- commitLog
- ReputMessageService.doReput
- consumeQueue
- 总结
在rocketMQ中, 消息存储在硬盘中, commitLog与consumeQueue在结构概念上是一个队列, 但是在具体实现上是用多文件存储结构
commitLog的文件默认大小为1G, consumeQueue的文件默认大小为600w字节, 即可以存放30w个条目
public int getMappedFileSizeConsumeQueue() {
// mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE
// ConsumeQueue.CQ_STORE_UNIT_SIZE = 20
int factor = (int) Math.ceil(this.mappedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0));
return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE);
}
commitLog
首先看看commitLog的文件存储结构
在CommitLog类中有一个load方法, 在该方法中加载了mappedFileQueue, 即rocketMQ的逻辑队列
public boolean load() {
boolean result = this.mappedFileQueue.load();
log.info("load commit log " + (result ? "OK" : "Failed"));
return result;
}
在mappedFileQueue的load方法中:
- 读取根目录文件夹
- 遍历文件排序
- 用MappedFile类读取文件夹下的文件并且创建fileChannel
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "commitlog";
// CommitLog file size,default is 1G
private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;
mappedFile对象就是commitLog对硬盘的操作对象
在消费流程中, 消费中订阅topic从topic下的消费队列中获取offset, 然后根据offset从commitLog中获取消息
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
// 这里的大小是默认配置的1g
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
// 通过offset通过mappedFileQueue找到具体的mappedFile
// 这里的逻辑是 offset / size 找到该消息存在于第几个文件中
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
if (mappedFile != null) {
// 计算读取的位置
int pos = (int) (offset % mappedFileSize);
// 这里传入定位好的位置
// 内部处理逻辑是设置的buffer大小就是pos后的所有数据
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
return result;
}
return null;
}
在获取了buffer之后, 可以通过位数读取来获取消息的长度, 然后得到消息主体内容
我们先来看看一条消息的组成结构
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC 校检参数
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(bornHostHolder, bornHostLength);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(storeHostHolder, storeHostLength);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
在putMessage的时候就会将以上17个部分处理并存储到commitLog中, 所以我们获取了初始的offset之后
第一个int数据就是整条消息的长度
由此可见getData方法仅仅是根据offset返回一个从offset开始到文件结束或者最后一次写入位置的buffer
最后获取消息主体的逻辑是调用者处理的
ReputMessageService.doReput
当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。
在github上的官方文档中, 我们可以得知长轮询的实现中通知便是在构建consumeQueue的时候发出的, 而构建消费队列则是使用定时任务ReputMessageService来完成的
其中doReput方法是最核心的方法, 在这里mq做了几件事:
- reputFromOffset在启动messageStore的时候就设置好了, 根据硬盘中consumeQueue的最大offset来恢复, 如果offset不存在则设置为commitLog的最小offset
- 只要reputFromOffset小于commitLog中的最大offset就一直进行consumeQueue的构建工作
- 以上的判断与设置完成后, 调用commitLog的getData方法, 这个方法在上文中介绍过
- 读取3返回的buffer信息, 由于buffer中存在不止一条数据, 这里根据buffer的size循环读取消息, 将消息的offset存入consumeQueue
- 存入完毕后, 如果master开启了长轮询则通知holdService开始消费消息
- reputFromOffset + 当前处理完毕的一条消息长度获得新的reputFromOffset
异步构建consumeQueue甚至可以用简单来形容, 仅仅是需要通过约定的消息结构获取offset, size和 tag 构建成一个20字节的条目, 然后刷入硬盘
consumeQueue
consumeQueue的文件存储结构与commitLog是相同的, 不过consumeQueue是根据topic来分类的, 一个topic下面可以存在多条consumeQueue, 用queueId作为文件名区分, 与commitLog相同的是, 在queueId文件夹下也是存在多个文件来构成逻辑上的consumeQueue
consumeQueue通过queueId与topic向broker请求offset
public long queryOffset(final String group, final String topic, final int queueId) {
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
// 通过拼接的key获取map
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
if (null != map) {
// map中存放的是最新的offset
Long offset = map.get(queueId);
if (offset != null)
return offset;
}
return -1;
}
如果在offsetTable中不存在该key, 则会返回-1
ConsumerManageProcessor中获取到-1后, 会断定该consumeQueue是第一次被消费, 所以获取队列的最小偏移量, 读取队列中第一条消息
这里讲讲为什么offsetTable中取不到key就能证明该队列是第一次被消费
if (storeOffsetEnable) {
// 消费完毕后 更新消费offset
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
在消费逻辑中的最后, 如果消费完毕会更新消费的offset, 调用了commitOffset方法
private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
// 拿到offset所属的map
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
if (null == map) {
map = new ConcurrentHashMap<Integer, Long>(32);
map.put(queueId, offset);
this.offsetTable.put(key, map);
} else {
Long storeOffset = map.put(queueId, offset);
if (storeOffset != null && offset < storeOffset) {
log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
}
}
}
在该方法中, 我们可以看见如果offsetTable中该key不存在, 则会创建该key, 这里传入的offset是下一条消息的初始偏移量
至此, consumeQueue的使命就完成了, consumeQueue提供offset给消费者, 消费者将offset提供给broker, 从commitLog中根据offset取出消息并返回给消费者消费
总结
rocketMQ在commitLog中尽量保证了顺序写, 并且commitLog构建出了consumeQueue, 消费者在consumeQueue中是顺序读的, 所以读取速度接近于内存的读取速度.
而读取commitLog的时候确实是随机读的, 不过由于consumeQueue中offset其实是有序的, 所以对于commitLog的随机读而言, 会对性能产生影响
页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。
在RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Deadline”(此时块存储采用SSD的话),随机读的性能也会有所提升。
另外,RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。
总结一下文件结构
-commitlog
-00000000000000000000 (文件名20位)
-00000000001073741824
-consumequeue
-topic1
-queue1
-00000000000000000000
-00000000000005997854
-queue2
-topic2
共有条评论 网友评论