Kafka java.io.EOFException - NetworkReceive.readFromReadableChannel

I'm trying to connect to IBM Message Hub from Apache Spark 2.2.1 Structured Streaming.

The connection code is fairly simple:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StreamingRetailTransactions").getOrCreate()

import spark.implicits._

val df = spark.readStream.
                format("kafka").
                option("kafka.bootstrap.servers", "kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka04-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093").
                option("subscribe", "transactions_load").
                option("security.protocol", "SASL_SSL").
                option("sasl.mechanism", "PLAIN").
                option("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"*****\" password=\"*****\";").
                option("ssl.protocol", "TLSv1.2").
                option("ssl.enabled.protocols", "TLSv1.2").
                option("ssl.endpoint.identification.algorithm", "HTTPS").
                option("auto.offset.reset","earliest").
                option("group.id", System.currentTimeMillis).
                load()

val query = df.writeStream.format("console").start()

I'm launching spark-shell with:

~/spark-2.2.1-bin-hadoop2.7$ ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

However, I'm getting a disconnection error:

scala> 18/01/09 08:34:17 WARN NetworkClient: Bootstrap broker kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
18/01/09 08:34:17 WARN NetworkClient: Bootstrap broker kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
18/01/09 08:34:17 WARN NetworkClient: Bootstrap broker kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
<<repeats forever>>

I increased the debug output with sc.setLogLevel("DEBUG") and got:

<<log output omitted for brevity>>
18/01/09 08:38:28 DEBUG SessionState: SessionState user: null
18/01/09 08:38:28 DEBUG SessionState: HDFS root scratch dir: /tmp/hive with schema null, permission: rwx-wx-wx
18/01/09 08:38:28 INFO SessionState: Created local directory: /tmp/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9_resources
18/01/09 08:38:28 INFO SessionState: Created HDFS directory: /tmp/hive/snowch/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9
18/01/09 08:38:28 INFO SessionState: Created local directory: /tmp/snowch/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9
18/01/09 08:38:28 INFO SessionState: Created HDFS directory: /tmp/hive/snowch/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9/_tmp_space.db
18/01/09 08:38:28 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.1) is file:/home/snowch/spark-2.2.1-bin-hadoop2.7/spark-warehouse
18/01/09 08:38:28 DEBUG SessionState: Session is using authorization class class org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider
18/01/09 08:38:28 DEBUG StateStoreCoordinatorRef: Retrieved existing StateStoreCoordinator endpoint
18/01/09 08:38:28 DEBUG StreamExecution: Starting Trigger Calculation
18/01/09 08:38:28 INFO StreamExecution: Starting new streaming query.
18/01/09 08:38:28 DEBUG UserGroupInformation: PrivilegedAction as:snowch (auth:SIMPLE) from:org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:331)
18/01/09 08:38:28 DEBUG KafkaSource$$anon$1: Unable to find batch /tmp/temporary-fb3e6e1a-fbbe-4098-991c-0b29f63ecade/sources/0/0
18/01/09 08:38:28 DEBUG AbstractCoordinator: Sending coordinator request for group spark-kafka-source-9a14bb54-8f1b-47db-8497-19c083128496--998588290-driver-0 to broker kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093 (id: -5 rack: null)
18/01/09 08:38:28 DEBUG NetworkClient: Initiating connection to node -5 at kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093.
18/01/09 08:38:28 DEBUG NetworkClient: Initialize connection to node -3 for sending metadata request
18/01/09 08:38:28 DEBUG NetworkClient: Initiating connection to node -3 at kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093.
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--5.bytes-sent
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--5.bytes-received
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--5.latency
18/01/09 08:38:28 DEBUG NetworkClient: Completed connection to node -5
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--3.bytes-sent
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--3.bytes-received
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--3.latency
18/01/09 08:38:28 DEBUG NetworkClient: Completed connection to node -3
18/01/09 08:38:28 DEBUG NetworkClient: Sending metadata request {topics=[transactions_load]} to node -5
18/01/09 08:38:28 DEBUG Selector: Connection with kafka05-prod02.messagehub.services.eu-gb.bluemix.net/159.8.179.153 disconnected
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:974)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:174)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:172)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:263)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:262)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:262)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:261)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:172)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:172)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:230)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:171)
    at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$initialPartitionOffsets$1.apply(KafkaSource.scala:132)
    at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$initialPartitionOffsets$1.apply(KafkaSource.scala:129)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets$lzycompute(KafkaSource.scala:129)
    at org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets(KafkaSource.scala:97)
    at org.apache.spark.sql.kafka010.KafkaSource.getOffset(KafkaSource.scala:163)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$10$$anonfun$apply$6.apply(StreamExecution.scala:521)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$10$$anonfun$apply$6.apply(StreamExecution.scala:521)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$10.apply(StreamExecution.scala:520)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$10.apply(StreamExecution.scala:518)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch(StreamExecution.scala:518)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:492)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:297)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
18/01/09 08:38:28 DEBUG NetworkClient: Node -5 disconnected.
18/01/09 08:38:28 WARN NetworkClient: Bootstrap broker kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
18/01/09 08:38:28 DEBUG ConsumerNetworkClient: Cancelled GROUP_COORDINATOR request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@4eacac4e, request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, body={group_id=spark-kafka-source-9a14bb54-8f1b-47db-8497-19c083128496--998588290-driver-0}), createdTimeMs=1515487108479, sendTimeMs=1515487108598) with correlation id 0 due to node -5 being disconnected
18/01/09 08:38:28 DEBUG NetworkClient: Sending metadata request {topics=[transactions_load]} to node -3
18/01/09 08:38:28 DEBUG Selector: Connection with kafka01-prod02.messagehub.services.eu-gb.bluemix.net/159.8.179.149 disconnected
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    <<repeated>>

I have seen the following similar questions:

I understand that some of this output is just 'noise'; however, my Spark streaming application does not appear to be receiving any data. I have connected with a console client and I am able to see the data.


Update 1. I have tried configuring JAAS programmatically, but I still get the same error. The problem may be that the JAAS code needs to be executed on each worker node but isn't being run there (see the untested sketch after the code below).

sc.setLogLevel("DEBUG")

def jaasClientConfig(username: String, password: String): Unit = {

    import javax.security.auth.login.AppConfigurationEntry
    import javax.security.auth.login.Configuration

    // Provides the implicit conversion from a Scala Map to the
    // java.util.Map that AppConfigurationEntry expects.
    import scala.collection.JavaConversions._

    // Clear any file-based JAAS setting so the programmatic Configuration wins.
    System.setProperty("java.security.auth.login.config", "")

    Configuration.setConfiguration(new Configuration() {
        def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
            val idMap = Map(
                "serviceName" -> "kafka",
                "username" -> username,
                "password" -> password
            )
            val ace = new AppConfigurationEntry(
                "org.apache.kafka.common.security.plain.PlainLoginModule",
                AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                idMap
            )
            Array(ace)
        }
    })
}

def startSparkStreaming(): Unit = {

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("StreamingRetailTransactions").getOrCreate()

    import spark.implicits._

    val df = spark.readStream.
                    format("kafka").
                    option("kafka.bootstrap.servers", "kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka04-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093").
                    option("subscribe", "transactions_load").
                    option("security.protocol", "SASL_SSL").
                    option("sasl.mechanism", "PLAIN").
                    option("ssl.protocol", "TLSv1.2").
                    option("ssl.enabled.protocols", "TLSv1.2").
                    option("ssl.endpoint.identification.algorithm", "HTTPS").
                    option("auto.offset.reset","earliest").
                    option("group.id", System.currentTimeMillis).
                    load()

    val query = df.writeStream.format("console").start()
}

jaasClientConfig("****","****")

startSparkStreaming()
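
An untested idea for forcing the setup onto the workers (the stack trace above points at the driver, so this alone may not be enough; credentials are placeholders):

    // Untested sketch: run the programmatic JAAS setup once per executor JVM
    // via a throwaway job before starting the stream. There is no guarantee
    // every executor is reached, since tasks may land on a subset of them.
    sc.parallelize(0 until 100, 100).foreachPartition { _ =>
        jaasClientConfig("****", "****")
    }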

Update 2

I have also tried with a jaas.conf:

~/spark-2.2.1-bin-hadoop2.7$ ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.conf=jaas.conf" --files "jaas.conf"

and ...

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="*****"
    password="*****";
};

Still the same problem...


person Chris Snow    schedule 09.01.2018


Answers (3)


First I needed to run spark-shell with --conf options pointing both the executor and the driver at my jaas.conf:

   ./bin/spark-shell --master local[1] \
        --jars external/kafka-0-10-sql/target/spark-sql-kafka-0-10_2.11-2.2.2-SNAPSHOT.jar,external/kafka-0-10-assembly/target/spark-streaming-kafka-0-10-assembly_2.11-2.2.2-SNAPSHOT.jar \
       --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/path/to/jaas.conf" \
       --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/jaas.conf" \
       --num-executors 1  --executor-cores 1 
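
Both settings matter here: the driver reads the initial offsets when the stream starts (the call that fails in the stack trace above), while the executors run the Kafka consumers that actually fetch the data.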

Then I had to add some Kafka options:

val df = spark.readStream.
                format("kafka").
                option("kafka.bootstrap.servers", "kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka04-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093").
                option("subscribe", "transactions_load").
                option("kafka.security.protocol", "SASL_SSL").
                option("kafka.sasl.mechanism", "PLAIN").
                option("kafka.ssl.protocol", "TLSv1.2").
                option("kafka.ssl.enabled.protocols", "TLSv1.2").
                load()

Note that the Kafka options must be prefixed with kafka. (the sketch after this list illustrates why), for example:

  • security.protocol => kafka.security.protocol
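
My understanding of the convention (an illustrative sketch, not Spark's actual source code): the Spark Kafka source consumes unprefixed options itself and forwards kafka.-prefixed options, with the prefix stripped, to the underlying Kafka consumer, so an unprefixed security.protocol never reaches the consumer.

    // Illustrative only: how "kafka."-prefixed options reach the Kafka consumer.
    val sourceOptions = Map(
        "subscribe" -> "transactions_load",       // handled by the Spark source itself
        "kafka.security.protocol" -> "SASL_SSL",  // forwarded to the Kafka consumer
        "security.protocol" -> "SASL_SSL"         // no prefix: the consumer never sees it
    )

    val kafkaParams = sourceOptions.collect {
        case (key, value) if key.startsWith("kafka.") =>
            key.stripPrefix("kafka.") -> value
    }
    // kafkaParams == Map("security.protocol" -> "SASL_SSL")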

These changes resolved the connection problem for me.

person Chris Snow    schedule 18.01.2018

This looks like a failed authentication. With the Kafka version currently used by Message Hub, the server simply closes the connection when authentication fails.
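
To illustrate (a toy sketch of mine, unrelated to the actual broker code): when the peer closes the socket straight away, the client's read returns -1 (end of stream), which kafka-clients surfaces as java.io.EOFException in NetworkReceive.

    import java.net.{ServerSocket, Socket}

    val server = new ServerSocket(0)  // bind to any free port
    new Thread(new Runnable {
        def run(): Unit = server.accept().close()  // accept, then drop the connection
    }).start()

    val client = new Socket("localhost", server.getLocalPort)
    println(client.getInputStream.read())  // prints -1: the peer closed the stream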

person Edoardo Comar    schedule 09.01.2018
comment
Is there any way to get more information about the authentication failure? The username and password in my configuration are entered correctly, so perhaps there is a different problem, e.g. with the JAAS configuration. - person Chris Snow; 09.01.2018
comment
I wish both the Kafka client and the server were on version 1.0 - that would help surface the problem thanks to the improvements made in issues.apache.org/jira/browse/KAFKA-4764 - person Edoardo Comar; 09.01.2018

The sasl.jaas.config option is supported by the Kafka client starting with version 0.10.2.

The Kafka client used by Spark 2.2.1 is version 0.10.0, so authentication fails, as suspected.
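
A quick way to check which kafka-clients version actually ends up on the classpath (AppInfoParser ships with kafka-clients and, to my knowledge, has exposed getVersion in these releases):

    // Prints the kafka-clients version found on the classpath, e.g. "0.10.0.1".
    println(org.apache.kafka.common.utils.AppInfoParser.getVersion)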

You can use the java.security.auth.login.config system property to point to a jaas file.
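
For example (the path is a placeholder):

    // Must be set before the first Kafka client is created in this JVM.
    System.setProperty("java.security.auth.login.config", "/path/to/jaas.conf")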

Alternatively, you can set the client credentials programmatically with a snippet like this:

import java.util.HashMap;

import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

public static void jaasClientConfig(final String username, final String password) throws Exception {
    // Clear any file-based JAAS setting so the programmatic Configuration takes effect.
    System.setProperty("java.security.auth.login.config", "");

    Configuration.setConfiguration(new Configuration() {
        public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
            HashMap<String, String> idMap = new HashMap<>();
            idMap.put("serviceName", "kafka"); // Seems to be optional
            idMap.put("username", username);
            idMap.put("password", password);

            AppConfigurationEntry ace = new AppConfigurationEntry("org.apache.kafka.common.security.plain.PlainLoginModule",
                    AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, idMap);
            AppConfigurationEntry[] entry = { ace };
            return entry;
        }
    });
}
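
Either way, the configuration has to be in place in the driver JVM before the first Kafka consumer is created; on a cluster the executors need it as well, e.g. via spark.executor.extraJavaOptions as in the spark-shell invocation above.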
person Edoardo Comar    schedule 09.01.2018