Javaの生产者消费者问题

生产者消费者问题(多线程应用经典案例)

生产者在生产商品,而消费者在消费生产的商品。生产者把生产的商品放进容器中,而消费者从容器中取出商品进行消费。可是在整个过程中,如果容器装满了,那么生产者应该停止生产,如果容器中没有商品了,消费应该停止消费。这就是一个典型的多生产,多消费的案例。

  1. 单生产单消费

    • 在学习过程中,为了代码简单明了,大家很容易看懂,就把上述的多生产和多消费进行简化,要求生产者生产一个商品,消费者消费一个商品,然后生产者继续生产,消费者进行消费,以此类推下去。
    • 分析案例:
    • 生产和消费同时执行,需要多线程。但是执行的任务却不相同,处理的资源确实相同的:线程间的通信。
    • 思路:1、描述一下资源。2、描述生产者,具备着自己的任务。3、描述消费者,具备着自己的任务。
//描述资源。属性:商品名称和编号,  行为:对商品名称赋值,获取商品。
class Resource
{
    private String name;
    private int count = 1;
    //对外提供设置商品的方法
    public void set(String name)
    {
        //给成员变量赋值并加上编号。
        this.name = name + count;
        //编号自增。
        count++;
        //打印生产了哪个商品。
        System.out.println(Thread.currentThread().getName()+".....生产了...."+this.name);
    }
    public void get()
    {
        System.out.println(Thread.currentThread().getName()+".....消费了...."+this.name);
    }
}
//描述生产者
class Producer implements Runnable
{
    private Resource r;
    //生产者以创建就应该明确资源
    Producer(Resource r)
    {
        this.r = r;
    }
    //生产者生产商品的任务
    public void run()
    {
        //生产者无限制的生产
        while(true)
        {
            r.set("面包");
        }
    }
}
//描述消费者
class Consumer implements Runnable
{
    private Resource r;
    //生产者以创建就应该明确资源
    Consumer(Resource r)
    {
        this.r = r;
    }
    //生产者生产商品的任务
    public void run()
    {
        while(true)
        {
            r.get();
        }
    }
}
class ThreadDemo5
{
    public static void main(String[] args)
    {
        //创建资源对象
        Resource r = new Resource();
 
        //创建生产者对象
        Producer pro = new Producer(r);
        //创建消费者对象
        Consumer con = new Consumer(r);
 
        //创建线程对象
        Thread t1 = new Thread(pro);
        Thread t2 = new Thread(con);
        //开启线程
        t1.start();
        t2.start();
    }
}

上述代码进行运行时发现有严重的问题。

问题1:数据错误:已经被生产很早期的商品,才被消费到。

出现线程安全问题,加入了同步解决。使用同步函数。

class Resource
{
    private String name;
    private int count = 1;
    //对外提供设置商品的方法
    public synchronized void set(String name)
    {
        //给成员变量赋值并加上编号。
        this.name = name + count;
        //编号自增。
        count++;
        //打印生产了哪个商品。
        System.out.println(Thread.currentThread().getName()+".....生产者...."+this.name);
    }
    public synchronized void get()
    {
        System.out.println(Thread.currentThread().getName()+".....消费者...."+this.name);
    }
}

先前问题已解决:不会在消费到之前很早期的商品。

但加入同步后又有新的问题产生了。

问题2:发现了连续生产却没有消费,同时对同一个商品进行多次消费。希望的结果应该是生产一个商品,就被消费掉。生产下一个商品。搞清楚几个问题?生产者什么时候生产呢?消费者什么时候应该消费呢?

当容器中没有面包时,就生产,如果有了面包,就不要生产。

当容器中已有面包时,就消费,如果没有面包,就不要消费。

2).等待唤醒机制

生产者生产了商品后应该告诉消费者来消费。这时的生产者应该处于等待状态。消费者消费了商品后,应该告诉生产者,这时消费者处于等待状态。

等待:wait();

通知:notify();//唤醒

问题解决:实现生产一个消费一个。

//描述资源。属性:商品名称和编号,  行为:对商品名称赋值,获取商品。
class Resource
{
    private String name;
    private int count = 1;
    private boolean flag = false;
    //对外提供设置商品的方法
    public synchronized void set(String name)
    {
        if(flag)
        {
            try{wait();}catch(InterruptedException e){}
        }
        //给成员变量赋值并加上编号。
        this.name = name + count;
        //编号自增。
        count++;
        //打印生产了哪个商品。
        System.out.println(Thread.currentThread().getName()+".....生产者...."+this.name);
        //将标记改为true。
        flag = true;
        //唤醒消费者。
        this.notify();
    }
    public synchronized void get()
    {
        if(!flag)
        {
            try{wait();}catch(InterruptedException e){}
        }
        System.out.println(Thread.currentThread().getName()+".....消费者...."+this.name);
        //将标记改为false。
        flag = false;
        //唤醒生产者。
        this.notify();
    }
}

