你是否也在用Fork-Join

Java7開始引入了Fork/Join框架。

官方定義:Fork/Join框架是一個實現了ExecutorService接口的多線程處理器。它可以把一個大的任務劃分為若干個小的任務併發執行,充分利用可用的資源,進而提高應用的執行效率。

Fork操作

創建子線程,通過將父線程的任務量進行切片,每個fork出來的子線程將分配到一定“片”數的工作量。其中,子線程在完成分配的工作量後,將調用Join,從而匯合都父線程中去。

Join操作

當子線程結束,將調用Join。父線程等待子線程Join操作後繼續向前執行。

大的任務分割成若干小的任務,中文表達就是:

if(任務小){

執行要做的任務

}else(){

任務拆分並執行拆分的任務等待結果

}

任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列裡暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務(工作竊取算法)。

要實現一個FrokJoinTask(執行任務類)必須要繼承RecursiveTask或RecursiveAction,根據自己需求將業務代碼放入重寫coupute方法中。RecursiveTask和RecursiveAction都繼承了FrokJoinTask,區別在於RecursiveTask有返回值而RecursiveAction沒有。

做好了調用類可以使用了,調用時需要創建Fork/Join線程池ForkJoinPool(ForkJoinPool是ForkJoin框架中的任務調度器),向線程池中提交一個ForkJoinTask並得到結果。ForkJoinPool的submit方法的入參是一個ForkJoinTask,返回值也是一個ForkJoinTask,可以通過get方法可以獲取到執行結果。

下面是一個小例子,計算數字累加和:

<code>public class CountTaskTmp extends RecursiveTask{

	private static final int THRESHOLD = 2;
	
	private int start;
	
	private int end;
	
	public CountTaskTmp(int start, int end) {
		this.start = start;
		this.end = end;
	}
	@Override
	protected Integer compute() {
		int sum = 0;
		boolean canCompute = (end - start) <= THRESHOLD;
		if (canCompute) {
			for (int i = start; i <= end; i++)
				sum += i;
		} else {
			//如果任務大於閥值,就分裂成兩個子任務計算
			int mid = (start + end) / 2;
			CountTask leftTask = new CountTask(start, mid);
			CountTask rightTask = new CountTask(mid+1, end);
			
			//執行子任務
			leftTask.fork();
			rightTask.fork();
			
			//等待子任務執行完,並得到結果
			int leftResult = (int)leftTask.join();
			int rightResult = (int)rightTask.join();
			sum = leftResult + rightResult;
		}
		return sum;
	}
	

	public static void main(String[] args) {
		ForkJoinPool forkJoinPool = new ForkJoinPool();
    //生成一個計算資格,負責計算1+2+3+4  
    CountTask task = new CountTask(1, 4); 
    Future result = forkJoinPool.submit(task);
    try {
       System.out.println(result.get());
		} catch (Exception e) {
		}
	}
}/<code>


分享到:


相關文章: