Ленивое разделение

У меня есть источник элементов, и я хочу отдельно обрабатывать их прогоны, имеющие одинаковое значение ключевой функции. В Python это выглядело бы так

for key_val, part in itertools.groupby(src, key_fn):
  process(key_val, part)

Это решение совершенно ленивое, т.е. если process не пытается сохранить содержимое всего part, код будет выполняться в O(1) памяти.

Решение Clojure

(doseq [part (partition-by key-fn src)]
  (process part))

менее ленив: он осознает каждую часть полностью. Проблема в том, что src может иметь очень длинные серии элементов с одинаковым значением key-fn, и их понимание может привести к OOM.

Я нашел это обсуждение, в котором утверждается, что следующая функция (слегка изменен для согласованности имен внутри поста) достаточно ленив

(defn lazy-partition-by [key-fn coll]
  (lazy-seq
    (when-let [s (seq coll)]
      (let [fst (first s)
            fv (key-fn fst)
            part (lazy-seq (cons fst (take-while #(= fv (key-fn %)) (next s))))]
        (cons part (lazy-partition-by key-fn (drop-while #(= fv (key-fn %)) s)))))))

Однако я не понимаю, почему он не страдает от OOM: обе части cons-ячейки содержат ссылку на s, поэтому, пока process потребляет part, реализуется s, но не сборщик мусора. Он получит право на сборку мусора только тогда, когда drop-while пересекает part.

Итак, мои вопросы:

  1. Правильно ли я, что lazy-partition-by недостаточно ленив?
  2. Существует ли реализация partition-by с гарантированными требованиями к памяти, при условии, что я не буду иметь никаких ссылок на предыдущий part к тому времени, когда я начну реализовывать следующий?

РЕДАКТИРОВАТЬ: Вот достаточно ленивая реализация в Haskell:

lazyPartitionBy :: Eq b => (a -> b) -> [a] -> [[a]]
lazyPartitionBy _ [] = []
lazyPartitionBy keyFn xl@(x:_) = let
  fv = keyFn x
  (part, rest) = span ((== fv) . keyFn) xl
  in part : lazyPartitionBy keyFn rest

Как видно из span реализации , part и rest неявно разделяют состояние. Интересно, можно ли перевести этот метод на Clojure.


person tempestadept    schedule 14.07.2014    source источник


Ответы (3)


Хотя этот вопрос вызывает очень интересные размышления о проектировании языков, практическая проблема заключается в том, что вы хотите обрабатывать разделы в постоянной памяти. И практическая проблема решается небольшой инверсией.

Вместо того, чтобы обрабатывать результат функции, возвращающей последовательность разделов, передайте функцию обработки в функцию, которая создает разделы. Затем вы можете контролировать состояние ограниченным образом.

Сначала мы предоставим способ объединить потребление последовательности с состоянием хвоста.

(defn fuse [coll wick]
  (lazy-seq 
   (when-let [s (seq coll)]
     (swap! wick rest)
     (cons (first s) (fuse (rest s) wick)))))

Потом доработанная версия partition-by

(defn process-partition-by [processfn keyfn coll] 
  (lazy-seq
    (when (seq coll)
      (let [tail (atom (cons nil coll))
            s (fuse coll tail)
            fst (first s)
            fv (keyfn fst)
            pred #(= fv (keyfn %))
            part (take-while pred s)
            more (lazy-seq (drop-while pred @tail))] 
        (cons (processfn part) 
              (process-partition-by processfn keyfn more))))))

Примечание. Для потребления памяти O (1) processfn должен быть активным потребителем! Таким образом, хотя (process-partition-by identity key-fn coll) совпадает с (partition-by key-fn coll), поскольку identity не использует раздел, потребление памяти не является постоянным.


Тесты

(defn heavy-seq [] 
  ;adjust payload for your JVM so only a few fit in memory
  (let [payload (fn [] (long-array 20000000))]
   (map #(vector % (payload)) (iterate inc 0))))

(defn my-process [s] (reduce + (map first s)))

(defn test1 []
  (doseq [part (partition-by #(quot (first %) 10) (take 50 (heavy-seq)))]
    (my-process part)))

(defn test2 []
  (process-partition-by 
    my-process #(quot (first %) 20) (take 200 (heavy-seq))))

so.core=> (test1)
OutOfMemoryError Java heap space  [trace missing]

so.core=> (test2)
(190 590 990 1390 1790 2190 2590 2990 3390 3790)
person A. Webb    schedule 16.07.2014
comment
Правильно ли я, что если я заменю (lazy-seq @tail) на (lazy-seq (drop-while #(= fv (keyfn %)) @tail)), это будет работать также в том случае, когда processfn не использует свой аргумент полностью? - person tempestadept; 17.07.2014
comment
Также похоже, что версия без drop-while теряет первый элемент каждой части после первого. - person tempestadept; 17.07.2014
comment
@tempestadept Спасибо за отладку, исправил эти проблемы с помощью правок выше. (1) drop-while должен быть (lazy-seq (drop-while ...)), чтобы предотвратить нетерпеливую оценку аргументов для drop-while. (2) Ошибка с уменьшением на единицу возникла из-за того, что take-while должен проверить первое ложное значение. Таким образом, нам нужно добавить в хвост фиктивное первое значение, чтобы оставаться на шаг впереди. - person A. Webb; 17.07.2014

Эмпирическое правило, которое я использую в этих сценариях (т. Е. Тех, в которых вы хотите, чтобы одна входная последовательность создавала несколько выходных последовательностей), заключается в том, что из следующих трех желаемых свойств вы обычно можете иметь только два:

  1. Эффективность (пройти входную последовательность только один раз, не удерживая голову)
  2. Лень (производить элементы только по запросу)
  3. Нет общего изменяемого состояния

Версия в clojure.core выбирает (1,3), но отказывается от (2), создавая сразу весь раздел. И Python, и Haskell выбирают (1,2), хотя это не сразу очевидно: разве в Haskell нет вообще изменяемого состояния? Что ж, его ленивая оценка всего (а не только последовательностей) означает, что все выражения являются преобразователями, которые начинаются как чистые листы и записываются только тогда, когда их значение необходимо; реализация span, как вы говорите, использует один и тот же преобразователь span p xs' в обеих своих выходных последовательностях, так что в зависимости от того, какая из них сначала нужна, "отправляет" его в результат другой последовательности, выполняя действие на расстоянии, которое необходимо для сохранить другие прекрасные свойства.

Как вы отметили, альтернативная реализация Clojure, с которой вы связались, выбирает (2,3).

Проблема в том, что для partition-by отклонение либо (1) , либо (2) означает, что вы держите начало некоторой последовательности: либо вход, либо один из выходов. Поэтому, если вам нужно решение, в котором можно обрабатывать произвольно большие разделы произвольно большого ввода, вам нужно выбрать (1,2). В Clojure это можно сделать несколькими способами:

  1. Возьмите подход Python: верните что-то более похожее на итератор, чем на seq - seqs дают более строгие гарантии отсутствия мутации и обещают, что вы можете безопасно пройти их несколько раз и т.д. и т.д. Если вместо seq seqs вы вернете итератор из итераторы, то потребление элементов из любого одного итератора может свободно изменять или аннулировать другой (и). Это гарантирует, что потребление происходит по порядку и что память может быть освобождена.
  2. Возьмите подход Haskell: вручную проанализируйте все, с большим количеством обращений к delay, и потребуйте, чтобы клиент вызывал force столько раз, сколько необходимо для получения данных. Это будет намного уродливее в Clojure и значительно увеличит глубину вашего стека (использование этого на нетривиальном вводе, вероятно, взорвет стек), но теоретически это возможно.
  3. Напишите что-нибудь более приправленное Clojure (но все же довольно необычное), имея несколько изменяемых объектов данных, которые координируются между выходными последовательностями, каждый из которых обновляется по мере необходимости, когда что-то запрашивается от любого из них.

Я почти уверен, что любой из этих трех подходов возможен, но, честно говоря, все они довольно сложные и отнюдь не естественные. Абстракция последовательности Clojure просто не упрощает создание структуры данных, которая вам нужна. Я советую вам, если вам нужно что-то подобное, а разделы могут быть слишком большими, чтобы их было удобно разместить, просто примите немного другой формат и сделайте немного больше бухгалтерского учета: избегайте дилеммы (1,2,3), не создавая несколько выходные последовательности у всех!

Поэтому вместо ((2 4 6 8) (1 3 5) (10 12) (7)) формат вывода для чего-то вроде (partition-by even? [2 4 6 8 1 3 5 10 12 7]), вы можете принять немного более уродливый формат: ([::key true] 2 4 6 8 [::key false] 1 3 5 [::key true] 10 12 [::key false] 7). Это несложно произвести и потребить, хотя писать это немного долго и утомительно.

Вот одна из разумных реализаций производящей функции:

(defn lazy-partition-by [f coll]
  (lazy-seq
    (when (seq coll)
      (let [x (first coll)
            k (f x)]
        (list* [::key k] x
         ((fn part [k xs]
            (lazy-seq
              (when (seq xs)
                (let [x (first xs)
                      k' (f x)]
                  (if (= k k')
                    (cons x (part k (rest xs)))
                    (list* [::key k'] x (part k' (rest xs))))))))
          k (rest coll)))))))

И вот как его использовать, сначала определив общий reduce-grouped, который скрывает детали формата группировки, а затем пример функции count-partition-sizes для вывода ключа и размера каждого раздела без сохранения каких-либо последовательностей в памяти:

(defn reduce-grouped [f init groups]
  (loop [k nil, acc init, coll groups]
    (if (empty? coll)
      acc
      (if (and (coll? (first coll)) (= ::key (ffirst coll)))
        (recur (second (first coll)) acc (rest coll))
        (recur k (f k acc (first coll)) (rest coll))))))

(defn count-partition-sizes [f coll]
  (reduce-grouped (fn [k acc _]
                    (if (and (seq acc) (= k (first (peek acc))))
                      (conj (pop acc) (update-in (peek acc) [1] inc))
                      (conj acc [k 1])))
                  [] (lazy-partition-by f coll)))

user> (lazy-partition-by even? [2 4 6 8 1 3 5 10 12 7])
([:user/key true] 2 4 6 8 [:user/key false] 1 3 5 [:user/key true] 10 12 [:user/key false] 7)
user> (count-partition-sizes even? [2 4 6 8 1 3 5 10 12 7])
[[true 4] [false 3] [true 2] [false 1]]

Изменить: взглянув на него еще раз, я не совсем уверен, что мой reduce-grouped намного полезнее, чем (reduce f init (map g xs)), поскольку он не дает вам четкого указания на то, когда ключ меняется. Так что, если вам действительно нужно знать, когда группа меняется, вам понадобится более умная абстракция или использовать мою оригинальную lazy-partition-by без какой-либо "умной" оболочки.

person amalloy    schedule 14.07.2014
comment
Я хотел бы предоставить клиентскому коду библиотеки интерфейс, который позволяет писать process функции, которые принимают последовательность элементов (с одним и тем же ключом). process затем вызывается из-за побочных эффектов. Думаю, мне больше всего подходит 3-й способ. Не могли бы вы в общих чертах рассказать, как это реализовать? - person tempestadept; 15.07.2014
comment
Не совсем. Как я уже сказал, это действительно сложно. Я вложил в него 15 или 20 строк, пока писал свой первоначальный ответ, а потом понял, что все еще не понимаю, что пишу. - person amalloy; 15.07.2014

Правильно ли я насчет ленивого разбиения, недостаточно ленивого?

Что ж, есть разница между ленью и использованием памяти. Последовательность может быть ленивой и по-прежнему требовать много памяти - см., Например, реализацию clojure.core/distinct, которая использует набор для запоминания всех ранее наблюдаемых значений в последовательности. Но да, ваш анализ требований к памяти для lazy-partition-by верен - вызов функции для вычисления заголовка второго раздела сохранит заголовок первого раздела, а это означает, что реализация первого раздела приводит к его сохранению в памяти. Это можно проверить с помощью следующего кода:

user> (doseq [part (lazy-partition-by :a
                      (repeatedly
                         (fn [] {:a 1 :b (long-array 10000000)})))]
        (dorun part))
; => OutOfMemoryError Java heap space

Поскольку ни doseq, ни dorun не сохраняют заголовок, это просто работало бы вечно, если бы lazy-partition-by было O (1) в памяти.

Существует ли реализация разделения по памяти с гарантированными требованиями к памяти, при условии, что я не буду иметь никаких ссылок на предыдущую часть к тому времени, когда я начну реализовывать следующую?

Было бы очень сложно, если не невозможно, написать такую ​​реализацию чисто функциональным образом, которая работала бы в общем случае. Учтите, что общая реализация lazy-partition-by не может делать никаких предположений о том, когда (или если) будет реализован раздел. Единственный гарантированно правильный способ найти начало второго раздела, за исключением введения некоторой неприятной информации о состоянии для отслеживания того, какая часть первого раздела была реализована, - это запомнить, где начался первый раздел, и выполнить сканирование вперед по запросу.

В особом случае, когда вы обрабатываете записи по одной на предмет побочных эффектов и хотите, чтобы они были сгруппированы по ключу (как подразумевается при использовании doseq выше), вы можете рассмотреть что-то вроде _9 _ / _ 10_, который поддерживает состояние и переустанавливает его при смене ключа.

person Alex    schedule 14.07.2014
comment
Ну я понимаю разницу между ленью и гарантиями памяти. Я вижу, что выбрал не самые лучшие слова для своих вопросов. Я бы использовал метод loop/recur, но я пишу библиотечную функцию, которая будет принимать process в качестве аргумента. Я не вижу очевидного способа сделать это. Что касается избегания явного сохранения состояния - я добавлю к исходному вопросу реализацию Haskell, которая, как мне кажется, достаточно ленивая. - person tempestadept; 14.07.2014
comment
Моя первая попытка перенести вашу функцию Haskell на Clojure столкнулась с той же проблемой OOM, но тогда я не очень разбираюсь в Haskell. - person Alex; 14.07.2014