Location>code7788 >text

C# WebSocket high concurrency communication blocking problem

Popularity:573 ℃/2024-09-04 18:09:33

The project encountered the use of WebSocket timeout problem, the specific situation is this, OTA upgrade process, decompression of zip files will have decompression progress event, the decompression progress through the process of communication to another process, communication prompted a timeout exception

Small friends of the Church Park found that the use of Zip decompression of large files, decompression progress event interval is actually 1ms, simply super large frequency ah!

However, the decompression event overclocking should not be a communication exception, so I tested the inter-process communication flow by sending communication events at 1ms timings.

WebSocketSharp

The current interprocess communication component is based on thekaistseo/UnitySocketIO-WebSocketSharpTo realize this, a server is set up in the host, multiple clients connect to the server, and client communication is forwarded data by the server. Client A sends it to B. Client B sends the execution result back to Client A.

Then, in the positioning, it was found that each link sending delay is normal, including the server to send feedback data to the client A, but the client A to receive data delay is very large, the following is part of the return data:

And the latency will get bigger and bigger after a long time of communication.

Here is the external event OnMessage:

 1     private void WebSocketOnMessage(object sender, MessageEventArgs e)
 2     {
 3         if (!)
 4         {
 5             //Not supported at this time
 6             return;
 7         }
 8         ($"{("HH:mm:ss fff")},{}");
 9 
10         var receivedMessage = <ChannelServerMessage>();
11         xxxxx
12     }

Let's move on, OnMessage is triggered by () and gets data from the _messageEventQueue queue:

 1     private void message ()
 2     {
 3       MessageEventArgs e = null;
 4       lock (_forMessageEventQueue) {
 5         if (_inMessage || _messageEventQueue.Count == 0 || _readyState != )
 6           return;
 7 
 8         _inMessage = true;
 9         e = _messageEventQueue.Dequeue ();
10       }
11 
12       _message (e);
13     }

The loop to receive data is taken like this:

 1     private void startReceiving ()
 2     {
 3       xxxx
 4       _receivingExited = new ManualResetEvent (false);
 5       Action receive = () =>  (_stream, false,
 6             frame => {
 7               if (!processReceivedFrame (frame) || _readyState == ) {
 8                 var exited = _receivingExited;
 9                 if (exited != null)
10                    ();
11                 return;
12               }
13               // Receive next asap because the Ping or Close needs a response to it.
14               receive ();
15               xxxx
16               message ();
17             },
18             xxxx
19           );
20       receive ();
21     }

Here I see ManualResetEvent. The amount of data is so large that a synchronized signal lock here would surely block it!

Why do you set thread synchronization locks? Let's look down

WebSocketSharp data sending is based on the TCPClient implementation:

1     _tcpClient = new TcpClient (_proxyUri.DnsSafeHost, _proxyUri.Port);
2     _stream = _tcpClient.GetStream ();

Initialize and send data via _stream.Write (bytes, 0, ); send data

Receiving data, also read via _stream, can be seen above in the startReceiving() method, (_stream, false,...)

As we know, TCP is connection-oriented and provides reliable, sequential transmission of data streams. It is used for one-to-one communication, i.e. a TCP connection can only have one sender and one receiver. For more details, you can read my previous article:NET TCP, UDP, Socket, WebSocket - TangSongYuanMingQing2188 - Blogland ()

However, proper synchronization measures are still necessary in high concurrency scenarios when receiving. We can use locks as well as SemaphoreSlim to achieve complex synchronization needs, here we use the signal lock ManualResetEvent

Let's look at the sender code, which also uses lock an object to limit concurrent operations:

 1     private bool send (Opcode opcode, Stream stream)
 2     {
 3       lock (_forSend) {
 4         var src = stream;
 5         var compressed = false;
 6         var sent = false;
 7         xxxxx 
 8         sent = send (opcode, stream, compressed);
 9         xxxxx 
10         return sent;
11       }
12     }

So WebSocketSharp in a highly concurrent scenario is the existence of communication blocking problems. Of course, WebSocketSharp has been implemented very well, if normal, a few ms will not encounter blocking problems, the following settings 10ms timer overclocking send:

Client A sends a message, which is forwarded to Client B by the server, and then the feedback from Client B is forwarded back to Client A by the server, with a real delay of only 0-2ms!

WebSocket

Let's take a look at the native WebSocket and write a WebSocket communication demokybs00/WebSocketDemo ()

The server timed 1ms to make an effort to send a Message message to the client, and the result was surprising:

:“There is already one outstanding 'SendAsync' call for this WebSocket instance. ReceiveAsync and SendAsync can be called simultaneously, but at most one outstanding operation for each of them is allowed at the same time.”

It seems that sending events externally also has to deal with high concurrency scenarios, 1ms is really too much of a slam dunk

 

 1     private SemaphoreSlim _sendLock = new SemaphoreSlim(1);
 2     private async void Timer_Elapsed(object sender, ElapsedEventArgs e)
 3     {
 4         var message = $"{("HH:mm:ss fff")},hello from server";
 5 
 6         await _sendLock.WaitAsync();
 7         await BroadcastAsync("test", message);
 8         _sendLock.Release();
 9         (message);
10     }

After adding the semaphore synchronization, the server side can send normally. Here is the client receiving the data, the transmission is almost latency free:

Also tried adding semaphore synchronization on the client receive alone, and it still prompts the server to send an exception that parallelism is not supported.

So the native WebSocket needs to be processed serially on the sending side, and then execute _stream.FlushAsync() after writing the data.