У меня такой конвейер:
env.addSource(kafkaConsumer)
.keyBy { value -> value.f0 }
.window(EventTimeSessionWindows.withGap(Time.minutes(2)))
.reduce(::reduceRecord)
.addSink(kafkaProducer)
Я хочу, чтобы данные с ключом истекли с помощью TTL.
В некоторых сообщениях в блогах указывается, что для этого мне нужен ValueStateDescriptor
. Я сделал такой:
val desc = ValueStateDescriptor("val state", MyKey::class.java)
desc.enableTimeToLive(ttlConfig)
Но как мне на самом деле применить этот дескриптор к моему конвейеру, чтобы он действительно действовал по истечению срока TTL?