Я хочу выполнить операцию типа scanLeft в одном столбце фрейма данных. Scanleft не является парареллизуемым, но в моем случае я хочу применить эту функцию только к элементам, которые уже находятся в том же разделе. Поэтому операцию можно выполнять параллельно в каждой партиции. (без перемешивания данных)
Рассмотрим следующий пример:
| partitionKey | orderColumn | value | scanLeft(0)(_+_) |
|-------------- |------------- |------- |------------------ |
| 1 | 1 | 1 | 1 |
| 1 | 2 | 2 | 3 |
| 2 | 1 | 3 | 3 |
| 2 | 2 | 4 | 7 |
| 1 | 3 | 5 | 8 |
| 2 | 3 | 6 | 13 |
Я хочу просмотреть значения в одном разделе и создать новый столбец для сохранения результата.
Мой код на данный момент будет выглядеть примерно так:
inDataframe
.repartition(col("partitionKey"))
.foreachPartition{
partition =>
partition.map(row => row(1).asInstanceOf[Double])
.scanLeft(0.0)(_+_)
.foreach(println(_))
})
Это агрегирует значения, как я хочу, и распечатывает результат, однако я хочу добавить эти значения в качестве нового столбца фрейма данных.
Есть идеи, как это сделать?
----edit---- Реальный вариант использования - рассчитать норму прибыли, взвешенную по времени (https://www.investopedia.com/terms/t/time-weightedror.asp) Ожидаемый ввод выглядит примерно так:
| product | valuation date | daily return |
|--------- |---------------- |-------------- |
| 1 | 2019-01-01 | 0.1 |
| 1 | 2019-01-02 | 0.2 |
| 1 | 2019-01-03 | 0.3 |
| 2 | 2019-01-01 | 0.4 |
| 2 | 2019-01-02 | 0.5 |
| 2 | 2019-01-03 | 0.6 |
Я хочу рассчитать совокупный доход на продукт для всех дат до текущей. Dataframe разделен по продуктам, а разделы упорядочены по дате оценки. Я уже написал функцию агрегации для передачи в scanLeft:
def chain_ret (x: Double, y: Double): Double = {
(1 + x) * (1 + y) - 1
}
Ожидаемые данные возврата:
| product | valuation date | daily return | cumulated return |
|--------- |---------------- |-------------- |------------------ |
| 1 | 2019-01-01 | 0.1 | 0.1 |
| 1 | 2019-01-02 | 0.2 | 0.32 |
| 1 | 2019-01-03 | 0.3 | 0.716 |
| 2 | 2019-01-01 | 0.4 | 0.4 |
| 2 | 2019-01-02 | 0.5 | 1.1 |
| 2 | 2019-01-03 | 0.6 | 2.36 |
Я уже решил эту проблему, отфильтровав кадр данных для заданного диапазона дат и применив к нему UDAF. (смотрите ниже) Это очень долго и думаю со scanLeft будет намного быстрее!
while(endPeriod.isBefore(end)) {
val filtered = inDataframe
.where("VALUATION_DATE >= '" + start + "' AND VALUATION_DATE <= '" + endPeriod + "'")
val aggregated = aggregate_returns(filtered)
.withColumn("VALUATION_DATE", lit(Timestamp.from(endPeriod)).cast(TimestampType))
df_ret = df_ret.union(aggregated)
endPeriod = endPeriod.plus(1, ChronoUnit.DAYS)
}
def aggregate_returns(inDataframe: DataFrame): DataFrame = {
val groupedByKey = inDataframe
.groupBy("product")
groupedByKey
.agg(
returnChain(col("RETURN_LOCAL")).as("RETURN_LOCAL_CUMUL"),
returnChain(col("RETURN_FX")).as("RETURN_FX_CUMUL"),
returnChain(col("RETURN_CROSS")).as("RETURN_CROSS_CUMUL"),
returnChain(col("RETURN")).as("RETURN_CUMUL")
)
class ReturnChain extends UserDefinedAggregateFunction{
// Defind the schema of the input data
override def inputSchema: StructType =
StructType(StructField("return", DoubleType) :: Nil)
// Define how the aggregates types will be
override def bufferSchema: StructType = StructType(
StructField("product", DoubleType) :: Nil
)
// define the return type
override def dataType: DataType = DoubleType
// Does the function return the same value for the same input?
override def deterministic: Boolean = true
// Initial values
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0.toDouble
}
// Updated based on Input
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = (1.toDouble + buffer.getAs[Double](0)) * (1.toDouble + input.getAs[Double](0))
}
// Merge two schemas
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Double](0) + buffer2.getAs[Double](0)
}
// Output
override def evaluate(buffer: Row): Any = {
buffer.getDouble(0) - 1.toDouble
}
}
partitionKey
, это не означает, что 1 раздел содержит только 1partitionKey
, но что 1partitionKey
присутствует только в 1 разделе. Так что вам бы ещё иgroupBy
внутриmapPartitions
.... В общем, я бы попробовал как-нибудь оконными функциями решить - person Raphael Roth   schedule 24.09.2019