构建追踪接收器
如果您正在阅读本教程,您可能已经对分布式跟踪背后的OpenTelemetry概念有了一定了解,但是如果不了解,您可以通过这里快速了解。
以下是根据OpenTelemetry的定义的这些概念:
跟踪是追踪构成应用程序的服务处理的单个请求(称为跟踪)的进展。该请求可能由用户或应用程序发起。分布式跟踪是一种穿越进程、网络和安全边界的跟踪形式。
尽管该定义看起来非常应用程序中心化,但您可以利用OpenTelemetry跟踪模型来表示请求并快速了解其持续时间以及完成它所涉及的每个步骤的详细信息。
假设您已经有一个生成某种跟踪遥测的系统,OpenTelemetry收集器是帮助您将其提供给OTel世界的入口。
在收集器中,跟踪接收器负责接收并将请求遥测从其原始格式转换为OTel跟踪模型,以便可以通过收集器管道正确处理信息。
要实现跟踪接收器,您需要以下内容:
Config
实现,以使跟踪接收器能够在收集器的config.yaml
中收集和验证其配置。receiver.Factory
实现,这样收集器就可以正确实例化跟踪接收器组件。TracesReceiver
实现,负责收集遥测,将其转换为内部跟踪表示,并将信息传递给管道中的下一个消费者。
在本教程中,我们将创建一个名为 tailtracer
的示例跟踪接收器,该接收器模拟拉取操作,并生成跟踪作为该操作的结果。接下来的几节将指导您按顺序实现上述步骤以创建接收器,让我们开始吧。
设置接收器的开发和测试环境
首先,使用构建自定义收集器教程创建名为 otelcol-dev
的收集器实例;您只需复制步骤2中描述的 builder-config.yaml
并运行 builder。这样,您就会得到一个名为 otelcol-dev
的文件夹,其中包含已准备好使用的收集器开发实例。
为了正确测试跟踪接收器,您需要一个分布式跟踪后端,以便收集器可以将遥测发送到该后端。我们将使用Jaeger,如果您还没有运行中的 Jaeger
实例,可以使用以下命令轻松启动一个 Docker 容器。
docker run -d --name jaeger \
-e COLLECTOR_OTLP_ENABLED=true \
-p 16686:16686 \
-p 14317:4317 \
-p 14318:4318 \
jaegertracing/all-in-one:1.41
现在,创建一个 config.yaml
文件,以便设置收集器的组件。
cd otelcol-dev
touch config.yaml
目前,您只需要一个基本的跟踪管道,其中包含 otlp
接收器以及 otlp
和 debug
1 导出器,以下是您的 config.yaml
文件的内容:
config.yaml
receivers:
otlp:
protocols:
grpc:
processors:
exporters:
# 注意:在 v0.86.0 之前,请使用 `logging` 而不是 `debug`。
debug:
verbosity: detailed
otlp/jaeger:
endpoint: localhost:14317
tls:
insecure: true
service:
pipelines:
traces:
receivers: [otlp]
processors: []
exporters: [otlp/jaeger, debug]
telemetry:
logs:
level: debug
请注意,我只在 otlp
导出器配置中使用了 insecure
标志,以便让本地开发环境更加轻松;在生产环境中运行收集器时,请不要使用此标志。
为了验证初始管道是否正确设置,运行 otelcol-dev
命令后,您应该看到以下输出:
$ ./otelcol-dev --config config.yaml
2023-09-12T15:22:18.652-0700 info service/telemetry.go:84 Setting up own telemetry...
2023-09-12T15:22:18.652-0700 info service/telemetry.go:201 Serving Prometheus metrics {"address": ":8888", "level": "Basic"}
2023-09-12T15:22:18.652-0700 debug exporter@v0.85.0/exporter.go:273 Stable component. {"kind": "exporter", "data_type": "traces", "name": "otlp/jaeger"}
2023-09-12T15:22:18.652-0700 info exporter@v0.85.0/exporter.go:275 Development component. May change in the future. {"kind": "exporter", "data_type": "traces", "name": "debug"}
2023-09-12T15:22:18.652-0700 debug receiver@v0.85.0/receiver.go:294 Stable component. {"kind": "receiver", "name": "otlp", "data_type": "traces"}
2023-09-12T15:22:18.652-0700 info service/service.go:138 Starting otelcontribcol... {"Version": "0.85.0", "NumCPU": 10}
2023-09-12T15:22:18.652-0700 info extensions/extensions.go:31 Starting extensions...
2023-09-12T15:22:18.652-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1] Channel created {"grpc_log": true}
2023-09-12T15:22:18.652-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1] original dial target is: "localhost:14317" {"grpc_log": true}
2023-09-12T15:22:18.652-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1] parsed dial target is: {URL:{Scheme:localhost Opaque:14317 User: Host: Path: RawPath: OmitHost:false ForceQuery:false RawQuery: Fragment: RawFragment:}} {"grpc_log": true}
2023-09-12T15:22:18.652-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1] fallback to scheme "passthrough" {"grpc_log": true}
2023-09-12T15:22:18.652-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1] parsed dial target is: {URL:{Scheme:passthrough Opaque: User: Host: Path:/localhost:14317 RawPath: OmitHost:false ForceQuery:false RawQuery: Fragment: RawFragment:}} {"grpc_log": true}
2023-09-12T15:22:18.652-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1] Channel authority set to "localhost:14317" {"grpc_log": true}
2023-09-12T15:22:18.652-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1] Channel switches to new LB policy "pick_first" {"grpc_log": true}
2023-09-12T15:22:18.652-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1 SubChannel #2] Subchannel created {"grpc_log": true}
2023-09-12T15:22:18.652-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1] Channel Connectivity change to CONNECTING {"grpc_log": true}
2023-09-12T15:22:18.652-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1 SubChannel #2] Subchannel Connectivity change to CONNECTING {"grpc_log": true}
2023-09-12T15:22:18.652-0700 info zapgrpc/zapgrpc.go:178 [core] [Server #3] Server created {"grpc_log": true}
2023-09-12T15:22:18.652-0700 info zapgrpc/zapgrpc.go:178 [core] [pick-first-lb 0x140027d9b30] Received SubConn state update: 0x140027d9ce0, {ConnectivityState:CONNECTING ConnectionError:<nil>} {"grpc_log": true}
2023-09-12T15:22:18.652-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1 SubChannel #2] Subchannel picks a new address "localhost:14317" to connect {"grpc_log": true}
2023-09-12T15:22:18.652-0700 info otlpreceiver@v0.85.0/otlp.go:83 Starting GRPC server {"kind": "receiver", "name": "otlp", "data_type": "traces", "endpoint": "0.0.0.0:4317"}
2023-09-12T15:22:18.652-0700 info service/service.go:161 Everything is ready. Begin running and processing data.
2023-09-12T15:22:18.652-0700 info zapgrpc/zapgrpc.go:178 [core] [Server #3 ListenSocket #4] ListenSocket created {"grpc_log": true}
2023-09-12T15:22:18.662-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1 SubChannel #2] Subchannel Connectivity change to READY {"grpc_log": true}
确保您看到最后一行,这将确认OTLP导出器已成功建立与本地Jaeger实例的gRPC连接。现在,我们准备好环境后,让我们开始编写接收器代码。
现在,创建名为 tailtracer
的另一个文件夹,以便我们有一个位置来存放所有接收器代码。
mkdir tailtracer
每个收集器组件都应该作为一个 Go 模块创建,所以您需要正确初始化 tailtracer
模块。在我的情况下,命令如下所示:
cd tailtracer
go mod init github.com/rquedas/otel4devs/collector/receiver/trace-receiver/tailtracer
读取和验证接收器设置
为了能够实例化并参与管道,收集器需要识别您的接收器并从其配置文件中正确加载设置。
tailtracer
接收器将具有以下设置:
interval
:表示遥测拉取操作之间的时间间隔(以分钟为单位)的字符串number_of_traces
:每个间隔生成的模拟跟踪数量
以下是 tailtracer
接收器设置的样子:
receivers:
tailtracer: # 此行表示接收器的 ID
interval: 1m
number_of_traces: 1
在 tailtracer
文件夹下,创建一个名为 config.go
的文件,您将在其中编写支持接收器设置的所有代码。
cd tailtracer
touch config.go
为了实现组件的配置方面,您需要创建一个 Config
结构体。将以下代码添加到 config.go
文件中:
package tailtracer
type Config struct{
}
为了让接收器访问其设置,Config
结构体必须对应接收器的每个设置字段。
在实现的要求中,interval
值将是可选的(我们稍后将看到生成默认值),但是当定义时应该至少为1分钟(1m),而 number_of_traces
将是一个必需的值。在实现了上述要求之后,config.go
文件应如下所示。
config.go
package tailtracer
// Config represents the receiver config settings within the collector's config.yaml
type Config struct {
Interval string `mapstructure:"interval"`
NumberOfTraces int `mapstructure:"number_of_traces"`
}
检查您的工作
- 添加了
Interval
和NumberOfTraces
字段,以便从config.yaml
中正确获取它们的值。
既然您可以访问这些设置,那么您就可以通过根据可选的 ConfigValidator
接口实现 Validate
方法来提供所需的任何类型的验证。
在这种情况下,interval
值将是可选的(我们将在后面生成默认值),但是当定义时应该至少为1分钟(1m),而 number_of_traces
将是一个必需的值。在实现了 Validate
方法后,config.go 文件应如下所示。
config.go
package tailtracer
import (
"fmt"
"time"
)
// Config represents the receiver config settings within the collector's config.yaml
type Config struct {
Interval string `mapstructure:"interval"`
NumberOfTraces int `mapstructure:"number_of_traces"`
}
// Validate checks if the receiver configuration is valid
func (cfg *Config) Validate() error {
interval, _ := time.ParseDuration(cfg.Interval)
if interval.Minutes() < 1 {
return fmt.Errorf("当定义时,间隔必须设置为至少1分钟(1m)")
}
if cfg.NumberOfTraces < 1 {
return fmt.Errorf("number_of_traces 必须大于或等于1")
}
return nil
}
检查您的工作
- 导入了
fmt
包以便正确格式化打印错误消息。 - 添加了
Validate
方法到 Config 结构体,检查interval
设置值是否至少为1分钟(1m),以及number_of_traces
设置值是否大于或等于1。如果不满足,则收集器在启动过程中会生成错误并相应地显示消息。
如果您想更详细地了解在组件的配置方面涉及的结构体和接口,请参阅收集器的 GitHub 项目中的 component/config.go 文件。
启用Collector以实例化你的接收器
在本教程开始时,你创建了一个名为otelcol-dev
实例,其中包含以下组件:
- 接收器:OTLP接收器
- 处理器:批处理器
- 导出器:Debug(调试)1 和 OTLP导出器
前往并打开otelcol-dev
文件夹下的components.go
文件,我们来看一下components()
函数。
func components() (otelcol.Factories, error) {
var err error
factories := otelcol.Factories{}
factories.Extensions, err = extension.MakeFactoryMap(
)
if err != nil {
return otelcol.Factories{}, err
}
factories.Receivers, err = receiver.MakeFactoryMap(
otlpreceiver.NewFactory(),
)
if err != nil {
return otelcol.Factories{}, err
}
factories.Exporters, err = exporter.MakeFactoryMap(
debugexporter.NewFactory(),
otlpexporter.NewFactory(),
)
if err != nil {
return otelcol.Factories{}, err
}
factories.Processors, err = processor.MakeFactoryMap(
batchprocessor.NewFactory(),
)
if err != nil {
return otelcol.Factories{}, err
}
return factories, nil
}
正如你所看到的,components()
函数负责为Collector提供所有组件的工厂,这些工厂由类型为otelcol.Factories
的变量factories
表示(otelcol.Factories结构体的声明在这里),然后用于实例化由Collector的流水线配置和使用的组件。
注意,factories.Receivers
是保存所有接收器工厂(receiver.Factory
的实例)的地方,目前只有otlpreceiver
工厂,该工厂通过调用otlpreceiver.NewFactory()
函数进行实例化。
tailtracer
接收器必须提供一个receiver.Factory
实现,虽然你可以找到一个receiver.Factory
接口(你可以在Collector项目中的receiver/receiver.go文件中找到其定义),但是,为了提供正确的实现方式,你应该使用go.opentelemetry.io/collector/receiver
包中可用的函数。
实现你的receiver.Factory
首先,在tailtracer
文件夹中创建一个名为factory.go
的文件。
cd tailtracer
touch factory.go
现在,让我们按照惯例添加一个名为NewFactory()
的函数,该函数将负责实例化tailtracer
工厂。请在你的factory.go
文件中添加以下代码:
package tailtracer
import (
"go.opentelemetry.io/collector/receiver"
)
// NewFactory creates a factory for tailtracer receiver.
func NewFactory() receiver.Factory {
}
为了实例化你的tailtracer
接收器工厂,你将使用receiver
包中的以下函数:
func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory
receiver.NewFactory()
会实例化并返回一个receiver.Factory
,它需要以下参数:
component.Type
:在所有Collector组件中用于标识你的接收器的唯一字符串标识符。component.CreateDefaultConfigFunc
:一个引用,返回你的接收器的component.Config
实例的函数。...FactoryOption
:一个receiver.FactoryOption
切片,用于确定你的接收器能够处理哪种类型的信号。
现在,让我们实现支持receiver.NewFactory()
所需的所有参数。
标识并为接收器提供默认设置
前面我们提到,我们的tailtracer
接收器的interval
设置是可选的,因此你需要为其提供一个默认值,以便在默认设置中使用。
请继续向你的factory.go
文件添加以下代码:
const (
typeStr = "tailtracer"
defaultInterval = 1 * time.Minute
)
对于默认设置,你只需要添加一个函数,该函数返回一个包含tailtracer
接收器的默认配置的component.Config
。
为了实现这一点,请继续向你的factory.go
文件添加以下代码:
func createDefaultConfig() component.Config {
return &Config{
Interval: string(defaultInterval),
}
}
在进行这两个更改后,你会发现有一些所需的导入缺失,所以这是你的factory.go
文件,包括适当的导入:
factory.go
package tailtracer
import (
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/receiver"
)
const (
typeStr = "tailtracer"
defaultInterval = 1 * time.Minute
)
func createDefaultConfig() component.Config {
return &Config{
Interval: string(defaultInterval),
}
}
// NewFactory creates a factory for tailtracer receiver.
func NewFactory() receiver.Factory {
return nil
}
核对你的工作
- 导入
time
包,以支持defaultInterval
使用的time.Duration
类型。 - 导入
go.opentelemetry.io/collector/component
包,这里声明了component.Config
。 - 导入
go.opentelemetry.io/collector/receiver
包,这里声明了receiver.Factory
。 - 添加了一个名为
defaultInterval
的time.Duration
常量,表示接收器的Interval
设置的默认值。我们将设置默认值为1分钟,因此将其赋值为1 * time.Minute
。 - 添加了一个名为
createDefaultConfig
的函数,该函数负责返回一个tailtracer.Config
结构体的实例,作为接收器的默认配置。 - 对于
createDefaultConfig
函数返回的tailtracer.Config
的实例,初始化了tailtracer.Config.Interval
字段,并赋值为defaultInterval
常量。
使工厂能够描述接收器可以处理的信号
同一个接收器组件可以处理跟踪(traces)、指标(metrics)和日志(logs)。接收器的工厂负责描述这些能力。
鉴于本教程的主题是跟踪,我们只会启用tailtracer
接收器处理跟踪信号。receiver
包提供了以下函数和类型,来帮助工厂描述跟踪信号的处理能力:
func WithTraces(createTracesReceiver CreateTracesFunc, sl component.StabilityLevel) FactoryOption
receiver.WithTraces()
会实例化并返回一个receiver.FactoryOption
,它需要以下参数:
createTracesReceiver
:与receiver.CreateTracesFunc
类型匹配的函数引用。
receiver.CreateTracesFunc
是指向函数的指针,该函数负责实例化并返回一个receiver.Traces
实例,它需要以下参数:
context.Context
:Collector的context.Context
引用,以便你的跟踪接收器可以正确管理其执行环境。receiver.CreateSettings
:创建接收器的某些Collector设置的引用。component.Config
:由Collector传递给工厂的接收器配置设置的引用,以便它可以从Collector配置中正确读取其设置。consumer.Traces
:下一个consumer.Traces
的引用,在其中接收到的跟踪信号将进行处理。这可以是处理器(processor)或导出器(exporter)。
首先,添加引导代码以正确实现receiver.CreateTracesFunc
函数指针。请继续向你的factory.go
文件添加以下代码:
func createTracesReceiver(_ context.Context, params receiver.CreateSettings, baseCfg component.Config, consumer consumer.Traces) (receiver.Traces, error) {
return nil, nil
}
现在,你已经具备了成功使用receiver.NewFactory
函数实例化接收器工厂的所有必要组件。请继续更新你的factory.go
文件中的NewFactory()
函数,如下所示:
// NewFactory creates a factory for tailtracer receiver.
func NewFactory() receiver.Factory {
return receiver.NewFactory(
typeStr,
createDefaultConfig,
receiver.WithTraces(createTracesReceiver, component.StabilityLevelAlpha))
}
在进行这两个更改后,你会发现有一些所需的导入缺失,所以这是你的factory.go
文件,包括适当的导入:
factory.go
package tailtracer
import (
"context"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)
const (
typeStr = "tailtracer"
defaultInterval = 1 * time.Minute
)
func createDefaultConfig() component.Config {
return &Config{
Interval: string(defaultInterval),
}
}
func createTracesReceiver(_ context.Context, params receiver.CreateSettings, baseCfg component.Config, consumer consumer.Traces) (receiver.Traces, error) {
return nil, nil
}
// NewFactory creates a factory for tailtracer receiver.
func NewFactory() receiver.Factory {
return receiver.NewFactory(
typeStr,
createDefaultConfig,
receiver.WithTraces(createTracesReceiver, component.StabilityLevelAlpha))
}
核对你的工作
- 导入
context
包,以支持createTracesReceiver
函数中引用到的context.Context
类型。 - 导入
go.opentelemetry.io/collector/consumer
包,以支持createTracesReceiver
函数中引用到的consumer.Traces
类型。 - 更新了
NewFactory()
函数,使其返回使用所需参数生成的receiver.Factory
的调用结果。通过调用receiver.WithTraces(createTracesReceiver, component.StabilityLevelAlpha)
,生成的接收器工厂能够处理跟踪信号。
此时,你已经具备了tailtracer
工厂和配置代码,Collector将能够验证是否定义了config.yaml
中tailtracer
接收器的设置。你只需将其添加到Collector的初始化过程中。
将接收器工厂添加到Collector的初始化过程中
如前所述,所有Collector组件都由components.go
文件中的components()
函数实例化。
需要将tailtracer
接收器工厂实例添加到factories
映射中,以便Collector可以正确加载它作为其初始化过程的一部分。
在进行下面的更改后,这是具备支持的代码库构建的Collector运行otelcol-dev
命令的输出的开头:
$ ./otelcol-dev --config config.yaml
2023-09-28T08:56:53.027-0700 信息 service@v0.86.0/telemetry.go:84 启用自有遥测...
2023-09-28T08:56:53.027-0700 信息 service@v0.86.0/telemetry.go:201 提供Prometheus度量 {"地址": ":8888", "级别": "Basic"}
2023-09-28T08:56:53.027-0700 调试 exporter@v0.86.0/exporter.go:273 稳定组件。 {"类型": "exporter", "数据类型": "跟踪", "名称": "otlp/jaeger"}
2023-09-28T08:56:53.027-0700 信息 exporter@v0.86.0/exporter.go:275 开发中组件。将来可能会更改。 {"类型": "exporter", "数据类型": "跟踪", "名称": "debug"}
2023-09-28T08:56:53.027-0700 调试 receiver@v0.86.0/receiver.go:294 Alpha组件。将来可能会更改。 {"类型": "receiver", "名称": "tailtracer", "数据类型": "跟踪"}
2023-09-28T08:56:53.027-0700 调试 receiver@v0.86.0/receiver.go:294 稳定组件。 {"类型": "receiver", "名称": "otlp", "数据类型": "跟踪"}
2023-09-28T08:56:53.027-0700 信息 service@v0.86.0/service.go:138 开始运行otelcol-dev... {"版本": "1.0.0", "NumCPU": 10}
2023-09-28T08:56:53.027-0700 信息 extensions/extensions.go:31 启动扩展...
查找"log builder/receivers_builder.go:111"的日志行(它是所示片段中从底部开始的第4行),你可以看到Collector找到了tailtracer
接收器的设置,验证了这些设置(当前的设置都是正确的),但是由于它未在任何流水线中使用,因此被忽略。
让我们检查一下tailtracer
工厂是否正确验证了接收器的设置,number_of_traces
设置是可选的,因此如果你将其从config.yaml
中删除,然后再次运行该命令,你会得到相同的输出。
现在,让我们测试一个tailtracer
设置验证规则。从config.yaml
中删除number_of_traces
设置,运行Collector的输出如下所示:
$ ./otelcol-dev --config config.yaml
错误: 无效的配置:接收器“tailtracer”具有无效的配置:number_of_traces必须至少为1
2022/02/24 13:00:20 集合器服务器运行完成,出现错误:无效的配置:接收器“tailtracer”具有无效的配置:number_of_traces必须至少为1
tailtracer
接收器工厂和配置要求已经完成,Collector正在正确加载你的组件。你现在可以进入接收器的核心部分,即组件本身的实现。
实现追踪接收器组件
在上一节中,我提到接收器可以处理任何 OpenTelemetry 信号,收集器的 API 旨在帮助你完成这一任务。
当前,负责启用信号的所有接收器 API 都在 receiver/receiver.go 文件中声明,在 OTel Collector 的 GitHub 项目中可以找到,打开该文件并花一分钟浏览其中声明的所有接口。
请注意,receiver.Traces
(及其兄弟 receiver.Metrics
和 receiver.Logs
)此时并未描述任何特定的方法,除了它从 component.Component
“继承”的方法之外。
也许这感觉有点奇怪,但请记住,收集器的 API 是可扩展的,组件及其信号可能以不同的方式发展,因此这些接口的作用是支持这一点。
因此,要创建一个 receiver.Traces
,你只需要按照 component.Component
接口所描述的以下方法进行实现:
Start(ctx context.Context, host Host) error
Shutdown(ctx context.Context) error
这两个方法实际上是事件处理程序,用于收集器与其组件在生命周期中的通信。
Start()
表示收集器向组件发出启动处理的信号。作为该事件的一部分,收集器将传递以下信息:
context.Context
:大部分时间内,接收器将处理一个长时间运行的操作,因此建议忽略此上下文,实际上应该从context.Background()
创建一个新上下文。Host
:该主机用于在收集器启动和运行之后与接收器进行通信。
Shutdown()
表示收集器向组件发出服务正在关闭的信号,因此组件应该停止其处理并进行必要的清理工作:
context.Context
:作为关闭操作的一部分由收集器传递的上下文。
你将通过在项目的 tailtracer
文件夹中创建一个名为 trace-receiver.go
的新文件,并在其中添加名为 tailtracerReceiver
的类型声明来开始实现。
type tailtracerReceiver struct{
}
现在你已经有了 tailtracerReceiver
类型,可以实现 Start()
和 Shutdown()
方法,使接收器类型与 receiver.Traces
接口兼容。
下面是在 tailtracer/trace-receiver.go
文件中实现了方法的内容:
trace-receiver.go
package tailtracer
import (
"context"
"go.opentelemetry.io/collector/component"
)
type tailtracerReceiver struct {
}
func (tailtracerRcvr *tailtracerReceiver) Start(ctx context.Context, host component.Host) error {
return nil
}
func (tailtracerRcvr *tailtracerReceiver) Shutdown(ctx context.Context) error {
return nil
}
检查你的工作
- 导入了
context
包,其中声明了Context
类型和函数。 - 导入了
go.opentelemetry.io/collector/component
包,其中声明了Host
类型。 - 添加了一个对应于
receiver.Traces
接口的Start(ctx context.Context, host component.Host)
方法的引导实现。 - 添加了一个对应于
receiver.Traces
接口的Shutdown(ctx context.Context)
方法的引导实现。
在 Start()
方法中,传递了两个引用(context.Context
和 component.Host
),你的接收器可能需要保留这些引用,以便在处理其操作时使用。
context.Context
引用应该用于创建新的上下文,以支持接收器的处理操作。在这种情况下,你需要决定如何处理上下文取消,以便在组件的 Shutdown()
方法中正确地完成它。
component.Host
在接收器的整个生命周期中可能很有用,因此应将该引用保留在 tailtracerReceiver
类型中。
接下来,你需要更新 Start()
方法,以便接收器可以正确初始化其自身的处理上下文,并在 cancel
字段中保留取消函数。你还需要更新 Stop()
方法,以便通过调用 cancel
函数来完成上下文的清理。
下面是在进行了上述更改后,trace-receiver.go
文件的内容:
trace-receiver.go
package tailtracer
import (
"context"
"go.opentelemetry.io/collector/component"
)
type tailtracerReceiver struct {
host component.Host
cancel context.CancelFunc
}
func (tailtracerRcvr *tailtracerReceiver) Start(ctx context.Context, host component.Host) error {
tailtracerRcvr.host = host
ctx = context.Background()
ctx, tailtracerRcvr.cancel = context.WithCancel(ctx)
return nil
}
func (tailtracerRcvr *tailtracerReceiver) Shutdown(ctx context.Context) error {
tailtracerRcvr.cancel()
return nil
}
检查你的工作
- 通过为
Start()
方法添加对host
的初始化,使用了收集器传递的component.Host
引用。 - 通过使用基于
context.Background()
创建的新上下文的context.WithCancel()
函数调用,在cancel()
函数中添加取消基础上为tailtracerRcvr.cancel
执行的任务。
现在,tailtracerReceiver
类型已经完全准备好实例化,并保留了其工厂传递的所有有意义的信息。
打开 tailtracer/factory.go
文件,导航到 createTracesReceiver()
函数。请注意,仅在作为管道中的组件声明时,才会实例化接收器,并且工厂负责确保管道中的下一个消费者(处理器或导出器)有效,否则应生成错误。
收集器的 API 提供了一些标准错误类型,以帮助工厂处理管道配置。如果下一个消费者存在问题并且作为 nil
传递,则你的接收器工厂应该抛出 component.ErrNilNextConsumer
。
createTracesReceiver()
函数将需要一个守卫子句来进行验证。
你还需要变量来正确初始化 tailtracerReceiver
实例的 config
和 logger
字段。
下面是使用更新后的 createTracesReceiver()
函数后的 factory.go
文件的内容:
factory.go
package tailtracer
import (
"context"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)
const (
typeStr = "tailtracer"
defaultInterval = 1 * time.Minute
)
func createDefaultConfig() component.Config {
return &Config{
Interval: string(defaultInterval),
}
}
func createTracesReceiver(_ context.Context, params receiver.CreateSettings, baseCfg component.Config, consumer consumer.Traces) (receiver.Traces, error) {
if consumer == nil {
return nil, component.ErrNilNextConsumer
}
logger := params.Logger
tailtracerCfg := baseCfg.(*Config)
traceRcvr := &tailtracerReceiver{
logger: logger,
nextConsumer: consumer,
config: tailtracerCfg,
}
return traceRcvr, nil
}
// NewFactory creates a factory for tailtracer receiver.
func NewFactory() receiver.Factory {
return receiver.NewFactory(
typeStr,
createDefaultConfig,
receiver.WithTraces(createTracesReceiver, component.StabilityLevelAlpha))
}
检查你的工作
- 添加了一个守卫子句,验证消费者是否正确实例化,如果否,则返回
component.ErrNilNextConsumer
错误。 - 添加了名为
logger
的变量,并使用收集器的日志记录器(作为receiver.CreateSettings
引用中名为Logger
的字段)进行初始化。 - 添加了名为
tailtracerCfg
的变量,并通过将component.Config
引用转换为tailtracer
接收器Config
进行初始化。 - 添加了一个名为
traceRcvr
的变量,并使用存储在变量中的工厂信息初始化该变量。 - 更新了返回语句,现在包含
traceRcvr
实例。
工厂完全实现并且实例化了追踪接收器组件,你已经准备好测试该接收器作为管道的一部分了。继续并在 config.yaml
中的 traces
管道中添加 tailtracer
接收器,如下所示:
service:
pipelines:
traces:
receivers: [otlp, tailtracer]
processors: []
exporters: [otlp/jaeger, debug]
以下是在更新了 traces
管道后使用 otelcol-dev
命令运行你的 Collector 所显示的输出:
$ ./otelcol-dev --config config.yaml
2023-09-28T08:59:52.111-0700 info service@v0.86.0/telemetry.go:84 Setting up own telemetry...
2023-09-28T08:59:52.111-0700 info service@v0.86.0/telemetry.go:201 Serving Prometheus metrics {"address": ":8888", "level": "Basic"}
2023-09-28T08:59:52.111-0700 debug exporter@v0.86.0/exporter.go:273 Stable component. {"kind": "exporter", "data_type": "traces", "name": "otlp/jaeger"}
2023-09-28T08:59:52.112-0700 info exporter@v0.86.0/exporter.go:275 Development component. May change in the future. {"kind": "exporter", "data_type": "traces", "name": "debug"}
2023-09-28T08:59:52.112-0700 debug receiver@v0.86.0/receiver.go:294 Stable component. {"kind": "receiver", "name": "otlp", "data_type": "traces"}
2023-09-28T08:59:52.112-0700 debug receiver@v0.86.0/receiver.go:294 Alpha component. May change in the future. {"kind": "receiver", "name": "tailtracer", "data_type": "traces"}
2023-09-28T08:59:52.112-0700 info service@v0.86.0/service.go:138 Starting otelcol-dev... {"Version": "1.0.0", "NumCPU": 10}
2023-09-28T08:59:52.112-0700 info extensions/extensions.go:31 Starting extensions...
2023-09-28T08:59:52.113-0700 info otlpreceiver@v0.86.0/otlp.go:83 Starting GRPC server {"kind": "receiver", "name": "otlp", "data_type": "traces", "endpoint": "0.0.0.0:4317"}
2023-09-28T08:59:52.113-0700 info service@v0.86.0/service.go:161 Everything is ready. Begin running and processing data.
2023-09-28T09:00:52.113-0700 info tailtracer/receiver.go:33 I should start processing traces now! {"kind": "receiver", "name": "tailtracer", "data_type": "traces"}
查找 “builder/receivers_builder.go:68 Receiver is
starting… {“kind”: “receiver”, “name”: “tailtracer”}” 的日志行,你会发现收集器在 traces
管道中找到了 tailtracer
接收器的设置,正在实例化并启动它,因为收集器启动后的1分钟内,你可以看到我们在 Start()
方法中的 ticker
函数添加的 info 日志行。
现在,请在 Collector 的终端上按下 Ctrl + C,以便你可以观察到关闭过程。以下是输出的样子:
^C2023-09-28T09:01:18.784-0700 info otelcol@v0.86.0/collector.go:250 Received signal from OS {"signal": "interrupt"}
2023-09-28T09:01:18.784-0700 info service@v0.86.0/service.go:170 Starting shutdown...
2023-09-28T09:01:18.784-0700 info zapgrpc/zapgrpc.go:178 [core] [Server #3 ListenSocket #4] ListenSocket deleted {"grpc_log": true}
2023-09-28T09:01:18.784-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1] Channel Connectivity change to SHUTDOWN {"grpc_log": true}
2023-09-28T09:01:18.784-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1] ccBalancerWrapper: closing {"grpc_log": true}
2023-09-28T09:01:18.785-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1] Closing the name resolver {"grpc_log": true}
2023-09-28T09:01:18.785-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1 SubChannel #2] Subchannel Connectivity change to SHUTDOWN {"grpc_log": true}
2023-09-28T09:01:18.785-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1 SubChannel #2] Subchannel deleted {"grpc_log": true}
2023-09-28T09:01:18.785-0700 info zapgrpc/zapgrpc.go:178 [transport] [client-transport 0x140002c8000] Closing: rpc error: code = Canceled desc = grpc: the client connection is closing {"grpc_log": true}
2023-09-28T09:01:18.785-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1] Channel deleted {"grpc_log": true}
2023-09-28T09:01:18.785-0700 info extensions/extensions.go:45 Stopping extensions...
2023-09-28T09:01:18.785-0700 info service@v0.86.0/service.go:184 Shutdown complete.
如你所见,这有一个用于 tailtracer
接收器的 info 日志行,这意味着该组件正确响应了 Shutdown()
事件。在下一节中,你将了解有关 OpenTelemetry 跟踪数据模型的更多信息,以便 tailtracer
接收器最终可以生成跟踪!
收集器的跟踪数据模型
你可能通过使用SDK和应用程序中的仪表化,熟悉了OpenTelemetry的跟踪,以便在像Jaeger这样的分布式跟踪后端中查看和评估跟踪。
下面是Jaeger中跟踪的样例:
虽然这是Jaeger的跟踪,但它是由收集器内的跟踪管道生成的,因此你可以使用它来了解一些有关OTel跟踪数据模型的信息:
- 一个跟踪由一个或多个跨度组成,这些跨度按层次结构进行组织以表示依赖关系。
- 跨度可以表示服务内和/或服务间的操作。
在跟踪接收器中创建跟踪与使用SDK的方式略有不同,因此让我们开始回顾一下高级概念。
使用资源
在OTel世界中,所有遥测数据都由资源(Resource)
生成,以下是根据OTel规范的定义:
资源(Resource)
是以属性形式生成遥测数据的实体的不可变表示。例如,在Kubernetes容器中运行的生成遥测数据的进程包括Pod名称、所属命名空间以及可能的部署名称。这三个属性都可以包含在资源(Resource)
中。
跟踪最常用于表示服务请求(Jaeger模型中描述的服务实体),它们通常是作为计算单元中运行的进程实现的,但OTel的API方式通过属性来描述资源(Resource)
是足够灵活的,可以表示您可能需要的任何实体,比如ATM机、物联网传感器等。
因此,可以说要存在一个跟踪,就必须有一个资源(Resource)
来启动它。
在本教程中,我们将模拟一个系统,该系统有遥测数据,展示了位于2个不同州(例如伊利诺伊州和加利福尼亚州)的ATM机访问账户后端系统以执行余额、存款和提现操作,因此我们将需要实现代码来创建代表ATM机和后端系统的资源(Resource)
类型。
创建一个名为model.go
的文件,放在tailtracer
文件夹中
cd tailtracer
touch model.go
现在,在model.go
文件中,添加以下定义的Atm
和BackendSystem
类型:
model.go
package tailtracer
type Atm struct{
ID int64
Version string
Name string
StateID string
SerialNumber string
ISPNetwork string
}
type BackendSystem struct{
Version string
ProcessName string
OSType string
OSVersion string
CloudProvider string
CloudRegion string
ServiceName string
Endpoint string
}
这些类型用于代表系统中的实体,它们包含的信息将作为资源(Resource)
定义的一部分添加到跟踪中。你将添加一些辅助函数来生成这些类型的实例。
使用下面的辅助函数代码更新model.go
文件:
model.go
package tailtracer
import (
"math/rand"
"time"
)
type Atm struct{
ID int64
Version string
Name string
StateID string
SerialNumber string
ISPNetwork string
}
type BackendSystem struct{
Version string
ProcessName string
OSType string
OSVersion string
CloudProvider string
CloudRegion string
Endpoint string
}
func generateAtm() Atm{
i := getRandomNumber(1, 2)
var newAtm Atm
switch i {
case 1:
newAtm = Atm{
ID: 111,
Name: "ATM-111-IL",
SerialNumber: "atmxph-2022-111",
Version: "v1.0",
ISPNetwork: "comcast-chicago",
StateID: "IL",
}
case 2:
newAtm = Atm{
ID: 222,
Name: "ATM-222-CA",
SerialNumber: "atmxph-2022-222",
Version: "v1.0",
ISPNetwork: "comcast-sanfrancisco",
StateID: "CA",
}
}
return newAtm
}
func generateBackendSystem() BackendSystem{
i := getRandomNumber(1, 3)
newBackend := BackendSystem{
ProcessName: "accounts",
Version: "v2.5",
OSType: "lnx",
OSVersion: "4.16.10-300.fc28.x86_64",
CloudProvider: "amzn",
CloudRegion: "us-east-2",
}
switch i {
case 1:
newBackend.Endpoint = "api/v2.5/balance"
case 2:
newBackend.Endpoint = "api/v2.5/deposit"
case 3:
newBackend.Endpoint = "api/v2.5/withdrawn"
}
return newBackend
}
func getRandomNumber(min int, max int) int {
rand.Seed(time.Now().UnixNano())
i := (rand.Intn(max - min + 1) + min)
return i
}
检查你的工作
- 导入了
math/rand
和time
包以支持generateRandomNumber
函数的实现。 - 添加了
generateAtm
函数,它实例化一个Atm
类型,并随机为StateID
和相应的ISPNetwork
分配伊利诺伊州或加利福尼亚州作为值。 - 添加了
generateBackendSystem
函数,它实例化了一个BackendSystem
类型,并随机为Endpoint
字段分配服务终点值。 - 添加了
generateRandomNumber
函数,以帮助生成所需范围内的随机数。
现在,你已经有了用于生成生成遥测数据实体的函数,准备在OTel Collector中表示这些实体。
收集器的API提供了一个名为ptrace
(位于pdata
包下的嵌套包)的包,其中包含了处理收集器管道中的跟踪所需的所有类型、接口和辅助函数。
打开tailtracer/model.go
文件,并将go.opentelemetry.io/collector/pdata/ptrace
添加到import子句中,以便可以访问ptrace
包的功能。
在定义Resource
之前,你需要创建一个ptrace.Traces
,它将负责通过收集器的管道传播跟踪,并且你可以使用辅助函数ptrace.NewTraces()
来实例化它。你还需要创建Atm
和BackendSystem
类型的实例,以便有数据来表示在跟踪中所涉及的遥测数据源。
打开tailtracer/model.go
文件,并添加以下函数:
func generateTraces(numberOfTraces int) ptrace.Traces{
traces := ptrace.NewTraces()
for i := 0; i <= numberOfTraces; i++{
newAtm := generateAtm()
newBackendSystem := generateBackendSystem()
}
return traces
}
到目前为止,你已经听到和阅读了足够多关于跟踪由跨度组成的内容。你可能也编写了一些使用SDK的函数和类型的仪表化代码来创建这些跨度,但你可能不知道的是,在收集器的API中,创建跟踪涉及到其他类型的"跨度"。
你将从一个名为ptrace.ResourceSpans
的类型开始,它表示资源以及它在参与跟踪时产生或接收的所有操作。你可以在/pdata/internal/data/protogen/trace/v1/trace.pb.go中找到其定义。
ptrace.Traces
有一个名为ResourceSpans()
的方法,它返回一个名为ptrace.ResourceSpansSlice
的辅助类型的实例。这个ptrace.ResourceSpansSlice
类型有一些方法来帮助你处理由Resource
实体参与请求表示的跟踪的ptrace.ResourceSpans
数组。
ptrace.ResourceSpansSlice
有一个名为AppendEmpty()
的方法,用于向数组添加一个新的ptrace.ResourceSapn
并返回其引用。
一旦你有了ptrace.ResourceSpan
的实例,你将使用名为Resource()
的方法来返回与ResourceSpan
关联的pcommon.Resource
的实例。
用以下更改更新generateTrace()
函数:
- 添加一个名为
resourceSpan
的变量来表示ResourceSpan
。 - 添加一个名为
atmResource
的变量来表示与ResourceSpan
关联的pcommon.Resource
。 - 使用上面提到的方法分别初始化这两个变量。
在你实施这些更改后,函数应该如下所示:
func generateTraces(numberOfTraces int) ptrace.Traces{
traces := ptrace.NewTraces()
for i := 0; i <= numberOfTraces; i++{
newAtm := generateAtm()
newBackendSystem := generateBackendSystem()
resourceSpan := traces.ResourceSpans().AppendEmpty()
atmResource := resourceSpan.Resource()
}
return traces
}
检查你的工作
- 添加了
resourceSpan
变量,并使用traces.ResourceSpans().AppendEmpty()
调用返回的ResourceSpan
引用对其进行了初始化。 - 添加了
atmResource
变量,并使用resourceSpan.Resource()
调用返回的pcommon.Resource
引用对其进行了初始化。
通过属性描述资源
收集器的 API 提供了一个名为 pcommon
的包(在 pdata
包下嵌套),其中包含了描述 Resource
所需的所有类型和辅助函数。
在收集器的世界中,Resource
是以键/值对格式描述的,表示为 pcommon.Map
类型。
您可以在 Otel Collector 的 GitHub 项目的 /pdata/pcommon/common.go 文件中检查 pcommon.Map
类型的定义以及创建支持的格式的属性值的相关辅助函数。
键/值对提供了非常灵活的方式来帮助建模 Resource
数据,因此 OTel 规范针对此提供了一些指南,以帮助组织和减少可能需要表示的不同类型的遥测生成实体之间的冲突。
这些指南被称为资源语义约定,在 OTel 规范中有记录。
当创建自己的属性来表示自己的遥测生成实体时,应遵循规范提供的指南:
属性按描述它们的概念类型进行逻辑分组。同一组中的属性具有以点(.)结尾的公共前缀。例如,所有描述 Kubernetes 属性的属性都以
k8s.
开头。
让我们从打开 tailtracer/model.go
开始,并将 go.opentelemetry.io/collector/pdata/pcommon
添加到 import
语句中,以便您可以访问 pcommon
包的功能。
现在,请继续添加一个函数,从 Atm
实例中读取字段值并将其作为属性(使用前缀“atm.”进行分组)写入 pcommon.Resource
实例。以下是该函数的外观:
func fillResourceWithAtm(resource *pcommon.Resource, atm Atm){
atmAttrs := resource.Attributes()
atmAttrs.PutInt("atm.id", atm.ID)
atmAttrs.PutStr("atm.stateid", atm.StateID)
atmAttrs.PutStr("atm.ispnetwork", atm.ISPNetwork)
atmAttrs.PutStr("atm.serialnumber", atm.SerialNumber)
}
检查您的工作
- 声明了一个名为
atmAttrs
的变量,并将其初始化为resource.Attributes()
调用返回的pcommon.Map
引用。 - 使用
pcommon.Map
的PutInt()
和PutStr()
方法,根据对应的Atm
字段类型添加整型和字符串属性。请注意,因为这些属性非常具体,只表示Atm
实体,所以它们都在 “atm.” 前缀内组合。
资源语义约定还具有规定的属性名称和用于表示在不同领域通用且适用的遥测生成实体的众所周知的值,例如 compute unit 和 environment。
因此,当您查看 BackendSystem
实体时,它具有表示与 OS 相关的信息和与 Cloud 相关的信息的字段,并且我们将使用资源语义约定规定的属性名称和值来表示该信息在其 Resource
上。
所有资源语义约定属性名称和值都保存在收集器的 GitHub 项目的 /semconv/v1.9.0/generated_resource.go 文件中。
让我们创建一个函数,从 BackendSystem
实例中读取字段值,并将其作为属性写入 pcommon.Resource
实例。打开 tailtracer/model.go
文件并添加以下函数:
func fillResourceWithBackendSystem(resource *pcommon.Resource, backend BackendSystem){
backendAttrs := resource.Attributes()
var osType, cloudProvider string
switch {
case backend.CloudProvider == "amzn":
cloudProvider = conventions.AttributeCloudProviderAWS
case backend.OSType == "mcrsft":
cloudProvider = conventions.AttributeCloudProviderAzure
case backend.OSType == "gogl":
cloudProvider = conventions.AttributeCloudProviderGCP
}
backendAttrs.PutStr(conventions.AttributeCloudProvider, cloudProvider)
backendAttrs.PutStr(conventions.AttributeCloudRegion, backend.CloudRegion)
switch {
case backend.OSType == "lnx":
osType = conventions.AttributeOSTypeLinux
case backend.OSType == "wndws":
osType = conventions.AttributeOSTypeWindows
case backend.OSType == "slrs":
osType = conventions.AttributeOSTypeSolaris
}
backendAttrs.PutStr(conventions.AttributeOSType, osType)
backendAttrs.PutStr(conventions.AttributeOSVersion, backend.OSVersion)
}
请注意,我没有将属性名为 “atm.name” 或 “backendsystem.name” 的属性添加到代表 Atm
和 BackendSystem
实体的 pcommon.Resource
中。这是因为大多数(不是全部)与 OTel 追踪规范兼容的分布式跟踪后端系统将 pcommon.Resource
解释为 Service
,因此它们期望 pcommon.Resource
包含一个名为 service.name
的必需属性,正如资源语义约定规定的那样。
我们还将使用名为 service.version
的非必需属性来表示 Atm
和 BackendSystem
实体的版本信息。
添加了正确分配 “service.” 分组属性的代码后,tailtracer/model.go
文件如下所示:
model.go
package tailtracer
import (
"math/rand"
"time"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.9.0"
)
type Atm struct {
ID int64
Version string
Name string
StateID string
SerialNumber string
ISPNetwork string
}
type BackendSystem struct {
Version string
ProcessName string
OSType string
OSVersion string
CloudProvider string
CloudRegion string
Endpoint string
}
func generateAtm() Atm {
i := getRandomNumber(1, 2)
var newAtm Atm
switch i {
case 1:
newAtm = Atm{
ID: 111,
Name: "ATM-111-IL",
SerialNumber: "atmxph-2022-111",
Version: "v1.0",
ISPNetwork: "comcast-chicago",
StateID: "IL",
}
case 2:
newAtm = Atm{
ID: 222,
Name: "ATM-222-CA",
SerialNumber: "atmxph-2022-222",
Version: "v1.0",
ISPNetwork: "comcast-sanfrancisco",
StateID: "CA",
}
}
return newAtm
}
func generateBackendSystem() BackendSystem {
i := getRandomNumber(1, 3)
newBackend := BackendSystem{
ProcessName: "accounts",
Version: "v2.5",
OSType: "lnx",
OSVersion: "4.16.10-300.fc28.x86_64",
CloudProvider: "amzn",
CloudRegion: "us-east-2",
}
switch i {
case 1:
newBackend.Endpoint = "api/v2.5/balance"
case 2:
newBackend.Endpoint = "api/v2.5/deposit"
case 3:
newBackend.Endpoint = "api/v2.5/withdrawn"
}
return newBackend
}
func getRandomNumber(min int, max int) int {
rand.Seed(time.Now().UnixNano())
i := (rand.Intn(max-min+1) + min)
return i
}
func generateTraces(numberOfTraces int) ptrace.Traces {
traces := ptrace.NewTraces()
for i := 0; i <= numberOfTraces; i++ {
newAtm := generateAtm()
newBackendSystem := generateBackendSystem()
resourceSpan := traces.ResourceSpans().AppendEmpty()
atmResource := resourceSpan.Resource()
fillResourceWithAtm(&atmResource, newAtm)
resourceSpan = traces.ResourceSpans().AppendEmpty()
backendResource := resourceSpan.Resource()
fillResourceWithBackendSystem(&backendResource, newBackendSystem)
}
return traces
}
func fillResourceWithAtm(resource *pcommon.Resource, atm Atm) {
atmAttrs := resource.Attributes()
atmAttrs.PutInt("atm.id", atm.ID)
atmAttrs.PutStr("atm.stateid", atm.StateID)
atmAttrs.PutStr("atm.ispnetwork", atm.ISPNetwork)
atmAttrs.PutStr("atm.serialnumber", atm.SerialNumber)
atmAttrs.PutStr(conventions.AttributeServiceName, atm.Name)
atmAttrs.PutStr(conventions.AttributeServiceVersion, atm.Version)
}
func fillResourceWithBackendSystem(resource *pcommon.Resource, backend BackendSystem) {
backendAttrs := resource.Attributes()
var osType, cloudProvider string
switch {
case backend.CloudProvider == "amzn":
cloudProvider = conventions.AttributeCloudProviderAWS
case backend.OSType == "mcrsft":
cloudProvider = conventions.AttributeCloudProviderAzure
case backend.OSType == "gogl":
cloudProvider = conventions.AttributeCloudProviderGCP
}
backendAttrs.PutStr(conventions.AttributeCloudProvider, cloudProvider)
backendAttrs.PutStr(conventions.AttributeCloudRegion, backend.CloudRegion)
switch {
case backend.OSType == "lnx":
osType = conventions.AttributeOSTypeLinux
case backend.OSType == "wndws":
osType = conventions.AttributeOSTypeWindows
case backend.OSType == "slrs":
osType = conventions.AttributeOSTypeSolaris
}
backendAttrs.PutStr(conventions.AttributeOSType, osType)
backendAttrs.PutStr(conventions.AttributeOSVersion, backend.OSVersion)
backendAttrs.PutStr(conventions.AttributeServiceName, backend.ProcessName)
backendAttrs.PutStr(conventions.AttributeServiceVersion, backend.Version)
}
检查您的工作
- 将
go.opentelemetry.io/collector/semconv/v1.9.0
包作为conventions
导入,以便访问所有资源语义约定的属性名称和值。 - 更新了
fillResourceWithAtm()
函数,通过添加行来正确分配 “service.name” 和 “service.version” 属性,用于表示Atm
实体的pcommon.Resource
。 - 更新了
fillResourceWithBackendSystem()
函数,通过添加行来正确分配 “service.name” 和 “service.version” 属性,用于表示BackendSystem
实体的pcommon.Resource
。 - 更新了
generateTraces
函数,通过添加行来正确实例化pcommon.Resource
,并使用fillResourceWithAtm()
和fillResourceWithBackendSystem()
函数填充Atm
和BackendSystem
实体的属性信息。
用跨度表示操作
现在,您已经有了一个ResourceSpan
实例,其中包含正确填充属性以表示Atm
和BackendSystem
实体的Resource
,您可以开始在ResourceSpan
中表示每个Resource
作为跟踪的一部分执行的操作。
在OTel世界中,为了生成遥测数据,系统需要手动或通过一个仪表库进行自动仪表化。
仪表化库负责在操作在跟踪中发生的范围(也称为仪表化范围)中设置范围,然后在跟踪的上下文中将这些操作描述为跨度。
pdata.ResourceSpans
有一个名为ScopeSpans()
的方法,它返回一个名为ptrace.ScopeSpansSlice
的辅助类型实例。ptrace.ScopeSpansSlice
类型有助于处理ptrace.ScopeSpans
数组,该数组包含与不同仪表化范围和跟踪上下文内生成的跨度的ptrace.ScopeSpan
数量一样多的项。
ptrace.ScopeSpansSlice
有一个名为AppendEmpty()
的方法,该方法向数组中添加一个新的ptrace.ScopeSpans
并返回其引用。
让我们创建一个函数来实例化一个ptrace.ScopeSpans
,表示ATM系统的仪表化范围和它的跨度。打开tailtracer/model.go
文件,并添加以下函数:
func appendAtmSystemInstrScopeSpans(resourceSpans *ptrace.ResourceSpans) (ptrace.ScopeSpans){
scopeSpans := resourceSpans.ScopeSpans().AppendEmpty()
return scopeSpans
}
ptrace.ScopeSpans
有一个名为Scope()
的方法,返回表示生成跨度的仪表化范围的pcommon.InstrumentationScope
实例的引用。
pcommon.InstrumentationScope
有以下方法来描述仪表化范围:
-
SetName(v string)
设置仪表化库的名称 -
SetVersion(v string)
设置仪表化库的版本 -
Name() string
返回与仪表化库关联的名称 -
Version() string
返回与仪表化库关联的版本
让我们更新appendAtmSystemInstrScopeSpans
函数,以便我们可以设置新的ptrace.ScopeSpans
的仪表化范围的名称和版本。下面是更新后的appendAtmSystemInstrScopeSpans
函数的样子:
func appendAtmSystemInstrScopeSpans(resourceSpans *ptrace.ResourceSpans) (ptrace.ScopeSpans){
scopeSpans := resourceSpans.ScopeSpans().AppendEmpty()
scopeSpans.Scope().SetName("atm-system")
scopeSpans.Scope().SetVersion("v1.0")
return scopeSpans
}
现在,您可以更新generateTraces
函数,并添加变量来表示Atm
和BackendSystem
实体使用的仪表化范围,通过使用appendAtmSystemInstrScopeSpans()
来初始化它们。下面是更新后的generateTraces()
函数的样子:
func generateTraces(numberOfTraces int) ptrace.Traces{
traces := ptrace.NewTraces()
for i := 0; i <= numberOfTraces; i++{
newAtm := generateAtm()
newBackendSystem := generateBackendSystem()
resourceSpan := traces.ResourceSpans().AppendEmpty()
atmResource := resourceSpan.Resource()
fillResourceWithAtm(&atmResource, newAtm)
atmInstScope := appendAtmSystemInstrScopeSpans(&resourceSpan)
resourceSpan = traces.ResourceSpans().AppendEmpty()
backendResource := resourceSpan.Resource()
fillResourceWithBackendSystem(&backendResource, newBackendSystem)
backendInstScope := appendAtmSystemInstrScopeSpans(&resourceSpan)
}
return traces
}
到此为止,您已经拥有了在您的系统中表示遥测生成实体所需的一切,以及负责标识操作并为系统生成跟踪的仪表化范围。下一步是最终创建表示给定仪表化范围生成的操作的跨度。
ptrace.ScopeSpans
有一个名为Spans()
的方法,它返回一个名为ptrace.SpanSlice
的辅助类型实例。ptrace.SpanSlice
类型有助于处理ptrace.Span
数组,该数组包含与仪表化范围能够识别并描述的操作数量一样多的项,作为跟踪的一部分。
ptrace.SpanSlice
有一个名为AppendEmpty()
的方法,该方法向数组中添加一个新的ptrace.Span
并返回其引用。
ptrace.Span
有以下方法来描述操作:
-
SetTraceID(v pcommon.TraceID)
设置与此跨度关联的跟踪的pcommon.TraceID
-
SetSpanID(v pcommon.SpanID)
设置唯一标识跨度在其所属的跟踪的上下文中的pcommon.SpanID
-
SetParentSpanID(v pcommon.SpanID)
设置父跨度/操作的pcommon.SpanID
,以表示此跨度表示的操作是否作为父跨度(嵌套)的一部分执行 -
SetName(v string)
设置跨度的操作名称 -
SetKind(v ptrace.SpanKind)
设置ptrace.SpanKind
,定义跨度表示的操作类型。 -
SetStartTimestamp(v pcommon.Timestamp)
设置表示跨度表示的操作开始时的pcommon.Timestamp
-
SetEndTimestamp(v pcommon.Timestamp)
设置表示跨度表示的操作结束时的pcommon.Timestamp
如上所述,ptrace.Span
通过2个必需的ID唯一标识,它们自己的唯一ID使用pcommon.SpanID
类型表示,以及它们所关联的跟踪的ID使用pcommon.TraceID
类型表示。
pcommon.TraceID
必须包含一个通过16个字节数组表示的全局唯一ID,并且应遵循W3C跟踪上下文规范的要求,而pcommon.SpanID
是在其所关联的跟踪的上下文中的唯一ID,它通过8个字节的数组表示。
pcommon
包提供了以下类型来生成跨度的ID:
-
type TraceID [16]byte
-
type SpanID [8]byte
在本教程中,您将使用github.com/google/uuid
包中的函数创建pcommon.TraceID
的ID,使用crypto/rand
包随机生成pcommon.SpanID
。打开tailtracer/model.go
文件,并将这两个包添加到import
语句中;之后,添加以下函数以帮助生成这两个ID:
import (
crand "crypto/rand"
"math/rand"
...
)
func NewTraceID() pcommon.TraceID {
return pcommon.TraceID(uuid.New())
}
func NewSpanID() pcommon.SpanID {
var rngSeed int64
_ = binary.Read(crand.Reader, binary.LittleEndian, &rngSeed)
randSource := rand.New(rand.NewSource(rngSeed))
var sid [8]byte
randSource.Read(sid[:])
spanID := pcommon.SpanID(sid)
return spanID
}
检查您的工作
- 使用
crand
作为crypto/rand
的导入名(以避免与math/rand
冲突)。
现在,您有了正确标识跨度的方法,可以开始创建它们来表示系统中实体的操作及其之间的关系。
作为generateBackendSystem()
函数的一部分,我们已经随机分配了BackEndSystem
实体作为系统服务提供的操作。此刻,我们将打开tailtracer/model.go
文件,并添加一个名为appendTraceSpans()
的函数,该函数将负责创建一个跟踪并附加表示BackendSystem
操作的跨度。下面是appendTraceSpans()
函数的初始实现:
func appendTraceSpans(backend *BackendSystem, backendScopeSpans *ptrace.ScopeSpans, atmScopeSpans *ptrace.ScopeSpans) {
traceId := NewTraceID()
backendSpanId := NewSpanID()
backendDuration, _ := time.ParseDuration("1s")
backendSpanStartTime := time.Now()
backendSpanFinishTime := backendSpanStartTime.Add(backendDuration)
backendSpan := backendScopeSpans.Spans().AppendEmpty()
backendSpan.SetTraceID(traceId)
backendSpan.SetSpanID(backendSpanId)
backendSpan.SetName(backend.Endpoint)
backendSpan.SetKind(ptrace.SpanKindServer)
backendSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(backendSpanStartTime))
backendSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(backendSpanFinishTime))
}
检查您的工作
- 在
appendTraceSpans()
函数中,添加了一个名为traceId
的变量和一个名为backendSpanId
的变量,分别表示跟踪和跨度ID,并使用之前创建的辅助函数初始化它们。 - 添加了
backendSpanStartTime
和backendSpanFinishTime
来表示操作的开始和结束时间。对于该教程中的任何BackendSystem
操作,都需要1秒钟。 - 添加了名为
backendSpan
的变量,它将保存表示此操作的ptrace.Span
的实例。 - 使用
BackendSystem
实例的Endpoint
字段值设置了跨度的Name
。 - 将跨度的
Kind
设置为ptrace.SpanKindServer
。请查阅跟踪规范中的SpanKind章节,了解如何正确定义SpanKind。 - 使用上述所有方法填充
ptrace.Span
,以正确表示BackendSystem
操作。
您可能已经注意到appendTraceSpans()
函数中的两个ptrace.ScopeSpans
参数的引用的问题,但现在不用担心,我们将稍后回到这里。
现在,您将更新generateTraces()
函数,使其能够通过调用appendTraceSpans()
函数实际生成跟踪。下面是更新后的generateTraces()
函数的样子:
func generateTraces(numberOfTraces int) ptrace.Traces {
traces := ptrace.NewTraces()
for i := 0; i <= numberOfTraces; i++ {
newAtm := generateAtm()
newBackendSystem := generateBackendSystem()
resourceSpan := traces.ResourceSpans().AppendEmpty()
atmResource := resourceSpan.Resource()
fillResourceWithAtm(&atmResource, newAtm)
atmInstScope := appendAtmSystemInstrScopeSpans(&resourceSpan)
resourceSpan = traces.ResourceSpans().AppendEmpty()
backendResource := resourceSpan.Resource()
fillResourceWithBackendSystem(&backendResource, newBackendSystem)
backendInstScope := appendAtmSystemInstrScopeSpans(&resourceSpan)
appendTraceSpans(&newBackendSystem, &backendInstScope, &atmInstScope)
}
return traces
}
现在,您已经表示了BackendSystem
实体及其操作的跨度,并妥善放置在了一个跟踪上下文中!您需要做的就是将生成的跟踪通过管道推送,以便下一个消费者(处理器或导出器)可以接收和处理它。
consumer.Traces
有一个名为ConsumeTraces()
的方法,该方法负责将生成的跟踪推送到管道中的下一个消费者。现在,您需要更新tailtracerReceiver
类型的Start()
方法并添加使用该方法的代码。
打开tailtracer/trace-receiver.go
文件,并将Start()
方法更新为以下内容:
func (tailtracerRcvr *tailtracerReceiver) Start(ctx context.Context, host component.Host) error {
tailtracerRcvr.host = host
ctx = context.Background()
ctx, tailtracerRcvr.cancel = context.WithCancel(ctx)
interval, _ := time.ParseDuration(tailtracerRcvr.config.Interval)
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
tailtracerRcvr.logger.Info("I should start processing traces now!")
tailtracerRcvr.nextConsumer.ConsumeTraces(ctx, generateTraces(tailtracerRcvr.config.NumberOfTraces))
case <-ctx.Done():
return
}
}
}()
return nil
}
检查您的工作
- 在
case <-ticker.C
条件下添加了一行代码,调用tailtracerRcvr.nextConsumer.ConsumeTraces()
方法,传入在Start()
方法中创建的新上下文(ctx
),以及调用generateTraces
函数将生成的跟踪推送给管道中的下一个消费者。
如果你运行了otelcol-dev
,那么在运行2分钟后,输出应该如下所示:
Starting: /Users/rquedas/go/bin/dlv dap --check-go-version=false --listen=127.0.0.1:54625 --log-dest=3 from /Users/rquedas/Documents/vscode-workspace/otel4devs/collector/receiver/trace-receiver/otelcol-dev
DAP server listening at: 127.0.0.1:54625
2023-09-28T08:59:52.111-0700 info service@v0.86.0/telemetry.go:84 设置自己的遥测...
2023-09-28T08:59:52.111-0700 info service@v0.86.0/telemetry.go:201 提供Prometheus指标 {"address": ":8888", "level": "Basic"}
2023-09-28T08:59:52.111-0700 debug exporter@v0.86.0/exporter.go:273 稳定的组件。 {"kind": "exporter", "data_type": "traces", "name": "otlp/jaeger"}
2023-09-28T08:59:52.112-0700 info exporter@v0.86.0/exporter.go:275 开发中的组件。可能会在未来更改。 {"kind": "exporter", "data_type": "traces", "name": "debug"}
2023-09-28T08:59:52.112-0700 debug receiver@v0.86.0/receiver.go:294 稳定的组件。 {"kind": "receiver", "name": "otlp", "data_type": "traces"}
2023-09-28T08:59:52.112-0700 debug receiver@v0.86.0/receiver.go:294 Alpha组件。未来可能更改。 {"kind": "receiver", "name": "tailtracer", "data_type": "traces"}
2023-09-28T08:59:52.112-0700 info service@v0.86.0/service.go:138 开始运行otelcol-dev... {"Version": "1.0.0", "NumCPU": 10}
2023-09-28T08:59:52.112-0700 info extensions/extensions.go:31 开始扩展程序...
2023-09-28T08:59:52.113-0700 info otlpreceiver@v0.86.0/otlp.go:83 开始GRPC服务器 {"kind": "receiver", "name": "otlp", "data_type": "traces", "endpoint": "0.0.0.0:4317"}
2023-09-28T08:59:52.113-0700 info service@v0.86.0/service.go:161 一切准备就绪。开始运行和处理数据。
2023-09-28T08:59:52.113-0700 info zapgrpc/zapgrpc.go:178 [core] [Server #3 ListenSocket #4] 创建ListenSocket {"grpc_log": true}
2023-09-28T08:59:52.124-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1 SubChannel #2] Subchannel Connectivity change to READY {"grpc_log": true}
2023-09-28T08:59:52.124-0700 info zapgrpc/zapgrpc.go:178 [core] [pick-first-lb 0x1400054fd10] 收到SubConn状态更新: 0x1400054fec0, {ConnectivityState:READY ConnectionError:<nil>} {"grpc_log": true}
2023-09-28T08:59:52.124-0700 info zapgrpc/zapgrpc.go:178 [core] [Channel #1] Channel Connectivity change to READY {"grpc_log": true}
2023-09-28T09:00:52.113-0700 info tailtracer/receiver.go:33 我现在应该开始处理跟踪了! {"kind": "receiver", "name": "tailtracer", "data_type": "traces"}
2023-09-28T09:00:52.743-0700 INFO debugexporter/debug_exporter.go:40 TracesExporter {"#spans": 1}
2023-09-28T09:00:52.743-0700 DEBUG debugexporter/debug_exporter.go:49 ResourceSpans #0
Resource SchemaURL:
Resource labels:
-> atm.id: INT(222)
-> atm.stateid: STRING(CA)
-> atm.ispnetwork: STRING(comcast-sanfrancisco)
-> atm.serialnumber: STRING(atmxph-2022-222)
-> service.name: STRING(ATM-222-CA)
-> service.version: STRING(v1.0)
ScopeSpans #0
ScopeSpans SchemaURL:
InstrumentationScope atm-system v1.0
ResourceSpans #1
Resource SchemaURL:
Resource labels:
-> cloud.provider: STRING(aws)
-> cloud.region: STRING(us-east-2)
-> os.type: STRING(linux)
-> os.version: STRING(4.16.10-300.fc28.x86_64)
-> service.name: STRING(accounts)
-> service.version: STRING(v2.5)
ScopeSpans #0
ScopeSpans SchemaURL:
InstrumentationScope atm-system v1.0
Span #0
Trace ID : 5cce8a774d4546c2a5cbdeb607ec74c9
Parent ID :
ID : bb25c05c7fb13084
Name : api/v2.5/balance
Kind : SPAN_KIND_SERVER
Start time : 2023-09-28 09:00:52.743385 +0000 UTC
End time : 2023-09-28 09:00:53.743385 +0000 UTC
Status code : STATUS_CODE_OK
Status message :
2023-09-28T09:00:52.743-0500 info tailtracer/trace-receiver.go:33 我现在应该开始处理跟踪了! {"kind": "receiver", "name": "tailtracer"}
2023-09-28T09:00:52.744-0500 INFO debugexporter/debug_exporter.go:40 TracesExporter {"#spans": 1}
2023-09-28T09:00:52.744-0500 DEBUG debugexporter/debug_exporter.go:49 ResourceSpans #0
Resource SchemaURL:
Resource labels:
-> atm.id: INT(111)
-> atm.stateid: STRING(IL)
-> atm.ispnetwork: STRING(comcast-chicago)
-> atm.serialnumber: STRING(atmxph-2022-111)
-> service.name: STRING(ATM-111-IL)
-> service.version: STRING(v1.0)
ScopeSpans #0
ScopeSpans SchemaURL:
InstrumentationScope atm-system v1.0
ResourceSpans #1
Resource SchemaURL:
Resource labels:
-> cloud.provider: STRING(aws)
-> cloud.region: STRING(us-east-2)
-> os.type: STRING(linux)
-> os.version: STRING(4.16.10-300.fc28.x86_64)
-> service.name: STRING(accounts)
-> service.version: STRING(v2.5)
ScopeSpans #0
ScopeSpans SchemaURL:
InstrumentationScope atm-system v1.0
Span #0
Trace ID : 8a6ca822db0847f48facfebbb08bbb9e
Parent ID :
ID : 7cf668c1273ecee5
Name : api/v2.5/withdrawn
Kind : SPAN_KIND_SERVER
Start time : 2023-09-28 09:00:52.74404 +0000 UTC
End time : 2023-09-28 09:00:53.74404 +0000 UTC
Status code : STATUS_CODE_OK
Status message :
这是Jaeger中生成的跟踪的样本:
您当前在Jaeger中看到的是表示一个服务,该服务正在接收来自未经OTel SDK检测的外部实体的请求,因此无法将其识别为跟踪的起源/开始。为了使ptrace.Span
能够理解它表示作为另一个操作的结果而执行的操作,该操作可以是在相同跟踪上下文中的同一Resource
内发起的,也可以是外部(嵌套/子节点)发起的,您需要执行以下操作:
- 通过调用
SetTraceID()
方法并将父/调用者ptrace.Span
的pcommon.TraceID
作为参数传递,设置与调用者操作相同的跟踪上下文。 - 通过调用
SetParentId()
方法并将父/调用者ptrace.Span
的pcommon.SpanID
作为参数传递,定义调用者操作在跟踪上下文中的身份。
现在,您将创建一个表示Atm
实体操作的ptrace.Span
,并将其设置为BackendSystem
跨度的父跨度。打开tailtracer/model.go
文件,并将appendTraceSpans()
函数更新如下:
func appendTraceSpans(backend *BackendSystem, backendScopeSpans *ptrace.ScopeSpans, atmScopeSpans *ptrace.ScopeSpans) {
traceId := NewTraceID()
var atmOperationName string
switch {
case strings.Contains(backend.Endpoint, "balance"):
atmOperationName = "Check Balance"
case strings.Contains(backend.Endpoint, "deposit"):
atmOperationName = "Make Deposit"
case strings.Contains(backend.Endpoint, "withdraw"):
atmOperationName = "Fast Cash"
}
atmSpanId := NewSpanID()
atmSpanStartTime := time.Now()
atmDuration, _ := time.ParseDuration("4s")
atmSpanFinishTime := atmSpanStartTime.Add(atmDuration)
atmSpan := atmScopeSpans.Spans().AppendEmpty()
atmSpan.SetTraceID(traceId)
atmSpan.SetSpanID(atmSpanId)
atmSpan.SetName(atmOperationName)
atmSpan.SetKind(ptrace.SpanKindClient)
atmSpan.Status().SetCode(ptrace.StatusCodeOk)
atmSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(atmSpanStartTime))
atmSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(atmSpanFinishTime))
backendSpanId := NewSpanID()
backendDuration, _ := time.ParseDuration("2s")
backendSpanStartTime := atmSpanStartTime.Add(backendDuration)
backendSpan := backendScopeSpans.Spans().AppendEmpty()
backendSpan.SetTraceID(atmSpan.TraceID())
backendSpan.SetSpanID(backendSpanId)
backendSpan.SetParentSpanID(atmSpan.SpanID())
backendSpan.SetName(backend.Endpoint)
backendSpan.SetKind(ptrace.SpanKindServer)
backendSpan.Status().SetCode(ptrace.StatusCodeOk)
backendSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(backendSpanStartTime))
backendSpan.SetEndTimestamp(atmSpan.EndTimestamp())
}
现在再次运行otelcol-dev
,并在运行2分钟后,您应该开始在Jaeger中看到类似以下的跟踪:
我们现在有了表示Atm
和BackendSystem
遥测生成实体的服务,并且可以充分理解这两个实体如何被使用并对用户执行的操作的性能做出贡献。
以下是Jaeger中其中一个跟踪的详细视图:
到此为止!您已经完成了本教程并成功实现了一个跟踪接收器,祝贺您!