Я пытаюсь «преобразовать» свое приложение Spark, написанное на Java, в Scala. Поскольку я новичок в Scala и Scala API от Spark, я не знаю, как написать эту функцию «transformToPair» в Scala:
Джава:
JavaPairDStream<String, Boolean> outlierPairDStream = avgAll1h.union(avgPerPlug1h).transformToPair(findOutliersPerComparisonFunction);
*** FUNCTION ***
private static Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>> findOutliersPerComparisonFunction = new Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>>() {
public JavaPairRDD<String, Boolean> call(JavaPairRDD<String, Float> v1) throws Exception {
float avgOfAll;
if(v1.count() > 0) {
avgOfAll = v1.filter(new Function<Tuple2<String,Float>, Boolean>() {
public Boolean call(Tuple2<String, Float> v1) throws Exception {
return v1._1().equals("all");
}
}).values().collect().get(0);
} else {
avgOfAll = 0.0f;
}
final float finalAvg = avgOfAll;
JavaPairRDD<String, Boolean> rddBool = v1.mapValues(new Function<Float, Boolean>() {
public Boolean call(Float v1) throws Exception {
return v1 > finalAvg;
}
});
return rddBool.filter(new Function<Tuple2<String,Boolean>, Boolean>() {
public Boolean call(Tuple2<String, Boolean> v1) throws Exception {
return !v1._1().equals("all");
}
});
}
};
Вот моя попытка со Scala:
val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd =>
var avgOfAll = 0.0
if(rdd.count() > 0) {
avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
}
val finalAvg = avgOfAll
val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})
val rddNew = rddBool.filter({case(k, v) => (k != "all")})
}
Я получаю следующее сообщение об ошибке:
<console>:281: error: type mismatch;
found : Unit
required: org.apache.spark.rdd.RDD[?]
}
^
Кто-нибудь может мне помочь? Как я могу вернуть DStream "rddNew"?
Если я скажу
return rddNew
в конце функции "преобразование" я получаю следующую ошибку:
<console>:293: error: return outside method definition
return rddNew
^