Монадическая свертка с монадой состояния в постоянном пространстве (куча и стек)?

Можно ли выполнить свертку в монаде состояния в постоянном стеке и пространстве кучи? Или для моей проблемы лучше подходит другая функциональная техника?

В следующих разделах описывается проблема и мотивирующий вариант использования. Я использую Scala, но приветствуются и решения на Haskell.


Сложите State монаду, заполняющую кучу

Предположим, Scalaz 7. Рассмотрим монадическую складку в монаде State. Чтобы избежать переполнения стека, мы воспользуемся складкой.

import scalaz._
import Scalaz._
import scalaz.std.iterable._
import Free.Trampoline

type TrampolinedState[S, B] = StateT[Trampoline, S, B] // monad type constructor

type S = Int  // state is an integer
type M[B] = TrampolinedState[S, B] // our trampolined state monad

type R = Int  // or some other monoid

val col: Iterable[R] = largeIterableofRs() // defined elsewhere

val (count, sum): (S, R) = col.foldLeftM[M, R](Monoid[R].zero){ 
    (acc: R, x: R) => StateT[Trampoline, S, R] {
      s: S => Trampoline.done { 
        (s + 1, Monoid[R].append(acc, x))
      }
    }
} run 0 run

// In Scalaz 7, foldLeftM is implemented in terms of foldRight, which in turn
// is a reversed.foldLeft. This pulls the whole collection into memory and kills
// the heap.  Ignore this heap overflow. We could reimplement foldLeftM to avoid
// this overflow or use a foldRightM instead.
// Our real issue is the heap used by the unexecuted State mobits.

Для большой коллекции col это заполнит кучу.

Я считаю, что во время сворачивания создается закрытие (мобит состояния) для каждого значения в коллекции (параметр x: R), заполняя кучу. Ни один из них не может быть оценен до тех пор, пока не будет выполнен run 0, обеспечивающий начальное состояние.

Можно ли избежать этого использования кучи O (n)?

Более конкретно, может ли начальное состояние быть предоставлено перед сверткой, чтобы монада State могла выполняться во время каждого связывания, а не вложение замыканий для более поздней оценки?

Или можно построить свертку таким образом, чтобы она выполнялась лениво после того, как монада состояния будет run? Таким образом, следующее x: R замыкание не будет создано до тех пор, пока предыдущие не будут оценены и станут пригодными для сборки мусора.

Или есть лучшая функциональная парадигма для такого рода работ?


Пример приложения

Но, возможно, я использую не тот инструмент для работы. Далее следует развитие примера использования. Я здесь ошибаюсь?

Рассмотрим выборку резервуара, т. Е. Выбор за один проход однородных случайных k элементов из коллекции, слишком большой для размещения в памяти . В Scala такая функция может быть

def sample[A](col: TraversableOnce[A])(k: Int): Vector[A]

и если бы он был введен в тип TraversableOnce, можно было бы использовать это так

val tenRandomInts = (Int.Min to Int.Max) sample 10

Работа, проделанная sample, по сути, fold:

def sample[A](col: Traversable[A])(k: Int): Vector[A] = {
    col.foldLeft(Vector()){update(k)(_: Vector[A], _: A)}
}

Однако update сохраняет состояние; это зависит от n, количества уже просмотренных элементов. (Это также зависит от ГСЧ, но для простоты я предполагаю, что он глобальный и с отслеживанием состояния. Методы, используемые для обработки n, будут расширяться тривиально.). Итак, как справиться с этим состоянием?

Нечистое решение простое и работает с постоянным стеком и кучей.

/* Impure version of update function */
def update[A](k: Int) = new Function2[Vector[A], A, Vector[A]] {
    var n = 0
    def apply(sample: Vector[A], x: A): Vector[A] = {
        n += 1
        algorithmR(k, n, acc, x)
    }
}

def algorithmR(k: Int, n: Int, acc: Vector[A], x: A): Vector[A] = {
    if (sample.size < k) {
        sample :+ x // must keep first k elements
    } else {
        val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
        if (r <= k)
            sample.updated(r - 1, x) // sample is 0-index
        else
            sample
    }
}

