Spark делает обмен разделами, которые уже правильно распределены

Я объединяю 2 набора данных двумя столбцами, и в результате получается набор данных, содержащий 55 миллиардов строк. После этого мне нужно выполнить некоторую агрегацию в этой DS по столбцам, отличным от тех, которые используются в соединении. Проблема в том, что Spark выполняет раздел обмена после соединения (занимает слишком много времени с 55 миллиардами строк), хотя данные уже распределены правильно, потому что агрегированный столбец уникален. Я знаю, что ключ агрегации распределен правильно, и есть ли способ сообщить об этом приложению Spark?


person datahack    schedule 26.10.2017    source источник


Ответы (1)


1) Перейдите в пользовательский интерфейс Spark и отметьте «Уровень местоположения».

введите здесь описание изображения

2) Если вы объединяете большие и маленькие данные, используйте Brodcast Join

3) При объединении данных большого и среднего размера и если RDD среднего размера не полностью помещается в память, используйте фильтр

val keys = sc.broadcast(mediumRDD.map(_._1).collect.toSet)
val reducedRDD = largeRDD.filter{ case(key, value) => keys.value.contains(key) }
reducedRDD.join(mediumRDD)

4) Проверьте, серилизуются ли данные или нет

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.max", "128m")
      .set("spark.kryoserializer.buffer", "64m")
      .registerKryoClasses(
        Array(classOf[ArrayBuffer[String]], classOf[ListBuffer[String]])

5) Проверьте пользовательский интерфейс Spark или добавьте следующую строку в код для отладки

df.rdd.getNumPartitions

Пользовательский интерфейс приложения Spark, на следующем снимке экрана видно, что «Всего задач» представляет количество разделов.

введите здесь описание изображения

person vaquar khan    schedule 26.10.2017