跳至主要內容

Redis-实践篇-Redis实现分布式锁

holic-x...大约 18 分钟RedisRedis

Redis-实践篇-Redis实现分布式锁

学习核心

  • Redis 实现分布式锁实践

学习资料

实践版本说明

  • Redis 5.0.14.1
  • Springboot 2.7.6

基于Redis实现分布式锁(JAVA版本)

​ Redis 实现分布式锁核心思路:通过set指令配置相关参数构建分布式锁的特性:SET key value [EX seconds] [PX milliseconds] [NX|XX]

  • 将字符串值 value(确保唯一) 关联到 key ,如果 key 已经持有其他值, SET 就覆写旧值,无视类型。

  • TTL 设置key的过期时间

    • EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value
    • PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value
    • NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value
    • XX :只在键已经存在时,才对键进行设置操作
  • 加锁:**SET key value [PX milliseconds] [NX]**命令

    • 如果key不存在,设置value,并设置过期时间(加锁成功)
    • 如果已经存在lock(也就是有客户端持有锁了),则设置失败(加锁失败)
  • 解锁:del 命令,通过删除键释放锁

    • 释放锁之后,其他客户端可以通过set命令进行加锁

1.编程式分布式锁

加解锁代码示例

public void doSomething() {
    try {
        // 上锁
        redisLock.lock();
        // 处理业务
        ...
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
      // 释放锁
      redisLock.unlock();
    }
}

构建参考

构建springboot项目,引入redis相关依赖

<!--引入Jedis依赖-->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <!-- <version>4.2.0</version> -->
    <version>2.9.3</version>
</dependency>

加锁设置的参数(锁的属性配置)

/**
 * 加锁设置的参数
 */
@Data
public class LockParam {
    // 锁的key
    private String lockKey;
    // 尝试获得锁的时间(单位:毫秒),默认值:3000毫秒
    private Long tryLockTime;
    // 尝试获得锁后,持有锁的时间(单位:毫秒),默认值:5000毫秒
    private Long holdLockTime;

    // 构造函数:设定锁键值
    public LockParam(String lockKey){
        this(lockKey,1000*3L,1000*5L);
    };

    public LockParam(String lockKey,Long tryLockTime){
        this(lockKey,tryLockTime,1000*5L);
    };

    public LockParam(String lockKey,Long tryLockTime,Long holdLockTime){
        this.lockKey = lockKey;
        this.tryLockTime = tryLockTime;
        this.holdLockTime = holdLockTime;
    };
}

分布式锁操作类

/**
 * redis分布式锁
 */
@Slf4j
public class RedisLock {

    // 锁key的前缀
    private final static String prefix_key = "redisLock:";
    // 释放锁的lua脚本
    private final static String unLockScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    // 执行unLockScript脚本,释放锁成功值
    private final static Long unLockSuccess = 1L;

    // 加锁设置的参数(key值、超时时间、持有锁的时间)
    private LockParam lockParam;
    // 尝试获得锁的截止时间【lockParam.getTryLockTime()+System.currentTimeMillis()】
    private Long tryLockEndTime;
    // redis加锁的key
    private String redisLockKey;
    // redis加锁的value
    private String redisLockValue;
    // redis加锁的成功标示
    private Boolean holdLockSuccess = Boolean.FALSE;

    // jedis实例
    private Jedis jedis;

    // 获取jedis实例
    private Jedis getJedis() {
        return this.jedis;
    }

    // 关闭jedis
    private void closeJedis(Jedis jedis) {
        jedis.close();
        jedis = null;
    }

    public RedisLock(LockParam lockParam) {
        if (lockParam == null) {
            throw new RuntimeException("lockParam is null");
        }
        if (lockParam.getLockKey() == null || lockParam.getLockKey().trim().isEmpty()) {
            throw new RuntimeException("lockParam lockKey is error");
        }
        this.lockParam = lockParam;

        this.tryLockEndTime = lockParam.getTryLockTime() + System.currentTimeMillis();
        this.redisLockKey = prefix_key.concat(lockParam.getLockKey());
        this.redisLockValue = UUID.randomUUID().toString().replaceAll("-", "");

        // todo 自定义获取Jedis实例的实现
        jedis = new Jedis("127.0.0.1", 6379);
    }

    /**
     * 加锁
     *
     * @return 成功返回true,失败返回false
     */
    public boolean lock() {
        while (true) {
            // 判断是否超过了,尝试获取锁的时间
            if (System.currentTimeMillis() > tryLockEndTime) {
                return false;
            }
            // 尝试获取锁
            holdLockSuccess = tryLock();
            if (Boolean.TRUE.equals(holdLockSuccess)) {
                return true;//获取锁成功
            }

            try {
                // 获得锁失败,休眠50毫秒再去尝试获得锁,避免一直请求redis,导致redis cpu飙升
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 执行一次加锁操作:成功返回true 失败返回false
     *
     * @return 成功返回true,失败返回false
     */
    private boolean tryLock() {
        try {
            // todo 指令版本确认
            String result = getJedis().set(redisLockKey, redisLockValue, "NX", "PX", lockParam.getHoldLockTime()); // jedis 2.9.3 版本
            if ("OK".equals(result)) {
                return true;
            }
        } catch (Exception e) {
            log.warn("tryLock failure redisLockKey:{} redisLockValue:{} lockParam:{}", redisLockKey, redisLockValue, lockParam, e);
        }
        return false;
    }

    /**
     * 解锁
     *
     * @return 成功返回true,失败返回false
     */
    public Boolean unlock() {
        Object result = null;
        try {
            // 获得锁成功,才执行lua脚本
            if (Boolean.TRUE.equals(holdLockSuccess)) {
                // 执行Lua脚本
                result = getJedis().eval(unLockScript, Collections.singletonList(redisLockKey), Collections.singletonList(redisLockValue));
                if (unLockSuccess.equals(result)) {// 释放成功
                    return true;
                }
            }
        } catch (Exception e) {
            log.warn("unlock failure redisLockKey:{} redisLockValue:{} lockParam:{} result:{}", redisLockKey, redisLockValue, lockParam, result, e);
        } finally {
            this.closeJedis(jedis);
        }
        return false;
    }
}

分布式锁测试demo

/**
 * Redis 实现分布式锁 使用案例
 */
@Slf4j
public class RedisLockDemo {
    // 设置锁键 lockKey
    static String lockKey = "666";
    public static void main(String[] args) throws InterruptedException {
        // 测试1:验证分布式锁的互斥性、安全性
        log.info("测试:测试两个线程同时抢占锁的结果");
        Thread thread1 = new Thread(()->{
            // 尝试获得锁时间2秒,获得锁成功后持有锁时间10秒,模拟业务代码执行时间5秒
            testRedisLock(2000L,1000*10L,1000*5L);
        });
        thread1.setName("线程1");
        Thread thread2 = new Thread(()->{
            // 尝试获得锁时间2秒,获得锁成功后持有锁时间10秒,模拟业务代码执行时间5秒
            testRedisLock(2000L,1000*10L,1000*5L);
        });
        thread2.setName("线程2");

        // 同时启动线程
        thread1.start();
        thread2.start();

        // 沉睡一段时间,确保上述所有业务执行完成
        Thread.sleep(1000*20);
        log.info("");
        log.info("---------------------------------分割线--------------------------------");
        log.info("");

        // 测试2:验证分布式锁的对称性(谁申请谁释放)
        log.info("测试:验证一个线程获取锁成功后,由于业务执行时间超过了设置持有锁的时间,是否会把其他线程持有的锁给释放掉");
        Thread thread3 = new Thread(()->{
            // 业务执行时间超过锁的持有时间,业务执行过程中锁过期
            testRedisLock(1000*2L,1000*2L,1000*10L);
        });
        thread3.setName("线程3");
        thread3.start();

        Thread.sleep(1000*1);// 多暂停一秒是为了让线程3获取到锁
        Thread thread4 = new Thread(()->{
            // 线程4
            testRedisLock(2000L,1000*20L,1000*15L);
        });
        thread4.setName("线程4");
        thread4.start(); // 线程4启动,此时线程3持有的锁被释放,线程4可以正常获取锁
    }

    /**
     * 测试获取Redis分布式锁
     */
    public static void testRedisLock(Long tryLockTime,Long holdLockTime,Long businessTime){
        LockParam lockParam = new LockParam(lockKey);
        lockParam.setTryLockTime(tryLockTime);// 尝试获得锁时间设定
        lockParam.setHoldLockTime(holdLockTime);// 获得锁成功后持有锁时间设定
        RedisLock redisLock = new RedisLock(lockParam);
        try {
            Boolean lockFlag = redisLock.lock();
            log.info("加锁结果:{}",lockFlag);
            if(lockFlag){
                try {
                    // 模拟处理业务代码时间
                    Thread.sleep(businessTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }catch (Exception e) {
            log.info("testRedisLock e---->",e);
        }finally {
            boolean unlockResp = redisLock.unlock();
            log.info("释放锁结果:{}",unlockResp);
        }
    }
}

测试结果

# output
15:45:02.043 [main] INFO com.noob.redis.lock.RedisLockDemo - 测试:测试两个线程同时抢占锁的结果
15:45:02.167 [线程1] INFO com.noob.redis.lock.RedisLockDemo - 加锁结果:true
15:45:04.116 [线程2] INFO com.noob.redis.lock.RedisLockDemo - 加锁结果:false
15:45:04.116 [线程2] INFO com.noob.redis.lock.RedisLockDemo - 释放锁结果:false
15:45:07.178 [线程1] INFO com.noob.redis.lock.RedisLockDemo - 释放锁结果:true
15:45:22.079 [main] INFO com.noob.redis.lock.RedisLockDemo - 
15:45:22.079 [main] INFO com.noob.redis.lock.RedisLockDemo - ---------------------------------分割线--------------------------------
15:45:22.079 [main] INFO com.noob.redis.lock.RedisLockDemo - 
15:45:22.079 [main] INFO com.noob.redis.lock.RedisLockDemo - 测试:验证一个线程获取锁成功后,由于业务执行时间超过了设置持有锁的时间,是否会把其他线程持有的锁给释放掉
15:45:22.083 [线程3] INFO com.noob.redis.lock.RedisLockDemo - 加锁结果:true
15:45:24.129 [线程4] INFO com.noob.redis.lock.RedisLockDemo - 加锁结果:true
15:45:32.084 [线程3] INFO com.noob.redis.lock.RedisLockDemo - 释放锁结果:false
15:45:39.142 [线程4] INFO com.noob.redis.lock.RedisLockDemo - 释放锁结果:true

​ 结合结果分析,测试1:两个线程同时抢占锁,其抢占的结果并不是局限于哪个线程(可能是线程1也可能是线程2),但必定是遵循互斥性和安全性的原则,同一时刻不会出现多个线程都占用同一把锁。

​ 测试2:线程3获取到锁,但是由于其业务执行时间超过了设置的持有锁的时间,就会导致在线程3执行业务的过程中,锁自动过期失效了。此时线程4启动刚好可以获取到锁,然后执行业务逻辑,但是线程4执行业务逻辑时间较长,在这个时候线程3执行完成需要释放锁(释放失败,因为校验该锁并不归属自身),线程4业务逻辑执行完成,此时锁还没过期,因此可以成功释放锁

实际使用注意问题

​ 假设有这样一个场景: 有一个修改订单状态的接口,订单状态修改为失败,就不允许修改为其他状态了;在单台机器上,在代码方法上加了synchronized来做并发控制,由于代码逻辑比较复杂,假设目前它的TPS是1,一秒就只能处理一个订单。后面对这个系统做集群,部署了一百台,那么这个接口性能就提升了100倍了。但是synchronized是进程级别的锁,在集群环境下synchronized没办法控制其他服务器下线程并发访问临界代码,因此采用了分布式锁来做并发控制

​ 基于上述构建的分布式锁,需要注意其在实际场景中的应用问题。可以结合下述几个方面进行分析:

锁设置:锁粒度控制(lockKey设置)

​ 如果分布式锁的key 设置的是 redisLock:updateOrderStatus 相当于集群下对这个接口加了相同的一把大锁,按照上面那个场景TPS就变成1了,集群部署就浪费了。而实际上锁针对的不是整个接口的控制,而是针对修改订单这一操作,因此可以将锁粒度进行细化,让其对某个订单进行加锁(可以理解为将数据库表锁转化为行锁),让接口支持更高的并发场景。

​ 例如可以将分布式的key设置为:redisLock:updateOrderStatus:{orderCode} ,{orderCode}执行的时候动态的替换成订单编号,则锁粒度就控制到这条订单

锁设置:获取锁的时间(tryLockTime)

​ 此处tryLockTime的设置可以理解为一种重试机制的设计,在设定的tryLockTime范围内,线程会一直不断地尝试获取锁(尝试加锁)直至成功。如果这段时间没有获取成功,则最终认为加锁失败。

​ 因此针对tryLockTime的设置要结合不同的业务,衡量大概需要设置多长的时间。

  • 如果设置太短:可能用户多次操作,频繁获取锁失败,告知业务执行失败,用户体验不好
  • 如果设置太长:用户等待太久才能得到响应结果

锁设置:持有锁的时间(holdLockTime)

​ holdLockTime 即为指令中锁的过期时间设定

​ 如果锁粒度比较小,时间可以设置长一点,就算 业务代码较复杂 执行比较耗时,对客户的影响也较小 比较容易可以接受

分布式锁的使用

​ 结合上述案例分析,可以看到每次使用一个分布式锁的流程都是清一色的固有流程:

  • 【1】创建分布式锁对象:RedisLock redisLock = new RedisLock(lockParam);
  • 【2】加锁:Boolean lockFlag = redisLock.lock();
  • 【3】执行业务:if(lockFlag)
  • 【4】finally 释放锁:redisLock.unlock();

​ 这个流程中除了步骤【3】其他的步骤都是固化的,它的流程很像Spring的编程式事务,因此可以联想到Spring的声明式事务,思考是否可以通过Spring AOP + 自定义注解的方式来简化分布式锁的使用。

  • 编程式分布式锁:每次实现都要额外编程,与业务代码耦合,存在一定的工作量
  • 声明式分布式锁:无侵入式的横向切入,不会影响业务代码逻辑(但需注意AOP失效的场景)

2.声明式分布式锁

构建参考

​ 基于上述编程式分布式锁实现,进一步优化其使用,构建声明式分布式锁

构建springboot 项目 引入aop依赖

<!-- SpringBoot AOP start -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aop</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
</dependency>
<!-- SpringBoot AOP end -->

启动类开启AOP配置

@EnableAspectJAutoProxy
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

自定义注解:DistributionLock、DistributionLockParam

/**
 * 通常和DistributionLockParam 注解配合使用,降低锁粒度
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface DistributionLock {

    /**
     * 分布式锁key
     */
    String key();

    /**
     * 尝试获得锁的时间(单位:毫秒),默认值:5000毫秒
     *
     * @return 锁key过期时间
     */
    long tryLockTime() default 5000;

    /**
     * 尝试获得锁后,持有锁的时间(单位:毫秒),默认值:60000毫秒
     *
     * @return
     */
    long holdLockTime() default 60000;

    /**
     * 分布式锁key 的分隔符(默认 :)
     */
    String delimiter() default ":";

}
/**
 * 分布式锁参数:用于给DistributionLock用来控制锁粒度
 */
@Target({ElementType.PARAMETER, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface DistributionLockParam {
}

AOP实现

/**
 * 切面类 对springboot中aop切面编程
 */
@Aspect
@Component
@Slf4j
@Order(Ordered.HIGHEST_PRECEDENCE)
public class RedisAopAspect {
    public RedisAopAspect(){
        log.info("分布锁 aop init");
    }
    /***
     * 定义切入点
     */
    @Pointcut("execution(@com.noob.redis.aopLock.annotation.DistributionLock * *(..))")
    public void pointCut(){

    }

    @Around(value = "pointCut()")
    public Object aroundMethod(ProceedingJoinPoint pjp) {
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method method = signature.getMethod();

        /////////////////////AOP 能取得的信息 start////////////////////////
//        log.info("目标方法名为:{}",pjp.getSignature().getName());
//        log.info("目标方法所属类的简单类名:{}" , pjp.getSignature().getDeclaringType().getSimpleName());
//        log.info("目标方法所属类的类名:{}", pjp.getSignature().getDeclaringTypeName());
//        log.info("目标方法声明类型:{}" , Modifier.toString(pjp.getSignature().getModifiers()));
//        log.info("目标方法返回值类型:{}" , method.getReturnType());
//        //获取传入目标方法的参数
//        Object[] args = pjp.getArgs();
//        for (int i = 0; i < args.length; i++) {
//            log.info("第{}个参数为:{}" ,(i + 1) , args[i]);
//        }
//        log.info("被代理的对象:{}" , pjp.getTarget());
//        log.info("代理对象自己:{}" , pjp.getThis());
        /////////////////////AOP 能取得的信息 end////////////////////////

        //取得注解对象数据
        DistributionLock lock = method.getAnnotation(DistributionLock.class);
        //分布式锁实际的key
        String lockKey = getRealDistributionLockKey(pjp,lock);
        //创建分布式锁对象 start
        LockParam lockParam = new LockParam(lockKey,lock.tryLockTime(),lock.holdLockTime());
        RedisLock redisLock = new RedisLock(lockParam);
        //创建分布式锁对象 end

        //获取锁
        Boolean holdLock = redisLock.lock();
        log.info("lockKey:{}   holdLock:{} ",lockKey,holdLock);
        if(Boolean.FALSE.equals(holdLock)){
            //获取锁失败后,处理返回结果
            return handleAcquireLockFailReturn(pjp);
        }

        try {
            return pjp.proceed();
        } catch (Throwable e) {
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());
        }finally {
            if(redisLock!=null){
                Boolean unlock = redisLock.unlock();
                log.info("释放锁:unlock {}",unlock);
            }
        }
    }

    /**
     * 分布式锁获取失败,处理方法
     * @param pjp
     * @return
     */
    public Object handleAcquireLockFailReturn(ProceedingJoinPoint pjp){
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method method = signature.getMethod();
        Class returnType = method.getReturnType();
        // 开发规范:通常项目都有自己的统一的返回对象,Resp.class可以根据自己现有的进行构建
        if(returnType.equals(Resp.class) ){
            log.info("返回值类型 Resp");
            return Resp.buildFail("业务处理繁忙,请稍后重试");
        }

        throw new RuntimeException("获取锁失败");
    }

    /**
     * 加了DistributionLockParam注解参数值,按照顺序返回list
     * @param pjp
     * @return
     */
    public List<Object> getDistributionLockParamList(ProceedingJoinPoint pjp){
        ArrayList<Object> distributionLockParamList = null;
        MethodSignature signature = ((MethodSignature) pjp.getSignature());
        //得到拦截的方法
        Method method = signature.getMethod();
        //获取方法参数注解,返回二维数组是因为某些参数可能存在多个注解
        Annotation[][] parameterAnnotations = method.getParameterAnnotations();
//        log.info("parameterAnnotations:{}",parameterAnnotations);
        //获取全部参数
        Object[] objects = pjp.getArgs();
        for(int i = 0; i < parameterAnnotations.length; i++){
            for(Annotation a: parameterAnnotations[i]){
                if(a.annotationType() == DistributionLockParam.class){
                    //初始化distributionLockParamList
                    if(distributionLockParamList==null){
                        distributionLockParamList = new ArrayList();
                    }
                    //获得参数值
                    Object o = objects[i];
                    distributionLockParamList.add(o);
                }
            }
        }

        return distributionLockParamList;
    }

    /**
     * 加了DistributionLockParam注解参数值,拼接成字符串
     * @param pjp
     * @param lock
     * @return
     */
    public String getDistributionLockParamStr(ProceedingJoinPoint pjp,DistributionLock lock){
        List<Object> distributionLockParamList = getDistributionLockParamList(pjp);
        if(distributionLockParamList!=null && distributionLockParamList.size()>0){
            StringBuffer sb = new StringBuffer();
            for (int i = 0; i < distributionLockParamList.size(); i++) {
                Object param = distributionLockParamList.get(i);
                sb.append(lock.delimiter());
                sb.append(param);
            }
            return sb.toString();
        }
        return "";
    }

    /**
     * 返回分布式锁key完整的key
     * @param pjp
     * @param lock
     * @return
     */
    public String getRealDistributionLockKey(ProceedingJoinPoint pjp,DistributionLock lock){
        String distributionLockParamStr = getDistributionLockParamStr(pjp,lock);
        return lock.key().concat(distributionLockParamStr);
    }

}

Service 层 构建(使用自定义注解引入分布式锁)

public interface IOrderService {
    Resp updateOrder(String orderCode, Integer userId, Integer status);
}
@Slf4j
@Service
public class OrderServiceImpl implements IOrderService {

    @DistributionLock(key = "updateOrderStatus",tryLockTime = 1000)
    @Override
    public Resp updateOrder(@DistributionLockParam String orderCode, Integer userId, Integer status){
        try {
            log.info("updateOrder 处理业务 start");
            // 模拟处理业务3s
            Thread.sleep(1000*3);
            log.info("updateOrder 处理业务 end");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return Resp.buildSuccess("修改订单状态成功");
    }

}

controller 层构建

@Slf4j
@RestController
@RequestMapping(value = "/v1/test")
public class OrderController {

    @Autowired
    IOrderService orderService;

    /*
    // Swagger 配置
    @ApiOperation(value = "修改订单状态")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "orderCode", value = "订单编号", paramType = "query"),
            @ApiImplicitParam(name = "userId", value = "用户ID", paramType = "query"),
            @ApiImplicitParam(name = "status", value = "订单状态 1:未发货 2:已发货 3:完成", paramType = "query"),
    })
     */
    @RequestMapping(value = "/updateOrderStatus", method = RequestMethod.PUT)
    public Resp updateOrderStatus(@RequestParam(value = "orderCode")String orderCode,
                                  @RequestParam(value = "userId")Integer userId,
                                  @RequestParam(value = "status")Integer status){
        log.info("updateOrderStatus reqParam:orderCode:{},userId:{},status:{}",orderCode,userId,status);
        return orderService.updateOrder(orderCode,userId,status);
    }

}

测试

​ 启动DemoApplication,访问接口:http://127.0.0.1:8080/api/v1/test/updateOrderStatus

image-20240727165345638

3.可重入锁(基于Redisson构建的分布式锁)

​ 基于上述构建的分布式锁存在一个问题,即不可重入。因此针对redis的重入锁业界还是有很多解决方案的,当前比较流行的就是采用Redisson。

Redissonopen in new window是Redis官方推荐的Java版的Redis客户端。 它基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。 它在网络通信上是基于NIO的Netty框架,保证网络通信的高性能。 在分布式锁的功能上,它提供了一系列的分布式锁;如:可重入锁(Reentrant Lock)、公平锁(Fair Lock、联锁(MultiLock)、 红锁(RedLock)、 读写锁(ReadWriteLock)等等。

构建参考

构建Springboot 项目,引入redisson相关依赖

<!-- redisson start -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.15.5</version>
</dependency>
<!-- redisson start -->

构建测试Demo

@Slf4j
public class ReentrantLockDemo {
    // 锁
    public static RLock lock;

    static {
        // Redisson需要的配置
        Config config = new Config();
        String node = "127.0.0.1:6379";//redis地址
        node = node.startsWith("redis://") ? node : "redis://" + node;
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress(node)
                .setTimeout(3000)//超时时间
                .setConnectionPoolSize(10)
                .setConnectionMinimumIdleSize(10);
        // serverConfig.setPassword("123456");//设置redis密码
        // 创建RedissonClient客户端实例
        RedissonClient redissonClient = Redisson.create(config);
        // 创建redisson的分布式锁
        RLock rLock = redissonClient.getLock("666");
        lock = rLock;
    }

    /**
     * 模拟业务操作
     * @param n
     */
    public void doSomething(int n) {
        try {
            // 进入递归第一件事:加锁
            lock.lock();
            log.info("--------lock()执行后,getState()的值:{} lock.isLocked():{}", lock.getHoldCount(), lock.isLocked());
            log.info("--------递归{}次--------", n);
            if (n <= 2) {
                this.doSomething(++n);
            } else {
                return;
            }
        } finally {
            lock.unlock();
            log.info("--------unlock()执行后,getState()的值:{} lock.isLocked():{}", lock.getHoldCount(), lock.isLocked());
        }
    }

    public static void testRedissonLock() {
        log.info("--------------start---------------");
        ReentrantLockDemo reentrantLockDemo = new ReentrantLockDemo();
        reentrantLockDemo.doSomething(1);
        log.info("执行完doSomething方法 是否还持有锁:{}", ReentrantLockDemo.lock.isLocked());
        log.info("--------------end---------------");
    }

    public static void main(String[] args) {
        testRedissonLock();
    }
}

测试

# output
17:12:58.876 [main] INFO org.redisson.Version - Redisson 3.15.5
17:12:59.318 [redisson-netty-2-13] INFO org.redisson.connection.pool.MasterPubSubConnectionPool - 1 connections initialized for /127.0.0.1:6379
17:12:59.318 [redisson-netty-2-16] INFO org.redisson.connection.pool.MasterConnectionPool - 10 connections initialized for /127.0.0.1:6379
17:12:59.343 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------------start---------------
17:12:59.363 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------lock()执行后,getState()的值:1 lock.isLocked():true
17:12:59.363 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------递归1--------
17:12:59.365 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------lock()执行后,getState()的值:2 lock.isLocked():true
17:12:59.366 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------递归2--------
17:12:59.367 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------lock()执行后,getState()的值:3 lock.isLocked():true
17:12:59.367 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------递归3--------
17:12:59.369 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------unlock()执行后,getState()的值:2 lock.isLocked():true
17:12:59.371 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------unlock()执行后,getState()的值:1 lock.isLocked():true
17:12:59.372 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------unlock()执行后,getState()的值:0 lock.isLocked():false
17:12:59.373 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - 执行完doSomething方法 是否还持有锁:false
17:12:59.373 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------------end---------------

​ 结合测试结果分析,Redisson支持可重入锁

Redisson 如何实现可重入锁?

​ 可以跟踪lock.lock()代码实现进行分析,发现它最终调用的是org.redisson.RedissonLock#tryLockInnerAsync的方法

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                          "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "return redis.call('pttl', KEYS[1]);",
                          Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

// lua脚本
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end ;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end ;
return ;

上述Redis命令梳理如下:

  • exists 查询一个key是否存在

    • EXISTS key [key ...]:返回值(0 如果key不存在;1 如果key存在)
  • hincrby :将hash中指定域的值增加给定的数字

  • pexpire:设置key的有效时间以毫秒为单位

  • hexists:判断field是否存在于hash中

  • pttl:获取key的有效毫秒数

lua脚本传入的参数:

  • KEYS[1] = key的值

  • ARGV[1]) = 持有锁的时间

  • ARGV[2] = getLockName(threadId) ,系统在启动的时候会全局生成的uuid 来作为当前进程的id,加上线程id就是getLockName(threadId),即进程ID+系统ID = ARGV[2]

    protected String getLockName(long threadId) {
        return id + ":" + threadId;
    }	
    

    基于上述代码分析,其使用lua脚本保证多个命令执行的原子性,使用了hash来实现了分布式锁

(1)lua 脚本的加锁流程

第一个if判断

  • 先判断了当前key是否存在(返回值是0说明key不存在,说明没有加锁)
  • hincrby命令是对 ARGV[2] = 进程ID+系统ID 进行原子自增加1
  • 是对整个hash设置过期期间

看第二个if判断

  • 判断field是否存在于hash中,如果存在返回1,返回1说明是当前进程+当前线程ID 之前已经获得到锁了
  • hincrby命令是对 ARGV[2] = 进程ID+系统ID 进行原子自增加1,说明重入次数加1了
  • 再对整个hash设置过期期间

​ 通过断点查看客户端中对应键值的存储情况

image-20240727175258325

(2)解锁代码

解锁代码位于 org.redisson.RedissonLock#unlockInnerAsync

return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                      "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                      "return nil;" +
                      "end; " +
                      "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                      "if (counter > 0) then " +
                      "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                      "return 0; " +
                      "else " +
                      "redis.call('del', KEYS[1]); " +
                      "redis.call('publish', KEYS[2], ARGV[1]); " +
                      "return 1; " +
                      "end; " +
                      "return nil;",
                      Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

// lua 脚本
if (redis.call('hexists', KEYS[1], ARGV[1]) == ) then
    return nil;
end ;
-- 计算当前可重入次数
local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1);
-- 小于等于  代表可以解锁
if (counter > ) then
    return ;
else
    redis.call('del', KEYS[1]);
    return 1;
end ;
return nil;
  • if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) 判断锁是否存在
  • redis.call('hincrby', KEYS[1], ARGV[3], -1) 加锁次数原子自减
  • if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); 自减后当前线程还持有锁(counter > 0),更新下锁的过期时间
  • counter > 0 == false 走else逻辑(当前持有锁的线程 都解锁完成,删除该锁 )
  • redis.call('publish', KEYS[2], ARGV[1]); 发一个通知队列,让等待锁的线程 可以去获取锁

解锁的Lua脚本,流程跟Reentrantlock的解锁流程 也是差不多的

如果要采用声明式的可重入分布式锁,相应的在AOP的实现中调整加解锁为Redisson相关操作即可

评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v3.1.3