Экспорт фрейма данных PySpark в озеро данных Azure займет вечность

Приведенный ниже код отлично работал в автономной версии PySpark 2.4 в Mac OS (Python 3.7), когда размер входных данных (около 6 ГБ) был небольшим. Однако, когда я запустил код в кластере HDInsight (HDI 4.0, т.е. Python 3.5, PySpark 2.4, 4 рабочих узла, каждый из которых имеет 64 ядра и 432 ГБ ОЗУ, 2 узла заголовка, каждый из которых имеет 4 ядра и 28 ГБ ОЗУ, 2-й генерация озера данных) с большими входными данными (169 ГБ), последний шаг, то есть запись данных в озеро данных, занял целую вечность (я убил его после 24 часов выполнения). Учитывая тот факт, что HDInsight не пользуется популярностью в сообществе облачных вычислений, я мог ссылаться только на сообщения, в которых жаловались на низкую скорость при записи фрейма данных в S3. Некоторые предлагали переразбить набор данных, что я и сделал, но это не помогло.

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType, IntegerType, BooleanType
from pyspark.sql.functions import udf, regexp_extract, collect_set, array_remove, col, size, asc, desc
from pyspark.ml.fpm import FPGrowth
import os
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.5"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.5"

def work(order_path, beer_path, corpus_path, output_path, FREQ_THRESHOLD=1000, LIFT_THRESHOLD=1):
    print("Creating Spark Environment...")
    spark = SparkSession.builder.appName("Menu").getOrCreate()
    print("Spark Environment Created!")
    print("Working on Checkpoint1...")
    orders = spark.read.csv(order_path)
    orders.createOrReplaceTempView("orders")
    orders = spark.sql(
        "SELECT _c14 AS order_id, _c31 AS in_menu_id, _c32 AS in_menu_name FROM orders"
    )
    orders.createOrReplaceTempView("orders")
    beer = spark.read.csv(
        beer_path,
        header=True
    )
    beer.createOrReplaceTempView("beer")
    beer = spark.sql(
        """
        SELECT 
            order_id AS beer_order_id,
            in_menu_id AS beer_in_menu_id,
            '-999' AS beer_in_menu_name
        FROM beer
        """
    )
    beer.createOrReplaceTempView("beer")
    orders = spark.sql(
        """
        WITH orders_beer AS (
            SELECT *
            FROM orders
            LEFT JOIN beer
            ON orders.in_menu_id = beer.beer_in_menu_id
        )
        SELECT
            order_id,
            in_menu_id,
            CASE
                WHEN beer_in_menu_name IS NOT NULL THEN beer_in_menu_name
                WHEN beer_in_menu_name IS NULL THEN in_menu_name
            END AS menu_name
        FROM orders_beer
        """
    )
    print("Checkpoint1 Completed!")
    print("Working on Checkpoint2...")
    corpus = spark.read.csv(
        corpus_path,
        header=True
    )
    keywords = corpus.select("Food_Name").rdd.flatMap(lambda x: x).collect()
    orders = orders.withColumn(
        "keyword", 
        regexp_extract(
            "menu_name", 
            "(?=^|\s)(" + "|".join(keywords) + ")(?=\s|$)", 
            0
        )
    )
    orders.createOrReplaceTempView("orders")
    orders = spark.sql("""
        SELECT order_id, in_menu_id, keyword
        FROM orders
        WHERE keyword != ''
    """)
    orders.createOrReplaceTempView("orders")
    orders = orders.groupBy("order_id").agg(
        collect_set("keyword").alias("items")
    )
    print("Checkpoint2 Completed!")
    print("Working on Checkpoint3...")
    fpGrowth = FPGrowth(
        itemsCol="items", 
        minSupport=0, 
        minConfidence=0
    )
    model = fpGrowth.fit(orders)
    print("Checkpoint3 Completed!")
    print("Working on Checkpoint4...")
    frequency = model.freqItemsets
    frequency = frequency.filter(col("freq") > FREQ_THRESHOLD)
    frequency = frequency.withColumn(
        "items", 
        array_remove("items", "-999")
    )
    frequency = frequency.filter(size(col("items")) > 0)
    frequency = frequency.orderBy(asc("items"), desc("freq"))
    frequency = frequency.dropDuplicates(["items"])
    frequency = frequency.withColumn(
        "antecedent", 
        udf(
            lambda x: "|".join(sorted(x)), StringType()
        )(frequency.items)
    )
    frequency.createOrReplaceTempView("frequency")
    lift = model.associationRules
    lift = lift.drop("confidence")
    lift = lift.filter(col("lift") > LIFT_THRESHOLD)
    lift = lift.filter(
        udf(
            lambda x: x == ["-999"], BooleanType()
        )(lift.consequent)
    )
    lift = lift.drop("consequent")
    lift = lift.withColumn(
        "antecedent", 
        udf(
            lambda x: "|".join(sorted(x)), StringType()
        )(lift.antecedent)
    )
    lift.createOrReplaceTempView("lift")
    result = spark.sql(
        """
        SELECT lift.antecedent, freq AS frequency, lift
        FROM lift
        INNER JOIN frequency
        ON lift.antecedent = frequency.antecedent
        """
    )
    print("Checkpoint4 Completed!")
    print("Writing Result to Data Lake...")
    result.repartition(1024).write.mode("overwrite").parquet(output_path)
    print("All Done!")

