秒杀业务是电商中必不可少的环节,在本项目中把优惠券作为唯一的商品用来交易,在整个秒杀业务中可以学习到跟Redis相关的运用
一、全局唯一ID 每个店铺都可以发布优惠券,当用户抢购时,就会生成订单并保存到 tb_voucher_order 这张表中,而订单表如果使用数据库自增ID就存在一些问题:
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:唯一性、高可用(随时都可以生成)、高性能、递增性(整体上还是呈现递增趋势,提高索引效率)、安全性(不能规律性太明显)。Redis是符合这些特性要求的选择之一,其中安全性稍微复杂
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:
ID的组成部分:
符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
其中序列号的组成又比较复杂,首先能想到的是用"icr:" + keyPrefix
,icr:只是一个标识,表明这是全局自增id,keyPrefix 是业务前缀,订单业务有订单业务的全局自增id,其他业务也有其他的全局自增id
但如果只用这个是不行的,因为这样业务只有一个Key,随着时间的增长,ID会越来越多,而我们的序列号只有32位,但是订单量可能会超过2的32次方。所以为了避免这种情况,我们可以在后面拼接上一个日期,以天为单位,这样每天就能有不同的key。比如:icr:order:2022:01:01
就表示2022年1月1号这一天的key。 Key里面的每一个value是自增长的。注意这里不要弄混,redis里面只保存自增长之后的值,然后给他返回,并不是把每个生成的ID都保存在redis里面,而是保存到数据库里。
其具体代码为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Component public class RedisIdWorker { private static final long BEGIN_TIMESTAMP = 1640995200L ; private static final int COUNT_BITS = 32 ; @Autowired private StringRedisTemplate stringRedisTemplate; public long nextId (String keyPrefix) { LocalDateTime now = LocalDateTime.now(); long nowSecond = now.toEpochSecond(ZoneOffset.UTC); long timestamp = nowSecond - BEGIN_TIMESTAMP; String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd" )); long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date); return timestamp << COUNT_BITS | count; } }
其他一些全局唯一ID生成策略:
UUID(生成的是16进制的数值,返回结果是字符串结构,也不是单调递增的,不太友好)
snowflake算法(雪花算法,不依赖于redis,也是一种不错的策略)
数据库自增(不是自增id了,而是单独用一张表维护id。这张表的主属性id是自增的,然后去维护订单表id,性能可能没redis好)
二、添加优惠券 每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:
表关系如下:
正因为秒杀券是优惠券的扩展表,所以优惠券实体在设计的时候,把秒杀券对应的信息也封装到了实体里面,包含了库存、生效时间、创建时间等。具体的可以去实体类Voucher
里查询。
**新增普通卷代码: **VoucherController
1 2 3 4 5 @PostMapping public Result addVoucher (@RequestBody Voucher voucher) { voucherService.save(voucher); return Result.ok(voucher.getId()); }
新增秒杀卷代码:
VoucherController
1 2 3 4 5 @PostMapping("seckill") public Result addSeckillVoucher (@RequestBody Voucher voucher) { voucherService.addSeckillVoucher(voucher); return Result.ok(voucher.getId()); }
VoucherServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override @Transactional public void addSeckillVoucher (Voucher voucher) { save(voucher); SeckillVoucher seckillVoucher = new SeckillVoucher (); seckillVoucher.setVoucherId(voucher.getId()); seckillVoucher.setStock(voucher.getStock()); seckillVoucher.setBeginTime(voucher.getBeginTime()); seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher); stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString()); }
由于前台没有管理页面,所以我们这里使用 postman 去提交优惠券的信息,其对应的方法为VoucherController
下的addSeckillVoucher
,提交过程如下图所示
其中的JSON字符串为:
1 2 3 4 5 6 7 8 9 10 11 12 { "shopId" : 1 , "title" : "100元代金券" , "subTitle" : "周一到周五均可使用" , "rules" : "全场通用\\n无需预约\\n可无限叠加\\n不兑现,不找零\\n仅限堂食" , "payValue" : 8000 , "actualValue" : 10000 , "type" : 1 , "stock" : 100 , "beginTime" : "2022-01-25T12:00:00" , "endTime" : "2022-01-26T12:00:00" }
需要注意的是截止时间不要晚于当前的系统时间,否则商店信息里面显示不出来优惠券信息
发送完之后去数据库里查询优惠券表和秒杀券表,查看对应信息。
三、实现优惠券下单 订单表:tb_voucher_order,具体的字段如下所示
优惠券信息添加完毕后,我们就可以实现基本的优惠券下单功能,下单时需要判断两点:
秒杀时间是否合理,如果尚未开始或已经结束则无法下单
库存是否充足,不足则无法下单
当用户开始进行下单,我们应当去查询优惠卷信息,查询到优惠卷信息,判断是否满足秒杀条件
比如时间是否充足,如果时间充足,则进一步判断库存是否足够,如果两者都满足,则扣减库存,创建订单,然后返回订单id,如果有一个条件不满足则直接结束。其业务逻辑如下图所示
代码为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @Service public class VoucherOrderServiceImpl extends ServiceImpl <VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Override @Transactional public Result seckillVoucher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始!" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!" ); } if (voucher.getStock() < 1 ) { return Result.fail("库存不足!" ); } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId) .update(); if (!success) { return Result.fail("库存不足!" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); Long userId = UserHolder.getUser().getId(); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); } }
整体上的业务逻辑比较简单,没有很复杂的地方,其中涉及到了多张表的调用,需要使用事务进行控制。编写完成之后进行业务测试,在前端页面登录成功之后,点击抢购按钮,会返回订单号。再去优惠券订单表里查询信息,会发现有一条新增的购买记录。这样的话功能就没有问题
四、超卖问题 在第2节中实现的内容是基本的优惠券下单,我们测试的方法是在前端的页面上点击抢购,但在真实的秒杀业务场景里,肯定会有成千上万的用户同时抢购会造成极高的并发量。
这里使用 JMeter 模拟高并发场景(现在只模拟一个用户使用多个线程下单,这里还没做一人一单的限制),这里为了方便测试,我们将上一节的优惠券订单信息删除,并把秒杀券的库存从99改为100,然后用200个线程 同时抢购,观察最后的结果。使用 JMeter 测试时有几个注意事项,务必注意!
jmx文件可以用黑马提供的,注意把其中的HTTP请求里的登录状态头改为自己在redis里面的登陆凭证,为了方便调试,也建议在此处把登录凭证的有效期改为永久有效
http请求的路径最后优惠券id改成自己DB里的,视频里是7,自己的不一定是7,我这里是11
每次新测试之前,都要点击工具栏里的清除数据,否则会对测试有干扰,这点很重要!!!容易忽略
这里的测试结果里的测试结果显示,数据库里优惠券的库存显示为-9,再去订单表里查询发现有109条订单,这就是超卖问题
这里的测试结果显示,数据库的优惠券的库存显示为负九。再去订单表里查询,发现有109条订单。这就是超卖问题。
超卖问题是由于多线程并发导致的,假设现在库存为1,有一个线程想查询库存,查询到为1想要对库存进行扣减,在扣减完成之前又有其他线程进来查询发现库存为1,随后也进行扣减库存的操作。这样就会导致多线程的并发问题。
针对这一问题的常见解决方案就是加锁:
悲观锁 :认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行,因此性能很差。例如Synchronized、Lock都属于悲观锁。由于悲观锁的代码比较常见,所以就不做过多的演示。
乐观锁 :认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。性能较好
如果没有修改则认为是安全的,自己才更新数据。
如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。
乐观锁最常见的就是CAS操作,即先比较一开始查询到的库存和要更新数据库之前查询到的库存是否相同?如果相同则进行更新,如果不同则不更新数据库。
1 2 3 boolean success = seckillVoucherService.update() .setSql("stock= stock -1" ) .eq("voucher_id" , voucherId).eq("stock" ,voucher.getStock()).update();
不过cas操作无法解决ABA的问题,我们可以添加版本号来解决,这里就不过多详细介绍。由于是库存的应用场景,到目前为止没有出现退款的业务,所以不会出现ABA的问题,这里就只演示cas操作,不添加版本号
这里还存在一个问题,就是如果严格按照比对数据库的值是否一致,cas的成功率会很低。假设100个线程同时来查询,按照cas的思想,只会有一个线程能成功,其他99个请求都失败。视频里也演示了多线程模拟的场景,发现只卖出去21单。因此这里不用严格的比对数据库前后查询到的值是否一致,而是只要在更新数据库操作之前,查到的库存值大于0就可以下订单并对数据库更新,小于0则不能下订单,这样就能够解决超卖问题。细节如下图所示,一开始库存为1
对代码做出如下修改
1 2 3 4 5 6 7 8 9 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId).gt("stock" , 0 ) .update();if (!success) { return Result.fail("库存不足!" ); }
代码修改完毕之后可以对代码进行测试,测试之前记得将数据库的订单全部删除,将库存重新改为100,然后使用JMeter 工具进行测试。测试结果显示,这次的模拟没有任何问题,库存减为0并且也订单数也为100,多线程模拟的200个请求中,另外100个请求均为失败。
五、一人一单 1. 一人一单的并发安全问题 之前的模拟存在一个很大的问题,200个线程是同一个用户进行下单,这就导致了数据库里100个订单的用户是同一个人,但在实际业务中,这是不被允许的,我们要确保每一个用户只能对一张秒杀券下一次单,所以本节需要对业务代码进行改写。
实现一人一单的业务逻辑其实非常简单,只需要在判断库存是否充足之后,根据用户券ID和用户ID去查询订单,如果这个订单存在,那么就返回异常结果,提示用户无法再下单。如果不存在,才进行原来的扣减库存和创建订单操作。其业务逻辑如下图所示
其修改后的代码为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 @Override @Transactional public Result seckillVoucher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始!" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!" ); } if (voucher.getStock() < 1 ) { return Result.fail("库存不足!" ); } Long userId = UserHolder.getUser().getId(); int count = query().eq("user_id" , userId).eq("voucher_id" , voucherId).count(); if (count > 0 ) { return Result.fail("用户已经购买过一次!" ); } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId).gt("stock" , 0 ) .update(); if (!success) { return Result.fail("库存不足!" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); }
现在进行多线程并发测试,在测试前记得把数据库的订单全部删除,再把库存恢复到100,随后测试结果显示库存为90,订单里有10行数据 ,说明还是存在问题,按理说一个人只能下一个订单,结果下了10个订单,说明还是存在着多线程的并发安全问题。
这里还是和之前超卖问题产生的原因一样,都是先进行查询操作再去更新数据库,现在假设库存为100,没有出售任何优惠券,那么假设有多个线程来查询订单,查到的订单数为0,然后多个线程进行更新数据库的操作,就出现了多线程的并发安全问题。
线程运行逻辑图–正常情况与异常情况:
2. 并发问题解决–同步方法 这里之前也提到过,用悲观锁或者乐观锁的方法来解决,之前我们是使用CAS的操作来解决,但这里没有办法使用CAS的操作,因为CAS只能在更新数据时使用,一开始查一次,更新之前查一次,对比并设置值。这里是在插入数据,是为每个用户新增一个订单,只能判断是否存在,没有办法去查询原来的值,再比较更新。所以这里我们只能使用悲观锁的方式。即乐观锁比较适合更新数据,而现在是插入数据,所以我们需要使用悲观锁操作
加锁后线程运行逻辑图:
接下来就是决定悲观锁的范围,这里我们可以把一人一单,扣减库存和创建订单三个部分,单独封装到一个方法里,然后给方法加上 synchronized 与 事务管理注解, 前面的一些查询操作不用加入同步代码快和事务管理注解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 @Service public class VoucherOrderServiceImpl extends ServiceImpl <VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Override public Result seckillVoucher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始!" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!" ); } if (voucher.getStock() < 1 ) { return Result.fail("库存不足!" ); } return createVoucherOrder(voucherId); } @Transactional public synchronized Result createVoucherOrder (Long voucherId) { Long userId = UserHolder.getUser().getId(); int count = query().eq("user_id" , userId).eq("voucher_id" , voucherId).count(); if (count > 0 ) { return Result.fail("用户已经购买过一次!" ); } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId).gt("stock" , 0 ) .update(); if (!success) { return Result.fail("库存不足!" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); } }
这样处理之后,再次模拟多线程并发测试,会发现库存只减了1,只有一个优惠券订单生成,达到我们的目的。测试结果如下图:
但是这样的写法有一个非常影响性能的地方就是在方法上加锁,即业务实现类的 this 被加锁了,有多个用户下订单,只有这个用户处理完了才能处理其他用户,会造成串行化执行,不能并发处理,大大降低性能。比如ABC等200人发了请求,只能等A处理完了,才处理其他人的请求
总结:在使用锁过程中,控制 锁粒度 是一个非常重要的事情,因为如果锁的粒度太大,会导致每个线程进来都会锁住,所以我们需要去控制锁的粒度。这里引出的知识点是:使用 synchronized 时,更推荐使用同步方法块,而不是使用同步方法
3. 并发问题解决–同步方法块 这里我们其实可以对每个用户进行加锁,因为每个用户的ID是不同的,对这个用户加锁不影响其他的线程里其他的用户下单,这样能够真正的实现并发处理。
后面会考虑如何对用户ID加锁,刚开始写的代码是如下所示,只对传过来的 userId 对象加锁,测试的结果是并没有实现一人一单,一个用户还是加了100单,让库存减为0 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Transactional public Result createVoucherOrder (Long voucherId) { Long userId = UserHolder.getUser().getId(); synchronized (userId) { } }
后来分析是因为相同用户的多个请求传过来的 userId 值虽然是一样的,但是每次请求获得的 userId 都是不同的对象 ,这就导致其实没有加锁成功,让100个对象下了100单。
这里就可以提到另外一个知识是:局部变量是线程安全的,但局部变量的引用不一定线程安全。
放在此处应该如何正确理解?现在还是只有一个用户,使用200个线程前来下单,每个线程里都生成了 userId 这样一个局部变量对象,由于库存100,所以还是下了100单。
所以加锁效果没达成,因为我们是希望一人一单的。希望的效果更类似于局部变量的引用不一定线程安全
,比如学习多线程时的一张图。即让多个线程都去访问那一个对象的值,判断那个对象的值在DB里是否已经存在了,不过要注意我们是希望在线程安全的环境下,去访问那一个对象的值。(举这个例子不一定恰当,希望我复习时能回味到表达的意思)
所以我们不应该对 userId 对象进行加锁,应该对 userId 的值进行加锁,可以将其转为字符串,调用 toString()
方法
1 2 3 4 5 6 7 8 @Transactional public Result createVoucherOrder (Long voucherId) { Long userId = UserHolder.getUser().getId(); synchronized (userId.toString()) { ...下面没变 } }
这样就行了吗?通过测试我们发现结果还是和之前的一样,即100个订单,库存降为0,这里面还有一个坑就是 toString()
方法其实并没有转成唯一的字符串,因为 toString()
方法的底层也还是 new 了字符串对象,也就是说我们这里依然是给100个对象加锁,而不是给对象的固定值加锁
1 2 3 4 5 6 7 8 9 10 11 12 public static String toString (long i) { int size = stringSize(i); if (COMPACT_STRINGS) { byte [] buf = new byte [size]; getChars(i, size, buf); return new String (buf, LATIN1); } else { byte [] buf = new byte [size * 2 ]; StringUTF16.getChars(i, size, buf); return new String (buf, UTF16); } }
所以要调用字符串的intern()
方法,即让字符串入池,这个方法调用之后,new 的100个字符串对象,通过 intern()
方法返回的对象,永远是字符串池里的那个唯一的对象
1 2 3 4 5 6 7 8 @Transactional public Result createVoucherOrder (Long voucherId) { Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()) { ...下面没变 } }
完成这些之后再去测试,会发现库存只减1,并只创建1个订单,这样才实现了一人一单的业务需求。到这里表面上关于多线程实现一人一单并发问题已经实现了,注意体会一开始一人多单 –> 在整个方法上加锁 –> 同步方法块加锁 –> 给 userid 对象加锁 –> 给 userid 字符串对象加锁 –> 给 userid 字符串入池对象加锁,这样整个优化的过程
但是这样做其实还是有问题(好多问题啊!),这就要和数据库的事务进行一起探讨,如下块代码所示,目前的的同步代码块是在一个方法内部的,当这个同步方法块的语句执行完毕之后,锁就被释放了,其他线程就可以继续访问这个方法。而如果数据库的事务没有提交之前就有其他线程在此时访问了这个方法,那也会导致多线程的并发问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Override public Result seckillVoucher (Long voucherId) { ... return createVoucherOrder(voucherId); }@Transactional public Result createVoucherOrder (Long voucherId) { Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()) { ...下面没变 } ... }
所以我们需要对整个方法都进行加锁,但不能在方法上加 synchronized,否则又回到了之前的问题。而是要加在整个方法的调用语句外,如下代码所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Override public Result seckillVoucher (Long voucherId) { Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()) { return createVoucherOrder(voucherId); } } @Transactional public Result createVoucherOrder (Long voucherId) { Long userId = UserHolder.getUser().getId(); int count = query().eq("user_id" , userId).eq("voucher_id" , voucherId).count(); ... }
在调用方法完全结束之后才结束同步代码块,结束之前,事务肯定已经提交过了。
但是还有问题,涉及到Spring事务管理中事务失效的几种可能情况,这里我之前没有学过,暂时先记录一下 。情况就是:我们给createVoucherOrder
方法加了事务注解,但是seckillVoucher
没有加,那么在后者里调用前者的时候,调用的是下面这段代码,其实其中省略了一个 this ,也就是说这个this 是拿到了当前业务实现类对象VoucherOrderServiceImpl
,而不是它的代理对象。我们要知道事务要想生效,是 this拿到了事务的代理对象,如果不是代理对象,spring的事务就会失效
1 2 3 synchronized (userId.toString().intern()) { return this .createVoucherOrder(voucherId); }
如何解决呢,拿到类的代理对象即可,用如下代码
1 2 3 4 5 6 7 8 9 { ... Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()) { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); }
不过这之前要进行一些额外配置,先去加环境依赖
1 2 3 4 <dependency > <groupId > org.aspectj</groupId > <artifactId > aspectjweaver</artifactId > </dependency >
再去启动类加上一个注解和设置 exposeProxy 为 true,这才能让代理对象暴露出来,否则是无法获取代理对象的
1 2 3 4 5 6 7 8 9 10 @EnableAspectJAutoProxy(exposeProxy = true) @MapperScan("com.hmdp.mapper") @SpringBootApplication public class HmDianPingApplication { public static void main (String[] args) { SpringApplication.run(HmDianPingApplication.class, args); } }
最终的测试结果也是实现了一人一单,这里就不放图了。(有个问题:吞吐量显示还没有之前同步方法的高,不知道这里看吞吐量有没有意义,因为只有一个用户,只能下一单,视频里没有提到这个)
六、分布式锁 1. 集群下的线程并发安全问题 通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。我们将服务启动两份,端口分别为8081和8082,在这里只用把原来的 configuration 复制一份即可(按 ctrl + D),然后更改命名,添加虚拟机参数。随后将两个服务启动(以debug模式启动,后面需要打断点,养成以debug模式启动的习惯 ),确认分别部署在8081和8082端口
然后修改nginx的conf目录下的nginx.conf文件,配置反向代理和负载均衡:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 server { listen 8080 ; server_name localhost; # 指定前端项目所在的位置 location / { root html/hmdp; index index.html index.htm; } error_page 500 502 503 504 /50x.html; location = /50x.html { root html; } location /api { default_type application/json; #internal; keepalive_timeout 30s; keepalive_requests 1000 ; #支持keep-alive proxy_http_version 1.1 ; rewrite /api(/.*) $1 break ; proxy_pass_request_headers on; #more_clear_input_headers Accept-Encoding; proxy_next_upstream error timeout; #proxy_pass http: proxy_pass http: } } upstream backend { server 127.0 .0 .1 :8081 max_fails=5 fail_timeout=10s weight=1 ; server 127.0 .0 .1 :8082 max_fails=5 fail_timeout=10s weight=1 ; }
修改配置文件之后,使用如下命令重载配置
重载完之后,避免之前的 nginx 进程对测试造成影响,我们将其全部杀掉
1 taskkill /IM nginx.exe / F
然后去输入请求:http://localhost:8080/api/voucher/list/1,请求两次,应该能够在两个服务上都出现查询信息,测试没有问题才能代表集群模式和负载均衡配置成功。
接下来测试是否存在线程安全问题,我们还是将数据库的订单优惠券订单全部删掉,将库存恢复到100。使用postman 进行测试:http://localhost:8080/api/voucher-order/seckill/11,两个请求都是去秒杀11号优惠券,而且是同一个用户
在发送请求之前,在代码中同步代码快里打上断点(确认服务是以 debug 的模式启动),这是为了方便测试,要不然请求很快就发送结束,达不到演示并发的效果(注意我们都是在讨论并发的场景下可能出现的问题,如果是顺序执行,哪怕是集群也没影响,我们这里打断点就是为了能够达到并发查询数据库的条件)。随后发送两个请求,会发现两个服务都运行至断点处,并且两个的用户id是相同的
后面把两个服务都放行,去数据库里查看会发现同一个用户下了两个订单,库存减为98,这也就意味着在集群模式下,我们又再次出现了多线程并发安全问题,并没有把同一个用户给锁住,从而让一个用户能够下多个订单
下面分析一下出现这个问题的原因:用了集群模式之后就会有多个 tomcat 的服务器,也就会有多个 JVM ,那么每
个 JVM 都有自己的堆、栈、方法区、常量池,所以我们之前认为把每一个用户的user ID的值给锁住,在多个JVM里面就不现实了,因为每个JVM的常量池都可以有一个这个用户的 userID 值。这就是在集群模式(分布式模式)下带来新的并发线程安全问题。
解决这个问题的技术叫分布式锁。
2. 分布式锁理论知识 分布式锁: 满足分布式系统或集群模式下多进程可见并且互斥的锁。
前文介绍的,在集群模式下出问题的原因就是因为多个jvm里面有多个锁监视器,导致了userid的值没锁住,那么我们可以尝试让多个jvm只有一个锁监视器,这样就能够把值给锁住。
那么分布式锁就得有如下要求:多进程可见、互斥、高可用、高性能、安全性
分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:
MySQL
Redis
Zookeeper
互斥
利用mysql本身的互斥锁机制
利用setnx这样的互斥命令
利用节点的唯一性和有序性实现互斥
高可用
低
中
高
高性能
低
高
中
安全性
断开连接,自动释放锁
利用锁超时时间,到期释放
临时节点,断开连接自动释放
优点
直接借助数据库容易理解
性能好,实现起来较为方便
有效地解决单点问题,不可重入问题,非阻塞问题以及锁无法释放的问题,实现起来较为简单。
缺点
在使用关系型数据库实现分布式锁的过程中会出现各种问题,例如数据库单点问题和可重入问题,并且在解决过程中会使得整个方案越来越复杂
key的过期时间设置难以确定,如何设置的失效时间太短,方法没等执行完,锁就自动释放了,那么就会产生并发问题。如果设置的时间太长,其他获取锁的线程就可能要平白的多等一段时间。Redis的集群部署虽然能解决单点问题,但是并不是强一致性的,锁的不够健壮
性能上不如使用缓存实现分布式锁
zookeeper也是企业级开发中较好的一个实现分布式锁的方案,由于本套视频并不讲解zookeeper的原理和分布式锁的实现,所以不过多阐述。本节主要探讨 redis 作为分布式锁的方案,实现分布式锁时需要实现的两个基本方法:
(1)获取锁:
互斥:确保只能有一个线程获取锁
redis里有setnx的命令,即原来没有这个 key 才能够设置成功,如果有的话就设置不成功,并且为了避免 redis 服务宕机使得这个key一直存在,我们要给 key 设定一个超时时间,到这个时间之后需要自动删除,避免引起死锁。注意不能使用两个分开的命令,比如先SETNX lock ...
再EXPIRE lock 10
,因为服务器的宕机有可能会发生在这两条命令之间,所以我们要保证这两个操作具备原子性,即让这两个操作在一条命令中执行完成。实现起来也很简单,set命令有很多可选参数
1 2 SET lock thread1 NX EX 10
互斥锁有两种具体的形式分别为阻塞式锁和非阻塞式锁,阻塞式锁获取锁失败后会原地等待,不断重试,另一种是非阻塞式锁,尝试一次之后,如果失败就直接停止。这里我们选择非阻塞式锁,尝试一次,成功返回true,失败返回false。(因为阻塞式锁有点浪费CPU性能,实现起来也比较麻烦)
(2)释放锁:手动释放(del …)、超时自动释放。注意超时自动释放只是为了防止服务宕机的异常情况,大部分的正常的情况还是需要手动释放锁的
整个流程如下图所示:
3. 代码实现Redis分布式锁 首先定义一个接口,其中包含 获取锁和释放锁 的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 public interface ILock { boolean tryLock (long timeoutSec) ; void unlock () ; }
实现类如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class SimpleRedisLock implements ILock { private String name; private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock (String name, StringRedisTemplate stringRedisTemplate) { this .name = name; this .stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:" ; @Override public boolean tryLock (long timeoutSec) { long threadId = Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, String.valueOf(threadId), timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock () { stringRedisTemplate.delete(KEY_PREFIX + name); } }
订单业务秒杀代码中需要对应修改,之前是加 synchronized,现在是自己手动创建锁、释放锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public Result seckillVoucher (Long voucherId) { Long userId = UserHolder.getUser().getId(); SimpleRedisLock lock = new SimpleRedisLock ("order:" + userId, stringRedisTemplate); boolean isLock = lock.tryLock(3600 ); if (!isLock){ return Result.fail("不允许重复下单!" ); } try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { lock.unlock(); } }
随后可以进行测试,测试的过程还是跟上一节最后集群模式下并发安全问题中测试过程一样,在获取锁那里打断点,能够发现两个请求只有一个的 isLock 为true
测试的结果显示,使用redis加分布式锁之后,集群模式下,相同的用户也只能下一单
4. 分布式锁误删问题 针对于上面的应用场景,还存在着一个隐藏的问题,现在假设线程1获得了redis 锁,然后他的业务阻塞了,此时锁释放会有两种情况,第1种情况是阻塞恢复,它继续完成业务然后释放锁,第2种情况是超时然后自动的释放锁。
现在假设线程一的业务阻塞很长,也就是后一种情况。redis 锁是超时释放的,然后这个时候线程2进来了,他获得了锁,然后执行业务过程中线程1业务不阻塞了,他完成了自己的业务,按照我们之前代码的逻辑是完成业务之后就释放出来,他也不管是不是自己的线程,反正就直接把锁释放出来,这样就会导致其他的线程又可以来拿到这个锁,现在假设线程3也进来了,然后就会获得这个锁,那么又会出现两个线程同时拿到了锁。
整个过程如下图所示:
其核心的问题就在于释放锁的时候没有判断是不是属于自己的线程 ,所以我们把这个过程给加上,即获取锁时存入线程的标识,释放锁之前判断下锁标识是否是自己,就能解决上面的问题,用下图展示
对应的逻辑流程要改变下:
之前的代码中,我们已经把线程标识存在了 redis 的值里,这里需要回忆一下,当时我们是把业务名称作为Key,线程号作为value存到 redis 里面。不过仅用线程ID作为 value 不太严谨,因为多个集群里可能会出现相同的线程号,所以这里再拼上一段随机字符作为 value ,即最终的线程标识。最后释放锁之前把他取出来比较一下就行。
我们对代码做出如下改动:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class SimpleRedisLock implements ILock { private String name; private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock (String name, StringRedisTemplate stringRedisTemplate) { this .name = name; this .stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:" ; private static final String ID_PREFIX = UUID.randomUUID().toString(true ) + "-" ; @Override public boolean tryLock (long timeoutSec) { String threadId = ID_PREFIX + Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock () { String threadId = ID_PREFIX + Thread.currentThread().getId(); String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); if (threadId.equals(id)){ stringRedisTemplate.delete(KEY_PREFIX + name); } } }
测试过程我们可以先让一个请求完成,然后手动去 redis 里面把锁给删掉,模拟超时删除的情况,然后再去观察第2个请求,把断点打在判断是否是自己锁标识的那个地方,就可以观察到。测试过程不过多写了,具体的看视频。
5. Lua脚本保证分布式锁的原子性 之前的内容虽然解决了分布式锁误删的问题,但其实在一种极端的环境下还是有可能存在着误删现象,就是在判断锁标识是否一致之后,释放锁之前,线程1进入阻塞状态,如果这个阻塞时间很长就会触发超时释放锁,另一个线程获取锁之后,如果线程1此时阻塞结束,会直接释放锁,因为在之前它已经判断过了 。如下图所示
为了解决这个问题,我们要保证 判断锁标识 和 释放锁 这两个操作具备原子性
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性 。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html
这里重点介绍Redis提供的调用函数,我们可以使用lua去操作redis,又能保证他的原子性,这样就可以实现拿锁比锁删锁是一个原子性动作了,作为Java程序员这一块并不作一个简单要求,并不需要大家过于精通,只需要知道他有什么作用即可。这部分不详细记录Lua的知识点了,知道脚本能够保证多条redis命令的原子性就可以
重点介绍一下Redis提供的调用函数,语法如下:
1 redis.call('命令名称' , 'key' , '其它参数' , ...)
例如,我们要执行set name jack,则脚本是这样:
1 2 # 执行 set name jack redis.call('set' , 'name' , 'jack' )
例如,我们要先执行set name Rose,再执行get name,则脚本如下:
1 2 3 4 5 6 # 先执行 set name jack redis.call('set' , 'name' , 'Rose' ) # 再执行 get namelocal name = redis.call('get' , 'name' ) # 返回return name
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数
在resourse文件夹下添加文件 unlock.lua
1 2 3 4 5 6 7 if (redis.call('get' , KEYS[1 ]) == ARGV[1 ]) then return redis.call('del' , KEYS[1 ])end return 0
去 SimpleRedisLock 更改释放锁的代码,需要先创建一个redis脚本的对象,DefaultRedisScript
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private static final DefaultRedisScript<Long> UNLOCK_SCRIPT; static { UNLOCK_SCRIPT = new DefaultRedisScript <>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource ("unlock.lua" )); UNLOCK_SCRIPT.setResultType(Long.class); } ... @Override public void unlock () { stringRedisTemplate.execute()( UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId()); }
以上的更改就保证了多条redis命令的原子性
到目前为止,我们实现的简单的redis分布式锁,就可以完成基本的需求,确保集群模式下的分布式多线程并发安全问题,其实现思路为:
利用set nx ex获取锁,并设置过期时间,保存线程标示
释放锁时先判断线程标示是否与自己一致,一致则删除锁
特性:
利用set nx满足互斥性
利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
利用Redis集群保证高可用和高并发特性
七、分布式锁优化–Redisson 我们自己实现简单的分布式锁,即基于setnx实现的分布式锁存在下面的问题:
不可重入:同一个线程无法多次获取同一把锁
不可重试:获取锁只尝试一次就返回false,没有重试机制
超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
主从一致性:如果Redis提供了主从集群,主从同步存在延迟,当主宕机时,如果从同步主中的锁数据,则会出现问题
还有很多成熟的第三方工具
1. Redisson入门 Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
官网地址: https://redisson.org
GitHub地址: https://github.com/redisson/redisson
下面演示一下redisson的使用
(1)导入依赖
1 2 3 4 5 <dependency > <groupId > org.redisson</groupId > <artifactId > redisson</artifactId > <version > 3.13.6</version > </dependency >
(2)添加配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient () { Config config = new Config (); config.useSingleServer().setAddress("redis://127.0.0.1:6379" ).setPassword("123321" ); return Redisson.create(config); } }
虽然redisson也有配置文件配置的方法,但一般不建议,会覆盖原本redis的配置,推荐采用这种配置类的做法
(3)使用Redisson的分布式锁
和之前一样,也是分为获取锁、尝试创建锁、释放锁三个步骤
redisson的tryLock方法可以设置三个参数,分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位。无参默认情况是:不重试、自动超时为30s
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Resource private RedissonClient redissonClient;@Override public Result seckillVoucher (Long voucherId) { Long userId = UserHolder.getUser().getId(); RLock lock = redissonClient.getLock("lock:order:" + userId); boolean isLock = lock.tryLock(); if (!isLock){ return Result.fail("不允许重复下单!" ); } try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { lock.unlock(); } }
测试结果显示,在集群模式下还是能够实现和之前一样的效果,即保证一人一单。(这里虽然用了Redisson,但没有演示可重入、可重试等特性,所以效果和之前一样)
2. 可重入锁原理 3. 可重试原理 4. 主从一致性原理 八、秒杀优化 1. 秒杀优化逻辑 暂时先不用管之前的分布式锁等一系列优化,回顾整个秒杀的业务流程。
服务器里整个业务的流程是串行执行 ,并且多次涉及到去DB里查询和更新 (箭头部分),无疑是性能很低的操作。因为DB支撑并发的能力很低,所以我们需要想办法优化这个问题。将不涉及DB的部分单独拿出来处理,即判断秒杀库存和校验一人一单环节。
这里我们进行性能测试,模拟1000个用户同时下单,这里跟随视频提前把秒杀券库存改为200,删除用户订单
生成1000个token到redis里,并保存到txt文件中,视频里没交代,就自己写的代码,我自己测试是没问题的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 @SpringBootTest public class CreateTokensTest { @Resource private StringRedisTemplate stringRedisTemplate; @Resource private UserMapper userMapper; @Test public void Create_1000_Tokens () { FileWriter fw = null ; try { File file = new File ("tokens.txt" ); fw = new FileWriter (file); for (int i = 1 ; i <= 1000 ; i++) { User user = userMapper.selectById(i); UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class); System.out.println(userDTO); Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap <>(), CopyOptions.create() .setIgnoreNullValue(true ) .setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString())); String token = UUID.randomUUID().toString(true ); String tokenKey = LOGIN_USER_KEY + token; stringRedisTemplate.opsForHash().putAll(tokenKey, userMap); fw.write(token); fw.write("\r\n" ); } } catch (IOException e) { e.printStackTrace(); } finally { try { fw.close(); } catch (IOException e) { e.printStackTrace(); } } } }
然后在 JMeter 里进行设置,主要就是将登陆状态头改为 ${token},然后添加一个csv数据文件设置,去读取 tokens.txt,这部分可以看视频,或者直接使用黑马提供资料里的 jmx 文件。测试结果如下:能够完成一人一单业务,库存为0,下了200个订单,这个图就不截了。压测的结果如下:
平均响应时间是 508ms(还是比较慢的),吞吐量每秒 1164
所以秒杀业务优化的思路是:
(1)先利用Redis完成库存余量、一人一单判断,完成抢单业务
(2)再将下单业务放入阻塞队列,利用独立线程异步下单
这个就和我们平常生活中下单的逻辑很接近了,都是先下单,然后跳出一个付款的页面(这部分跳出页面和让人付款的时间就是一段缓冲时间,让DB的压力没那么大),并且也可以暂时不付款,下单成功之后,有一段时间限制,在这段时间内付款就可以。
我们可以不可以使用异步编排来做,或者说我开启N多线程,N多个线程,一个线程执行查询优惠卷,一个执行判断扣减库存,一个去创建订单等等,然后再统一做返回,这种做法和课程中有哪种好呢?答案是课程中的好,因为如果你采用我刚说的方式,如果访问的人很多,那么线程池中的线程可能一下子就被消耗完了,而且你使用上述方案,最大的特点在于,你觉得时效性会非常重要,但是你想想是吗?并不是,比如我只要确定他能做这件事,然后我后边慢慢做就可以了,我并不需要他一口气做完这件事,所以我们应当采用的是课程中,类似消息队列的方式来完成我们的需求,而不是使用线程池或者是异步编排的方式来完成这个需求
如何用Redis完成库存余量、一人一单的判断?
自然要利用缓存,将DB里的库存信息缓存到redis里,用普通的 string 结构就行,key就是优惠券订单,值就是 库存余量。判断库存是否充足只需要导redis中去根据key找对应的value是否大于0即可,如果不充足,则直接结束,如果充足,继续在redis中判断用户是否可以下单
一人一单的业务就需要用到 set 结构,将下单的用户都放到 set里,就能保证一个人只能下一单,key 还是优惠券订单,值就是下单的用户id。如果set集合中没有这条数据,说明他可以下单,如果set集合中没有这条记录,则将userId和优惠卷存入到redis中,并且返回0。
并且我们要保证整个过程的原子性,所以要放到 lua 脚本里,其整体业务逻辑为:
根据 Lua 脚本返回的标识来判断下一步如何处理,返回1或者2的异常信息,就不用处理,直接结束,返回0才表示要真正下单,我们将下单业务所需要的优惠券id,用户id,和订单id 放入阻塞队列,利用独立线程异步下单。其完整的流程为:
2. 代码实现秒杀优化 需求:
①新增秒杀优惠券的同时,将优惠券信息保存到Redis中
②基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
③如果抢购成功,将 优惠券id 、 用户id 、订单id 封装后存入阻塞队列
④开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
(后两个需求放到下一节讲)
第一步,将优惠券信息保存到Redis中,更改VoucherServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 @Override @Transactional public void addSeckillVoucher (Voucher voucher) { save(voucher); SeckillVoucher seckillVoucher = new SeckillVoucher (); ... seckillVoucherService.save(seckillVoucher); stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString()); }
测试还是使用postman提交添加秒杀券请求,然后去redis里观察是否添加到了redis里,这里我们新添加一个优惠券,内容相同(图里是100的库存,后面为了统一改成200了)。
第二步基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功,逻辑上不难,就是熟悉这门脚本语言的问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 local voucherId = ARGV[1 ]local userId = ARGV[2 ]local stockKey = 'seckill:stock:' .. voucherIdlocal orderKey = 'seckill:order:' .. voucherIdif (tonumber (redis.call('get' , stockKey)) <= 0 ) then return 1 end if (redis.call('sismember' , orderKey, userId) == 1 ) then return 2 end redis.call('incrby' , stockKey, -1 ) redis.call('sadd' , orderKey, userId)return 0
然后更新下单业务的代码,seckillVoucher。注意redis里用到了两个id ,一个是查询秒杀券id,一个是放了 下过单的用户id,订单id 可没有放到redis里,暂时是直接返回给前端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override public Result seckillVoucher (Long voucherId) { Long userId = UserHolder.getUser().getId(); Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString() ); int r = result.intValue(); if (r != 0 ) { return Result.fail(r == 1 ? "库存不足" : "不能重复下单" ); } long orderId = redisIdWorker.nextId("order" ); return Result.ok(orderId); }
随后就可以进行压测,这个下单其实可以描述为“先抢单”,即只判断库存和一人一单,没问题就返回 用户id 和 秒杀券id,至于 用户id 和 秒杀券id 还要关联很多其他东西,是放在阻塞队列里慢慢去DB里创建,并不影响现在下单这个业务的性能,所以现在就可以进行测试
还是用1000个用户模拟下单,库存也是200,注意这次测试的秒杀券id是12,jmx文件里要同步修改,测试结果如下,平均响应时间到了 184 ms,吞吐量1972,相比优化前有不小的提升。其他测试结果就是redis里seckill:stock:12
的库存量减为了0,seckill:order:12
是下了订单的用户集合,里面是200个用户id,没有截图
3. 基于阻塞队列实现异步秒杀 需求:
①新增秒杀优惠券的同时,将优惠券信息保存到Redis中
②基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
③如果抢购成功,将 优惠券id 、 用户id 、订单id 封装后存入阻塞队列
④开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
前两个需求已经完成,现在完成后两个异步下单的过程。
第三步:如果抢购成功,将 优惠券id 、 用户id 、订单id 封装后存入阻塞队列
还是更新 seckillVoucher 的代码,加一个 封装后存入阻塞队列 的环节
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue <>(1024 * 1024 ); ...@Override public Result seckillVoucher (Long voucherId) { Long userId = UserHolder.getUser().getId(); Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString() ); int r = result.intValue(); if (r != 0 ) { return Result.fail(r == 1 ? "库存不足" : "不能重复下单" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); orderTasks.add(voucherOrder); return Result.ok(orderId); }
第四步:开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstruct private void init () { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler ()); }private class VoucherOrderHandler implements Runnable { @Override public void run () { while (true ){ try { VoucherOrder voucherOrder = orderTasks.take(); createVoucherOrder(voucherOrder); } catch (Exception e) { log.error("处理订单异常" , e); } } } }
创建秒杀订单的方法,createVoucherOrder就得做出更新,其接受参数应改为 VoucherOrder,而不是之前的 voucherId,因为要从 VoucherOrder 中获取各种信息
后面就不用阻塞队列了,学完消息队列使用消息队列
九、redis消息队列 1. 理论知识 基于阻塞队列的异步秒杀存在哪些问题?
解决的方法是使用消息队列:
消息队列 (M essage Q ueue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
消息队列:存储和管理消息,也被称为消息代理(Message Broker)
生产者:发送消息到消息队列
消费者:从消息队列获取消息并处理消息
一般我们不用redis作为消息队列,而是用kafka等第三方组件,但这里也是稍微过一下,知道redis能够作为消息队列使用
Redis提供了三种不同的方式来实现消息队列:
1.1 基于List实现消息队列 Redis的list数据结构是一个双向链表,很容易模拟出队列效果。队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP 或者BLPOP 来实现阻塞效果。
缺点:无法避免消息丢失(前面提到的数据安全第二点)、只支持单消费者
1.2 基于PubSub的消息队列 PubSub (发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。即支持多生产、多消费。相关的命令为:
SUBSCRIBE channel [channel] :订阅一个或多个频道
PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
PUBLISH channel msg :向一个频道发送消息
缺点:不支持数据持久化、无法避免消息丢失、消息堆积有上限,超出时数据丢失。(还不如list)
1.3 基于Stream的消息队列-单消费者 Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
(1)发送消息的命令:XADD,一般都建议默认自动创建队列,默认使用redis自动生成消息的 id,消息添加成功后会返回消息的id
1 2 自动创建名为users的消息队列,并发送一个消息,内容是:{name =jack,age=21},并且使用redis自动生成ID XADD users * name jack age 21
(2)读取消息的方式之一:XREAD
1 2 3 4 使用XREAD读取第一个消息: XREAD COUNT 1 STREAMS users 0 XREAD阻塞方式,阻塞1000 ms,读取最新的消息: XREAD COUNT 1 BLOCK 1000 STREAMS users $
注意:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题
STREAM类型消息队列的XREAD命令特点:消息可回溯(消息一直存在队列里,不删除)、一个消息可以被多个消费者读取、可以阻塞读取
缺点:有消息漏读的风险
1.4 基于Stream的消息队列-消费者组 消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
消息分流 :队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度。一定程度上避免了消息堆积的问题,如果我们就是想让多个消费者同时消费,那么就创建多个消费者组
消息标示 :消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费。避免了前文单消费者模式下消息漏读的风险
消息确认 :消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。确保消息都被处理,解决消息丢失的问题。
由此可见基于消费者组的消息队列,功能更加强大,基本符合我们的需求。常用命令有:
(1)创建消费者组:
1 2 3 4 XGROUP CREATE key groupName ID [MKSTREAM] 示例:给消息队列s1创建一个消费者组g1,从第1 个消息开始 XGROUP CREATE s1 g1 0
key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息。取决于你想不想要队列里原来的消息
MKSTREAM:队列不存在时自动创建队列
(2)从消费者组读取消息:
1 2 3 4 5 6 XREADGROUP GROUP group consumer [COUNT count ] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] 示例:g1消费者组里的c1消费者,从消息队列s1 里,阻塞2 s读取1 条消息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 > 示例:g1消费者组里的c2消费者,从消息队列s1 里,阻塞2 s读取1 条消息,读取的是已消费但未确认的消息 XREADGROUP GROUP g1 c2 COUNT 1 BLOCK 2000 STREAMS s1 0
group:消费组名称
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
count:本次查询的最大数量
BLOCK milliseconds:当没有消息时最长等待时间
NOACK:无需手动ACK,获取到消息后自动确认,不建议设置,我们需要消息确认
STREAMS key:指定队列名称
ID:获取消息的起始ID:
“>”:从下一个未消费的消息开始(不是从最新的消息,和之前单消费者XREAD中的$不同了)
其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
(3)确认消息已读
1 2 3 4 XACK key group ID 示例:g1消费者组确认已读s1消息队列中的2 条消息 XACK s1 g1 1646339018049 -0 1646339342815 -0
key:队列名称
group:消费者组名称
ID:消息ID
(4)查看pending-list
1 2 3 4 XPENDING key group start end count 示例:从消息队列s1的g1消费者组里读取所有的10 条信息 XPENDING s1 g1 - + 10
key:队列名称
group:消费者组名称
start:最小的id,无穷小则是-
end:最大的id,无穷大则是+
count:想查看几条
(5)删除指定的消费者组
1 XGROUP DESTROY key groupName
java消费者监听消息的基本思路:(伪代码)
死循环在正常情况下读取下一个未消费的消息,出现异常(即消息处理完之后没有返回ACK确认)那就再来一个死循环去处理pending-list中已消费但未确认的消息,处理完了再回到正常流程的死循环
STREAM类型消息队列的XREADGROUP命令特点:消息可回溯;可以多消费者争抢消息,加快消费速度;可以阻塞读取;没有消息漏读的风险;有消息确认机制;保证消息至少被消费一次
缺点:STREAM类型消息队列持久化是基于redis持久化的,不能保证万无一失,有丢失风险;不支持生产者消息确认,如果生产者生成消息过程中宕机,无法解决;多消费者下的消息有序性,即事务管理。
如果对于消息队列业务要求没那么严格,STREAM类型消息队列就够用了,但如果要求很严格,STREAM类型消息队列还是存在上述的一些问题,就需要更加专业的第三方消息中间件
2. 基于消息队列实现异步秒杀 (1)创建一个Stream类型的消息队列,名为stream.orders,这就不用Java代码了,直接去redis控制台创建
1 xgroup create stream.orders g1 0 mkstream
(2)修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
1 2 3 4 5 6 local orderId = ARGV[3 ] redis.call('xadd' , 'stream.orders' , '*' , 'userId' , userId, 'voucherId' , voucherId, 'id' , orderId)
再更新抢单的业务逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Override public Result seckillVoucher (Long voucherId) { Long userId = UserHolder.getUser().getId(); long orderId = redisIdWorker.nextId("order" ); Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); int r = result.intValue(); if (r != 0 ) { return Result.fail(r == 1 ? "库存不足" : "不能重复下单" ); } return Result.ok(orderId); }
(3)项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstruct private void init () { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler ()); }private class VoucherOrderHandler implements Runnable { @Override public void run () { while (true ) { try { List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1" , "c1" ), StreamReadOptions.empty().count(1 ).block(Duration.ofSeconds(2 )), StreamOffset.create("stream.orders" , ReadOffset.lastConsumed()) ); if (list == null || list.isEmpty()) { continue ; } MapRecord<String, Object, Object> record = list.get(0 ); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder (), true ); createVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge("s1" , "g1" , record.getId()); } catch (Exception e) { log.error("处理订单异常" , e); handlePendingList(); } } } private void handlePendingList () { while (true ) { try { List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1" , "c1" ), StreamReadOptions.empty().count(1 ), StreamOffset.create("stream.orders" , ReadOffset.from("0" )) ); if (list == null || list.isEmpty()) { break ; } MapRecord<String, Object, Object> record = list.get(0 ); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder (), true ); createVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge("s1" , "g1" , record.getId()); } catch (Exception e) { log.error("处理PendingList订单异常" , e); } } } }
随后即可进行测试,谨记抢购的优惠券id是12,11因为没往redis里存,是抢购不了的。先进行单人下单,看看逻辑有没有问题,测试结果显示没有问题,redis库存-1,订单集合+1,DB同理。同时,redis的消息队列 stream.orders 里多了 一条消息
随后可以恢复库存,模拟高并发多人抢单,测试结果显示没有超卖,也保证了一人一单,至于平均响应时间和吞吐量,多次测试结果比较波动,但肯定比最开始的500多ms要好很多
后续这里自己用第三方消息中间件再处理一下