1. 库存超卖问题重现
下面这段代码在多线程下存在线程安全问题!
超卖的主要原因是下面的步骤不是原子性的。
- 判断库存是否充足。
- 库存扣减
public String deductProduct(String userId) {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));log.info("开始扣减库存");//加锁,分布式锁if (stock > 0) {int stockAfterDeduction = stock - 1;stringRedisTemplate.opsForValue().set("stock", String.valueOf(stockAfterDeduction));log.info("扣减库存成功,现库存为 {}", stockAfterDeduction);} else {log.error("库存不足,现库存为:{}", stock);}return "操作成功!";
}
==> 单机情况下,可以加锁(synchronized | ReentrantLock)解决这个线程安全问题。
但是在分布式场景下,必须使用分布式锁解决这个问题。
2. 手写简单分布式锁
set key value nx ex
2.1 可以基于set nx ex实现一个简单的分布式锁
public interface ILock {/*** 尝试获取锁* @param timeoutSecond 锁持有的超时时间,过期后自动删除* @return 是否加锁成功*/boolean tryLock(long timeoutSecond);/*** 释放锁*/void unLock();
}
2.2 分布式锁-实现加锁和解锁流程
public class SimpleRedisLock implements ILock {private static final String KEY_PREFIX = "lock:";//分布式中,集群的标识符uuid,用于表示集群中的每个节点private static final String ID_PREFIX = UUID.randomUUID().toString(true) + ":";//解锁的lua脚本,用于防止锁误删private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;private final String name; //锁的名字private final StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}static {//初始化 unlock.lua脚本文件UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.TYPE);}@Overridepublic boolean tryLock(long timeoutSecond) {//获取线程标识,uuid + thread-idString threadId = ID_PREFIX + Thread.currentThread().getId();Boolean isLock = stringRedisTemplate.opsForValue()//基于set nx ex实现分布式锁.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSecond, TimeUnit.SECONDS);return Boolean.TRUE.equals(isLock);}@Overridepublic void unLock() {//删除锁,要防止锁误删(lua脚本)stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());}
}
2.3 lua脚本实现
if (redis.call('get', KEYS[1]) == ARGV[1])
then-- 一致的话,就删除锁return redis.call('del', KEYS[1])
end
2.4 分布式锁实战
public String deductProduct(@PathVariable String userId) {ILock lock = new SimpleRedisLock("product", stringRedisTemplate);//尝试加锁boolean isLock = lock.tryLock(60);if (!isLock) {log.error("线程加锁失败: {}", Thread.currentThread().getId());return "加锁失败";}try {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));log.info("开始扣减库存");if (stock > 0) {int stockAfterDeduction = stock - 1;stringRedisTemplate.opsForValue().set("stock", String.valueOf(stockAfterDeduction));log.info("扣减库存成功,现库存为 {}", stockAfterDeduction);} else {log.error("库存不足,现库存为:{}", stock);}return "操作成功!";}finally {lock.unLock();}
}
3. Redisson框架
uuid(区分不同的jvm服务)+线程id(区分jvm服务上面的线程)
3.1 导入pom坐标
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>
3.2 配置Redisson客户端
@Configuration
public class RedissonConfig {@Value("${spring.redis.host}")public String host;@Value("${spring.redis.port}")public String port;@Value("${spring.redis.password}")private String password;@Beanpublic RedissonClient redissonClient() {// 单机redis配置Config config = new Config();String redisAddress = "redis://" + host + ":" + port;config.useSingleServer().setAddress(redisAddress).setPassword(password);// 设置分布式锁 watch dog超时时间// config.setLockWatchdogTimeout()return Redisson.create(config);}
}
3.3 使用分布式锁
public class HelloController {private RedissonClient redissonClient;public String deductProduct(String userId) {//获取分布式锁的锁对象RLock lock = redissonClient.getLock("product");//加分布式锁lock.lock(); //set nx ex 30stry {//业务逻辑}finally {//释放锁lock.unlock();}}
}
3.4 Redission源码分析
3.5 Redis主从架构下,分布式锁失效
Redis的集群架构保证AP,Zookeeper的集群架构保证CP。
但是Redis在主从情况下会出现锁丢失的问题,如何解决呢? => 红锁RedLock