Для объекта, которому требуются сеансы, невозможно создать получатель сообщений без сеанса для azure IMessageReceiver.

Я пытаюсь получать сообщения от служебной шины Azure через приложение Java. Я создал необходимую конфигурацию клиента и, например, было успешное соединение через ManagementClient

@Bean
public ClientSettings getMessageReceiver() throws ServiceBusException, InterruptedException {

    AzureTokenCredentials azureTokenCredentials = new ApplicationTokenCredentials(
            "clientID,
            "domain",
            "secret",
            AzureEnvironment.AZURE
    );

    TokenProvider tokenProvider = TokenProvider.createAzureActiveDirectoryTokenProvider(
            new AzureAuthentication(azureTokenCredentials),
            AzureEnvironment.AZURE.activeDirectoryEndpoint(),
            null
    );

    ClientSettings clientSettings = new ClientSettings(tokenProvider,
            RetryPolicy.getDefault(),
            Duration.ofSeconds(30),
            TransportType.AMQP);

    return clientSettings;
}

ManagementClient managementClient =
            new ManagementClient(Util.convertNamespaceToEndPointURI("namespace"),
                    clientSettings);
    managementClient.getTopics();

Но когда я пытаюсь получить сообщения из определенной темы:

        SubscriptionClient subscriptionClient = new SubscriptionClient("namespace", "events/subscriptions/subscription", clientSettings, ReceiveMode.PEEKLOCK);

И появилось сообщение об ошибке:

Для объекта, которому требуются сеансы, невозможно создать несессионный получатель сообщений.

Какие дополнительные шаги нужно предусмотреть?




Ответы (1)


При создании в подписка на вашу тему. Если вам не нужен сеанс сообщений, воссоздайте подписку с отключенным параметром «Требуется сеанс» (ПРИМЕЧАНИЕ: вы не можете изменить это свойство после создания подписки).

Или, если вам действительно нужен сеанс сообщений, обновите свой код, как показано ниже, чтобы сначала получать сеанс, а из полученного сеанса получать сообщения. Все образцы кода можно найти здесь и пример сеанса, в частности здесь.

        // The connection string value can be obtained by:
        // 1. Going to your Service Bus namespace in Azure Portal.
        // 2. Go to "Shared access policies"
        // 3. Copy the connection string for the "RootManageSharedAccessKey" policy.
        String connectionString = "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};"
            + "SharedAccessKey={key}";

        // Create a receiver.
        // "<<topic-name>>" will be the name of the Service Bus topic you created inside the Service Bus namespace.
        // "<<subscription-name>>" will be the name of the session-enabled subscription.
        ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .sessionReceiver()
            .receiveMode(ReceiveMode.PEEK_LOCK)
            .topicName("<<topic-name>>")
            .subscriptionName("<<subscription-name>>")
            .buildAsyncClient();

        Disposable subscription = receiver.receiveMessages()
            .flatMap(context -> {
                if (context.hasError()) {
                    System.out.printf("An error occurred in session %s. Error: %s%n",
                        context.getSessionId(), context.getThrowable());
                    return Mono.empty();
                }

                System.out.println("Processing message from session: " + context.getSessionId());

                // Process message
                return receiver.complete(context.getMessage());
            }).subscribe(aVoid -> {
            }, error -> System.err.println("Error occurred: " + error));

        // Subscribe is not a blocking call so we sleep here so the program does not end.
        TimeUnit.SECONDS.sleep(60);

        // Disposing of the subscription will cancel the receive() operation.
        subscription.dispose();

        // Close the receiver.
        receiver.close();
person krishg    schedule 16.10.2020