Есть ли способ, чтобы канал брал данные из нескольких источников, не блокируя ни один из них?

Я пишу сервер, и одно из требований заключается в том, что он должен иметь возможность передавать данные клиентам без прямого запроса данных клиентом. Я использую каналы, но мне кажется, что это выходит за рамки возможностей каналов. Проблема, с которой я сталкиваюсь, заключается в том, что, похоже, нет способа определить, есть ли в сокете доступные данные или нет, и await заблокирует выполнение до тех пор, пока данные не будут доступны. Скажем, у меня есть следующие функции

getPacket :: Conduit ByteString IO ClientPacket --take a bytestring and yield a ClientPacket i.e. the ByteString deserialized into a sensible form
processPacket :: Conduit ClientPacket IO ServerPacket --take a ClientPacket and yield a ServerPacket i.e. a response to the client's request
putPacket :: Conduit ServerPacket IO ByteString --serialize the ServerPacket

затем соединяю кондуиты вместе с источником и стоком из библиотеки Conduit.Network

appSource appData $$ getPacket =$= processPacket =$= putPacket $= appSink appData

Теперь я ввожу источник данных извне канала и хочу включить эти данные в канал. Например, если бы это был чат-сервер, внешними данными были бы сообщения, отправленные другими клиентами. Проблема в том, что независимо от того, где я пытаюсь ввести эти внешние данные, они будут заблокированы вызовами ожидания. По сути, я закончу код, который выглядит так.

yield processOutsideData --deal with the outside data
data <- await            --await data from upstream

Единственный способ, которым будет обработано больше внешних данных, — это если вышестоящий компонент что-то выдает, но вышестоящий компонент выдаст, только если он получит данные от клиента, чего я пытаюсь избежать. Я пытался использовать несколько потоков и TChan, чтобы решить эту проблему, но похоже, что appSource и appSink должны использоваться в одном потоке, иначе я получаю недопустимые исключения файловых дескрипторов из recv (что имеет смысл).

Однако, если источник сокета и приемник работают в одном и том же потоке, я снова сталкиваюсь с проблемой блокировки ожидания, и у меня нет возможности проверить, доступны ли данные из сокета. В этот момент кажется, что я ударился о стену с трубопроводами.

Но мне действительно нравится использовать кондуиты, и я бы предпочел продолжать их использовать. Итак, мои вопросы: есть ли способ сделать то, что я пытаюсь достичь с помощью каналов?


person user467526    schedule 11.08.2015    source источник
comment
Взгляните на stm-conduit или в земле каналов параллельные каналы.   -  person Cirdec    schedule 11.08.2015


Ответы (1)


В примерах конвейерной сети Майкла Сноймана используется параллелизм. Пример клиента telnet запускает один поток для отправки ввода, а другой — для отображения полученных данных. Я адаптировал его для отправки и получения целых строк

{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Control.Concurrent.Async (concurrently)
import Control.Monad            (liftM, void)
import Data.ByteString          (ByteString)
import Data.ByteString.Char8    (unpack)
import Data.Conduit.Network
import Data.String              (IsString, fromString)
import Network                  (withSocketsDo)

getLines :: (IsString a, MonadIO m) => Producer m a
getLines = repeatMC . liftM fromString $ liftIO getLine

putLines :: (MonadIO m) => Consumer ByteString m ()
putLines = mapM_C $ liftIO . putStrLn . unpack

main :: IO ()
main = withSocketsDo $
    runTCPClient (clientSettings 4000 "localhost") $ \server ->
        void $ concurrently
            (getLines $$ appSink server)
            (appSource server $$ putLines)

Мы можем сделать то же самое на сервере. Создайте канал STM, запишите полученные данные в канал и отправьте данные из канала клиентам. При этом используются простые оболочки пакета stm-conduit вокруг канала STM, sourceTBMChan и sinkTBMChan.

{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Control.Concurrent.Async       (concurrently)
import Control.Concurrent.STM.TBMChan (newTBMChan)
import Control.Monad                  (void)
import Control.Monad.STM              (atomically)
import Data.Conduit.Network
import Data.Conduit.TMChan            (sourceTBMChan, sinkTBMChan)
import Network                        (withSocketsDo)

main :: IO ()
main = withSocketsDo $ do
    channel <- atomically $ newTBMChan 10
    runTCPServer (serverSettings 4000 "*") $ \server ->
        void $ concurrently
            (appSource server $$ sinkTBMChan channel False)
            (sourceTBMChan channel $$ appSink server)

Если мы запустим сервер только с одним подключенным клиентом, он отобразит то, что отправил клиент.

----------
| a      | (sent)
| a      | (received)
| b      | (sent)
| b      | (received)
| c      | (sent)
| c      | (received) 
----------

Если мы запускаем сервер с несколькими подключенными клиентами, сообщения распределяются между клиентами, причем каждое сообщение получает один клиент.

----------             ----------
| 1      | (sent)      | 1      | (received)
| 2      | (sent)      | 3      | (received)
| 2      | (received)  |        |
| 3      | (sent)      |        |
|        |             |        |
|        |             |        |
----------             ----------

В этом примере не показано, что делать, когда клиент закрывает соединение.

person Cirdec    schedule 11.08.2015