欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 养生 > 三种分布式锁的实现方式

三种分布式锁的实现方式

2024/10/24 4:13:17 来源:https://blog.csdn.net/dream_xin2013/article/details/143087994  浏览:    关键词:三种分布式锁的实现方式

要想共享资源的更新不存在并发问题,在更新前要对共享资源加锁保护起来,在更新过程中只能有一个线程操作。如果服务是单机部署,可以使用JVM提供的 java.util.concurrent.locks.Locksynchronized 关键字保护;对于集群部署的服务,那么JVM提供的锁机制就不能满足需求,这时候就要使用分布式锁保证集群中一个服务的一个线程对数据进行修改。
JVM中的锁,是通过判断线程是否获取到某个标记资源。如果得到就认为加锁成功,否则加锁失败。那么分布式锁的实现也是类似的,只不过这个标记资源单独出来一个服务,可以被其他所有服务访问,这个单独的服务可以由自己编码实现,也可以使用现有的技术,业界比较常用的几种实现方式有:基于数据库、zookeeper、redis三种实现方式,性能也是从低到高。

一、基于数据库

使用数据库实现分布式锁,主要是通过表的唯一索引不能重复的特性,保证某个key在表中只能存在一个,某个key插入表成功就获取到锁,删除这条数据就释放锁。
实现数据库分布式锁的建表语句:

CREATE TABLE `t_distributed_lock`  (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',`lock_key` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '锁的键',`lock_val` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '锁的值',`timeout` int NULL DEFAULT NULL COMMENT '锁超时时间,单位毫秒',`create_time` datetime NULL DEFAULT NULL COMMENT '创建时间',`remove_time` datetime NULL DEFAULT NULL COMMENT '移除时间',PRIMARY KEY (`id`) USING BTREE,UNIQUE INDEX `lock_key`(`lock_key` ASC) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = DYNAMIC;

操作数据库的sql语句接口:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.xingo.lock.mapper.LockMapper"></mapper>
import org.apache.ibatis.annotations.*;import java.sql.Timestamp;/**
* @Author xingo
* @Date 2024/10/12
*/
@Mapper
public interface LockMapper {@Insert("insert into t_distributed_lock(lock_key, lock_val, timeout, create_time) " +"values(#{key}, #{val}, -1, now())")int lock1(@Param(value = "key") String key, @Param(value = "val") String val);@Insert("insert into t_distributed_lock(lock_key, lock_val, timeout, create_time, remove_time) " +"values(#{key}, #{val}, #{timeout}, now(), #{removeTime})")int lock2(@Param(value = "key") String key, @Param(value = "val") String val,@Param(value = "timeout") int timeout, @Param(value = "removeTime") Timestamp removeTime);@Delete("delete from t_distributed_lock where lock_key = #{key} and lock_val = #{val}")int unlock(@Param(value = "key") String key, @Param(value = "val") String val);@Delete("delete from t_distributed_lock where remove_time < now()")int removeLock();@Select("select id from t_distributed_lock where lock_key = #{key}")Long existsLock(@Param(value = "key") String key);
}

使用数据库实现锁,需要自定义一套锁操作的服务接口:

/**
* @Author xingo
* @Date 2024/10/12
*/
public interface LockService {/*** 获取锁** @param key 锁的键* @return*/boolean lock(String key);/*** 获取锁并指定锁超时时间** @param key     锁的键* @param timeout 锁超时时间* @return*/boolean lock(String key, int timeout);/*** 获取锁并指定等待时间和锁超时时间** @param key      锁的键* @param waitTime 锁等待时间* @param timeout  锁超时时间* @return*/boolean lock(String key, int waitTime, int timeout);/*** 释放锁** @param key 锁的键* @return*/boolean unlock(String key);
}

接口的实现类:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.xingo.lock.mapper.LockMapper;import java.sql.Timestamp;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/**
* @Author xingo
* @Date 2024/10/12
*/
@Slf4j
@Service
public class MysqlLockService implements LockService {ThreadLocal<String> lockVal = new ThreadLocal<>();@Autowiredprivate LockMapper lockMapper;@Autowiredprivate DelayClearLock delayClearLock;private String getLockVal() {String val = Thread.currentThread().getName() + "|" +UUID.randomUUID().toString().replace("-", "");lockVal.set(val);return val;}@Overridepublic boolean lock(String key) {if (key == null) {throw new RuntimeException("lock key is not null");}String val = getLockVal();while (true) {Long id = lockMapper.existsLock(key);if (id == null) {try {lockMapper.lock1(key, val);break;} catch (Exception e) {
//                log.error("获取锁异常", e);}}try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {log.error("等待异常", e);}}return true;}@Overridepublic boolean lock(String key, int timeout) {if (key == null) {throw new RuntimeException("lock key is not null");}if (timeout <= 0) {return false;}String val = getLockVal();while (true) {Long id = lockMapper.existsLock(key);if (id == null) {try {Timestamp removeTime = new Timestamp(System.currentTimeMillis() + timeout);lockMapper.lock2(key, val, timeout, removeTime);delayClearLock.add(key, val, timeout, TimeUnit.MILLISECONDS);break;} catch (Exception e) {
//                log.error("获取锁异常", e);}}try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {log.error("等待异常", e);}}return true;}@Overridepublic boolean lock(String key, int waitTime, int timeout) {if (key == null) {throw new RuntimeException("lock key is not null");}if (timeout <= 0 || waitTime <= 0) {return false;}String val = getLockVal();long t = System.currentTimeMillis();int cnt = 0;while (cnt < waitTime) {Long id = lockMapper.existsLock(key);if (id == null) {try {Timestamp removeTime = new Timestamp(System.currentTimeMillis() + timeout);lockMapper.lock2(key, val, timeout, removeTime);delayClearLock.add(key, val, timeout, TimeUnit.MILLISECONDS);return true;} catch (Exception e) {
//                log.error("获取锁异常", e);}}try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {log.error("等待异常", e);}cnt = (int) (System.currentTimeMillis() - t);}return false;}@Overridepublic boolean unlock(String key) {String val = lockVal.get();if (val == null) {return false;}return lockMapper.unlock(key, val) > 0;}
}

上面定义的锁有一个有效时长,如果锁在一定时间内没有释放就要自动清理,这里选择java提供的延迟队列实现这个清理的任务:

import jakarta.annotation.PostConstruct;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.xingo.lock.mapper.LockMapper;import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/**
* 事件延时队列
*
* @Author xingo
* @Date 2024/3/29
*/
@Component
@Slf4j
public class DelayClearLock {/*** 延迟队列*/private final DelayQueue<DelayEvent> queue = new DelayQueue<>();@Autowiredprivate LockMapper lockMapper;/*** 将延迟执行的事件加入队列* @param event     处理事件对象*/public void add(DelayEvent event) {try {// 添加数据到延迟队列this.queue.add(event);log.info("添加延迟清理锁任务|{}|{}|{}", event.getTriggerTime(), event.getKey(), event.getVal());} catch (Exception e) {log.error("error", e);}}/*** 添加延迟消息* @param key       锁的键* @param val       锁的值* @param timeout   超时时间* @param unit      时间类型*/public void add(String key, String val, int timeout, TimeUnit unit) {DelayEvent event = new DelayEvent(key, val, timeout, unit == null ? TimeUnit.MILLISECONDS : unit);this.add(event);}/*** 开启一个线程异步消费到期数据*/@PostConstructpublic void run() {new Thread(() -> {while (true) {try {// 阻塞等待数据,有数据时会被唤醒DelayEvent event = queue.take();int unlock = lockMapper.unlock(event.getKey(), event.getVal());log.info("消费延迟清理锁数据|{}|{}|{}|{}", event.getTriggerTime(), event.getKey(), event.getVal(), unlock);} catch (Exception e) {log.error("error", e);}}}, "delay-consumer").start();}@Datapublic static class DelayEvent implements Delayed {/*** 触发时间戳*/private long triggerTime;/*** 锁的键*/private String key;/*** 锁的值*/private String val;/*** 构建延迟消息事件* @param key       锁的键* @param val       锁的值* @param timeout   触发时间:当前时间距离触发事件的剩余时间* @param unit      时间格式*/public DelayEvent(String key, String val, int timeout, TimeUnit unit) {this.key = key;this.val = val;this.triggerTime = unit.toMillis(timeout) + System.currentTimeMillis();}/*** 构建延迟消息事件* @param key           锁的键* @param val           锁的值* @param triggerTime   触发时间:事件发生的毫秒时间戳*/public DelayEvent(String key, String val, long triggerTime) {this.key = key;this.val = val;this.triggerTime = triggerTime;}/*** 返回延迟事件的剩余时间,如果结果返回0或者负数,表示延迟时间已经到了,事件会被触发* @param unit  数据需要被转换的时间类型* @return*/@Overridepublic long getDelay(TimeUnit unit) {// 通过与当前时间比较来判断是否到达执行时间long diff = this.triggerTime - System.currentTimeMillis();return unit.convert(diff, TimeUnit.MILLISECONDS);}/*** 队列排序比较器,用来对入队数据进行排序,通常都是升序排列:* 当此对象小于、等于或大于指定对象时,返回负整数、零或正整数。* @param obj   被比较对象* @return*/@Overridepublic int compareTo(Delayed obj) {DelayEvent data = (DelayEvent) obj;long diff = this.triggerTime - data.triggerTime;return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);}}
}

上面的任务可以实现锁的清理,但是如果服务宕机了就有可能导致任务失效,这时就要提供一个定时任务可以定时清理过期的锁,上面的延迟任务可以在锁临近超时时及时清理掉,而这个定时任务只是一个补偿,当任务已经失效了也可以清理掉垃圾锁。
定时清理数据任务:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.xingo.lock.mapper.LockMapper;/**
* 刷新本地缓存定时任务
*
* @Author xingo
* @Date 2024/10/12
*/
@Slf4j
@Component
@EnableScheduling
public class RemoveLockJob {@Autowiredprivate LockMapper lockMapper;@Scheduled(cron="1/30 * * * * ?")public void run() {int rs = lockMapper.removeLock();log.info("定时任务清理过期的键|{}", rs);}
}
二、基于zookeeper

使用zookeeper实现分布式锁,主要使用zookeeper的 临时顺序节点 特性实现的,在某个节点下创建临时顺序节点,如果发现自己是所有子节点的最小节点,就认为获取锁成功,使用完后删除这个子节点就释放了锁;如果创建的子节点不是所有节点中最小的,说明还没有获取到锁,这时要找到比自己小的那个节点注册事件监听器,监听节点删除事件;当监听到节点被删除了,再次判断自己是不是所有节点中最小的那个,如果是最小节点就获取到了锁;如果已经获取到锁的服务宕机,由于创建的是临时节点,这个节点会自动删除,不需要像数据库实现的分布式锁要定时检测锁过期。
连接zookeeper的客户端选择curator,这是apache下的一个顶级项目,对zk操作进行了一些封装,相比于直接使用zookeeper提供的客户端方便一些,首先封装一个工具类负责连接的创建和获取操作的客户端。

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/**
* @Author wangxixin
* @Date 2024/10/18
*/
@Component
public class ZkClient {@Value("${zookeeper.url}")private String url;@Getterprivate CuratorFramework client;@PostConstructpublic void init() {client = CuratorFrameworkFactory.builder().connectString(url).sessionTimeoutMs((int) TimeUnit.MINUTES.toMillis(5)).connectionTimeoutMs((int) TimeUnit.SECONDS.toMillis(30)).retryPolicy(new ExponentialBackoffRetry(3000, 5)).build();client.start();}@PreDestroypublic void destroy() {client.close();}
}

因为curator对分布式锁做了封装,所以使用起来很方便:

InterProcessMutex lock = new InterProcessMutex(zkClient.getClient(), key);
try {// 获取锁boolean acquire = lock.acquire(60, TimeUnit.SECONDS);if (acquire) {// 执行被保护的代码// 释放锁try {lock.release();} catch (Exception e) {e.printStackTrace();}}
} catch (Exception e) {e.printStackTrace();
}
三、基于redis

使用redis实现分布式锁,主要是判断键是否存在,如果键存在,那么表示当前的锁被占用,加锁失败;如果键不存在那么就设置键到redis表示获取锁成功,简单的实现redis锁可以使用下面的命令:

set key value PX milliseconds NX

PX :表示键的过期时间,到时间后自动删除这个key
NX :表示键不存在就设置键到redis中,已经存在的键将不会设置

上面的命令可以实现分布式锁,但是在生产中一般也不用自己实现了,一般会选择使用redisson封装好的工具类,它是使用lua表达式,而且锁是hash结构存储的,考虑了很多种使用场景非常成熟的工具,在代码中使用也比较简单:
redisson配置类:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;/*** @Author xingo* @Date 2024/10/12*/
@Data
@Component
@ConfigurationProperties(prefix = "spring.data.redis")
public class RedissonConfig {// ========= 单节点配置 ===========/*** 服务主机*/private String host;/*** 服务端口号*/private String port;/*** 服务密码*/private String password;/*** 哨兵模式配置*/private Sentinel sentinel;/*** 集群模式*/private Cluster cluster;@Datapublic static final class Sentinel {/*** 监听名称*/private String master;/*** 哨兵节点列表*/private String[] nodes;/*** 哨兵节点连接密码*/private String password;}@Datapublic static final class Cluster {/*** 集群节点列表*/private String[] nodes;}
}

redisson客户端:

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.ClusterServersConfig;
import org.redisson.config.Config;
import org.redisson.config.SentinelServersConfig;
import org.redisson.config.SingleServerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author xingo* @Date 2024/10/12*/
@Configuration
public class RedissonClientConfig {@Autowiredprivate RedissonConfig redisson;@Beanpublic RedissonClient getRedisson() {Config config = new Config();if (redisson.getHost() != null) {    //单机版SingleServerConfig singleServer = config.useSingleServer();if (redisson.getPassword() != null) {singleServer.setPassword(redisson.getPassword());}singleServer.setAddress("redis://" + redisson.getHost() + ":" + redisson.getPort());} else if (redisson.getCluster() != null) {    // 集群版ClusterServersConfig clusterServers = config.useClusterServers();if (redisson.getPassword() != null) {clusterServers.setPassword(redisson.getPassword());}for (String address : redisson.getCluster().getNodes()) {clusterServers.addNodeAddress("redis://" + address);}} else if (redisson.getSentinel() != null) {SentinelServersConfig sentinelServers = config.useSentinelServers();sentinelServers.setMasterName(redisson.getSentinel().getMaster());if (redisson.getPassword() != null) {sentinelServers.setPassword(redisson.getPassword());}if (redisson.getSentinel().getPassword() != null) {    // 哨兵版sentinelServers.setSentinelPassword(redisson.getSentinel().getPassword());}for (String address : redisson.getSentinel().getNodes()) {sentinelServers.addSentinelAddress("redis://" + address);}}return Redisson.create(config);}
}

使用redisson锁也封装的很好:

RLock rlock = redissonClient.getLock(key);
try {boolean tryLock = rlock.tryLock(60, 60, TimeUnit.SECONDS);if (tryLock) {// 执行被保护的代码// 释放锁if (rlock.isHeldByCurrentThread()) {rlock.unlock();}}
} catch (Exception e) {e.printStackTrace();
}

以上就是分布式锁的几种实现方式,对比几种实现方式的性能,我们采用多线程同时对某个字段累加,接口如下:

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xingo.lock.config.ZkClient;
import org.xingo.lock.service.LockService;import java.sql.Timestamp;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;/*** @Author xingo* @Date 2024/10/18*/
@Slf4j
@RestController
public class LockController {private long incr = 0;@GetMapping("/no_lock")public String testNolockIncr(int threads) throws Exception {incr = 0;long t = System.currentTimeMillis();CountDownLatch latch = new CountDownLatch(threads);for (int i = 0; i < threads; i++) {new Thread(() -> {for (int j = 0; j < 10; j++) {incr++;}latch.countDown();}, "lock-thread").start();}latch.await();long diff = System.currentTimeMillis() - t;System.out.println("累加结果|" + incr + "|" + diff + "ms");return "ok";}private final Lock lock = new ReentrantLock();@GetMapping("/jvm_lock")public String testJvmIncr(int threads) throws Exception {incr = 0;long t = System.currentTimeMillis();CountDownLatch latch = new CountDownLatch(threads);for (int i = 0; i < threads; i++) {new Thread(() -> {for (int j = 0; j < 10; j++) {lock.lock();try {incr++;} finally {lock.unlock();}}latch.countDown();}, "lock-thread-" + i).start();}latch.await();long diff = System.currentTimeMillis() - t;System.out.println("jvm锁累加结果|" + incr + "|" + diff + "ms");return "ok";}@Autowiredprivate LockService lockService;@GetMapping("/mysql_lock")public String testMySQLIncr(int threads) throws InterruptedException {incr = 0;long t = System.currentTimeMillis();CountDownLatch latch = new CountDownLatch(threads);String key = "mysql-lock";for (int i = 0; i < threads; i++) {new Thread(() -> {for (int j = 0; j < 10; j++) {try {boolean lock = lockService.lock(key);if (lock) {incr++;System.out.println(new Timestamp(System.currentTimeMillis()) + "|" + incr);}} catch (Exception e) {e.printStackTrace();} finally {lockService.unlock(key);}}latch.countDown();}, "lock-thread-" + i).start();}latch.await();long diff = System.currentTimeMillis() - t;System.out.println("数据库锁累加结果|" + incr + "|" + diff + "ms");return "ok";}@Autowiredprivate ZkClient zkClient;@GetMapping("/zk_lock")public String testZookeeperIncr(int threads) throws InterruptedException {incr = 0;long t = System.currentTimeMillis();CountDownLatch latch = new CountDownLatch(threads);String key = "/zklock";for (int i = 0; i < threads; i++) {new Thread(() -> {for (int j = 0; j < 10; j++) {InterProcessMutex lock = new InterProcessMutex(zkClient.getClient(), key);try {boolean acquire = lock.acquire(60, TimeUnit.SECONDS);if (acquire) {incr++;try {lock.release();} catch (Exception e) {e.printStackTrace();}}} catch (Exception e) {e.printStackTrace();}}latch.countDown();}, "lock-thread-" + i).start();}latch.await();long diff = System.currentTimeMillis() - t;System.out.println("zk锁累加结果|" + incr + "|" + diff + "ms");return "ok";}@Autowiredprivate RedissonClient redissonClient;@GetMapping("/redis_lock")public String testRedisIncr(int threads) throws InterruptedException {incr = 0;long t = System.currentTimeMillis();CountDownLatch latch = new CountDownLatch(threads);String key = "redis-lock";for (int i = 0; i < threads; i++) {new Thread(() -> {for (int j = 0; j < 10; j++) {RLock rlock = redissonClient.getLock(key);try {boolean tryLock = rlock.tryLock(60, 60, TimeUnit.SECONDS);if (tryLock) {incr++;if (rlock.isHeldByCurrentThread()) {rlock.unlock();}}} catch (Exception e) {e.printStackTrace();}}latch.countDown();}, "lock-thread-" + i).start();}latch.await();long diff = System.currentTimeMillis() - t;System.out.println("redis锁累加结果|" + incr + "|" + diff + "ms");return "ok";}
}

我这里测试使用了3000个线程请求,不加锁的情况下累加结果不保证安全,发现结果无法预测:
无锁情况
如果使用jvm提供的锁,能够保证结果的安全:
jvm锁
数据库锁性能最差,使用同样的线程数请求,发现竞争资源非常激烈,可能是我的实现方式有问题,大家可以一起讨论,由于无法等待最终结果,我只能每次累加输出一个结果,大家可以看看两次累加之间的时间间隔:
数据库锁
zookeeper性能也不错,但是对于这么大的资源竞争耗时也比较久:
zk锁
性能最好的是redis实现的分布式锁,耗时基本是zookeeper的10倍:
redis锁
通过以上的对比,在资源竞争非常激烈的情况下,性能从高到低依次是:
jvm锁 > redis锁 > zookeeper锁 > 数据库锁

虽然redis性能很好,但是在生产中我们要考虑一旦redis发生主从切换,有可能导致锁丢失的问题,redis官方给出了RedLock方案,但是性能就会大打折扣,如果使用RedLock还不如直接使用zookeeper方案,因为zookeeper天生就是强一致性的,不会因为节点宕机导致数据丢失。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com