Можно ли выполнить свертку в монаде состояния в постоянном стеке и пространстве кучи? Или для моей проблемы лучше подходит другая функциональная техника?
В следующих разделах описывается проблема и мотивирующий вариант использования. Я использую Scala, но приветствуются и решения на Haskell.
Сложите State
монаду, заполняющую кучу
Предположим, Scalaz 7. Рассмотрим монадическую складку в монаде State. Чтобы избежать переполнения стека, мы воспользуемся складкой.
import scalaz._
import Scalaz._
import scalaz.std.iterable._
import Free.Trampoline
type TrampolinedState[S, B] = StateT[Trampoline, S, B] // monad type constructor
type S = Int // state is an integer
type M[B] = TrampolinedState[S, B] // our trampolined state monad
type R = Int // or some other monoid
val col: Iterable[R] = largeIterableofRs() // defined elsewhere
val (count, sum): (S, R) = col.foldLeftM[M, R](Monoid[R].zero){
(acc: R, x: R) => StateT[Trampoline, S, R] {
s: S => Trampoline.done {
(s + 1, Monoid[R].append(acc, x))
}
}
} run 0 run
// In Scalaz 7, foldLeftM is implemented in terms of foldRight, which in turn
// is a reversed.foldLeft. This pulls the whole collection into memory and kills
// the heap. Ignore this heap overflow. We could reimplement foldLeftM to avoid
// this overflow or use a foldRightM instead.
// Our real issue is the heap used by the unexecuted State mobits.
Для большой коллекции col
это заполнит кучу.
Я считаю, что во время сворачивания создается закрытие (мобит состояния) для каждого значения в коллекции (параметр x: R
), заполняя кучу. Ни один из них не может быть оценен до тех пор, пока не будет выполнен run 0
, обеспечивающий начальное состояние.
Можно ли избежать этого использования кучи O (n)?
Более конкретно, может ли начальное состояние быть предоставлено перед сверткой, чтобы монада State могла выполняться во время каждого связывания, а не вложение замыканий для более поздней оценки?
Или можно построить свертку таким образом, чтобы она выполнялась лениво после того, как монада состояния будет run
? Таким образом, следующее x: R
замыкание не будет создано до тех пор, пока предыдущие не будут оценены и станут пригодными для сборки мусора.
Или есть лучшая функциональная парадигма для такого рода работ?
Пример приложения
Но, возможно, я использую не тот инструмент для работы. Далее следует развитие примера использования. Я здесь ошибаюсь?
Рассмотрим выборку резервуара, т. Е. Выбор за один проход однородных случайных k
элементов из коллекции, слишком большой для размещения в памяти . В Scala такая функция может быть
def sample[A](col: TraversableOnce[A])(k: Int): Vector[A]
и если бы он был введен в тип TraversableOnce
, можно было бы использовать это так
val tenRandomInts = (Int.Min to Int.Max) sample 10
Работа, проделанная sample
, по сути, fold
:
def sample[A](col: Traversable[A])(k: Int): Vector[A] = {
col.foldLeft(Vector()){update(k)(_: Vector[A], _: A)}
}
Однако update
сохраняет состояние; это зависит от n
, количества уже просмотренных элементов. (Это также зависит от ГСЧ, но для простоты я предполагаю, что он глобальный и с отслеживанием состояния. Методы, используемые для обработки n
, будут расширяться тривиально.). Итак, как справиться с этим состоянием?
Нечистое решение простое и работает с постоянным стеком и кучей.
/* Impure version of update function */
def update[A](k: Int) = new Function2[Vector[A], A, Vector[A]] {
var n = 0
def apply(sample: Vector[A], x: A): Vector[A] = {
n += 1
algorithmR(k, n, acc, x)
}
}
def algorithmR(k: Int, n: Int, acc: Vector[A], x: A): Vector[A] = {
if (sample.size < k) {
sample :+ x // must keep first k elements
} else {
val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
if (r <= k)
sample.updated(r - 1, x) // sample is 0-index
else
sample
}
}
А как насчет чисто функционального решения? update
должен принимать n
в качестве дополнительного параметра и возвращать новое значение вместе с обновленным образцом. Мы могли бы включить n
в неявное состояние, аккумулятор свертки, например,
(col.foldLeft ((0, Vector())) (update(k)(_: (Int, Vector[A]), _: A)))._2
Но это затемняет намерение; мы только действительно намереваемся накапливать образец вектора. Эта проблема кажется уже готовой для монады State и монадической левой складки. Давай попробуем еще.
Мы будем использовать Scalaz 7 с этим импортом
import scalaz._
import Scalaz._
import scalaz.std.iterable_
и работать с Iterable[A]
, поскольку Scalaz не поддерживает монадическое сворачивание Traversable
.
sample
теперь определен
// sample using State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
type M[B] = State[Int, B]
// foldLeftM is implemented using foldRight, which must reverse `col`, blowing
// the heap for large `col`. Ignore this issue for now.
// foldLeftM could be implemented differently or we could switch to
// foldRightM, implemented using foldLeft.
col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0
}
где обновление
// update using State monad
def update(k: Int) = {
(acc: Vector[A], x: A) => State[Int, Vector[A]] {
n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
}
}
К сожалению, это сильно раздувает большую коллекцию.
Итак, давайте займемся прыжками на батуте. sample
сейчас
// sample using trampolined State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
import Free.Trampoline
type TrampolinedState[S, B] = StateT[Trampoline, S, B]
type M[B] = TrampolinedState[Int, B]
// Same caveat about foldLeftM using foldRight and blowing the heap
// applies here. Ignore for now. This solution blows the heap anyway;
// let's fix that issue first.
col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0 run
}
где обновление
// update using trampolined State monad
def update(k: Int) = {
(acc: Vector[A], x: A) => StateT[Trampoline, Int, Vector[A]] {
n => Trampoline.done { (n + 1, algorithmR(k, n + 1, acc, x) }
}
}
Это устраняет переполнение стека, но по-прежнему отбрасывает кучу для очень больших коллекций (или очень маленьких куч). Одна анонимная функция для каждого значения в коллекции создается во время сворачивания (я считаю, что закрывается по каждому параметру x: A
), потребляя кучу до того, как батут даже будет запущен. (FWIW, версия State также имеет эту проблему; переполнение стека сначала появляется только с меньшими коллекциями.)
f = s => bigFun()
, тогдаbigFun
фактически не происходит, пока вы не передадитеs
. В этот моментf
можно выбросить , если вы не держитесь за него. Скорее всего, ваша коллекция слишком строгая. Попробуйте использоватьEphemeralStream
и сравните результаты. - person Apocalisp   schedule 25.12.2013