目录

分布式锁技术全景解析从传统锁机制到MySQLRedisRedisson与ZooKeeper实现

分布式锁技术全景解析:从传统锁机制到MySQL、Redis/Redisson与ZooKeeper实现



一、分布式锁介绍

1.1 为什么需要分布式锁

在单机部署的系统中,使用线程锁来解决高并发的问题,多线程访问共享变量的问题达到数据一致性,如使用synchornized、ReentrantLock等;

但是在后端集群部署的系统中,程序在不同的JVM虚拟机中运行,且因为synchronized或ReentrantLock都只能保证同一个JVM进程中保证有效,所以这时就需要使用分布式锁了。

1.2 什么是分布式锁?

分布式锁其实就是,控制分布式系统不同进程共同访问共享资源的 一种锁的实现。如果不同的系统或同一个系统的不同主机之间共享 了某个临界资源,往往需要互斥来防止彼此干扰,以保证一致性。

https://i-blog.csdnimg.cn/direct/0537f57b0ed24b6c893a63b64c1b5dfa.png

1.3 分布式锁特点

https://i-blog.csdnimg.cn/direct/c002ffce514a4074831a323aa64067e6.png

二. 传统锁回顾

2.1商品超卖演示

create table mall_stock (
	id int primary key auto_increment COMMENT '库存ID',
	product_id varchar(20) not null COMMENT '商品编号',
	sock_id int not null default 1 COMMENT '仓库ID',
	count int not null default 0 COMMENT '数量'
)

搭建环境

  • 压测

    下载地址:

    https://i-blog.csdnimg.cn/direct/e4ded249eaf24b0795de3e480229d433.png

    双击jemter.bat运行软件

2.2 JVM锁演示

  • 在方法上添加同步关键字
  • ReentrantLock对象使用
ReentrantLock lock=new ReentrantLock();

https://i-blog.csdnimg.cn/direct/832731334c5c4e9e9b58288fde0ef4b6.png

2.3 JVM锁失效的三种情况

2.3.1 多例模式

业务对象或锁对象是多例的情况下

原因:业务中一般使用的lock对象锁,lock锁的范围是针对同一个对象里面不同的线程,也就是说,jvm锁是对象锁,对象之间锁不共用

https://i-blog.csdnimg.cn/direct/5dc06b5657b04aa2af7bad741ceb4638.png

@Scope("singleton") // prototype  原型模式(多例)singleton  单例模式(单例)
public class LockController

2.3.2 事务

在使用了spring事物注解的情况下(不单是jvm锁,大部分锁实现都会出现这个问题)

原因:spring事务是基于aop的方式实现的,是包裹着整个方法的(包括锁),事务不在锁的范围内,很容易出现并发执行的时候,a方法的事务还没提交上去,b事务就读了数据库的旧值。

https://i-blog.csdnimg.cn/direct/ed68f49d447e407c917af1e2e529e0c9.png

https://i-blog.csdnimg.cn/direct/8c3085405cf742f5a08788d4ebd4fcf3.png

2.3.3 分布式集群

原因 :服务都不一样了,锁和对象自然也不一样(就和第一个情况下的环境一样)

解决方法 :利用mysql的排他锁机制,将所有业务sql集中成一条sql(以上三种问题都能解决,但是不灵活,只能在业务允许的情况下使用)

总结

综上所述,我们可以发现jvm锁只适合在单体项目中并且业务需求简单的情况下使用,所以有条件还是使用分布式锁吧。

三. 基于mysql实现分布式锁

3.1 一条SQL

https://i-blog.csdnimg.cn/direct/7dcfbc2cec854722b360c7f627db645c.png

update ,insert,delete 写操作本身带排他锁

优点 :一个sgl语句:更新数量时判断解决:解决了上面三个锁失效的问题

缺点

1 锁范围问题:是表级锁还是行级锁

