欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > IT业 > 进程通信(6):POSIX信号量

进程通信(6):POSIX信号量

2024/10/24 2:03:05 来源:https://blog.csdn.net/m0_52043808/article/details/140613201  浏览:    关键词:进程通信(6):POSIX信号量

信号量用于不同进程之间的同步,有POSIX信号量和Sysetm V信号量。

二值信号量是值为0或1的信号量。

POSIX信号量分为有名信号量和无名信号量。

有名信号量使用POSIX ipc名字标识,无名信号量把信号量使用共享内存实现。

信号量的三种操作:

创建:初始化信号量初值。

P:信号量值减1,减到0后阻塞等待。

V:如果信号量的阻塞队列有进程等待,就唤醒进程,否则信号量值加1。

互斥锁和条件变量区别:

(1)互斥锁必须由给它上锁的线程解锁,信号量可以在不同线程之间等待和释放。

(2)互斥锁只有两种状态(类似二值信号量)

POSIX信号量相关函数调用

POSIX信号量在linux中最小值为0。当信号量值为0时,进程会被添加到等待队列中。

生产者消费者问题:

(1)一个生产者和一个消费者存取循环缓冲区.

定义一个shared结构体,里面有一个缓冲区,和3个POSIX信号量。

struct {	/* data shared by producer and consumer */int	buff[NBUFF];sem_t	*mutex, *nempty, *nstored;
} shared;

mutex是二值信号量,用于互斥访问缓冲区。

nempty表示初始资源的个数,nstored表示初始存取的个数。

定义两个线程,一个是生产者,一个是消费者,初始化信号量(这里使用的是有名信号量,根据一个路径字符串初始化信号量的值。也可以使用sem_init和sem_destory创建基于内存的信号量),mutex初始化为1,nempty初始化为NBUFF,nstored为0。然后创建两个线程,最后使用pthread_join等待线程结束,然后释放信号量。

void	*produce(void *), *consume(void *);
int main(int argc, char **argv)
{pthread_t tid_produce, tid_consume;shared.mutex = Sem_open(SEM_MUTEX, O_CREAT | O_EXCL,FILE_MODE, 1);shared.nempty = Sem_open(SEM_NEMPTY, O_CREAT | O_EXCL,FILE_MODE, NBUFF);shared.nstored = Sem_open(SEM_NSTORED, O_CREAT | O_EXCL,FILE_MODE, 0);Pthread_create(&tid_produce, NULL, produce, NULL);Pthread_create(&tid_consume, NULL, consume, NULL);Pthread_join(tid_produce, NULL);Pthread_join(tid_consume, NULL);Sem_unlink(SEM_MUTEX);Sem_unlink(SEM_NEMPTY);Sem_unlink(SEM_NSTORED);exit(0);
}

生产者代码:

void *produce(void *arg)
{int	i;for (i = 0; i < nitems; i++) {Sem_wait(shared.nempty);	/* wait for at least 1 empty slot */Sem_wait(shared.mutex);shared.buff[i % NBUFF] = i;	/* store i into circular buffer */Sem_post(shared.mutex);Sem_post(shared.nstored);	/* 1 more stored item */}return(NULL);
}

(nitems不一定比NBUFF小,但是使用循环缓冲区,就不会溢出) 

消费者代码:

void *consume(void *arg){int	i;for (i = 0; i < nitems; i++) {Sem_wait(shared.nstored);		/* wait for at least 1 stored item */Sem_wait(shared.mutex);if (shared.buff[i % NBUFF] != i)printf("buff[%d] = %d\n", i, shared.buff[i % NBUFF]);Sem_post(shared.mutex);Sem_post(shared.nempty);		/* 1 more empty slot */}
}

上面代码只有一个消费者和一个生产者,并且消费者的下标通过信号量机制不会超过生产者的下标。当消费者的下标没有数据时,就会在信号量上等待。

(2)多个生产者,一个消费者存取一个缓冲区

int		nitems, nproducers;		/* read-only by producer and consumer */
struct {	/* data shared by producers and consumer */int	buff[NBUFF];int	nput;int	nputval;sem_t	mutex, nempty, nstored;		/* semaphores, not pointers */
} shared;

