package com.huangbei.test;
import com.huangbei.util.Sleeper;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
/*
生产者-消费者模型
*/
@Slf4j(topic = "c.Test-producer&consumer")
public class Test25 {
public static void main(String[] args) {
//创建一个消息队列、3个生产者、1个消费者
MessageQueue queue = new MessageQueue(2);
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {
Message msg = new Message(id, "消息" + id);
queue.put(msg);
}, "生产者" + i).start();
}
new Thread(()->{
while (true){
Sleeper.sleep(1);
Message msg = queue.get();
}
},"消费者").start();
}
}
class Message {
private int id;
private Object message;
Message(int id, Object msg) {
this.id = id;
this.message = msg;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", message=" + message +
'}';
}
}
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
//使用一个数据结构存储消息---使用队列
private LinkedList<Message> queue=new LinkedList<>();
private int maxCapacity;
MessageQueue(int capacity) {
this.maxCapacity = capacity;
}
//放消息
public void put(Message msg) {
synchronized (queue) {
//当队列超过设定的最大容量,等待消费者消费
while (queue.size() == maxCapacity) {
log.debug("缓冲区已满,等待消费...");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.offer(msg);
log.debug("生产了一个消息{}",msg);
//当有消费者在等待,唤醒
queue.notifyAll();
}
}
//收消息
public Message get() {
synchronized (queue) {
//当队列为空,需要等待生产者生产消息
while (queue.isEmpty()) {
log.debug("缓冲区为空,等待生产...");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//队列不为空,获取消息,并通知生产者继续生产
Message msg = queue.poll();
log.debug("消费了一个消息{}",msg);
queue.notifyAll();
return msg;
}
}
}