且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

使并行IntStream更有效率/更快?

更新时间:2023-11-29 16:04:46

使用代码进行有效的并行处理存在两个普遍问题.首先,使用 iterate ,这不可避免地需要前一个元素来计算下一个元素,而这并不是并行处理的良好起点.其次,您正在使用无限流.有效的工作负载划分至少需要估计要处理的元素数量.

There are two general problems for efficient parallel processing with your code. First, using iterate, which unavoidably requires the previous element to calculate the next one, which is not a good starting point for parallel processing. Second, you are using an infinite stream. Efficient workload splitting requires at least an estimate of the number of element to process.

由于您正在处理升序整数,因此达到 Integer.MAX_VALUE 时有一个明显的限制,但是流实现不知道您实际上是在处理升序数,因此将处理您的形式上无限的流与真正无限的流一样.

Since you are processing ascending integer numbers, there is an obvious limit when reaching Integer.MAX_VALUE, but the stream implementation doesn’t know that you are actually processing ascending numbers, hence, will treat your formally infinite stream as truly infinite.

解决这些问题的解决方案是

A solution fixing these issues, is

public static IntStream stream() {
    return IntStream.rangeClosed(1, Integer.MAX_VALUE/2).parallel()
            .map(i -> i*2+1)
            .filter(i -> i % 3 != 0).mapToObj(BigInteger::valueOf)
            .filter(i -> i.isProbablePrime(1))
            .mapToInt(BigInteger::intValue);
}

,但必须强调的是,以这种形式,该解决方案仅在您确实要处理完整整数范围内的所有或大多数质数时才有用.一旦对流应用 skip limit ,并行性能将大大下降,这是这些方法的文档所指定的.另外,对带有仅接受较小数值范围内的值的谓词使用 filter 意味着,将有很多不必要的工作不如并行完成.

but it must be emphasized that in this form, this solution is only useful if you truly want to process all or most of the prime numbers in the full integer range. As soon as you apply skip or limit to the stream, the parallel performance will drop significantly, as specified by the documentation of these methods. Also, using filter with a predicate that accepts values in a smaller numeric range only, implies that there will be a lot of unnecessary work that would better not be done than done in parallel.

您可以调整该方法以接收值范围作为参数,以调整源 IntStream 的范围来解决此问题.

You could adapt the method to receive a value range as parameter to adapt the range of the source IntStream to solve this.

现在该强调算法比并行处理的重要性了.考虑剑灵筛子.以下实现

This is the time to emphasize the importance of algorithms over parallel processing. Consider the Sieve of Eratosthenes. The following implementation

public static IntStream primes(int max) {
    BitSet prime = new BitSet(max>>1);
    prime.set(1, max>>1);
    for(int i = 3; i<max; i += 2)
        if(prime.get((i-1)>>1))
            for(int b = i*3; b>0 && b<max; b += i*2) prime.clear((b-1)>>1);
    return IntStream.concat(IntStream.of(2), prime.stream().map(i -> i+i+1));
}

即使不使用并行处理,即使使用 Integer.MAX_VALUE 作为上限,

仍比其他方法快一个数量级(使用的终端操作测量).reduce((a,b)-> b)而不是 toArray forEach(System.out :: println),以确保完整处理所有值,而不会增加额外的存储或打印成本.

turned out to be faster by an order of magnitude compared to the other approaches despite not using parallel processing, even when using Integer.MAX_VALUE as upper bound (measured using a terminal operation of .reduce((a,b) -> b) instead of toArray or forEach(System.out::println), to ensure complete processing of all values without adding additional storage or printing costs).

要点是,当您有特定的应聘者或想要处理较小范围的数字时(或者当数字位于 int 之外时,或者 isProbablePrime 很好),甚至 long 范围)¹,但是对于处理较大的质数升序序列,有更好的方法,而并行处理并不是解决性能问题的最终答案.

The takeaway is, isProbablePrime is great when you have a particular candidate or want to process a small range of numbers (or when the number is way outside the int or even long range)¹, but for processing a large ascending sequence of prime numbers there are better approaches, and parallel processing is not the ultimate answer to performance questions.

¹考虑例如

Stream.iterate(new BigInteger("1000000000000"), BigInteger::nextProbablePrime)
      .filter(b -> b.isProbablePrime(1))