
且构网 - 分享程序员编程开发的那些事

Project Reactor 中的背压是如何工作的?

更新时间:2022-10-19 19:48:20


背压或消费者向生产者发送信号的能力发射率太高 - Reactor Reference


通常热源不尊重订阅者的需求,因为它们经常产生实时数据,例如收听 Twitter 提要.在这个例子中,订阅者无法控制推文的创建速度,因此很容易被淹没.

另一方面,冷源通常在订阅发生时按需生成数据,例如发出 HTTP 请求然后处理响应.在这种情况下,您正在调用的 HTTP 服务器只会在您发送请求后发送响应.




给定一个产生从 1 到 Integer.MAX_VALUE 的数字的 Flux 并给定一个需要 100 毫秒来处理单个元素的处理步骤:

Flux.range(1, Integer.MAX_VALUE).日志().concatMap(x -> Mono.delay(Duration.ofMillis(100)), 1)//模拟处理需要时间.blockLast();


[ INFO] (main) |onSubscribe([同步可熔断] FluxRange.RangeSubscription)[信息](主要)|请求(1)[信息](主要)|onNext(1)[信息](主要)|请求(1)[信息](主要)|onNext(2)[信息](并行 1) |请求(1)[信息](并行 1) |onNext(3)[信息](并行2)|请求(1)[信息](并行2)|onNext(4)[信息](并行3)|请求(1)[信息](并行3)|onNext(5)

我们可以看到,在每次 onNext 之前都有一个请求.请求信号由 concatMap 操作符发送.当 concatMap 完成当前元素并准备好接受下一个元素时,它会发出信号.源仅在收到来自下游的请求时才发送下一项.



为了简单起见,我为这个例子选择了一个易于理解的冷发布者.它是 Flux.interval 每指定的时间间隔发出一项.这个冷酷的发布者不尊重需求是有道理的,因为看到项目以不同的、比最初指定的更长的时间间隔发布是很奇怪的.


Flux.interval(Duration.ofMillis(1)).日志().concatMap(x -> Mono.delay(Duration.ofMillis(100))).blockLast();

Source 每毫秒发出一项.订阅者能够每 100 毫秒处理一项.很明显,订阅者跟不上生产者,我们很快就会收到类似这样的异常:

reactor.core.Exceptions$OverflowException:由于缺少请求,无法发出滴答 32(间隔不支持补充速度慢于滴答的小型下游请求)在reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)...



默认的背压策略是我们在上面看到的一种:以错误终止.Reactor 不会对我们强制执行任何错误处理策略.当我们看到这种错误时,我们可以决定哪一个最适合我们的用例.

您可以在 Reactor 参考中找到其中的几个.


Flux.interval(Duration.ofMillis(1)).onBackpressureDrop().concatMap(a -> Mono.delay(Duration.ofMillis(100)).thenReturn(a)).doOnNext(a -> System.out.println("消费者保留的元素:" + a)).blockLast();



我们可以看到,在前 32 项之后有一个相当大的跳跃到 2400.由于定义的策略,它们之间的元素被丢弃了.


  • 背压通常是自动的,我们不需要做任何事情,因为我们是按需获取数据的.
  • 如果来源不尊重订阅者的需求,我们需要定义一个策略来避免终止错误.


I've been working in Spring Reactor and had some previous testing that made me wonder how Fluxes handle backpressure by default. I know that onBackpressureBuffer and such exist, and I have also read that RxJava defaults to unbounded until you define whether to buffer, drop, etc.

So, can anyone clarify for me: What is the default backpressure behavior for a Flux in Reactor 3?

I tried searching for the answer but didn't find any clear answers, only definitions of Backpressure or that answer linked above for RxJava

What is back-pressure?

Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high - Reactor Reference

When we are talking about backpressure we have to separate sources/publishers into two groups: the ones that respect the demand from the subscriber, and those that ignore it.

Generally hot sources do not respect subscriber demand, since they often produce live data, like listening into a Twitter feed. In this example the subscriber doesn't have control over at what rate tweets are created, so it could easily get overwhelmed.

