谷粒商城-分布式高级篇_03
谷粒商城高级篇:异步任务编排; 商品详情业务

前言

谷粒商城系列-分布式高级篇03

本文内容:

  1. 异步任务编排(completableFuture)
  2. 商品详情业务(仅记录后端关键代码)

异步任务编排(CompletableFuture)

介绍

FutureJava 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。

虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

作为正统的Java类库,是不是应该做点什么,加强一下自身库的功能呢?

在Java 8中, 新增加了一个类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

实践

基本使用

CompletableFuture提供了四个静态方法来开启一个异步任务

1
2
3
4
5
6
7
8
// 开启一个异步任务且没有返回值
static CompletableFuture<Void> runAsync(Runnable runnable)
// 开启一个异步任务且没有返回值,可指定自定义的线程池对象
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 开启一个异步任务可接受返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 开启一个异步任务可接受返回值,可指定自定义的线程池对象
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

示例:

  • 无返回值获取

    1
    2
    3
    4
    5
    6
    // 无返回值获取
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("当前线程" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结束" + i);
    }, service);
  • 获取返回值

    1
    2
    3
    4
    5
    6
    7
    8
    // 获取返回值
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结束" + i);
    return i;
    }, service);
    System.out.println("结果: " + future1.get());

任务完成后动作

可在任务完成后继续动作或者捕获异常设置默认返回

1
2
3
4
5
6
7
8
9
10
11
12
13
// 执行当前任务的线程执行继续同步执行 whenComplete 的任务
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
// 执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
// 执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行,可指定自定义的线程池
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);

// 执行当前任务的线程执行继续同步执行, 可改变上一步返回值
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
// 任务继续提交给线程池来进行执行, 可改变上一步返回值
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
// 任务继续提交给线程池来进行执行, 可改变上一步返回值,可指定自定义的线程池
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);

示例:

  • 在当前线程同步执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结束" + i);
    return i;
    }, service).whenComplete((result , ex) -> {
    // 这里能获取异常信息 但是没法修改数据
    System.out.println("异步任务成功完成了... 结果:" + result);
    // 感知异常 给出默认结果
    }).exceptionally(ex -> 10);
  • 指定线程池来执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结束" + i);
    return i;
    }, service).whenCompleteAsync((result , ex) -> {
    // 这里能获取异常信息 但是没法修改数据
    System.out.println("异步任务成功完成了... 结果:" + result);
    // 感知异常 给出默认结果
    }, service).exceptionally(ex -> 10);
  • 在当前线程同步执行并修改其值(只能改,并不能返回其他类型的,比如第一步返回的是Integer的类型,后续返回的值也必须为Integer)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结束" + i);
    return i;
    }, service).handle((result, ex) -> {
    if(result != null){
    return result * 8;
    }
    if(ex != null){
    System.out.println("异常为:" + ex);
    return -1;
    }
    return 0;
    });
    System.out.println("main....end 结果:" + future.get());

线程串行化

在上一步任务的基础上继续进行任务,并且可以返回新的返回值(可返回不同类型的值,即新的任务返回的为全新的返回值)

1
2
3
4
5
6
7
8
9
10
11
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
  • thenApply:新的任务获取上一个任务返回的结果,并返回当前任务的返回值。
  • thenAccept:新的任务获取上一个任务返回的结果,进行相应处理但不返回新的返回值。
  • thenRun:在上一个任务执行完后执行,不能获取到上一个任务的返回值且不能产生新的返回值。

示例:

  • thenRun

    1
    2
    3
    4
    5
    6
    7
    8
    9
    CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结束" + i);
    return i;
    }, service).thenRunAsync(() -> {
    // thenRunAsync 不能获取执行结果
    System.out.println("任务2启动了...");
    }, service);
  • thenAccept

    1
    2
    3
    4
    5
    6
    CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结束" + i);
    return i;
    }, service).thenAcceptAsync(res -> System.out.println("thenAcceptAsync获取上一步执行结果:" + res));
  • thenApply

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    CompletableFuture<String> async = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结束" + i);
    return i;
    }, service).thenApplyAsync(res -> {
    System.out.println("任务2启动了...");
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return "thenApplyAsync" + res;
    });
    System.out.println("thenApplyAsync获取结果:" + async.get());

双任务组合 -> 都完成

当有任务需要在另外两个任务都完成后才执行时,可使用该方式来编排任务

1
2
3
4
5
6
7
8
9
10
11
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor);
  • runAfterBoth:组合两个Future,无法获取到两个Future的返回值,只在两个Future运行完成后处理下一个任务
  • thenAcceptBoth:组合两个Future,可以获取到两个Future的返回值,使用两个值处理下一个任务,但下一个任务无返回值
  • thenCombine:组合两个Future,可以获取到两个Future的返回值,使用两个值处理下一个任务且可以产生新的返回值

示例

需要先行创建两个Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 任务1
CompletableFuture<Object> async1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("任务1结束" + i);
return i;
}, service);
// 任务2
CompletableFuture<Object> async2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2线程" + Thread.currentThread().getId());
int i = 10 / 2;
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2结束" + i);
return "任务合并";
}, service);
  • runAfterBoth

    1
    2
    3
    async1.runAfterBothAsync(async2, () -> {
    System.out.println("任务1和2完成, 任务3开始...");
    }, service);
  • thenAcceptBoth

    1
    2
    3
    async1.thenAcceptBothAsync(async2, (res1, res2) -> {
    System.out.println("任务3开始... 任务1的结果:" + res1 + "任务2的结果:" + res2);
    }, service);
  • thenCombine

    1
    2
    CompletableFuture<String> async = async1.thenCombineAsync(async2, (res1, res2) -> res1 + ":" + res2 + "-> fire", service);
    System.out.println("自定义返回结果:" + async.get());

双任务组合 -> 任一完成

当有任务需要在另外两个任务任意一个任务完成后才执行时,可使用该方式来编排任务

1
2
3
4
5
6
7
8
9
10
11
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor);

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor);
  • runAfterEither:组合两个Future,无法获取到两个Future的返回值,任何一个Future运行完成后处理下一个任务
  • acceptEither:组合两个Future,可以获取到两个Future的返回值,使用任何一个任务的返回值处理下一个任务,但下一个任务无返回值
  • applyToEither:组合两个Future,可以获取到两个Future的返回值,使用任何一个任务的返回值处理下一个任务且可以产生新的返回值

示例

继续使用双任务组合中创建的两个Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 任务1
CompletableFuture<Object> async1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("任务1结束" + i);
return i;
}, service);
// 任务2
CompletableFuture<Object> async2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2线程" + Thread.currentThread().getId());
int i = 10 / 2;
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2结束" + i);
return "任务合并";
}, service);
  • runAfterEither

    1
    2
    3
    async1.runAfterEitherAsync(async2, () -> {
    System.out.println("任务3开始...");
    }, service);
  • acceptEither

    1
    2
    3
    async1.acceptEitherAsync(async2, (res) -> {
    System.out.println("任务3开始...之前的结果:" + res);
    }, service);
  • applyToEither

    1
    2
    3
    4
    5
    CompletableFuture<String> async = async1.applyToEitherAsync(async2, (res) -> {
    System.out.println("任务3开始...之前的结果:" + res);
    return res.toString() + "-> fire";
    }, service);
    System.out.println("任务3返回的结果:" + async.get());

多任务组合

当有任务需要在其他多个任务务完成后才执行时,可使用该方式来编排任务

1
2
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
  • allOf:所有任务执行完毕才能继续
  • anyOf:任意一个任务执行完毕就可以继续并且可以使用get()方法获取到成功的任务的返回值

示例

先行创建三个任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CompletableFuture<String> img = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品图片信息");
return "1.jpg";
}, service);

CompletableFuture<String> attr = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("查询商品属性");
return "麒麟990 5G 钛空银";
}, service);


CompletableFuture<String> desc = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品介绍");
return "华为";
}, service);
  • allOf

    1
    2
    3
    4
    CompletableFuture<Void> allOf = CompletableFuture.allOf(img, attr, desc);
    allOf.join();// 阻塞等待任务完成

    System.out.println("main....end" + desc.get() + attr.get() + img.get());
  • anyOf

    1
    2
    3
    4
    5
    CompletableFuture<Object> anyOf = CompletableFuture.anyOf(img, attr, desc);
    // 获取到完成的那个任务的返回值
    Object result = anyOf.get();

    System.out.println("main....end" + result);

