Как читать из нескольких очередей в реальном мире?

Вот теоретический вопрос:

Когда я создаю приложение с использованием очереди сообщений, мне понадобится несколько очередей, поддерживающих разные типы данных для разных целей. Предположим, у меня есть 20 очередей (например, одна для создания новых пользователей, одна для обработки новых заказов, одна для редактирования настроек пользователя и т. д.).

Я собираюсь развернуть это в Windows Azure, используя «минимум» 1 веб-роль и 1 рабочую роль.

Как правильно читать из всех этих 20 очередей? Это то, что я имел в виду, но у меня мало или совсем нет реального практического опыта в этом:

Создайте класс, который порождает 20 потоков в классе «основной» рабочей роли. Пусть каждый из этих потоков выполняет метод для опроса другой очереди и позволяет всем этим потокам спать между каждым опросом (конечно, с механизмом отсрочки, который увеличивает время ожидания).

Это приводит к 20 потокам (или 21?) и 20 очередям, которые активно опрашиваются, что приводит к большому количеству потерянных сообщений (каждый раз, когда вы опрашиваете пустую очередь, она оплачивается как сообщение).

Как решить эту проблему?


person Leon Cullens    schedule 09.06.2012    source источник


Ответы (3)


Я прочитал другие ответы (очень хорошие ответы) и хотел добавить к этому свой собственный взгляд.

Придерживаясь очередей Windows Azure, как описывал @Lucifure: я действительно не вижу необходимости в нескольких очередях, за исключением двух сценариев:

  • Вы хотите другие приоритеты. Последнее, что вам нужно, — это сообщение с высоким приоритетом, застрявшее за сотнями сообщений с низким приоритетом. Создайте очередь hi-pri для них.
  • Количество прочитанных и удаленных сообщений превысит целевой показатель в 500 транзакций в секунду. В этом случае создайте несколько очередей, чтобы распределить объем транзакций по разделам хранилища (а учетная запись хранения будет обрабатывать до 5 000 транзакций в секунду).

Если вы придерживаетесь одной очереди (на основе хранилища, а не служебной шины), вы можете читать блоки сообщений одновременно (до 32). Вы можете легко разработать формат, который поможет вам различать типы сообщений (возможно, с помощью простого префикса). Затем просто передайте сообщение соответствующему потоку для обработки. В очередях служебной шины нет операций чтения нескольких сообщений, хотя они допускают предварительную выборку (что приводит к загрузке буферизованных сообщений в кэш).

Преимущество одной очереди над многими: вы устраняете (или значительно уменьшаете) проблему «во многих очередях нет сообщений, что приводит к пустым чтениям».

Если вам нужна большая пропускная способность, вы всегда можете увеличить количество потоков, выполняющих чтение и диспетчеризацию очереди.

Помните, что каждое удаление является атомарным; нет дозирования. И что касается опроса очереди: вы правы, когда думаете об отсрочке. Вам не нужно отступать после успешного прочтения сообщения (или блока сообщений). Просто отступите, когда вы ничего не получите после попытки чтения.

Одно приятное преимущество по сравнению с очередями служебной шины: очереди Windows Azure предоставляют вам приблизительное количество сообщений (что очень полезно при рассмотрении масштабирования до нескольких экземпляров). Очереди служебной шины не обеспечивают этого.

person David Makogon    schedule 10.06.2012
comment
Но как узнать, что делать с сообщением, когда вы его получите? Предположим, у нас есть очередь, в которой хранятся 3 типа объектов: Заказ, Клиент, Продукт. Получаем объект Product. Как узнать, следует ли добавить, обновить или удалить продукт? Я не вижу чистого способа справиться с этим, кроме как создать очередь для каждой цели. - person Leon Cullens; 10.06.2012
comment
Просто отформатируйте сообщения с каким-нибудь уникальным префиксом. Затем ваш код чтения очереди просматривает префикс и решает, что делать с каждым сообщением. Например: RENDER|\raw\image1.jpg|\rendered\image1.jpg и THUMBNAIL|\raw\image1.jpg|\thumbs\image1.jpg. Проанализируйте по разделителю '|', проверьте тип сообщения и передайте его в соответствующий поток. Примечание. Сообщения очереди являются двоичными или строковыми. Придумайте любой формат, который вы хотите. Просто приведу простой пример. - person David Makogon; 10.06.2012
comment
Хм, это очень, очень, очень грязно. Я надеялся на более чистое решение, странно, что (очевидно) его нет. - person Leon Cullens; 10.06.2012
comment
BrokeredMessage имеет свойство ContentType. Сообщения могут быть типов CreateOrder, UpdateOrder, DeleteOrder, CreateCustomer, UpdateCustomer и т. д. - person Steven T. Cramer; 12.07.2014

Альтернативной стратегией может быть использование одной или меньшего количества очередей, чтобы очередь могла поддерживать более одного типа сообщений. Этот подход проще в управлении и дешевле, если ваша системная архитектура поддерживает его.

В реальном мире я успешно использовал несколько очередей (в целях масштабируемости), каждая очередь читалась в отдельном потоке, запускаемом событием таймера. В зависимости от нагрузки на очередь и потребностей приложения событие таймера было изменено для обслуживания очереди с динамически меняющимися интервалами.

person hocho    schedule 09.06.2012
comment
Какие данные вы поместили в очередь (т. е. как ваш рабочий процесс узнал, какой тип он получит из очереди, если смешать все разные типы)? - person Leon Cullens; 10.06.2012
comment
Если вы используете очереди служебной шины, вы работаете с BrokeredMessage. Этот объект BrokeredMessage имеет свойство с именем Properties, к которому вы можете добавить пользовательскую информацию, которую вы могли бы использовать при получении сообщения. - person Sandrino Di Mattia; 10.06.2012
comment
В основном, используя сообщения с самоописанием... Я использовал класс конверта, который обертывал фактический объект сообщения, что-то вроде класса Envelope‹T›. Конверт содержал собственно объект сообщения и имя его типа, был сериализован в XML и помещен в очередь. При чтении XML анализировался, чтобы получить имя типа, а затем десериализовался в фактическое сообщение и отправлен в обработчик сообщений. Вместо этого можно использовать более сложные, но более простые подходы. - person hocho; 10.06.2012
comment
Комментарий Sandrino об использовании Brokered Message с очередями служебной шины также является очень жизнеспособной стратегией. - person hocho; 10.06.2012

Если вам недостаточно механизма отсрочки для очередей хранилища, я предлагаю вам рассмотреть очереди служебной шины. С помощью очередей служебной шины вам не придется проводить такие агрессивные опросы.

Вам все равно потребуется реализовать цикл для опроса очереди, но тайм-аут приема делает его легче, чем механизм постоянного опроса, который у вас был бы при использовании очередей хранилища.

В следующем примере я пытаюсь получить сообщение из очереди. Если сообщение не найдено, оно будет держать соединение открытым в течение 30 секунд, чтобы увидеть, не поступило ли что-нибудь новое. Если сообщение не поступило через 30 секунд, метод Receive вернет значение null (и у меня будет цикл, пытающийся снова вызвать Receive). Обратите внимание, что максимальное время ожидания составляет 24 дня.

MessagingFactory factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", ServiceNamespace, string.Empty), credentials); 
QueueClient myQueueClient = factory.CreateQueueClient("TestQueue");
myQueueClient.Receive(new TimeSpan(hours: 0, minutes: 0, seconds: 30));

Создание потоков для каждой очереди, из которой вы хотите читать, — хорошая идея, но, учитывая ограничения емкости пула потоков CLR, вам также следует рассмотреть возможность получения сообщений асинхронно (например, с помощью TaskFactory.FromAsync): http://msdn.microsoft.com/en-us/library/windowsazure/hh851744.aspx

person Sandrino Di Mattia    schedule 09.06.2012
comment
Мой вопрос был на самом деле не о механизме отсрочки, а о чтении из нескольких частей очередей. У вас есть пример того, как можно реализовать это с помощью FromAsync? А что если у нас будет 200 очередей вместо 20? Как вы решаете это, не создавая слишком много потоков? - person Leon Cullens; 10.06.2012
comment
Теперь я вижу, что недостаточно хорошо рассмотрел ваш пример, я думал, что речь идет только о TPL, но он касается Azure. Читаю сейчас :) - person Leon Cullens; 10.06.2012