且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

可以在 rxjava 中转换以下代码吗

更新时间:1970-01-01 07:57:48

我们来看看... 一、代码:

Let's see... First, the code:

package rxtest;

import static io.reactivex.Flowable.generate;
import static io.reactivex.Flowable.just;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import io.reactivex.Emitter;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

public class Main {

    private static final Scheduler SCHEDULER = Schedulers.from(Executors.newFixedThreadPool(10));

    private static class DatabaseProducer {
        private int offset = 0;
        private int limit = 100;

        void fetchDataFromDb(Emitter<List<Integer>> queue) {
            System.out.println(Thread.currentThread().getName() + " fetching "+offset);
            queue.onNext(IntStream.range(offset, offset + limit).boxed().collect(Collectors.toList()));
            offset += limit;
        }
    }

    public static void main(String[] args) {
        generate(new DatabaseProducer()::fetchDataFromDb)
        .subscribeOn(Schedulers.io())
        .concatMapIterable(list -> list, 1) // 1 call, no prefetch
        .flatMap(item -> 
                just(item)
                .doOnNext(i -> longRunJob(i))
                .subscribeOn(SCHEDULER)
                , 10) // don't subscribe to more than 10 at a time
        .take(1000)
        .blockingSubscribe();
    }

    private static void longRunJob(Integer i) {
        System.out.println(Thread.currentThread().getName() + " long run job of " + i);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

Class DatabaseProducer 只是值的有状态生产者,因为它需要当前的偏移量.这不是绝对必要的,因为 generate 调用可以替换为

Class DatabaseProducer is simply the stateful producer of values, as it needs the current offset. It's not strictly necessary, as the generate call could have been replaced with

        generate(() -> 0, (offset,e) -> {
            e.onNext(IntStream.range(offset, offset + 100).boxed()
                       .collect(Collectors.toList()));
            return offset + 100;
        }, e -> {});

但这几乎没有可读性.

请记住,cocatMapflatMap 可以并且将会预取和预订阅 observables/flowables 直到一个实现相关的限制,即使有没有空闲线程来处理它们——它们只会在调度程序中排队.每次调用上的数字代表我们想要的限制 - concatMap 上的 1 因为我们只想在必要时从数据库中获取(如果你在这里放 2,你可能会过度阅读,但管道中的延迟会更少).

Keep in mind that cocatMap and flatMap can and will pre-fetch and pre-subscribe to observables/flowables up to an implementations-dependent limit, even if there are no free threads to process them - they will simply get queued in the schedulers. The numbers on each call represent the limits that we want to have - 1 on the concatMap because we want to fetch from the database only if it's necessary (if you put here 2, you may over-read, but there will be less latency in the pipeline).

如果你想做 Cpu-bound 计算,那么***使用 Schedulers.computation(),因为它会自动配置为运行 JVM 的系统的 CPU 数量,并且您可以从代码库的其他部分使用它,这样您就不会使处理器过载.

If you want to do Cpu-bound computation, then it's better to use Schedulers.computation(), as that is auto-configured to the number of CPUs of the system the JVM is running on, and you can use it from other parts of your codebase so that you don't overload the processor.