Haskell Conduit Aeson: анализ больших файлов JSON и фильтрация соответствующих ключей/значений

Я написал приложение на Haskell, которое делает следующее:

  1. Рекурсивно перечислить каталог,
  2. Проанализируйте файлы JSON из списка каталогов,
  3. Найдите совпадающие пары ключ-значение и
  4. Возвращает имена файлов, в которых были найдены совпадения.

Моя первая версия этого приложения была самой простой, наивной версией, которую я мог написать, но я заметил, что использование пространства монотонно увеличивалось.

В итоге я перешел на conduit, и теперь мой основной функционал выглядит так:

conduitFilesFilter :: ProjectFilter -> Path Abs Dir -> IO [Path Abs File]
conduitFilesFilter projFilter dirname' = do
  (_, allFiles) <- listDirRecur dirname'
  C.runConduit $
    C.yieldMany allFiles
    .| C.filterMC (filterMatchingFile projFilter)
    .| C.sinkList

Теперь мое приложение имеет ограниченное использование памяти, но оно все еще довольно медленное. Из этого у меня два вопроса.

1)

Я использовал stack new для создания скелета для создания этого приложения, и оно по умолчанию использует параметры ghc -threaded -rtsopts -with-rtsopts=-N.

Удивительно (для меня) то, что приложение использует все доступные ему процессоры (около 40 на целевой машине), когда я фактически запускаю его. Однако я не писал какую-либо часть приложения для параллельного запуска (на самом деле я это рассматривал).

Что работает параллельно?

2)

Кроме того, большинство файлов JSON действительно большие (10 МБ), и, вероятно, их нужно просмотреть 500 000. Это означает, что моя программа очень медленная из-за декодирования Aeson. Моя идея состояла в том, чтобы запустить часть filterMatchingFile параллельно, но, глядя на библиотеку stm-conduit, я не вижу очевидного способа запуска этого промежуточного действия параллельно на нескольких процессорах.

Может ли кто-нибудь предложить способ разумно распараллелить мою функцию выше, используя stm-conduit или какие-либо другие средства?


Изменить

Я понял, что могу разбить свой readFile -> decodeObject -> runFilterFunction на отдельные части conduit и потом использовать там stm-conduit с ограниченным каналом. Может быть, я дам ему шанс ...


Я запустил свое приложение с +RTS -s (я перенастроил его на -N4) и вижу следующее:

 115,961,554,600 bytes allocated in the heap
  35,870,639,768 bytes copied during GC
      56,467,720 bytes maximum residency (681 sample(s))
       1,283,008 bytes maximum slop
             145 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0     108716 colls, 108716 par   76.915s  20.571s     0.0002s    0.0266s
  Gen  1       681 colls,   680 par    0.530s   0.147s     0.0002s    0.0009s

  Parallel GC work balance: 14.99% (serial 0%, perfect 100%)

  TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.001s  (  0.007s elapsed)
  MUT     time   34.813s  ( 42.938s elapsed)
  GC      time   77.445s  ( 20.718s elapsed)
  EXIT    time    0.000s  (  0.010s elapsed)
  Total   time  112.260s  ( 63.672s elapsed)

  Alloc rate    3,330,960,996 bytes per MUT second

  Productivity  31.0% of total user, 67.5% of total elapsed

gc_alloc_block_sync: 188614
whitehole_spin: 0
gen[0].sync: 33
gen[1].sync: 811204

person erewok    schedule 18.01.2018    source источник
comment
1) Почти наверняка ничего. Вам нужно сделать параллелизм явным (Haskell просто упрощает распараллеливание вашего кода, а не делает это за вас).   -  person Alec    schedule 19.01.2018
comment
Привет @Алек. Я тоже так думал, но когда я смотрю на процесс в htop, там столько же дочерних процессов, сколько и процессоров, и каждый из них, похоже, что-то делает (в основном S). Вот что натолкнуло на вопрос. Он выполняет только параллельный GC (это что-то?)?   -  person erewok    schedule 19.01.2018
comment
Замените .| на buffer' и runConduit с runCConduit.   -  person user2407038    schedule 19.01.2018
comment
@erewok Parallel GC определенно вещь. (Это даже указано прямо в вашей статистике!) Я не уверен на 100%, но RTS может также выполнять некоторые потоки для управления вводом-выводом. Я сомневаюсь, что большая часть вашего реального пользовательского кода параллельна.   -  person MathematicalOrchid    schedule 19.01.2018
comment
@MathematicalOrchid спасибо за объяснение. Я думал, что это может происходить.   -  person erewok    schedule 19.01.2018


Ответы (3)