А как насчет чисто функционального решения? update должен принимать n в качестве дополнительного параметра и возвращать новое значение вместе с обновленным образцом. Мы могли бы включить n в неявное состояние, аккумулятор свертки, например,

(col.foldLeft ((0, Vector())) (update(k)(_: (Int, Vector[A]), _: A)))._2

Но это затемняет намерение; мы только действительно намереваемся накапливать образец вектора. Эта проблема кажется уже готовой для монады State и монадической левой складки. Давай попробуем еще.

Мы будем использовать Scalaz 7 с этим импортом

import scalaz._
import Scalaz._
import scalaz.std.iterable_

и работать с Iterable[A], поскольку Scalaz не поддерживает монадическое сворачивание Traversable.

sample теперь определен

// sample using State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {       
    type M[B] = State[Int, B]

    // foldLeftM is implemented using foldRight, which must reverse `col`, blowing
    // the heap for large `col`.  Ignore this issue for now.
    // foldLeftM could be implemented differently or we could switch to
    // foldRightM, implemented using foldLeft.
    col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0
}

где обновление

// update using State monad
def update(k: Int) = {
    (acc: Vector[A], x: A) => State[Int, Vector[A]] {
        n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
    }
}

К сожалению, это сильно раздувает большую коллекцию.

Итак, давайте займемся прыжками на батуте. sample сейчас

// sample using trampolined State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
    import Free.Trampoline

    type TrampolinedState[S, B] = StateT[Trampoline, S, B]
    type M[B] = TrampolinedState[Int, B]

    // Same caveat about foldLeftM using foldRight and blowing the heap
    // applies here.  Ignore for now. This solution blows the heap anyway;
    // let's fix that issue first.
    col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0 run
}

где обновление

