Использование Spark DataFrame.flatMap в java

Я работал с функцией RDD.flatMap в java. Теперь пробую свои силы в DataFrames.

Они говорят:

public <R> RDD<R> flatMap(scala.Function1<org.apache.spark.sql.Row,
    scala.collection.TraversableOnce<R>> f, scala.reflect.ClassTag<R> evidence$4)

Возвращает новый RDD, сначала применяя функцию ко всем строкам этого DataFrame, а затем сглаживая результаты.

Задается: flatMap в интерфейсе RDDApi

Но когда я попробовал это, Function1 вынуждает меня переопределять множество нереализованных методов. Вот что я получаю:

    RDD<Row> res = df.flatMap(new Function1<Row, TraversableOnce<Row>>() {

        @Override
        public <A> Function1<Row, A> andThen(
                Function1<TraversableOnce<Row>, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcDD$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcDF$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcDI$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcDJ$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcFD$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcFF$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcFI$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcFJ$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcID$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcIF$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcII$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcIJ$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcJD$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcJF$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcJI$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcJJ$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcVD$sp(
                Function1<BoxedUnit, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcVF$sp(
                Function1<BoxedUnit, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcVI$sp(
                Function1<BoxedUnit, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcVJ$sp(
                Function1<BoxedUnit, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcZD$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcZF$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcZI$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<Object, A> andThen$mcZJ$sp(
                Function1<Object, A> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public TraversableOnce<Row> apply(Row arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public double apply$mcDD$sp(double arg0) {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public double apply$mcDF$sp(float arg0) {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public double apply$mcDI$sp(int arg0) {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public double apply$mcDJ$sp(long arg0) {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public float apply$mcFD$sp(double arg0) {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public float apply$mcFF$sp(float arg0) {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public float apply$mcFI$sp(int arg0) {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public float apply$mcFJ$sp(long arg0) {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public int apply$mcID$sp(double arg0) {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public int apply$mcIF$sp(float arg0) {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public int apply$mcII$sp(int arg0) {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public int apply$mcIJ$sp(long arg0) {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public long apply$mcJD$sp(double arg0) {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public long apply$mcJF$sp(float arg0) {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public long apply$mcJI$sp(int arg0) {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public long apply$mcJJ$sp(long arg0) {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public void apply$mcVD$sp(double arg0) {
            // TODO Auto-generated method stub

        }

        @Override
        public void apply$mcVF$sp(float arg0) {
            // TODO Auto-generated method stub

        }

        @Override
        public void apply$mcVI$sp(int arg0) {
            // TODO Auto-generated method stub

        }

        @Override
        public void apply$mcVJ$sp(long arg0) {
            // TODO Auto-generated method stub

        }

        @Override
        public boolean apply$mcZD$sp(double arg0) {
            // TODO Auto-generated method stub
            return false;
        }

        @Override
        public boolean apply$mcZF$sp(float arg0) {
            // TODO Auto-generated method stub
            return false;
        }

        @Override
        public boolean apply$mcZI$sp(int arg0) {
            // TODO Auto-generated method stub
            return false;
        }

        @Override
        public boolean apply$mcZJ$sp(long arg0) {
            // TODO Auto-generated method stub
            return false;
        }

        @Override
        public <A> Function1<A, TraversableOnce<Row>> compose(
                Function1<A, Row> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcDD$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcDF$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcDI$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcDJ$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcFD$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcFF$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcFI$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcFJ$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcID$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcIF$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcII$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcIJ$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcJD$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcJF$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcJI$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcJJ$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, BoxedUnit> compose$mcVD$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, BoxedUnit> compose$mcVF$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, BoxedUnit> compose$mcVI$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, BoxedUnit> compose$mcVJ$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcZD$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcZF$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcZI$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public <A> Function1<A, Object> compose$mcZJ$sp(
                Function1<A, Object> arg0) {
            // TODO Auto-generated method stub
            return null;
        }
    }, evidence$4);

Это выглядит странно, но я сделал evidence$4 как:

ClassTag<Row> evidence$4 = scala.reflect.ClassTag$.MODULE$.apply(Row.class);

Я намерен просто поиграть с flatMap (конечно, на DataFrames, а не на RDD). Так что мне не нужны никакие изменения на Row. Может вернуть ввод как есть без каких-либо изменений.

Итак, я думаю, мне нужно внести изменения только в метод apply.

    @Override
    public TraversableOnce<Row> apply(Row arg0) {
        // TODO Auto-generated method stub
        return null;
    }

Но опять же, как мне получить TraversableOnce<Row> из Row?

Кроме того, правильный ли метод, который я пытаюсь использовать? Или я что-то упускаю?

Я использую Apache Spark 1.3.1.


person Gireesh Puthumana    schedule 25.05.2015    source источник
comment
comment
Пожалуйста, удалите тег spark-java, он не имеет отношения к Apache Spark.   -  person Gleb    schedule 03.06.2015


Ответы (1)


Вы должны сделать что-то вроде следующего:

RDD<Row> res = df.flatMap(new AbstractFunction1<Row, TraversableOnce<Row>>() {
  public TraversableOnce<Row> apply(Row row) {
    return new ListSet<Row>().$plus(row); //Note the updated list is returned from $plus()
  }
}, evidence$4);

Это будет работать аналогично map, только с большей свободой изменений. Например, чтобы отфильтровать вещи, вы можете вернуть пустой new ListSet<Row>(), когда захотите его вернуть, или сохранить текущее поведение. flatMap очень гибкий.

(Кажется, преобразование коллекций Java в коллекции Scala не является тривиальным.)

person Gábor Bakos    schedule 25.05.2015
comment
На мой взгляд, DataFrame = RDD + Schema. Если это так, то, по сути, мы должны сделать newDF = DF.RDD.flatmap(f).applySchema(), не создавая особых проблем с изобретением велосипеда. - person ayan guha; 26.05.2015
comment
Это оно?? Под капотом нет изменений? Будет ли производительность такой же? - person Gireesh Puthumana; 26.05.2015
comment
@ gábor-bakos, пробовал точно такой же код из вашего ответа. Он показывает ошибку компиляции и просит меня добавить приведение к «TraversableOnce‹Row›». Пробовал это, но получил исключение java.lang.ClassCastException: scala.collection.convert.Decorators$AsScala нельзя преобразовать в scala.collection.TraversableOnce. - person Gireesh Puthumana; 26.05.2015
comment
В таком случае добавим к этому toList(). - person Gábor Bakos; 26.05.2015
comment
Куда добавить toList()? (Извините, но эта штука scala - java сбивает с толку). Не могли бы вы опубликовать обновленный код? И что такое свидетельство $4? Какой цели он служит? Правильно ли я придумал? Есть ли хорошая документация по API Spark DF java с примерами? - person Gireesh Puthumana; 26.05.2015
comment
Только что заметил, что вы отредактировали свой ответ. Но этот JavaConverters$.MODULE$.asScalaBufferConverter(Collections.singletonList(row)).toList(); дает Метод toList() не определен для типа Decorators.AsScala‹Buffer‹Row›› Я использую spark 1.3.1 - person Gireesh Puthumana; 26.05.2015
comment
Извините, это выходит за рамки моих текущих знаний об использовании коллекций Scala из Java. evidence$4 можно было бы назвать по-другому, я просто попытался добавить то, что вы предложили, в Scala, который заполняется автоматически. Возможно, добавление List.apply в список: List$.MODULE$.apply(JavaConverters$.MODULE$.asScalaBufferConverter(Collections.singletonList(row))) исправит ошибку. Или List$.MODULE$.apply(new Row[]{row}) тоже может работать. - person Gábor Bakos; 26.05.2015
comment
Оба не работали. Но это сделало: ListSet<Row> b = new ListSet<Row>(); b.$plus(row); return b;. Но я получаю 0 записей на выходе. Я сомневаюсь в evidence$4. Любая помощь? - person Gireesh Puthumana; 26.05.2015
comment
Да, я думаю, что $plus возвращает коллекцию, содержащую строку, так что это может сработать: ListSet<Row> b = new ListSet<Row>().$plus(row); return b;. - person Gábor Bakos; 26.05.2015
comment
Упс! Вот и все. ListSet<Row> b = new ListSet<Row>().$plus(row); выдает результат. проблема заключалась в том, что отдельный b.$plus(row); не был назначен обратно b. Пожалуйста, измените свой ответ. Я приму это. Благодарю вас! - person Gireesh Puthumana; 26.05.2015
comment
Я обновил ответ, это тоже было для меня познавательно. Спасибо. - person Gábor Bakos; 26.05.2015