Судя по описанию вашей программы, нет причин для увеличения использования памяти. Я думаю, что это была случайная утечка памяти из-за пропущенных ленивых вычислений. Это легко обнаружить с помощью профилирования кучи: https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/profiling.html#hp2ps-rendering-heap-profiles-to-postscript.. Другая возможная причина заключается в том, что среда выполнения не возвращает всю память ОС. До некоторого порога он будет удерживать память, пропорциональную самому большому обработанному файлу. Это может выглядеть как утечка памяти, если отслеживать размер RSS процесса.

Опция -A32m увеличивает размер питомника. Это позволяет вашей программе выделять больше памяти до запуска сборки мусора. Статистика показывает, что при сборке мусора остается очень мало памяти, поэтому чем реже это происходит, тем больше времени программа тратит на реальную работу.

person blaze    schedule 22.01.2018

По подсказке Майкла Сноймана из Haskell Cafe, который указал, что моя первая версия на самом деле не использует возможности потоковой передачи Conduit, я переписал свою версию приложения Conduit (без использования stm-conduit). Это было большим улучшением: моя первая версия Conduit работала со всеми данными, а я этого не осознавал.

Я также увеличил размер питомника, и это повысило мою производительность за счет того, что я стал реже собирать мусор.

Моя пересмотренная функция в итоге выглядела так:

module Search where

import           Conduit               ((.|))
import qualified Conduit               as C
import           Control.Monad
import           Control.Monad.IO.Class   (MonadIO, liftIO)
import           Control.Monad.Trans.Resource (MonadResource)
import qualified Data.ByteString       as B
import           Data.List             (isPrefixOf)
import           Data.Maybe            (fromJust, isJust)
import           System.Path.NameManip (guess_dotdot, absolute_path)
import           System.FilePath       (addTrailingPathSeparator, normalise)
import           System.Directory      (getHomeDirectory)

import           Filters


sourceFilesFilter :: (MonadResource m, MonadIO m) => ProjectFilter -> FilePath -> C.ConduitM () String m ()
sourceFilesFilter projFilter dirname' =
    C.sourceDirectoryDeep False dirname'
    .| parseProject projFilter

