JUC(七)分支合并框架

2023-07-12,,

JUC分支合并框架

简介

Fork/Join可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务的结果合并称为最终的计算结果。

Fork:负责将任务拆分
Join:合并拆分任务
ForkJoinPool:分支合并池,添加分支合并任务,使用get得到计算结果

案例一:实现1+...+100,要求计算过程不能有相差超过10的数字的相加

class MyTask extends RecursiveTask<Integer> {
private final Integer VALUE = 10;
private Integer begin;
private Integer end;
MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
public Integer compute() {
int result = 0;
if(end - begin >= VALUE) {
int mid = (end + begin) / 2;
MyTask myTask1 = new MyTask(begin, mid);
MyTask myTask2 = new MyTask(mid + 1, end);
myTask1.fork();
myTask2.fork(); result += myTask1.join() + myTask2.join();
} else {
for(int i = begin; i <= end; i++) {
result += i;
}
}
return result;
}
} public class ForkJoinTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask task = new MyTask(1, 100);
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> submit = forkJoinPool.submit(task);
Integer integer = submit.get();
forkJoinPool.shutdown();
System.out.println(integer);
}
}

案例二:实现分支计算斐波那契数列

class Fi extends RecursiveTask<Integer>{
private final int n;
Fi(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if(n == 0 || n ==1) {
return 1;
}
Fi f1 = new Fi(n-1);
Fi f2 = new Fi(n-2);
f1.fork();
f2.fork(); return f1.join() + f2.join();
}
} public class ForkJoinTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Fi fi = new Fi(5);
ForkJoinPool pool = new ForkJoinPool();
Integer integer = pool.submit(fi).get();
System.out.println(integer);
}
}

JUC CompletableFuture异步回调

两个异步方法:

CompletableFuture.runAsync:不带返回值的异步方法
CompletableFuture.supplyAsync:携带返回值的异步方法

public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(ASYNC_POOL, runnable);
} /**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the given executor after it runs the given
* action.
*
* @param runnable the action to run before completing the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName());
});
future1.get(); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
int i = 1 / 0;
return 1024;
});
future2.whenComplete((t, u) -> {
System.out.println(t);
System.out.println(u);
});
}

whenComplete获取返回值的时候,第一个参数为值,第二个参数为异步产生的异常

ForkJoinPool.commonPool-worker-1

ForkJoinPool.commonPool-worker-1

null

java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

JUC(七)分支合并框架的相关教程结束。

《JUC(七)分支合并框架.doc》

下载本文的Word格式文档,以方便收藏与打印。