抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

一、登录

redis启动命令:在finalsell中systemctl start redis

1、发送验证码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 发送验证码
* @param phone
* @param session
* @return
*/
public Result sendCode(String phone, HttpSession session) {
//1、校验手机号格式
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号无效");
}
//2、如果成功,生成6位的验证码
String code = RandomUtil.randomNumbers(6);
//3、保存到session
session.setAttribute("code", code);
//4、发送验证码需要调第三方接口,暂不做,打印日志
log.info("验证码为:{}", code);
return Result.ok();
}

2、登录功能

这里一开始忘了在createNewUser用mp的sava将新生成的user保存到数据库了,我说怎么半天不见数据库有新数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 登录
* @param loginForm
* @param session
* @return
*/
public Result login(LoginFormDTO loginForm, HttpSession session) {
//1、校验手机号是否合规
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号无效");
}

//2、校验验证码是否有效
String code = loginForm.getCode();
Object cacheCode = session.getAttribute("code");
if(cacheCode == null || !cacheCode.equals(code)){
//验证码无效
return Result.fail("验证码无效");
}

//3、查询用户——此处使用MP
User user = query().eq("phone", phone).one();

if(user == null){
//user不存在,则创建一个新的user
user = createNewUser(phone);
}

//4、根据用户是否存在,最终都保存到session中
//BeanUtil.copyProperties(user, UserDTO.class)将user的信息复制到UserDTO并创建一个UserDTO对象返回
session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
return Result.ok();
}

private User createNewUser(String phone) {
//1、创建用户
User user = new User();
user.setPhone(phone);
user.setNickName("user_" + RandomUtil.randomString(10));

//2、保存用户,用的MP
save(user);
return user;
}

3、登录校验拦截器

现在有一个问题:对于其它业务(Controller),每个都要写登录逻辑岂不是很麻烦

答:引入拦截器,可以在所有Controller之前判断是否放行

又有一个问题:如何把拦截器拦截到的信息传给其它Controller呢?

答:ThreadLocal,每个请求都是一个独立的线程,也可以避免并发问题的出现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class LoginInterceptor implements HandlerInterceptor {
@Override
/**
* Controller执行之前调用此方法,做登录校验
*/
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1、获取session
HttpSession session = request.getSession();

//2、从session中获取user
Object user = session.getAttribute("user");

//3、判断user是否存在
if(user == null){
return false;
}

//否则就保存到当前线程
UserHolder.saveUser((UserDTO) user);

return true;
}

/**
* 渲染之后,将保存的信息删除,避免内存泄露
* @param request
* @param response
* @param handler
* @param ex
* @throws Exception
*/
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
//直接调用UserHolder
UserHolder.removeUser();
}
}

拦截器写好了还没有生效,需要配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor()).
excludePathPatterns( //排除不需要被拦截登录的路径
"/shop/**",
"/voucher/**",
"shop-type/**",
"/upload/**",
"/blog/hot",
"/user/code",
"/user/login");
}
}

4 、Redis代替session解决session问题

STAR法则

S(Situation ):尽管现在是一个单体式的架构,但为了应对并发,肯定要布置多台tomcat服务器形成负载均衡的集群。这时就有问题了:因为多台tomcat不能共享空间,可能导致多次请求打到不同的tomcat服务器上,出现数据丢失问题

T(Task):session的替代方案应满足以下需求:

  1. 数据共享
  2. 内存存储(因为session就是基于内存的,所以读写效率才会比较高)
  3. key-value结构

A(Action )

1
2
//修改发送验证码,保存验证码到redis,并设置有效期防止堆积占用内存
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, LOGIN_CODE_TTL, TimeUnit.MINUTES);

接下来修改登录逻辑,在登录逻辑实现中修改代码如下:

1
2
3
4
5
6
7
//2、校验验证码是否有效,从redis中取
String code = loginForm.getCode();
Object cacheCode = redisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
if(cacheCode == null || !cacheCode.equals(code)){
//验证码无效
return Result.fail("验证码无效");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//保存用户信息到redis中
//1、生成token作为key
UUID token = UUID.randomUUID(true);

//2、将user转为map存储
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO);

//3、存储到redis
redisTemplate.opsForHash().putAll(LOGIN_USER_KEY + token, userMap);

//3、设置token有效期
redisTemplate.expire(LOGIN_USER_KEY + token, LOGIN_USER_TTL, TimeUnit.MINUTES);

return Result.ok(token);

拦截器中代码也要修改,同时只要拦截到请求,说明用户还在活跃状态,就刷新token的有效期(不然可能会出现用户一直在使用但token有效期过了就要重新登录的错误)

5、双重拦截器解决非登录页面token刷新问题

但是又有一个问题,因为我们第一个拦截器只拦截了需要登录的页面的请求。如果用户一直处于不需要登录的页面,是在活跃的,但这时token有效期过了再发请求可能就要重新登录,所以可以再加一个拦截器来拦截一切请求,只要来了请求就说明用户还在活跃,然后更新token有效期。

最终双重拦截器的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class RefreshTokenInterceptor implements HandlerInterceptor {

private StringRedisTemplate stringRedisTemplate;

public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate){
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
/**
* Controller执行之前调用此方法,做登录校验
*/
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {

//1、获取token
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)) {
return true;
}

//2、根据token从redis中获取用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(RedisConstants.LOGIN_USER_KEY + token);

//3、判断user是否存在
if(userMap.isEmpty()){
return true;
}

//4、将map转为dto
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);

//保存到当前线程
UserHolder.saveUser(userDTO);

return true;
}

/**
* 渲染之后,将保存的信息删除,避免内存泄露
* @param request
* @param response
* @param handler
* @param ex
* @throws Exception
*/
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
//直接调用UserHolder
UserHolder.removeUser();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class LoginInterceptor implements HandlerInterceptor {


@Override
/**
* Controller执行之前调用此方法,做登录校验
*/
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (UserHolder.getUser() == null) {
return false;
}

return true;
}
}

同时需要在MvcConfig中更新配置,注意用order指定拦截器顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor()).
excludePathPatterns( //排除不需要被拦截登录的路径
"/shop/**",
"/voucher/**",
"/shop-type/**",
"/upload/**",
"/blog/hot",
"/user/code",
"/user/login").order(1);

registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);
}
}

双重拦截器示意图

二、商户查询缓存

1、添加商铺缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {

@Autowired
private StringRedisTemplate stringRedisTemplate;

@Override
public Result queryById(Long id) {
String key = CACHE_SHOP_KEY + id;

//1、从redis中查,查到了直接返回
String shopJson = stringRedisTemplate.opsForValue().get(key);

if(StrUtil.isNotBlank(shopJson)){
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}


//2、没有查到就去数据库查,如果数据库也没有,就返回
Shop shop = getById(id);
if(shop == null)
return Result.fail("商铺信息不存在");

//3、如果数据库有,保存到redis便于下次查找,然后再返回
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));
return Result.ok(shop);
}
}

2、练习:添加商铺类型缓存

3、缓存更新策略

issue1、采用什么缓存更新策略?

内存淘汰 超时剔除 主动更新
说明 不用自己维护, 利用Redis的内存淘汰机制, 当内存不足时自动淘汰部分数据。 下次查询时更新缓存。 给缓存数据添加TTL时间, 到期后自动删除缓存。 下次查询时更新缓存。 编写业务逻辑, 在修改数据库的同时, 更新缓存。
一致性 一般
维护成本
  • 业务场景
    • 低一致性需求:使用内存淘汰机制,例如店铺类型的查询缓存(因为这个很长一段时间都不需要更新)
    • 高一致性需求:主动更新,并以超时剔除作为兜底方案,例如店铺详情查询的缓存

ans1:本项目采用主动更新+超时剔除(兜底)

更新方案采用双写方案(Cache Aside Pattern):人工编码方式,缓存调用者在更新完数据库后再去更新缓存。

issue2:在对缓存的更新上,对比删除缓存与更新缓存

  • 更新缓存:每次更新数据库都需要更新缓存,无效写操作较多
  • 删除缓存:更新数据库时让缓存失效,再次查询时更新缓存

ans2:所以我们采用直接删除缓存

issue3:是先操作缓存(先删除缓存,再更新数据库)还是先操作数据库(先更新数据库,再操作缓存)?

对比如下:

  • 先删除缓存,再更新数据库:

    删除缓存的操作很快,但是更新数据库的操作相对较慢,如果此时有一个线程2刚好进来查询缓存,由于我们刚刚才删除缓存,所以线程2需要查询数据库,并写入缓存,但是我们更新数据库的操作还未完成,所以线程2查询到的数据是脏数据,出现线程安全问题

    先删后更

  • 先更新数据库,再操作缓存

    当线程1在查询缓存且未命中,此时线程1查询数据,查询完准备写入缓存时,由于没有加锁线程2乘虚而入,线程2在这期间对数据库进行了更新,此时线程1将旧数据返回了,出现了脏读,这个事件发生的概率很低,因为先是需要满足缓存未命中,且在写入缓存的那段事件内有一个线程进行更新操作,缓存的查询很快,这段空隙时间很小,所以出现脏读现象的概率也很低

    这种方式的不足之处:存在脏读现象,但概率较小

    先更后删

ans3:虽然这二者都存在线程安全问题,但是相对来说,后者出现线程安全问题的概率相对较低,所以我们最终采用后者先操作数据库,再删除缓存的方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* 根据id查询商铺数据(查询时,重建缓存)
*
* @param id
* @return
*/
@Override
public Result queryById(Long id) {
String key = CACHE_SHOP_KEY + id;
// 1、从Redis中查询店铺数据
String shopJson = stringRedisTemplate.opsForValue().get(key);

Shop shop = null;
// 2、判断缓存是否命中
if (StrUtil.isNotBlank(shopJson)) {
// 2.1 缓存命中,直接返回店铺数据
shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
// 2.2 缓存未命中,从数据库中查询店铺数据
shop = this.getById(id);

// 4、判断数据库是否存在店铺数据
if (Objects.isNull(shop)) {
// 4.1 数据库中不存在,返回失败信息
return Result.fail("店铺不存在");
}
// 4.2 数据库中存在,重建缓存,并返回店铺数据
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shop);
}

/**
* 更新商铺数据(更新时,更新数据库,删除缓存)
*
* @param shop
* @return
*/
@Transactional
@Override
public Result updateShop(Shop shop) {
// 参数校验, 略

// 1、更新数据库中的店铺数据
boolean f = this.updateById(shop);
if (!f){
// 缓存更新失败,抛出异常,事务回滚
throw new RuntimeException("数据库更新失败");
}
// 2、删除缓存
f = stringRedisTemplate.delete(CACHE_SHOP_KEY + shop.getId());
if (!f){
// 缓存删除失败,抛出异常,事务回滚
throw new RuntimeException("缓存删除失败");
}
return Result.ok();
}

4、缓存穿透

概念:客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。

解决方案:

  • 缓存空对象
    • 优点:实现简单,维护方便
    • 缺点:额外的内存消耗,可能造成短期的不一致
  • 布隆过滤
    • 优点:内存占用较少,没有多余key
    • 缺点:实现复杂,存在误判可能(有穿透的风险),无法删除数据

本文采用是缓存缓存空对象

缓存穿透

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public Result queryById(Long id) {
//先从Redis中查,这里的常量值是固定的前缀 + 店铺id
String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
//如果不为空(查询到了),则转为Shop类型直接返回
if (StrUtil.isNotBlank(shopJson)) {
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//如果查询到的是空字符串,则说明是我们缓存的空数据
if (shopjson != null) {
return Result.fail("店铺不存在!!");
}
//否则去数据库中查
Shop shop = getById(id);
//查不到,则将空字符串写入Redis
if (shop == null) {
//这里的常量值是2分钟
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
return Result.fail("店铺不存在!!");
}
//查到了则转为json字符串
String jsonStr = JSONUtil.toJsonStr(shop);
//并存入redis,设置TTL
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, jsonStr, CACHE_SHOP_TTL, TimeUnit.MINUTES);
//最终把查询到的商户信息返回给前端
return Result.ok(shop);
}

5、缓存雪崩

  1. 概念:缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。

  2. 解决方案

    1. 给不同的Key的TTL添加随机值
    2. 利用Redis集群提高服务的可用性
    3. 给缓存业务添加降级限流策略,比如快速失败机制,让请求尽可能打不到数据库上
    4. 给业务添加多级缓存

    这里采用方案1

