Одношаговый канал

Я хочу сделать что-то вроде ArrowChoice, но с каналами. Я хочу дождаться значения «Любой», а затем передать значения «Лево» в один канал и значения «Право» в другой, а затем объединить результаты обратно в поток «Либо».

Предположительно, это можно сделать, сделав внутренние каналы похожими на автоматы: превратить канал в функцию, которая принимает аргумент и возвращает монадический список полученных выходных данных:

newtype AutomataM i m o = Automata (i -> m (o, Automata i o))

conduitStep :: Conduit i m o -> AutomataM i m [o]

Причина списка выходов заключается в том, что Conduit может давать 0 или более выходов для каждого входа.

Я просмотрел ResumableConduit и его родственников, и, предположительно, ответ где-то там. Но я не совсем понимаю, как это делается.


person Paul Johnson    schedule 10.02.2014    source источник


Ответы (2)


Это не совсем та подпись, которую вы предоставили, но:

import Data.Conduit
import Data.Conduit.Internal (Pipe (..), ConduitM (..))

newtype Automata i o m r = Automata (m ([o], Either r (i -> Automata i o m r)))

conduitStep :: Monad m => ConduitM i o m r -> Automata i o m r
conduitStep (ConduitM con0) =
    Automata $ go [] id con0
  where
    go _ front (Done r) = return (front [], Left r)
    go ls front (HaveOutput p _ o) = go ls (front . (o:)) p
    go ls front (NeedInput p _) =
        case ls of
            [] -> return (front [], Right $ conduitStep . ConduitM . p)
            l:ls' -> go ls' front (p l)
    go ls front (PipeM mp) = mp >>= go ls front
    go ls front (Leftover p l) = go (l:ls) front p

Но будьте осторожны с этим подходом:

  1. Сохраняя вывод в виде списка, это не постоянная память.
  2. Мы выбрасываем финализаторы.

Вероятно, есть способ предоставить абстракцию ZipConduit, подобную ZipSource и ZipSink, которая решала бы проблемы такого рода более элегантно, но я не слишком много думал об этом.


EDIT В итоге я реализовал ZipConduit в conduit-extra 0.1.5. Вот демонстрация его использования, которая немного похожа на ваш случай:

import           Control.Applicative
import           Data.Conduit
import           Data.Conduit.Extra
import qualified Data.Conduit.List   as CL

conduit1 :: Monad m => Conduit Int m String
conduit1 = CL.map $ \i -> "conduit1: " ++ show i

conduit2 :: Monad m => Conduit Double m String
conduit2 = CL.map $ \d -> "conduit2: " ++ show d

conduit :: Monad m => Conduit (Either Int Double) m String
conduit = getZipConduit $
    ZipConduit (lefts =$= conduit1) *>
    ZipConduit (rights =$= conduit2)
  where
    lefts = CL.mapMaybe (either Just (const Nothing))
    rights = CL.mapMaybe (either (const Nothing) Just)

main :: IO ()
main = do
    let src = do
            yield $ Left 1
            yield $ Right 2
            yield $ Left 3
            yield $ Right 4
        sink = CL.mapM_ putStrLn
    src $$ conduit =$ sink
person Michael Snoyman    schedule 10.02.2014
comment
Этот метод, по-видимому, не такой мощный, но с учетом pipe1 и pipe2, таких как conduit1 и conduit2, и inputs = each [Left 1, Right 2, Left 3, Right 4] эквивалент каналов этой программы: main = runEffect $ inputs >-> for (pipe1 +++ pipe2) (either P.yield P.yield) >-> P.stdoutLn Он использует (+++) из дополнительных каналов. Конечно, есть и другие анализы проблемы. - person Michael; 18.09.2015
comment
Для тех, кто читает это позже, ZipConduit теперь в пакете conduit. - person nh2; 13.06.2020

Существует народный метод сделать это с помощью pipes с помощью каналов "push-category". Полная реализация взята из этого сообщения списка рассылки и этот ответ на переполнение стека. Я думаю, что он еще не выпущен из-за усилий по упрощению интерфейса Pipes, сосредоточения внимания на использовании экземпляра монады «упорядочивания», который скрыт с помощью этого метода, и отсутствия доказательств того, что эта реализация действительно правильно реализует класс Arrow. .

Идея состоит в том, чтобы реализовать новый тип Edge (показанный ниже), который представляет собой канал на основе push с аргументами типа в правильном порядке для Category, Arrow, ArrowChoice и обоих Functor и Applicative по их выходным значениям. Это позволяет составлять из них ориентированные ациклические графы с использованием стрелочных обозначений. Я рассмотрю реализацию ниже, но можно просто игнорировать ее и использовать Arrow/ArrowChoice/Applicative экземпляров Edge без особого беспокойства.

(Изменить: Этот код лучше всего доступен по адресу https://github.com/Gabriel439/Haskell-RCPL-Library)


{-# LANGUAGE RankNTypes           #-}
{-# LANGUAGE TypeSynonymInstances #-}

import Prelude hiding ((.), id)
import Pipes.Core
import Pipes.Lift
import Control.Monad.Morph
import Control.Category
import Control.Monad.State.Strict
import Control.Arrow

Это нетипичный способ использования каналов, который не представлен в модуле Pipes; вы должны импортировать Pipes.Core, чтобы использовать push. Нажимные трубы выглядят так

-- push :: a -> Proxy a' a a' a m r

и, таким образом, они требуют по крайней мере одно восходящее значение, прежде чем Proxy будет разрешено работать. Это означает, что весь процесс необходимо «запустить», передав первое значение в качестве вызова функции, и что крайний левый push-Proxy будет управлять всем потоком.

Учитывая канал на основе push, мы можем реализовать Category, Arrow и ArrowChoice. Стандартное решение также включает класс типов Edge, так что у нас есть аргументы типа в правильном порядке для Category и Arrow.

newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r }

Для экземпляра Category мы используем категорию «push», которая имеет push как id и (<~<) как состав:

instance Monad m => Category (Edge m r) where
  id = Edge push
  Edge a . Edge b = Edge (a <~< b)

Мы встраиваем функции в Edge с arr, увеличивая id (то есть push) на нижнем краю. Для этого мы используем категорию respond, которая имеет закон p />/ respond == p, но вмешиваем в процесс нашу f.

instance Monad m => Arrow (Edge m r) where
  arr f = Edge (push />/ respond . f)

Мы также используем локальный преобразователь состояния для хранения snd половины наших пар и передачи их "вокруг" входного канала в first.

  first (Edge p) = Edge $ \(b, d) ->
    evalStateP d $ (up \>\ hoist lift . p />/ dn) b
    where
      up () = do
        (b, d) <- request ()
        lift (put d)
        return b
      dn c = do
        d <- lift get
        respond (c, d)

Наконец, мы получаем экземпляр ArrowChoice, реализуя left. Для этого мы разделяем бремя передачи сторон Left и Right, используя либо возврат, либо канал для передачи значений.

instance (Monad m) => ArrowChoice (Edge m r) where
    left (Edge k) = Edge (bef >=> (up \>\ (k />/ dn)))
      where
          bef x = case x of
              Left b -> return b
              Right d -> do
                  _ <- respond (Right d)
                  x2 <- request ()
                  bef x2
          up () = do
              x <- request ()
              bef x
          dn c = respond (Left c)

Мы можем использовать Edge для создания производителей и потребителей, основанных на push.

type PProducer m r b =            Edge m r () b
type PConsumer m r a = forall b . Edge m r a  b

а затем мы предоставим экземпляры Functor и Applicative для PProducer. Это происходит с помощью case анализа базового Pipe, так что это немного многословно. По сути, однако, все, что происходит, это то, что мы вставляем f в слот yield Pipe.

instance Functor (PProducer m r) where
  fmap f (Edge k) = $ Edge $ \() -> go (k ()) where
    go p = case p of
      Request () ku -> Request ()    (\() -> go (ku ()))
      -- This is the only interesting line
      Respond b  ku -> Respond (f b) (\() -> go (ku ()))
      M          m  -> M (m >>= \p' -> return (go p'))  
      Pure    r     -> Pure r

Наконец, Applicative почти такой же, за исключением того, что нам нужно переключаться между запуском восходящего канала для создания функций и запуском нижестоящего канала для создания аргументов.

instance (Monad m) => Applicative (Edge m r ()) where
    pure b = Edge $ \() -> forever $ respond b
    (Edge k1) <*> (Edge k2) = Edge (\() -> goL (k1 ()) (k2 ()))
      where
        goL p1 p2 = case p1 of
            Request () ku -> Request () (\() -> goL   (ku ()) p2)
            Respond f  ku ->                    goR f (ku ()) p2
            M          m  -> M (m >>= \p1' -> return (goL p1' p2))
            Pure    r     -> Pure r
        goR f p1 p2 = case p2 of
            Request () ku -> Request ()    (\() -> goR f p1 (ku ()))
            Respond x  ku -> Respond (f x) (\() -> goL   p1 (ku ()))
            M          m  -> M (m >>= \p2' -> return (goR f p1 p2'))
            Pure    r     -> Pure r
person J. Abrahamson    schedule 10.02.2014