且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

转换一个IEnumerable的< T>到的IObservable<吨>中具有最大并行

更新时间:2022-05-12 05:50:28

修改

下面应该工作。 此重载限制并发用户数量。

The following should work. This overload limits the number of concurrent subscriptions.

var resultObservable = pages
  .Select(p => Observable.FromAsync(() => GetPage(p)))
  .Merge(maxConcurrent);

说明

为了理解为什么需要这种改变,我们需要一些背景

Explanation

In order to understand why this change is needed we need some background


  1. FromAsync 返回一个可观察的,将调用传递函数功能 every一次订阅的的。这意味着,如果观察到从未订阅,它永远不会被调用

  1. FromAsync returns an observable that will invoke the passed Func every time it is subscribed to. This implies that if the observable is never subscribed to, it will never be invoked.

合并急切地读取源序列,只有的订阅 N 同时观测。

Merge eagerly reads the source sequence, and only subscribes to n observables simultaneously.

有了这两条,我们可以知道为什么原来的版本将并行执行的一切:因为(2), GETPAGE 将已调用的所有源字符串由时间合并决定多少观测需要订阅。

With these two pieces we can know why the original version will execute everything in parallel: because of (2), GetPage will have already been invoked for all the source strings by the time Merge decides how many observables need to be subscribed.

和我们也可以看到,为什么第二个版本的作品:即使序列已完全遍历,(1)意味着 GETPAGE 则不会调用直到合并决定它需要订阅 N 观测。这导致仅的期望的结果ñ任务中同时执行。

And we can also see why the second version works: even though the sequence has been fully iterated over, (1) means that GetPage is not invoked until Merge decides it needs to subscribe to n observables. This leads to the desired result of only n tasks being executed simultaneously.