6、缓存击穿

  1. 概念:缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

  2. 解决方案

    1. 互斥锁(时间换空间)
      优点:内存占用小,一致性高,实现简单
      缺点:性能较低,容易出现死锁

    2. 逻辑过期(空间换时间)
      优点:性能高
      缺点:内存占用较大,容易出现脏读

      两者相比较,互斥锁更加易于实现,但是容易发生死锁,且锁导致并行变成串行,导致系统性能下降,逻辑过期实现起来相较复杂,且需要耗费额外的内存,但是通过开启子线程重建缓存,使原来的同步阻塞变成异步,提高系统的响应速度,但是容易出现脏读

    缓存击穿解决方法

    互斥锁解决

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    //互斥锁解决方案

    /**
    * 根据id查询商铺数据
    *
    * @param id
    * @return
    */
    @Override
    public Result queryById(Long id) {
    String key = CACHE_SHOP_KEY + id;
    // 1、从Redis中查询店铺数据,并判断缓存是否命中
    Result result = getShopFromCache(key);
    if (Objects.nonNull(result)) {
    // 缓存命中,直接返回
    return result;
    }
    try {
    // 2、缓存未命中,需要重建缓存,判断能否能够获取互斥锁
    String lockKey = LOCK_SHOP_KEY + id;
    boolean isLock = tryLock(lockKey);
    if (!isLock) {
    // 2.1 获取锁失败,已有线程在重建缓存,则休眠重试
    Thread.sleep(50);
    return queryById(id);
    }
    // 2.2 获取锁成功,判断缓存是否重建,防止堆积的线程全部请求数据库(所以说双检是很有必要的)
    result = getShopFromCache(key);
    if (Objects.nonNull(result)) {
    // 缓存命中,直接返回
    return result;
    }

    // 3、从数据库中查询店铺数据,并判断数据库是否存在店铺数据
    Shop shop = this.getById(id);
    if (Objects.isNull(shop)) {
    // 数据库中不存在,缓存空对象(解决缓存穿透),返回失败信息
    stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.SECONDS);
    return Result.fail("店铺不存在");
    }

    // 4、数据库中存在,重建缓存,响应数据
    stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),
    CACHE_SHOP_TTL, TimeUnit.MINUTES);
    return Result.ok(shop);
    }catch (Exception e){
    throw new RuntimeException("发生异常");
    } finally {
    // 5、释放锁(释放锁一定要记得放在finally中,防止死锁)
    unlock(key);
    }
    }

    /**
    * 从缓存中获取店铺数据
    * @param key
    * @return
    */
    private Result getShopFromCache(String key) {
    String shopJson = stringRedisTemplate.opsForValue().get(key);
    // 判断缓存是否命中
    if (StrUtil.isNotBlank(shopJson)) {
    // 缓存数据有值,说明缓存命中了,直接返回店铺数据
    Shop shop = JSONUtil.toBean(shopJson, Shop.class);
    return Result.ok(shop);
    }
    // 判断缓存中查询的数据是否是空字符串(isNotBlank把 null 和 空字符串 给排除了)
    if (Objects.nonNull(shopJson)) {
    // 当前数据是空字符串,说明缓存也命中了(该数据是之前缓存的空对象),直接返回失败信息
    return Result.fail("店铺不存在");
    }
    // 缓存未命中(缓存数据既没有值,又不是空字符串)
    return null;
    }


    /**
    * 获取锁
    *
    * @param key
    * @return
    */
    private boolean tryLock(String key) {
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
    // 拆箱要判空,防止NPE
    return BooleanUtil.isTrue(flag);
    }

    /**
    * 释放锁
    *
    * @param key
    */
    private void unlock(String key) {
    stringRedisTemplate.delete(key);
    }

    逻辑过期解决

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    //逻辑过期实现
    /**
    * 缓存重建线程池
    */
    public static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

    /**
    * 根据id查询商铺数据
    *
    * @param id
    * @return
    */
    @Override
    public Result queryById(Long id) {
    String key = CACHE_SHOP_KEY + id;
    // 1、从Redis中查询店铺数据,并判断缓存是否命中
    String shopJson = stringRedisTemplate.opsForValue().get(key);
    if (StrUtil.isBlank(shopJson)) {
    // 1.1 缓存未命中,直接返回失败信息
    return Result.fail("店铺数据不存在");
    }
    // 1.2 缓存命中,将JSON字符串反序列化未对象,并判断缓存数据是否逻辑过期
    RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
    // 这里需要先转成JSONObject再转成反序列化,否则可能无法正确映射Shop的字段
    JSONObject data = (JSONObject) redisData.getData();
    Shop shop = JSONUtil.toBean(data, Shop.class);
    LocalDateTime expireTime = redisData.getExpireTime();
    if (expireTime.isAfter(LocalDateTime.now())) {
    // 当前缓存数据未过期,直接返回
    return Result.ok(shop);
    }

    // 2、缓存数据已过期,获取互斥锁,并且重建缓存
    String lockKey = LOCK_SHOP_KEY + id;
    boolean isLock = tryLock(lockKey);
    if (isLock) {
    // 获取锁成功,开启一个子线程去重建缓存
    CACHE_REBUILD_EXECUTOR.submit(() -> {
    try {
    this.saveShopToCache(id, 20L);
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    } finally {
    unlock(lockKey);
    }
    });
    }

    // 3、获取锁失败,再次查询缓存,判断缓存是否重建(这里双检是有必要的)
    shopJson = stringRedisTemplate.opsForValue().get(key);
    if (StrUtil.isBlank(shopJson)) {
    // 3.1 缓存未命中,直接返回失败信息
    return Result.fail("店铺数据不存在");
    }
    // 3.2 缓存命中,将JSON字符串反序列化未对象,并判断缓存数据是否逻辑过期
    redisData = JSONUtil.toBean(shopJson, RedisData.class);
    // 这里需要先转成JSONObject再转成反序列化,否则可能无法正确映射Shop的字段
    data = (JSONObject) redisData.getData();
    shop = JSONUtil.toBean(data, Shop.class);
    expireTime = redisData.getExpireTime();
    if (expireTime.isAfter(LocalDateTime.now())) {
    // 当前缓存数据未过期,直接返回
    return Result.ok(shop);
    }

    // 4、返回过期数据
    return Result.ok(shop);
    }

    /**
    * 从缓存中获取店铺数据
    * @param key
    * @return
    */
    private Result getShopFromCache(String key) {
    String shopJson = stringRedisTemplate.opsForValue().get(key);
    // 判断缓存是否命中
    if (StrUtil.isNotBlank(shopJson)) {
    // 缓存数据有值,说明缓存命中了,直接返回店铺数据
    Shop shop = JSONUtil.toBean(shopJson, Shop.class);
    return Result.ok(shop);
    }
    // 判断缓存中查询的数据是否是空字符串(isNotBlank把 null 和 空字符串 给排除了)
    if (Objects.nonNull(shopJson)) {
    // 当前数据是空字符串,说明缓存也命中了(该数据是之前缓存的空对象),直接返回失败信息
    return Result.fail("店铺不存在");
    }
    // 缓存未命中(缓存数据既没有值,又不是空字符串)
    return null;
    }

    /**
    * 将数据保存到缓存中
    *
    * @param id 商铺id
    * @param expireSeconds 逻辑过期时间
    */
    public void saveShopToCache(Long id, Long expireSeconds) throws InterruptedException {
    // 从数据库中查询店铺数据
    Shop shop = this.getById(id);
    Thread.sleep(200);
    // 封装逻辑过期数据
    RedisData redisData = new RedisData();
    redisData.setData(shop);
    redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
    // 将逻辑过期数据存入Redis中
    stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
    }

    小结
    为了解决数据一致性问题,我们可以选择适当的缓存更新策略:

    以缓存主动更新(双写方案+删除缓存模式+先操作数据库后操作缓存+事务)为主,超时剔除为辅

    1. 查询时,先查询缓存,缓存命中直接返回,缓存未命中查询数据库并重建缓存,返回查询结果
    2. 更新时,先修改数据删除缓存,使用事务保证缓存和数据操作两者的原子性、

    除了会遇到数据一致性问题意外,我们还会遇到缓存穿透、缓存雪崩、缓存击穿等问题

    1. 对于缓存穿透,我们采用了缓存空对象解决
    2. 对于缓存击穿,我们分别演示了互斥锁(setnx实现方式)和逻辑过期两种方式解决

