C#分布式消息队列 EQueue 2.0 发布啦

来源:转载

最近花了我几个月的业余时间,对EQueue做了一个重大的改造,消息持久化采用本地写文件的方式。到现在为止,总算完成了,所以第一时间写文章分享给大家这段时间我所积累的一些成果。

EQueue开源地址: https://github.com/tangxuehua/equeue

EQueue相关文档: http://www.cnblogs.com/netfocus/category/598000.html

EQueue Nuget地址: http://www.nuget.org/packages/equeue

昨天,我写过一篇关于EQueue 2.0性能测试结果的文章,有兴趣的可以看看。

文章地址: http://www.cnblogs.com/netfocus/p/4926305.html

为什么要改为文件存储? SQL Server的问题

之前EQueue的消息持久化是采用SQL Server的。一开始我觉得没什么问题,采用的是异步定时批量持久化,使用SqlBulkCopy的方法,这个方法测试下来,批量插入消息的性能还不 错,就决定使用了。一开始我并没有在使用到EQueue后做集成的性能测试。在功能上确实没什么问题了。而且使用DB持久化也有很多好处,比如消息查询很 简单,DB天生支持各种方式的查询。删除消息也非常简单,一条DELETE语句即可。所以功能实现比较顺利。但后来当我对EQueue做性能测试时,发现 一些问题。当数据库服务器和Broker本身部署在不同的服务器上时,持久化消息也会走网卡,消耗带宽,影响消息的发送和消费的TPS。而如果数据库服务 器部署在Broker同一台服务器上,则因为SQLServer本身也会消耗CPU以及内存,也会影响Broker的消息发送和消费的TPS。另外 SqlBulkCopy的速度,再本身机器正在接收大量的发送消息和拉取消息的请求时,会不太稳定。经过一些测试,发现整个EQueue Broker的性能不太理想。然后又想想,Broker服务器有有一个硬件一直没有好好利用起来,那就是硬盘。假设我们的消息是持久化到本地硬盘的,顺序 写文件,就应该能解决SQL Server的问题了。所以,开始调研如何实现文件持久化消息的方案了。

消息缓存在托管内存的GC的问题

之前消息存储在SQL Server,如果消费者每次读取消息时,总是从数据库去读取,那对数据库就是不断的写入和读取,性能不太理想。所以当初的思路是,尽量把最近可能要被消 费的消息缓存在本地内存中。当初的做法是设计了一个很大的ConcurrentDictionary<long, Message>,这个字典就是存放了所有可能会被消费的消息。如果要消费的消息当前不在这个字典里,就批量从DB拉取一批出来消费。这个设计可以 尽可能的避免读取DB的情况。但是带来了另一个问题。就是我们对这个字典在高并发不断的写入和读取。且这个字典里缓存的消息又很多,到到达几百上千万 时,GC的压力过大,导致很多线程都会被阻塞。严重影响Broker的TPS。

所以,基于上面的两个主要原因,我想到了两个思路来解决:1)采用写文件的方式来持久化消息;2)使用非托管内存来缓存将要被消费的消息;下面我们来看看这两个设计的一些关键问题的设计思路。

文件存储的关键问题设计 心路背景

之前一直无法驾驭写文件的设计。因为精细化的将数据写入文件,并能要精确的读取想要的数据,实在没什么经验。之前虽然也知道阿里的RocketMQ 的消息持久化也是采用顺序写文件的方式的,但是看了代码,发现设计很复杂,一下子也比较难懂。尝试看了多次也无法完全理解。所以一直无法掌握这种方式。有 一天不经意间想到之前看过的EventStore这个开源项目中,也有写文件的设计。这个项目是CQRS架构之父greg young所主导的开源项目,是一个专门为ES(Event Sourcing)设计模式中提供保存事件流支持的事件流存储系统。于是下定决心专研其源码,看C#代码肯定还是比Java容易,呵呵。经过一段时间的摸 索之后,基本学到了它是如何写文件以及如何读文件的。了解了很多设计思路。然后,在看懂了EventStore的文件存储设计之后,再去看 RocketMQ的文件持久化的设计,发现惊人的相似。原来看不懂的代码现在也能看懂了,因为思路差不多的。所以,这给我开始动手提供了很大的信心。经过 自己的一些准备(文件读写的性能验证)和设计思路整理后,终于开始动手了。

如何写消息到文件?

其实说出来也很简单。之前一直以为写文件就是一个消息一行呗。这样当我们要找哪个消息时,只需要知道行号即可。确实,理论上这样也挺好。但上面这两 个开源项目都不是这样做的,而是都是采用更精细化的直接写二进制的方式。搞清楚写入的格式之后,还要考虑一个文件写不下的时候怎么办?因为一个文件总是有 大小的,比如1G,那超过1G后,必然要创建新的文件,然后把消息写入新的文件。所以,我们就又有了Chunk的概念。一个Chunk就是一个文件,假设 我们现在实现了一个FileMessageStore,表示对文件持久化的封装,那这个FileMessageStore肯定维护了一堆的Chunk。然 后我们也很容易想到一点,就是Chunk有3种状态:1)New,表示刚创建的Chunk,这种Chunk我们可以写入新消息进 去;2)Completed,已写入完成的Chunk,这种Chunk是只读的;3)OnGoing的Chunk,就是当 FileMessageStore初始化时,要从磁盘的某个chunk的目录下加载所有的Chunk文件,那不难理解,最后一个文件之前的Chunk文件 应该都是Completed的;最后一个Chunk文件可能写入了一半,就是之前没完全用完的。所以,本质上New和Ongoing的Chunk其实是一 样的,只是初始化的方式不同。

