abseil中的微操

发布时间:2024年01月15日

给分支预测器的建议

原始代码

以下代码用于实现多线程中只调用一次的效果,这里的if大多数情况下都是false,即已经被调用过了。这里是否被调用过用的是一个`std::atomic<uint32_t>`的原子变量
template <typename Callable, typename... Args>
void call_once(absl::once_flag& flag, Callable&& fn, Args&&... args) {
  std::atomic<uint32_t>* once = base_internal::ControlWord(&flag);
  uint32_t s = once->load(std::memory_order_acquire);
  if (ABSL_PREDICT_FALSE(s != base_internal::kOnceDone)) {
    base_internal::CallOnceImpl(
        once, base_internal::SCHEDULE_COOPERATIVE_AND_KERNEL,
        std::forward<Callable>(fn), std::forward<Args>(args)...);
  }
}

用于做分支预测建议的宏

// Recommendation: Modern CPUs dynamically predict branch execution paths,
// typically with accuracy greater than 97%. As a result, annotating every
// branch in a codebase is likely counterproductive; however, annotating
// specific branches that are both hot and consistently mispredicted is likely
// to yield performance improvements.
#if ABSL_HAVE_BUILTIN(__builtin_expect) || \
    (defined(__GNUC__) && !defined(__clang__))
#define ABSL_PREDICT_FALSE(x) (__builtin_expect(false || (x), false))
#define ABSL_PREDICT_TRUE(x) (__builtin_expect(false || (x), true))
#else
#define ABSL_PREDICT_FALSE(x) (x)
#define ABSL_PREDICT_TRUE(x) (x)
#endif

解释

(__builtin_expect(false || (x), true)) 是一个使用了 GCC 内置函数 __builtin_expect 的表达式。这个内置函数通常用于向编译器提供分支预测信息,以优化代码的执行。

__builtin_expect 函数的语法是:

__builtin_expect(EXPRESSION, EXPECTED_VALUE)
  • EXPRESSION 是一个表达式,可以是任何布尔表达式。
  • EXPECTED_VALUE 是一个编译器期望表达式 EXPRESSION 的结果为真或假的值。通常使用 truefalse

__builtin_expect 函数告诉编译器表达式 EXPRESSION 的结果很可能是 EXPECTED_VALUE,以便编译器对代码进行优化。这种优化涉及到对条件分支的预测,使得最有可能的分支能够更快地执行,提高代码的性能。

在你的表达式中,(__builtin_expect(false || (x), true)) 使用了 __builtin_expect 函数,期望 (false || (x)) 的结果为真。这样的编码风格通常用于告诉编译器,(false || (x)) 表达式中的 x 很可能为真,以便编译器在生成机器代码时进行相关的优化。

需要注意的是,__builtin_expect 是 GCC 提供的特定于编译器的内置函数,因此它在其他编译器或开发环境中可能不可用。如果你的代码需要在其他编译器中编译,可能需要进行适当的修改或条件编译。

其他

讲讲这个call_once在面对多线程竞争时的实现原理。

  1. 第一个进入的线程可以执行
  2. 后续进入的线程需要等待
    有了这个认识,剩下的就是看原子变量的改变过程和等待过程了。

