Избегайте использования структур данных Java в Apache Spark, чтобы избежать копирования данных.

У меня есть база данных MySQL с одной таблицей, содержащей около 100 миллионов записей (~ 25 ГБ, ~ 5 столбцов). Используя Apache Spark, я извлекаю эти данные через соединитель JDBC и сохраняю их в DataFrame. Отсюда я выполняю некоторую предварительную обработку данных (например, заменяю значения NULL), поэтому мне абсолютно необходимо просмотреть каждую запись. Затем я хотел бы выполнить уменьшение размерности и выбор функций (например, с помощью PCA), выполнить кластеризацию (например, K-Means), а затем провести тестирование модели на новых данных.

Я реализовал это в Java API Spark, но это слишком медленно (для моих целей), поскольку я много копирую данные из DataFrame в java.util.Vector и java.util.List (чтобы иметь возможность перебирать все записи и выполнять предварительную обработку), а затем обратно в DataFrame (поскольку PCA в Spark ожидает DataFrame в качестве входных данных).

Я попытался извлечь информацию из базы данных в org.apache.spark.sql.Column, но не могу найти способ перебрать ее. Я также пытался избежать использования структур данных Java (таких как List и Vector), используя org.apache.spark.mllib.linalg. {DenseVector, SparseVector}, но не могу заставить это работать. Наконец, я также рассматривал возможность использования JavaRDD (путем создания его из DataFrame и настраиваемой схемы), но не смог полностью его решить.

После длинного описания у меня вопрос: есть ли способ выполнить все шаги, упомянутые в первом абзаце, без копирования всех данных в структуру данных Java? Возможно, один из вариантов, который я попробовал, действительно сработает, но я просто не могу понять, как это сделать, поскольку документации и литературы по Spark немного.


person Rajko    schedule 02.06.2016    source источник


Ответы (1)


Судя по формулировке вашего вопроса, похоже, есть некоторая путаница в отношении этапов обработки Spark.

Сначала мы говорим Spark, что делать, задавая входные данные и преобразования. На данный момент известно только (а) количество разделов на различных этапах обработки и (б) схема данных. org.apache.spark.sql.Column используется на этом этапе для идентификации метаданных, связанных со столбцом. Однако он не содержит никаких данных. Фактически, на данном этапе данных вообще нет.

Во-вторых, мы говорим Spark выполнить действие с фреймом данных / набором данных. Это то, что запускает обработку. Входные данные считываются и проходят через различные преобразования и в конечную операцию действия, будь то collect, save или что-то еще.

Итак, это объясняет, почему вы не можете «извлечь информацию из базы данных в» Column.

Что касается сути вашего вопроса, трудно комментировать, не видя вашего кода и не зная точно, что вы пытаетесь выполнить, но можно с уверенностью сказать, что большая миграция между типами - плохая идея.

Вот пара вопросов, которые могут помочь вам добиться лучшего результата:

  • Почему вы не можете выполнить необходимые преобразования данных, работая непосредственно с Row экземплярами?

  • Было бы удобно обернуть часть вашего кода преобразования в UDF или UDAF?

Надеюсь это поможет.

person Sim    schedule 12.06.2016
comment
Сим, ты написал несколько довольно полезных концептуальных вещей, они будут мне очень полезны. В это время. Мне удалось решить мою проблему (ы), используя UDF на Dataframes. Кроме того, Spark 2.0.0 Preview предоставляет множество дополнительных функций, поэтому перенос моего кода на 2.0.0 упростил использование исключительно наборов данных (вместо копирования данных в java Vector / List). - person Rajko; 14.06.2016
comment
Да, если вы используете относительно простые структуры данных, Datasets в 2.0.0 должен работать хорошо. - person Sim; 16.06.2016