Один канал обработки, 2 источника ввода-вывода одного типа

В моем приложении GHC Haskell, использующем stm, network-conduit и conduit, у меня есть цепь для каждого сокета, которая автоматически разветвляется с использованием runTCPServer. Нити могут связываться с другими ветвями с помощью широковещательного TChan.

Это демонстрирует, как я хотел бы настроить «цепочку» канала:

введите здесь описание изображения

Итак, у нас есть два источника (каждый из которых привязан к вспомогательным каналам), которые создают объект Packet, который encoder примет и превратит в ByteString, а затем отправит сокет. У меня были большие трудности с эффективным (производительность) объединением двух входов.

Я был бы признателен, если бы кто-нибудь мог указать мне в правильном направлении.


Поскольку было бы грубо с моей стороны опубликовать этот вопрос без попытки, я помещу здесь то, что пробовал ранее;

Я написал/выбрал функцию, которая (блокируя) создает источник из TMChan (закрываемый канал);

-- | Takes a generic type of STM chan and, given read and close functionality,
--   returns a conduit 'Source' which consumes the elements of the channel.
chanSource 
    :: (MonadIO m, MonadSTM m)
    => a                    -- ^ The channel
    -> (a -> STM (Maybe b)) -- ^ The read function
    -> (a -> STM ())        -- ^ The close/finalizer function
    -> Source m b
chanSource ch readCh closeCh = ConduitM pull
    where close     = liftSTM $ closeCh ch
          pull      = PipeM $ liftSTM $ readCh ch >>= translate
          translate = return . maybe (Done ()) (HaveOutput pull close)

Аналогично, функция превращения Чана в раковину;

-- | Takes a stream and, given write and close functionality, returns a sink
--   which wil consume elements and broadcast them into the channel 
chanSink
    :: (MonadIO m, MonadSTM m)
    => a                 -- ^ The channel
    -> (a -> b -> STM()) -- ^ The write function
    -> (a -> STM())      -- ^ The close/finalizer function
    -> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
    where close  = const . liftSTM $ closeCh ch
          sink   = NeedInput push close
          write  = liftSTM . writeCh ch
          push x = PipeM $ write x >> return sink

Тогда слить источники просто; разветвить 2 потока (чего я действительно не хочу делать, но какого черта), которые могут помещать свои новые элементы в один список, из которого я затем создаю источник;

-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
--   a source which consumes the elements of the channel.
mergeSources
    :: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
    => [Source (ResourceT m) a]             -- ^ The list of sources
    -> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
    where push c s = s $$ chanSink c writeTMChan closeTMChan
          fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
          retn c   = return $ chanSource c readTMChan closeTMChan

Несмотря на то, что мне удалось выполнить проверку типов с помощью этих функций, мне не удалось получить какое-либо использование этих функций для проверки типов;

-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
    use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
    -- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
    mergsrc $$ protocol $= encoder =$ appSink appdata
    where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
          mergsrc = mergeSources [appSource appdata $= decoder, chansrc]

-- | Structure which holds mutable information for clients
data SessionState = SessionState
    { _ssBroadcast     :: TMChan Packet -- ^ Outbound packet broadcast channel
    }

makeLenses ''SessionState

-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)

-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO

В любом случае, я считаю этот метод ошибочным — в нем много промежуточных списков и преобразований. Это не может быть хорошо для производительности. Ищу руководства.


PS. Насколько я понимаю, это не дубликат; Объединение каналов с несколькими входами , так как в моей ситуации оба источника производят того же типа, и мне все равно, из какого источника создается объект Packet, пока я не жду одного, пока у другого есть объекты, готовые к использованию.

ППС. Я прошу прощения за использование (и, следовательно, требование знаний) Lens в примере кода.


person kvanbere    schedule 26.05.2013    source источник
comment
Есть ли причина, по которой вы не используете Data.Conduit.TMChan из пакета stm-conduit? В нем есть все определяемые вами функции, включая mergeSources.   -  person Gabriel Gonzalez    schedule 26.05.2013
comment
На самом деле есть - я бы хотел, чтобы источник, который объединяет оба, закрывался, как только закрывается любой из источников. Пакет stm-conduit использует счетчики ссылок (и ожидает закрытия последнего источника, чтобы закрыть результирующий источник), что нежелательно. Закрытие сразу после того, как любой из источников становится недействительным, дает мне возможность, когда я закрываю свой глобальный TMChan, также своевременно закрывать каждый сокет.   -  person kvanbere    schedule 27.05.2013
comment
Пустая мысль: что произойдет, если вы возьмете mergeSources из TMChan, выбросите все, что связано с подсчетом ссылок, и замените бит decRefCount refcount кодом, закрывающим все источники?   -  person Iain    schedule 01.06.2013
comment
Я (вроде) пробовал это выше, но у меня были проблемы с проверкой типов при компиляции версий соответствующих функций stm-conduit на github, поэтому мне пришлось их сильно изменить (см. OP). Я попробую еще раз сегодня вечером, используя хакерские. Я много думал об этом в последнее время, и промежуточный Чан действительно может быть необходим.   -  person kvanbere    schedule 01.06.2013


Ответы (1)


Я не знаю, поможет ли это, но я попытался реализовать предложение Иэна и сделал вариант mergeSources', который останавливается, как только останавливается любой из каналов:

mergeSources' :: (MonadIO m, MonadBaseControl IO m)
              => [Source (ResourceT m) a] -- ^ The sources to merge.
              -> Int -- ^ The bound of the intermediate channel.
              -> ResourceT m (Source (ResourceT m) a)
mergeSources' sx bound = do
    c <- liftSTM $ newTBMChan bound
    mapM_ (\s -> resourceForkIO $
                    s $$ chanSink c writeTBMChan closeTBMChan) sx
    return $ sourceTBMChan c

(Это простое дополнение доступно здесь).

Некоторые комментарии к вашей версии mergeSources (отнеситесь к ним с долей скептицизма, может быть я что-то не так понял):

  • Использование ...TMChan вместо ...TBMChan кажется опасным. Если писатели быстрее, чем читатели, ваша куча взорвется. Глядя на вашу диаграмму, кажется, что это может легко произойти, если ваш одноранговый узел TCP недостаточно быстро считывает данные. Так что я бы определенно использовал ...TBMChan, возможно, с большой, но ограниченной границей.
  • Вам не нужно ограничение MonadSTM m. Все материалы STM упакованы в IO с

    liftSTM = liftIO . atomically
    

    Возможно, это немного поможет вам при использовании mergeSources' в serverApp.

  • Просто косметическая проблема, я нашел

    liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
    

    очень трудно читать из-за использования liftA2 в монаде (->) r. я бы сказал

    do
        c <- liftSTM newTMChan
        fsrc sx c
        retn c
    

    будет длиннее, но гораздо легче читать.

Не могли бы вы создать самостоятельный проект, в котором можно было бы играть с serverApp?

person Petr    schedule 05.07.2013
comment
Спасибо за совет. Я буду иметь это в виду (мне придется вернуться к проблеме в ближайшее время). - person kvanbere; 08.07.2013