声明:本文是《 Java 7 Concurrency Cookbook 》的第八章, 作者: Javier Fernández González 译者:郑玉婷
监控Fork/Join池
Executor 框架提供了线程的创建和管理,来实现任务的执行机制。Java 7 包括了一个 Executor 框架的延伸为了一种特别的问题而使用的,将比其他解决方案的表现有所提升(可以直接使用 Thread 对象或者 Executor 框架)。它是 Fork/Join 框架。
此框架是为了解决可以使用 divide 和 conquer 技术,使用 fork() 和 join() 操作把任务分成小块的问题而设计的。主要的类实现这个行为的是 ForkJoinPool 类。
在这个指南,你将学习从ForkJoinPool类可以获取的信息和如何获取这些信息。
准备
指南中的例子是使用Eclipse IDE 来实现的。如果你使用Eclipse 或者其他的IDE,例如NetBeans, 打开并创建一个新的java任务。
怎么做呢…
按照这些步骤来实现下面的例子:
//1. 创建一个类,名为 Task, 扩展 RecursiveAction 类。
01 |
public class Task extends RecursiveAction {
|
11 |
public Task( int array[], int start, int end) {
|
20 |
protected void compute() {
|
21 |
if (end - start > 100 ) {
|
22 |
int mid = (start + end) / 2 ;
|
23 |
Task task1 = new Task(array, start, mid);
|
24 |
Task task2 = new Task(array, mid, end);
|
34 |
for ( int i = start; i < end; i++) {
|
39 |
} catch (InterruptedException e) {
|
//7. 创建例子的主类通过创建一个类,名为 Main 并添加 main()方法。
01 |
import java.util.concurrent.ForkJoinPool;
|
02 |
import java.util.concurrent.TimeUnit;
|
06 |
public static void main(String[] args) throws Exception {
|
09 |
ForkJoinPool pool = new ForkJoinPool();
|
12 |
int array[] = new int [ 10000 ];
|
15 |
Task task1 = new Task(array, 0 , array.length);
|
21 |
while (!task1.isDone()) {
|
23 |
TimeUnit.SECONDS.sleep( 1 );
|
30 |
pool.awaitTermination( 1 , TimeUnit.DAYS);
|
34 |
System.out.printf( "Main: End of the program.\n" );
|
38 |
private static void showLog(ForkJoinPool pool) {
|
39 |
System.out.printf( "**********************\n" );
|
40 |
System.out.printf( "Main: Fork/Join Pool log\n" );
|
41 |
System.out.printf( "Main: Fork/Join Pool: Parallelism:%d\n" ,
|
42 |
pool.getParallelism());
|
43 |
System.out.printf( "Main: Fork/Join Pool: Pool Size:%d\n" ,
|
45 |
System.out.printf( "Main: Fork/Join Pool: Active Thread Count:%d\n" ,
|
46 |
pool.getActiveThreadCount());
|
47 |
System.out.printf( "Main: Fork/Join Pool: Running Thread Count:%d\n" ,
|
48 |
pool.getRunningThreadCount());
|
49 |
System.out.printf( "Main: Fork/Join Pool: Queued Submission:%d\n" ,
|
50 |
pool.getQueuedSubmissionCount());
|
51 |
System.out.printf( "Main: Fork/Join Pool: Queued Tasks:%d\n" ,
|
52 |
pool.getQueuedTaskCount());
|
53 |
System.out.printf( "Main: Fork/Join Pool: Queued Submissions:%s\n" ,
|
54 |
pool.hasQueuedSubmissions());
|
55 |
System.out.printf( "Main: Fork/Join Pool: Steal Count:%d\n" ,
|
56 |
pool.getStealCount());
|
57 |
System.out.printf( "Main: Fork/Join Pool: Terminated :%s\n" ,
|
59 |
System.out.printf( "**********************\n" );
|
它是如何工作的…
在这个指南,你实现了任务 使用 ForkJoinPool 类和一个扩展 RecursiveAction 类的 Task 类来增加array的元素;它是 ForkJoinPool 类执行的任务种类之一。当任务还在处理array时,你就把关于 ForkJoinPool 类的状态信息打印到操控台。
你使用了ForkJoinPool类中的以下方法:
- getPoolSize(): 此方法返回 int 值,它是ForkJoinPool内部线程池的worker线程们的数量。
- getParallelism(): 此方法返回池的并行的级别。
- getActiveThreadCount(): 此方法返回当前执行任务的线程的数量。
- getRunningThreadCount():此方法返回没有被任何同步机制阻塞的正在工作的线程。
- getQueuedSubmissionCount(): 此方法返回已经提交给池还没有开始他们的执行的任务数。
- getQueuedTaskCount(): 此方法返回已经提交给池已经开始他们的执行的任务数。
- hasQueuedSubmissions(): 此方法返回 Boolean 值,表明这个池是否有queued任务还没有开始他们的执行。
- getStealCount(): 此方法返回 long 值,worker 线程已经从另一个线程偷取到的时间数。
- isTerminated(): 此方法返回 Boolean 值,表明 fork/join 池是否已经完成执行。