原子变量
C++11提供了一个原子类型std::atomic<T>
,通过这个原子类型管理的内部变量就可以称之为原子变量,我们可以给原子类型指定bool、char、int、long、指针
等类型作为模板参数(不支持浮点类型和复合类型
)。
原子指的是一系列不可被CPU上下文交换的机器指令,这些指令组合在一起就形成了原子操作。在多核CPU下,当某个CPU核心开始运行原子操作时,会先暂停其它CPU内核对内存的操作,以保证原子操作不会被其它CPU内核所干扰。
由于原子操作是通过指令提供的支持,因此它的性能相比锁和消息传递会好很多。相比较于锁而言,原子类型不需要开发者处理加锁和释放锁的问题,同时支持修改,读取等操作,还具备较高的并发性能,几乎所有的语言都支持原子类型。
可以看出原子类型是无锁类型,但是无锁不代表无需等待,因为原子类型内部使用了CAS
循环,当大量的冲突发生时,该等待还是得等待!但是总归比锁要好。
C++11内置了整形的原子变量,这样就可以更方便的使用原子变量了。在多线程操作中,使用原子变量之后就不需要再使用互斥量来保护该变量了,用起来更简洁。因为对原子变量进行的操作只能是一个原子操作(atomic operation
),原子操作指的是不会被线程调度机制打断的操作( read-modify-write ),这种操作一旦开始,就一直运行到结束,中间不会有任何的上下文切换。 多线程同时访问共享资源造成数据混乱的原因就是因为CPU的上下文切换导致的,使用原子变量解决了这个问题,因此互斥锁的使用也就不再需要了。
让我们看一些代码来理解这一点。
Copy #include <iostream>
#include <thread>
int main()
{
int sum = 0;
auto f = [&sum](){
for(int i = 0; i < 1000000; i++)
{
sum += 1;
}
};
std::thread t1(f);
std::thread t2(f);
t1.join();
t2.join();
std::cout << sum << std::endl;
return 0;
}
在此代码中,我们在两个线程 sum
上递增,它们之间没有任何同步。我们希望程序输出的结果是 2000000
,但是通过编译运行代码,我们会发现结果不尽如意:
Copy $ g++ --version
g++ (GCC) 11.4.0
$ g++ -dumpmachine
x86_64-pc-cygwin
$ g++ nonatomic.cpp -o app
$ ./app
1508328
$ ./app
1096773
$ ./app
1303703
这段代码中对 sum
变量的递增操作在多个线程中并不是线程安全的。多个线程同时对 sum
进行读取、修改、写入的操作,会引发数据竞争(data race),导致结果不确定。
线程不安全的递增操作
在 C++ 中,sum += 1
并不是一个原子操作。它实际分为以下几个步骤:
在多线程环境下,如果两个线程同时进行上述操作,就可能出现以下情况:
线程 A 将值加 1 并写回,sum
的值变为 1001。
线程 B 将值加 1 并写回,sum
的值也变为 1001。
这样,两个线程各执行了一次加 1 操作,但 sum
的值只增加了 1。这就是数据竞争导致的错误。
atomic 类成员
不会存储到缓存中,运算完毕后直接给内存了atomic变量无法保护复合类型的数据(类或者结构体),对于指针类型的atomic变量来说,只有指针 ++/--
才是原子操作,指针所指的对象(类或者结构体)并不能保证其原子性
类定义
Copy // 定义于头文件 <atomic>
template< class T >
struct atomic;
通过定义可得知:在使用这个模板类的时候,一定要指定模板类型。
构造函数
Copy // 1
atomic() noexcept = default;
// 2
constexpr atomic( T desired ) noexcept;
// 3
atomic( const atomic& ) = delete;
构造函数2:使用 desired
初始化原子变量的值。
构造函数3:使用=delete
显示删除拷贝构造函数, 不允许进行对象之间的拷贝
Copy /**
* @brief 初始化atomic变量
*/
void test()
{
atomic<char> c{'a'};
atomic<int> b;
atomic_init(&b, 9);
}
公共成员函数
原子类型在类内部重载了=
操作符,并且不允许在类的外部使用 =
进行对象的拷贝。
Copy T operator=( T desired ) noexcept;
T operator=( T desired ) volatile noexcept;
atomic& operator=( const atomic& ) = delete;
atomic& operator=( const atomic& ) volatile = delete;
原子地以 desired
替换当前值。按照 order
的值影响内存。
Copy void store( T desired, std::memory_order order = std::memory_order_seq_cst ) noexcept;
void store( T desired, std::memory_order order = std::memory_order_seq_cst ) volatile noexcept;
原子地加载并返回原子变量的当前值。按照 order
的值影响内存。直接访问原子对象也可以得到原子变量的当前值。
Copy T load( std::memory_order order = std::memory_order_seq_cst ) const noexcept;
T load( std::memory_order order = std::memory_order_seq_cst ) const volatile noexcept;
Copy /**
* @brief 修改atomic变量的值
*/
void test01()
{
atomic_char cc('b');
cc = 'd';
cout << cc << endl;
cc.store('a');
cout << cc << endl;
char ccc = cc.exchange('e');//返回之前的旧值
cout << cc.load() << endl;
cout << ccc << endl;
}
Copy class Counter {
public:
void increment() {
for (int i = 0; i < 100; ++i) {
// mtx.lock();
number++;
cout << "+++ increment thread id: " << this_thread::get_id()
<< ", number: " << number << endl;
// mtx.unlock();
this_thread::sleep_for(chrono::milliseconds(100));
}
}
void increment1() {
for (int i = 0; i < 100; ++i) {
// mtx.lock();
number++;
cout << "*** decrement thread id: " << this_thread::get_id()
<< ", number: " << number << endl;
// mtx.unlock();
this_thread::sleep_for(chrono::milliseconds(50));
}
}
private:
// int number = 0;
atomic_int number{0};
// atomic<Base*> base;
// mutex mtx;
};
特化成员函数
T operator+= (T val) volatile noexcept;
T operator+= (T val) noexcept;
T operator-= (T val) volatile noexcept;
T operator-= (T val) noexcept;
T operator&= (T val) volatile noexcept;
T operator&= (T val) noexcept;
T operator|= (T val) volatile noexcept;
T operator|= (T val) noexcept;
T operator^= (T val) volatile noexcept;
T operator^= (T val) noexcept;
T operator+= (ptrdiff_t val) volatile noexcept;
T operator+= (ptrdiff_t val) noexcept;
T operator-= (ptrdiff_t val) volatile noexcept;
T operator-= (ptrdiff_t val) noexcept;
以上各个 operator 都会有对应的 fetch_* 操作,详细见下表:
CAS(compare and swap)
没有交换操作,CAS 就不完整。交换操作可通过 exchange() 成员函数实现。
Copy #include <iostream>
#include <atomic>
int main() {
std::atomic<int> atomicInt(10);
int oldValue = atomicInt.exchange(20);
std::cout << "Old value: " << oldValue << ", New value: " << atomicInt.load() << std::endl;
return 0;
}
调用 exchange() 会将原子的当前值与所需值互换,并返回原来的值。所有操作均以原子方式完成。
CAS 在swap前增加了一个附加条件,可通过 compare_exchange_strong() 和 compare_exchange_weak()这两个函数使用。
Copy std::atomic<int> x(0);
int expected = x.load();
int desired = 42;
// If x == expected, x = desired and return true
// Otherwise, expected = x and return false
while(!x.compare_exchange_strong(expected, desired));
如上代码所示,该方法将一个期望值与原子变量进行比较。如果它们确实相等,原子变量将被交换,并返回 true。
否则,原子变量的当前值将被加载到expected变量,并返回 false。这种机制允许我们创建一个重试循环,而无需再次显式地读取原子值。
CAS 会失败的原因是与其他线程的争用。在我们对原子进行初始读取以获得预期值后,我们必须确保其他线程在我们上次读取原子后没有对其进行更改。 假设多个线程都在尝试 CAS,那么重试循环就会一直运行,直到我们的 CAS 比其他线程更快地完成更改。
使用 compare_exchange_strong
的场景:确定性操作
在需要严格保证操作成功或失败时使用 compare_exchange_strong
。
Copy #include <iostream>
#include <atomic>
#include <thread>
std::atomic<int> atomicInt(0);
void updateValue() {
int expected = 0;
int desired = 1;
if (atomicInt.compare_exchange_strong(expected, desired)) {
std::cout << "Value changed to " << desired << std::endl;
} else {
std::cout << "Expected " << expected << " but found " << atomicInt.load() << std::endl;
}
}
int main() {
std::thread t(updateValue);
t.join();
return 0;
}
使用 compare_exchange_weak
的场景
compare_exchange_weak
允许伪失败,即使当前值等于预期值,也可能返回 false
。这种设计是为了优化在某些架构上的性能,适用于需要反复尝试的场景,如实现自旋锁或无锁数据结构。
示例 1:自旋锁
在实现自旋锁时,使用 compare_exchange_weak
可以更高效地反复尝试获取锁。
Copy #include <iostream>
#include <atomic>
#include <thread>
#include <vector>
std::atomic<bool> lockFlag(false);
void spinlock_acquire() {
bool expected = false;
while (!lockFlag.compare_exchange_weak(expected, true)) {
expected = false;
}
}
void spinlock_release() {
lockFlag.store(false);
}
int counter = 0;
void increment() {
for (int i = 0; i < 1000; ++i) {
spinlock_acquire();
++counter;
spinlock_release();
}
}
int main() {
const int numThreads = 10;
std::vector<std::thread> threads;
for (int i = 0; i < numThreads; ++i) {
threads.push_back(std::thread(increment));
}
for (auto& t : threads) {
t.join();
}
std::cout << "Final counter value: " << counter << std::endl;
return 0;
}
示例 2:无锁队列
在无锁数据结构(如无锁队列)中,使用 compare_exchange_weak
反复尝试更新指针或计数器,优化性能。
Copy #include <iostream>
#include <atomic>
#include <thread>
#include <vector>
class Node {
public:
int value;
Node* next;
Node(int val) : value(val), next(nullptr) {}
};
std::atomic<Node*> head(nullptr);
void push(int value) {
Node* newNode = new Node(value);
Node* oldHead;
do {
oldHead = head.load(std::memory_order_relaxed);
newNode->next = oldHead;
} while (!head.compare_exchange_weak(oldHead, newNode, std::memory_order_release, std::memory_order_relaxed));
}
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 10; ++i) {
threads.push_back(std::thread(push, i));
}
for (auto& t : threads) {
t.join();
}
Node* current = head.load();
while (current) {
std::cout << current->value << " ";
Node* tmp = current;
current = current->next;
delete tmp;
}
std::cout << std::endl;
return 0;
}
Strong vs Weak
std::atomic 为 CAS 提供了强和弱两种选项。
compare_exchange_strong() 只有在原子的当前值由于争用而不再是我们所期望的值时,才会失败。
另一方面,compare_exchange_weak()允许虚假失败。这意味着,即使预期值等于当前值,它仍可能返回 false。
但为什么我们希望即使当前值等于预期值,CAS 也会失败呢?这是因为在某些平台上,获取独占访问可能代价非常大,但读取原子值却往往代价很小。
因此,弱 CAS 不会要求独占访问 ,而是执行一定时间的尝试来获取锁(硬件),这可能会超时--类似于网络套接字。
强 (compare_exchange_strong
)
弱 (compare_exchange_weak
)
如果匹配,进行强制尝试以获得独占访问,即等待直到成功。
如果不匹配,将当前值存储在预期变量中,并立即返回 false
。
如果匹配,它将进行一次有时限的尝试以获得独占访问。
如果不匹配,将当前值存储在预期变量中,并立即返回 false
。
如果我们的有时限的尝试超时,我们返回 false
——即使预期值与当前值匹配。
一旦获得独占访问,我们再次将当前值与预期值进行比较,以防它发生了变化。
如果匹配,执行交换并返回 true
。
否则,将当前值存储在预期变量中,并返回 false
。
一旦获得独占访问,我们再次将当前值与预期值进行比较,以防它发生了变化。
如果匹配,执行交换并返回 true
。
否则,将当前值存储在预期变量中,并返回 false
。
内存顺序
编译器和 CPU 能够对程序指令进行重新排序,通常彼此独立。也就是说,编译器可以重新排序指令,CPU 可以再次重新排序指令。但是,只有当编译器在两组指令之间没有建立任何依赖关系时,才允许这样做。
例如,下面的代码可以重新排序,因为对 x 的赋值和对 y 的赋值之间没有关系。也就是说,编译器或 CPU 可能先分配 y,然后分配 x。但是,这不会改变程序的语义含义。
Copy int x = 10;
int y = 5;
另一方面,下面的代码示例不能重新排序,因为编译器建立了 x 和 y 之间的关系。这里很容易看出,因为 y 取决于 x 的值。
Copy int x = 10;
int y = x + 1;
我们先从一个简单的例子开始,一步步的引出内存顺序的概念
Copy #include <cassert>
#include <thread>
int data = 0;
void producer() {
data = 100; // Write data
}
void consumer() {
assert(data == 100);
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
上面的多线程示例将无法编译,因为当线程 1 尝试设置数据值而线程 2 尝试读取数据值时,存在明显的数据争用。多个线程对共享资源进行操作时,需要加互斥锁或者使用原子变量,我们使用原子变量作为条件对上面的代码进行改进
Copy #include <atomic>
#include <cassert>
#include <thread>
int data = 0;
std::atomic<bool> ready(false);
void producer() {
data = 100;
ready.store(true); // Set flag
}
void consumer() {
while (!ready.load())
;
assert(data == 100);
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
上面的代码正确的解决了多线程之间数据竞争的问题,但是如果我替换 ready.store(true);
为 ready.store(true, std::memory_order_relaxed);
并替换 while (!ready.load())
为 while (!ready.load(std::memory_order_relaxed))
,会发现数据竞争又产生了。what f**K!
主要问题在于两个线程中的操作不再有顺序了,编译器或 CPU 可以自由地对两个线程中的指令进行重新排序。 一旦进程 2 确定该标志已设置为 true,它就会尝试读取 的 data
值。但是,此时的 data
值可能压根并没有修改。大概的样子如下图所示:
上面的示例我们可以看出,在 memory_order_relaxed
模式下,两个线程无法就共享变量的操作顺序达成一致。从线程 1 的角度来看,它执行的操作如下:
Copy write(data, 100)
store(ready, true)
但是,从线程 2 的角度来看,它看到线程 1 执行的操作顺序是:
Copy store(ready, true)
write(data, 100)
如果不就操作共享变量的顺序达成一致,则跨线程对这些变量进行更改是不安全的。
现在我们将ready.store(true, std::memory_order_relaxed);
替换为 ready.store(true, std::memory_order_seq_cst);
并替换 while (!ready.load(std::memory_order_relaxed))为
while (!ready.load(std::memory_order_seq_cst))
.
再次编译会发现,数据竞争被成功解决了,那这背后的原理是什么呢?——Memory Barrier 内存屏障
Copy Thread 1 Memory Thread 2
--------- ------- ---------
| | |
| write(data, 100 ) | |
| ----------------------- > | |
| | |
| ================Memory Barrier=================== |
| store(ready, true ) | |
| ----------------------- > | |
| | load(ready ) == true |
| | < ---------------------- |
| ================Memory Barrier=================== |
| | |
| | read ( data ) |
| | < ---------------------- |
| | |
线程 1 中 write(data, 100)
之后的内存屏障保证了对 data
的写操作发生在对 ready
的存储之前。
线程 2 中 read(data)
之前的内存屏障确保了对 data
的读取发生在从 ready
的加载之后。
内存顺序的类型
C++ 提供了不同级别的内存顺序,按如下顺序从最严格到最不严格排列。
memory_order_relaxed
, 这是最宽松的规则,它对编译器和CPU不做任何限制,可以乱序
memory_order_release
释放,设定内存屏障(Memory barrier),保证它之前的操作永远在它之前,但是它后面的操作可能被重排到它前面
memory_order_acquire
获取, 设定内存屏障,保证在它之后的访问永远在它之后,但是它之前的操作却有可能被重排到它后面,往往和Release在不同线程中联合使用
memory_order_acq_rel
,它是Acquire 和 Release 的结合,同时拥有它们俩提供的保证。比如你要对一个 atomic 自增 1,同时希望该操作之前和之后的读取或写入操作不会被重新排序
memory_order_seq_cst
顺序一致性, memory_order_seq_cst 就像是memory_order_acq_rel的加强版,它不管原子操作是属于读取还是写入的操作,只要某个线程有用到memory_order_seq_cst 的原子操作,线程中该memory_order_seq_cst 操作前的数据操作绝对不会被重新排在该memory_order_seq_cst 操作之后,且该memory_order_seq_cst 操作后的数据操作也绝对不会被重新排在memory_order_seq_cst 操作前。
Copy #include <atomic>
#include <cassert>
#include <iostream>
#include <thread>
int data = 0;
std::atomic<bool> ready(false);
void producer() {
data = 100;
ready.store(true, std::memory_order_release); // Set flag
}
void consumer() {
while (!ready.load(std::memory_order_acquire))
;
assert(data == 100);
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
上述例子与之前的例子相同,只是这里使用了 std::memory_order_release
来处理 ready.store()
,并且使用了 memory_order_acquire
来处理 ready.load()
。
然而,不同之处在于,这次的内存屏障是在 ready.store()
和 ready.load()
这一对操作上形成的,并且这种内存屏障只有在跨线程使用相同的原子变量时才有效。
假设你有一个变量 x
,在两个线程中进行修改,你可以在线程 1 中使用 x.store(std::memory_order_release)
,而在线程 2 中使用 x.load(std::memory_order_acquire)
,这样你就可以在这个变量上形成两个线程之间的同步点。
顺序一致性模型和释放-获取模型之间的主要区别在于,顺序一致性模型在所有线程之间强制执行全局操作顺序,而释放-获取模型仅在释放和获取操作对之间强制执行操作顺序。
现在,我们可以回顾一下为什么 一开始在没有指定内存顺序时没有报数据竞争的错误。这是因为在 C++ 中,当没有指定内存顺序时,默认假设使用的是 std::memory_order_seq_cst
。 由于这是最强的内存模式,因此不会存在数据竞争。
原子变量和std::mutex
之间的效率对比
Copy #include <atomic>
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
const int NUM_THREADS = 8;
const int NUM_ITERATIONS = 1000000;
void atomicIncrement(std::atomic<int>& counter) {
for (int i = 0; i < NUM_ITERATIONS; ++i) {
++counter;
}
}
void mutexIncrement(int& counter, std::mutex& mtx) {
for (int i = 0; i < NUM_ITERATIONS; ++i) {
std::lock_guard<std::mutex> lock(mtx);
++counter;
}
}
int main() {
// 使用原子变量
std::atomic<int> atomicCounter(0);
auto atomicStart = std::chrono::high_resolution_clock::now();
std::vector<std::thread> atomicThreads;
for (int i = 0; i < NUM_THREADS; ++i) {
atomicThreads.emplace_back(atomicIncrement, std::ref(atomicCounter));
}
for (auto& t : atomicThreads) {
t.join();
}
auto atomicEnd = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> atomicDuration = atomicEnd - atomicStart;
std::cout << "Atomic counter final value: " << atomicCounter << std::endl;
std::cout << "Atomic duration: " << atomicDuration.count() << " seconds" << std::endl;
// 使用互斥锁
int mutexCounter = 0;
std::mutex mtx;
auto mutexStart = std::chrono::high_resolution_clock::now();
std::vector<std::thread> mutexThreads;
for (int i = 0; i < NUM_THREADS; ++i) {
mutexThreads.emplace_back(mutexIncrement, std::ref(mutexCounter), std::ref(mtx));
}
for (auto& t : mutexThreads) {
t.join();
}
auto mutexEnd = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> mutexDuration = mutexEnd - mutexStart;
std::cout << "Mutex counter final value: " << mutexCounter << std::endl;
std::cout << "Mutex duration: " << mutexDuration.count() << " seconds" << std::endl;
return 0;
}
程序输出结果:
Copy Atomic counter final value: 8000000
Atomic duration: 0.87419 seconds
Mutex counter final value: 8000000
Mutex duration: 2.82929 seconds
refernce