开发者

Java ForkJoinPool线程池的使用之并行计算数组求和实例

开发者 https://www.devze.com 2025-05-29 10:40 出处:网络 作者: 学亮编程手记
目录Java ForkJoinPool线程池的使用之并行计算数组求和ForkJoinPool 示例代码解析代码结构分析1. 初始化部分2. RecursiveAction 实现(无返回值)3. RecursiveTask 实现(有返回值)4. 主方法关键概念解释执行流程使
目录
  • Java ForkJoinPool线程池的使用之并行计算数组求和
    • ForkJoinPool 示例代码解析
    • 代码结构分析
      • 1. 初始化部分
      • 2. RecursiveAction 实现(无返回值)
      • 3. RecursiveTask 实现(有返回值)
      • 4. 主方法
    • 关键概念解释
      • 执行流程
        • 使用建议
        • 总结

          Java ForkJoinPool线程池的使用之并行计算数组求和

          package com.zhangxueliang.juc;
          
          import java.io.IOException;
          import java.util.Arrays;
          import java.util.Random;
          import java.util.concurrent.ForkJoinPool;
          import java.util.concurrent.RecursiveAction;
          import java.util.concurrent.RecursiveTask;
          
          public class ForkJoinPoolDemo {
          	static int[] nums = new int[1000000];
          	static final int MAX_NUM = 50000;
          	static Random r = new Random();
          	
          	static {
          		for(int i=0; i<nums.length; i++) {
          			nums[i] = r.nextInt(100);
          		}
          		
          		System.out.println("---" + Arrays.stream(nums).sum()); //stream api
          	}
          	
          
          	static class AddTask extends RecursiveAction {
          
          		int start, end;
          
          		AddTask(int s, int e)编程客栈 {
          			start = s;
          			end = e;
          		}
          
          		@Override
          		protected void compute() {
          
          			if(end-start <= MAX_NUM) {
          				long sum = 0L;
          				for(int i=start; i<end; i++) sum +编程= nums[i];
          				System.out.println("from:" + start + " to:" + end + " = " + sum);
          			} else {
          
          				int middle = start + (end-start)/2;
          
          				AddTask subTask1 = new AddTask(start, middle);
          				AddTask subTask2 = new AddTask(middle, end);
          				subTask1.fork();
          				subTask2.fork();
          			}
          
          
          		}
          
          	}
          
          	
          	static class AddTaskRet extends RecursiveTask<Long> {
          		
          		private static final long serialVersionUID = 1L;
          		int start, end;
          		
          		AddTaskRet(int s, int e) {
          			start = s;
          			end = e;
          		}
          
          		@Override
          		protected Long compute() {
          			
          			if(end-start <= MAX_NUM) {
          				long sum = 0L;
          				for(int i=start; i<end; i++) sum += nums[i];
          				return sum;
          			} 
          			
          			int middle = start + (end-start)/2;
          			
          			AddTaskRet subTask1 = new AddTaskRet(start, middle);
          			AddTaskRet subTask2 = new AddTaskRet(middle, end);
          			subTask1.fork();
          			subTask2.fork();
          			
          			return subTask1.join() + subTask2.join();
          		}
          		
          	}
          	
          	public static void main(String[] args) throws IOException {
          		/*ForkJoinPool fjp = new ForkJoinPool();
          		AddTask task = new AddTask(0, nums.length);
          		fjp.execute(task);*/
          
          		ForkJoinPoolDemo temp = new ForkJoinPoolDemo();
          
          		ForkJoinPool fjp = new ForkJoinPool();
          		AddTaskRet task = new AddTaskRet(0, nums.length);
          		fjp.execute(task);
          		long result = task.join();
          		System.out.println(result);
          		
          		//System.in.read();
          		
          	}
          }
          

          ForkJoinPool 示例代码解析

          这段代码演示了 Java 中 ForkJoinPool 框架的使用,展示了两种不同的任务分割方式:

          • RecursiveAction(无返回值)
          • RecursiveTask(有返回值)

          代码结构分析

          1. 初始化部分

          static int[] nums = new int[1000000];  // 创建包含100万个元素的数组
          static final int MAX_NUM = 50000;     // 任务分割的阈值
          static Random r = new Random();        // 随机数生成器
          
          // 静态初始化块:填充数组并计算总和
          static {
              for(int i=0; i<nums.length; i++) {
                  nums[i] = r.nextInt(100);  // 每个元素赋值为0-99的随机数
            php  }
              System.out.println("---" + Arrays.stream(nums).sum()); // 使用stream API计算总和作为验证基准
          }

          2. RecursiveAction 实现(无返回值)

          static class AddTask extends RecursiveAction {
              int start, end;
              
              AddTask(int s, int e) {
                  start = s;
                  end = e;
              }
          
              @Override
              protected void compute() {
                  if(end-start <= MAX_NUM) {  // 如果任务足够小,直接计算
                      long sum = 0L;
                      for(int i=start; i<end; i++) sum += nums[i];
                      System.out.println("from:" + start + " to:" + end + " = " + sum);
                  } else {  // 否则分割任务
                      int middle = start + (end-start)/2;
                      AddTask subTask1 = new AddTask(starjst, middle);
                      AddTask subTask2 = new AddTask(middle, end);
                      subTask1.fork();  // 异步执行子任务
                      subTask2.fork();
                  }
              }
          }

          3. RecursiveTask 实现(有返回值)

          static class AddTaskRet extends RecursiveTask<Long> {
              int start, end;
              
              AddTaskRet(int s, int e) {
                  start = s;
                  end = e;
              }
          
              @Override
              protected Long compute() {
                  if(end-start <= MAX_NUM) {  // 如果任务足够小,直接计算并返回结果
                      long sum = 0L;
                      for(int i=start; i<end; i++) sum += nums[i];
                      return sum;
                  } 
                  
                  // 分割任务
                  int middle = start + (end-start)/2;
                  AddTaskRet subTask1 = new AddTaskRet(start, middle);
                  AddTaskRet subTask2 = new AddTaskRet(middle, end);
                  subTask1.fork();  // 异步执行子任务
                  subTask2.fork();
                  
                  return subTask1.join() + subTask2.join();  // 合并子任务结果
              }
          }

          4. 主方法

          public static void main(String[] args) throws IOException {
              // 创建ForkJoinPool实例
              ForkJoinPool fjp = new ForkJoinPool();
              
              // 创建有返回值的任务
              AddTaskRet task = new AddTaskRet(0, nums.length);
              
              // 执行任务
              fjp.execute(task);
              
              // 获取并打印结果
              long result = task.join();
              System.out.println(result);
          }

          关键概念解释

          ForkJoinPool:

          • Java 7引入的线程池实现
          • 使用工作窃取(work-stealing)算法提高并行效率
          • 特别适合分治(divide-and-conquer)算法

          Recur编程客栈siveAction:

          • 用于不返回结果的任务
          • 需要实现compute()方法
          • 示例中的AddTask只打印结果不返回

          RecursiveTask:

          • 用于需要返回结果的任务
          • 需要实现compute()方法并返回指定类型
          • 示例中的AddTaskRet返回子数组的和

          fork()和join():

          • fork(): 异步安排任务执行
          • join(): 等待任务完成并获取结果

          执行流程

          1. 初始化一个包含100万个随机数的数组
          2. 使用Stream API计算总和作为基准
          3. 创建ForkJoinPool
          4. 创建AddTaskRet任务,范围是整个数组
          5. 任务会根据MAX_NUM阈值(50000)不断分割,直到足够小
          6. 小任务直接计算子数组和
          7. 合并所有子任务的结果得到最终总和
          8. 打印结果(应与Stream API计算的结果一致)

          使用建议

          1. 对于计算密集型任务,ForkJoinPool通常比传统线程池更高效
          2. 任务分割的阈值需要合理设置,太小会导致过多任务创建开销,太大会降低并行度
          3. 有返回结果需求时使用RecursiveTask,否则使用RecursiveAction
          4. 注意join()是阻塞调用,会等待任务完成

          这段代码很好地展示了ForkJoin框架的分治思想和使用方法,是并行计算数组求和的经典示例。

          总结

          以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

          0

          精彩评论

          暂无评论...
          验证码 换一张
          取 消

          关注公众号