等待/唤醒机制:

wait(): 会让线程处于等待状态,其实就是将线程临时存储到了线程池中。

notify():会唤醒线程池中任意一个等待的线程。

notifyAll():会唤醒线程池中所有的等待线程。

记住:这些方法必须使用在同步中,因为必须要标识wait,notify等方法所属的锁。同一个锁上的notify,只能唤醒该锁上的被wait的线程。

为什么这些方法定义在Object类中呢?

因为这些方法必须标识所属的锁,而锁可以是任意对象,任意对象可以调用的方法必然时Object类中的方法。

3).多生产多消费-1

上述程序只是一个生产和一个消费者,其实就是所谓的单生产和单消费,可是我们都知道生活中经常会有多个生产者和消费者,把代码改为多个生产者或多个消费者。

class ThreadDemo5
{
    public static void main(String[] args)
    {
        //创建资源对象
        Resource r = new Resource();
 
        //创建生产者对象
        Producer pro = new Producer(r);
        //创建消费者对象
        Consumer con = new Consumer(r);
 
        //创建线程对象
        Thread t1 = new Thread(pro);
        Thread t2 = new Thread(pro);
        Thread t3 = new Thread(con);
        Thread t4 = new Thread(con);
        //开启线程
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}

把生产者和消费者改为多个时,又有新的问题发生了。

问题1:生产了商品没有被消费,同一个商品被消费多次。

问题原因:

被唤醒的线程没有判断标记,造成问题1的产生。

解决:只要让被唤醒的线程必须判断标记就可以了。将if判断标记的方式改为while判断标记。记住:多生产多消费,必须时while判断条件。

当把if改为while之后又出现问题了。

问题2:发现while判断后,死锁了。

原因:生产方唤醒了线程池中生产方的线程。本方唤醒了本方。

解决:希望本方要唤醒对方,没有对应的方法,所以只能唤醒所有。

//描述资源。属性:商品名称和编号,  行为:对商品名称赋值,获取商品。
class Resource
{
    private String name;
    private int count = 1;
    private boolean flag = false;
    //对外提供设置商品的方法
    public synchronized void set(String name)
    {
        while(flag)
        {
            try{wait();}catch(InterruptedException e){}
        }
        //给成员变量赋值并加上编号。
        this.name = name + count;
        //编号自增。
        count++;
        //打印生产了哪个商品。
        System.out.println(Thread.currentThread().getName()+".....生产者...."+this.name);
        //将标记改为true。
        flag = true;
        //唤醒消费者。
        this.notifyAll();
    }
    public synchronized void get()
    {
        while(!flag)
        {
            try{wait();}catch(InterruptedException e){}
        }
        System.out.println(Thread.currentThread().getName()+".....消费者...."+this.name);
        //将标记改为false。
        flag = false;
        //唤醒生产者。
        this.notifyAll();
    }
}
Lock接口和Condition接口

4.1 Lock接口

4.1.1 synchronized的缺陷

synchronized是java中的一个关键字,也就是说是Java语言内置的特性。

如果一个代码块被synchronized修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里获取锁的线程释放锁只会有两种情况:

1)获取锁的线程执行完了该代码块,然后线程释放对锁的占有;

2)线程执行发生异常,此时JVM会让线程自动释放锁。

那么如果这个获取锁的线程由于要等待IO或者其他原因(比如调用sleep方法)被阻塞了,但是又没有释放锁,其他线程便只能干巴巴地等待,试想一下,这多么影响程序执行效率。因此就需要有一种机制可以不让等待的线程一直无期限地等待下去(比如只等待一定的时间或者能够响应中断),通过Lock就可以办到。

  再举个例子:

当有多个线程读写文件时,读操作和写操作会发生冲突现象,写操作和写操作会发生冲突现象,但是读操作和读操作不会发生冲突现象。

但是采用synchronized关键字来实现同步的话,就会导致一个问题:

如果多个线程都只是进行读操作,所以当一个线程在进行读操作时,其他线程只能等待无法进行读操作。因此就需要一种机制来使得多个线程都只是进行读操作时,线程之间不会发生冲突,通过Lock就可以办到。另外,通过Lock可以知道线程有没有成功获取到锁。这个是 synchronized无法办到的。
4.1.2 Lock接口

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

首先lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。

  由于在前面讲到如果采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用Lock必须在try{}catch{}块中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定被被释放,防止死锁的发生。通常使用Lock来进行同步的话,是以下面这种形式去使用的:

