Redis:04.实战应用三:优惠券秒杀

秒杀业务是电商中必不可少的环节,在本项目中把优惠券作为唯一的商品用来交易,在整个秒杀业务中可以学习到跟Redis相关的运用

一、全局唯一ID

每个店铺都可以发布优惠券,当用户抢购时,就会生成订单并保存到 tb_voucher_order 这张表中,而订单表如果使用数据库自增ID就存在一些问题:

  • id的规律性太明显(比如今天下载ID是10,明天下单的ID是100,那么在这个过程中就销售了90单,我们不希望用户能掌握这些信息)

  • 受单表数据量的限制(订单的数据量非常的大,可能达到数千万或者数亿级,单表放不下那么多的数据就会拆分,但如果是自增长的话拆分的表的ID就会重复,不能保证 id 的唯一性)

全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:唯一性、高可用(随时都可以生成)、高性能、递增性(整体上还是呈现递增趋势,提高索引效率)、安全性(不能规律性太明显)。Redis是符合这些特性要求的选择之一,其中安全性稍微复杂

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

image-20220624122904025

ID的组成部分:

  • 符号位:1bit,永远为0
  • 时间戳:31bit,以秒为单位,可以使用69年
  • 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID

其中序列号的组成又比较复杂,首先能想到的是用"icr:" + keyPrefix,icr:只是一个标识,表明这是全局自增id,keyPrefix 是业务前缀,订单业务有订单业务的全局自增id,其他业务也有其他的全局自增id

但如果只用这个是不行的,因为这样业务只有一个Key,随着时间的增长,ID会越来越多,而我们的序列号只有32位,但是订单量可能会超过2的32次方。所以为了避免这种情况,我们可以在后面拼接上一个日期,以天为单位,这样每天就能有不同的key。比如:icr:order:2022:01:01就表示2022年1月1号这一天的key。 Key里面的每一个value是自增长的。注意这里不要弄混,redis里面只保存自增长之后的值,然后给他返回,并不是把每个生成的ID都保存在redis里面,而是保存到数据库里。

其具体代码为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Component
public class RedisIdWorker {
/**
* 开始时间戳,2022.01.01 00:00:00
*/
private static final long BEGIN_TIMESTAMP = 1640995200L;
/**
* 序列号的位数,低32位用来放自增的id
*/
private static final int COUNT_BITS = 32;

@Autowired
private StringRedisTemplate stringRedisTemplate;

// keyPrefix 是业务前缀,不同的业务有不同的id,比如订单业务就是 order:...
public long nextId(String keyPrefix) {
// 1.生成时间戳:高32位
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;

// 2.生成序列号:低32位
// 2.1.获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
// 2.2.自增长
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 3.拼接并返回:先左移32位再把低32位或上
return timestamp << COUNT_BITS | count;
}
}

其他一些全局唯一ID生成策略:

  • UUID(生成的是16进制的数值,返回结果是字符串结构,也不是单调递增的,不太友好)
  • snowflake算法(雪花算法,不依赖于redis,也是一种不错的策略)
  • 数据库自增(不是自增id了,而是单独用一张表维护id。这张表的主属性id是自增的,然后去维护订单表id,性能可能没redis好)

二、添加优惠券

每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:

image-20220624181857351

表关系如下:

  • tb_voucher:优惠券的基本信息,优惠金额、使用规则等

    image-20220624182518805

  • tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息(需要注意的是秒杀券也是优惠券,所以它的主健是关联的优惠券的ID,可以理解为秒杀券是优惠券的扩展表)

    image-20220624182600891

正因为秒杀券是优惠券的扩展表,所以优惠券实体在设计的时候,把秒杀券对应的信息也封装到了实体里面,包含了库存、生效时间、创建时间等。具体的可以去实体类Voucher里查询。

**新增普通卷代码: **VoucherController

1
2
3
4
5
@PostMapping
public Result addVoucher(@RequestBody Voucher voucher) {
voucherService.save(voucher);
return Result.ok(voucher.getId());
}

新增秒杀卷代码:

VoucherController

1
2
3
4
5
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
voucherService.addSeckillVoucher(voucher);
return Result.ok(voucher.getId());
}

VoucherServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到Redis中
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

由于前台没有管理页面,所以我们这里使用 postman 去提交优惠券的信息,其对应的方法为VoucherController下的addSeckillVoucher,提交过程如下图所示

image-20220624185242506

其中的JSON字符串为:

1
2
3
4
5
6
7
8
9
10
11
12
{
"shopId": 1,
"title": "100元代金券",
"subTitle": "周一到周五均可使用",
"rules": "全场通用\\n无需预约\\n可无限叠加\\n不兑现,不找零\\n仅限堂食",
"payValue": 8000,
"actualValue": 10000,
"type": 1,
"stock": 100,
"beginTime": "2022-01-25T12:00:00",
"endTime": "2022-01-26T12:00:00"
}

需要注意的是截止时间不要晚于当前的系统时间,否则商店信息里面显示不出来优惠券信息

发送完之后去数据库里查询优惠券表和秒杀券表,查看对应信息。

三、实现优惠券下单

订单表:tb_voucher_order,具体的字段如下所示

image-20220712105159566

优惠券信息添加完毕后,我们就可以实现基本的优惠券下单功能,下单时需要判断两点:

  • 秒杀时间是否合理,如果尚未开始或已经结束则无法下单
  • 库存是否充足,不足则无法下单

当用户开始进行下单,我们应当去查询优惠卷信息,查询到优惠卷信息,判断是否满足秒杀条件

比如时间是否充足,如果时间充足,则进一步判断库存是否足够,如果两者都满足,则扣减库存,创建订单,然后返回订单id,如果有一个条件不满足则直接结束。其业务逻辑如下图所示

image-20220624195624101

代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

@Resource
private ISeckillVoucherService seckillVoucherService;

@Resource
private RedisIdWorker redisIdWorker;

@Override
@Transactional // 涉及到了秒杀券表和优惠券订单表
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券:去秒杀券表里查
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
// 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId)
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}
// 6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 6.1.订单id,调用全局ID生成器
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 6.2.用户id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
// 6.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 7.返回订单id
return Result.ok(orderId);
}
}

整体上的业务逻辑比较简单,没有很复杂的地方,其中涉及到了多张表的调用,需要使用事务进行控制。编写完成之后进行业务测试,在前端页面登录成功之后,点击抢购按钮,会返回订单号。再去优惠券订单表里查询信息,会发现有一条新增的购买记录。这样的话功能就没有问题

image-20220624201252460

四、超卖问题

在第2节中实现的内容是基本的优惠券下单,我们测试的方法是在前端的页面上点击抢购,但在真实的秒杀业务场景里,肯定会有成千上万的用户同时抢购会造成极高的并发量。

这里使用 JMeter 模拟高并发场景(现在只模拟一个用户使用多个线程下单,这里还没做一人一单的限制),这里为了方便测试,我们将上一节的优惠券订单信息删除,并把秒杀券的库存从99改为100,然后用200个线程同时抢购,观察最后的结果。使用 JMeter 测试时有几个注意事项,务必注意!

  • jmx文件可以用黑马提供的,注意把其中的HTTP请求里的登录状态头改为自己在redis里面的登陆凭证,为了方便调试,也建议在此处把登录凭证的有效期改为永久有效
  • http请求的路径最后优惠券id改成自己DB里的,视频里是7,自己的不一定是7,我这里是11
  • 每次新测试之前,都要点击工具栏里的清除数据,否则会对测试有干扰,这点很重要!!!容易忽略

这里的测试结果里的测试结果显示,数据库里优惠券的库存显示为-9,再去订单表里查询发现有109条订单,这就是超卖问题

这里的测试结果显示,数据库的优惠券的库存显示为负九。再去订单表里查询,发现有109条订单。这就是超卖问题。

image-20220625181444033

image-20220625181538849

超卖问题是由于多线程并发导致的,假设现在库存为1,有一个线程想查询库存,查询到为1想要对库存进行扣减,在扣减完成之前又有其他线程进来查询发现库存为1,随后也进行扣减库存的操作。这样就会导致多线程的并发问题。

针对这一问题的常见解决方案就是加锁:

  • 悲观锁:认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行,因此性能很差。例如Synchronized、Lock都属于悲观锁。由于悲观锁的代码比较常见,所以就不做过多的演示。
  • 乐观锁:认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。性能较好
    • 如果没有修改则认为是安全的,自己才更新数据。
    • 如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。

乐观锁最常见的就是CAS操作,即先比较一开始查询到的库存和要更新数据库之前查询到的库存是否相同?如果相同则进行更新,如果不同则不更新数据库。

1
2
3
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1") //set stock = stock -1
.eq("voucher_id", voucherId).eq("stock",voucher.getStock()).update(); //where id = ? and stock = ?

不过cas操作无法解决ABA的问题,我们可以添加版本号来解决,这里就不过多详细介绍。由于是库存的应用场景,到目前为止没有出现退款的业务,所以不会出现ABA的问题,这里就只演示cas操作,不添加版本号

这里还存在一个问题,就是如果严格按照比对数据库的值是否一致,cas的成功率会很低。假设100个线程同时来查询,按照cas的思想,只会有一个线程能成功,其他99个请求都失败。视频里也演示了多线程模拟的场景,发现只卖出去21单。因此这里不用严格的比对数据库前后查询到的值是否一致,而是只要在更新数据库操作之前,查到的库存值大于0就可以下订单并对数据库更新,小于0则不能下订单,这样就能够解决超卖问题。细节如下图所示,一开始库存为1

image-20220625183331283

对代码做出如下修改

1
2
3
4
5
6
7
8
9
// 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id =? and stock >0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}

代码修改完毕之后可以对代码进行测试,测试之前记得将数据库的订单全部删除,将库存重新改为100,然后使用JMeter 工具进行测试。测试结果显示,这次的模拟没有任何问题,库存减为0并且也订单数也为100,多线程模拟的200个请求中,另外100个请求均为失败。

image-20220625191449021

image-20220625191538652

五、一人一单

1. 一人一单的并发安全问题

之前的模拟存在一个很大的问题,200个线程是同一个用户进行下单,这就导致了数据库里100个订单的用户是同一个人,但在实际业务中,这是不被允许的,我们要确保每一个用户只能对一张秒杀券下一次单,所以本节需要对业务代码进行改写。

实现一人一单的业务逻辑其实非常简单,只需要在判断库存是否充足之后,根据用户券ID和用户ID去查询订单,如果这个订单存在,那么就返回异常结果,提示用户无法再下单。如果不存在,才进行原来的扣减库存和创建订单操作。其业务逻辑如下图所示

image-20220625205612612

其修改后的代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Override
@Transactional // 涉及到了秒杀券表和优惠券订单表
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券:去秒杀券表里查
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}

// 5.一人一单
Long userId = UserHolder.getUser().getId();
// 5.1 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2 判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock >0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}

// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id,调用全局ID生成器
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

// 8.返回订单id
return Result.ok(orderId);
}

现在进行多线程并发测试,在测试前记得把数据库的订单全部删除,再把库存恢复到100,随后测试结果显示库存为90,订单里有10行数据,说明还是存在问题,按理说一个人只能下一个订单,结果下了10个订单,说明还是存在着多线程的并发安全问题。

这里还是和之前超卖问题产生的原因一样,都是先进行查询操作再去更新数据库,现在假设库存为100,没有出售任何优惠券,那么假设有多个线程来查询订单,查到的订单数为0,然后多个线程进行更新数据库的操作,就出现了多线程的并发安全问题。

线程运行逻辑图–正常情况与异常情况:

2. 并发问题解决–同步方法

这里之前也提到过,用悲观锁或者乐观锁的方法来解决,之前我们是使用CAS的操作来解决,但这里没有办法使用CAS的操作,因为CAS只能在更新数据时使用,一开始查一次,更新之前查一次,对比并设置值。这里是在插入数据,是为每个用户新增一个订单,只能判断是否存在,没有办法去查询原来的值,再比较更新。所以这里我们只能使用悲观锁的方式。即乐观锁比较适合更新数据,而现在是插入数据,所以我们需要使用悲观锁操作

加锁后线程运行逻辑图:

image-20220626214823833

接下来就是决定悲观锁的范围,这里我们可以把一人一单,扣减库存和创建订单三个部分,单独封装到一个方法里,然后给方法加上 synchronized 与 事务管理注解, 前面的一些查询操作不用加入同步代码快和事务管理注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

@Resource
private ISeckillVoucherService seckillVoucherService;

@Resource
private RedisIdWorker redisIdWorker;

@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券:去秒杀券表里查
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}

return createVoucherOrder(voucherId);
}

@Transactional // 涉及到了秒杀券表和优惠券订单表
public synchronized Result createVoucherOrder(Long voucherId) {
// 5.一人一单
Long userId = UserHolder.getUser().getId();
// 5.1 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2 判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock >0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}

// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id,调用全局ID生成器
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

// 8.返回订单id
return Result.ok(orderId);
}
}

这样处理之后,再次模拟多线程并发测试,会发现库存只减了1,只有一个优惠券订单生成,达到我们的目的。测试结果如下图:

同步方法结果

同步方法结果1

同步方法结果2

