C++并发编程2——为共享数据加锁
ZeroJiu 愚昧之巅V4

找到问题的解决办法,而不是找蹩脚的接口。

在应届生面试的时候,很多面试官都会问——“多线程如何共享资源”。在操作系统层面上可以给出若干关键词答案,但是在语言层面,这个问题考虑的就没有那么简单了。同时,很多人会将多线程数据共享和线程同步混淆。有关线程同步,我们会在接下来的章节里着重阐述。本文主要聚焦于保护共享数据,首先从加锁入手,进而扩展到加锁无法解决的问题,最后会给出一些其他保护方案。

参数入栈

一个存放参数的栈数据结构,相同函数的参数必须要在栈中相连,我们来实现这个功能,看下面代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <stack>
#include <iostream>

class MutexTest
{
public:
MutexTest(): m_charStack() { }
~MutexTest() { }

void Push(int n, char c)
{
for (int i = 0; i < n; ++i)
{
m_charStack.push(c);
std::cout << c;
}
std::cout << std::endl;
}
private:
std::stack<char> m_charStack;
};

MutexTest test;
std::thread mutexTestThread1(&MutexTest::Push, &test, 10, 'a');
std::thread mutexTestThread2(&MutexTest::Push, &test, 10, 'b');

mutexTestThread1.join();
mutexTestThread2.join();

上面这段代码的执行结果是不确定的,这是因为**我们无法预测线程的执行顺序,多个线程共享同一个数据栈存在竞态条件(Race Condition)。**所以我们可能得到下面的执行结果,所有的参数都是交叉在一起的,这不是我们想要的结果。

1
2
aabbbbbbbaaaaaaaabbb

竞态条件是多线程编程的噩梦,为什么会出现竞态条件可以自行百度,我们主要是为了解决这个问题。让最终执行的结果为:

1
2
aaaaaaaaaa
bbbbbbbbbb

参数入栈保护

std::mutex是C11提供的数据加锁类,C中通过实例化 std::mutex 创建互斥量,通过调用成员函数lock()进行上锁,unlock()进行解锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class MutexTest
{
public:
MutexTest(): m_mutex(), m_charStack() { }
~MutexTest() { }

void Push(int n, char c)
{
m_mutex().lock();
for (int i = 0; i < n; ++i)
{
m_charStack.push(c);
std::cout << c;
}
std::cout << std::endl;
m_mutex().unlock();
}
private:
std::mutex m_mutex;
std::stack<char> m_charStack;
};

这段代码和上面的不同点就是使用std::mutex,在访问m_charStack之前上锁,其他线程就必须要等待解锁后才能访问m_charStack。如果我们忘记解锁,那么m_charStack就再也无法被访问了,所以有必要用RAII类std::lock_guard进行封装——构造时上锁,析构时解锁。

1
2
3
4
5
6
7
8
9
10
void MutexTest::Push(int n, char c)
{
std::lock_guard<std::mutex> lg(m_mutex);
for (int i = 0; i < n; ++i)
{
m_charStack.push(c);
std::cout << c;
}
std::cout << std::endl;
}

C++还提供了std::unique_lock锁,相对于std::lock_guard,该锁提供了更好地上锁和解锁灵活性控制。std::unique_lock以独占所有权的方式来管理mutex对象的上锁和解锁操作。我们来看看其用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <iostream>   
#include <thread>
#include <mutex>

std::mutex foo,bar;

void task_a () {
/* simultaneous lock (prevents deadlock) */
std::lock (foo,bar);
std::unique_lock<std::mutex> lck1 (foo,std::adopt_lock);
std::unique_lock<std::mutex> lck2 (bar,std::adopt_lock);
std::cout << "task a\n";
/* (unlocked automatically on destruction of lck1 and lck2) */
}

void task_b () {
/* foo.lock(); bar.lock(); // replaced by: */
std::unique_lock<std::mutex> lck1, lck2;
lck1 = std::unique_lock<std::mutex>(bar,std::defer_lock);
lck2 = std::unique_lock<std::mutex>(foo,std::defer_lock);
/* simultaneous lock (prevents deadlock) */
std::lock (lck1,lck2);
std::cout << "task b\n";
/* (unlocked automatically on destruction of lck1 and lck2) */
}


int main ()
{
std::thread th1 (task_a);
std::thread th2 (task_b);

th1.join();
th2.join();

return 0;
}