Lock lock = new ReentrantLock();
lock.lock();
try{
    //处理任务
}catch(Exception ex){
      
}finally{
    lock.unlock();   //释放锁
}

tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待。

  tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。

  所以,一般情况下通过tryLock来获取锁时是这样使用的:

Lock lock = new ReentrantLock();
if(lock.tryLock()) {
     try{
         //处理任务
     }catch(Exception ex){
          
     }finally{
         lock.unlock();   //释放锁
     }
}else {
    //如果不能获取锁,则直接做其他事情
}

4.2 Condition接口

虽然把锁换成了显示的锁,可是使用的等待和唤醒还是Object中的wait和notify,这就会导致锁和等待唤醒机制处于两个对象上,而我们想要的应该是那个锁下的线程等待和唤醒,也就是等待唤醒机制和锁有一定的关联关系。Condition对象的出现其实就是替代了Object中的监视器方法。

import java.util.concurrent.locks.*;
 
//描述资源。属性:商品名称和编号,  行为:对商品名称赋值,获取商品。
class Resource
{
    private String name;
    private int count = 1;
    private boolean flag = false;
    //创建锁对象
    Lock lock = new ReentrantLock();
    //获取监视器对象,其实就是让锁和监视器关联起来
    Condition con = lock.newCondition();
    //对外提供设置商品的方法
    public  void set(String name)
    {
        //获取锁
        lock.lock();
        try{
            while(flag)
            {
                try{con.await();}catch(InterruptedException e){}
            }
            //给成员变量赋值并加上编号。
            this.name = name + count;
            //编号自增。
            count++;
            //打印生产了哪个商品。
            System.out.println(Thread.currentThread().getName()+".....生产者...."+this.name);
            //将标记改为true。
            flag = true;
            //唤醒消费者。
            con.signalAll();
            }
        finally//由于锁的动作一定要释放,所以使用try-finally组合
        {
            //释放锁
            lock.unlock();
        }
    }
    public  void get()
    {
        //获取锁
        lock.lock();
        try{
            while(!flag)
            {
                try{con.await();}catch(InterruptedException e){}
            }
            System.out.println(Thread.currentThread().getName()+".....消费者...."+this.name);
            //将标记改为false。
            flag = false;
            //唤醒生产者。
            con.signalAll();
        }
        finally//由于锁的动作一定要释放,所以使用try-finally组合
        {
            //释放锁
            lock.unlock();
        }
    }
}

将所有的监视器方法替换成了Condition。功能和之前的老程序的功能一样,仅仅是用新的对象。改了写法而已。但是问题依旧;效率还是低。低效的原因是本方可能唤醒的是本方,而我们希望本方可以唤醒对方中的一个。

import java.util.concurrent.locks.*;
 
//描述资源。属性:商品名称和编号,  行为:对商品名称赋值,获取商品。
class Resource
{
    private String name;
    private int count = 1;
    private boolean flag = false;
    //定义一个锁对象。
    private final Lock lock = new ReentrantLock();
    //获取锁上的Condition对象。为了解决本方唤醒对方的问题。可以一个锁创建两个监视器对象。
    private Condition produce = lock.newCondition();//负责生产。
    private Condition consume = lock.newCondition();//负责消费。
    //对外提供设置商品的方法
    public  void set(String name)
    {
        //获取锁
        lock.lock();
        try{
            while(flag)
            {
                try{produce.await();}catch(InterruptedException e){}
            }
            //给成员变量赋值并加上编号。
            this.name = name + count;
            //编号自增。
            count++;
            //打印生产了哪个商品。
            System.out.println(Thread.currentThread().getName()+".....生产者...."+this.name);
            //将标记改为true。
            flag = true;
            //唤醒消费者。
            consume.signalAll();
            }
        finally//由于锁的动作一定要释放,所以使用try-finally组合
        {
            //释放锁
            lock.unlock();
        }
    }
    public  void get()
    {
        //获取锁
        lock.lock();
        try{
            while(!flag)
            {
                try{consume.await();}catch(InterruptedException e){}
            }
            System.out.println(Thread.currentThread().getName()+".....消费者...."+this.name);
            //将标记改为false。
            flag = false;
            //唤醒生产者。
            produce.signalAll();
        }
        finally//由于锁的动作一定要释放,所以使用try-finally组合
        {
            //释放锁
            lock.unlock();
        }
    }
}
多生产多消费 -2

前面的程序出现的问题是,多个生产者,生产同一个商品,或者多个消费者消费同一个商品,其实在生活中我们会到多个生产者同时生产同类以商品,而多个消费者消费这同一类商品。

