“分而治之”是一個有效的處理大數(shù)據(jù)的方法,著名的MapReduce就是采用這種分而治之的思路. 簡單點說,如果要處理的1000個數(shù)據(jù),但是我們不具備處理1000個數(shù)據(jù)的能力,可以只處理10個數(shù)據(jù), 可以把這1000個數(shù)據(jù)分階段處理100次,每次處理10個,把100次的處理結(jié)果進(jìn)行合成,形成最后這1000個數(shù)據(jù)的處理結(jié)果。
把一個大任務(wù)調(diào)用fork()方法分解為若干小的任務(wù),把小任務(wù)的處理結(jié)果進(jìn)行join()合并為大任務(wù)的結(jié)果。
系統(tǒng)對ForkJoinPool線程池進(jìn)行了優(yōu)化,提交的任務(wù)數(shù)量與線程的數(shù)量不一定是一對一關(guān)系.在多數(shù)情況下,一個物理線程實際上需要處理多個邏輯任務(wù)。
ForkJoinPool線程池中最常用的方法是:
ForkJoinTask submit(ForkJoinTask task) 向線程池提交一個ForkJoinTask任務(wù). ForkJoinTask任務(wù)支持fork()分解與join()等待的任務(wù). ForkJoinTask有兩個重要的子類:RecursiveAction和 RecursiveTask ,它們的區(qū)別在于RecursiveAction任務(wù)沒有返回值, RecursiveTask 任務(wù)可以帶有返回值。
package com.wkcto.threadpool;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* 演示ForkJoinPool線程池的使用
* 使用該線程池模擬數(shù)列求和
*/
public class Test09 {
//計算數(shù)列的和, 需要返回結(jié)果,可以定義任務(wù)繼承RecursiveTask
private static class CountTask extends RecursiveTask<Long>{
private static final int THRESHOLD = 10000; //定義數(shù)據(jù)規(guī)模的閾值,允許計算10000個數(shù)內(nèi)的和,超過該閾值的數(shù)列就需要分解
private static final int TASKNUM = 100; //定義每次把大任務(wù)分解為100個小任務(wù)
private long start; //計算數(shù)列的起始值
private long end; //計算數(shù)列的結(jié)束值
public CountTask(long start, long end) {
this.start = start;
this.end = end;
}
//重寫RecursiveTask類的compute()方法,計算數(shù)列的結(jié)果
@Override
protected Long compute() {
long sum = 0 ; //保存計算的結(jié)果
//判斷任務(wù)是否需要繼續(xù)分解,如果當(dāng)前數(shù)列end與start范圍的數(shù)超過閾值THRESHOLD,就需要繼續(xù)分解
if ( end - start < THRESHOLD){
//小于閾值可以直接計算
for (long i = start ; i <= end; i++){
sum += i;
}
}else { //數(shù)列范圍超過閾值,需要繼續(xù)分解
//約定每次分解成100個小任務(wù),計算每個任務(wù)的計算量
long step = (start + end ) / TASKNUM;
//start = 0 , end = 200000, step = 2000, 如果計算[0,200000]范圍內(nèi)數(shù)列的和, 把該范圍的數(shù)列分解為100個小任務(wù),每個任務(wù)計算2000個數(shù)即可
//注意,如果任務(wù)劃分的層次很深,即THRESHOLD閾值太小,每個任務(wù)的計算量很小,層次劃分就會很深,可能出現(xiàn)兩種情況:一是系統(tǒng)內(nèi)的線程數(shù)量會越積越多,導(dǎo)致性能下降嚴(yán)重; 二是分解次數(shù)過多,方法調(diào)用過多可能會導(dǎo)致棧溢出
//創(chuàng)建一個存儲任務(wù)的集合
ArrayList<CountTask> subTaskList = new ArrayList<>();
long pos = start; //每個任務(wù)的起始位置
for (int i = 0; i < TASKNUM; i++) {
long lastOne = pos + step; //每個任務(wù)的結(jié)束位置
//調(diào)整最后一個任務(wù)的結(jié)束位置
if ( lastOne > end ){
lastOne = end;
}
//創(chuàng)建子任務(wù)
CountTask task = new CountTask(pos, lastOne);
//把任務(wù)添加到集合中
subTaskList.add(task);
//調(diào)用for()提交子任務(wù)
task.fork();
//調(diào)整下個任務(wù)的起始位置
pos += step + 1;
}
//等待所有的子任務(wù)結(jié)束后,合并計算結(jié)果
for (CountTask task : subTaskList) {
sum += task.join(); //join()會一直等待子任務(wù)執(zhí)行完畢返回執(zhí)行結(jié)果
}
}
return sum;
}
}
public static void main(String[] args) {
//創(chuàng)建ForkJoinPool線程池
ForkJoinPool forkJoinPool = new ForkJoinPool();
//創(chuàng)建一個大的任務(wù)
CountTask task = new CountTask(0L, 200000L);
//把大任務(wù)提交給線程池
ForkJoinTask<Long> result = forkJoinPool.submit(task);
try {
Long res = result.get(); //調(diào)用任務(wù)的get()方法返回結(jié)果
System.out.println("計算數(shù)列結(jié)果為:" + res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
//驗證
long s = 0L;
for (long i = 0; i <= 200000 ; i++) {
s += i;
}
System.out.println(s);
}
}