学习线程互斥之前,我们先对linux的线程库进行封装,熟悉一下C++的线程库。并且方便我们后续使用
我们主要要实现start stop join三个功能,线程启动,线程终止,线程等待。完成这些就可以快速使用线程了!
类内部需要:
拥有这些成员变量,就这样就可以保证我们的基本工作了!
namespace ThreadMouble { //回调函数的类型 typedef void(*func_t)(const std::string& name); class Thread { public: //构造函数需要传入名字和回调函数 Thread(const std::string& name , func_t func): _name(name), _func(func) { } bool Start() { } void Stop() { } void Join() { } ~Thread() { } private: //线程名字 std::string _name; //线程ID pthread_t id; //是否运行判断符 bool isrunning; //回调函数 func_t _func; //函数返回值 std::string _result; }; }
线程启动接口很简单就可以实现,我们调用系统调用pthread_create
传入对应的参数.
但是执行的ThreadRun函数就要费一些头脑,pthread_create
系统调用中需要传入一个void* (* )(void*)
的函数指针
void* ThreadRoutinue(void* args) --- 执行回调方法
但是对象内的函数都有一个默认参数 this指针,所以需要加入 static修饰成为类的函数,这样也造成不能调用内部的成员了, 为了优雅的执行 多加一个Excute()成员 进行调用回调函数
void Excute() { isrunning = true; _func(_name); isrunning = false; } static void* ThreadRun(void* args) { //获取类对象 Thread* self = static_cast(args); self->Excute(); return nullptr; } bool Start() { //需要启动线程 isrunning = true; int n = ::pthread_create(&id , nullptr , ThreadRun , this); if(n == 0) { return true; } else { return false; } }
PS: ::
表示使用标准库的接口
这样就优雅的执行了线程启动
只有线程运行了才可以进行终止,直接调用系统调用即可
void Stop() { if(isrunning ) { isrunning = false; ::pthread_cancel(id); } return ; }
直接调用系统调用即可!
void Join() { ::pthread_join(id , nullptr); return ; }
我们写好了线程的封装,接下来就来使用一下,来看看效果:
#include #include"Thread.hpp" #include using namespace ThreadMouble; void threadrun(const std::string& name) { while(true) { std::cout << "name: " << name << " is running..." << std::endl; sleep(1); } return ; } int main() { Thread t("thread-1" , threadrun); t.Start(); std::cout << t.status() << std::endl; sleep(10); t.Stop(); t.Join(); return 0; }
运行来看:
很好,可以正常创建线程并执行任务!
线程可以看到的大部分资源是共享资源,即多个线程可以看到的资源叫做共享资源!那么如果今天这个共享资源是一个大数组,一个线程可以进行写入,其他线程可以进行读取,这样不就实现了线程通信了!可是还是有问题的,因为线程读取不受对方控制,可以刚写一个字符立马就被读取了。就造成了读取不一致的问题。所以共享资源往往需要进行保护,类似取ATM机取钱,虽然是公共场所但是只有你一个人可以使用当前的ATM机。而线程也有这样的场景,就是线程互斥!
首先我们先来看看多线程访问中会遇到的问题 — 我们设置一个情景,抢10000张票,我们设置4个并发线程一起来抢票:
#include #include "Thread.hpp" #include #include using namespace ThreadMouble; // 一共10000张票 int num = 10000; void threadrun(const std::string &name) { while (true) { if (num > 0) { usleep(1000);//抢票的时间 std::cout << "name: " << name << "剩余票数: " << num << std::endl; num--; } else { break; } } return; } int main() { std::vector thds; for (int i = 0; i < 4; i++) { char buffer[128]; std::string name = "thread-" + std::to_string(i); snprintf(buffer, 128, "%s", name.c_str()); thds.emplace_back(buffer, threadrun); } for (int i = 0; i < 4; i++) { thds[i].Start(); } std::cout << "所以票已经强光!!! " << std::endl; for (int i = 0; i < 4; i++) { thds[i].Join(); } return 0; }
我们运行看看:
运行之后发现怎么抢到了负数票?这是为什么???这其实就是多线程并发访问中会遇到的问题,访问全局资源时就发生了问题!
我们分析一下为什么会发生问题
num
,然后再到一个寄存器中进行储存,再然后将判断数0
移动到寄存器进行判断,最后得到结果。每个线程都会进行这样一个过程--
,就可能造成多个线程都存储着最后一张票,这样就造成了负数!为了解决上述的问题,就要使用锁,我们先来了解锁和对应接口。
在pthread库中有我们锁的对应接口,和类型pthread_mutex_t
互斥锁(任何时刻只允许一个线程进行资源访问)。有了这把锁既有对应的初始化和销毁。设置时不管是全局的还是静态的,只需要进行init即可。
pthread_mutex_init
的第一个参数传入锁的地址,第二个参数设置为nullptr
就行.pthread_mutex_destory
传入锁的地址就可以进行销毁了,全局或者静态的其实不需要进行主动销毁,在程序运行结束之后就自动销毁了。使用临时锁时才需要进行主动销毁。进行加锁时需要使用lock
,解锁使用unlock
,非常直观!在使用过程中,会有多个线程竞争一个锁,成功的正常运行,失败的直接阻塞。
所谓的对共享资源的保护,本质是对临界区代码的保护!因为访问资源是由代码进行访问的,把访问资源的代码保护起来就保护了共享资源!接下来我们来了解一下临界区和非临界区
在需要保护的区域进行上锁,使其串行执行线程,就不会出现之前并发执行的问题了!
我们快速上手一下:
void threadrun(const std::string &name) { while (true) { pthread_mutex_lock(&gmutex); if (num > 0) { usleep(1000); std::cout << "name: " << name << "剩余票数: " << num << std::endl; num--; pthread_mutex_unlock(&gmutex); } else { pthread_mutex_unlock(&gmutex); break; } } return; }
我们分析过,出现问题的原因是这个判断语句,也就是临界区,要在临界区之前上锁。也就是在进行抢票判断之前,我们先将代码上锁。之后处理完成就解锁(一定要保证解锁)。
注意:
总之,对于其他线程,要么没有申请锁,要么释放了锁,对于其他线程才有意义!相当于我访问临界区,对于其他线程是原子的!
我们在对锁和线程名进行一个整体封装,更加优雅地进行使用:
// 包含回调函数所需的数据 class ThreadData { public: ThreadData(const std::string name, pthread_mutex_t *gmutex) : _name(name), _lock(gmutex) { } ~ThreadData() { } public: std::string _name; pthread_mutex_t *_lock; };
再稍微修改一下线程类内部的构造函数,将主函数的传参修改一下:
int main() { //设置一个局部锁 pthread_mutex_t mutex ; pthread_mutex_init(&mutex , nullptr); std::vector thds; for (int i = 0; i < 4; i++) { char buffer[128]; std::string name = "thread-" + std::to_string(i); snprintf(buffer, 128, "%s", name.c_str()); //每个线程都需要一个td对象 ThreadData *td = new ThreadData(name, &mutex); thds.emplace_back(name, threadrun, td); } for (int i = 0; i < 4; i++) { thds[i].Start(); } for (int i = 0; i < 4; i++) { thds[i].Join(); } //销毁锁 pthread_mutex_destroy(&mutex); return 0; }
我们运行一下:
可以看到使用的是同一个锁!
我们还可以进行优化,我们可以将锁单独封装起来,做到自动解锁释放:
#include class LockGuard { public: LockGuard(pthread_mutex_t *td) : _td(td) { pthread_mutex_lock(_td); } ~LockGuard() { pthread_mutex_unlock(_td); } private: pthread_mutex_t *_td; };
这样每次在临界区之前创建一个锁对象,就可以完成对临界区的保护!
上面我们见到了锁的作用,那我们如何理解:
pthread_mutex_lock()
会返回,否则不返回(就阻塞了,直到函数内部被唤醒,重新申请锁)!这就一个类似scanf()
的情况 。 i++
或者 ++i
都不是原子的,因为++
这个运算至少经历三条汇编语句,在运行其中一条时退出, 有可能会有数据一致性问题!swap
或exchange
指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性(一条汇编语句的是原子性的)。即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。我们可以画图来看:
1
带走!0
就阻塞在这里了1
交换到内存中,此时就可以别其他线程使用了!
后序文章继续学习线程互斥与线程同步!