Как отменить бесконечный поток из самого потока?

Я пытаюсь отменить интервал (interval_timer) после очистки очереди, но не уверен, что это правильная стратегия.

let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let interval_timer = tokio_timer::Timer::default();

let timer = interval_timer
    .interval(Duration::from_millis(1000))
    .map_err(|_| {
        println!("Errored out");
    });

let s = timer.for_each(move |_| {
    println!("Woke up");
    let item = some_vars.pop().unwrap();

    let f = futures::future::ok(item).map(|x| {
        println!("{:?}", x);
    });
    tokio::spawn(f)
});

tokio::run(s);

Я попробовал drop, как было предложено в gitter, но это закончилось ошибкой:

let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let mut interval_timer = tokio_timer::Timer::default();

let timer = interval_timer
    .interval(Duration::from_millis(1000))
    .map_err(|_| {
        println!("Errored out");
    });

let s = timer.for_each(move |_| {
    println!("Woke up");
    if some_vars.len() == 1 {
        drop(interval_timer);
    }

    let item = some_vars.pop().unwrap();

    let f = futures::future::ok(item).map(|x| {
        println!("{:?}", x);
    });
    tokio::spawn(f)
});

tokio::run(s);

Ошибка:

error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
--> src/main.rs:72:22
   |
60 |     let mut interval_timer = tokio_timer::Timer::default();
   |         ------------------ captured outer variable
...
72 |                 drop(interval_timer);
   |                      ^^^^^^^^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure

person opensourcegeek    schedule 12.03.2018    source источник


Ответы (3)


Если вы хотите отменить поток вне потока, см. отменить трансляцию.


В вашем конкретном случае проще всего преобразовать вашу коллекцию в поток и заархивировать ее вместе с интервальным таймером. Таким образом, результирующий поток естественным образом останавливается, когда коллекция пуста:

use futures::{future, stream, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11

fn main() {
    tokio::run({
        let some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];

        let timer =
            Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));

        let some_vars = stream::iter_ok(some_vars.into_iter().rev());
        let combined = timer.zip(some_vars);

        combined.for_each(move |(_, item)| {
            eprintln!("Woke up");

            tokio::spawn(future::lazy(move || {
                println!("{:?}", item);
                Ok(())
            }));

            Ok(())
        })
    });
}

В противном случае вы можете остановить поток, используя and_then, чтобы удалить значение из коллекции и контролировать, должен ли поток продолжаться:

use futures::{future, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11

fn main() {
    tokio::run({
        let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];

        let timer =
            Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));

        let limited = timer.and_then(move |_| {
            if some_vars.len() <= 4 {
                Err(())
            } else {
                some_vars.pop().ok_or(())
            }
        });

        limited.for_each(move |item| {
            eprintln!("Woke up");

            tokio::spawn(future::lazy(move || {
                println!("{:?}", item);
                Ok(())
            }));

            Ok(())
        })
    });
}
person Shepmaster    schedule 05.11.2019

Я создал копию структуры Tokio Interval, добавив ссылку на метод моего приложения, чтобы указать, когда прерывать раньше.

В моем случае я хочу прервать Interval для выключения.

Мой метод опроса Interval выглядит так:

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
    if self.session.read().unwrap().shutdown {
        return Ok(Async::Ready(Some(Instant::now())));
    }

    // Wait for the delay to be done
    let _ = match self.delay.poll() {

Затем вам нужно сохранить дескриптор задачи (вызов task = futures::task::current() при выполнении внутри задачи тайм-аута).

В любой момент вы можете вызвать task.notify(), чтобы активировать интервал, и ввести код прерывания, прерывая Interval раньше.

Внутри Interval есть структура Delay, которую можно изменить, вы можете создать Interval, которую вы можете прерывать и изменять время ожидания, таким образом вы можете прервать один раз, а затем продолжить.

person teknopaul    schedule 04.11.2019

tokio_timer::Interval реализует futures::Stream, поэтому попробуйте использовать take_while метод:

let s = timer
    .take_while(|()| 
        future::ok(is_net_completed()))
    .for_each(move |_| {
        println!("Woke up");
        // ...
    })
person wolandr    schedule 13.03.2018
comment
Не могли бы вы объяснить, как это отменяет повторяющийся интервал? - person Shepmaster; 13.03.2018
comment
Я попробовал take_ while, проблема заключалась в том, что я не мог использовать some_vars в закрытии take_ while, а также for_each (изменяемое) закрытие. Если собственность может быть решена, тогда это решит непосредственную проблему. - person opensourcegeek; 14.03.2018
comment
@Shepmaster Если вам нужна пауза в потоке с интервалом, это можно реализовать с помощью filer между частями take_while и for_each. Или ровно в for_each закрытии. - person wolandr; 14.03.2018
comment
@opensourcegeek Итак, вам нужно синхронизировать объект шарда, используемый, например, типы из std::sync::* - person wolandr; 14.03.2018
comment
Я пробовал take_ while, и он не прерывает интервал, он все еще срабатывает по тому же расписанию, если вы поместите println! () В закрытие take_ while, это довольно ясно - person teknopaul; 04.11.2019