分布式锁的基本概念
在 Redis 中实现分布式锁的常用方式是通过 SETNX
命令(SET
with NX
option)来设置一个键(key
),这个键代表锁。如果 key
不存在,SETNX
会设置成功,并返回 1
,表示成功获取锁。如果 key
已经存在,SETNX
会返回 0
,表示获取锁失败。
线程号在 Redis 锁中的作用
通常,为了确保在释放锁时只有锁的持有者能够删除这个锁,会在锁的 key
或 value
中保存线程标识符或节点标识符(例如,线程 ID 或唯一的随机值)。这样,在解锁时,可以验证当前操作的线程是否为持有该锁的线程。
你提到的问题分析
1. 关于 key
和线程号
如果你理解的是在锁的 key
中添加线程号,那确实会导致每个线程设置的 key
不一样。例如,假设线程 A 设置的 key
为 lock:threadA
,线程 B 设置的 key
为 lock:threadB
,那么它们各自的 key
是独立的,互不影响。这种情况下,确实每个线程都会成功设置自己的 key
,但这实际上并不是我们想要的分布式锁效果,因为每个线程都能创建自己的锁,而不会有竞争关系。
2. 正确的做法:在 value
中保存线程标识符
通常情况下,锁的 key
是固定的,比如 lock:resource_id
,表示对某个资源的锁,而不同线程或节点通过在 value
中存储唯一的标识符来区分持有锁的线程。
示例:
go复制代码// 线程 A 尝试获取锁
SET lock:resource_id "threadA" NX PX 30000// 线程 B 尝试获取锁
SET lock:resource_id "threadB" NX PX 30000
- 如果线程 A 成功获取锁,那么
lock:resource_id
的value
就是"threadA"
。 - 如果线程 B 尝试获取锁,因为
key
已经存在,操作会失败。
3. 释放锁
当线程 A 需要释放锁时,会先检查 key
的 value
是否等于 "threadA"
,如果是,才能删除锁:
go复制代码if GET lock:resource_id == "threadA" {DEL lock:resource_id
}
这样,只有持有锁的线程才能释放锁,避免其他线程误删。
总结
- 锁的
key
:应固定,表示要锁定的资源。 - 锁的
value
:保存线程或节点的唯一标识符,用于区分持有锁的线程或节点。
在 Redis 分布式锁中,所有线程都应该尝试设置相同的 key
,而区分不同线程的是 value
中的标识符。你提到的问题可能是在理解时将线程标识符放在了 key
中,这样会导致每个线程创建的 key
不一样,从而失去了锁的作用。
在 Go 中结合 Redis 实现一个简单的分布式锁,通常可以使用 Redis 的 SET
命令和一些关键选项,如 NX
(表示仅在键不存在时才设置键)和 PX
(设置键的过期时间,以毫秒为单位)。以下是一个示例代码,演示如何使用 Redis 实现一个分布式锁。
1. 环境准备
首先,确保你已经安装了 Redis,并在 Go 项目中引入了 Redis 的 Go 客户端库 go-redis
。
bash
复制代码
go get github.com/redis/go-redis/v9
2. 实现分布式锁
以下代码展示了如何在 Go 中使用 Redis 实现一个简单的分布式锁。
go复制代码package mainimport ("context""fmt""github.com/redis/go-redis/v9""time"
)var ctx = context.Background()// 尝试获取分布式锁
func acquireLock(client *redis.Client, key string, value string, expiration time.Duration) (bool, error) {// SET key value NX PX expirationresult, err := client.SetNX(ctx, key, value, expiration).Result()if err != nil {return false, err}return result, nil
}// 释放分布式锁
func releaseLock(client *redis.Client, key string, value string) (bool, error) {// Lua脚本保证原子性luaScript := `if redis.call("GET", KEYS[1]) == ARGV[1] thenreturn redis.call("DEL", KEYS[1])elsereturn 0end`result, err := client.Eval(ctx, luaScript, []string{key}, value).Int()if err != nil {return false, err}return result == 1, nil
}func main() {// 创建Redis客户端client := redis.NewClient(&redis.Options{Addr: "localhost:6379", // Redis服务器地址})// 锁的key和valuelockKey := "my_lock"lockValue := "unique_value" // 每个线程应生成唯一的值,通常是UUID或线程ID// 尝试获取锁,设置10秒过期locked, err := acquireLock(client, lockKey, lockValue, 10*time.Second)if err != nil {fmt.Println("Error acquiring lock:", err)return}if locked {fmt.Println("Lock acquired successfully!")// 执行需要保护的操作...// 操作完成后释放锁unlocked, err := releaseLock(client, lockKey, lockValue)if err != nil {fmt.Println("Error releasing lock:", err)return}if unlocked {fmt.Println("Lock released successfully!")} else {fmt.Println("Failed to release lock!")}} else {fmt.Println("Failed to acquire lock, another process might hold it.")}
}
3. 代码说明
- 获取锁 (
acquireLock
):- 使用
SETNX
命令尝试获取锁。如果key
不存在,则设置该key
,同时指定过期时间,确保锁在超时后会自动释放。 - 如果
key
已经存在,SETNX
返回false
,表示获取锁失败。
- 使用
- 释放锁 (
releaseLock
):- 为了防止误释放锁(例如:锁已过期并被其他线程重新获取),我们使用 Lua 脚本保证删除操作的原子性。
- 只有当
key
的value
与当前线程持有的锁的value
一致时,才删除锁。
- 主函数 (
main
):- 创建 Redis 客户端,连接到 Redis 服务器。
- 尝试获取锁并进行保护的操作。
- 完成操作后释放锁。
4. 扩展
在实际应用中,分布式锁可能需要更多功能,例如自动续期、死锁检测等。这些功能可以根据具体需求进行扩展。还可以使用现成的库,如 Redlock 实现更复杂的分布式锁机制。
看门狗机制
分布式锁中的“开门狗机制”是用来解决锁过期时间不足而导致的锁提前释放的问题。开门狗机制可以自动延长锁的有效期,防止在锁持有者还在执行任务时锁被释放,从而避免其他客户端意外获得锁。
为什么需要开门狗机制?
当一个客户端获取分布式锁时,它通常会设置一个锁的过期时间(TTL),以防止锁因为某些意外原因(例如客户端崩溃)而永远不被释放。TTL 确保了即使客户端没有主动释放锁,锁也会在一定时间后自动释放。
然而,任务执行的时间有时可能比预期的要长。如果没有合适的机制,锁的 TTL 到期后,锁会被自动释放,导致其他客户端可能在任务尚未完成时获得锁,进而引发数据一致性问题。 看门狗可以更新所得到期时间
看门狗机制的工作原理
开门狗机制主要包括以下步骤:
- 获取锁并设置初始TTL:客户端获取锁时,设置一个初始的 TTL(例如 10 秒)。
- 定期续约:在锁持有期间,客户端启动一个后台任务(开门狗),定期检查锁的状态。如果客户端依然持有锁,并且任务还在执行,开门狗会延长锁的TTL。例如,每隔一半的TTL时间(例如5秒),将锁的TTL重置为原来的TTL时间(例如10秒)。
- 释放锁:一旦任务完成,客户端主动释放锁,同时停止开门狗。
示例代码
以下是一个简单的示例,演示如何在 Go 中实现带开门狗机制的分布式锁。
go复制代码package mainimport ("context""fmt""github.com/redis/go-redis/v9""time"
)var ctx = context.Background()// 获取分布式锁,带初始TTL
func acquireLock(client *redis.Client, key string, value string, expiration time.Duration) (bool, error) {result, err := client.SetNX(ctx, key, value, expiration).Result()if err != nil {return false, err}return result, nil
}// 续约锁的TTL
func renewLock(client *redis.Client, key string, value string, expiration time.Duration) error {luaScript := `if redis.call("GET", KEYS[1]) == ARGV[1] thenreturn redis.call("PEXPIRE", KEYS[1], ARGV[2])elsereturn 0end`_, err := client.Eval(ctx, luaScript, []string{key}, value, int(expiration.Milliseconds())).Result()return err
}// 释放分布式锁
func releaseLock(client *redis.Client, key string, value string) (bool, error) {luaScript := `if redis.call("GET", KEYS[1]) == ARGV[1] thenreturn redis.call("DEL", KEYS[1])elsereturn 0end`result, err := client.Eval(ctx, luaScript, []string{key}, value).Int()if err != nil {return false, err}return result == 1, nil
}// 开门狗机制,定期续约锁的TTL
func startWatchdog(client *redis.Client, key string, value string, expiration time.Duration, interval time.Duration, stopChan chan bool) {ticker := time.NewTicker(interval)defer ticker.Stop()for {select {case <-ticker.C:// 续约锁的TTLerr := renewLock(client, key, value, expiration)if err != nil {fmt.Println("Error renewing lock:", err)return}fmt.Println("Lock renewed for another", expiration)case <-stopChan:fmt.Println("Watchdog stopped")return}}
}func main() {// 创建Redis客户端client := redis.NewClient(&redis.Options{Addr: "localhost:6379",})// 锁的key和valuelockKey := "my_lock"lockValue := "unique_value"// 尝试获取锁,设置初始TTL为10秒locked, err := acquireLock(client, lockKey, lockValue, 10*time.Second)if err != nil {fmt.Println("Error acquiring lock:", err)return}if locked {fmt.Println("Lock acquired successfully!")// 启动开门狗机制,间隔5秒续约,TTL为10秒stopChan := make(chan bool)go startWatchdog(client, lockKey, lockValue, 10*time.Second, 5*time.Second, stopChan)// 模拟执行任务time.Sleep(15 * time.Second)// 任务完成后释放锁unlocked, err := releaseLock(client, lockKey, lockValue)if err != nil {fmt.Println("Error releasing lock:", err)return}if unlocked {fmt.Println("Lock released successfully!")} else {fmt.Println("Failed to release lock!")}// 停止开门狗stopChan <- true} else {fmt.Println("Failed to acquire lock, another process might hold it.")}
}