我们已经实现过锁+条件变量的PC模型,
但是BlockingQueue并不能进行生产与消费的并发,原因在于我们使用的是STL提供的队列,进行了一个适配,底层的实现可能会修改到成员变量造成未知的错误。
而这次我们选择使用环形队列(底层vector),可以实现生产与消费的并发。
POSIX比System V版本的信号量更加简洁明了。
首先他们都是在头文件下的
我们来简单的看一下接口:
关于信号量的一些详细概念可以看一次进程间通信。
信号量初始化:
第一个参数是你创建的sem对象地址,第二个参数由于我们是进程间的线程通信,设置为0即可,第三个是你信号量“计数器”个数。
关于第二个参数更详细的可以直接看参考文档。
信号量销毁:
传入你的信号量对象地址即可。
P操作:
V操作:
环形队列在数据结构中有一个很讲究的点,那就是如何判断空与满。
当添加完一圈元素后,头与尾又指向了同一个位置
因此我们的解决方案通常是增加一个空节点进行判断或者增加一个计数。
但是利用信号量 + 环形队列实现PC即可解决这个判断问题,因为信号量本身就是一个计数器。
那我们该如何利用环形队列实现PC?
我们采用先单消费单生产进行,这样只需考虑生产者与消费者的关系。
由于我们底层是使用vector模拟,因此在进行放数据或者拿数据时需要每次都进行%=
一个N,防止超出vector范围。
RingQueue.hpp
#include #include #include #include #include const int defaultnum = 5; template class RingQueue { private: void P(sem_t &sem) { sem_wait(&sem); } void V(sem_t &sem) { sem_post(&sem); } public: RingQueue(int cap = defaultnum) : _cap(cap), _p_index(0), _c_index(0), _ringqueue(cap) { sem_init(&_sem_data, 0, _cap); sem_init(&_sem_space, 0, _cap); } ~RingQueue() { sem_destroy(&_sem_data); sem_destroy(&_sem_space); } void Push(const T &in) { P(_sem_space); _ringqueue[_p_index++] = in; _p_index %= _cap; V(_sem_data); } void Pop(T* out) { P(_sem_data); *out = _ringqueue[_c_index++];; _p_index %= _cap; V(_sem_space); } private: std::vector _ringqueue; int _cap; int _p_index; int _c_index; sem_t _sem_data; sem_t _sem_space; };
Main.cc
#include "RingQueue.hpp" #include void *Consumer(void *args) { while (true) { sleep(1); RingQueue *rq = static_cast *>(args); // 接收数据 int data = 0; rq->Pop(&data); // 处理数据 std::cout << "Consumer->" << data << std::endl; } } void *Producer(void *args) { while (true) { RingQueue *rq = static_cast *>(args); // 生产数据 int data = rand() % 10 + 1; rq->Push(data); std::cout << "Producer->" << data << std::endl; } } int main() { srand(time(nullptr)); RingQueue rq; pthread_t p, c; pthread_create(&c, nullptr, Consumer, &rq); pthread_create(&p, nullptr, Producer, &rq); pthread_join(p, nullptr); pthread_join(c, nullptr); return 0; }
注意,这里我们的数据依然可以传内置类型,自定义类型,或者可调用对象。
多生产多消费势必带来生产与生产的关系,消费与消费的联系。
也就意味着我们要进行加锁保护,因为多个生产者线程同时访问vector与_c_index这些共享资源,因此需要加锁。
我们此时有两个加锁方案(在P之前,V之后加锁解锁或者P之后V之前)
void Push(const T &in) { Lock(_p_mutex); P(_sem_space); _ringqueue[_p_index++] = in; _p_index %= _cap; V(_sem_data); Unlock(_p_mutex); } void Pop(T *out) { Lock(_c_mutex); P(_sem_data); *out = _ringqueue[_c_index++]; _p_index %= _cap; V(_sem_space); Unlock(_c_mutex); }
我们怎么选择?
我们的PV操作本质是一种预定机制,放在加锁之前可以让所有的线程共同申请信号量,达到申请并发,所以我们选择P之后V之前
加锁方案。
#include #include #include #include #include #include const int defaultnum = 5; template class RingQueue { private: void P(sem_t &sem) { sem_wait(&sem); } void V(sem_t &sem) { sem_post(&sem); } void Lock(pthread_mutex_t &mtx) { pthread_mutex_lock(&mtx); } void Unlock(pthread_mutex_t &mtx) { pthread_mutex_unlock(&mtx); } public: RingQueue(int cap = defaultnum) : _cap(cap), _p_index(0), _c_index(0), _ringqueue(cap) { sem_init(&_sem_data, 0, _cap); sem_init(&_sem_space, 0, _cap); pthread_mutex_init(&_p_mutex, nullptr); pthread_mutex_init(&_c_mutex, nullptr); } ~RingQueue() { sem_destroy(&_sem_data); sem_destroy(&_sem_space); pthread_mutex_destroy(&_p_mutex); pthread_mutex_destroy(&_c_mutex); } void Push(const T &in) { P(_sem_space); Lock(_p_mutex); _ringqueue[_p_index++] = in; _p_index %= _cap; Unlock(_p_mutex); V(_sem_data); } void Pop(T *out) { P(_sem_data); Lock(_c_mutex); *out = _ringqueue[_c_index++]; _p_index %= _cap; Unlock(_c_mutex); V(_sem_space); } private: std::vector _ringqueue; int _cap; int _p_index; int _c_index; sem_t _sem_data; sem_t _sem_space; pthread_mutex_t _p_mutex; pthread_mutex_t _c_mutex; };
完~