template <typename Callable, typename... Args>
ABSL_ATTRIBUTE_NOINLINE
void CallOnceImpl(std::atomic<uint32_t>* control,
                  base_internal::SchedulingMode scheduling_mode, Callable&& fn,
                  Args&&... args) {
  static const base_internal::SpinLockWaitTransition trans[] = {
      {kOnceInit, kOnceRunning, true},
      {kOnceRunning, kOnceWaiter, false},
      {kOnceDone, kOnceDone, true}};

  // Must do this before potentially modifying control word's state.
  base_internal::SchedulingHelper maybe_disable_scheduling(scheduling_mode);
  // Short circuit the simplest case to avoid procedure call overhead.
  // The base_internal::SpinLockWait() call returns either kOnceInit or
  // kOnceDone. If it returns kOnceDone, it must have loaded the control word
  // with std::memory_order_acquire and seen a value of kOnceDone.
  uint32_t old_control = kOnceInit;
  if (control->compare_exchange_strong(old_control, kOnceRunning,
                                       std::memory_order_relaxed) ||
      base_internal::SpinLockWait(control, ABSL_ARRAYSIZE(trans), trans,
                                  scheduling_mode) == kOnceInit) {
    base_internal::invoke(std::forward<Callable>(fn),
                          std::forward<Args>(args)...);
    old_control =
        control->exchange(base_internal::kOnceDone, std::memory_order_release);
    if (old_control == base_internal::kOnceWaiter) {
      base_internal::SpinLockWake(control, true);
    }
  }  // else *control is already kOnceDone
}
// See spinlock_wait.h for spec.
uint32_t SpinLockWait(std::atomic<uint32_t> *w, int n,
                      const SpinLockWaitTransition trans[],
                      base_internal::SchedulingMode scheduling_mode) {
  int loop = 0;
  for (;;) {
    uint32_t v = w->load(std::memory_order_acquire);
    int i;
    for (i = 0; i != n && v != trans[i].from; i++) {
    }
    if (i == n) {
      SpinLockDelay(w, v, ++loop, scheduling_mode);  // no matching transition
    } else if (trans[i].to == v ||                   // null transition
               w->compare_exchange_strong(v, trans[i].to,
                                          std::memory_order_acquire,
                                          std::memory_order_relaxed)) {
      if (trans[i].done) return v;
    }
  }
}

这里精彩的地方有两个,一个是多线程进入时候的状态机转换过程,即原子变量遵循的trans数组。第二个是SpinLockDelay在多个平台下的实现。

//posix linux 
ABSL_ATTRIBUTE_WEAK void ABSL_INTERNAL_C_SYMBOL(AbslInternalSpinLockDelay)(
    std::atomic<uint32_t>* /* lock_word */, uint32_t /* value */, int loop,
    absl::base_internal::SchedulingMode /* mode */) {
  absl::base_internal::ErrnoSaver errno_saver;
  if (loop == 0) {
  } else if (loop == 1) {
    sched_yield();
  } else {
    struct timespec tm;
    tm.tv_sec = 0;
    tm.tv_nsec = absl::base_internal::SpinLockSuggestedDelayNS(loop);
    nanosleep(&tm, nullptr);
  }
}
//win32
void ABSL_INTERNAL_C_SYMBOL(AbslInternalSpinLockDelay)(
    std::atomic<uint32_t>* /* lock_word */, uint32_t /* value */, int loop,
    absl::base_internal::SchedulingMode /* mode */) {
  if (loop == 0) {
  } else if (loop == 1) {
    Sleep(0);
  } else {
    // SpinLockSuggestedDelayNS() always returns a positive integer, so this
    // static_cast is safe.
    Sleep(static_cast<DWORD>(
        absl::base_internal::SpinLockSuggestedDelayNS(loop) / 1000000));
  }
}
//sleep ms consideration
// Return a suggested delay in nanoseconds for iteration number "loop"
int SpinLockSuggestedDelayNS(int loop) {
  // Weak pseudo-random number generator to get some spread between threads
  // when many are spinning.
  uint64_t r = delay_rand.load(std::memory_order_relaxed);
  r = 0x5deece66dLL * r + 0xb;   // numbers from nrand48()
  delay_rand.store(r, std::memory_order_relaxed);

  if (loop < 0 || loop > 32) {   // limit loop to 0..32
    loop = 32;
  }
  const int kMinDelay = 128 << 10;  // 128us
  // Double delay every 8 iterations, up to 16x (2ms).
  int delay = kMinDelay << (loop / 8);
  // Randomize in delay..2*delay range, for resulting 128us..4ms range.
  return delay | ((delay - 1) & static_cast<int>(r));
}

L1数据预取

abseil里面还定义了三个函数用于数据预取(prefetch)到本地缓存的函数。

数据预取是一种优化技术,通过提前将数据移动到CPU的缓存中,以便在数据被使用之前加速访问。这些函数的作用是将指定地址的数据预取到L1缓存中,以便在读取数据之前移动数据到缓存中。这样,当读取发生时,数据可能已经在缓存中,以提高访问速度。

