Redis-分布式锁
文章目录
- Redis-分布式锁
- 1.基本原理和不同方式实现方式对比
- 2.Redis分布式锁的基本实现思路
- 3.分布式锁误删问题一
- 4.分布式锁误删问题二
- 5.Redission
- 1.功能介绍
- 2.快速入门
- 3.可重入锁原理
- 4.锁重试和WatchDog机制
- 1.锁重试
- 2. WatchDog 机制(锁自动续期)
- 3.锁重试的获取与释放锁流程图
- 5.MultiLock原理(联合锁)
这篇文章帮助大家简单了解利用Redis实现分布锁,并在实现过程中逐步完善在并发时会出现的问题。
1.基本原理和不同方式实现方式对比
如图在单个jvm中,维护了自己的锁监视器,来监视当前jvm内部线程锁的获取情况,保证线程的正确执行。当集群形式时,一个新的tomcat意味着一个新的jvm(也维护了自己的锁监视器),这时对于jvm1来说对于用户id为1(假设)下单,可以获取到锁,jvm2中用户id为1下单也可以获取到锁,此时无法实现一人一单,存在线程安全问题。
为了解决这个问题,我们可以维护一个外部的锁监视器,让外部的锁监视器来管理多个集群的锁,实现多线程互斥。
所以什么是分布式锁?
分布式锁:满足分布式系统或者集群模式下多进程课件并且互斥的锁。
注意分布式锁的核心是实现多进程之间互斥,满足这一点的方式有很多种,常见的有三种:
以下是提取后的分布式锁三种实现方式的对比表格:
特性 | MySQL | Redis | Zookeeper |
---|---|---|---|
互斥 | 利用 MySQL 本身的互斥锁机制 | 利用 setnx 等互斥命令 | 利用节点的唯一性和有序性实现互斥 |
高可用 | 好 | 好 | 好 |
高性能 | 一般 | 好 | 一般 |
安全性 | 断开连接后自动释放锁 | 通过锁超时时间到期释放 | 临时节点,断开连接后自动释放 |
有关说明
-
互斥机制
- MySQL:基于行锁或乐观锁实现。
- Redis:通过
SETNX
(或SET key value NX EX
)实现原子性抢锁。 - Zookeeper:利用临时有序节点的唯一性(如最小节点获锁)。
-
安全性
- MySQL/Redis 需显式处理锁释放(如超时或断开连接)。
- Zookeeper 的临时节点在会话终止时会自动删除,避免死锁。
-
适用场景
- Redis:高性能、短期锁场景(如秒杀)。
- Zookeeper:强一致性、长期锁场景(如选主)。
- MySQL:依赖数据库且对性能要求不高的场景。
2.Redis分布式锁的基本实现思路
实现分布式锁时需要实现的两个基本方法:
-
获取锁:
-
互斥:确保只能有一个线程获取锁
-
非阻塞:尝试获取锁一次,成功返回true,否则返回false
#添加锁,利用setnx的互斥特性 SETNX lock thread1 #添加锁过期时间,避免服务宕机引起的死锁 EXPIRE lock 10
注意:这里可能存在添加锁后,还没来得及添加锁的过期时间,服务宕机此时锁可能就会无法释放,因此应该保证两个操作的原子性。
#添加锁并添加过期时间NX是互斥,EX是设置超时时间 SET lock thread1 NX EX 10
-
-
释放锁:
-
手动释放
-
超时释放:获取锁时添加一个超时时间
#释放锁,删除即可 DEL key
-
基于上述我们获取锁和释放锁的流程如图所示:
基于redis实现分布式锁初级版本:
需求:定义一个类,实现下面接口,利用Redis实现分布式锁功能。
//接口
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁持有的超时时间,过期后自动释放
* @return true代表获取锁成功;false代表获取锁失败
*/
boolean tryLock(long timeoutSec);/**
* 释放锁
*/
void unlock();
}
//实现类
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock{// 锁的统一前缀private static final String key_prefix = "lock:";//锁的业务名称private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(long timeoutSec) {//使用当前线程的id作为锁的值String lockValue = String.valueOf(Thread.currentThread().getId());//获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key_prefix + name, lockValue, timeoutSec, TimeUnit.SECONDS);//直接返回自动拆箱可能会出现空指针异常/* return success;*/return Boolean.TRUE.equals(success);//null会转为false}@Overridepublic void unlock() {//解锁stringRedisTemplate.delete(key_prefix + name);}
}
3.分布式锁误删问题一
如图线程一获取锁后,因为业务阻塞时间过长,锁超时释放。
线程二乘虚而入获取到锁并执行业务,但是线程一业务完成并删除锁(注意删除的是线程二的锁)。
此时线程三也获取到锁,出现线程二、三并行执行,线程安全问题可能发生。
解决方式:当要去释放锁时,检查此锁是不是自己的,再进行操作,这样就不会误删别人的锁。
改进措施:
-
在获取锁时存入线程标示(用UUID表示)(因为每一个jvm维护线程的名称是是id自增,可能存在相同的情况)
-
在释放锁时先获取锁中的线程标示,判断是否与当前线程标示:
-
一致则释放锁
-
不一致则不释放锁
-
import cn.hutool.core.lang.UUID;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock{// 锁的统一前缀private static final String KEY_PREFIX = "lock:";//--------------------------------------------------------------------------------------//线程id保证唯一性private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";//--------------------------------------------------------------------------------------//锁的业务名称private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(long timeoutSec) {//使用当前线程的id作为锁的值String lockValue = ID_PREFIX+Thread.currentThread().getId();//获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, lockValue, timeoutSec, TimeUnit.SECONDS);//直接返回自动拆箱可能会出现空指针异常/* return success;*/return Boolean.TRUE.equals(success);//null会转为false}@Overridepublic void unlock() {//获取当前锁占有的值(值是线程id)String ID = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);//线程的idString threadId = String.valueOf(Thread.currentThread().getId());//判断是不是当前线程占有的锁//是if(ID.equals(threadId)){//解锁stringRedisTemplate.delete(KEY_PREFIX + name);}}
}
此时获取锁和释放锁的流程如图所示:
4.分布式锁误删问题二
我们当前实现的锁在一些情况下还是会出现问题:
如图线程一获取锁并执行业务,业务执行完毕判断当前锁是自己的,但是在删除所之前,发生阻塞。
虽然判断和和释放锁之间没有代码,但是在jvm中存在垃圾回收机制,这里不是业务阻塞而可能是jvm阻塞。
其他进程进入并获取到锁,线程一的阻塞结束并执行释放锁操作(已经判断过),此时删除的是其他线程的锁,则又变成问题一。
这个问题发生的原因是判断锁标志和释放锁是两个动作,在两个动作之间产生阻塞,我们需要保证这两个动作的原子性。
这个问题使用Redis的Lua脚本解决:
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法可以参考网站:https://www.runoob.com/lua/lua-tutorial.html
这里简单介绍下Redis在Lua中提供的调用函数,语法如下:
#执行 redis命令
redis.call('命令名称', 'key', '其它参数', ...)
#例如,我们要执行set name jack,则脚本是这样:
#执行 set name jackredis.call('set', 'name', 'jack')
#例如,我们要先执行set name Rose,再执行get name,则脚本如下:
#先执行 set name jack
redis.call('set', 'name', 'jack')#再执行 get name
local name = redis.call('get', 'name')#返回
return name
写好脚本以后,用命令行中使用Redis命令来调用脚本,调用脚本的常见命令如下:
#例如,我们要执行 redis.call('set', 'name', 'jack') 这个脚本,语法如下:
#调用脚本
EVAL "return redis.call('set', 'name', 'jack')" 0# 脚本内容 脚本需要的key类型的参数个数
如果脚本中的key、value不想写死,可以作为采纳数传递。key类型采纳数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:
现在我们来改进redis的分布锁:
注意在Java中使用RedisTemplate调用Lua脚本的API如下:
-
在resources下创建并编写Lua脚本
--锁的key local key =KEY[1] --当前线程标示 local threadId =ARGV[1] --获取锁种的线程标示 local id =redis.call('get',key) --比较线程标示和锁种的标示是否一致 if(id == threadId) then--释放锁 del keyreturn redis.call('del',key) end return 0
-
重新实现两个方法
import cn.hutool.core.lang.UUID; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import java.util.Collections; import java.util.concurrent.TimeUnit; public class SimpleRedisLock implements ILock{// 锁的统一前缀private static final String KEY_PREFIX = "lock:";//线程id保证唯一性private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";//锁的业务名称private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}//--------------------------------------------------------------------------private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;/*作用:定义一个静态不可变的 DefaultRedisScript 对象,用于加载和执行 Lua 脚本。泛型 <Long>:指定脚本返回值的类型为 Long (Lua 脚本返回的数字类型在 Java 中映射为 Long)。*/static{UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}/*new DefaultRedisScript<>():创建脚本执行器实例。setLocation():从类路径(resources 目录)加载 Lua 脚本文件 unlock.lua。setResultType():声明脚本返回值的类型为 Long(例如返回 1 表示成功,0 表示失败)*///--------------------------------------------------------------------------@Overridepublic boolean tryLock(long timeoutSec) {//使用当前线程的id作为锁的值String lockValue = ID_PREFIX+Thread.currentThread().getId();//获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, lockValue, timeoutSec, TimeUnit.SECONDS);//直接返回自动拆箱可能会出现空指针异常/* return success;*/return Boolean.TRUE.equals(success);//null会转为false}@Overridepublic void unlock() {stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX+name),ID_PREFIX+Thread.currentThread().getId());} }
此时我们就已经实现了一个生产可用的基于redis的分布式锁。
5.Redission
基于setnx实现的分布式锁还存在以下问题:
- 不可重入:同一个线程无法多次获取同一把锁。
- 不可重试:获取锁只尝试一次就返回false,没有重试机制。
- 超时释放:锁超时释放虽然可以避免死锁,但如果业务的执行耗时较长,也会导致锁释放,存在安全隐患,也就是锁的生存时长不确定。
- 主从一致性:如果Redis配置为主从集群,由于主从同步存在延迟,当主节点宕机时,若从节点尚未完全同步主节点中的锁数据,则会出现锁失效的问题。
通过之前的介绍我们已将了解了分布式锁的基本原理,不过为了解决这些问题我们需要使用一些成熟的框架来帮我们实现------Redission
1.功能介绍
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
例如:
分布式锁(Lock)和同步器(Synchronizer)
- 可重入锁(Reentrant Lock)
- 公平锁(Fair Lock)
- 联锁(MultiLock)
- 红锁(RedLock)
- 读写锁(ReadWriteLock)
- 信号量(Semaphore)
- 可过期性信号量(PermitExpirableSemaphore)
- 闭锁(CountDownLatch)
官网地址:https://redisson.org
GitHub地址:https://github.com/redisson/redisson
2.快速入门
-
引入依赖
//引入redission依赖 <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version> </dependency>
-
配置Redission客户端
//配置Redission客户端 @Configuration public class RedisConfig {@Beanpublic RedissonClient redissonClient() {// 配置类Config config = new Config();// 添加redis地址,这里添加了单点的地址,//也可以使用config.useClusterServers()添加集群地址config.useSingleServer().setAddress("redis://地址:6379").setPassword("密码");// 创建客户端return Redisson.create(config);} }
-
使用Redission的分布式锁
@Resource private RedissonClient redissonClient; @Test void testRedisson() throws InterruptedException {// 获取锁(可重入),指定锁的名称RLock lock = redissonClient.getLock("anylock");// 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);// 判断是否获取成功if(isLock){try {System.out.println("执行业务");}finally {// 释放锁lock.unlock();}} }
3.可重入锁原理
我们之前锁的实现是基于redis中String类型key-value形式,当第一次获取锁成功后,同一个线程再次获取锁,则会判断锁已经被获取从而获取失败。
如图为我们解决锁误删问题所实现的锁的流程图。
可重入锁是基于Hash结构实现的,当我们获取锁后会存入表示和初始化value为1,并添加锁的过期时间。
当同一个线程再次获取当前锁时令value值++,完成一个业务时会释放锁令**值value–**并重置锁的过期时间,直到value值减为0,才会真正释放锁。
如图为可重入锁的流程图逻辑。
为了保证加锁和释放锁过程的一致性,Redission的底层也使用了lua脚本来实现。(注意这里并不是redission中的lua源码)
获取锁的lua脚本:
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断是否存在
if(redis.call('exists', key) == 0) then-- 不存在,获取锁redis.call('hset', key, threadId, '1');-- 设置有效期redis.call('expire', key, releaseTime);return 1; -- 返回结果
end;
-- 锁已经存在,判断threadId是否是自己
if(redis.call('hexists', key, threadId) == 1) then-- 存在,获取锁,重入次数+1redis.call('hincrby', key, threadId, '1');-- 设置有效期redis.call('expire', key, releaseTime);return 1; -- 返回结果
end;
return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败
释放锁的lua脚本:
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断当前锁是否还是被自己持有
if (redis.call('HEXISTS', key, threadId) == 0) then return nil; -- 如果已经不是自己,则直接返回
end;
-- 是自己的锁,则重入次数-1
local count = redis.call('HINCRBY', key, threadId, -1);
-- 判断重入次数是否已经为0
if (count > 0) then-- 大于0说明不能释放锁,重置有效期然后返回redis.call('EXPIRE', key, releaseTime);return nil;
else -- 等于0说明可以释放锁,直接删除redis.call('DEL', key);return nil;
end;
如下图为redisson中一段实现获取锁的代码,其利用lua脚本硬编码实现。(大家可自行查看源码学习)
4.锁重试和WatchDog机制
1.锁重试
tryLock()
方法是 Redisson 提供的可等待的分布式锁,它有多个重载版本,最常用的三个参数版本是:
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
这三个参数决定了锁的获取行为、持有时间,并直接影响 WatchDog 是否生效。
1. tryLock() 的三个参数
(1)waitTime
:获取锁的最大等待时间
-
作用:在这段时间内,如果锁被其他线程占用,当前线程会不断尝试获取锁
不是无脑的无休止尝试获取锁,而是基于 Redis 的 Pub/Sub 消息订阅机制 监听锁释放消息,而这个释放锁的通知就来自于lua脚本中的publish命令:
-
示例:
lock.tryLock(10, TimeUnit.SECONDS); // 最多等10秒,拿不到就放弃
-
底层实现:
- 先尝试立即获取锁。
- 如果失败,则订阅 Redis 的锁释放事件(避免 CPU 空转)。
- 在
waitTime
内,如果收到锁释放的通知,就再次尝试获取。 - 如果超时仍未获取到,返回
false
。
(2)leaseTime
:锁的自动释放时间
-
作用:设置锁的最大持有时间,防止死锁(即使业务代码未显式释放,锁也会自动过期)。
注意如果不设置leaseTime,则会使用默认值-1。
-
示例:
lock.tryLock(1, 10, TimeUnit.SECONDS); // 此锁最多持有10秒
2. WatchDog 机制(锁自动续期)
(1)什么情况下会触发 WatchDog?
- 未指定
leaseTime
(或leaseTime = -1
)时,Redisson 会启动 WatchDog。 - 示例:
lock.tryLock(10, -1, TimeUnit.SECONDS); // 启用 WatchDog
(2)WatchDog 的作用
- 防止业务执行时间过长导致锁提前释放:
- 默认情况下,WatchDog 每隔 10 秒 检查一次锁是否仍然被当前线程持有。
- 如果是,则自动续期 30 秒(默认
lockWatchdogTimeout=30s
)。
- 示例流程:
- 线程 A 获取锁,未指定
leaseTime
,WatchDog 启动。 - 每 10 秒检查一次:
- 如果锁仍被线程 A 持有,则重置 TTL 为 30 秒。
- 如果锁已释放,则停止 WatchDog。
- 业务执行完毕后,手动
unlock()
,WatchDog 停止。
- 线程 A 获取锁,未指定
3.锁重试的获取与释放锁流程图
简单总结:
- 可重试机制
- 实际使用 Redis 的 Pub/Sub 监听锁释放通知。
- 获取失败后订阅通知,收到释放通知后重试,避免忙等待。
- 重试必须在waitTime之内
- WatchDog 续约逻辑
- 默认
releaseTime
(即lockWatchdogTimeout
)为 30秒,续约间隔为 10秒(30/3)。 - 仅在未显式设置
leaseTime
时生效 - watchdog是在当显示的使用unlock释放锁或持有锁的jvm线程崩溃时关闭。
- 默认
5.MultiLock原理(联合锁)
如图为主从同步时会发生的问题,当客户端向主节点成功获取锁后,主从准备同步数据时,主机宕机导致锁数据失效,此时会出现并发问题。
为了解决这个问题,第一个想法是取消主从机制,转而改为多节点的主机,此时我们需要向每个节点获取锁,都保存锁信息才算获取成功。
如果还想使其的可用性更高,可以给每个节点建立主从机制。此时当一个节点宕机时便不会出现主从数据同步导致的锁数据丢失问题。
当第一个节点宕机,此时是可以在其子节点中拿到锁,但是在其他两个节点无法获取到锁。也就是只要有一个节点存活,就无法获取到锁,则不会出现锁失效问题。这种方案保留了主从同步机制,保证了集群的稳定性和高可用性,解决了主从同步的锁失效问题,这种方案在Redis中叫做MultiLock原理。
注意multilock根据自己的需求来决定是否使用主从机制,不使用则为刚刚提到的多节点主机方式。
这里简单演示使用过程:
-
配置三个可用的redis客户端
@Configuration public class RedissionConfig {@Beanpublic RedissonClient redissionClient(){//配置redisson客户端Config config = new Config();//useSingleServer()单机模式config.useSingleServer().setAddress("redis://地址").setPassword("密码");//返回配置return Redisson.create(config);}@Beanpublic RedissonClient redissionClient2(){//配置redisson客户端Config config = new Config();//useSingleServer()单机模式config.useSingleServer().setAddress("redis://地址").setPassword("密码");//返回配置return Redisson.create(config);}@Beanpublic RedissonClient redissionClient3(){//配置redisson客户端Config config = new Config();//useSingleServer()单机模式config.useSingleServer().setAddress("redis://地址").setPassword("密码");//返回配置return Redisson.create(config);} }
-
使用redission中的multilock
@Slf4j @SpringBootTest class RedissonTest {@Resourceprivate RedissonClient redissonClient;private RLock lock;@BeforeEachvoid setUp() {RLock lock1 = redissonClient.getLock("order");RLock lock2 = redissonClient.getLock("order");RLock lock3 = redissonClient.getLock("order");//创建连锁lock=redissonClient.getMultiLock(lock1, lock2, lock3);}@Testvoid method1() throws InterruptedException {// 尝试获取锁boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);if (!isLock) {log.error("获取锁失败 .... 1");return;}try {log.info("获取锁成功 .... 1");method2();log.info("开始执行业务 ... 1");} finally {log.warn("准备释放锁 .... 1");lock.unlock();}}void method2() {// 尝试获取锁boolean isLock = lock.tryLock();if (!isLock) {log.error("获取锁失败 .... 2");return;}try {log.info("获取锁成功 .... 2");log.info("开始执行业务 ... 2");} finally {log.warn("准备释放锁 .... 2");lock.unlock();}} }
此时大家可以直接使用提供的演示代码测试来观察获取锁释放锁的过程,如果想要更加了解底层代码,可以进入trylock的multilock实现学习,这里不过多赘述。
到这里相信大家已经对基于redis所实现的分布式锁的原理有了不错的了解,觉得不错的话点个关注,我们一起学习!