构建追踪接收器

如果您正在阅读本教程,您可能已经对分布式跟踪背后的OpenTelemetry概念有了一定了解,但是如果不了解,您可以通过这里快速了解。

以下是根据OpenTelemetry的定义的这些概念:

跟踪是追踪构成应用程序的服务处理的单个请求(称为跟踪)的进展。该请求可能由用户或应用程序发起。分布式跟踪是一种穿越进程、网络和安全边界的跟踪形式。

尽管该定义看起来非常应用程序中心化,但您可以利用OpenTelemetry跟踪模型来表示请求并快速了解其持续时间以及完成它所涉及的每个步骤的详细信息。

假设您已经有一个生成某种跟踪遥测的系统,OpenTelemetry收集器是帮助您将其提供给OTel世界的入口。

在收集器中,跟踪接收器负责接收并将请求遥测从其原始格式转换为OTel跟踪模型,以便可以通过收集器管道正确处理信息。

要实现跟踪接收器,您需要以下内容:

  • Config 实现,以使跟踪接收器能够在收集器的 config.yaml 中收集和验证其配置。
  • receiver.Factory 实现,这样收集器就可以正确实例化跟踪接收器组件。
  • TracesReceiver 实现,负责收集遥测,将其转换为内部跟踪表示,并将信息传递给管道中的下一个消费者。

在本教程中,我们将创建一个名为 tailtracer 的示例跟踪接收器,该接收器模拟拉取操作,并生成跟踪作为该操作的结果。接下来的几节将指导您按顺序实现上述步骤以创建接收器,让我们开始吧。

设置接收器的开发和测试环境

首先,使用构建自定义收集器教程创建名为 otelcol-dev 的收集器实例;您只需复制步骤2中描述的 builder-config.yaml 并运行 builder。这样,您就会得到一个名为 otelcol-dev 的文件夹,其中包含已准备好使用的收集器开发实例。

为了正确测试跟踪接收器,您需要一个分布式跟踪后端,以便收集器可以将遥测发送到该后端。我们将使用Jaeger,如果您还没有运行中的 Jaeger 实例,可以使用以下命令轻松启动一个 Docker 容器。

docker run -d --name jaeger \
  -e COLLECTOR_OTLP_ENABLED=true \
  -p 16686:16686 \
  -p 14317:4317 \
  -p 14318:4318 \
  jaegertracing/all-in-one:1.41

现在,创建一个 config.yaml 文件,以便设置收集器的组件。

cd otelcol-dev
touch config.yaml

目前,您只需要一个基本的跟踪管道,其中包含 otlp 接收器以及 otlpdebug1 导出器,以下是您的 config.yaml 文件的内容:

config.yaml

receivers:
  otlp:
    protocols:
      grpc:

processors:

exporters:
  # 注意:在 v0.86.0 之前,请使用 `logging` 而不是 `debug`。
  debug:
    verbosity: detailed
  otlp/jaeger:
    endpoint: localhost:14317
    tls:
      insecure: true

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: []
      exporters: [otlp/jaeger, debug]
  telemetry:
    logs:
      level: debug

请注意,我只在 otlp 导出器配置中使用了 insecure 标志,以便让本地开发环境更加轻松;在生产环境中运行收集器时,请不要使用此标志。

为了验证初始管道是否正确设置,运行 otelcol-dev 命令后,您应该看到以下输出:

