KStream в KTable

@StreamListener("input")
@SendTo("output")
public KStream<?, MyObject> process(KStream<Object, IncomingObject> input) {
KTable table = input.flatMapValues(value -> this.getMylogic(value));            
return table.toStream();
}

Я пытаюсь преобразовать KStream в KTable, а затем в KStream, но получаю не могу преобразовать из KStream в KTable

значение - json. Пожалуйста, помогите, и как я могу также использовать агрегирование?

{
"name":"test",
address{
"localAddress":"myaddress",
"businessAddress":"testAddress"
}
}

в методе mylogic я беру только адрес для отправки в другую тему. Любезно помогите


person Sri    schedule 23.07.2018    source источник
comment
метод flatMapValues возвращает KStream, а не KTable   -  person Vasyl Sarzhynskyi    schedule 23.07.2018
comment
да, так как же конвертировать из kstream в ktable?   -  person Sri    schedule 23.07.2018
comment
Вы не написали, зачем вам KTable. чтобы получить KTable, вы можете использовать следующее: kStream.groupByKey().aggregate(..)   -  person Vasyl Sarzhynskyi    schedule 23.07.2018
comment
какова цель создания Ktable? Можете ли вы предоставить подробную информацию об ожидаемых результатах в таблице ktable?   -  person Nishu Tayal    schedule 26.07.2018
comment
docs.confluent.io/current/streams/   -  person Matthias J. Sax    schedule 31.07.2018


Ответы (1)


Вам нужно применить агрегатную функцию, такую ​​как count, чтобы получить KTable в качестве результата. В противном случае нет возможности сделать KStream -> KTable -> KStream.

Что вам нужно и что вы можете сделать, это KStream.count () (например) -> KTable -> KStream. Таким образом, в основном результаты агрегирования будут публиковаться в KStream, которые также могут быть опубликованы в теме Kafka.

person Matus Cimerman    schedule 23.07.2018