以下代码用于实现多线程中只调用一次的效果,这里的if大多数情况下都是false,即已经被调用过了。这里是否被调用过用的是一个`std::atomic<uint32_t>`的原子变量
template <typename Callable, typename... Args>
void call_once(absl::once_flag& flag, Callable&& fn, Args&&... args) {
std::atomic<uint32_t>* once = base_internal::ControlWord(&flag);
uint32_t s = once->load(std::memory_order_acquire);
if (ABSL_PREDICT_FALSE(s != base_internal::kOnceDone)) {
base_internal::CallOnceImpl(
once, base_internal::SCHEDULE_COOPERATIVE_AND_KERNEL,
std::forward<Callable>(fn), std::forward<Args>(args)...);
}
}
// Recommendation: Modern CPUs dynamically predict branch execution paths,
// typically with accuracy greater than 97%. As a result, annotating every
// branch in a codebase is likely counterproductive; however, annotating
// specific branches that are both hot and consistently mispredicted is likely
// to yield performance improvements.
#if ABSL_HAVE_BUILTIN(__builtin_expect) || \
(defined(__GNUC__) && !defined(__clang__))
#define ABSL_PREDICT_FALSE(x) (__builtin_expect(false || (x), false))
#define ABSL_PREDICT_TRUE(x) (__builtin_expect(false || (x), true))
#else
#define ABSL_PREDICT_FALSE(x) (x)
#define ABSL_PREDICT_TRUE(x) (x)
#endif
(__builtin_expect(false || (x), true))
是一个使用了 GCC 内置函数 __builtin_expect
的表达式。这个内置函数通常用于向编译器提供分支预测信息,以优化代码的执行。
__builtin_expect
函数的语法是:
__builtin_expect(EXPRESSION, EXPECTED_VALUE)
EXPRESSION
是一个表达式,可以是任何布尔表达式。EXPECTED_VALUE
是一个编译器期望表达式 EXPRESSION
的结果为真或假的值。通常使用 true
或 false
。__builtin_expect
函数告诉编译器表达式 EXPRESSION
的结果很可能是 EXPECTED_VALUE
,以便编译器对代码进行优化。这种优化涉及到对条件分支的预测,使得最有可能的分支能够更快地执行,提高代码的性能。
在你的表达式中,(__builtin_expect(false || (x), true))
使用了 __builtin_expect
函数,期望 (false || (x))
的结果为真。这样的编码风格通常用于告诉编译器,(false || (x))
表达式中的 x
很可能为真,以便编译器在生成机器代码时进行相关的优化。
需要注意的是,__builtin_expect
是 GCC 提供的特定于编译器的内置函数,因此它在其他编译器或开发环境中可能不可用。如果你的代码需要在其他编译器中编译,可能需要进行适当的修改或条件编译。
讲讲这个call_once在面对多线程竞争时的实现原理。
template <typename Callable, typename... Args>
ABSL_ATTRIBUTE_NOINLINE
void CallOnceImpl(std::atomic<uint32_t>* control,
base_internal::SchedulingMode scheduling_mode, Callable&& fn,
Args&&... args) {
static const base_internal::SpinLockWaitTransition trans[] = {
{kOnceInit, kOnceRunning, true},
{kOnceRunning, kOnceWaiter, false},
{kOnceDone, kOnceDone, true}};
// Must do this before potentially modifying control word's state.
base_internal::SchedulingHelper maybe_disable_scheduling(scheduling_mode);
// Short circuit the simplest case to avoid procedure call overhead.
// The base_internal::SpinLockWait() call returns either kOnceInit or
// kOnceDone. If it returns kOnceDone, it must have loaded the control word
// with std::memory_order_acquire and seen a value of kOnceDone.
uint32_t old_control = kOnceInit;
if (control->compare_exchange_strong(old_control, kOnceRunning,
std::memory_order_relaxed) ||
base_internal::SpinLockWait(control, ABSL_ARRAYSIZE(trans), trans,
scheduling_mode) == kOnceInit) {
base_internal::invoke(std::forward<Callable>(fn),
std::forward<Args>(args)...);
old_control =
control->exchange(base_internal::kOnceDone, std::memory_order_release);
if (old_control == base_internal::kOnceWaiter) {
base_internal::SpinLockWake(control, true);
}
} // else *control is already kOnceDone
}
// See spinlock_wait.h for spec.
uint32_t SpinLockWait(std::atomic<uint32_t> *w, int n,
const SpinLockWaitTransition trans[],
base_internal::SchedulingMode scheduling_mode) {
int loop = 0;
for (;;) {
uint32_t v = w->load(std::memory_order_acquire);
int i;
for (i = 0; i != n && v != trans[i].from; i++) {
}
if (i == n) {
SpinLockDelay(w, v, ++loop, scheduling_mode); // no matching transition
} else if (trans[i].to == v || // null transition
w->compare_exchange_strong(v, trans[i].to,
std::memory_order_acquire,
std::memory_order_relaxed)) {
if (trans[i].done) return v;
}
}
}
这里精彩的地方有两个,一个是多线程进入时候的状态机转换过程,即原子变量遵循的trans
数组。第二个是SpinLockDelay
在多个平台下的实现。
//posix linux
ABSL_ATTRIBUTE_WEAK void ABSL_INTERNAL_C_SYMBOL(AbslInternalSpinLockDelay)(
std::atomic<uint32_t>* /* lock_word */, uint32_t /* value */, int loop,
absl::base_internal::SchedulingMode /* mode */) {
absl::base_internal::ErrnoSaver errno_saver;
if (loop == 0) {
} else if (loop == 1) {
sched_yield();
} else {
struct timespec tm;
tm.tv_sec = 0;
tm.tv_nsec = absl::base_internal::SpinLockSuggestedDelayNS(loop);
nanosleep(&tm, nullptr);
}
}
//win32
void ABSL_INTERNAL_C_SYMBOL(AbslInternalSpinLockDelay)(
std::atomic<uint32_t>* /* lock_word */, uint32_t /* value */, int loop,
absl::base_internal::SchedulingMode /* mode */) {
if (loop == 0) {
} else if (loop == 1) {
Sleep(0);
} else {
// SpinLockSuggestedDelayNS() always returns a positive integer, so this
// static_cast is safe.
Sleep(static_cast<DWORD>(
absl::base_internal::SpinLockSuggestedDelayNS(loop) / 1000000));
}
}
//sleep ms consideration
// Return a suggested delay in nanoseconds for iteration number "loop"
int SpinLockSuggestedDelayNS(int loop) {
// Weak pseudo-random number generator to get some spread between threads
// when many are spinning.
uint64_t r = delay_rand.load(std::memory_order_relaxed);
r = 0x5deece66dLL * r + 0xb; // numbers from nrand48()
delay_rand.store(r, std::memory_order_relaxed);
if (loop < 0 || loop > 32) { // limit loop to 0..32
loop = 32;
}
const int kMinDelay = 128 << 10; // 128us
// Double delay every 8 iterations, up to 16x (2ms).
int delay = kMinDelay << (loop / 8);
// Randomize in delay..2*delay range, for resulting 128us..4ms range.
return delay | ((delay - 1) & static_cast<int>(r));
}
abseil里面还定义了三个函数用于数据预取(prefetch)到本地缓存的函数。
数据预取是一种优化技术,通过提前将数据移动到CPU的缓存中,以便在数据被使用之前加速访问。这些函数的作用是将指定地址的数据预取到L1缓存中,以便在读取数据之前移动数据到缓存中。这样,当读取发生时,数据可能已经在缓存中,以提高访问速度。
下面是这些函数的简要说明:
void PrefetchToLocalCache(const void* addr)
: 将数据预取到L1缓存中,具有最高程度的时间局部性(temporal locality)。在可能的情况下,数据将预取到所有级别的缓存中。这个函数适用于具有长期重复访问的数据。
void PrefetchToLocalCacheNta(const void* addr)
: 与PrefetchToLocalCache
函数相同,但具有非时间局部性(non-temporal locality)。这意味着预取的数据不应该留在任何缓存层级中。这在数据只使用一次或短期使用的情况下很有用,例如对对象调用析构函数。
void PrefetchToLocalCacheForWrite(const void* addr)
: 将具有修改意图的数据预取到L1缓存中。这个函数类似于PrefetchToLocalCache
,但会预取带有“修改意图”的缓存行。通常包括在所有其他缓存层级中使该地址的缓存条目无效,并具有独占访问意图。这个函数用于在修改数据之前将数据预取到缓存中。
这些函数需要注意的是,不正确或滥用使用这些函数可能会降低性能。只有在经过充分的基准测试表明有改进时,才应使用这些函数。
ABSL_ATTRIBUTE_ALWAYS_INLINE inline void PrefetchToLocalCache(
const void* addr) {
_mm_prefetch(reinterpret_cast<const char*>(addr), _MM_HINT_T0);
}
ABSL_ATTRIBUTE_ALWAYS_INLINE inline void PrefetchToLocalCacheNta(
const void* addr) {
_mm_prefetch(reinterpret_cast<const char*>(addr), _MM_HINT_NTA);
}
ABSL_ATTRIBUTE_ALWAYS_INLINE inline void PrefetchToLocalCacheForWrite(
const void* addr) {
#if defined(_MM_HINT_ET0)
_mm_prefetch(reinterpret_cast<const char*>(addr), _MM_HINT_ET0);
#elif !defined(_MSC_VER) && defined(__x86_64__)
// _MM_HINT_ET0 is not universally supported. As we commented further
// up, PREFETCHW is recognized as a no-op on older Intel processors
// and has been present on AMD processors since the K6-2. We have this
// disabled for MSVC compilers as this miscompiles on older MSVC compilers.
asm("prefetchw (%0)" : : "r"(addr));
#endif
}
#if ABSL_HAVE_ATTRIBUTE(guarded_by)
#define ABSL_GUARDED_BY(x) __attribute__((guarded_by(x)))
#else
#define ABSL_GUARDED_BY(x)
#endif
__attribute__((guarded_by(x)))
是一个GCC/Clang的扩展属性(attribute),用于指定一个互斥量(mutex)或锁(lock)来保护变量的访问。
这个属性的语法如下:
__attribute__((guarded_by(x)))
其中,x
是一个标识符,用于指定用于保护变量访问的互斥量或锁的名称。
该属性的作用是向编译器提供关于变量的额外信息,以帮助进行静态分析和检查多线程代码中的数据竞争问题。通过将 __attribute__((guarded_by(x)))
应用于变量,我们可以指示编译器该变量受特定互斥量的保护,从而在编译时进行检查。
例如,考虑以下示例:
#include <mutex>
std::mutex mutex;
int shared_data __attribute__((guarded_by(mutex)));
void foo()
{
std::lock_guard<std::mutex> lock(mutex);
// 访问 shared_data
shared_data = 42;
}
在上面的示例中,shared_data
变量被 guarded_by
属性修饰,指示它受 mutex
互斥量的保护。这样,当在没有获取 mutex
互斥量的情况下访问 shared_data
时,编译器会发出警告或错误,以帮助检测潜在的数据竞争问题。
需要注意的是,__attribute__((guarded_by(x)))
是GCC/Clang的扩展属性,不是标准C++的一部分。因此,它在不同编译器之间可能具有不同的行为或不受支持。在使用该属性时,应注意编译器的兼容性和文档。