现在我们终于得到了我们想要的结果,可惜在很多时候加锁并不是解决数据共享的万能药。std::mutex并不能完全解决保护数据的问题。存在好几种情况,即使我们已经使用了互斥量,数据还是被破坏了。

  • 将被保护数据暴露到互斥量作用域之外
  • 被保护数据的访问接口本身就存在竞态条件(条件竞争)

不要暴露你的数据

来看下面例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
struct protected_data
{
char data[100];
}
class MutexTest
{
public:
template<typename Function>
void process(Function func)
{
std::lock_guard<std::mutex> guard<m_dataMutex>;
func(m_data);
}
private:
std::mutex m_dataMutex;
struct protected_data m_data;
}
struct protected_data *pData;
void inject(Data &data)
{
pData = &data;
}
/* 即使process没有显式传出,但是还是被inject传出 */
/* process执行完后,pData能在无锁的情况下访问数据 */
void Test()
{
process(inject);
for(int i = 0; i < 100; ++i)
{
pData.data[i] = i;
}
}
std::thread mutexTestThread1(Test);
std::thread mutexTestThread2(Test);

我想不到比较好的例子来说明这个问题,上面的例子是基于C++并发编程上面改编的例子,其也能说明问题:在上锁后执行了用户定义的函数,将被保护数据传递到互斥锁作用域之外

这个场景,mutexTestThread1解锁,mutexTestThread2上锁后,mutexTestThread2仍然无法独占被保护数据。pData总是获取到了被保护的数据,并在mutexTestThread2访问数据时修改该数据。

这种代码看起来很正常,也很不容易被发现,但是背后的错误逻辑是致命的,数据常常被莫名修改,奔溃也有可能随之而来。

切勿将受保护数据的指针或引用传递到互斥锁作用域之外,无论是函数返回值,还是存储在外部可见内存,亦或是以参数的形式传递到用户提供的函数中去。

谨慎的设计你的数据接口

来看下面例子:

1
2
3
4
5
6
7
8
9
10
11
12
std::deque<int> intDeque(1, 10);
std::stack<int> intStack(intDeque);
void Process()
{
if(!intStack.empty())
{
const int value = intStack.top();
intStack.pop();
}
}
std::thread t1(Process);
std::thread t2(Process);

即使top()操作和pop()操作都已经上锁,也无法解决条件竞争的问题。

假设栈的实现中对数据的访问已加锁,在单线程情况下,上面程序可以无误执行,但是在多线程情况下,就有可能出现异常。**调用空stack的top()是未定义行为。**在多线程情况下,intStack.empty()操作获取的结果是不可靠的。

上述例子中intStack栈只有一个元素,如果线程t1和t2的执行顺序如下,就会出现未定义行为:

1
2
3
4
5
6
// example 1
t1: intStack.empty() /* one element in intStack */
t1: intStack.top() /* one element in intStack */
t2: intStack.empty() /* one element in intStack */
t1: intStack.pop() /* no element in intStack */
t2: intStack.top() /* undefined behavior, intStack is empty() */

即使不出现未定义行为,也有可能出现非预期行为——处理同一份数据多次:

1
2
3
4
5
/* example 2 */
t1: intStack.empty() /* one element in intStack */
t2: intStack.empty() /* one element in intStack */
t1: intStack.top() /* handle this data */
t2: intStack.top() /* handle this data again */

要解决上述问题,就需要接口设计上有较大的改动,最好的操作是重新设计接口

  • 1、重新设计接口实现:top()接口内提供异常机制,当栈大小为零时,抛出异常
  • 2、重新设计接口功能:将pop()和top()操作合并

第1种方案并不能解决example 2,所以推荐重新设计接口功能。一个线程安全的栈类定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
template<typename T>
class Stack
{
private:
std::stack<T> m_data;
mutable std::mutex m_mutex;
public:
Stack(): m_data(std::stack<int>()){}
Stack(const Stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
data = other.data;
}
Stack& operator=(const Stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(new_value);
}
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) nullptr;
const std::shared_ptr<T> res(std::make_shared<T>(data.top()));
data.pop();
return res;
}
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) return nullptr;
value=data.top();
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};

栈操作为什么需要先top()后pop(),而不直接pop()时返回数据?这是为了防止pop()时的拷贝操作失败,导致数据丢失。

如果不重新设计接口,在使用的时候加锁也能解决这个问题:

