且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

订阅具有异步功能的可观察序列

更新时间:2022-03-29 09:35:29

订户不应长时间运行,因此不支持在订阅处理程序中执行长时间运行的异步方法.

Subscribers are not supposed to be long running, and therefore there isn't support for executing long running async methods in the Subscribe handlers.

相反,应将异步方法视为可从另一个序列获取值的单个值可观察序列.现在您可以编写序列,这是Rx设计要做的.

Instead, consider your async method to be a single value observable sequence that takes a value from another sequence. Now you can compose sequences, which is what Rx was designed to do.

现在,您已经取得了飞跃,您可能会拥有类似@Reijher在

Now that you have made that leap, you will probably have something like what @Reijher creates in Howto call back async function from rx subscribe?.

他的代码分解如下.

//The input sequence. Produces values potentially quicker than consumer
Observable.Interval(TimeSpan.FromSeconds(1))
      //Project the event you receive, into the result of the async method
      .Select(l => Observable.FromAsync(() => asyncMethod(l)))
      //Ensure that the results are serialized
      .Concat()
      //do what you will here with the results of the async method calls
      .Subscribe();

在这种情况下,您正在创建隐式队列.在生产者比消费者快的任何问题中,都需要在等待时使用队列来收集值.就我个人而言,我更喜欢通过将数据放入队列来使其明确.另外,您可以显式地使用Scheduler来发出信号,该信号应该是应该吸收松弛的线程模型.

In this scenario, you are creating implicit queues. In any problem where the producer is faster than the consumer, a queue will need to be used to collect values while waiting. Personally I prefer to make this explicit by putting data into a queue. Alternatively you could explicitly use a Scheduler to signal that is the threading model that should be picking up the slack.

对于Rx新手来说,这似乎是一个流行的障碍(在订阅处理程序中执行异步).出于许多原因,指南不将其放入您的订户中,例如:1.打破错误模型2.您正在混合异步模型(rx在这里,任务在那里)3.订阅是异步序列组成的使用者.异步方法只是单个值序列,因此,由于该视图不能成为序列的结尾,因此结果可能是

This seems to be a popular hurdle (executing async in a subscribe handler) for Rx newcomers. There are many reasons that the guidance is to not put them in your subscriber, for example: 1. you break the error model 2. you are mixing async models (rx here, task there) 3. subscribe is the consumer of a composition of async sequences. An async method is just a single value sequence, so by that view cant be the end of the sequence, it's result might be though.

更新

为了说明有关破坏错误模型的注释,这里是对OP示例的更新.

To illustrate the comment about breaking the error model here is an update of the OP sample.

void Main()
{
    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
    var d = ob.Subscribe(
        x => ConsumeThrows(x).Wait(),
        ex=> Console.WriteLine("I will not get hit"));

    Thread.Sleep(10000);
    d.Dispose();
}

static async Task<Unit> ConsumeThrows(long count)
{
    return await Task.FromException<Unit>(new Exception("some failure"));
    //this will have the same effect of bringing down the application.
    //throw new Exception("some failure");
}

在这里我们可以看到,如果要抛出 OnNext 处理程序,那么我们将不受Rx OnError 处理程序的保护.该异常将无法处理,很可能会导致应用程序崩溃.

Here we can see that if the OnNext handler was to throw, then we are not protected by our Rx OnError handler. The exception would be unhandled and most likely bring down the application.