Как сгруппировать записи из одного раздела, используя окно для времени T для одной записи, каждая запись имеет один и тот же ключ

Допустим, у меня есть поток событий.

R1 - {"abc": "значение 1"}

R2 - {"abc": "значение 2"}

R3 - {"abc": "значение 3"}

R4 - {"abc": "значение 4"}

в одном разделе. Я хочу, чтобы производный поток событий из указанного выше потока имел такое событие, как

{"abc": ["значение 1", "значение 2", "значение 3", "значение 4"]}

учитывая, что каждая запись с таким же ключом уже есть в теме.

Как я могу это сделать, используя агрегацию и groupByKey в Kafka Stream API?


person Jawad Ahmad    schedule 14.03.2019    source источник
comment
Что вы уже пробовали? Вы можете начать с: kafka.apache.org/21 / документация / потоки / руководство-разработчика /   -  person Bartosz Wardziński    schedule 14.03.2019
comment
да. У меня есть, но я не могу заставить его работать. Я не пробовал это в кодировании, но потоковая обработка - это не кодирование, а понимание. Так что мне только любопытно, может ли кто-то это сделать.   -  person Jawad Ahmad    schedule 14.03.2019
comment
В документации вы можете найти раздел с примером работы с окнами: (kafka.apache.org/21/documentation/streams/developer-guide/), агрегирование (kafka.apache.org/21/documentation/streams/developer-guide/) и оконных окончательных результатов (kafka.apache.org/21/documentation/streams/developer- гид /). Может быть, неплохо начать с этого.   -  person Bartosz Wardziński    schedule 14.03.2019


Ответы (1)


Вот пример потока событий JSON, вы можете попробовать что-то вроде следующего:

KTable<Windowed<String>, JsonNode> timeWindowedAggregatedStream = stream.groupByKey().windowedBy(Duration.ofMinutes(5))
    .aggregate(
        () -> objectMapper::createObjectNode, /* initializer */
        (aggKey, newValue, aggValue) -> {
            final JsonNode element = value.has(fieldName) && value.get(fieldName) != null ? value.get(fieldName) : null;

        final ArrayNode arrayNode = aggregate == null || aggregate.get(fieldName) != null
                ? (ArrayNode) aggregate.get(fieldName)
                : mapper.createArrayNode();

        arrayNode.add(element);
        // TO remove duplicates
        Stream<Object> elementStream = IntStream.range(0, arrayNode.size()).mapToObj(arrayNode::get);
        Set<Object> arrayAsSet = elementStream.collect(Collectors.toSet());
        ObjectNode aggregateNode = mapper.createObjectNode();
        ArrayNode uniqueArrayNode = mapper.valueToTree(arrayAsSet);
        aggregate.set(fieldName, uniqueArrayNode); 
        return aggregate;
} , /* adder */
        Materialized.<String, JsonNode, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */
        .withValueSerde(jsonNodeSerde)); /* serde for aggregate value */
person Nishu Tayal    schedule 14.03.2019