至此,我们知道了写文件的两个关键思路:1)按二进制写;2)拆分为Chunk文件,且每个Chunk文件有状态;按二进制写主要的思路是,假如我 们当前要写入的消息的二进制数组大小为100个字节,也就是说消息的长度为100,那我们可以先把消息的长度写入文件,再接着写入消息本身。这样我们读取 消息时,只要知道了写入消息长度时的那个Position,就能先读取到消息的长度,然后就能知道接下来要读取多少字节为消息内容。从而能正确读取消息出 来。

另外再分享一点,EventStore中,写入一个事件到文件中时,还会在写入消息内容后再写入这个消息的长度到文件里。也就是说,写入一个数据到 文件时,会在头尾都写入该数据的长度。这样做的好处是什么呢?就是当我们想从后往前读数据时,也能方便的做到,因为每个数据的前后都记录了该数据的长度。 这点应该不难理解吧?而EventStore是一个面向流的存储系统,我们对事件流确实可能从前往后读,也可能是从后往前读。另外这个设计还有一个好处, 就是起到了校验数据合法性的目的。当我们根据长度读取数据后,再数据之后再读取一个长度,如果这两个长度一致,那数据应该就没问题的。在RocketMQ 中,是通过CRC校验的方式来保证读取的数据没有问题。我个人还是比较喜欢EventStore的做法。所以EQueue里现在写入数据就是这样做的。

上面我介绍了一种写入不定长数据到文件的设计思路,这种设计是为了解决写入消息到文件的情况,因为消息的长度是不定的。在EQueue中,我们还有 一另一种写文件的场景。就是队列信息的持久化。EQueue的架构是一个Topic下有多个Queue,每个Queue里有很多消息,消费者负载均衡是通 过给消费者分配均匀数量的Queue的方式来达到的。这样我们只要确保写入Queue的消息是均匀的,那每个Consumer消费到的消息数就是均匀的。 那一个Queue里记录的是什么呢?就是一个消息和其在队列的位置的对应关系。假设消息写入在文件的物理位置为10000,然后这个消息在Queue里的 索引是100,那这个队列就会把这两个位置对应起来。这样当我们要消费这个Queue中索引为100的消息时,就能找到这个消息在文件中的物理位置为 10000,就能根据这个位置找到消息的内容了。如果是托管内存,我们只需要弄一个Dictionary,key是消息在队列中的 Offset,value是消息在文件中的物理Offset即可。这样我们有了这个dict,就能轻松建立起对应关系了。但上面我说过,这种巨大的 dict是要占用内存的,会有GC的问题。所以更好的办法是,把这个对应关系也写入文件,那怎么做呢?这时就又需要更精细化的设计了。想到了其实也很简 单,这个设计我是从RocketMQ中学到的。就是我们设计一种固定长度的结构体,这个结构体里就存放一个数据,就是消息在文件的物理位置(为了后面好表 达,我命名为MessagePosition),一个Long值,一个Long的长度是8个字节。也就是说,这个文件中,每个写入的数据的长度都是8个字 节。假设我们一个文件要保存100W个MessagePosition。那这个文件的长度就是100W * 8这么多字节,大概为7.8MB。那么这样做有什么好处呢?好处就是,假如我们现在要消费这个Queue里的第一个消息,那这个消息的 MessagePosition在这个文件中的位置0,第二个消息在这个文件中的位置是8,第三个就是16,以此类推,第N 个消息就是(N-1) * 8。也就是说,我们无须显式的把消息在队列中的位置信息也写入到文件,而是通过这样的固定算法,就能精确的算出Queue中某个消息的 MessagePosition是写入在文件的哪个位置。然后拿到了MessagePosition之后,就能从Message的Chunk文件中读取到 这个消息了。

通过上面的分析,我们知道了,Producer发送一个消息到Broker时,Broker会写两次磁盘。一次是现将消息本身写入磁盘 (Message Chunk里),另一次是将消息的写入位置写入到磁盘(Queue Chunk里)。细心的朋友可能会问,假如我第一次写入成功,但第二次写入时失败,比如正好机器断电或者当前Broker服务器正好出啥问题 了,没有写入成功。那怎么办呢?这个没有什么大的影响。因为首先这种情况会被认为是消息发送失败。所以Producer还会重新发送该消息,然后 Broker收到消息后还会再做一次这两个写入操作。也就是说,第一次写入的消息内容永远也不会用到了,因为那个写入位置永远也不会在Queue Chunk里有记录。

