目录

Java并发编程实战单例模式-阻塞队列的终极实现指南

【Java】并发编程实战:单例模式 + 阻塞队列的终极实现指南

各位看官,大家早安午安晚安呀~~~

如果您觉得这篇文章对您有帮助的话

**欢迎您一键三连,小编尽全力做到更好

欢迎您分享给更多人哦**

https://i-blog.csdnimg.cn/direct/151c4f80248f4633bb12f6fb03384638.png

今天我们来学习【Java】并发编程实战:单例模式 + 阻塞队列的终极实现指南

1.单例模式

先回答问题: https://i-blog.csdnimg.cn/direct/0e2dae293164403f841907c629bd73b1.png

提问:

单例模式是什么呢?我们在某一个特定的场景下只能new一个对象。

譬如代码里面用来管理对象就应该是单例的

像:MySQL中的JDBC编程中的DataSource(描述了mysql服务器的位置,这个服务器的描述信息就应该是单例的)

那我就要问了,那我new一次对象不就好了,还整个这个模式干嘛呢?

这个不靠谱呀,一不小心我们new多个对象了不就完蛋了?所以这个时候我们就需要一个模式 让编译器帮我们做检查

就像final关键字(我们不能修改这个变量,不然编译器就会报错)同理:interface,@Override,throws也是相同的道理

但是:java在语法层面上没有对单例模式作出支持,所以我们只能通过一些编程技巧来实现类似的效果

1.1饿汉模式

第一种方式(饿汉模式):先类里面创建出有一个实例,然后把构造方法给隐藏起来(你就new不了了,哈哈)

class Singleton {
    private static Singleton instance = new Singleton();
  //静态成员变量,类加载的时候就被创建好了
    public static Singleton getInstance(){  // 一定要是静态方法,不然一开始别人都拿不到这个对象,又没办法调用这个方法,岂不是贻笑大方
        return instance;
    }
    private Singleton(){} // 啥也不用干,也干了呀,把构造方法给藏起来了

}
public class SingletonDemo{
    public static void main(String[] args) {
       // Singleton singleton = new Singleton();
  // 直接报错,编译器帮我们检查
        Singleton singleton1 = Singleton.getInstance();  // 都通过getInstance创建实例
        Singleton singleton2 = Singleton.getInstance();
        System.out.println(singleton1 == singleton2); // true,俩对象引用都一致
    }
}

**这种方式也叫做饿汉模式,(这么急切)类一加载对象就被创建好了

(创建的时机比较早)**

那我们能不能不先创建呢?我想用的时候再去创建,当然可以,

这个就叫做懒汉模式

1.2.懒汉模式

class SingletonLazy{
    private static SingletonLazy instance = null; // 先置为null
    public static SingletonLazy getInstance(){
        if(instance == null){  // 如果不为空直接返回instance
            instance = new SingletonLazy(); //  为null 再new
        }
        return instance;
    }
    private SingletonLazy(){} // 私有构造方法
}

1.3.饿汉模式和懒汉模式有一个是线程不安全的

我们第一次说的线程不安全的时候是在count++的问题上面,这个 **count++这个操作再cpu上面是分三步进行的

(两个线程同时修改一个变量就很容易出现线程安全问题)** (非原子当时加锁解决的,当然还有其他解决办法)。

解释:

1.饿汉模式: (我们在类加载的时候就把对象创建好了,大家调用getInstance() 就只是涉及到读操作)

2.懒汉模式: 我们需要的时候才去创建这个对象, 这个时候就会涉及到两个线程同时修改一个变量。

并且new对象的操作也不是原子的

1.申请内存空间

2.在内存上面构造对象

3.把内存的地址赋值给instance引用(一共这三步)

1.3.1.那我们把new这个操作变成原子的可以了吗?
class SingletonLazy{
    private static SingletonLazy instance = null; // 先置为null
    public static SingletonLazy getInstance(){
        if(instance == null){  // 如果不为空直接返回instance
            synchronized(SingletonLazy.class){
                instance = new SingletonLazy(); // 把这一步变成原子的
            }
        }
        return instance;
    }
    private SingletonLazy(){} // 私有构造方法
}