def main():
    work(
        order_path=169.1 GB of txt,
        beer_path=4.9 GB of csv,
        corpus_path=210 KB of csv,
        output_path="final_result.parquet"
    )

if __name__ == "__main__":
    main()

Сначала я подумал, что это вызвано форматом файла паркет. Однако, когда я попробовал csv, я столкнулся с той же проблемой. Я попытался result.count() посмотреть, сколько строк в таблице result. Чтобы получить номер строки, потребовалась вечность, как и при записи данных в озеро данных. Было предложено использовать широковещательное хеш-соединение вместо стандартного сортировка-объединение, если большой набор данных объединяется с небольшим набором данных. Я подумал, что стоит попробовать, потому что меньшие образцы в пилотном исследовании сказали мне, что номер строки frequency составляет примерно 0,09% от числа lift (см. Запрос ниже, если у вас возникли трудности с отслеживанием frequency и lift).

SELECT lift.antecedent, freq AS frequency, lift
FROM lift
INNER JOIN frequency
ON lift.antecedent = frequency.antecedent

Имея это в виду, я изменил свой код:

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType, IntegerType, BooleanType
from pyspark.sql.functions import udf, regexp_extract, collect_set, array_remove, col, size, asc, desc
from pyspark.ml.fpm import FPGrowth
import os
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.5"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.5"

def work(order_path, beer_path, corpus_path, output_path, FREQ_THRESHOLD=1000, LIFT_THRESHOLD=1):
    print("Creating Spark Environment...")
    spark = SparkSession.builder.appName("Menu").getOrCreate()
    print("Spark Environment Created!")
    print("Working on Checkpoint1...")
    orders = spark.read.csv(order_path)
    orders.createOrReplaceTempView("orders")
    orders = spark.sql(
        "SELECT _c14 AS order_id, _c31 AS in_menu_id, _c32 AS in_menu_name FROM orders"
    )
    orders.createOrReplaceTempView("orders")
    beer = spark.read.csv(
        beer_path,
        header=True
    )
    beer.createOrReplaceTempView("beer")
    beer = spark.sql(
        """
        SELECT 
            order_id AS beer_order_id,
            in_menu_id AS beer_in_menu_id,
            '-999' AS beer_in_menu_name
        FROM beer
        """
    )
    beer.createOrReplaceTempView("beer")
    orders = spark.sql(
        """
        WITH orders_beer AS (
            SELECT *
            FROM orders
            LEFT JOIN beer
            ON orders.in_menu_id = beer.beer_in_menu_id
        )
        SELECT
            order_id,
            in_menu_id,
            CASE
                WHEN beer_in_menu_name IS NOT NULL THEN beer_in_menu_name
                WHEN beer_in_menu_name IS NULL THEN in_menu_name
            END AS menu_name
        FROM orders_beer
        """
    )
    print("Checkpoint1 Completed!")
    print("Working on Checkpoint2...")
    corpus = spark.read.csv(
        corpus_path,
        header=True
    )
    keywords = corpus.select("Food_Name").rdd.flatMap(lambda x: x).collect()
    orders = orders.withColumn(
        "keyword", 
        regexp_extract(
            "menu_name", 
            "(?=^|\s)(" + "|".join(keywords) + ")(?=\s|$)", 
            0
        )
    )
    orders.createOrReplaceTempView("orders")
    orders = spark.sql("""
        SELECT order_id, in_menu_id, keyword
        FROM orders
        WHERE keyword != ''
    """)
    orders.createOrReplaceTempView("orders")
    orders = orders.groupBy("order_id").agg(
        collect_set("keyword").alias("items")
    )
    print("Checkpoint2 Completed!")
    print("Working on Checkpoint3...")
    fpGrowth = FPGrowth(
        itemsCol="items", 
        minSupport=0, 
        minConfidence=0
    )
    model = fpGrowth.fit(orders)
    print("Checkpoint3 Completed!")
    print("Working on Checkpoint4...")
    frequency = model.freqItemsets
    frequency = frequency.filter(col("freq") > FREQ_THRESHOLD)
    frequency = frequency.withColumn(
        "antecedent", 
        array_remove("items", "-999")
    )
    frequency = frequency.drop("items")
    frequency = frequency.filter(size(col("antecedent")) > 0)
    frequency = frequency.orderBy(asc("antecedent"), desc("freq"))
    frequency = frequency.dropDuplicates(["antecedent"])
    frequency = frequency.withColumn(
        "antecedent", 
        udf(
            lambda x: "|".join(sorted(x)), StringType()
        )(frequency.antecedent)
    )
    lift = model.associationRules
    lift = lift.drop("confidence")
    lift = lift.filter(col("lift") > LIFT_THRESHOLD)
    lift = lift.filter(
        udf(
            lambda x: x == ["-999"], BooleanType()
        )(lift.consequent)
    )
    lift = lift.drop("consequent")
    lift = lift.withColumn(
        "antecedent", 
        udf(
            lambda x: "|".join(sorted(x)), StringType()
        )(lift.antecedent)
    )
    result = lift.join(
        frequency.hint("broadcast"), 
        ["antecedent"], 
        "inner"
    )
    print("Checkpoint4 Completed!")
    print("Writing Result to Data Lake...")
    result.repartition(1024).write.mode("overwrite").parquet(output_path)
    print("All Done!")