但是这样的写法有一个非常影响性能的地方就是在方法上加锁,即业务实现类的 this 被加锁了,有多个用户下订单,只有这个用户处理完了才能处理其他用户,会造成串行化执行,不能并发处理,大大降低性能。比如ABC等200人发了请求,只能等A处理完了,才处理其他人的请求

总结:在使用锁过程中,控制 锁粒度 是一个非常重要的事情,因为如果锁的粒度太大,会导致每个线程进来都会锁住,所以我们需要去控制锁的粒度。这里引出的知识点是:使用 synchronized 时,更推荐使用同步方法块,而不是使用同步方法

3. 并发问题解决–同步方法块

这里我们其实可以对每个用户进行加锁,因为每个用户的ID是不同的,对这个用户加锁不影响其他的线程里其他的用户下单,这样能够真正的实现并发处理。

后面会考虑如何对用户ID加锁,刚开始写的代码是如下所示,只对传过来的 userId 对象加锁,测试的结果是并没有实现一人一单,一个用户还是加了100单,让库存减为0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Transactional  // 涉及到了秒杀券表和优惠券订单表
public Result createVoucherOrder(Long voucherId) {
// 5.一人一单
Long userId = UserHolder.getUser().getId();
synchronized (userId) {
// 5.1 查询订单

// 5.2 判断是否存在

// 6.扣减库存

// 7.创建订单

// 8.返回订单id
}
}

后来分析是因为相同用户的多个请求传过来的 userId 值虽然是一样的,但是每次请求获得的 userId 都是不同的对象,这就导致其实没有加锁成功,让100个对象下了100单。

这里就可以提到另外一个知识是:局部变量是线程安全的,但局部变量的引用不一定线程安全。放在此处应该如何正确理解?现在还是只有一个用户,使用200个线程前来下单,每个线程里都生成了 userId 这样一个局部变量对象,由于库存100,所以还是下了100单。

所以加锁效果没达成,因为我们是希望一人一单的。希望的效果更类似于局部变量的引用不一定线程安全,比如学习多线程时的一张图。即让多个线程都去访问那一个对象的值,判断那个对象的值在DB里是否已经存在了,不过要注意我们是希望在线程安全的环境下,去访问那一个对象的值。(举这个例子不一定恰当,希望我复习时能回味到表达的意思)

image-20220116123108856

所以我们不应该对 userId 对象进行加锁,应该对 userId 的值进行加锁,可以将其转为字符串,调用 toString()方法

1
2
3
4
5
6
7
8
@Transactional  // 涉及到了秒杀券表和优惠券订单表
public Result createVoucherOrder(Long voucherId) {
// 5.一人一单
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString()) {
...下面没变
}
}

这样就行了吗?通过测试我们发现结果还是和之前的一样,即100个订单,库存降为0,这里面还有一个坑就是 toString()方法其实并没有转成唯一的字符串,因为 toString()方法的底层也还是 new 了字符串对象,也就是说我们这里依然是给100个对象加锁,而不是给对象的固定值加锁

1
2
3
4
5
6
7
8
9
10
11
12
public static String toString(long i) {
int size = stringSize(i);
if (COMPACT_STRINGS) {
byte[] buf = new byte[size];
getChars(i, size, buf);
return new String(buf, LATIN1);
} else {
byte[] buf = new byte[size * 2];
StringUTF16.getChars(i, size, buf);
return new String(buf, UTF16);
}
}

所以要调用字符串的intern()方法,即让字符串入池,这个方法调用之后,new 的100个字符串对象,通过 intern()方法返回的对象,永远是字符串池里的那个唯一的对象

1
2
3
4
5
6
7
8
@Transactional  // 涉及到了秒杀券表和优惠券订单表
public Result createVoucherOrder(Long voucherId) {
// 5.一人一单
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
...下面没变
}
}

完成这些之后再去测试,会发现库存只减1,并只创建1个订单,这样才实现了一人一单的业务需求。到这里表面上关于多线程实现一人一单并发问题已经实现了,注意体会一开始一人多单 –> 在整个方法上加锁 –> 同步方法块加锁 –> 给 userid 对象加锁 –> 给 userid 字符串对象加锁 –> 给 userid 字符串入池对象加锁,这样整个优化的过程

但是这样做其实还是有问题(好多问题啊!),这就要和数据库的事务进行一起探讨,如下块代码所示,目前的的同步代码块是在一个方法内部的,当这个同步方法块的语句执行完毕之后,锁就被释放了,其他线程就可以继续访问这个方法。而如果数据库的事务没有提交之前就有其他线程在此时访问了这个方法,那也会导致多线程的并发问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券:去秒杀券表里查
// 2.判断秒杀是否开始
// 3.判断秒杀是否已经结束
// 4.判断库存是否充足
...
return createVoucherOrder(voucherId);
}

@Transactional // 涉及到了秒杀券表和优惠券订单表
public Result createVoucherOrder(Long voucherId) {
// 5.一人一单
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
...下面没变
}
...
}

所以我们需要对整个方法都进行加锁,但不能在方法上加 synchronized,否则又回到了之前的问题。而是要加在整个方法的调用语句外,如下代码所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
  @Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券:去秒杀券表里查
// 2.判断秒杀是否开始
// 3.判断秒杀是否已经结束
// 4.判断库存是否充足

// 注意这里
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
return createVoucherOrder(voucherId);
}
}

@Transactional // 涉及到了秒杀券表和优惠券订单表
public Result createVoucherOrder(Long voucherId) {
// 5.一人一单
Long userId = UserHolder.getUser().getId();
// 5.1 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
...
}

在调用方法完全结束之后才结束同步代码块,结束之前,事务肯定已经提交过了。

但是还有问题,涉及到Spring事务管理中事务失效的几种可能情况,这里我之前没有学过,暂时先记录一下。情况就是:我们给createVoucherOrder方法加了事务注解,但是seckillVoucher没有加,那么在后者里调用前者的时候,调用的是下面这段代码,其实其中省略了一个 this ,也就是说这个this 是拿到了当前业务实现类对象VoucherOrderServiceImpl,而不是它的代理对象。我们要知道事务要想生效,是 this拿到了事务的代理对象,如果不是代理对象,spring的事务就会失效

1
2
3
synchronized (userId.toString().intern()) {
return this.createVoucherOrder(voucherId);
}

如何解决呢,拿到类的代理对象即可,用如下代码

1
2
3
4
5
6
7
8
9
{
...
// 注意这里
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
// 获得代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId); // 用代理对象去调用加了事务管理注解的方法
}

不过这之前要进行一些额外配置,先去加环境依赖

1
2
3
4
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>

再去启动类加上一个注解和设置 exposeProxy 为 true,这才能让代理对象暴露出来,否则是无法获取代理对象的

