Apache Beam ReadFromKafka с использованием Python работает во Flink, но опубликованные сообщения не проходят через

У меня в Minikube запущен локальный кластер. Моя работа с конвейером написана на Python и является основным потребителем Kafka. Мой пайплайн выглядит следующим образом:

    def run():
    import apache_beam as beam
    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_version=1.10",
        "--flink_master=localhost:8081",
        "--environment_type=EXTERNAL",
        "--environment_config=localhost:50000",
        "--streaming",
        "--flink_submit_uber_jar"
    ])

    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=options) as p:
        (p
            | 'Create words' >> ReadFromKafka(
                    topics=['mullerstreamer'],
                    consumer_config={
                            'bootstrap.servers': '192.168.49.1:9092,192.168.49.1:9093',
                            'auto.offset.reset': 'earliest',
                            'enable.auto.commit': 'true',
                            'group.id': 'BEAM-local'
                        }
                   
                )
            | 'print' >> beam.Map(print)
        )

if __name__ == "__main__":
    run()

Бегун Flink не показывает проходящих записей в полученных записях

введите описание изображения здесь

Я упустил что-то базовое?


person Müller    schedule 08.03.2021    source источник
comment
Печатаются ли ваши элементы преобразованием print в конце? Это может помочь определить, неточна ли сама метрика или нет никаких записей.   -  person Daniel Oliveira    schedule 10.03.2021
comment
Это хороший совет, спасибо. Однако я безуспешно писал во внешний источник.   -  person Müller    schedule 10.03.2021
comment
Итак, Beam работает в контейнере, все ли соответствующие порты открыты / связаны для связи с вашим кластером Kafka? Возможно, он не сможет подключиться.   -  person Cubez    schedule 16.03.2021
comment
@Cubez, спасибо, но как мне это проверить? Я пробовал использовать недопустимую строку подключения, а затем получаю ошибку тайм-аута. Поэтому я совершенно уверен, что соединение с брокером устанавливается, когда я помещаю действительную строку подключения.   -  person Müller    schedule 16.03.2021
comment
Я думаю, это связано с этой проблемой: issues.apache.org/jira/browse/BEAM- 11998 У меня тоже такая проблема. Я могу заставить его работать с Dataflow runner, но портативные бегуны и direct runner не работают. Если вы хотите быть уверенным, просто отмените конвейер, и вы увидите свои события на выходе. Или установите параметр max_num_records, и он тоже должен работать, но тогда он больше не будет потоковой передачи :(   -  person Silberfab    schedule 23.06.2021


Ответы (2)


--environment_type=EXTERNAL означает, что вы запускаете воркеры вручную, и в первую очередь для внутреннего тестирования. Работает ли это, если вы вообще не укажете environment_type / config?

person robertwb    schedule 17.03.2021

def run(bootstrap_servers, topic, pipeline_args):
  bootstrap_servers = 'localhost:9092'
  topic = 'wordcount'

  pipeline_args = pipeline_args.append('--flink_submit_uber_jar')
  pipeline_options = PipelineOptions([
          "--runner=FlinkRunner",
          "--flink_master=localhost:8081",
          "--flink_version=1.12",
          pipeline_args
      ],
      save_main_session=True, streaming=True)

  with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | ReadFromKafka(
            consumer_config={'bootstrap.servers': bootstrap_servers},
            topics=[topic])
        | beam.FlatMap(lambda kv: log_ride(kv[1])))

Я столкнулся с другой проблемой с последними версиями apache Beam 2.30.0, Flink 1.12.4

2021/06/10 17:39:42 Initializing python harness: /opt/apache/beam/boot --id=1-2 --provision_endpoint=localhost:42353
2021/06/10 17:39:50 Failed to retrieve staged files: failed to retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for /tmp/staged/pickled_main_session
        caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
        caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
        caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
        caused by:
rpc error: code = Unknown desc =
2021-06-10 17:39:53,076 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - [3]ReadFromKafka(beam:external:java:kafka:read:v1)/{KafkaIO.Read, Remove Kafka Metadata} -> [1]FlatMap(<lambda at kafka-taxi.py:88>) (1/1)#0 (9d941b13ae9f28fd1460bc242b7f6cc9) switched from RUNNING to FAILED.
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id d727ca3c0690d949f9ed1da9c3435b3ab3af70b6b422dc82905eed2f74ec7a15
person Ravi D. Borse    schedule 10.06.2021