且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

二、(LINUX 线程同步) 互斥量、条件变量以及生产者消费者问题

更新时间:2021-07-24 15:28:07

原创转载请注明出处:

接上一篇:

一、(LINUX 线程同步) 引入  http://blog.itpub.net/7728585/viewspace-2137980/


在线程同步中我们经常会使用到mutex互斥量,其作用用于保护一块临界区,避免多线程并发操作对这片临界区带来的数据混乱,
POSIX的互斥量是一种建议锁,因为如果不使用互斥量也可以访问共享数据,但是可能是不安全的。
其原语包含:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
静态初始一个互斥量
int pthread_mutex_destroy(pthread_mutex_t *mutex);
销毁一个互斥量
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
初始化一个互斥量
int pthread_mutex_lock(pthread_mutex_t *mutex);
互斥量加锁操作,在这个函数调用下的临界区只允许一个线程进行访问,如果不能获得锁则堵塞等待
int pthread_mutex_trylock(pthread_mutex_t *mutex);
互斥量加锁操作,在这个函数调用下的临界区只允许一个线程进行访问,如果获得不了锁则放弃
int pthread_mutex_unlock(pthread_mutex_t *mutex);
互斥量解锁操作,一般用于在临界区数据操作完成后解锁


而条件变量cond则代表当某个条件不满足的情况下,本线程应该放弃锁,并且将本线程堵塞。典型的
生产者消费者问题,如果生产者还没来得及生产东西,消费者则不应该进行消费操作,应该放弃锁,将
自己堵塞,直到条件满足被生产者唤醒。原语包含:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
静态初始一个条件变量
int pthread_cond_init(pthread_cond_t *cond,const pthread_condattr_t *cattr);
初始化一个条件变量
int pthread_cond_destroy(pthread_cond_t *cond)
销毁一个条件变量量
int   pthread_cond_wait(pthread_cond_t   *cond,   pthread_mutex_t   *mutex)   
由于某种条件不满足,解锁和他绑定互斥量,本线程堵塞等待条件成熟被其他线程唤醒,如消费者等待生产者生成完成后被唤醒
注意这里就绑定了一个互斥量,也就是说条件变量一般和某个互斥量配套使用,因为单独的条件变量达不到任何堵塞线程的目的
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
和上面一样,只是加入了堵塞的时间
int pthread_cond_signal(pthread_cond_t *cond)
条件满足唤醒由于wait在这个条件变量上某个线程,一般用于生产者唤醒消费者
int pthread_cond_broadcast(pthread_cond_t *cond)
条件满足唤醒由于wait在这个条件变量上全部线程,一般用于生产者唤醒消费者

下面一张自己画的图希望对大家有所帮助。
二、(LINUX 线程同步) 互斥量、条件变量以及生产者消费者问题

最后是我写的一个,用一个链表栈结构,来模拟一个生产者生产数据,N个消费者并发进行消费的
一个生产者消费者程序。生产者一次生产一批数据,消费者一次只消费一个数据。
当生产者一次生产10个数据的时候,输出是这样的。
p:thread:140034536785664 prod 1 data is:data A:0 data B:0
p:thread:140034536785664 prod 2 data is:data A:1 data B:1
p:thread:140034536785664 prod 3 data is:data A:2 data B:2
p:thread:140034536785664 prod 4 data is:data A:3 data B:3
p:thread:140034536785664 prod 5 data is:data A:4 data B:4
p:thread:140034536785664 prod 6 data is:data A:5 data B:5
p:thread:140034536785664 prod 7 data is:data A:6 data B:6
p:thread:140034536785664 prod 8 data is:data A:7 data B:7
p:thread:140034536785664 prod 9 data is:data A:8 data B:8
p:thread:140034536785664 prod 10 data is:data A:9 data B:9
c:thread:140034520000256 cost 10 data is:data A:9 data B:9
c:thread:140034520000256 cost 9 data is:data A:8 data B:8
c:thread:140034520000256 cost 8 data is:data A:7 data B:7
c:thread:140034520000256 cost 7 data is:data A:6 data B:6
c:thread:140034520000256 cost 6 data is:data A:5 data B:5
c:thread:140034520000256 cost 5 data is:data A:4 data B:4
c:thread:140034520000256 cost 4 data is:data A:3 data B:3
c:thread:140034520000256 cost 3 data is:data A:2 data B:2
c:thread:140034520000256 cost 2 data is:data A:1 data B:1
c:thread:140034520000256 cost 1 data is:data A:0 data B:0
也可以看到他确实是一个后进先出的栈模型
这个时候并没有观察到多个线程竞争的问题,如果我将生产者定义为一次生产10000个数据,就可以
看到多个线程交替使用生产者数据的情况。
c:thread:140270541522688 cost 9959 data is:data A:9958 data B:9958 c:thread:140270667347712 cost 9952 data is:data A:9951 data B:9951
我们可以看到不同的线程ID的线程在竞争这把锁
下面是代码,临界区应该尽量选择小,我这里临界区选择较大,比如链表的创建
c:thread:140270692525824 cost 9962 data is:data A:9961 data B:9961
c:thread:140270692525824 cost 9961 data is:data A:9960 data B:9960
c:thread:140270692525824 cost 9960 data is:data A:9959 data B:9959
c:thread:140270541522688 cost 9959 data is:data A:9958 data B:9958
c:thread:140270541522688 cost 9958 data is:data A:9957 data B:9957
c:thread:140270541522688 cost 9957 data is:data A:9956 data B:9956
c:thread:140270541522688 cost 9956 data is:data A:9955 data B:9955
c:thread:140270541522688 cost 9955 data is:data A:9954 data B:9954
c:thread:140270541522688 cost 9954 data is:data A:9953 data B:9953
c:thread:140270541522688 cost 9953 data is:data A:9952 data B:9952
c:thread:140270667347712 cost 9952 data is:data A:9951 data B:9951
其实不需要放到临界区中,还比如内存的delete也不需要放到临界区,处于测试
目的已经达到,就不在纠结这个问题了:

点击(此处)折叠或打开

  1. 头文件:
  2. /*************************************************************************
  3.   > File Name: chain.h
  4.   > Author: gaopeng QQ:22389860 all right reserved
  5.   > Mail: gaopp_200217@163.com
  6.   > Created Time: Sun 04 Jun 2017 06:27:38 AM CST
  7.  ************************************************************************/

  8. #include<iostream>
  9. #include<stdio.h>
  10. #include <stdlib.h>
  11. #include <unistd.h>

  12. #define MAX_TID 10
  13. using namespace std;


  14. class t1data
  15. {
  16.         private:
  17.                 int a;
  18.                 int b;
  19.         public:
  20.                 t1data(){}
  21.                 t1data(int i)
  22.                 {
  23.                         this->= i;
  24.                         this->= i;
  25.                 }
  26.                 virtual void prin(void)
  27.                 {
  28.                         cout<<"data A:"<<a<<" data B:"<<b;
  29.                 }

  30. };


  31. typedef struct queue_s 
  32. {
  33.         t1data data;
  34.         queue_s *next,*priv;
  35.         queue_s(int i)
  36.         {
  37.                 data = t1data(i);
  38.                 next = NULL;
  39.                 priv = NULL;
  40.         }
  41. } QUE_S,*QUE_S_P ;

  42. typedef struct queue_h
  43. {
  44.         QUE_S_P head_p;
  45.         QUE_S_P last_p;
  46.         unsigned int len;
  47.         pthread_mutex_t pmut;
  48.         pthread_cond_t pcon;
  49.         queue_h()
  50.         {
  51.                 head_p=NULL;
  52.                 last_p=NULL;
  53.                 len = 0;
  54.         }
  55. } QUE_H;

  56. int debug_return(const int ret)
  57. {
  58.         if(ret != 0)
  59.         {
  60.                 strerror(ret);
  61.                 exit(-1);
  62.         }
  63.         return 0;
  64. }
主文件:

