Почему withColumn так долго работает в pyspark?

У меня есть фреймворк pyspark, содержащий 1000 столбцов и 10000 записей (строк). Мне нужно создать еще 2000 столбцов, выполнив некоторые вычисления для существующих столбцов.

df #pyspark dataframe contaning 1000 columns and 10,000 records
df = df.withColumn('C1001', ((df['C269'] * df['C285'])/df['C41'])) #existing column names range from C1 to C1000
df = df.withColumn('C1002', ((df['C4'] * df['C267'])/df['C146']))
df = df.withColumn('C1003', ((df['C87'] * df['C134'])/df['C238']))
.
.
.
df = df.withColumn('C3000', ((df['C365'] * df['C235'])/df['C321']))

Проблема в том, что это занимает слишком много времени, около 45 минут.
Так как я новичок, мне было интересно, что я делаю не так?
PS: Я использую Spark на блоках данных с 1 драйвер и 1 рабочий узел, оба имеют 16 ГБ памяти и 8 ядер.

Спасибо!


person Mir Muntasar Ali Agha    schedule 22.06.2020    source источник


Ответы (3)


Многое из того, что вы делаете, просто создаете план выполнения. Spark лениво выполняется до тех пор, пока его не запустит действие. Итак, 45 минут, которые вы видите, вероятно, связаны с выполнением всех преобразований, которые вы настраивали.

Если вы хотите узнать, сколько времени занимает один withColumn, то активируйте действие, подобное df.count () или что-то ранее, а затем выполните одно withColumn, а затем еще одно df.count () (чтобы снова запустить действие).

Узнайте больше о плане выполнения, преобразованиях и действиях pyspark.

person noobius    schedule 22.06.2020
comment
Есть предложения, как улучшить тайминги для этих преобразований? - person Mir Muntasar Ali Agha; 22.06.2020
comment
Вы можете попробовать разбрасывать действия здесь и там, чтобы снизить сложность плана выполнения, иногда это может помочь. Если у вас есть возможность масштабировать кластер (увеличить количество узлов или ядер процессора) и увеличить количество исполнителей, а затем перераспределить данные, чтобы получить больше параллельной обработки. - person noobius; 22.06.2020

Не вдаваясь в подробности

  • and looking at the observations of the 1st answer
    • and knowing that Execution Plans for many DF-columns aka "very wide data" are costly to compute
      • a move to RDD processing may well be the path to take.
person thebluephantom    schedule 22.06.2020

Делайте это в одну строку, а не по очереди

df = df.withColumn('C1001', COl1).df.withColumn('C1002', COl2).df.withColumn('C1003', COl3) ......
person Varun05    schedule 24.06.2020