原创转载请注明出处:
接上一篇:
一、(LINUX 线程同步) 引入 http://blog.itpub.net/7728585/viewspace-2137980/
在线程同步中我们经常会使用到mutex互斥量,其作用用于保护一块临界区,避免多线程并发操作对这片临界区带来的数据混乱,
POSIX的互斥量是一种建议锁,因为如果不使用互斥量也可以访问共享数据,但是可能是不安全的。
其原语包含:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
静态初始一个互斥量
int pthread_mutex_destroy(pthread_mutex_t *mutex);
销毁一个互斥量
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
初始化一个互斥量
int pthread_mutex_lock(pthread_mutex_t *mutex);
互斥量加锁操作,在这个函数调用下的临界区只允许一个线程进行访问,如果不能获得锁则堵塞等待
int pthread_mutex_trylock(pthread_mutex_t *mutex);
互斥量加锁操作,在这个函数调用下的临界区只允许一个线程进行访问,如果获得不了锁则放弃
int pthread_mutex_unlock(pthread_mutex_t *mutex);
互斥量解锁操作,一般用于在临界区数据操作完成后解锁
而条件变量cond则代表当某个条件不满足的情况下,本线程应该放弃锁,并且将本线程堵塞。典型的
生产者消费者问题,如果生产者还没来得及生产东西,消费者则不应该进行消费操作,应该放弃锁,将
自己堵塞,直到条件满足被生产者唤醒。原语包含:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
静态初始一个条件变量
int pthread_cond_init(pthread_cond_t *cond,const pthread_condattr_t *cattr);
初始化一个条件变量
int pthread_cond_destroy(pthread_cond_t *cond)
销毁一个条件变量量
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
由于某种条件不满足,解锁和他绑定互斥量,本线程堵塞等待条件成熟被其他线程唤醒,如消费者等待生产者生成完成后被唤醒
注意这里就绑定了一个互斥量,也就是说条件变量一般和某个互斥量配套使用,因为单独的条件变量达不到任何堵塞线程的目的
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
和上面一样,只是加入了堵塞的时间
int pthread_cond_signal(pthread_cond_t *cond)
条件满足唤醒由于wait在这个条件变量上某个线程,一般用于生产者唤醒消费者
int pthread_cond_broadcast(pthread_cond_t *cond)
条件满足唤醒由于wait在这个条件变量上全部线程,一般用于生产者唤醒消费者
下面一张自己画的图希望对大家有所帮助。
最后是我写的一个,用一个链表栈结构,来模拟一个生产者生产数据,N个消费者并发进行消费的
一个生产者消费者程序。生产者一次生产一批数据,消费者一次只消费一个数据。
当生产者一次生产10个数据的时候,输出是这样的。
p:thread:140034536785664 prod 1 data is:data A:0 data B:0
p:thread:140034536785664 prod 2 data is:data A:1 data B:1
p:thread:140034536785664 prod 3 data is:data A:2 data B:2
p:thread:140034536785664 prod 4 data is:data A:3 data B:3
p:thread:140034536785664 prod 5 data is:data A:4 data B:4
p:thread:140034536785664 prod 6 data is:data A:5 data B:5
p:thread:140034536785664 prod 7 data is:data A:6 data B:6
p:thread:140034536785664 prod 8 data is:data A:7 data B:7
p:thread:140034536785664 prod 9 data is:data A:8 data B:8
p:thread:140034536785664 prod 10 data is:data A:9 data B:9
c:thread:140034520000256 cost 10 data is:data A:9 data B:9
c:thread:140034520000256 cost 9 data is:data A:8 data B:8
c:thread:140034520000256 cost 8 data is:data A:7 data B:7
c:thread:140034520000256 cost 7 data is:data A:6 data B:6
c:thread:140034520000256 cost 6 data is:data A:5 data B:5
c:thread:140034520000256 cost 5 data is:data A:4 data B:4
c:thread:140034520000256 cost 4 data is:data A:3 data B:3
c:thread:140034520000256 cost 3 data is:data A:2 data B:2
c:thread:140034520000256 cost 2 data is:data A:1 data B:1
c:thread:140034520000256 cost 1 data is:data A:0 data B:0
也可以看到他确实是一个后进先出的栈模型
这个时候并没有观察到多个线程竞争的问题,如果我将生产者定义为一次生产10000个数据,就可以
看到多个线程交替使用生产者数据的情况。
c:thread:140270541522688 cost 9959 data is:data A:9958 data B:9958 c:thread:140270667347712 cost 9952 data is:data A:9951 data B:9951
我们可以看到不同的线程ID的线程在竞争这把锁
下面是代码,临界区应该尽量选择小,我这里临界区选择较大,比如链表的创建
c:thread:140270692525824 cost 9962 data is:data A:9961 data B:9961
c:thread:140270692525824 cost 9961 data is:data A:9960 data B:9960
c:thread:140270692525824 cost 9960 data is:data A:9959 data B:9959
c:thread:140270541522688 cost 9959 data is:data A:9958 data B:9958
c:thread:140270541522688 cost 9958 data is:data A:9957 data B:9957
c:thread:140270541522688 cost 9957 data is:data A:9956 data B:9956
c:thread:140270541522688 cost 9956 data is:data A:9955 data B:9955
c:thread:140270541522688 cost 9955 data is:data A:9954 data B:9954
c:thread:140270541522688 cost 9954 data is:data A:9953 data B:9953
c:thread:140270541522688 cost 9953 data is:data A:9952 data B:9952
c:thread:140270667347712 cost 9952 data is:data A:9951 data B:9951
其实不需要放到临界区中,还比如内存的delete也不需要放到临界区,处于测试
目的已经达到,就不在纠结这个问题了:
-
头文件:
-
/*************************************************************************
-
> File Name: chain.h
-
> Author: gaopeng QQ:22389860 all right reserved
-
> Mail: gaopp_200217@163.com
-
> Created Time: Sun 04 Jun 2017 06:27:38 AM CST
-
************************************************************************/
-
-
#include<iostream>
-
#include<stdio.h>
-
#include <stdlib.h>
-
#include <unistd.h>
-
-
#define MAX_TID 10
-
using namespace std;
-
-
-
class t1data
-
{
-
private:
-
int a;
-
int b;
-
public:
-
t1data(){}
-
t1data(int i)
-
{
-
this->a = i;
-
this->b = i;
-
}
-
virtual void prin(void)
-
{
-
cout<<"data A:"<<a<<" data B:"<<b;
-
}
-
-
};
-
-
-
typedef struct queue_s
-
{
-
t1data data;
-
queue_s *next,*priv;
-
queue_s(int i)
-
{
-
data = t1data(i);
-
next = NULL;
-
priv = NULL;
-
}
-
} QUE_S,*QUE_S_P ;
-
-
typedef struct queue_h
-
{
-
QUE_S_P head_p;
-
QUE_S_P last_p;
-
unsigned int len;
-
pthread_mutex_t pmut;
-
pthread_cond_t pcon;
-
queue_h()
-
{
-
head_p=NULL;
-
last_p=NULL;
-
len = 0;
-
}
-
} QUE_H;
-
-
int debug_return(const int ret)
-
{
-
if(ret != 0)
-
{
-
strerror(ret);
-
exit(-1);
-
}
-
return 0;
-
}
-
主文件:
-
/*************************************************************************
-
> File Name: main.cpp
-
> Author: gaopeng QQ:22389860 all right reserved
-
> Mail: gaopp_200217@163.com
-
> Created Time: Sun 04 Jun 2017 07:18:28 AM CST
-
************************************************************************/
-
-
#include<iostream>
-
#include<stdio.h>
-
#include <pthread.h>
-
#include<string.h>
-
#include"chain.h"
-
-
#define MAXDATA 10000
-
//双向链表栈结构模型
-
-
using namespace std;
-
-
static int COUNT = 0;
-
-
-
void* pro(void* arg)
-
{
-
-
QUE_H* my_head;
-
my_head = (QUE_H*)arg;
-
QUE_S_P sp =NULL;
-
int ret = 0;
-
-
while(1)
-
{
-
ret = pthread_mutex_lock(&my_head->pmut);
-
debug_return(ret);
-
-
if(my_head->head_p != NULL)//如果元素没有消费完放弃MUTEX,进入下次循环
-
{
-
//cout<<"pro head != NULL unlock\n";
-
ret = pthread_mutex_unlock(&my_head->pmut);
-
debug_return(ret);
-
continue;
-
}
-
//如果消费完进行生产10000个元素
-
for(;COUNT<MAXDATA;COUNT++)//生产10000个元素
-
{
-
if(my_head->head_p == NULL)
-
{
-
int tm = 0;
-
QUE_S_P sp = new QUE_S(COUNT);
-
my_head->head_p = sp;
-
my_head->last_p = sp;
-
cout<<"p:thread:"<<pthread_self()<<" prod "<<++my_head->len<<" data is:";
-
(my_head->last_p->data).prin();
-
cout<<"\n";
-
sp = NULL;
-
}
-
else
-
{
-
QUE_S_P sp = new QUE_S(COUNT);
-
my_head->last_p->next = sp;
-
sp->priv = my_head->last_p;
-
my_head->last_p = sp;
-
cout<<"p:thread:"<<pthread_self()<<" prod "<<++my_head->len<<" data is:";
-
my_head->last_p->data.prin();
-
cout<<"\n";
-
sp = NULL;
-
}
-
}
-
//cout<<"pro unlock:\n";
-
ret = pthread_mutex_unlock(&my_head->pmut);
-
debug_return(ret);
-
//cout<<"pro signal:\n";
-
ret = pthread_cond_signal(&my_head->pcon);
-
debug_return(ret);
-
-
}
-
-
}
-
-
void* cus(void* arg)
-
{
-
int ret = 0;
-
QUE_H* my_head;
-
my_head = (QUE_H*)arg;
-
QUE_S_P tmp=NULL;
-
-
while(1)//消费方式为一个消费线程只消费一个元素,来模拟多消费线程竞争
-
{
-
ret = pthread_mutex_lock(&my_head->pmut);
-
debug_return(ret);
-
while(my_head->head_p == NULL) //如果已经消费完放弃锁,等到条件及有元素供消费
-
{
-
//cout<<"cus cond wait\n";
-
ret = pthread_cond_wait(&my_head->pcon,&my_head->pmut);
-
debug_return(ret);
-
}
-
-
if(my_head->len == 1)//消费如果只剩下一个元素处理
-
{
-
cout<<"c:thread:"<<pthread_self()<<" cost "<<my_head->len--<<" data is:";
-
my_head->last_p->data.prin();
-
delete my_head->last_p;
-
my_head->head_p = NULL;
-
my_head->last_p = NULL;
-
COUNT--;
-
}
-
else//否则处理如下
-
{
-
cout<<"c:thread:"<<pthread_self()<<" cost "<<my_head->len--<<" data is:";
-
my_head->last_p->data.prin();
-
cout<<"\n";
-
tmp = my_head->last_p->priv;
-
delete my_head->last_p;
-
my_head->last_p= tmp;
-
tmp=NULL;
-
COUNT--;
-
}
-
-
ret = pthread_mutex_unlock(&my_head->pmut);
-
debug_return(ret);
-
}
-
-
}
-
-
-
int main(void)
-
{
-
QUE_H my_head;
-
int ret = 0;
-
pthread_t tid[MAX_TID];
-
int tid_num = 0;
-
int i = 0;
-
-
ret = pthread_mutex_init(&my_head.pmut,NULL);
-
debug_return(ret);
-
ret = pthread_cond_init(&my_head.pcon,NULL);
-
debug_return(ret);
-
//一个生产者
-
ret = pthread_create(tid+tid_num,NULL,pro,(void*)&my_head);
-
debug_return(ret);
-
tid_num++;
-
//n个消费者
-
ret = pthread_create(tid+tid_num,NULL,cus,(void*)&my_head);
-
debug_return(ret);
-
tid_num++;
-
ret = pthread_create(tid+tid_num,NULL,cus,(void*)&my_head);
-
debug_return(ret);
-
tid_num++;
-
ret = pthread_create(tid+tid_num,NULL,cus,(void*)&my_head);
-
debug_return(ret);
-
tid_num++;
-
ret = pthread_create(tid+tid_num,NULL,cus,(void*)&my_head);
-
debug_return(ret);
-
tid_num++;
-
ret = pthread_create(tid+tid_num,NULL,cus,(void*)&my_head);
-
debug_return(ret);
-
-
//堵塞回收
-
for(i = 0;i<=tid_num;i++)
-
{
-
ret = pthread_join( *(tid+i) , NULL );
-
debug_return(ret);
-
}
-
-
ret=pthread_mutex_destroy(&my_head.pmut);
-
debug_return(ret);
-
ret=pthread_cond_destroy(&my_head.pcon);
-
debug_return(ret);
-
-
}
作者微信: