构建连接器
OpenTelemetry 中的连接器
如果您已经有一个已经使用仪表的应用程序生成某种形式追踪遥测数据,并且已经对 OpenTelemetry Collector 有了一定的了解,那么这个页面的内容对您来说应该最有用。
什么是连接器?
连接器充当连接不同集线器管道并在其之间发送遥测数据的手段。连接器充当一个输出器到一个管道和一个接收器到另一个管道。OpenTelemetry Collector 中的每个管道都处理一种类型的遥测数据。可能需要将一种形式的遥测数据处理为另一种形式,但是需要将相应的数据路由到相应的集线器管道。
为什么使用连接器?
连接器有利于合并、路由和复制数据流。除了顺序管线化将管线连接到一起之外,连接器组件还能够进行条件数据流和生成数据流。条件数据流是指将数据发送到最高优先级的管道,并具有错误检测功能,以便在需要时路由到替代管道。生成的数据流意味着组件根据接收到的数据生成并发射自己的数据。本教程强调了连接器连接管道的能力。
在 OpenTelemetry 中有一些处理器将一种类型的遥测数据转换为另一种类型。一些示例是 spanmetrics 处理器和 servicegraph 处理器。spanmetrics 处理器从 span 数据中生成聚合请求、错误和持续时间度量。servicegraph 处理器分析跟踪数据并生成描述服务之间关系的度量。这些处理器都会输入跟踪数据并将其转换为度量数据。由于 OpenTelemetry Collector 中的管道只用于一种类型的数据,因此有必要从处理器中的跟踪管道转换跟踪数据并将其发送到度量管道。在历史上,一些处理器通过使用绕过方法来传输数据,该方法遵循不良实践,即处理器在处理后直接导出数据。连接器组件解决了对这种绕过方法的需求,并且使用该绕过方法的处理器已被弃用。
有关连接器完整功能的其他详细信息可以在以下链接中找到: OpenTelemetry 中的连接器是什么?, OpenTelemetry 连接器配置
旧架构:
使用连接器的新架构:
构建示例连接器
在本教程中,我们将编写一个示例连接器,它会将追踪数据转换为度量数据,作为连接器组件在 OpenTelemetry 中的功能的基本示例。基本连接器的功能是简单地计算包含特定属性名的追踪中的 span 数量。这些出现次数的计数存储在连接器中。
配置项
设置 Collector 配置:
在config.yaml
文件中设置您将在 OpenTelemetry Collector 中使用的配置。该文件定义了您的数据将如何路由、处理和导出。文件中定义的配置项详细说明了您希望数据管道如何运行。您可以定义组件以及数据如何在您定义的管道中从开头到结尾的移动。有关如何配置收集器的更多详细信息,请参阅Collector 配置。
使用以下代码构建示例连接器的示例。该代码是一个有效的 OpenTelemetry Collector 配置文件的示例。
receivers:
otlp:
protocols:
grpc:
endpoint: localhost:4317
http:
endpoint: localhost:4318
exporters:
# 注意:在 v0.86.0 之前使用 `logging` 而不是 `debug`。
debug:
connectors:
example:
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [example]
metrics:
receivers: [example]
exporters: [debug]
在上述代码的连接器部分,您需要声明可用于管道的连接器的名称。这里,example
是我们在本教程中将创建的连接器的名称。
实现
-
为您的示例连接器创建一个文件夹。在本教程中,我们将创建一个名为
exampleconnector
的文件夹。 -
进入文件夹并运行
go mod init github.com/gord02/exampleconnector
-
运行
go mod tidy
这会创建文件
go.mod
和go.sum
。 -
在文件夹中创建以下文件
config.go
- 用于定义连接器的设置的文件factory.go
- 用于创建连接器实例的文件
在 config.go 文件中创建连接器设置
为了能够实例化连接器并参与管道,集线器需要识别您的连接器并从其配置文件中正确加载其设置。
为了能够让连接器访问其设置,创建一个 Config
结构。该结构必须为连接器的每个设置项添加一个导出字段。通过结构标记设置配置文件中的字段名称。创建结构并添加参数。可以选择添加验证函数来检查给定的默认值是否对连接器的实例有效。
package exampleconnector
import "fmt"
type Config struct {
AttributeName string `mapstructure:"attribute_name"`
}
func (c *Config) Validate() error {
if c.AttributeName == "" {
return fmt.Errorf("attribute_name 不能为空")
}
return nil
}
可以在Go mapstructure中找到有关 mapstructure 的更多详细信息。
实现 Factory
要实例化对象,您将需要使用与每个组件关联的 NewFactory
函数。我们将使用 connector.NewFactory
函数。connector.NewFactory
函数会实例化并返回一个 connector.Factory
,它需要以下参数:
component.Type
:在所有集线器组件中唯一的字符串标识符,用于标识您的连接器。这个字符串也是引用连接器的名称。component.CreateDefaultConfigFunc
:对于您的连接器,返回默认的component.Config
实例的函数的引用。...FactoryOption
:connector.FactoryOptions
切片将确定您的连接器能够处理的信号类型。
-
创建 factory.go 文件并在其中定义唯一字符串以标识您的连接器为全局常量。
const ( defaultVal = "request.n" // 这是在配置文件中引用连接器的名称 typeStr = "example" )
-
创建默认配置函数。这是您选择使用默认值初始化连接器对象的方式。
func createDefaultConfig() component.Config { return &Config{ AttributeName: defaultVal, } }
-
定义您要使用的连接器类型。这将作为一个工厂选项传递。连接器可以连接不同或相似类型的管道。我们定义连接器的导出端和接收端的类型。一个导出跟踪并接收度量单位的连接器只是连接器组件的不同配置,并且其定义的顺序很重要。导出跟踪并接收度量单位的连接器与导出度量单位并接收跟踪的连接器不同。
// createTracesToMetricsConnector 定义了连接器的消费者类型 // 我们希望消费跟踪并导出度量单位,因此把 nextConsumer 定义为度量单位,因为消费者是管道中下一个组件 func createTracesToMetricsConnector(ctx context.Context, params connector.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (connector.Traces, error) { c, err := newConnector(params.Logger, cfg) if err != nil { return nil, err } c.metricsConsumer = nextConsumer return c, nil }
createTracesToMetricsConnector
是一个进一步初始化连接器组件的函数,它通过定义其消费者组件,或者在连接器传输数据后将接收数据的下一个组件来进行初始化。值得注意的是,连接器不限于一个有序类型的组合,就像我们这里的示例一样。例如,count 连接器为跟踪到度量单位,日志到度量单位和度量单位到度量单位定义了几个这样的函数。createTracesToMetricsConnector
的参数:context.Context
:对收集器的context.Context
的引用,以便您的跟踪接收器能够正确管理其执行上下文。connector.CreateSettings
:对在创建接收器时由收集器传递的一些设置的引用。component.Config
:接收器配置设置的引用,由收集器传递给工厂以便它可以从收集器配置中正确地读取其设置。consumer.Metrics
:管道中的下一个消费者类型的引用,这是接收到的跟踪数据所需去的地方。这可以是处理器、输出器或其他连接器。
-
编写一个
NewFactory
函数,用于创建您的连接器的自定义工厂。// NewFactory 创建 example 连接器的工厂。 func NewFactory() connector.Factory { // OpenTelemetry 连接器工厂用于创建连接器的工厂 return connector.NewFactory( typeStr, createDefaultConfig, connector.WithTracesToMetrics(createTracesToMetricsConnector, component.StabilityLevelAlpha), ) }
值得注意的是,连接器可以支持多个有序的数据类型组合。
完成后的 factory.go 文件如下所示:
package exampleconnector
import (
"context"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
)
const (
defaultVal = "request.n"
// 这是在配置文件中引用连接器的名称
typeStr = "example"
)
// NewFactory 创建 example 连接器的工厂。
func NewFactory() connector.Factory {
// OpenTelemetry 连接器工厂用于创建连接器的工厂
return connector.NewFactory(
typeStr,
createDefaultConfig,
connector.WithTracesToMetrics(createTracesToMetricsConnector, component.StabilityLevelAlpha),
)
}
func createDefaultConfig() component.Config {
return &Config{
AttributeName: defaultVal,
}
}
// createTracesToMetricsConnector 定义了连接器的消费者类型
// 我们希望消费跟踪并导出度量单位,因此把 nextConsumer 定义为度量单位,因为消费者是管道中下一个组件
func createTracesToMetricsConnector(ctx context.Context, params connector.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (connector.Traces, error) {
c, err := newConnector(params.Logger, cfg)
if err != nil {
return nil, err
}
c.metricsConsumer = nextConsumer
return c, nil
}
实现 Trace 连接器
在 connector.go
文件中实现与组件类型特定接口相关的方法。在本教程中,我们将实现 Traces 连接器,因此必须实现接口:baseConsumer
、Traces
和 component.Component
。
-
使用所需的参数为连接器定义连接器结构
// 连接器的 schema type connectorImp struct { config Config metricsConsumer consumer.Metrics logger *zap.Logger }
-
定义创建连接器的
newConnector
函数// newConnector 是一个创建新连接器的函数 func newConnector(logger *zap.Logger, config component.Config) (*connectorImp, error) { logger.Info("构建 exampleconnector 连接器") cfg := config.(*Config) return &connectorImp{ config: *cfg, logger: logger, }, nil }
newConnector
函数是一个工厂函数,用于创建连接器的实例。 -
实现
Capabilities
方法以正确实现接口// Capabilities 实现了消费者接口。 func (c *connectorImp) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} }
实现
Capabilities
方法以确保您的连接器是消费者类型。该方法定义了组件的能力,指示连接器是否可以改变数据。如果MutatesData
设置为 true,则表示连接器改变了其所处理的数据结构。 -
实现
Consumer
方法以消费遥测数据// ConsumeTraces 方法对连接器中的每个跟踪实例进行调用 func (c *connectorImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { // 遍历消费的一个跟踪的 span 的层级 for i := 0; i < td.ResourceSpans().Len(); i++ { resourceSpan := td.ResourceSpans().At(i) for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ { scopeSpan := resourceSpan.ScopeSpans().At(j) for k := 0; k < scopeSpan.Spans().Len(); k++ { span := scopeSpan.Spans().At(k) attrs := span.Attributes() mapping := attrs.AsRaw() for key := range mapping { if key == c.config.AttributeName { // 仅当跟踪的 span 具有特定属性时创建度量单位 metrics := pmetric.NewMetrics() return c.metricsConsumer.ConsumeMetrics(ctx, metrics) } } } } } return nil }
-
可选:只有在需要特定实现时才实现
Start
和Shutdown
方法以正确实现接口。否则,将component.StartFunc
和component.ShutdownFunc
包含在定义的连接器结构中即可。
完整的连接器文件如下:
package exampleconnector
import (
"context"
"fmt"
"go.uber.org/zap"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)
// 连接器的 schema
type connectorImp struct {
config Config
metricsConsumer consumer.Metrics
logger *zap.Logger
// 只有在需要特定实现时才包含这些参数。否则,将 `component.StartFunc` 和 `component.ShutdownFunc` 包含在定义的连接器结构中即可。
component.StartFunc
component.ShutdownFunc
}
// newConnector 是一个创建新连接器的函数
func newConnector(logger *zap.Logger, config component.Config) (*connectorImp, error) {
logger.Info("构建 exampleconnector 连接器")
cfg := config.(*Config)
return &connectorImp{
config: *cfg,
logger: logger,
}, nil
}
// Capabilities 实现了消费者接口。
func (c *connectorImp) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
// ConsumeTraces 方法对连接器中的每个跟踪实例进行调用
func (c *connectorImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
// 遍历消费的一个跟踪的 span 的层级
for i := 0; i < td.ResourceSpans().Len(); i++ {
resourceSpan := td.ResourceSpans().At(i)
for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ {
scopeSpan := resourceSpan.ScopeSpans().At(j)
for k := 0; k < scopeSpan.Spans().Len(); k++ {
span := scopeSpan.Spans().At(k)
attrs := span.Attributes()
mapping := attrs.AsRaw()
for key := range mapping {
if key == c.config.AttributeName {
// 仅当跟踪的 span 具有特定属性时创建度量单位
metrics := pmetric.NewMetrics()
return c.metricsConsumer.ConsumeMetrics(ctx, metrics)
}
}
}
}
}
return nil
}
使用组件
OpenTelemetry Collector Builder 使用摘要:
您可以使用OpenTelemetry Collector Builder来构建并运行您的代码。构建器是一个工具,可以让您构建自己的 OpenTelemetry Collector 二进制文件。您可以添加或删除组件(接收器、处理器、连接器和输出器)以满足您的需求。
-
按照 OpenTelemetry Collector Builder 的安装说明进行安装。
-
写一个配置文件:
安装完毕后,下一步是创建一个名为
builder-config.yaml
的配置文件。该文件定义了您想要包括在自定义二进制文件中的收集器组件。这里是一个示例配置文件的示例,其中包含了您的新连接器组件:
dist: name: otelcol-dev-bin description: 开发人员基本的 OpenTelemetry collector 发行版 output_path: ./otelcol-dev otelcol_version: 0.86.0 exporters: - gomod: # 注意:在 v0.86.0 之前使用 `loggingexporter` 而不是 `debugexporter`。 go.opentelemetry.io/collector/exporter/debugexporter v0.86.0 processors: - gomod: go.opentelemetry.io/collector/processor/batchprocessor v0.86.0 receivers: - gomod: go.opentelemetry.io/collector/receiver/otlpreceiver v0.86.0 connectors: - gomod: github.com/gord02/exampleconnector v0.86.0 replaces: # the resulting go.mod will include a list of "replaces" directives # This replace statement is necessary since the newly added component is not found/published to GitHub yet. Replace references to GitHub path with the local path - github.com/gord02/exampleconnector => [PATH-TO-COMPONENT-CODE]/exampleconnector
需要包含一个替换语句。替换语句,因为您新创建的组件尚未发布到 GitHub 上。您的组件的 GitHub 路径引用需要替换为您代码的本地路径。
有关在 go 中替换的更多详细信息,请参阅Go mod 文件替换。
-
构建您的收集器二进制文件:
运行构建器时,同时传递记录了包含连接器组件的构建配置文件,它将构建自定义收集器二进制文件:
builder --config [PATH-TO-CONFIG]/builder-config.yaml
这将在配置文件中指定的输出路径目录中生成收集器二进制文件。
-
运行您的收集器二进制文件:
现在您可以运行您的自定义收集器二进制文件:
./[OUTPUT_PATH]/[NAME-OF-DIST] --config [PATH-TO-CONFIG]/config.yaml
输出路径名称和发行版名称在
build-config.yaml
中详细说明。
有关 OpenTelemetry Collector Builder 的更多资源:
- 构建自定义 collector
- OpenTelemetry Collector Builder README
- Dan Jaglowski 在 OpenTelemetry Collector 中的连接观测管道(Connected Observability Pipelines)
- 连接器 README