// update using trampolined State monad
def update(k: Int) = {
    (acc: Vector[A], x: A) => StateT[Trampoline, Int, Vector[A]] {
        n => Trampoline.done { (n + 1, algorithmR(k, n + 1, acc, x) }
    }
}

Это устраняет переполнение стека, но по-прежнему отбрасывает кучу для очень больших коллекций (или очень маленьких куч). Одна анонимная функция для каждого значения в коллекции создается во время сворачивания (я считаю, что закрывается по каждому параметру x: A), потребляя кучу до того, как батут даже будет запущен. (FWIW, версия State также имеет эту проблему; переполнение стека сначала появляется только с меньшими коллекциями.)


person David B.    schedule 24.12.2013    source источник
comment
Я не думаю, что ваша догадка верна, что в куче создается одна функция для каждого значения, и это то, что пожирает вашу память. Составная функция создается лениво. Подумай об этом. Когда вы говорите f = s => bigFun(), тогда bigFun фактически не происходит, пока вы не передадите s. В этот момент f можно выбросить , если вы не держитесь за него. Скорее всего, ваша коллекция слишком строгая. Попробуйте использовать EphemeralStream и сравните результаты.   -  person Apocalisp    schedule 25.12.2013
comment
Первоначально я понимал ленивое создание, но я вижу, что эти закрытия созданы (с использованием профилировщика). Это после того, как задано начальное состояние и батут запущен, но до того, как батут фактически выполнит каждую вещь. Смотрите мои комментарии к вашему ответу.   -  person David B.    schedule 25.12.2013
comment
Между прочим, как только мое замешательство будет разрешено, я отредактирую свой вопрос, чтобы удалить отвлекающие факторы (например, подходит ли коллекция в памяти. Это на самом деле не имеет значения; просто использование кучи большой O монадической складки ...)   -  person David B.    schedule 25.12.2013


Ответы (2)


Наша настоящая проблема - это куча, используемая невыполненными мобитами State.

Нет это не так. Настоящая проблема в том, что коллекция не умещается в памяти и что foldLeftM и foldRightM принудительно собирают всю коллекцию. Побочный эффект нечистого раствора состоит в том, что вы освобождаете память по ходу дела. В «чисто функциональном» решении вы этого нигде не делаете.

Ваше использование Iterable игнорирует важную деталь: что это за коллекция col на самом деле, как создаются ее элементы и как ожидается их удаление. И так, обязательно, foldLeftM на Iterable. Вероятно, это слишком строго, и вы заставляете всю коллекцию в памяти. Например, если это Stream, то до тех пор, пока вы удерживаете col, все элементы, принудительно заданные до сих пор, будут в памяти. Если это какой-то другой ленивый Iterable, который не запоминает свои элементы, то свертка все равно слишком строгая.

Я попробовал ваш первый пример с EphemeralStream, не обнаружил какого-либо значительного давления в куче, хотя он явно будет иметь те же «невыполненные мобиты состояния». Разница в том, что на элементы EphemeralStream ссылаются слабо, и его foldRight не форсирует весь поток.

Я подозреваю, что если бы вы использовали Foldable.foldr, вы не заметили бы проблемного поведения, поскольку оно сводится к функции, которая не использует свой второй аргумент. Когда вы коллируете фолд, вы хотите, чтобы он немедленно возвращал приостановку, которая выглядит примерно так:

Suspend(() => head |+| tail.foldRightM(...))

Когда батут возобновляет первую приостановку и переходит к следующей приостановке, все распределения между приостановками становятся доступными для освобождения сборщиком мусора.

Попробуйте следующее:

def foldM[M[_]:Monad,A,B](a: A, bs: Iterable[B])(f: (A, B) => M[A]): M[A] =
  if (bs.isEmpty) Monad[M].point(a)
  else Monad[M].bind(f(a, bs.head))(fax => foldM(fax, bs.tail)(f))

val MS = StateT.stateTMonadState[Int, Trampoline]
import MS._

foldM[M,R,Int](Monoid[R].zero, col) {
  (x, r) => modify(_ + 1) map (_ => Monoid[R].append(x, r))
} run 0 run

Это будет работать в постоянной куче для монады с трамплином M, но переполнит стек для монады без трамплина.

Но настоящая проблема в том, что Iterable не является хорошей абстракцией для данных, которые слишком велики, чтобы поместиться в памяти. Конечно, вы можете написать императивную побочную программу, в которой вы явно отбрасываете элементы после каждой итерации или используйте ленивую правую складку. Это хорошо работает, пока вы не захотите скомпоновать эту программу с другой. И я предполагаю, что вся причина, по которой вы исследуете это в State монаде для начала, состоит в том, чтобы получить композиционность.

Так что ты можешь сделать? Вот несколько вариантов:

  1. Используйте Reducer, Monoid и их композицию, затем выполните императивный цикл с явным освобождением (или ленивую правую складку с трамплином) в качестве последнего шага, после которого композиция становится невозможной или ожидаемой.
  2. Используйте Iteratee композицию и монадические Enumerator, чтобы накормить их.
  3. Напишите преобразователи композиционного потока с помощью Scalaz-Stream.

Последний из этих вариантов - тот, который я бы использовал и рекомендовал в общем случае.

person Apocalisp    schedule 25.12.2013
comment
Для своих тестов я использовал анонимный new Iterator{...} (который просто увеличивал var: Int). Это не удерживает предыдущие элементы в памяти (подтверждено решением с отслеживанием состояния в примере приложения). Его поведение должно быть таким же в других sample реализациях. - person David B.; 25.12.2013
comment
Меня не беспокоят некоторые Iterable-коллекции, требующие, чтобы все элементы были в памяти - это следует учитывать при выборе коллекции. Меня беспокоит foldLeftM[State[B]](...)(...) использование дополнительного O(n) пространства кучи. (Я должен был быть более конкретным в вопросе; я просто подумал, что объяснение, слишком большое, чтобы поместиться в памяти, было более простым для объяснения.) - person David B.; 25.12.2013
comment
Чтобы было ясно, я не пытаюсь оспаривать ваш анализ (черт возьми, я узнал о StateT[Trampoline, S, B] из вашей статьи о свободных монадах и переполнении стека), просто поймите основную причину моей проблемы. Ваши три предложения могут быть лучше для моей проблемы (спасибо!), Но я хотел бы понять, почему мой foldLeftM[StateT[Trampoline, S, B] не использует постоянную дополнительную кучу. - person David B.; 25.12.2013
comment
С foldLeftM в коллекции N элементов я вижу, что N scalaz.IterableInstances$$anonfunc$foldRight$1$1$$anonfun$apply$1 функций, каждая из которых ссылается на закрытие scalaz.Foldable$$anonfun$foldLeftM$2$$anonfun$apply$10 над элементом коллекции. Все они создаются одновременно при вызове Trampoline.run, до того, как какие-либо из них будут удалены и развернуты. - person David B.; 25.12.2013
comment
Операции сворачивания EphemeralStream отличаются от иерархии коллекций двумя способами (например, мой итератор). Во-первых, foldRight является рекурсивной, а не reserved.foldLeft, которая помещает всю коллекцию в новый список. Я не думаю, что это основная причина. Во-вторых, параметр функции уменьшения для foldRight должен использовать ленивую оценку (вызов по имени) для аргументов. Строгая оценка, используемая иерархией коллекций, может привести к созданию замыканий перед оценкой. Модифицированный Iterable с EphemeralStream реализацией foldRight работает правильно: использование кучи равно O (1). - person David B.; 25.12.2013
comment
Неважно, есть ли замыкание, удерживающее элемент потока. Проблема в том, что каждый элемент потока никогда не освобождается из памяти во время сворачивания. - person Apocalisp; 25.12.2013
comment
Обновлен ответ, чтобы отразить вашу причину, по которой это работает с EphemeralStream. - person Apocalisp; 25.12.2013
comment
Я не понимаю, почему вы говорите, что это не имеет значения, есть ли замыкание, удерживающее элемент потока. Это моя проблема! Из моего реального варианта использования: у меня есть коллекция размером 50 ГБ, которая умещается в памяти (на машине с 60 ГБ оперативной памяти). Вся моя обработка должна использовать дополнительное пространство кучи O (1), но эти закрытия используют дополнительный O (n) и переполняют кучу. - person David B.; 25.12.2013
comment
Foldr имеет смысл. Вы знаете, почему коллекции scala foldLeft и foldRight не могут лениться в своих аргументах? - person David B.; 25.12.2013
comment
Это полностью дизайнерское решение стандартной библиотеки. Говоря строгим языком, создание ленивых версий всего требует значительных усилий и трудно сделать все правильно. - person Apocalisp; 25.12.2013
comment
Да, я думаю, это не помогает тому, что вы создаете все эти замыкания изнутри. Вы должны создавать только то закрытие, которое вам нужно, чтобы однажды вернуть управление батуту. - person Apocalisp; 25.12.2013

Использование State или любой подобной монады - не лучший подход к решению проблемы. Использование State обречено на выброс стека / кучи на большие коллекции. Рассмотрим значение x: State[A,B], созданное из большой коллекции (например, путем складывания поверх нее). Затем x можно оценить при разных значениях начального состояния A, давая разные результаты. Поэтому x необходимо сохранить всю информацию, содержащуюся в коллекции. В чистых настройках x не может забыть некоторую информацию, чтобы не взорвать стек / кучу, поэтому все, что вычислено, остается в памяти до тех пор, пока не будет освобождено все монадическое значение, что происходит только после оценки результата. Таким образом, потребление памяти x пропорционально размеру коллекции.

Я считаю, что подходящим подходом к этой проблеме является использование функциональных итераций / каналов / каналов. Эта концепция (упоминаемая под этими тремя именами) была изобретена для обработки больших коллекций данных с постоянным потреблением памяти и для описания таких процессов с помощью простого комбинатора.

Я пытался использовать Scalaz 'Iteratees, но похоже, что эта часть еще не созрела, она страдает от переполнения стека, как и State (или, возможно, я использую ее неправильно; код доступен здесь, если кому интересно).

Однако это было просто с использованием моего (все еще немного экспериментального) scala-pipeline библиотека (отказ от ответственности: я являюсь автором):

import conduit._
import conduit.Pipe._

object Run extends App {
  // Define a sampling function as a sink: It consumes
  // data of type `A` and produces a vector of samples.
  def sampleI[A](k: Int): Sink[A, Vector[A]] =
    sampleI[A](k, 0, Vector())

  // Create a sampling sink with a given state. It requests
  // a value from the upstream conduit. If there is one,
  // update the state and continue (the first argument to `requestF`).
  // If not, return the current sample (the second argument).
  // The `Finalizer` part isn't important for our problem.
  private def sampleI[A](k: Int, n: Int, sample: Vector[A]):
                  Sink[A, Vector[A]] =
    requestF((x: A) => sampleI(k, n + 1, algorithmR(k, n + 1, sample, x)),
             (_: Any) => sample)(Finalizer.empty)


  // The sampling algorithm copied from the question.
  val rand = new scala.util.Random()

  def algorithmR[A](k: Int, n: Int, sample: Vector[A], x: A): Vector[A] = {
    if (sample.size < k) {
      sample :+ x // must keep first k elements
    } else {
      val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
      if (r <= k)
        sample.updated(r - 1, x) // sample is 0-index
      else
        sample
    }
  }

  // Construct an iterable of all `short` values, pipe it into our sampling
  // funcition, and run the combined pipe.
  {
    print(runPipe(Util.fromIterable(Short.MinValue to Short.MaxValue) >->
          sampleI(10)))
  }
}

Обновление: проблему можно было бы решить с помощью State, но нам нужно реализовать специальную свертку специально для State, которая знает, как это делать с постоянным пространством:

import scala.collection._
import scala.language.higherKinds
import scalaz._
import Scalaz._
import scalaz.std.iterable._

object Run extends App {
  // Folds in a state monad over a foldable
  def stateFold[F[_],E,S,A](xs: F[E],
                            f: (A, E) => State[S,A],
                            z: A)(implicit F: Foldable[F]): State[S,A] =
    State[S,A]((s: S) => F.foldLeft[E,(S,A)](xs, (s, z))((p, x) => f(p._2, x)(p._1)))


  // Sample a lazy collection view
  def sampleS[F[_],A](k: Int, xs: F[A])(implicit F: Foldable[F]):
                  State[Int,Vector[A]] =
    stateFold[F,A,Int,Vector[A]](xs, update(k), Vector())

  // update using State monad
  def update[A](k: Int) = {
    (acc: Vector[A], x: A) => State[Int, Vector[A]] {
        n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
    }
  }

  def algorithmR[A](k: Int, n: Int, sample: Vector[A], x: A): Vector[A] = ...

  {
    print(sampleS(10, (Short.MinValue to Short.MaxValue)).eval(0))
  }
}
person Petr    schedule 25.12.2013
comment
Ваш первый абзац совпадает с моим пониманием того, что происходит - монада ссылается на всю коллекцию со своим собственным набором замыканий размером O (N) и не может развернуть / освободить их, пока не будет предоставлено начальное состояние. Я считаю, что @Apocalisp говорит, что для соответствующей реализации сворачивания коллекция не повторяется до тех пор, пока не будет предоставлено начальное состояние и не будет запущен трамплин - элемент может быть освобожден, когда будет загружен следующий. - person David B.; 26.12.2013
comment
Тем не менее, подход Iteratee / pipeline, предложенный вами обоими, в любом случае должен избегать всех этих сложностей и головной боли, связанной с реализацией сворачивания. - person David B.; 26.12.2013
comment
@DavidB. Другой вариант (обновленный в ответе) - создать специальную функцию для сворачивания монады State в постоянном пространстве. Думаю, это что-то наполовину к трамвайному подходу к монаде. - person Petr; 26.12.2013