В моем приложении GHC Haskell
, использующем stm, network-conduit и conduit, у меня есть цепь для каждого сокета, которая автоматически разветвляется с использованием runTCPServer
. Нити могут связываться с другими ветвями с помощью широковещательного TChan.
Это демонстрирует, как я хотел бы настроить «цепочку» канала:
Итак, у нас есть два источника (каждый из которых привязан к вспомогательным каналам), которые создают объект Packet
, который encoder
примет и превратит в ByteString
, а затем отправит сокет. У меня были большие трудности с эффективным (производительность) объединением двух входов.
Я был бы признателен, если бы кто-нибудь мог указать мне в правильном направлении.
Поскольку было бы грубо с моей стороны опубликовать этот вопрос без попытки, я помещу здесь то, что пробовал ранее;
Я написал/выбрал функцию, которая (блокируя) создает источник из TMChan (закрываемый канал);
-- | Takes a generic type of STM chan and, given read and close functionality,
-- returns a conduit 'Source' which consumes the elements of the channel.
chanSource
:: (MonadIO m, MonadSTM m)
=> a -- ^ The channel
-> (a -> STM (Maybe b)) -- ^ The read function
-> (a -> STM ()) -- ^ The close/finalizer function
-> Source m b
chanSource ch readCh closeCh = ConduitM pull
where close = liftSTM $ closeCh ch
pull = PipeM $ liftSTM $ readCh ch >>= translate
translate = return . maybe (Done ()) (HaveOutput pull close)
Аналогично, функция превращения Чана в раковину;
-- | Takes a stream and, given write and close functionality, returns a sink
-- which wil consume elements and broadcast them into the channel
chanSink
:: (MonadIO m, MonadSTM m)
=> a -- ^ The channel
-> (a -> b -> STM()) -- ^ The write function
-> (a -> STM()) -- ^ The close/finalizer function
-> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
where close = const . liftSTM $ closeCh ch
sink = NeedInput push close
write = liftSTM . writeCh ch
push x = PipeM $ write x >> return sink
Тогда слить источники просто; разветвить 2 потока (чего я действительно не хочу делать, но какого черта), которые могут помещать свои новые элементы в один список, из которого я затем создаю источник;
-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
-- a source which consumes the elements of the channel.
mergeSources
:: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
=> [Source (ResourceT m) a] -- ^ The list of sources
-> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
where push c s = s $$ chanSink c writeTMChan closeTMChan
fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
retn c = return $ chanSource c readTMChan closeTMChan
Несмотря на то, что мне удалось выполнить проверку типов с помощью этих функций, мне не удалось получить какое-либо использование этих функций для проверки типов;
-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
-- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
mergsrc $$ protocol $= encoder =$ appSink appdata
where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
mergsrc = mergeSources [appSource appdata $= decoder, chansrc]
-- | Structure which holds mutable information for clients
data SessionState = SessionState
{ _ssBroadcast :: TMChan Packet -- ^ Outbound packet broadcast channel
}
makeLenses ''SessionState
-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)
-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO
В любом случае, я считаю этот метод ошибочным — в нем много промежуточных списков и преобразований. Это не может быть хорошо для производительности. Ищу руководства.
PS. Насколько я понимаю, это не дубликат; Объединение каналов с несколькими входами , так как в моей ситуации оба источника производят того же типа, и мне все равно, из какого источника создается объект Packet
, пока я не жду одного, пока у другого есть объекты, готовые к использованию.
ППС. Я прошу прощения за использование (и, следовательно, требование знаний) Lens в примере кода.
Data.Conduit.TMChan
из пакетаstm-conduit
? В нем есть все определяемые вами функции, включаяmergeSources
. - person Gabriel Gonzalez   schedule 26.05.2013decRefCount refcount
кодом, закрывающим все источники? - person Iain   schedule 01.06.2013