Dataset.reduce не поддерживает функцию сокращения

У меня простой код:

test("0153") {
  val c = Seq(1,8,4,2,7)
  val max = (x:Int, y:Int)=> if (x > y) x else y
  c.reduce(max)
}

Работает нормально. Но когда я использую Dataset.reduce так же,

test("SparkSQLTest") {
  def max(x: Int, y: Int) = if (x > y) x else y
  val spark = SparkSession.builder().master("local").appName("SparkSQLTest").enableHiveSupport().getOrCreate()
  val ds = spark.range(1, 100).map(_.toInt)
  ds.reduce(max) //compiling error:Error:(20, 15) missing argument list for method max
}

Компилятор жалуется, что missing argument list for method max, я не понимаю, что здесь происходит.


person Tom    schedule 12.07.2018    source источник
comment
ds.reduce ((x, y) = ›max (x, y)) должен работать   -  person m-bhole    schedule 12.07.2018
comment
ds.reduce(max _) работает?   -  person Brian McCutchon    schedule 12.07.2018
comment
@BrianMcCutchon не работает ..   -  person Tom    schedule 12.07.2018


Ответы (2)


Измените на функцию вместо метода, и она должна работать, т.е. вместо

def max(x: Int, y: Int) = if (x > y) x else y

использовать

val max = (x: Int, y: Int) => if (x > y) x else y

Использование функции ds.reduce(max) должно работать напрямую. Подробнее о различиях можно найти здесь.


В противном случае, как указал hadooper, вы можете использовать метод, указав аргументы,

def max(x: Int, y: Int) = if (x > y) x else y
ds.reduce((x, y) => max(x,y))
person Shaido    schedule 12.07.2018
comment
@hadooper: Никаких проблем :) - person Shaido; 12.07.2018
comment
Это интересно, спасибо @Shaido - person Tom; 12.07.2018

В соответствии с документом Spark Scala подпись функции уменьшения reduce (func: ReduceFunction [T]): T и reduce (func: (T, T) ⇒ T): T Таким образом, любое из следующих действий будет работать

Подход 1:

scala> val ds = spark.range(1, 100).map(_.toInt)
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> def max(x: Int, y: Int) = if (x > y) x else y
max: (x: Int, y: Int)Int

scala> ds.reduce((x, y) => max(x,y))
res1: Int = 99

Подход 2 [Если вы настаиваете на сокращенных обозначениях, например, reduce (max)]:

scala> val ds = spark.range(1, 100).map(_.toInt)
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> object max extends org.apache.spark.api.java.function.ReduceFunction[Int]{
     | def call(x:Int, y:Int) = {if (x > y) x else y}
     | }
defined object max

scala> ds.reduce(max)
res3: Int = 99

Надеюсь это поможет!

person m-bhole    schedule 12.07.2018
comment
Спасибо @hadooper. Второй подход больше похож на использование Java API. Для 1-го подхода я не понимаю, почему ds.reduce(max) не работает - person Tom; 12.07.2018