Запись в ConfluentCloud из Apache Beam (поток данных GCP)

Я пытаюсь написать для записи в Confluent Cloud / Kafka из потока данных (Apache Beam), используя следующее:

kafkaKnowledgeGraphKVRecords.apply("Write to Kafka", KafkaIO.<String, String>write()
                                .withBootstrapServers("<mybootstrapserver>.confluent.cloud:9092")
                                .withTopic("testtopic").withKeySerializer(StringSerializer.class)
                                .withProducerConfigUpdates(props).withValueSerializer(StringSerializer.class));

где Map<String, Object> props = new HashMap<>(); (т.е. пока пусто)

В логах получаю: send failed : 'Topic testtopic not present in metadata after 60000 ms.'

Тема действительно существует в этом кластере, поэтому я предполагаю, что существует проблема с входом в систему, что имеет смысл, поскольку я не мог найти способ передать APIKey.

Я пробовал различные комбинации, чтобы передать APIKey / Secret, который у меня есть из Confluent Cloud, для аутентификации с props выше, но мне не удалось найти рабочую настройку.


person Pinguin Dirk    schedule 14.01.2020    source источник
comment
Я пробовал различные комбинации для передачи APIKey / Secret - ›не могли бы вы обновить свой вопрос, включив их, пожалуйста   -  person Robin Moffatt    schedule 14.01.2020
comment
stackoverflow.com/questions/53939658/ показывает пример подключения к Confluent Cloud из Beam - это как потребитель, поэтому вам нужно будет изменить его для соответствующей конфигурации производителя, но свойства должны быть такими же.   -  person Robin Moffatt    schedule 14.01.2020
comment
спасибо @RobinMoffatt - я пробовал параметры, аналогичные тем, которые указаны в другом ответе - возможно, я что-то перепутал. Я попробую завтра со связанным ответом и отчитаюсь здесь с обратной связью. Уже спасибо!   -  person Pinguin Dirk    schedule 14.01.2020
comment
@RobinMoffatt еще раз спасибо за указатели, я нашел решение, см. Ниже   -  person Pinguin Dirk    schedule 17.01.2020


Ответы (1)


Нашел решение, благодаря указателям в комментариях @RobinMoffatt под вопросом

Вот настройки, которые у меня есть сейчас:

Map<String, Object> props = new HashMap<>()

props.put("ssl.endpoint.identification.algorithm", "https");
props.put("sasl.mechanism", "PLAIN");
props.put("request.timeout.ms", 20000);
props.put("retry.backoff.ms", 500);
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<APIKEY>\" password=\"<SECRET>\";");
props.put("security.protocol", "SASL_SSL");

kafkaKnowledgeGraphKVRecords.apply("Write to Kafka-TESTTOPIC", KafkaIO.<String, String>write()
    .withBootstrapServers("<CLUSTER>.confluent.cloud:9092")
    .withTopic("test").withKeySerializer(StringSerializer.class)
    .withProducerConfigUpdates(props).withValueSerializer(StringSerializer.class));

Ключевая строка, которую я ошибся, - это sasl.jaas.config (обратите внимание на ; в конце!)

person Pinguin Dirk    schedule 17.01.2020