$ ./otelcol-dev --config config.yaml
2023-09-12T15:22:18.652-0700    info    service/telemetry.go:84 Setting up own telemetry...
2023-09-12T15:22:18.652-0700    info    service/telemetry.go:201        Serving Prometheus metrics      {"address": ":8888", "level": "Basic"}
2023-09-12T15:22:18.652-0700    debug   exporter@v0.85.0/exporter.go:273        Stable component.       {"kind": "exporter", "data_type": "traces", "name": "otlp/jaeger"}
2023-09-12T15:22:18.652-0700    info    exporter@v0.85.0/exporter.go:275        Development component. May change in the future.        {"kind": "exporter", "data_type": "traces", "name": "debug"}
2023-09-12T15:22:18.652-0700    debug   receiver@v0.85.0/receiver.go:294        Stable component.       {"kind": "receiver", "name": "otlp", "data_type": "traces"}
2023-09-12T15:22:18.652-0700    info    service/service.go:138  Starting otelcontribcol...      {"Version": "0.85.0", "NumCPU": 10}
2023-09-12T15:22:18.652-0700    info    extensions/extensions.go:31     Starting extensions...
2023-09-12T15:22:18.652-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1] Channel created     {"grpc_log": true}
2023-09-12T15:22:18.652-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1] original dial target is: "localhost:14317"  {"grpc_log": true}
2023-09-12T15:22:18.652-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1] parsed dial target is: {URL:{Scheme:localhost Opaque:14317 User: Host: Path: RawPath: OmitHost:false ForceQuery:false RawQuery: Fragment: RawFragment:}}   {"grpc_log": true}
2023-09-12T15:22:18.652-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1] fallback to scheme "passthrough"    {"grpc_log": true}
2023-09-12T15:22:18.652-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1] parsed dial target is: {URL:{Scheme:passthrough Opaque: User: Host: Path:/localhost:14317 RawPath: OmitHost:false ForceQuery:false RawQuery: Fragment: RawFragment:}}      {"grpc_log": true}
2023-09-12T15:22:18.652-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1] Channel authority set to "localhost:14317"  {"grpc_log": true}
2023-09-12T15:22:18.652-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1] Channel switches to new LB policy "pick_first"      {"grpc_log": true}
2023-09-12T15:22:18.652-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1 SubChannel #2] Subchannel created    {"grpc_log": true}
2023-09-12T15:22:18.652-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1] Channel Connectivity change to CONNECTING   {"grpc_log": true}
2023-09-12T15:22:18.652-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1 SubChannel #2] Subchannel Connectivity change to CONNECTING  {"grpc_log": true}
2023-09-12T15:22:18.652-0700    info    zapgrpc/zapgrpc.go:178  [core] [Server #3] Server created       {"grpc_log": true}
2023-09-12T15:22:18.652-0700    info    zapgrpc/zapgrpc.go:178  [core] [pick-first-lb 0x140027d9b30] Received SubConn state update: 0x140027d9ce0, {ConnectivityState:CONNECTING ConnectionError:<nil>}        {"grpc_log": true}
2023-09-12T15:22:18.652-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1 SubChannel #2] Subchannel picks a new address "localhost:14317" to connect   {"grpc_log": true}
2023-09-12T15:22:18.652-0700    info    otlpreceiver@v0.85.0/otlp.go:83 Starting GRPC server    {"kind": "receiver", "name": "otlp", "data_type": "traces", "endpoint": "0.0.0.0:4317"}
2023-09-12T15:22:18.652-0700    info    service/service.go:161  Everything is ready. Begin running and processing data.
2023-09-12T15:22:18.652-0700    info    zapgrpc/zapgrpc.go:178  [core] [Server #3 ListenSocket #4] ListenSocket created {"grpc_log": true}
2023-09-12T15:22:18.662-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1 SubChannel #2] Subchannel Connectivity change to READY       {"grpc_log": true}

确保您看到最后一行,这将确认OTLP导出器已成功建立与本地Jaeger实例的gRPC连接。现在,我们准备好环境后,让我们开始编写接收器代码。

现在,创建名为 tailtracer 的另一个文件夹,以便我们有一个位置来存放所有接收器代码。

mkdir tailtracer

每个收集器组件都应该作为一个 Go 模块创建,所以您需要正确初始化 tailtracer 模块。在我的情况下,命令如下所示:

cd tailtracer
go mod init github.com/rquedas/otel4devs/collector/receiver/trace-receiver/tailtracer

读取和验证接收器设置

为了能够实例化并参与管道,收集器需要识别您的接收器并从其配置文件中正确加载设置。

tailtracer 接收器将具有以下设置:

  • interval:表示遥测拉取操作之间的时间间隔(以分钟为单位)的字符串
  • number_of_traces:每个间隔生成的模拟跟踪数量

以下是 tailtracer 接收器设置的样子:

receivers:
  tailtracer: # 此行表示接收器的 ID
    interval: 1m
    number_of_traces: 1

tailtracer 文件夹下,创建一个名为 config.go 的文件,您将在其中编写支持接收器设置的所有代码。

cd tailtracer
touch config.go

为了实现组件的配置方面,您需要创建一个 Config 结构体。将以下代码添加到 config.go 文件中:

package tailtracer

type Config struct{

}

为了让接收器访问其设置,Config 结构体必须对应接收器的每个设置字段。

在实现的要求中,interval 值将是可选的(我们稍后将看到生成默认值),但是当定义时应该至少为1分钟(1m),而 number_of_traces 将是一个必需的值。在实现了上述要求之后,config.go 文件应如下所示。

config.go

package tailtracer

// Config represents the receiver config settings within the collector's config.yaml
type Config struct {
   Interval    string `mapstructure:"interval"`
   NumberOfTraces int `mapstructure:"number_of_traces"`
}

既然您可以访问这些设置,那么您就可以通过根据可选的 ConfigValidator 接口实现 Validate 方法来提供所需的任何类型的验证。

在这种情况下,interval 值将是可选的(我们将在后面生成默认值),但是当定义时应该至少为1分钟(1m),而 number_of_traces 将是一个必需的值。在实现了 Validate 方法后,config.go 文件应如下所示。

config.go

package tailtracer

import (
	"fmt"
	"time"
)

// Config represents the receiver config settings within the collector's config.yaml
type Config struct {
	Interval       string `mapstructure:"interval"`
	NumberOfTraces int    `mapstructure:"number_of_traces"`
}

// Validate checks if the receiver configuration is valid
func (cfg *Config) Validate() error {
	interval, _ := time.ParseDuration(cfg.Interval)
	if interval.Minutes() < 1 {
		return fmt.Errorf("当定义时,间隔必须设置为至少1分钟(1m)")
	}

	if cfg.NumberOfTraces < 1 {
		return fmt.Errorf("number_of_traces 必须大于或等于1")
	}
	return nil
}

如果您想更详细地了解在组件的配置方面涉及的结构体和接口,请参阅收集器的 GitHub 项目中的 component/config.go 文件。

启用Collector以实例化你的接收器

在本教程开始时,你创建了一个名为otelcol-dev实例,其中包含以下组件:

  • 接收器:OTLP接收器
  • 处理器:批处理器
  • 导出器:Debug(调试)1 和 OTLP导出器

前往并打开otelcol-dev文件夹下的components.go文件,我们来看一下components()函数。

func components() (otelcol.Factories, error) {
	var err error
	factories := otelcol.Factories{}

	factories.Extensions, err = extension.MakeFactoryMap(
	)
	if err != nil {
		return otelcol.Factories{}, err
	}

	factories.Receivers, err = receiver.MakeFactoryMap(
		otlpreceiver.NewFactory(),
	)
	if err != nil {
		return otelcol.Factories{}, err
	}

	factories.Exporters, err = exporter.MakeFactoryMap(
		debugexporter.NewFactory(),
		otlpexporter.NewFactory(),
	)
	if err != nil {
		return otelcol.Factories{}, err
	}

	factories.Processors, err = processor.MakeFactoryMap(
		batchprocessor.NewFactory(),
	)
	if err != nil {
		return otelcol.Factories{}, err
	}

	return factories, nil
}

正如你所看到的,components()函数负责为Collector提供所有组件的工厂,这些工厂由类型为otelcol.Factories的变量factories表示(otelcol.Factories结构体的声明在这里),然后用于实例化由Collector的流水线配置和使用的组件。

注意,factories.Receivers是保存所有接收器工厂(receiver.Factory的实例)的地方,目前只有otlpreceiver工厂,该工厂通过调用otlpreceiver.NewFactory()函数进行实例化。

tailtracer接收器必须提供一个receiver.Factory实现,虽然你可以找到一个receiver.Factory接口(你可以在Collector项目中的receiver/receiver.go文件中找到其定义),但是,为了提供正确的实现方式,你应该使用go.opentelemetry.io/collector/receiver包中可用的函数。

实现你的receiver.Factory

首先,在tailtracer文件夹中创建一个名为factory.go的文件。

cd tailtracer
touch factory.go

现在,让我们按照惯例添加一个名为NewFactory()的函数,该函数将负责实例化tailtracer工厂。请在你的factory.go文件中添加以下代码:

package tailtracer

import (
	"go.opentelemetry.io/collector/receiver"
)

// NewFactory creates a factory for tailtracer receiver.
func NewFactory() receiver.Factory {

}

为了实例化你的tailtracer接收器工厂,你将使用receiver包中的以下函数:

func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory

receiver.NewFactory()会实例化并返回一个receiver.Factory,它需要以下参数:

  • component.Type:在所有Collector组件中用于标识你的接收器的唯一字符串标识符。
  • component.CreateDefaultConfigFunc:一个引用,返回你的接收器的component.Config实例的函数。
  • ...FactoryOption:一个receiver.FactoryOption切片,用于确定你的接收器能够处理哪种类型的信号。

现在,让我们实现支持receiver.NewFactory()所需的所有参数。

标识并为接收器提供默认设置

前面我们提到,我们的tailtracer接收器的interval设置是可选的,因此你需要为其提供一个默认值,以便在默认设置中使用。

请继续向你的factory.go文件添加以下代码:

const (
	typeStr = "tailtracer"
	defaultInterval = 1 * time.Minute
)

对于默认设置,你只需要添加一个函数,该函数返回一个包含tailtracer接收器的默认配置的component.Config

为了实现这一点,请继续向你的factory.go文件添加以下代码:

func createDefaultConfig() component.Config {
	return &Config{
		Interval: string(defaultInterval),
	}
}

在进行这两个更改后,你会发现有一些所需的导入缺失,所以这是你的factory.go文件,包括适当的导入:

factory.go

package tailtracer

import (
	"time"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/receiver"
)

const (
	typeStr = "tailtracer"
	defaultInterval = 1 * time.Minute
)

func createDefaultConfig() component.Config {
	return &Config{
		Interval: string(defaultInterval),
	}
}

// NewFactory creates a factory for tailtracer receiver.
func NewFactory() receiver.Factory {
	return nil
}

使工厂能够描述接收器可以处理的信号

同一个接收器组件可以处理跟踪(traces)、指标(metrics)和日志(logs)。接收器的工厂负责描述这些能力。

鉴于本教程的主题是跟踪,我们只会启用tailtracer接收器处理跟踪信号。receiver包提供了以下函数和类型,来帮助工厂描述跟踪信号的处理能力:

func WithTraces(createTracesReceiver CreateTracesFunc, sl component.StabilityLevel) FactoryOption

receiver.WithTraces()会实例化并返回一个receiver.FactoryOption,它需要以下参数:

  • createTracesReceiver:与receiver.CreateTracesFunc类型匹配的函数引用。

receiver.CreateTracesFunc是指向函数的指针,该函数负责实例化并返回一个receiver.Traces实例,它需要以下参数:

  • context.Context:Collector的context.Context引用,以便你的跟踪接收器可以正确管理其执行环境。
  • receiver.CreateSettings:创建接收器的某些Collector设置的引用。
  • component.Config:由Collector传递给工厂的接收器配置设置的引用,以便它可以从Collector配置中正确读取其设置。
  • consumer.Traces:下一个consumer.Traces的引用,在其中接收到的跟踪信号将进行处理。这可以是处理器(processor)或导出器(exporter)。

首先,添加引导代码以正确实现receiver.CreateTracesFunc函数指针。请继续向你的factory.go文件添加以下代码:

func createTracesReceiver(_ context.Context, params receiver.CreateSettings, baseCfg component.Config, consumer consumer.Traces) (receiver.Traces, error) {
	return nil, nil
}

现在,你已经具备了成功使用receiver.NewFactory函数实例化接收器工厂的所有必要组件。请继续更新你的factory.go文件中的NewFactory()函数,如下所示:

// NewFactory creates a factory for tailtracer receiver.
func NewFactory() receiver.Factory {
	return receiver.NewFactory(
		typeStr,
		createDefaultConfig,
		receiver.WithTraces(createTracesReceiver, component.StabilityLevelAlpha))
}

在进行这两个更改后,你会发现有一些所需的导入缺失,所以这是你的factory.go文件,包括适当的导入:

factory.go

package tailtracer

import (
	"context"
	"time"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/receiver"
)

const (
	typeStr = "tailtracer"
	defaultInterval = 1 * time.Minute
)

func createDefaultConfig() component.Config {
	return &Config{
		Interval: string(defaultInterval),
	}
}

func createTracesReceiver(_ context.Context, params receiver.CreateSettings, baseCfg component.Config, consumer consumer.Traces) (receiver.Traces, error) {
	return nil, nil
}

// NewFactory creates a factory for tailtracer receiver.
func NewFactory() receiver.Factory {
	return receiver.NewFactory(
		typeStr,
		createDefaultConfig,
		receiver.WithTraces(createTracesReceiver, component.StabilityLevelAlpha))
}

此时,你已经具备了tailtracer工厂和配置代码,Collector将能够验证是否定义了config.yamltailtracer接收器的设置。你只需将其添加到Collector的初始化过程中。

将接收器工厂添加到Collector的初始化过程中

如前所述,所有Collector组件都由components.go文件中的components()函数实例化。

需要将tailtracer接收器工厂实例添加到factories映射中,以便Collector可以正确加载它作为其初始化过程的一部分。

在进行下面的更改后,这是具备支持的代码库构建的Collector运行otelcol-dev命令的输出的开头:

$ ./otelcol-dev --config config.yaml
2023-09-28T08:56:53.027-0700    信息    service@v0.86.0/telemetry.go:84 启用自有遥测...
2023-09-28T08:56:53.027-0700    信息    service@v0.86.0/telemetry.go:201        提供Prometheus度量      {"地址": ":8888", "级别": "Basic"}
2023-09-28T08:56:53.027-0700    调试   exporter@v0.86.0/exporter.go:273        稳定组件。       {"类型": "exporter", "数据类型": "跟踪", "名称": "otlp/jaeger"}
2023-09-28T08:56:53.027-0700    信息    exporter@v0.86.0/exporter.go:275        开发中组件。将来可能会更改。         {"类型": "exporter", "数据类型": "跟踪", "名称": "debug"}
2023-09-28T08:56:53.027-0700    调试   receiver@v0.86.0/receiver.go:294        Alpha组件。将来可能会更改。        {"类型": "receiver", "名称": "tailtracer", "数据类型": "跟踪"}
2023-09-28T08:56:53.027-0700    调试   receiver@v0.86.0/receiver.go:294        稳定组件。       {"类型": "receiver", "名称": "otlp", "数据类型": "跟踪"}
2023-09-28T08:56:53.027-0700    信息    service@v0.86.0/service.go:138  开始运行otelcol-dev... {"版本": "1.0.0", "NumCPU": 10}
2023-09-28T08:56:53.027-0700    信息    extensions/extensions.go:31     启动扩展...

查找"log builder/receivers_builder.go:111"的日志行(它是所示片段中从底部开始的第4行),你可以看到Collector找到了tailtracer接收器的设置,验证了这些设置(当前的设置都是正确的),但是由于它未在任何流水线中使用,因此被忽略。

让我们检查一下tailtracer工厂是否正确验证了接收器的设置,number_of_traces设置是可选的,因此如果你将其从config.yaml中删除,然后再次运行该命令,你会得到相同的输出。

现在,让我们测试一个tailtracer设置验证规则。从config.yaml中删除number_of_traces设置,运行Collector的输出如下所示:

$ ./otelcol-dev --config config.yaml
错误: 无效的配置:接收器“tailtracer”具有无效的配置:number_of_traces必须至少为1
2022/02/24 13:00:20 集合器服务器运行完成,出现错误:无效的配置:接收器“tailtracer”具有无效的配置:number_of_traces必须至少为1

tailtracer接收器工厂和配置要求已经完成,Collector正在正确加载你的组件。你现在可以进入接收器的核心部分,即组件本身的实现。

实现追踪接收器组件

在上一节中,我提到接收器可以处理任何 OpenTelemetry 信号,收集器的 API 旨在帮助你完成这一任务。

当前,负责启用信号的所有接收器 API 都在 receiver/receiver.go 文件中声明,在 OTel Collector 的 GitHub 项目中可以找到,打开该文件并花一分钟浏览其中声明的所有接口。

请注意,receiver.Traces(及其兄弟 receiver.Metricsreceiver.Logs)此时并未描述任何特定的方法,除了它从 component.Component “继承”的方法之外。

也许这感觉有点奇怪,但请记住,收集器的 API 是可扩展的,组件及其信号可能以不同的方式发展,因此这些接口的作用是支持这一点。

因此,要创建一个 receiver.Traces,你只需要按照 component.Component 接口所描述的以下方法进行实现:

Start(ctx context.Context, host Host) error
Shutdown(ctx context.Context) error

这两个方法实际上是事件处理程序,用于收集器与其组件在生命周期中的通信。

Start() 表示收集器向组件发出启动处理的信号。作为该事件的一部分,收集器将传递以下信息:

  • context.Context:大部分时间内,接收器将处理一个长时间运行的操作,因此建议忽略此上下文,实际上应该从 context.Background() 创建一个新上下文。
  • Host:该主机用于在收集器启动和运行之后与接收器进行通信。

Shutdown() 表示收集器向组件发出服务正在关闭的信号,因此组件应该停止其处理并进行必要的清理工作:

  • context.Context:作为关闭操作的一部分由收集器传递的上下文。

你将通过在项目的 tailtracer 文件夹中创建一个名为 trace-receiver.go 的新文件,并在其中添加名为 tailtracerReceiver 的类型声明来开始实现。

type tailtracerReceiver struct{

}

现在你已经有了 tailtracerReceiver 类型,可以实现 Start()Shutdown() 方法,使接收器类型与 receiver.Traces 接口兼容。

下面是在 tailtracer/trace-receiver.go 文件中实现了方法的内容:

trace-receiver.go

package tailtracer

import (
	"context"
	"go.opentelemetry.io/collector/component"
)

type tailtracerReceiver struct {
}

func (tailtracerRcvr *tailtracerReceiver) Start(ctx context.Context, host component.Host) error {
	return nil
}

func (tailtracerRcvr *tailtracerReceiver) Shutdown(ctx context.Context) error {
	return nil
}

Start() 方法中,传递了两个引用(context.Contextcomponent.Host),你的接收器可能需要保留这些引用,以便在处理其操作时使用。

context.Context 引用应该用于创建新的上下文,以支持接收器的处理操作。在这种情况下,你需要决定如何处理上下文取消,以便在组件的 Shutdown() 方法中正确地完成它。

component.Host 在接收器的整个生命周期中可能很有用,因此应将该引用保留在 tailtracerReceiver 类型中。

接下来,你需要更新 Start() 方法,以便接收器可以正确初始化其自身的处理上下文,并在 cancel 字段中保留取消函数。你还需要更新 Stop() 方法,以便通过调用 cancel 函数来完成上下文的清理。

下面是在进行了上述更改后,trace-receiver.go 文件的内容:

trace-receiver.go

package tailtracer

import (
	"context"
	"go.opentelemetry.io/collector/component"
)

type tailtracerReceiver struct {
    host component.Host
	cancel context.CancelFunc
}

func (tailtracerRcvr *tailtracerReceiver) Start(ctx context.Context, host component.Host) error {
    tailtracerRcvr.host = host
    ctx = context.Background()
	ctx, tailtracerRcvr.cancel = context.WithCancel(ctx)

	return nil
}

func (tailtracerRcvr *tailtracerReceiver) Shutdown(ctx context.Context) error {
	tailtracerRcvr.cancel()
	return nil
}

现在,tailtracerReceiver 类型已经完全准备好实例化,并保留了其工厂传递的所有有意义的信息。

打开 tailtracer/factory.go 文件,导航到 createTracesReceiver() 函数。请注意,仅在作为管道中的组件声明时,才会实例化接收器,并且工厂负责确保管道中的下一个消费者(处理器或导出器)有效,否则应生成错误。

收集器的 API 提供了一些标准错误类型,以帮助工厂处理管道配置。如果下一个消费者存在问题并且作为 nil 传递,则你的接收器工厂应该抛出 component.ErrNilNextConsumer

createTracesReceiver() 函数将需要一个守卫子句来进行验证。

你还需要变量来正确初始化 tailtracerReceiver 实例的 configlogger 字段。

下面是使用更新后的 createTracesReceiver() 函数后的 factory.go 文件的内容:

factory.go

package tailtracer

import (
	"context"
	"time"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/receiver"
)

const (
	typeStr = "tailtracer"
	defaultInterval = 1 * time.Minute
)

func createDefaultConfig() component.Config {
	return &Config{
		Interval: string(defaultInterval),
	}
}

func createTracesReceiver(_ context.Context, params receiver.CreateSettings, baseCfg component.Config, consumer consumer.Traces) (receiver.Traces, error) {
	if consumer == nil {
		return nil, component.ErrNilNextConsumer
	}

	logger := params.Logger
	tailtracerCfg := baseCfg.(*Config)

	traceRcvr := &tailtracerReceiver{
		logger:       logger,
		nextConsumer: consumer,
		config:       tailtracerCfg,
	}

	return traceRcvr, nil
}

// NewFactory creates a factory for tailtracer receiver.
func NewFactory() receiver.Factory {
	return receiver.NewFactory(
		typeStr,
		createDefaultConfig,
		receiver.WithTraces(createTracesReceiver, component.StabilityLevelAlpha))
}

工厂完全实现并且实例化了追踪接收器组件,你已经准备好测试该接收器作为管道的一部分了。继续并在 config.yaml 中的 traces 管道中添加 tailtracer 接收器,如下所示:

service:
  pipelines:
    traces:
      receivers: [otlp, tailtracer]
      processors: []
      exporters: [otlp/jaeger, debug]

以下是在更新了 traces 管道后使用 otelcol-dev 命令运行你的 Collector 所显示的输出:

$ ./otelcol-dev --config config.yaml
2023-09-28T08:59:52.111-0700    info    service@v0.86.0/telemetry.go:84 Setting up own telemetry...
2023-09-28T08:59:52.111-0700    info    service@v0.86.0/telemetry.go:201        Serving Prometheus metrics      {"address": ":8888", "level": "Basic"}
2023-09-28T08:59:52.111-0700    debug   exporter@v0.86.0/exporter.go:273        Stable component.       {"kind": "exporter", "data_type": "traces", "name": "otlp/jaeger"}
2023-09-28T08:59:52.112-0700    info    exporter@v0.86.0/exporter.go:275        Development component. May change in the future.        {"kind": "exporter", "data_type": "traces", "name": "debug"}
2023-09-28T08:59:52.112-0700    debug   receiver@v0.86.0/receiver.go:294        Stable component.       {"kind": "receiver", "name": "otlp", "data_type": "traces"}
2023-09-28T08:59:52.112-0700    debug   receiver@v0.86.0/receiver.go:294        Alpha component. May change in the future.      {"kind": "receiver", "name": "tailtracer", "data_type": "traces"}
2023-09-28T08:59:52.112-0700    info    service@v0.86.0/service.go:138  Starting otelcol-dev... {"Version": "1.0.0", "NumCPU": 10}
2023-09-28T08:59:52.112-0700    info    extensions/extensions.go:31     Starting extensions...
2023-09-28T08:59:52.113-0700    info    otlpreceiver@v0.86.0/otlp.go:83 Starting GRPC server    {"kind": "receiver", "name": "otlp", "data_type": "traces", "endpoint": "0.0.0.0:4317"}
2023-09-28T08:59:52.113-0700    info    service@v0.86.0/service.go:161  Everything is ready. Begin running and processing data.
2023-09-28T09:00:52.113-0700    info    tailtracer/receiver.go:33       I should start processing traces now!   {"kind": "receiver", "name": "tailtracer", "data_type": "traces"}

查找 “builder/receivers_builder.go:68 Receiver is starting… {“kind”: “receiver”, “name”: “tailtracer”}” 的日志行,你会发现收集器在 traces 管道中找到了 tailtracer 接收器的设置,正在实例化并启动它,因为收集器启动后的1分钟内,你可以看到我们在 Start() 方法中的 ticker 函数添加的 info 日志行。

现在,请在 Collector 的终端上按下 Ctrl + C,以便你可以观察到关闭过程。以下是输出的样子:

^C2023-09-28T09:01:18.784-0700  info    otelcol@v0.86.0/collector.go:250        Received signal from OS {"signal": "interrupt"}
2023-09-28T09:01:18.784-0700    info    service@v0.86.0/service.go:170  Starting shutdown...
2023-09-28T09:01:18.784-0700    info    zapgrpc/zapgrpc.go:178  [core] [Server #3 ListenSocket #4] ListenSocket deleted {"grpc_log": true}
2023-09-28T09:01:18.784-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1] Channel Connectivity change to SHUTDOWN     {"grpc_log": true}
2023-09-28T09:01:18.784-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1] ccBalancerWrapper: closing  {"grpc_log": true}
2023-09-28T09:01:18.785-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1] Closing the name resolver   {"grpc_log": true}
2023-09-28T09:01:18.785-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1 SubChannel #2] Subchannel Connectivity change to SHUTDOWN    {"grpc_log": true}
2023-09-28T09:01:18.785-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1 SubChannel #2] Subchannel deleted    {"grpc_log": true}
2023-09-28T09:01:18.785-0700    info    zapgrpc/zapgrpc.go:178  [transport] [client-transport 0x140002c8000] Closing: rpc error: code = Canceled desc = grpc: the client connection is closing   {"grpc_log": true}
2023-09-28T09:01:18.785-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1] Channel deleted     {"grpc_log": true}
2023-09-28T09:01:18.785-0700    info    extensions/extensions.go:45     Stopping extensions...
2023-09-28T09:01:18.785-0700    info    service@v0.86.0/service.go:184  Shutdown complete.

如你所见,这有一个用于 tailtracer 接收器的 info 日志行,这意味着该组件正确响应了 Shutdown() 事件。在下一节中,你将了解有关 OpenTelemetry 跟踪数据模型的更多信息,以便 tailtracer 接收器最终可以生成跟踪!

收集器的跟踪数据模型

你可能通过使用SDK和应用程序中的仪表化,熟悉了OpenTelemetry的跟踪,以便在像Jaeger这样的分布式跟踪后端中查看和评估跟踪。

下面是Jaeger中跟踪的样例:

Jaeger跟踪

虽然这是Jaeger的跟踪,但它是由收集器内的跟踪管道生成的,因此你可以使用它来了解一些有关OTel跟踪数据模型的信息:

  • 一个跟踪由一个或多个跨度组成,这些跨度按层次结构进行组织以表示依赖关系。
  • 跨度可以表示服务内和/或服务间的操作。

在跟踪接收器中创建跟踪与使用SDK的方式略有不同,因此让我们开始回顾一下高级概念。

使用资源

在OTel世界中,所有遥测数据都由资源(Resource)生成,以下是根据OTel规范的定义:

资源(Resource)是以属性形式生成遥测数据的实体的不可变表示。例如,在Kubernetes容器中运行的生成遥测数据的进程包括Pod名称、所属命名空间以及可能的部署名称。这三个属性都可以包含在资源(Resource)中。

跟踪最常用于表示服务请求(Jaeger模型中描述的服务实体),它们通常是作为计算单元中运行的进程实现的,但OTel的API方式通过属性来描述资源(Resource)是足够灵活的,可以表示您可能需要的任何实体,比如ATM机、物联网传感器等。

因此,可以说要存在一个跟踪,就必须有一个资源(Resource)来启动它。

在本教程中,我们将模拟一个系统,该系统有遥测数据,展示了位于2个不同州(例如伊利诺伊州和加利福尼亚州)的ATM机访问账户后端系统以执行余额、存款和提现操作,因此我们将需要实现代码来创建代表ATM机和后端系统的资源(Resource)类型。

创建一个名为model.go的文件,放在tailtracer文件夹中

cd tailtracer
touch model.go

现在,在model.go文件中,添加以下定义的AtmBackendSystem类型:

model.go

package tailtracer

type Atm struct{
    ID           int64
	Version      string
	Name         string
	StateID      string
	SerialNumber string
	ISPNetwork   string
}

type BackendSystem struct{
	Version       string
	ProcessName   string
	OSType        string
    OSVersion     string
	CloudProvider string
	CloudRegion   string
	ServiceName   string
	Endpoint      string
}

这些类型用于代表系统中的实体,它们包含的信息将作为资源(Resource)定义的一部分添加到跟踪中。你将添加一些辅助函数来生成这些类型的实例。

使用下面的辅助函数代码更新model.go文件:

model.go

package tailtracer

import (
	"math/rand"
	"time"
)

type Atm struct{
    ID           int64
	Version      string
	Name         string
	StateID      string
	SerialNumber string
	ISPNetwork   string
}

type BackendSystem struct{
	Version       string
	ProcessName   string
	OSType        string
    OSVersion     string
	CloudProvider string
	CloudRegion   string
	Endpoint      string
}