1
2
3
4
5
6
7
8
9
10
std::mutex stackMutex;
void Process()
{
std::lock_gurad<std::mutex> guard(statckMutex);
if(!intStack.empty())
{
const int value = intStack.top();
intStack.pop();
}
}

上述两种可能导致加锁失效的竞态条件场景,需要我们在组织代码或设计接口时精雕细琢,在很多场景下,提供线程安全的代码是很有必要的。

死锁

使用多线程,我们会遇到死锁问题,即使没有加锁,也是有可能出现死锁,必须要按照一定的规范来涉及代码,才能有效的避免死锁问题。

死锁的概念略去不说,死锁有可能发生在使用多个互斥量的场景下,也可能存在没有使用互斥量的场景:

  • 两个线程都在等待对方释放互斥量
  • 两个线程都调用了对方的join()函数

为了解决两个线程都在等待对方释放互斥量导致的死锁问题,C++11提供了若干机制:

  • std::lock()函数
  • std::unique_lock类

锁住所有互斥量

只要将互斥量作为参数传递给std::lock(),std::lock()就能够锁住多个互斥量。std::lock()并未指定解锁和上锁的顺序,其能够保证:

  • std::lock()执行成功时,所有互斥量都已经被上锁,并且没有死锁问题
  • std::lock()执行失败时,已被其上锁的互斥量都会被解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <iostream>       /* std::cout */
#include <thread> /* std::thread */
#include <mutex> /* std::mutex, std::lock */

class some_big_object
{
public:
some_big_object(int a) :x(a) {}
void Print(){ std::cout << x << std::endl; }
private:
int x;
};
class X
{
private:
some_big_object& some_detail;
std::mutex m;
public:
X(some_big_object & sd):some_detail(sd){}
friend void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::lock(lhs.m,rhs.m);
std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock);
std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock);
std::swap(lhs.some_detail,rhs.some_detail);
}
};

template<class T>
void swap(T& lhs,T& rhs);

template<>
void swap<some_big_object>(some_big_object &x, some_big_object &y)
{
X a(x), b(y);
swap(a, b);
}

int main ()
{
some_big_object a(1),b(2);
a.Print(), b.Print();
swap(a,b);
a.Print(), b.Print();
return 0;
}

上面一段代码使用了模板的偏特化特性,这里不需要深究,只需要知道swap(a, b)最终会调用X类的swap友元函数。在该友元函数中,std::lock()函数锁住两个互斥量,std::lock_guard负责unlock两个互斥量,如果不调用std::lock_guard(),需要手动unlock()。std::adopt_lock参数表示互斥量已经上锁,这里仅仅是不会重复上锁。下面两个例子起到相同作用。

1
2
3
4
5
6
7
8
9
/* example 1 */
std::mutex mtx;
std::lock(mtx); // have to lock before the next sentence
std::lock_guard<std::mutex> guard(mtx, std::adopt_lock);

/* example 2 */
std::mutex mtx;
std::lock(mtx);
mtx.unlock();

避免死锁的一点建议

C++并发编程中给出了几点避免死锁的进阶指导:

  • 1、避免嵌套锁
  • 2、避免在持有锁时调用用户提供的代码
  • 3、使用固定顺序获取锁
  • 4、使用锁的层次结构

前三个建议看字面意思就可以了,我们这里主要阐述锁的层次结构。层次锁需要遵守如下原则:

当代码试图对一个互斥量上锁,在该层锁已被低层持有时,上锁是不允许的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
hierarchical_mutex high_level_mutex(10000);
hierarchical_mutex low_level_mutex(7000);
hierarchical_mutex low_level_mutex(5000);

int do_low_level_stuff();
int low_level_func()
{
std::lock_guard<hierarchical_mutex> lk(low_level_mutex);
return do_low_level_stuff();
}

void high_level_stuff(int some_param);
void high_level_func()
{
std::lock_guard<hierarchical_mutex> lk(high_level_mutex);
high_level_stuff(low_level_func());
}

void middle_level_stuff(int some_param);
void middle_level_func()
{
std::lock_guard<hierarchical_mutex> lk(middle_level_mutex);
middle_level_stuff(high_level_stuff());
}

int main()
{
high_level_func();
middle_level_func();
}

