前言
谷粒商城系列-分布式高级篇03
本文内容:
- 异步任务编排(completableFuture)
- 商品详情业务(仅记录后端关键代码)
异步任务编排(CompletableFuture)
介绍
Future
是Java 5
添加的类,用来描述一个异步计算的结果。你可以使用isDone
方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel
方法停止任务的执行。
虽然Future
以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?
作为正统的Java类库,是不是应该做点什么,加强一下自身库的功能呢?
在Java 8中, 新增加了一个类: CompletableFuture
,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture
的方法。
实践
基本使用
CompletableFuture提供了四个静态方法来开启一个异步任务
1 | // 开启一个异步任务且没有返回值 |
示例:
无返回值获取
1
2
3
4
5
6// 无返回值获取
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("当前线程" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结束" + i);
}, service);获取返回值
1
2
3
4
5
6
7
8// 获取返回值
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结束" + i);
return i;
}, service);
System.out.println("结果: " + future1.get());
任务完成后动作
可在任务完成后继续动作或者捕获异常设置默认返回
1 | // 执行当前任务的线程执行继续同步执行 whenComplete 的任务 |
示例:
在当前线程同步执行
1
2
3
4
5
6
7
8
9
10CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结束" + i);
return i;
}, service).whenComplete((result , ex) -> {
// 这里能获取异常信息 但是没法修改数据
System.out.println("异步任务成功完成了... 结果:" + result);
// 感知异常 给出默认结果
}).exceptionally(ex -> 10);指定线程池来执行
1
2
3
4
5
6
7
8
9
10CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结束" + i);
return i;
}, service).whenCompleteAsync((result , ex) -> {
// 这里能获取异常信息 但是没法修改数据
System.out.println("异步任务成功完成了... 结果:" + result);
// 感知异常 给出默认结果
}, service).exceptionally(ex -> 10);在当前线程同步执行并修改其值(只能改,并不能返回其他类型的,比如第一步返回的是Integer的类型,后续返回的值也必须为Integer)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结束" + i);
return i;
}, service).handle((result, ex) -> {
if(result != null){
return result * 8;
}
if(ex != null){
System.out.println("异常为:" + ex);
return -1;
}
return 0;
});
System.out.println("main....end 结果:" + future.get());
线程串行化
在上一步任务的基础上继续进行任务,并且可以返回新的返回值(可返回不同类型的值,即新的任务返回的为全新的返回值)
1 | public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) |
- thenApply:新的任务获取上一个任务返回的结果,并返回当前任务的返回值。
- thenAccept:新的任务获取上一个任务返回的结果,进行相应处理但不返回新的返回值。
- thenRun:在上一个任务执行完后执行,不能获取到上一个任务的返回值且不能产生新的返回值。
示例:
thenRun
1
2
3
4
5
6
7
8
9CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结束" + i);
return i;
}, service).thenRunAsync(() -> {
// thenRunAsync 不能获取执行结果
System.out.println("任务2启动了...");
}, service);thenAccept
1
2
3
4
5
6CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结束" + i);
return i;
}, service).thenAcceptAsync(res -> System.out.println("thenAcceptAsync获取上一步执行结果:" + res));thenApply
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15CompletableFuture<String> async = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结束" + i);
return i;
}, service).thenApplyAsync(res -> {
System.out.println("任务2启动了...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "thenApplyAsync" + res;
});
System.out.println("thenApplyAsync获取结果:" + async.get());
双任务组合 -> 都完成
当有任务需要在另外两个任务都完成后才执行时,可使用该方式来编排任务
1 | public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action); |
- runAfterBoth:组合两个
Future
,无法获取到两个Future
的返回值,只在两个Future运行完成后处理下一个任务 - thenAcceptBoth:组合两个
Future
,可以获取到两个Future
的返回值,使用两个值处理下一个任务,但下一个任务无返回值 - thenCombine:组合两个
Future
,可以获取到两个Future
的返回值,使用两个值处理下一个任务且可以产生新的返回值
示例
需要先行创建两个Future
1 | // 任务1 |
runAfterBoth
1
2
3async1.runAfterBothAsync(async2, () -> {
System.out.println("任务1和2完成, 任务3开始...");
}, service);thenAcceptBoth
1
2
3async1.thenAcceptBothAsync(async2, (res1, res2) -> {
System.out.println("任务3开始... 任务1的结果:" + res1 + "任务2的结果:" + res2);
}, service);thenCombine
1
2CompletableFuture<String> async = async1.thenCombineAsync(async2, (res1, res2) -> res1 + ":" + res2 + "-> fire", service);
System.out.println("自定义返回结果:" + async.get());
双任务组合 -> 任一完成
当有任务需要在另外两个任务任意一个任务完成后才执行时,可使用该方式来编排任务
1 | public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action); |
- runAfterEither:组合两个
Future
,无法获取到两个Future
的返回值,任何一个Future运行完成后处理下一个任务 - acceptEither:组合两个
Future
,可以获取到两个Future
的返回值,使用任何一个任务的返回值处理下一个任务,但下一个任务无返回值 - applyToEither:组合两个
Future
,可以获取到两个Future
的返回值,使用任何一个任务的返回值处理下一个任务且可以产生新的返回值
示例
继续使用双任务组合中创建的两个Future
1 | // 任务1 |
runAfterEither
1
2
3async1.runAfterEitherAsync(async2, () -> {
System.out.println("任务3开始...");
}, service);acceptEither
1
2
3async1.acceptEitherAsync(async2, (res) -> {
System.out.println("任务3开始...之前的结果:" + res);
}, service);applyToEither
1
2
3
4
5CompletableFuture<String> async = async1.applyToEitherAsync(async2, (res) -> {
System.out.println("任务3开始...之前的结果:" + res);
return res.toString() + "-> fire";
}, service);
System.out.println("任务3返回的结果:" + async.get());
多任务组合
当有任务需要在其他多个任务务完成后才执行时,可使用该方式来编排任务
1 | public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs); |
- allOf:所有任务执行完毕才能继续
- anyOf:任意一个任务执行完毕就可以继续并且可以使用
get()
方法获取到成功的任务的返回值
示例
先行创建三个任务
1 | CompletableFuture<String> img = CompletableFuture.supplyAsync(() -> { |
allOf
1
2
3
4CompletableFuture<Void> allOf = CompletableFuture.allOf(img, attr, desc);
allOf.join();// 阻塞等待任务完成
System.out.println("main....end" + desc.get() + attr.get() + img.get());anyOf
1
2
3
4
5CompletableFuture<Object> anyOf = CompletableFuture.anyOf(img, attr, desc);
// 获取到完成的那个任务的返回值
Object result = anyOf.get();
System.out.println("main....end" + result);
商品详情
开始商品详情之前的操作:
- 配置nginx,放入item相关静态资源
- 配置gateway网关路由规则
- item.html放入templates目录(代码省略)
配置类
新增线程池自定义配置到
application.yml
1
2
3
4
5
6
7
8gulimall:
thread:
# 核心线程数
core-size: 20 # 20-50
# 最大线程数
max-size: 200
# 空闲保活时间
keep-alive-time: 10 # 10s读取新增的线程池配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16package com.imxushuai.gulimall.product.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
"gulimall.thread") (prefix =
public class ThreadPoolConfigProperties {
private Integer coreSize;
private Integer maxSize;
private Integer keepAliveTime;
}配置线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28package com.imxushuai.gulimall.product.config;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 配置线程池
*/
(ThreadPoolConfigProperties.class)
public class MyThreadConfig {
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties threadPoolConfigProperties){
return new ThreadPoolExecutor(threadPoolConfigProperties.getCoreSize(),
threadPoolConfigProperties.getMaxSize(),
threadPoolConfigProperties.getKeepAliveTime() ,TimeUnit.SECONDS,
new LinkedBlockingDeque<>(10000), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
}
}
核心代码
ItemController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25package com.imxushuai.gulimall.product.web;
import com.imxushuai.gulimall.product.service.SkuInfoService;
import com.imxushuai.gulimall.product.vo.SkuItemVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import java.util.concurrent.ExecutionException;
public class ItemController {
private SkuInfoService skuInfoService;
"/{skuId}.html") (
public String skuItem(@PathVariable("skuId") Long skuId, Model model) throws ExecutionException, InterruptedException {
SkuItemVo vo = skuInfoService.item(skuId);
model.addAttribute("item", vo);
return "item";
}
}SkuInfoServiceImpl
这里我只放主方法,内部调用的其他方法就不贴代码了,需要自行查看源代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42/**
* 查询页面详细内容
*/
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
SkuItemVo skuItemVo = new SkuItemVo();
CompletableFuture<SkuInfoEntity> infoFutrue = CompletableFuture.supplyAsync(() -> {
//1 sku基本信息
SkuInfoEntity info = getById(skuId);
skuItemVo.setInfo(info);
return info;
}, executor);
// 无需获取返回值
CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
//2 sku图片信息
List<SkuImagesEntity> images = imagesService.getImagesBySkuId(skuId);
skuItemVo.setImages(images);
}, executor);
// 在1之后
CompletableFuture<Void> saleAttrFuture = infoFutrue.thenAcceptAsync(res -> {
//3 获取spu销售属性组合 list
List<ItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrsBuSpuId(res.getSpuId());
skuItemVo.setSaleAttr(saleAttrVos);
}, executor);
// 在1之后
CompletableFuture<Void> descFuture = infoFutrue.thenAcceptAsync(res -> {
//4 获取spu介绍
SpuInfoDescEntity spuInfo = spuInfoDescService.getById(res.getSpuId());
skuItemVo.setDesc(spuInfo);
}, executor);
// 在1之后
CompletableFuture<Void> baseAttrFuture = infoFutrue.thenAcceptAsync(res -> {
//5 获取spu规格参数信息
List<SpuItemAttrGroup> attrGroups = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());
skuItemVo.setGroupAttrs(attrGroups);
}, executor);
// 等待所有任务都完成再返回
CompletableFuture.allOf(imageFuture, saleAttrFuture, descFuture, baseAttrFuture).get();
return skuItemVo;
}