func generateAtm() Atm{
	i := getRandomNumber(1, 2)
    var newAtm Atm

	switch i {
		case 1:
			newAtm = Atm{
				ID: 111,
				Name: "ATM-111-IL",
				SerialNumber: "atmxph-2022-111",
				Version: "v1.0",
				ISPNetwork: "comcast-chicago",
				StateID: "IL",

			}

		case 2:
			newAtm = Atm{
				ID: 222,
				Name: "ATM-222-CA",
				SerialNumber: "atmxph-2022-222",
				Version: "v1.0",
				ISPNetwork: "comcast-sanfrancisco",
				StateID: "CA",
			}
	}

	return newAtm
}

func generateBackendSystem() BackendSystem{
    i := getRandomNumber(1, 3)

	newBackend := BackendSystem{
    	ProcessName: "accounts",
		Version: "v2.5",
		OSType: "lnx",
		OSVersion: "4.16.10-300.fc28.x86_64",
		CloudProvider: "amzn",
		CloudRegion: "us-east-2",
	}

	switch i {
		case 1:
		 	newBackend.Endpoint = "api/v2.5/balance"
		case 2:
		  	newBackend.Endpoint = "api/v2.5/deposit"
		case 3:
			newBackend.Endpoint = "api/v2.5/withdrawn"

	}

	return newBackend
}

func getRandomNumber(min int, max int) int {
	rand.Seed(time.Now().UnixNano())
	i := (rand.Intn(max - min + 1) + min)
    return i
}

现在,你已经有了用于生成生成遥测数据实体的函数,准备在OTel Collector中表示这些实体。

收集器的API提供了一个名为ptrace(位于pdata包下的嵌套包)的包,其中包含了处理收集器管道中的跟踪所需的所有类型、接口和辅助函数。

打开tailtracer/model.go文件,并将go.opentelemetry.io/collector/pdata/ptrace添加到import子句中,以便可以访问ptrace包的功能。

在定义Resource之前,你需要创建一个ptrace.Traces,它将负责通过收集器的管道传播跟踪,并且你可以使用辅助函数ptrace.NewTraces()来实例化它。你还需要创建AtmBackendSystem类型的实例,以便有数据来表示在跟踪中所涉及的遥测数据源。

打开tailtracer/model.go文件,并添加以下函数:

func generateTraces(numberOfTraces int) ptrace.Traces{
	traces := ptrace.NewTraces()

	for i := 0; i <= numberOfTraces; i++{
		newAtm := generateAtm()
		newBackendSystem := generateBackendSystem()
	}

	return traces
}

到目前为止,你已经听到和阅读了足够多关于跟踪由跨度组成的内容。你可能也编写了一些使用SDK的函数和类型的仪表化代码来创建这些跨度,但你可能不知道的是,在收集器的API中,创建跟踪涉及到其他类型的"跨度"。

你将从一个名为ptrace.ResourceSpans的类型开始,它表示资源以及它在参与跟踪时产生或接收的所有操作。你可以在/pdata/internal/data/protogen/trace/v1/trace.pb.go中找到其定义。

ptrace.Traces有一个名为ResourceSpans()的方法,它返回一个名为ptrace.ResourceSpansSlice的辅助类型的实例。这个ptrace.ResourceSpansSlice类型有一些方法来帮助你处理由Resource实体参与请求表示的跟踪的ptrace.ResourceSpans数组。

ptrace.ResourceSpansSlice有一个名为AppendEmpty()的方法,用于向数组添加一个新的ptrace.ResourceSapn并返回其引用。

一旦你有了ptrace.ResourceSpan的实例,你将使用名为Resource()的方法来返回与ResourceSpan关联的pcommon.Resource的实例。

用以下更改更新generateTrace()函数:

  • 添加一个名为resourceSpan的变量来表示ResourceSpan
  • 添加一个名为atmResource的变量来表示与ResourceSpan关联的pcommon.Resource
  • 使用上面提到的方法分别初始化这两个变量。

在你实施这些更改后,函数应该如下所示:

func generateTraces(numberOfTraces int) ptrace.Traces{
	traces := ptrace.NewTraces()

	for i := 0; i <= numberOfTraces; i++{
		newAtm := generateAtm()
		newBackendSystem := generateBackendSystem()

		resourceSpan := traces.ResourceSpans().AppendEmpty()
		atmResource := resourceSpan.Resource()
	}

	return traces
}

通过属性描述资源

收集器的 API 提供了一个名为 pcommon 的包(在 pdata 包下嵌套),其中包含了描述 Resource 所需的所有类型和辅助函数。

在收集器的世界中,Resource 是以键/值对格式描述的,表示为 pcommon.Map 类型。

您可以在 Otel Collector 的 GitHub 项目的 /pdata/pcommon/common.go 文件中检查 pcommon.Map 类型的定义以及创建支持的格式的属性值的相关辅助函数。

键/值对提供了非常灵活的方式来帮助建模 Resource 数据,因此 OTel 规范针对此提供了一些指南,以帮助组织和减少可能需要表示的不同类型的遥测生成实体之间的冲突。

这些指南被称为资源语义约定,在 OTel 规范中有记录

当创建自己的属性来表示自己的遥测生成实体时,应遵循规范提供的指南:

属性按描述它们的概念类型进行逻辑分组。同一组中的属性具有以点(.)结尾的公共前缀。例如,所有描述 Kubernetes 属性的属性都以 k8s. 开头。

让我们从打开 tailtracer/model.go 开始,并将 go.opentelemetry.io/collector/pdata/pcommon 添加到 import 语句中,以便您可以访问 pcommon 包的功能。

现在,请继续添加一个函数,从 Atm 实例中读取字段值并将其作为属性(使用前缀“atm.”进行分组)写入 pcommon.Resource 实例。以下是该函数的外观:

func fillResourceWithAtm(resource *pcommon.Resource, atm Atm){
   atmAttrs := resource.Attributes()
   atmAttrs.PutInt("atm.id", atm.ID)
   atmAttrs.PutStr("atm.stateid", atm.StateID)
   atmAttrs.PutStr("atm.ispnetwork", atm.ISPNetwork)
   atmAttrs.PutStr("atm.serialnumber", atm.SerialNumber)
}

资源语义约定还具有规定的属性名称和用于表示在不同领域通用且适用的遥测生成实体的众所周知的值,例如 compute unitenvironment

因此,当您查看 BackendSystem 实体时,它具有表示与 OS 相关的信息和与 Cloud 相关的信息的字段,并且我们将使用资源语义约定规定的属性名称和值来表示该信息在其 Resource 上。

所有资源语义约定属性名称和值都保存在收集器的 GitHub 项目的 /semconv/v1.9.0/generated_resource.go 文件中。

让我们创建一个函数,从 BackendSystem 实例中读取字段值,并将其作为属性写入 pcommon.Resource 实例。打开 tailtracer/model.go 文件并添加以下函数:

func fillResourceWithBackendSystem(resource *pcommon.Resource, backend BackendSystem){
	backendAttrs := resource.Attributes()
	var osType, cloudProvider string

	switch {
		case backend.CloudProvider == "amzn":
			cloudProvider = conventions.AttributeCloudProviderAWS
		case backend.OSType == "mcrsft":
			cloudProvider = conventions.AttributeCloudProviderAzure
		case backend.OSType == "gogl":
			cloudProvider = conventions.AttributeCloudProviderGCP
	}

	backendAttrs.PutStr(conventions.AttributeCloudProvider, cloudProvider)
	backendAttrs.PutStr(conventions.AttributeCloudRegion, backend.CloudRegion)

	switch {
		case backend.OSType == "lnx":
			osType = conventions.AttributeOSTypeLinux
		case backend.OSType == "wndws":
			osType = conventions.AttributeOSTypeWindows
		case backend.OSType == "slrs":
			osType = conventions.AttributeOSTypeSolaris
	}

	backendAttrs.PutStr(conventions.AttributeOSType, osType)
	backendAttrs.PutStr(conventions.AttributeOSVersion, backend.OSVersion)
 }

请注意,我没有将属性名为 “atm.name” 或 “backendsystem.name” 的属性添加到代表 AtmBackendSystem 实体的 pcommon.Resource 中。这是因为大多数(不是全部)与 OTel 追踪规范兼容的分布式跟踪后端系统将 pcommon.Resource 解释为 Service,因此它们期望 pcommon.Resource 包含一个名为 service.name 的必需属性,正如资源语义约定规定的那样。

