Карта Spark Scala не работает с пользовательской функцией

Я пытаюсь запустить пользовательскую функцию параллельно с некоторыми параметрами, хранящимися в RDD, в spark scala. Я предполагаю, что использование карты должно дать результат. Я получаю сообщение об ошибке при передаче определенной мной пользовательской функции. Если вместо этого я передаю некоторую стандартную функцию (например, длину), ошибки не будет. Однако это не похоже на проблему с моей функцией, потому что даже если я передаю пустую пользовательскую функцию, она все равно не работает. Буду признателен за любое предложение.

Чтобы быть конкретным, df_short - это фрейм данных с 6 столбцами и 1 строкой .:

df_short.show

|  1|  2|  3|   4|  5|   6|

| 10|0.5|0.4|0.05|0.7|0.07|

превращение этого в RDD, а затем переход в карту с помощью стандартной функции:

df_short.rdd.map(i => i.length).collect

Далее получаем настраиваемую функцию - функцию, которая возвращает только 0

def grid_search_2(prm1: Int, prm2: Double, prm3: Double, prm4: Double, 
prm5: Double, prm6: Double): Int = {
return 0
}

Теперь при попытке передать пользовательскую функцию возникает ошибка:

df_short.rdd.map(i => grid_search_2(i.getInt(0), i.getDouble(1), 
i.getDouble(2), i.getDouble(3), i.getDouble(4), i.getDouble(5))).collect

Я получаю следующую ошибку:

va.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
.
.
.
.

Ошибка очень длинная, и я могу вставить ее еще раз. Я был бы признателен за любую помощь в выяснении, почему возникает эта ошибка - пока мне не очень повезло с поиском решения. Я использую Spark версии 2.4.3, scala версии 2.11.12 и пишу код в блокноте Zeppelin. Спасибо!


person user10751040    schedule 10.10.2019    source источник


Ответы (1)


Возможно, это потому, что вы используете метод экземпляра в своем удаленном коде (а не когда вы используете .map(i => i.length), который является анонимной функцией).

Я просто догадываюсь: когда вы используете свой метод экземпляра (который принадлежит к «классу», который не виден напрямую из Zeppelin-Notebook), Spark пытается сериализовать весь класс. Вместо этого вы можете попытаться определить объект функции:

val grid_search_2 = (prm1: Int, prm2: Double, prm3: Double, prm4: Double, prm5: Double, prm6: Double) =>  {0}

И посмотрите, имеет ли это значение

person Raphael Roth    schedule 10.10.2019
comment
Боюсь, ваш код не работает. Т.е. предложенное вами назначение в порядке, но я получаю ту же ошибку при вызове карты. Тем не менее, спасибо за предложение! - person user10751040; 10.10.2019