现在确实new对象的时候不会被穿插了,但是这个锁的位置不对

**我们判断这个引用是否为null的时候也被穿插的了呢。另一个线程这个时候也来判断这个引用是否为null。

好了这两个线程判断的都是null,又创建了两个对象(图解如下)**

https://i-blog.csdnimg.cn/direct/f75234a0d8ad41359501475e6fd006b3.png

总结:​​​​​​加锁的关键是:

**不是锁一个操作在cpu上是多个指令就只把这个操作锁起来,

而是要综合代码仔细分析,看看哪些操作是要一起被锁起来的**

**特别是这种判断语句:

这个地方最容易出问题     (小编也是写博客的时候才意识到)** https://i-blog.csdnimg.cn/direct/4edf071ecf9c4b229f4dfc713ecf3989.png (好开心哈哈)

1.3.2.所 以说这个锁的加法很有说法的!

我们这个时候就可以把这个判断语句一起加上锁了

 public static SingletonLazy getInstance(){
        synchronized (SingletonLazy.class){
            if(instance == null){
                instance = new SingletonLazy();
            }
        }
         return instance;
    }

现在是解决了刚才的问题但是,又引发了新的问题

一旦我们这样写: 后续每次调用getInstance()都会进行加锁 ,但是懒汉模式就只有在最开始的时候(后面都是读的操作了,线程就安全了)

两个很大的问题:

1.我们一直加锁,加锁就会涉及到锁冲突,然后在阻塞等待(鬼知道你下次上cpu是啥时候,时间相对是很大被浪费了)

2.一个代码一旦涉及到加锁(那么这个代码基本就和高性能无缘了,而且我们还是每一次都加锁)

1.3.3.那我们能不能既可以让线程安全,又不会对效率有太大的影响呢(只加第一次的锁)?

当然可以!!!:我们现在无非就是担心后面每次getInstance都要加锁,那我们再加一层if(instance == null )不就行了?后面就不需要进入到加锁的那一步了

 public static SingletonLazy getInstance(){
        if(instance == null){
            synchronized (SingletonLazy.class){
                if(instance == null){
                    instance = new SingletonLazy();
                }
            }
        }
        return instance;
    }

https://i-blog.csdnimg.cn/direct/a78607be7a664abab06add605141dea3.png

我们的代码看似已经很完美了

但是但是但是(还是有问题)!!!

1.3.4.指令重排序问题

指令重排序也会对上述的代码产生影响

(也是编译器为了执行效率,帮助我们调整代码的执行顺序,调整的前提是保证代码的逻辑不变)

上一次编译器是帮助我们 把一些要读内存的操作优化成读寄存器,我们用volatile就能解决这个问题,我们这一次也是用volatile解决这个问题。我给大家分析一下。

https://i-blog.csdnimg.cn/direct/6e31ba741c5b45cc9af735b5e53fc5af.png

针对这个情况:我们给instance加上volatile就可以了(告诉编译器你不用帮我优化(指令重排序)),完结撒花!!!

正确的完整的代码如下

class SingletonLazy{
    private static volatile SingletonLazy instance = null; // 先置为null
    public static SingletonLazy getInstance(){
        if(instance == null){
            synchronized (SingletonLazy.class){
                if(instance == null){
                    instance = new SingletonLazy();
                }
            }
        }
        return instance;
    }
    private SingletonLazy(){} // 私有构造方法
}

https://i-blog.csdnimg.cn/direct/6434635903704e77a27da74c6226aa70.png

三个注意点:

1.加锁的位置

2.第二层if(我们只需要第一次new的时候加锁)

3.指令重新排序问题

但是但是但是但是!!!哈哈哈,还是有一点小问题:

这个单例模式可以被反射打破,或者序列化/反序列化打破(不过这个小编也不太懂,后续小编懂了再讲解吧~)

2.阻塞队列

我们接下来来讲解一下阻塞队列吧~~那什么是阻塞队列呢?

阻塞队列是一种特殊的队列(线程安全的队列)

1.如果队列为 ,继续出队列,就会发生阻塞,阻塞到其他线程 往队列里面添加 元素为止

