Примените функцию (scanLeft) к разделу, чтобы создать новый столбец в кадре данных

Я хочу выполнить операцию типа scanLeft в одном столбце фрейма данных. Scanleft не является парареллизуемым, но в моем случае я хочу применить эту функцию только к элементам, которые уже находятся в том же разделе. Поэтому операцию можно выполнять параллельно в каждой партиции. (без перемешивания данных)

Рассмотрим следующий пример:

| partitionKey  | orderColumn   | value     | scanLeft(0)(_+_)  |
|-------------- |-------------  |-------    |------------------ |
| 1             | 1             | 1         | 1                 |
| 1             | 2             | 2         | 3                 |
| 2             | 1             | 3         | 3                 |
| 2             | 2             | 4         | 7                 |
| 1             | 3             | 5         | 8                 |
| 2             | 3             | 6         | 13                |

Я хочу просмотреть значения в одном разделе и создать новый столбец для сохранения результата.

Мой код на данный момент будет выглядеть примерно так:

    inDataframe
      .repartition(col("partitionKey"))
      .foreachPartition{
      partition =>
        partition.map(row => row(1).asInstanceOf[Double])
      .scanLeft(0.0)(_+_)
      .foreach(println(_))
    })

Это агрегирует значения, как я хочу, и распечатывает результат, однако я хочу добавить эти значения в качестве нового столбца фрейма данных.

Есть идеи, как это сделать?

----edit---- Реальный вариант использования - рассчитать норму прибыли, взвешенную по времени (https://www.investopedia.com/terms/t/time-weightedror.asp) Ожидаемый ввод выглядит примерно так:

| product   | valuation date    | daily return  |
|---------  |----------------   |-------------- |
| 1         | 2019-01-01        | 0.1           |
| 1         | 2019-01-02        | 0.2           |
| 1         | 2019-01-03        | 0.3           |
| 2         | 2019-01-01        | 0.4           |
| 2         | 2019-01-02        | 0.5           |
| 2         | 2019-01-03        | 0.6           |

Я хочу рассчитать совокупный доход на продукт для всех дат до текущей. Dataframe разделен по продуктам, а разделы упорядочены по дате оценки. Я уже написал функцию агрегации для передачи в scanLeft:

  def chain_ret (x: Double, y: Double): Double = {
    (1 + x) * (1 + y) - 1
  }

Ожидаемые данные возврата:

| product   | valuation date    | daily return  | cumulated return  |
|---------  |----------------   |-------------- |------------------ |
| 1         | 2019-01-01        | 0.1           | 0.1               |
| 1         | 2019-01-02        | 0.2           | 0.32              |
| 1         | 2019-01-03        | 0.3           | 0.716             |
| 2         | 2019-01-01        | 0.4           | 0.4               |
| 2         | 2019-01-02        | 0.5           | 1.1               |
| 2         | 2019-01-03        | 0.6           | 2.36              |

Я уже решил эту проблему, отфильтровав кадр данных для заданного диапазона дат и применив к нему UDAF. (смотрите ниже) Это очень долго и думаю со scanLeft будет намного быстрее!

    while(endPeriod.isBefore(end)) {
      val filtered = inDataframe
        .where("VALUATION_DATE >= '" + start + "' AND VALUATION_DATE <= '" + endPeriod + "'")
      val aggregated = aggregate_returns(filtered)
        .withColumn("VALUATION_DATE", lit(Timestamp.from(endPeriod)).cast(TimestampType))
      df_ret = df_ret.union(aggregated)
      endPeriod = endPeriod.plus(1, ChronoUnit.DAYS)
    }

 def aggregate_returns(inDataframe: DataFrame): DataFrame = {
    val groupedByKey = inDataframe
      .groupBy("product")
    groupedByKey
      .agg(
        returnChain(col("RETURN_LOCAL")).as("RETURN_LOCAL_CUMUL"),
        returnChain(col("RETURN_FX")).as("RETURN_FX_CUMUL"),
        returnChain(col("RETURN_CROSS")).as("RETURN_CROSS_CUMUL"),
        returnChain(col("RETURN")).as("RETURN_CUMUL")
      )

class ReturnChain extends UserDefinedAggregateFunction{

  // Defind the schema of the input data
  override def inputSchema: StructType =
    StructType(StructField("return", DoubleType) :: Nil)

  // Define how the aggregates types will be
  override def bufferSchema: StructType = StructType(
    StructField("product", DoubleType) :: Nil
  )

  // define the return type
  override def dataType: DataType = DoubleType

  // Does the function return the same value for the same input?
  override def deterministic: Boolean = true

  // Initial values
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.toDouble
  }

  // Updated based on Input
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = (1.toDouble + buffer.getAs[Double](0)) * (1.toDouble + input.getAs[Double](0))
  }

  // Merge two schemas
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Double](0) + buffer2.getAs[Double](0)
  }

  // Output
  override def evaluate(buffer: Row): Any = {
    buffer.getDouble(0) - 1.toDouble
  }
}

person Aneta    schedule 24.09.2019    source источник
comment
Можете ли вы добавить некоторые входные и ожидаемые данные, могут быть другие способы сделать это.   -  person koiralo    schedule 24.09.2019
comment
@ShankarKoirala Я добавил реальный вариант использования   -  person Aneta    schedule 24.09.2019
comment
вы должны обратить внимание: если фрейм данных разделен на partitionKey, это не означает, что 1 раздел содержит только 1 partitionKey, но что 1 partitionKey присутствует только в 1 разделе. Так что вам бы ещё и groupBy внутри mapPartitions.... В общем, я бы попробовал как-нибудь оконными функциями решить   -  person Raphael Roth    schedule 24.09.2019


Ответы (1)


foreachPartition ничего не возвращает, вместо этого вам нужно использовать .mapPartition()

Разница между foreachPartition и mapPartition такая же, как и между map и foreach. Посмотрите здесь хорошие объяснения Foreach vs Map в Scala

person Pablo López Gallego    schedule 24.09.2019
comment
хорошо, я использую forEach, потому что распечатывал результат scanLeft. Вы точно знаете, как создать новый столбец с помощью mapPartitions? - person Aneta; 24.09.2019
comment
Первый вопрос. Вам действительно нужно использовать repartition? - person Pablo López Gallego; 24.09.2019
comment
Спасибо за помощь! Я добавил подробное объяснение моего варианта использования. На самом деле данные поступают из базы данных Cassandra и хранятся с разбивкой по продуктам. - person Aneta; 24.09.2019