Я объединяю 2 набора данных двумя столбцами, и в результате получается набор данных, содержащий 55 миллиардов строк. После этого мне нужно выполнить некоторую агрегацию в этой DS по столбцам, отличным от тех, которые используются в соединении. Проблема в том, что Spark выполняет раздел обмена после соединения (занимает слишком много времени с 55 миллиардами строк), хотя данные уже распределены правильно, потому что агрегированный столбец уникален. Я знаю, что ключ агрегации распределен правильно, и есть ли способ сообщить об этом приложению Spark?
Spark делает обмен разделами, которые уже правильно распределены
Ответы (1)
1) Перейдите в пользовательский интерфейс Spark и отметьте «Уровень местоположения».
2) Если вы объединяете большие и маленькие данные, используйте Brodcast Join
3) При объединении данных большого и среднего размера и если RDD среднего размера не полностью помещается в память, используйте фильтр
val keys = sc.broadcast(mediumRDD.map(_._1).collect.toSet)
val reducedRDD = largeRDD.filter{ case(key, value) => keys.value.contains(key) }
reducedRDD.join(mediumRDD)
4) Проверьте, серилизуются ли данные или нет
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryoserializer.buffer.max", "128m")
.set("spark.kryoserializer.buffer", "64m")
.registerKryoClasses(
Array(classOf[ArrayBuffer[String]], classOf[ListBuffer[String]])
5) Проверьте пользовательский интерфейс Spark или добавьте следующую строку в код для отладки
df.rdd.getNumPartitions
Пользовательский интерфейс приложения Spark, на следующем снимке экрана видно, что «Всего задач» представляет количество разделов.
person
vaquar khan
schedule
26.10.2017