Сигнализация источника Flume об остановке, если приемник недоступен

Я пытаюсь использовать Flume для отправки журналов доступа в кластер Spark. Но есть куча ограничений, которые вынуждают меня писать собственное приложение (или исходный код Flume) для чтения файлов журнала.

Что я пытаюсь сделать, так это заставить клиент Flume сигнализировать об этом источнике, если он не может записать данные в приемник. Поскольку у нас постоянно бывают длительные сбои в сети, и на диске недостаточно места для размещения журналов с ошибками на диске до тех пор, пока сеть не будет восстановлена. Вместо этого я хотел бы «сказать» источнику прекратить чтение журналов до тех пор, пока сеть не заработает, а затем «сказать» ему начать снова. Но пока я не видел никаких обратных вызовов при ошибках в документации.

Могу ли я в любом случае реализовать такой сценарий, не изобретая велосипед?


person Emam    schedule 08.09.2015    source источник
comment
Какой тип источника вы используете? Вы копируете файлы журнала в HDFS или передаете вывод в SparkStreaming с помощью AvroSink? На самом деле ваш сценарий — это то, для чего Flume был разработан, но вы должны предоставить немного больше информации о своей конфигурации, чтобы мы могли дать вам подсказки. Возможно, подойдет копия вашего flume.conf.   -  person Erik Schmiegelow    schedule 08.09.2015
comment
Ах, извините, забыл упомянуть, что я использую SparkStreaming. Итак, я отправляю журналы в приемник Avro на Spark. Мне просто нужно гарантировать, что поток будет продолжаться автоматически после сбоев в сети, без дублирования журналов на диске во время сбоя (обычные файлы журналов доступа + буферизация диска Flume).   -  person Emam    schedule 08.09.2015


Ответы (1)


Итак, теперь, когда мы прояснили несколько вопросов, вот что на самом деле происходит:

Flume Source - SpoolDir or similar -> Channel -> AvroSink (SparkStreaming)

Flume анализирует файл и преобразует строки этих файлов в события FlumeEvents, которые помещаются в канал. Это происходит максимально быстро, по крайней мере, пока канал не заполнится. Если канал заполнен, источник отключится до тех пор, пока канал снова не примет записи. Вы можете контролировать емкость канала, указав объем памяти и количество записей, которые может хранить канал.

Канал будет читать AvroSink. Если AvroSink не может отправить события из-за сбоя в сети, он прекратит потребление из канала, что приведет к заполнению канала.

В этот момент вы увидите сообщения в файле журнала Flume, указывающие на то, что приемники не могут идти в ногу с источниками, что является ожидаемым поведением, поскольку ваш канал действует как задний буфер для вашего (ненадежного) приемника. Вы не столкнетесь с дублированной обработкой событий, однако вы можете потерять некоторые события из-за сбоев, если вы выберете неустойчивые типы каналов, такие как MemoryChannel.

person Erik Schmiegelow    schedule 08.09.2015
comment
Меня не беспокоит дублирующая обработка моих событий Flume. Просто чтобы уточнить немного больше, мой вопрос заключается в том, как я могу сигнализировать пользовательскому источнику, чтобы он отключился или перестал генерировать какие-либо события. Потому что, насколько я видел, пользовательские источники и приемники полностью отделены друг от друга, и задача flume — ставить необработанные события в очередь где-то до тех пор, пока приемник не вернется в оперативный режим. - person Emam; 09.09.2015
comment
Как я уже писал, механизм отката срабатывает от пропускной способности канала. Если канал заполнен, источники отключатся. Нет никакой функциональности, чтобы сделать это явно. Этот механизм обратного хода на самом деле довольно надежен. - person Erik Schmiegelow; 09.09.2015