Skip to content

Commit

Permalink
[receiver/kafka] Add JSON-encoded log support (open-telemetry#24028)
Browse files Browse the repository at this point in the history
Added support for json encoded logs to the `kafkareceiver`
  • Loading branch information
Caleb-Hurshman committed Jul 10, 2023
1 parent 0ae260c commit 0e4cb4a
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 6 deletions.
20 changes: 20 additions & 0 deletions .chloggen/feat_kafka-json-logs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkareceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Added support for json-encoded logs for the kafkareceiver

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [20734]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
10 changes: 5 additions & 5 deletions receiver/kafkareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ The following settings can be optionally configured:
- `zipkin_thrift`: the payload is deserialized into a list of Zipkin Thrift spans.
- `raw`: (logs only) the payload's bytes are inserted as the body of a log record.
- `text`: (logs only) the payload are decoded as text and inserted as the body of a log record. By default, it uses UTF-8 to decode. You can use `text_<ENCODING>`, like `text_utf-8`, `text_shift_jis`, etc., to customize this behavior.
- `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from
- `json`: (logs only) the payload is decoded as JSON and inserted as the body of a log record.
- `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from
- `client_id` (default = otel-collector): The consumer client ID that receiver will use
- `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`.
- `auth`
Expand All @@ -58,7 +59,7 @@ The following settings can be optionally configured:
- `kerberos`
- `service_name`: Kerberos service name
- `realm`: Kerberos realm
- `use_keytab`: Use of keytab instead of password, if this is true, keytab file will be used instead of password
- `use_keytab`: Use of keytab instead of password, if this is true, keytab file will be used instead of password
- `username`: The Kerberos username used for authenticate with KDC
- `password`: The Kerberos password used for authenticate with KDC
- `config_file`: Path to Kerberos configuration. i.e /etc/krb5.conf
Expand All @@ -74,9 +75,9 @@ The following settings can be optionally configured:
- `enable`: (default = true) Whether or not to auto-commit updated offsets back to the broker
- `interval`: (default = 1s) How frequently to commit updated offsets. Ineffective unless auto-commit is enabled
- `message_marking`:
- `after`: (default = false) If true, the messages are marked after the pipeline execution
- `after`: (default = false) If true, the messages are marked after the pipeline execution
- `on_error`: (default = false) If false, only the successfully processed messages are marked
**Note: this can block the entire partition in case a message processing returns a permanent error**
**Note: this can block the entire partition in case a message processing returns a permanent error**

Example:

Expand All @@ -85,4 +86,3 @@ receivers:
kafka:
protocol_version: 2.0.0
```

2 changes: 1 addition & 1 deletion receiver/kafkareceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/apache/thrift v0.18.1
github.com/gogo/protobuf v1.3.2
github.com/jaegertracing/jaeger v1.41.0
github.com/json-iterator/go v1.1.12
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.81.0
Expand Down Expand Up @@ -44,7 +45,6 @@ require (
github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/knadh/koanf v1.5.0 // indirect
github.com/knadh/koanf/v2 v2.0.1 // indirect
Expand Down
43 changes: 43 additions & 0 deletions receiver/kafkareceiver/json_unmarshaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
import (
"time"

jsoniter "github.com/json-iterator/go"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
)

type jsonLogsUnmarshaler struct {
}

func newJSONLogsUnmarshaler() LogsUnmarshaler {
return &jsonLogsUnmarshaler{}
}

func (r *jsonLogsUnmarshaler) Unmarshal(buf []byte) (plog.Logs, error) {
// create a new Logs struct to be populated with log data and returned
p := plog.NewLogs()

// get json logs from the buffer
jsonVal := map[string]interface{}{}
if err := jsoniter.Unmarshal(buf, &jsonVal); err != nil {
return p, err
}

// create a new log record
logRecords := p.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
logRecords.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))

// Set the unmarshaled jsonVal as the body of the log record
if err := logRecords.Body().SetEmptyMap().FromRaw(jsonVal); err != nil {
return p, err
}
return p, nil
}

func (r *jsonLogsUnmarshaler) Encoding() string {
return "json"
}
31 changes: 31 additions & 0 deletions receiver/kafkareceiver/json_unmarshaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package kafkareceiver

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/plog"
)

func TestNewJSONUnmarshaler(t *testing.T) {
t.Parallel()
um := newJSONLogsUnmarshaler()
assert.Equal(t, "json", um.Encoding())
}

func TestPlogReturnType(t *testing.T) {
t.Parallel()
um := newJSONLogsUnmarshaler()
json := `{"example": "example valid json to test that the unmarshaler is correctly returning a plog value"}`

unmarshaledJSON, err := um.Unmarshal([]byte(json))

assert.NoError(t, err)
assert.Nil(t, err)

var expectedType plog.Logs
assert.IsType(t, expectedType, unmarshaledJSON)
}
2 changes: 2 additions & 0 deletions receiver/kafkareceiver/unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ func defaultLogsUnmarshalers() map[string]LogsUnmarshaler {
otlpPb := newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding)
raw := newRawLogsUnmarshaler()
text := newTextLogsUnmarshaler()
json := newJSONLogsUnmarshaler()
return map[string]LogsUnmarshaler{
otlpPb.Encoding(): otlpPb,
raw.Encoding(): raw,
text.Encoding(): text,
json.Encoding(): json,
}
}
1 change: 1 addition & 0 deletions receiver/kafkareceiver/unmarshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func TestDefaultLogsUnMarshaler(t *testing.T) {
"otlp_proto",
"raw",
"text",
"json",
}
marshalers := defaultLogsUnmarshalers()
assert.Equal(t, len(expectedEncodings), len(marshalers))
Expand Down

0 comments on commit 0e4cb4a

Please sign in to comment.