Топология Trident или Storm, которая пишет на Redis

У меня проблема с топологией. Я пытаюсь объяснить рабочий процесс ... У меня есть источник, который выдает ~ 500 тыс. кортежей каждые 2 минуты, эти кортежи должны быть прочитаны носиком и обработаны точно один раз, как один объект (я думаю, что пакет в трезубце). После этого болт/функция/что еще?... должны добавить метку времени и сохранить кортежи в Redis.

Я попытался реализовать топологию Trident с функцией, которая сохраняет все кортежи в Redis, используя объект Jedis (библиотека Redis для Java) в этом классе функций, но когда я развертываю, я получаю исключение NotSerializable для этого объекта.

Мой вопрос. Как я могу реализовать функцию, которая записывает в Redis эту партию кортежей? Читая в Интернете, я не нашел ни одного примера, который записывает функцию в Redis, или любого примера с использованием объекта State в Trident (вероятно, я должен его использовать...)

Моя простая топология:

TridentTopology topology = new TridentTopology();
topology.newStream("myStream", new mySpout()).each(new Fields("field1", "field2"), new myFunction("redis_ip", "6379"));

заранее спасибо


person Eddyman    schedule 24.02.2014    source источник
comment
Вы имеете в виду, что пытаетесь сериализовать экземпляр Jedis? Вы не можете этого сделать, вы должны использовать специальный одноэлементный шаблон с JedisPool (Jedis действительно не рекомендуется, если вы потеряете соединение, оно будет потеряно навсегда, пул решает эту проблему).   -  person zenbeni    schedule 25.02.2014
comment
Хорошо, с пулом я решил проблему сериализации Jedis, но я не могу понять, как реализовать такую ​​топологию с трезубцем (настоящая проблема — отсутствие документации и подобных примеров). Сегодня я изменил топологию на базовую топологию Storm (без Trident), и она работает хорошо, но не гарантирует обработку всех кортежей как одного (каким-то образом упорядоченного) пакета...   -  person Eddyman    schedule 25.02.2014
comment
В любом случае, спасибо за ваш совет, шаг за шагом я начинаю видеть решение! :-П   -  person Eddyman    schedule 25.02.2014


Ответы (2)


(отвечая о состоянии в целом, поскольку конкретная проблема, связанная с Redis, кажется решенной в других комментариях)

Концепции обновления БД в Storm становятся более понятными, если учесть, что Storm читает из распределенных (или «разделенных») источников данных (через «носики» Storm), параллельно обрабатывает потоки данных на многих узлах, при необходимости выполняет вычисления на этих узлах. потоков данных (называемых «агрегациями») и сохраняет результаты в распределенных хранилищах данных (называемых «состояниями»). Агрегация — это очень широкий термин, который просто означает «вычислительный материал»: например, вычисление минимального значения для потока рассматривается в Storm как агрегирование ранее известного минимального значения с новыми значениями, которые в настоящее время обрабатываются в каком-либо узле кластера.

Имея в виду концепции агрегации и партиции, мы можем взглянуть на два основных примитива в Storm, которые позволяют сохранять что-то в состоянии: partitionPersist и persistAggregate, первый работает на уровне каждого узла кластера без согласования с другие разделы и немного похоже на общение с БД через DAO, в то время как второй включает «перераспределение» кортежей (т. ) перед чтением/сохранением чего-либо в БД, и это похоже на разговор с HashMap, а не с БД (в этом случае Storm называет БД «MapState» или «Snapshot», если на карте есть только один ключ).

Еще одна вещь, которую следует иметь в виду, заключается в том, что семантика ровно один раз в Storm не достигается путем обработки каждого кортежа ровно один раз: это было бы слишком хрупко, поскольку потенциально существует несколько операций чтения/записи для каждого кортежа, определенного в В нашей топологии мы хотим избежать двухэтапных коммитов по соображениям масштабируемости, а в больших масштабах сетевые разделы становятся более вероятными. Вместо этого Шторм обычно продолжает воспроизводить кортежи до тех пор, пока не будет уверен, что они были полностью успешно обработаны хотя бы один раз. Важное отношение этого к обновлениям состояния заключается в том, что Storm предоставляет нам примитив (OpaqueMap), который позволяет идемпотентно обновлять состояние, чтобы эти повторы не повреждали ранее сохраненные данные. Например, если мы суммируем числа [1,2,3,4,5], результирующая вещь, сохраняемая в БД, всегда будет 15, даже если они воспроизводятся и обрабатываются в операции «сумма» несколько раз из-за некоторых временный отказ. OpaqueMap оказывает незначительное влияние на формат, используемый для сохранения данных в БД. Обратите внимание, что эти повторы и непрозрачная логика присутствуют только в том случае, если мы говорим Шторму действовать таким образом, но обычно мы так и делаем.

Если вам интересно узнать больше, я разместил здесь 2 статьи в блоге по этому вопросу.

http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/

http://svendvanderveken.wordpress.com/2014/02/05/error-handling-in-storm-trident-topologies/

И последнее: как намекает приведенный выше материал воспроизведения, Storm по своей природе является очень асинхронным механизмом: у нас обычно есть какой-то производитель данных, который отправляет событие в систему очередей (например, Kafka или 0MQ), и Storm читает оттуда. В результате назначение временной метки из шторма, как предлагается в вопросе, может иметь или не иметь желаемого эффекта: эта временная метка будет отражать «последнее время успешной обработки», а не время приема данных, и, конечно, она не будет идентична в случае воспроизведенных кортежей.

person Svend    schedule 27.02.2014
comment
Спасибо за ваше подробное объяснение, я стараюсь читать больше в ваших статьях. - person Eddyman; 06.03.2014