Как использовать подписчика ZMQ в Haskell с каналами

Я могу заставить подписчика ZMQ работать в Haskell, но был бы признателен за руководство о том, как использовать эти данные с Pipes. Моя попытка написать Producer терпит неудачу при «сборке стека» со следующей ошибкой:

Не удалось сопоставить тип «Proxy X () c'0 c0 (ZMQ z)» с «ZMQ z»

Ожидаемый тип: ZMQ z ()

Фактический тип: Прокси X () c'0 c0 (ZMQ z) ()

{-# LANGUAGE OverloadedStrings #-}
module Main where
import Control.Monad
import Pipes
import qualified Pipes.Prelude as P
import System.ZMQ4.Monadic
import qualified Data.ByteString.Char8 as CS

fromZMQ :: (Receiver r) => Socket z r -> Producer String (ZMQ z) ()
fromZMQ sock = do
    msg <- lift $ receive sock
    yield (CS.unpack msg)
    fromZMQ sock

main :: IO ()
main = --do
  runZMQ $ do
    subSock <- socket Sub  ---subscriptionSocket
    subscribe subSock ""
    connect subSock "tcp://127.0.0.1:4998" 
    forever $ fromZMQ subSock >-> P.take 3 >-> P.print

Обратите внимание: я хочу использовать данные, публикуемые в ZMQ, с помощью скрипта Python.


person Stuart    schedule 26.10.2018    source источник
comment
Я предлагаю не использовать монадический API zmq и при необходимости использовать собственную монаду. Это или рассмотрите возможность косвенного обращения к zmq через Chan и рабочий поток, который просто передает данные в/из zmq.   -  person Thomas M. DuBuisson    schedule 27.10.2018
comment
Спасибо @Thomas за предложение исследовать Чана. Поскольку я думаю, что zeroMQ обеспечивает буферизацию, которую обеспечивает Чан, я смог запустить что-то с помощью MVar, что, кажется, аккуратно решает основную проблему, с которой я боролся, т.е. как обновить представление состояния, не получая ошибки времени выполнения ‹‹loop››. Код ниже, если это поможет кому-то еще.   -  person Stuart    schedule 27.10.2018


Ответы (2)


Отработав предложение Томаса об использовании Chan, я адаптировал пример MVar (ссылка ниже) для накопления полученных строк и их подсчета для демонстрации состояния обновления и чтения.

https://www.oreilly.com/library/view/parallel-and-concurrent/9781449335939/ch07.html

{-# LANGUAGE OverloadedStrings #-}

module Main where

import Control.Concurrent  
import Control.Monad
import System.ZMQ4.Monadic
import qualified Data.ByteString.Char8 as CS

newtype State = State (MVar (Int, [CS.ByteString]) ) --(count, list of strings received over zmq)

newState :: IO State
newState = do
  m <- newMVar (0, [])
  return (State m)

updateState :: State -> CS.ByteString -> IO ()
updateState (State m) newString = do
  (count,strList) <- takeMVar m
  putMVar m ( count + 1 , strList ++ [newString] )

showState :: State -> IO String
showState (State m) = do
  count <- takeMVar m
  putMVar m count  --return the lock; no changes
  return (show count)

main = runZMQ $ do
    sub <- socket Sub
    subscribe sub ""
    connect sub "tcp://127.0.0.1:4998"
    s <- liftIO newState 
    forever $ do
      receive sub >>= liftIO . updateState s 
      liftIO $ updateState s "hello"  --'manually' add an additional string on each iteration
      op <- liftIO $ showState s
      liftIO $ print op 
person Stuart    schedule 27.10.2018

Единственная проблема в вашем коде - последняя строка.

У вас есть «труба»:

fromZMQ subSock >-> P.take 3 >-> P.print :: Effect (ZMQ z) ()

и когда вы применяете к нему forever, тип остается прежним. Но то, что вам нужно, это простое ZMQ z, другими словами, вам нужно фактически выполнить вычисление pipes, и вы используете для этого функцию runEffect. В качестве примечания: вам на самом деле не нужен forever, так как поток все равно никогда не закончится.

Итак, все, что вам нужно сделать, это заменить forever на runEffect в последней строке.

person kirelagin    schedule 17.03.2019