一个sgl语句:更新数量时判断解决:解决了上面三个锁失效的问题,但是它是表级锁,这种是不能接受的,我要买多种商品结果你把表锁了,整张表都不能并发了、性能肯定就是不行的,最好使用行级锁。

mysql悲观锁中使用行级锁

1,锁的查询或者跟新条件必须是索引字段

2,查询或者更新条件必须是具体值

2.同一个商品有多条库存记录:仓库有多个、商品ID是一个,可以根据算法减库存、一个sql语句做不到

3.无法记录库存变化前后的状态

Mysql锁的区分

参考:

3.2 悲观锁

悲观锁认为被它保护的数据是极其不安全的,每时每刻都有可能被改动,一个事务拿到悲观锁后,其他任何事务都不能对该数据进行修改,只能等待锁被释放才可以执行。

数据库中的行锁,表锁,读锁,写锁均为悲观锁。

select ....  for update
service
@Transactional
public Boolean reduceStock(String productId, int count) {
    // 查询商品库存
    Stock stock = stockMapper.getStockByProductId(productId);
    if (stock!= null && stock.getCount() >= count) {
        // 减少库存
        stock.setCount(stock.getCount() - count);
        this.updateById(stock); // 更新库存
        return true;     // 减库存成功
    }
    return false;
}

mapper.xml

<select id="getStockByProductId" resultType="com.syh.model.entity.Stock">
      select <include refid="Base_Column_List"/>
      from mall_stock
      where product_id = #{productId} for update
  </select>

问题:

1,性能问题

2,死锁问题

3,库存操作要统一

3.3 乐观锁

乐观锁认为数据的变动不会太频繁。

乐观锁通常是通过在表中增加一个版本(version)或时间戳(timestamp)来实现,其中,版本最为常用。

事务在从数据库中取数据时,会将该数据的版本也取出来(v1),当事务对数据变动完毕想要将其更新到表中时,会将之前取出的版本v1与数据中最新的版本v2相对比,如果v1=v2,那么说明在数据变动期间,没有其他事务对数据进行修改,此时,就允许事务对表中的数据进行修改,并且修改时version会加1,以此来表明数据已被变动。

如果,v1不等于v2,那么说明数据变动期间,数据被其他事务改动了,此时不允许数据更新到表中,一般的处理办法是通知用户让其重新操作。不同于悲观锁,乐观锁通常是由开发者实现的。(CAS机制:Compare And Swap 比较并交换)

给表添加version字段

https://i-blog.csdnimg.cn/direct/ee609f845f704ab088fe8b40d206431d.png

service方法

public Boolean reduceStock(String productId, int count) {
        // 查询商品库存
        Stock stock = stockMapper.selectOne(new QueryWrapper<Stock>().eq("product_id", productId));
        if (stock != null && count >= 0) {
            stock.setCount(stock.getCount() - count);//改数量
            Integer version = stock.getVersion();   // 原版本号
            stock.setVersion(version + 1);//改版本号
            if(!this.update(stock,new QueryWrapper<Stock>().eq("product_id", productId)
                    .eq("version", version))){
                // 更新库存 更新失败重试
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                reduceStock(productId, count);
            }
        }
        return false;
    }

问题:

1,高并发情况下,性能极低

2,读写分离情况下导致乐观锁不可靠

3.4 总结

性能 :一个sql>悲观锁>jvm锁>乐观锁

如果追求极致性能、业务场景简单并且不需要记录数据前后变化的情况下。

优先选择 :一个sql

如果写并发量较低(多读),争抢不是很激烈的情况下优先选择:乐观锁

如果写并发量较高,一般会经常冲突,此时选择乐观锁的话,会导致业务代码不间断的重试。

优先选择:mysql悲观锁

不推荐jvm本地锁。

基于mysql实现分布式锁

不管是jvm锁还是mysql锁,为了保证线程的并发安全,都提供了悲观独占排他锁。所以独占排他也是 分布式锁的基本要求。 可以利用唯一键索引不能重复插入的特点实现。设计表如下:

CREATE TABLE `db_lock` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `lock_name` varchar(50) NOT NULL COMMENT '锁名',
  `class_name` varchar(100) DEFAULT NULL COMMENT '类名',
  `method_name` varchar(50) DEFAULT NULL COMMENT '方法名',
  `server_name` varchar(50) DEFAULT NULL COMMENT '服务器ip',
  `thread_name` varchar(50) DEFAULT NULL COMMENT '线程名',
  `create_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP 

  COMMENT '获取锁时间',
  `desc` varchar(100) DEFAULT NULL COMMENT '描述',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_unique` (`lock_name`)
) ENGINE=InnoDB AUTO_INCREMENT=1332899824461455363 DEFAULT CHARSET=utf8;

Lock实体类:

@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("db_lock")
public class Lock {
    private Long id;
    private String lockName;
    private String className;
    private String methodName;
    private String serverName;
    private String threadName;
    private Date createTime;
    private String desc;
}

LockMapper接口:

public interface LockMapper extends BaseMapper<Lock> {
}

2.1. 基本思路

synchronized 关键字和 ReetrantLock 锁都是独占排他锁,即多个线程争抢一个资源时,同一时刻只有 一个线程可以抢占该资源,其他线程只能阻塞等待,直到占有资源的线程释放该资源。

https://i-blog.csdnimg.cn/direct/af7450774cfe4956b621f8bbfe0feb1d.png

  1. 线程同时获取锁(insert)
  2. 获取成功,执行业务逻辑,执行完成释放锁(delete)
  3. 其他线程等待重试

2.2. 代码实现

改造StockService:

@Service
 
public class StockService {
    @Autowired
    private StockMapper stockMapper;
    @Autowired
    private LockMapper lockMapper;
    /**
     * 数据库分布式锁
     */
    public void checkAndLock() {
        // 加锁
        Lock lock = new Lock(null, "lock", this.getClass().getName(), new 
Date(), null);
        try {
            this.lockMapper.insert(lock);
       } catch (Exception ex) {
            // 获取锁失败,则重试
            try {
                Thread.sleep(50);
                this.checkAndLock();
           } catch (InterruptedException e) {
                e.printStackTrace();
           }
       }
        // 先查询库存是否充足
        Stock stock = this.stockMapper.selectById(1L);
        // 再减库存
        if (stock != null && stock.getCount() > 0){
            stock.setCount(stock.getCount() - 1);
            this.stockMapper.updateById(stock);
       }
        // 释放锁
        this.lockMapper.deleteById(lock.getId());
   }
}

加锁:

// 加锁
Lock lock = new Lock(null, "lock", this.getClass().getName(), new Date(), null);
try {
    this.lockMapper.insert(lock);
} catch (Exception ex) {
    // 获取锁失败,则重试
    try {
        Thread.sleep(50);
        this.checkAndLock();
   } catch (InterruptedException e) {
        e.printStackTrace();
   }
}

解锁:

// 释放锁
this.lockMapper.deleteById(lock.getId());

2.3. 缺陷及解决方案

  1. 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。

    解决方案:给锁数据库 搭建主备

  2. 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。

    解决方案:只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。

  3. 这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。

    解决方案:记录获取锁的主机信息和线程信息,如果相同线程要获取锁,直接重入。

  4. 受制于数据库性能,并发能力有限。

    解决方案:无法解决。

四. 基于Redis实现分布式锁

4.1 基本实现

借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同时有多个客户端发 送setnx命令,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false)。

https://i-blog.csdnimg.cn/direct/db920308242644dab794f34fe2f194aa.png

● 1. 多个客户端同时获取锁(setnx)

● 2. 获取成功,执行业务逻辑,执行完成释放锁(del)

● 3. 其他客户端等待重试

