开发者

Linux基于环形队列的生产消费者模型详解

开发者 https://www.devze.com 2025-04-30 09:56 出处:网络 作者: s_little_monster_
目录一、POSIX信号量1、概述2、调用接口(一)初始化信号量(二)销毁信号量(三)等待信号量(四)发布信号量3、在环形队列中的作用二、基于环形队列的生产消费者模型1、理论探究2、代码实现(一)RingQueue.hpp(二
目录
  • 一、POSIX信号量
    • 1、概述
    • 2、调用接口
      • (一)初始化信号量
      • (二)销毁信号量
      • (三)等待信号量
      • (四)发布信号量
    • 3、在环形队列中的作用
    • 二、基于环形队列的生产消费者模型
      • 1、理论探究
        • 2、代码实现
          • (一)RingQueue.hpp
          • (二)Task.hpp
          • (三)main.cpp
        • 3、PV操作包裹住加解锁操作的原因
        • 总结

          一、POSIX信号量

          1、概述

          在我们进行环形队列的生产消费者模型的学习之前,我们要对前置条件POSIX信号量进行学习,这里的POSIX的信号量与systemV的信号量是几乎一致的,都是用于同步操作,达到无冲突的访问共享资源的目的,只是POSIX信号量的使用要更简单一些,可以用于线程间同步

          信号量的本质就是一个计数器,它的本质就是用来描述资源数目的,把资源是否就绪放到了临界区之外,在申请信号量的时候其实已经就是间接在做判断了

          2、调用接口

          (一)初始化信号量

          #include <semaphore.h>
          int sem_init(sem_t *sem, int pshared, unsigned int value);
          • 返回值:成功返回0,失败返回-1
          • sem:指向要初始化的信号量对象的指针
          • pshared:指定信号量的共享属性,如果pshared为 0,表示信号量是进程内共享的,只能在创建它的进程内的多个线程之间使用,如果pshared非 0,表示信号量可以在多个进程之间共享
          • value:指定信号量的初始值,表示可以同时访问共享资源的线程或进程的数量

          (二)销毁信号量

          #include <semaphore.h>
          int sem_destroy(sem_t *sem);
          • 返回值:成功返回0,失败返回-1
          • sem:指向要销毁的信号量对象的指针

          (三)等待信号量

          #include <semaphore.h>
          int sem_wait(sem_t *sem);
          • 返回值:成功返回0,失败返回-1
          • sem:指向要操作的信号量对象的指针,这个指针一定要是被初始化过的

          sem_wait 函数执行的是信号量的 P 操作

          • 如果信号量 sem 的值大于 0,sem_wait 会将信号量的值减 1,然后立即返回,调用线程或进程可以继续执行后续代码,意味着该线程或进程成功获取了对共享资源的访问权
          • 如果信号量 sem 的值等于 0,sem_wait 会使调用线程或进程进入阻塞状态,直到信号量的值大于 0 为止。一旦信号量的值变为大于 0,sem_wait 会将信号量的值减 1 并返回,线程或进程继续执行

          (四)发布信号量

          #include <semaphore.h>
          int sem_post(sem_t *sem);
          • 返回值:成功返回0,失败返回-1
          • sem:指向要操作的信号量对象的指针,这个指针一定要是被初始化过的

          sem_post 函数执行的是信号量的 V 操作,会将信号量 sem 的值加 1

          • 如果在调用 sem_post 之前,有其他线程或进程因为调用 sem_wait 而阻塞在该信号量上(即信号量的值为 0),那么在信号量的值加 1 之后,系统会唤醒其中一个阻塞的线程或进程,被唤醒的线程或进程会将信号量的值再减 1 并继续执行后续代码

          3、在环形队列中的作用

          我们在之前应该都接触过环形队列,在环形队列中,一般我们是需要一个计数器的,或者在环形队列中留出最后一个位置,因为如果没有这些措施,我们就不知道双指针谁在前谁在后了,我们这里使用信号量替代了这个计数器

          二、基于环形队列的生产消费者模型

          1、理论探究

          Linux基于环形队列的生产消费者模型详解

          我们通过数组以及模运算的方式来模拟环状模型,前面的基于阻塞队列的生产消费者模型底层来说是基于容器queue的,其空间可以动态分配,现在是基于固定大小的,基于容器vector

          其中生产者关注的是环形队列的空间资源,消费者关心的是环形队列的数据资源,而环形队列中的空间资源+数据资源=全部资源,只要有空间生产者就可以生产数据然后放入,只要有数据消费者就可以取出数据然后加工

          2、代码实现

          (一)RingQueue.hpp

          #pragma once
          #include <IOStream>
          #include <vector>
          #include <semaphore.h>
          #include <pthread.h>
          //环形队列默认容量
          const static int defaultcap = 8;
          //环形队列核心接口:PV操作以及加锁解锁
          template<class T>
          class RingQueue{
          private:
              void P(sem_t &sem)
              {
                  sem_wait(&sem);
              }
              void V(sem_t &sem)
              {
          js        sem_post(&sem);
              }
              void Lock(pthread_mutex_t &mutex)
              {
                  pthread_mutex_lock(&mutex);
              }
              void Unlock(pthread_mutex_t &mutex)
              {
                  pthread_mutex_unlock(&mutex);
              }
          public:
          	//初始化
              RingQueue(int cap = defaultcap)
              :ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
              {
                  sem_init(&cdata_sem_, 0, 0);
                  sem_init(&pspace_sem_, 0, cap);
          		//生产者消费者的锁
                  pthread_mutex_init(&c_mutex_, nullptr);
                  pthread_mutex_init(&p_mutex_, nullptr);
              }
              void Push(const T &in) // 生产活动
              {
              	//调用P函数检查队列中是否有可用空间,没有可用空间线程会阻塞
                  P(pspace_sem_);
          		//这里为什么要先P后加锁,下面详谈
                  Lock(p_mutex_); 
                  ringqueue_[p_step_] = in;
                  // 位置后移,维持环形特性
                  p_step_++;
                  p_step_ %= cap_;
                  Unlock(p_mutex_); 
          
                  V(cdata_sem_);
          
              }
              void Pop(T *out)       // 消费活动
              {
                  P(cdata_sem_);
          
                  Lock(c_mutex_); 
                  *out = ringqueue_[c_step_];
                  // 位置后移,维持环形特性
                  c_step_++;
                  c_step_ %= cap_;
                  Unlock(c_mutex_); 
          
                  V(pspace_sem_);
              }
              //析构销毁
              ~RingQueue()
              {
                  sem_destroy(&cdata_sem_);
                  sem_destroy(&pspace_sem_);
          
                  pthread_mutex_destroy(&c_mutex_);
                  pthread_mutex_destroy(&p_mutex_);
              }
          private:
              std::vector<T> ringqueue_;// 环形队列的底层实现
              int cap_;		   		  // 队列容量
          
              int c_step_;       		  // 消费者下标
              int p_step_;      		  // 生产者下标
          
              sem_t cdata_sem_; 		  // 队中可用数据资源
              sem_t pspace_sem_;		  //OLIgZ 队中可用空间资源
          
              pthread_mutex_t c_mutex_; // 消费者锁
              pthread_mutex_t p_mutex_; // 生产者锁
          };

          (二)Task.hpp

          任务函数还是上一次的任务

          #pragma once
          #include <iostream>
          #include <string>
          
          std::string opers="+-*/%";
          
          enum{
              DivZero=1,
              Modzero,
              Unknown
          };
          
          class Task
          {
          public:
              Task()
              {}
              Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
              {}
              void run()
              {
                  switch (oper_)
                  {
                  case '+':
                      result_ = data1_ + data2_;
                      break;
                  case '-':
                      result_ = data1_ - data2_;
                      break;
                  case '*':
                      result_ = data1_ * data2_;
                      break;
                  case '/':
                      {
                          if(data2_ == 0) exitcode_ = DivZero;
                          else result_ = data1_ / data2_;
                      }
                      brehttp://www.devze.comak;
                  case '%':
                     {
                          if(data2_ == 0) exitcode_ = ModZero;
                     www.devze.com     else result_ = data1_ % data2_;
                      }            break;
                  default:
                      exitcode_ = Unknown;
                      break;
                  }
              }
              void operator ()()
              {
                  run();
              }
              std::string GetResult()
              {
                  std::string r = std::to_string(data1_);
                  r += oper_;
                  r += std::to_string(data2_);
                  r += "=";
                  r += std::to_string(result_);
                  r += "[code: ";
                  r += std::to_string(exitcode_);
                  r += "]";
          
                  return r;
              }
              std::string GetTask()
              {
                  std::string r = std::to_string(data1_);
                  r += oper_;
                  r += std::to_string(data2_);
                  r += "=?";
                  return r;
              }
              ~Task()
              {}
          
          private:
              int data1_;
              int data2_;
              char oper_;
          
              int result_;
              int exitcode_;
          };

          (三)main.cpp

          #include <iostream>
          #include <pthread.h>
          #include <unistd.h>
          #include <ctime>
          #include "RingQueue.hpp"
          #include "Task.hpp"
          
          using namespace std;
          //这个结构体是方便我们打印的时候查看方便的
          struct ThreadData
          {
              RingQueue<Task> *rq;   //环形队列
              std::string threadname;//线程名字
          };
          
          void *Productor(void *args)
          {
              ThreadData *td = static_cast<ThreadData*>(args);
              RingQueue<Task> *rq = td->rq;
              std::string name = td->threadname;
              int len = opers.size();
              while (true)
              {
                  // 模拟获取数据
                  int data1 = rand() % 10 + 1;
                  usleep(10);
                  int data2 = rand() % 10;
                  char op = opers[rand() % len];
                  Task t(data1, data2, op);
          
                  // 生产数据
                  rq->Push(t);
                  cout << "Productor task done, task is : " << t.GetTask() << " who: " << name << endl;
          
                  sleep(1);
              }
              return nullptr;
          }
          
          void *Consumer(void *args)
          {
              ThreadData *td = static_cast<ThreadData*>(args);
              RingQueue<Task> *rq = td->rq;
              std::string name = td->threadname;
          
              while (true)
              {
                  // 消费数据
                  Task t;
                  rq->Pop(&t);
                 
                  // 处理数据
                  t();
                  cout << "Consumer get task, task is : " << t.GetTask() << " who: " << name << " result: " << t.GetResult() << endl;
              }
              return nullptr;
          }
          
          int main()
          {
              srand(time(nullptr));
              RingQueue<Task> *rq = new RingQueue<Task>(10);
          
              pthread_t c[5], p[3];
              
          	//这里我们为了方便查看,统一用单生产单消费
              for (int i = 0; i < 1; i++)
              {
                  ThreadData *td = new ThreadData();
                  td->rq = rq;
                  td->threadname = "Productor-" + std::to_string(i);
          
                  pthread_create(p + i, nullptr, Productor, td);
              }
              for (int i = 0; i < 1; i++)
              {
                  ThreadData *td = new ThreadData();
                  td->rq = rq;
                  td->threadname = "Consumer-" + std::to_string(i);
          
                  pthread_create(c + i, nullptr, Consumer, td);
              }
          
              for (int i = 0; i < 1; i++)
              {
                  pthread_join(p[i], nullptr);
              }
              for (int i = 0; i < 1; i++)
              {
                  pthread_join(c[i], nullptr);
              }
          
              return 0;
          }

          Linux基于环形队列的生产消费者模型详解

          3、PV操作包裹住加解锁操作的原因

          PopPush 函数中,以Push 函数为例,P(pspace_sem_)V(cdata_sem_) 包裹着 Lock(p_mutex_)Unlock(p_mutex_) 这种设计是为了实现更细粒度的同步控制,尽可能减少锁的竞争,以确保线程安全和高效性,下面详细解释其原因:

          P(pspace_sem_)Lock(p_mutex_) 之前:

          • 信号量的作用pspace_sem_ 信号量用于表示环形队列中可用的空间资源,P(pspace_sem_) 操作会检查信号量的值,如果值大于 0,则将其减 1 并继续执行,如果值为 0,则线程会阻塞,直到android有可用空间(即其他线程调用 V(pspace_sem_) 释放空间)
          • 避免不必要的加锁:在尝试获取互斥锁之前先检查信号量,可以避免在没有可用空间时加锁,因为如果没有可用空间,即使加了锁也无法进行生产操作,还会导致其他线程无法释放空间,造成资源浪费和性能下降,通过先检查信号量,只有在有可用空间时才去获取互斥锁,减少了锁的竞争,提高了程序的效率

          V(cdata_sem_)Unlock(p_mutex_) 之后:

          • 信号量的通知机制cdata_sem_ 信号量用于表示环形队列中可用的数据资源,V(cdata_sem_) 操作会将信号量的值加 1,如果有消费者线程因为等待数据而阻塞,会唤醒其中一个线程
          • 避免死锁和数据不一致:在释放互斥锁之后再增加 cdata_sem_ 信号量的值,可以确保在通知消费者有新数据可用之前,生产者已经完成了对共享资源的修改,并且释放了锁,如果在加锁状态下就增加信号量,可能会导致消费者线程被唤醒后尝试获取锁,但由于生产者还持有锁而无法进入临界区,从而造成死锁或数据不一致的问题

          总结

          以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

          0

          精彩评论

          暂无评论...
          验证码 换一张
          取 消

          关注公众号