Почему мне не следует использовать collect() в моих преобразованиях Python?

TL;DR: Ходят слухи, что некоторые функции PySpark не рекомендуется использовать в Transforms, но я не уверен, какие функции неверны и почему?

Почему я не могу просто collect() при определенных обстоятельствах перенести свои данные в список и перебирать строки?


person Andrew St P    schedule 25.09.2020    source источник


Ответы (1)


Здесь есть много моментов, которые нужно понять, чтобы прийти к окончательному выводу, а именно, что collect() и другие функции являются неэффективным использованием Spark.

Локальный против распределенного

Во-первых, давайте рассмотрим разницу между локальными и распределенными вычислениями. В Spark операции pyspark.sql.functions и pyspark.sql.DataFrame, которые вы обычно выполняете, такие как join() или groupBy(), делегируют выполнение этих операций базовым библиотекам Spark для достижения максимально возможной производительности. Думайте об этом как об использовании Python просто как более удобного языка поверх SQL, где вы лениво описываете операции, которые вы хотите, чтобы Spark выполнял за вас.

Таким образом, когда вы придерживаетесь SQL-операций в PySpark, вы можете ожидать высокой масштабируемости производительности, но только для вещей, которые вы можете выразить в SQL. Именно здесь люди обычно используют ленивый подход и реализуют свои преобразования с использованием for циклов вместо того, чтобы думать о наилучшей возможной тактике.

Давайте рассмотрим случай, когда вы хотите просто добавить одно значение в целочисленный столбец в вашем DataFrame. Вы найдете в Stack Overflow и других местах множество примеров в некоторых более тонких случаях, когда они предлагают использовать collect() для переноса данных в список Python, перебирая каждую строку и помещая данные обратно в DataFrame после завершения, что это одна тактика, которую вы могли бы использовать здесь. Однако давайте подумаем о том, что это означает на практике: вы возвращаете свои данные, размещенные в Spark, обратно в драйвер вашей сборки для зацикливания с использованием одного потока в Python для каждой строки и добавления постоянного значения в каждую строку. вовремя. Если бы вместо этого мы нашли (очевидный в данном случае) SQL-эквивалент этой операции, Spark мог бы взять ваши данные и в массовом параллельном режиме добавить значение к отдельным строкам. А именно, если у вас есть 64 исполнителя (экземпляры рабочих, доступных для выполнения вашей работы), то у вас будет 64 «ядра» (это не идеальная аналогия, но близкая), чтобы разделить данные и отправить их в каждый для добавления значения в столбец. Это позволит вам значительно быстрее выполнить операцию с конечным результатом, которую вы хотели.

Работа над драйвером — это то, что я называю «локальными» вычислениями, а работа в исполнителях — «параллельными».

Это может быть очевидным примером здесь, но часто бывает трудно запомнить эту разницу при работе с более сложными преобразованиями, такими как расширенные оконные операции или вычисления линейной алгебры. В Spark есть библиотеки для выполнения матричных умножений и манипуляций в распределенном режиме, а также некоторые довольно сложные операции в Windows, которые требуют сначала немного больше подумать о вашей проблеме.

Ленивая оценка

Самый эффективный способ использования PySpark — отправить ваши «инструкции» о том, как создать ваш DataFrame сразу, чтобы Spark мог найти лучший способ материализовать эти данные. Таким образом, функций, которые вызывают вычисление DataFrame, чтобы вы могли проверить его в какой-то момент вашего кода, следует избегать, если это вообще возможно; они означают, что Spark работает дополнительно, чтобы удовлетворить ваш оператор print() или другой вызов метода, вместо того, чтобы работать над записью ваших данных.

Python в Java в Scala

Среда выполнения Python фактически выполняется внутри JVM, которая, в свою очередь, взаимодействует со средой выполнения Spark, написанной на Scala. Таким образом, для каждого вызова collect(), когда вы хотите материализовать свои данные в Python, Spark должен материализовать ваши данные в один локально доступный DataFrame, затем синтезировать его из Scala в его эквивалент Java, а затем, наконец, перейти от JVM к Python. эквиваленты, прежде чем он будет доступен для повторения. Это невероятно неэффективный процесс, который невозможно распараллелить. Это приводит к тому, что операций, которые передают ваши данные в Python, настоятельно рекомендуется избегать.

Функции, которых следует избегать

Итак, каких функций следует избегать?

  • собирать
  • глава
  • брать
  • первый
  • показывать

Каждый из этих методов вызывает выполнение в DataFrame и возвращает результаты в среду выполнения Python для отображения/использования. Это означает, что у Spark не будет возможности лениво определить наиболее эффективный способ вычислений с вашими данными, и вместо этого он будет вынужден возвращать запрошенные данные, прежде чем приступить к любому другому выполнению.

person Adil B    schedule 25.09.2020