Способы повышения производительности линейных кабелепроводов

Я использую haskell для построчной обработки данных, т.е. задач, где можно применить sed, awk и подобные инструменты. В качестве простого примера давайте добавим 000 к каждой строке из стандартного ввода.

У меня есть три альтернативных способа выполнить задачу:

  1. Ленивый ввод-вывод с ленивыми ByteStrings
  2. Трубопровод на основе линии.
  3. Кондуит на основе фрагментов с чистой строгой обработкой ByteString внутри.

example.hs:

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}

import ClassyPrelude.Conduit
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy.Char8 as BL8
import qualified Data.Conduit.Binary as CB

main = do
  [arg] <- getArgs
  case arg of

    "lazy" -> BL8.getContents >>= BL8.putStr . BL8.unlines . map ("000" ++) . BL8.lines

    "lines" -> runConduitRes $ stdinC .| CB.lines .|
      mapC ("000" ++) .| mapC (`snoc` 10) .| stdoutC

    "chunks" -> runConduitRes $ stdinC .| lineChunksC .|
      mapC (B8.unlines . (map ("000" ++)) . B8.lines) .| stdoutC


lineChunksC :: Monad m => Conduit ByteString m ByteString
lineChunksC = await >>= maybe (return ()) go
  where
  go acc = if
    | Just (_, 10) <- unsnoc acc -> yield acc >> lineChunksC
    | otherwise -> await >>= maybe (yield acc) (go' . breakAfterEOL)
    where
    go' (this, next) = let acc' = acc ++ this in if null next then go acc' else yield acc' >> go next

breakAfterEOL :: ByteString -> (ByteString, ByteString)
breakAfterEOL = uncurry (\x -> maybe (x, "") (first (snoc x)) . uncons) . break (== 10)
$ stack ghc --package={classy-prelude-conduit,conduit-extra} -- -O2 example.hs -o example
$ for cmd in lazy lines chunks; do echo $cmd; time -p seq 10000000 | ./example $cmd > /dev/null; echo; done
lazy
real 2.99
user 3.06
sys 0.07

lines
real 3.30
user 3.36
sys 0.06

chunks
real 1.83
user 1.95
sys 0.06

(Результаты одинаковы для нескольких прогонов, а также для строк с несколькими числами).

Таким образом, chunks в 1,6 раза быстрее, чем lines, что немного быстрее, чем lazy. Это означает, что конвейеры могут быть быстрее, чем простые строки байтов, но накладные расходы конвейеров слишком велики, когда вы разбиваете фрагменты на короткие строки.

Что мне не нравится в подходе chunks, так это то, что он смешивает как проводник, так и чистый мир, и это затрудняет его использование для более сложных задач.

Вопрос в том, не упустил ли я простое и элегантное решение, которое позволило бы мне писать эффективный код так же, как при подходе lines?

EDIT1: по предложению @Michael я объединил два mapC в один mapC (("000" ++). (snoc10)) в решении lines, чтобы число труб (.|) было одинаковым между lines и chunks. Это заставило его работать немного лучше (с 3,3 с до 2,8 с), но все же значительно медленнее, чем chunks.

Также я попробовал более старый Conduit.Binary.lines, который Майкл предложил в комментариях, и он также немного повышает производительность, примерно на 0,1 с.

EDIT2: исправлено lineChunksC, поэтому оно работает с очень маленькими фрагментами, например.

> runConduitPure $ yield ("\nr\n\n"::ByteString) .| concatC .| mapC singleton .| lineChunksC .| sinkList 
["\n","r\n","\n"]

person modular    schedule 29.10.2016    source источник
comment
Какова цель mapC (`snoc` 10)?   -  person chepner    schedule 29.10.2016
comment
@chepner 10 - это значение Word8 для '\n', поэтому он добавляет символ новой строки обратно к каждой строке.   -  person modular    schedule 29.10.2016
comment
Это ошибка, для начала. Предположим, я разбиваю исходный код на однобайтовые строки байтов. stdinC' = (stdinC :: Source (ResourceT IO) B.ByteString) .| concatC .| mapC B.singleton Это не влияет на семантику stdinC, но нарушает эту функцию, но не функцию трубопроводных линий.   -  person Michael    schedule 29.10.2016
comment
Когда это будет исправлено, окажется, что при склеивании однобайтовых цепочек байтов в строки ваша функция chunks выделяет столько промежуточных цепочек байтов, сколько байтов в строке. Рационально, когда вы ожидаете один или два фрагмента в строке. Это, конечно, достаточно типично.   -  person Michael    schedule 29.10.2016
comment
Мне приходит в голову, что старый трубопровод lines обладал этим свойством; см. этот патч github.com/snoyberg/conduit/pull/209 Возможно, старая версия была быстрее для входных данных, подобных вашему, где у нас есть тонна коротких строк, состоящих из одного фрагмента.   -  person Michael    schedule 29.10.2016
comment
@Michael, кажется, что старый CB.lines работает лучше для меня (ускоряет работу на ~ 5%), но в любом случае большая часть времени тратится на конвейер (предохранитель, >>=).   -  person modular    schedule 29.10.2016
comment
@ Майкл, я думаю, ты имеешь в виду, что ошибка в моем lineChunksC? Вполне вероятно, я просто писал для решения chunks и ни на чем другом не тестировал.   -  person modular    schedule 29.10.2016
comment
Да, если ему даются байты крошечными кусками, он завершается через две строки.   -  person Michael    schedule 29.10.2016
comment
Обычная функция канала ускоряется, если вы усваиваете карты, пишущие mapC (("000" ++). (`snoc` 10)) вместо mapC ("000" ++) .| mapC (`snoc` 10). Это делается неявно другим.   -  person Michael    schedule 29.10.2016
comment
Конечно, существует грубое правило, согласно которому большее количество использований .| делает программу более сложной, если только они не сливаются. Ваш chunks использует три, но ваш lines использует четыре. Это не имеет ничего общего с функцией линий.   -  person Michael    schedule 29.10.2016
comment
@Майкл, посмотри мое редактирование. Объединение двух mapC повышает производительность, но не радикально, поэтому при построчной обработке конвейера по-прежнему наблюдается серьезная потеря производительности.   -  person modular    schedule 30.10.2016
comment
Пока вы не исправите реализацию lineChunksC, это будет невозможно проверить. Попробуйте, например. yield ("\nr\n\n"::B.ByteString) =$= concatC =$= mapC B.singleton вместо stdinC. Это важно для бенчмаркинга.   -  person Michael    schedule 31.10.2016
comment
@Michael Я обновил ответ исправленной версией. Это вообще не изменило время (неудивительно, так как с обычным stdin он вызывается только один раз каждые N килобайт)   -  person modular    schedule 31.10.2016
comment
Верно, в некоторых приятных случаях это быстро, поскольку там, где это возможно, используется ByteString.lines, но, как и ожидалось, если я передам ему файл из 10 миллионов t без разрывов строки, он будет намного медленнее sprunge.us/APeK Если я даю файл в однобайтовых фрагментах char за char, он не завершается через 5 минут, так как он выделяет 10M отдельных bytestrings, в то время как другой делается за 5 секунд. Я думаю, что это квадратично по количеству фрагментов в строке. Вот мой источник sprunge.us/gVQG   -  person Michael    schedule 31.10.2016
comment
@Майкл, спасибо, это хороший пример. Создание строгой строки байтов из множества частей будет медленным.   -  person modular    schedule 31.10.2016


Ответы (1)


Я предполагаю, что для «линий» часть mapC ("000" ++) .| mapC (`snoc` 10) выполняет большую работу.

Объединение нескольких строгих ByteStrings в другой строгий ByteString дорого обходится. Объединение их в ленивый ByteString, как правило, более эффективно.

Чтобы избежать этих затрат, вы можете выдать каждую часть отдельно вниз по течению как строгое ByteString (но имейте в виду, что тогда мы больше не говорим о «линиях»).

В качестве альтернативы, выведите каждую преобразованную строку как ленивый ByteString нижестоящий поток.


Вопрос в том, не упустил ли я простое и элегантное решение, которое позволило бы мне писать эффективный код так же, как и при линейном подходе?

У некоторых потоковых библиотек есть интересная особенность: вы можете разграничивать строки в потоке и манипулировать ими без необходимости материализовать целые строки в памяти в любой точке.

Здесь я использую потоковую передачу и streaming-bytestring, потому что я лучше знаком с ними.

В модуле Data.ByteString.Streaming.Char8 из streaming-bytestring у нас есть файл lines:

lines :: Monad m => ByteString m r -> Stream (ByteString m) m r

lines превращает ByteString в связанный поток ByteStrings при делении на символы новой строки. Результирующие строки не содержат символов новой строки. Это действительно потоковые строки, которые только разбивают куски и, таким образом, никогда не увеличивают использование памяти.

Суть в том, что ByteString m r уже является потоковым типом! Итак, эта версия lines преобразует поток в «поток потоков». И мы можем добраться до «следующего потока» (следующей строки), только исчерпав «текущий поток» (текущую строку).

Ваш пример "lines" может быть записан как:

{-# language OverloadedStrings #-}
module Main where

import Control.Applicative ((*>))
import Streaming
import qualified Streaming.Prelude as S
import qualified Data.ByteString.Streaming.Char8 as Q

main :: IO ()
main = Q.stdout
     . Q.unlines
     . S.maps (\line -> "000" *> line)
     . Q.lines
     $ Q.stdin
person danidiaz    schedule 29.10.2016
comment
Спасибо за ответ, я обязательно проверю streaming. Но после добавления вашего кода в бенчмарк результаты оказались на одном уровне с моим решением для линий (+/- 5%), поэтому в моем случае не было немедленных улучшений. - person modular; 29.10.2016