信号量用于不同进程之间的同步,有POSIX信号量和Sysetm V信号量。
二值信号量是值为0或1的信号量。
POSIX信号量分为有名信号量和无名信号量。
有名信号量使用POSIX ipc名字标识,无名信号量把信号量使用共享内存实现。
信号量的三种操作:
创建:初始化信号量初值。
P:信号量值减1,减到0后阻塞等待。
V:如果信号量的阻塞队列有进程等待,就唤醒进程,否则信号量值加1。
互斥锁和条件变量区别:
(1)互斥锁必须由给它上锁的线程解锁,信号量可以在不同线程之间等待和释放。
(2)互斥锁只有两种状态(类似二值信号量)
POSIX信号量相关函数调用
POSIX信号量在linux中最小值为0。当信号量值为0时,进程会被添加到等待队列中。
生产者消费者问题:
(1)一个生产者和一个消费者存取循环缓冲区.
定义一个shared结构体,里面有一个缓冲区,和3个POSIX信号量。
struct { /* data shared by producer and consumer */int buff[NBUFF];sem_t *mutex, *nempty, *nstored;
} shared;
mutex是二值信号量,用于互斥访问缓冲区。
nempty表示初始资源的个数,nstored表示初始存取的个数。
定义两个线程,一个是生产者,一个是消费者,初始化信号量(这里使用的是有名信号量,根据一个路径字符串初始化信号量的值。也可以使用sem_init和sem_destory创建基于内存的信号量),mutex初始化为1,nempty初始化为NBUFF,nstored为0。然后创建两个线程,最后使用pthread_join等待线程结束,然后释放信号量。
void *produce(void *), *consume(void *);
int main(int argc, char **argv)
{pthread_t tid_produce, tid_consume;shared.mutex = Sem_open(SEM_MUTEX, O_CREAT | O_EXCL,FILE_MODE, 1);shared.nempty = Sem_open(SEM_NEMPTY, O_CREAT | O_EXCL,FILE_MODE, NBUFF);shared.nstored = Sem_open(SEM_NSTORED, O_CREAT | O_EXCL,FILE_MODE, 0);Pthread_create(&tid_produce, NULL, produce, NULL);Pthread_create(&tid_consume, NULL, consume, NULL);Pthread_join(tid_produce, NULL);Pthread_join(tid_consume, NULL);Sem_unlink(SEM_MUTEX);Sem_unlink(SEM_NEMPTY);Sem_unlink(SEM_NSTORED);exit(0);
}
生产者代码:
void *produce(void *arg)
{int i;for (i = 0; i < nitems; i++) {Sem_wait(shared.nempty); /* wait for at least 1 empty slot */Sem_wait(shared.mutex);shared.buff[i % NBUFF] = i; /* store i into circular buffer */Sem_post(shared.mutex);Sem_post(shared.nstored); /* 1 more stored item */}return(NULL);
}
(nitems不一定比NBUFF小,但是使用循环缓冲区,就不会溢出)
消费者代码:
void *consume(void *arg){int i;for (i = 0; i < nitems; i++) {Sem_wait(shared.nstored); /* wait for at least 1 stored item */Sem_wait(shared.mutex);if (shared.buff[i % NBUFF] != i)printf("buff[%d] = %d\n", i, shared.buff[i % NBUFF]);Sem_post(shared.mutex);Sem_post(shared.nempty); /* 1 more empty slot */}
}
上面代码只有一个消费者和一个生产者,并且消费者的下标通过信号量机制不会超过生产者的下标。当消费者的下标没有数据时,就会在信号量上等待。
(2)多个生产者,一个消费者存取一个缓冲区
int nitems, nproducers; /* read-only by producer and consumer */
struct { /* data shared by producers and consumer */int buff[NBUFF];int nput;int nputval;sem_t mutex, nempty, nstored; /* semaphores, not pointers */
} shared;
nitems是要存取的个数,buff表示缓冲区,nput 和 nputval 表示已经存放的个数和下一个存放的值,三个信号量作用和(1)相同。
void *produce(void *), *consume(void *);
int main(int argc, char **argv)
{int i, count[MAXNTHREADS];pthread_t tid_produce[MAXNTHREADS], tid_consume;Sem_init(&shared.mutex, 0, 1);Sem_init(&shared.nempty, 0, NBUFF);Sem_init(&shared.nstored, 0, 0);for (i = 0; i < nproducers; i++) {count[i] = 0;Pthread_create(&tid_produce[i], NULL, produce, &count[i]);}Pthread_create(&tid_consume, NULL, consume, NULL);for (i = 0; i < nproducers; i++) {Pthread_join(tid_produce[i], NULL);printf("count[%d] = %d\n", i, count[i]); }Pthread_join(tid_consume, NULL);Sem_destroy(&shared.mutex);Sem_destroy(&shared.nempty);Sem_destroy(&shared.nstored);exit(0);
}
初始化多个生产者线程和一个消费者线程,然后初始化3个信号量(这里用到的是基于内存的信号量),同(1)。然后创建多个生产者,和一个消费者,执行join等线程结束时,最后销毁信号量。
void *
produce(void *arg)
{for ( ; ; ) {Sem_wait(&shared.nempty); /* wait for at least 1 empty slot */Sem_wait(&shared.mutex);if (shared.nput >= nitems) {Sem_post(&shared.nempty);Sem_post(&shared.mutex);return(NULL); /* all done */}shared.buff[shared.nput % NBUFF] = shared.nputval;shared.nput++; shared.nputval++;Sem_post(&shared.mutex);Sem_post(&shared.nstored); /* 1 more stored item */*((int *) arg) += 1;}
}
生产者线程:资源信号量减1,然后互斥锁上锁,如果已经完成生产,就要释放已经获取的信号量;否则生产一个数据到缓冲区,然后更新nput,释放信号量。
有多个生产者,但是因为有mutex二值信号量上锁,所以不会出现多个生产者同时访问nput和nval的情况。
void *
consume(void *arg)
{int i;for (i = 0; i < nitems; i++) {Sem_wait(&shared.nstored); /* wait for at least 1 stored item */Sem_wait(&shared.mutex);if (shared.buff[i % NBUFF] != i)printf("error: buff[%d] = %d\n", i, shared.buff[i % NBUFF]);Sem_post(&shared.mutex);Sem_post(&shared.nempty); /* 1 more empty slot */}return(NULL);
}
消费者和(1)相同。
(3)多个消费者和多个生产者存取一个缓冲区
在(2)的基础上加上nget和ngetval
struct { /* data shared by producers and consumers */int buff[NBUFF];int nput; /* item number: 0, 1, 2, ... */int nputval; /* value to store in buff[] */int nget; /* item number: 0, 1, 2, ... */int ngetval; /* value fetched from buff[] */sem_t mutex, nempty, nstored; /* semaphores, not pointers */
} shared;
void *produce(void *), *consume(void *);
int main(int argc, char **argv)
{int i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];pthread_t tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];Sem_init(&shared.mutex, 0, 1);Sem_init(&shared.nempty, 0, NBUFF);Sem_init(&shared.nstored, 0, 0);for (i = 0; i < nproducers; i++) {prodcount[i] = 0;Pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]);}for (i = 0; i < nconsumers; i++) {conscount[i] = 0;Pthread_create(&tid_consume[i], NULL, consume, &conscount[i]);}for (i = 0; i < nproducers; i++) {Pthread_join(tid_produce[i], NULL);printf("producer count[%d] = %d\n", i, prodcount[i]); }for (i = 0; i < nconsumers; i++) {Pthread_join(tid_consume[i], NULL);printf("consumer count[%d] = %d\n", i, conscount[i]); }Sem_destroy(&shared.mutex);Sem_destroy(&shared.nempty);Sem_destroy(&shared.nstored);exit(0);
}
创建过程和(3)基本相同。