Source: 猴子007, monkeysayhi.github.io/2017/10/08/Java實現生產者-消費者模型/
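When Java concurrent programming comes up in an interview, hand-writing the producer-consumer model is a classic question. It tests several things:
- understanding of Java's concurrency model
- fluency with Java's concurrency APIs
- bug-free code
- coding style
This article summarizes four ways to write it. After reading, practice them a few times on a whiteboard to check whether you have really mastered them. The four versions differ in the API they use or in their concurrency granularity, but they are essentially the same: each one either uses or implements a BlockingQueue.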
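The producer-consumer model
There are many definitions and implementations of the producer-consumer model online. This article studies the most commonly used one, the bounded producer-consumer model, which can be summarized as follows:
- Producers keep producing until the buffer is full, then block; once the buffer is no longer full, they resume producing.
- Consumers keep consuming until the buffer is empty, then block; once the buffer is no longer empty, they resume consuming.
- There can be multiple producers and multiple consumers.
The correctness of an implementation can be verified against the following conditions:
- For any given product, consumption always happens after production.
- At any moment, the buffer size is no less than 0 and no greater than the capacity limit.
The model has many applications and variants, which are not covered here.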
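The two verification conditions can be checked mechanically. The following is a minimal sketch, not part of the original article: TraceChecker and isValid are hypothetical names, and it assumes the events were recorded in the true order of the enqueue/dequeue operations (for example, logged while holding the buffer lock), using the same "produce: n" / "consume: n" format as the logs in this article.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not from the original article.
final class TraceChecker {
    private TraceChecker() {}

    /**
     * Replays a trace of buffer operations and checks both conditions:
     * a task is consumed only after it was produced, and the number of
     * buffered tasks stays within [0, cap].
     */
    static boolean isValid(List<String> events, int cap) {
        Set<Integer> inBuffer = new HashSet<>();
        for (String event : events) {
            String[] parts = event.split(": ");
            int no = Integer.parseInt(parts[1]);
            if (parts[0].equals("produce")) {
                // Task numbers are unique, so a repeated produce is an error.
                if (!inBuffer.add(no)) return false;
                if (inBuffer.size() > cap) return false; // capacity exceeded
            } else {
                // Removing a task that is not buffered means it was consumed
                // before being produced (or consumed twice).
                if (!inBuffer.remove(no)) return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isValid(Arrays.asList("produce: 0", "consume: 0"), 3)); // true
        System.out.println(isValid(Arrays.asList("consume: 0", "produce: 0"), 3)); // false
    }
}
```

Because the buffer never holds a task that was not produced, the lower bound of 0 is covered by the same check that rejects an early consume.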
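Several implementations
Preparation
In an interview, the following preparatory code can be described verbally; only the key parts, such as AbstractConsumer, need to be written out.
Several implementations of the producer-consumer model follow, so we first abstract the key interfaces and implement some abstract classes: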
public interface Consumer {
    void consume() throws InterruptedException;
}

public interface Producer {
    void produce() throws InterruptedException;
}

abstract class AbstractConsumer implements Consumer, Runnable {
    @Override
    public void run() {
        // Keep consuming until the thread is interrupted
        while (true) {
            try {
                consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
                break;
            }
        }
    }
}

abstract class AbstractProducer implements Producer, Runnable {
    @Override
    public void run() {
        // Keep producing until the thread is interrupted
        while (true) {
            try {
                produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
                break;
            }
        }
    }
}
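The producers and consumers differ from one model implementation to the next, so the model needs abstract factory methods:
public interface Model {
    Runnable newRunnableConsumer();
    Runnable newRunnableProducer();
}
We use Task as the unit of production and consumption:
public class Task {
    public int no;

    public Task(int no) {
        this.no = no;
    }
}
If the requirements are still unclear (which is the case for most real engineering work), abstract as you implement; do not "program for the future".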
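Implementation 1: BlockingQueue
The BlockingQueue version is the simplest. The core idea is to encapsulate concurrency and capacity control inside the buffer, and BlockingQueue satisfies this requirement by its very nature.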
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingQueueModel implements Model {
    private final BlockingQueue<Task> queue;
    private final AtomicInteger increTaskNo = new AtomicInteger(0);

    public BlockingQueueModel(int cap) {
        // LinkedBlockingQueue initializes its storage lazily, whereas
        // ArrayBlockingQueue initializes it at construction time
        this.queue = new LinkedBlockingQueue<>(cap);
    }

    @Override
    public Runnable newRunnableConsumer() {
        return new ConsumerImpl();
    }

    @Override
    public Runnable newRunnableProducer() {
        return new ProducerImpl();
    }

    private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
        @Override
        public void consume() throws InterruptedException {
            Task task = queue.take();
            // Consume within a fixed time range, simulating relatively stable server-side processing
            Thread.sleep(500 + (long) (Math.random() * 500));
            System.out.println("consume: " + task.no);
        }
    }

    private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
        @Override
        public void produce() throws InterruptedException {
            // Produce at irregular intervals, simulating random user requests
            Thread.sleep((long) (Math.random() * 1000));
            Task task = new Task(increTaskNo.getAndIncrement());
            queue.put(task);
            System.out.println("produce: " + task.no);
        }
    }

    public static void main(String[] args) {
        Model model = new BlockingQueueModel(3);
        for (int i = 0; i < 2; i++) {
            new Thread(model.newRunnableConsumer()).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(model.newRunnableProducer()).start();
        }
    }
}
An excerpt from the beginning of the output:
produce: 0
produce: 4
produce: 2
produce: 3
produce: 5
consume: 0
produce: 1
consume: 4
produce: 7
consume: 2
produce: 8
consume: 3
produce: 6
consume: 5
produce: 9
consume: 1
produce: 10
consume: 7
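Because "dequeue/enqueue + log output" is not an atomic operation, the absolute order of the log lines above may differ from the actual dequeue/enqueue order. But for any given task number task.no, its consume log line always appears after its produce log line; that is, a task is always consumed after it is produced. Verifying the buffer capacity is left to the reader. Both verification conditions are met.
The core of the BlockingQueue version is just two lines of code. Concurrency and capacity control are both encapsulated in BlockingQueue, which guarantees correctness. This is the first choice in an interview: natural, clean, and simple.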
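Implementation 2: wait && notify
If concurrency and capacity control cannot both be encapsulated in the buffer, they must be handled by the consumers and producers themselves. The simplest approach is the plain wait && notify mechanism.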
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class WaitNotifyModel implements Model {
    private final Object BUFFER_LOCK = new Object();
    private final Queue<Task> buffer = new LinkedList<>();
    private final int cap;
    private final AtomicInteger increTaskNo = new AtomicInteger(0);

    public WaitNotifyModel(int cap) {
        this.cap = cap;
    }

    @Override
    public Runnable newRunnableConsumer() {
        return new ConsumerImpl();
    }

    @Override
    public Runnable newRunnableProducer() {
        return new ProducerImpl();
    }

    private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
        @Override
        public void consume() throws InterruptedException {
            synchronized (BUFFER_LOCK) {
                // Re-check the condition after every wake-up
                while (buffer.size() == 0) {
                    BUFFER_LOCK.wait();
                }
                Task task = buffer.poll();
                assert task != null;
                // Consume within a fixed time range, simulating relatively stable server-side processing
                Thread.sleep(500 + (long) (Math.random() * 500));
                System.out.println("consume: " + task.no);
                BUFFER_LOCK.notifyAll();
            }
        }
    }

    private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
        @Override
        public void produce() throws InterruptedException {
            // Produce at irregular intervals, simulating random user requests
            Thread.sleep((long) (Math.random() * 1000));
            synchronized (BUFFER_LOCK) {
                // Re-check the condition after every wake-up
                while (buffer.size() == cap) {
                    BUFFER_LOCK.wait();
                }
                Task task = new Task(increTaskNo.getAndIncrement());
                buffer.offer(task);
                System.out.println("produce: " + task.no);
                BUFFER_LOCK.notifyAll();
            }
        }
    }

    public static void main(String[] args) {
        Model model = new WaitNotifyModel(3);
        for (int i = 0; i < 2; i++) {
            new Thread(model.newRunnableConsumer()).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(model.newRunnableProducer()).start();
        }
    }
}
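Verification is the same as above.
The plain wait && notify mechanism is not very flexible, but it is simple enough. For the usage of synchronized, wait, and notifyAll, see "【Java併發程式設計】之十:使用wait/notify/notifyAll實現執行緒間通訊的幾點重要說明" (Java Concurrent Programming, Part 10: important notes on using wait/notify/notifyAll for inter-thread communication); pay particular attention to the difference between being woken up and competing for the lock.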
http://blog.csdn.net/ns_code/article/details/17225469
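The difference between being woken up and competing for the lock is the reason the guard around wait() is a while loop rather than an if: a thread woken by notifyAll() must first re-acquire the monitor, and by the time it does, another thread may already have changed the buffer. The following is a minimal sketch of that pattern, not the article's code; GuardedBuffer, runDemo, and maxSeen are hypothetical names introduced for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class for illustration; not from the original article.
final class GuardedBuffer {
    private final Queue<Integer> items = new ArrayDeque<>();
    private final int cap;
    private int maxSeen = 0; // largest observed size, guarded by this

    GuardedBuffer(int cap) { this.cap = cap; }

    synchronized void put(int x) throws InterruptedException {
        // A woken thread re-checks the condition after re-acquiring the monitor.
        while (items.size() == cap) {
            wait();
        }
        items.add(x);
        maxSeen = Math.max(maxSeen, items.size());
        notifyAll();
    }

    synchronized int take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        int x = items.remove();
        notifyAll();
        return x;
    }

    synchronized int maxSeen() { return maxSeen; }

    /** Returns the sum of all consumed values, or -1 if cap was exceeded or the caller was interrupted. */
    static long runDemo(int cap, int workers, int perWorker) {
        GuardedBuffer buf = new GuardedBuffer(cap);
        AtomicLong sum = new AtomicLong();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            threads.add(new Thread(() -> {   // producer: puts 1..perWorker
                try {
                    for (int i = 1; i <= perWorker; i++) buf.put(i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
            threads.add(new Thread(() -> {   // consumer: takes perWorker items
                try {
                    for (int i = 0; i < perWorker; i++) sum.addAndGet(buf.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (Thread t : threads) t.start();
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return buf.maxSeen() <= cap ? sum.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(3, 4, 100)); // 4 * (1 + ... + 100) = 20200
    }
}
```

With if instead of while, two consumers woken by the same notifyAll() could both proceed after a single put, and the second would call remove() on an empty queue.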
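Implementation 3: a simple Lock && Condition
We need to make sure we understand the wait && notify mechanism. It can be implemented with the wait() and notifyAll() methods of the Object class, but the more recommended way is to use Lock && Condition from the java.util.concurrent.locks package.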
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockConditionModel1 implements Model {
    private final Lock BUFFER_LOCK = new ReentrantLock();
    private final Condition BUFFER_COND = BUFFER_LOCK.newCondition();
    private final Queue<Task> buffer = new LinkedList<>();
    private final int cap;
    private final AtomicInteger increTaskNo = new AtomicInteger(0);

    public LockConditionModel1(int cap) {
        this.cap = cap;
    }

    @Override
    public Runnable newRunnableConsumer() {
        return new ConsumerImpl();
    }

    @Override
    public Runnable newRunnableProducer() {
        return new ProducerImpl();
    }

    private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
        @Override
        public void consume() throws InterruptedException {
            BUFFER_LOCK.lockInterruptibly();
            try {
                // Re-check the condition after every wake-up
                while (buffer.size() == 0) {
                    BUFFER_COND.await();
                }
                Task task = buffer.poll();
                assert task != null;
                // Consume within a fixed time range, simulating relatively stable server-side processing
                Thread.sleep(500 + (long) (Math.random() * 500));
                System.out.println("consume: " + task.no);
                BUFFER_COND.signalAll();
            } finally {
                BUFFER_LOCK.unlock();
            }
        }
    }

    private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
        @Override
        public void produce() throws InterruptedException {
            // Produce at irregular intervals, simulating random user requests
            Thread.sleep((long) (Math.random() * 1000));
            BUFFER_LOCK.lockInterruptibly();
            try {
                // Re-check the condition after every wake-up
                while (buffer.size() == cap) {
                    BUFFER_COND.await();
                }
                Task task = new Task(increTaskNo.getAndIncrement());
                buffer.offer(task);
                System.out.println("produce: " + task.no);
                BUFFER_COND.signalAll();
            } finally {
                BUFFER_LOCK.unlock();
            }
        }
    }

    public static void main(String[] args) {
        Model model = new LockConditionModel1(3);
        for (int i = 0; i < 2; i++) {
            new Thread(model.newRunnableConsumer()).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(model.newRunnableProducer()).start();
        }
    }
}
This version follows exactly the same approach as implementation 2; it merely replaces the lock and condition variable with Lock and Condition.
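Implementation 4: Lock && Condition with higher concurrency
If you run some experiments, you will find that implementation 1 has better concurrent performance than implementations 2 and 3. Setting aside how BlockingQueue is implemented for now, let us analyze how to optimize implementation 3 (which shares its approach with implementation 2 and performs about the same).
Analyzing the bottleneck in implementation 3
The best way to investigate is to record method execution times, which pinpoints the real bottleneck directly. But this problem is simple enough that we can analyze it just by reading the code.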
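For reference, recording a method's execution time can be as simple as wrapping the call with System.nanoTime(). This is a minimal sketch; Timing and measureNanos are hypothetical names, and a real investigation would more likely use a profiler or a benchmarking harness.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not from the original article.
final class Timing {
    private Timing() {}

    /** Runs the action once and returns the elapsed wall-clock time in nanoseconds. */
    static long measureNanos(Runnable action) {
        long start = System.nanoTime();
        action.run();
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        long nanos = measureNanos(() -> {
            try {
                Thread.sleep(50); // stand-in for the method under investigation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println("elapsed ms: " + TimeUnit.NANOSECONDS.toMillis(nanos));
    }
}
```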
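The concurrency bottleneck in implementation 3 is obvious: from the point of view of the lock BUFFER_LOCK, every consumer thread and producer thread is the same. In other words, at any moment at most one thread (either a producer or a consumer) may operate on the buffer.
In fact, if the buffer is a queue, the operations "producer enqueues a product" and "consumer dequeues a product" need no synchronization with each other: one element can be enqueued at the tail while another is dequeued at the head. Ideally, performance can reach twice that of implementation 3.
Removing the bottleneck
The approach is then straightforward: use two locks, CONSUME_LOCK and PRODUCE_LOCK. CONSUME_LOCK controls concurrent dequeuing by consumer threads, and PRODUCE_LOCK controls concurrent enqueuing by producer threads. Correspondingly, two condition variables are needed, NOT_EMPTY and NOT_FULL: NOT_EMPTY controls the state (blocked or running) of consumer threads, and NOT_FULL controls the state (blocked or running) of producer threads. With this optimization, consumers are serialized with other consumers (and producers with other producers), while consumers and producers run in parallel.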
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockConditionModel2 implements Model {
    private final Lock CONSUME_LOCK = new ReentrantLock();
    private final Condition NOT_EMPTY = CONSUME_LOCK.newCondition();
    private final Lock PRODUCE_LOCK = new ReentrantLock();
    private final Condition NOT_FULL = PRODUCE_LOCK.newCondition();
    private final Buffer<Task> buffer = new Buffer<>();
    private final AtomicInteger bufLen = new AtomicInteger(0);
    private final int cap;
    private final AtomicInteger increTaskNo = new AtomicInteger(0);

    public LockConditionModel2(int cap) {
        this.cap = cap;
    }

    @Override
    public Runnable newRunnableConsumer() {
        return new ConsumerImpl();
    }

    @Override
    public Runnable newRunnableProducer() {
        return new ProducerImpl();
    }

    private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
        @Override
        public void consume() throws InterruptedException {
            int newBufSize = -1;
            CONSUME_LOCK.lockInterruptibly();
            try {
                while (bufLen.get() == 0) {
                    System.out.println("buffer is empty...");
                    NOT_EMPTY.await();
                }
                Task task = buffer.poll();
                newBufSize = bufLen.decrementAndGet();
                assert task != null;
                // Consume within a fixed time range, simulating relatively stable server-side processing
                Thread.sleep(500 + (long) (Math.random() * 500));
                System.out.println("consume: " + task.no);
                // Tasks remain, so wake up the other waiting consumers
                if (newBufSize > 0) {
                    NOT_EMPTY.signalAll();
                }
            } finally {
                CONSUME_LOCK.unlock();
            }
            // There is free space now, so wake up the waiting producers
            if (newBufSize < cap) {
                PRODUCE_LOCK.lockInterruptibly();
                try {
                    NOT_FULL.signalAll();
                } finally {
                    PRODUCE_LOCK.unlock();
                }
            }
        }
    }

    private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
        @Override
        public void produce() throws InterruptedException {
            // Produce at irregular intervals, simulating random user requests
            Thread.sleep((long) (Math.random() * 1000));
            int newBufSize = -1;
            PRODUCE_LOCK.lockInterruptibly();
            try {
                while (bufLen.get() == cap) {
                    System.out.println("buffer is full...");
                    NOT_FULL.await();
                }
                Task task = new Task(increTaskNo.getAndIncrement());
                buffer.offer(task);
                newBufSize = bufLen.incrementAndGet();
                System.out.println("produce: " + task.no);
                // Space remains, so wake up the other waiting producers
                if (newBufSize < cap) {
                    NOT_FULL.signalAll();
                }
            } finally {
                PRODUCE_LOCK.unlock();
            }
            // There are tasks now, so wake up the waiting consumers
            if (newBufSize > 0) {
                CONSUME_LOCK.lockInterruptibly();
                try {
                    NOT_EMPTY.signalAll();
                } finally {
                    CONSUME_LOCK.unlock();
                }
            }
        }
    }

    private static class Buffer<E> {
        private Node head;
        private Node tail;

        Buffer() {
            // dummy node
            head = tail = new Node(null);
        }

        // Touches only tail
        public void offer(E e) {
            tail.next = new Node(e);
            tail = tail.next;
        }

        // Touches only head
        public E poll() {
            head = head.next;
            E e = head.item;
            head.item = null;
            return e;
        }

        private class Node {
            E item;
            Node next;

            Node(E item) {
                this.item = item;
            }
        }
    }

    public static void main(String[] args) {
        Model model = new LockConditionModel2(3);
        for (int i = 0; i < 2; i++) {
            new Thread(model.newRunnableConsumer()).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(model.newRunnableProducer()).start();
        }
    }
}
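Note that because consumption and production now happen at the same time on the non-thread-safe buffer, we can no longer use the queue from implementations 2 and 3 and have to implement a simple buffer, Buffer, ourselves. Buffer must satisfy the following conditions:
- dequeue at the head, enqueue at the tail
- poll() operates only on head
- offer() operates only on tail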
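Can it be optimized further?
We have removed the bottleneck between consumers and producers. Can we optimize further?
If so, it would have to be by improving concurrency among consumers (or among producers). However, consumers must be serialized with one another, so at the level of the concurrency model there is nothing left to optimize.
In a concrete business scenario, though, there is usually still room for improvement. For example:
- at a moderate level of concurrency, consider using CAS instead of a reentrant lock
- even though the model cannot be optimized, a single consume action may be broken down and optimized further, reducing consumption latency
- when a single queue has reached its concurrency limit, use multiple queues (such as a distributed message queue)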
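As one possible reading of the first suggestion, the capacity check can be done with a CAS loop on an AtomicInteger instead of a lock. The sketch below is an illustration under stated assumptions, not the article's method: CasBoundedQueue, tryOffer, and tryPoll are hypothetical names, the element storage is delegated to the JDK's lock-free ConcurrentLinkedQueue, and the queue is non-blocking (it returns false or null where the article's implementations would block).

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class for illustration; not from the original article.
final class CasBoundedQueue<E> {
    private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();
    // Number of reserved slots; always >= the number of elements actually queued.
    private final AtomicInteger reserved = new AtomicInteger(0);
    private final int cap;

    CasBoundedQueue(int cap) { this.cap = cap; }

    /** Tries to enqueue without blocking; returns false if the queue is full. */
    boolean tryOffer(E e) {
        while (true) {
            int cur = reserved.get();
            if (cur >= cap) {
                return false; // full, caller decides whether to retry
            }
            // Reserve a slot first; retry if another thread changed the count.
            if (reserved.compareAndSet(cur, cur + 1)) {
                queue.offer(e);
                return true;
            }
        }
    }

    /** Tries to dequeue without blocking; returns null if nothing is available. */
    E tryPoll() {
        E e = queue.poll();
        if (e != null) {
            reserved.decrementAndGet(); // release the slot only after removal
        }
        return e;
    }

    public static void main(String[] args) {
        CasBoundedQueue<Integer> q = new CasBoundedQueue<>(2);
        System.out.println(q.tryOffer(1)); // true
        System.out.println(q.tryOffer(2)); // true
        System.out.println(q.tryOffer(3)); // false, capacity reached
        System.out.println(q.tryPoll());   // 1
    }
}
```

Reserving the slot before enqueuing keeps the number of stored elements at or below cap; the cost is that tryPoll() may briefly return null while a producer has reserved a slot but not yet enqueued, so callers need a retry or back-off policy.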
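The essence of the four implementations
As stated at the beginning, the four versions are essentially the same: each one either uses or implements a BlockingQueue. Implementation 1 uses BlockingQueue directly, implementation 4 implements a simple BlockingQueue, and implementations 2 and 3 implement a degraded BlockingQueue (with performance cut in half).
The BlockingQueue implementation used in implementation 1 is LinkedBlockingQueue. Its source code is excerpted below for comparison; it is not hard to read: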
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    …
    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
    …
    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }
    /**
     * Removes a node from head of queue.
     *
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
    …
    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    …
    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
    …
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    …
}
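There are many other ways to write it. Semaphore, for instance, is also very common (the producer-consumer model in undergraduate operating systems textbooks is implemented with semaphores). But chasing too many of them is like fretting over the different ways to write the character 茴 in 茴香豆 (fennel beans), so this article does not explore them further.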
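For reference only, the textbook semaphore formulation can be sketched with java.util.concurrent.Semaphore: one semaphore counts empty slots, one counts filled slots, and a binary semaphore serves as the mutex. This is a minimal illustration rather than a full treatment; SemaphoreBuffer and runDemo are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class for illustration; not from the original article.
final class SemaphoreBuffer {
    private final Queue<Integer> items = new ArrayDeque<>();
    private final Semaphore emptySlots;                       // free capacity
    private final Semaphore filledSlots = new Semaphore(0);   // available items
    private final Semaphore mutex = new Semaphore(1);         // protects items

    SemaphoreBuffer(int cap) { this.emptySlots = new Semaphore(cap); }

    void put(int x) throws InterruptedException {
        emptySlots.acquire();            // blocks while the buffer is full
        mutex.acquireUninterruptibly();  // short critical section, so no permit leak on interrupt
        try {
            items.add(x);
        } finally {
            mutex.release();
        }
        filledSlots.release();           // one more item is available
    }

    int take() throws InterruptedException {
        filledSlots.acquire();           // blocks while the buffer is empty
        mutex.acquireUninterruptibly();
        int x;
        try {
            x = items.remove();
        } finally {
            mutex.release();
        }
        emptySlots.release();            // one more slot is free
        return x;
    }

    /** Returns the sum of all consumed values, or -1 if the caller was interrupted. */
    static long runDemo(int cap, int workers, int perWorker) {
        SemaphoreBuffer buf = new SemaphoreBuffer(cap);
        AtomicLong sum = new AtomicLong();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            threads.add(new Thread(() -> {   // producer: puts 1..perWorker
                try {
                    for (int i = 1; i <= perWorker; i++) buf.put(i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
            threads.add(new Thread(() -> {   // consumer: takes perWorker items
                try {
                    for (int i = 0; i < perWorker; i++) sum.addAndGet(buf.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (Thread t : threads) t.start();
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(3, 4, 100)); // 4 * (1 + ... + 100) = 20200
    }
}
```

The order of acquisition matters: taking the mutex before emptySlots (or filledSlots) could leave a thread blocked on a full or empty buffer while holding the mutex, which would deadlock.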
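Summary
Implementation 1 is a must. You should at least be able to explain implementation 4 clearly. Of implementations 2 and 3, mastering one is enough.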