@StreamListener("input")
@SendTo("output")
public KStream<?, MyObject> process(KStream<Object, IncomingObject> input) {
KTable table = input.flatMapValues(value -> this.getMylogic(value));
return table.toStream();
}
Я пытаюсь преобразовать KStream в KTable, а затем в KStream, но получаю не могу преобразовать из KStream в KTable
значение - json. Пожалуйста, помогите, и как я могу также использовать агрегирование?
{
"name":"test",
address{
"localAddress":"myaddress",
"businessAddress":"testAddress"
}
}
в методе mylogic я беру только адрес для отправки в другую тему. Любезно помогите
flatMapValues
возвращаетKStream
, а неKTable
- person Vasyl Sarzhynskyi   schedule 23.07.2018KTable
. чтобы получить KTable, вы можете использовать следующее:kStream.groupByKey().aggregate(..)
- person Vasyl Sarzhynskyi   schedule 23.07.2018