Skip to content

Commit

Permalink
[exporter/kafkaexporter] feat/partition by trace (open-telemetry#29660)
Browse files Browse the repository at this point in the history
**Description:** Adds the `partition_traces_by_id` option to
configuration which defaults to false. When set to true, it sets the
[message key](https://www.geeksforgeeks.org/apache-kafka-message-keys/)
on trace messages to a hexadecimal string representing the trace ID.

**Link to tracking Issue:** open-telemetry#12318 

**Testing:** Refined current unit tests to cover new capabilities.
Config test now covers the `partition_traces_by_id` option.
`marshaler_test.go` now tests both partitioned and non-partitioned
cases.

**Documentation:** Updated the README to include the new
`partition_traced_by_id` option in the section describing optional
configuration items.
  • Loading branch information
jwafle committed Dec 12, 2023
1 parent f6e043f commit b6b6a6f
Show file tree
Hide file tree
Showing 10 changed files with 305 additions and 33 deletions.
27 changes: 27 additions & 0 deletions .chloggen/kafka-exporter-key-by-traceid.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkaexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add ability to publish kafka messages with message key of TraceID - it will allow partitioning of the kafka Topic.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [12318]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user, api]
1 change: 1 addition & 0 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ The following settings can be optionally configured:
- `zipkin_json`: the payload is serialized to Zipkin v2 JSON Span.
- The following encodings are valid *only* for **logs**.
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
- `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
- `auth`
- `plain_text`
- `username`: The username to use.
Expand Down
5 changes: 5 additions & 0 deletions exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ type Config struct {
// Encoding of messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`

// PartitionTracesByID sets the message key of outgoing trace messages to the trace ID.
// Please note: does not have any effect on Jaeger encoding exporters since Jaeger exporters include
// trace ID as the message key by default.
PartitionTracesByID bool `mapstructure:"partition_traces_by_id"`

// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
Metadata Metadata `mapstructure:"metadata"`
Expand Down
15 changes: 9 additions & 6 deletions exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 2,
QueueSize: 10,
},
Topic: "spans",
Encoding: "otlp_proto",
Brokers: []string{"foo:123", "bar:456"},
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
Brokers: []string{"foo:123", "bar:456"},
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Username: "jdoe",
Expand Down Expand Up @@ -106,9 +107,10 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 2,
QueueSize: 10,
},
Topic: "spans",
Encoding: "otlp_proto",
Brokers: []string{"foo:123", "bar:456"},
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
Brokers: []string{"foo:123", "bar:456"},
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Username: "jdoe",
Expand Down Expand Up @@ -159,6 +161,7 @@ func TestLoadConfig(t *testing.T) {
},
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
Brokers: []string{"foo:123", "bar:456"},
ResolveCanonicalBootstrapServersOnly: true,
Authentication: kafka.Authentication{
Expand Down
3 changes: 3 additions & 0 deletions exporter/kafkaexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/jaegertracing/jaeger v1.51.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.91.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.91.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.90.1
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.91.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.91.0
github.com/openzipkin/zipkin-go v0.4.2
Expand Down Expand Up @@ -84,6 +85,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/corei

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka => ../../internal/kafka

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal => ../../pkg/batchpersignal

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger => ../../pkg/translator/jaeger

retract (
Expand Down
5 changes: 5 additions & 0 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ func newTracesExporter(config Config, set exporter.CreateSettings, marshalers ma
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
if config.PartitionTracesByID {
if keyableMarshaler, ok := marshaler.(KeyableTracesMarshaler); ok {
keyableMarshaler.Key()
}
}
producer, err := newSaramaProducer(config)
if err != nil {
return nil, err
Expand Down
224 changes: 208 additions & 16 deletions exporter/kafkaexporter/marshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/IBM/sarama"
zipkin "github.com/openzipkin/zipkin-go/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -85,6 +86,8 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) {
ils := rs.ScopeSpans().At(0)
ils.SetSchemaUrl(conventions.SchemaURL)
ils.Spans().AppendEmpty()
ils.Spans().AppendEmpty()
ils.Spans().AppendEmpty()

span := ils.Spans().At(0)
span.SetKind(ptrace.SpanKindServer)
Expand All @@ -95,9 +98,27 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) {
span.SetSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7})
span.SetParentSpanID([8]byte{8, 9, 10, 11, 12, 13, 14})

span = ils.Spans().At(1)
span.SetKind(ptrace.SpanKindClient)
span.SetName("bar")
span.SetStartTimestamp(pcommon.NewTimestampFromTime(now))
span.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(time.Second)))
span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
span.SetSpanID([8]byte{15, 16, 17, 18, 19, 20, 21})
span.SetParentSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7})

span = ils.Spans().At(2)
span.SetKind(ptrace.SpanKindServer)
span.SetName("baz")
span.SetStartTimestamp(pcommon.NewTimestampFromTime(now))
span.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(time.Second)))
span.SetTraceID([16]byte{17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32})
span.SetSpanID([8]byte{22, 23, 24, 25, 26, 27, 28})
span.SetParentSpanID([8]byte{29, 30, 31, 32, 33, 34, 35, 36})

// Since marshaling json is not guaranteed to be in order
// within a string, using a map to compare that the expected values are there
otlpJSON := map[string]any{
unkeyedOtlpJSON := map[string]any{
"resourceSpans": []any{
map[string]any{
"resource": map[string]any{},
Expand All @@ -115,6 +136,95 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) {
"endTimeUnixNano": fmt.Sprint(now.Add(time.Second).UnixNano()),
"status": map[string]any{},
},
map[string]any{
"traceId": "0102030405060708090a0b0c0d0e0f10",
"spanId": "0f10111213141500",
"parentSpanId": "0001020304050607",
"name": "bar",
"kind": float64(ptrace.SpanKindClient),
"startTimeUnixNano": fmt.Sprint(now.UnixNano()),
"endTimeUnixNano": fmt.Sprint(now.Add(time.Second).UnixNano()),
"status": map[string]any{},
},
map[string]any{
"traceId": "1112131415161718191a1b1c1d1e1f20",
"spanId": "161718191a1b1c00",
"parentSpanId": "1d1e1f2021222324",
"name": "baz",
"kind": float64(ptrace.SpanKindServer),
"startTimeUnixNano": fmt.Sprint(now.UnixNano()),
"endTimeUnixNano": fmt.Sprint(now.Add(time.Second).UnixNano()),
"status": map[string]any{},
},
},
"schemaUrl": conventions.SchemaURL,
},
},
"schemaUrl": conventions.SchemaURL,
},
},
}

unkeyedOtlpJSONResult := make([]any, 1)
unkeyedOtlpJSONResult[0] = unkeyedOtlpJSON

keyedOtlpJSON1 := map[string]any{
"resourceSpans": []any{
map[string]any{
"resource": map[string]any{},
"scopeSpans": []any{
map[string]any{
"scope": map[string]any{},
"spans": []any{
map[string]any{
"traceId": "0102030405060708090a0b0c0d0e0f10",
"spanId": "0001020304050607",
"parentSpanId": "08090a0b0c0d0e00",
"name": "foo",
"kind": float64(ptrace.SpanKindServer),
"startTimeUnixNano": fmt.Sprint(now.UnixNano()),
"endTimeUnixNano": fmt.Sprint(now.Add(time.Second).UnixNano()),
"status": map[string]any{},
},
map[string]any{
"traceId": "0102030405060708090a0b0c0d0e0f10",
"spanId": "0f10111213141500",
"parentSpanId": "0001020304050607",
"name": "bar",
"kind": float64(ptrace.SpanKindClient),
"startTimeUnixNano": fmt.Sprint(now.UnixNano()),
"endTimeUnixNano": fmt.Sprint(now.Add(time.Second).UnixNano()),
"status": map[string]any{},
},
},
"schemaUrl": conventions.SchemaURL,
},
},
"schemaUrl": conventions.SchemaURL,
},
},
}

unkeyedMessageKey := []sarama.Encoder{nil}

keyedOtlpJSON2 := map[string]any{
"resourceSpans": []any{
map[string]any{
"resource": map[string]any{},
"scopeSpans": []any{
map[string]any{
"scope": map[string]any{},
"spans": []any{
map[string]any{
"traceId": "1112131415161718191a1b1c1d1e1f20",
"spanId": "161718191a1b1c00",
"parentSpanId": "1d1e1f2021222324",
"name": "baz",
"kind": float64(ptrace.SpanKindServer),
"startTimeUnixNano": fmt.Sprint(now.UnixNano()),
"endTimeUnixNano": fmt.Sprint(now.Add(time.Second).UnixNano()),
"status": map[string]any{},
},
},
"schemaUrl": conventions.SchemaURL,
},
Expand All @@ -124,7 +234,13 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) {
},
}

zipkinJSON := []any{
keyedOtlpJSONResult := make([]any, 2)
keyedOtlpJSONResult[0] = keyedOtlpJSON1
keyedOtlpJSONResult[1] = keyedOtlpJSON2

keyedMessageKey := []sarama.Encoder{sarama.ByteEncoder("0102030405060708090a0b0c0d0e0f10"), sarama.ByteEncoder("1112131415161718191a1b1c1d1e1f20")}

unkeyedZipkinJSON := []any{
map[string]any{
"traceId": "0102030405060708090a0b0c0d0e0f10",
"id": "0001020304050607",
Expand All @@ -135,35 +251,111 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) {
"kind": string(zipkin.Server),
"localEndpoint": map[string]any{"serviceName": "otlpresourcenoservicename"},
},
map[string]any{
"traceId": "0102030405060708090a0b0c0d0e0f10",
"id": "0f10111213141500",
"parentId": "0001020304050607",
"name": "bar",
"timestamp": float64(time.Second.Microseconds()),
"duration": float64(time.Second.Microseconds()),
"kind": string(zipkin.Client),
"localEndpoint": map[string]any{"serviceName": "otlpresourcenoservicename"},
},
map[string]any{
"traceId": "1112131415161718191a1b1c1d1e1f20",
"id": "161718191a1b1c00",
"parentId": "1d1e1f2021222324",
"name": "baz",
"timestamp": float64(time.Second.Microseconds()),
"duration": float64(time.Second.Microseconds()),
"kind": string(zipkin.Server),
"localEndpoint": map[string]any{"serviceName": "otlpresourcenoservicename"},
},
}

unkeyedZipkinJSONResult := make([]any, 1)
unkeyedZipkinJSONResult[0] = unkeyedZipkinJSON

keyedZipkinJSON1 := []any{
map[string]any{
"traceId": "0102030405060708090a0b0c0d0e0f10",
"id": "0001020304050607",
"parentId": "08090a0b0c0d0e00",
"name": "foo",
"timestamp": float64(time.Second.Microseconds()),
"duration": float64(time.Second.Microseconds()),
"kind": string(zipkin.Server),
"localEndpoint": map[string]any{"serviceName": "otlpresourcenoservicename"},
},
map[string]any{
"traceId": "0102030405060708090a0b0c0d0e0f10",
"id": "0f10111213141500",
"parentId": "0001020304050607",
"name": "bar",
"timestamp": float64(time.Second.Microseconds()),
"duration": float64(time.Second.Microseconds()),
"kind": string(zipkin.Client),
"localEndpoint": map[string]any{"serviceName": "otlpresourcenoservicename"},
},
}

keyedZipkinJSON2 := []any{
map[string]any{
"traceId": "1112131415161718191a1b1c1d1e1f20",
"id": "161718191a1b1c00",
"parentId": "1d1e1f2021222324",
"name": "baz",
"timestamp": float64(time.Second.Microseconds()),
"duration": float64(time.Second.Microseconds()),
"kind": string(zipkin.Server),
"localEndpoint": map[string]any{"serviceName": "otlpresourcenoservicename"},
},
}

keyedZipkinJSONResult := make([]any, 2)
keyedZipkinJSONResult[0] = keyedZipkinJSON1
keyedZipkinJSONResult[1] = keyedZipkinJSON2

tests := []struct {
encoding string
expectedJSON any
unmarshaled any
encoding string
keyed bool
numExpectedMessages int
expectedJSON []any
expectedMessageKey []sarama.Encoder
unmarshaled any
}{
{encoding: "otlp_json", expectedJSON: otlpJSON, unmarshaled: map[string]any{}},
{encoding: "zipkin_json", expectedJSON: zipkinJSON, unmarshaled: []map[string]any{}},
{encoding: "otlp_json", numExpectedMessages: 1, expectedJSON: unkeyedOtlpJSONResult, expectedMessageKey: unkeyedMessageKey, unmarshaled: map[string]any{}},
{encoding: "otlp_json", keyed: true, numExpectedMessages: 2, expectedJSON: keyedOtlpJSONResult, expectedMessageKey: keyedMessageKey, unmarshaled: map[string]any{}},
{encoding: "zipkin_json", numExpectedMessages: 1, expectedJSON: unkeyedZipkinJSONResult, expectedMessageKey: unkeyedMessageKey, unmarshaled: []map[string]any{}},
{encoding: "zipkin_json", keyed: true, numExpectedMessages: 2, expectedJSON: keyedZipkinJSONResult, expectedMessageKey: keyedMessageKey, unmarshaled: []map[string]any{}},
}

for _, test := range tests {

marshaler, ok := tracesMarshalers()[test.encoding]
require.True(t, ok, fmt.Sprintf("Must have %s marshaller", test.encoding))

if test.keyed {
keyableMarshaler, ok := marshaler.(KeyableTracesMarshaler)
require.True(t, ok, "Must be a KeyableTracesMarshaler")
keyableMarshaler.Key()
}

msg, err := marshaler.Marshal(traces, t.Name())
require.NoError(t, err, "Must have marshaled the data without error")
require.Len(t, msg, 1, "Must have one entry in the message")

data, err := msg[0].Value.Encode()
require.NoError(t, err, "Must not error when encoding value")
require.NotNil(t, data, "Must have valid data to test")
require.Len(t, msg, test.numExpectedMessages, "Expected number of messages in the message")

unmarshaled := test.unmarshaled
err = json.Unmarshal(data, &unmarshaled)
require.NoError(t, err, "Must not error marshaling expected data")
for idx, singleMsg := range msg {
data, err := singleMsg.Value.Encode()
require.NoError(t, err, "Must not error when encoding value")
require.NotNil(t, data, "Must have valid data to test")

assert.Equal(t, test.expectedJSON, unmarshaled, "Must match the expected value")
unmarshaled := test.unmarshaled
err = json.Unmarshal(data, &unmarshaled)
require.NoError(t, err, "Must not error marshaling expected data")

assert.Equal(t, test.expectedJSON[idx], unmarshaled, "Must match the expected value")
assert.Equal(t, test.expectedMessageKey[idx], singleMsg.Key)
}
}
}
Loading

0 comments on commit b6b6a6f

Please sign in to comment.