How do I iterate over a Vec of functions returning Futures in Rust?

Updated: 2023-11-10 16:12:58

Is it possible to loop over a Vec, calling a method that returns a Future on each, and build a chain of Futures, to be evaluated (eventually) by the consumer? Whether to execute the later Futures would depend on the outcome of the earlier Futures in the Vec.

To clarify:

I'm working on an application that can fetch data from an arbitrary set of upstream sources.

Requesting data would check with each of the sources, in turn. If the first source had an error (Err), or did not have the data available (None), then the second source would be tried, and so on.

Each source should be tried exactly once, and no source should be tried until all of the sources before have returned their results. Errors are logged, but otherwise ignored, passing the query to the next upstream data source.

I have some working code that does this for fetching metadata:

/// Attempts to read/write data to various external sources. These are
/// nested types, because a data source may exist as both a reader and a writer
struct StoreManager {
    /// Upstream data sources
    readers: Vec<Rc<RefCell<StoreRead>>>,
    /// Downstream data sinks
    writers: Vec<Rc<RefCell<StoreWrite>>>,
}

impl StoreRead for StoreManager {
    fn metadata(self: &Self, id: &Identifier) -> Box<Future<Option<Metadata>, Error>> {
       Box::new(ok(self.readers
            .iter()
            .map(|store| {
                executor::block_on(store.borrow().metadata(id)).unwrap_or_else(|err| {
                    error!("Error on metadata(): {:?}", err);
                    None
                })
            })
            .find(Option::is_some)
            .unwrap_or(None)))
    }
}

Aside from my unhappiness with all of the Box and Rc/RefCell nonsense, my real concern is with the executor::block_on() call. It blocks, waiting for each Future to return a result, before continuing to the next.

Given that it's possible to call fn_returning_future().or_else(|_| other_fn()) and so on, is it possible to build up a dynamic chain like this? Or is it a requirement to fully evaluate each Future in the iterator before moving to the next?

You can use stream::unfold to convert a single value into a stream. In this case, we can use the IntoIter iterator as that single value.

use futures::{executor, stream, Stream, TryStreamExt}; // 0.3.4

type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn network_request(val: i32) -> Result<i32> {
    // Just for demonstration: thread::sleep blocks the executor thread,
    // so don't do this in a real program
    use std::{
        thread,
        time::{Duration, Instant},
    };
    thread::sleep(Duration::from_secs(1));
    println!("Resolving {} at {:?}", val, Instant::now());

    Ok(val * 100)
}

fn requests_in_sequence(vals: Vec<i32>) -> impl Stream<Item = Result<i32>> {
    stream::unfold(vals.into_iter(), |mut vals| async {
        let val = vals.next()?;
        let response = network_request(val).await;
        Some((response, vals))
    })
}

fn main() {
    let s = requests_in_sequence(vec![1, 2, 3]);
    executor::block_on(async {
        s.try_for_each(|v| async move {
            println!("-> {}", v);
            Ok(())
        })
        .await
        .expect("An error occurred");
    });
}

Resolving 1 at Instant { tv_sec: 6223328, tv_nsec: 294631597 }
-> 100
Resolving 2 at Instant { tv_sec: 6223329, tv_nsec: 310839993 }
-> 200
Resolving 3 at Instant { tv_sec: 6223330, tv_nsec: 311005834 }
-> 300
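
To make the shape of unfold easier to see, here is a synchronous analogue built only on the standard library. It is a sketch, not how the futures crate implements it: unfold_sync and responses are hypothetical names, and the multiplication stands in for the network request. The closure receives the current state and returns either None (stop) or the next item together with the state to use next time, which is why the IntoIter can be threaded through as the state.

```rust
/// Hypothetical synchronous analogue of `stream::unfold`: starting from
/// `seed`, call `step` with the current state until it returns `None`.
fn unfold_sync<S, T>(
    seed: S,
    mut step: impl FnMut(S) -> Option<(T, S)>,
) -> impl Iterator<Item = T> {
    let mut state = Some(seed);
    std::iter::from_fn(move || {
        // Take the state out; once `step` returns `None` it stays empty,
        // so every later call also yields `None`.
        let (item, next) = step(state.take()?)?;
        state = Some(next);
        Some(item)
    })
}

/// Mirrors `requests_in_sequence`, with `val * 100` standing in for the
/// awaited network request.
fn responses(vals: Vec<i32>) -> impl Iterator<Item = i32> {
    unfold_sync(vals.into_iter(), |mut vals| {
        let val = vals.next()?;
        Some((val * 100, vals))
    })
}
```

Collecting responses(vec![1, 2, 3]) gives the values 100, 200 and 300 in that order. The async version differs only in that the closure returns a future and the result is a Stream rather than an Iterator.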


To ignore Err and None, you have to shuttle the Error over to the Item, making the Item type a Result<Option<T>, Error>:

use futures::{executor, stream, Stream, StreamExt}; // 0.3.4

type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn network_request(val: i32) -> Result<Option<i32>> {
    // Just for demonstration: thread::sleep blocks the executor thread,
    // so don't do this in a real program
    use std::{
        thread,
        time::{Duration, Instant},
    };
    thread::sleep(Duration::from_secs(1));
    println!("Resolving {} at {:?}", val, Instant::now());

    match val {
        1 => Err("boom".into()),  // An error
        2 => Ok(None),            // No data
        _ => Ok(Some(val * 100)), // Success
    }
}

fn requests_in_sequence(vals: Vec<i32>) -> impl Stream<Item = Result<Option<i32>>> {
    stream::unfold(vals.into_iter(), |mut vals| async {
        let val = vals.next()?;
        let response = network_request(val).await;
        Some((response, vals))
    })
}

fn main() {
    executor::block_on(async {
        let s = requests_in_sequence(vec![1, 2, 3]);

        let s = s.filter_map(|v| async move { v.ok() });
        let s = s.filter_map(|v| async move { v });
        let mut s = s.boxed_local();

        match s.next().await {
            Some(v) => println!("First success: {}", v),
            None => println!("No successful requests"),
        }
    });
}

Resolving 1 at Instant { tv_sec: 6224229, tv_nsec: 727216392 }
Resolving 2 at Instant { tv_sec: 6224230, tv_nsec: 727404752 }
Resolving 3 at Instant { tv_sec: 6224231, tv_nsec: 727593740 }
First success: 300
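
The two filter_map calls do different jobs: the first drops Err values (turning Result<Option<T>> into Option<T>), the second drops None. Because the pipeline is lazy, taking only the first item means later sources are never queried once one succeeds. The following iterator-based sketch shows the same idea without any async machinery; lookup, first_success and the calls log are hypothetical names introduced only for illustration.

```rust
/// Hypothetical synchronous stand-in for `network_request`. It records
/// every value it is asked about so the call order can be inspected.
fn lookup(val: i32, calls: &mut Vec<i32>) -> Result<Option<i32>, String> {
    calls.push(val);
    match val {
        1 => Err("boom".to_string()), // An error
        2 => Ok(None),                // No data
        _ => Ok(Some(val * 100)),     // Success
    }
}

/// Returns the first successful, non-empty response, if any.
fn first_success(vals: &[i32], calls: &mut Vec<i32>) -> Option<i32> {
    vals.iter()
        .map(|&v| lookup(v, calls))
        .filter_map(|r| r.ok()) // discard errors
        .filter_map(|o| o)      // discard "no data"
        .next()                 // stop at the first remaining item
}
```

With the input [1, 2, 3, 4], first_success returns Some(300) and the calls log contains only 1, 2 and 3: the fourth source is never tried.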


"is it possible to build up a dynamic chain like this"

Yes, by leveraging async functions:

use futures::executor; // 0.3.4

type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn network_request(val: i32) -> Result<Option<i32>> {
    // Just for demonstration: thread::sleep blocks the executor thread,
    // so don't do this in a real program
    use std::{
        thread,
        time::{Duration, Instant},
    };
    thread::sleep(Duration::from_secs(1));
    println!("Resolving {} at {:?}", val, Instant::now());

    match val {
        1 => Err("boom".into()),  // An error
        2 => Ok(None),            // No data
        _ => Ok(Some(val * 100)), // Success
    }
}

async fn requests_in_sequence(vals: Vec<i32>) -> Result<i32> {
    let mut vals = vals.into_iter().peekable();

    while let Some(v) = vals.next() {
        match network_request(v).await {
            Ok(Some(v)) => return Ok(v),
            Err(e) if vals.peek().is_none() => return Err(e),
            Ok(None) | Err(_) => { /* Do nothing and try the next source */ }
        }
    }

    Err("Ran out of sources".into())
}

fn main() {
    executor::block_on(async {
        match requests_in_sequence(vec![1, 2, 3]).await {
            Ok(v) => println!("First success: {}", v),
            Err(e) => println!("No successful requests: {}", e),
        }
    });
}
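
The question also asks that errors be logged and otherwise ignored. One possible variant of the loop above, sketched here with only the standard library, collects error messages into a log and returns an Option instead of the last error. Everything named here is an assumption made for illustration: query_source stands in for a real upstream source, the Vec<String> stands in for a real logger, and block_on is a minimal single-future executor written for this sketch, not the one from the futures crate.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the blocked thread when the future can make progress.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor (an assumption of this sketch): polls one future to
/// completion on the current thread, parking while it is pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical upstream source: 1 fails, 2 has no data, others succeed.
async fn query_source(id: i32) -> Result<Option<i32>, String> {
    match id {
        1 => Err(format!("source {} failed", id)),
        2 => Ok(None),
        _ => Ok(Some(id * 100)),
    }
}

/// Tries each source exactly once, in order. Errors are appended to `log`
/// and otherwise ignored; the first available value is returned.
async fn first_available(ids: &[i32], log: &mut Vec<String>) -> Option<i32> {
    for &id in ids {
        match query_source(id).await {
            Ok(Some(value)) => return Some(value),
            Ok(None) => {} // no data here, try the next source
            Err(e) => log.push(e),
        }
    }
    None
}
```

Calling block_on(first_available(&[1, 2, 3], &mut log)) with an empty log returns Some(300) and leaves a single entry, "source 1 failed", in the log. No later source is polled before the earlier one has finished, because each request is awaited inside the loop.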


"is it a requirement to fully evaluate each Future in the iterator before moving to the next"

Isn't that part of your own requirements? Emphasis mine:

Requesting data would check with each of the sources, in turn. If the first source had an error (Err), or did not have the data available (None), then the second source would be tried