Почему в моей программе параллельного обхода на Haskell происходит утечка памяти?

Рассмотрим следующую программу на Haskell (я делаю это в основном для обучения):

import qualified Control.Concurrent.MSem as Sem
import System.Environment (getArgs)
import Control.Concurrent (forkIO)
import Control.Monad

-- Traverse with maximum n threads
parallelTraverse :: Foldable a => Int -> (b -> IO()) -> a b -> IO ()
parallelTraverse n action values = do
  sem <- Sem.new n
  forM_ values $ \value -> Sem.with sem (forkIO $ action value)

main :: IO ()
main = do
  args <- getArgs
  let nThreads = read . head $ args :: Int
  parallelTraverse nThreads print [(1::Int)..]

когда я его запускаю, память быстро вылезает на несколько гб. Я пробовал различные комбинации, чтобы убедиться, что я отбрасываю результаты промежуточных вычислений (действия печати). Почему до сих пор не хватает места?


person static_rtti    schedule 07.09.2015    source источник
comment
Вы не предотвратили одновременный запуск неограниченного числа потоков. Внимательно прочитайте свою программу.   -  person Reid Barton    schedule 07.09.2015
comment
@ReidBarton: кажется, я понял: forkIO немедленно возвращается, что делает семафор бесполезным. Вернемся к дизайнерскому столу :)   -  person static_rtti    schedule 07.09.2015


Ответы (1)


Во-первых, у вас очевидная ошибка в следующем фрагменте:

Sem.with sem (forkIO $ action value)

Вы обращаетесь к семафору из главного потока вокруг операции «форк», а не к действию там. Ниже приведен правильный способ его реализации:

forkIO (Sem.with sem (action value))

То есть для обращения к семафору из контекста разветвленного потока.

Во-вторых, в следующем коде вы вызываете операцию parallelTraverse для бесконечного списка:

parallelTraverse nThreads print [(1::Int)..]

Что приводит к бесконечному разветвлению потоков. А поскольку операция forkIO выполняется для вызывающего потока практически мгновенно, неудивительно, что у вас скоро закончатся ресурсы.


Чтобы использовать семафор для ограничения количества рабочих потоков, шаблон with в вашем случае просто не подойдет. Вместо этого вы должны использовать явную комбинацию wait и signal и не забывать правильно обрабатывать исключения (если вы их ожидаете). Например.,:

parallelTraverse :: Foldable a => Int -> (b -> IO()) -> a b -> IO ()
parallelTraverse n action values = do
  sem <- Sem.new n
  forM_ values $ \value -> do
    Sem.wait sem
    forkIO $ finally (action value) (Sem.signal sem)
person Nikita Volkov    schedule 07.09.2015
comment
Моя цель — ограничить количество одновременных потоков, но теперь я понимаю, почему это не работает. Бесконечный список задуман, потому что я хочу, чтобы моя программа могла обрабатывать бесконечный поток действий. - person static_rtti; 07.09.2015
comment
Большое спасибо. Мне просто нужна была небольшая дополнительная помощь, чтобы понять, что мне нужно ждать и сигнализировать на двух сторонах forkIO, поэтому (если я правильно понимаю) with нельзя использовать в этом случае. - person static_rtti; 07.09.2015