Javaの生产者消费者问题
生产者消费者问题(多线程应用经典案例)
生产者在生产商品,而消费者在消费生产的商品。生产者把生产的商品放进容器中,而消费者从容器中取出商品进行消费。可是在整个过程中,如果容器装满了,那么生产者应该停止生产,如果容器中没有商品了,消费应该停止消费。这就是一个典型的多生产,多消费的案例。
单生产单消费
- 在学习过程中,为了代码简单明了,大家很容易看懂,就把上述的多生产和多消费进行简化,要求生产者生产一个商品,消费者消费一个商品,然后生产者继续生产,消费者进行消费,以此类推下去。
- 分析案例:
- 生产和消费同时执行,需要多线程。但是执行的任务却不相同,处理的资源确实相同的:线程间的通信。
- 思路:1、描述一下资源。2、描述生产者,具备着自己的任务。3、描述消费者,具备着自己的任务。
//描述资源。属性:商品名称和编号, 行为:对商品名称赋值,获取商品。
class Resource
{
private String name;
private int count = 1;
//对外提供设置商品的方法
public void set(String name)
{
//给成员变量赋值并加上编号。
this.name = name + count;
//编号自增。
count++;
//打印生产了哪个商品。
System.out.println(Thread.currentThread().getName()+".....生产了...."+this.name);
}
public void get()
{
System.out.println(Thread.currentThread().getName()+".....消费了...."+this.name);
}
}
//描述生产者
class Producer implements Runnable
{
private Resource r;
//生产者以创建就应该明确资源
Producer(Resource r)
{
this.r = r;
}
//生产者生产商品的任务
public void run()
{
//生产者无限制的生产
while(true)
{
r.set("面包");
}
}
}
//描述消费者
class Consumer implements Runnable
{
private Resource r;
//生产者以创建就应该明确资源
Consumer(Resource r)
{
this.r = r;
}
//生产者生产商品的任务
public void run()
{
while(true)
{
r.get();
}
}
}
class ThreadDemo5
{
public static void main(String[] args)
{
//创建资源对象
Resource r = new Resource();
//创建生产者对象
Producer pro = new Producer(r);
//创建消费者对象
Consumer con = new Consumer(r);
//创建线程对象
Thread t1 = new Thread(pro);
Thread t2 = new Thread(con);
//开启线程
t1.start();
t2.start();
}
}
上述代码进行运行时发现有严重的问题。
问题1:数据错误:已经被生产很早期的商品,才被消费到。
出现线程安全问题,加入了同步解决。使用同步函数。
class Resource
{
private String name;
private int count = 1;
//对外提供设置商品的方法
public synchronized void set(String name)
{
//给成员变量赋值并加上编号。
this.name = name + count;
//编号自增。
count++;
//打印生产了哪个商品。
System.out.println(Thread.currentThread().getName()+".....生产者...."+this.name);
}
public synchronized void get()
{
System.out.println(Thread.currentThread().getName()+".....消费者...."+this.name);
}
}
先前问题已解决:不会在消费到之前很早期的商品。
但加入同步后又有新的问题产生了。
问题2:发现了连续生产却没有消费,同时对同一个商品进行多次消费。希望的结果应该是生产一个商品,就被消费掉。生产下一个商品。搞清楚几个问题?生产者什么时候生产呢?消费者什么时候应该消费呢?
当容器中没有面包时,就生产,如果有了面包,就不要生产。
当容器中已有面包时,就消费,如果没有面包,就不要消费。
2).等待唤醒机制
生产者生产了商品后应该告诉消费者来消费。这时的生产者应该处于等待状态。消费者消费了商品后,应该告诉生产者,这时消费者处于等待状态。
等待:wait();
通知:notify();//唤醒
问题解决:实现生产一个消费一个。
//描述资源。属性:商品名称和编号, 行为:对商品名称赋值,获取商品。
class Resource
{
private String name;
private int count = 1;
private boolean flag = false;
//对外提供设置商品的方法
public synchronized void set(String name)
{
if(flag)
{
try{wait();}catch(InterruptedException e){}
}
//给成员变量赋值并加上编号。
this.name = name + count;
//编号自增。
count++;
//打印生产了哪个商品。
System.out.println(Thread.currentThread().getName()+".....生产者...."+this.name);
//将标记改为true。
flag = true;
//唤醒消费者。
this.notify();
}
public synchronized void get()
{
if(!flag)
{
try{wait();}catch(InterruptedException e){}
}
System.out.println(Thread.currentThread().getName()+".....消费者...."+this.name);
//将标记改为false。
flag = false;
//唤醒生产者。
this.notify();
}
}
等待/唤醒机制:
wait(): 会让线程处于等待状态,其实就是将线程临时存储到了线程池中。
notify():会唤醒线程池中任意一个等待的线程。
notifyAll():会唤醒线程池中所有的等待线程。
记住:这些方法必须使用在同步中,因为必须要标识wait,notify等方法所属的锁。同一个锁上的notify,只能唤醒该锁上的被wait的线程。
为什么这些方法定义在Object类中呢?
因为这些方法必须标识所属的锁,而锁可以是任意对象,任意对象可以调用的方法必然时Object类中的方法。
3).多生产多消费-1
上述程序只是一个生产和一个消费者,其实就是所谓的单生产和单消费,可是我们都知道生活中经常会有多个生产者和消费者,把代码改为多个生产者或多个消费者。
class ThreadDemo5
{
public static void main(String[] args)
{
//创建资源对象
Resource r = new Resource();
//创建生产者对象
Producer pro = new Producer(r);
//创建消费者对象
Consumer con = new Consumer(r);
//创建线程对象
Thread t1 = new Thread(pro);
Thread t2 = new Thread(pro);
Thread t3 = new Thread(con);
Thread t4 = new Thread(con);
//开启线程
t1.start();
t2.start();
t3.start();
t4.start();
}
}
把生产者和消费者改为多个时,又有新的问题发生了。
问题1:生产了商品没有被消费,同一个商品被消费多次。
问题原因:
被唤醒的线程没有判断标记,造成问题1的产生。
解决:只要让被唤醒的线程必须判断标记就可以了。将if判断标记的方式改为while判断标记。记住:多生产多消费,必须时while判断条件。
当把if改为while之后又出现问题了。
问题2:发现while判断后,死锁了。
原因:生产方唤醒了线程池中生产方的线程。本方唤醒了本方。
解决:希望本方要唤醒对方,没有对应的方法,所以只能唤醒所有。
//描述资源。属性:商品名称和编号, 行为:对商品名称赋值,获取商品。
class Resource
{
private String name;
private int count = 1;
private boolean flag = false;
//对外提供设置商品的方法
public synchronized void set(String name)
{
while(flag)
{
try{wait();}catch(InterruptedException e){}
}
//给成员变量赋值并加上编号。
this.name = name + count;
//编号自增。
count++;
//打印生产了哪个商品。
System.out.println(Thread.currentThread().getName()+".....生产者...."+this.name);
//将标记改为true。
flag = true;
//唤醒消费者。
this.notifyAll();
}
public synchronized void get()
{
while(!flag)
{
try{wait();}catch(InterruptedException e){}
}
System.out.println(Thread.currentThread().getName()+".....消费者...."+this.name);
//将标记改为false。
flag = false;
//唤醒生产者。
this.notifyAll();
}
}
Lock接口和Condition接口
4.1 Lock接口
4.1.1 synchronized的缺陷
synchronized是java中的一个关键字,也就是说是Java语言内置的特性。
如果一个代码块被synchronized修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里获取锁的线程释放锁只会有两种情况:
1)获取锁的线程执行完了该代码块,然后线程释放对锁的占有;
2)线程执行发生异常,此时JVM会让线程自动释放锁。
那么如果这个获取锁的线程由于要等待IO或者其他原因(比如调用sleep方法)被阻塞了,但是又没有释放锁,其他线程便只能干巴巴地等待,试想一下,这多么影响程序执行效率。因此就需要有一种机制可以不让等待的线程一直无期限地等待下去(比如只等待一定的时间或者能够响应中断),通过Lock就可以办到。
再举个例子:
当有多个线程读写文件时,读操作和写操作会发生冲突现象,写操作和写操作会发生冲突现象,但是读操作和读操作不会发生冲突现象。
但是采用synchronized关键字来实现同步的话,就会导致一个问题:
如果多个线程都只是进行读操作,所以当一个线程在进行读操作时,其他线程只能等待无法进行读操作。因此就需要一种机制来使得多个线程都只是进行读操作时,线程之间不会发生冲突,通过Lock就可以办到。另外,通过Lock可以知道线程有没有成功获取到锁。这个是 synchronized无法办到的。
4.1.2 Lock接口
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
首先lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。
由于在前面讲到如果采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用Lock必须在try{}catch{}块中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定被被释放,防止死锁的发生。通常使用Lock来进行同步的话,是以下面这种形式去使用的:
Lock lock = new ReentrantLock();
lock.lock();
try{
//处理任务
}catch(Exception ex){
}finally{
lock.unlock(); //释放锁
}
tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待。
tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。
所以,一般情况下通过tryLock来获取锁时是这样使用的:
Lock lock = new ReentrantLock();
if(lock.tryLock()) {
try{
//处理任务
}catch(Exception ex){
}finally{
lock.unlock(); //释放锁
}
}else {
//如果不能获取锁,则直接做其他事情
}
4.2 Condition接口
虽然把锁换成了显示的锁,可是使用的等待和唤醒还是Object中的wait和notify,这就会导致锁和等待唤醒机制处于两个对象上,而我们想要的应该是那个锁下的线程等待和唤醒,也就是等待唤醒机制和锁有一定的关联关系。Condition对象的出现其实就是替代了Object中的监视器方法。
import java.util.concurrent.locks.*;
//描述资源。属性:商品名称和编号, 行为:对商品名称赋值,获取商品。
class Resource
{
private String name;
private int count = 1;
private boolean flag = false;
//创建锁对象
Lock lock = new ReentrantLock();
//获取监视器对象,其实就是让锁和监视器关联起来
Condition con = lock.newCondition();
//对外提供设置商品的方法
public void set(String name)
{
//获取锁
lock.lock();
try{
while(flag)
{
try{con.await();}catch(InterruptedException e){}
}
//给成员变量赋值并加上编号。
this.name = name + count;
//编号自增。
count++;
//打印生产了哪个商品。
System.out.println(Thread.currentThread().getName()+".....生产者...."+this.name);
//将标记改为true。
flag = true;
//唤醒消费者。
con.signalAll();
}
finally//由于锁的动作一定要释放,所以使用try-finally组合
{
//释放锁
lock.unlock();
}
}
public void get()
{
//获取锁
lock.lock();
try{
while(!flag)
{
try{con.await();}catch(InterruptedException e){}
}
System.out.println(Thread.currentThread().getName()+".....消费者...."+this.name);
//将标记改为false。
flag = false;
//唤醒生产者。
con.signalAll();
}
finally//由于锁的动作一定要释放,所以使用try-finally组合
{
//释放锁
lock.unlock();
}
}
}
将所有的监视器方法替换成了Condition。功能和之前的老程序的功能一样,仅仅是用新的对象。改了写法而已。但是问题依旧;效率还是低。低效的原因是本方可能唤醒的是本方,而我们希望本方可以唤醒对方中的一个。
import java.util.concurrent.locks.*;
//描述资源。属性:商品名称和编号, 行为:对商品名称赋值,获取商品。
class Resource
{
private String name;
private int count = 1;
private boolean flag = false;
//定义一个锁对象。
private final Lock lock = new ReentrantLock();
//获取锁上的Condition对象。为了解决本方唤醒对方的问题。可以一个锁创建两个监视器对象。
private Condition produce = lock.newCondition();//负责生产。
private Condition consume = lock.newCondition();//负责消费。
//对外提供设置商品的方法
public void set(String name)
{
//获取锁
lock.lock();
try{
while(flag)
{
try{produce.await();}catch(InterruptedException e){}
}
//给成员变量赋值并加上编号。
this.name = name + count;
//编号自增。
count++;
//打印生产了哪个商品。
System.out.println(Thread.currentThread().getName()+".....生产者...."+this.name);
//将标记改为true。
flag = true;
//唤醒消费者。
consume.signalAll();
}
finally//由于锁的动作一定要释放,所以使用try-finally组合
{
//释放锁
lock.unlock();
}
}
public void get()
{
//获取锁
lock.lock();
try{
while(!flag)
{
try{consume.await();}catch(InterruptedException e){}
}
System.out.println(Thread.currentThread().getName()+".....消费者...."+this.name);
//将标记改为false。
flag = false;
//唤醒生产者。
produce.signalAll();
}
finally//由于锁的动作一定要释放,所以使用try-finally组合
{
//释放锁
lock.unlock();
}
}
}
多生产多消费 -2
前面的程序出现的问题是,多个生产者,生产同一个商品,或者多个消费者消费同一个商品,其实在生活中我们会到多个生产者同时生产同类以商品,而多个消费者消费这同一类商品。
import java.util.concurrent.locks.*;
class BoundedBuffer {
final Lock lock = new ReentrantLock();//锁
final Condition notFull = lock.newCondition(); //生产
final Condition notEmpty = lock.newCondition(); //消费
private int number = 1;
final Object[] items = new Object[100];//存储商品的容器。
int putptr/*生产者使用的角标*/, takeptr/*消费者使用的角标*/, count/*计数器*/;
/*生产者使用的方法,往数组中存储商品*/
public void put(String x) throws InterruptedException {
lock.lock();
try {
while (count == items.length) //判断计数器是否已到数组长度。满了。
notFull.await();//生产就等待。
x=x+number;
items[putptr] = x; //按照角标将商品存储到数组中
number++;
if (++putptr == items.length) //如果存储的角标到了数组的长度,就将角标归零。
putptr = 0;
++count;//计数器自增。
System.out.println(Thread.currentThread().getName()+"生产了"+x+",当前数量:"+count);
notEmpty.signal();//唤醒一个消费者
} finally {
lock.unlock();
}
}
public void take() throws InterruptedException {
lock.lock();
try {
while (count == 0) //如果计数器为0,说明没有商品,消费者等待。
notEmpty.await();
Object x = items[takeptr]; //从数组中通过消费者角标获取商品。
if (++takeptr == items.length) //如果消费的角标等于了数组的长度,将角标归零。
takeptr = 0;
--count;//计数器自减。
System.out.println(Thread.currentThread().getName()+"消费了"+x+",当前数量:"+count);
notFull.signal();//唤醒生产者。
} finally {
lock.unlock();
}
}
}
//生产者
class Producer implements Runnable
{
private BoundedBuffer r;
//生产者以创建就应该明确资源
Producer(BoundedBuffer r)
{
this.r = r;
}
//生产者生产商品的任务
public void run()
{
//生产者无限制的生产
while(true)
{
try{
Thread.sleep(500);
r.put("苹果");
}catch(InterruptedException e){}
}
}
}
class Consumer implements Runnable
{
private BoundedBuffer r;
//生产者以创建就应该明确资源
Consumer(BoundedBuffer r)
{
this.r = r;
}
//生产者生产商品的任务
public void run()
{
while(true)
{
try{
Thread.sleep(1000);
r.take();
}catch(InterruptedException e){}
}
}
}
public class ThreadDemo5
{
public static void main(String[] args)
{
//创建资源对象
BoundedBuffer r = new BoundedBuffer();
//创建生产者对象
Producer pro = new Producer(r);
//创建消费者对象
Consumer con = new Consumer(r);
//创建线程对象
Thread t1 = new Thread(pro);
Thread t2 = new Thread(pro);
Thread t3 = new Thread(con);
Thread t4 = new Thread(con);
//开启线程
t1.start();
t2.start();
t3.start();
t4.start();
}
}
用集合实现多生产多消费
Resource.java
public class Resource {
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
private int number = 1;
private int count = 1;
private String name;
List<Object> list = new ArrayList<>();
public void put(String nn) {
lock.lock();
try{
while (count == list.size()){
notFull.await();
}
name = nn + number;
list.add(name);
number++;
count++;
System.out.println(Thread.currentThread().getName() + "生产了" + name);
notEmpty.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void get(){
lock.lock();
try{
while (list.size() == 0){//!!!!这边容易忘记!!!!!!
notEmpty.await();
}
Object o = list.get(0);
System.out.println(Thread.currentThread().getName() + "消费了" + o);
list.remove(0);
notFull.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
Producer.java
public class Producer implements Runnable{
Resource r = new Resource();
public Producer(Resource r) {
this.r = r;
}
@Override
public void run() {
while (true){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
r.put("柚子");
}
}
}
Cudtomer.java
public class Customer implements Runnable {
Resource r = new Resource();
public Customer(Resource r) {
this.r = r;
}
@Override
public void run() {
while (true){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
r.get();
}
}
}
main方法
public static void main(String[] args) {
Resource r= new Resource();
Producer producer = new Producer(r);
Customer customer = new Customer(r);
Thread t1 = new Thread(producer);
Thread t2 = new Thread(producer);
Thread t3 = new Thread(customer);
Thread t4 = new Thread(customer);
t1.start();
t2.start();
t3.start();
t4.start();
}