且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

java Fork / Join池,ExecutorService和CountDownLatch

更新时间:2023-12-03 13:37:16

在研究过去3个月的各种多线程框架后,我找到了问题的答案。

After research on various multi threading frameworks for past 3 months , I have found answer to question.

ExecutorService

使用有限的控制,它简单易用。您可以使用它

It is simple and easy to use with limited control. You can use it


  1. 在没有等待的情况下启动并行独立任务

  2. 等待所有完成你的任务

Callable / Runnable 任务的数量为0时,我更喜欢这个数量少,无限队列中的任务堆积不会导致内存堆积不足。降低系统的性能。

I prefer this one when number of Callable/Runnable tasks are small in number and piling of tasks in unbounded queue does not cause pile-up in memory & degrade the performance of the system.

它隐藏了 ThreadPoolExecutor 的低级详细信息。它不允许使用其他参数(有界队列,拒绝处理程序等来微调性能),如 ThreadPoolExectuor

It hides low level details of ThreadPoolExecutor. It does not allow playing with other parameters ( Bounded Queue, Rejection Handler etc. to fine tune the performance) as in ThreadPoolExectuor.

ThreadPoolExecutor

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)

它提供更多控制你。除了设置最小和最大线程,您可以设置队列大小并使 BlockingQueue 有界。

It provides more control to you. Apart from setting minimum and maximum threads, you can set queue size and make BlockingQueue is bounded.

你可以来如果您需要以下功能,请使用您自己的线程工厂

You can come up with your own thread factory if you need below features


  1. 设置更具描述性的线程名称

  2. 设置线程守护程序状态

  3. 设置线程优先级

如果您的应用程序受到约束待处理的Runnable / Callable任务数量,您将通过设置最大容量来使用有界队列。队列达到最大容量后,您可以定义RejectionHandler。 Java提供了四种类型的拒绝处理程序政策

If your application is constrained by number of pending Runnable/Callable tasks, you will use bounded queue by setting the max capacity. Once the queue reaches maximum capacity, you can define RejectionHandler. Java provides four types of Rejection Handler policies.


  1. 在默认的 ThreadPoolExecutor.AbortPolicy 中,处理程序拒绝后抛出运行时RejectedExecutionException。

  1. In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection.

ThreadPoolExecutor.CallerRunsPolicy 中,调用execute本身的线程运行任务。这提供了一种简单的反馈控制机制,可以降低新任务的提交速度。

In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.

ThreadPoolExecutor.DiscardPolicy ,简单地删除了无法执行的任务。

In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.

ThreadPoolExecutor.DiscardOldestPolicy ,如果执行程序未关闭,则删除工作队列头部的任务,然后重试执行(可能会再次失败,导致重复执行。)

In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)

CountDownLatch

CountDownLatch :此框架允许java线程等到其他一组线程完成他们的任务。

CountDownLatch : This framework allows a java thread to wait until other set of threads completes their tasks.

用例:


  1. 实现最大并行度:有时我们希望同时启动多个线程以实现最大并行度

  1. Achieving Maximum Parallelism: Sometimes we want to start a number of threads at the same time to achieve maximum parallelism

在开始执行其他代码块之前等待N个线程完成

死锁检测。

更多细节列于此 article

ForkJoinPool

ForkJoinPool 与Java ExecutorService类似,但有一点不同。 ForkJoinPool 使任务可以轻松地将其工作分成较小的任务,然后将这些任务提交给ForkJoinPool。当忙工作线程从繁忙的工作线程队列中窃取任务时,在ForkJoinPool中发生任务窃取。

The ForkJoinPool is similar to the Java ExecutorService but with one difference. The ForkJoinPool makes it easy for tasks to split their work up into smaller tasks which are then submitted to the ForkJoinPool too. Task stealing happens in ForkJoinPool when free worker threads steal tasks from busy worker thread queue.

public ForkJoinPool(int parallelism,
            ForkJoinPool.ForkJoinWorkerThreadFactory factory,
            Thread.UncaughtExceptionHandler handler,
            boolean asyncMode)
Creates a ForkJoinPool with the given parameters.

参数:

并行度 - 并行度级别。对于默认值,请使用 Runtime.availableProcessors()

parallelism - the parallelism level. For default value, use Runtime.availableProcessors().

factory - 工厂用于创建新线程。对于默认值,请使用
defaultForkJoinWorkerThreadFactory。

factory - the factory for creating new threads. For default value, use defaultForkJoinWorkerThreadFactory.

handler - 由于不可恢复的错误而终止的内部工作线程的处理程序

handler - the handler for internal worker threads that terminate due to unrecoverable errors

asyncMode - 如果为true,则为从未加入的分叉任务建立本地先进先出调度模式

asyncMode - if true, establishes local first-in-first-out scheduling mode for forked tasks that are never joined.

关于主查询:

您可以使用 ExecutorService.invokeAll() CountDownLatch 框架或 ForkJoinPool 。所有这些框架互不相同,以控制任务从高级别到低级别的执行。

You can use ExecutorService.invokeAll() or CountDownLatch framework or ForkJoinPool . All these frameworks are complimentary to each other varying of granularity to control the execution of tasks from high level to low level.

EDIT :

查看相关的SE问题:

使用ExecutorService有什么好处?

Java's Fork / Join vs ExecutorService - 何时使用哪个?