Получение пустого набора при чтении данных из kafka-Spark-Streaming

Привет, я новичок в Spark Streaming. Я пытаюсь прочитать XML-файл и отправить его в тему kafka. Вот мой код Kafka, который отправляет данные Kafka-console-consumer.

Код:

package org.apache.kafka.Kafka_Producer;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutionException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

@SuppressWarnings("unused")
public class KafkaProducer { 
   private static String sCurrentLine;
   public static void main(String args[]) throws InterruptedException, ExecutionException{ 
       try (BufferedReader br = new BufferedReader(new FileReader("/Users/sreeharsha/Downloads/123.txt")))
       {
           while ((sCurrentLine = br.readLine()) != null) {
               System.out.println(sCurrentLine);
               kafka(sCurrentLine);
           }
       } catch (FileNotFoundException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
       } catch (IOException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();}
   }
   public static void kafka(String sCurrentLine)  {
       Properties props = new Properties();
       props.put("metadata.broker.list", "localhost:9092");
       props.put("serializer.class", "kafka.serializer.StringEncoder");
       props.put("partitioner.class","kafka.producer.DefaultPartitioner");
       props.put("request.required.acks", "1");
       ProducerConfig config = new ProducerConfig(props);
       Producer<String, String> producer = new Producer<String, String>(config);
       producer.send(new KeyedMessage<String, String>("sample",sCurrentLine));
       producer.close();
   }
}

я могу получить данные в Kafka-Console-Consumer. На скриншоте ниже вы можете увидеть данные, которые я отправил в тему.

введите здесь описание изображения

Теперь мне нужно передать данные, которые я отправляю kafka-console-consumer, с помощью Spark-Streaming. Вот код.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SparkStringConsumer {

   public static void main(String[] args) {

       SparkConf conf = new SparkConf()
               .setAppName("kafka-sandbox")
               .setMaster("local[*]");
       JavaSparkContext sc = new JavaSparkContext(conf);
       JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

       Map<String, String> kafkaParams = new HashMap<>();
       kafkaParams.put("metadata.broker.list", "localhost:9092");
       Set<String> topics = Collections.singleton("sample");

       JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
       String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
       directKafkaStream.foreachRDD(rdd -> {
       System.out.println("--- New RDD with " + rdd.partitions().size()
           + " partitions and " + rdd.count() + " records");
       rdd.foreach(record -> System.out.println(record._2));
       });
       ssc.start();
       ssc.awaitTermination();
   }
}

Получение пустого набора при отправке моей работы следующим образом:

./spark-submit --class org.apache.spark_streaming.Spark_Kafka_Streaming.SparkStringConsumer --master local[4] Spark_Kafka_Streaming-0.0.1-SNAPSHOT.jar

Ниже вы можете увидеть скриншот получения данных:

введите здесь описание изображения

Используя следующие версии:

Искра - 2.0.0

Zookeeper -3.4.6

Кафка - 0.8.2.1

Любые предложения, пожалуйста,


person Sree Eedupuganti    schedule 19.09.2016    source источник
comment
Где код класса SparkReceiver? Вы разместили класс SparkStringConsumer, в котором вы используете тему как mytopic, а в классе KafkaProducer вы отправляете сообщения по образцу темы. Вы можете проверить?   -  person abaghel    schedule 19.09.2016
comment
Обновлено сейчас можно еще раз пройти?   -  person Sree Eedupuganti    schedule 19.09.2016
comment
Попробуйте создать новые сообщения в кафке   -  person Ayan Guha    schedule 19.09.2016
comment
Получение сообщений в kafka не проблема при чтении пустого набора в искре   -  person Sree Eedupuganti    schedule 19.09.2016
comment
Попробуйте использовать это в своем классе производителя вместо чтения из файла. Это для проверки. Случайный случайный = новый Случайный (); while (правда) {кафка (Test- + random.nextInt (100)); Thread.sleep (500); }} Также проверьте, откуда он разрешает StringDecoder.class в классе SparkStringConsumer. Это должно быть import kafka.serializer.StringDecoder;   -  person abaghel    schedule 19.09.2016
comment
Пробовал с этим тоже снова получить ту же проблему   -  person Sree Eedupuganti    schedule 19.09.2016
comment
@SreeEedupuganti: посмотрите на этот cwiki.apache.org/confluence/display/ КАФКА /?   -  person Shankar    schedule 19.09.2016


Ответы (1)


Наконец, после серфинга в Интернете я нашел это решение.

Не используйте Spark-Submit и SetMaster одновременно.

  • Если вы запускаете код из своей IDE, используйте SetMaster в своем коде.
  • Если вы запускаете банку через «Spark-Submit», не добавляйте setMaster в свой код.

И еще одна вещь: сначала запустите / отправьте свой Spark jar, а затем отправьте данные в Kafka-Console-Consumer

Работает нормально.

person Sree Eedupuganti    schedule 20.09.2016