使用OpenTelemetry对Apache Kafka客户端进行仪表化
在现今的分布式环境中,Apache Kafka 被选择作为神经系统。 不同的服务通过使用 Apache Kafka 作为消息系统,更多地作为事件或数据流平台与彼此通信。
考虑到基于云原生的微服务开发方法,通常还会使用 Kubernetes 来运行工作负载。 在这种情况下,您可以通过使用像 Strimzi 这样的项目,在其上轻松部署和管理 Apache Kafka 集群。 Strimzi 会负责整个 Kafka 基础设施,而您可以专注于开发使用它的应用程序。
在整体架构中,由于其分布式的特性,跟踪消息如何传递非常困难。这是 OpenTelemetry 起作用的地方。 它提供了多个仪表化库,用于为基于消息的应用程序添加跟踪。当然,也有适用于 Apache Kafka 客户端的库。 它还定义了用于 消息系统 的语义约定的规范。
但通常,架构可能更加复杂:应用程序无法直接连接到 Apache Kafka 集群,并使用其自己的自定义协议进行通信, 而是使用其他协议,例如 HTTP。在这种情况下,跟踪通过 Apache Kafka 的 HTTP 发送和接收的消息非常复杂。 Strimzi 项目提供了一个桥接工具,具备 OpenTelemetry 支持,通过使用相应的仪表库来添加跟踪数据。
在本文中,您将了解如何使用不同的方式在基于 Apache Kafka 的客户端应用程序上启用跟踪。我们将以 Java 为例进行说明。 您还可以在此存储库中找到所有示例。
在 Kafka 客户端上启用跟踪
假设您有一个使用 Kafka 客户端 API 来发送和接收消息的应用程序。 为了简化场景,假设您不想在业务逻辑中添加任何额外的跟踪信息。 您只想为与 Kafka 相关的部分添加跟踪,即追踪消息是如何通过 Kafka 客户端进行发送和接收的。
为了做到这一点,有两种不同的方式:
- 使用运行在应用程序旁边的外部代理来添加跟踪。
- 直接在应用程序中启用 Kafka 客户端上的跟踪。
前者实际上是一种 “自动” 的方法,不需要对您的应用程序进行任何更改。 运行在应用程序旁边的代理能够拦截进出的消息,并向其添加跟踪信息。
后者主要是一种 “手动” 的方法,需要直接对应用程序进行仪表化。 这意味着需要向您的项目添加一些特定的依赖项,并进行代码更改。
使用代理进行仪表化
最简单和自动的方法是在不对应用程序进行任何更改或添加的情况下, 通过使用可以从这里下载的 OpenTelemetry 代理来为应用程序添加跟踪。 需要在应用程序旁边运行该代理,以便注入发送到/接收自 Kafka 集群的消息的跟踪逻辑。
通过以下方式运行生产者应用程序。
java -javaagent:path/to/opentelemetry-javaagent.jar \
-Dotel.service.name=my-kafka-service \
-Dotel.traces.exporter=jaeger \
-Dotel.metrics.exporter=none \
-jar kafka-producer-agent/target/kafka-producer-agent-1.0-SNAPSHOT-jar-with-dependencies.jar
以类似的方式运行消费者应用程序。
java -javaagent:path/to/opentelemetry-javaagent.jar \
-Dotel.service.name=my-kafka-service \
-Dotel.traces.exporter=jaeger \
-Dotel.metrics.exporter=none \
-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true \
-jar kafka-consumer-agent/target/kafka-consumer-agent-1.0-SNAPSHOT-jar-with-dependencies.jar
代理利用自动配置的 SDK 扩展,稍后我们将详细介绍,通过系统属性设置主要参数。
仪表化 Apache Kafka 客户端
OpenTelemetry 项目提供了 opentelemetry-kafka-clients-2.6
模块,为 Kafka 客户端提供了跟踪仪表化。
首先,您需要将相应的依赖项添加到应用程序中。
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-2.6</artifactId>
</dependency>
根据要使用的导出器,您还需要添加相应的依赖项来导出跟踪信息。 例如,要使用 Jaeger 导出器,依赖项如下。
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-jaeger</artifactId>
</dependency>
这样,您就可以最小化设置以启用 Kafka 的跟踪。
配置 OpenTelemetry 实例
在代码中,整个跟踪仪表化由一个 OpenTelemetry
实例处理。
需要创建它并在全局范围内注册,以便 Kafka 客户端仪表库可以使用它。
可以使用两种不同的方式来完成这个过程:
- 使用基于环境的自动配置的 SDK 扩展。
- 使用基于程序配置的 SDK 构建器类。
使用 SDK 自动配置
通过使用环境变量配置全局 OpenTelemetry
实例,可以使用基于环境变量的自动配置的 SDK 扩展来配置。
可以通过将以下依赖项添加到应用程序中启用此功能。
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
</dependency>
当使用 Kafka 客户端仪表库时,它会检查是否已创建并注册了 OpenTelemetry
实例。
如果没有,库代码会检查类路径中是否存在 SDK 自动配置模块,并在这种情况下通过此模块自动初始化 OpenTelemetry
实例。
相应的配置通过环境变量(或相应的系统属性)进行。
这实际上是简化跟踪的初始化的一种方式。
主要需要设置的环境变量如下:
OTEL_SERVICE_NAME
:指定逻辑服务名称。这对于使用跟踪 UI(如 Jaeger UI)显示数据非常有用,建议设置它。OTEL_TRACES_EXPORTER
:要用于跟踪的导出器列表。例如,通过使用jaeger
,还需要在应用程序中增加相应的依赖项。
除了使用上述环境变量外,还可以使用相应的系统属性,在代码中或在命令行上启动应用程序时进行程序化设置。
它们分别是 otel.service.name
和 otel.traces.exporter
。
使用 SDK 构建器
如果希望构建自己的 OpenTelemetry
实例,而不依赖自动配置,可以通过使用程序化的 SDK 构建器类来实现。
为此,需要添加 OpenTelemetry SDK 依赖项以在代码中提供 SDK 构建器。
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
以下代码片段设置了主要属性,比如服务名称,然后配置了 Jaeger 导出器。
最后,创建 OpenTelemetry
实例并全局注册,以便 Kafka 客户端仪表库可以使用它。
Resource resource = Resource.getDefault()
.merge(Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, "my-kafka-service")));
SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(BatchSpanProcessor.builder(JaegerGrpcSpanExporter.builder().build()).build())
.setSampler(Sampler.alwaysOn())
.setResource(resource)
.build();
OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
.setTracerProvider(sdkTracerProvider)
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
.buildAndRegisterGlobal();
使用拦截器
Kafka 客户端 API 提供了一种在消息发送到代理之前以及从代理接收之后"拦截"消息的方法。 当您需要在发送消息之前添加一些逻辑或内容,同时还希望在将消息传递给应用程序的上层之前处理消费消息时,这种方法非常有用。 在跟踪时,您需要在发送和接收消息时创建或关闭 span,它与拦截器非常契合。
Kafka 客户端仪表库提供了两个拦截器,可配置以自动添加跟踪信息。 必须在用于在应用程序中创建 Kafka 客户端的属性包中设置拦截器类。
对于生产者,使用 TracingProducerInterceptor
拦截器以自动创建"发送" span,每次发送消息时自动创建一个 span。
props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
对于消费者,使用 TracingConsumerInterceptor
拦截器以自动创建"接收" span,每次接收消息时自动创建一个 span。
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
封装客户端
另一种方式是通过使用启用跟踪的封装过的 Kafka 客户端来实现。
对于生产者方面,假设您有一个 Producer<K, V>
实例,可以按以下方式封装它。
KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
Producer<String, String> tracingProducer = telemetry.wrap(producer);
然后,像往常一样使用 tracingProducer
发送消息到 Kafka 集群。
对于消费者方面,假设您有一个 Consumer<K, V>
实例,可以按以下方式封装它。
KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
Consumer<String, String> tracingConsumer = telemetry.wrap(this.consumer);
然后,像往常一样使用 tracingConsumer
从 Kafka 集群接收消息。
仪表化实例
为了练习使用提供的示例对 Kafka 客户端进行仪表化,首先需要一个 Apache Kafka 集群。 最简单的方法是从官方网站下载它,并只运行一个 ZooKeeper 节点和一个 Kafka 代理。 您可以按照快速入门教程在几分钟内设置和运行此类集群。 如果使用 Web UI 进行跟踪信息分析,例如 Jaeger 提供的 Web UI,操作也更简单。在这种情况下, 从官方网站下载并在本地运行它非常简单。
当环境准备好之后,首先尝试的是使用拦截器或封装器来运行仪表化的生产者和消费者应用程序。 只发送一条消息并进行消费将提供以下跟踪信息。
如您所见,“发送” 和 “接收” 的 span 都在同一个跟踪中,“接收” span 以 CHILD_OF
方式关联到 “发送” span。
您还可以看到语义定义了一些特定的消息相关标签,使用了 messaging.
前缀。
该语义不实际正确,因为发送操作不依赖于接收操作(这是 CHILD_OF
的含义)。
不过,这将发生改变,在此 GitHub 讨论中,
并且通过
OTEP(OpenTelemetry Enhancement Proposal) 计划将新的消息语义规范稳定下来。
目标是将 “发送” 和 “接收” 的 span 放在两个不同的跟踪中,并使用 FOLLOW_FROM
关系将它们链接在一起。
当使用代理时,该方法更能体现出来,“发送” 的 span 在其自己的跟踪中,如下所示。
在接收端,还有一个 “接收” 和 “处理” 的 span,与生产者的 span 相关。
结论
Apache Kafka 只是用于在分布式系统中进行微服务通信的众多消息平台之一。 监视消息交换过程和解决问题非常复杂。这就是 OpenTelemetry 项目发挥作用的地方,使您可以轻松进行跟踪。 在本文中,我们介绍了 Kafka 客户端仪表化库,它非常简单地为基于 Kafka 的应用程序添加跟踪信息。 您可以获取关于生产者和消费者行为的更多信息,并跟踪每个消息从一个端到另一个端。那么… 还有什么? 试试吧!
参考资料
- Apache Kafka
- Strimzi
- Kubernetes