def main():
    work(
        order_path=169.1 GB of txt,
        beer_path=4.9 GB of csv,
        corpus_path=210 KB of csv,
        output_path="final_result.parquet"
    )

if __name__ == "__main__":
    main()

Код отлично работал с теми же образцами данных на моей Mac OS и, как и ожидалось, занял меньше времени (34 секунды против 26 секунд). Затем я решил запустить код в HDInsight с полными наборами данных. На последнем шаге, который записывает данные в озеро данных, задача не удалась, и мне сказали: Задание отменено из-за закрытия SparkContext. Я новичок в больших данных и понятия не имею, как это сделать. В сообщениях в Интернете говорилось, что этому может быть много причин. Какой бы метод я ни использовал, как оптимизировать мой код, чтобы получить желаемый результат в озере данных за приемлемое время?


person James Chang    schedule 07.12.2019    source источник


Ответы (2)


Я бы попробовал несколько вещей, в зависимости от количества энергии, которое им требуется:

  • Убедитесь, что хранилище ADL находится в том же регионе, что и ваш кластер HDInsight.
  • Добавляйте вызовы для df = df.cache() после тяжелых вычислений или даже записывайте и считывайте фреймы данных в кэш-хранилище и из него в промежутках между этими вычислениями.
  • Замените свои UDF «родным» кодом Spark, поскольку UDF являются одним из плохие методы работы Spark.
person David Taub    schedule 07.12.2019
comment
Спасибо за ваш быстрый ответ. Я пройдусь по каждому из них. - person James Chang; 07.12.2019
comment
Учетная запись хранения при создании кластера такая же, как и учетная запись хранения моего выходного пути. Однако они не используют один и тот же контейнер больших двоичных объектов. Влияет ли это на производительность? - person James Chang; 07.12.2019
comment
аккаунт и регион не совпадают. и да, - person David Taub; 07.12.2019
comment
кеширование мне помогло! Спасибо. - person James Chang; 10.12.2019

Я понял это после пяти дней борьбы. Вот подходы, которые я использовал для оптимизации кода. Время выполнения кода сократилось с более 24 часов до примерно 10 минут. Оптимизация кода действительно важна.

  1. Как указывал ниже Дэвид Тауб, используйте df.cache() после тяжелых вычислений или перед подачей данных в модель. Я использовал df.cache().count(), так как вызов .cache() сам по себе лениво оценивается, но следующий .count() вызывает оценку всего набора данных.
  2. Используйте flashtext вместо регулярного выражения для извлечения ключевых слов. Это значительно улучшает производительность кода.
  3. Будьте осторожны с объединениями / слияниями. Это может стать очень медленным из-за асимметрии данных. Всегда думайте о способах избежать ненужных объединений.
  4. Установите minSupport для FPGrowth. Это значительно сокращает время звонка model.freqItemsets.
person James Chang    schedule 10.12.2019