`

用java 实现生产者、消费者模型

    博客分类:
  • JAVA
阅读更多

public class MessageQueue {

	private List<String> queue = new ArrayList<String>();

	public synchronized String getMessage() throws InterruptedException {
		while (queue.isEmpty()) {
			wait();
		}
		String message = queue.remove(0);
		notify();
		return message;
	}

	public synchronized void putMessage(String message) throws InterruptedException {
		if (!queue.isEmpty()) {
			wait();
		}
		queue.add(message);
		notify();
	}

	public static void main(String[] args) {
		MessageQueue queue = new MessageQueue();
		new Thread(new MessageProducer("生产者1", queue)).start();
		new Thread(new MessageProducer("生产者2", queue)).start();
		new Thread(new MessageConsumer("消费者1", queue)).start();
		new Thread(new MessageConsumer("消费者2", queue)).start();
		new Thread(new MessageConsumer("消费者3", queue)).start();
		new Thread(new MessageConsumer("消费者4", queue)).start();
		new Thread(new MessageConsumer("消费者5", queue)).start();
		new Thread(new MessageConsumer("消费者6", queue)).start();
	}
}

public class MessageProducer implements Runnable {

	private MessageQueue queue;

	private String name;

	public MessageProducer(String name, MessageQueue queue) {
		super();
		this.queue = queue;
		this.name = name;
	}

	@Override
	public void run() {
		int i = 0;
		while (i < 20) {
			i++;
			try {
				try {
					Thread.sleep((long) (Math.random() * 1000));
				}
				catch (InterruptedException e) {
					e.printStackTrace();
				}
				queue.putMessage("***" + name + "*** 生产了 第 " + i + "  消息!");
			}
			catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

	}

}

public class MessageConsumer implements Runnable {

	private MessageQueue queue;

	private String name;

	public MessageConsumer(String name, MessageQueue queue) {
		this.queue = queue;
		this.name = name;
	}

	@Override
	public void run() {
		try {
			while (true) {
				try {
					Thread.sleep((long) (Math.random() * 1000));
				}
				catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(name + "==>读取消息:" + queue.getMessage());
			}
		}
		catch (InterruptedException e) {
			e.printStackTrace();
		}

	}

}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics