Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement consumer part of rocketmq new client instrumentation #7019

Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,23 @@ dependencies {

testImplementation(project(":instrumentation:rocketmq:rocketmq-client:rocketmq-client-5.0:testing"))
}

tasks {
val testReceiveSpanDisabled by registering(Test::class) {
filter {
includeTestsMatching("RocketMqClientSuppressReceiveSpanTest")
}
include("**/RocketMqClientSuppressReceiveSpanTest.*")
}

test {
filter {
excludeTestsMatching("RocketMqClientSuppressReceiveSpanTest")
}
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
}

check {
dependsOn(testReceiveSpanDisabled)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.apis.consumer.MessageListener;

final class ConsumeServiceInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
// Instrument ConsumerService instead of MessageListener because lambda could not be enhanced.
aaron-ai marked this conversation as resolved.
Show resolved Hide resolved
return named("org.apache.rocketmq.client.java.impl.consumer.ConsumeService");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor()
.and(
isPublic()
.and(takesArguments(5))
.and(
takesArgument(
1, named("org.apache.rocketmq.client.apis.consumer.MessageListener")))),
ConsumeServiceInstrumentation.class.getName() + "$ConstructorAdvice");
}

@SuppressWarnings("unused")
public static class ConstructorAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 1, readOnly = false) MessageListener messageListener) {
// Replace messageListener by wrapper.
if (!(messageListener instanceof MessageListenerWrapper)) {
messageListener = new MessageListenerWrapper(messageListener);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import apache.rocketmq.v2.ReceiveMessageRequest;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.java.impl.consumer.ReceiveMessageResult;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.ListenableFuture;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;

final class ConsumerImplInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
.and(named("receiveMessage"))
.and(takesArguments(3))
.and(takesArgument(0, named("apache.rocketmq.v2.ReceiveMessageRequest")))
.and(takesArgument(1, named("org.apache.rocketmq.client.java.route.MessageQueueImpl")))
.and(takesArgument(2, named("java.time.Duration"))),
ConsumerImplInstrumentation.class.getName() + "$ReceiveMessageAdvice");
}

@SuppressWarnings("unused")
public static class ReceiveMessageAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static Timer onStart() {
return Timer.start();
}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.Argument(0) ReceiveMessageRequest request,
@Advice.Enter Timer timer,
@Advice.Return ListenableFuture<ReceiveMessageResult> future) {
ReceiveSpanFinishingCallback spanFinishingCallback =
new ReceiveSpanFinishingCallback(request, timer);
Futures.addCallback(future, spanFinishingCallback, MoreExecutors.directExecutor());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.message.MessageView;

public final class MessageListenerWrapper implements MessageListener {
private final MessageListener delegator;

public MessageListenerWrapper(MessageListener delegator) {
this.delegator = delegator;
}

@Override
public ConsumeResult consume(MessageView messageView) {
Context parentContext = VirtualFieldStore.getContextByMessage(messageView);
if (parentContext == null) {
parentContext = Context.current();
}
Instrumenter<MessageView, ConsumeResult> processInstrumenter =
RocketMqSingletons.consumerProcessInstrumenter();
if (!processInstrumenter.shouldStart(parentContext, messageView)) {
return delegator.consume(messageView);
}
Context context = processInstrumenter.start(parentContext, messageView);
try (Scope ignored = context.makeCurrent()) {
ConsumeResult consumeResult = delegator.consume(messageView);
processInstrumenter.end(context, messageView, consumeResult, null);
return consumeResult;
} catch (Throwable t) {
processInstrumenter.end(context, messageView, null, t);
throw t;
}
aaron-ai marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import io.opentelemetry.context.propagation.TextMapGetter;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.apis.message.MessageView;

enum MessageMapGetter implements TextMapGetter<MessageView> {
INSTANCE;

@Override
public Iterable<String> keys(MessageView carrier) {
return carrier.getProperties().keySet();
}

@Nullable
@Override
public String get(@Nullable MessageView carrier, String key) {
return carrier == null ? null : carrier.getProperties().get(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import javax.annotation.Nullable;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

enum MapSetter implements TextMapSetter<PublishingMessageImpl> {
enum MessageMapSetter implements TextMapSetter<PublishingMessageImpl> {
INSTANCE;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture;

final class RocketMqProducerInstrumentation implements TypeInstrumentation {
final class ProducerImplInstrumentation implements TypeInstrumentation {

/** Match the implementation of RocketMQ producer. */
@Override
Expand All @@ -52,7 +51,7 @@ public void transform(TypeTransformer transformer) {
.and(takesArgument(3, List.class))
.and(takesArgument(4, List.class))
.and(takesArgument(5, int.class)),
RocketMqProducerInstrumentation.class.getName() + "$SendAdvice");
ProducerImplInstrumentation.class.getName() + "$SendAdvice");
}

@SuppressWarnings("unused")
Expand Down Expand Up @@ -86,34 +85,9 @@ public static void onEnter(
Context context = instrumenter.start(parentContext, message);
Futures.addCallback(
future,
new SpanFinishingCallback(instrumenter, context, message),
new SendSpanFinishingCallback(instrumenter, context, message),
MoreExecutors.directExecutor());
}
}
}

public static class SpanFinishingCallback implements FutureCallback<SendReceiptImpl> {
private final Instrumenter<PublishingMessageImpl, SendReceiptImpl> instrumenter;
private final Context context;
private final PublishingMessageImpl message;

public SpanFinishingCallback(
Instrumenter<PublishingMessageImpl, SendReceiptImpl> instrumenter,
Context context,
PublishingMessageImpl message) {
this.instrumenter = instrumenter;
this.context = context;
this.message = message;
}

@Override
public void onSuccess(SendReceiptImpl sendReceipt) {
instrumenter.end(context, message, sendReceipt, null);
}

@Override
public void onFailure(Throwable t) {
instrumenter.end(context, message, null, t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.rocketmq.client.java.message.MessageImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

final class RocketMqPublishingMessageImplInstrumentation implements TypeInstrumentation {
final class PublishingMessageImplInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
Expand All @@ -44,10 +44,10 @@ public void transform(TypeTransformer transformer) {
takesArgument(
1, named("org.apache.rocketmq.client.java.impl.producer.PublishingSettings")))
.and(takesArgument(2, boolean.class)),
RocketMqPublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice");
PublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice");
transformer.applyAdviceToMethod(
isMethod().and(named("getProperties")).and(isPublic()),
RocketMqPublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice");
PublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice");
}

@SuppressWarnings("unused")
Expand All @@ -56,7 +56,7 @@ public static class ConstructorAdvice {
* The constructor of {@link PublishingMessageImpl} is always called in the same thread that
* user invoke {@link Producer#send(Message)}/{@link Producer#sendAsync(Message)}/{@link
* Producer#send(Message, Transaction)}. Store the {@link Context} here and fetch it in {@link
* RocketMqProducerInstrumentation}.
* ProducerImplInstrumentation}.
*/
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.This PublishingMessageImpl message) {
Expand All @@ -66,7 +66,7 @@ public static void onExit(@Advice.This PublishingMessageImpl message) {

@SuppressWarnings("unused")
public static class GetPropertiesAdvice {
/** Update the message properties to propagate context recorded by {@link MapSetter}. */
/** Update the message properties to propagate context recorded by {@link MessageMapSetter}. */
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.This MessageImpl messageImpl,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import apache.rocketmq.v2.ReceiveMessageRequest;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
import java.util.List;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.java.impl.consumer.ReceiveMessageResult;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback;

public final class ReceiveSpanFinishingCallback implements FutureCallback<ReceiveMessageResult> {

private final ReceiveMessageRequest request;
private final Timer timer;

public ReceiveSpanFinishingCallback(ReceiveMessageRequest request, Timer timer) {
this.request = request;
this.timer = timer;
}

@Override
public void onSuccess(ReceiveMessageResult receiveMessageResult) {
List<MessageViewImpl> messageViews = receiveMessageResult.getMessageViewImpls();
// Don't create spans when no messages were received.
if (messageViews.isEmpty()) {
return;
}
String consumerGroup = request.getGroup().getName();
for (MessageViewImpl messageView : messageViews) {
VirtualFieldStore.setConsumerGroupByMessage(messageView, consumerGroup);
}
Instrumenter<ReceiveMessageRequest, List<MessageView>> receiveInstrumenter =
RocketMqSingletons.consumerReceiveInstrumenter();
Context parentContext = Context.current();
if (receiveInstrumenter.shouldStart(parentContext, request)) {
Context context =
InstrumenterUtil.startAndEnd(
receiveInstrumenter,
parentContext,
request,
null,
null,
timer.startTime(),
timer.now());
for (MessageViewImpl messageView : messageViews) {
VirtualFieldStore.setContextByMessage(messageView, context);
}
}
}

@Override
public void onFailure(Throwable throwable) {
Instrumenter<ReceiveMessageRequest, List<MessageView>> receiveInstrumenter =
RocketMqSingletons.consumerReceiveInstrumenter();
Context parentContext = Context.current();
if (receiveInstrumenter.shouldStart(parentContext, request)) {
InstrumenterUtil.startAndEnd(
receiveInstrumenter,
parentContext,
request,
null,
throwable,
timer.startTime(),
timer.now());
}
}
}
Loading