parseProject :: (MonadResource m, MonadIO m) => ProjectFilter -> C.ConduitM FilePath String m ()
parseProject (ProjectFilter filterFunc) = do
  C.awaitForever go
  where
    go path' = do
      bytes <- liftIO $ B.readFile path'
      let isProj = validProject bytes
      when (isJust isProj) $ do
        let proj' = fromJust isProj
        when (filterFunc proj') $ C.yield path'

Мой main просто запускает канал и печатает те, которые проходят фильтр:

mainStreamingConduit :: IO ()
mainStreamingConduit = do
  options <- getRecord "Search JSON Files"
  let filterFunc = makeProjectFilter options
  searchDir <- absolutize (searchPath options)
  itExists <- doesDirectoryExist searchDir
  case itExists of
    False -> putStrLn "Search Directory does not exist" >> exitWith (ExitFailure 1)
    True -> C.runConduitRes $ sourceFilesFilter filterFunc searchDir .| C.mapM_ (liftIO . putStrLn)

Я запускаю это так (обычно без статистики):

stack exec search-json -- --searchPath $FILES --name NAME +RTS -s -A32m -n4m

Без увеличения размера питомника я получаю производительность около 30%. Однако с учетом вышеизложенного это выглядит так:

  72,308,248,744 bytes allocated in the heap
     733,911,752 bytes copied during GC
       7,410,520 bytes maximum residency (8 sample(s))
         863,480 bytes maximum slop
             187 MB total memory in use (27 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0       580 colls,   580 par    2.731s   0.772s     0.0013s    0.0105s
  Gen  1         8 colls,     7 par    0.163s   0.044s     0.0055s    0.0109s

  Parallel GC work balance: 35.12% (serial 0%, perfect 100%)

  TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.001s  (  0.006s elapsed)
  MUT     time   26.155s  ( 31.602s elapsed)
  GC      time    2.894s  (  0.816s elapsed)
  EXIT    time   -0.003s  (  0.008s elapsed)
  Total   time   29.048s  ( 32.432s elapsed)

  Alloc rate    2,764,643,665 bytes per MUT second

  Productivity  90.0% of total user, 97.5% of total elapsed

gc_alloc_block_sync: 3494
whitehole_spin: 0
gen[0].sync: 15527
gen[1].sync: 177

Я все еще хотел бы выяснить, как распараллелить часть filterProj . parseJson . readFile, но пока меня это устраивает.

person erewok    schedule 19.01.2018

Я понял, как запустить это приложение с помощью stm-conduit с помощью вики Haskell по параллелизму и Ответ переполнения стека, в котором говорится об ожидании завершения потоков до выхода main.

Это работает следующим образом: я создаю канал, содержащий все имена файлов, над которыми нужно работать. Затем я разветвляю кучу потоков, каждый из которых запускает Conduit с каналом пути к файлу как Source. Я отслеживаю все дочерние потоки и жду их завершения.

Может быть, это решение будет полезно для кого-то еще?

Не все мои функции фильтра нижнего уровня присутствуют, но суть в том, что у меня есть Conduit, который проверяет некоторый JSON. Если он проходит, то это yields FilePath.

Вот мой основной полностью:

{-# LANGUAGE DeriveGeneric     #-}
{-# LANGUAGE OverloadedStrings #-}


module Main where

import           Conduit                      ((.|))
import qualified Conduit                      as C
import           Control.Concurrent
import           Control.Monad                (forM_)
import           Control.Monad.IO.Class       (liftIO)
import           Control.Concurrent.STM
import           Control.Monad.Trans.Resource (register)

import qualified Data.Conduit.TMChan          as STMChan
import           Data.Maybe                   (isJust, fromJust)
import qualified Data.Text                    as T
import           Options.Generic
import           System.Directory            (doesDirectoryExist)
import           System.Exit

import           Search


data Commands =
  Commands { searchPath  :: String
           , par         :: Maybe Int
           , project     :: Maybe T.Text
           , revision    :: Maybe T.Text
           } deriving (Generic, Show)

instance ParseRecord Commands

makeProjectFilter :: Commands -> ProjectFilter
makeProjectFilter options =
  let stdFilts = StdProjectFilters
        (ProjName <$> project options)
        (Revision <$> revision options)
  in makeProjectFilters stdFilts

main :: IO ()
main = do
  options <- getRecord "Search JSON Files"
  -- Would user like to run in parallel?
  let runner = if isJust $ par options
        then mainSTMConduit (fromJust $ par options)
        else mainStreamingConduit

  -- necessary things to search files: search path, filters to use, search dir exists
  let filterFunc = makeProjectFilter options
  searchDir <- absolutize (searchPath options)
  itExists <- doesDirectoryExist searchDir

  -- Run it if it exists
  case itExists of
    False -> putStrLn "Search Directory does not exist" >> exitWith (ExitFailure 1)
    True -> runner filterFunc searchDir

-- Single-threaded version with bounded memory usage
mainStreamingConduit :: ProjectFilter -> FilePath -> IO ()
mainStreamingConduit filterFunc searchDir = do
  C.runConduitRes $
    sourceFilesFilter filterFunc searchDir .| C.mapM_C (liftIO . putStrLn)

-- Multiple-threaded version of this program using channels from `stm-conduit`
mainSTMConduit :: Int -> ProjectFilter -> FilePath -> IO ()
mainSTMConduit nrWorkers filterFunc searchDir = do
  children <- newMVar []
  inChan <- atomically $ STMChan.newTBMChan 16
  _ <- forkIO . C.runResourceT $ do
         _ <- register $ atomically $ STMChan.closeTBMChan inChan
         C.runConduitRes $ C.sourceDirectoryDeep False searchDir .| STMChan.sinkTBMChan inChan True
  forM_ [1..nrWorkers] (\_ -> forkChild children $ runConduitChan inChan filterFunc)
  waitForChildren children
  return ()


runConduitChan :: STMChan.TBMChan FilePath -> ProjectFilter -> IO ()
runConduitChan inChan filterFunc = do
  C.runConduitRes $
       STMChan.sourceTBMChan inChan
       .| parseProject filterFunc
       .| C.mapM_C (liftIO . putStrLn)

waitForChildren :: MVar [MVar ()] -> IO ()
waitForChildren children = do
  cs <- takeMVar children
  case cs of
    []   -> return ()
    m:ms -> do
      putMVar children ms
      takeMVar m
      waitForChildren children

forkChild :: MVar [MVar ()] -> IO () -> IO ThreadId
forkChild children io = do
  mvar <- newEmptyMVar
  childs <- takeMVar children
  putMVar children (mvar:childs)
  forkFinally io (\_ -> putMVar mvar ())

Примечание. Я использую stm-conduit 3.0.0 с conduit 1.12.1, поэтому мне нужно было включить логический аргумент:

STMChan.sinkTBMChan inChan True

В версии 4.0.0 из stm-conduit эта функция автоматически закрывает канал, поэтому логический аргумент был удален.

person erewok    schedule 26.02.2018