项目:商户点评项目

梳理一下此项目的内容和逻辑,很多代码细节可以去对应的笔记章节复习

一、短信登录

给用户发送登录验证码和登录凭证,记住用户登录状态,使用token刷新拦截器和登录验证拦截器进行登录状态检查,保护特定需登录使用的资源。并且将验证码和token都存到redis中,解决了集群session共享问题。

1. 隐藏用户敏感信息

一开始Session里存取的都是完整的User,包括密码等属性,一方面这是敏感信息,不应该在HTTP里传输,另一方面多余的信息也会带来额外的内存消耗,因此可以建立UserDTO,仅使用必要的信息完成数据传输。后面的user都是使用DTO对象

1
2
3
4
5
6
@Data
public class UserDTO {
private Long id;
private String nickName;
private String icon;
}

2. 基于Session实现登录

一开始是把验证码和token都保存到session里,逻辑什么都很简单,往session里存也只要调用下面代码即可

session.setAttribute("code",code)session.setAttribute("user",user)

拦截器校验登陆状态,每次从session里取出用户信息,为了确保获取当前访问的用户,拦截器需要将每个用户信息传递到Controller里,这里选择将用户保存到 ThreadLocal ,保证线程的安全问题(每收到一个http请求,服务端就会新开一个线程来处理,每个线程有自己的 ThreadLocal )

1
2
3
4
5
private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();

public static void saveUser(UserDTO user){
tl.set(user);
}
image-20220614130627614

3. 基于Redis实现共享session登录

session共享问题:多台Tomcat并不共享session存储空间,由于负载均衡,集群会把请求切换到不同的tomcat服务,这时其他的tomcat没有之前的session,导致数据丢失。可以使用Redis作为替代方案

Redis代替session需要考虑的问题:为value选择合适的Redis数据结构;选择合适的key方便存取;选择合适的存储粒度

验证码的key-value:

  • 选择合适的key:key显然不能是之前session里的”code”了,因为每个session里有自己的code,现在如果再用code作为key,显然不能满足不同请求的需求,这里可以将手机号作为key。
  • 选择合适的value:由于验证码就是6位数字,所以value可以选择String类型
1
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, LOGIN_CODE_TTL, TimeUnit.MINUTES);

token的key-value:

  • 选择合适的key:用一个随机字符串 token,作为key。之前基于session时,用户每次请求是携带cookie的,里面有sessionID,这样就能用session里的用户信息进行判断,这是tomcat帮我们维护的。现在不用session了,得去redis里取出用户信息,这里就可以根据 token 取出,即把 token 作为用户的登陆凭证,取代之前的cookie。那么如何让用户每次请求时都带上 token 呢,这里需要我们在保存用户信息到Redis之后,不能结束流程,而是要将 token 通过响应信息返回给客户端,以便于客户端以后每次请求时都带上这个token。这里也是不将手机号作为key的原因,在响应中放这种敏感信息是不合理的。
  • 选择合适的value:保存一个对象,Redis通常有两种数据结构可选,一是String结构,以JSON字符串来保存,比较直观。二是Hash结构,它可以将对象中的每个字段独立存储,可以针对单个字段做CRUD,并且内存占用更少(JSON里有不必要的符号)。一般如果数据较少的话,两个都行,数据较多从优化的角度,Hash结构更适合。此处我们选择Hash结构

image-20220615125514215

保存对象到redis里时需要将User对象先转为UserDTO再转为HashMap存储到redis里,都是用的hutool包下BeanUtil里的工具。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 7.保存用户信息到 redis中
// 7.1.随机生成token,作为登录令牌。UUID是工具类,toStirng如果给参数true表示不生成带下划线的随机字符串
String token = UUID.randomUUID().toString(true);
// 7.2.将User对象先转为UserDTO再转为HashMap存储到redis里,都是用的BeanUtil里的工具。这里由于UserDTO有long型的id,转换时需要自定义规则
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create() // 自定义规则
.setIgnoreNullValue(true) // 忽略空的值(UserDTO里有icon属性暂时还没管)
.setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString())); // 将所有的属性都转成String类型
// 7.3.存储
String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
// 7.4.设置token有效期,模仿session设置为30min
stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);

// 8.返回token给客户端,这里很重要!不返回给客户端,下次请求就没办法携带token访问了
return Result.ok(token);

这里关于token有效期有一个问题是,expire这个函数是设定死了30min之后失效,不管这期间有没有继续访问请求,但实际上的逻辑应该是如果用户继续不断访问,则应该不断更新有效期,只有用户完全不访问了,再等30min才失去登录状态。这里的逻辑写在后续的拦截器中

拦截器部分自然也要做出更改,这里就不能是从session里取token了,而是从redis里取,再与请求头里的token对比。如果用户身份没问题的话,还需要刷新其token的有效期

image-20220615164223203
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Component
public class LoginInterceptor implements HandlerInterceptor {

private StringRedisTemplate stringRedisTemplate;

public LoginInterceptor(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}


/**
* 进入controller之前进行登录校验
* @param request
* @param response
* @param handler
* @return
* @throws Exception
*/
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1.获取请求头中的token
String token = request.getHeader("authorization"); // 前端代码里这样写的
if (StrUtil.isBlank(token)) {
// token为空,拦截,返回401状态码
response.setStatus(401);
return false;
}
// 2.基于TOKEN获取redis中的用户
String key = LOGIN_USER_KEY + token;
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
// 3.判断用户是否存在
if (userMap.isEmpty()) {
// 4.用户不存在,拦截,返回401状态码
response.setStatus(401);
return false;
}
// 5.将查询到的hash数据转为UserDTO
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 6.存在,保存用户信息到 ThreadLocal
UserHolder.saveUser(userDTO);
// 7.刷新token有效期
stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES);
// 8.放行
return true;
}

/**
* 业务执行完毕销毁对应的用户信息,避免threadlocal内存泄漏
* @param request
* @param response
* @param handler
* @param ex
* @throws Exception
*/
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 移除用户
UserHolder.removeUser();
}
}

注意,在拦截器类 LoginInterceptor 注入 StringRedisTemplate可能会失败,参考本站笔记:《问题解决-springboot拦截器无法注入StringRedisTemplate》

4. 优化拦截器登陆验证

之前的拦截器里给token写死了30分钟的TTL,但其实有些网页请求是不用拦截的,比如商户信息。现在假设一个用户登录了之后浏览了30分钟的商户信息,这30分钟没有用到拦截器,token就不会刷新,然后 token 就过期了,用户得重新登录,这显然是不合理的,因此我们需要对拦截器做出优化。可以新增加一个拦截器拦截一切路径,在这个拦截器里负责获取、查询、更新 token,原先的 LoginInterceptor 拦截器只负责需要登录的路径,即查询用户是否存在,逻辑图如下,这样就可以解决前面提到的问题。

image-20220615222048131

新添加拦截器 RefreshTokenInterceptor,注意此处是拦截一切请求,所以查不到 token 也没事,放行就可以。查到用户就放入ThreadLocal,查不到就放行。需要登录的路径放到原先的 LoginInterceptor 拦截器去判断ThreadLocal是否有用户

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class RefreshTokenInterceptor implements HandlerInterceptor {

private StringRedisTemplate stringRedisTemplate;

public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

// 拦截一切请求,获取、查询、更新 token
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1.获取请求头中的token
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)) {
return true; // 注意这里的变化,此处是拦截一切请求,所以查不到token也没事,直接放行
}
// 2.基于TOKEN获取redis中的用户
String key = LOGIN_USER_KEY + token;
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
// 3.判断用户是否存在
if (userMap.isEmpty()) {
return true; // 用户不存在,也直接放行
}
// 5.将查询到的hash数据转为UserDTO
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 6.存在,保存用户信息到 ThreadLocal
UserHolder.saveUser(userDTO);
// 7.刷新token有效期
stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES);
// 8.放行
return true;
}

// 业务执行完毕销毁对应的用户信息,避免内存泄漏
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 移除用户
UserHolder.removeUser();
}
}

更新 LoginInterceptor,只需要去ThreadLocal中查询是否有用户

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1.判断是否需要拦截(ThreadLocal中是否有用户)
if (UserHolder.getUser() == null) {
// 没有,需要拦截,设置状态码
response.setStatus(401);
// 拦截
return false;
}
// 有用户,则放行
return true;
}
}

更新拦截器配置类,注意添加拦截器 registry.addInterceptor() 时,用 order() 方法指定顺序,确保 token刷新拦截器比登录验证拦截器先执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
public class MvcConfig implements WebMvcConfigurer {

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public void addInterceptors(InterceptorRegistry registry) {
// token刷新拦截器
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);
// 登录验证拦截器
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/shop/**",
"/voucher/**",
"/shop-type/**",
"/upload/**",
"/blog/hot",
"/user/code",
"/user/login"
).order(1);
}
}

二、商铺信息缓存

1. 添加缓存

用户请求商铺信息第一次会去DB里取,然后会添加进redis,作为缓存

  • redis的key:店铺id
  • redis的value:String类型,商铺对象转为JSON字符串

2. 缓存更新

缓存更新策略的最佳实践方案:

  • 低一致性需求:使用Redis自带的内存淘汰机制
  • 高一致性需求:主动更新,采取旁路缓存策略,先更新DB,再删除cache,并以超时剔除作为兜底方案
  • 强一致性需求:主动更新,采取旁路缓存策略,先更新DB,再删除cache,并加锁避免线程安全问题,并以超时剔除作为兜底方案

后两者读写操作需要注意:

  • 读操作:缓存命中则直接返回,缓存未命中则查询数据库,并写入缓存,设定超时时间
  • 写操作:先更新数据库,然后再删除缓存,要确保数据库与缓存操作的原子性

因此相比添加缓存,这里变动两个需求:

  • 根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,变化就是要设置一个超时时间,作为缓存更新的超时剔除兜底方案,ShopServiceImpl.java

    1
    2
    3
    4
    5
    @Override
    public Result queryById(Long id) {
    // 6.存在,写入redis,并设置超时时间
    stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES); // 超时时间:30分钟
    }
  • 根据id修改店铺时,先修改数据库,再删除缓存,redis理论篇已经分析过。注意要加上事务注解,ShopServiceImpl.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Override
    @Transactional // 声明式事务
    public Result update(Shop shop) {
    Long id = shop.getId();
    if (id == null) {
    return Result.fail("店铺id不能为空");
    }
    // 1.更新数据库,mybatis
    updateById(shop);
    // 2.删除缓存
    stringRedisTemplate.delete(CACHE_SHOP_KEY + id);
    return Result.ok();
    }

3. 解决缓存穿透

采用缓存空对象的方案来解决缓存穿透,逻辑上做出对应变更,如下图所示

image-20220621231354482

也就是在查询DB之后,改动商铺是否存在的判断,如果不存在,不仅仅返回商铺信息不存在,还要把空值写入redis,并设置较短超时时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public Result queryById(Long id) {
// redis的key就用店铺id
String key = CACHE_SHOP_KEY + id;
// 1.从redis查询商铺缓存(这里value结构用hash也行,视频演示的是string)
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if(StrUtil.isNotBlank((shopJson))){
// 3.存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
// 判断命中的是否是空值。不等于空就只能是空值了,因为上面的isNotBlank()方法是认定
// 只有字符串有内容才返回true,比如“abc”。其他类似NULL(为空),""(空值)返回都是false
// 或者直接写 if ("".equals(shopJson))
if (shopJson != null){
// 返回一个错误信息
return Result.fail("店铺不存在");
}

// 4.不存在,提供id查询数据库
Shop shop = getById(id);
// 5.不存在,返回错误
if(shop == null){
// 解决缓存穿透:将空值写入redis,设置较短超时时间:2分钟
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
// 返回错误信息
return Result.fail("店铺不存在");
}
// 6.存在,写入redis,并设置超时时间:30分钟
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
// 7.返回
return Result.ok(shop);
}

4. 解决缓存雪崩

缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。

解决方案:给不同的Key的TTL添加随机值(很多热点数据是提前存进redis里的,设置随机的超时时间)

其他方案:

  • 利用Redis集群提高服务的可用性
  • 给缓存业务添加降级限流策略
  • 给业务添加多级缓存

5. 解决缓存击穿

缓存击穿问题也叫热点Key问题(不同于缓存雪崩是大部分key失效,缓存击穿只是部分key,但这部分key很重要,是热点内容),就是一个被高并发访问并且缓存重建业务较复杂(有时需要多张表查询并进行复杂运算)的key突然失效了,无数的请求访问(因为它是热点key,有大量请求会在缓存重建过程中不断访问)会在瞬间给数据库带来巨大的冲击。

解决方案:逻辑过期–不设置真正的过期时间(TTL),让key永久生效。但我们又怎么知道到底什么时候缓存会失效呢,这里可以把时效的时间(当前时间+有效期)写到value里,让他只是逻辑上的过期时间。但其实这个key是没有TTL的,再搭配上合适的内存淘汰策略,理论上来讲是可以永远能查到的,不会存在未命中的情况。

当用户开始查询redis时,判断是否命中,如果没有命中则直接返回空数据,不查询数据库,而一旦命中后,将value取出,判断value中的过期时间是否满足,如果没有过期,则直接返回redis中的数据。如果过期,则判断获取锁(同样使用setnx)是否成功,成功则开启独立线程去重构缓存,自己本线程直接返回旧值,如果获取锁没成功,则表示现在锁被别人获取了,还是直接返回旧值。独立线程重构缓存完成后释放互斥锁。

image-20220709115209330

新建一个实体类,对原来代码没有侵入性。放到工具包里

1
2
3
4
5
@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}

ShopServiceImpl,严格按照上面的逻辑图编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public Shop queryWithLogicalExpire( Long id ) {
String key = CACHE_SHOP_KEY + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isBlank(json)) {
// 3.存在,直接返回
return null;
}
// 4.命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
// 5.判断是否过期
if(expireTime.isAfter(LocalDateTime.now())) {
// 5.1.未过期,直接返回店铺信息
return shop;
}
// 5.2.已过期,需要缓存重建
// 6.缓存重建
// 6.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
// 6.2.判断是否获取锁成功
if (isLock){
CACHE_REBUILD_EXECUTOR.submit( ()->{
try{
// 重建缓存,这里实际应该设置为30min,方便测试设置成了10s
this.saveShop2Redis(id,10L);
}catch (Exception e){
throw new RuntimeException(e);
}finally {
unlock(lockKey);
}
});
}
// 6.4.未获取锁成功,或者获取锁之后,都直接返回过期的商铺信息。这里只有独立线程会更新缓存
return shop;
}

private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}

private void unlock(String key) {
stringRedisTemplate.delete(key);
}

这样改写后,对应的 queryById 直接调用逻辑过期方法即可。测试前先预热缓存

测试的时候我们应该测试如下内容:

  • 商户信息变更之后,redis缓存能否更新成功
  • 缓存一致性问题,前几个先来的线程查询到的仍然是旧值,缓存更新后,访问的才是新的值
  • 多线程访问同一个店铺的场景下,是否只进行一次缓存重建。

6. 封装缓存工具

基于StringRedisTemplate封装一个缓存工具类,满足下列需求:

  • 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
  • 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓

存击穿问题

  • 方法3:根据指定类型的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
  • 方法4:根据指定类型的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题

方法1和3是对应普通缓存,并解决缓存穿透的;方法2和4是对应解决缓存击穿的

核心内容:泛型的使用、函数式编程、lamda表达式的使用,看下面这一个就大概能理解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 根据指定类型的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
// 之前返回类型固定为店铺,现在使用泛型R,参数里加上Class<R> type,表示手动传入对象类型,进行泛型推断
// keyPrefix是redis里的前缀,前缀+id,才是redis里的key。ID也不一定为Long型,所以也使用泛型
public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit){
String key = keyPrefix + id;
// 1.从redis查询对象类型的缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isNotBlank(json)) {
// 3.存在,直接返回。返回的类型不再是店铺,而是传进来的type
return JSONUtil.toBean(json, type);
}
// 判断命中的是否是空值。这里很容易忘记,查到的缓存只有三种情况:查到(正常情况);空值(“”);为null
// 所以不等于空就只能是空值了,上面的isNotBlank()方法是认定只有字符串有内容才返回true,比如“abc”。其他类似NULL(为空),""(空值)返回都是false
// 或者直接写 if ("".equals(shopJson))
if (json != null) {
// 返回一个错误信息
return null;
}

// 4.缓存不存在,根据id查询数据库。这里需要格外关注,使用到了函数式编程,不同对象的数据库查询方法自然不同,这里既然不知道是什么对象类型,只能把问题抛给调用者
// 所以我们在参数里加上函数,有参(参数类型ID)有返回值(返回值类型R),起名dbFallback,apply方法就是调用这个函数,并把id作为参数传进去
R r = dbFallback.apply(id);
// 5.DB里也不存在,返回错误
if (r == null) {
// 将空值写入redis,避免缓存穿透。TTL为2min
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
// 返回错误信息
return null;
}
// 6.DB里存在,写入redis,调用了前面的普通设置缓存的方法,还需要参数时间和单位
this.set(key, r, time, unit);
return r;
}

ShopServiceImpl中更新方法,指定对象类型和函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Resource
private CacheClient cacheClient;

@Override
public Result queryById(Long id) {
// 调用缓存工具类内的方法 queryWithPassThrough 解决缓存穿透
// 参数:店铺前缀,店铺id,店铺类型对象,根据id查询DB里店铺的方法,时间,时间单位
// 这里的方法引用this::getById是lamda表达式的简写,实际为:id -> getById(id),因为这里是在ShopServiceImpl里,继承了ShopMapper,所以知道getById是查询店铺类型
// 这里的逻辑过期时间设置为30min
Shop shop = cacheClient.queryWithPassThrough(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);


if (shop == null){
return Result.fail("店铺不存在");
}
return Result.ok(shop);
}

封装之后能够让各个对象都是用缓存工具类里的方法,添加、更新缓存,以及使用缓存穿透、击穿等解决方案

三、优惠券秒杀

1. 全局唯一ID

核心点:符号位(1bit) + 时间戳(31bit,秒为单位)+ 序列号(32bit,组成:”icr:” + 业务前缀+日期)

每个店铺都可以发布优惠券,当用户抢购时,就会生成订单并保存到 tb_voucher_order 这张表中,而订单表如果使用数据库自增ID就存在一些问题:

  • id的规律性太明显(比如今天下载ID是10,明天下单的ID是100,那么在这个过程中就销售了90单,我们不希望用户能掌握这些信息)

  • 受单表数据量的限制(订单的数据量非常的大,可能达到数千万或者数亿级,单表放不下那么多的数据就会拆分,但如果是自增长的话拆分的表的ID就会重复,不能保证 id 的唯一性)

全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:唯一性、高可用(随时都可以生成)、高性能、递增性(整体上还是呈现递增趋势,提高索引效率)、安全性(不能规律性太明显)。Redis是符合这些特性要求的选择之一,其中安全性稍微复杂

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

image-20220624122904025

ID的组成部分:

  • 符号位:1bit,永远为0
  • 时间戳:31bit,以秒为单位,可以使用69年
  • 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID

其中序列号的组成又比较复杂,首先能想到的是用"icr:" + keyPrefix,icr:只是一个标识,表明这是全局自增id,keyPrefix 是业务前缀,订单业务有订单业务的全局自增id,其他业务也有其他的全局自增id

但如果只用这个是不行的,因为这样业务只有一个Key,随着时间的增长,ID会越来越多,而我们的序列号只有32位,但是订单量可能会超过2的32次方。所以为了避免这种情况,我们可以在后面拼接上一个日期,以天为单位,这样每天就能有不同的key。比如:icr:order:2022:01:01就表示2022年1月1号这一天的key。 Key里面的每一个value是自增长的。注意这里不要弄混,redis里面只保存自增长之后的值,然后给他返回,并不是把每个生成的ID都保存在redis里面,而是保存到数据库里。

先生成时间戳,再生成序列号,拼接时:先左移32位再把低32位或上

1
2
// timestamp时间戳  COUNT_BITS:32位  count:序列号
return timestamp << COUNT_BITS | count;

其他一些全局唯一ID生成策略:

  • UUID(生成的是16进制的数值,返回结果是字符串结构,也不是单调递增的,不太友好)
  • snowflake算法(雪花算法,不依赖于redis,也是一种不错的策略)
  • 数据库自增(不是自增id了,而是单独用一张表维护id。这张表的主属性id是自增的,然后去维护订单表id,性能可能没redis好)

2. 添加优惠券

核心点:秒杀优惠券是优惠券表的扩展,关联优惠券表的id

优惠券表关系如下:

  • tb_voucher:优惠券的基本信息,优惠金额、使用规则等

    image-20220624182518805

  • tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息(需要注意的是秒杀券也是优惠券,所以它的主健是关联的优惠券的ID,可以理解为秒杀券是优惠券的扩展表)

    image-20220624182600891

正因为秒杀券是优惠券的扩展表,所以优惠券实体在设计的时候,把秒杀券对应的信息也封装到了实体里面,包含了库存、生效时间、创建时间等。具体的可以去实体类Voucher里查询

随后可以利用postman添加优惠券信息,具体代码此处略

3. 实现优惠券下单

核心点:下单的逻辑是判断优惠券时间和库存符合条件

订单表:tb_voucher_order,具体的字段如下所示

image-20220712105159566

当用户开始进行下单,我们应当去查询优惠卷信息,查询到优惠卷信息,判断是否满足秒杀条件

比如时间是否充足,如果时间充足,则进一步判断库存是否足够,如果两者都满足,则扣减库存,创建订单,然后返回订单id,如果有一个条件不满足则直接结束。其业务逻辑如下图所示

image-20220624195624101

4. 乐观锁解决超卖问题

核心点:超卖问题是多个线程对库存同时进行修改,悲观锁性能差,使用乐观锁即CAS的操作能够解决,即多个线程都是先查询库存,然后再进入一个判断逻辑,再查一次库存,看是不是跟之前查的一样,如果一样则代表没人动过,不一样那就不能动,这就是cas的思想。但是这里cas的成功率会很低,假设100个线程同时来查询,按照cas的思想,只会有一个线程能成功,其他99个请求都失败。针对于超卖问题,这里不用严格的比对数据库前后查询到的值是否一致,而是只要在更新数据库操作之前,查到的库存值大于0就可以下订单并对数据库更新,小于0则不能下订单,这样就能够解决超卖问题。

超卖问题是由于多线程并发导致的,假设现在库存为1,有一个线程想查询库存,查询到为1想要对库存进行扣减,在扣减完成之前又有其他线程进来查询发现库存为1,随后也进行扣减库存的操作。这样就会导致多线程的并发问题。

针对这一问题的常见解决方案就是加锁:

  • 悲观锁:认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行,因此性能很差。例如Synchronized、Lock都属于悲观锁。由于悲观锁的代码比较常见,所以就不做过多的演示。
  • 乐观锁:认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。性能较好
    • 如果没有修改则认为是安全的,自己才更新数据。
    • 如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。

乐观锁最常见的就是CAS操作,即先比较一开始查询到的库存和要更新数据库之前查询到的库存是否相同?如果相同则进行更新,如果不同则不更新数据库。

1
2
3
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1") //set stock = stock -1
.eq("voucher_id", voucherId).eq("stock",voucher.getStock()).update(); //where id = ? and stock = ?

不过cas操作无法解决ABA的问题,我们可以添加版本号来解决,这里就不过多详细介绍。由于是库存的应用场景,到目前为止没有出现退款的业务,所以不会出现ABA的问题,这里就只演示cas操作,不添加版本号

这里还存在一个问题,就是如果严格按照比对数据库的值是否一致,cas的成功率会很低。假设100个线程同时来查询,按照cas的思想,只会有一个线程能成功,其他99个请求都失败。视频里也演示了多线程模拟的场景,发现只卖出去21单。因此这里不用严格的比对数据库前后查询到的值是否一致,而是只要在更新数据库操作之前,查到的库存值大于0就可以下订单并对数据库更新,小于0则不能下订单,这样就能够解决超卖问题。细节如下图所示,一开始库存为1

image-20220625183331283

对代码做出如下修改

1
2
3
4
5
6
7
8
9
// 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id =? and stock >0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}

5. 悲观锁解决一人一单问题

核心点:基本逻辑是判断库存充足之后,根据用户券ID和用户ID去查询订单,订单存在返回异常结果,提示用户无法再下单。如果不存在,才进行原来的扣减库存和创建订单操作。但先进行查询操作再去更新数据库,会存在线程安全问题,比如同一个人用多个线程下单。乐观锁比较适合更新数据,而现在是插入订单数据,所以我们需要使用悲观锁操作(CAS只能在更新数据时使用,一开始查一次,更新之前查一次,对比并设置值。这里是在插入数据,是为每个用户新增一个订单,只能判断是否存在,没有办法去查询原来的值,再比较更新)。

悲观锁能让一人下多单变成一人只能下一单,悲观锁的范围演变:在整个方法上加锁(锁住了this,多线程变成串行化) –> 同步方法块加锁,对每个用户进行加锁,因为每个用户的ID是不同的,所以给 userid 对象加锁(同一个用户使用多个线程传进来的userid 对象仍然是不同的) –> 所以我们不应该对 userId 对象进行加锁,应该对 userId 的值进行加锁,可以将其转为字符串,给 userid 字符串对象加锁(这里用了toString()方法,其实并没有转成唯一的字符串,因为 toString()方法的底层也还是 new 了字符串对象,也就是说我们这里依然是给100个对象加锁,而不是给对象的固定值加锁) –> 给 userid 字符串入池对象加锁(调用字符串的intern()方法,即让字符串入池,这个方法调用之后,new 的100个字符串对象,通过 intern()方法返回的对象,永远是字符串池里的那个唯一的对象)

另外关于事务失效:事务要想生效,是 this拿到了事务的代理对象,如果不是代理对象,spring的事务就会失效。后续再补充

我们要确保每一个用户只能对一张秒杀券下一次单,实现一人一单的业务逻辑其实非常简单,只需要在判断库存是否充足之后,根据用户券ID和用户ID去查询订单,如果这个订单存在,那么就返回异常结果,提示用户无法再下单。如果不存在,才进行原来的扣减库存和创建订单操作。其业务逻辑如下图所示

image-20220625205612612

1
2
3
4
5
6
7
8
9
// 5.一人一单
Long userId = UserHolder.getUser().getId();
// 5.1 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2 判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

但和之前超卖问题一样,都是先进行查询操作再去更新数据库,还是有并发安全问题,线程运行逻辑图–正常情况与异常情况:

**乐观锁比较适合更新数据,而现在是插入数据,所以我们需要使用悲观锁操作**。接下来就是决定悲观锁的范围,显然不能加在方法上,方法上加锁就把业务实现类的 this 锁住了,有多个用户下订单,只有这个用户处理完了才能处理其他用户,会造成串行化执行,不能并发处理,大大降低性能。比如ABC等200人发了请求,只能等A处理完了,才处理其他人的请求

这里应该加在同步代码块上,我们可以对每个用户进行加锁,因为每个用户的ID是不同的,对这个用户加锁不影响其他的线程里其他的用户下单,这样能够真正的实现并发处理。这里对用户加锁需要注意,相同用户的多个请求传过来的 userId 值虽然是一样的,但是每次请求获得的 userId 都是不同的对象,这就导致其实没有加锁成功,所以我们不应该对 userId 对象进行加锁,应该对 userId 的值进行加锁,可以将其转为字符串,调用 toString()方法,但toString()方法其实并没有转成唯一的字符串,因为 toString()方法的底层也还是 new 了字符串对象,也就是说我们这里依然是给100个对象加锁,而不是给对象的固定值加锁,所以要调用字符串的intern()方法,即让字符串入池,这个方法调用之后,new 的100个字符串对象,通过 intern()方法返回的对象,永远是字符串池里的那个唯一的对象

这里有个建议,最好不要在事务的方法里加局部锁,因为局部锁以外的代码没被锁住,方法自身涉及到事务(数据库的事务没有提交之前就有其他线程在此时访问了这个方法,局部锁以外的代码会导致多线程的并发问题)。

所以最好把锁加在整个方法的调用语句外,这样在调用方法完全结束之后才结束同步代码块,结束之前,事务肯定已经提交过了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public Result seckillVoucher(Long voucherId) {
...
// 注意这里
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
return createVoucherOrder(voucherId);
}
}

@Transactional
public Result createVoucherOrder(Long voucherId) {
...
}

6. 分布式锁

关于集群模拟,把spring的service复制了一份,然后部署在两个端口上,在配置nginx负载均衡。集群(分布式)模式下,可能会出现并发安全问题,因为分布式模式之后就会有多个 JVM,不能把每一个用户的user ID的值给锁住,解决这个问题的技术叫分布式锁,即让多个jvm只有一个锁监视器,这样就能够把值给锁住。常用的方案是使用zookeeper或者redis。redis里的key是唯一的,解决多个jvm的问题

自己实现的redis作为分布式锁的方案里主要有两个方法:

  • 获得锁,使用setnx的命令
    • 为了防止redis宕机使key一直存在,还需要给 key 设定一个超时时间,到期自动删除,避免引起死锁。这两个操作要具备原子性,即让这两个操作在一条命令中执行完成。例如 SET lock thread1 NX EX 10 # 添加锁 NX是互斥 EX是设置超时时间
    • 这里是用的非阻塞式锁,尝试一次,成功返回true,失败返回false。(因为阻塞式锁有点浪费CPU性能,实现起来也比较麻烦)。
    • 在设置key的名称时,需要加上线程标识,因为防止分布式场景下锁被错误释放(线程1获得了redis 锁,然后他的业务阻塞了,此时锁释放会有两种情况,第1种情况是阻塞恢复,它继续完成业务然后释放锁,第2种情况是超时然后自动的释放锁,当第2种情况发生后,线程2获得了锁,但线程1业务阻塞恢复了,也直接释放锁,不管是不是自己的线程),线程标识组成:随机字符(多个集群里可能会出现相同的线程号)+线程ID
  • 释放锁,包括手动释放(删除之前比较一下是不是自己的线程标识,是的话删除key)和超时释放(过期时间)
    • 还有一种极端环境:在判断锁标识是否一致之后,释放锁之前,线程1进入阻塞状态,如果这个阻塞时间很长就会触发超时释放锁,另一个线程获取锁之后,如果线程1此时阻塞结束,会直接释放锁,因为在之前它已经判断过了。所以要保证 判断锁标识 和 释放锁 这两个操作具备原子性。这里使用lua脚本,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性

自己实现简单的分布式锁,即基于setnx实现的分布式锁存在下面的问题:

  • 不可重入:同一个线程无法多次获取同一把锁
  • 不可重试:获取锁只尝试一次就返回false,没有重试机制
  • 超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
  • 主从一致性:如果Redis提供了主从集群,主从同步存在延迟,当主宕机时,如果从同步主中的锁数据,则会出现问题

改进方法为使用Redisson的分布式锁,也是分为获取锁、尝试创建锁、释放锁三个步骤,没太大区别

通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。用了集群模式之后就会有多个 tomcat 的服务器,也就会有多个 JVM ,那么每个 JVM 都有自己的堆、栈、方法区、常量池,所以我们之前认为把每一个用户的user ID的值给锁住,在多个JVM里面就不现实了,因为每个JVM的常量池都可以有一个这个用户的 userID 值。这就是在集群模式(分布式模式)下带来新的并发线程安全问题。解决这个问题的技术叫分布式锁。

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。我们尝试让多个jvm只有一个锁监视器,这样就能够把值给锁住。这里主要探讨 redis 作为分布式锁的方案

其实现思路为:

  • 利用set nx ex获取锁,并设置过期时间,保存线程标示
  • 释放锁时先判断线程标示是否与自己一致,一致则删除锁(防止一个线程把别的线程的锁给删了)并且要保证 判断锁标识 和 释放锁 这两个操作具备原子性,可以使用Lua脚本实现
    • 特性:
      • 利用set nx满足互斥性
      • 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
      • 利用Redis集群保证高可用和高并发特性

(一路走来,利用添加过期时间,防止死锁问题的发生,但是有了过期时间之后,可能出现误删别人锁的问题,这个问题我们开始是利用删之前 通过拿锁,比锁,删锁这个逻辑来解决的,也就是删之前判断一下当前这把锁是否是属于自己的,但是现在还有原子性问题,也就是我们没法保证拿锁比锁删锁是一个原子性的动作,最后通过lua表达式来解决这个问题)

自己实现的锁 SimpleRedisLock.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class SimpleRedisLock implements ILock{
private String name; // 锁名字
private StringRedisTemplate stringRedisTemplate;

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

private static final String KEY_PREFIX = "lock:";
// 线程标识,不仅仅用线程ID,因为多个集群里可能会出现相同的线程号,所以这里再拼上一段随机字符。isSimple设为true表示不要加下划线
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识,作为值,拼上随机字符串
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁,setIfAbsent就是类似setnx指令
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success); // 这样写是为了防止包装类有null值,还拆箱
}

@Override
public void unlock() {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
if(threadId.equals(id)){
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}

resourse文件夹下添加文件 unlock.lua

1
2
3
4
5
6
7
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 比较线程标示与锁中的标示是否一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
-- 释放锁 del key
return redis.call('del', KEYS[1])
end
return 0

去 SimpleRedisLock 更改释放锁的代码,需要先创建一个redis脚本的对象,DefaultRedisScript

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 静态代码块加载lua脚本
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT; // 定义redis脚本变量
static { // 放到静态代码块里,这样类加载时就能够加载好脚本文件,而不是等每次要调用时候才加载
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
...

@Override
public void unlock() {
// 调用lua脚本,使用的是stringRedisTemplate.execute()方法
stringRedisTemplate.execute()(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}

以上的更改就保证了多条redis命令的原子性

7. 秒杀优化

之前服务器里整个业务的流程(查询优惠卷、判断秒杀库存是否足够、查询订单、校验是否是一人一单、扣减库存、创建订单)是串行执行,并且多次涉及到去DB里查询和更新,无疑是性能很低的操作。因为DB支撑并发的能力很低,所以我们需要想办法优化这个问题。将不涉及DB的部分单独拿出来处理,即判断秒杀库存和校验一人一单环节。

我们将耗时比较短的逻辑判断放入到redis中,比如是否库存足够,比如是否一人一单,这样的操作,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功, 再在后台开一个线程,后台线程慢慢的去执行queue里边的消息。

所以秒杀业务优化的思路是:

(1)先利用Redis完成库存余量、一人一单判断,完成抢单业务

(2)再将下单业务放入阻塞队列,利用独立线程异步下单

这个就和我们平常生活中下单的逻辑很接近了,都是先下单,然后跳出一个付款的页面(这部分跳出页面和让人付款的时间就是一段缓冲时间,让DB的压力没那么大),并且也可以暂时不付款,下单成功之后,有一段时间限制,在这段时间内付款就可以。

如何用Redis完成库存余量、一人一单的判断?

自然要利用缓存,将DB里的库存信息缓存到redis里,用普通的 string 结构就行,key就是优惠券订单,值就是 库存余量。判断库存是否充足只需要导redis中去根据key找对应的value是否大于0即可,如果不充足,则直接结束,如果充足,继续在redis中判断用户是否可以下单

一人一单的业务就需要用到 set 结构,将下单的用户都放到 set里,就能保证一个人只能下一单,key 还是优惠券订单,值就是下单的用户id。如果set集合中没有这条数据,说明他可以下单,如果set集合中没有这条记录,则将userId和优惠卷存入到redis中,并且返回0。

并且我们要保证整个过程的原子性,所以要放到 lua 脚本里,根据 Lua 脚本返回的标识来判断下一步如何处理,返回1或者2的异常信息,就不用处理,直接结束,返回0才表示要真正下单,我们将下单业务所需要的优惠券id,用户id,和订单id 放入阻塞队列(后面改为消息队列),利用独立线程异步下单。其完整的流程为:

image-20220629160741070

基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功,逻辑上不难,就是熟悉这门脚本语言的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]

-- 2.数据key
-- 2.1.库存key,lua里拼接使用两点,不是用加号
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
-- redis.call出来的是字符,所以先tonumber转成数字再比较
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
-- SISMEMBER是redis命令,判断是否是一个集合的成员
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)

return 0

然后更新下单业务的代码,seckillVoucher。注意redis里用到了两个id ,一个是查询秒杀券id,一个是放了 下过单的用户id,订单id 可没有放到redis里,暂时是直接返回给前端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();

// 1.执行lua脚本,使用的是stringRedisTemplate.execute()方法
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(), // 这里不用传redis的key,所以传一个空列表,不能传null
voucherId.toString(), userId.toString() // 这里传其他参数(即非key的参数),以字符串形式
);
int r = result.intValue(); // 转成int型

// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 2.2.为0,返回订单id,全局ID生成器
long orderId = redisIdWorker.nextId("order");

// 3.返回订单id
return Result.ok(orderId);
}

后面的优化环节使用消息队列

8. 消息队列实现秒杀优化

基于Stream的消息队列-消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

  • 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度。一定程度上避免了消息堆积的问题,如果我们就是想让多个消费者同时消费,那么就创建多个消费者组
  • 消息标示:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费。避免了前文单消费者模式下消息漏读的风险
  • 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。确保消息都被处理,解决消息丢失的问题。

由此可见基于消费者组的消息队列,功能更加强大,基本符合我们的需求。

java消费者监听消息的基本思路:(伪代码)

死循环在正常情况下读取下一个未消费的消息,出现异常(即消息处理完之后没有返回ACK确认)那就再来一个死循环去处理pending-list中已消费但未确认的消息,处理完了再回到正常流程的死循环

image-20220705114036683

STREAM类型消息队列的XREADGROUP命令特点:消息可回溯;可以多消费者争抢消息,加快消费速度;可以阻塞读取;没有消息漏读的风险;有消息确认机制;保证消息至少被消费一次

缺点:STREAM类型消息队列持久化是基于redis持久化的,不能保证万无一失,有丢失风险;不支持生产者消息确认,如果生产者生成消息过程中宕机,无法解决;多消费者下的消息有序性,即事务管理。

如果对于消息队列业务要求没那么严格,STREAM类型消息队列就够用了,但如果要求很严格,STREAM类型消息队列还是存在上述的一些问题,就需要更加专业的第三方消息中间件

(1)创建一个Stream类型的消息队列,名为stream.orders,这就不用Java代码了,直接去redis控制台创建

1
xgroup create stream.orders g1 0 mkstream

(2)修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId

1
2
3
4
5
6
-- 1.3.订单id
local orderId = ARGV[3]

-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
-- 这里 orderId 的 key 建议直接叫id,符合 voucherOrder实体里面的字段
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)

再更新抢单的业务逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本,使用的是stringRedisTemplate.execute()方法
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(), // 这里不用传redis的key,所以传一个空列表,不能传null
voucherId.toString(), userId.toString(), String.valueOf(orderId) // 这里传其他参数(即非key的参数),以字符串形式
);
int r = result.intValue(); // 转成int型
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 3.返回订单id
return Result.ok(orderId);
}

(3)项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// 创建线程池,这里只创建了一个单线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

// 单独线程应该一开始就执行任务,不断去消息队列里取。所以这里是写成类一初始化结束,就开启线程任务
// @PostConstruct注解是在当前类初始化完毕之后,立刻执行该方法
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}


// 创建单独线程要执行的任务
private class VoucherOrderHandler implements Runnable{

@Override
public void run() {
while (true) {
try {
// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"), // 消费者组里的哪个消费者
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), // 读1条消息,阻塞2s
StreamOffset.create("stream.orders", ReadOffset.lastConsumed()) // 指定消息队列,以及是读取最新一条未处理的
);
// 2.判断消息获取是否成功,即订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有消息,继续下一次循环
continue;
}
// 解析消息中的订单数据
MapRecord<String, Object, Object> record = list.get(0); // 第一个string是消息id,后面两个是消息的内容,即lua脚本里写的键值对
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.创建订单
createVoucherOrder(voucherOrder);
// 4.确认消息 XACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
// 5.死循环在正常情况下读取下一个未消费的消息,出现异常(即消息处理完之后没有返回ACK确认)那就再来一个死循环
// 去处理pending-list中已消费但未确认的消息,处理完了再回到正常流程的死循环
handlePendingList();
}
}
}

// 流程和上面类似
private void handlePendingList() {
while (true) {
try {
// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1), // 读 PendingList 不用阻塞
StreamOffset.create("stream.orders", ReadOffset.from("0")) // 读 PendingList 一定读已读未完成的消息
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有异常消息,结束循环
break;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.创建订单
createVoucherOrder(voucherOrder);
// 4.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} catch (Exception e) {
log.error("处理PendingList订单异常", e);
}
}
}
}

随后即可进行测试,谨记抢购的优惠券id是12,11因为没往redis里存,是抢购不了的。先进行单人下单,看看逻辑有没有问题,测试结果显示没有问题,redis库存-1,订单集合+1,DB同理。同时,redis的消息队列 stream.orders 里多了 一条消息

image-20220705152334255

随后可以恢复库存,模拟高并发多人抢单,测试结果显示没有超卖,也保证了一人一单,至于平均响应时间和吞吐量,多次测试结果比较波动,但肯定比最开始的500多ms要好很多

四、社交功能


项目:商户点评项目
http://jswanyu.github.io/2022/06/27/项目/商户点评项目/
作者
万宇
发布于
2022年6月27日
许可协议