目录
- Java ForkJoinPool线程池的使用之并行计算数组求和
- ForkJoinPool 示例代码解析
- 代码结构分析
- 1. 初始化部分
- 2. RecursiveAction 实现(无返回值)
- 3. RecursiveTask 实现(有返回值)
- 4. 主方法
- 关键概念解释
- 执行流程
- 使用建议
- 总结
Java ForkJoinPool线程池的使用之并行计算数组求和
package com.zhangxueliang.juc; import java.io.IOException; import java.util.Arrays; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.RecursiveTask; public class ForkJoinPoolDemo { static int[] nums = new int[1000000]; static final int MAX_NUM = 50000; static Random r = new Random(); static { for(int i=0; i<nums.length; i++) { nums[i] = r.nextInt(100); } System.out.println("---" + Arrays.stream(nums).sum()); //stream api } static class AddTask extends RecursiveAction { int start, end; AddTask(int s, int e)编程客栈 { start = s; end = e; } @Override protected void compute() { if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum +编程= nums[i]; System.out.println("from:" + start + " to:" + end + " = " + sum); } else { int middle = start + (end-start)/2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); subTask1.fork(); subTask2.fork(); } } } static class AddTaskRet extends RecursiveTask<Long> { private static final long serialVersionUID = 1L; int start, end; AddTaskRet(int s, int e) { start = s; end = e; } @Override protected Long compute() { if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; return sum; } int middle = start + (end-start)/2; AddTaskRet subTask1 = new AddTaskRet(start, middle); AddTaskRet subTask2 = new AddTaskRet(middle, end); subTask1.fork(); subTask2.fork(); return subTask1.join() + subTask2.join(); } } public static void main(String[] args) throws IOException { /*ForkJoinPool fjp = new ForkJoinPool(); AddTask task = new AddTask(0, nums.length); fjp.execute(task);*/ ForkJoinPoolDemo temp = new ForkJoinPoolDemo(); ForkJoinPool fjp = new ForkJoinPool(); AddTaskRet task = new AddTaskRet(0, nums.length); fjp.execute(task); long result = task.join(); System.out.println(result); //System.in.read(); } }
ForkJoinPool 示例代码解析
这段代码演示了 Java 中 ForkJoinPool
框架的使用,展示了两种不同的任务分割方式:
RecursiveAction
(无返回值)RecursiveTask
(有返回值)
代码结构分析
1. 初始化部分
static int[] nums = new int[1000000]; // 创建包含100万个元素的数组 static final int MAX_NUM = 50000; // 任务分割的阈值 static Random r = new Random(); // 随机数生成器 // 静态初始化块:填充数组并计算总和 static { for(int i=0; i<nums.length; i++) { nums[i] = r.nextInt(100); // 每个元素赋值为0-99的随机数 php } System.out.println("---" + Arrays.stream(nums).sum()); // 使用stream API计算总和作为验证基准 }
2. RecursiveAction 实现(无返回值)
static class AddTask extends RecursiveAction { int start, end; AddTask(int s, int e) { start = s; end = e; } @Override protected void compute() { if(end-start <= MAX_NUM) { // 如果任务足够小,直接计算 long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; System.out.println("from:" + start + " to:" + end + " = " + sum); } else { // 否则分割任务 int middle = start + (end-start)/2; AddTask subTask1 = new AddTask(starjst, middle); AddTask subTask2 = new AddTask(middle, end); subTask1.fork(); // 异步执行子任务 subTask2.fork(); } } }
3. RecursiveTask 实现(有返回值)
static class AddTaskRet extends RecursiveTask<Long> { int start, end; AddTaskRet(int s, int e) { start = s; end = e; } @Override protected Long compute() { if(end-start <= MAX_NUM) { // 如果任务足够小,直接计算并返回结果 long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; return sum; } // 分割任务 int middle = start + (end-start)/2; AddTaskRet subTask1 = new AddTaskRet(start, middle); AddTaskRet subTask2 = new AddTaskRet(middle, end); subTask1.fork(); // 异步执行子任务 subTask2.fork(); return subTask1.join() + subTask2.join(); // 合并子任务结果 } }
4. 主方法
public static void main(String[] args) throws IOException { // 创建ForkJoinPool实例 ForkJoinPool fjp = new ForkJoinPool(); // 创建有返回值的任务 AddTaskRet task = new AddTaskRet(0, nums.length); // 执行任务 fjp.execute(task); // 获取并打印结果 long result = task.join(); System.out.println(result); }
关键概念解释
ForkJoinPool:
- Java 7引入的线程池实现
- 使用工作窃取(work-stealing)算法提高并行效率
- 特别适合分治(divide-and-conquer)算法
Recur编程客栈siveAction:
- 用于不返回结果的任务
- 需要实现
compute()
方法 - 示例中的
AddTask
只打印结果不返回
RecursiveTask:
- 用于需要返回结果的任务
- 需要实现
compute()
方法并返回指定类型 - 示例中的
AddTaskRet
返回子数组的和
fork()和join():
fork()
: 异步安排任务执行join()
: 等待任务完成并获取结果
执行流程
- 初始化一个包含100万个随机数的数组
- 使用Stream API计算总和作为基准
- 创建ForkJoinPool
- 创建AddTaskRet任务,范围是整个数组
- 任务会根据MAX_NUM阈值(50000)不断分割,直到足够小
- 小任务直接计算子数组和
- 合并所有子任务的结果得到最终总和
- 打印结果(应与Stream API计算的结果一致)
使用建议
- 对于计算密集型任务,ForkJoinPool通常比传统线程池更高效
- 任务分割的阈值需要合理设置,太小会导致过多任务创建开销,太大会降低并行度
- 有返回结果需求时使用RecursiveTask,否则使用RecursiveAction
- 注意join()是阻塞调用,会等待任务完成
这段代码很好地展示了ForkJoin框架的分治思想和使用方法,是并行计算数组求和的经典示例。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。
精彩评论