一、登录 redis启动命令:在finalsell中systemctl start redis
1、发送验证码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public Result sendCode (String phone, HttpSession session) { if (RegexUtils.isPhoneInvalid(phone)) { return Result.fail("手机号无效" ); } String code = RandomUtil.randomNumbers(6 ); session.setAttribute("code" , code); log.info("验证码为:{}" , code); return Result.ok(); }
2、登录功能 这里一开始忘了在createNewUser用mp的sava将新生成的user保存到数据库了,我说怎么半天不见数据库有新数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public Result login (LoginFormDTO loginForm, HttpSession session) { String phone = loginForm.getPhone(); if (RegexUtils.isPhoneInvalid(phone)) { return Result.fail("手机号无效" ); } String code = loginForm.getCode(); Object cacheCode = session.getAttribute("code" ); if (cacheCode == null || !cacheCode.equals(code)){ return Result.fail("验证码无效" ); } User user = query().eq("phone" , phone).one(); if (user == null ){ user = createNewUser(phone); } session.setAttribute("user" , BeanUtil.copyProperties(user, UserDTO.class)); return Result.ok(); } private User createNewUser (String phone) { User user = new User (); user.setPhone(phone); user.setNickName("user_" + RandomUtil.randomString(10 )); save(user); return user; }
3、登录校验拦截器 现在有一个问题:对于其它业务(Controller),每个都要写登录逻辑岂不是很麻烦
答:引入拦截器,可以在所有Controller之前判断是否放行
又有一个问题:如何把拦截器拦截到的信息传给其它Controller呢?
答:ThreadLocal,每个请求都是一个独立的线程,也可以避免并发问题的出现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class LoginInterceptor implements HandlerInterceptor { @Override public boolean preHandle (HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { HttpSession session = request.getSession(); Object user = session.getAttribute("user" ); if (user == null ){ return false ; } UserHolder.saveUser((UserDTO) user); return true ; } @Override public void afterCompletion (HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { UserHolder.removeUser(); } }
拦截器写好了还没有生效,需要配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Configuration public class MvcConfig implements WebMvcConfigurer { @Override public void addInterceptors (InterceptorRegistry registry) { registry.addInterceptor(new LoginInterceptor ()). excludePathPatterns( "/shop/**" , "/voucher/**" , "shop-type/**" , "/upload/**" , "/blog/hot" , "/user/code" , "/user/login" ); } }
4 、Redis代替session解决session问题 STAR法则
S(Situation ) :尽管现在是一个单体式的架构,但为了应对并发,肯定要布置多台tomcat服务器形成负载均衡的集群。这时就有问题了:因为多台tomcat不能共享空间,可能导致多次请求打到不同的tomcat服务器上,出现数据丢失问题
T(Task) :session的替代方案应满足以下需求:
数据共享
内存存储(因为session就是基于内存的,所以读写效率才会比较高)
key-value结构
A(Action ) :
1 2 stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, LOGIN_CODE_TTL, TimeUnit.MINUTES);
接下来修改登录逻辑,在登录逻辑实现中修改代码如下:
1 2 3 4 5 6 7 String code = loginForm.getCode(); Object cacheCode = redisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone); if (cacheCode == null || !cacheCode.equals(code)){ return Result.fail("验证码无效" ); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 UUID token = UUID.randomUUID(true ); UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class); Map<String, Object> userMap = BeanUtil.beanToMap(userDTO); redisTemplate.opsForHash().putAll(LOGIN_USER_KEY + token, userMap); redisTemplate.expire(LOGIN_USER_KEY + token, LOGIN_USER_TTL, TimeUnit.MINUTES); return Result.ok(token);
拦截器中代码也要修改,同时只要拦截到请求,说明用户还在活跃状态,就刷新token的有效期(不然可能会出现用户一直在使用但token有效期过了就要重新登录的错误)
5、双重拦截器解决非登录页面token刷新问题 但是又有一个问题,因为我们第一个拦截器只拦截了需要登录的页面的请求。如果用户一直处于不需要登录的页面,是在活跃的,但这时token有效期过了再发请求可能就要重新登录,所以可以再加一个拦截器来拦截一切请求,只要来了请求就说明用户还在活跃,然后更新token有效期。
最终双重拦截器的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class RefreshTokenInterceptor implements HandlerInterceptor { private StringRedisTemplate stringRedisTemplate; public RefreshTokenInterceptor (StringRedisTemplate stringRedisTemplate) { this .stringRedisTemplate = stringRedisTemplate; } @Override public boolean preHandle (HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { String token = request.getHeader("authorization" ); if (StrUtil.isBlank(token)) { return true ; } Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(RedisConstants.LOGIN_USER_KEY + token); if (userMap.isEmpty()){ return true ; } UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO (), false ); UserHolder.saveUser(userDTO); return true ; } @Override public void afterCompletion (HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { UserHolder.removeUser(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class LoginInterceptor implements HandlerInterceptor { @Override public boolean preHandle (HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { if (UserHolder.getUser() == null ) { return false ; } return true ; } }
同时需要在MvcConfig中更新配置,注意用order指定拦截器顺序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Configuration public class MvcConfig implements WebMvcConfigurer { @Resource private StringRedisTemplate stringRedisTemplate; @Override public void addInterceptors (InterceptorRegistry registry) { registry.addInterceptor(new LoginInterceptor ()). excludePathPatterns( "/shop/**" , "/voucher/**" , "/shop-type/**" , "/upload/**" , "/blog/hot" , "/user/code" , "/user/login" ).order(1 ); registry.addInterceptor(new RefreshTokenInterceptor (stringRedisTemplate)).addPathPatterns("/**" ).order(0 ); } }
二、商户查询缓存 1、添加商铺缓存 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Service public class ShopServiceImpl extends ServiceImpl <ShopMapper, Shop> implements IShopService { @Autowired private StringRedisTemplate stringRedisTemplate; @Override public Result queryById (Long id) { String key = CACHE_SHOP_KEY + id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(shopJson)){ Shop shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } Shop shop = getById(id); if (shop == null ) return Result.fail("商铺信息不存在" ); stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop)); return Result.ok(shop); } }
2、练习:添加商铺类型缓存 3、缓存更新策略 issue1 、采用什么缓存更新策略?
内存淘汰
超时剔除
主动更新
说明
不用自己维护, 利用Redis的内存淘汰机制, 当内存不足时自动淘汰部分数据。 下次查询时更新缓存。
给缓存数据添加TTL时间, 到期后自动删除缓存。 下次查询时更新缓存。
编写业务逻辑, 在修改数据库的同时, 更新缓存。
一致性
差
一般
好
维护成本
无
低
高
业务场景
低一致性需求:使用内存淘汰机制,例如店铺类型的查询缓存(因为这个很长一段时间都不需要更新)
高一致性需求:主动更新,并以超时剔除作为兜底方案,例如店铺详情查询的缓存
ans1 :本项目采用主动更新+超时剔除(兜底)
更新方案采用双写方案 (Cache Aside Pattern):人工编码方式,缓存调用者在更新完数据库后再去更新缓存。
issue2 :在对缓存的更新上,对比删除缓存与更新缓存
更新缓存
:每次更新数据库都需要更新缓存,无效写操作较多
删除缓存
:更新数据库时让缓存失效,再次查询时更新缓存
ans2 :所以我们采用直接删除缓存
issue3 :是先操作缓存(先删除缓存,再更新数据库)还是先操作数据库(先更新数据库,再操作缓存)?
对比如下:
先删除缓存,再更新数据库:
删除缓存的操作很快,但是更新数据库的操作相对较慢,如果此时有一个线程2刚好进来查询缓存,由于我们刚刚才删除缓存,所以线程2需要查询数据库,并写入缓存,但是我们更新数据库的操作还未完成,所以线程2查询到的数据是脏数据,出现线程安全问题
先更新数据库,再操作缓存
当线程1在查询缓存且未命中,此时线程1查询数据,查询完准备写入缓存时,由于没有加锁线程2乘虚而入,线程2在这期间对数据库进行了更新,此时线程1将旧数据返回了,出现了脏读,这个事件发生的概率很低,因为先是需要满足缓存未命中,且在写入缓存的那段事件内有一个线程进行更新操作,缓存的查询很快,这段空隙时间很小,所以出现脏读现象的概率也很低
这种方式的不足之处:存在脏读现象,但概率较小
ans3 :虽然这二者都存在线程安全问题,但是相对来说,后者出现线程安全问题的概率相对较低,所以我们最终采用后者先操作数据库,再删除缓存
的方案
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 @Override public Result queryById (Long id) { String key = CACHE_SHOP_KEY + id; String shopJson = stringRedisTemplate.opsForValue().get(key); Shop shop = null ; if (StrUtil.isNotBlank(shopJson)) { shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } shop = this .getById(id); if (Objects.isNull(shop)) { return Result.fail("店铺不存在" ); } stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES); return Result.ok(shop); } @Transactional @Override public Result updateShop (Shop shop) { boolean f = this .updateById(shop); if (!f){ throw new RuntimeException ("数据库更新失败" ); } f = stringRedisTemplate.delete(CACHE_SHOP_KEY + shop.getId()); if (!f){ throw new RuntimeException ("缓存删除失败" ); } return Result.ok(); }
4、缓存穿透 概念:客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
解决方案:
缓存空对象
优点:实现简单,维护方便
缺点:额外的内存消耗,可能造成短期的不一致
布隆过滤
优点:内存占用较少,没有多余key
缺点:实现复杂,存在误判可能(有穿透的风险),无法删除数据
本文采用是缓存缓存空对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public Result queryById (Long id) { String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id); if (StrUtil.isNotBlank(shopJson)) { Shop shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } if (shopjson != null ) { return Result.fail("店铺不存在!!" ); } Shop shop = getById(id); if (shop == null ) { stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, "" , CACHE_NULL_TTL, TimeUnit.MINUTES); return Result.fail("店铺不存在!!" ); } String jsonStr = JSONUtil.toJsonStr(shop); stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, jsonStr, CACHE_SHOP_TTL, TimeUnit.MINUTES); return Result.ok(shop); }
5、缓存雪崩
概念:缓存雪崩 是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案
给不同的Key的TTL添加随机值
利用Redis集群提高服务的可用性
给缓存业务添加降级限流策略 ,比如快速失败机制,让请求尽可能打不到数据库上
给业务添加多级缓存
这里采用方案1
6、缓存击穿
概念:缓存击穿问题也叫热点Key问题 ,就是一个被高并发访问 并且缓存重建业务较复杂 的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
解决方案
互斥锁(时间换空间) 优点:内存占用小,一致性高,实现简单 缺点:性能较低,容易出现死锁
逻辑过期(空间换时间) 优点:性能高 缺点:内存占用较大,容易出现脏读
两者相比较,互斥锁更加易于实现,但是容易发生死锁,且锁导致并行变成串行,导致系统性能下降,逻辑过期实现起来相较复杂,且需要耗费额外的内存,但是通过开启子线程重建缓存,使原来的同步阻塞变成异步,提高系统的响应速度,但是容易出现脏读
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 @Override public Result queryById (Long id) { String key = CACHE_SHOP_KEY + id; Result result = getShopFromCache(key); if (Objects.nonNull(result)) { return result; } try { String lockKey = LOCK_SHOP_KEY + id; boolean isLock = tryLock(lockKey); if (!isLock) { Thread.sleep(50 ); return queryById(id); } result = getShopFromCache(key); if (Objects.nonNull(result)) { return result; } Shop shop = this .getById(id); if (Objects.isNull(shop)) { stringRedisTemplate.opsForValue().set(key, "" , CACHE_NULL_TTL, TimeUnit.SECONDS); return Result.fail("店铺不存在" ); } stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES); return Result.ok(shop); }catch (Exception e){ throw new RuntimeException ("发生异常" ); } finally { unlock(key); } } private Result getShopFromCache (String key) { String shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(shopJson)) { Shop shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } if (Objects.nonNull(shopJson)) { return Result.fail("店铺不存在" ); } return null ; } private boolean tryLock (String key) { Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1" , 10 , TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } private void unlock (String key) { stringRedisTemplate.delete(key); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 public static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10 ); @Override public Result queryById (Long id) { String key = CACHE_SHOP_KEY + id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isBlank(shopJson)) { return Result.fail("店铺数据不存在" ); } RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class); JSONObject data = (JSONObject) redisData.getData(); Shop shop = JSONUtil.toBean(data, Shop.class); LocalDateTime expireTime = redisData.getExpireTime(); if (expireTime.isAfter(LocalDateTime.now())) { return Result.ok(shop); } String lockKey = LOCK_SHOP_KEY + id; boolean isLock = tryLock(lockKey); if (isLock) { CACHE_REBUILD_EXECUTOR.submit(() -> { try { this .saveShopToCache(id, 20L ); } catch (InterruptedException e) { throw new RuntimeException (e); } finally { unlock(lockKey); } }); } shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isBlank(shopJson)) { return Result.fail("店铺数据不存在" ); } redisData = JSONUtil.toBean(shopJson, RedisData.class); data = (JSONObject) redisData.getData(); shop = JSONUtil.toBean(data, Shop.class); expireTime = redisData.getExpireTime(); if (expireTime.isAfter(LocalDateTime.now())) { return Result.ok(shop); } return Result.ok(shop); } private Result getShopFromCache (String key) { String shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(shopJson)) { Shop shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } if (Objects.nonNull(shopJson)) { return Result.fail("店铺不存在" ); } return null ; } public void saveShopToCache (Long id, Long expireSeconds) throws InterruptedException { Shop shop = this .getById(id); Thread.sleep(200 ); RedisData redisData = new RedisData (); redisData.setData(shop); redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds)); stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData)); }
小结 为了解决数据一致性问题,我们可以选择适当的缓存更新策略:
以缓存主动更新(双写方案+删除缓存模式+先操作数据库后操作缓存+事务)为主,超时剔除为辅
查询时,先查询缓存,缓存命中直接返回,缓存未命中查询数据库并重建缓存,返回查询结果
更新时,先修改数据删除缓存,使用事务保证缓存和数据操作两者的原子性、
除了会遇到数据一致性问题意外,我们还会遇到缓存穿透、缓存雪崩、缓存击穿等问题
对于缓存穿透,我们采用了缓存空对象 解决
对于缓存击穿,我们分别演示了互斥锁(setnx实现方式)和逻辑过期 两种方式解决
三、优惠券秒杀 1、全局唯一ID 自增ID存在的问题
当用户抢购时,就会生成订单并保存到tb_voucher_order
这张表中,而订单表如果使用数据库自增ID就存在一些问题:
id的规律性太明显 ,容易出现信息的泄露,被不怀好意的人伪造请求
受单表数据量的限制 ,MySQL中表能够存储的数据有限,会出现分库分表的情况,id不能够一直自增
那么该如何解决呢?我们需要使用分布式ID(也可以叫全局唯一ID) ,分布式ID满足以下特点:
全局唯一性:分布式ID保证在整个分布式系统中唯一性,不会出现重复的标识符。这对于区分和追踪系统中的不同实体非常重要。
高可用性:分布式ID生成器通常被设计为高可用的组件,可以通过水平扩展、冗余备份或集群部署来确保服务的可用性。即使某个节点或组件发生故障,仍然能够正常生成唯一的ID标识符。
安全性:分布式ID生成器通常是独立于应用程序和业务逻辑的。它们被设计为一个单独的组件或服务,可以被各种应用程序和服务所共享和使用,使得各个应用程序之间的ID生成过程互不干扰。
高性能:分布式ID生成器通常要求在很短的时间内生成唯一的标识符。为了实现低延迟,设计者通常采用高效的算法和数据结构,以及优化的网络通信和存储策略。
递增性:分布式ID通常可以被设计成可按时间顺序排序,以便更容易对生成的ID进行索引、检索或排序操作。这对于一些场景,如日志记录和事件溯源等,非常重要。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Component public class RedisIdWorker { @Resource private StringRedisTemplate stringRedisTemplate; private static final long BEGIN_TIMESTAMP = 1640995200 ; private static final int COUNT_BITS = 32 ; public long nextId (String keyPrefix) { LocalDateTime now = LocalDateTime.now(); long nowSecond = now.toEpochSecond(ZoneOffset.UTC); long timestamp = nowSecond - BEGIN_TIMESTAMP; String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd" )); Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date); return timestamp << COUNT_BITS | count; } }
注意: 测试类包括CountDownLatch
的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @SpringBootTest public class RedisIdWorkerTest { @Resource private RedisIdWorker redisIdWorker; private ExecutorService es = Executors.newFixedThreadPool(500 ); @Test public void testNextId () throws InterruptedException { CountDownLatch latch = new CountDownLatch (300 ); Runnable task = () -> { for (int i = 0 ; i < 100 ; i++) { long id = redisIdWorker.nextId("order" ); System.out.println("id = " + id); } latch.countDown(); }; long begin = System.currentTimeMillis(); for (int i = 0 ; i < 300 ; i++) { es.submit(task); } latch.await(); long end = System.currentTimeMillis(); System.out.println("生成3w个id共耗时" + (end - begin) + "ms" ); } }
2、秒杀券下单功能 首先添加秒杀券,tb_voucher保存的是普通优惠券,而tb_seckill_voucher保存的是秒杀券的信息,也就是说,秒杀券也是优惠券,只是秒杀券比优惠券有更多的一些信息。在这里踩了一个坑。就是设置秒杀券的end_time时,这个时间必须在你电脑当前时间之后,否则就会不显示!!
然后是秒杀券的下单功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Transactional public Result seckillVocher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); log.info("当前秒杀券" ); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已结束" ); } if (voucher.getStock() < 1 ) { return Result.fail("秒杀券已抢空" ); } boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper <SeckillVoucher>() .eq(SeckillVoucher::getVoucherId, voucherId) .setSql("stock = stock -1" )); if (!flag){ throw new RuntimeException ("秒杀券扣减失败" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); voucherOrder.setUserId(UserHolder.getUser().getId()); voucherOrder.setVoucherId(voucherOrder.getId()); save(voucherOrder); return Result.ok(orderId); }
这里也踩了一个坑,就是执行了半天点击但发现优惠券并没有扣除,甚至controller处的写的打印信息都没有在控制台打印,原因好像跟访问路径有关系??刚解决完就忘了。。鱼的记忆-_-
3、单体下一人多单超卖问题 上一节我们通过分布式ID+事务成功完成了优惠券秒杀功能,并且在测试后发现逻辑跑通了,看上去已经成功的解决了秒杀优惠券功能。但是前面我们只是正常的测试,那如果换到高并发的场景下能否成功解决?现在就让我们使用 Jmeter 来进行压力测试看看吧!
jmeter设置
经过测试, 发现有超卖问题。为什么会产生超卖呢?
线程1查询库存,发现库存充足,创建订单,然后准备对库存进行扣减,但此时线程2和线程3也进行查询,同样发现库存充足,然后线程1执行完扣减操作后,库存变为了0,线程2和线程3同样完成了库存扣减操作,最终导致库存变成了负数!这就是超卖问题的完整流程
那么我们该如何有效防止超卖问题的发生呢,以下提供几种常见的解决方案
超卖问题的常见解决方案:
悲观锁,认为线程安全问题一定会发生,因此操作数据库之前都需要先获取锁,确保线程串行执行。常见的悲观锁有:synchronized、lock
乐观锁,认为线程安全问题不一定发生,因此不加锁,只会在更新数据库的时候去判断有没有其它线程对数据进行修改,如果没有修改则认为是安全的,直接更新数据库中的数据即可,如果修改了则说明不安全,直接抛异常或者等待重试。常见的实现方式有:版本号法、CAS操作、乐观锁算法
悲观锁和乐观锁的比较
悲观锁比乐观锁的性能低:悲观锁需要先加锁再操作,而乐观锁不需要加锁,所以乐观锁通常具有更好的性能。
悲观锁比乐观锁的冲突处理能力低:悲观锁在冲突发生时直接阻塞其他线程,乐观锁则是在提交阶段检查冲突并进行重试。
悲观锁比乐观锁的并发度低:悲观锁存在锁粒度较大的问题,可能会限制并发性能;而乐观锁可以实现较高的并发度。
应用场景:两者都是互斥锁,悲观锁适合写入操作较多、冲突频繁的场景;乐观锁适合读取操作较多、冲突较少的场景。
拓展:CAS
CAS(Compare and Swap)是一种并发编程中常用的原子操作,用于解决多线程环境下的数据竞争问题。它是乐观锁算法的一种实现方式。
CAS操作包含三个参数:内存地址V、旧的预期值A和新的值B。CAS的执行过程如下:
比较(Compare):将内存地址V中的值与预期值A进行比较。
判断(Judgment):如果相等,则说明当前值和预期值相等,表示没有发生其他线程的修改。
交换(Swap):使用新的值B来更新内存地址V中的值。
CAS操作是一个原子操作,意味着在执行过程中不会被其他线程中断,保证了线程安全性。如果CAS操作失败(即当前值与预期值不相等),通常会进行重试,直到CAS操作成功为止。
CAS操作适用于精细粒度的并发控制,可以避免使用传统的加锁机制带来的性能开销和线程阻塞。然而,CAS操作也存在一些限制和注意事项:
ABA问题:CAS操作无法感知到对象值从A变为B又变回A的情况,可能会导致数据不一致。为了解决ABA问题,可以引入版本号或标记位等机制。
自旋开销:当CAS操作失败时,需要不断地进行重试,会占用CPU资源。如果重试次数过多或者线程争用激烈,可能会引起性能问题。
并发性限制:如果多个线程同时对同一内存地址进行CAS操作,只有一个线程的CAS操作会成功,其他线程需要重试或放弃操作。
在Java中,提供了相关的CAS操作支持,如AtomicInteger、AtomicLong、AtomicReference等类,可以实现基于CAS操作的线程安全操作。
乐观锁解决一人多单超卖问题
CAS法类似与版本号法,但是不需要另外在添加一个 version 字段,而是直接使用库存替代版本号,线程1查询完库存后进行库存扣减操作,线程2在查询库存时,发现库存充足,也准备执行库存扣减操作,但是需要判断当前的库存是否是之前查询时的库存,结果发现库存数量发生了改变,这就说明数据库中的数据已经发生了修改,需要进行重试(或者直接抛异常中断)
qps200 ,异常90%, 吞吐量1652.9
这又是什么原因呢?这就是乐观锁的弊端,我们只要发现数据修改就直接终止操作了,我们只需要修改一下判断条件,即只要库存大于0就可以进行修改,而不是库存数据修改我们就终止操作
1 2 3 4 5 boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper <SeckillVoucher>() .eq(SeckillVoucher::getVoucherId, voucherId) .gt(SeckillVoucher::getStock, 0 ) .setSql("stock = stock -1" ));
执行完之后库存恰好为0,订单表中恰好是一百条
4、单体下一人一单问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Override @Transactional public Result seckillVocher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); log.info("当前秒杀券" ); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已结束" ); } if (voucher.getStock() < 1 ) { return Result.fail("秒杀券已抢空" ); } int count = this .count(new LambdaQueryWrapper <VoucherOrder>() .eq(VoucherOrder::getUserId, UserHolder.getUser().getId())); if (count >= 1 ) { return Result.fail("用户已购买" ); } boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper <SeckillVoucher>() .eq(SeckillVoucher::getVoucherId, voucherId) .gt(SeckillVoucher::getStock, 0 ) .setSql("stock = stock -1" )); if (!flag) { throw new RuntimeException ("秒杀券扣减失败" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); voucherOrder.setUserId(UserHolder.getUser().getId()); voucherOrder.setVoucherId(voucherOrder.getId()); save(voucherOrder); return Result.ok(orderId); }
现在是不是可以成功完成了一人一单的要求呢?让我们来使用 Jmeter 测一下吧
通过测试,发现并没有达到我们想象中的目标,一个人只能购买一次,但是发现一个用户居然能够购买10次。这说明还是存在超卖问题!
问题原因: 出现这个问题的原因和前面库存为负数数的情况是一样的,线程1查询当前用户是否有订单,当前用户没有订单准备下单,此时线程2也查询当前用户是否有订单,由于线程1还没有完成下单操作,线程2同样发现当前用户未下单,也准备下单,这样明明一个用户只能下一单,结果下了两单,也就出现了超卖问题
解决方案: 一般这种超卖问题可以使用下面两种常见的解决方案
悲观锁
乐观锁
悲观锁解决超卖问题
乐观锁需要判断数据是否修改,而当前是判断当前是否存在,所以无法像解决库存超卖一样使用CAS机制,但是可以使用版本号法,但是版本号法需要新增一个字段,所以这里为了方便,就直接演示使用悲观锁解决超卖问题
代码这里注意,seckillVocher上面的@Transactional
要去掉,然后要加一个依赖,启动类上加上一个注解@EnableAspectJAutoProxy(exposeProxy = true)
1 2 3 4 5 <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> <version>1.9 .21 </version> </dependency>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 @Override public Result seckillVocher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); log.info("当前秒杀券" ); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已结束" ); } if (voucher.getStock() < 1 ) { return Result.fail("秒杀券已抢空" ); } Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()) { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(userId, voucherId); } } @Transactional public Result createVoucherOrder (Long userId, Long voucherId) { int count = this .count(new LambdaQueryWrapper <VoucherOrder>() .eq(VoucherOrder::getUserId, userId)); if (count >= 1 ) { return Result.fail("用户已购买" ); } boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper <SeckillVoucher>() .eq(SeckillVoucher::getVoucherId, voucherId) .gt(SeckillVoucher::getStock, 0 ) .setSql("stock = stock -1" )); if (!flag) { throw new RuntimeException ("秒杀券扣减失败" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); voucherOrder.setUserId(UserHolder.getUser().getId()); voucherOrder.setVoucherId(voucherOrder.getId()); flag = this .save(voucherOrder); if (!flag) { throw new RuntimeException ("创建秒杀券订单失败" ); } return Result.ok(orderId); }
TODO 这里代码的实现细节有点看不懂(以下内容看的别人的总结,这里回头再看看
锁的范围尽量小。synchronized尽量锁代码块,而不是方法,锁的范围越大性能越低
锁的对象一定要是一个不变的值。我们不能直接锁 Long 类型的 userId,每请求一次都会创建一个新的 userId 对象,synchronized 要锁不变的值,所以我们要将 Long 类型的 userId 通过 toString()方法转成 String 类型的 userId,toString()方法底层(可以点击去看源码)是直接 new 一个新的String对象,显然还是在变,所以我们要使用 intern() 方法从常量池中寻找与当前 字符串值一致的字符串对象,这就能够保障一个用户 发送多次请求,每次请求的 userId 都是不变的,从而能够完成锁的效果(并行变串行)
我们要锁住整个事务,而不是锁住事务内部的代码。如果我们锁住事务内部的代码会导致其它线程能够进入事务,当我们事务还未提交,锁一旦释放,仍然会存在超卖问题
Spring的@Transactional注解要想事务生效,必须使用动态代理。Service中一个方法中调用另一个方法,另一个方法使用了事务,此时会导致@Transactional失效,所以我们需要创建一个代理对象,使用代理对象来调用方法。
总结:
首先用全局唯一id代表优惠券的下单订单号。如果一个人可以买多单的情况下,在并发场景下,会出现超卖情况,此时用乐观锁的cas来实现防止超卖。然后是单体的一人一单,这里就是下单前需要验证当前用户是否已经下单了,确认是否存在,如果硬要用乐观锁,使用版本号法需要加一个字段,所以用的是悲观锁。但如果在集群环境下,由于锁依赖于jvm内部的监视器,如果是多台机器,每个机器内部的锁只能锁住自己这个进程的,所以要用分布式锁了。
四、分布式锁 由于synchronized是本地锁,只能提供线程级别的同步,每个JVM中都有一把synchronized锁,不能跨 JVM 进行上锁,当一个线程进入被 synchronized 关键字修饰的方法或代码块时,它会尝试获取对象的内置锁(也称为监视器锁)。如果该锁没有被其他线程占用,则当前线程获得锁,可以继续执行代码;否则,当前线程将进入阻塞状态,直到获取到锁为止。而现在我们是创建了两个节点,也就意味着有两个JVM,所以synchronized会失效!从而出现超卖问题:
分布式锁的常见实现
基于关系数据库:可以利用数据库的事务特性和唯一索引来实现分布式锁。通过向数据库插入一条具有唯一约束的记录作为锁,其他进程在获取锁时会受到数据库的并发控制机制限制。
基于缓存(如Redis):使用分布式缓存服务(如Redis)提供的原子操作来实现分布式锁。通过将锁信息存储在缓存中,其他进程可以通过检查缓存中的锁状态来判断是否可以获取锁。
基于ZooKeeper:ZooKeeper是一个分布式协调服务,可以用于实现分布式锁。通过创建临时有序节点,每个请求都会尝试创建一个唯一的节点,并检查自己是否是最小节点,如果是,则表示获取到了锁。
从性能角度(从高到低):
缓存 > Zookeeper > 数据库
从可靠性角度(从高到低):
Zookeeper > 缓存 > 数据库
setnx指令的特点 :setnx
只能设置key不存在的值,值不存在设置成功,返回 1 ;值存在设置失败,返回 0
获取锁
1 2 # 添加锁 set [key] [value] ex [time] nx
释放锁
1 2 # 释放锁(除了使用del手动释放,还可超时释放) del [key]
1、基于redis实现的分布式锁 创建分布式锁
1 2 3 4 5 6 7 8 9 10 11 12 13 public interface ILock { boolean tryLock (long timeoutSec) ; void unlock () ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public class SimpleRedisLock implements ILock { private StringRedisTemplate stringRedisTemplate; private String name; public SimpleRedisLock (StringRedisTemplate stringRedisTemplate, String name) { this .stringRedisTemplate = stringRedisTemplate; this .name = name; } @Override public boolean tryLock (long timeoutSec) { String id = Thread.currentThread().getId() + "" ; Boolean result = stringRedisTemplate.opsForValue() .setIfAbsent("lock:" + name, id, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(result); } @Override public void unlock () { stringRedisTemplate.delete("lock:" + name); } }
使用分布式锁:改造前面VoucherOrderServiceImpl中的代码,将之前使用sychronized
锁的地方,改成我们自己实现的分布式锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Long userId = UserHolder.getUser().getId(); SimpleRedisLock lock = new SimpleRedisLock (stringRedisTemplate, "order:" + userId); boolean isLock = lock.tryLock(1200 ); if (!isLock) { return Result.fail("一人只能下一单" ); } try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(userId, voucherId); } finally { lock.unlock(); }
然后用apifox发送两个请求
可以看到8081端口为false,8082端口为true
这里遇到了一个bug,就是打了断点但是变量那里看不到,根本就没有进入断点。最后发现是apifox发送的请求中,携带登录token的参数Authorization应该是放在header中,我一开始放在了params中
2、分布式锁优化1 ——解决释放不属于自己锁的问题
上一节,我们实现了一个简单的分布式锁,但是会存在一个问题:当线程1获取锁后,由于业务阻塞,线程1的锁超时释放了,这时候线程2趁虚而入拿到了锁,然后此时线程1业务完成了,然后把线程2刚刚获取的锁给释放了,这时候线程3又趁虚而入拿到了锁,这就导致又出现了超卖问题!(但是这个在小项目(并发数不高)中出现的概率比较低,在大型项目(并发数高)情况下是有一定概率的)
备注:我们可以把锁的有效期降低一点,这样就能够测试上面哪种情况了(●’◡’●)
如何解决呢?我们为分布式锁添加一个线程标识,在释放锁时判断当前锁是否是自己的锁,是自己的就直接释放,不是自己的就不释放锁,从而解决多个线程同时获得锁的情况导致出现超卖
3、分布式锁优化2 ——释放锁时的原子性问题。说到底也是锁超时释放的问题
在上一节中,我们通过给锁添加一个线程标识,并且在释放锁时添加一个判断,从而防止锁超时释放产生的超卖问题,一定程度上解决了超卖问题,但是仍有可能发生超卖问题(出现超卖概率更低了):当线程1获取锁,执行完业务然后并且判断完当前锁是自己的锁时,但就在此时发生了阻塞,结果锁被超时释放了,线程2立马就趁虚而入了,获得锁执行业务,但就在此时线程1阻塞完成,由于已经判断过锁,已经确定锁是自己的锁了,于是直接就删除了锁,结果删的是线程2的锁,这就又导致线程3趁虚而入了,从而继续发生超卖问题
备注:我们可以在判断删除锁的那行代码上打一个断点,然后user1发送一个请求,获取锁,手动把锁删了,模拟锁超时释放,然后使用user2发送一个请求,成功获取锁,从而模拟上诉过程,检验超卖问题
PS:虽然这个情况发生的概率较低,但是根据墨菲定律,我们最好不要抱有侥幸心理,不然最终我们会在这个细微的问题上付诸沉重的代价!你可能还会想,判断锁和释放锁在同一个方法中,并且两者之间没有别的代码,为什么会发生阻塞呢?JVM的垃圾回收机制会导致短暂的阻塞(我个人感觉这种情况发生的概率真的不高,但是我也没有实际接触过真正的大型高并发项目,所以具体也只能靠揣摩)
那么我们该如何保障 判断锁 和 释放锁 这连段代码的原子性呢?答案是使用Lua脚本
4、Redisson 经过优化1和优化2,我们实现的分布式锁已经达到生产可用级别 了,但是还不够完善,比如:
分布式锁不可重入:不可重入是指同一线程不能重复获取同一把锁。比如,方法A中调用方法B,方法A需要获取分布式锁,方法B同样需要获取分布式锁,线程1进入方法A获取了一次锁,进入方法B又获取一次锁,由于锁不可重入,所以就会导致死锁
分布式锁不可重试:获取锁只尝试一次就返回false,没有重试机制,这会导致数据丢失,比如线程1获取锁,然后要将数据写入数据库,但是当前的锁被线程2占用了,线程1直接就结束了而不去重试,这就导致数据发生了丢失
分布式锁超时释放:超市释放机机制虽然一定程度避免了死锁发生的概率,但是如果业务执行耗时过长,期间锁就释放了,这样存在安全隐患。锁的有效期过短,容易出现业务没执行完就被释放,锁的有效期过长,容易出现死锁,所以这是一个大难题!
我们可以设置一个较短的有效期,但是加上一个 心跳机制 和 自动续期:在锁被获取后,可以使用心跳机制并自动续期锁的持有时间。通过定期发送心跳请求,显示地告知其他线程或系统锁还在使用中,同时更新锁的过期时间。如果某个线程持有锁的时间超过了预设的有效时间,其他线程可以尝试重新获取锁。
主从一致性问题:如果Redis提供了主从集群,主从同步存在延迟,线程1获取了锁
我们如果想要更进一步优化分布式锁,当然是可以的,但是没必要,除非是迫不得已,我们完全可以直接使用已经造好的轮子,比如:Redisson。Redssion是一个十分成熟的Redis框架,功能也很多,比如:分布式锁和同步器、分布式对象、分布式集合、分布式服务,各种Redis实现分布式的解决方案。简而言之Redisson就是一个使用Redis解决分布式问题的方案的集合,当然它不仅仅是解决分布式相关问题,还包含其它的一些问题。
所以说分布式锁的究极优化就是使用别人造好的轮子🤣redisson分布式锁的实现使用
1)、引入依赖
1 2 3 4 5 <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13 .6 </version> </dependency>
2)、配置Redisson客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient () { Config config = new Config (); config.useSingleServer().setAddress("redis://192.168.200.100:6379" ).setPassword("123321" ); return Redisson.create(config); } }
温馨提示 :此外还有一种引入方式,可以引入 redission 的 starter 依赖,然后在yml文件中配置Redisson,但是不推荐这种方式,因为他会替换掉 Spring官方 提供的这套对 Redisson 的配置
3)、修改一下使用锁的地方,其它的业务代码都不需要改
1 2 3 RLock lock = redissonClient.getLock("lock:order:" + userId); boolean isLock = lock.tryLock();
五、MQ异步优化 此时先测一下之前同步的方式性能如何
用一下测试代码生成1000个登录用户的token并保存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 package com.hmdp;import cn.hutool.core.bean.BeanUtil;import cn.hutool.core.bean.copier.CopyOptions;import com.hmdp.dto.UserDTO;import com.hmdp.entity.User;import com.hmdp.service.IUserService;import org.junit.jupiter.api.Test;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.data.redis.core.StringRedisTemplate;import javax.annotation.Resource;import java.io.File;import java.io.FileOutputStream;import java.io.IOException;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.UUID;import java.util.concurrent.TimeUnit;import static com.hmdp.utils.RedisConstants.LOGIN_USER_KEY;import static com.hmdp.utils.RedisConstants.LOGIN_USER_TTL;@SpringBootTest public class yibuTest { @Resource IUserService userService; @Resource private StringRedisTemplate stringRedisTemplate; @Test public void testGetAll () { List<User> users = userService.list(); users.forEach( user -> { String token = UUID.randomUUID().toString(); UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class); File file = new File ("D:\\code\\token.txt" ); FileOutputStream output = null ; try { output = new FileOutputStream (file, true ); byte [] bytes = token.getBytes(); output.write(bytes); output.write("\r\n" .getBytes()); } catch (Exception e) { throw new RuntimeException (e); } finally { try { output.close(); } catch (IOException e) { throw new RuntimeException (e); } } Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap <>(), CopyOptions.create() .setIgnoreNullValue(true ) .setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString())); String tokenKey = LOGIN_USER_KEY + token; stringRedisTemplate.opsForHash().putAll(tokenKey, userMap); stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES); } ); } }
更改优惠券数量为200,并清空订单表
注意优惠券的过期时间!!!!!
执行完之后,库存扣减为0,订单表中正好200条订单
jmeter显示结果如下:还是比较慢的
第二次测试
第三次测试
1、优化思路 之前的流程如下:
2、安装rabbitMQ 已有centOS7
,docker
,finalshell
安装参考链接:rabbitMQ安装参考
安装Erlang 成功截图
rabbitmq启动成功
开启web管理功能
开启后,在本机访问虚拟机ip加端口号15672即可访问
然后是创建用户。因为在finalshell开启了rabbitmq后,就不能继续输入命令了。所以这里我是先在centos里面开启rabbitmq,然后再在finalshell输入创建用户的命令。执行后如下:
然后登录web管理页面:
3.实现异步处理的代码 新增代码结构如下:
配置mq通信为主题模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package com.hmdp.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.context.annotation.Bean;import org.springframework.amqp.core.TopicExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Configuration;@Configuration public class RabbitMQTopicConfig { public static final String QUEUE = "seckillQueue" ; public static final String EXCHANGE = "seckillExchange" ; public static final String ROUTINGKEY = "seckill.lua.#" ; @Bean public Queue queue () { return new Queue (QUEUE); } @Bean public TopicExchange topicExchange () { return new TopicExchange (EXCHANGE); } @Bean public Binding binding () { return BindingBuilder.bind(queue()).to(topicExchange()).with(ROUTINGKEY); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package com.hmdp.rabbitmq;import com.hmdp.config.RabbitMQTopicConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Slf4j @Service public class MQSender { @Autowired private RabbitTemplate rabbitTemplate; private static final String ROUTINGKEY = "seckill.lua.message" ; public void sendSeckillMessage (String msg) { log.info("发送消息" +msg); rabbitTemplate.convertAndSend(RabbitMQTopicConfig.EXCHANGE,ROUTINGKEY,msg); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 package com.hmdp.rabbitmq;import com.alibaba.fastjson.JSON;import com.hmdp.config.RabbitMQTopicConfig;import com.hmdp.entity.VoucherOrder;import com.hmdp.service.ISeckillVoucherService;import com.hmdp.service.IVoucherOrderService;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Service;import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;@Slf4j @Service public class MQReceiver { @Resource IVoucherOrderService voucherOrderService; @Resource ISeckillVoucherService seckillVoucherService; @Transactional @RabbitListener(queues = RabbitMQTopicConfig.QUEUE) public void receiveSeckillMessage (String msg) { log.info("接收到消息: " +msg); VoucherOrder voucherOrder = JSON.parseObject(msg, VoucherOrder.class); Long voucherId = voucherOrder.getVoucherId(); Long userId = voucherOrder.getUserId(); int count = voucherOrderService.query().eq("user_id" ,userId).eq("voucher_id" , voucherId).count(); if (count>0 ){ log.error("该用户已购买过" ); return ; } log.info("扣减库存" ); boolean success = seckillVoucherService .update() .setSql("stock = stock-1" ) .eq("voucher_id" , voucherId) .gt("stock" ,0 ) .update(); if (!success){ log.error("库存不足" ); return ; } voucherOrderService.save(voucherOrder); } }
注意这里需要手动向redis中添加库存键及数量,键名就是下面lua代码中stockKey
的名字,这里voucherId
为7,键名就是stock:7
,这里也算一个bug ,搞了有一阵子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 local voucherId = ARGV[1 ]local userId = ARGV[2 ]local stockKey = 'stock:' .. voucherIdlocal orderKey = 'order:' .. voucherIdif (tonumber (redis.call('get' ,stockKey)) <= 0 )then return 1 end if (redis.call('sismember' ,orderKey,userId) == 1 ) then return 2 end redis.call('incrby' ,stockKey,-1 ) redis.call('sadd' ,orderKey,userId) return 0
下面是VoucherOrderServiceImpl的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 public class VoucherOrderServiceImpl extends ServiceImpl <VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private MQSender mqSender; @Resource private RedisIdWorker redisIdWorker; @Resource private StringRedisTemplate stringRedisTemplate; private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript <>(); SECKILL_SCRIPT.setLocation(new ClassPathResource ("seckill.lua" )); SECKILL_SCRIPT.setResultType(Long.class); } @Override public Result seckillVocher (Long voucherId) { Long userId = UserHolder.getUser().getId(); Long r = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString() ); int result = r.intValue(); if (result != 0 ) { return Result.fail(r == 1 ? "库存不足" : "该用户重复下单" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); mqSender.sendSeckillMessage(JSON.toJSONString(voucherOrder)); return Result.ok(orderId); } }
4.结果分析 注意在用jmeter测试之前,先清除之前的数据,如下:
实验结果如下:
可以看到最小耗时2ms,最大耗时112ms,提高了将近20倍,平均耗时34ms,提高了将近30倍左右