1
2
3
4
5
6
7
8
9
10
@EnableAspectJAutoProxy(exposeProxy = true)  // 原来默认值是false
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
public class HmDianPingApplication {

public static void main(String[] args) {
SpringApplication.run(HmDianPingApplication.class, args);
}

}

最终的测试结果也是实现了一人一单,这里就不放图了。(有个问题:吞吐量显示还没有之前同步方法的高,不知道这里看吞吐量有没有意义,因为只有一个用户,只能下一单,视频里没有提到这个)

六、分布式锁

1. 集群下的线程并发安全问题

通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。我们将服务启动两份,端口分别为8081和8082,在这里只用把原来的 configuration 复制一份即可(按 ctrl + D),然后更改命名,添加虚拟机参数。随后将两个服务启动(以debug模式启动,后面需要打断点,养成以debug模式启动的习惯),确认分别部署在8081和8082端口

image-20220626211027094

然后修改nginx的conf目录下的nginx.conf文件,配置反向代理和负载均衡:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
server {
listen 8080;
server_name localhost;
# 指定前端项目所在的位置
location / {
root html/hmdp;
index index.html index.htm;
}

error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}


location /api {
default_type application/json;
#internal;
keepalive_timeout 30s;
keepalive_requests 1000;
#支持keep-alive
proxy_http_version 1.1;
rewrite /api(/.*) $1 break;
proxy_pass_request_headers on;
#more_clear_input_headers Accept-Encoding;
proxy_next_upstream error timeout;
#proxy_pass http://127.0.0.1:8081;
proxy_pass http://backend;
}
}

upstream backend {
server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1;
server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1;
}

修改配置文件之后,使用如下命令重载配置

1
nginx -s reload

重载完之后,避免之前的 nginx 进程对测试造成影响,我们将其全部杀掉

1
taskkill /IM nginx.exe /F

然后去输入请求:http://localhost:8080/api/voucher/list/1,请求两次,应该能够在两个服务上都出现查询信息,测试没有问题才能代表集群模式和负载均衡配置成功。

image-20220626211833996

image-20220626211929443

接下来测试是否存在线程安全问题,我们还是将数据库的订单优惠券订单全部删掉,将库存恢复到100。使用postman 进行测试:http://localhost:8080/api/voucher-order/seckill/11,两个请求都是去秒杀11号优惠券,而且是同一个用户

Snipaste_2022-06-26_21-26-11

在发送请求之前,在代码中同步代码快里打上断点(确认服务是以 debug 的模式启动),这是为了方便测试,要不然请求很快就发送结束,达不到演示并发的效果(注意我们都是在讨论并发的场景下可能出现的问题,如果是顺序执行,哪怕是集群也没影响,我们这里打断点就是为了能够达到并发查询数据库的条件)。随后发送两个请求,会发现两个服务都运行至断点处,并且两个的用户id是相同的

image-20220626213711476

后面把两个服务都放行,去数据库里查看会发现同一个用户下了两个订单,库存减为98,这也就意味着在集群模式下,我们又再次出现了多线程并发安全问题,并没有把同一个用户给锁住,从而让一个用户能够下多个订单

image-20220626214144521

下面分析一下出现这个问题的原因:用了集群模式之后就会有多个 tomcat 的服务器,也就会有多个 JVM ,那么每

个 JVM 都有自己的堆、栈、方法区、常量池,所以我们之前认为把每一个用户的user ID的值给锁住,在多个JVM里面就不现实了,因为每个JVM的常量池都可以有一个这个用户的 userID 值。这就是在集群模式(分布式模式)下带来新的并发线程安全问题。

image-20220626215013052

解决这个问题的技术叫分布式锁。

2. 分布式锁理论知识

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

前文介绍的,在集群模式下出问题的原因就是因为多个jvm里面有多个锁监视器,导致了userid的值没锁住,那么我们可以尝试让多个jvm只有一个锁监视器,这样就能够把值给锁住。

image-20220627162106313

那么分布式锁就得有如下要求:多进程可见、互斥、高可用、高性能、安全性

分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:

MySQL Redis Zookeeper
互斥 利用mysql本身的互斥锁机制 利用setnx这样的互斥命令 利用节点的唯一性和有序性实现互斥
高可用
高性能
安全性 断开连接,自动释放锁 利用锁超时时间,到期释放 临时节点,断开连接自动释放
优点 直接借助数据库容易理解 性能好,实现起来较为方便 有效地解决单点问题,不可重入问题,非阻塞问题以及锁无法释放的问题,实现起来较为简单。
缺点 在使用关系型数据库实现分布式锁的过程中会出现各种问题,例如数据库单点问题和可重入问题,并且在解决过程中会使得整个方案越来越复杂 key的过期时间设置难以确定,如何设置的失效时间太短,方法没等执行完,锁就自动释放了,那么就会产生并发问题。如果设置的时间太长,其他获取锁的线程就可能要平白的多等一段时间。Redis的集群部署虽然能解决单点问题,但是并不是强一致性的,锁的不够健壮 性能上不如使用缓存实现分布式锁

zookeeper也是企业级开发中较好的一个实现分布式锁的方案,由于本套视频并不讲解zookeeper的原理和分布式锁的实现,所以不过多阐述。本节主要探讨 redis 作为分布式锁的方案,实现分布式锁时需要实现的两个基本方法:

(1)获取锁:

  • 互斥:确保只能有一个线程获取锁

    redis里有setnx的命令,即原来没有这个 key 才能够设置成功,如果有的话就设置不成功,并且为了避免 redis 服务宕机使得这个key一直存在,我们要给 key 设定一个超时时间,到这个时间之后需要自动删除,避免引起死锁。注意不能使用两个分开的命令,比如先SETNX lock ...EXPIRE lock 10,因为服务器的宕机有可能会发生在这两条命令之间,所以我们要保证这两个操作具备原子性,即让这两个操作在一条命令中执行完成。实现起来也很简单,set命令有很多可选参数

    1
    2
    # 添加锁 NX是互斥 EX是设置超时时间
    SET lock thread1 NX EX 10
  • 互斥锁有两种具体的形式分别为阻塞式锁和非阻塞式锁,阻塞式锁获取锁失败后会原地等待,不断重试,另一种是非阻塞式锁,尝试一次之后,如果失败就直接停止。这里我们选择非阻塞式锁,尝试一次,成功返回true,失败返回false。(因为阻塞式锁有点浪费CPU性能,实现起来也比较麻烦)

(2)释放锁:手动释放(del …)、超时自动释放。注意超时自动释放只是为了防止服务宕机的异常情况,大部分的正常的情况还是需要手动释放锁的

整个流程如下图所示:

image-20220627165129456

