Skip to content

Commit

Permalink
Merge branch '2.2.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
hanwavefront committed May 1, 2021
2 parents 830722f + 02bb7b9 commit 5d0414f
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.wavefront.spring.autoconfigure;

import java.io.Closeable;
import java.io.IOException;

import brave.handler.MutableSpan;
import brave.handler.SpanHandler;
import brave.propagation.TraceContext;

import org.springframework.cloud.sleuth.brave.bridge.BraveFinishedSpan;
import org.springframework.cloud.sleuth.brave.bridge.BraveTraceContext;

class WavefrontSleuthBraveSpanHandler extends SpanHandler implements Runnable, Closeable {

final WavefrontSleuthSpanHandler spanHandler;

WavefrontSleuthBraveSpanHandler(WavefrontSleuthSpanHandler spanHandler) {
this.spanHandler = spanHandler;
}

@Override
public boolean end(TraceContext context, MutableSpan span, Cause cause) {
return spanHandler.end(BraveTraceContext.fromBrave(context), BraveFinishedSpan.fromBrave(span));
}

@Override
public void close() throws IOException {
this.spanHandler.close();
}

@Override
public void run() {
this.spanHandler.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
Expand All @@ -17,10 +17,8 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import brave.handler.MutableSpan;
import brave.handler.SpanHandler;
import brave.propagation.TraceContext;
import com.wavefront.internal.reporter.WavefrontInternalReporter;
import com.wavefront.java_sdk.com.google.common.collect.Iterators;
import com.wavefront.java_sdk.com.google.common.collect.Sets;
import com.wavefront.sdk.common.NamedThreadFactory;
import com.wavefront.sdk.common.Pair;
Expand All @@ -32,6 +30,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.cloud.sleuth.TraceContext;
import org.springframework.cloud.sleuth.exporter.FinishedSpan;
import org.springframework.util.StringUtils;

import static com.wavefront.internal.SpanDerivedMetricsUtils.TRACING_DERIVED_PREFIX;
import static com.wavefront.internal.SpanDerivedMetricsUtils.reportHeartbeats;
import static com.wavefront.internal.SpanDerivedMetricsUtils.reportWavefrontGeneratedData;
Expand Down Expand Up @@ -66,7 +68,7 @@
* {@link UUID#timestamp()} on UUIDs converted here, or in other Wavefront code, as it might
* throw.
*/
final class WavefrontSleuthSpanHandler extends SpanHandler implements Runnable, Closeable {
public final class WavefrontSleuthSpanHandler implements Runnable, Closeable {
private static final Log LOG = LogFactory.getLog(WavefrontSleuthSpanHandler.class);

// https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L114-L114
Expand All @@ -75,7 +77,21 @@ final class WavefrontSleuthSpanHandler extends SpanHandler implements Runnable,
private final static String DEFAULT_SOURCE = "wavefront-spring-boot";
private final static String WAVEFRONT_GENERATED_COMPONENT = "wavefront-generated";

final LinkedBlockingQueue<Pair<TraceContext, MutableSpan>> spanBuffer;
private static final int LONG_BYTES = Long.SIZE / Byte.SIZE;

private static final int BYTE_BASE16 = 2;

private static final int LONG_BASE16 = BYTE_BASE16 * LONG_BYTES;

private static final int TRACE_ID_HEX_SIZE = 2 * LONG_BASE16;

private static final String ALPHABET = "0123456789abcdef";

private static final int ASCII_CHARACTERS = 128;

private static final byte[] DECODING = buildDecodingArray();

final LinkedBlockingQueue<Pair<TraceContext, FinishedSpan>> spanBuffer;
final WavefrontSender wavefrontSender;
final WavefrontInternalReporter wfInternalReporter;
final Set<String> traceDerivedCustomTagKeys;
Expand Down Expand Up @@ -144,7 +160,7 @@ final class WavefrontSleuthSpanHandler extends SpanHandler implements Runnable,

// Exact same behavior as WavefrontSpanReporter
// https://github.com/wavefrontHQ/wavefront-opentracing-sdk-java/blob/f1f08d8daf7b692b9b61dcd5bc24ca6befa8e710/src/main/java/com/wavefront/opentracing/reporting/WavefrontSpanReporter.java#L163-L179
@Override public boolean end(TraceContext context, MutableSpan span, Cause cause) {
public boolean end(TraceContext context, FinishedSpan span) {
spansReceived.increment();
if (!spanBuffer.offer(Pair.of(context, span))) {
spansDropped.increment();
Expand All @@ -160,31 +176,49 @@ List<Pair<String, String>> getDefaultTags() {
return Collections.unmodifiableList(this.defaultTags);
}

private void send(TraceContext context, MutableSpan span) {
UUID traceId = new UUID(context.traceIdHigh(), context.traceId());
UUID spanId = new UUID(0L, context.spanId());
private String padLeftWithZeros(String string, int length) {
if (string.length() >= length) {
return string;
}
else {
StringBuilder sb = new StringBuilder(length);
for (int i = string.length(); i < length; i++) {
sb.append('0');
}

return sb.append(string).toString();
}
}

private void send(TraceContext context, FinishedSpan span) {
String traceIdString = padLeftWithZeros(context.traceId(), TRACE_ID_HEX_SIZE);
String traceIdHigh = traceIdString.substring(0, traceIdString.length() / 2);
String traceIdLow = traceIdString.substring(traceIdString.length() / 2);
UUID traceId = new UUID(longFromBase16String(traceIdHigh), longFromBase16String(traceIdLow));
UUID spanId = new UUID(0L, longFromBase16String(context.spanId()));

// NOTE: wavefront-opentracing-sdk-java and wavefront-proxy differ, but we prefer the former.
// https://github.com/wavefrontHQ/wavefront-opentracing-sdk-java/blob/f1f08d8daf7b692b9b61dcd5bc24ca6befa8e710/src/main/java/com/wavefront/opentracing/reporting/WavefrontSpanReporter.java#L187-L190
// https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L248-L252
List<UUID> parents = null;
if (context.parentIdAsLong() != 0L) {
parents = Collections.singletonList(new UUID(0L, context.parentIdAsLong()));
String parentId = context.parentId();
if (StringUtils.hasText(parentId) && longFromBase16String(parentId) != 0L) {
parents = Collections.singletonList(new UUID(0L, longFromBase16String(parentId)));
}
List<UUID> followsFrom = null;

// https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L344-L345
String name = span.name();
String name = span.getName();
if (name == null) name = DEFAULT_SPAN_NAME;

// Start and duration become 0L if unset. Any positive duration rounds up to 1 millis.
long startMillis = span.startTimestamp() / 1000L, finishMillis = span.finishTimestamp() / 1000L;
long startMillis = span.getStartTimestamp() / 1000L, finishMillis = span.getEndTimestamp() / 1000L;
long durationMillis = startMillis != 0 && finishMillis != 0L ? Math.max(finishMillis - startMillis, 1L) : 0L;
long durationMicros = span.startTimestamp() != 0L && span.finishTimestamp() != 0L ?
span.finishTimestamp() - span.startTimestamp() : 0;
long durationMicros = span.getStartTimestamp() != 0L && span.getEndTimestamp() != 0L ?
span.getEndTimestamp() - span.getStartTimestamp() : 0;

List<SpanLog> spanLogs = convertAnnotationsToSpanLogs(span);
TagList tags = new TagList(defaultTagKeys, defaultTags, context, span);
TagList tags = new TagList(defaultTagKeys, defaultTags, span);

try {
wavefrontSender.sendSpan(name, startMillis, durationMillis, source, traceId, spanId,
Expand Down Expand Up @@ -213,6 +247,38 @@ private void send(TraceContext context, MutableSpan span) {
}
}

private static byte[] buildDecodingArray() {
byte[] decoding = new byte[ASCII_CHARACTERS];
Arrays.fill(decoding, (byte) -1);
for (int i = 0; i < ALPHABET.length(); i++) {
char c = ALPHABET.charAt(i);
decoding[c] = (byte) i;
}
return decoding;
}

/**
* Returns the {@code long} value whose base16 representation is stored in the first
* 16 chars of {@code chars} starting from the {@code offset}.
* @param chars the base16 representation of the {@code long}.
*/
private static long longFromBase16String(CharSequence chars) {
int offset = 0;
return (decodeByte(chars.charAt(offset), chars.charAt(offset + 1)) & 0xFFL) << 56
| (decodeByte(chars.charAt(offset + 2), chars.charAt(offset + 3)) & 0xFFL) << 48
| (decodeByte(chars.charAt(offset + 4), chars.charAt(offset + 5)) & 0xFFL) << 40
| (decodeByte(chars.charAt(offset + 6), chars.charAt(offset + 7)) & 0xFFL) << 32
| (decodeByte(chars.charAt(offset + 8), chars.charAt(offset + 9)) & 0xFFL) << 24
| (decodeByte(chars.charAt(offset + 10), chars.charAt(offset + 11)) & 0xFFL) << 16
| (decodeByte(chars.charAt(offset + 12), chars.charAt(offset + 13)) & 0xFFL) << 8
| (decodeByte(chars.charAt(offset + 14), chars.charAt(offset + 15)) & 0xFFL);
}

private static byte decodeByte(char hi, char lo) {
int decoded = DECODING[hi] << 4 | DECODING[lo];
return (byte) decoded;
}

/**
* Extracted for test isolation and as parsing otherwise implies multiple-returns or scanning
* later.
Expand All @@ -227,17 +293,20 @@ static final class TagList extends ArrayList<Pair<String, String>> {
TagList(
Set<String> defaultTagKeys,
List<Pair<String, String>> defaultTags,
TraceContext context,
MutableSpan span
FinishedSpan span
){
super(defaultTags.size() + span.tagCount());
boolean debug = context.debug(), hasAnnotations = span.annotationCount() > 0;
isError = span.error() != null;
super(defaultTags.size() + span.getTags().size());
// TODO: OTel doesn't have a notion of debug
boolean debug = false;
boolean hasAnnotations = span.getEvents().size() > 0;
isError = span.getError() != null;

int tagCount = span.tagCount();
int tagCount = span.getTags().size();
addAll(defaultTags);
for (int i = 0; i < tagCount; i++) {
String key = span.tagKeyAt(i), value = span.tagValueAt(i);
String tagKey = Iterators.get(span.getTags().keySet().iterator(), i);
String tagValue = Iterators.get(span.getTags().values().iterator(), i);
String key = tagKey, value = tagValue;
String lcKey = key.toLowerCase(Locale.ROOT);
if (lcKey.equals(ERROR_TAG_KEY)) {
isError = true;
Expand All @@ -262,8 +331,8 @@ static final class TagList extends ArrayList<Pair<String, String>> {
if (debug) add(Pair.of(DEBUG_TAG_KEY, "true"));

// https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L254-L266
if (span.kind() != null) {
String kind = span.kind().toString().toLowerCase();
if (span.getKind() != null) {
String kind = span.getKind().toString().toLowerCase();
add(Pair.of("span.kind", kind));
if (hasAnnotations) {
add(Pair.of("_spanSecondaryId", kind));
Expand All @@ -274,20 +343,21 @@ static final class TagList extends ArrayList<Pair<String, String>> {
if (hasAnnotations) add(Pair.of(SPAN_LOG_KEY, "true"));

// https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L324-L327
if (span.localIp() != null) {
add(Pair.of("ipv4", span.localIp())); // NOTE: this could be IPv6!!
if (span.getLocalIp() != null) {
add(Pair.of("ipv4", span.getLocalIp())); // NOTE: this could be IPv6!!
}
}
}

// https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L397-L402
static List<SpanLog> convertAnnotationsToSpanLogs(MutableSpan span) {
int annotationCount = span.annotationCount();
static List<SpanLog> convertAnnotationsToSpanLogs(FinishedSpan span) {
int annotationCount = span.getEvents().size();
if (annotationCount == 0) return Collections.emptyList();
List<SpanLog> spanLogs = new ArrayList<>(annotationCount);
for (int i = 0; i < annotationCount; i++) {
long epochMicros = span.annotationTimestampAt(i);
String value = span.annotationValueAt(i);
Map.Entry<Long, String> entry = Iterators.get(span.getEvents().iterator(), i);
long epochMicros = entry.getKey();
String value = entry.getValue();
spanLogs.add(new SpanLog(epochMicros, Collections.singletonMap("annotation", value)));
}
return spanLogs;
Expand All @@ -296,7 +366,7 @@ static List<SpanLog> convertAnnotationsToSpanLogs(MutableSpan span) {
@Override public void run() {
while (!stop) {
try {
Pair<TraceContext, MutableSpan> contextAndSpan = spanBuffer.take();
Pair<TraceContext, FinishedSpan> contextAndSpan = spanBuffer.take();
send(contextAndSpan._1, contextAndSpan._2);
} catch (InterruptedException ex) {
if (LOG.isInfoEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.wavefront.spring.autoconfigure;

import brave.Tracer;
import brave.TracingCustomizer;
import brave.handler.SpanHandler;
import com.wavefront.sdk.common.WavefrontSender;
Expand All @@ -23,30 +24,37 @@
* @author Stephane Nicoll
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ SpanNamer.class, TracingCustomizer.class, SpanHandler.class })
@ConditionalOnClass({ SpanNamer.class, MeterRegistry.class, WavefrontConfig.class, WavefrontSender.class })
@AutoConfigureBefore(BraveAutoConfiguration.class)
class WavefrontTracingSleuthConfiguration {

static final String BEAN_NAME = "wavefrontTracingCustomizer";

@Bean(BEAN_NAME)
@ConditionalOnMissingBean(name = BEAN_NAME)
@Bean
@ConditionalOnBean({ MeterRegistry.class, WavefrontConfig.class, WavefrontSender.class })
TracingCustomizer wavefrontTracingCustomizer(MeterRegistry meterRegistry,
WavefrontSender wavefrontSender,
ApplicationTags applicationTags,
WavefrontConfig wavefrontConfig,
WavefrontProperties wavefrontProperties) {
WavefrontSleuthSpanHandler spanHandler = new WavefrontSleuthSpanHandler(
// https://github.com/wavefrontHQ/wavefront-opentracing-sdk-java/blob/f1f08d8daf7b692b9b61dcd5bc24ca6befa8e710/src/main/java/com/wavefront/opentracing/reporting/WavefrontSpanReporter.java#L54
50000, // TODO: maxQueueSize should be a property, ya?
wavefrontSender,
meterRegistry,
wavefrontConfig.source(),
applicationTags,
wavefrontProperties);

return t -> t.traceId128Bit(true).supportsJoin(false).addSpanHandler(spanHandler);
WavefrontSleuthSpanHandler wavefrontSleuthSpanHandler(MeterRegistry meterRegistry,
WavefrontSender wavefrontSender,
ApplicationTags applicationTags,
WavefrontConfig wavefrontConfig,
WavefrontProperties wavefrontProperties) {
return new WavefrontSleuthSpanHandler(
// https://github.com/wavefrontHQ/wavefront-opentracing-sdk-java/blob/f1f08d8daf7b692b9b61dcd5bc24ca6befa8e710/src/main/java/com/wavefront/opentracing/reporting/WavefrontSpanReporter.java#L54
50000, // TODO: maxQueueSize should be a property, ya?
wavefrontSender,
meterRegistry,
wavefrontConfig.source(),
applicationTags,
wavefrontProperties);
}

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({Tracer.class, TracingCustomizer.class, SpanHandler.class })
static class BraveCustomizerConfiguration {
@Bean(BEAN_NAME)
@ConditionalOnMissingBean(name = BEAN_NAME)
@ConditionalOnBean({ MeterRegistry.class, WavefrontConfig.class, WavefrontSender.class })
TracingCustomizer wavefrontTracingCustomizer(WavefrontSleuthSpanHandler spanHandler) {
return t -> t.traceId128Bit(true).supportsJoin(false).addSpanHandler(new WavefrontSleuthBraveSpanHandler(spanHandler));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ void tracingWithSleuthIsConfiguredWithWavefrontSender() {
.with(sleuth())
.run((context) -> {
assertThat(context).hasSingleBean(TracingCustomizer.class);
WavefrontSleuthSpanHandler spanHandler = extractSpanHandler(context.getBean(Tracer.class));
assertThat(spanHandler).hasFieldOrPropertyWithValue("wavefrontSender", sender);
WavefrontSleuthBraveSpanHandler braveSpanHandler = extractSpanHandler(context.getBean(Tracer.class));
assertThat(braveSpanHandler.spanHandler).hasFieldOrPropertyWithValue("wavefrontSender", sender);
});
}

Expand Down Expand Up @@ -247,8 +247,8 @@ private ContextConsumer<AssertableApplicationContext> assertSleuthSpanDefaultTag
String serviceName, String cluster, String shard) {
return (context) -> {
assertThat(context).hasSingleBean(TracingCustomizer.class);
WavefrontSleuthSpanHandler spanHandler = extractSpanHandler(context.getBean(Tracer.class));
assertThat(spanHandler.getDefaultTags()).contains(
WavefrontSleuthBraveSpanHandler braveSpanHandler = extractSpanHandler(context.getBean(Tracer.class));
assertThat(braveSpanHandler.spanHandler.getDefaultTags()).contains(
new Pair<>("application", applicationName),
new Pair<>("service", serviceName),
new Pair<>("cluster", cluster),
Expand All @@ -266,7 +266,8 @@ void tracingWithSleuthCanBeConfigured() {
.with(sleuth())
.run((context) -> {
assertThat(context).hasSingleBean(TracingCustomizer.class);
WavefrontSleuthSpanHandler spanHandler = extractSpanHandler(context.getBean(Tracer.class));
WavefrontSleuthBraveSpanHandler braveSpanHandler = extractSpanHandler(context.getBean(Tracer.class));
WavefrontSleuthSpanHandler spanHandler = braveSpanHandler.spanHandler;
Set<String> traceDerivedCustomTagKeys = (Set<String>) ReflectionTestUtils.getField(
spanHandler, "traceDerivedCustomTagKeys");
assertThat(traceDerivedCustomTagKeys).containsExactlyInAnyOrder("region", "test");
Expand Down Expand Up @@ -392,12 +393,12 @@ void tracingIsNotConfiguredWithNonWavefrontRegistry() {
}

@SuppressWarnings("ConstantConditions")
private WavefrontSleuthSpanHandler extractSpanHandler(Tracer tracer) {
private WavefrontSleuthBraveSpanHandler extractSpanHandler(Tracer tracer) {
SpanHandler[] handlers = (SpanHandler[]) ReflectionTestUtils.getField(
ReflectionTestUtils.getField(
ReflectionTestUtils.getField(tracer, "spanHandler"), "delegate"),
"handlers");
return (WavefrontSleuthSpanHandler) handlers[1];
return (WavefrontSleuthBraveSpanHandler) handlers[1];
}

@SuppressWarnings("unchecked")
Expand Down

0 comments on commit 5d0414f

Please sign in to comment.