按照层次锁的原则,high_level_func()能够正确执行,而middle_level_func()不能正确执行:

  • high_level_func()先获取到高层级的锁,然后获取到低层级的锁,符合原则
  • middle_level_func()先获取低层级的锁,然后获取到高层级的锁,不符合原则
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class hierarchical_mutex
{
std::mutex internal_mutex;
unsigned long const hierarchy_value;
unsigned long previous_hierarchy_value;
static thread_local unsigned long this_thread_hierarchy_value;
void check_for_hierarchy_violation()
{
if(this_thread_hierarchy_value <= hierarchy_value)
{
throw std::logic_error(“mutex hierarchy violated”);
}
}
void update_hierarchy_value()
{
previous_hierarchy_value=this_thread_hierarchy_value;
this_thread_hierarchy_value=hierarchy_value;
}
public:
explicit hierarchical_mutex(unsigned long value):
hierarchy_value(value),
previous_hierarchy_value(0)
{}
void lock() {
check_for_hierarchy_violation();
internal_mutex.lock();
update_hierarchy_value();
}
void unlock()
{
this_thread_hierarchy_value=previous_hierarchy_value;
internal_mutex.unlock();
}
bool try_lock()
{
check_for_hierarchy_violation();
if(!internal_mutex.try_lock())
return false;
update_hierarchy_value();
return true;
} };
thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);

保护共享数据的初始化过程

为了防止共享数据初始化时数据被破坏,C++提供了std::once_flag和std::call_once来保证共享数据初始化的正确性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/* using mutex */
std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;
void foo()
{
std::unique_lock<std::mutex> lk(resource_mutex);
if(!resource_ptr)
{
resource_ptr.reset(new some_resource);
}
lk.unlock();
resource_ptr->do_something();
}

/* using call_once */
std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag;

void int_resource()
{
resource_ptr.reset(new some_resource);
}

void foo()
{
std::call_once(resource_flag, init_resource);
resource_ptr->do_something();
}

保护很少更新的数据量

对于这种共享数据可以采用“读者-写着锁”,其允许两种不同的使用方式:一个作者线程独占访问和共享访问,让多个读者线程并发访问。

C++标准并没有提供相关的解决方案,我们可以使用boost::shared_mutex来做同步。对于更新操作,可以使
std::lock_guard<boost::shared_mutex>std::unique_lock<boost::shared_mutex>进行上锁;对于访问操作,可以使用boost::shared_lock<boost::shared_mutex>获取共享访问权。我们来看下面例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <map>
#include <string>
#include <mutex>
#include <boost/thread/shared_mutex.hpp>
class dns_entry;
class dns_cache
{
std::map<std::string,dns_entry> entries;
mutable boost::shared_mutex entry_mutex;
public:
dns_entry find_entry(std::string const& domain) const
{
boost::shared_lock<boost::shared_mutex> lk(entry_mutex);
std::map<std::string,dns_entry>::const_iterator const it=
entries.find(domain);
return (it==entries.end())?dns_entry():it->second;
}
void update_or_add_entry(std::string const& domain,
dns_entry const& dns_details)
{
std::lock_guard<boost::shared_mutex> lk(entry_mutex);
entries[domain]=dns_details;
} };

上面代码,find_entry()使用了boost::shared_lock<>实例来保护器共享和只读权限;update_or_add_entry()使用std::lock_guard<>实例来独占访问权限。

嵌套锁

对于一个已经上锁的互斥量多次上锁,会出现未定义行为。然而对于嵌套锁std::recursive_mutex来说,多次上锁不会出现问题。

在互斥量锁住其他线程前,你必须释放你拥有的所有 锁,所以当你调用lock()三次时,你也必须调用unlock()三次。正确使
用 std::lock_guardstd::recursive_mutex 和 std::unique_lockstd::recursice_mutex 可以帮我们处理这些问题。

大多数情况下,嵌套锁是用在可被多线程并发访问的类上,所以其拥有一个互斥量保护其成员数据。每个公共成员函数 都会对互斥量上锁,然后完成对应的功能,之后再解锁互斥量。不过,有时一个公共成员函 数会调用另一个公共函数作为其操作的一部分。

不过上面提高的方案是不推荐的,推荐的做法是——从中提取出一个函数作为类的私有成员, 并且让所有成员函数都对其进行调用,这个私有成员函数不会对互斥量进行上锁(在调用前必 须获得锁)。

Powered by Hexo & Theme Keep
This site is deployed on
Unique Visitor Page View