Использование портов завершения ввода-вывода из нескольких сокетов в ограниченном пуле потоков в C#

Я пытаюсь прослушивать пакеты UDP с различных входящих портов (~ 20). Я хотел бы выделить ~ 3-5 потоков для получения и обработки этих пакетов. Это кажется идеальной ситуацией для портов завершения ввода-вывода в Windows. Чего я не понимаю, так это того, как выполнить сопоставление нескольких сокетов «многие к меньшему» для проверки меньшего набора потоков.

Следующий код создает все мои сокеты и начинает операцию асинхронного получения.

for(int ix = 0; ix < 20; ix++)
{
    var socket = new Socket(AddressFamily.InterNetwork,
                            SocketType.Dgram, ProtocolType.Udp);
    socket.Bind(new IPEndPoint(IPAddress.Any, ix+6000));
    var e = new SocketAsyncEventArgs();
    e.Completed+=OnReceive;
    e.SetBuffer(buffer, ix*1024*1024, 1024*1024);
    socket.ReceiveFromAsync(e);
    _sockets.Add(socket);
}

Я понимаю, что каждое сообщение OnReceive будет вызываться при получении пакета...

static void OnReceive(object sender, SocketAsyncEventArgs e)
{
    Console.WriteLine("Received {0} bytes", e.BytesTransfered);
    if(!((Socket)sender).ReceiveFromAsync(e))
        e_Completed(sender, e);
}
  1. Как ограничить количество потоков, выполняющих события OnReceive?
  2. Как лучше всего предотвратить переполнение стека в тех редких случаях, когда метод OnReceive рекурсивно вызывает себя слишком много раз?

person Superman    schedule 27.09.2012    source источник
comment
Ищем эквивалент параметра CreateIoCompletionPort NumberOfConcurrentThreads.   -  person Superman    schedule 27.09.2012


Ответы (1)


Не уверен, что понимаю, что вы имеете в виду под потоками для получения и обработки. Прием происходит в фоновом режиме.

В любом случае, я бы использовал BlockingCollection.

OnReceive может выглядеть примерно так

private static BlockingCollection<byte[]> _received = new ...

static void OnReceive(object sender, SocketAsyncEventArgs e) {
    byte[] data = new byte[e.BytesTransfered];
    Array.Copy(e.buffer, e.Offset, data, 0, e.BytesTransfered);
    _received.Add(data)
    ...
}

Затем вы просто используете TPL/PLinq для обработки полученных данных в запрошенном количестве потоков.

var parallelOptions = new ParalellOptions { MaxDegreeOfParallelism = 3 };
Parallel.ForEach(_received.GetConsumingPartitioner(),
                parallelOptions, 
                data => {
                    // do processing
                    ...
                });
person adrianm    schedule 27.09.2012
comment
Согласованное получение происходит в фоновом режиме, но порты завершения уже обращаются к планировщику потоков, просто интересно, как это контролировать. Не стремясь делать копии памяти в другой поток и ограничивать их там, так как я потеряю преимущества портов завершения. - person Superman; 27.09.2012