Как переместить сообщение об ошибке в очередь недоставленных сообщений Azure (темы - Подписка) с помощью Java?

Мне нужно отправить свои сообщения в очередь недоставленных сообщений из подписки на тему Azure в случае возникновения какой-либо ошибки при чтении и обработке сообщения из темы. Поэтому я попробовал протестировать отправку сообщения прямо в DLQ.

Мой пример кода будет похож на

static void sendMessage()
{
    // create a Service Bus Sender client for the queue 
    ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .sender()
            .topicName(topicName)
            
            .buildClient();

    // send one message to the topic
    
    
    senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));    


}
static void resceiveAsync() {
    ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .receiver()
            .topicName(topicName)
            .subscriptionName(subName)
            .buildAsyncClient();

        // receive() operation continuously fetches messages until the subscription is disposed.
        // The stream is infinite, and completes when the subscription or receiver is closed.
        Disposable subscription = receiver.receiveMessages().subscribe(message -> {

            System.out.printf("Id: %s%n", message.getMessageId());
            System.out.printf("Contents: %s%n", message.getBody().toString());
        }, error -> {
                System.err.println("Error occurred while receiving messages: " + error);
            }, () -> {
                System.out.println("Finished receiving messages.");
            });

        // Continue application processing. When you are finished receiving messages, dispose of the subscription.
        subscription.dispose();

        // When you are done using the receiver, dispose of it.
        receiver.close();
    
    
    
}

Я пытался получить путь к очереди недействительных писем

    String dlq = EntityNameHelper.formatDeadLetterPath(topicName);

Я получил путь очереди недоставленных сообщений, например = mytopic / $ deadletterqueue

Но он не работает при передаче пути в качестве имени темы. Он выбрасывает исключение Entity topic not found.

Любой, пожалуйста, посоветуйте мне по этому поводу

Ссылка: Как переместить ошибку сообщение в очередь недоставленных сообщений Azure с помощью Java?

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues#moving-messages-to-the-dlq

Как отправлять сообщения об ошибках в очередь мертвых писем служебной шины Azure в Spring Boot Java?

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-java-how-to-use-topics-subscriptions-legacy#receive-messages-from-a-subscription


person Debugger    schedule 06.01.2021    source источник


Ответы (1)


Вы, вероятно, знаете, что сообщение будет автоматически перемещено в очередь недоставленных сообщений, если вы создадите исключения во время обработки и будет превышено максимальное количество доставок. Если вы хотите явно переместить сообщение в DLQ, вы также можете это сделать. Обычно это происходит, если вы знаете, что сообщение никогда не может быть успешным из-за его содержимого.

Вы не можете отправлять новые сообщения напрямую в DLQ, потому что тогда в системе будет два сообщения. Вам нужно вызвать специальную операцию над родительской сущностью. Кроме того, <topic path>/$deadletterqueue не работает, потому что это будет DLQ всех подписок. Правильный путь к сущности строится следующим образом:

<queue path>/$deadletterqueue
<topic path>/Subscriptions/<subscription path>/$deadletterqueue

https://github.com/Azure/azure-service-bus/blob/master/samples/Java/azure-servicebus/DeadletterQueue/src/main/java/com/microsoft/azure/servicebus/samples/deadletterqueue/DeadletterQueue.java

Этот пример кода предназначен для очередей, но вы сможете довольно легко адаптировать его к темам:

    // register the RegisterMessageHandler callback
    receiver.registerMessageHandler(
            new IMessageHandler() {
                // callback invoked when the message handler loop has obtained a message
                public CompletableFuture<Void> onMessageAsync(IMessage message) {
                    // receives message is passed to callback
                    if (message.getLabel() != null &&
                            message.getContentType() != null &&
                            message.getLabel().contentEquals("Scientist") &&
                            message.getContentType().contentEquals("application/json")) {

                        // ...
                    } else {
                        return receiver.deadLetterAsync(message.getLockToken());
                    }
                    return receiver.completeAsync(message.getLockToken());
                }

                // callback invoked when the message handler has an exception to report
                public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                    System.out.printf(exceptionPhase + "-" + throwable.getMessage());
                }
            },
            // 1 concurrent call, messages are auto-completed, auto-renew duration
            new MessageHandlerOptions(1, false, Duration.ofMinutes(1)),
            executorService);
person Alex AIT    schedule 06.01.2021