Потоковые байты в сетевой веб-сокет

У меня есть код, который использует дескриптор файла для имитации приемника потоковой передачи Bytestring из источника (AWS S3). Если мы хотим использовать Network.Websocket в качестве приемника , достаточно ли заменить LBS.writeFile в приведенном ниже коде на sendBinaryData (с дескриптором подключения)?

{-# LANGUAGE OverloadedStrings,ScopedTypeVariables #-}

import qualified Aws
import qualified Aws.S3 as S3
import           Data.Conduit (($$+-))
import qualified Data.Conduit.List as CL (mapM_)
import qualified Data.ByteString.Streaming.HTTP as SP
import qualified Data.ByteString.Lazy as LBS
import Streaming as S
import Streaming.Prelude as S hiding (show,print)
import Control.Concurrent.Async (async,waitCatch)
import Data.Text as T (Text)

data AwsConfig a = AwsConfig { _aws_cfg :: Aws.Configuration, _aws_s3cfg :: S3.S3Configuration a, _aws_httpmgr :: SP.Manager }

getObject :: AwsConfig Aws.NormalQuery -> T.Text -> T.Text ->  IO Int
getObject cfg bucket key = do
  req <- waitCatch =<< async (runResourceT $ do
    {- Create a request object with S3.getObject and run the request with pureAws. -}
    S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <- 
      Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
        S3.getObject bucket key
    {- Stream the response to a lazy bytestring -}
    liftIO $ LBS.writeFile "testaws" LBS.empty -- this will be replaced by content-length of the bytes 
    let obj = (($$+- CL.mapM_ S.yield) . hoist lift ) (SP.responseBody rsp)
    S.mapM_ (liftIO . (LBS.appendFile "testaws") . LBS.fromStrict) obj
    return $ lookup "content-length" (S3.omUserMetadata mdata))
  case req of
    Left _ -> return 2 -- perhaps, we could use this to send an error message over websocket
    Right _ -> return 0

Источником путаницы для меня является то, как определяется завершение потока? В случае файлов об этом заботится writeFile API. А как насчет sendBinaryData? Обрабатывает ли он завершение так же, как writeFile? Или это определяется парсером данных на стороне клиента?

Обновить

Этот вопрос касается того, как передать данные в дескриптор веб-сокета (допустим, дескриптор был предоставлен), как мы делаем с дескриптором файла в приведенном выше примере, а не о том, как управлять дескриптором в resourceT. conduit действительно занимает mapM_< /a> подход к приему данных. Так что, кажется, это действительно путь.

Вопрос об завершении связан с моим ходом мыслей: если у нас есть функция, прослушивающая данные на другой стороне дескриптора веб-сокета, то определение конца сообщения, по-видимому, имеет значение в контексте потоковой передачи. Учитывая функцию, как показано ниже:

f :: LBS.ByteString -> a

если мы делаем S.mapM_ для потоковой передачи данных в дескриптор веб-сокета, позаботится ли он о добавлении какого-либо маркера end of stream, чтобы f прослушивание на другой стороне могло прекратить обработку ленивой строки байтов. В противном случае f не узнает, когда сообщение будет готово.


person Sal    schedule 14.06.2016    source источник
comment
Почему вы вообще используете Lazy ByteString в этой программе?   -  person Michael    schedule 14.06.2016
comment
@ Майкл, возможно, невежество с моей стороны. Можно поменять местами со строгой строкой байтов.   -  person Sal    schedule 14.06.2016
comment
Кажется, теперь я вижу, это исходит от websockets   -  person Michael    schedule 14.06.2016


Ответы (3)


Вот несколько кусочков, которые могут сделать вещи более понятными. Во-первых, для первой небольшой демонстрации, пересматривая ваш getObject, я использую Streaming.ByteString.writeFile, который в любом случае находится в ResourceT, чтобы отбросить обход с помощью ленивой строки байтов.

{-# LANGUAGE OverloadedStrings,ScopedTypeVariables #-}

import qualified Aws
import qualified Aws.S3 as S3
import           Data.Conduit 
import qualified Data.Conduit.List as CL (mapM_)
import qualified Data.ByteString.Streaming.HTTP as HTTP
import qualified Data.ByteString.Streaming as SB
import qualified Data.ByteString.Streaming.Internal as SB
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import           Streaming as S
import           Streaming.Prelude as S hiding (show,print)
import           Control.Concurrent.Async (async,waitCatch)
import           Data.Text as T (Text) 
import qualified Network.WebSockets as WebSockets
import           Control.Monad.Trans.Resource

data AwsConfig a = AwsConfig { _aws_cfg :: Aws.Configuration
                             , _aws_s3cfg :: S3.S3Configuration a
                             , _aws_httpmgr :: HTTP.Manager }

getObject :: AwsConfig Aws.NormalQuery -> FilePath -> T.Text -> T.Text ->  IO Int
getObject cfg file bucket key = do
  req <- waitCatch =<< async (runResourceT $ do
    S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <- 
      Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
        S3.getObject bucket key
    let bytestream = do 
         -- lookup "content-length" (S3.omUserMetadata mdata))
         SB.chunk B.empty -- this will be replaced by content-length 
         hoist lift (HTTP.responseBody rsp)  $$+- CL.mapM_ SB.chunk 
    SB.writeFile file bytestream ) -- this is in ResourceT 
  case req of
    Left _ -> return 2
    Right _ -> return 0

Мы можем более или менее абстрагироваться от того, что вы делали с SB.writeFile:

getObjectAbstracted
      :: (SB.ByteString (ResourceT IO) () -> ResourceT IO b)
         -> AwsConfig Aws.NormalQuery -> S3.Bucket -> Text -> ResourceT IO b
getObjectAbstracted action cfg bucket key = do
    S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <- 
      Aws.pureAws (_aws_cfg cfg) 
                  (_aws_s3cfg cfg) 
                  (_aws_httpmgr cfg) 
                  (S3.getObject bucket key)

    action (hoist lift (HTTP.responseBody rsp)  $$+- CL.mapM_ SB.chunk) 

Здесь нам нужен небольшой помощник, не включенный в потоковую библиотеку bytestring.

mapMChunks_ :: Monad m => (B.ByteString -> m ()) -> SB.ByteString m r -> m r
mapMChunks_ act bytestream = do
  (a S.:> r) <- SB.foldlChunksM (\_ bs -> act bs) (return ()) bytestream
  return r

и может действовать более или менее так, как планировал @haoformayor, используя потоковую строку байтов

writeConnection :: MonadIO m => WebSockets.Connection -> SB.ByteString m r -> m r
writeConnection connection  = 
  mapMChunks_ (liftIO . WebSockets.sendBinaryData connection)

-- following `haoformayor`
connectWrite
    :: (MonadResource m, WebSockets.WebSocketsData a) 
    => WebSockets.PendingConnection 
    -> a                  -- closing  message
    -> SB.ByteString m r  -- stream from aws
    -> m r
connectWrite request closeMessage bytestream = do
    (releaseKey, connection) <- allocate (WebSockets.acceptRequest request)
                                         (`WebSockets.sendClose` closeMessage)
    writeConnection connection bytestream

getObjectWS :: WebSockets.WebSocketsData a =>
       WebSockets.PendingConnection
       -> a
       -> AwsConfig Aws.NormalQuery
       -> S3.Bucket
       -> Text
       -> ResourceT IO ()
getObjectWS request closeMessage = getObjectAbstracted (connectWrite request closeMessage)

Конечно, ни один из них пока не использует разницу между conduit и streaming/streaming-bytestring.

person Michael    schedule 14.06.2016

Вы правы, думая, что ручки потребуют дополнительных хитростей. Однако, поскольку вы уже используете монадный преобразователь ResourceT, это восхитительно просто сделать с allocate. allocate позволяет вам создать дескриптор в монаде ресурса и зарегистрировать действие очистки (которое в вашем случае просто закрывает соединение).

ok <- runResourceT $ do
  (releaseKey, handle) <-
    allocate (WebSockets.acceptRequest request) 
             (`WebSockets.sendClose` closeMessage)
  WebSockets.sendBinaryData handle data
  return ok
where
  request = ...
  closeMessage = ...
  data = ...
  ok = ...

При использовании allocate дескриптор гарантированно закроется к тому времени, когда runResourceT вернет ok.

Однако я не совсем уверен, что это то, чего вы хотите. Мне кажется, что getObject не должен знать о том, как принимать и закрывать WS-соединения; возможно, он должен принять дескриптор соединения WS в качестве аргумента, а затем записать его. Если вы повысите его тип возвращаемого значения до ResourceT, тогда вы можете поручить вызывающему объекту до getObject ответственность за вызов runResourceT и выделение дескрипторов WS и так далее. Но, надеюсь, приведенного выше примера достаточно, чтобы вы продолжили свой путь.

person hao    schedule 14.06.2016
comment
Да, getObject не нужно принимать и закрывать соединения WS. Об этом уже позаботились в другом месте. Мы просто передаем дескриптор WS connection в функцию. Однако, чтобы писать в него, разве нам не нужно делать mapM_ для отправки данных в дескриптор Websocket? Это был первоначальный вопрос - как передать данные в дескриптор WS, как мы это делаем в файл. На самом деле это не об управлении дескрипторами. - person Sal; 14.06.2016

(Предостережение - код не тестировался.)

Ваш код повторно открывает выходной файл и дополняет его каждый раз, когда приходит пакет данных. Очевидно, что лучшим решением является использование LBS.hPutStr для записи в файл с использованием уже открытого дескриптора файла.

То есть вместо:

S.mapM_ (liftIO . (LBS.appendFile "testaws") . LBS.fromStrict) obj

вы хотите использовать:

S.mapM_ (liftIO . (LBS.hPutStr h) . LBS.fromStrict) obj

Конечно, это ссылка на дескриптор h, и откуда это взялось?

Одно из решений — передать его в getObject или иным образом создать перед вызовом тела getObject, например:

getObject cfg bucket key = withFile "output" $ \h -> do
    req <- ...
    ...
    S.mapM_ (liftIO . (LBS.hPutStr h) . LBS.fromStrict) obj
    ...

Или, может быть, вам нужно создать внутри runResourceT... Я не уверен.

Обновление. См. ответ @haoformayor о том, как заставить ResourceT управлять дескриптором файла для вас.

person ErikR    schedule 14.06.2016