上一篇已经介绍过生产者消费者模型,这次使用信号量来解决该问题。
使用信号量实现生产者消费者模型: 我们用两个信号量empty和fill来分别表示缓冲区空或者填入数据项。
我们考虑信号量的初始值,因为buffer初始化为空,empty的初始值应该为buffer的容量,表示buffer中没有数据,也是就每一项都代表空,而fill则应该为空,这与消费者必须在生产者生产数据后才能进行相符合(如果消费者先执行,会因为fill信号量而阻塞)。
因为我们的缓冲区的容量超过1,如果两个生产者(pa和pb)同时put,pa线程在tail计数器更新到1之前被中断,pb开始执行,则会将老数据覆盖,所以在每次进行生产或者消费时都需要将临界区保护起来。
使用一个二值信号量来充当互斥锁。将临界区保护起来。 注意这里有一个死锁隐患。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void * consumer (void *arg) { for (int i = 0 ; i < 20 ; i++){ sem_wait(&mutex); sem_wait(&fill); printf ("get value is %d\n" , get()); sem_post(&empty); sem_post(&mutex); } } void * producer (void * arg) { for (int i = 0 ; i < 20 ; i++){ sem_wait(&mutex); sem_wait(&empty); put(i); printf ("put value is %d\n" , i); sem_post(&fill); sem_post(&mutex); } }
例如,如果写成这个样子,有可能变为死锁。 死锁的状态是:消费者先运行,获得锁,然后对full信号量执行sem_wait(),并阻塞让出CPU(此时还持有mutex锁),然后生产者开始运行,但现在因为消费者持有mutex锁,所以也会阻塞并导致死锁。
所以要把获取和释放互斥量的操作调整为紧挨着临界区。把empty,fill的唤醒和等待操作调整到锁外。
c版本代码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 #include <stdio.h> #include <stdlib.h> #include <semaphore.h> #include <pthread.h> int head = 0 , tail = 0 ;const int MAX = 20 ;int buffer[20 ];int size = 0 ;sem_t empty, fill, mutex;int get () { int t = buffer[head]; head = (head + 1 ) % MAX; size--; return t; } void put (int a) { buffer[tail] = a; tail = (tail + 1 ) % MAX; size++; } void * consumer (void *arg) { for (int i = 0 ; i < 20 ; i++){ sem_wait(&fill); sem_wait(&mutex); printf ("get value is %d\n" , get()); sem_post(&mutex); sem_post(&empty); } } void * producer (void * arg) { for (int i = 0 ; i < 20 ; i++){ sem_wait(&empty); sem_wait(&mutex); put(i); printf ("put value is %d\n" , i); sem_post(&mutex); sem_post(&fill); } } int main () { sem_init(&empty, 0 , MAX); sem_init(&fill, 0 , 0 ); sem_init(&mutex, 0 , 1 ); pthread_t t1,t2; pthread_create(&t1, NULL , producer, NULL ); pthread_create(&t2, NULL , consumer, NULL ); pthread_join(t1, NULL ); pthread_join(t2, NULL ); printf ("final size is %d\n" , size); return 0 ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 package com.yzb.thread;import java.util.concurrent.Semaphore;public class ConsumerProducerSem { private final static int CAPACITY = 20 ; private int tail = 0 ; private int head = 0 ; private int count = 0 ; private Semaphore empty = new Semaphore(10 ); private Semaphore fill = new Semaphore(0 ); private Semaphore mutex = new Semaphore(1 ); private int [] buffer = new int [CAPACITY]; private int get () { int tmp = buffer[head]; head = (head+1 ) % CAPACITY; count--; return tmp; } private void put (int data) { buffer[tail] = data; tail = (tail+1 ) % CAPACITY; count++; } private void consume () throws InterruptedException { fill.acquire(); mutex.acquire(); mutex.release(); empty.release(); } private void product (int data) throws InterruptedException { empty.acquire(); mutex.acquire(); put(data); mutex.release(); fill.release(); } public static void main (String[] args) { for (int i = 0 ; i < 1e4 ; i++) { ConsumerProducerSem cps = new ConsumerProducerSem(); LocalConsumerThread c = cps.new LocalConsumerThread(); LocalProducerThread p = cps.new LocalProducerThread(); Thread t1 = new Thread(c); Thread t2 = new Thread(p); t1.start(); t2.start(); try { t1.join(); t2.join(); } catch (InterruptedException e) { e.printStackTrace(); } assert cps.count == 0 ; } } private class LocalConsumerThread implements Runnable { @Override public void run () { for (int j = 0 ; j < ConsumerProducerSem.CAPACITY; j++){ try { ConsumerProducerSem.this .consume(); } catch (InterruptedException e) { e.printStackTrace(); } } } } private class LocalProducerThread implements Runnable { @Override public void run () { for (int j = 0 ; j < ConsumerProducerSem.CAPACITY; j++){ try { ConsumerProducerSem.this .product(j); } catch (InterruptedException e) { e.printStackTrace(); } } } } }