@Service
public class StockService {
    @Autowired
    private StockMapper stockMapper;
    @Autowired
    private LockMapper lockMapper;
    @Autowired
    private StringRedisTemplate redisTemplate;
    public void checkAndLock() {
        // 加锁,获取锁失败重试
        while (!this.redisTemplate.opsForValue().setIfAbsent("lock","xxx")){
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 先查询库存是否充足
        Stock stock = this.stockMapper.selectById(1L);
        // 再减库存
        if (stock != null && stock.getCount() > 0){
            stock.setCount(stock.getCount() - 1);
            this.stockMapper.updateById(stock);
        }
        // 释放锁
        this.redisTemplate.delete("lock");
    }
}

4.2 防死锁

https://i-blog.csdnimg.cn/direct/a24927e038614d368a73485f5a313cea.png

解决 :给锁设置过期时间,自动释放锁。 设置过期时间两种方式:

  1. 通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)

  2. 使用set指令设置过期时间:set key value ex 3 nx(既达到setnx的效果,又设置了过期时间)

    https://i-blog.csdnimg.cn/direct/b45fb9d058d64dc6aa41940b14a15d87.png

4.3 防误删

问题 :可能会释放其他服务器的锁。 场景:如果业务逻辑的执行时间是7s。执行流程如下

  1. index1业务逻辑没执行完,3秒后锁被自动释放。
  2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。
  3. index3获取到锁,执行业务逻辑
  4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只 执行1s就被别人释放。 最终等于没锁的情况。

解决 :setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的 锁

https://i-blog.csdnimg.cn/direct/7b84977268684c1d8cc5e68f8cd8415e.png

https://i-blog.csdnimg.cn/direct/3f79162b6e964621b48df580bdcadee1.png

问题 :删除操作缺乏原子性。 场景:

  1. index1执行删除时,查询到的lock值确实和uuid相等

  2. index1执行删除前,lock刚好过期时间已到,被redis自动释放

  3. index2获取了lock 4. index1执行删除,此时会把index2的lock删除

    解决方案:没有一个命令可以同时做到判断 + 删除,所有只能通过其他方式实现(LUA脚本)

4.4 使用lua保证删除原子性

lua 脚本入门

Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

Lua 是巴西里约热内卢天主教大学(Pontifical Catholic University of Rio de Janeiro)里的一个研究小组于 1993 年开发的,该小组成员有:Roberto Ierusalimschy、Waldemar Celes 和 Luiz Henrique de Figueiredo。

Redis 操作lua脚本

命令:EVAL命令

命令格式: EVAL script numkeys key [key …] arg [arg …]

script参数是一段 Lua5.1 脚本程序。脚本不必(也不应该[^1])定义为一个 Lua 函数

numkeys指定后续参数有几个key,即:key [key …]中key的个数。如没有key,则为0

key [key …] 从 EVAL 的第三个参数开始算起,表示在脚本中所用到的那些 Redis 键(key)。在Lua脚本中通过KEYS[1], KEYS[2]获取。

arg [arg …] 附加参数。在Lua脚本中通过ARGV[1],ARGV[2]获取。

删除LUA脚本:

if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', 
KEYS[1]) else return 0 end

代码实现:

public void checkAndLock() {
    // 加锁,获取锁失败重试
    String uuid = UUID.randomUUID().toString();
    while (!this.redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS)){
        try {
            Thread.sleep(50);
       } catch (InterruptedException e) {
            e.printStackTrace();
       }
   }
    // 先查询库存是否充足
    Stock stock = this.stockMapper.selectById(1L);
    // 再减库存
    if (stock != null && stock.getCount() > 0){
        stock.setCount(stock.getCount() - 1);
        this.stockMapper.updateById(stock);
   }
    // 释放锁
    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return 
redis.call('del', KEYS[1]) else return 0 end";
    this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),
                               Arrays.asList("lock"), uuid);
}

五. 使用Redisson实现分布式锁

导包 :

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.24.3</version>
</dependency>

配置 :

