更新时间:2023-01-12 17:12:02
所有序列/流处理库都为管道构建提供了非常相似的 API.不同之处在于用于处理多线程和管道组合的 API.
All sequence/stream processing libs are offering very similar API for pipeline building. The differences are in API for handling multi-threading and composition of pipelines.
RxJava 与 Stream 完全不同.在所有 JDK 中,最接近 rx.Observable
的可能是 java.util.stream.Collector
Stream
+CompletableFuture
组合(以处理额外的 monad 层为代价,即必须处理 Stream<CompletableFuture<T>>
和 CompletableFuture<Stream<T>>
).
RxJava is quite different from Stream. Of all JDK things, the closest to rx.Observable
is perhaps java.util.stream.Collector
Stream
+ CompletableFuture
combo (which comes at a cost of dealing with extra monad layer, i. e. having to handle conversion between Stream<CompletableFuture<T>>
and CompletableFuture<Stream<T>>
).
Observable 和 Stream 有很大区别:
There are significant differences between Observable and Stream:
Stream#parallel()
将序列拆分为分区,Observable#subscribeOn()
和 Observable#observeOn()
不会;用 Observable 模拟 Stream#parallel()
行为是很棘手的,它曾经有 .parallel()
方法,但是这种方法引起了很多混乱,以至于 .parallel()
支持已移至单独的存储库:ReactiveX/RxJavaParallel: Experimental Parallel Extensions for RxJava.更多详细信息在另一个答案中.Stream#parallel()
不允许指定要使用的线程池,这与大多数接受可选调度程序的 RxJava 方法不同.由于 JVM 中的所有流实例使用相同的 fork-join 池,因此添加 .parallel()
可能会意外影响程序另一个模块中的行为.Observable#interval()
、Observable#window()
等;这主要是因为 Streams 是基于拉取的,上游无法控制何时向下游发出下一个元素.takeWhile()
、takeUntil()
);使用 Stream#anyMatch()
的解决方法是有限的:它是终端操作,因此每个流不能多次使用它Stream#zip()
操作,这有时非常有用.Files#lines()
和 BufferedReader#lines()
开箱即用,其他类似的场景可以通过构造来自迭代器的流).Observable#using()
);您可以用它包装 IO 流或互斥锁,并确保用户不会忘记释放资源 - 它将在订阅终止时自动释放;Stream 具有 onClose(Runnable)
方法,但您必须手动调用它或通过 try-with-resources 调用它.例如.您必须记住 Files#lines()
必须 包含在 try-with-resources 块中.Stream#parallel()
splits sequence into partitions, Observable#subscribeOn()
and Observable#observeOn()
do not; it is tricky to emulate Stream#parallel()
behavior with Observable, it once had .parallel()
method but this method caused so much confusion that .parallel()
support was moved to separate repository: ReactiveX/RxJavaParallel: Experimental Parallel Extensions for RxJava. More details are in another answer.Stream#parallel()
does not allow to specify a thread pool to use, unlike most of RxJava methods accepting optional Scheduler. Since all stream instances in a JVM use the same fork-join pool, adding .parallel()
can accidentally affect the behaviour in another module of your program.Observable#interval()
, Observable#window()
and many others; this is mostly because Streams are pull-based, and upstream has no control on when to emit next element downstream.takeWhile()
, takeUntil()
); workaround using Stream#anyMatch()
is limited: it is terminal operation, so you can't use it more than once per streamStream#zip()
operation, which is quite useful sometimes.Files#lines()
and BufferedReader#lines()
out of the box though, and other similar scenarios can be managed by constructing Stream from Iterator).Observable#using()
); you can wrap IO stream or mutex with it and be sure that the user will not forget to free the resource - it will be disposed automatically on subscription termination; Stream has onClose(Runnable)
method, but you have to call it manually or via try-with-resources. E. g. you have to keep in mind that Files#lines()
must be enclosed in try-with-resources block.RxJava 与 Streams 显着不同.真正的 RxJava 替代品是 ReactiveStreams 的其他实现,例如.G.Akka 的相关部分.
RxJava differs from Streams significantly. Real RxJava alternatives are other implementations of ReactiveStreams, e. g. relevant part of Akka.
Stream#parallel
有使用非默认分叉连接池的技巧,请参阅 Java 8 并行流中的自定义线程池.
There's trick to use non-default fork-join pool for Stream#parallel
, see Custom thread pool in Java 8 parallel stream.
以上所有内容均基于使用 RxJava 1.x 的经验.现在 RxJava 2.x 在这里,这个答案可能已经过时了.
All of the above is based on the experience with RxJava 1.x. Now that RxJava 2.x is here, this answer may be out-of-date.