链路传播(Propagate)机制及使用场景

发布时间:2024年01月24日

前言

服务间链路追踪传播机制是指在微服务架构中,通过记录和跟踪服务之间的请求和响应信息,来实现对服务间链路的追踪和监控。这种机制可以帮助开发人员快速定位服务间出现的问题,并进行优化和调整。

具体来说,服务间链路追踪传播机制可以通过在每个服务的请求和响应中添加唯一标识符来实现。当一个服务发送请求到另一个服务时,它会将自己的唯一标识符添加到请求头中,并发送给目标服务。目标服务收到请求后,会将请求头中的唯一标识符复制到响应头中,并返回给调用方。调用方收到响应后,可以通过唯一标识符来追踪服务间的调用链路。

为了实现服务间链路追踪传播机制,通常会使用一些开源工具或框架,比如OpenTelemetryDataDogZipkinSkyWalking?等。这些工具可以自动记录服务间的请求和响应信息,并提供可视化的界面来帮助开发人员进行调试和优化。

通常,我们会将这种特殊标识的请求头称之为传播协议

常见传播协议

不同的 APM 工具支持一种或者多种传播协议,以便更好的服务于应用。

  • b3
  • uber
  • sw8
  • w3c
  • datadog
  • ot

OpenTelemetry 除了原生支持?otw3c?、?b3?和?uber?几种协议外,通过:opentelemetry-collector-contrib?支持?sw8?和?datadog?传播协议。

B3 协议

B3 传播协议是第一个链路追踪协议,这里重点讲述 B3 传播协议相关内容。

B3 有两种编码:Single Header 和 Multiple Header。

  • 多个标头编码 X-B3-在跟踪上下文中使用每个项目的前缀标头
  • 单个标头将上下文分隔为一个名为 b3. 提取字段时,单头变体优先于多头变体

这是一个使用多个标头编码的示例流程,假设 HTTP 请求带有传播的跟踪:

Multiple Headers

需要配合多个 header key 使用。

TraceId

X-B3-TraceId?标头编码为 32 或 16 个低十六进制字符。例如,128 位 TraceId 标头:X-B3-TraceId:463ac35c9f6413ad48485a3953bb6124。除非仅传播“采样状态”,否则需要?X-B3-TraceId?标头。

SpanId

X-B3-SpanId?标头编码为16个较低的十六进制字符。例如:X-B3-SpanId:a2fb4a1d1a96d312。除非仅传播“采样状态”,否则需要?X-B3-SpanId?标头。

ParentSpanId

X-B3-ParentSpanId?标头可能存在于子跨度上,而在根跨度上必须不存在。它被编码为16个低十六进制字符。例如,ParentSpanId?标头可能看起来像:X-B3-ParentSpanId:00020000000000001

Sampling State

接受采样决定编码为?X-B3-Sampled:1,拒绝编码为?X-B3-Sampled:0。缺席意味着将决定推迟到该报头的接收方。例如:X-B3-Sampled:1

注:在编写本规范之前,一些示踪剂传播?X-B3-Sampled?为?true?或?false,而不是?1?或?0。虽然您不应该将?X-B3-Sampled?编码为?true?或?false,但宽松的实现可能会接受它们。

Debug Flag

调试编码为?X-B3-Flags: 1。可以忽略 Absent 或任何其他值。调试意味着接受决定,所以不要同时发送?X-B3-Sampled?标头。

Single Header

Single Header?只有一个 header 为?b3?的 key 作为标记。

b3={TraceId}-{SpanId}-{SamplingState}-{ParentSpanId}

b3: 80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-d

实现场景

目前主要有两大实现场景:

  • APM:主要是服务端之间传播
  • RUM:用户端(web、小程序等)与服务端传播

APM

OpenTelemetry 传播器源码

OpenTelemetry 传播器源码

传播器接口:TextMapPropagator

opentelemetry-java?通过定义统一的接口?TextMapPropagator?来实现各种 Propagate。

TextMapPropagator?默认提供了一个空实现?noop()

TextMapPropagator?定义了三个接口方法:

  • fields() : 规定了哪些字段作为传播器识别的对象
  • inject():定义了传播器字段注入规则
  • extract():定义传播器字段提取规则,返回?Context?对象

一些厂商有自己的 APM 标准,通过扩展?TextMapPropagator?接口,实现对接厂商特定的传播器。

@ThreadSafe
public interface TextMapPropagator {

  static TextMapPropagator composite(TextMapPropagator... propagators) {
    return composite(Arrays.asList(propagators));
  }

  static TextMapPropagator composite(Iterable<TextMapPropagator> propagators) {
    List<TextMapPropagator> propagatorsList = new ArrayList<>();
    for (TextMapPropagator propagator : propagators) {
      propagatorsList.add(propagator);
    }
    if (propagatorsList.isEmpty()) {
      return NoopTextMapPropagator.getInstance();
    }
    if (propagatorsList.size() == 1) {
      return propagatorsList.get(0);
    }
    return new MultiTextMapPropagator(propagatorsList);
  }

  static TextMapPropagator noop() {
    return NoopTextMapPropagator.getInstance();
  }

  Collection<String> fields();

  <C> void inject(Context context, @Nullable C carrier, TextMapSetter<C> setter);

  <C> Context extract(Context context, @Nullable C carrier, TextMapGetter<C> getter);
}
OtTracePropagator

OtTracePropagator?是 OpenTelemetry 特有的传播器,识别解析?ot-?开头的 header,部分源码如下:

@Immutable
public final class OtTracePropagator implements TextMapPropagator {

  static final String TRACE_ID_HEADER = "ot-tracer-traceid";
  static final String SPAN_ID_HEADER = "ot-tracer-spanid";
  static final String SAMPLED_HEADER = "ot-tracer-sampled";
  static final String PREFIX_BAGGAGE_HEADER = "ot-baggage-";
  private static final Collection<String> FIELDS =
      Collections.unmodifiableList(Arrays.asList(TRACE_ID_HEADER, SPAN_ID_HEADER, SAMPLED_HEADER));
    ....
  @Override
  public Collection<String> fields() {
    return FIELDS;
  }

  @Override
  public <C> void inject(Context context, @Nullable C carrier, TextMapSetter<C> setter) {
    if (context == null || setter == null) {
      return;
    }
    SpanContext spanContext = Span.fromContext(context).getSpanContext();
    if (!spanContext.isValid()) {
      return;
    }
    // Lightstep trace id MUST be 64-bits therefore OpenTelemetry trace id is truncated to 64-bits
    // by retaining least significant (right-most) bits.
    setter.set(
        carrier, TRACE_ID_HEADER, spanContext.getTraceId().substring(TraceId.getLength() / 2));
    setter.set(carrier, SPAN_ID_HEADER, spanContext.getSpanId());
    setter.set(carrier, SAMPLED_HEADER, String.valueOf(spanContext.isSampled()));

    // Baggage is only injected if there is a valid SpanContext
    Baggage baggage = Baggage.fromContext(context);
    if (!baggage.isEmpty()) {
      // Metadata is not supported by OpenTracing
      baggage.forEach(
          (key, baggageEntry) ->
              setter.set(carrier, PREFIX_BAGGAGE_HEADER + key, baggageEntry.getValue()));
    }
  }

  @Override
  public <C> Context extract(Context context, @Nullable C carrier, TextMapGetter<C> getter) {
    if (context == null) {
      return Context.root();
    }
    if (getter == null) {
      return context;
    }
    String incomingTraceId = getter.get(carrier, TRACE_ID_HEADER);
    String traceId =
        incomingTraceId == null
            ? TraceId.getInvalid()
            : StringUtils.padLeft(incomingTraceId, MAX_TRACE_ID_LENGTH);
    String spanId = getter.get(carrier, SPAN_ID_HEADER);
    String sampled = getter.get(carrier, SAMPLED_HEADER);
    SpanContext spanContext = buildSpanContext(traceId, spanId, sampled);
    if (!spanContext.isValid()) {
      return context;
    }

    Context extractedContext = context.with(Span.wrap(spanContext));

    // Baggage is only extracted if there is a valid SpanContext
    if (carrier != null) {
      BaggageBuilder baggageBuilder = Baggage.builder();
      for (String key : getter.keys(carrier)) {
        if (!key.startsWith(PREFIX_BAGGAGE_HEADER)) {
          continue;
        }
        String value = getter.get(carrier, key);
        if (value == null) {
          continue;
        }
        baggageBuilder.put(key.replace(OtTracePropagator.PREFIX_BAGGAGE_HEADER, ""), value);
      }
      Baggage baggage = baggageBuilder.build();
      if (!baggage.isEmpty()) {
        extractedContext = extractedContext.with(baggage);
      }
    }

    return extractedContext;
  }
 ...
}
初始化传播器