三、优惠券秒杀

1、全局唯一ID

自增ID存在的问题

当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:

  1. id的规律性太明显,容易出现信息的泄露,被不怀好意的人伪造请求
  2. 受单表数据量的限制,MySQL中表能够存储的数据有限,会出现分库分表的情况,id不能够一直自增

那么该如何解决呢?我们需要使用分布式ID(也可以叫全局唯一ID),分布式ID满足以下特点:

  • 全局唯一性:分布式ID保证在整个分布式系统中唯一性,不会出现重复的标识符。这对于区分和追踪系统中的不同实体非常重要。
  • 高可用性:分布式ID生成器通常被设计为高可用的组件,可以通过水平扩展、冗余备份或集群部署来确保服务的可用性。即使某个节点或组件发生故障,仍然能够正常生成唯一的ID标识符。
  • 安全性:分布式ID生成器通常是独立于应用程序和业务逻辑的。它们被设计为一个单独的组件或服务,可以被各种应用程序和服务所共享和使用,使得各个应用程序之间的ID生成过程互不干扰。
  • 高性能:分布式ID生成器通常要求在很短的时间内生成唯一的标识符。为了实现低延迟,设计者通常采用高效的算法和数据结构,以及优化的网络通信和存储策略。
  • 递增性:分布式ID通常可以被设计成可按时间顺序排序,以便更容易对生成的ID进行索引、检索或排序操作。这对于一些场景,如日志记录和事件溯源等,非常重要。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//生成分布式ID
@Component
public class RedisIdWorker {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 开始时间戳
*/
private static final long BEGIN_TIMESTAMP = 1640995200;
/**
* 序列化位数
*/
private static final int COUNT_BITS = 32;

/**
* 生成分布式ID
* @param keyPrefix
* @return
*/
public long nextId(String keyPrefix){
// 1、生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
// 2、生成序列号
// 以当天的时间戳为key,防止一直自增下去导致超时,这样每天的极限都是 2^{31}
String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 3、拼接并返回
return timestamp << COUNT_BITS | count;
}
}

注意:测试类包括CountDownLatch的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//测试类
@SpringBootTest
public class RedisIdWorkerTest {
@Resource
private RedisIdWorker redisIdWorker;

private ExecutorService es = Executors.newFixedThreadPool(500);

/**
* 测试分布式ID生成器的性能,以及可用性
*/
@Test
public void testNextId() throws InterruptedException {
// 使用CountDownLatch让线程同步等待
CountDownLatch latch = new CountDownLatch(300);
// 创建线程任务
Runnable task = () -> {
for (int i = 0; i < 100; i++) {
long id = redisIdWorker.nextId("order");
System.out.println("id = " + id);
}
// 等待次数-1
latch.countDown();
};
long begin = System.currentTimeMillis();
// 创建300个线程,每个线程创建100个id,总计生成3w个id
for (int i = 0; i < 300; i++) {
es.submit(task);
}
// 线程阻塞,直到计数器归0时才全部唤醒所有线程
latch.await();
long end = System.currentTimeMillis();
System.out.println("生成3w个id共耗时" + (end - begin) + "ms");
}
}

test

2、秒杀券下单功能

首先添加秒杀券,tb_voucher保存的是普通优惠券,而tb_seckill_voucher保存的是秒杀券的信息,也就是说,秒杀券也是优惠券,只是秒杀券比优惠券有更多的一些信息。在这里踩了一个坑。就是设置秒杀券的end_time时,这个时间必须在你电脑当前时间之后,否则就会不显示!!

然后是秒杀券的下单功能

msq

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Transactional
public Result seckillVocher(Long voucherId) {
// 1、查询秒杀券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
log.info("当前秒杀券");
// 2、判断秒杀券是否合法
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 秒杀券的开始时间在当前时间之后
return Result.fail("秒杀尚未开始");
}
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 秒杀券的结束时间在当前时间之前
return Result.fail("秒杀已结束");
}
if (voucher.getStock() < 1) {
return Result.fail("秒杀券已抢空");
}
// 5、秒杀券合法,则秒杀券抢购成功,秒杀券库存数量减一
boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
.eq(SeckillVoucher::getVoucherId, voucherId)
.setSql("stock = stock -1"));
if (!flag){
throw new RuntimeException("秒杀券扣减失败");
}
// 6、秒杀成功,创建对应的订单,并保存到数据库
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherOrder.getId());
save(voucherOrder);
// 返回订单id
return Result.ok(orderId);
}

这里也踩了一个坑,就是执行了半天点击但发现优惠券并没有扣除,甚至controller处的写的打印信息都没有在控制台打印,原因好像跟访问路径有关系??刚解决完就忘了。。鱼的记忆-_-

3、单体下一人多单超卖问题

上一节我们通过分布式ID+事务成功完成了优惠券秒杀功能,并且在测试后发现逻辑跑通了,看上去已经成功的解决了秒杀优惠券功能。但是前面我们只是正常的测试,那如果换到高并发的场景下能否成功解决?现在就让我们使用 Jmeter 来进行压力测试看看吧!

jmeter设置
sz

jem

经过测试, 发现有超卖问题。为什么会产生超卖呢?

cm

线程1查询库存,发现库存充足,创建订单,然后准备对库存进行扣减,但此时线程2和线程3也进行查询,同样发现库存充足,然后线程1执行完扣减操作后,库存变为了0,线程2和线程3同样完成了库存扣减操作,最终导致库存变成了负数!这就是超卖问题的完整流程

那么我们该如何有效防止超卖问题的发生呢,以下提供几种常见的解决方案

