表结构
基于Session实现登录流程
发送验证码:
用户在提交手机号后,会校验手机号是否合法,如果不合法,则要求用户重新输入手机号
如果手机号合法,后台此时生成对应的验证码,同时将验证码进行保存,然后再通过短信的方式将验证码发送给用户
短信验证码登录、注册:
用户将验证码和手机号进行输入,后台从session中拿到当前验证码,然后和用户输入的验证码进行校验,如果不一致,则无法通过校验,如果一致,则后台根据手机号查询用户,如果用户不存在,则为用户创建账号信息,保存到数据库,无论是否存在,都会将用户信息保存到session中,方便后续获得当前登录信息
校验登录状态:
用户在请求时候,会从cookie中携带者JsessionId到后台,后台通过JsessionId从session中拿到用户信息,如果没有session信息,则进行拦截,如果有session信息,则将用户信息保存到threadLocal中,并且放行
实现发送短信验证码功能
说明 | |
---|---|
请求方式 | POST |
请求路径 | /user/code |
请求参数 | phone,电话号码 |
返回值 | 无 |
发送验证码
@Overridepublic Result sendCode(String phone, HttpSession session) {// 1.校验手机号if (RegexUtils.isPhoneInvalid(phone)) {// 2.如果不符合,返回错误信息return Result.fail("手机号格式错误!");}// 3.符合,生成验证码String code = RandomUtil.randomNumbers(6);// 4.保存验证码到 sessionsession.setAttribute("code",code);// 5.发送验证码log.debug("发送短信验证码成功,验证码:{}", code);// 返回okreturn Result.ok();}
登录
@Overridepublic Result login(LoginFormDTO loginForm, HttpSession session) {// 1.校验手机号String phone = loginForm.getPhone();if (RegexUtils.isPhoneInvalid(phone)) {// 2.如果不符合,返回错误信息return Result.fail("手机号格式错误!");}// 3.校验验证码Object cacheCode = session.getAttribute("code");String code = loginForm.getCode();if(cacheCode == null || !cacheCode.toString().equals(code)){//3.不一致,报错return Result.fail("验证码错误");}//一致,根据手机号查询用户User user = query().eq("phone", phone).one();//5.判断用户是否存在if(user == null){//不存在,则创建user = createUserWithPhone(phone);}//7.保存用户信息到session中session.setAttribute("user",user);return Result.ok();}
实现登录拦截功能
拦截器代码
public class LoginInterceptor implements HandlerInterceptor {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {//1.获取sessionHttpSession session = request.getSession();//2.获取session中的用户Object user = session.getAttribute("user");//3.判断用户是否存在if(user == null){//4.不存在,拦截,返回401状态码response.setStatus(401);return false;}//5.存在,保存用户信息到ThreadlocalUserHolder.saveUser((User)user);//6.放行return true;}
}
让拦截器生效
@Configuration
public class MvcConfig implements WebMvcConfigurer {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic void addInterceptors(InterceptorRegistry registry) {// 登录拦截器registry.addInterceptor(new LoginInterceptor()).excludePathPatterns("/shop/**","/voucher/**","/shop-type/**","/upload/**","/blog/hot","/user/code","/user/login").order(1);// token刷新的拦截器registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);}
}
隐藏用户敏感信息
不直接返回User实体类的全部信息,我们应当在返回用户信息之前,将用户的敏感信息进行隐藏,采用的核心思路就是书写一个UserDto对象,这个UserDto对象就没有敏感信息了,我们在返回前,将有用户敏感信息的User对象转化成没有敏感信息的UserDto对象。
修改
(1)保存用户dto到session
(2)在拦截器处如果用户存在,保存用户信息到ThreadLocal
(3)在UserHolder处将User对象改为UserDTO
session共享问题
(1)什么是Session集群共享问题?
每个tomcat中都有一份属于自己的session,假设用户第一次访问第一台tomcat,并且把自己的信息存放到第一台服务器的session中,但是第二次这个用户访问到了第二台tomcat,那么在第二台服务器上,肯定没有第一台服务器存放的session,所以此时 整个登录拦截功能就会出现问题。
(2) Session集群共享问题造成哪些问题?
服务器之间无法实现会话状态的共享。比如:在当前这个服务器上用户已经完成了登录,Session中存储了用户的信息,能够判断用户已登录,但是在另一个服务器的Session中没有用户信息,无法调用显示没有登录的服务器上的服务。
(3)如何解决Session集群共享问题?
方案一:Session拷贝(不推荐)
Tomcat提供了Session拷贝功能,通过配置Tomcat可以实现Session的拷贝,但是这会增加服务器的额外内存开销,同时会带来数据一致性问题
方案二:Redis缓存(推荐)
Redis缓存具有Session存储一样的特点,基于内存、存储结构可以是key-value结构、数据共享
(4)Redis缓存相较于传统Session存储的优点
1 高性能和可伸缩性:Redis 是一个内存数据库,具有快速的读写能力。相比于传统的 Session 存储方式,将会话数据存储在 Redis 中可以大大提高读写速度和处理能力。此外,Redis 还支持集群和分片技术,可以实现水平扩展,处理大规模的并发请求。
2 可靠性和持久性:Redis 提供了持久化机制,可以将内存中的数据定期或异步地写入磁盘,以保证数据的持久性。这样即使发生服务器崩溃或重启,会话数据也可以被恢复。
3 丰富的数据结构:Redis 不仅仅是一个键值存储数据库,它还支持多种数据结构,如字符串、列表、哈希、集合和有序集合等。这些数据结构的灵活性使得可以更方便地存储和操作复杂的会话数据。
4 分布式缓存功能:Redis 作为一个高效的缓存解决方案,可以用于缓存会话数据,减轻后端服务器的负载。与传统的 Session 存储方式相比,使用 Redis 缓存会话数据可以大幅提高系统的性能和可扩展性。
5 可用性和可部署性:Redis 是一个强大而成熟的开源工具,有丰富的社区支持和活跃的开发者社区。它可以轻松地与各种编程语言和框架集成,并且可以在多个操作系统上运行。
PS:但是Redis费钱,而且增加了系统的复杂度
Redis代替session的业务流程
设计key的具体细节
我们可以使用String结构,就是一个简单的key,value键值对的方式
Hash 结构与 String 结构类型的比较
String 数据结构是以 JSON 字符串的形式保存,更加直观,操作也更加简单,但是 JSON 结构会有很多非必须的内存开销,比如双引号、大括号,内存占用比 Hash 更高
Hash 数据结构是以 Hash 表的形式保存,可以对单个字段进行CRUD,更加灵活
流程
当注册完成后,用户去登录会去校验用户提交的手机号和验证码,是否一致,如果一致,则根据手机号查询用户信息,不存在则新建,最后将用户数据保存到redis,并且生成token作为redis的key,当我们校验用户是否登录时,会去携带着token进行访问,从redis中取出token对应的value,判断是否存在这个数据,如果没有则拦截,如果存在则将其保存到threadLocal中,并且放行。
Redis替代Session需要考虑的问题
(1)选择合适的数据结构,了解 Hash 比 String 的区别
(2)选择合适的key,为key设置一个业务前缀,方便区分和分组,为key拼接一个UUID,避免key冲突防止数据覆盖
(3)选择合适的存储粒度,对于验证码这类数据,一般设置TTL为3min即可,防止大量缓存数据的堆积,而对于用户信息这类数据可以稍微设置长一点,比如30min,防止频繁对Redis进行IO操作
基于Redis实现短信登录
/*** 发送验证码** @param phone* @param session* @return*/@Overridepublic Result sendCode(String phone, HttpSession session) {// 1、判断手机号是否合法if (RegexUtils.isPhoneInvalid(phone)) {return Result.fail("手机号格式不正确");}// 2、手机号合法,生成验证码,并保存到Redis中String code = RandomUtil.randomNumbers(6);stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code,RedisConstants.LOGIN_CODE_TTL, TimeUnit.MINUTES);// 3、发送验证码log.info("验证码:{}", code);return Result.ok();}/*** 用户登录** @param loginForm* @param session* @return*/@Overridepublic Result login(LoginFormDTO loginForm, HttpSession session) {String phone = loginForm.getPhone();String code = loginForm.getCode();// 1、判断手机号是否合法if (RegexUtils.isPhoneInvalid(phone)) {return Result.fail("手机号格式不正确");}// 2、判断验证码是否正确String redisCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);if (code == null || !code.equals(redisCode)) {return Result.fail("验证码不正确");}// 3、判断手机号是否是已存在的用户User user = this.getOne(new LambdaQueryWrapper<User>().eq(User::getPhone, phone));if (Objects.isNull(user)) {// 用户不存在,需要注册user = createUserWithPhone(phone);}// 4、保存用户信息到Redis中,便于后面逻辑的判断(比如登录判断、随时取用户信息,减少对数据库的查询)UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);// 将对象中字段全部转成string类型,StringRedisTemplate只能存字符串类型的数据Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));String token = UUID.randomUUID().toString(true);String tokenKey = LOGIN_USER_KEY + token;stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);return Result.ok(token);}/*** 根据手机号创建用户并保存** @param phone* @return*/private User createUserWithPhone(String phone) {User user = new User();user.setPhone(phone);user.setNickName(SystemConstants.USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));this.save(user);return user;}
注意
CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString());
这部分代码创建了一个 CopyOptions 对象,并设置了两个选项:
setIgnoreNullValue(true):
这个选项表示在转换过程中忽略 userDTO 对象中值为 null 的属性。也就是说,如果 userDTO 中有某个属性的值为 null,那么在转换后的 Map 中不会包含这个属性。
setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()):
这个选项用于对每个属性的值进行编辑。它接受一个 BiFunction<String, Object, Object> 类型的参数,其中 fieldName 是属性名,fieldValue 是属性值。
(fieldName, fieldValue) -> fieldValue.toString() 是一个 Lambda 表达式,它的作用是将每个属性的值转换为 String 类型。这样做的目的是确保转换后的 Map 中所有的值都是 String 类型,满足 Redis 存储的要求。例如,如果 userDTO 中有一个 id 属性,其类型为 Long,通过这个 Lambda 表达式,id 的值会被转换为 String 类型。
解决状态登录刷新问题
单独配置一个拦截器用户刷新Redis中的token:在基于Session实现短信验证码登录时,我们只配置了一个拦截器,这里需要另外再配置一个拦截器专门用于刷新存入Redis中的 token,因为我们现在改用Redis了,为了防止用户在操作网站时突然由于Redis中的 token 过期,导致直接退出网站,严重影响用户体验。那为什么不把刷新的操作放到一个拦截器中呢,因为之前的那个拦截器只是用来拦截一些需要进行登录校验的请求,对于哪些不需要登录校验的请求是不会走拦截器的,刷新操作显然是要针对所有请求比较合理,所以单独创建一个拦截器拦截一切请求,刷新Redis中的Key
登录拦截器
public class LoginInterceptor implements HandlerInterceptor {/*** 前置拦截器,用于判断用户是否登录*/@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 判断当前用户是否已登录if (ThreadLocalUtls.getUser() == null){// 当前用户未登录,直接拦截response.setStatus(HttpStatus.HTTP_UNAUTHORIZED);return false;}// 用户存在,直接放行return true;}
}
刷新token的拦截器
public class RefreshTokenInterceptor implements HandlerInterceptor {// new出来的对象是无法直接注入IOC容器的(LoginInterceptor是直接new出来的)// 所以这里需要再配置类中注入,然后通过构造器传入到当前类中private StringRedisTemplate stringRedisTemplate;public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 1、获取token,并判断token是否存在String token = request.getHeader("authorization");if (StrUtil.isBlank(token)){// token不存在,说明当前用户未登录,不需要刷新直接放行return true;}// 2、判断用户是否存在String tokenKey = LOGIN_USER_KEY + token;Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(tokenKey);if (userMap.isEmpty()){// 用户不存在,说明当前用户未登录,不需要刷新直接放行return true;}// 3、用户存在,则将用户信息保存到ThreadLocal中,方便后续逻辑处理,比如:方便获取和使用用户信息,Redis获取用户信息是具有侵入性的UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);ThreadLocalUtls.saveUser(BeanUtil.copyProperties(userMap, UserDTO.class));// 4、刷新token有效期stringRedisTemplate.expire(token, LOGIN_USER_TTL, TimeUnit.MINUTES);return true;}
}
将自定义的拦截器添加到SpringMVC的拦截器表中,使其生效
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {// new出来的对象是无法直接注入IOC容器的(LoginInterceptor是直接new出来的)// 所以这里需要再配置类中注入,然后通过构造器传入到当前类中@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic void addInterceptors(InterceptorRegistry registry) {// 添加登录拦截器registry.addInterceptor(new LoginInterceptor())// 设置放行请求.excludePathPatterns("/user/code","/user/login","/blog/hot","/shop/**","/shop-type/**","/upload/**","/voucher/**").order(1); // 优先级默认都是0,值越大优先级越低// 添加刷新token的拦截器registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);}
}
这样设计拦截器顺序的原因:
主要是因为即使访问不需要登录拦截的接口,用户的token也需要刷新。如果登录拦截器优先级更高,那么在访问这些不需要登录的接口时,token可能已经失效,而刷新token的逻辑却无法执行。所以将刷新token拦截器的优先级设置得更高(order为0),确保在任何请求到达时都能先尝试刷新token,避免token失效导致用户无法正常访问接口,同时登录拦截器再对需要验证登录的接口进行拦截,这样可以更好地保障用户体验和系统逻辑的合理性。
商户查询缓存
缓存(Cache),就是数据交换的缓冲区,俗称的缓存就是缓冲区内的数据,一般从数据库中获取,存储于本地代码。
为什么使用缓存
缓存数据存储于代码中,而代码运行在内存中,内存的读写性能远高于磁盘,缓存可以大大降低用户访问并发量带来的服务器读写压力
实际开发过程中,企业的数据量,少则几十万,多则几千万,这么大数据量,如果没有缓存来作为"避震器",系统是几乎撑不住的,所以企业会大量运用到缓存技术。
缓存代码思路
如果缓存有,则直接返回,如果缓存不存在,则查询数据库,然后存入redis。
缓存更新策略
缓存更新是redis为了节约内存而设计出来的一个东西,主要是因为内存数据宝贵,当我们向redis插入太多数据,此时就可能会导致缓存中的数据过多,所以redis会对部分数据进行更新,或者把他叫为淘汰更合适。
内存淘汰:**redis自动进行,当redis内存达到咱们设定的max-memery的时候,会自动触发淘汰机制,淘汰掉一些不重要的数据(可以自己设置策略方式)
**超时剔除:**当我们给redis设置了过期时间ttl之后,redis会将超时的数据进行删除,方便咱们继续使用缓存
**主动更新:**我们可以手动调用方法把缓存删掉,通常用于解决缓存和数据库不一致问题
!在更新数据的情况下 优先选择删除缓存模式 其次是更新缓存模式
问题:操作时,先操作数据库还是先操作缓存捏?
答案:先操作数据库,再删缓存
如果先操作缓存:先删缓存,再更新数据库
当线程1删除缓存到更新数据库之间的时间段,会有其它线程进来查询数据,由于没有加锁,且前面的线程将缓存删除了,这就导致请求会直接打到数据库上,给数据库带来巨大压力。这个事件发生的概率很大,因为缓存的读写速度块,而数据库的读写较慢。
这种方式的不足之处:存在缓存击穿问题,且概率较大
如果先操作数据库:先更新数据库,再删缓存
当线程1在查询缓存且未命中,此时线程1查询数据,查询完准备写入缓存时,由于没有加锁线程2乘虚而入,线程2在这期间对数据库进行了更新,此时线程1将旧数据返回了,出现了脏读,这个事件发生的概率很低,因为先是需要满足缓存未命中,且在写入缓存的那段时间内有一个线程进行更新操作,缓存的读写和查询很快,这段空隙时间很小,所以出现脏读现象的概率也很低
这种方式的不足之处:存在脏读现象,但概率较小
要保证两个操作同时操作 在单体项目中可以放在同一个事务中
缓存穿透解决方案
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
(1)缓存空对象
(2)布隆过滤(存在误判可能)
缓存雪崩解决方案
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
就是说,一群设置了有效期的key同时消失了,或者说redis罢工了,导致所有的或者说大量的请求会给数据库带来巨大压力叫做缓存雪崩~
(1)给不同的Key的TTL添加随机值
(2)利用Redis集群提高服务的可用性
(3)给缓存业务添加降级限流策略,比如快速失败机制,让请求尽可能打不到数据库上
(4)给业务添加多级缓存
缓存击穿解决方案
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
(1)互斥锁
(2)逻辑过期
全局唯一id
自增id存在的问题
-
规律性太明显:
容易被猜测,导致信息泄露或伪造请求。
攻击者可能通过规律推测其他用户的ID,造成安全风险。
-
分库分表限制:
MySQL单表存储量有限(约500万行或2GB),超过后需分库分表。
自增ID在分库分表后无法保证全局唯一性。
-
扩展性差:
高并发场景下,自增ID可能导致性能瓶颈。
维护复杂,需额外机制保证ID的唯一性和安全性。
分布式id的实现方式
-
UUID:
优点:简单,全局唯一。
缺点:无序,存储空间大,不适合索引。
-
Redis自增:
优点:高性能,支持分布式。
缺点:依赖Redis,需考虑Redis的高可用性。
-
数据库自增:
优点:简单易用。
缺点:性能瓶颈,扩展性差。
-
Snowflake算法:
优点:高性能,ID有序。
缺点:依赖系统时钟,时钟回拨可能导致ID重复。
-
自定义实现:
结合时间戳、序列号和数据库自增,生成高安全性ID。
自定义分布式id生成器
核心逻辑:
时间戳:31bit,表示秒级时间,支持69年
序列号:32bit,表示每秒内的计数器,支持每秒生成2^32个ID。
拼接方式:时间戳左移32位后与序列号按位或运算。
代码实现
@Component
public class RedisIdWorker {@Resourceprivate StringRedisTemplate stringRedisTemplate;private static final long BEGIN_TIMESTAMP = 1640995200; // 起始时间戳private static final int COUNT_BITS = 32; // 序列号位数public long nextId(String keyPrefix) {// 1. 生成时间戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;// 2. 生成序列号(以当天日期为key,防止序列号溢出)String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));Long count = stringRedisTemplate.opsForValue().increment("id:" + keyPrefix + ":" + date);// 3. 拼接并返回IDreturn timestamp << COUNT_BITS | count;}
}
优惠券秒杀
业务流程
/*** 抢购秒杀券** @param voucherId* @return*/@Transactional@Overridepublic Result seckillVoucher(Long voucherId) {// 1、查询秒杀券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2、判断秒杀券是否合法if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 秒杀券的开始时间在当前时间之后return Result.fail("秒杀尚未开始");}if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 秒杀券的结束时间在当前时间之前return Result.fail("秒杀已结束");}if (voucher.getStock() < 1) {return Result.fail("秒杀券已抢空");}// 5、秒杀券合法,则秒杀券抢购成功,秒杀券库存数量减一boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).setSql("stock = stock -1"));if (!flag){throw new RuntimeException("秒杀券扣减失败");}// 6、秒杀成功,创建对应的订单,并保存到数据库VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);voucherOrder.setId(orderId);voucherOrder.setUserId(ThreadLocalUtls.getUser().getId());voucherOrder.setVoucherId(voucherOrder.getId());flag = this.save(voucherOrder);if (!flag){throw new RuntimeException("创建秒杀券订单失败");}// 返回订单idreturn Result.ok(orderId);}
单体下一人多单超卖问题
通过Jmeter压力测试 可以发现在qps400时,异常率高达95.80%,也就是说2000个请求只有84个请求正常响应,其它的1916个请求失败,这是很糟糕的,如果发生在公司的系统(特别是大公司)那将是不可估量的损失!经过多次压测,库存居然还惊现了负值!
为什么会产生超卖问题呢?
线程1查询库存,发现库存充足,创建订单,然后准备对库存进行扣减,但此时线程2和线程3也进行查询,同样发现库存充足,然后线程1执行完扣减操作后,库存变为了0,线程2和线程3同样完成了库存扣减操作,最终导致库存变成了负数!这就是超卖问题的完整流程
超卖问题常见解决方案
方案一:悲观锁
悲观锁,认为线程安全问题一定会发生,因此操作数据库之前都需要先获取锁,确保线程串行执行。常见的悲观锁有:synchronized、lock
方案二:乐观锁
乐观锁,认为线程安全问题不一定发生,因此不加锁,只会在更新数据库的时候去判断有没有其它线程对数据进行修改,如果没有修改则认为是安全的,直接更新数据库中的数据即可,如果修改了则说明不安全,直接抛异常或者等待重试。常见的实现方式有:版本号法、CAS操作、乐观锁算法
悲观锁乐观锁的比较
- 悲观锁比乐观锁的性能低:悲观锁需要先加锁再操作,而乐观锁不需要加锁,所以乐观锁通常具有更好的性能。
- 悲观锁比乐观锁的冲突处理能力低:悲观锁在冲突发生时直接阻塞其他线程,乐观锁则是在提交阶段检查冲突并进行重试。
- 悲观锁比乐观锁的并发度低:悲观锁存在锁粒度较大的问题,可能会限制并发性能;而乐观锁可以实现较高的并发度。
- 应用场景:两者都是互斥锁,悲观锁适合写入操作较多、冲突频繁的场景;乐观锁适合读取操作较多、冲突较少的场景。
CAS
CAS(Compare and Swap)是一种并发编程中常用的原子操作,用于解决多线程环境下的数据竞争问题。它是乐观锁算法的一种实现方式。
CAS操作包含三个参数:内存地址V、旧的预期值A和新的值B。CAS的执行过程如下:
- 比较(Compare):将内存地址V中的值与预期值A进行比较。
- 判断(Judgment):如果相等,则说明当前值和预期值相等,表示没有发生其他线程的修改。
- 交换(Swap):使用新的值B来更新内存地址V中的值。
CAS操作是一个原子操作,意味着在执行过程中不会被其他线程中断,保证了线程安全性。如果CAS操作失败(即当前值与预期值不相等),通常会进行重试,直到CAS操作成功为止。
CAS操作适用于精细粒度的并发控制,可以避免使用传统的加锁机制带来的性能开销和线程阻塞。然而,CAS操作也存在一些限制和注意事项:
- ABA问题:CAS操作无法感知到对象值从A变为B又变回A的情况,可能会导致数据不一致。为了解决ABA问题,可以引入版本号或标记位等机制。
- 自旋开销:当CAS操作失败时,需要不断地进行重试,会占用CPU资源。如果重试次数过多或者线程争用激烈,可能会引起性能问题。
- 并发性限制:如果多个线程同时对同一内存地址进行CAS操作,只有一个线程的CAS操作会成功,其他线程需要重试或放弃操作。
在Java中,提供了相关的CAS操作支持,如AtomicInteger、AtomicLong、AtomicReference等类,可以实现基于CAS操作的线程安全操作。
乐观锁解决一人多单超卖
实现方法一:版本号法
首先我们要为 tb_seckill_voucher 表新增一个版本号字段 version ,线程1查询完库存,在进行库存扣减操作的同时将版本号+1,线程2在查询库存时,同时查询出当前的版本号,发现库存充足,也准备执行库存扣减操作,但是需要判断当前的版本号是否是之前查询时的版本号,结果发现版本号发生了改变,这就说明数据库中的数据已经发生了修改,需要进行重试(或者直接抛异常中断)
实现方法二:CAS法
CAS法类似与版本号法,但是不需要另外在添加一个 version 字段,而是直接使用库存替代版本号,线程1查询完库存后进行库存扣减操作,线程2在查询库存时,发现库存充足,也准备执行库存扣减操作,但是需要判断当前的库存是否是之前查询时的库存,结果发现库存数量发生了改变,这就说明数据库中的数据已经发生了修改,需要进行重试(或者直接抛异常中断)
所以CAS法更好 代码实现如下:
// 5、秒杀券合法,则秒杀券抢购成功,秒杀券库存数量减一boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).eq(SeckillVoucher::getStock, voucher.getStock()).setSql("stock = stock -1"));
但是以上的方法我们只要发现数据修改就直接终止操作了,qps200,异常90%,我们只需要修改一下判断条件,即只要库存大于0就可以进行修改,而不是库存数据修改我们就终止操作
// 5、秒杀券合法,则秒杀券抢购成功,秒杀券库存数量减一boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).gt(SeckillVoucher::getStock, 0).setSql("stock = stock -1"));
qps200,异常50%
单体下的一人一单超卖问题
/*** 抢购秒杀券** @param voucherId* @return*/@Transactional@Overridepublic Result seckillVoucher(Long voucherId) {// 1、查询秒杀券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2、判断秒杀券是否合法if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 秒杀券的开始时间在当前时间之后return Result.fail("秒杀尚未开始");}if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 秒杀券的结束时间在当前时间之前return Result.fail("秒杀已结束");}if (voucher.getStock() < 1) {return Result.fail("秒杀券已抢空");}// 3、判断当前用户是否是第一单int count = this.count(new LambdaQueryWrapper<VoucherOrder>().eq(VoucherOrder::getUserId, ThreadLocalUtls.getUser().getId()));if (count >= 1) {// 当前用户不是第一单return Result.fail("用户已购买");}// 4、用户是第一单,可以下单,秒杀券库存数量减一boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).gt(SeckillVoucher::getStock, 0).setSql("stock = stock -1"));if (!flag) {throw new RuntimeException("秒杀券扣减失败");}// 5、创建对应的订单,并保存到数据库VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);voucherOrder.setId(orderId);voucherOrder.setUserId(ThreadLocalUtls.getUser().getId());voucherOrder.setVoucherId(voucherOrder.getId());flag = this.save(voucherOrder);if (!flag) {throw new RuntimeException("创建秒杀券订单失败");}// 6、返回订单idreturn Result.ok(orderId);}
}
通过测试,发现并没有达到我们想象中的目标,一个人只能购买一次,但是发现一个用户居然能够购买8次。这说明还是存在超卖问题!
问题原因:出现这个问题的原因和前面库存为负数数的情况是一样的,线程1查询当前用户是否有订单,当前用户没有订单准备下单,此时线程2也查询当前用户是否有订单,由于线程1还没有完成下单操作,线程2同样发现当前用户未下单,也准备下单,这样明明一个用户只能下一单,结果下了两单,也就出现了超卖问题
悲观锁解决一人多单超卖问题
使用synchronized锁住用户ID,确保同一用户串行执行。
/*** 抢购秒杀券** @param voucherId* @return*/@Transactional@Overridepublic Result seckillVoucher(Long voucherId) {// 1、查询秒杀券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2、判断秒杀券是否合法if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 秒杀券的开始时间在当前时间之后return Result.fail("秒杀尚未开始");}if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 秒杀券的结束时间在当前时间之前return Result.fail("秒杀已结束");}if (voucher.getStock() < 1) {return Result.fail("秒杀券已抢空");}// 3、创建订单Long userId = ThreadLocalUtls.getUser().getId();synchronized (userId.toString().intern()) {// 创建代理对象,使用代理对象调用第三方事务方法, 防止事务失效IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(userId, voucherId);}}/*** 创建订单** @param userId* @param voucherId* @return*/@Transactionalpublic Result createVoucherOrder(Long userId, Long voucherId) {
// synchronized (userId.toString().intern()) {// 1、判断当前用户是否是第一单int count = this.count(new LambdaQueryWrapper<VoucherOrder>().eq(VoucherOrder::getUserId, userId));if (count >= 1) {// 当前用户不是第一单return Result.fail("用户已购买");}// 2、用户是第一单,可以下单,秒杀券库存数量减一boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).gt(SeckillVoucher::getStock, 0).setSql("stock = stock -1"));if (!flag) {throw new RuntimeException("秒杀券扣减失败");}// 3、创建对应的订单,并保存到数据库VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);voucherOrder.setId(orderId);voucherOrder.setUserId(ThreadLocalUtls.getUser().getId());voucherOrder.setVoucherId(voucherOrder.getId());flag = this.save(voucherOrder);if (!flag) {throw new RuntimeException("创建秒杀券订单失败");}// 4、返回订单idreturn Result.ok(orderId);
// }}
实现细节:重要重要
(1)锁的范围尽量小。synchronized尽量锁代码块,而不是方法,锁的范围越大性能越低
(2)锁的对象一定要是一个不变的值。我们不能直接锁 Long 类型的 userId,每请求一次都会创建一个新的 userId 对象,synchronized 要锁不变的值,所以我们要将 Long 类型的 userId 通过 toString()方法转成 String 类型的 userId,toString()方法底层(可以点击去看源码)是直接 new 一个新的String对象,显然还是在变,所以我们要使用 intern() 方法从常量池中寻找与当前 字符串值一致的字符串对象,这就能够保障一个用户 发送多次请求,每次请求的 userId 都是不变的,从而能够完成锁的效果(并行变串行)
(3)我们要锁住整个事务,而不是锁住事务内部的代码。如果我们锁住事务内部的代码会导致其它线程能够进入事务,当我们事务还未提交,锁一旦释放,仍然会存在超卖问题
(4)Spring的@Transactional注解要想事务生效,必须使用动态代理。Service中一个方法中调用另一个方法,另一个方法使用了事务,此时会导致@Transactional失效,所以我们需要创建一个代理对象,使用代理对象来调用方法。
让代理对象生效的步骤
①引入AOP依赖,动态代理是AOP的常见实现之一
<dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency>
②暴露动态代理对象,默认是关闭的
@EnableAspectJAutoProxy(exposeProxy = true)
集群下的一人一单超卖问题
在集群部署的情况下,请求访问到不同的服务器,这个synchronized锁形同虚设,这是由于synchronized是本地锁,只能提供线程级别的同步,每个JVM中都有一把synchronized锁,不能跨 JVM 进行上锁,当一个线程进入被 synchronized 关键字修饰的方法或代码块时,它会尝试获取对象的内置锁(也称为监视器锁)。如果该锁没有被其他线程占用,则当前线程获得锁,可以继续执行代码;否则,当前线程将进入阻塞状态,直到获取到锁为止。而现在我们是多台服务器,也就意味着有多个JVM,所以synchronized会失效!
从而会出现超卖问题!
分布式锁
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁
简要原理
前面sychronized锁失效的原因是由于每一个JVM都有一个独立的锁监视器,用于监视当前JVM中的sychronized锁,所以无法保障多个集群下只有一个线程访问一个代码块。所以我们直接将使用一个分布锁,在整个系统的全局中设置一个锁监视器,从而保障不同节点的JVM都能够识别,从而实现集群下只允许一个线程访问一个代码块
分布式锁特点
-
多线程可见:分布式锁存储在共享存储(如Redis)中,所有线程和节点都能看到锁的状态。
-
互斥性:任何时候只有一个线程或节点能持有锁,其他线程或节点必须等待。
-
高可用性:
即使部分节点故障,锁服务仍能正常工作。
具备容错性,锁持有者故障时能自动释放锁。 -
高性能:
锁的获取和释放操作要快,减少对共享资源的等待时间。
减少锁竞争带来的开销。 -
安全性:
可重入性:同一线程可多次获取同一把锁。
锁超时机制:避免锁被长时间占用,设置超时时间自动释放锁。
分布式锁的常见实现方式
-
基于关系数据库:可以利用数据库的事务特性和唯一索引来实现分布式锁。通过向数据库插入一条具有唯一约束的记录作为锁,其他进程在获取锁时会受到数据库的并发控制机制限制。
-
基于缓存(如Redis):使用分布式缓存服务(如Redis)提供的原子操作来实现分布式锁。通过将锁信息存储在缓存中,其他进程可以通过检查缓存中的锁状态来判断是否可以获取锁。
-
基于ZooKeeper:ZooKeeper是一个分布式协调服务,可以用于实现分布式锁。通过创建临时有序节点,每个请求都会尝试创建一个唯一的节点,并检查自己是否是最小节点,如果是,则表示获取到了锁。
-
基于分布式算法:还可以利用一些分布式算法来实现分布式锁,例如Chubby、DLM(Distributed Lock Manager)等。这些算法通过在分布式系统中协调进程之间的通信和状态变化,实现分布式锁的功能。
setnx指令的特点:setnx只能设置key不存在的值,值不存在设置成功,返回 1 ;值存在设置失败,返回 0
Redis分布式锁的实现
获取锁:
-
使用setnx指令设置锁,确保锁的唯一性。
-
为锁设置超时时间,避免死锁。
#保障指令的原子性
# 添加锁
set [key] [value] ex [time] nx
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent("lock:" + name, threadId, timeoutSec, TimeUnit.SECONDS);
释放锁:使用del指令删除锁
stringRedisTemplate.delete("lock:" + name);
Redis分布式锁解决集群下的超卖问题
(1)创建分布式锁
public class SimpleRedisLock implements Lock {/*** RedisTemplate*/private StringRedisTemplate stringRedisTemplate;/*** 锁的名称*/private String name;public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {this.stringRedisTemplate = stringRedisTemplate;this.name = name;}/*** 获取锁** @param timeoutSec 超时时间* @return*/@Overridepublic boolean tryLock(long timeoutSec) {String id = Thread.currentThread().getId() + "";// SET lock:name id EX timeoutSec NXBoolean result = stringRedisTemplate.opsForValue().setIfAbsent("lock:" + name, id, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(result);}/*** 释放锁*/@Overridepublic void unlock() {stringRedisTemplate.delete("lock:" + name);}
}
(2)使用分布式锁
改造前面VoucherOrderServiceImpl中的代码,将之前使用sychronized锁的地方,改成我们自己实现的分布式锁:
// 3、创建订单(使用分布式锁)Long userId = ThreadLocalUtls.getUser().getId();SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId);boolean isLock = lock.tryLock(1200);if (!isLock) {// 索取锁失败,重试或者直接抛异常(这个业务是一人一单,所以直接返回失败信息)return Result.fail("一人只能下一单");}try {// 索取锁成功,创建代理对象,使用代理对象调用第三方事务方法, 防止事务失效IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(userId, voucherId);} finally {lock.unlock();}
(3)实现细节
try…finally…确保发生异常时锁能够释放,注意这给地方不要使用catch,A事务方法内部调用B事务方法,A事务方法不能够直接catch,否则会导致事务失效。
分布式锁优化
优化1 解决锁超时释放出现的超卖问题
问题
当线程1获取锁后,由于业务阻塞,线程1的锁超时释放了,这时候线程2趁虚而入拿到了锁,然后此时线程1业务完成了,然后把线程2刚刚获取的锁给释放了,这时候线程3又趁虚而入拿到了锁,这就导致又出现了超卖问题!(但是这个在小项目(并发数不高)中出现的概率比较低,在大型项目(并发数高)情况下是有一定概率的)
如何解决呢?
我们为分布式锁添加一个线程标识,在释放锁时判断当前锁是否是自己的锁,是自己的就直接释放,不是自己的就不释放锁,从而解决多个线程同时获得锁的情况导致出现超卖
package com.hmdp.utils.lock.impl;import cn.hutool.core.lang.UUID;
import com.hmdp.utils.lock.Lock;
import org.springframework.data.redis.core.StringRedisTemplate;import java.util.concurrent.TimeUnit;/*** @author ghp* @title* @description*/
public class SimpleRedisLock implements Lock {/*** RedisTemplate*/private StringRedisTemplate stringRedisTemplate;/*** 锁的名称*/private String name;/*** key前缀*/public static final String KEY_PREFIX = "lock:";/*** ID前缀*/public static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {this.stringRedisTemplate = stringRedisTemplate;this.name = name;}/*** 获取锁** @param timeoutSec 超时时间* @return*/@Overridepublic boolean tryLock(long timeoutSec) {String threadId = ID_PREFIX + Thread.currentThread().getId() + "";// SET lock:name id EX timeoutSec NXBoolean result = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(result);}/*** 释放锁*/@Overridepublic void unlock() {// 判断 锁的线程标识 是否与 当前线程一致String currentThreadFlag = ID_PREFIX + Thread.currentThread().getId();String redisThreadFlag = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);if (currentThreadFlag != null || currentThreadFlag.equals(redisThreadFlag)) {// 一致,说明当前的锁就是当前线程的锁,可以直接释放stringRedisTemplate.delete(KEY_PREFIX + name);}// 不一致,不能释放}
}
package com.hmdp.utils.lock.impl;import cn.hutool.core.lang.UUID;
import com.hmdp.utils.lock.Lock;
import org.springframework.data.redis.core.StringRedisTemplate;import java.util.concurrent.TimeUnit;/*** @author ghp* @title* @description*/
public class SimpleRedisLock implements Lock {/*** RedisTemplate*/private StringRedisTemplate stringRedisTemplate;/*** 锁的名称*/private String name;/*** key前缀*/public static final String KEY_PREFIX = "lock:";/*** ID前缀*/public static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {this.stringRedisTemplate = stringRedisTemplate;this.name = name;}/*** 获取锁** @param timeoutSec 超时时间* @return*/@Overridepublic boolean tryLock(long timeoutSec) {String threadId = ID_PREFIX + Thread.currentThread().getId() + "";// SET lock:name id EX timeoutSec NXBoolean result = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(result);}/*** 释放锁*/@Overridepublic void unlock() {// 判断 锁的线程标识 是否与 当前线程一致String currentThreadFlag = ID_PREFIX + Thread.currentThread().getId();String redisThreadFlag = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);if (currentThreadFlag != null && currentThreadFlag.equals(redisThreadFlag)) {// 一致,说明当前的锁就是当前线程的锁,可以直接释放stringRedisTemplate.delete(KEY_PREFIX + name);}// 不一致,不能释放}
}
优化2 解决释放锁时的原子性问题
我们通过给锁添加一个线程标识,并且在释放锁时添加一个判断,从而防止锁超时释放产生的超卖问题,一定程度上解决了超卖问题,但是仍有可能发生超卖问题(出现超卖概率更低了):当线程1获取锁,执行完业务然后并且判断完当前锁是自己的锁时,但就在此时发生了阻塞,结果锁被超时释放了,线程2立马就趁虚而入了,获得锁执行业务,但就在此时线程1阻塞完成,由于已经判断过锁,已经确定锁是自己的锁了,于是直接就删除了锁,结果删的是线程2的锁,这就又导致线程3趁虚而入了,从而继续发生超卖问题
解决方案:使用Lua脚本确保判断锁和释放锁的原子性。
Lua脚本的优势
-
原子性:
Redis执行Lua脚本时,会阻塞其他命令和脚本,确保脚本内的操作是原子的。
类似于事务的MULTI/EXEC,但Lua脚本更轻量。 -
高性能:
Lua脚本在Redis中执行,避免了多次网络通信的开销。 -
简单易用:
Lua脚本可以直接嵌入Java代码中,通过Redis执行。
编写lua脚本
释放锁的lua脚本
检查锁的线程标识是否与当前线程一致。
如果一致,则删除锁;否则,不做任何操作。
脚本内容:
-- 比较缓存中的线程标识与当前线程标识是否一致
if (redis.call('get', KEYS[1]) == ARGV[1]) then-- 一致,直接删除return redis.call('del', KEYS[1])
end
-- 不一致,返回0
return 0
脚本说明:
KEYS[1]:锁的Key(如lock:order:1)。
ARGV[1]:当前线程的标识(如UUID-线程ID)。
在Java中加载Lua脚本
定义Lua脚本:
将Lua脚本保存为文件(如unlock.lua),并放在resources/lua目录下。
加载Lua脚本:
使用DefaultRedisScript加载Lua脚本。
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("lua/unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);
}
实现释放锁的逻辑
释放锁的Java代码:使用stringRedisTemplate.execute执行Lua脚本。
@Override
public void unlock() {// 执行Lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name), // KEYS[1]ID_PREFIX + Thread.currentThread().getId() // ARGV[1]);
}
Redission
在分布式系统中,为保证数据一致性和线程安全,常需要使用分布式锁。但自己实现的分布式锁存在诸多问题,难以达到生产可用级别:
- 不可重入:同一线程无法重复获取同一把锁,易造成死锁。例如在嵌套方法调用中,若方法 A 和方法 B 都需获取同一把锁,线程 1 在方法 A 获取锁后,进入方法 B 再次获取时会失败,导致死锁。
- 不可重试:获取锁仅尝试一次,失败即返回 false,无重试机制。若线程 1 获取锁失败后直接结束,会导致数据丢失,比如线程 1 要将数据写入数据库,因锁被线程 2 占用而放弃,数据无法正常写入。
- 超时释放问题:虽超时释放机制能降低死锁概率,但有效期设置困难。有效期过短,业务未执行完锁就释放,存在安全隐患;有效期过长,易出现死锁。
- 主从一致性问题:在 Redis 主从集群中,主从同步存在延迟。若线程 1 在主节点获取锁后,主节点故障,从节点未及时同步该锁信息,其他线程可能在从节点再次获取到该锁,导致数据不一致。
- Redisson 是成熟的 Redis 框架,提供分布式锁和同步器、分布式对象、分布式集合、分布式服务等多种分布式解决方案,可有效解决上述问题,因此可直接使用 Redisson优化分布式锁。
使用步骤
(1)引入依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>
(2)配置Redisson客户端
@Configuration
public class RedissonConfig {@Value("${spring.redis.host}")private String host;@Value("${spring.redis.port}")private String port;@Value("${spring.redis.password}")private String password;@Beanpublic RedissonClient redissonClient() {Config config = new Config();config.useSingleServer().setAddress("redis://" + this.host + ":" + this.port).setPassword(this.password);return Redisson.create(config);}
}
(3) 修改使用锁的密码
在业务代码中,使用 Redisson 客户端获取锁并尝试加锁:
Long userId = ThreadLocalUtls.getUser().getId();
RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);
boolean isLock = lock.tryLock();
tryLock 方法详解
- tryLock():使用默认的超时时间和等待机制,具体超时时间由 Redisson 配置文件或自定义配置决定。
- tryLock(long time, TimeUnit unit):在指定的 time 时间内尝试获取锁,若成功则返回
true;若在指定时间内未获取到锁,则返回 false。 - tryLock(long waitTime, long leaseTime, TimeUnit unit):指定等待时间waitTime,若超过 leaseTime 仍未获取到锁,则直接返回失败。 无参的 tryLock 方法中,waitTime 默认值为-1,表示不等待;leaseTime 默认值为 30 秒,即锁超过 30 秒未释放会自动释放。自上而下,tryLock 方法的灵活性逐渐提高。
Redisson可重入锁原理
Redisson 内部将锁以 hash 数据结构存储在 Redis 中,每次获取锁时,将对应线程的 value 值加 1;每次释放锁时,将 value 值减 1;只有当 value 值归 0 时,才真正释放锁,以此确保锁的可重入性。
Redisson底层通过利用Lua脚本确保 判断锁是否存在、添加锁的有效期、添加线程标识这些的操作全部封装到了一个Lua脚本(确保了锁的原子性和可重入性)
可重入问题解决
利用 hash 结构记录线程 ID 和重入次数。每次线程获取锁时,检查 hash 结构中该线程 ID 对应的重入次数,若不存在则初始化重入次数为 1,若已存在则将重入次数加 1。
可重试问题解决
利用信号量和 PubSub(发布 - 订阅)功能实现等待、唤醒机制。当线程获取锁失败时,将其放入等待队列,通过 PubSub 监听锁释放的消息,一旦锁释放,唤醒等待队列中的线程重试获取锁。
超时续约问题解决
利用看门狗(WatchDog)机制,每隔一段时间(releaseTime / 3)重置锁的超时时间。若线程持有锁的时间超过预设的有效时间,看门狗会自动延长锁的有效期,确保业务执行完毕后再释放锁。
主从一致性问题解决
利用 Redisson 的 MultiLock 机制,多个独立的 Redis 节点必须都获取到重入锁,才算获取锁成功。这样即使主从节点同步存在延迟,也能保证锁的一致性。但此方法存在运维成本高、实现复杂的缺陷。
编码实现可重入锁
可重入锁需要进行一系列的逻辑判断,这些逻辑代码我们最好将它们全都封装到一个 Lua脚本 中,以确保操作的原子性,从而确保线程安全(Redisson底层也是这么干的)
1)编写获取锁的Lua脚本
---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by ghp.
--- DateTime: 2023/2/14 16:11
---
-- 获取锁的key,即: KEY_PREFIX + name
local key = KEYS[1];
-- 获取当前线程的标识, 即: ID_PREFIX + Thread.currentThread().getId()
local threadId = ARGV[1];
-- 锁的有效期
local releaseTime = ARGV[2];-- 判断缓存中是否存在锁
if (redis.call('EXISTS', key) == 0) then-- 不存在,获取锁redis.call('HSET', key, threadId, '1');-- 设置锁的有效期redis.call('EXPIRE', key, releaseTime);return 1; -- 返回1表示锁获取成功
end-- 缓存中已存在锁,判断threadId是否说自己的
if (redis.call('HEXISTS', key, threadId) == 1) then-- 是自己的锁,获取锁然后重入次数+1redis.call('HINCRBY', key, threadId, '1');-- 设置有效期redis.call('EXPIRE', key, releaseTime);return 1; -- 返回1表示锁获取成功
end-- 锁不是自己的,直接返回0,表示锁获取失败
return 0;
2)编写释放锁的Lua脚本
---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by ghp.
--- DateTime: 2023/2/14 16:11
---
-- 获取锁的key,即: KEY_PREFIX + name
local key = KEYS[1];
-- 获取当前线程的标识, 即: ID_PREFIX + Thread.currentThread().getId()
local threadId = ARGV[1];
-- 锁的有效期
local releaseTime = ARGV[2];-- 判断当前线程的锁是否还在缓存中
if (redis.call('HEXISTS', key, threadId) == 0) then-- 缓存中没找到自己的锁,说明锁已过期,则直接返回空return nil; -- 返回nil,表示啥也不干
end
-- 缓存中找到了自己的锁,则重入次数-1
local count = redis.call('HINCRBY', key, threadId, -1);-- 进一步判断是否需要释放锁
if (count > 0) then-- 重入次数大于0,说明不能释放锁,且刷新锁的有效期redis.call('EXPIRE', key, releaseTime);return nil;
else-- 重入次数等于0,说明可以释放锁redis.call('DEL', key);return nil;
end
3)编写可重入锁
public class ReentrantLock implements Lock {/*** RedisTemplate*/private StringRedisTemplate stringRedisTemplate;/*** 锁的名称*/private String name;/*** key前缀*/private static final String KEY_PREFIX = "lock:";/*** ID前缀*/private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";/*** 锁的有效期*/public long timeoutSec;public ReentrantLock(StringRedisTemplate stringRedisTemplate, String name) {this.stringRedisTemplate = stringRedisTemplate;this.name = name;}/*** 加载获取锁的Lua脚本*/private static final DefaultRedisScript<Long> TRYLOCK_SCRIPT;static {TRYLOCK_SCRIPT = new DefaultRedisScript<>();TRYLOCK_SCRIPT.setLocation(new ClassPathResource("lua/re-trylock.lua"));TRYLOCK_SCRIPT.setResultType(Long.class);}/*** 获取锁** @param timeoutSec 超时时间* @return*/@Overridepublic boolean tryLock(long timeoutSec) {this.timeoutSec = timeoutSec;// 执行lua脚本Long result = stringRedisTemplate.execute(TRYLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId(),Long.toString(timeoutSec));return result != null && result.equals(1L);}/*** 加载释放锁的Lua脚本*/private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("lua/re-unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}/*** 释放锁*/@Overridepublic void unlock() {// 执行lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId(),Long.toString(this.timeoutSec));}
}
秒杀流程优化-同步改异步
之前秒杀业务流程都是同步执行的,但是性能很差,这里的下单是可以使用异步的,因为下单操作比较耗时,后端操作步骤多,可以进行拆分。
我们可以将一部分的工作交给Redis,并且不能直接去调用Redis,而是通过开启一个独立的子线程去异步执行,从而大大提高效率
实现
编写lua脚本
---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by ghp.
--- DateTime: 2023/7/15 15:22
--- Description 判断库存是否充足 && 判断用户是否已下单
---
-- 优惠券id
local voucherId = ARGV[1];
-- 用户id
local userId = ARGV[2];-- 库存的key
local stockKey = 'seckill:stock:' .. voucherId;
-- 订单key
local orderKey = 'seckill:order:' .. voucherId;-- 判断库存是否充足 get stockKey > 0 ?
local stock = redis.call('GET', stockKey);
if (tonumber(stock) <= 0) then-- 库存不足,返回1return 1;
end-- 库存充足,判断用户是否已经下过单 SISMEMBER orderKey userId
if (redis.call('SISMEMBER', orderKey, userId) == 1) then-- 用户已下单,返回2return 2;
end-- 库存充足,没有下过单,扣库存、下单
redis.call('INCRBY', stockKey, -1);
redis.call('SADD', orderKey, userId);
-- 返回0,标识下单成功
return 0;
细节
- 库存判断放到Redis中,我们应该使用哪一种数据结构存储订单的库存呢?可以直接使用 String类型的数据结构,Redis的IO操作是单线程的,所以能够充分保障线程安全。
- 一人一单的判断也是由Redis完成的,所以我们需要在Redis中存储订单信息,而订单是唯一的,所以我们可以使用 Set类型的数据结构
- lua脚本中,接收的参数都是String类型的,String类型的数据无法进行比较,我们需要利用tonumber函数将String转成Number
- stringRedisTemplate.execute这个方法,第二个参数是应该List集合,标识传入Lua脚本中的的key,如果我们没有传key,那么直接使用Collections.emptyList(),而不是直接使用null,是因为在stringRedisTemplate.execute 方法内部可能对参数进行了处理,如果传递 null 可能引发NPE异常
RabbitMQ代替阻塞队列
RabbitMQ实现异步秒杀
点赞
使用set数据结构实现一人只能点赞一次
思路分析:
1.给Blog类中添加一个isLike字段,标示是否被当前用户点赞。
2.修改点赞功能,利用Redis的Set集合判断是否点赞过,未点赞则点赞数+1,已点赞则点赞数-1。
3.修改分页查询Blog业务和根据id查询Blog的业务,判断当前用户是否点赞过,赋值给isLike字段。
@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {return blogService.likeBlog(id);
}
Result likeBlog(Long id);
private final StringRedisTemplate stringRedisTemplate;
public BlogServiceImpl(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public Result likeBlog(Long id) {//1.获取登录用户UserDTO user = UserHolder.getUser();Long userId = user.getId();//2.判断当前用户是否已经点赞过String key = "blog:liked:" +id;Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());if(BooleanUtil.isFalse(isMember)){//3.未点赞,可以点赞//3.1.数据库点赞数+1boolean isSuccess = update().setSql("liked=liked+1").eq("id", id).update();//3.2.保存用户到Redisif(isSuccess){stringRedisTemplate.opsForSet().add(key,userId.toString());}}else{//4.已点赞,取消点赞//4.1.数据库点赞数-1boolean isSuccess = update().setSql("liked=liked-1").eq("id", id).update();//4.2.把用户从Redis的set集合移除stringRedisTemplate.opsForSet().remove(key,userId.toString());}return Result.ok();
}
public Boolean isBlogLiked(Blog blog) {Long userId = null;try {//1.获取登录用户userId = UserHolder.getUser().getId();} catch (Exception e) {log.debug("用户未登录!");return false;}//2.判断当前用户是否已经点赞过String key = "blog:liked:" +blog.getId();Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());try {blog.setIsLike(BooleanUtil.isTrue(isMember));} catch (Exception e) {log.debug("点赞信息为空!");return false;}return isMember;
}
isBlogLiked的作用主要是给blog对象设置值(在Java中对象是引用传递),前端会根据返回的blog对象的isLike参数的true或false来给点赞标签高亮或灰暗。
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {@Resourceprivate IUserService userService;@Resourceprivate StringRedisTemplate stringRedisTemplate;/*** 根据id查询博客** @param id* @return*/@Overridepublic Result queryBlogById(Long id) {// 查询博客信息Blog blog = this.getById(id);if (Objects.isNull(blog)) {return Result.fail("笔记不存在");}// 查询blog相关的用户信息queryUserByBlog(blog);// 判断当前用户是否点赞该博客isBlogLiked(blog);return Result.ok(blog);}/*** 判断当前用户是否点赞该博客*/private void isBlogLiked(Blog blog) {Long userId = ThreadLocalUtls.getUser().getId();String key = BLOG_LIKED_KEY + blog.getId();Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());blog.setIsLike(BooleanUtil.isTrue(isMember));}/*** 查询热门博客** @param current* @return*/@Overridepublic Result queryHotBlog(Integer current) {// 根据用户查询Page<Blog> page = this.query().orderByDesc("liked").page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));// 获取当前页数据List<Blog> records = page.getRecords();// 查询用户records.forEach(blog -> {this.queryUserByBlog(blog);this.isBlogLiked(blog);});return Result.ok(records);}/*** 点赞** @param id* @return*/@Overridepublic Result likeBlog(Long id) {// 判断用户是否点赞Long userId = ThreadLocalUtls.getUser().getId();String key = BLOG_LIKED_KEY + blog.getId();// sismember key valueBoolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());boolean result;if (BooleanUtil.isFalse(isMember)) {// 用户未点赞,点赞数+1result = this.update(new LambdaUpdateWrapper<Blog>().eq(Blog::getId, id).setSql("liked = liked + 1"));if (result) {// 数据库更新成功,更新缓存 sadd key valuestringRedisTemplate.opsForSet().add(key, userId.toString());}} else {// 用户已点赞,点赞数-1result = this.update(new LambdaUpdateWrapper<Blog>().eq(Blog::getId, id).setSql("liked = liked - 1"));if (result) {// 数据更新成功,更新缓存 srem key valuestringRedisTemplate.opsForSet().remove(key, userId.toString());}}return Result.ok();}/*** 查询博客相关用户信息** @param blog*/private void queryUserByBlog(Blog blog) {Long userId = blog.getUserId();User user = userService.getById(userId);blog.setName(user.getNickName());blog.setIcon(user.getIcon());}
}
点赞排行榜
按照点赞时间先后排序 返回top5用户
对于Set集合我们可以使用 isMember方法判断用户是否存在,对于SortedList我们可以使用ZSCORE方法判断用户是否存在
Set集合没有提供范围查询,无法获排行榜前几名的数据,SortedList可以使用ZRANGE方法实现范围查询
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {@Resourceprivate IUserService userService;@Resourceprivate StringRedisTemplate stringRedisTemplate;/*** 根据id查询博客** @param id* @return*/@Overridepublic Result queryBlogById(Long id) {// 查询博客信息Blog blog = this.getById(id);if (Objects.isNull(blog)) {return Result.fail("笔记不存在");}// 查询blog相关的用户信息queryUserByBlog(blog);// 判断当前用户是否点赞该博客isBlogLiked(blog);return Result.ok(blog);}/*** 判断当前用户是否点赞该博客*/private void isBlogLiked(Blog blog) {UserDTO user = ThreadLocalUtls.getUser();if (Objects.isNull(user)){// 当前用户未登录,无需查询点赞return;}Long userId = user.getId();String key = BLOG_LIKED_KEY + blog.getId();Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());blog.setIsLike(Objects.nonNull(score));}/*** 查询热门博客** @param current* @return*/@Overridepublic Result queryHotBlog(Integer current) {// 根据用户查询Page<Blog> page = this.query().orderByDesc("liked").page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));// 获取当前页数据List<Blog> records = page.getRecords();// 查询用户records.forEach(blog -> {this.queryUserByBlog(blog);this.isBlogLiked(blog);});return Result.ok(records);}/*** 点赞** @param id* @return*/@Overridepublic Result likeBlog(Long id) {// 1、判断用户是否点赞Long userId = ThreadLocalUtls.getUser().getId();String key = BLOG_LIKED_KEY + id;// zscore key valueDouble score = stringRedisTemplate.opsForZSet().score(key, userId.toString());boolean result;if (score == null) {// 1.1 用户未点赞,点赞数+1result = this.update(new LambdaUpdateWrapper<Blog>().eq(Blog::getId, id).setSql("liked = liked + 1"));if (result) {// 数据库更新成功,更新缓存 zadd key value scorestringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());}} else {// 1.2 用户已点赞,点赞数-1result = this.update(new LambdaUpdateWrapper<Blog>().eq(Blog::getId, id).setSql("liked = liked - 1"));if (result) {// 数据更新成功,更新缓存 zrem key valuestringRedisTemplate.opsForZSet().remove(key, userId.toString());}}return Result.ok();}/*** 查询所有点赞博客的用户** @param id* @return*/@Overridepublic Result queryBlogLikes(Long id) {// 查询Top5的点赞用户 zrange key 0 4Long userId = ThreadLocalUtls.getUser().getId();String key = BLOG_LIKED_KEY + id;Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);if (top5 == null || top5.isEmpty()) {return Result.ok(Collections.emptyList());}List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());List<UserDTO> userDTOList = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());return Result.ok(userDTOList);}/*** 查询博客相关用户信息** @param blog*/private void queryUserByBlog(Blog blog) {Long userId = blog.getUserId();User user = userService.getById(userId);blog.setName(user.getNickName());blog.setIcon(user.getIcon());}
}
若要实现先点赞的排在前面,后点赞的排在后面,如何实现捏
查询默认按照数据库的顺序查询,而数据库中的记录默认都是按照id自增的,所以查出来的结果默认是按照id自增
解决方法:
select id, phone,password,nick_name,icon,create_time,update_time
from tb_user
where id in(1, 5)
order by field(id, 5, 1)
/*** 查询所有点赞博客的用户** @param id* @return*/@Overridepublic Result queryBlogLikes(Long id) {// 查询Top5的点赞用户 zrange key 0 4Long userId = ThreadLocalUtls.getUser().getId();String key = BLOG_LIKED_KEY + id;Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);if (top5 == null || top5.isEmpty()) {return Result.ok(Collections.emptyList());}List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());String idStr = StrUtil.join(",", ids);// 根据id降序排序 select * from tb_user where id in(1,5) order by field(id, 1, 5)List<UserDTO> userDTOList = userService.list(new LambdaQueryWrapper<User>().in(User::getId, ids).last("order by field (id," + idStr + ")")).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());return Result.ok(userDTOList);}
好友关注
两个接口
1 关注和取关接口
2 判断是否关注的接口
博主与粉丝的关系有一张tb_follow表来表示
@RestController
@RequestMapping("/follow")
public class FollowController {@Resourceprivate IFollowService followService;/*** 关注用户* @param followUserId 关注用户的id* @param isFollow 是否已关注* @return*/@PutMapping("/{id}/{isFollow}")public Result follow(@PathVariable("id") Long followUserId, @PathVariable Boolean isFollow){return followService.follow(followUserId, isFollow);}/*** 是否关注用户* @param followUserId 关注用户的id* @return*/@GetMapping("/or/not/{id}")public Result isFollow(@PathVariable("id") Long followUserId){return followService.isFollow(followUserId);}
}
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {/*** 关注用户** @param followUserId 关注用户的id* @param isFollow 是否已关注* @return*/@Overridepublic Result follow(Long followUserId, Boolean isFollow) {Long userId = ThreadLocalUtls.getUser().getId();if (isFollow) {// 用户为关注,则关注Follow follow = new Follow();follow.setUserId(userId);follow.setFollowUserId(followUserId);this.save(follow);} else {// 用户已关注,删除关注信息this.remove(new LambdaQueryWrapper<Follow>().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId));}return Result.ok();}/*** 是否关注用户** @param followUserId 关注用户的id* @return*/@Overridepublic Result isFollow(Long followUserId) {Long userId = ThreadLocalUtls.getUser().getId();int count = this.count(new LambdaQueryWrapper<Follow>().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId));return Result.ok(count > 0);}
}
Set实现共同关注
我们想要查询出两个用户的共同关注对象,这就需要使用求交集,对于求交集,我们可以使用Set集合
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate IUserService userService;/*** 关注用户** @param followUserId 关注用户的id* @param isFollow 是否已关注* @return*/@Overridepublic Result follow(Long followUserId, Boolean isFollow) {Long userId = ThreadLocalUtls.getUser().getId();String key = FOLLOW_KEY + userId;if (isFollow) {// 用户为关注,则关注Follow follow = new Follow();follow.setUserId(userId);follow.setFollowUserId(followUserId);boolean isSuccess = this.save(follow);if (isSuccess) {// 用户关注信息保存成功,把关注的用户id放入Redis的Set集合中,stringRedisTemplate.opsForSet().add(key, followUserId.toString());}} else {// 用户已关注,删除关注信息boolean isSuccess = this.remove(new LambdaQueryWrapper<Follow>().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId));if (isSuccess) {stringRedisTemplate.opsForSet().remove(key, followUserId.toString());}}return Result.ok();}/*** 是否关注用户** @param followUserId 关注用户的id* @return*/@Overridepublic Result isFollow(Long followUserId) {Long userId = ThreadLocalUtls.getUser().getId();int count = this.count(new LambdaQueryWrapper<Follow>().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId));return Result.ok(count > 0);}/*** 查询共同关注** @param id* @return*/@Overridepublic Result followCommons(Long id) {Long userId = ThreadLocalUtls.getUser().getId();String key1 = FOLLOW_KEY + userId;String key2 = FOLLOW_KEY + id;// 查询当前用户与目标用户的共同关注对象Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key1, key2);if (Objects.isNull(intersect) || intersect.isEmpty()) {return Result.ok(Collections.emptyList());}List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());// 查询共同关注的用户信息List<UserDTO> userDTOList = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());return Result.ok(userDTOList);}
}
// 查询用户
// listByIds返回的是一个List<User>,所以需要进行转换
// .stream() 方法将集合 ids 转换为一个 Java 8 流(Stream)
// .map(user -> BeanUtil.copyProperties(user, UserDTO.class)) 是一个映射操作,它将集合中的每个元素从 User 类型转换为 UserDTO 类型。
// .collect(Collectors.toList()将流中的映射后的 UserDTO 对象收集到一个新的 List<UserDTO> 中。这个操作将流转换为一个列表,其中包含了经过映射后的 UserDTO 对象。
List<UserDTO> users = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
Feed流关注推送
Feed流产品有两种常见模式:
- Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
优点 信息全面,不会有缺失。并且实现也相对简单
缺点 信息噪音较多,用户不一定感兴趣,内容获取效率低 - 智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
优点 投喂用户感兴趣信息,用户粘度很高,容易沉迷
缺点 如果算法不精准,可能起到反作用
采用Timeline的模式。该模式的实现方案有三种:
3. 拉模式:只有粉丝⽤户在读取收件箱的时候, 才会根据其关注的⽤户进⾏拉取,把博主发件箱⾥的消息拉取到粉丝⽤户的收件箱⾥,然后对收件箱⾥的消息按时 间戳进⾏排序。
优点:节约空间
缺点:比较延迟,假设用户关注了大用户,此时就会拉取海量的内容,对服务器压力巨大。
-
推模式:当⽤户(博主)发送消息时,会把消息+时间戳直接发送到所有粉丝⽤户的收件箱中,并按时间戳进⾏排序。当粉 丝⽤户在读取收件箱的消息时,直接读取。
优点: 延迟低
缺点: 发消息时,内容占⽤较⾼。因为每个粉丝都会保留⼀份消息。 -
推拉模式:对于粉丝少的博主⽤户,采⽤推模式。
对于粉丝多的博主⽤户,根据粉丝⽤户类型进⾏判断: 活跃度⾼的粉丝⽤户,采⽤推模式。活跃度低的粉丝⽤户,采⽤拉模式。
推拉模式兼具推和拉两种模式的优点
新需求:
(1)修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
(2)收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
(3)查询收件箱数据时,可以实现分页查询
注意:
当粉丝⽤户需要按分页模式来读取收件箱的信息时,不能采⽤传统的分页模式(按数据的⾓标开始查)。因为Feed 流中的数据会不断更新,所以数据的⾓标也在不断变化。传统的分页模式,会出现消息重复读的问题。
redis实现:用zset方便排序,时间戳做score
@Overridepublic Result saveBlog(Blog blog) {if (blog.getShopId() == null || blog.getTitle() == null || blog.getContent() == null) {return Result.fail("提交前情把Blog全部信息填写完整(●'◡'●)");}// 1. 获取登录用户UserDTO user = UserHolder.getUser();blog.setUserId(user.getId());// 2.保存探店笔记boolean isSuccess = save(blog);if (!isSuccess) {return Result.fail("新增笔记失败!");}// 3.查询笔记作者的所有粉丝List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();// 4.推送笔记id给所有粉丝for (Follow follow : follows) {// 4.1.获取粉丝idLong userId = follow.getUserId();// 4.2.推送 (思路就是只把blog的id传到redis里面,到时候再调用bolg的query方法获取详情)String key = FEED_KEY + userId;// 还是要按时间戳当作value,因为要进行排序stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());}return Result.ok(blog.getId());}
实现分页查询收邮件
1、每次查询完成后,我们要分析出查询出数据的最小时间戳,这个值会作为下一次查询的条件
2、我们需要找到与上一次查询相同的查询个数作为偏移量,下次查询时,跳过这些查询过的数据,拿到我们需要的数据
综上:我们的请求参数中就需要携带 lastId:上一次查询的最小时间戳minTime和偏移量offset这两个参数。
这两个参数第一次会由前端来指定,以后的查询就根据后台结果作为条件,再次传递到后台。
redis使用:opsForZSet的reverseRangeByScoreWithScores方法,传入key、minTime、maxTime、offset、每次查询数量
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);
@Data
public class ScrollResult {// 查询的Blog结果private List<?> list;// 上次查询的最小时间戳private Long minTime;// 偏移量private Integer offset;
}
@Overridepublic Result queryBlogOfFollow(Long max, Integer offset) {// 1.获取登录用户Long userId = UserHolder.getUser().getId();// 2. 查询自己的收件箱String key = FEED_KEY + userId;Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);if (typedTuples == null || typedTuples.isEmpty()) {return Result.ok(Collections.emptyList());}// 解析数据:blogId、minTime(时间戳)、offsetArrayList<Long> ids = new ArrayList<>(typedTuples.size());long minTime = 2;int os = 1;for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {ids.add(Long.valueOf(tuple.getValue()));long time = tuple.getScore().longValue();if (time == minTime) {os++;} else {minTime = time;os = 1;}}os = minTime == max ? os : os + offset;// 根据id查blogString idStr = StrUtil.join(",", ids);List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();// 查询blog相关信息for (Blog blog : blogs) {queryBlogUser(blog);isBlogLike(blog);}// 封装并返回ScrollResult r = new ScrollResult();r.setList(blogs);r.setOffset(os);r.setMinTime(minTime);return Result.ok(r);}
流程
1. 获取登录用户的 ID
Long userId = UserHolder.getUser().getId();
2. 查询用户的收件箱
String key = FEED_KEY + userId;
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);
if (typedTuples == null || typedTuples.isEmpty()) {return Result.ok(Collections.emptyList());
}
构建 Redis 的键名,FEED_KEY 是一个常量,feed:123,表示用户 ID 为 123 的收件箱。
使用 stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores 方法从 Redis 的有序集合(Sorted Set)中查询数据。reverseRangeByScoreWithScores 方法的参数含义如下:
- key:要查询的有序集合的键名。
- 0:分数的最小值,表示从分数为 0 开始查询。
- max:分数的最大值,表示查询分数小于等于 max 的元素。
- offset:偏移量,表示从第 offset 个元素开始查询。
- 2:查询的元素数量,表示最多查询 2 个元素。
该方法返回一个 Set<ZSetOperations.TypedTuple> 集合,其中每个 TypedTuple 对象包含元素的值(博客 ID)和对应的分数(时间戳)。
如果查询结果为空,则直接返回一个空列表。
3. 解析查询结果
ArrayList<Long> ids = new ArrayList<>(typedTuples.size());
long minTime = 2;
int os = 1;
for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {ids.add(Long.valueOf(tuple.getValue()));long time = tuple.getScore().longValue();if (time == minTime) {os++;} else {minTime = time;os = 1;}
}
os = minTime == max ? os : os + offset;
- 创建一个 ArrayList 用于存储博客 ID。
- 初始化 minTime 为 2,os 为 1。minTime 用于记录查询结果中最小的时间戳,os 用于记录偏移量。
- 遍历查询结果,将每个 TypedTuple 对象中的博客 ID 转换为 Long 类型并添加到 ids 列表中。
- 获取每个 TypedTuple 对象的分数(时间戳),如果当前时间戳等于 minTime,则 os 加 1;否则,更新 minTime 为当前时间戳,并将 os 重置为 1。
- 根据 minTime 和 max 的关系更新 os 的值,如果 minTime 等于 max,则保持 os 不变;否则,将 os 加上传入的 offset。
4 根据博客id查询博客信息
String idStr = StrUtil.join(",", ids);
List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
- 使用 StrUtil.join 方法将 ids 列表中的元素用逗号连接成一个字符串,例如 1,2,3。
- 使用 MyBatis-Plus 的 query 方法构建查询条件,通过 in 方法筛选出 id 字段在 ids 列表中的博客记录。
- 使用 last 方法在 SQL 语句末尾添加自定义的排序条件,ORDER BY FIELD(id," + idStr + “)” 表示按照 id 列表中的顺序对查询结果进行排序,确保返回的博客列表顺序与 Redis 中查询结果的顺序一致。
- 调用 list 方法执行查询并返回博客列表。
5 查询博客信息
for (Blog blog : blogs) {queryBlogUser(blog);isBlogLike(blog);
}
- 遍历查询到的博客列表,对每篇博客调用 queryBlogUser 方法查询博客作者的用户信息,并将用户信息设置到博客对象中。
- 调用 isBlogLike 方法判断当前登录用户是否点赞了该博客,并将判断结果设置到博客对象的 isLike 属性中。
6 封装并返回结果
ScrollResult r = new ScrollResult();
r.setList(blogs);
r.setOffset(os);
r.setMinTime(minTime);
return Result.ok(r);
- 创建一个 ScrollResult 对象,用于封装查询结果。
- 将查询到的博客列表设置到 ScrollResult 对象的 list 属性中。
- 将计算得到的偏移量 os 设置到 ScrollResult 对象的 offset 属性中。
- 将最小时间戳 minTime 设置到 ScrollResult 对象的 minTime 属性中。
- 使用 Result.ok 方法将 ScrollResult 对象封装成统一的返回结果并返回。
综上,该方法通过从 Redis 中查询用户收件箱信息,获取博客 ID,再根据博客 ID 从数据库中查询博客信息,并补充博客相关的用户信息和点赞状态,最后将结果封装成 ScrollResult 对象返回,实现了滚动分页查询用户关注的博主发布的博客的功能。