Как истечь ключевое состояние с TTL в Apache Flink?

У меня такой конвейер:

 env.addSource(kafkaConsumer)
            .keyBy { value -> value.f0 }
            .window(EventTimeSessionWindows.withGap(Time.minutes(2)))
            .reduce(::reduceRecord)
            .addSink(kafkaProducer)

Я хочу, чтобы данные с ключом истекли с помощью TTL.

В некоторых сообщениях в блогах указывается, что для этого мне нужен ValueStateDescriptor. Я сделал такой:

val desc = ValueStateDescriptor("val state", MyKey::class.java)
desc.enableTimeToLive(ttlConfig)

Но как мне на самом деле применить этот дескриптор к моему конвейеру, чтобы он действительно действовал по истечению срока TTL?


person pdeva    schedule 26.04.2021    source источник


Ответы (1)


Описанный вами конвейер не использует какое-либо состояние с ключом, которому было бы полезно установить TTL состояния. Единственное состояние с ключом в вашем конвейере - это содержимое окон сеанса, и это состояние очищается как можно скорее - по мере закрытия сеансов. (Более того, поскольку вы используете функцию сокращения, это состояние состоит только из одного значения для каждого ключа.)

По большей части истекающее состояние актуально только для состояния, которое вы явно создаете, и в этом случае у вас будет готовый доступ к дескриптору состояния и вы сможете настроить его для использования State TTL. Flink SQL действительно создает состояние от вашего имени, срок действия которого может не истекать автоматически, и в этом случае вам нужно будет использовать Время удержания состояния простоя, чтобы настроить его. Библиотека CEP также создает состояние от вашего имени, и в этом случае вы должны убедиться, что ваши шаблоны либо в конечном итоге совпадают, либо истекло время ожидания.

person David Anderson    schedule 26.04.2021
comment
насчет самого ключа. ключ очищается после закрытия окна? или останется навсегда? если да, то что, если в поток постоянно добавляются уникальные ключи? - person pdeva; 26.04.2021
comment
Ключи не хранятся сами по себе. Когда состояние с ключом очищается, пара ключ / значение очищается, и ничего не остается. - person David Anderson; 26.04.2021
comment
понятно. Правильно ли я сказал следующее: в приведенном выше коде конвейера скажем, я получаю только одно событие с ключом x и значением y. Как только сеанс для ключа x закрывается через 2 минуты, оба 'x' и 'y' удаляются из состояния, и состояние становится пустым. - person pdeva; 26.04.2021
comment
Похоже, вы понимаете, но будем педантичны: как только появляется водяной знак, указывающий, что был интервал продолжительностью не менее двух минут, в течение которого не было событий для x, и x, и y удаляются из состояния. Но состояние будет удерживать любое событие, которое использовалось в качестве доказательства водяного знака, который закрыл сеанс для x. - person David Anderson; 27.04.2021
comment
поэтому я проверил это, и кажется, что этот ответ неточен: stackoverflow.com/questions/67429035/ - person pdeva; 07.05.2021