Как проверить, была ли отправлена ​​новая запись в заданный период времени с помощью kafka и faust

Я использую тестовую установку, включающую конфлюентную платформу (докер), и обрабатываю записи со следующей информацией: идентификатор датчика, отметка времени, значение. Используя faust robinhood (похожий на Kafka Streams, но в python), я пытаюсь сделать следующее:

Всякий раз, когда есть новая запись для датчика, должен быть «таймер», и если новая запись для этого идентификатора датчика не получена в течение заданного времени, должна быть ошибка, указывающая на возможный сбой для этого датчика/машины.

Я пытался использовать time.sleep(), но происходит то, что он просто засыпает на 10 секунд, а затем обрабатывает следующую запись.

Возможно ли вообще сделать что-то подобное с настройкой, которую я использую?


person LukasM    schedule 25.02.2019    source источник
comment
Поскольку мне не удалось решить проблему описанным способом, я применил обходной путь, используя словарь с идентификаторами датчиков в качестве ключей и самыми последними временными метками в качестве значений. Этот словарь проверяется на временные метки старше 10 секунд всякий раз, когда принимается ЛЮБАЯ запись, что обычно происходит более одного раза в секунду. Это, вероятно, не лучшее решение, но на данный момент это единственное, что я мог придумать.   -  person LukasM    schedule 28.02.2019


Ответы (1)