扫码阅读
手机扫码阅读

ForkJoinPool实践

128 2024-04-13

最近在看一本15年出版的《Java并发编程的艺术》一书,其中看到并发编程时间部分的ForkJoinPool功能时,突然发现这个功能实际使用上就是把一个大任务分成多个小的子任务,然后使用多个线程完成。

这个场景跟我之前写过的自定义 Java自定义异步功能实践有点异曲同工之妙,只不过这里有有个子任务的概念,多个任务执行结果是具有相关性的。资料指出ForkJoinPool比较适合计算密集型的任务。

性能测试中QPS取样器和RT取样器中,有这样一个使用场景,在用例执行过程中,我想了解一下当前用例执行的QPS和RT信息,就需要有个触发开关,开始收集这些数据,等某一个终止条件被触发,结束收集,然后计算结果。在用例QPS超过10万的情况下,单次收集的数据可能会超过100万,计算QPS和RT就非常适合ForkJoinPool来完成。

如果一直实时展示或者上报这些信息的话,也可以使用ForkJoinPool来完成计算功能。这里还有另外的方案来实现,如果只是得到QPS和RT数据的话,比ForkJoinPool更加合适,这里先不分享了。

ForkJoinPool API相比较ExecutorService还是比较简单的。主要的功能3个:创建任务的ForkJoinPool、创建任务分配规则和收集任务结果。

下面我以一个数组求和的Demo演示一下ForkJoinPool的功能。

首先我们需要定义一个ForkJoinPool,通常使用java.util.concurrent.ForkJoinPool#ForkJoinPool(int)或者java.util.concurrent.ForkJoinPool#commonPool这两个方法其中之一,如果你使用JDK 7及以前的版本,第二个API是不存在的。

翻看源码之后,看起来ForkJoinPool构造方法参数还是挺多的,如果都要自定义比较麻烦也是没多大必要的,所以我就选上面提到的第一种API来创建ForkJoinPool。

然后我们要创建一个任务类实现任务分配规则,首先继承java.util.concurrent.RecursiveTask实现java.util.concurrent.RecursiveTask#compute方法。

拆分任务的思路如下:使用两个int属性,标记List中需要求和片段索引。这样每次分配任务的时候,只需要改变索引值即可。将一个很长的List求和分成N个小片段求和。

类代码设计如下:

import com.funtester.frame.SourceCode import groovy.util.logging.Log4j2 import java.util.concurrent.ForkJoinPool import java.util.concurrent.RecursiveTask @Log4j2 class ForkJoinT extends RecursiveTask<Integer> { static def data = 1..100 as List int start int end ForkJoinT(int start, int end) { this.start = start this.end = end
    } @Override protected Integer compute() { if (end - start < 5) {
            sum(start, end)
        } else {
            def middle = ((start + end) / 2) as int def left = new ForkJoinT(start, middle)
            def right = new ForkJoinT(middle + 1, end)
            left.fork()
            right.fork()
            left.join() + right.join()
        }
    } /**
     * 片段求和
     * @param i
     * @param k
     * @return */ static def sum(int i, int k) {
        SourceCode.range(i, k + 1).map(data::get).sum()
    }
} 

总体感觉java.util.concurrent.RecursiveTask#compute方法写起来有点像递归,思路明确了以后还是很流畅的。

先来个高斯求和,下面是测试代码:

 static void main(String[] args) {
        def pool = new ForkJoinPool(5)
        def t = new ForkJoinT(0, data.size() - 1)
        pool.submit(t)
        log.info("sum: {}", t.get())
    } 

控制台输出:

22:30:42.725 main sum: 5050 

性能方面等我先研究一波JMH之后再来。

原文链接: http://mp.weixin.qq.com/s?__biz=MzU4MTE2NDEyMQ==&mid=2247498940&idx=1&sn=3ef379075845f2f88d9010b4fa593794&chksm=fd49718aca3ef89cc6079776d34b7fb37acd05a484175967d0d7c3d1ee85d68870e1d1d2a004#rd