使用RabbitMQ完成商品数据同步
商品增删改时,同步更新索引库、静态页等。
业务分析
发送方 - 商品微服务
接收方 - 搜索微服务、商品详情页微服务
发送方 - 商品微服务
准备工作
引入依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
配置application.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| spring: rabbitmq: host: 192.168.136.103 username: leyou password: leyou virtual-host: /leyou template: retry: enabled: true initial-interval: 10000ms max-interval: 60000ms multiplier: 2 exchange: leyou.item.exchange publisher-confirms: true
|
配置说明
- template:有关
AmqpTemplate
的配置
- retry:失败重试
- enabled:开启失败重试
- initial-interval:第一次重试的间隔时长
- max-interval:最长重试间隔,超过这个间隔将不再重试
- multiplier:下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍
- exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
- publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试
注意:
1. 该配置是放在 `spring`配置下的。新增配置时注意层级。
2. `rabbitmq`中的`leyou`账号,需要提前创建好。
统一定义RoutingKey
我这里把QueueName
和ExchangeName
都定义在了Common工程中的常量类LeyouConstants
中。
LeyouConstants
新增常量
1 2 3 4 5 6 7
|
public static final String QUEUE_INSERT_ITEM = "ly.item.insert"; public static final String QUEUE_UPDATE_ITEM = "ly.item.update"; public static final String QUEUE_DELETE_ITEM = "ly.item.delete"; public static final String EXCHANGE_DEFAULT_ITEM = "ly.item.exchange";
|
GoodsService新增逻辑
在GoodsService
中的增删改中新增发送消息逻辑
消息发送方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
private void sendMessage(Long id, String routingKey) { try { amqpTemplate.convertAndSend(routingKey, id); } catch (Exception e) { log.error("[{}]商品消息发送异常,商品id:{}", routingKey, id, e); } }
|
吞掉异常,防止发送消息失败影响到正常的流程。
在增删改方法中新增逻辑
在增删改方法中,调用发送消息的方法
接收方
准备工作
准备工作的内容,需要分别在搜索微服务和商品详情页微服务各做一次。
引入依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
配置application.yml
1 2 3 4 5 6 7 8 9 10
| spring: # RabbitMQ配置 rabbitmq: # 主机地址 host: 192.168.136.103 # 用户名、密码 username: leyou password: leyou # 虚拟主机 virtual-host: /leyou
|
搜索微服务
保存和删除索引
由于索引的新增和更新为同一个方法,所以只需要在SearchService
中新增保存索引和删除索引的方法。
保存索引
1 2 3 4 5 6 7 8 9 10 11 12 13
|
public void saveIndex(Long id) { SpuBO spuBO = goodsClient.queryGoodsById(id); Goods goods = buildGoods(spuBO); goodsRepository.save(goods); }
|
删除索引
1 2 3 4 5 6 7 8
|
public void deleteIndex(Long id) { goodsRepository.deleteById(id); }
|
接收并处理消息
编写一个类,在其中定义两个方法,一个方法用于接收新增或更新商品的消息,另一个用于接收删除商品的消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| package com.leyou.search.listener;
import com.leyou.common.util.LeyouConstants; import com.leyou.search.service.SearchService; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class GoodsListener {
@Autowired private SearchService searchService;
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = LeyouConstants.QUEUE_SAVE_SEARCH, durable = "true"), exchange = @Exchange(name = LeyouConstants.EXCHANGE_DEFAULT_ITEM), key = {LeyouConstants.QUEUE_INSERT_ITEM, LeyouConstants.QUEUE_UPDATE_ITEM} )) public void saveIndex(Long id) { if (id != null) searchService.saveIndex(id); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = LeyouConstants.QUEUE_DELETE_SEARCH, durable = "true"), exchange = @Exchange(name = LeyouConstants.EXCHANGE_DEFAULT_ITEM), key = LeyouConstants.QUEUE_DELETE_ITEM )) public void deleteIndex(Long id) { if (id != null) searchService.deleteIndex(id); }
}
|
商品详情页微服务
由于新增和修改商品详情页的方法已经在之前写好了,只需要新增删除商品详情页的方法。
删除商品详情页
1 2 3 4 5 6 7 8 9 10 11 12
|
public void deleteHtml(Long id) { File file = new File(pagePath + "/" + id + ".html"); if (file.exists()) { file.delete(); } }
|
接收并处理消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| package com.leyou.listener;
import com.leyou.common.util.LeyouConstants; import com.leyou.page.service.PageService; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class GoodsListener {
@Autowired private PageService pageService;
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = LeyouConstants.QUEUE_SAVE_PAGE, durable = "true"), exchange = @Exchange(name = LeyouConstants.EXCHANGE_DEFAULT_ITEM), key = {LeyouConstants.QUEUE_INSERT_ITEM, LeyouConstants.QUEUE_UPDATE_ITEM} )) public void savePage(Long id) { if (id != null) pageService.asyncExcute(id); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = LeyouConstants.QUEUE_DELETE_PAGE, durable = "true"), exchange = @Exchange(name = LeyouConstants.EXCHANGE_DEFAULT_ITEM), key = LeyouConstants.QUEUE_DELETE_ITEM )) public void deleteIndex(Long id) { if (id != null) pageService.deleteHtml(id); }
}
|