生产者-消费者模式是一个经典的多线程设计模式。
它为多线程间的协作提供了良好的解决方案。在生产者-消费模式中,通常有两类线程,即若干个生产线程
和若干个消费者线程。生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。
生产者和消费者之间则通过共享内存缓冲区进行通讯。
生产者-消费者模式的核心组件是共享内存缓冲区。
它作为生产者消费者的桥梁,避免了生产者和消费者直接通讯,
从而将生产者和消费者进行解耦。
生产者不需要知道消费者存在,消费者不需要知道生产者的存在。
同时,由于内存缓冲区的存在,
允许生产者和消费者在执行速度上存在时间差,
无论是生产者在某一局部时间内的速度高于消费者的速度。
还是消费者在局部时间内的速度高于生产者的速度。
都可用通过共享内存缓冲区得到缓解,确保系统正常运行。
public class Main{
public static void main(String[] args) {
BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
Consumer consumer3 = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer1);
service.execute(consumer2);
service.execute(consumer3);
try {
Thread.sleep(10*1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
producer1.stop();
producer2.stop();
producer3.stop();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
service.shutdown();
}
}
public class Consumer implements Runnable{
private BlockingQueue<PCData> queue;
private static final int SLEEPTIME = 1000;
public Consumer(BlockingQueue<PCData> queue){
this.queue = queue;
}
@Override
public void run() {
System.out.println("start Consumer id="+ Thread.currentThread().getId());
Random r = new Random();
try{
while (true){
PCData data = queue.take();
if(null != data){
int re = data.getData() * data.getData();
System.out.println(MessageFormat.format("{0}*{1}={2}",
data.getData(),data.getData(),re));
Thread.sleep(r.nextInt(SLEEPTIME));
}
}
}catch (InterruptedException e){
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
public class PCData {
private final int intData;
public PCData(int d){
intData = d;
}
public PCData(String d){
intData = Integer.valueOf(d);
}
public int getData(){
return intData;
}
@Override
public String toString(){
return "data:"+intData;
}
}
public class Producer implements Runnable{
private volatile boolean isRunning = true;
private BlockingQueue<PCData> queue;
private static AtomicInteger count = new AtomicInteger();
private static final int SLEEPTIME = 1000;
public Producer(BlockingQueue<PCData> queue){
this.queue = queue;
}
@Override
public void run() {
PCData data = null;
Random r = new Random();
System.out.println("start producer id="+Thread.currentThread().getId());
try {
while(isRunning){
Thread.sleep(r.nextInt(SLEEPTIME));
data = new PCData(count.incrementAndGet());
System.out.println(data+" is put into queue");
if(!queue.offer(data,2, TimeUnit.SECONDS)) {
System.err.println("failed to put data: "+ data);
}
}
}catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void stop(){
isRunning = false;
}
}
输出平方结果。
data:46 is put into queue
data:47 is put into queue
44*44=1,936
45*45=2,025
data:48 is put into queue
data:49 is put into queue
data:50 is put into queue