3. 代码实现Redis分布式锁

首先定义一个接口,其中包含 获取锁和释放锁 的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁持有的超时时间,过期后自动释放
* @return true代表获取锁成功; false代表获取锁失败
*/
boolean tryLock(long timeoutSec);

/**
* 释放锁
*/
void unlock();
}

实现类如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class SimpleRedisLock implements ILock{
private String name; // 锁名字
private StringRedisTemplate stringRedisTemplate;

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

private static final String KEY_PREFIX = "lock:";

@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识,作为值
long threadId = Thread.currentThread().getId();
// 获取锁,setIfAbsent就是类似setnx指令
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, String.valueOf(threadId), timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success); // 这样写是为了防止包装类有null值,还拆箱
}

@Override
public void unlock() {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}

订单业务秒杀代码中需要对应修改,之前是加 synchronized,现在是自己手动创建锁、释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券:去秒杀券表里查
// 2.判断秒杀是否开始
// 3.判断秒杀是否已经结束
// 4.判断库存是否充足

// 注意这里
Long userId = UserHolder.getUser().getId();
// 创建锁对象
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
// 获取锁,方便调试将超时时间设置长一点,实际上要根据业务执行时间来定,比如业务只用执行500ms,设置个5s最多了
boolean isLock = lock.tryLock(3600);
// 判断是否获取锁成功
if(!isLock){
// 获取锁失败,返回错误提示
return Result.fail("不允许重复下单!");
}
// 防止出现异常,放到try-finally里,最终释放锁
try {
// 获得代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId); // 用代理对象去调用加了事务管理注解的方法
} finally {
// 释放锁
lock.unlock();
}
}

随后可以进行测试,测试的过程还是跟上一节最后集群模式下并发安全问题中测试过程一样,在获取锁那里打断点,能够发现两个请求只有一个的 isLock 为true

1 2

测试的结果显示,使用redis加分布式锁之后,集群模式下,相同的用户也只能下一单

3

4. 分布式锁误删问题

针对于上面的应用场景,还存在着一个隐藏的问题,现在假设线程1获得了redis 锁,然后他的业务阻塞了,此时锁释放会有两种情况,第1种情况是阻塞恢复,它继续完成业务然后释放锁,第2种情况是超时然后自动的释放锁。

现在假设线程一的业务阻塞很长,也就是后一种情况。redis 锁是超时释放的,然后这个时候线程2进来了,他获得了锁,然后执行业务过程中线程1业务不阻塞了,他完成了自己的业务,按照我们之前代码的逻辑是完成业务之后就释放出来,他也不管是不是自己的线程,反正就直接把锁释放出来,这样就会导致其他的线程又可以来拿到这个锁,现在假设线程3也进来了,然后就会获得这个锁,那么又会出现两个线程同时拿到了锁。

整个过程如下图所示:

image-20220627211324516

其核心的问题就在于释放锁的时候没有判断是不是属于自己的线程,所以我们把这个过程给加上,即获取锁时存入线程的标识,释放锁之前判断下锁标识是否是自己,就能解决上面的问题,用下图展示

image-20220627211751249

对应的逻辑流程要改变下:

image-20220627211557493

之前的代码中,我们已经把线程标识存在了 redis 的值里,这里需要回忆一下,当时我们是把业务名称作为Key,线程号作为value存到 redis 里面。不过仅用线程ID作为 value 不太严谨,因为多个集群里可能会出现相同的线程号,所以这里再拼上一段随机字符作为 value ,即最终的线程标识。最后释放锁之前把他取出来比较一下就行。

我们对代码做出如下改动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class SimpleRedisLock implements ILock{
private String name; // 锁名字
private StringRedisTemplate stringRedisTemplate;

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

private static final String KEY_PREFIX = "lock:";
// 线程标识,不仅仅用线程ID,因为多个集群里可能会出现相同的线程号,所以这里再拼上一段随机字符。isSimple设为true表示不要加下划线
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识,作为值,拼上随机字符串
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁,setIfAbsent就是类似setnx指令
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success); // 这样写是为了防止包装类有null值,还拆箱
}

@Override
public void unlock() {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
if(threadId.equals(id)){
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}

测试过程我们可以先让一个请求完成,然后手动去 redis 里面把锁给删掉,模拟超时删除的情况,然后再去观察第2个请求,把断点打在判断是否是自己锁标识的那个地方,就可以观察到。测试过程不过多写了,具体的看视频。

5. Lua脚本保证分布式锁的原子性

之前的内容虽然解决了分布式锁误删的问题,但其实在一种极端的环境下还是有可能存在着误删现象,就是在判断锁标识是否一致之后,释放锁之前,线程1进入阻塞状态,如果这个阻塞时间很长就会触发超时释放锁,另一个线程获取锁之后,如果线程1此时阻塞结束,会直接释放锁,因为在之前它已经判断过了。如下图所示

image-20220628111928137

为了解决这个问题,我们要保证 判断锁标识 和 释放锁 这两个操作具备原子性

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html

这里重点介绍Redis提供的调用函数,我们可以使用lua去操作redis,又能保证他的原子性,这样就可以实现拿锁比锁删锁是一个原子性动作了,作为Java程序员这一块并不作一个简单要求,并不需要大家过于精通,只需要知道他有什么作用即可。这部分不详细记录Lua的知识点了,知道脚本能够保证多条redis命令的原子性就可以

重点介绍一下Redis提供的调用函数,语法如下:

1
redis.call('命令名称', 'key', '其它参数', ...)

例如,我们要执行set name jack,则脚本是这样:

1
2
# 执行 set name jack
redis.call('set', 'name', 'jack')

例如,我们要先执行set name Rose,再执行get name,则脚本如下:

1
2
3
4
5
6
# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数

在resourse文件夹下添加文件 unlock.lua

1
2
3
4
5
6
7
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 比较线程标示与锁中的标示是否一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
-- 释放锁 del key
return redis.call('del', KEYS[1])
end
return 0

去 SimpleRedisLock 更改释放锁的代码,需要先创建一个redis脚本的对象,DefaultRedisScript

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 静态代码块加载lua脚本
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT; // 定义redis脚本变量
static { // 放到静态代码块里,这样类加载时就能够加载好脚本文件,而不是等每次要调用时候才加载
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
...

@Override
public void unlock() {
// 调用lua脚本,使用的是stringRedisTemplate.execute()方法
stringRedisTemplate.execute()(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}

以上的更改就保证了多条redis命令的原子性

到目前为止,我们实现的简单的redis分布式锁,就可以完成基本的需求,确保集群模式下的分布式多线程并发安全问题,其实现思路为:

  • 利用set nx ex获取锁,并设置过期时间,保存线程标示
  • 释放锁时先判断线程标示是否与自己一致,一致则删除锁
    • 特性:
      • 利用set nx满足互斥性
      • 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
      • 利用Redis集群保证高可用和高并发特性

七、分布式锁优化–Redisson

我们自己实现简单的分布式锁,即基于setnx实现的分布式锁存在下面的问题:

  • 不可重入:同一个线程无法多次获取同一把锁
  • 不可重试:获取锁只尝试一次就返回false,没有重试机制
  • 超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
  • 主从一致性:如果Redis提供了主从集群,主从同步存在延迟,当主宕机时,如果从同步主中的锁数据,则会出现问题

还有很多成熟的第三方工具

1. Redisson入门

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

image-20220629111517608

官网地址: https://redisson.org

GitHub地址: https://github.com/redisson/redisson

下面演示一下redisson的使用

(1)导入依赖

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>

(2)添加配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
// redis单机
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123321");

// 创建RedissonClient对象
return Redisson.create(config);
}
}

虽然redisson也有配置文件配置的方法,但一般不建议,会覆盖原本redis的配置,推荐采用这种配置类的做法

(3)使用Redisson的分布式锁

和之前一样,也是分为获取锁、尝试创建锁、释放锁三个步骤

redisson的tryLock方法可以设置三个参数,分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位。无参默认情况是:不重试、自动超时为30s

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Resource
private RedissonClient redissonClient;

@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券:去秒杀券表里查
// 2.判断秒杀是否开始
// 3.判断秒杀是否已经结束
// 4.判断库存是否充足

// 分布式锁
Long userId = UserHolder.getUser().getId();
// 创建锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId); // 这里是可重入的锁
// redisson的tryLock方法可以设置三个参数,分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
// 无参默认情况是:不重试、自动超时为30s,我们代码里的逻辑也是不重试,所以就选择无参的情况
boolean isLock = lock.tryLock();
// 判断是否获取锁成功
if(!isLock){
return Result.fail("不允许重复下单!");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId); // 用代理对象去调用加了事务管理注解的方法
} finally {
// 释放锁
lock.unlock();
}
}