商品详情

开始商品详情之前的操作:

  1. 配置nginx,放入item相关静态资源
  2. 配置gateway网关路由规则
  3. item.html放入templates目录(代码省略)

配置类

  1. 新增线程池自定义配置到application.yml

    1
    2
    3
    4
    5
    6
    7
    8
    gulimall:
    thread:
    # 核心线程数
    core-size: 20 # 20-50
    # 最大线程数
    max-size: 200
    # 空闲保活时间
    keep-alive-time: 10 # 10s
  2. 读取新增的线程池配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    package com.imxushuai.gulimall.product.config;

    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;

    @ConfigurationProperties(prefix = "gulimall.thread")
    @Data
    public class ThreadPoolConfigProperties {

    private Integer coreSize;

    private Integer maxSize;

    private Integer keepAliveTime;
    }
  3. 配置线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    package com.imxushuai.gulimall.product.config;

    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    /**
    * 配置线程池
    */
    @EnableConfigurationProperties(ThreadPoolConfigProperties.class)
    @Configuration
    public class MyThreadConfig {

    @Bean
    public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties threadPoolConfigProperties){

    return new ThreadPoolExecutor(threadPoolConfigProperties.getCoreSize(),
    threadPoolConfigProperties.getMaxSize(),
    threadPoolConfigProperties.getKeepAliveTime() ,TimeUnit.SECONDS,
    new LinkedBlockingDeque<>(10000), Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.AbortPolicy());
    }
    }

核心代码

  1. ItemController

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    package com.imxushuai.gulimall.product.web;

    import com.imxushuai.gulimall.product.service.SkuInfoService;
    import com.imxushuai.gulimall.product.vo.SkuItemVo;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Controller;
    import org.springframework.ui.Model;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;

    import java.util.concurrent.ExecutionException;

    @Controller
    public class ItemController {

    @Autowired
    private SkuInfoService skuInfoService;

    @RequestMapping("/{skuId}.html")
    public String skuItem(@PathVariable("skuId") Long skuId, Model model) throws ExecutionException, InterruptedException {
    SkuItemVo vo = skuInfoService.item(skuId);
    model.addAttribute("item", vo);
    return "item";
    }
    }
  2. SkuInfoServiceImpl

    这里我只放主方法,内部调用的其他方法就不贴代码了,需要自行查看源代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    /**
    * 查询页面详细内容
    */
    @Override
    public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
    SkuItemVo skuItemVo = new SkuItemVo();

    CompletableFuture<SkuInfoEntity> infoFutrue = CompletableFuture.supplyAsync(() -> {
    //1 sku基本信息
    SkuInfoEntity info = getById(skuId);
    skuItemVo.setInfo(info);
    return info;
    }, executor);
    // 无需获取返回值
    CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
    //2 sku图片信息
    List<SkuImagesEntity> images = imagesService.getImagesBySkuId(skuId);
    skuItemVo.setImages(images);
    }, executor);
    // 在1之后
    CompletableFuture<Void> saleAttrFuture = infoFutrue.thenAcceptAsync(res -> {
    //3 获取spu销售属性组合 list
    List<ItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrsBuSpuId(res.getSpuId());
    skuItemVo.setSaleAttr(saleAttrVos);
    }, executor);
    // 在1之后
    CompletableFuture<Void> descFuture = infoFutrue.thenAcceptAsync(res -> {
    //4 获取spu介绍
    SpuInfoDescEntity spuInfo = spuInfoDescService.getById(res.getSpuId());
    skuItemVo.setDesc(spuInfo);
    }, executor);
    // 在1之后
    CompletableFuture<Void> baseAttrFuture = infoFutrue.thenAcceptAsync(res -> {
    //5 获取spu规格参数信息
    List<SpuItemAttrGroup> attrGroups = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());
    skuItemVo.setGroupAttrs(attrGroups);
    }, executor);

    // 等待所有任务都完成再返回
    CompletableFuture.allOf(imageFuture, saleAttrFuture, descFuture, baseAttrFuture).get();
    return skuItemVo;
    }
文章作者: imxushuai
文章链接: https://www.imxushuai.com/2022/01/05/49.谷粒商城-分布式高级篇-03/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 imxushuai
支付宝打赏
微信打赏