(отвечая о состоянии в целом, поскольку конкретная проблема, связанная с Redis, кажется решенной в других комментариях)
Концепции обновления БД в Storm становятся более понятными, если учесть, что Storm читает из распределенных (или «разделенных») источников данных (через «носики» Storm), параллельно обрабатывает потоки данных на многих узлах, при необходимости выполняет вычисления на этих узлах. потоков данных (называемых «агрегациями») и сохраняет результаты в распределенных хранилищах данных (называемых «состояниями»). Агрегация — это очень широкий термин, который просто означает «вычислительный материал»: например, вычисление минимального значения для потока рассматривается в Storm как агрегирование ранее известного минимального значения с новыми значениями, которые в настоящее время обрабатываются в каком-либо узле кластера.
Имея в виду концепции агрегации и партиции, мы можем взглянуть на два основных примитива в Storm, которые позволяют сохранять что-то в состоянии: partitionPersist и persistAggregate, первый работает на уровне каждого узла кластера без согласования с другие разделы и немного похоже на общение с БД через DAO, в то время как второй включает «перераспределение» кортежей (т. ) перед чтением/сохранением чего-либо в БД, и это похоже на разговор с HashMap, а не с БД (в этом случае Storm называет БД «MapState» или «Snapshot», если на карте есть только один ключ).
Еще одна вещь, которую следует иметь в виду, заключается в том, что семантика ровно один раз в Storm не достигается путем обработки каждого кортежа ровно один раз: это было бы слишком хрупко, поскольку потенциально существует несколько операций чтения/записи для каждого кортежа, определенного в В нашей топологии мы хотим избежать двухэтапных коммитов по соображениям масштабируемости, а в больших масштабах сетевые разделы становятся более вероятными. Вместо этого Шторм обычно продолжает воспроизводить кортежи до тех пор, пока не будет уверен, что они были полностью успешно обработаны хотя бы один раз. Важное отношение этого к обновлениям состояния заключается в том, что Storm предоставляет нам примитив (OpaqueMap), который позволяет идемпотентно обновлять состояние, чтобы эти повторы не повреждали ранее сохраненные данные. Например, если мы суммируем числа [1,2,3,4,5], результирующая вещь, сохраняемая в БД, всегда будет 15, даже если они воспроизводятся и обрабатываются в операции «сумма» несколько раз из-за некоторых временный отказ. OpaqueMap оказывает незначительное влияние на формат, используемый для сохранения данных в БД. Обратите внимание, что эти повторы и непрозрачная логика присутствуют только в том случае, если мы говорим Шторму действовать таким образом, но обычно мы так и делаем.
Если вам интересно узнать больше, я разместил здесь 2 статьи в блоге по этому вопросу.
http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/
http://svendvanderveken.wordpress.com/2014/02/05/error-handling-in-storm-trident-topologies/
И последнее: как намекает приведенный выше материал воспроизведения, Storm по своей природе является очень асинхронным механизмом: у нас обычно есть какой-то производитель данных, который отправляет событие в систему очередей (например, Kafka или 0MQ), и Storm читает оттуда. В результате назначение временной метки из шторма, как предлагается в вопросе, может иметь или не иметь желаемого эффекта: эта временная метка будет отражать «последнее время успешной обработки», а не время приема данных, и, конечно, она не будет идентична в случае воспроизведенных кортежей.
person
Svend
schedule
27.02.2014