测试结果显示,在集群模式下还是能够实现和之前一样的效果,即保证一人一单。(这里虽然用了Redisson,但没有演示可重入、可重试等特性,所以效果和之前一样)

2. 可重入锁原理

3. 可重试原理

4. 主从一致性原理

八、秒杀优化

1. 秒杀优化逻辑

暂时先不用管之前的分布式锁等一系列优化,回顾整个秒杀的业务流程。

image-20220629155050888

服务器里整个业务的流程是串行执行,并且多次涉及到去DB里查询和更新(箭头部分),无疑是性能很低的操作。因为DB支撑并发的能力很低,所以我们需要想办法优化这个问题。将不涉及DB的部分单独拿出来处理,即判断秒杀库存和校验一人一单环节。

这里我们进行性能测试,模拟1000个用户同时下单,这里跟随视频提前把秒杀券库存改为200,删除用户订单

生成1000个token到redis里,并保存到txt文件中,视频里没交代,就自己写的代码,我自己测试是没问题的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@SpringBootTest
public class CreateTokensTest {
// 可以先去配置文件里改成2号数据库少循环几次试试,不影响原来的数据库

@Resource
private StringRedisTemplate stringRedisTemplate;

@Resource
private UserMapper userMapper;

@Test
public void Create_1000_Tokens(){
// 这里读取id 1-1000的用户,在用黑马资料创建表时,id为3、7、8、9这几个用户没有创建,记得手动创建下
FileWriter fw = null;
try {
File file = new File("tokens.txt");
fw = new FileWriter(file);
for (int i = 1; i <= 1000; i++) {
User user = userMapper.selectById(i);
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
System.out.println(userDTO);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create() // 自定义规则
.setIgnoreNullValue(true) // 忽略空的值(UserDTO里有icon属性暂时还没管)
.setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString())); // 将所有的属性都转成String类型
String token = UUID.randomUUID().toString(true);
String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
// 写token到文件里
fw.write(token);
fw.write("\r\n");
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
fw.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

然后在 JMeter 里进行设置,主要就是将登陆状态头改为 ${token},然后添加一个csv数据文件设置,去读取 tokens.txt,这部分可以看视频,或者直接使用黑马提供资料里的 jmx 文件。测试结果如下:能够完成一人一单业务,库存为0,下了200个订单,这个图就不截了。压测的结果如下:

image-20220629211638686

平均响应时间是 508ms(还是比较慢的),吞吐量每秒 1164

所以秒杀业务优化的思路是:

(1)先利用Redis完成库存余量、一人一单判断,完成抢单业务

(2)再将下单业务放入阻塞队列,利用独立线程异步下单

这个就和我们平常生活中下单的逻辑很接近了,都是先下单,然后跳出一个付款的页面(这部分跳出页面和让人付款的时间就是一段缓冲时间,让DB的压力没那么大),并且也可以暂时不付款,下单成功之后,有一段时间限制,在这段时间内付款就可以。

我们可以不可以使用异步编排来做,或者说我开启N多线程,N多个线程,一个线程执行查询优惠卷,一个执行判断扣减库存,一个去创建订单等等,然后再统一做返回,这种做法和课程中有哪种好呢?答案是课程中的好,因为如果你采用我刚说的方式,如果访问的人很多,那么线程池中的线程可能一下子就被消耗完了,而且你使用上述方案,最大的特点在于,你觉得时效性会非常重要,但是你想想是吗?并不是,比如我只要确定他能做这件事,然后我后边慢慢做就可以了,我并不需要他一口气做完这件事,所以我们应当采用的是课程中,类似消息队列的方式来完成我们的需求,而不是使用线程池或者是异步编排的方式来完成这个需求

如何用Redis完成库存余量、一人一单的判断?

自然要利用缓存,将DB里的库存信息缓存到redis里,用普通的 string 结构就行,key就是优惠券订单,值就是 库存余量。判断库存是否充足只需要导redis中去根据key找对应的value是否大于0即可,如果不充足,则直接结束,如果充足,继续在redis中判断用户是否可以下单

一人一单的业务就需要用到 set 结构,将下单的用户都放到 set里,就能保证一个人只能下一单,key 还是优惠券订单,值就是下单的用户id。如果set集合中没有这条数据,说明他可以下单,如果set集合中没有这条记录,则将userId和优惠卷存入到redis中,并且返回0。

并且我们要保证整个过程的原子性,所以要放到 lua 脚本里,其整体业务逻辑为:

image-20220629160045409

根据 Lua 脚本返回的标识来判断下一步如何处理,返回1或者2的异常信息,就不用处理,直接结束,返回0才表示要真正下单,我们将下单业务所需要的优惠券id,用户id,和订单id 放入阻塞队列,利用独立线程异步下单。其完整的流程为:

image-20220629160741070

2. 代码实现秒杀优化

需求:

①新增秒杀优惠券的同时,将优惠券信息保存到Redis中

②基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

③如果抢购成功,将 优惠券id 、 用户id 、订单id 封装后存入阻塞队列

④开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

(后两个需求放到下一节讲)

第一步,将优惠券信息保存到Redis中,更改VoucherServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
  @Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券到DB里
save(voucher);
// 保存秒杀信息到DB里
SeckillVoucher seckillVoucher = new SeckillVoucher();
...
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到Redis中,普通的String结构,不用设置超时时间
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

测试还是使用postman提交添加秒杀券请求,然后去redis里观察是否添加到了redis里,这里我们新添加一个优惠券,内容相同(图里是100的库存,后面为了统一改成200了)。

image-20220629171703663

第二步基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功,逻辑上不难,就是熟悉这门脚本语言的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]

-- 2.数据key
-- 2.1.库存key,lua里拼接使用两点,不是用加号
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
-- redis.call出来的是字符,所以先tonumber转成数字再比较
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
-- SISMEMBER是redis命令,判断是否是一个集合的成员
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)

