Предотвратить блокировку `chan :: Receiver` при пустом буфере

Я хотел бы создать канал Multi-Producer Multi-Consumer (MPMC) с различными параллельными задачами, обрабатывающими и производящими в нем данные. Некоторые из этих задач отвечают за взаимодействие с файловой системой или сетью.

Два примера:

  • PrintOutput(String) будет использоваться средством ведения журнала, выводом на консоль или графическим интерфейсом пользователя.

  • NewJson(String) будет использоваться регистратором или парсером.

Для этого я выбрал chan в качестве поставщика канала MPMC и tokio в качестве системы для управления циклами событий для каждого слушателя на канале.

Прочитав пример на сайте tokio, я начал реализовывать futures::stream::Stream для chan::Receiver. Это позволит использовать для каждого будущего прослушивания на канале. Однако документация этих двух библиотек указывает на конфликт:

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>

Попытка вытащить следующее значение этого потока, возвращая None, если поток завершен.

Этот метод, как и Future :: poll, является единственным методом извлечения значения из потока. Этот метод также обычно должен запускаться в контексте задачи, и разработчики этого признака должны гарантировать, что реализации этого метода не блокируются, так как это может привести к плохому поведению потребителей.

fn recv(&self) -> Option<T>

Получите значение на этом канале.

Если это асинхронный канал, извлекать только блоки, когда буфер пуст.

Если это синхронный канал, recv блокируется только тогда, когда буфер пуст.

Если это канал рандеву, recv блокируется до тех пор, пока соответствующая отправка не отправит значение.

Для всех каналов, если канал закрыт и буфер пуст, recv всегда и сразу возвращает None. (Если буфер не пуст на закрытом канале, возвращаются значения из буфера.)

Значения гарантированно будут получены в том же порядке, в котором они были отправлены.

Эта операция никогда не вызовет паники! но он может заблокироваться, если канал никогда не закрывается.

chan::Receiver может блокироваться, когда буфер пуст, но futures::stream::Stream ожидает, что никогда не будет блокироваться при опросе.

Если пустой буфер блокируется, нет четкого способа подтвердить, что он пуст. Как проверить, пуст ли буфер, чтобы предотвратить блокировку?

Хотя Кабуки на моем радаре и кажется наиболее зрелым из Ящики с моделями актеров почти полностью лишены документации.


Это моя реализация на данный момент:

extern crate chan;
extern crate futures;

struct RX<T>(chan::Receiver<T>);

impl<T> futures::stream::Stream for RX<T> {
    type Item = T;
    type Error = Box<std::error::Error>;

    fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
        let &mut RX(ref receiver) = self;
        let item = receiver.recv();

        match item {
            Some(value) => Ok(futures::Async::Ready(Some(value))),
            None => Ok(futures::Async::NotReady),
        }
    }
}

Я закончил быстрый тест, чтобы увидеть, как это работает. Вроде нормально, но, как и ожидалось, блокируется после завершения буфера. Хотя это должно сработать, меня несколько беспокоит, что для потребителя означает «плохое поведение». А пока я продолжу тестировать этот подход и, надеюсь, не столкнусь с плохим поведением.

extern crate chan;
extern crate futures;
use futures::{Stream, Future};

fn my_test() {
    let mut core = tokio_core::reactor::Core::new().unwrap();
    let handle = core.handle();

    let (tx, rx) = chan::async::<String>();

    tx.send("Hello".to_string()); // fill the buffer before it blocks; single thread here.

    let incoming = RX(rx).for_each(|s| {
        println!("Result: {}", s);

        Ok(())
    });

    core.run(incoming).unwrap()
}

person Aaron3468    schedule 22.10.2017    source источник


Ответы (1)


chan crate предоставляет макрос chan_select, который позволяет не -блокировка recv; но для реализации Future для таких примитивов вам также необходимо разбудить задачу, когда канал станет готовым (см. _ 5_).

Вы можете реализовать Future, используя существующие примитивы; внедрение новых обычно труднее. В этом случае вам, вероятно, придется выполнить форк chan, чтобы сделать его Future совместимым.

Кажется, multiqueue ящик имеет Future совместимый канал mpmc mpmc_fut_queue .

person Stefan    schedule 22.10.2017
comment
О, mpmc_fut_queue похоже именно то, что я искал! Мне удалось заставить chan работать для моего варианта использования, потому что с потоком, заблокированным до прихода нового сообщения, легко справиться; просто отправьте Terminate сообщение, когда все будет готово. - person Aaron3468; 24.10.2017