package com.syh.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissionConfig {
    @Value("${spring.data.redis.host}")
    private String redisHost;

    @Value("${spring.data.redis.password}")
    private String password;
    @Value("${spring.data.redis.port}")
    private int port;

    @Bean
    public RedissonClient getRedisson() {
        System.out.println(redisHost+":"+port);
        Config config = new Config();
        config.useSingleServer().
        setAddress("redis://" + redisHost + ":" + port).
        setPassword(password);
        config.setCodec(new JsonJacksonCodec());
        return Redisson.create(config);
    }
}

使用:

 public Boolean reduceStock(String productId, int count) {
        RLock rLock = redissonClient.getLock("lock");
        try {
            boolean isLocked = rLock.tryLock(3, TimeUnit.SECONDS);
            if (isLocked) {
                // TODO
                // Reids 查询库存
                String stock = redisTemplate.opsForValue().get("count");
                if (stock != null && Integer.parseInt(stock) >= count) {

                    redisTemplate.opsForValue().set("count", String.valueOf(Integer.parseInt(stock) - count));
                }
            }
        } catch (Exception e) { 
        }finally {
            if (rLock.isHeldByCurrentThread()) {
                rLock.unlock();
            }
        }

        return false;
    }

六. 基于zookeeper实现分布式锁

方法概述

ZooKeeper 提供了一种可靠的机制来实现分布式锁,这有助于解决分布式系统中的并发控制问题。为了确保多个节点之间的操作一致性,可以通过创建临时顺序节点并利用其唯一性和有序性特点来构建锁定逻辑。

当应用程序请求获取锁时,会在指定路径下创建一个带有特定前缀的临时顺序节点;随后通过检查当前所创建节点是否是最小编号的那个节点(即第一个),如果是,则认为成功获得了锁;如果不是,则监听比自己序号小一位的节点变化事件,在前任节点消失之后再次尝试获取锁直到成为最小编号为止。

对于异常状况如网络分区或者服务器崩溃等情况下的处理也非常重要。例如连接断开、会话过期等问题都需要被妥善考虑以保障锁的安全性与可靠性。另外,设置合理的超时时间可以帮助预防潜在的死锁现象发生。

示例代码

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class SimpleDistributedLock {

    private static final String LOCK_ROOT_PATH = "/locks";
    
    public void acquireLock(ZooKeeper zk, String lockName) throws KeeperException, InterruptedException {
        String lockPathPrefix = LOCK_ROOT_PATH + "/" + lockName + "-";
        
        // Create ephemeral sequential node.
        String ourLockNode = zk.create(lockPathPrefix, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        
        while (true){
            List<String> childrenNodes = zk.getChildren(LOCK_ROOT_PATH, false);

            Collections.sort(childrenNodes); 

            int index = childrenNodes.indexOf(zk.getState().toString());
            
            if(index == 0){ 
                System.out.println(Thread.currentThread().getName() +" acquired the lock.");
                break;  
            }else{
                Stat predecessorStat = null;
                
                try {   
                    String watchOnPredecessor = LOCK_ROOT_PATH +"/"+childrenNodes.get(index-1);
                    
                    zk.exists(watchOnPredecessor,true);
                    
                    synchronized(this){
                        wait();
                    }
                } catch(Exception e){}
            }
        }
    }

    public void releaseLock(){
        // Release logic here...
        System.out.println(Thread.currentThread().getName()+" released the lock.");
    }
}

此段程序展示了如何在一个给定名称空间内竞争一把互斥锁的过程。请注意实际应用中还需要加入更多健壮性的设计以及错误恢复机制等细节。

推荐实践

考虑到复杂度和维护成本,在真实环境中建议采用成熟的第三方库比如 Curator 来简化开发工作量。Curator 对于上述提到的各种类型的锁都提供了良好的支持,并且经过了广泛的测试验证能够很好地适应生产环境的需求。

思维导图

https://i-blog.csdnimg.cn/direct/1cb04847557f480cb290ce10b67107ec.png