Перепишите Java-приложение Spark на Scala

Я пытаюсь «преобразовать» свое приложение Spark, написанное на Java, в Scala. Поскольку я новичок в Scala и Scala API от Spark, я не знаю, как написать эту функцию «transformToPair» в Scala:

Джава:

JavaPairDStream<String, Boolean> outlierPairDStream = avgAll1h.union(avgPerPlug1h).transformToPair(findOutliersPerComparisonFunction);

*** FUNCTION ***

private static Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>> findOutliersPerComparisonFunction = new Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>>() {
    public JavaPairRDD<String, Boolean> call(JavaPairRDD<String, Float> v1) throws Exception {

        float avgOfAll;
        if(v1.count() > 0) {
            avgOfAll = v1.filter(new Function<Tuple2<String,Float>, Boolean>() {
                public Boolean call(Tuple2<String, Float> v1) throws Exception {
                    return v1._1().equals("all");
                }
            }).values().collect().get(0);
        } else {
            avgOfAll = 0.0f;
        }

        final float finalAvg = avgOfAll;

        JavaPairRDD<String, Boolean> rddBool = v1.mapValues(new Function<Float, Boolean>() {
            public Boolean call(Float v1) throws Exception {
                return v1 > finalAvg;
            }
        });


        return rddBool.filter(new Function<Tuple2<String,Boolean>, Boolean>() {
            public Boolean call(Tuple2<String, Boolean> v1) throws Exception {
                return !v1._1().equals("all");
            }
        });
    }
};

Вот моя попытка со Scala:

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 
  var avgOfAll = 0.0

  if(rdd.count() > 0) {
    avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
  }

  val finalAvg = avgOfAll

  val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})

  val rddNew = rddBool.filter({case(k, v) => (k != "all")})
}

Я получаю следующее сообщение об ошибке:

<console>:281: error: type mismatch;
 found   : Unit
 required: org.apache.spark.rdd.RDD[?]
       }
       ^

Кто-нибудь может мне помочь? Как я могу вернуть DStream "rddNew"?

Если я скажу

return rddNew

в конце функции "преобразование" я получаю следующую ошибку:

<console>:293: error: return outside method definition
       return rddNew
       ^

person D. Müller    schedule 30.12.2015    source источник


Ответы (1)


Фактически вы должны вернуть последнее значение, например. как это:

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 
  var avgOfAll = 0.0

  if(rdd.count() > 0) {
    avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
  }

  val finalAvg = avgOfAll

  val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})

  val rddNew = rddBool.filter({case(k, v) => (k != "all")})

  rddNew
}

Или просто пропустите определение переменной:

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 
  var avgOfAll = 0.0

  if(rdd.count() > 0) {
    avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
  }

  val finalAvg = avgOfAll

  val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})

  rddBool.filter({case(k, v) => (k != "all")})
}

Чуть больше Scala-подобного могло бы быть:

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 

  val finalAvg = if(rdd.count() > 0) {
    rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
  } else { 0.0 }

  val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})

  rddBool.filter({case(k, v) => (k != "all")})
}
person Ashalynd    schedule 30.12.2015
comment
Это помогло мне, я просто попытался использовать строку rddNew БЕЗ ключевого слова RETURN, и теперь она работает! Ты спас мне день, спасибо! - person D. Müller; 30.12.2015