Параллельная обработка нескольких очередей в фоновой службе в ядре .NET

Я работаю с некоторыми WIFI устройствами, например, с камерами.

Базовый собрат, который я реализовал:

  1. Кто-то нажимает кнопку.
  2. Кнопка вызывает мою конечную точку Web API.
  3. Моя конечная точка Web API вызывает одну из API's камеры (автор HttpRequest).
  4. Обработка каждого запроса занимает 5 секунд. Причем между каждым запросом должна быть задержка в 1 секунду. Например, если вы нажимаете кнопку 2 раза с задержкой в ​​одну секунду после каждого: сначала мы ожидаем 5 секунд для обработки первого нажатия, затем одну секунду задержки и в конце мы ожидаем 5 секунд для последнего процесса (второе нажатие).

Для этого я использую Queued background tasks на основе Fire and Forgot в проекте .NetCore 3.1, и он отлично работает, когда я имею дело только с одной камерой.

Но новое требование проекта: фоновая задача должна обрабатывать несколько камер. Это означает, что одна очередь на камеру, и очереди должны работать параллельно, в зависимости от того, что я описал выше.

Например, если у нас есть 2 устройства camera-001 и camera-002 и 2 подключенные кнопки btn-cam-001 и btn-cam-002, И порядок нажатия (задержка 0,5 с после каждого нажатия): 2X btn-cam-001 и 1X btn-cam-002.

На самом деле происходит FIFO. Сначала будут обработаны запросы btn-cam-001, а затем btn-cam-002.

Что я ожидаю и что мне нужно: Camera-002 не следует ждать получения запроса, и первые запросы к обеим камерам _17 _ / _ 18_ должны обрабатываться одновременно (на основе примера). Вроде у каждой камеры своя очередь и свой процесс.

Вопрос в том, как этого добиться в .NetCore 3.1? Ценю любую помощь.

Моя текущая фоновая служба:

public class QueuedHostedService : BackgroundService
{
    public IBackgroundTaskQueue TaskQueue { get; }

    private readonly ILogger _logger;

    public QueuedHostedService(IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory)
    {
        TaskQueue = taskQueue;
        _logger = loggerFactory.CreateLogger<QueuedHostedService>();
    }

    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Queued Hosted Service is starting.");

        while (!cancellationToken.IsCancellationRequested)
        {
            var workItem = await TaskQueue.DequeueAsync(cancellationToken);

            try
            {
                await workItem(cancellationToken);
            }
            catch (Exception exception)
            {
                _logger.LogError(exception, $"Error occurred executing {nameof(workItem)}.");
            }
        }

        _logger.LogInformation("Queued Hosted Service is stopping.");
    }
}

И текущий BackgroundTaskQueue:

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    private readonly ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
        new ConcurrentQueue<Func<CancellationToken, Task>>();

    public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem)
    {
        if (workItem is null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        _workItems.Enqueue(workItem);

        _signal.Release();
    }

    public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken)
    {
        await _signal.WaitAsync(cancellationToken);

        _workItems.TryDequeue(out var workItem);

        return workItem;
    }
}

Моя текущая конечная точка:

  [HttpPost("hit")]
    public ActionResult TurnOnAsync([FromBody] HitRequest request, CancellationToken cancellationToken = default)
    {
        try
        {
            var camera = ConfigurationHelper.GetAndValidateCamera(request.Device, _configuration);

            _taskQueue.QueueBackgroundWorkItem(async x =>
                {
                    await _cameraRelayService.TurnOnAsync(request.Device, cancellationToken);

                    Thread.Sleep(TimeSpan.FromSeconds(1));

                });

            return Ok();
        }
        catch (Exception exception)
        {
            _logger.LogError(exception, "Error when truning on the lamp {DeviceName}.", request.Device);

            return StatusCode(StatusCodes.Status500InternalServerError, exception.Message);
        }
    }

person peyman gilmour    schedule 21.05.2020    source источник
comment
Можете ли вы показать код, в котором вы в настоящее время применяете задержку в 1 секунду? Вы применяете его перед вызовом метода TaskQueue.QueueBackgroundWorkItem?   -  person Theodor Zoulias    schedule 21.05.2020
comment
В качестве примечания, метод ExecuteAsync, кажется, ведет себя непоследовательно в случае отмены. Возвращенный Task может либо перейти в состояние Canceled (если отмена происходит на любом из этапов await), либо в состояние RanToCompletion (если отмена происходит во время проверки IsCancellationRequested в цикле while).   -  person Theodor Zoulias    schedule 21.05.2020
comment
@TheodorZoulias Спасибо за ответ. Я обновил пост.   -  person peyman gilmour    schedule 21.05.2020
comment
Вместо Thread.Sleep(TimeSpan.FromSeconds(1)); вы можете рассмотреть более легкий await Task.Delay(TimeSpan.FromSeconds(1));, чтобы избежать блокировки ThreadPool потока.   -  person Theodor Zoulias    schedule 21.05.2020
comment
@TheodorZoulias Спасибо за отличные советы. Я буду применять все ... И Task.Delay () - это то, что я хотел сделать.   -  person peyman gilmour    schedule 21.05.2020


Ответы (1)


Вместо одного BackgroundTaskQueue у вас может быть по одному на камеру. Вы можете хранить очереди в словаре, используя камеру в качестве ключа:

public IDictionary<IDevice, IBackgroundTaskQueue> TaskQueues { get; }

Затем в вашей конечной точке используйте очередь, связанную с запрошенной камерой:

_taskQueues[camera].QueueBackgroundWorkItem(async x =>
person Theodor Zoulias    schedule 21.05.2020
comment
Спасибо за ответ. Я попробовал почти 2 способа и применил упомянутые вами изменения, которые оказались безуспешными. Первый способ - применить все эти изменения в контроллере. Но поскольку IBackgroundTaskQueue одноэлементный, я получил тот же результат. - person peyman gilmour; 22.05.2020
comment
Другой способ сбил меня с толку. IDictionary ‹IDevice, IBackgroundTaskQueue› в фоновой службе .cs вместо IBackgroundTaskQueue TaskQueue {get; }. Вопрос в том, откуда сервису известно о камере как о ключе? потому что мне нужно получить правильную очередь backgroundtaskqueue из dic, чтобы исключить из очереди и выполнить задачу. Не могли бы вы привести мне пример? (Я использовал эту ссылку более или менее docs.microsoft.com/en-us/aspnet/core/fundamentals/host/) - person peyman gilmour; 22.05.2020
comment
@peymangilmour Я думаю, вы знаете, сколько у вас камер, когда запускается программа. Вы можете заполнить словарь камерами при запуске службы: TaskQueues.Add(camera1, new BackgroundTaskQueue()) и т. Д. Кстати, класс BackgroundTaskQueue кажется довольно примитивной реализацией асинхронной очереди. Если бы я был на вашем месте, я бы предпочел использовать Channels < / а>. Есть также другие варианты. - person Theodor Zoulias; 22.05.2020
comment
Да, согласен ... Вместо этого я попробую использовать каналы. Спасибо еще раз. - person peyman gilmour; 22.05.2020