hello,大家好,欢迎来到银之庭。我是Z,一个普通的程序员。最近在工作中遇到了需要用分布式锁的场景,我就直接拷贝了其他项目里前人写的一个redis分布式锁拿来用了,大体看了一眼也没啥毛病。不过工作之余,我还是搜索了一下有没有开源的redis分布式锁实现,如果有的话就研究一下,看能不能引入到工作项目中来。在搜到spring-integration-redis项目并仔细研究了它的源码后,我惊讶地发现这个spring下属的开源项目,虽然代码质量挺高,但redis分布式锁的实现逻辑居然是有缺陷的,具体地说就是spring-integration-redis在极端场景下,会直接删除其他线程占用的redis锁。下面我们就来仔细研究下spring-integration-redis的代码,看看这个问题是如何产生的。
PS:以下实验用到了两个项目,可以从我的github上下载到,分别是demo项目和demo2项目。
1. 基本使用
我们先来试用一下spring-integration-redis这个组件。打开上面提到的两个项目,先运行起demo项目,并查看demo项目的com.example.demo.controller.TestNoLock
文件,代码如下:
package com.example.demo.controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.CountDownLatch;
@RestController
public class TestNoLock {
private int count;
@RequestMapping("/testNoLock/")
public String test() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1000);
count = 0;
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
try {
int tempCount = count;
tempCount += 1;
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
count = tempCount;
}finally {
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
System.out.println("final count is: " + count);
return "ok";
}
}
这部分代码模拟的是在不加锁的情况下1000个线程并发修改一个共享变量,为了让效果更明显,并发修改的代码拆分了count++
的内部逻辑,并休眠了1ms(不知道是不是因为我电脑性能太高,如果不休眠的话,这段没加锁的代码运行结果大概率也是count=1000,没有产生并发修改的问题)。效果也很明显,多次访问http://127.0.0.1:9982/testNoLock/
URL,可以在终端看到最终的count的值,我的运行结果如下图:
可以看到产生了明显的并发修改问题。下面我们来使用spring-integration-redis提供的分布式锁避免并发修改问题。
使用spring-integration-redis比较简单,先引入依赖包:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.3.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<version>2.3.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>5.3.2.RELEASE</version>
</dependency>
然后新建一个配置类往spring容器里添加一个org.springframework.integration.redis.util.RedisLockRegistry
的对象,代码如下:
package com.example.demo.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.redis.util.RedisLockRegistry;
@Configuration
public class RedisLock {
@Bean
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
return new RedisLockRegistry(redisConnectionFactory, "registry_key");
}
}
最后在需要的地方使用RedisLockRegistry
新建锁对象即可,示例在demo项目的com.example.demo.controller.TestLock
文件中,代码如下:
package com.example.demo.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
@RestController
public class TestLock {
private int count;
@Autowired
private RedisLockRegistry redisLockRegistry; // 自动注入我们创建的redisLockRegistry对象
@GetMapping("/testLock/")
public String testLock() throws InterruptedException {
// 创建锁对象
Lock lock = redisLockRegistry.obtain("lock_key");
CountDownLatch countDownLatch = new CountDownLatch(1000);
count = 0;
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
// 加锁
lock.lock();
try {
int tempCount = count;
tempCount += 1;
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
count = tempCount;
} finally {
countDownLatch.countDown();
// finally中确保释放锁
lock.unlock();
}
}).start();
}
countDownLatch.await();
System.out.println("final count is: " + count);
return "ok";
}
}
这个接口是起1000个线程并发尝试去获取锁,然后修改共享变量count
。这个接口依赖在本地的6379端口启动的redis服务,启动redis比较简单,这里不再赘述。
访问http://127.0.0.1:9982/testLock/
,可以在终端看到count
的值都是1000,确实防止了并发修改的问题,如下图:
到目前为止这个组件的功能看起来好像是没问题的,但实际上,我们现在只测试了一台实例(一个进程)的情况,这种情况实际上使用的是我称之为“进程锁”的特性,即保证一个进程内不会有多个线程同时进入某个临界区,而没有使用到“分布式锁”的特性。网上很多介绍spring-integration-redis的文章中的例子都只到了这一步,而没有验证到“分布式锁”的特性。可能是混淆了“进程锁“和“分布式锁“的概念,也可能是因为模拟多实例比较麻烦,所以用验证“进程锁”的特性糊弄过去了。而spring-integration-redis实现的“分布式锁”特性是有缺陷的。我们先来研究下它的代码,再来看看缺陷在哪里。
2. 源码分析
点击demo项目代码中的redisLockRegistry.obtain
或lock.lock
或lock.unlock
都可以进入spring-integration-redis的主文件org.springframework.integration.redis.util.RedisLockRegistry
。我们主要分析lock.lock()
, lock.tryLock()
, lock.tryLock(long time, TimeUnit unit)
, lock.unlock()
方法。这几个方法也是实现分布式锁语义的关键方法。
2.1 lock()
首先看lock.lock()
。代码截取如下:
@Override
public void lock() {
this.localLock.lock();
while (true) {
try {
while (!obtainLock()) {
Thread.sleep(100); //NOSONAR
}
break;
}
catch (InterruptedException e) {
/***/
}
catch (Exception e) {
this.localLock.unlock();
rethrowAsLockException(e);
}
}
}
private boolean obtainLock() {
Boolean success = RedisLockRegistry.this.redisTemplate.execute(
RedisLockRegistry.this.obtainLockScript,
Collections.singletonList(this.lockKey),
RedisLockRegistry.this.clientId,
String.valueOf(RedisLockRegistry.this.expireAfter));
boolean result = Boolean.TRUE.equals(success);
if (result) {
this.lockedAt = System.currentTimeMillis();
}
return result;
}
方法中会先尝试调用localLock.lock()
方法,localLock是个本地的重入锁ReentrantLock
,这个本地锁有两个作用,一是直接在本地拦截其他线程的请求,避免每次都请求redis,提高锁性能,二是借助重入锁实现了分布式锁的可重入特性的一部分(另一部分是在发给redis执行的脚本中实现的,如果是重入获取锁,则刷新过期时间),这个localLock的使用还是比较巧妙的,值得学习。接下来是个死循环,不断调用obtainLock()
方法尝试获取锁,如果失败就休眠100ms重试,并忽略了中断异常,遇到其他异常时会释放本地锁,并重新抛出异常。下面主要看下obtainLock()
方法,这个方法就是直接向redis发请求,执行一个lua脚本,防止执行多条redis命令中间被其他实例的线程打断,导致锁处于一个不一致的状态。脚本的意思比较简单,先查一下锁的key,如果存在且value是本线程的,就刷新过期时间,证明本次是个重入获取锁的过程,如果锁的key不存在,则新建锁的key,写入本线程的value,其他情况直接返回失败。
2.2 tryLock()
下面看一下tryLock()
,它直接调用了tryLock(long time, TimeUnit unit)
这个重载方法。我们直接看后面的重载方法。tryLock
实现的是可设置超时时间的尝试获取锁的语义。代码如下:
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
long now = System.currentTimeMillis();
if (!this.localLock.tryLock(time, unit)) {
return false;
}
try {
long expire = now + TimeUnit.MILLISECONDS.convert(time, unit);
boolean acquired;
while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) { //NOSONAR
Thread.sleep(100); //NOSONAR
}
if (!acquired) {
this.localLock.unlock();
}
return acquired;
}
catch (Exception e) {
this.localLock.unlock();
rethrowAsLockException(e);
}
return false;
}
也是先尝试获取本地的重入锁,如果本地锁获取失败直接返回失败。获取成功后再调用obtainLock()
方法获取redis锁,并循环检查是否到超时时间,如果最终没有获取到redis锁,需要释放本地锁,如果获取到了redis锁,则一直持有本地锁,直到unlock()
方法中释放本地和redis的两个锁。这里有个细节是,传入的时间其实用在两个地方,一是重试获取本地锁的超时时间,二是尝试获取redis锁的超时时间,所以调用tryLock(time, unit)
方法,实际可能等待2*time的时间。一个参数代表两个含义,我也不清楚这个设计是好是坏。
到目前为止都还挺正常的,本地锁的使用也值得我们学习,但后面unlock()
的实现就有些糙了,我们一起来看一下。
2.3 unlock()
unlock()
的代码如下:
@Override
public void unlock() {
if (!this.localLock.isHeldByCurrentThread()) {
throw new IllegalStateException("You do not own lock at " + this.lockKey);
}
if (this.localLock.getHoldCount() > 1) {
this.localLock.unlock();
return;
}
try {
if (!isAcquiredInThisProcess()) {
throw new IllegalStateException("Lock was released in the store due to expiration. " +
"The integrity of data protected by this lock may have been compromised.");
}
if (Thread.currentThread().isInterrupted()) {
RedisLockRegistry.this.executor.execute(this::removeLockKey);
}
else {
removeLockKey();
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Released lock; " + this);
}
}
catch (Exception e) {
ReflectionUtils.rethrowRuntimeException(e);
}
finally {
this.localLock.unlock();
}
}
unlock()
方法全程实现的语义类似:如果本地锁不是本线程持有的,直接报错,后续就以本地锁是本线程持有为前提。如果本地锁重入次数大于1,则本次释放只需要把重入次数减一即可,然后直接返回。如果重入次数已经等于1了,就得同时释放redis锁和本地锁了。释放redis锁时先检查redis里锁的value是不是本线程的,如果不是就报错,这时候可能是因为本线程写入redis的锁过期释放了,然后被其他实例的线程重新写入锁了。如果是,再发一条命令删除锁的key,命令可以是unlink
或del
,这不重要。到这可能有同学已经发现问题了,我们准备redis分布式锁的面试题的时候,都会注意说“释放锁要用lua脚本,把检查锁是不是本线程持有和删除锁放到一个lua脚本中,防止高并发时误删其他线程写入的锁”。但这里判断锁是不是本线程持有和删除key却分成了两条redis命令,那么在极端情况下,本线程判断锁是本线程持有的,但在删除命令执行前,key过期了,有其他实例的线程写入了锁,然后本线程却直接删除了锁的key,就会误删其他实例线程写入的锁了(其他实例的线程真是躺着中枪了)。这种情况确实比较极端,不容易复现,下面我们借助debug来拉长unlock()
方法每一步的执行时间,并通过起两个服务来模拟多个实例的情况(实际上是多个进程,进程在同一实例还是不同实例其实没区别)。
3. unlock方法缺陷验证
这时候需要用debug模式启动demo和demo2两个项目,demo2项目的代码和demo几乎一样,只是配置不同。我们用两个项目的controller.TestUnlock
中提供的接口来验证unlock()
方法在极端情况下的缺陷。代码如下:
package com.example.demo.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@RestController
public class TestUnlock {
@Autowired
private RedisLockRegistry redisLockRegistry; // 自动注入我们创建的redisLockRegistry对象
@GetMapping("/testUnlock/")
public String testUnlock() throws InterruptedException {
// 创建锁对象
Lock lock = redisLockRegistry.obtain("lock_key");
lock.tryLock(60, TimeUnit.SECONDS);
try {
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return "ok";
}
}
我们把断点打到demo2的lock.unlock()
方法内的removeLockKey()
方法那一行,然后调用demo2的接口http://127.0.0.1:9983/testUnlock/
,如果执行到断点的地方,证明redis锁已经写入成功了,并且已经在尝试释放锁了,我们可以去redis里验证一下,效果如下图:
现在我们什么都不做,等待redis里这个key过期,然后把断点打到demo项目的lock.unlock()
方法那一行,调用demo的接口http://127.0.0.1:9982/testUnlock/
,尝试获取锁,这时候会获取成功。redis里的锁的value已经变成demo线程的clientId了,如下图:
先不要往下执行demo项目,回到demo2,点击继续,会发现demo2直接把demo写入的锁删除掉了,而demo继续执行下去,在解锁时会报Lock was released in the store due to expiration. The integrity of data protected by this lock may have been compromised.
的错,但实际上,它写入的锁并没有超时,而是被demo2给删除了。
以上,我们试用了spring-integration-redis提供的分布式锁功能,并阅读了它的代码,发现了它在实现分布式锁时有“可能删除其他进程写入的锁”的问题。所以,redis分布式锁的实现我并不建议大家用这个组件,我更推荐使用redission这个组件。这个组件也是redis官方推荐的实现。后续有机会我再给大家介绍一下redission的使用。就这样~