超卖问题的常见解决方案:

  • 悲观锁,认为线程安全问题一定会发生,因此操作数据库之前都需要先获取锁,确保线程串行执行。常见的悲观锁有:synchronized、lock
  • 乐观锁,认为线程安全问题不一定发生,因此不加锁,只会在更新数据库的时候去判断有没有其它线程对数据进行修改,如果没有修改则认为是安全的,直接更新数据库中的数据即可,如果修改了则说明不安全,直接抛异常或者等待重试。常见的实现方式有:版本号法、CAS操作、乐观锁算法

悲观锁和乐观锁的比较

  • 悲观锁比乐观锁的性能低:悲观锁需要先加锁再操作,而乐观锁不需要加锁,所以乐观锁通常具有更好的性能。
  • 悲观锁比乐观锁的冲突处理能力低:悲观锁在冲突发生时直接阻塞其他线程,乐观锁则是在提交阶段检查冲突并进行重试。
  • 悲观锁比乐观锁的并发度低:悲观锁存在锁粒度较大的问题,可能会限制并发性能;而乐观锁可以实现较高的并发度。
  • 应用场景:两者都是互斥锁,悲观锁适合写入操作较多、冲突频繁的场景;乐观锁适合读取操作较多、冲突较少的场景。

拓展:CAS

CAS(Compare and Swap)是一种并发编程中常用的原子操作,用于解决多线程环境下的数据竞争问题。它是乐观锁算法的一种实现方式。

CAS操作包含三个参数:内存地址V、旧的预期值A和新的值B。CAS的执行过程如下:

  • 比较(Compare):将内存地址V中的值与预期值A进行比较。
  • 判断(Judgment):如果相等,则说明当前值和预期值相等,表示没有发生其他线程的修改。
  • 交换(Swap):使用新的值B来更新内存地址V中的值。

CAS操作是一个原子操作,意味着在执行过程中不会被其他线程中断,保证了线程安全性。如果CAS操作失败(即当前值与预期值不相等),通常会进行重试,直到CAS操作成功为止。

CAS操作适用于精细粒度的并发控制,可以避免使用传统的加锁机制带来的性能开销和线程阻塞。然而,CAS操作也存在一些限制和注意事项:

  • ABA问题:CAS操作无法感知到对象值从A变为B又变回A的情况,可能会导致数据不一致。为了解决ABA问题,可以引入版本号或标记位等机制。
  • 自旋开销:当CAS操作失败时,需要不断地进行重试,会占用CPU资源。如果重试次数过多或者线程争用激烈,可能会引起性能问题。
  • 并发性限制:如果多个线程同时对同一内存地址进行CAS操作,只有一个线程的CAS操作会成功,其他线程需要重试或放弃操作。

在Java中,提供了相关的CAS操作支持,如AtomicInteger、AtomicLong、AtomicReference等类,可以实现基于CAS操作的线程安全操作。

乐观锁解决一人多单超卖问题
cas1

CAS法类似与版本号法,但是不需要另外在添加一个 version 字段,而是直接使用库存替代版本号,线程1查询完库存后进行库存扣减操作,线程2在查询库存时,发现库存充足,也准备执行库存扣减操作,但是需要判断当前的库存是否是之前查询时的库存,结果发现库存数量发生了改变,这就说明数据库中的数据已经发生了修改,需要进行重试(或者直接抛异常中断)

qps200异常90%吞吐量1652.9

这又是什么原因呢?这就是乐观锁的弊端,我们只要发现数据修改就直接终止操作了,我们只需要修改一下判断条件,即只要库存大于0就可以进行修改,而不是库存数据修改我们就终止操作

1
2
3
4
5
// 5、秒杀券合法,则秒杀券抢购成功,秒杀券库存数量减一
boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
.eq(SeckillVoucher::getVoucherId, voucherId)
.gt(SeckillVoucher::getStock, 0)
.setSql("stock = stock -1"));

执行完之后库存恰好为0,订单表中恰好是一百条

res1

res2

4、单体下一人一单问题

yryd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Override
@Transactional
public Result seckillVocher(Long voucherId) {
// 1、查询秒杀券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
log.info("当前秒杀券");
// 2、判断秒杀券是否合法
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 秒杀券的开始时间在当前时间之后
return Result.fail("秒杀尚未开始");
}
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 秒杀券的结束时间在当前时间之前
return Result.fail("秒杀已结束");
}
if (voucher.getStock() < 1) {
return Result.fail("秒杀券已抢空");
}
// 3、判断当前用户是否是第一单
int count = this.count(new LambdaQueryWrapper<VoucherOrder>()
.eq(VoucherOrder::getUserId, UserHolder.getUser().getId()));
if (count >= 1) {
// 当前用户不是第一单
return Result.fail("用户已购买");
}
// 4、用户是第一单,可以下单,秒杀券库存数量减一
boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
.eq(SeckillVoucher::getVoucherId, voucherId)
.gt(SeckillVoucher::getStock, 0)
.setSql("stock = stock -1"));
if (!flag) {
throw new RuntimeException("秒杀券扣减失败");
}


// 6、秒杀成功,创建对应的订单,并保存到数据库
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherOrder.getId());
save(voucherOrder);
// 返回订单id
return Result.ok(orderId);
}

现在是不是可以成功完成了一人一单的要求呢?让我们来使用 Jmeter 测一下吧

erawiuhggjf

通过测试,发现并没有达到我们想象中的目标,一个人只能购买一次,但是发现一个用户居然能够购买10次。这说明还是存在超卖问题!

问题原因:出现这个问题的原因和前面库存为负数数的情况是一样的,线程1查询当前用户是否有订单,当前用户没有订单准备下单,此时线程2也查询当前用户是否有订单,由于线程1还没有完成下单操作,线程2同样发现当前用户未下单,也准备下单,这样明明一个用户只能下一单,结果下了两单,也就出现了超卖问题

解决方案:一般这种超卖问题可以使用下面两种常见的解决方案

  1. 悲观锁
  2. 乐观锁

悲观锁解决超卖问题

乐观锁需要判断数据是否修改,而当前是判断当前是否存在,所以无法像解决库存超卖一样使用CAS机制,但是可以使用版本号法,但是版本号法需要新增一个字段,所以这里为了方便,就直接演示使用悲观锁解决超卖问题

yryddfsf

代码这里注意,seckillVocher上面的@Transactional要去掉,然后要加一个依赖,启动类上加上一个注解@EnableAspectJAutoProxy(exposeProxy = true)

