Я обрабатываю JSON-поток кафки в Spark Structured Streaming. Обработка как микропакеты, могу ли я использовать аккумуляторы с потоковыми фреймами данных?
LongAccumulator longAccum = new LongAccumulator("my accum");
Dataset<Row> df2 = df.filter(output.col("Called number").equalTo("0860"))
.groupBy("Calling number").count();
// put row counter to accumulator for example
df2.javaRDD().foreach(row -> {longAccumulator.add(1);})
бросает
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
. Также меня смущает использование аккумуляторов таким образом. Преобразование фрейма данных в RDD выглядит странно и излишне. Могу ли я сделать это без RDD и foreach ()?
Согласно исключению, я удалил foreach из исходного фрейма данных и сделал это в writeStream (). ForeachBatch ()
StreamingQuery ds = df2
.writeStream().foreachBatch( (rowDataset, aLong) -> {
longAccum.add(1);
log.info("accum : " + longAccum.value());
})
.outputMode("complete")
.format("console").start();
Он работает, но у меня нет значений в журналах, и я не вижу аккумулятор в графическом интерфейсе.