SparkStreaming: ошибка в fileStream ()

Я пытаюсь реализовать приложение потоковой передачи искр в scala. Я хочу использовать метод fileStream () для обработки вновь поступивших файлов, а также старых файлов, находящихся в каталоге hadoop.

Я следил за реализацией fileStream () из следующих двух потоков из stackoverflow как:

Я использую fileStream () следующим образом:

val linesRDD = ssc.fileStream[LongWritable, Text, TextInputFormat](inputDirectory, (t: org.apache.hadoop.fs.Path) => true, false).map(_._2.toString)

Но я получаю следующее сообщение об ошибке:

type arguments [org.apache.hadoop.io.LongWritable,org.apache.hadoop.io.Text,
org.apache.hadoop.mapred.TextInputFormat] conform to the bounds of none of the overloaded alternatives of value fileStream: [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence$12: scala.reflect.ClassTag[K], implicit evidence$13: scala.reflect.ClassTag[V], implicit evidence$14: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> 
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:
String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean)(implicit evidence$9: scala.reflect.ClassTag[K], implicit evidence$10: scala.reflect.ClassTag[V], 
implicit evidence$11: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence$6: scala.reflect.ClassTag[K], implicit evidence$7: scala.reflect.ClassTag[V], implicit evidence$8: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)]

wrong number of type parameters for overloaded method value fileStream with alternatives: 
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence$12: scala.reflect.ClassTag[K], implicit evidence$13: scala.reflect.ClassTag[V], implicit evidence$14: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <:     org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean)(implicit evidence$9: scala.reflect.ClassTag[K], implicit evidence$10: scala.reflect.ClassTag[V], implicit evidence$11: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> 
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence$6: scala.reflect.ClassTag[K], implicit evidence$7: scala.reflect.ClassTag[V], implicit evidence$8: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)] 

Я использую spark 1.4.1 и hadoop 2.7.1. Прежде чем опубликовать этот вопрос, я просмотрел другую реализацию, обсуждаемую в stackoverflow, а также искровые документы, но мне ничего не помогло. Любая помощь будет оценена по достоинству.

Спасибо, Раджниш.


person Rajneesh Kumar    schedule 12.10.2015    source источник


Ответы (1)


Пожалуйста, найдите ниже образец кода Java с правильным импортом, он отлично работает для меня

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

JavaStreamingContext jssc = SparkUtils.getStreamingContext("key", jsc);
//      JavaDStream<String> rawInput = jssc.textFileStream(inputPath);

        JavaPairInputDStream<LongWritable, Text> inputStream = jssc.fileStream(
                inputPath, LongWritable.class, Text.class,
                TextInputFormat.class, new Function<Path, Boolean>() {
                    @Override
                    public Boolean call(Path v1) throws Exception {
                        if ( v1.getName().contains("COPYING") ) {
                            // This eliminates staging files.
                            return Boolean.FALSE;
                        }
                        return Boolean.TRUE;
                    }
                }, true);
        JavaDStream<String> rawInput = inputStream.map(
                  new Function<Tuple2<LongWritable, Text>, String>() {
                    @Override
                    public String call(Tuple2<LongWritable, Text> v1) throws Exception {
                      return v1._2().toString();
                    }
                });
        log.info(tracePrefix + "Created the stream, Window Interval: " + windowInterval + ", Slide interval: " + slideInterval);
        rawInput.print();
person Lokesh Kumar P    schedule 16.10.2015
comment
Спасибо Локешу. У меня это тоже работает. Я импортировал неправильный пакет для TextInputFormat как: import org.apache.hadoop.mapred.TextInputFormat; но правильный вариант: import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; Мне нужна еще одна ваша помощь. Ссылаясь на ваш вопрос: http://stackoverflow.com/questions/29935732/spark-streaming-textfilestream-filestream-get-file-name; вы пытались получить имя обработанного файла с помощью искры. можешь помочь мне в этом? Я пытаюсь получить имена файлов, которые обрабатываются искрой, чтобы я мог удалить / переместить их в другой каталог. - person Rajneesh Kumar; 17.10.2015
comment
Привет, к сожалению, Spark RDD не предоставляет API-механизм для выборки fileNames, единственный совет, который я получил от stackoverflow, - это распечатать RDD как debugString и проанализировать fileNames из этого, но это довольно грязный взлом. - person Lokesh Kumar P; 19.10.2015
comment
Не раскапывать старую проблему, но ответ здесь, похоже, работает, хотя не уверен в проблемах с производительностью. stackoverflow.com/a/40245068/213816 - person OneCricketeer; 27.01.2017