Ensuring that the Thread calling Socket.XXXAsync stays alive to complete the IO request (IO Completion Port, C#)

Developer https://www.devze.com 2023-02-20 22:10 Source: web
I am implementing a small library to make the use of System.Net.Sockets.Socket easier. It should handle an arbitrary number of listening and receiving TCP sockets and it is important that the implementation is ~fast~.

I am using the XXXAsync methods and the CLR ThreadPool to call back into the delegates of the library user (for example, whenever a message was successfully sent or some data was received). Preferably, the library wouldn't start any threads by itself.

The user of the library can access an interface to my Socket wrapper to send messages or to start receiving messages (among many other methods and overloads):

public interface IClientSocket {
    // will eventually call Socket.SendAsync
    IMessageHandle SendAsync(byte[] buffer, int offset, int length);

    // will eventually call Socket.ReceiveAsync
    void StartReceiveAsync();
}

The XXXAsync methods use IO Completion Ports. As far as I understand, this means the thread which calls these methods must stay alive until the operation completes; otherwise the operation fails with SocketError.OperationAborted (is that correct?). Imposing such a restriction on the user of the library is ugly and error-prone.

What is the best alternative here?

  • Calling ThreadPool.QueueUserWorkItem with a delegate to invoke the XXXAsync methods? Is that safe? I read somewhere that the ThreadPool won't stop idle threads that still have pending IOCP operations. That would be good, as it solves the problem above.

    But it could also be bad with many TCP connections. In such a case it is likely that each ThreadPool thread has invoked one of the pending ReceiveAsync calls. Therefore the ThreadPool would never shrink, even if the current workload is low and many threads are idle (and wasting memory).

  • Starting a dedicated thread which is always alive and invokes the XXXAsync methods. For example, when a library user wants to send data, it places a delegate into a synchronized queue; the thread pops it and invokes the SendAsync method.

    I don't like this solution much, because it wastes a thread, and on a multi-core machine sending is performed by only one thread.
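For illustration, the dedicated-thread option could look roughly like the sketch below. The class name DedicatedIssuer and its members are hypothetical and not part of the library described above. It assumes .NET 4 or later for BlockingCollection, and that each queued delegate wraps a call such as socket.SendAsync(args).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper: one long-lived thread that issues queued operations in FIFO order.
public sealed class DedicatedIssuer : IDisposable {
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();
    private readonly Thread _thread;

    public DedicatedIssuer() {
        _thread = new Thread(Run) { IsBackground = true, Name = "Socket issuer" };
        _thread.Start();
    }

    // Safe to call from any thread. The delegate would typically wrap socket.SendAsync(args).
    public void Enqueue(Action issue) {
        _queue.Add(issue);
    }

    private void Run() {
        // GetConsumingEnumerable blocks while the queue is empty and ends after CompleteAdding.
        foreach (Action issue in _queue.GetConsumingEnumerable()) {
            try {
                issue();
            } catch (Exception ex) {
                // Keep the issuing thread alive if a single operation throws.
                Console.Error.WriteLine(ex);
            }
        }
    }

    public void Dispose() {
        _queue.CompleteAdding();   // let the thread drain what is left, then exit
        _thread.Join();
        _queue.Dispose();
    }
}
```

The thread that calls the XXXAsync methods then lives as long as the issuer object does, at the cost described above: one extra thread, and all operations are issued serially.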

Moreover, neither solution seems ideal, because both hand the job of calling the asynchronous method to another thread. Can that be avoided?

What do you think? (Thank you!!)

Thomas

EDIT 1:

I can reproduce the SocketError.OperationAborted problem with the following test program (I think it is correct).

Compile, start, and telnet to 127.0.0.1 on port 10000. Send a "t" or a "T" and wait for more than 3 seconds. When sending "T", the ReceiveAsync call is made on a ThreadPool thread (works); with "t", a new thread is started which terminates after 3 seconds (fails).

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Collections.Generic;

namespace Test {
    class Program {
        static List<Socket> _referenceToSockets = new List<Socket>();
        static void Main(string[] args) {
            Thread.CurrentThread.Name = "Main Thread";

            // Create a listening socket on Port 10000.
            Socket ServerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            _referenceToSockets.Add(ServerSocket);
            var endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 10000);
            ServerSocket.Bind(endPoint);
            ServerSocket.Listen(50);

            // Start listening.
            var saeaAccept = new SocketAsyncEventArgs();
            saeaAccept.Completed += OnCompleted;
            ServerSocket.AcceptAsync(saeaAccept);

            Console.WriteLine(String.Format("Listening on {0}.", endPoint));
            Console.ReadLine();
        }

        private static void OnCompleted(object obj, SocketAsyncEventArgs evt) {
            var socket = (Socket)obj;
            Console.WriteLine(String.Format("Async operation completed: {0}; Error: {1}; Callback-Thread: \"{2}\" ({3} threadpool)", evt.LastOperation, evt.SocketError, Thread.CurrentThread.Name, Thread.CurrentThread.IsThreadPoolThread?"is":"no"));
            switch (evt.LastOperation) {
                case SocketAsyncOperation.Accept:
                    // Client connected. Listen for more.
                    Socket clientSocket = evt.AcceptSocket;
                    _referenceToSockets.Add(clientSocket);
                    evt.AcceptSocket = null;
                    socket.AcceptAsync(evt);

                    // Start receiving data.
                    var saeaReceive = new SocketAsyncEventArgs();
                    saeaReceive.Completed += OnCompleted;
                    saeaReceive.SetBuffer(new byte[1024], 0, 1024);
                    clientSocket.ReceiveAsync(saeaReceive);
                    break;
                case SocketAsyncOperation.Disconnect:
                    socket.Close();
                    evt.Dispose();
                    break;
                case SocketAsyncOperation.Receive:
                    if (evt.SocketError != SocketError.Success) {
                        socket.DisconnectAsync(evt);
                        return;
                    }
                    var asText = Encoding.ASCII.GetString(evt.Buffer, evt.Offset, evt.BytesTransferred);
                    Console.WriteLine(String.Format("Received: {0} bytes: \"{1}\"", evt.BytesTransferred, asText));
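                    if (evt.BytesTransferred == 0) {
                        // Zero bytes means the peer closed the connection; stop here
                        // instead of issuing another ReceiveAsync on the closed socket.
                        socket.Close();
                        evt.Dispose();
                        break;
                    }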
                    if (asText.ToUpper().StartsWith("T")) {
                        Action<object> action = (object o) => {
                            socket.ReceiveAsync(evt);
                            Console.WriteLine(String.Format("Called ReceiveAsync {0}...", o));
                            Thread.Sleep(3000);
                            Console.WriteLine("End of Action...");
                        };
                        if (asText.StartsWith("T")) {
                            ThreadPool.QueueUserWorkItem(o=>action(o), "in ThreadPool");
                        } else {
                            new Thread(o=>action(o)).Start("in a new Thread");
                        }
                    } else {
                        socket.ReceiveAsync(evt);
                    }
                    break;
            }
        }
    }
}

EDIT #3

The following is the solution I intend to use:

I let the thread of the library user invoke the XXXAsync operations (except SendAsync) directly. In most cases the call will be successful (because the calling thread only rarely terminates).

If the operation fails with SocketError.OperationAborted, the library simply invokes the operation again using the current thread from the async callback. This one is a ThreadPool thread, so it has a good chance to succeed (an additional flag will be set so that this workaround is used at most once, in case the SocketError.OperationAborted was caused by some other error). This should work because the socket itself is still OK; only the previous operation failed.
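The retry-once idea could be sketched as follows. The names ReceiveRetry and RetryState are hypothetical, and the sketch assumes that a RetryState instance was stored in SocketAsyncEventArgs.UserToken before the first ReceiveAsync call and that the event's sender is the socket (as in the test program above).

```csharp
using System;
using System.Net.Sockets;

public static class ReceiveRetry {
    // Hypothetical per-operation state, kept in SocketAsyncEventArgs.UserToken.
    public sealed class RetryState {
        public bool Retried;
    }

    // Retry only for OperationAborted, and only once per state object.
    public static bool ShouldRetry(SocketError error, RetryState state) {
        if (error != SocketError.OperationAborted || state.Retried) {
            return false;
        }
        state.Retried = true;
        return true;
    }

    // Meant to be subscribed to SocketAsyncEventArgs.Completed for receive operations.
    public static void OnReceiveCompleted(object sender, SocketAsyncEventArgs e) {
        var socket = (Socket)sender;
        var state = (RetryState)e.UserToken;
        if (ShouldRetry(e.SocketError, state)) {
            // Re-issue the receive from the callback thread.
            // ReceiveAsync returns false when it completed synchronously; in that
            // case Completed is not raised, so the result is handled right here.
            if (!socket.ReceiveAsync(e)) {
                OnReceiveCompleted(socket, e);
            }
            return;
        }
        // ... normal handling of received data or of a genuine error goes here ...
    }
}
```

Because the flag is never cleared in this sketch, a second OperationAborted on the same operation is treated as a real error, which matches the "at most once" rule stated above.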

For SendAsync, this workaround does not work, because retrying could mess up the order of the messages. In this case I will queue the messages in a FIFO list and use the ThreadPool to dequeue them and send them via SendAsync.
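A minimal sketch of such a FIFO send queue is shown below. OrderedSendQueue is a hypothetical name, and the send delegate is assumed to wrap Socket.SendAsync for one message. At most one ThreadPool work item drains the queue at any time, so messages are handed to the delegate in the order they were enqueued.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: preserves message order while issuing sends from ThreadPool threads.
public sealed class OrderedSendQueue {
    private readonly Queue<byte[]> _pending = new Queue<byte[]>();
    private readonly Action<byte[]> _send;   // assumed to wrap socket.SendAsync(...)
    private bool _draining;                  // guarded by lock (_pending)

    public OrderedSendQueue(Action<byte[]> send) {
        _send = send;
    }

    public void Enqueue(byte[] message) {
        lock (_pending) {
            _pending.Enqueue(message);
            if (_draining) {
                return;   // a pool thread is already working through the queue
            }
            _draining = true;
        }
        ThreadPool.QueueUserWorkItem(_ => Drain());
    }

    private void Drain() {
        while (true) {
            byte[] next;
            lock (_pending) {
                if (_pending.Count == 0) {
                    _draining = false;   // the next Enqueue schedules a new drain
                    return;
                }
                next = _pending.Dequeue();
            }
            _send(next);   // called outside the lock so Enqueue never blocks on a send
        }
    }
}
```

The sketch does not wait for one send to complete before issuing the next, and it does not recover if the delegate throws; a real implementation would have to decide how to handle both.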


A couple of red flags here. If you've got trouble keeping the thread alive, then you've got a thread that's not doing anything useful, in which case it doesn't make sense to use the Async methods. If speed is your only concern, then you should not use the Async methods. They require that the Socket grabs a thread-pool thread to make the callback. That's a small overhead, but there is none if a regular thread makes a blocking call.

You should only consider the Async methods when you need your app to scale well while handling many simultaneous connections. That produces very bursty CPU loads, which are perfect for thread-pool threads.


"the thread which calls these methods must stay alive until the operation completes"

I'm not convinced of the veracity of this statement. When you start a network operation, it is handed off to the Windows IO system. When a network event occurs (i.e. data is received and your app is notified), it can happily be dealt with by another thread (even if the initiating thread has long since died).

Are you sure you're not dropping a reference to something crucial and allowing it to be garbage collected?

If you are in a situation where threads are allowed to die, then I would suspect that your code isn't ready to scale to a very large number of clients at high speed. If you're really looking to scale, your use of threads should be very tightly controlled. The cost of letting a thread die (where I'm assuming another would need spinning up to replace it) isn't one that you should incur too often. Thread pools are very definitely the way to go here.
