欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 时评 > Redission源码详解

Redission源码详解

2024/10/24 4:45:41 来源:https://blog.csdn.net/ludkjava/article/details/141231254  浏览:    关键词:Redission源码详解

环境搭建

redis搭建

在本地搭建个redis,启动redis服务端和客户端。客户端是为了观察redis中锁的信息。

依赖

新建个SpringBoot项目,依赖Redission jar包

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.21.1</version>
</dependency>

工具类

@Service
public class RedisUtil {@Autowiredprivate RedissonClient redissonClient;public  boolean tryLock(Long key){RLock rLock= redissonClient.getLock(String.valueOf(key));try {boolean lockResult=rLock.tryLock(10,30, TimeUnit.SECONDS);return lockResult;} catch (InterruptedException e) {e.printStackTrace();return false;}}
}

测试Controller

package com.ludk.weixin.controller;import com.ludk.weixin.common.tool.RedisUtil;
import com.ludk.weixin.entity.Product;
import com.ludk.weixin.service.IProductService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.*;import java.util.Collections;@Controller
@Slf4j
@RequestMapping("/product")
public class ProductController {@AutowiredRedisUtil redisUtil;@RequestMapping("/myProduct")@ResponseBodyObject myProduct(@RequestParam("userId") Long userId){long before=System.currentTimeMillis();log.error("获取锁之前:"+before);redisUtil.tryLock(userId);long after=System.currentTimeMillis();System.out.println("获取锁之后:"+after);System.out.println((after-before)/1000);return productService.getMyProduct(userId);}

源码思路分析

首先需要知道Redission实现的分布式锁有哪些特点?

  • 支持锁的超时,避免死锁。通过对Redis key设置超时时间实现。
  • 支持可重入锁,同一个线程可以多次获取锁。Redis中通过存储hash类型,其中一个字段记录次数实现。
  • 当线程A获取到锁执行业务逻辑且未执行完,如果锁超时,可以通过看门狗续锁的时间,避免其他线程获取锁。通过看门狗实现。

源码分析

初始化锁对象

主要是初始化默认锁的释放时间和uuid,下面会用到。

加锁方法

org.redisson.api.RLock#tryLock方法参数如下:

  • long waitTime:当获取锁失败时,等待waitTime时间后返回获取锁失败。
  • long leaseTime:锁释放时间,当获取到锁以后,这个锁多久失效。也就是redis中存储的锁的key的有效时间。
  • TimeUnit unit:上面两个时间的单位。
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {//将锁的等待时间转为毫秒数long time = unit.toMillis(waitTime);//当前时间,从现在起算锁的等待时间long current = System.currentTimeMillis();//当前线程id,因为是可重入锁,需要用threadId区别是否是当前线程。long threadId = Thread.currentThread().getId();//获取锁,如果ttl 为null则说明获取锁成功。否则ttl代表这个锁还有多久超时,超时则当前线程有获取到锁的可能Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}// System.currentTimeMillis()是当前时间,current是进入tryLock这个方法的时间,time=time-(System.currentTimeMillis() - current)表示还有多少时间获取锁超时,time<=0就说明超时time -= System.currentTimeMillis() - current;if (time <= 0) {//获取锁超时,返回false。acquireFailed(waitTime, unit, threadId);return false;}//记录当前时间current = System.currentTimeMillis();//订阅监听redis消息,并且创建RedissonLockEntry,其中RedissonLockEntry中比较关键的是一个 Semaphore属性对象,用来控制本地的锁请求的信号量同步,返回的是netty框架的Future实现。CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);try {//添加锁的监听,如果锁释放,会进入监听类,将Semaphore释放// 阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。subscribeFuture.get(time, TimeUnit.MILLISECONDS);} catch (TimeoutException e) {if (!subscribeFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + time + "ms. " +"Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {subscribeFuture.whenComplete((res, ex) -> {if (ex == null) {unsubscribe(res, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;} catch (ExecutionException e) {acquireFailed(waitTime, unit, threadId);return false;}try {//经过上面的计算,time是还多长时间获取锁超时,time已经被减掉一部分了。这里继续把消耗的时间减掉。time -= System.currentTimeMillis() - current;if (time <= 0) {//time<0,获取锁超时,返回false,获取锁失败。acquireFailed(waitTime, unit, threadId);return false;}while (true) {//死循环,有两个点可以调出循环。获取锁成功跳出、获取锁超时(失败)跳出long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();//下面的阻塞会在释放锁的时候,通过订阅发布及时获取锁if (ttl >= 0 && ttl < time) {//如果锁的超时时间小于等待时间,通过SemaphorerelaseryAcquire阻塞锁的释放时间commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {//否则,通过Semaphore的tryAcquire阻塞传入的最大等待时间commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {//finally中取消订阅unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);}
//        return get(tryLockAsync(waitTime, leaseTime, unit));
}

org.redisson.RedissonLock#tryAcquire方法

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync0(waitTime, leaseTime, unit, threadId));
}

org.redisson.RedissonLock#tryAcquireAsync0方法

    private RFuture<Long> tryAcquireAsync0(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId));}

org.redisson.RedissonLock#tryAcquireAsync

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;if (leaseTime > 0) {//如果leaseTime >0,则传入锁释放时间,ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//否则用默认的锁释放时间30000msttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);ttlRemainingFuture = new CompletableFutureWrapper<>(s);CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// lock acquiredif (ttlRemaining == null) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {//看门狗续时间,在锁要超时,业务还未执行完这种情况续锁时间。需要注意,leaseTime>0的时候不会有看门狗。scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);
}

org.redisson.RedissonLock#tryLockInnerAsync
该方法和redis交互,是锁实现的核心。

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,"if ((redis.call('exists', KEYS[1]) == 0) " +"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}

分析这个lua脚本的参数:

  • KEYS[1]:是大key,判断是否有其他线程获取到锁。
  • ARGV[1]:锁的释放时间,这个锁如果不释放,多久会超时。
  • ARGV[2]:uuid+线程id,用于实现可重入锁。

分析lua脚本的逻辑:
if ((redis.call(‘exists’, KEYS[1]) == 0)or (redis.call(‘hexists’, KEYS[1] ,ARGV[2]) == 1))
then
redis.call(‘hincrby’, KEYS[1], ARGV[2], 1);
redis.call(‘pexpire’, KEYS[1], ARGV[1]);
return nil;
end;
return redis.call(‘pttl’, KEYS[1]);

  • redis.call(‘exists’, KEYS[1]) == 0:说明大key不存在,也就是没有其他线程获取锁,可以之间获取锁成功了。
  • redis.call(‘hexists’, KEYS[1] ,ARGV[2]) == 1):redis这大key下,当前线程已经获取到了锁。
  • return redis.call(‘pttl’, KEYS[1]):如果不是上面两种情况,说明其他线程占用锁了,返回这个锁还有多长时间释放。

总结:如果没有其他线程获取锁,或者当前线程已经获取到锁,则锁加上超时时间、当前线程锁的次数加1、然后返回空;否则说明有其他线程已经获取到锁,返回这个锁还有多久超时。

其他

lock、tryLock方法区别,参考这个博客:https://blog.csdn.net/weixin_37684345/article/details/135815110

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com