生产者消费者

假设有一个或多个生产者线程和一个或多个消费者线程。生产者把生成的数据项放入缓冲区,消费者从缓冲区取走数据项,以某种方式消费。

使用锁+条件变量实现生产者消费者模型:

一把锁控制整个模型的临界区,并使用两个条件变量分别控制生产和消费动作。
当消费时,如果已无数据项可供消费,就需要等待;而当生产时,如果缓冲区已满,则需要等待。
两个条件变量表明等待的时机:

  • 消费者在无数据项时无法继续消费,full条件需要等待,必须让出当前锁;消费完毕后说明有空间容纳新的数据项,empty去唤醒生产线程抢占锁。
  • 生产者在缓冲区满时无法生产,empty条件等待,让出当前锁,而生产完毕一个数据项后,便可供消费,让full条件去通知消费线程进行消费。

C语言版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>

#define CAP 1000

int count = 0;

pthread_cond_t fill, empty;
pthread_mutex_t lock;

void consumer() {
pthread_mutex_lock(&lock);
while(count == 0) {
pthread_cond_wait(&fill,&lock);
}
count--;
pthread_cond_signal(&empty);
pthread_mutex_unlock(&lock);
}

void producer() {
pthread_mutex_lock(&lock);
while(count == CAP) {
pthread_cond_wait(&empty,&lock);
}
count++;
pthread_cond_signal(&fill);
pthread_mutex_unlock(&lock);
}

void* product(void* arg) {
printf("%s start\n", (char*)arg);
for(int i = 0; i < CAP; i++) {
producer();
}
printf("%s end\n", (char*)arg);
}
void* consume(void* arg) {
printf("%s start\n", (char*)arg);
for(int i = 0; i < CAP; i++) {
consumer();
}
printf("%s end\n", (char*)arg);
}

int main() {
pthread_mutex_init(&lock, NULL);
pthread_cond_init(&fill, NULL);
pthread_cond_init(&empty, NULL);
pthread_t t1,t2,t3;
pthread_create(&t1, NULL, product, "A");
pthread_create(&t2, NULL, product, "B");
pthread_create(&t3, NULL, consume, "C");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
printf("end count is %d\n", count);
return 0;
}

java版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package com.yzb.thread;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author yzb
*/
public class Consumer_producter {

private final static int CAPACITY = 20;
private int fill = 0;
private int use = 0;
private int count = 0;
private Lock lock = new ReentrantLock();
private Condition full = lock.newCondition();
private Condition empty = lock.newCondition();

private int[] buffer = new int[CAPACITY];

private int get(){
int tmp = buffer[use];
use = (use+1) % CAPACITY;
count--;
return tmp;
}

private void put(int data){
buffer[fill] = data;
fill = (fill+1) % CAPACITY;
count++;
}

private void consume(){
lock.lock();
try {
while (count == 0) {
full.await();
}
int c = get();
// System.out.println("消费了" + c);
empty.signal();
} catch (InterruptedException e){
e.printStackTrace();
} finally {
lock.unlock();
}
}

private void producer(int data){
lock.lock();
try {
while (count == CAPACITY) {
empty.await();
}
put(data);
// System.out.println("生产了" + data);
full.signal();
} catch (InterruptedException e){
e.printStackTrace();
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
for(int i = 0; i < 1e3; i++){
Consumer_producter consumerProducter = new Consumer_producter();
Consumer_producter.LocalConsumerThread c1 = consumerProducter.new LocalConsumerThread();
Consumer_producter.LocalProducerThread c2 = consumerProducter.new LocalProducerThread();
Consumer_producter.LocalProducerThread p = consumerProducter.new LocalProducerThread();
Thread t1 = new Thread(c1);
Thread t2 = new Thread(c2);
Thread t3 = new Thread(p);
t1.start();
t2.start();
t3.start();
try {

t1.join();
t2.join();
t3.join();
} catch (InterruptedException e){
e.printStackTrace();
}

assert(consumerProducter.count == 10);
consumerProducter = null;
}
}

private class LocalConsumerThread implements Runnable {
@Override
public void run() {
for(int j = 0; j < 10; j++){
Consumer_producter.this.consume();
}
}
}

private class LocalProducerThread implements Runnable {
@Override
public void run() {
for(int j = 0; j < 10; j++){
Consumer_producter.this.producer(j);
}
}
}
}