return 0

然后更新下单业务的代码,seckillVoucher。注意redis里用到了两个id ,一个是查询秒杀券id,一个是放了 下过单的用户id,订单id 可没有放到redis里,暂时是直接返回给前端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();

// 1.执行lua脚本,使用的是stringRedisTemplate.execute()方法
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(), // 这里不用传redis的key,所以传一个空列表,不能传null
voucherId.toString(), userId.toString() // 这里传其他参数(即非key的参数),以字符串形式
);
int r = result.intValue(); // 转成int型

// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 2.2.为0,返回订单id,全局ID生成器
long orderId = redisIdWorker.nextId("order");

// 3.返回订单id
return Result.ok(orderId);
}

随后就可以进行压测,这个下单其实可以描述为“先抢单”,即只判断库存和一人一单,没问题就返回 用户id 和 秒杀券id,至于 用户id 和 秒杀券id 还要关联很多其他东西,是放在阻塞队列里慢慢去DB里创建,并不影响现在下单这个业务的性能,所以现在就可以进行测试

还是用1000个用户模拟下单,库存也是200,注意这次测试的秒杀券id是12,jmx文件里要同步修改,测试结果如下,平均响应时间到了 184 ms,吞吐量1972,相比优化前有不小的提升。其他测试结果就是redis里seckill:stock:12的库存量减为了0,seckill:order:12是下了订单的用户集合,里面是200个用户id,没有截图

优化后压测结果

3. 基于阻塞队列实现异步秒杀

需求:

①新增秒杀优惠券的同时,将优惠券信息保存到Redis中

②基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

③如果抢购成功,将 优惠券id 、 用户id 、订单id 封装后存入阻塞队列

④开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

前两个需求已经完成,现在完成后两个异步下单的过程。

第三步:如果抢购成功,将 优惠券id 、 用户id 、订单id 封装后存入阻塞队列

还是更新 seckillVoucher 的代码,加一个 封装后存入阻塞队列 的环节

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 创建简单的阻塞队列
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

...

@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();

// 1.执行lua脚本,使用的是stringRedisTemplate.execute()方法
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(), // 这里不用传redis的key,所以传一个空列表,不能传null
voucherId.toString(), userId.toString() // 这里传其他参数(即非key的参数),以字符串形式
);
int r = result.intValue(); // 转成int型

// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 2.2.为0,有购买资格,把下单信息保存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
// 2.3.订单id,全局ID生成器
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 2.4.用户id
voucherOrder.setUserId(userId);
// 2.5.代金券id
voucherOrder.setVoucherId(voucherId);
// 2.6.放入阻塞队列
orderTasks.add(voucherOrder);

// 3.返回订单id
return Result.ok(orderId);
}

第四步:开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 创建线程池,这里只创建了一个单线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

// 单独线程应该一开始就执行任务,不断去阻塞队列里取。所以这里是写成类一初始化结束,就开启线程任务
// @PostConstruct注解是在当前类初始化完毕之后,立刻执行该方法
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

// 创建单独线程要执行的任务
private class VoucherOrderHandler implements Runnable{

@Override
public void run() {
while (true){
try {
// 1.获取队列中的订单信息
// take方法表示获取阻塞队列里的内容,没有则等待,所以不用担心while(true)会很占用资源
VoucherOrder voucherOrder = orderTasks.take();
// 2.创建订单
createVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}
}

创建秒杀订单的方法,createVoucherOrder就得做出更新,其接受参数应改为 VoucherOrder,而不是之前的 voucherId,因为要从 VoucherOrder 中获取各种信息

后面就不用阻塞队列了,学完消息队列使用消息队列

九、redis消息队列

1. 理论知识

基于阻塞队列的异步秒杀存在哪些问题?

  • 内存限制问题:阻塞队列是基于jdk的,受JVM内存限制,阻塞队列给的初始容量满了就没办法了

  • 数据安全问题:

    • redis服务宕机,内存里的数据丢失了;
    • 任务取出之后队列里就没有了,万一此时线程执行出现了异常,该任务执行失败,也没办法恢复

解决的方法是使用消息队列:

消息队列Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

image-20220630102814729

一般我们不用redis作为消息队列,而是用kafka等第三方组件,但这里也是稍微过一下,知道redis能够作为消息队列使用

Redis提供了三种不同的方式来实现消息队列:

1.1 基于List实现消息队列

Redis的list数据结构是一个双向链表,很容易模拟出队列效果。队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

缺点:无法避免消息丢失(前面提到的数据安全第二点)、只支持单消费者

1.2 基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。即支持多生产、多消费。相关的命令为:

  • SUBSCRIBE channel [channel] :订阅一个或多个频道
  • PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
  • PUBLISH channel msg :向一个频道发送消息

缺点:不支持数据持久化、无法避免消息丢失、消息堆积有上限,超出时数据丢失。(还不如list)

1.3 基于Stream的消息队列-单消费者

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

(1)发送消息的命令:XADD,一般都建议默认自动创建队列,默认使用redis自动生成消息的 id,消息添加成功后会返回消息的id

image-20220705105451855

1
2
自动创建名为users的消息队列,并发送一个消息,内容是:{name=jack,age=21},并且使用redis自动生成ID
XADD users * name jack age 21

(2)读取消息的方式之一:XREAD

image-20220705110503609

1
2
3
4
使用XREAD读取第一个消息:
XREAD COUNT 1 STREAMS users 0
XREAD阻塞方式,阻塞1000ms,读取最新的消息:
XREAD COUNT 1 BLOCK 1000 STREAMS users $

注意:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题

STREAM类型消息队列的XREAD命令特点:消息可回溯(消息一直存在队列里,不删除)、一个消息可以被多个消费者读取、可以阻塞读取

缺点:有消息漏读的风险

1.4 基于Stream的消息队列-消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

  • 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度。一定程度上避免了消息堆积的问题,如果我们就是想让多个消费者同时消费,那么就创建多个消费者组
  • 消息标示:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费。避免了前文单消费者模式下消息漏读的风险
  • 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。确保消息都被处理,解决消息丢失的问题。

由此可见基于消费者组的消息队列,功能更加强大,基本符合我们的需求。常用命令有:

(1)创建消费者组:

1
2
3
4
XGROUP CREATE key groupName ID [MKSTREAM]

示例:给消息队列s1创建一个消费者组g1,从第1个消息开始
XGROUP CREATE s1 g1 0
  • key:队列名称
  • groupName:消费者组名称
  • ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息。取决于你想不想要队列里原来的消息
  • MKSTREAM:队列不存在时自动创建队列

(2)从消费者组读取消息:

1
2
3
4
5
6
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

示例:g1消费者组里的c1消费者,从消息队列s1里,阻塞2s读取1条消息
XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
示例:g1消费者组里的c2消费者,从消息队列s1里,阻塞2s读取1条消息,读取的是已消费但未确认的消息
XREADGROUP GROUP g1 c2 COUNT 1 BLOCK 2000 STREAMS s1 0
  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认,不建议设置,我们需要消息确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID:
    • “>”:从下一个未消费的消息开始(不是从最新的消息,和之前单消费者XREAD中的$不同了)
    • 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

(3)确认消息已读

1
2
3
4
XACK key group ID

示例:g1消费者组确认已读s1消息队列中的2条消息
XACK s1 g1 1646339018049-0 1646339342815-0
  • key:队列名称
  • group:消费者组名称
  • ID:消息ID

(4)查看pending-list

1
2
3
4
XPENDING key group start end count

示例:从消息队列s1的g1消费者组里读取所有的10条信息
XPENDING s1 g1 - + 10
  • key:队列名称
  • group:消费者组名称
  • start:最小的id,无穷小则是-
  • end:最大的id,无穷大则是+
  • count:想查看几条

(5)删除指定的消费者组

1
XGROUP DESTROY key groupName

java消费者监听消息的基本思路:(伪代码)

死循环在正常情况下读取下一个未消费的消息,出现异常(即消息处理完之后没有返回ACK确认)那就再来一个死循环去处理pending-list中已消费但未确认的消息,处理完了再回到正常流程的死循环

image-20220705114036683

STREAM类型消息队列的XREADGROUP命令特点:消息可回溯;可以多消费者争抢消息,加快消费速度;可以阻塞读取;没有消息漏读的风险;有消息确认机制;保证消息至少被消费一次

缺点:STREAM类型消息队列持久化是基于redis持久化的,不能保证万无一失,有丢失风险;不支持生产者消息确认,如果生产者生成消息过程中宕机,无法解决;多消费者下的消息有序性,即事务管理。

如果对于消息队列业务要求没那么严格,STREAM类型消息队列就够用了,但如果要求很严格,STREAM类型消息队列还是存在上述的一些问题,就需要更加专业的第三方消息中间件

2. 基于消息队列实现异步秒杀

(1)创建一个Stream类型的消息队列,名为stream.orders,这就不用Java代码了,直接去redis控制台创建

1
xgroup create stream.orders g1 0 mkstream

(2)修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId

1
2
3
4
5
6
-- 1.3.订单id
local orderId = ARGV[3]

-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
-- 这里 orderId 的 key 建议直接叫id,符合 voucherOrder实体里面的字段
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)

再更新抢单的业务逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本,使用的是stringRedisTemplate.execute()方法
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(), // 这里不用传redis的key,所以传一个空列表,不能传null
voucherId.toString(), userId.toString(), String.valueOf(orderId) // 这里传其他参数(即非key的参数),以字符串形式
);
int r = result.intValue(); // 转成int型
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 3.返回订单id
return Result.ok(orderId);
}

(3)项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// 创建线程池,这里只创建了一个单线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

// 单独线程应该一开始就执行任务,不断去消息队列里取。所以这里是写成类一初始化结束,就开启线程任务
// @PostConstruct注解是在当前类初始化完毕之后,立刻执行该方法
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}


// 创建单独线程要执行的任务
private class VoucherOrderHandler implements Runnable{

@Override
public void run() {
while (true) {
try {
// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"), // 消费者组里的哪个消费者
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), // 读1条消息,阻塞2s
StreamOffset.create("stream.orders", ReadOffset.lastConsumed()) // 指定消息队列,以及是读取最新一条未处理的
);
// 2.判断消息获取是否成功,即订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有消息,继续下一次循环
continue;
}
// 解析消息中的订单数据
MapRecord<String, Object, Object> record = list.get(0); // 第一个string是消息id,后面两个是消息的内容,即lua脚本里写的键值对
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.创建订单
createVoucherOrder(voucherOrder);
// 4.确认消息 XACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
// 5.死循环在正常情况下读取下一个未消费的消息,出现异常(即消息处理完之后没有返回ACK确认)那就再来一个死循环
// 去处理pending-list中已消费但未确认的消息,处理完了再回到正常流程的死循环
handlePendingList();
}
}
}

// 流程和上面类似
private void handlePendingList() {
while (true) {
try {
// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1), // 读 PendingList 不用阻塞
StreamOffset.create("stream.orders", ReadOffset.from("0")) // 读 PendingList 一定读已读未完成的消息
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有异常消息,结束循环
break;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.创建订单
createVoucherOrder(voucherOrder);
// 4.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} catch (Exception e) {
log.error("处理PendingList订单异常", e);
}
}
}
}

随后即可进行测试,谨记抢购的优惠券id是12,11因为没往redis里存,是抢购不了的。先进行单人下单,看看逻辑有没有问题,测试结果显示没有问题,redis库存-1,订单集合+1,DB同理。同时,redis的消息队列 stream.orders 里多了 一条消息

image-20220705152334255

随后可以恢复库存,模拟高并发多人抢单,测试结果显示没有超卖,也保证了一人一单,至于平均响应时间和吞吐量,多次测试结果比较波动,但肯定比最开始的500多ms要好很多

后续这里自己用第三方消息中间件再处理一下


Redis:04.实战应用三:优惠券秒杀
http://jswanyu.github.io/2022/06/20/Redis/Redis:04.实战应用三:优惠券秒杀/
作者
万宇
发布于
2022年6月20日
许可协议