我们还将使用名为 service.version 的非必需属性来表示 AtmBackendSystem 实体的版本信息。

添加了正确分配 “service.” 分组属性的代码后,tailtracer/model.go 文件如下所示:

model.go

package tailtracer

import (
	"math/rand"
	"time"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/ptrace"
	conventions "go.opentelemetry.io/collector/semconv/v1.9.0"
)

type Atm struct {
	ID           int64
	Version      string
	Name         string
	StateID      string
	SerialNumber string
	ISPNetwork   string
}

type BackendSystem struct {
	Version       string
	ProcessName   string
	OSType        string
	OSVersion     string
	CloudProvider string
	CloudRegion   string
	Endpoint      string
}

func generateAtm() Atm {
	i := getRandomNumber(1, 2)
	var newAtm Atm

	switch i {
		case 1:
			newAtm = Atm{
				ID:           111,
				Name:         "ATM-111-IL",
				SerialNumber: "atmxph-2022-111",
				Version:      "v1.0",
				ISPNetwork:   "comcast-chicago",
				StateID:      "IL",
			}

		case 2:
			newAtm = Atm{
				ID:           222,
				Name:         "ATM-222-CA",
				SerialNumber: "atmxph-2022-222",
				Version:      "v1.0",
				ISPNetwork:   "comcast-sanfrancisco",
				StateID:      "CA",
			}
	}

	return newAtm
}

func generateBackendSystem() BackendSystem {
	i := getRandomNumber(1, 3)

	newBackend := BackendSystem{
		ProcessName:   "accounts",
		Version:       "v2.5",
		OSType:        "lnx",
		OSVersion:     "4.16.10-300.fc28.x86_64",
		CloudProvider: "amzn",
		CloudRegion:   "us-east-2",
	}

	switch i {
		case 1:
			newBackend.Endpoint = "api/v2.5/balance"
		case 2:
			newBackend.Endpoint = "api/v2.5/deposit"
		case 3:
			newBackend.Endpoint = "api/v2.5/withdrawn"
	}

	return newBackend
}

func getRandomNumber(min int, max int) int {
	rand.Seed(time.Now().UnixNano())
	i := (rand.Intn(max-min+1) + min)
	return i
}

func generateTraces(numberOfTraces int) ptrace.Traces {
	traces := ptrace.NewTraces()

	for i := 0; i <= numberOfTraces; i++ {
		newAtm := generateAtm()
		newBackendSystem := generateBackendSystem()

		resourceSpan := traces.ResourceSpans().AppendEmpty()
		atmResource := resourceSpan.Resource()
		fillResourceWithAtm(&atmResource, newAtm)

		resourceSpan = traces.ResourceSpans().AppendEmpty()
		backendResource := resourceSpan.Resource()
		fillResourceWithBackendSystem(&backendResource, newBackendSystem)
	}

	return traces
}

func fillResourceWithAtm(resource *pcommon.Resource, atm Atm) {
	atmAttrs := resource.Attributes()
	atmAttrs.PutInt("atm.id", atm.ID)
	atmAttrs.PutStr("atm.stateid", atm.StateID)
	atmAttrs.PutStr("atm.ispnetwork", atm.ISPNetwork)
	atmAttrs.PutStr("atm.serialnumber", atm.SerialNumber)
	atmAttrs.PutStr(conventions.AttributeServiceName, atm.Name)
	atmAttrs.PutStr(conventions.AttributeServiceVersion, atm.Version)

}

func fillResourceWithBackendSystem(resource *pcommon.Resource, backend BackendSystem) {
	backendAttrs := resource.Attributes()
	var osType, cloudProvider string

	switch {
		case backend.CloudProvider == "amzn":
			cloudProvider = conventions.AttributeCloudProviderAWS
		case backend.OSType == "mcrsft":
			cloudProvider = conventions.AttributeCloudProviderAzure
		case backend.OSType == "gogl":
			cloudProvider = conventions.AttributeCloudProviderGCP
	}

	backendAttrs.PutStr(conventions.AttributeCloudProvider, cloudProvider)
	backendAttrs.PutStr(conventions.AttributeCloudRegion, backend.CloudRegion)

	switch {
		case backend.OSType == "lnx":
			osType = conventions.AttributeOSTypeLinux
		case backend.OSType == "wndws":
			osType = conventions.AttributeOSTypeWindows
		case backend.OSType == "slrs":
			osType = conventions.AttributeOSTypeSolaris
	}

	backendAttrs.PutStr(conventions.AttributeOSType, osType)
	backendAttrs.PutStr(conventions.AttributeOSVersion, backend.OSVersion)

	backendAttrs.PutStr(conventions.AttributeServiceName, backend.ProcessName)
	backendAttrs.PutStr(conventions.AttributeServiceVersion, backend.Version)
}

用跨度表示操作

现在,您已经有了一个ResourceSpan实例,其中包含正确填充属性以表示AtmBackendSystem实体的Resource,您可以开始在ResourceSpan中表示每个Resource作为跟踪的一部分执行的操作。

在OTel世界中,为了生成遥测数据,系统需要手动或通过一个仪表库进行自动仪表化。

仪表化库负责在操作在跟踪中发生的范围(也称为仪表化范围)中设置范围,然后在跟踪的上下文中将这些操作描述为跨度。

pdata.ResourceSpans有一个名为ScopeSpans()的方法,它返回一个名为ptrace.ScopeSpansSlice的辅助类型实例。ptrace.ScopeSpansSlice类型有助于处理ptrace.ScopeSpans数组,该数组包含与不同仪表化范围和跟踪上下文内生成的跨度的ptrace.ScopeSpan数量一样多的项。

ptrace.ScopeSpansSlice有一个名为AppendEmpty()的方法,该方法向数组中添加一个新的ptrace.ScopeSpans并返回其引用。

让我们创建一个函数来实例化一个ptrace.ScopeSpans,表示ATM系统的仪表化范围和它的跨度。打开tailtracer/model.go文件,并添加以下函数:

func appendAtmSystemInstrScopeSpans(resourceSpans *ptrace.ResourceSpans) (ptrace.ScopeSpans){
    scopeSpans := resourceSpans.ScopeSpans().AppendEmpty()

    return scopeSpans
}

ptrace.ScopeSpans有一个名为Scope()的方法,返回表示生成跨度的仪表化范围的pcommon.InstrumentationScope实例的引用。

pcommon.InstrumentationScope有以下方法来描述仪表化范围:

  • SetName(v string)设置仪表化库的名称

  • SetVersion(v string)设置仪表化库的版本

  • Name() string返回与仪表化库关联的名称

  • Version() string返回与仪表化库关联的版本

让我们更新appendAtmSystemInstrScopeSpans函数,以便我们可以设置新的ptrace.ScopeSpans的仪表化范围的名称和版本。下面是更新后的appendAtmSystemInstrScopeSpans函数的样子:

func appendAtmSystemInstrScopeSpans(resourceSpans *ptrace.ResourceSpans) (ptrace.ScopeSpans){
    scopeSpans := resourceSpans.ScopeSpans().AppendEmpty()
    scopeSpans.Scope().SetName("atm-system")
    scopeSpans.Scope().SetVersion("v1.0")
    return scopeSpans
}

现在,您可以更新generateTraces函数,并添加变量来表示AtmBackendSystem实体使用的仪表化范围,通过使用appendAtmSystemInstrScopeSpans()来初始化它们。下面是更新后的generateTraces()函数的样子:

func generateTraces(numberOfTraces int) ptrace.Traces{
    traces := ptrace.NewTraces()

    for i := 0; i <= numberOfTraces; i++{
        newAtm := generateAtm()
        newBackendSystem := generateBackendSystem()

        resourceSpan := traces.ResourceSpans().AppendEmpty()
        atmResource := resourceSpan.Resource()
        fillResourceWithAtm(&atmResource, newAtm)

        atmInstScope := appendAtmSystemInstrScopeSpans(&resourceSpan)

        resourceSpan = traces.ResourceSpans().AppendEmpty()
        backendResource := resourceSpan.Resource()
        fillResourceWithBackendSystem(&backendResource, newBackendSystem)

        backendInstScope := appendAtmSystemInstrScopeSpans(&resourceSpan)
    }

    return traces
}