点击(此处)折叠或打开

  1. /*************************************************************************
  2.   > File Name: main.cpp
  3.   > Author: gaopeng QQ:22389860 all right reserved
  4.   > Mail: gaopp_200217@163.com
  5.   > Created Time: Sun 04 Jun 2017 07:18:28 AM CST
  6.  ************************************************************************/

  7. #include<iostream>
  8. #include<stdio.h>
  9. #include <pthread.h>
  10. #include<string.h>
  11. #include"chain.h"

  12. #define MAXDATA 10000
  13. //双向链表栈结构模型

  14. using namespace std;

  15. static int COUNT = 0;


  16. void* pro(void* arg)
  17. {

  18.         QUE_H* my_head;
  19.         my_head = (QUE_H*)arg;
  20.         QUE_S_P sp =NULL;
  21.         int ret = 0;

  22.         while(1)
  23.         {
  24.                 ret = pthread_mutex_lock(&my_head->pmut);
  25.                 debug_return(ret);

  26.                 if(my_head->head_p != NULL)//如果元素没有消费完放弃MUTEX,进入下次循环
  27.                 {
  28.                         //cout<<"pro head != NULL unlock\n";
  29.                         ret = pthread_mutex_unlock(&my_head->pmut);
  30.                         debug_return(ret);
  31.                         continue;
  32.                 }
  33.                 //如果消费完进行生产10000个元素
  34.                 for(;COUNT<MAXDATA;COUNT++)//生产10000个元素
  35.                 {
  36.                         if(my_head->head_p == NULL)
  37.                         {
  38.                                 int tm = 0;
  39.                                 QUE_S_P sp = new QUE_S(COUNT);
  40.                                 my_head->head_p = sp;
  41.                                 my_head->last_p = sp;
  42.                                 cout<<"p:thread:"<<pthread_self()<<" prod "<<++my_head->len<<" data is:";
  43.                                 (my_head->last_p->data).prin();
  44.                                 cout<<"\n";
  45.                                 sp = NULL;
  46.                         }
  47.                         else
  48.                         {
  49.                                 QUE_S_P sp = new QUE_S(COUNT);
  50.                                 my_head->last_p->next = sp;
  51.                                 sp->priv = my_head->last_p;
  52.                                 my_head->last_p = sp;
  53.                                 cout<<"p:thread:"<<pthread_self()<<" prod "<<++my_head->len<<" data is:";
  54.                                 my_head->last_p->data.prin();
  55.                                 cout<<"\n";
  56.                                 sp = NULL;
  57.                         }
  58.                 }
  59.                 //cout<<"pro unlock:\n";
  60.                 ret = pthread_mutex_unlock(&my_head->pmut);
  61.                 debug_return(ret);
  62.                 //cout<<"pro signal:\n";
  63.                 ret = pthread_cond_signal(&my_head->pcon);
  64.                 debug_return(ret);

  65.         }

  66. }

  67. void* cus(void* arg)
  68. {
  69.         int ret = 0;
  70.         QUE_H* my_head;
  71.         my_head = (QUE_H*)arg;
  72.         QUE_S_P tmp=NULL;

  73.         while(1)//消费方式为一个消费线程只消费一个元素,来模拟多消费线程竞争
  74.         {
  75.                 ret = pthread_mutex_lock(&my_head->pmut);
  76.                 debug_return(ret);
  77.                 while(my_head->head_p == NULL) //如果已经消费完放弃锁,等到条件及有元素供消费
  78.                 {
  79.                         //cout<<"cus cond wait\n";
  80.                         ret = pthread_cond_wait(&my_head->pcon,&my_head->pmut);
  81.                         debug_return(ret);
  82.                 }

  83.                 if(my_head->len == 1)//消费如果只剩下一个元素处理
  84.                 {
  85.                         cout<<"c:thread:"<<pthread_self()<<" cost "<<my_head->len--<<" data is:";
  86.                         my_head->last_p->data.prin();
  87.                         delete my_head->last_p;
  88.                         my_head->head_p = NULL;
  89.                         my_head->last_p = NULL;
  90.                         COUNT--;
  91.                 }
  92.                 else//否则处理如下
  93.                 {
  94.                         cout<<"c:thread:"<<pthread_self()<<" cost "<<my_head->len--<<" data is:";
  95.                         my_head->last_p->data.prin();
  96.                         cout<<"\n";
  97.                         tmp = my_head->last_p->priv;
  98.                         delete my_head->last_p;
  99.                         my_head->last_p= tmp;
  100.                         tmp=NULL;
  101.                         COUNT--;
  102.                 }

  103.                 ret = pthread_mutex_unlock(&my_head->pmut);
  104.                 debug_return(ret);
  105.         }

  106. }


  107. int main(void)
  108. {
  109.         QUE_H my_head;
  110.         int ret = 0;
  111.         pthread_t tid[MAX_TID];
  112.         int tid_num = 0;
  113.         int i = 0;

  114.         ret = pthread_mutex_init(&my_head.pmut,NULL);
  115.         debug_return(ret);
  116.         ret = pthread_cond_init(&my_head.pcon,NULL);
  117.         debug_return(ret);
  118.         //一个生产者
  119.         ret = pthread_create(tid+tid_num,NULL,pro,(void*)&my_head);
  120.         debug_return(ret);
  121.         tid_num++;
  122.         //n个消费者
  123.         ret = pthread_create(tid+tid_num,NULL,cus,(void*)&my_head);
  124.         debug_return(ret);
  125.         tid_num++;
  126.         ret = pthread_create(tid+tid_num,NULL,cus,(void*)&my_head);
  127.         debug_return(ret);
  128.         tid_num++;
  129.         ret = pthread_create(tid+tid_num,NULL,cus,(void*)&my_head);
  130.         debug_return(ret);
  131.         tid_num++;
  132.         ret = pthread_create(tid+tid_num,NULL,cus,(void*)&my_head);
  133.         debug_return(ret);
  134.         tid_num++;
  135.         ret = pthread_create(tid+tid_num,NULL,cus,(void*)&my_head);
  136.         debug_return(ret);

  137.     //堵塞回收
  138.         for(i = 0;i<=tid_num;i++)
  139.         {
  140.                 ret = pthread_join( *(tid+i) , NULL );
  141.                 debug_return(ret);
  142.         }

  143.         ret=pthread_mutex_destroy(&my_head.pmut);
  144.         debug_return(ret);
  145.         ret=pthread_cond_destroy(&my_head.pcon);
  146.         debug_return(ret);

  147. }

作者微信:

               二、(LINUX 线程同步) 互斥量、条件变量以及生产者消费者问题