2.如果队列为 ,继续入队列,就会发生阻塞,阻塞到其他线程 从队列里面取出 元素为止

这个队列最大的用处: 用来实现 “生产者消费者模型”(一种常见的多线程代码编写方式

那生产者消费者模型有什么意义呢?

1.解耦合

2.削峰填谷:阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力

在 “秒杀” 场景下,消费者服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求, 服务器可能扛不住,这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由 消费者 线程慢慢的来处理每个支付请求

2.1解耦合

两张图

https://i-blog.csdnimg.cn/direct/8af0aeb1e99646af8231ec75f3ded1e1.png

https://i-blog.csdnimg.cn/direct/7bcec6a4d5af486a95285058fd63c8b2.png

2.2:削峰填谷

https://i-blog.csdnimg.cn/direct/34a59529707246018afd463b1e1fa03d.png

https://i-blog.csdnimg.cn/direct/3eb72719b05f485a9c57f8935928f71d.png

2.3.阻塞队列实现

在java标准库里面已经提供了现成的阻塞队列,我们先学习一下怎么使用 https://i-blog.csdnimg.cn/direct/d2aa5ac63c78425aaea52a67d6042a27.png

解释:

我们可以看到 这个阻塞队列继承了我们的队列接口所以说Queue这里提供的各种方法对于BlockingQueue也是可以使用的,但是我们尽量还是不要去使用Queue里面的方法(因为是线程不安全的)

BlockingQueue的两个主要方法:

1.put方法:阻塞入队列(如果队列满了,只有等其他线程take走了一个元素,才能够放put元素)

2.take方法:阻塞出队列(如果队列为空,只有等其他线程put了一个元素,才能继续take元素)

众所周知:队列有用数组实现的(一般是环形队列),也有用链表实现的

自然我们的阻塞队列也是这个样子。

接下来我们先学习一下怎么使用的吧,然后我们在进行模拟实现

public static void main1(String[] args) throws InterruptedException {
       // BlockingDeque<String> queue1 = new ArrayBlockingQueue<>(); // 数组实现的阻塞队列的
        BlockingQeque<String> queue = new LinkedBlockingQeque<>();
        queue.put("111"); // 要抛出异常,一般带阻塞的都会抛出InterruptException
        queue.put("222");
        queue.put("333");
        queue.put("444");// 放进去四个元素
        
        String s = queue.take();   // 取出元素
        System.out.println(s);
        
        s = queue.take();
        System.out.println(s);
        s = queue.take();
        System.out.println(s);
        s = queue.take();
        System.out.println(s);
        s = queue.take();
        System.out.println(s);//我们发现这个时候已经阻塞了,队列里面没有元素了

    }

2.4:模拟实现(用循环数组实现)

https://i-blog.csdnimg.cn/direct/1953534777c94f94b6a1535c51b7bbd2.png

这里还是我们以前的问题,如果head和tail重合了,那么这个队列到底是满了还是空的?

两个解决办法:

1.浪费一个数组格子 ,(tail +1) % data.length = head(这个时候就是满了)。head = tail这个时候就是空的

2.专门搞一个size(数组有效元素个数) ,如果size = data.length(这个时候就是满的)。如果 size = 0(这个时候就是空的)

我用第二种方式给大家先写一个正常循环队列

   class MyBlockingQueue{
    String[] data = new String[1000];
    private int head; // 头
    private int tail;  // 尾巴进头出
    private int size;// 有效元素个数

 // 两个主要的方法;
    public void put(String elem){
        if(size == data.length){  // 有效元素个数 = 数组长度
            return;
        }
        data[tail] = elem;  // 别搞elem[tail] = elem ,都混乱了

        tail++;
        // 数组比实际元素个数 -1; 下标为1000时说明刚好转了一圈回来了,同时下标置0
        if(tail == data.length){
            tail = 0;
        }
        size++; // 有效元素++
    }
    public String take(){
        if(size == 0){
            return null;
        }
        // 队列不为空,就把对首元素删除,先保存一下
        String ret = data[head];
        head++;
        if(head == data.length){
            head = 0;
        }
        size--;
        return ret;
    }

}

这个时候 我们在这个循环队列的上进行线程安全的改进:

第一个改进:

我们可以看到,我们这里的take和put操作几乎每一步都涉及到 变量的修改 或者 **条件判断(上一篇博客我们刚讲过条件判断这里最容易出现内存可见性问题

(从读内存变成读寄存器)

)** ,我们索性直接给 **两个方法都给加上锁。

(并且给head,tail,size都加上volatile

(因为他们每一个都涉及到了条件判断))**

第二个改进:

**put方法:阻塞入队列(如果队列满了,只有等其他线程take走了一个元素才能够放put元素)

(take走元素的时候我们就可以通知另一个线程可以put元素了)**

**take方法:阻塞出队列(如果队列为空,只有等其他线程put了一个元素才能继续take元素)

(put进去元素的时候我们就可以通知另一个线程可以take元素了)**

代码如下:(小编接下来有两个问题,这个代码还是不很完善,需要把问题解决)

class MyBlockingQueue{
    String[] data = new String[1000];
    volatile private int head; // 头
    volatile private int tail;  // 尾巴进头出
    volatile private int size;// 有效元素个数
    //take 和 put方法几乎每一步都涉及到修改值,而且还有判断(这个最容易)被
    synchronized public void put(String elem) throws InterruptedException {  // 扔出异常还是???
        if(size == data.length){  // 有效元素个数 = 数组长度
            this.wait();  // 队列满了,等待别的线程取走元素,然后再put元素
        }
        data[tail] = elem;  // 别搞elem[tail] = elem ,都混乱了

        tail++;
        // 数组比实际元素个数 -1; 下标为1000时说明刚好转了一圈回来了,同时下标置0
        if(tail == data.length){
            tail = 0;
        }
        this.notify(); // 提醒另一个线程我们已经放进去元素了

        size++; // 有效元素++
    }

       synchronized  public String take() throws InterruptedException {
        if(size == 0){
            this.wait();
        }
        // 队列不为空,就把对首元素删除,先保存一下
        String ret = data[head];
        head++;
        if(head == data.length){
            head = 0;
        }
        size--;
        this.notify(); // 提醒另一个线程我们已经拿走一个元素了
        return ret;
    }
   
}

这里小编有两个问题,写这个代码的时候不是很明白?

问题一:为什么put和take要用同一把锁?

问题二:我wait的时候抛出的异常是应该try-catch还是直接抛出去呢?

2.5:解决问题

解决问题一:

我不会

解决问题二:

**这个有一个很大问题, 我们wait的时候不仅仅可以被notify唤醒

还可以被interrupt唤醒**

我们队列满的时候还要put时需要wait,interrupt唤醒时会抛出异常(这个时候我们这个线程就以为是notify唤醒的)这个时候就出事了。

如图:

https://i-blog.csdnimg.cn/direct/a9a0be80cc1b444897a67faf3294a390.png

所以我们在被唤醒的时候要多加一步看看是否是因为interrupt导致的唤醒(如果是那就继续wait)。那又被唤醒又要检查,再wait (套娃了哈哈哈哈)

所以我们直接循环一下就好了


        while(size == data.length){  // 被唤醒检查一下还是满的话就不是notify唤醒的
            this.wait();  // 队列满了,等待别的线程取走元素,然后再put元素
        }
        

所以说如果是我们抛出异常的话程序就终止了,其实没问题, **但是try-catch我们处理不当的话就容易出问题(保险起见我们还是while检查)

(这个操作是java标准文档建议的哈哈哈)**

https://i-blog.csdnimg.cn/direct/0f92523a4dc34a719f35e06e32fbfa99.png

OK!!!最后的终极代码来了!!!

class MyBlockingQueue{
    String[] data = new String[1000];
    volatile private int head; // 头  volatile
    volatile private int tail;  // 尾巴进头出
    volatile private int size;// 有效元素个数
    //take 和 put方法几乎每一步都涉及到修改值,而且还有判断(这个最容易)被
    synchronized public void put(String elem) throws InterruptedException {  // 扔出异常还是???
        while(size == data.length){  // 有效元素个数 = 数组长度
            this.wait();  // 队列满了,等待别的线程取走元素,然后再put元素
        }
        data[tail] = elem;

        tail++;
        // 数组比实际元素个数 -1; 下标为1000时说明刚好转了一圈回来了,同时下标置0
        if(tail == data.length){
            tail = 0;
        }
        this.notify(); //唤醒(小编解释了在上文中)

        size++; // 有效元素++
    }

       synchronized  public String take() throws InterruptedException {
        while (size == 0){
            this.wait();
        }
        // 队列不为空,就把对首元素删除,先保存一下
        String ret = data[head];
         head++;
        if(head == data.length){
            head = 0;
        }
        size--;
        this.notify(); 
        return ret;
    }

}

实话说,问了ai我第一个问题还是没搞懂,唉。有点累,这个问题明明问问其他同学或者老师吧。真的有点累。

没事还有一个使用我这个阻塞队列。

2.6.验证我的阻塞队列,实现一个简单的生产者消费者模型

class MyBlockingQueue{
    String[] data = new String[1000];
    volatile private int head; // 头  volatile
    volatile private int tail;  // 尾巴进头出
    volatile private int size;// 有效元素个数
    //take 和 put方法几乎每一步都涉及到修改值,而且还有判断(这个最容易)被
    synchronized public void put(String elem) throws InterruptedException {  // 扔出异常还是???
        while(size == data.length){  // 有效元素个数 = 数组长度
            this.wait();  // 队列满了,等待别的线程取走元素,然后再put元素
        }
        data[tail] = elem;

        tail++;
        // 数组比实际元素个数 -1; 下标为1000时说明刚好转了一圈回来了,同时下标置0
        if(tail == data.length){
            tail = 0;
        }
        this.notify(); //唤醒(小编解释了在上文中)

        size++; // 有效元素++
    }

       synchronized  public String take() throws InterruptedException {
        while (size == 0){
            this.wait();
        }
        // 队列不为空,就把对首元素删除,先保存一下
        String ret = data[head];
        head++;
        if(head == data.length){
            head = 0;
        }
        size--;
        this.notify();
        return ret;
    }

}

public class Demo1 {
    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue();
        Thread t1 = new Thread(() -> {
            int count = 0;
            while (true) {
                try {
                    Thread.sleep(500);
                    queue.put(" " + count);  // 慢点生产
                    System.out.println(" put " + count);
                    count++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        Thread t2 = new Thread(() -> {
            while (true) {
                try {
                    String s = queue.take();
                    System.out.println(" take" + s);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        t1.start();
        t2.start();

    }

结果:生产多少消费多少。

https://i-blog.csdnimg.cn/direct/34b47fa02bfc425396499c35417fd03b.png

如果代码这样改写(生产的快,消费的慢)

    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue();
        Thread t1 = new Thread(() -> {
            int count = 0;
            while (true) {
                try {
                    queue.put(" " + count);
                    System.out.println(" put " + count);
                    count++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        Thread t2 = new Thread(() -> {
            while (true) {
                try {
                    String s = queue.take();
                    System.out.println(" take" + s);
                    Thread.sleep(500);  //  消费慢一点
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        t1.start();
        t2.start();
    }

结果:

https://i-blog.csdnimg.cn/direct/0941998e986347d4bbf47a022eaffd72.png

小编这里还有一个问题

2.4的问题一:为什么put和take要用同一把锁?

小编一直没懂,大家一起来讨论呀

上述就是【Java】并发编程精要:单例模式 + 阻塞队列的终极实现指南

的全部内容了。单例模式与阻塞队列是构建高并发系统的两大核心设计利器,对于我们解决高并发问题提供了很好的思路~~~

预知后事如何,请听下回分解~~~

能看到这里相信您一定对小编的文章有了一定的认可。

**有什么问题欢迎各位大佬指出

欢迎各位大佬评论区留言修正~~

https://i-blog.csdnimg.cn/direct/01086e152c724175828d9b69206ce934.gif https://i-blog.csdnimg.cn/direct/2ab2bf5d651b402e9289a57263b99fab.gif 您的支持就是我最大的动力​​​!!!**

最后记得和小编一起解决问题呀