import java.util.concurrent.locks.*;
 
class BoundedBuffer {
    final Lock lock = new ReentrantLock();//锁
    final Condition notFull  = lock.newCondition(); //生产
    final Condition notEmpty = lock.newCondition(); //消费
 
    private int number = 1;
    final Object[] items = new Object[100];//存储商品的容器。
    int putptr/*生产者使用的角标*/, takeptr/*消费者使用的角标*/, count/*计数器*/;
 
    /*生产者使用的方法,往数组中存储商品*/
    public void put(String x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) //判断计数器是否已到数组长度。满了。
                notFull.await();//生产就等待。
 
            x=x+number;
            items[putptr] = x; //按照角标将商品存储到数组中
            number++;
            if (++putptr == items.length) //如果存储的角标到了数组的长度,就将角标归零。
                putptr = 0;
            ++count;//计数器自增。
            System.out.println(Thread.currentThread().getName()+"生产了"+x+",当前数量:"+count);
            notEmpty.signal();//唤醒一个消费者
        } finally {
            lock.unlock();
        }
    }
 
    public void take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) //如果计数器为0,说明没有商品,消费者等待。
                notEmpty.await();
            Object x = items[takeptr]; //从数组中通过消费者角标获取商品。
 
            if (++takeptr == items.length) //如果消费的角标等于了数组的长度,将角标归零。
                takeptr = 0;
            --count;//计数器自减。
            System.out.println(Thread.currentThread().getName()+"消费了"+x+",当前数量:"+count);
            notFull.signal();//唤醒生产者。
 
        } finally {
            lock.unlock();
        }
    }
}
 
 
//生产者
class Producer implements Runnable
{
    private BoundedBuffer r;
 
    //生产者以创建就应该明确资源
    Producer(BoundedBuffer r)
    {
        this.r = r;
    }
    //生产者生产商品的任务
    public void run()
    {
        //生产者无限制的生产
        while(true)
        {
            try{
                Thread.sleep(500);
                r.put("苹果");
            }catch(InterruptedException e){}
 
        }
    }
}
class Consumer implements Runnable
{
    private BoundedBuffer r;
    //生产者以创建就应该明确资源
    Consumer(BoundedBuffer r)
    {
        this.r = r;
    }
    //生产者生产商品的任务
    public void run()
    {
        while(true)
        {
            try{
                Thread.sleep(1000);
                r.take();
            }catch(InterruptedException e){}
        }
    }
}
public class ThreadDemo5
{
    public static void main(String[] args)
    {
        //创建资源对象
        BoundedBuffer r = new BoundedBuffer();
 
        //创建生产者对象
        Producer pro = new Producer(r);
        //创建消费者对象
        Consumer con = new Consumer(r);
 
        //创建线程对象
        Thread t1 = new Thread(pro);
        Thread t2 = new Thread(pro);
        Thread t3 = new Thread(con);
        Thread t4 = new Thread(con);
        //开启线程
        t1.start();
        t2.start();
        t3.start();
        t4.start();
 
    }
}

用集合实现多生产多消费

Resource.java

public class Resource {
    private Lock lock = new ReentrantLock();
    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();
    private int number = 1;
    private int count = 1;
    private String name;
    List<Object> list = new ArrayList<>();
    public void put(String nn) {
        lock.lock();
        try{
            while (count == list.size()){
                notFull.await();
            }
            name = nn + number;
            list.add(name);
            number++;
            count++;
            System.out.println(Thread.currentThread().getName() + "生产了" + name);
            notEmpty.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void get(){
        lock.lock();
        try{
            while (list.size() == 0){//!!!!这边容易忘记!!!!!!
                notEmpty.await();
            }

            Object o = list.get(0);
            System.out.println(Thread.currentThread().getName() + "消费了" + o);
            list.remove(0);
            notFull.signalAll();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

Producer.java

public class Producer implements Runnable{
    Resource r = new Resource();

    public Producer(Resource r) {
        this.r = r;
    }

    @Override
    public void run() {
       while (true){
           try {
               Thread.sleep(500);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           r.put("柚子");
       }
    }
}

Cudtomer.java

public class Customer implements Runnable {
    Resource r = new Resource();

    public Customer(Resource r) {
        this.r = r;
    }

    @Override
    public void run() {
        while (true){
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            r.get();
        }
    }
}

main方法

public static void main(String[] args) {
        Resource r= new Resource();
        Producer producer = new Producer(r);
        Customer customer = new Customer(r);
        Thread t1 = new Thread(producer);
        Thread t2 = new Thread(producer);
        Thread t3 = new Thread(customer);
        Thread t4 = new Thread(customer);
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }