-
Notifications
You must be signed in to change notification settings - Fork 826
/
TracingRequestHandler.java
101 lines (86 loc) · 3.68 KB
/
TracingRequestHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.Request;
import com.amazonaws.Response;
import com.amazonaws.handlers.HandlerContextKey;
import com.amazonaws.handlers.RequestHandler2;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.context.Context;
import io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.List;
import javax.annotation.Nullable;
/**
 * AWS SDK {@link RequestHandler2} that wraps every request in a span started and ended through
 * {@code requestInstrumenter}.
 *
 * <p>For requests whose original request is recognized by {@code SqsReceiveMessageRequestAccess},
 * it additionally starts and immediately ends one span per returned message through {@code
 * consumerInstrumenter}.
 */
final class TracingRequestHandler extends RequestHandler2 {

  /** Handler-context key under which the in-flight request's {@link Context} is stored. */
  static final HandlerContextKey<Context> CONTEXT =
      new HandlerContextKey<>(Context.class.getName());

  private final Instrumenter<Request<?>, Response<?>> requestInstrumenter;
  private final Instrumenter<Request<?>, Response<?>> consumerInstrumenter;

  /**
   * Creates the handler.
   *
   * @param requestInstrumenter instrumenter used for the span around each SDK request
   * @param consumerInstrumenter instrumenter used for the per-message spans of SQS receive calls
   */
  TracingRequestHandler(
      Instrumenter<Request<?>, Response<?>> requestInstrumenter,
      Instrumenter<Request<?>, Response<?>> consumerInstrumenter) {
    this.requestInstrumenter = requestInstrumenter;
    this.consumerInstrumenter = consumerInstrumenter;
  }

  /**
   * Starts the request span (when the instrumenter allows it), injects the resulting context into
   * the request through {@link AwsXrayPropagator}, and stores the context on the request so that
   * it can be ended later.
   *
   * @param request the outgoing SDK request
   */
  @Override
  @SuppressWarnings("deprecation") // deprecated class to be updated once published in new location
  public void beforeRequest(Request<?> request) {
    Context parentContext = Context.current();
    if (!requestInstrumenter.shouldStart(parentContext, request)) {
      return;
    }
    Context context = requestInstrumenter.start(parentContext, request);

    AwsXrayPropagator.getInstance().inject(context, request, HeaderSetter.INSTANCE);

    request.addHandlerContext(CONTEXT, context);
  }

  /**
   * Ensures that SQS receive-message requests ask for the {@code
   * SqsParentContext.AWS_TRACE_SYSTEM_ATTRIBUTE} attribute; {@link #createConsumerSpan} later
   * derives each message's parent context from the message attributes.
   *
   * @param request the request about to be marshalled
   * @return the same {@code request} instance that was passed in
   */
  @Override
  @CanIgnoreReturnValue
  public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request) {
    if (SqsReceiveMessageRequestAccess.isInstance(request)) {
      // Only add the attribute name when the caller has not already requested it.
      if (!SqsReceiveMessageRequestAccess.getAttributeNames(request)
          .contains(SqsParentContext.AWS_TRACE_SYSTEM_ATTRIBUTE)) {
        SqsReceiveMessageRequestAccess.withAttributeNames(
            request, SqsParentContext.AWS_TRACE_SYSTEM_ATTRIBUTE);
      }
    }
    return request;
  }

  /**
   * Handles a successful response: creates the per-message consumer spans for SQS receive calls
   * and then ends the request span.
   *
   * @param request the SDK request that completed
   * @param response the response that was received
   */
  @Override
  public void afterResponse(Request<?> request, Response<?> response) {
    if (SqsReceiveMessageRequestAccess.isInstance(request.getOriginalRequest())) {
      afterConsumerResponse(request, response);
    }
    finish(request, response, null);
  }

  /** Create and close CONSUMER span for each message consumed. */
  private void afterConsumerResponse(Request<?> request, Response<?> response) {
    Object receiveMessageResult = response.getAwsResponse();
    List<?> messages = SqsReceiveMessageResultAccess.getMessages(receiveMessageResult);
    for (Object message : messages) {
      createConsumerSpan(message, request, response);
    }
  }

  /**
   * Starts and immediately ends a consumer span for a single message, parented to the context
   * derived from that message's attributes.
   *
   * @param message one message taken from the receive-message result
   * @param request the SDK request that produced the message
   * @param response the response that carried the message
   */
  private void createConsumerSpan(Object message, Request<?> request, Response<?> response) {
    Context parentContext =
        SqsParentContext.ofSystemAttributes(SqsMessageAccess.getAttributes(message));
    // Consult shouldStart() before start(), just as beforeRequest() does for the request span, so
    // that the instrumenter can decline to create this span.
    if (!consumerInstrumenter.shouldStart(parentContext, request)) {
      return;
    }
    Context context = consumerInstrumenter.start(parentContext, request);
    consumerInstrumenter.end(context, request, response, null);
  }

  /**
   * Handles a failed request by ending the request span with the given exception.
   *
   * @param request the SDK request that failed
   * @param response the response passed by the SDK for the failed request
   * @param e the exception describing the failure
   */
  @Override
  public void afterError(Request<?> request, Response<?> response, Exception e) {
    finish(request, response, e);
  }

  /**
   * Ends the span whose context is stored on {@code request}, if any.
   *
   * @param request the SDK request that may carry a stored context
   * @param response the response handed to the instrumenter when ending the span
   * @param error the failure to report on the span, or {@code null} when there is none
   */
  private void finish(Request<?> request, Response<?> response, @Nullable Throwable error) {
    // close outstanding "client" span
    Context context = request.getHandlerContext(CONTEXT);
    if (context == null) {
      // Nothing was stored by beforeRequest(), or the context has already been cleared below.
      return;
    }
    // Clear the stored context; presumably so that a later callback for the same request does not
    // end the same span a second time.
    request.addHandlerContext(CONTEXT, null);
    requestInstrumenter.end(context, request, response, error);
  }
}