您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關如何使用OpenTracing和Jaeger 追蹤 Pulsar消息,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
OpenTracing(https://opentracing.io/) 是針對應用程序和 OSS(Open-Source Software)軟件包的開放分布式追蹤標準。許多追蹤后端服務都支持 OpenTracing API,例如 Jaeger、Zipkin 和 SkyWalking。
準備工作
在開始前,需要安裝好 JDK 8、Maven 3 和 Pulsar(集群模式或單機模式)。如果還沒有安裝 Pulsar,可以查看下方鏈接,按照提示進行安裝。
http://pulsar.apache.org/docs/en/standalone/
第 1 步:啟動 Jaeger 后端
1. 在 Docker 中啟動 Jaeger 后端。
docker run -d -p 6831:6831/udp -p 16686:16686 jaegertracing/all-in-one:latest
成功啟動 Jaeger 后,就可以打開 Jaeger UI 網站。
???? 如何你沒有 Jaeger Docker 環境,可以:
下載二進制文件
https://www.jaegertracing.io/download/
通過源代碼構建
https://www.jaegertracing.io/docs/1.17/getting-started/#from-source
2. 訪問 `http://localhost:16686`,無需填寫用戶名或密碼就可以打開 Jeager UI 網站。
第 2 步:添加 maven dependencies
本示例使用 Open Tracing Pulsar Client。
https://hub.streamnative.io/monitoring/opentracing-pulsar-client/0.1.0
它是 Pulsar Client 與 OpenTracing API(基于 Pulsar Client Interceptors)的集成,用于追蹤 Pulsar 消息。OpenTracing Pulsar Client 由 StreamNative 研發,是 StreamNatvie Hub(https://hub.streamnative.io/) 中的監控工具。
添加 Jaeger client dependency 以連接到 Jaeger 后端。
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>io.streamnative</groupId>
<artifactId>opentracing-pulsar-client</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>io.jaegertracing</groupId>
<artifactId>jaeger-client</artifactId>
<version>1.2.0</version>
</dependency>
第 3 步:使用 OpenTracing Pulsar Client
為便于理解,本示例假設有 2 個 Job 和 2 個 topic。Job-1 向 topic-A 發送消息,Job-2 從 topc-A 消費消息。當 Job 2 收到 topic-A 的消息后,Job 2 會向 topic-B 發送消息,然后 Job-3 從 topic-B 消費消息。因此,在這種情況下有 2 個 topic、2 個 producer 和 2 個 consumer。
要完成上述工作場景中的任務,需要啟動三個應用程序。
Job-1:發布消息到 topic-A
Job-2:消費 topic-A 中的消息,并發布消息到 topic-B
Job-3:消費 topic-B 中的消息
以下示例為發布消息至 topic-A。
Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);Configuration configuration = new Configuration("Job-1").withSampler(samplerConfig).withReporter(reporterConfig);Tracer tracer = configuration.getTracer();GlobalTracer.registerIfAbsent(tracer);PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650") .build();Producer<String> producerA = client.newProducer(Schema.STRING) .topic("topic-A") .intercept(new TracingProducerInterceptor()) .create();for (int i = 0; i < 10; i++) { producerA.newMessage().value(String.format("[%d] Hello", i)).send();}
以下示例為從 topic-A 消費消息,并將消息發布到 topic-B。
Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);Configuration configuration = new Configuration("Job-2").withSampler(samplerConfig).withReporter(reporterConfig);Tracer tracer = configuration.getTracer();GlobalTracer.registerIfAbsent(tracer);PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650") .build();Consumer<String> consumer = client.newConsumer(Schema.STRING) .topic("topic-A") .subscriptionName("open-tracing") .subscriptionType(SubscriptionType.Shared) .intercept(new TracingConsumerInterceptor<>()) .subscribe();Producer<String> producerB = client.newProducer(Schema.STRING) .topic("topic-B") .intercept(new TracingProducerInterceptor()) .create();while (true) { Message<String> received = consumer.receive(); SpanContext context = TracingPulsarUtils.extractSpanContext(received, tracer); TypedMessageBuilder<String> messageBuilder = producerB.newMessage(); messageBuilder.value(received.getValue() + " Pulsar and OpenTracing!"); // Inject parent span context tracer.inject(context, Format.Builtin.TEXT_MAP, new TypeMessageBuilderInjectAdapter(messageBuilder)); messageBuilder.send(); consumer.acknowledge(received);}
????Job-3
以下示例為從 topic-B 消費消息。
Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);Configuration configuration = new Configuration("Job-3").withSampler(samplerConfig).withReporter(reporterConfig);Tracer tracer = configuration.getTracer();GlobalTracer.registerIfAbsent(tracer);PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650") .build();Consumer<String> consumer = client.newConsumer(Schema.STRING) .topic("topic-B") .subscriptionName("open-tracing") .subscriptionType(SubscriptionType.Shared) .intercept(new TracingConsumerInterceptor<>()) .subscribe();while (true) { Message<String> received = consumer.receive(); System.out.println(received.getValue()); consumer.acknowledge(received);}
現在,可以分別運行 Job-3、Job-2 和 Job-1。控制臺中會出現 Job-3 接收的日志,如下:
[0] Hello Pulsar and OpenTracing![1] Hello Pulsar and OpenTracing!...[9] Hello Pulsar and OpenTracing!
現在,你可以再次打開 Jaeger UI,頁面中會出現十條消息追蹤鏈路。
點擊任務名稱即可查看消息追蹤鏈路的詳細信息。
可以從 span 名稱輕松辨別是 producer 還是 consumer 發布了此條消息,span 名稱格式為 `To__<topic-name>` 和 `From__<topic-name>__<subscription_name>`。
以上就是如何使用OpenTracing和Jaeger 追蹤 Pulsar消息,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。