On the other hand a cold source usually generates data on demand when subscription happens, like making an HTTP request and then processing the response. In this case the HTTP server you are calling will only send a response after you sent your request.

Important to note that this is not a rule: not every hot source ignores the demand and not every cold source respects it. You can read more on hot and cold sources here.

Let's look at some examples that might help in understanding.

Publisher that respects the demand

Given a Flux that produces numbers from 1 to Integer.MAX_VALUE and given a processing step that takes 100ms to process a single element:

Flux.range(1, Integer.MAX_VALUE)
    .concatMap(x -> Mono.delay(Duration.ofMillis(100)), 1) // simulate that processing takes time

Let's see the logs:

[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(1)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(2)
[ INFO] (parallel-1) | request(1)
[ INFO] (parallel-1) | onNext(3)
[ INFO] (parallel-2) | request(1)
[ INFO] (parallel-2) | onNext(4)
[ INFO] (parallel-3) | request(1)
[ INFO] (parallel-3) | onNext(5)

We can see that before every onNext there is a request. The request signal is sent by concatMap operator. It is signaled when concatMap finished the current element and ready to accept the next one. The source only sends the next item when it receives a request from the downstream.

In this example backpressure is automatic, we don't need to define any strategy because the operator knows what it can handle and the source respects it.

Publisher that ignores the demand and no backpressure strategy is defined

For the sake of simplicity I selected an easy to understand cold publisher for this example. It's Flux.interval which emits one item per the specified time interval. It makes sense that this cold publisher does not respect demand since it would be quite strange to see items emitted by different, longer intervals than the one originally specified.

Let's see the code:

    .concatMap(x -> Mono.delay(Duration.ofMillis(100)))

Source emits one item every millisecond. Subscriber is able to process one item every 100 milliseconds. It's clear that the subscriber is not able to keep up with the producer and we get an exception something like this quite soon:

reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)

What can we do to avoid this exception?

Publisher that ignores the demand and backpressure strategy is defined

The default backpressure strategy is the one we have seen above: terminating with error. Reactor does not enforce any error handling strategy on us. When we see this kind of error we can decide which one is the most applicable for our use case.

You can find a couple of them in Reactor reference.

For this example we will use the simplest one: onBackpressureDrop.

    .concatMap(a -> Mono.delay(Duration.ofMillis(100)).thenReturn(a))
    .doOnNext(a -> System.out.println("Element kept by consumer: " + a))


Element kept by consumer: 0
Element kept by consumer: 1
Element kept by consumer: 2
Element kept by consumer: 3
Element kept by consumer: 4
Element kept by consumer: 5
Element kept by consumer: 6
Element kept by consumer: 7
Element kept by consumer: 8
Element kept by consumer: 9
Element kept by consumer: 10
Element kept by consumer: 11
Element kept by consumer: 12
Element kept by consumer: 13
Element kept by consumer: 14
Element kept by consumer: 15
Element kept by consumer: 16
Element kept by consumer: 17
Element kept by consumer: 18
Element kept by consumer: 19
Element kept by consumer: 20
Element kept by consumer: 21
Element kept by consumer: 22
Element kept by consumer: 23
Element kept by consumer: 24
Element kept by consumer: 25
Element kept by consumer: 26
Element kept by consumer: 27
Element kept by consumer: 28
Element kept by consumer: 29
Element kept by consumer: 30
Element kept by consumer: 31
Element kept by consumer: 2399
Element kept by consumer: 2400
Element kept by consumer: 2401
Element kept by consumer: 2402
Element kept by consumer: 2403
Element kept by consumer: 2404
Element kept by consumer: 2405
Element kept by consumer: 2406
Element kept by consumer: 2407

We can see that after the first 32 items there is a quite big skip to 2400. The elements between are dropped due to the defined strategy.

Key takeaways

  • Back pressure is often automatic and we don't need to do anything since we get data on demand.
  • In case of sources which do not respect subscriber demand we need to define a strategy to avoid terminating error.

UPDATE: Useful read: How to control request rate