到此为止,您已经拥有了在您的系统中表示遥测生成实体所需的一切,以及负责标识操作并为系统生成跟踪的仪表化范围。下一步是最终创建表示给定仪表化范围生成的操作的跨度。

ptrace.ScopeSpans有一个名为Spans()的方法,它返回一个名为ptrace.SpanSlice的辅助类型实例。ptrace.SpanSlice类型有助于处理ptrace.Span数组,该数组包含与仪表化范围能够识别并描述的操作数量一样多的项,作为跟踪的一部分。

ptrace.SpanSlice有一个名为AppendEmpty()的方法,该方法向数组中添加一个新的ptrace.Span并返回其引用。

ptrace.Span有以下方法来描述操作:

  • SetTraceID(v pcommon.TraceID)设置与此跨度关联的跟踪的pcommon.TraceID

  • SetSpanID(v pcommon.SpanID)设置唯一标识跨度在其所属的跟踪的上下文中的pcommon.SpanID

  • SetParentSpanID(v pcommon.SpanID)设置父跨度/操作的pcommon.SpanID,以表示此跨度表示的操作是否作为父跨度(嵌套)的一部分执行

  • SetName(v string)设置跨度的操作名称

  • SetKind(v ptrace.SpanKind)设置ptrace.SpanKind,定义跨度表示的操作类型。

  • SetStartTimestamp(v pcommon.Timestamp)设置表示跨度表示的操作开始时的pcommon.Timestamp

  • SetEndTimestamp(v pcommon.Timestamp)设置表示跨度表示的操作结束时的pcommon.Timestamp

如上所述,ptrace.Span通过2个必需的ID唯一标识,它们自己的唯一ID使用pcommon.SpanID类型表示,以及它们所关联的跟踪的ID使用pcommon.TraceID类型表示。

pcommon.TraceID必须包含一个通过16个字节数组表示的全局唯一ID,并且应遵循W3C跟踪上下文规范的要求,而pcommon.SpanID是在其所关联的跟踪的上下文中的唯一ID,它通过8个字节的数组表示。

pcommon包提供了以下类型来生成跨度的ID:

  • type TraceID [16]byte

  • type SpanID [8]byte

在本教程中,您将使用github.com/google/uuid包中的函数创建pcommon.TraceID的ID,使用crypto/rand包随机生成pcommon.SpanID。打开tailtracer/model.go文件,并将这两个包添加到import语句中;之后,添加以下函数以帮助生成这两个ID:

import (
    crand "crypto/rand"
    "math/rand"
    ...
)

func NewTraceID() pcommon.TraceID {
    return pcommon.TraceID(uuid.New())
}

func NewSpanID() pcommon.SpanID {
    var rngSeed int64
    _ = binary.Read(crand.Reader, binary.LittleEndian, &rngSeed)
    randSource := rand.New(rand.NewSource(rngSeed))

    var sid [8]byte
    randSource.Read(sid[:])
    spanID := pcommon.SpanID(sid)

    return spanID
}

现在,您有了正确标识跨度的方法,可以开始创建它们来表示系统中实体的操作及其之间的关系。

作为generateBackendSystem()函数的一部分,我们已经随机分配了BackEndSystem实体作为系统服务提供的操作。此刻,我们将打开tailtracer/model.go文件,并添加一个名为appendTraceSpans()的函数,该函数将负责创建一个跟踪并附加表示BackendSystem操作的跨度。下面是appendTraceSpans()函数的初始实现:

func appendTraceSpans(backend *BackendSystem, backendScopeSpans *ptrace.ScopeSpans, atmScopeSpans *ptrace.ScopeSpans) {
    traceId := NewTraceID()
    backendSpanId := NewSpanID()

    backendDuration, _ := time.ParseDuration("1s")
    backendSpanStartTime := time.Now()
    backendSpanFinishTime := backendSpanStartTime.Add(backendDuration)

    backendSpan := backendScopeSpans.Spans().AppendEmpty()
    backendSpan.SetTraceID(traceId)
    backendSpan.SetSpanID(backendSpanId)
    backendSpan.SetName(backend.Endpoint)
    backendSpan.SetKind(ptrace.SpanKindServer)
    backendSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(backendSpanStartTime))
    backendSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(backendSpanFinishTime))
}

您可能已经注意到appendTraceSpans()函数中的两个ptrace.ScopeSpans参数的引用的问题,但现在不用担心,我们将稍后回到这里。

现在,您将更新generateTraces()函数,使其能够通过调用appendTraceSpans()函数实际生成跟踪。下面是更新后的generateTraces()函数的样子:

func generateTraces(numberOfTraces int) ptrace.Traces {
    traces := ptrace.NewTraces()

    for i := 0; i <= numberOfTraces; i++ {
        newAtm := generateAtm()
        newBackendSystem := generateBackendSystem()

        resourceSpan := traces.ResourceSpans().AppendEmpty()
        atmResource := resourceSpan.Resource()
        fillResourceWithAtm(&atmResource, newAtm)

        atmInstScope := appendAtmSystemInstrScopeSpans(&resourceSpan)

        resourceSpan = traces.ResourceSpans().AppendEmpty()
        backendResource := resourceSpan.Resource()
        fillResourceWithBackendSystem(&backendResource, newBackendSystem)

        backendInstScope := appendAtmSystemInstrScopeSpans(&resourceSpan)

        appendTraceSpans(&newBackendSystem, &backendInstScope, &atmInstScope)
    }

    return traces
}

现在,您已经表示了BackendSystem实体及其操作的跨度,并妥善放置在了一个跟踪上下文中!您需要做的就是将生成的跟踪通过管道推送,以便下一个消费者(处理器或导出器)可以接收和处理它。

consumer.Traces有一个名为ConsumeTraces()的方法,该方法负责将生成的跟踪推送到管道中的下一个消费者。现在,您需要更新tailtracerReceiver类型的Start()方法并添加使用该方法的代码。

打开tailtracer/trace-receiver.go文件,并将Start()方法更新为以下内容:

func (tailtracerRcvr *tailtracerReceiver) Start(ctx context.Context, host component.Host) error {
    tailtracerRcvr.host = host
    ctx = context.Background()
    ctx, tailtracerRcvr.cancel = context.WithCancel(ctx)

    interval, _ := time.ParseDuration(tailtracerRcvr.config.Interval)
    go func() {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            select {
                case <-ticker.C:
                    tailtracerRcvr.logger.Info("I should start processing traces now!")
                    tailtracerRcvr.nextConsumer.ConsumeTraces(ctx, generateTraces(tailtracerRcvr.config.NumberOfTraces))
                case <-ctx.Done():
                    return
            }
        }
    }()

    return nil
}

如果你运行了otelcol-dev,那么在运行2分钟后,输出应该如下所示:

