Я пытаюсь запустить пользовательскую функцию параллельно с некоторыми параметрами, хранящимися в RDD, в spark scala. Я предполагаю, что использование карты должно дать результат. Я получаю сообщение об ошибке при передаче определенной мной пользовательской функции. Если вместо этого я передаю некоторую стандартную функцию (например, длину), ошибки не будет. Однако это не похоже на проблему с моей функцией, потому что даже если я передаю пустую пользовательскую функцию, она все равно не работает. Буду признателен за любое предложение.
Чтобы быть конкретным, df_short - это фрейм данных с 6 столбцами и 1 строкой .:
df_short.show
| 1| 2| 3| 4| 5| 6|
| 10|0.5|0.4|0.05|0.7|0.07|
превращение этого в RDD, а затем переход в карту с помощью стандартной функции:
df_short.rdd.map(i => i.length).collect
Далее получаем настраиваемую функцию - функцию, которая возвращает только 0
def grid_search_2(prm1: Int, prm2: Double, prm3: Double, prm4: Double,
prm5: Double, prm6: Double): Int = {
return 0
}
Теперь при попытке передать пользовательскую функцию возникает ошибка:
df_short.rdd.map(i => grid_search_2(i.getInt(0), i.getDouble(1),
i.getDouble(2), i.getDouble(3), i.getDouble(4), i.getDouble(5))).collect
Я получаю следующую ошибку:
va.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
.
.
.
.
Ошибка очень длинная, и я могу вставить ее еще раз. Я был бы признателен за любую помощь в выяснении, почему возникает эта ошибка - пока мне не очень повезло с поиском решения. Я использую Spark версии 2.4.3, scala версии 2.11.12 и пишу код в блокноте Zeppelin. Спасибо!