欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 社会 > Redis 实现分布式锁

Redis 实现分布式锁

2024/12/22 0:11:22 来源:https://blog.csdn.net/qq_41893505/article/details/144423114  浏览:    关键词:Redis 实现分布式锁

单实例条件下的分布式锁

-- 加锁操作
-- KEYS[1]: 锁的键(lock_key)
-- ARGV[1]: 当前客户端的标识(client_id)
-- ARGV[2]: 锁的过期时间(毫秒)if (redis.call('EXISTS', KEYS[1]) == 0) then-- 如果锁不存在,则进行加锁redis.call('SET', KEYS[1], ARGV[1])redis.call('PEXPIRE', KEYS[1], ARGV[2])return 1
elseif (redis.call('GET', KEYS[1]) == ARGV[1]) then-- 如果锁已存在且是当前客户端持有的,则续期redis.call('PEXPIRE', KEYS[1], ARGV[2])return 1
else-- 如果锁已存在且不是当前客户端持有的,返回失败return 0
end
-- 解锁操作
-- KEYS[1]: 锁的键(lock_key)
-- ARGV[1]: 当前客户端的标识(client_id)if (redis.call('GET', KEYS[1]) == ARGV[1]) then-- 如果当前客户端持有锁,则解锁redis.call('DEL', KEYS[1])return 1
else-- 如果不是当前客户端持有锁,返回失败return 0
end

使用 Redisson 执行 Lua 脚本

@Service
public class RedisLockService {@Autowiredprivate RedissonClient redissonClient;// 加锁操作public boolean lock(String lockKey, String clientId, long expireTime) {String script ="if (redis.call('EXISTS', KEYS[1]) == 0) then " +"    redis.call('SET', KEYS[1], ARGV[1]); " +"    redis.call('PEXPIRE', KEYS[1], ARGV[2]); " +"    return 1; " +"elseif (redis.call('GET', KEYS[1]) == ARGV[1]) then " +"    redis.call('PEXPIRE', KEYS[1], ARGV[2]); " +"    return 1; " +"else " +"    return 0; " +"end";Long result = (Long) redissonClient.getScript().eval(RScript.Mode.READ_WRITE,script,RScript.ReturnType.INTEGER,java.util.Collections.singletonList(lockKey),clientId,String.valueOf(expireTime));return result != null && result == 1;}// 解锁操作public boolean unlock(String lockKey, String clientId) {String script ="if (redis.call('GET', KEYS[1]) == ARGV[1]) then " +"    redis.call('DEL', KEYS[1]); " +"    return 1; " +"else " +"    return 0; " +"end";Long result = (Long) redissonClient.getScript().eval(RScript.Mode.READ_WRITE,script,RScript.ReturnType.INTEGER,java.util.Collections.singletonList(lockKey),clientId);return result != null && result == 1;}
}

使用 lua 脚本实现可重入分布式锁
加锁脚本需要检测当前锁的持有者是否是同一个客户端。如果是同一个客户端,则增加计数器,否则返回失败。

-- 加锁操作
-- KEYS[1]: 锁的键(lock_key)
-- ARGV[1]: 当前客户端的标识(client_id)
-- ARGV[2]: 锁的过期时间(毫秒)-- 检查锁的持有者
local lockOwner = redis.call('HGET', KEYS[1], 'owner')
if lockOwner == false then-- 如果锁不存在,则初始化锁的持有者和计数器redis.call('HSET', KEYS[1], 'owner', ARGV[1])redis.call('HSET', KEYS[1], 'count', 1)redis.call('PEXPIRE', KEYS[1], ARGV[2])return 1
elseif lockOwner == ARGV[1] then-- 如果锁已存在且是当前客户端持有的,则增加计数器local count = redis.call('HINCRBY', KEYS[1], 'count', 1)redis.call('PEXPIRE', KEYS[1], ARGV[2])return count
else-- 如果锁已存在且不是当前客户端持有的,返回失败return 0
end

解锁脚本需要检测当前锁的持有者是否是当前客户端。如果是,则减少计数器,当计数器减到 0 时,才释放锁。

-- 解锁操作
-- KEYS[1]: 锁的键(lock_key)
-- ARGV[1]: 当前客户端的标识(client_id)-- 检查锁的持有者
local lockOwner = redis.call('HGET', KEYS[1], 'owner')
if lockOwner == ARGV[1] then-- 如果是当前客户端持有锁,则减少计数器local count = redis.call('HINCRBY', KEYS[1], 'count', -1)if count == 0 then-- 如果计数器为 0,则删除锁redis.call('DEL', KEYS[1])else-- 如果计数器不为 0,则更新过期时间redis.call('PEXPIRE', KEYS[1], ARGV[2])endreturn count
else-- 如果不是当前客户端持有锁,返回失败return -1
end

