Skip to content

Commit

Permalink
Implement producer part of RocketMQ new client instrumentation (open-…
Browse files Browse the repository at this point in the history
…telemetry#6884)

Fix open-telemetry#6764 . This pull request is about the producer part.
  • Loading branch information
aaron-ai authored and LironKS committed Oct 31, 2022
1 parent 0171fd4 commit a08509f
Show file tree
Hide file tree
Showing 17 changed files with 745 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Settings for the Apache RocketMQ Remoting-based client instrumentation
# Settings for the Apache RocketMQ remoting-based client instrumentation

| System property | Type | Default | Description |
|---|---|---|---|
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Library Instrumentation for Apache RocketMQ Remoting-based Client 4.0.0+
# Library Instrumentation for Apache RocketMQ remoting-based client 4.0.0+

Provides OpenTelemetry instrumentation for [Apache RocketMQ](https://rocketmq.apache.org/) remoting-based client.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("org.apache.rocketmq")
module.set("rocketmq-client-java")
versions.set("[5.0.0,)")
assertInverse.set(true)
}
}

dependencies {
library("org.apache.rocketmq:rocketmq-client-java:5.0.0")

testImplementation(project(":instrumentation:rocketmq:rocketmq-client:rocketmq-client-5.0:testing"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture;

/** Future converter, which covert future of list into list of future. */
public class FutureConverter {
private FutureConverter() {}

public static <T> List<SettableFuture<T>> convert(SettableFuture<List<T>> future, int num) {
List<SettableFuture<T>> futures = new ArrayList<>(num);
for (int i = 0; i < num; i++) {
SettableFuture<T> f = SettableFuture.create();
futures.add(f);
}
ListFutureCallback<T> futureCallback = new ListFutureCallback<>(futures);
Futures.addCallback(future, futureCallback, MoreExecutors.directExecutor());
return futures;
}

public static class ListFutureCallback<T> implements FutureCallback<List<T>> {
private final List<SettableFuture<T>> futures;

public ListFutureCallback(List<SettableFuture<T>> futures) {
this.futures = futures;
}

@Override
public void onSuccess(List<T> result) {
for (int i = 0; i < result.size(); i++) {
futures.get(i).set(result.get(i));
}
}

@Override
public void onFailure(Throwable t) {
for (SettableFuture<T> future : futures) {
future.setException(t);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import io.opentelemetry.context.propagation.TextMapSetter;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

enum MapSetter implements TextMapSetter<PublishingMessageImpl> {
INSTANCE;

@Override
public void set(@Nullable PublishingMessageImpl message, String key, String value) {
if (message == null) {
return;
}
Map<String, String> extraProperties = VirtualFieldStore.getExtraPropertiesByMessage(message);
if (extraProperties == null) {
extraProperties = new HashMap<>();
VirtualFieldStore.setExtraPropertiesByMessage(message, extraProperties);
}
extraProperties.put(key, value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public final class RocketMqInstrumentationModule extends InstrumentationModule {
public RocketMqInstrumentationModule() {
super("rocketmq-client", "rocketmq-client-5.0");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new RocketMqPublishingMessageImplInstrumentation(), new RocketMqProducerInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import java.util.List;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

final class RocketMqInstrumenterFactory {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.rocketmq-client-5.0";

private RocketMqInstrumenterFactory() {}

public static Instrumenter<PublishingMessageImpl, SendReceiptImpl> createProducerInstrumenter(
OpenTelemetry openTelemetry, List<String> capturedHeaders) {

RocketMqProducerAttributeGetter getter = RocketMqProducerAttributeGetter.INSTANCE;
MessageOperation operation = MessageOperation.SEND;

AttributesExtractor<PublishingMessageImpl, SendReceiptImpl> attributesExtractor =
buildMessagingAttributesExtractor(getter, operation, capturedHeaders);

InstrumenterBuilder<PublishingMessageImpl, SendReceiptImpl> instrumenterBuilder =
Instrumenter.<PublishingMessageImpl, SendReceiptImpl>builder(
openTelemetry,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(RocketMqProducerAttributeExtractor.INSTANCE)
.setSpanStatusExtractor(
(spanStatusBuilder, message, sendReceipt, error) -> {
if (null != error) {
spanStatusBuilder.setStatus(StatusCode.ERROR);
}
});

return instrumenterBuilder.buildProducerInstrumenter(MapSetter.INSTANCE);
}

private static <T, R> MessagingAttributesExtractor<T, R> buildMessagingAttributesExtractor(
MessagingAttributesGetter<T, R> getter,
MessageOperation operation,
List<String> capturedHeaders) {
return MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(capturedHeaders)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.DELAY;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.FIFO;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.TRANSACTION;

import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import java.util.ArrayList;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

enum RocketMqProducerAttributeExtractor
implements AttributesExtractor<PublishingMessageImpl, SendReceiptImpl> {
INSTANCE;

@Override
public void onStart(
AttributesBuilder attributes, Context parentContext, PublishingMessageImpl message) {
message.getTag().ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TAG, s));
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_KEYS, new ArrayList<>(message.getKeys()));
switch (message.getMessageType()) {
case FIFO:
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TYPE, FIFO);
break;
case DELAY:
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TYPE, DELAY);
break;
case TRANSACTION:
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TYPE, TRANSACTION);
break;
default:
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TYPE, NORMAL);
}
}

@Override
public void onEnd(
AttributesBuilder attributes,
Context context,
PublishingMessageImpl message,
@Nullable SendReceiptImpl sendReceipt,
@Nullable Throwable error) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

enum RocketMqProducerAttributeGetter
implements MessagingAttributesGetter<PublishingMessageImpl, SendReceiptImpl> {
INSTANCE;

@Nullable
@Override
public String system(PublishingMessageImpl message) {
return "rocketmq";
}

@Nullable
@Override
public String destinationKind(PublishingMessageImpl message) {
return SemanticAttributes.MessagingDestinationKindValues.TOPIC;
}

@Nullable
@Override
public String destination(PublishingMessageImpl message) {
return message.getTopic();
}

@Override
public boolean temporaryDestination(PublishingMessageImpl message) {
return false;
}

@Nullable
@Override
public String protocol(PublishingMessageImpl message) {
return null;
}

@Nullable
@Override
public String protocolVersion(PublishingMessageImpl message) {
return null;
}

@Nullable
@Override
public String url(PublishingMessageImpl message) {
return null;
}

@Nullable
@Override
public String conversationId(PublishingMessageImpl message) {
return null;
}

@Nullable
@Override
public Long messagePayloadSize(PublishingMessageImpl message) {
return (long) message.getBody().remaining();
}

@Nullable
@Override
public Long messagePayloadCompressedSize(PublishingMessageImpl message) {
return null;
}

@Nullable
@Override
public String messageId(PublishingMessageImpl message, @Nullable SendReceiptImpl sendReceipt) {
return message.getMessageId().toString();
}

@Override
public List<String> header(PublishingMessageImpl message, String name) {
String value = message.getProperties().get(name);
if (value != null) {
return Collections.singletonList(value);
}
return Collections.emptyList();
}
}
Loading

0 comments on commit a08509f

Please sign in to comment.