Как заставить Kafka Source повторно подключаться при перезапуске Kafka

Я создаю Source записей потребителей с помощью Reactive Kafka следующим образом:

val settings = ConsumerSettings(system, keyDeserializer, valueDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId(groupName)
// what offset to begin with if there's no offset for this group
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// do we want to automatically commit offsets?
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
// auto-commit offsets every 1 minute, in the background
.withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
// reconnect every 1 second, when disconnected
.withProperty(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000")
// every session lasts 30 seconds
.withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
// send heartbeat every 10 seconds i.e. 1/3 * session.timeout.ms
.withProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000")
// how many records to fetch in each poll( )
.withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100")

Consumer.atMostOnceSource(settings, Subscriptions.topics(topic)).map(_.value)  

На моем локальном компьютере запущен 1 экземпляр Kafka. Я вставляю значения в тему через производителя консоли и вижу их распечатанными. Затем я убиваю Kafka и перезапускаю его, чтобы проверить, подключается ли источник заново.

Вот как работают мои журналы:

* Connection with /192.168.0.1 disconnected
    java.net.ConnectException: Connection refused
* Give up sending metadata request since no node is available
* Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
* Resuming partition test-events-0
* Error while fetching metadata with correlation id 139 : {test-events=INVALID_REPLICATION_FACTOR}
* Sending metadata request (type=MetadataRequest, topics=test-events) to node 0
* Sending GroupCoordinator request for group mytestgroup to broker 192.168.0.1:9092 (id: 0 rack: null)
* Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
* Received GroupCoordinator response ClientResponse(receivedTimeMs=1491797713078, latencyMs=70, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=166,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group mytestgroup
* Error while fetching metadata with correlation id 169 : {test-events=INVALID_REPLICATION_FACTOR}
* Received GroupCoordinator response ClientResponse(receivedTimeMs=1491797716169, latencyMs=72, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=196,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group mytestgroup
09:45:16.169 [testsystem-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.i.AbstractCoordinator - Group coordinator lookup for group mytestgroup failed: The group coordinator is not available.
09:45:16.169 [testsystem-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.i.AbstractCoordinator - Coordinator discovery failed for group mytestgroup, refreshing metadata
* Initiating API versions fetch from node 2147483647
* Offset commit for group mytestgroup failed: This is not the correct coordinator for this group.
* Marking the coordinator 192.168.43.25:9092 (id: 2147483647 rack: null) dead for group mytestgroup
* The Kafka consumer has closed.  

Как мне убедиться, что этот источник повторно подключается и продолжает обработку журналов?


person An SO User    schedule 10.04.2017    source источник


Ответы (1)


Я думаю, вам нужно иметь как минимум 2 брокера. Если один выйдет из строя, другой сможет выполнить задание, а вы можете перезапустить другой.

person ImbaBalboa    schedule 10.04.2017