找到问题的解决办法,而不是找蹩脚的接口。
在应届生面试的时候,很多面试官都会问——“多线程如何共享资源”。在操作系统层面上可以给出若干关键词答案,但是在语言层面,这个问题考虑的就没有那么简单了。 同时,很多人会将多线程数据共享和线程同步混淆。有关线程同步,我们会在接下来的章节里着重阐述。本文主要聚焦于保护共享数据,首先从加锁入手,进而扩展到加锁无法解决的问题,最后会给出一些其他保护方案。
参数入栈
一个存放参数的栈数据结构,相同函数的参数必须要在栈中相连,我们来实现这个功能,看下面代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 #include <stack> #include <iostream> class MutexTest { public : MutexTest (): m_charStack () { } ~MutexTest () { } void Push (int n, char c) { for (int i = 0 ; i < n; ++i) { m_charStack.push (c); std::cout << c; } std::cout << std::endl; } private : std::stack<char > m_charStack; }; MutexTest test; std::thread mutexTestThread1 (&MutexTest::Push, &test, 10 , 'a' ) ;std::thread mutexTestThread2 (&MutexTest::Push, &test, 10 , 'b' ) ;mutexTestThread1. join (); mutexTestThread2. join ();
上面这段代码的执行结果是不确定的,这是因为**我们无法预测线程的执行顺序,多个线程共享同一个数据栈存在竞态条件(Race Condition)。**所以我们可能得到下面的执行结果,所有的参数都是交叉在一起的,这不是我们想要的结果。
竞态条件
是多线程编程的噩梦,为什么会出现竞态条件可以自行百度,我们主要是为了解决这个问题。让最终执行的结果为:
参数入栈保护
std::mutex
是C11提供的数据加锁类,C 中通过实例化 std::mutex 创建互斥量,通过调用成员函数lock()进行上锁,unlock()进行解锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class MutexTest { public : MutexTest (): m_mutex (), m_charStack () { } ~MutexTest () { } void Push (int n, char c) { m_mutex ().lock (); for (int i = 0 ; i < n; ++i) { m_charStack.push (c); std::cout << c; } std::cout << std::endl; m_mutex ().unlock (); } private : std::mutex m_mutex; std::stack<char > m_charStack; };
这段代码和上面的不同点就是使用std::mutex,在访问m_charStack之前上锁,其他线程就必须要等待解锁后才能访问m_charStack。如果我们忘记解锁,那么m_charStack就再也无法被访问了,所以有必要用RAII类std::lock_guard
进行封装——构造时上锁,析构时解锁。
1 2 3 4 5 6 7 8 9 10 void MutexTest::Push (int n, char c) { std::lock_guard<std::mutex> lg (m_mutex) ; for (int i = 0 ; i < n; ++i) { m_charStack.push (c); std::cout << c; } std::cout << std::endl; }
C++还提供了std::unique_lock
锁,相对于std::lock_guard
,该锁提供了更好地上锁和解锁灵活性控制。std::unique_lock
以独占所有权的方式来管理mutex对象的上锁和解锁操作。我们来看看其用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 #include <iostream> #include <thread> #include <mutex> std::mutex foo,bar; void task_a () { /* simultaneous lock (prevents deadlock) */ std::lock (foo,bar); std::unique_lock<std::mutex> lck1 (foo,std::adopt_lock); std::unique_lock<std::mutex> lck2 (bar,std::adopt_lock); std::cout << "task a\n"; /* (unlocked automatically on destruction of lck1 and lck2) */ } void task_b () { /* foo.lock(); bar.lock(); // replaced by: */ std::unique_lock<std::mutex> lck1, lck2; lck1 = std::unique_lock<std::mutex>(bar,std::defer_lock); lck2 = std::unique_lock<std::mutex>(foo,std::defer_lock); /* simultaneous lock (prevents deadlock) */ std::lock (lck1,lck2); std::cout << "task b\n"; /* (unlocked automatically on destruction of lck1 and lck2) */ } int main () { std::thread th1 (task_a); std::thread th2 (task_b); th1.join(); th2.join(); return 0; }
现在我们终于得到了我们想要的结果,可惜在很多时候加锁并不是解决数据共享的万能药。std::mutex并不能完全解决保护数据的问题。存在好几种情况,即使我们已经使用了互斥量,数据还是被破坏了。
将被保护数据暴露到互斥量作用域之外
被保护数据的访问接口本身就存在竞态条件(条件竞争)
不要暴露你的数据
来看下面例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 struct protected_data { char data[100]; } class MutexTest { public: template<typename Function> void process(Function func) { std::lock_guard<std::mutex> guard<m_dataMutex>; func(m_data); } private: std::mutex m_dataMutex; struct protected_data m_data; } struct protected_data *pData; void inject(Data &data) { pData = &data; } /* 即使process没有显式传出,但是还是被inject传出 */ /* process执行完后,pData能在无锁的情况下访问数据 */ void Test() { process(inject); for(int i = 0; i < 100; ++i) { pData.data[i] = i; } } std::thread mutexTestThread1(Test); std::thread mutexTestThread2(Test);
我想不到比较好的例子来说明这个问题,上面的例子是基于C++并发编程上面改编的例子,其也能说明问题:在上锁后执行了用户定义的函数,将被保护数据传递到互斥锁作用域之外 。
这个场景,mutexTestThread1
解锁,mutexTestThread2
上锁后,mutexTestThread2
仍然无法独占被保护数据。pData总是获取到了被保护的数据,并在mutexTestThread2
访问数据时修改该数据。
这种代码看起来很正常,也很不容易被发现,但是背后的错误逻辑是致命的,数据常常被莫名修改,奔溃也有可能随之而来。
切勿将受保护数据的指针或引用传递到互斥锁作用域之外,无论是函数返回值,还是存储在外部可见内存,亦或是以参数的形式传递到用户提供的函数中去。
谨慎的设计你的数据接口
来看下面例子:
1 2 3 4 5 6 7 8 9 10 11 12 std::deque<int> intDeque(1, 10); std::stack<int> intStack(intDeque); void Process() { if(!intStack.empty()) { const int value = intStack.top(); intStack.pop(); } } std::thread t1(Process); std::thread t2(Process);
即使top()操作和pop()操作都已经上锁,也无法解决条件竞争的问题。
假设栈的实现中对数据的访问已加锁,在单线程情况下,上面程序可以无误执行,但是在多线程情况下,就有可能出现异常。**调用空stack的top()是未定义行为。**在多线程情况下,intStack.empty()操作获取的结果是不可靠的。
上述例子中intStack栈只有一个元素,如果线程t1和t2的执行顺序如下,就会出现未定义行为:
1 2 3 4 5 6 // example 1 t1: intStack.empty() /* one element in intStack */ t1: intStack.top() /* one element in intStack */ t2: intStack.empty() /* one element in intStack */ t1: intStack.pop() /* no element in intStack */ t2: intStack.top() /* undefined behavior, intStack is empty() */
即使不出现未定义行为,也有可能出现非预期行为——处理同一份数据多次:
1 2 3 4 5 /* example 2 */ t1: intStack.empty() /* one element in intStack */ t2: intStack.empty() /* one element in intStack */ t1: intStack.top() /* handle this data */ t2: intStack.top() /* handle this data again */
要解决上述问题,就需要接口设计上有较大的改动,最好的操作是重新设计接口
1、重新设计接口实现:top()接口内提供异常机制,当栈大小为零时,抛出异常
2、重新设计接口功能:将pop()和top()操作合并
第1种方案并不能解决example 2,所以推荐重新设计接口功能。一个线程安全的栈类定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 template<typename T> class Stack { private: std::stack<T> m_data; mutable std::mutex m_mutex; public: Stack(): m_data(std::stack<int>()){} Stack(const Stack& other) { std::lock_guard<std::mutex> lock(other.m); data = other.data; } Stack& operator=(const Stack&) = delete; void push(T new_value) { std::lock_guard<std::mutex> lock(m); data.push(new_value); } std::shared_ptr<T> pop() { std::lock_guard<std::mutex> lock(m); if(data.empty()) nullptr; const std::shared_ptr<T> res(std::make_shared<T>(data.top())); data.pop(); return res; } void pop(T& value) { std::lock_guard<std::mutex> lock(m); if(data.empty()) return nullptr; value=data.top(); data.pop(); } bool empty() const { std::lock_guard<std::mutex> lock(m); return data.empty(); } };
栈操作为什么需要先top()后pop(),而不直接pop()时返回数据?这是为了防止pop()时的拷贝操作失败,导致数据丢失。
如果不重新设计接口,在使用的时候加锁也能解决这个问题:
1 2 3 4 5 6 7 8 9 10 std::mutex stackMutex; void Process () { std::lock_gurad<std::mutex> guard (statckMutex) ; if (!intStack.empty ()) { const int value = intStack.top (); intStack.pop (); } }
上述两种可能导致加锁失效的竞态条件场景,需要我们在组织代码或设计接口时精雕细琢,在很多场景下,提供线程安全的代码是很有必要的。
死锁
使用多线程,我们会遇到死锁问题,即使没有加锁,也是有可能出现死锁,必须要按照一定的规范来涉及代码,才能有效的避免死锁问题。
死锁的概念略去不说,死锁有可能发生在使用多个互斥量的场景下,也可能存在没有使用互斥量的场景:
两个线程都在等待对方释放互斥量
两个线程都调用了对方的join()函数
为了解决两个线程都在等待对方释放互斥量 导致的死锁问题,C++11提供了若干机制:
std::lock()函数
std::unique_lock类
锁住所有互斥量
只要将互斥量作为参数传递给std::lock(),std::lock()就能够锁住多个互斥量。std::lock()并未指定解锁和上锁的顺序,其能够保证:
std::lock()执行成功时,所有互斥量都已经被上锁,并且没有死锁问题
std::lock()执行失败时,已被其上锁的互斥量都会被解锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 #include <iostream> #include <thread> #include <mutex> class some_big_object { public : some_big_object (int a) :x (a) {} void Print () { std::cout << x << std::endl; } private : int x; }; class X { private : some_big_object& some_detail; std::mutex m; public : X (some_big_object & sd):some_detail (sd){} friend void swap (X& lhs, X& rhs) { if (&lhs==&rhs) return ; std::lock (lhs.m,rhs.m); std::lock_guard<std::mutex> lock_a (lhs.m,std::adopt_lock) ; std::lock_guard<std::mutex> lock_b (rhs.m,std::adopt_lock) ; std::swap (lhs.some_detail,rhs.some_detail); } }; template <class T>void swap (T& lhs,T& rhs) ;template <>void swap <some_big_object>(some_big_object &x, some_big_object &y){ X a (x) , b (y) ; swap (a, b); } int main () { some_big_object a (1 ) ,b (2 ) ; a.Print (), b.Print (); swap (a,b); a.Print (), b.Print (); return 0 ; }
上面一段代码使用了模板的偏特化特性,这里不需要深究,只需要知道swap(a, b)最终会调用X类的swap友元函数。在该友元函数中,std::lock()函数锁住两个互斥量,std::lock_guard负责unlock两个互斥量,如果不调用std::lock_guard(),需要手动unlock()。std::adopt_lock
参数表示互斥量已经上锁,这里仅仅是不会重复上锁。下面两个例子起到相同作用。
1 2 3 4 5 6 7 8 9 std::mutex mtx; std::lock (mtx); std::lock_guard<std::mutex> guard (mtx, std::adopt_lock) ;std::mutex mtx; std::lock (mtx); mtx.unlock ();
避免死锁的一点建议
C++并发编程中给出了几点避免死锁的进阶指导:
1、避免嵌套锁
2、避免在持有锁时调用用户提供的代码
3、使用固定顺序获取锁
4、使用锁的层次结构
前三个建议看字面意思就可以了,我们这里主要阐述锁的层次结构。层次锁需要遵守如下原则:
当代码试图对一个互斥量上锁,在该层锁已被低层持有时,上锁是不允许的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 hierarchical_mutex high_level_mutex (10000 ) ;hierarchical_mutex low_level_mutex (7000 ) ;hierarchical_mutex low_level_mutex (5000 ) ;int do_low_level_stuff () ;int low_level_func () { std::lock_guard<hierarchical_mutex> lk (low_level_mutex) ; return do_low_level_stuff (); } void high_level_stuff (int some_param) ;void high_level_func () { std::lock_guard<hierarchical_mutex> lk (high_level_mutex) ; high_level_stuff (low_level_func ()); } void middle_level_stuff (int some_param) ;void middle_level_func () { std::lock_guard<hierarchical_mutex> lk (middle_level_mutex) ; middle_level_stuff (high_level_stuff ()); } int main () { high_level_func (); middle_level_func (); }
按照层次锁的原则,high_level_func()能够正确执行,而middle_level_func()不能正确执行:
high_level_func()先获取到高层级的锁,然后获取到低层级的锁,符合原则
middle_level_func()先获取低层级的锁,然后获取到高层级的锁,不符合原则
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class hierarchical_mutex { std::mutex internal_mutex; unsigned long const hierarchy_value; unsigned long previous_hierarchy_value; static thread_local unsigned long this_thread_hierarchy_value; void check_for_hierarchy_violation () { if (this_thread_hierarchy_value <= hierarchy_value) { throw std::logic_error (“mutex hierarchy violated”); } } void update_hierarchy_value () { previous_hierarchy_value=this_thread_hierarchy_value; this_thread_hierarchy_value=hierarchy_value; } public : explicit hierarchical_mutex (unsigned long value) : hierarchy_value(value), previous_hierarchy_value(0 ) { }void lock () { check_for_hierarchy_violation (); internal_mutex.lock (); update_hierarchy_value (); } void unlock () { this_thread_hierarchy_value=previous_hierarchy_value; internal_mutex.unlock (); } bool try_lock () { check_for_hierarchy_violation (); if (!internal_mutex.try_lock ()) return false ; update_hierarchy_value (); return true ; } }; thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value (ULONG_MAX) ;
保护共享数据的初始化过程
为了防止共享数据初始化时数据被破坏,C++提供了std::once_flag和std::call_once来保证共享数据初始化的正确性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 /* using mutex */ std::shared_ptr<some_resource> resource_ptr; std::mutex resource_mutex; void foo() { std::unique_lock<std::mutex> lk(resource_mutex); if(!resource_ptr) { resource_ptr.reset(new some_resource); } lk.unlock(); resource_ptr->do_something(); } /* using call_once */ std::shared_ptr<some_resource> resource_ptr; std::once_flag resource_flag; void int_resource() { resource_ptr.reset(new some_resource); } void foo() { std::call_once(resource_flag, init_resource); resource_ptr->do_something(); }
保护很少更新的数据量
对于这种共享数据可以采用“读者-写着锁”,其允许两种不同的使用方式:一个作者线程独占访问和共享访问,让多个读者线程并发访问。
C++标准并没有提供相关的解决方案,我们可以使用boost::shared_mutex来做同步。对于更新操作,可以使
用std::lock_guard<boost::shared_mutex>
和std::unique_lock<boost::shared_mutex>
进行上锁;对于访问操作,可以使用boost::shared_lock<boost::shared_mutex>
获取共享访问权。我们来看下面例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #include <map> #include <string> #include <mutex> #include <boost/thread/shared_mutex.hpp> class dns_entry; class dns_cache { std::map<std::string,dns_entry> entries; mutable boost::shared_mutex entry_mutex; public: dns_entry find_entry(std::string const& domain) const { boost::shared_lock<boost::shared_mutex> lk(entry_mutex); std::map<std::string,dns_entry>::const_iterator const it= entries.find(domain); return (it==entries.end())?dns_entry():it->second; } void update_or_add_entry(std::string const& domain, dns_entry const& dns_details) { std::lock_guard<boost::shared_mutex> lk(entry_mutex); entries[domain]=dns_details; } };
上面代码,find_entry()使用了boost::shared_lock<>
实例来保护器共享和只读权限;update_or_add_entry()使用std::lock_guard<>
实例来独占访问权限。
嵌套锁
对于一个已经上锁的互斥量多次上锁,会出现未定义行为。然而对于嵌套锁std::recursive_mutex
来说,多次上锁不会出现问题。
在互斥量锁住其他线程前,你必须释放你拥有的所有 锁,所以当你调用lock()三次时,你也必须调用unlock()三次。正确使
用 std::lock_guardstd::recursive_mutex 和 std::unique_lockstd::recursice_mutex 可以帮我们处理这些问题。
大多数情况下,嵌套锁是用在可被多线程并发访问的类上,所以其拥有一个互斥量保护其成员数据。每个公共成员函数 都会对互斥量上锁,然后完成对应的功能,之后再解锁互斥量。不过,有时一个公共成员函 数会调用另一个公共函数作为其操作的一部分。
不过上面提高的方案是不推荐的,推荐的做法是——从中提取出一个函数作为类的私有成员, 并且让所有成员函数都对其进行调用,这个私有成员函数不会对互斥量进行上锁(在调用前必 须获得锁)。