Есть ли какой-либо Java API, чтобы знать, когда топология готова для чтения первого сообщения от Spout?

Наша топология Apache Storm прослушивает сообщения от Kafka с помощью KafkaSpout и после выполнения большого количества сопоставлений/уменьшения/обогащения/агрегации и т. д. и т. д. и т. д., наконец, вставляет данные в Cassandra. Есть еще один вход kafka, где мы получаем пользовательские запросы на данные, если топология находит ответ, а затем отправляет его в третью тему kafka. Теперь мы хотим написать тест E2E с помощью Junit, в котором мы можем напрямую программно вставлять данные в топологию, а затем, вставляя сообщение пользовательского запроса, мы можем утверждать в третьем пункте, что ответ, полученный на наш запрос, правильный.

Чтобы добиться этого, мы решили запустить EmbeddedKafka и CassandraUnit, а затем заменить ими настоящие Kafka и Cassandra, а затем мы можем начать топологию в контексте этого единственного теста Junit.

Прежде чем мы начнем настоящий тест, мы создадим топологию и отправим ее в LocalCluster. Он запускает топологию в другом потоке, выходит из Before и начинает выполнять наш тест. До этого времени топология не готова, потому что требуется некоторое время, чтобы быть готовой к обработке. Есть ли какой-либо java API, который может сообщить нам, когда топология готова к обработке (означает готовность прочитать первое сообщение от Spout)?


person Prashant Bhardwaj    schedule 23.07.2019    source источник


Ответы (1)


Это зависит от того, что вы имеете в виду, когда говорите «готово к обработке».

Если вы включите симуляцию времени для вашего LocalCluster, вы можете использовать Time.advanceClusterTime для продвижения времени по шагам. Если вы вызовете этот метод после отправки топологии, он вернется только после того, как кластер будет в основном бездействовать. См., например. https://github.com/apache/storm/blob/8f49e06998abb4dfc50f51d78b6784ebd04844fb/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java#L233.

Если вы хотите заменить носики заглушками (например, FixedTupleSpout), вы можете использовать Testing.completeTopology, чтобы дождаться, пока топология завершит обработку всех кортежей, которые вы настроили заглушку для отправки.

Другой способ дождаться, пока топология обработает некоторые кортежи, заключается в том, что вы помещаете несколько сообщений в Kafka, запускаете свою топологию, а затем тестируете поток, опрашивая Cassandra, чтобы узнать, прошли ли ожидаемые сообщения. Таким образом, вы можете установить тайм-аут в своем потоке тестирования, и тест провалится, если условие не будет выполнено в течение некоторого количества секунд. Вы можете использовать такую ​​утилиту, как Awaitility, для этого https://github.com/awaitility/awaitility или просто напишите свою собственную логику опроса.

Если под "готов к обработке" вы подразумеваете что-то другое, опишите подробнее, что вы имеете в виду.

person Stig Rohde Døssing    schedule 24.07.2019
comment
«Готово к обработке» означает готовность прочитать первое сообщение от Spout. - person Prashant Bhardwaj; 30.07.2019
comment
Есть ли у нас блог или книга по тестированию Storm? Основываясь на ваших ответах, теперь я знаю, что в тестовом пакете есть много тестовых вспомогательных классов, а Testing.java полон многих вещей, которые можно использовать для тестирования топологии Storm. Есть ли какой-либо онлайн-ресурс, который может объяснить использование всех присутствующих классов? в тестовом пакете + Testing.java? - person Prashant Bhardwaj; 30.07.2019
comment
Я так не думаю. Различные люди писали об этом на протяжении многих лет, я думаю github.com/xumingming/storm-lib/blob/master/src/jvm/storm/ ближе всего к полному обзору. Было бы неплохо добавить раздел об этом на веб-сайт github. .com/apache/storm/blob/master/ - person Stig Rohde Døssing; 30.07.2019
comment
любая причина для понижения его? - person Prashant Bhardwaj; 04.08.2019
comment
Понижение чего? - person Stig Rohde Døssing; 04.08.2019
comment
Я имею в виду, почему этот вопрос был отклонен? Я начал писать документ о сквозном тестировании топологии Storm. Нужна ваша помощь в этом. Вышлю ссылку для ознакомления. После вашего утверждения он может стать частью документации Storm. - person Prashant Bhardwaj; 05.08.2019
comment
Я не знаю, почему за это проголосовали. Звучит хорошо, если возможно, откройте PR против github.com/apache/storm, таким образом другие сопровождающие тоже можно пересмотреть. - person Stig Rohde Døssing; 05.08.2019
comment
Спасибо за ваши комментарии и поддержку. Я разветвил репозиторий Storm на github. Создал модуль storm-cassandra-examples под пример. Создайте топологию для обработки событий, связанных с электронной коммерцией, считывайте их из Kafka и сохраняйте в Cassandra в необработанном виде, а также в некоторых агрегированных данных. Потом будет впритык тестировать эту топологию. Затем напишет документ, связанный со сквозным тестированием топологии storm, и даст ссылку из модуля storm-cassandra-examples. Это звучит хорошо? Есть ли какой-либо другой канал связи, чтобы поговорить с вами, кроме этого stackoverflow? - person Prashant Bhardwaj; 09.08.2019
comment
Звучит здорово. У нас уже есть несколько примеров сквозного тестирования (например, различные тесты с пометкой @IntegrationTest), но было бы неплохо иметь пример и для Cassandra. Если вы хотите предложить изменения или получить отзывы, рассмотрите возможность использования списка рассылки storm-dev (storm.apache .org/getting-help.html). Я читал этот список, как и многие другие разработчики Storm. - person Stig Rohde Døssing; 09.08.2019
comment
При написании примера для Storm, Cassandra и kafka обнаружил, что некоторые классы используют затененные классы, например, класс ConfigUtils использует org.apache.storm.shade.com.google.common.collect.Maps. В моей настройке InteliiJ ConfigUtils не может найти этот импорт. Я запустил чистую установку mvn, которая была выполнена успешно, и увидела, что была создана банка apache-storm-shaded. Даже я заглянул внутрь этой банки, и все упомянутые классы являются частью этой банки. как исправить эту проблему, потому что я не могу запустить свои тесты, они не работают с этими ошибками компиляции в других классах? - person Prashant Bhardwaj; 28.08.2019
comment
Где-то в IntelliJ есть ошибка, из-за которой не удается найти затененные классы. Мы обошли это с помощью профиля Maven, вы сможете включить его в настройках IntelliJ ="nofollow noreferrer">github.com/apache/storm/blob/ - person Stig Rohde Døssing; 28.08.2019
comment
Я активировал профиль IntelliJ maven. Тем не менее, это не помогло. Я что-то упустил? [Отправлено электронное письмо на адрес электронной почты группы разработчиков со скриншотом] - person Prashant Bhardwaj; 29.08.2019
comment
Сэр, любая помощь по этому вопросу, связанному с профилем. Я закончил с изменениями кода, не могу проверить свои изменения. Если эта проблема устранена, я могу протестировать и создать запрос на извлечение. - person Prashant Bhardwaj; 26.09.2019
comment
Итан разместил ссылку на другой обходной путь здесь mail-archives.apache. org/mod_mbox/storm-dev/201908.mbox/browser. Пожалуйста, попробуйте это. - person Stig Rohde Døssing; 26.09.2019
comment
Спасибо за вашу помощь. Я видел предоставленный URL, у него очень длинный почтовый след. Я прочитал каждое письмо, отправленное Итеном, но, к сожалению, не смог найти ответ, который искал. Я знаю, что становлюсь слишком большим бременем для вас, однако я думаю, что это просто первоначальный сбой, как только я пройду через это, я сделаю все возможное, чтобы совершить большое путешествие со Штормом. Я попрошу, если вы можете предоставить мне ссылку, о которой вы говорили. - person Prashant Bhardwaj; 27.09.2019
comment
Ой, извините. Я имел в виду ссылку на пост, а не на весь архив. mail-archives.apache.org/mod_mbox/storm-dev/201908.mbox/ - person Stig Rohde Døssing; 27.09.2019