opentelemetry-java?通过?PropagatorConfiguration?对象对?Propagate?进行初始化操作,源码如下:

final class PropagatorConfiguration {

  private static final List<String> DEFAULT_PROPAGATORS = Arrays.asList("tracecontext", "baggage");

  static ContextPropagators configurePropagators(
      ConfigProperties config,
      ClassLoader serviceClassLoader,
      BiFunction<? super TextMapPropagator, ConfigProperties, ? extends TextMapPropagator>
          propagatorCustomizer) {
    Set<TextMapPropagator> propagators = new LinkedHashSet<>();
    List<String> requestedPropagators = config.getList("otel.propagators", DEFAULT_PROPAGATORS);

    NamedSpiManager<TextMapPropagator> spiPropagatorsManager =
        SpiUtil.loadConfigurable(
            ConfigurablePropagatorProvider.class,
            ConfigurablePropagatorProvider::getName,
            ConfigurablePropagatorProvider::getPropagator,
            config,
            serviceClassLoader);

    if (requestedPropagators.contains("none")) {
      if (requestedPropagators.size() > 1) {
        throw new ConfigurationException(
            "otel.propagators contains 'none' along with other propagators");
      }
      return ContextPropagators.noop();
    }
    for (String propagatorName : requestedPropagators) {
      propagators.add(
          propagatorCustomizer.apply(getPropagator(propagatorName, spiPropagatorsManager), config));
    }

    return ContextPropagators.create(TextMapPropagator.composite(propagators));
  }

  private static TextMapPropagator getPropagator(
      String name, NamedSpiManager<TextMapPropagator> spiPropagatorsManager) {
    if (name.equals("tracecontext")) {
      return W3CTraceContextPropagator.getInstance();
    }
    if (name.equals("baggage")) {
      return W3CBaggagePropagator.getInstance();
    }

    TextMapPropagator spiPropagator = spiPropagatorsManager.getByName(name);
    if (spiPropagator != null) {
      return spiPropagator;
    }
    throw new ConfigurationException(
        "Unrecognized value for otel.propagators: "
            + name
            + ". Make sure the artifact including the propagator is on the classpath.");
  }

  private PropagatorConfiguration() {}
}

启动 agent 时,通过指定传播器类型来实现对应实例构造,通过方法得知,otel 支持同时指定多种类型的传播器协议,默认采用?tracecontext?传播器——即?w3c?标准的传播器:
List<String> requestedPropagators = config.getList("otel.propagators", DEFAULT_PROPAGATORS);

获取传播器

opentelemetry-java?通过?OpenTelemetry?对象的?propagating()?方法构造当前支持的传播器,且通过?getPropagators()?方法来获取已有的传播器,OpenTelemetry?是一个接口类,部分代码如下:

public interface OpenTelemetry {
    static OpenTelemetry noop() {
        return DefaultOpenTelemetry.getNoop();
    }

    static OpenTelemetry propagating(ContextPropagators propagators) {
        return DefaultOpenTelemetry.getPropagating(propagators);
    }

    TracerProvider getTracerProvider();
    
    ContextPropagators getPropagators();

    ...
}

每一个?Instrumente?都需要通过?InstrumenterBuilder?获取?OpenTelemetry?对象来实现 trace、metric 相关业务逻辑。比如,启动一个 span。

如何获取 OpenTelemetry 对象

通过上面的研究,我们知道如何获取传播器,但是获取传播器需要获取?OpenTelemetry?对象,opentelemetry-java?提供了一个全局的对象?GlobalOpenTelemetry?来获取?OpenTelemetry?对象。我们来看看具体代码

public final class GlobalOpenTelemetry {
...
    @Nullable
    private static volatile ObfuscatedOpenTelemetry globalOpenTelemetry;

    public static OpenTelemetry get() {
        OpenTelemetry openTelemetry = globalOpenTelemetry;
        if (openTelemetry == null) {
            synchronized(mutex) {
                openTelemetry = globalOpenTelemetry;
                if (openTelemetry == null) {
                    OpenTelemetry autoConfigured = maybeAutoConfigureAndSetGlobal();
                    if (autoConfigured != null) {
                        return autoConfigured;
                    }

                    set(OpenTelemetry.noop());
                    return OpenTelemetry.noop();
                }
            }
        }

        return openTelemetry;
    }

