как восстановить потерянные сообщения от издателя, когда подписчик отключен?

Есть:

  1. издатель, который привязывается к localhost:5556 и отправляет сообщение (topic1, topic2) каждые 2 секунды.
  2. два подписчика, которые подключаются к localhost:5556 и соответственно получают "topic1" и "topic2"

Все работает нормально, когда PUB отправляет сообщения, а подписчики их получают.

У меня проблема, когда один из двух абонентов перестает работать. Я бы хотел, чтобы все сообщения, отброшенные PUB, ставились в очередь и отправлялись «мертвому» подписчику, когда он оживает. Этого не бывает. Когда подписчик повторно подключается к издателю, все сообщения, отправленные за этот период времени (от мертвых до живых), удаляются.

Я пытаюсь писать код на Python, используя модуль pyzmq.

Кроме того, я заметил, что если вместо того, чтобы закрыть процесс подписчика python, попробуйте добавить time.sleep(10), PUB ставит сообщения в очередь, и когда подписчик просыпается, все сообщения отправляются. Этого не происходит, если процесс закрыт (CTRL + C) и перезапущен.

Но если я попытаюсь инвертировать bind() на connect() между издателем и подписчиком, приложение будет работать так, как я хочу. Но в этом случае возникает большая проблема. У меня не могло быть больше подписчиков, связанных с одним издателем, потому что подписчики привязываются к разным портам, а издатель может подключаться только к одному порту.

Как я могу это решить?


person Spl0it92    schedule 03.05.2019    source источник
comment
Кажется, у вас есть несколько вопросов. Вы можете немного очистить это. В любом случае PUB / SUB - это транспорт с потерями. Если подписчик не подключен при отправке сообщения, это сообщение теряется. Это (а) задумано и (б) задокументировано.   -  person larsks    schedule 03.05.2019


Ответы (2)


«Моя проблема ...» разрешима

ZeroMQ - это интеллектуальная и высокопроизводительная платформа сигнализации / обмена сообщениями, созданная для обеспечения уровня сервиса, состоящего из масштабируемых архетипов формальной коммуникации. В течение многих лет это было оптимизировано для низкой задержки, высокой производительности, почти линейного масштабирования и простоты использования распределенного поведения агентов, которое напоминает поведение человека (например, один REQ -гостиный , другой будет REP -ly).

Все это было по замыслу БЕЗ БРОКЕРА, т. Е. Не существует такой вещи, как хранилище «человек посередине» для повторной отправки сообщений, которые были отправлены через инфраструктуру Framework в то время, когда агент был вне сети. обслуживания.

Это не означает, что нельзя создать такую ​​функцию на уровне приложения, если действительно нужен такой набор свойств. Задача разработчика - определить, какие функции следует добавить (временные метки, хранилище сообщений, указатели-счетчики-флаги доставки сообщений NACK / POSACK, частные двунаправленные протоколы для запроса пропущенных сообщений и т. Д. И т. Д.).

Таким образом, эта часть разрешима, но она не потребляет ни ресурсов, ни увеличивает задержку для базовой структуры, которая была разработана с максимально достижимой производительностью и минимально возможной задержкой с использованием Zen-of-Zero (Zero-copy, где это возможно, Zero- гарантия - либо доставить точную копию исходного сообщения, либо ничего, если присутствовали поврежденные части и т. д.) философия.

В более новых версиях ZeroMQ есть даже прямо противоположная функция - метод .setsockopt( ZMQ_CONFLATE, 1 ), который позволяет диспетчеру данных Context-instance отбрасывать все сообщения, кроме самых "свежих", из очереди сообщений отдельного контрагента, таким образом доставляя его по следующему запросу. , но самое последнее сообщение и никакое другое. Это очень удобно для многих сценариев приложений, где «старые» сообщения просто теряли свою ценность, если не были доставлены «сейчас», а CONFLATE-режим позволяет не перемещать их все через медленное или нестабильное соединение, таким образом (косвенно) отдавая приоритет распределению только «новейшее» состояние мирового сообщения (и удаление устаревших сообщений также сокращает ресурсы управления очередями и рабочие нагрузки, не так ли?).

Если концепции ZeroMQ новы для ваших дизайнерских усилий, обязательно прочтите книгу Питера ХИНТДЖЕНСА, библию не только о самой структуре ZeroMQ, или вы можете прочитать хотя бы об основных концептуальных различиях ZeroMQ для начала в [Иерархия ZeroMQ менее чем за пять секунд] Раздел.

Мало того, я заметил ...

Да, это будет работать нормально, потому что оба распределенных Context-экземпляра используют отдельные потоки, которые остаются в контакте, даже несмотря на то, что интерпретатор Python был проинструктирован .sleep() (что он и делает, но не усилия по поддержанию соединения с обеих сторон специфичные для протокола насосы данных)

если я попытаюсь инвертировать .bind() с помощью .connect()

О, конечно, он должен так себя вести. Только представьте себе случай с радиостанцией BBC - все люди во всем мире знают радиостанцию, которую нужно слушать, в то время как ни одна из станций BBC не может знать всех слушателей радиостанции, поэтому никогда не сможет «установить» связь от BBC со всеми из них по всему миру (именно по той причине, что у каждого из них есть уникальный адрес, который априори неизвестен вещательной компании, которая, таким образом, не может настроить ту же инфраструктуру со своей стороны). ZeroMQ PUB -lisher - такая же история - вы рекламируете «центральный» адрес для подключения, и каждый, кто хочет и может, это сделает. А не наоборот.

В любом случае, наслаждайтесь мирами ZeroMQ в своих будущих дизайнерских работах. Это стоит освоить.

person user3666197    schedule 05.07.2019

ZeroMQ не будет поддерживать это без большой дополнительной работы / кода.

Если вам действительно нужна система сообщений pub / sub, в которой подписчики могут приходить и уходить без потерь, было бы намного проще использовать систему, которая была построена для обеспечения этой функциональности.

Одна из таких систем - kafka: https://kafka.apache.org/intro

Если вы действительно хотите использовать zeromq, недавно были начаты проекты в этом направлении. https://github.com/zeromq/dafka

person James Harvey    schedule 08.07.2019