当前位置: 动力学知识库 > 问答 > 编程问答 >

c# - How to implement good messaging queue

问题描述:

I need help on finding the best and possibly quickest data structure for the message queue.

The architecture is the following: the client gets a message from external C++ server (can not chage it's code). The message in the form msg.length + data.

In the old code there is usage of TcpClient class for connection and MemoryStream as a queue. Client reads the data from this TCPClient and copies it to the stream. If during this read client read the whole message it is being processed in another thread and everything is perfect. In case there partial message or 2 messages together the code gets very messy.

I have a strong feeling that there is possible easy way to write the code. What very bothers me is the "play" with a pointer in MemoryStream and the need to somehow delete old data from it.

网友答案:

You can use the Queue class; it is like a FIFO, first in first out. You need two threads (at least) one to read the messages from the socket and Enqueue to the FIFO and another thread to Dequeue the messages and process them. Also you need to use a Mutex in order to prevent simultaneous access to the queue. Here is the code:

    class MessagePacket
    {
        private byte[] data;
        private int length;

        public MessagePacket(int len, byte[] aData)
        {
            this.length = len;
            data = new byte[len];
            Array.Copy(aData, data, len);
        }
        public int Length()
        {
            return this.length;
        }
        public byte[] Data()
        {
            return this.data;
        }
    }

    static Queue<MessagePacket> MsgQueue = new Queue<MessagePacket>();
    static Mutex mutQueue = new Mutex();

    /// <summary> 
    ///     This thread read the message from the sever and put them in the queue.
    /// </summary>
    static void readSocket()
    {
        byte[] dataSize = new byte[4];
        while (true/*or ApplicationIsActive*/)
        {
            try
            {
                // it's assumed that data is a 32bit integer in network byte order
                if (ClientSocket.Receive(dataSize, 4, SocketFlags.None) != 4)
                {
                    return;
                }
                int size = BitConverter.ToInt32(dataSize, 0);
                size = IPAddress.NetworkToHostOrder(size);

                byte[] buffer = new byte[size];
                int offset = 0;
                while (size > 0)
                {
                    int ret = ClientSocket.Receive(buffer, offset, size, SocketFlags.None);
                    if (ret <= 0)
                    {
                        // Socket has been closed or there is an error, quit
                        return;
                    }
                    size -= ret;
                    offset += ret;
                }
                mutQueue.WaitOne();
                try { MsgQueue.Enqueue(new MessagePacket(size, buffer)); }
                finally { mutQueue.ReleaseMutex(); }
            }
            catch  
            {
                return;
            }
        }
    }

    /// <summary> 
    ///     This thread processes the messages in the queue.
    /// </summary>
    static void processMessages()
    {
        while (true/*or ApplicationIsActive*/)
        {
            if (MsgQueue.Count > 0)
            {
                MessagePacket msg;
                mutQueue.WaitOne();
                try { msg = MsgQueue.Dequeue(); }
                finally { mutQueue.ReleaseMutex(); }
                // Process the message: msg
            }
            else Thread.Sleep(50);
        }
    }
分享给朋友:
您可能感兴趣的文章:
随机阅读: