Асинхронное чтение из UdpSocket

Я пытаюсь одновременно обрабатывать прибывающие пакеты UDP в Tokio. Однако следующий MWE не делает того, что я ожидал:

extern crate futures;
extern crate tokio_core;
extern crate tokio_io;

use futures::{Future, Stream};
use std::net::SocketAddr;
use tokio_core::net::{UdpCodec, UdpSocket};
use tokio_core::reactor::Core;

// just a codec to send and receive bytes
pub struct LineCodec;
impl UdpCodec for LineCodec {
    type In = (SocketAddr, Vec<u8>);
    type Out = (SocketAddr, Vec<u8>);

    fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> std::io::Result<Self::In> {
        Ok((*addr, buf.to_vec()))
    }

    fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> SocketAddr {
        into.extend(buf);
        addr
    }
}

fn compute(addr: SocketAddr, msg: Vec<u8>) -> Box<Future<Item = (), Error = ()>> {
    println!("Starting to compute for: {}", addr);
    // sleep is a placeholder for a long computation
    std::thread::sleep(std::time::Duration::from_secs(8));
    println!("Done computing for for: {}", addr);
    Box::new(futures::future::ok(()))
}

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let listening_addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
    let socket = UdpSocket::bind(&listening_addr, &handle).unwrap();
    println!("Listening on: {}", socket.local_addr().unwrap());

    let (writer, reader) = socket.framed(LineCodec).split();

    let socket_read = reader.for_each(|(addr, msg)| {
        println!("Got {:?}", msg);
        handle.spawn(compute(addr, msg));
        Ok(())
    });

    core.run(socket_read).unwrap();
}

Соединив два терминала с помощью $ nc -u localhost 8080 и отправив некоторый текст, я вижу, что сообщение от второго терминала обрабатывается после завершения первого.

Что мне нужно изменить?


person Manuel Schmidt    schedule 06.02.2018    source источник


Ответы (2)


Как сказал @Stefan в другом ответе, вы не должны блокировать асинхронный код. Учитывая ваш пример, похоже, что sleep - это заполнитель для некоторых длинных вычислений. Поэтому вместо использования тайм-аута вы должны делегировать это вычисление другому потоку, например this пример:

extern crate futures;
extern crate futures_cpupool;

use futures::Future;
use futures_cpupool::CpuPool;

...

let pool = CpuPool::new_num_cpus();

...

fn compute(handle: &Handle, addr: SocketAddr, _msg: Vec<u8>) -> Box<Future<Item = (), Error = ()>> {
    // I don't know enough about Tokio to know how to make `pool` available here
    pool.spawn_fn (|| {
        println!("Starting to compute for: {}", addr);
        std::thread::sleep(std::time::Duration::from_secs(8));
        println!("Done computing for for: {}", addr);
        Ok(())
    })
}
person Jmb    schedule 06.02.2018
comment
Да, это то, что я имел в виду. Я прокомментировал свой вопрос, чтобы прояснить использование сна. - person Manuel Schmidt; 06.02.2018

Никогда не sleep в асинхронном коде (а также избегайте любых других блокирующих вызовов).

Возможно, вы захотите использовать Timeout вместо это:

Детская площадка

fn compute(handle: &Handle, addr: SocketAddr, _msg: Vec<u8>) -> Box<Future<Item = (), Error = ()>> {
    println!("Starting to compute for: {}", addr);
    Box::new(
        Timeout::new(std::time::Duration::from_secs(8), handle)
            .unwrap()
            .map_err(|e| panic!("timeout failed: {:?}", e))
            .and_then(move |()| {
                println!("Done computing for for: {}", addr);
                Ok(())
            }),
    )
}
person Stefan    schedule 06.02.2018