Группировка по нескольким полям Storm

Я полагаю, что нужно сгруппировать поток по двум полям ("remote-client-ip", "request-params") и подсчитать количество кортежей в каждой группе. И объедините их в карту. Вот моя топология:

topology.newStream("kafka-spout-stream-1", repeatSpout)
                    .each(new Fields("str"), new URLParser(), new   Fields(fieldNames))
                    .each(new Fields("remote-client-ip", "request-params"), new HTTPParameterExtractor(), new Fields("query-string"))
                    .groupBy(new Fields("remote-client-ip", "query-string"))
                    .aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count"))
                    .groupBy(new Fields("remote-client-ip"))
                    .persistentAggregate(new MemoryMapState.Factory(), new UserQueryStringCombiner(), new Fields("user-word-count-list"));

Но после отладки я обнаружил, что поток данных сначала заблокирован groupBy(), что является группировкой по нескольким полям. У меня ничего не было выполнено для Count() в последующем агрегатном операторе.

Поэтому я думаю, что неправильно понимаю некую концепцию взаимодействия между группировкой по нескольким полям и агрегацией.

Пожалуйста, дайте мне знать, верны мои предположения или нет. Спасибо!


person Eageon    schedule 16.04.2015    source источник


Ответы (1)


Вы группируете уже сгруппированные поля с функцией Aggregate() в своей топологии. Попробуй это:

.aggregate(new Count(), new Fields("user-word-count"))

Вместо этого:

.aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count"))
person Lukas Hajdu    schedule 17.04.2015