Starting: /Users/rquedas/go/bin/dlv dap --check-go-version=false --listen=127.0.0.1:54625 --log-dest=3 from /Users/rquedas/Documents/vscode-workspace/otel4devs/collector/receiver/trace-receiver/otelcol-dev
DAP server listening at: 127.0.0.1:54625
2023-09-28T08:59:52.111-0700    info    service@v0.86.0/telemetry.go:84 设置自己的遥测...
2023-09-28T08:59:52.111-0700    info    service@v0.86.0/telemetry.go:201        提供Prometheus指标      {"address": ":8888", "level": "Basic"}
2023-09-28T08:59:52.111-0700    debug   exporter@v0.86.0/exporter.go:273        稳定的组件。       {"kind": "exporter", "data_type": "traces", "name": "otlp/jaeger"}
2023-09-28T08:59:52.112-0700    info    exporter@v0.86.0/exporter.go:275        开发中的组件。可能会在未来更改。        {"kind": "exporter", "data_type": "traces", "name": "debug"}
2023-09-28T08:59:52.112-0700    debug   receiver@v0.86.0/receiver.go:294        稳定的组件。       {"kind": "receiver", "name": "otlp", "data_type": "traces"}
2023-09-28T08:59:52.112-0700    debug   receiver@v0.86.0/receiver.go:294        Alpha组件。未来可能更改。      {"kind": "receiver", "name": "tailtracer", "data_type": "traces"}
2023-09-28T08:59:52.112-0700    info    service@v0.86.0/service.go:138  开始运行otelcol-dev... {"Version": "1.0.0", "NumCPU": 10}
2023-09-28T08:59:52.112-0700    info    extensions/extensions.go:31     开始扩展程序...
2023-09-28T08:59:52.113-0700    info    otlpreceiver@v0.86.0/otlp.go:83 开始GRPC服务器    {"kind": "receiver", "name": "otlp", "data_type": "traces", "endpoint": "0.0.0.0:4317"}
2023-09-28T08:59:52.113-0700    info    service@v0.86.0/service.go:161  一切准备就绪。开始运行和处理数据。
2023-09-28T08:59:52.113-0700    info    zapgrpc/zapgrpc.go:178  [core] [Server #3 ListenSocket #4] 创建ListenSocket {"grpc_log": true}
2023-09-28T08:59:52.124-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1 SubChannel #2] Subchannel Connectivity change to READY       {"grpc_log": true}
2023-09-28T08:59:52.124-0700    info    zapgrpc/zapgrpc.go:178  [core] [pick-first-lb 0x1400054fd10] 收到SubConn状态更新: 0x1400054fec0, {ConnectivityState:READY ConnectionError:<nil>}       {"grpc_log": true}
2023-09-28T08:59:52.124-0700    info    zapgrpc/zapgrpc.go:178  [core] [Channel #1] Channel Connectivity change to READY        {"grpc_log": true}
2023-09-28T09:00:52.113-0700    info    tailtracer/receiver.go:33       我现在应该开始处理跟踪了!   {"kind": "receiver", "name": "tailtracer", "data_type": "traces"}

2023-09-28T09:00:52.743-0700	INFO	debugexporter/debug_exporter.go:40	TracesExporter	{"#spans": 1}
2023-09-28T09:00:52.743-0700	DEBUG	debugexporter/debug_exporter.go:49	ResourceSpans #0
Resource SchemaURL:
Resource labels:
     -> atm.id: INT(222)
     -> atm.stateid: STRING(CA)
     -> atm.ispnetwork: STRING(comcast-sanfrancisco)
     -> atm.serialnumber: STRING(atmxph-2022-222)
     -> service.name: STRING(ATM-222-CA)
     -> service.version: STRING(v1.0)
ScopeSpans #0
ScopeSpans SchemaURL:
InstrumentationScope atm-system v1.0
ResourceSpans #1
Resource SchemaURL:
Resource labels:
     -> cloud.provider: STRING(aws)
     -> cloud.region: STRING(us-east-2)
     -> os.type: STRING(linux)
     -> os.version: STRING(4.16.10-300.fc28.x86_64)
     -> service.name: STRING(accounts)
     -> service.version: STRING(v2.5)
ScopeSpans #0
ScopeSpans SchemaURL:
InstrumentationScope atm-system v1.0
Span #0
    Trace ID       : 5cce8a774d4546c2a5cbdeb607ec74c9
    Parent ID      :
    ID             : bb25c05c7fb13084
    Name           : api/v2.5/balance
    Kind           : SPAN_KIND_SERVER
    Start time     : 2023-09-28 09:00:52.743385 +0000 UTC
    End time       : 2023-09-28 09:00:53.743385 +0000 UTC
    Status code    : STATUS_CODE_OK
    Status message :
2023-09-28T09:00:52.743-0500	info	tailtracer/trace-receiver.go:33	我现在应该开始处理跟踪了!	{"kind": "receiver", "name": "tailtracer"}
2023-09-28T09:00:52.744-0500	INFO	debugexporter/debug_exporter.go:40	TracesExporter	{"#spans": 1}
2023-09-28T09:00:52.744-0500	DEBUG	debugexporter/debug_exporter.go:49	ResourceSpans #0
Resource SchemaURL:
Resource labels:
     -> atm.id: INT(111)
     -> atm.stateid: STRING(IL)
     -> atm.ispnetwork: STRING(comcast-chicago)
     -> atm.serialnumber: STRING(atmxph-2022-111)
     -> service.name: STRING(ATM-111-IL)
     -> service.version: STRING(v1.0)
ScopeSpans #0
ScopeSpans SchemaURL:
InstrumentationScope atm-system v1.0
ResourceSpans #1
Resource SchemaURL:
Resource labels:
     -> cloud.provider: STRING(aws)
     -> cloud.region: STRING(us-east-2)
     -> os.type: STRING(linux)
     -> os.version: STRING(4.16.10-300.fc28.x86_64)
     -> service.name: STRING(accounts)
     -> service.version: STRING(v2.5)
ScopeSpans #0
ScopeSpans SchemaURL:
InstrumentationScope atm-system v1.0
Span #0
    Trace ID       : 8a6ca822db0847f48facfebbb08bbb9e
    Parent ID      :
    ID             : 7cf668c1273ecee5
    Name           : api/v2.5/withdrawn
    Kind           : SPAN_KIND_SERVER
    Start time     : 2023-09-28 09:00:52.74404 +0000 UTC
    End time       : 2023-09-28 09:00:53.74404 +0000 UTC
    Status code    : STATUS_CODE_OK
    Status message :

这是Jaeger中生成的跟踪的样本: Jaeger跟踪

您当前在Jaeger中看到的是表示一个服务,该服务正在接收来自未经OTel SDK检测的外部实体的请求,因此无法将其识别为跟踪的起源/开始。为了使ptrace.Span能够理解它表示作为另一个操作的结果而执行的操作,该操作可以是在相同跟踪上下文中的同一Resource内发起的,也可以是外部(嵌套/子节点)发起的,您需要执行以下操作:

  • 通过调用SetTraceID()方法并将父/调用者ptrace.Spanpcommon.TraceID作为参数传递,设置与调用者操作相同的跟踪上下文。
  • 通过调用SetParentId()方法并将父/调用者ptrace.Spanpcommon.SpanID作为参数传递,定义调用者操作在跟踪上下文中的身份。

现在,您将创建一个表示Atm实体操作的ptrace.Span,并将其设置为BackendSystem跨度的父跨度。打开tailtracer/model.go文件,并将appendTraceSpans()函数更新如下:

func appendTraceSpans(backend *BackendSystem, backendScopeSpans *ptrace.ScopeSpans, atmScopeSpans *ptrace.ScopeSpans) {
	traceId := NewTraceID()

	var atmOperationName string

	switch {
		case strings.Contains(backend.Endpoint, "balance"):
			atmOperationName = "Check Balance"
		case strings.Contains(backend.Endpoint, "deposit"):
			atmOperationName = "Make Deposit"
		case strings.Contains(backend.Endpoint, "withdraw"):
			atmOperationName = "Fast Cash"
		}

	atmSpanId := NewSpanID()
    atmSpanStartTime := time.Now()
    atmDuration, _ := time.ParseDuration("4s")
    atmSpanFinishTime := atmSpanStartTime.Add(atmDuration)


	atmSpan := atmScopeSpans.Spans().AppendEmpty()
	atmSpan.SetTraceID(traceId)
	atmSpan.SetSpanID(atmSpanId)
	atmSpan.SetName(atmOperationName)
	atmSpan.SetKind(ptrace.SpanKindClient)
	atmSpan.Status().SetCode(ptrace.StatusCodeOk)
	atmSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(atmSpanStartTime))
	atmSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(atmSpanFinishTime))


	backendSpanId := NewSpanID()

	backendDuration, _ := time.ParseDuration("2s")
    backendSpanStartTime := atmSpanStartTime.Add(backendDuration)


	backendSpan := backendScopeSpans.Spans().AppendEmpty()
	backendSpan.SetTraceID(atmSpan.TraceID())
	backendSpan.SetSpanID(backendSpanId)
	backendSpan.SetParentSpanID(atmSpan.SpanID())
	backendSpan.SetName(backend.Endpoint)
	backendSpan.SetKind(ptrace.SpanKindServer)
	backendSpan.Status().SetCode(ptrace.StatusCodeOk)
	backendSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(backendSpanStartTime))
	backendSpan.SetEndTimestamp(atmSpan.EndTimestamp())

}

现在再次运行otelcol-dev,并在运行2分钟后,您应该开始在Jaeger中看到类似以下的跟踪: Jaeger跟踪

我们现在有了表示AtmBackendSystem遥测生成实体的服务,并且可以充分理解这两个实体如何被使用并对用户执行的操作的性能做出贡献。

以下是Jaeger中其中一个跟踪的详细视图: Jaeger跟踪

到此为止!您已经完成了本教程并成功实现了一个跟踪接收器,祝贺您!


  1. 在v0.86.0之前,请使用loggingexporter而不是debugexporter。 ↩︎ ↩︎

最后修改 December 13, 2023: improve glossary translation (46f8201b)