1
2
3
4
5
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.21</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Override
public Result seckillVocher(Long voucherId) {
// 1、查询秒杀券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
log.info("当前秒杀券");
// 2、判断秒杀券是否合法
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 秒杀券的开始时间在当前时间之后
return Result.fail("秒杀尚未开始");
}
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 秒杀券的结束时间在当前时间之前
return Result.fail("秒杀已结束");
}
if (voucher.getStock() < 1) {
return Result.fail("秒杀券已抢空");
}
// 3、创建订单
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
// 创建代理对象,使用代理对象调用第三方事务方法, 防止事务失效
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(userId, voucherId);
}
}

/**
* 创建订单
*
* @param userId
* @param voucherId
* @return
*/
@Transactional
public Result createVoucherOrder(Long userId, Long voucherId) {
// synchronized (userId.toString().intern()) {
// 1、判断当前用户是否是第一单
int count = this.count(new LambdaQueryWrapper<VoucherOrder>()
.eq(VoucherOrder::getUserId, userId));
if (count >= 1) {
// 当前用户不是第一单
return Result.fail("用户已购买");
}
// 2、用户是第一单,可以下单,秒杀券库存数量减一
boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
.eq(SeckillVoucher::getVoucherId, voucherId)
.gt(SeckillVoucher::getStock, 0)
.setSql("stock = stock -1"));
if (!flag) {
throw new RuntimeException("秒杀券扣减失败");
}
// 3、创建对应的订单,并保存到数据库
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherOrder.getId());
flag = this.save(voucherOrder);
if (!flag) {
throw new RuntimeException("创建秒杀券订单失败");
}
// 4、返回订单id
return Result.ok(orderId);
}

TODO

这里代码的实现细节有点看不懂(以下内容看的别人的总结,这里回头再看看

  1. 锁的范围尽量小。synchronized尽量锁代码块,而不是方法,锁的范围越大性能越低
  2. 锁的对象一定要是一个不变的值。我们不能直接锁 Long 类型的 userId,每请求一次都会创建一个新的 userId 对象,synchronized 要锁不变的值,所以我们要将 Long 类型的 userId 通过 toString()方法转成 String 类型的 userId,toString()方法底层(可以点击去看源码)是直接 new 一个新的String对象,显然还是在变,所以我们要使用 intern() 方法从常量池中寻找与当前 字符串值一致的字符串对象,这就能够保障一个用户 发送多次请求,每次请求的 userId 都是不变的,从而能够完成锁的效果(并行变串行)
  3. 我们要锁住整个事务,而不是锁住事务内部的代码。如果我们锁住事务内部的代码会导致其它线程能够进入事务,当我们事务还未提交,锁一旦释放,仍然会存在超卖问题
  4. Spring的@Transactional注解要想事务生效,必须使用动态代理。Service中一个方法中调用另一个方法,另一个方法使用了事务,此时会导致@Transactional失效,所以我们需要创建一个代理对象,使用代理对象来调用方法。

jeasfsdf

总结:

首先用全局唯一id代表优惠券的下单订单号。如果一个人可以买多单的情况下,在并发场景下,会出现超卖情况,此时用乐观锁的cas来实现防止超卖。然后是单体的一人一单,这里就是下单前需要验证当前用户是否已经下单了,确认是否存在,如果硬要用乐观锁,使用版本号法需要加一个字段,所以用的是悲观锁。但如果在集群环境下,由于锁依赖于jvm内部的监视器,如果是多台机器,每个机器内部的锁只能锁住自己这个进程的,所以要用分布式锁了。


四、分布式锁

由于synchronized是本地锁,只能提供线程级别的同步,每个JVM中都有一把synchronized锁,不能跨 JVM 进行上锁,当一个线程进入被 synchronized 关键字修饰的方法或代码块时,它会尝试获取对象的内置锁(也称为监视器锁)。如果该锁没有被其他线程占用,则当前线程获得锁,可以继续执行代码;否则,当前线程将进入阻塞状态,直到获取到锁为止。而现在我们是创建了两个节点,也就意味着有两个JVM,所以synchronized会失效!从而出现超卖问题:

分布式锁的常见实现

fbss

  • 基于关系数据库:可以利用数据库的事务特性和唯一索引来实现分布式锁。通过向数据库插入一条具有唯一约束的记录作为锁,其他进程在获取锁时会受到数据库的并发控制机制限制。

  • 基于缓存(如Redis):使用分布式缓存服务(如Redis)提供的原子操作来实现分布式锁。通过将锁信息存储在缓存中,其他进程可以通过检查缓存中的锁状态来判断是否可以获取锁。

  • 基于ZooKeeper:ZooKeeper是一个分布式协调服务,可以用于实现分布式锁。通过创建临时有序节点,每个请求都会尝试创建一个唯一的节点,并检查自己是否是最小节点,如果是,则表示获取到了锁。

    从性能角度(从高到低):

    缓存 > Zookeeper > 数据库

    从可靠性角度(从高到低):

    Zookeeper > 缓存 > 数据库

  • setnx指令的特点setnx只能设置key不存在的值,值不存在设置成功,返回 1 ;值存在设置失败,返回 0

    • 获取锁

      1
      2
      # 添加锁
      set [key] [value] ex [time] nx
    • 释放锁

      1
      2
      # 释放锁(除了使用del手动释放,还可超时释放)
      del [key]

GASDG

1、基于redis实现的分布式锁

创建分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁持有的时间,过期后自动释放
* @return true true代表获取锁成功,反之代表失败
*/
boolean tryLock(long timeoutSec);

/**
* 释放锁
*/
void unlock();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class SimpleRedisLock implements ILock {
/**
* RedisTemplate
*/
private StringRedisTemplate stringRedisTemplate;

/**
* 锁的名称
*/
private String name;

public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}


/**
* 获取锁
*
* @param timeoutSec 超时时间
* @return
*/
@Override
public boolean tryLock(long timeoutSec) {
String id = Thread.currentThread().getId() + "";
// SET lock:name id EX timeoutSec NX
Boolean result = stringRedisTemplate.opsForValue()
.setIfAbsent("lock:" + name, id, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(result);
}



/**
* 释放锁
*/
@Override
public void unlock() {
stringRedisTemplate.delete("lock:" + name);
}
}

使用分布式锁:改造前面VoucherOrderServiceImpl中的代码,将之前使用sychronized锁的地方,改成我们自己实现的分布式锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Long userId = UserHolder.getUser().getId();
SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId);
boolean isLock = lock.tryLock(1200);
if (!isLock) {
// 索取锁失败,重试或者直接抛异常(这个业务是一人一单,所以直接返回失败信息)
return Result.fail("一人只能下一单");
}
try {
// 索取锁成功,创建代理对象,使用代理对象调用第三方事务方法, 防止事务失效
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(userId, voucherId);
} finally {
lock.unlock();
}

然后用apifox发送两个请求

apifox发送请求测试

可以看到8081端口为false,8082端口为true

8081端口

8082端口

rdm中的锁

这里遇到了一个bug,就是打了断点但是变量那里看不到,根本就没有进入断点。最后发现是apifox发送的请求中,携带登录token的参数Authorization应该是放在header中,我一开始放在了params中

2、分布式锁优化1

——解决释放不属于自己锁的问题

上一节,我们实现了一个简单的分布式锁,但是会存在一个问题:当线程1获取锁后,由于业务阻塞,线程1的锁超时释放了,这时候线程2趁虚而入拿到了锁,然后此时线程1业务完成了,然后把线程2刚刚获取的锁给释放了,这时候线程3又趁虚而入拿到了锁,这就导致又出现了超卖问题!(但是这个在小项目(并发数不高)中出现的概率比较低,在大型项目(并发数高)情况下是有一定概率的)

分布式锁问题1

备注:我们可以把锁的有效期降低一点,这样就能够测试上面哪种情况了(●’◡’●)

如何解决呢?我们为分布式锁添加一个线程标识,在释放锁时判断当前锁是否是自己的锁,是自己的就直接释放,不是自己的就不释放锁,从而解决多个线程同时获得锁的情况导致出现超卖

解决分布式锁问题1

3、分布式锁优化2

——释放锁时的原子性问题。说到底也是锁超时释放的问题

在上一节中,我们通过给锁添加一个线程标识,并且在释放锁时添加一个判断,从而防止锁超时释放产生的超卖问题,一定程度上解决了超卖问题,但是仍有可能发生超卖问题(出现超卖概率更低了):当线程1获取锁,执行完业务然后并且判断完当前锁是自己的锁时,但就在此时发生了阻塞,结果锁被超时释放了,线程2立马就趁虚而入了,获得锁执行业务,但就在此时线程1阻塞完成,由于已经判断过锁,已经确定锁是自己的锁了,于是直接就删除了锁,结果删的是线程2的锁,这就又导致线程3趁虚而入了,从而继续发生超卖问题

备注:我们可以在判断删除锁的那行代码上打一个断点,然后user1发送一个请求,获取锁,手动把锁删了,模拟锁超时释放,然后使用user2发送一个请求,成功获取锁,从而模拟上诉过程,检验超卖问题
分布式锁问题2

PS:虽然这个情况发生的概率较低,但是根据墨菲定律,我们最好不要抱有侥幸心理,不然最终我们会在这个细微的问题上付诸沉重的代价!你可能还会想,判断锁和释放锁在同一个方法中,并且两者之间没有别的代码,为什么会发生阻塞呢?JVM的垃圾回收机制会导致短暂的阻塞(我个人感觉这种情况发生的概率真的不高,但是我也没有实际接触过真正的大型高并发项目,所以具体也只能靠揣摩)

那么我们该如何保障 判断锁 和 释放锁 这连段代码的原子性呢?答案是使用Lua脚本

4、Redisson

经过优化1和优化2,我们实现的分布式锁已经达到生产可用级别了,但是还不够完善,比如:

  1. 分布式锁不可重入:不可重入是指同一线程不能重复获取同一把锁。比如,方法A中调用方法B,方法A需要获取分布式锁,方法B同样需要获取分布式锁,线程1进入方法A获取了一次锁,进入方法B又获取一次锁,由于锁不可重入,所以就会导致死锁
  2. 分布式锁不可重试:获取锁只尝试一次就返回false,没有重试机制,这会导致数据丢失,比如线程1获取锁,然后要将数据写入数据库,但是当前的锁被线程2占用了,线程1直接就结束了而不去重试,这就导致数据发生了丢失
  3. 分布式锁超时释放:超市释放机机制虽然一定程度避免了死锁发生的概率,但是如果业务执行耗时过长,期间锁就释放了,这样存在安全隐患。锁的有效期过短,容易出现业务没执行完就被释放,锁的有效期过长,容易出现死锁,所以这是一个大难题!
    1. 我们可以设置一个较短的有效期,但是加上一个 心跳机制 和 自动续期:在锁被获取后,可以使用心跳机制并自动续期锁的持有时间。通过定期发送心跳请求,显示地告知其他线程或系统锁还在使用中,同时更新锁的过期时间。如果某个线程持有锁的时间超过了预设的有效时间,其他线程可以尝试重新获取锁。
  4. 主从一致性问题:如果Redis提供了主从集群,主从同步存在延迟,线程1获取了锁

我们如果想要更进一步优化分布式锁,当然是可以的,但是没必要,除非是迫不得已,我们完全可以直接使用已经造好的轮子,比如:Redisson。Redssion是一个十分成熟的Redis框架,功能也很多,比如:分布式锁和同步器、分布式对象、分布式集合、分布式服务,各种Redis实现分布式的解决方案。简而言之Redisson就是一个使用Redis解决分布式问题的方案的集合,当然它不仅仅是解决分布式相关问题,还包含其它的一些问题。

所以说分布式锁的究极优化就是使用别人造好的轮子🤣
redisson分布式锁的实现使用

1)、引入依赖

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>

2)、配置Redisson客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration
public class RedissonConfig {
/**
* 创建Redisson配置对象,然后交给IOC管理
*
* @return
*/
@Bean
public RedissonClient redissonClient() {
// 获取Redisson配置对象
Config config = new Config();
// 添加redis地址,这里添加的是单节点地址,也可以通过 config.userClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://192.168.200.100:6379").setPassword("123321");
// 获取RedisClient对象,并交给IOC进行管理
return Redisson.create(config);
}
}

温馨提示:此外还有一种引入方式,可以引入 redission 的 starter 依赖,然后在yml文件中配置Redisson,但是不推荐这种方式,因为他会替换掉 Spring官方 提供的这套对 Redisson 的配置

3)、修改一下使用锁的地方,其它的业务代码都不需要改

1
2
3
//SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId);
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = lock.tryLock();

五、MQ异步优化

此时先测一下之前同步的方式性能如何

用一下测试代码生成1000个登录用户的token并保存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.hmdp;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import com.hmdp.dto.UserDTO;
import com.hmdp.entity.User;
import com.hmdp.service.IUserService;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;

import javax.annotation.Resource;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static com.hmdp.utils.RedisConstants.LOGIN_USER_KEY;
import static com.hmdp.utils.RedisConstants.LOGIN_USER_TTL;


@SpringBootTest
public class yibuTest {
@Resource
IUserService userService;
@Resource private StringRedisTemplate stringRedisTemplate;
@Test
public void testGetAll() {
List<User> users = userService.list();
// for(User user : users){
// System.out.println(user);
// }
// System.exit(0);
users.forEach(
user -> {
// 7.1,随机生成token,作为登录令牌
String token = UUID.randomUUID().toString();
// 7.2,将User对象转化为HashMap存储
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
File file = new File("D:\\code\\token.txt");
FileOutputStream output = null;
try {
output = new FileOutputStream(file, true);
byte[] bytes = token.getBytes();
output.write(bytes);
output.write("\r\n".getBytes());
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
try {
output.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));
// 7.3,存储
String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
// 7.4,设置token有效期
stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);
}
);
}

}

更改优惠券数量为200,并清空订单表

设置1000token

注意优惠券的过期时间!!!!!

执行完之后,库存扣减为0,订单表中正好200条订单

测试完

jmeter显示结果如下:还是比较慢的

jmeter结果

第二次测试

第二次测试

第三次测试

第三次测试

1、优化思路

之前的流程如下:

之前流程

2、安装rabbitMQ

已有centOS7,docker,finalshell

安装参考链接:rabbitMQ安装参考

安装Erlang 成功截图

安装erlang截图

rabbitmq启动成功

rabbitmq启动成功

开启web管理功能

开启web管理功能

开启后,在本机访问虚拟机ip加端口号15672即可访问

web管理mq

然后是创建用户。因为在finalshell开启了rabbitmq后,就不能继续输入命令了。所以这里我是先在centos里面开启rabbitmq,然后再在finalshell输入创建用户的命令。执行后如下:

mq创建用户

然后登录web管理页面:

成功登录

3.实现异步处理的代码

新增代码结构如下:

代码结构

配置mq通信为主题模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.hmdp.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Configuration;

// 这段代码用于配置 RabbitMQ 的话题模式,包括队列、交换机和绑定。下面是加上注释后的代码:
@Configuration
public class RabbitMQTopicConfig {
public static final String QUEUE = "seckillQueue"; // 队列名
public static final String EXCHANGE = "seckillExchange"; // 交换机名
public static final String ROUTINGKEY = "seckill.lua.#";

@Bean
public Queue queue(){
return new Queue(QUEUE);
}

@Bean
public TopicExchange topicExchange(){
return new TopicExchange(EXCHANGE);
}

@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(topicExchange()).with(ROUTINGKEY);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.hmdp.rabbitmq;

import com.hmdp.config.RabbitMQTopicConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
* 消息发送者
*/
@Slf4j
@Service
public class MQSender {
@Autowired
private RabbitTemplate rabbitTemplate;

private static final String ROUTINGKEY = "seckill.lua.message";
/**
* 发送秒杀信息
* @param msg
*/
public void sendSeckillMessage(String msg){
log.info("发送消息"+msg);
rabbitTemplate.convertAndSend(RabbitMQTopicConfig.EXCHANGE,ROUTINGKEY,msg);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.hmdp.rabbitmq;

import com.alibaba.fastjson.JSON;
import com.hmdp.config.RabbitMQTopicConfig;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

/**
* 消息消费者
*/
@Slf4j
@Service
public class MQReceiver {

@Resource
IVoucherOrderService voucherOrderService;

@Resource
ISeckillVoucherService seckillVoucherService;
/**
* 接收秒杀信息并下单
* @param msg
*/
@Transactional
@RabbitListener(queues = RabbitMQTopicConfig.QUEUE)
public void receiveSeckillMessage(String msg){
log.info("接收到消息: "+msg);
VoucherOrder voucherOrder = JSON.parseObject(msg, VoucherOrder.class);

Long voucherId = voucherOrder.getVoucherId();
//5.一人一单
Long userId = voucherOrder.getUserId();
//5.1查询订单
int count = voucherOrderService.query().eq("user_id",userId).eq("voucher_id", voucherId).count();
//5.2判断是否存在
if(count>0){
//用户已经购买过了
log.error("该用户已购买过");
return ;
}
log.info("扣减库存");
//6.扣减库存
boolean success = seckillVoucherService
.update()
.setSql("stock = stock-1")
.eq("voucher_id", voucherId)
.gt("stock",0)//cas乐观锁
.update();
if(!success){
log.error("库存不足");
return;
}
//直接保存订单
voucherOrderService.save(voucherOrder);
}

}

注意这里需要手动向redis中添加库存键及数量,键名就是下面lua代码中stockKey的名字,这里voucherId为7,键名就是stock:7,这里也算一个bug,搞了有一阵子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
-- 1.参数列表
-- 1.1优惠卷id
local voucherId = ARGV[1]
-- 1.2用户id
local userId = ARGV[2]

-- 2.数据key
-- 2.1库存key
local stockKey = 'stock:' .. voucherId
-- 2.2订单key
local orderKey = 'order:' .. voucherId

-- 3.脚本业务
-- 3.1判断库存是否充足
if(tonumber(redis.call('get',stockKey)) <= 0)then
-- 3.2 库存不足 返回1
return 1
end
--3.2判断用户是否下单
if(redis.call('sismember',orderKey,userId) == 1) then
-- 3.3存在,说明是重复下单
return 2
end
-- 3.4扣库存
redis.call('incrby',stockKey,-1)
-- 3.5下单并保存用户
redis.call('sadd',orderKey,userId)
return 0

下面是VoucherOrderServiceImpl的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

@Resource
private MQSender mqSender;
@Resource
private RedisIdWorker redisIdWorker;

//private RateLimiter rateLimiter = RateLimiter.create(10);
@Resource
private StringRedisTemplate stringRedisTemplate;

//@Resource
//private RedissonClient redissonClient;

//lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}

@Override
public Result seckillVocher(Long voucherId) {
//1.执行lua脚本
Long userId = UserHolder.getUser().getId();

Long r = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString()
);
//2.判断结果为0
int result = r.intValue();
if (result != 0) {
//2.1不为0代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "该用户重复下单");
}
//2.2为0代表有购买资格,将下单信息保存到阻塞队列

//2.3创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//2.4订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//2.5用户id
voucherOrder.setUserId(userId);
//2.6代金卷id
voucherOrder.setVoucherId(voucherId);

//2.7将信息放入MQ中
mqSender.sendSeckillMessage(JSON.toJSONString(voucherOrder));


//2.7 返回订单id
return Result.ok(orderId);
}
}

4.结果分析

注意在用jmeter测试之前,先清除之前的数据,如下:

清楚jmeter数据

实验结果如下:

第二次结果

可以看到最小耗时2ms,最大耗时112ms,提高了将近20倍,平均耗时34ms,提高了将近30倍左右

评论