    public static void set(OpenTelemetry openTelemetry) {
        synchronized(mutex) {
            if (globalOpenTelemetry != null) {
                throw new IllegalStateException("GlobalOpenTelemetry.set has already been called. GlobalOpenTelemetry.set must be called only once before any calls to GlobalOpenTelemetry.get. If you are using the OpenTelemetrySdk, use OpenTelemetrySdkBuilder.buildAndRegisterGlobal instead. Previous invocation set to cause of this exception.", setGlobalCaller);
            } else {
                globalOpenTelemetry = new ObfuscatedOpenTelemetry(openTelemetry);
                setGlobalCaller = new Throwable();
            }
        }
    }
    @Nullable
    private static OpenTelemetry maybeAutoConfigureAndSetGlobal() {
        Class openTelemetrySdkAutoConfiguration;
        try {
            openTelemetrySdkAutoConfiguration = Class.forName("io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk");
        } catch (ClassNotFoundException var7) {
            return null;
        }

        boolean globalAutoconfigureEnabled = Boolean.parseBoolean(ConfigUtil.getString("otel.java.global-autoconfigure.enabled", "false"));
        if (!globalAutoconfigureEnabled) {
            logger.log(Level.INFO, "AutoConfiguredOpenTelemetrySdk found on classpath but automatic configuration is disabled. To enable, run your JVM with -Dotel.java.global-autoconfigure.enabled=true");
            return null;
        } else {
            try {
                Method initialize = openTelemetrySdkAutoConfiguration.getMethod("initialize");
                Object autoConfiguredSdk = initialize.invoke((Object)null);
                Method getOpenTelemetrySdk = openTelemetrySdkAutoConfiguration.getMethod("getOpenTelemetrySdk");
                return new ObfuscatedOpenTelemetry((OpenTelemetry)getOpenTelemetrySdk.invoke(autoConfiguredSdk));
            } catch (IllegalAccessException | NoSuchMethodException var5) {
                throw new IllegalStateException("AutoConfiguredOpenTelemetrySdk detected on classpath but could not invoke initialize method. This is a bug in OpenTelemetry.", var5);
            } catch (InvocationTargetException var6) {
                logger.log(Level.SEVERE, "Error automatically configuring OpenTelemetry SDK. OpenTelemetry will not be enabled.", var6.getTargetException());
                return null;
            }
        }
    }

    ...

GlobalOpenTelemetry?通过单例模式实现获取全局?OpenTelemetry?对象。而?OpenTelemetry?则主要是通过?AutoConfiguredOpenTelemetrySdk?的?getOpenTelemetrySdk?方法初始化构建。

至此,基本上完成了opentelemetry-java从传播器实现、构造、初始化及获取相关操作。

DataDog(DDtrace)传播器源码

DataDog(DDtrace)传播器源码

DDtrace除了支持上面表格中的传播协议之外,还支持其他传播协议,如:

  • Haystack

    • Haystack-Trace-ID
    • Haystack-Span-ID
    • Haystack-Parent-ID
  • XRAY

    • X-Amzn-Trace-Id

RUM

前面了解了传播器的原理、作用以及实现方式。传播器提到最多的是 header,也就说大多数都是基于 http 协议。基于 http 协议,进一步引申到 html、小程序、Android、iOS 等,从而引出另一个概念:RUM

RUM 全称为 Real User Monitor,即真实用户监控。

RUM 通过实现不同的传播器协议,将用户端与服务端紧密联系在一起,我们知道 APM 是后端接口及服务的性能表现,但接口用于地方很多,有的来源于 web,有的来源于小程序等等。通过引入 RUM,结合链路分析,很容易追踪到数据来源于哪一端。

当然 RUM 的作用不仅仅是这一方面,目前市面上常见的是基于 W3C(万维网联盟)定义的[navigation-timing] 标准(见下图),该标准详细定义了各种浏览器事件,通过浏览器事件的简单计算就可以算出来前端页面的首屏、白屏、DOM 加载、HTML 加载等时长,相较于测试环境的 F12 检查者模式,能有更效的收集生产环境真实用户的前端体验。

opentelemetry-js

opentelemetry-js?是 OpenTelemetry 具体的实现,目前支持 nodejs 和 js,作用范围比较有限,主要围绕 OpenTelemetry 的集中传播器协议实现,产品成熟度仍需进一步提升。

opentelemetry-js demo

观测云 RUM

是观测云推出的 RUM 产品,可以与多种 APM 技术融合,本质上是支持多种传播器协议,进而能够与各种 APM 结合使用。

支持的传播器协议有:

  • ddtrace :x-datadog-parent-id,x-datadog-sampled,x-datadog-sampling-priority,x-datadog-trace-id。
  • skywalking: sw8。
  • jaeger: uber-trace-id。
  • zipkin: X-B3-TraceId、X-B3-SpanId、X-B3-ParentSpanId、X-B3-Sampled、X-B3-Flags。
  • zipkin_single_header: b3。
  • w3c_traceparent: traceparent。
  • opentelemetry: 该类型支持 zipkin_single_header,w3c_traceparent,zipkin、jaeger三种类型的配置方式,根据在 rum sdk 中配置的 traceType 类型 添加对应的 header。

目前支持的应用有:

  • web H5
  • 小程序
  • Android
  • iOS
  • React Native
  • Flutter
  • UniApp
  • C++

以 web 为例,展示接入方式,接入简单。

<script src="https://static.guance.com/browser-sdk/v3/dataflux-rum.js" type="text/javascript"></script>
<script>
  window.DATAFLUX_RUM &&
    window.DATAFLUX_RUM.init({
      applicationId: '<应用 ID>',
      datakitOrigin: '<DATAKIT ORIGIN>', // 协议(包括://),域名(或IP地址)[和端口号]
      env: 'production',
      version: '1.0.0',
      service: 'browser',
      sessionSampleRate: 100,
      sessionReplaySampleRate: 70,
      trackInteractions: true,
      traceType: 'ddtrace', // 非必填,默认为ddtrace,目前支持 ddtrace、zipkin、skywalking_v3、jaeger、zipkin_single_header、w3c_traceparent 6种类型
      allowedTracingOrigins: ['https://api.example.com',/https:\/\/.*\.my-api-domain\.com/],  // 非必填,允许注入trace采集器所需header头部的所有请求列表。可以是请求的origin,也可以是是正则
    })
</script>

其中?traceType?为传播器协议。

通过 CDN 加速缓存,以同步脚本引入的方式引入 SDK,此方式可以确保能够收集到所有的错误,资源,请求,性能指标,不过可能会影响页面的加载性能。
单页应用建议将下方代码复制粘贴到 html 文件 head 标签的首行,多页应用建议将下方代码复制粘贴到公共 head 模版文件的首行。

更多更详细接入方式,参考官方文档用户访问监测

RUM 与 APM 实际应用

以某开源平台访问用户资源为例,查看实际应用场景,如下图所示:

右侧服务列表:

  • browser :浏览器链路,由RUM端上报
  • 其他服务为 APM 上报

RUM 通过传递特定的 header 信息,将链路信息传递给 APM,APM 正确解析相关传播协议,并以 RUM 传递的 trace为基准,创建 span信息,实现前后端串联。

browser?同时可以查看当前用户的请求耗时分布(128.3 ms)

通过右上角?view?按钮,可以跳转到当前页面的查看器,以此来查看分析用户行为。如下图所示

Baggage

前面已经分析了传播器一般需要携带的参数信息,便于前后端的应用串联,但实际上这只是传播器的基础用法,通常都是用于传递 trace 信息,实际上,在上面?opentelemetry-java?源码分析中,有一个频繁出现的单词?baggagebaggage?主要作用于业务参数的传递。

通常,研发人员不会直接在接口上填上用户信息,所以无法准确追踪这个接口是哪个用户触发的,借助 baggage,通过 header 进行传递,能够在服务间无限传递下去。

OpenTelemetry Baggage 用法

Header key 以?ot-baggage-?开头的参数,会被 OpenTelemetry 一直传递下去。

还可以采用sdk的方式:OpenTelemetry Baggage SDK用法

DataDog Baggage 用法

需要配置启动参数?dd.trace.header.baggage,设置哪些 header 作为传递的参数依据。用法如下:

Environment Variable: DD_TRACE_HEADER_BAGGAGE
Default: null
Example: CASE-insensitive-Header:my-baggage-name,User-ID:userId,My-Header-And-Baggage-Name
Default: null
Example: CASE-insensitive-Header:my-baggage-name,User-ID:userId,My-Header-And-Baggage-Name
Accepts a map of case-insensitive header keys to baggage keys and automatically applies matching request header values as baggage on traces. On propagation the reverse mapping is applied: Baggage is mapped to headers.
Available since version 1.3.0.

也可以采用 SDK 的方式:DataDog Baggage SDK 用法

文章来源:https://blog.csdn.net/DataFlux/article/details/135826864
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。