Redis分布式锁
目录
Redis分布式锁
1. 库存超卖问题重现
下面这段代码在多线程下存在线程安全问题!
超卖的主要原因是下面的步骤不是原子性的。
- 判断库存是否充足。
- 库存扣减
public String deductProduct(String userId) {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
log.info("开始扣减库存");
//加锁,分布式锁
if (stock > 0) {
int stockAfterDeduction = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(stockAfterDeduction));
log.info("扣减库存成功,现库存为 {}", stockAfterDeduction);
} else {
log.error("库存不足,现库存为:{}", stock);
}
return "操作成功!";
}
==> 单机情况下,可以加锁(synchronized | ReentrantLock)解决这个线程安全问题。
但是在分布式场景下,必须使用分布式锁解决这个问题。
2. 手写简单分布式锁
set key value nx ex
2.1 可以基于set nx ex实现一个简单的分布式锁
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSecond 锁持有的超时时间,过期后自动删除
* @return 是否加锁成功
*/
boolean tryLock(long timeoutSecond);
/**
* 释放锁
*/
void unLock();
}
2.2 分布式锁-实现加锁和解锁流程
public class SimpleRedisLock implements ILock {
private static final String KEY_PREFIX = "lock:";
//分布式中,集群的标识符uuid,用于表示集群中的每个节点
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + ":";
//解锁的lua脚本,用于防止锁误删
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
private final String name; //锁的名字
private final StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
static {
//初始化 unlock.lua脚本文件
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.TYPE);
}
@Override
public boolean tryLock(long timeoutSecond) {
//获取线程标识,uuid + thread-id
String threadId = ID_PREFIX + Thread.currentThread().getId();
Boolean isLock = stringRedisTemplate.opsForValue()
//基于set nx ex实现分布式锁
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSecond, TimeUnit.SECONDS);
return Boolean.TRUE.equals(isLock);
}
@Override
public void unLock() {
//删除锁,要防止锁误删(lua脚本)
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId()
);
}
}
2.3 lua脚本实现
if (redis.call('get', KEYS[1]) == ARGV[1])
then
-- 一致的话,就删除锁
return redis.call('del', KEYS[1])
end
2.4 分布式锁实战
public String deductProduct(@PathVariable String userId) {
ILock lock = new SimpleRedisLock("product", stringRedisTemplate);
//尝试加锁
boolean isLock = lock.tryLock(60);
if (!isLock) {
log.error("线程加锁失败: {}", Thread.currentThread().getId());
return "加锁失败";
}
try {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
log.info("开始扣减库存");
if (stock > 0) {
int stockAfterDeduction = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(stockAfterDeduction));
log.info("扣减库存成功,现库存为 {}", stockAfterDeduction);
} else {
log.error("库存不足,现库存为:{}", stock);
}
return "操作成功!";
}finally {
lock.unLock();
}
}
3. Redisson框架
uuid(区分不同的jvm服务)+线程id(区分jvm服务上面的线程)
3.1 导入pom坐标
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
3.2 配置Redisson客户端
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
public String host;
@Value("${spring.redis.port}")
public String port;
@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient redissonClient() {
// 单机redis配置
Config config = new Config();
String redisAddress = "redis://" + host + ":" + port;
config.useSingleServer().setAddress(redisAddress).setPassword(password);
// 设置分布式锁 watch dog超时时间
// config.setLockWatchdogTimeout()
return Redisson.create(config);
}
}
3.3 使用分布式锁
public class HelloController {
private RedissonClient redissonClient;
public String deductProduct(String userId) {
//获取分布式锁的锁对象
RLock lock = redissonClient.getLock("product");
//加分布式锁
lock.lock(); //set nx ex 30s
try {
//业务逻辑
}finally {
//释放锁
lock.unlock();
}
}
}
3.4 Redission源码分析
3.5 Redis主从架构下,分布式锁失效
Redis的集群架构保证AP,Zookeeper的集群架构保证CP。
但是Redis在主从情况下会出现锁丢失的问题,如何解决呢? => 红锁RedLock