下面是这些函数的简要说明:

  1. void PrefetchToLocalCache(const void* addr): 将数据预取到L1缓存中,具有最高程度的时间局部性(temporal locality)。在可能的情况下,数据将预取到所有级别的缓存中。这个函数适用于具有长期重复访问的数据。

  2. void PrefetchToLocalCacheNta(const void* addr): 与PrefetchToLocalCache函数相同,但具有非时间局部性(non-temporal locality)。这意味着预取的数据不应该留在任何缓存层级中。这在数据只使用一次或短期使用的情况下很有用,例如对对象调用析构函数。

  3. void PrefetchToLocalCacheForWrite(const void* addr): 将具有修改意图的数据预取到L1缓存中。这个函数类似于PrefetchToLocalCache,但会预取带有“修改意图”的缓存行。通常包括在所有其他缓存层级中使该地址的缓存条目无效,并具有独占访问意图。这个函数用于在修改数据之前将数据预取到缓存中。

这些函数需要注意的是,不正确或滥用使用这些函数可能会降低性能。只有在经过充分的基准测试表明有改进时,才应使用这些函数。

ABSL_ATTRIBUTE_ALWAYS_INLINE inline void PrefetchToLocalCache(
    const void* addr) {
  _mm_prefetch(reinterpret_cast<const char*>(addr), _MM_HINT_T0);
}

ABSL_ATTRIBUTE_ALWAYS_INLINE inline void PrefetchToLocalCacheNta(
    const void* addr) {
  _mm_prefetch(reinterpret_cast<const char*>(addr), _MM_HINT_NTA);
}

ABSL_ATTRIBUTE_ALWAYS_INLINE inline void PrefetchToLocalCacheForWrite(
    const void* addr) {
#if defined(_MM_HINT_ET0)
  _mm_prefetch(reinterpret_cast<const char*>(addr), _MM_HINT_ET0);
#elif !defined(_MSC_VER) && defined(__x86_64__)
  // _MM_HINT_ET0 is not universally supported. As we commented further
  // up, PREFETCHW is recognized as a no-op on older Intel processors
  // and has been present on AMD processors since the K6-2. We have this
  // disabled for MSVC compilers as this miscompiles on older MSVC compilers.
  asm("prefetchw (%0)" : : "r"(addr));
#endif
}

编译器静态检查

#if ABSL_HAVE_ATTRIBUTE(guarded_by)
#define ABSL_GUARDED_BY(x) __attribute__((guarded_by(x)))
#else
#define ABSL_GUARDED_BY(x)
#endif

__attribute__((guarded_by(x))) 是一个GCC/Clang的扩展属性(attribute),用于指定一个互斥量(mutex)或锁(lock)来保护变量的访问。

这个属性的语法如下:

__attribute__((guarded_by(x)))

其中,x 是一个标识符,用于指定用于保护变量访问的互斥量或锁的名称。

该属性的作用是向编译器提供关于变量的额外信息,以帮助进行静态分析和检查多线程代码中的数据竞争问题。通过将 __attribute__((guarded_by(x))) 应用于变量,我们可以指示编译器该变量受特定互斥量的保护,从而在编译时进行检查。

例如,考虑以下示例:

#include <mutex>

std::mutex mutex;
int shared_data __attribute__((guarded_by(mutex)));

void foo()
{
    std::lock_guard<std::mutex> lock(mutex);
    // 访问 shared_data
    shared_data = 42;
}

在上面的示例中,shared_data 变量被 guarded_by 属性修饰,指示它受 mutex 互斥量的保护。这样,当在没有获取 mutex 互斥量的情况下访问 shared_data 时,编译器会发出警告或错误,以帮助检测潜在的数据竞争问题。

需要注意的是,__attribute__((guarded_by(x))) 是GCC/Clang的扩展属性,不是标准C++的一部分。因此,它在不同编译器之间可能具有不同的行为或不受支持。在使用该属性时,应注意编译器的兼容性和文档。

文章来源:https://blog.csdn.net/qq_33882435/article/details/135606822
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。