使用 Redisson 执行 Lua 脚本

@Service
public class ReentrantLockService {@Autowiredprivate RedissonClient redissonClient;// 加锁操作public boolean lock(String lockKey, String clientId, long expireTime) {String script ="local lockOwner = redis.call('HGET', KEYS[1], 'owner') " +"if lockOwner == false then " +"    redis.call('HSET', KEYS[1], 'owner', ARGV[1]) " +"    redis.call('HSET', KEYS[1], 'count', 1) " +"    redis.call('PEXPIRE', KEYS[1], ARGV[2]) " +"    return 1 " +"elseif lockOwner == ARGV[1] then " +"    local count = redis.call('HINCRBY', KEYS[1], 'count', 1) " +"    redis.call('PEXPIRE', KEYS[1], ARGV[2]) " +"    return count " +"else " +"    return 0 " +"end";Long result = (Long) redissonClient.getScript().eval(RScript.Mode.READ_WRITE,script,RScript.ReturnType.INTEGER,java.util.Collections.singletonList(lockKey),clientId,String.valueOf(expireTime));return result != null && result > 0;}// 解锁操作public boolean unlock(String lockKey, String clientId, long expireTime) {String script ="local lockOwner = redis.call('HGET', KEYS[1], 'owner') " +"if lockOwner == ARGV[1] then " +"    local count = redis.call('HINCRBY', KEYS[1], 'count', -1) " +"    if count == 0 then " +"        redis.call('DEL', KEYS[1]) " +"    else " +"        redis.call('PEXPIRE', KEYS[1], ARGV[2]) " +"    end " +"    return count " +"else " +"    return -1 " +"end";Long result = (Long) redissonClient.getScript().eval(RScript.Mode.READ_WRITE,script,RScript.ReturnType.INTEGER,java.util.Collections.singletonList(lockKey),clientId,String.valueOf(expireTime));return result != null && result >= 0;}
}

使用 Redisson 库实现分布式锁

@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient() {Config config = new Config();config.useSingleServer().setAddress("redis://localhost:6379"); // Redis 单实例地址return Redisson.create(config);}
}
@Service
public class DistributedLockService {@Autowiredprivate RedissonClient redissonClient;/*** 加锁操作** @param lockKey 锁的键* @param leaseTime 锁的过期时间(秒)* @return 是否加锁成功*/public boolean lock(String lockKey, long leaseTime) {RLock lock = redissonClient.getLock(lockKey);try {// 尝试加锁,最多等待 10 秒,锁的租约时间为指定的 leaseTimereturn lock.tryLock(10, leaseTime, TimeUnit.SECONDS);} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;}}/*** 解锁操作** @param lockKey 锁的键*/public void unlock(String lockKey) {RLock lock = redissonClient.getLock(lockKey);if (lock.isHeldByCurrentThread()) {lock.unlock();}}
}
  • 使用 Redisson 非常方便,它自动处理了分布式锁的各种边缘情况,例如锁的过期时间、可重入性和网络分区问题。

  • RLock 默认是可重入锁,这意味着同一个客户端(同一线程)可以多次获取同一把锁,所以上面的代码也可以作为可重入锁的代码。

  • 如果业务逻辑涉及多线程或长时间操作,Redisson 的内置锁机制能有效管理锁的状态并防止死锁。

    • 在使用 Redis 设置分布式锁时,如果任务在锁的过期时间快到了但尚未完成,可以采取以下几种策略来处理:
      • 设置合理的过期时间:在任务开始时,根据预估的处理时间合理设置锁的过期时间。如果任务的处理时间通常较长,可以考虑设置一个较长的过期时间,以减少续期的需要。
      • 使用线程守护机制:在业务逻辑执行时,可以启动一个线程或使用定时器,在任务执行过程中,可以定期(如每隔一定时间)检查锁的状态,并在必要时延长锁的过期时间。这可以通过调用 PEXPIRE 命令实现。
      • 使用 Redisson 加锁可以做到自动续期。

集群条件下的分布式锁
使用 Redisson 对 redis 集群进行加锁,具有以下特点:

  • 高可靠性:Redisson 能够在 Redis 集群架构下使用主从复制模式,实现锁的高可靠性。
    • 这里的高可靠性是限于同一个节点的主从节点范围内的,如果加锁的主从节点都挂了话,那么锁就没有了。
  • 易用性:Redisson 封装了复杂的锁定机制,开发者只需要简单的 API 调用即可实现分布式锁。
  • 内置支持:Redisson 具有内置的锁过期机制和防止死锁的机制,避免了由于异常导致的锁未释放问题。
  • 可重入性:Redisson 对 Redis 集群添加的分布式锁是支持可重入性的。这意味着在同一个线程中,获取锁的线程可以多次获取同一个锁,而无需担心死锁或冲突。
    • 这里可重入性的参考维度是客户端,而不是线程

在 Redis 集群架构下,需要配置 Redisson 客户端连接 Redis 集群。可以在 Spring Boot 项目的配置文件中使用 application.yml 进行配置。

spring:redis:cluster:nodes:- 127.0.0.1:6379- 127.0.0.1:6380- 127.0.0.1:6381
redisson:config: |clusterServersConfig:idleConnectionTimeout: 10000connectTimeout: 10000timeout: 3000retryAttempts: 3retryInterval: 1500scanInterval: 2000nodeAddresses:- redis://127.0.0.1:6379- redis://127.0.0.1:6380- redis://127.0.0.1:6381password: nullsubscriptionsPerConnection: 5clientName: nullidleConnectionTimeout: 10000pingTimeout: 1000keepAlive: truetcpNoDelay: truednsMonitoringInterval: 5000

创建一个配置类,初始化 RedissonClient:

@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient() {Config config = new Config();// 配置 Redis 集群节点config.useClusterServers().addNodeAddress("redis://127.0.0.1:6379", "redis://127.0.0.1:6380", "redis://127.0.0.1:6381").setScanInterval(2000) // 集群节点扫描间隔.setRetryAttempts(3)    // 重试次数.setRetryInterval(1500) // 重试间隔.setConnectTimeout(10000) // 连接超时.setTimeout(3000);     // 命令等待响应超时return Redisson.create(config);}
}

使用 Redisson 分布式锁

@Service
public class MyService {@Autowiredprivate RedissonClient redissonClient;public void doSomethingWithLock() {// 获取可重入锁RLock lock = redisson.getLock("myReentrantLock");try {// 尝试获取锁,等待最多 10 秒,锁定时间为 5 秒if (lock.tryLock(10, 5, TimeUnit.SECONDS)) {try {// 第一次加锁成功,执行业务逻辑System.out.println("Lock acquired for the first time!");// 再次获取同一把锁(可重入)lock.lock();System.out.println("Lock acquired again (reentrant)!");// 业务逻辑处理// ...} finally {// 释放锁(可重入锁的计数器递减)lock.unlock();System.out.println("Lock released once!");// 再次释放锁lock.unlock();System.out.println("Lock fully released!");}}} catch (InterruptedException e) {e.printStackTrace();}}
}

可以使用 Lua 脚本在 Redis 集群中实现分布式锁。Lua 脚本在 Redis 中是原子执行的,因此能够有效地确保分布式锁的原子性和一致性。通过 Lua 脚本,可以精确控制锁的获取和释放逻辑,从而避免在高并发环境下的竞态条件。

不过在 Redis 集群架构下,使用 Lua 脚本进行分布式锁需要特别注意一些事项,因为 Redis 集群的数据是分片存储的,每个键可能位于不同的分片节点上。这会导致以下几点挑战:

  • Redis 集群不支持跨节点事务:在 Redis 集群模式下,如果 Lua 脚本涉及多个键,这些键必须位于同一个分片上才能保证 Lua 脚本的原子性执行。
  • 哈希标签(Hash Tag)解决方案:为了确保多个键位于同一分片,可以使用 哈希标签。哈希标签是指在键名中使用 {} 包裹的部分,Redis 会对这个部分进行一致哈希计算,将具有相同哈希标签的键存储在同一个分片上。

Redis (单例或者集群)添加可重入性分布式锁时,参考维度通常是客户端,而不是线程。这是因为在 Redis 的上下文中,分布式锁是针对客户端的,而不是针对单个线程。

  • 单线程模型:Redis 是单线程的,所有的命令在一个线程中处理。因此,Redis 处理的所有操作都是在一个事件循环中执行的。即使在多线程的应用程序中,所有对 Redis 的请求实际上是通过同一个客户端连接发送的。
  • 客户端标识:在分布式环境中,锁的持有者通常是一个客户端,而不是单个线程。一个客户端可以有多个线程在操作,但 Redis 只关心哪个客户端持有锁。
  • 锁的设计:在实现可重入锁时,需要一个机制来跟踪锁的持有者。这个持有者应该是客户端的标识符(例如 UUID 或其他唯一标识符),以便确保同一个客户端可以多次获取锁,而不会出现死锁。
  • 多线程和多进程:如果一个客户端的多个线程或进程尝试获取同一个锁,可以通过使用相同的客户端标识符来实现可重入性。这种设计方式允许同一客户端的不同线程安全地获取和释放锁。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com