下面的代码展示了写消息到文件的核心代码: //消息写文件需要加锁,确保顺序写文件 MessageStoreResult result = null; lock (_syncObj) { var queueOffset = queue.NextOffset; var messageRecord = _messageStore.StoreMessage(queueId, queueOffset, message); queue.AddMessage(messageRecord.LogPosition, message.Tag); queue.IncrementNextOffset(); result = new MessageStoreResult(messageRecord.MessageId, message.Code, message.Topic, queueId, queueOffset, message.Tag); } StoreMessage方法内部实现: public MessageLogRecord StoreMessage(int queueId, long queueOffset, Message message) { var record = new MessageLogRecord( message.Topic, message.Code, message.Body, queueId, queueOffset, message.CreatedTime, DateTime.Now, message.Tag); _chunkWriter.Write(record); return record; } queue.AddMessage方法的内部实现: public void AddMessage(long messagePosition, string messageTag) { _chunkWriter.Write(new QueueLogRecord(messagePosition + 1, messageTag.GetHashcode2())); } ChunkWriter的内部实现: public long Write(ILogRecordrecord) { lock(_lockObj) { if (_isClosed) { throw new ChunkWriteException(_currentChunk.ToString(), "Chunkwriterisclosed." ); } //如果当前文件已经写完,则需要新建文件 if (_currentChunk.IsCompleted) { _currentChunk=_chunkManager.AddNewChunk(); } //先尝试写文件 varresult=_currentChunk.TryAppend(record); //如果当前文件已满 if (!result.Success) { //结束当前文件 _currentChunk.Complete(); //新建新的文件 _currentChunk=_chunkManager.AddNewChunk(); //再尝试写入新的文件 result=_currentChunk.TryAppend(record); //如果还是不成功,则报错 if (!result.Success) { throw new ChunkWriteException(_currentChunk.ToString(), "Writerecordtochunkfailed." ); } } //如果需要同步刷盘,则立即同步刷盘 if (_chunkManager.Config.SyncFlush) { _currentChunk.Flush(); } //返回数据写入位置 return result.Position; } }

当然,我上面为了简化问题的复杂度。所以没有引入关于如何根据某个全局的MessagePosition找到其在哪个Message Chunk的问题。这个其实也很好做,我们首先固定好每个Message Chunk文件的大小。比如大小为256MB,然后我们为每个Chunk文件设计一个ChunkHeader,每个Chunk文件总是先把这个 ChunkHeader写入文件,这个Header里记录了这个文件的起始位置和结束位置,以及文件的大小。这样我们根据某个 MessagePosition计算其在哪个Chunk文件时,只需要把这个MessagePositon对Chunk的大小做取摸操作即可。根据数据的 位置找其在哪个Chunk的代码看起来如下面这样这样:

public ChunkGetChunkFor( long dataPosition) { varchunkNum=( int )(dataPosition/_config.GetChunkDataSize()); return GetChunk(chunkNum); } public ChunkGetChunk( int chunkNum) { if (_chunks.ContainsKey(chunkNum)) { return _chunks[chunkNum]; } return null ; }

代码很简单,就不多讲了。拿到了Chunk对象后,我们就可以把dataPosition传给Chunk,然后Chunk内部把这个全局的 dataPosition转换为本地的一个位置,就能准确的定位到这个数据在当前Chunk文件的实际位置了。将全局位置转换为本地的位置的算法也很简单 直接:

public int GetLocalDataPosition(long globalDataPosition) { if (globalDataPosition < ChunkDataStartPosition || globalDataPosition > ChunkDataEndPosition) { throw new Exception(string.Format("globalDataPosition {0} is out of chunk data positions [{1}, {2}].", globalDataPosition, ChunkDataStartPosition, ChunkDataEndPosition)); } return (int)(globalDataPosition - ChunkDataStartPosition); }

只需要把这个全局的位置减去当前Chunk的数据开始位置,就能知道这个全局位置相对于当前Chunk的本地位置了。

好了,上面介绍了消息如何写入的主要思路以及如何读取数据的思路。

另外一点还想提一下,就是关于刷盘的策略。一般我们写数据到文件后,是需要调用文件流的Flush方法的,确保数据最终刷入到了磁盘上。否则数据就 还是在缓冲区里。当然,我们需要注意到,即便调用了Flush方法,数据可能也还没真正逻辑到磁盘,而只是在操作系统内部的缓冲区里。这个我们就无法控制 了,我们能做到的是调用了Flush方法即可。那当我们每次写入一个数据到文件都要调用Flush方法的话,无疑性能是低下的,所以就有了所谓的异步刷盘 的设计。就是我们写入消息后不立即调用Flush方法,而是采用一个独立的线程,定时调用Flush方法来实现刷盘。目前EQueue支持同步刷盘和异步 刷盘,开发者可以自己配置决定采用哪一种。异步刷盘的间隔默认是100ms。当我们在追求高吞吐量时,应该考虑异步刷盘,但要求数据可靠性更高但对吞吐量 可以低一点时,则可以使用同步刷盘。如果又要高吞吐又要数据高可靠,那就只有一个办法了,呵呵。就是多增加一些Broker机器,通过集群来弥补单台 Broker写入数据的瓶颈。



分享给朋友:
您可能感兴趣的文章:
随机阅读: