Отправитель сообщений SmallRye Reactive Messaging ‹›. Send не отправляется в Kotlin через брокера AMQP с Quarkus

В настоящее время я пытаюсь написать «службу уведомлений» на основе Maven, Quarkus и SmallRye Reactive Messaging в Kotlin. В качестве основы у меня есть пример на Java, который отлично работает, и я пытался «перевести» его на Kotlin.

Я хочу, чтобы это работало так: я отправляю HTTP-запрос (например, GET http://localhost:8080/search/ {word}), и система отправляет «слово» (здесь строка) в очередь «запросы» брокера сообщений Artemis AMQP. Другая система подписывается на брокера сообщений и выбирает «слово» из очереди «запросы» по HTTP-запросу (например, GET http://localhost:8080/receiver).

Однако в Kotlin это не работает, и я предполагаю, что Emitter не отправляет «слово», в отличие от Java.

Вот код, который я использую:

Котлин

Отправка

import io.smallrye.reactive.messaging.annotations.Emitter
import io.smallrye.reactive.messaging.annotations.Stream
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.PathParam


@Path("/search")
class ExampleService {


    @Stream("queries")
    val queryEmitter: Emitter<String>? = null



    @GET
    @Path("/{word}")
    fun search(@PathParam("word") word: String?): String {

        println("about to send word: " + word!!)

        if (word.isNotEmpty()) {

            var qE=queryEmitter?.send(word)
            println("Emitter return : $qE")
            return word
        }
        return "word was empty"
    }


}

Получение

import org.eclipse.microprofile.reactive.messaging.Incoming
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.core.MediaType


@Path("/receiver")
class AdsResource {


    var word : String = "nothing happened so far"

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    fun getWords(): String {
        return word
    }


    @Incoming("sink")
    fun consume(message: String) {
        println("got user query: $message")
        word = message
    }
}

А вот версия Java

Отправка

import io.smallrye.reactive.messaging.annotations.Emitter;
import io.smallrye.reactive.messaging.annotations.Stream;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;



@Path("/search")
public class SearchEndpoint {

    @Stream("queries")
    Emitter<String> queryEmitter;


    @GET
    @Path("/{word}")
    public String search(@PathParam("word") String word) {

        System.out.println("about to send word: " + word);

        if (!word.isEmpty()) {

            Emitter<String> qE = queryEmitter.send(word);
            System.out.println("Emitter return: " + qE);
            return word;
        }

        return "word was empty" ;
    }

}

Получение

import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.ws.rs.GET;
import javax.ws.rs.Path;


@Path("/receiver")
public class AdsResource {

   private String word = "";


   @GET
   public String getAd() {
      System.out.println("got user query: " + word);
      return word;
   }
   @Incoming("sink")
   public void consume(String message) {
      System.out.println("got user query: " + message);
      word = message;
   }


}

Здесь находятся файлы конфигурации application.properties для Kotlin и Java.

# Configures the AMQP broker credentials.
amqp-username=quarkus
amqp-password=quarkus

# Configure the AMQP connector to write to the `queries `  address
mp.messaging.outgoing.queries.connector=smallrye-amqp
mp.messaging.outgoing.queries.address=sink
mp.messaging.outgoing.queries.durable=true


# Configure the AMQP connector to read from the `queries ` queue
mp.messaging.incoming.sink.connector=smallrye-amqp
mp.messaging.incoming.sink.durable=true

Немного информации:

Заранее спасибо и дайте мне знать, если я пропустил предоставление информации.


person jufru    schedule 16.09.2019    source источник
comment
Привет, вы пробовали добавить @Inject в поля, помеченные знаком @Stream? Я сомневаюсь, что это проблема, но просто хочу охватить все основания   -  person geoand    schedule 19.09.2019
comment
Кроме того, возможно, у вас есть обе версии на github, чтобы я мог попробовать?   -  person geoand    schedule 19.09.2019
comment
Привет, @geoand, '@Inject', к сожалению, также просто вызывает у меня исключение UnsatisfiedResolutionException: Unsatisfied dependency error: вот ссылка в проект Github. заранее спасибо.   -  person jufru    schedule 20.09.2019
comment
Спасибо. Надеюсь, я посмотрю на выходных   -  person geoand    schedule 20.09.2019


Ответы (1)


Проблема сводится к тому, где Kotlin добавляет аннотацию @Stream в байт-код.

По сути, чтобы решить вашу проблему, вам необходимо заменить:

@Stream("queries")

с участием

@field: Stream("queries")
person geoand    schedule 23.09.2019
comment
Привет, большое спасибо. работает отлично. Однако я не совсем понимаю, в чем разница. Вы можете вкратце объяснить, что происходит? - person jufru; 24.09.2019
comment
Рад это слышать! В основном я описываю проблему здесь: github.com/smallrye/smallrye-reactive-messaging / issues / 209 - person geoand; 24.09.2019