nitems是要存取的个数,buff表示缓冲区,nput  和 nputval 表示已经存放的个数和下一个存放的值,三个信号量作用和(1)相同。

void	*produce(void *), *consume(void *);
int main(int argc, char **argv)
{int		i, count[MAXNTHREADS];pthread_t	tid_produce[MAXNTHREADS], tid_consume;Sem_init(&shared.mutex, 0, 1);Sem_init(&shared.nempty, 0, NBUFF);Sem_init(&shared.nstored, 0, 0);for (i = 0; i < nproducers; i++) {count[i] = 0;Pthread_create(&tid_produce[i], NULL, produce, &count[i]);}Pthread_create(&tid_consume, NULL, consume, NULL);for (i = 0; i < nproducers; i++) {Pthread_join(tid_produce[i], NULL);printf("count[%d] = %d\n", i, count[i]);	}Pthread_join(tid_consume, NULL);Sem_destroy(&shared.mutex);Sem_destroy(&shared.nempty);Sem_destroy(&shared.nstored);exit(0);
}

初始化多个生产者线程和一个消费者线程,然后初始化3个信号量(这里用到的是基于内存的信号量),同(1)。然后创建多个生产者,和一个消费者,执行join等线程结束时,最后销毁信号量。

void *
produce(void *arg)
{for ( ; ; ) {Sem_wait(&shared.nempty);	/* wait for at least 1 empty slot */Sem_wait(&shared.mutex);if (shared.nput >= nitems) {Sem_post(&shared.nempty);Sem_post(&shared.mutex);return(NULL);			/* all done */}shared.buff[shared.nput % NBUFF] = shared.nputval;shared.nput++; shared.nputval++;Sem_post(&shared.mutex);Sem_post(&shared.nstored);	/* 1 more stored item */*((int *) arg) += 1;}
}

生产者线程:资源信号量减1,然后互斥锁上锁,如果已经完成生产,就要释放已经获取的信号量;否则生产一个数据到缓冲区,然后更新nput,释放信号量。

有多个生产者,但是因为有mutex二值信号量上锁,所以不会出现多个生产者同时访问nput和nval的情况。

void *
consume(void *arg)
{int		i;for (i = 0; i < nitems; i++) {Sem_wait(&shared.nstored);		/* wait for at least 1 stored item */Sem_wait(&shared.mutex);if (shared.buff[i % NBUFF] != i)printf("error: buff[%d] = %d\n", i, shared.buff[i % NBUFF]);Sem_post(&shared.mutex);Sem_post(&shared.nempty);		/* 1 more empty slot */}return(NULL);
}

消费者和(1)相同。

(3)多个消费者和多个生产者存取一个缓冲区

在(2)的基础上加上nget和ngetval

struct {	/* data shared by producers and consumers */int	buff[NBUFF];int	nput;			/* item number: 0, 1, 2, ... */int	nputval;		/* value to store in buff[] */int	nget;			/* item number: 0, 1, 2, ... */int	ngetval;		/* value fetched from buff[] */sem_t	mutex, nempty, nstored;		/* semaphores, not pointers */
} shared;

void	*produce(void *), *consume(void *);
int main(int argc, char **argv)
{int		i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];pthread_t	tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];Sem_init(&shared.mutex, 0, 1);Sem_init(&shared.nempty, 0, NBUFF);Sem_init(&shared.nstored, 0, 0);for (i = 0; i < nproducers; i++) {prodcount[i] = 0;Pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]);}for (i = 0; i < nconsumers; i++) {conscount[i] = 0;Pthread_create(&tid_consume[i], NULL, consume, &conscount[i]);}for (i = 0; i < nproducers; i++) {Pthread_join(tid_produce[i], NULL);printf("producer count[%d] = %d\n", i, prodcount[i]);	}for (i = 0; i < nconsumers; i++) {Pthread_join(tid_consume[i], NULL);printf("consumer count[%d] = %d\n", i, conscount[i]);	}Sem_destroy(&shared.mutex);Sem_destroy(&shared.nempty);Sem_destroy(&shared.nstored);exit(0);
}

创建过程和(3)基本相同。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com