Хост NServiceBus, который подписывается на свои собственные опубликованные сообщения

Используемая версия NServiceBus: 2.0.0.1145

Вопрос:

Можно ли настроить хост NServiceBus таким образом, чтобы он потреблял (подписывался) на свои опубликованные сообщения?

Отвечать:

Это кажется возможным, но в следующей конфигурации это дает мне исключение с заблокированной транзакцией при попытке вставить подписки в SubscriptionStorage. Это происходит, когда вы используете DbSubscriptionStorage и более 1 "NumberOfWorkerThreads".

Ошибка:

Could not execute command:
INSERT INTO Subscription (SubscriberEndpoint, MessageType) VALUES (@p0, @p1)
System.Data.SqlClinet.SqlException:
Transaction was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

После этого NServiceBus пытается отключиться, но терпит неудачу, потому что транзакция все еще выполняется, и выдает исключение UnhandledException.

Как воспроизвести:

Вот мой App.Config:

<!-- Publishing Configuration -->
<MsmqTransportConfig InputQueue="test_publisher_output" ErrorQueue="test_error" NumberOfWorkerThreads="3" MaxRetries="5" />

<!-- Subscription Configuration -->
<UnicastBusConfig DistributorControlAddress="" DistributorDataAddress="" ForwardReceivedMessagesTo="">
    <MessageEndpointMappings>
        <add Messages="MessageAssembly" Endpoint="test_publisher_output" />
    </MessageEndpointMappings>
</UnicastBusConfig>

Моя конфигурация шины:

var bus = Configure.With()
    .Log4Net()
    .StructureMapBuilder(container)
    .XmlSerializer()
    .MsmqTransport()
        .IsTransactional(true)
        .PurgeOnStartup(false)
    .DBSubcriptionStorage(subscriptionDbProperties, true)
    .Sagas()
    .NHibernateSagaPersister(sagaDbProperties, true)
    .UnicastBus()
        .ImpersonateSender(false)
        .LoadMessageHandlers(First<GridInterceptingMessageHandler>
            .Then<SagaMessageHandler>())
    .CreateBus()
    .Start();

и вот мои dbProperties как для подписки, так и для базы данных саги:

connection.provider      NHibernate.Connection.DriverConnectionProvider
connection.driver_class  NHibernate.Driver.SqlClientDriver
dialect                  NHibernate.Dialect.MsSql2005Dialect

Все работает нормально, пока я не увеличу NumberOfWorkerThreads выше 1. Все, что выше этого, вызовет указанные выше ошибки.

Надеюсь, я ничего не забыл. Заранее благодарны за Вашу помощь.


person Stephan Schinkel    schedule 15.10.2010    source источник
comment
Зачем той же конечной точке подписываться на сообщения, которые она публикует сама? Я имею в виду, что если вы хотите, чтобы в той же конечной точке происходили дополнительные действия, вы могли бы просто поместить их в код, который изначально выполнял публикацию.   -  person Udi Dahan    schedule 16.10.2010
comment
Привет, Уди, потому что тогда я потерял бы стабильность, которую дает мне nservicebus. я хочу разбить всю кучу работы на небольшие пакеты, которые выполняются обработчиками сообщений. примерами проделанной здесь работы являются вызов веб-сервисов, открытие ftp-соединений, создание и преобразование xml-файлов, транзакций базы данных и т. д. Также я хотел бы иметь возможность просто публиковать или отправлять сообщение и не хочу заботиться о конечной точке. администраторы приложения должны решить, где размещать конечные точки. в самой простой конфигурации вся эта куча работ должна выполняться на одном хосте.   -  person Stephan Schinkel    schedule 16.10.2010


Ответы (2)


Я бы действительно подумал о редизайне этого компонента. Если вам нужна стабильность, которую дает вам nservicebus, и вы уже разбили компонент, чтобы каждая часть обработки находилась в отдельном обработчике сообщений, поместите каждый обработчик сообщений в отдельный исполняемый файл с отдельной очередью. Если это невозможно, тогда у вас нет стабильности nservicebus, поскольку вы заблокированы некоторыми другими фрагментами кода, и в этом случае вы должны просто напрямую вызывать необходимые функции.

Если вы просто тестируете их все в одной очереди, просто разделите их при тестировании. На самом деле нет причин подписываться на ваши собственные сообщения - либо разделите обработчики на отдельные конечные точки, если это возможно, а если это невозможно, то вызовите функции напрямую.

person mike    schedule 25.10.2010
comment
каким бы удивительным ни было это озарение для меня, это кажется правильным ответом. разделить свои услуги. иметь nservicebus только на внешних границах вашего сервиса. понять обычаи отправить, опубликовать, ответить. моя сомнительная реализация вышеизложенного не поддерживается (по дизайну) с nservicebus. - person Stephan Schinkel; 25.10.2010
comment
Почему нет? Если в моем исполняемом файле есть одна часть, обработка которой занимает много времени, почему бы не запустить эту часть с помощью команды и завершить ее сагой? NServiceBus определил бы, что сервер обработки server1 с моим exe1, который первоначально отправил сообщение (себе), теперь занят, и другой сервер с тем же exe1 теперь обработает его правильно! Нет? Разве не так должно быть, если от внешнего клиента были получены две одинаковые команды, а мой сервер с exe1 завис. Разве NServiceBus не обрабатывает это автоматически и не отправляет его на server2 с запущенным exe1? - person pashute; 07.05.2015
comment
Кажется, это противоречит модели CQRS. На вашей стороне запроса (в том же процессе) будут обработчики событий, которые перенаправят это сообщение денормализаторам. - person Sinaesthetic; 30.11.2016

Если вы хотите, чтобы тот же процесс обрабатывал опубликованное сообщение, было бы лучше сделать Bus.SendLocal() после Bus.Publish(). Метод SendLocal() поместит сообщение в локальную очередь, а ваш внутренний обработчик подберет его и обработает. Это избавит вас от тупика, но сохранит ту же семантику.

person Adam Fyles    schedule 15.10.2010
comment
Привет, Адам, нет, проблема в подписке, а не в отправке или публикации сообщения. поэтому проблема уже возникает при запуске хоста, на котором сообщения не отправляются и не публикуются. - person Stephan Schinkel; 16.10.2010