Skip to content

Commit

Permalink
[receiver/mongodbatlas]: Retain actual raw log line as Body (open-tel…
Browse files Browse the repository at this point in the history
…emetry#14388)

Retain the actual raw, original log line as the Body, instead of reconstructing it and saving it as an attribute and on Body.
Remove raw attribute on log
  • Loading branch information
BinaryFissionGames committed Sep 21, 2022
1 parent 7dd2219 commit 5da6313
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 88 deletions.
20 changes: 4 additions & 16 deletions receiver/mongodbatlasreceiver/internal/model/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package model // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/model"

import (
"encoding/json"

"go.opentelemetry.io/collector/pdata/pcommon"
)

Expand All @@ -29,20 +27,8 @@ type LogEntry struct {
Context string `json:"ctx"`
Message string `json:"msg"`
Attributes map[string]interface{} `json:"attr"`
// Raw, if it is present, is the original log line. It is not a part of the payload, but transient data added during decoding.
Raw *string `json:"-"`
}

// RawLog returns a raw representation of the log entry.
// In the case of console logs, this is the actual log line.
// In the case of JSON logs, it is reconstructed (re-marshaled) after being unmarshalled
func (l LogEntry) RawLog() (string, error) {
if l.Raw != nil {
return *l.Raw, nil
}

data, err := json.Marshal(l)
return string(data), err
// Raw is the original log line. It is not a part of the payload, but transient data added during decoding.
Raw string `json:"-"`
}

// AuditLog represents a MongoDB Atlas JSON audit log entry
Expand All @@ -56,6 +42,8 @@ type AuditLog struct {
Roles []AuditRole `json:"roles"`
Result int `json:"result"`
Param map[string]any `json:"param"`
// Raw is the original log line. It is not a part of the payload, but transient data added during decoding.
Raw string `json:"-"`
}

// logTimestamp is the structure that represents a Log Timestamp
Expand Down
118 changes: 105 additions & 13 deletions receiver/mongodbatlasreceiver/log_decode_test.go

Large diffs are not rendered by default.

54 changes: 28 additions & 26 deletions receiver/mongodbatlasreceiver/log_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"bufio"
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
"regexp"

Expand All @@ -35,31 +33,33 @@ func decodeLogs(logger *zap.Logger, clusterMajorVersion string, r io.Reader) ([]
return decode4_2(logger.Named("console_decoder"), r)
default:
// All other versions use JSON logging
return decodeJSON(r)
return decodeJSON(logger.Named("json_decoder"), r)
}
}

func decodeJSON(r io.Reader) ([]model.LogEntry, error) {
func decodeJSON(logger *zap.Logger, r io.Reader) ([]model.LogEntry, error) {
// Pass this into a gzip reader for decoding
reader, err := gzip.NewReader(r)
gzipReader, err := gzip.NewReader(r)
if err != nil {
return nil, err
}

// Logs are in JSON format so create a JSON decoder to process them
dec := json.NewDecoder(reader)

scanner := bufio.NewScanner(gzipReader)
var entries []model.LogEntry
for {
var entry model.LogEntry
err := dec.Decode(&entry)
if errors.Is(err, io.EOF) {
return entries, nil
if !scanner.Scan() {
// Scan failed; This might just be EOF, in which case Err will be nil, or it could be some other IO error.
return entries, scanner.Err()
}
if err != nil {
return entries, fmt.Errorf("entry could not be decoded into LogEntry: %w", err)

var entry model.LogEntry
if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil {
logger.Error("Failed to parse log entry as JSON", zap.String("entry", scanner.Text()))
continue
}

entry.Raw = scanner.Text()

entries = append(entries, entry)
}
}
Expand Down Expand Up @@ -96,34 +96,36 @@ func decode4_2(logger *zap.Logger, r io.Reader) ([]model.LogEntry, error) {
Component: submatches[3],
Context: submatches[4],
Message: submatches[5],
Raw: &submatches[0],
Raw: submatches[0],
}

entries = append(entries, entry)
}
}

func decodeAuditJSON(r io.Reader) ([]model.AuditLog, error) {
func decodeAuditJSON(logger *zap.Logger, r io.Reader) ([]model.AuditLog, error) {
// Pass this into a gzip reader for decoding
reader, err := gzip.NewReader(r)
gzipReader, err := gzip.NewReader(r)
if err != nil {
return nil, err
}

// Logs are in JSON format so create a JSON decoder to process them
dec := json.NewDecoder(reader)

scanner := bufio.NewScanner(gzipReader)
var entries []model.AuditLog
for {
var entry model.AuditLog
err := dec.Decode(&entry)
if errors.Is(err, io.EOF) {
return entries, nil
if !scanner.Scan() {
// Scan failed; This might just be EOF, in which case Err will be nil, or it could be some other IO error.
return entries, scanner.Err()
}
if err != nil {
return entries, fmt.Errorf("entry could not be decoded into AuditLog: %w", err)

var entry model.AuditLog
if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil {
logger.Error("Failed to parse audit log entry as JSON", zap.String("entry", scanner.Text()))
continue
}

entry.Raw = scanner.Text()

entries = append(entries, entry)
}
}
2 changes: 1 addition & 1 deletion receiver/mongodbatlasreceiver/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (s *logsReceiver) getHostAuditLogs(groupID, hostname, logName string) ([]mo
return nil, err
}

return decodeAuditJSON(buf)
return decodeAuditJSON(s.log, buf)
}

func (s *logsReceiver) collectLogs(pc ProjectContext, hostname, logName, clusterName, clusterMajorVersion string) {
Expand Down
17 changes: 3 additions & 14 deletions receiver/mongodbatlasreceiver/mongodb_event_to_logdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package mongodbatlasreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver"

import (
"encoding/json"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -27,7 +26,7 @@ import (

const (
// Number of log attributes to add to the plog.LogRecordSlice for host logs.
totalLogAttributes = 11
totalLogAttributes = 10
// Number of log attributes to add to the plog.LogRecordSlice for audit logs.
totalAuditLogAttributes = 16

Expand Down Expand Up @@ -70,10 +69,6 @@ func mongodbAuditEventToLogData(logger *zap.Logger, logs []model.AuditLog, pc Pr

for _, log := range logs {
lr := sl.LogRecords().AppendEmpty()
data, err := json.Marshal(log)
if err != nil {
logger.Warn("failed to marshal", zap.Error(err))
}

logTsFormat := tsLayout(clusterMajorVersion)
t, err := time.Parse(logTsFormat, log.Timestamp.Date)
Expand All @@ -84,7 +79,7 @@ func mongodbAuditEventToLogData(logger *zap.Logger, logs []model.AuditLog, pc Pr
lr.SetTimestamp(pcommon.NewTimestampFromTime(t))
lr.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))
// Insert Raw Log message into Body of LogRecord
lr.Body().SetStringVal(string(data))
lr.Body().SetStringVal(log.Raw)
// Since Audit Logs don't have a severity/level
// Set the "SeverityNumber" and "SeverityText" to INFO
lr.SetSeverityNumber(plog.SeverityNumberInfo)
Expand Down Expand Up @@ -173,11 +168,6 @@ func mongodbEventToLogData(logger *zap.Logger, logs []model.LogEntry, pc Project
for _, log := range logs {
lr := sl.LogRecords().AppendEmpty()

rawLog, err := log.RawLog()
if err != nil {
logger.Warn("Failed to determine raw log", zap.Error(err))
}

t, err := time.Parse(logTsFormat, log.Timestamp.Date)
if err != nil {
logger.Warn("Time failed to parse correctly", zap.Error(err))
Expand All @@ -186,7 +176,7 @@ func mongodbEventToLogData(logger *zap.Logger, logs []model.LogEntry, pc Project
lr.SetTimestamp(pcommon.NewTimestampFromTime(t))
lr.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))
// Insert Raw Log message into Body of LogRecord
lr.Body().SetStringVal(rawLog)
lr.Body().SetStringVal(log.Raw)
// Set the "SeverityNumber" and "SeverityText" if a known type of
// severity is found.
if severityNumber, ok := severityMap[log.Severity]; ok {
Expand All @@ -206,7 +196,6 @@ func mongodbEventToLogData(logger *zap.Logger, logs []model.LogEntry, pc Project
attrs.PutInt("id", log.ID)
}
attrs.PutString("log_name", logName)
attrs.PutString("raw", rawLog)
}

return ld
Expand Down
66 changes: 48 additions & 18 deletions receiver/mongodbatlasreceiver/mongodb_event_to_logdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,35 @@ func TestMongoeventToLogData4_4(t *testing.T) {
Project: mongodbatlas.Project{Name: "Project"},
}

ld := mongodbEventToLogData(zap.NewNop(), []model.LogEntry{mongoevent}, pc, "hostname", "clusterName", "logName", "4.4")
ld := mongodbEventToLogData(zap.NewNop(), []model.LogEntry{mongoevent}, pc, "hostname", "logName", "clusterName", "4.4")
rl := ld.ResourceLogs().At(0)
resourceAttrs := rl.Resource().Attributes()
sl := rl.ScopeLogs().At(0)
lr := sl.LogRecords().At(0)
attrs := lr.Attributes()
assert.Equal(t, ld.ResourceLogs().Len(), 1)
assert.Equal(t, resourceAttrs.Len(), 4)
assert.Equal(t, attrs.Len(), 9)
assert.Equal(t, pcommon.Timestamp(1663006227215000000), lr.Timestamp())
_, exists := attrs.Get("id")
assert.True(t, exists, "expected attribute id to exist, but it didn't")

// Count attribute will not be present in the LogData
ld = mongodbEventToLogData(zap.NewNop(), []model.LogEntry{mongoevent}, pc, "hostname", "clusterName", "logName", "4.4")
assert.Equal(t, ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().Len(), 9)
assert.Equal(t, 1, ld.ResourceLogs().Len())
assert.Equal(t, 4, resourceAttrs.Len())
assertString(t, resourceAttrs, "mongodb_atlas.org", "Org")
assertString(t, resourceAttrs, "mongodb_atlas.project", "Project")
assertString(t, resourceAttrs, "mongodb_atlas.cluster", "clusterName")
assertString(t, resourceAttrs, "mongodb_atlas.host.name", "hostname")

t.Logf("%+v", attrs.AsRaw())
assert.Equal(t, 8, attrs.Len())
assertInt(t, attrs, "id", 12312)
assertString(t, attrs, "message", "Connection ended")
assertString(t, attrs, "component", "NETWORK")
assertString(t, attrs, "context", "context")
assertString(t, attrs, "log_name", "logName")
assertString(t, attrs, "remote", "192.168.253.105:59742")
assertInt(t, attrs, "connectionCount", 47)
assertInt(t, attrs, "connectionId", 9052)

assert.Equal(t, pcommon.Timestamp(1663006227215000000), lr.Timestamp())
assert.Equal(t, "RAW MESSAGE", lr.Body().StringVal())
assert.Equal(t, "I", lr.SeverityText())
assert.Equal(t, plog.SeverityNumberInfo, lr.SeverityNumber())
}

func TestMongoeventToLogData4_2(t *testing.T) {
Expand All @@ -60,18 +73,33 @@ func TestMongoeventToLogData4_2(t *testing.T) {
Project: mongodbatlas.Project{Name: "Project"},
}

ld := mongodbEventToLogData(zaptest.NewLogger(t), []model.LogEntry{mongoevent}, pc, "hostname", "clusterName", "logName", "4.2")
ld := mongodbEventToLogData(zaptest.NewLogger(t), []model.LogEntry{mongoevent}, pc, "hostname", "logName", "clusterName", "4.2")
rl := ld.ResourceLogs().At(0)
resourceAttrs := rl.Resource().Attributes()
sl := rl.ScopeLogs().At(0)
lr := sl.LogRecords().At(0)
attrs := lr.Attributes()
assert.Equal(t, ld.ResourceLogs().Len(), 1)
assert.Equal(t, resourceAttrs.Len(), 4)
assert.Equal(t, attrs.Len(), 5)

assert.Equal(t, 1, ld.ResourceLogs().Len())
assert.Equal(t, 4, resourceAttrs.Len())
assertString(t, resourceAttrs, "mongodb_atlas.org", "Org")
assertString(t, resourceAttrs, "mongodb_atlas.project", "Project")
assertString(t, resourceAttrs, "mongodb_atlas.cluster", "clusterName")
assertString(t, resourceAttrs, "mongodb_atlas.host.name", "hostname")

assert.Equal(t, 4, attrs.Len())
assertString(t, attrs, "message", "Connection ended")
assertString(t, attrs, "component", "NETWORK")
assertString(t, attrs, "context", "context")
assertString(t, attrs, "log_name", "logName")

assert.Equal(t, pcommon.Timestamp(1663004293902000000), lr.Timestamp())
_, exists := attrs.Get("id")
assert.False(t, exists, "expected attribute id to not exist, but it did")

assert.Equal(t, "RAW MESSAGE", lr.Body().StringVal())
assert.Equal(t, "I", lr.SeverityText())
assert.Equal(t, plog.SeverityNumberInfo, lr.SeverityNumber())
}

func TestUnknownSeverity(t *testing.T) {
Expand Down Expand Up @@ -143,8 +171,7 @@ func TestMongoEventToAuditLogData5_0(t *testing.T) {
assert.Equal(t, pcommon.Timestamp(1663342012563000000), lr.Timestamp())
assert.Equal(t, plog.SeverityNumberInfo, lr.SeverityNumber())
assert.Equal(t, "INFO", lr.SeverityText())
assert.Equal(t, `{"atype":"authenticate","ts":{"$date":"2022-09-16T15:26:52.563+00:00"},"uuid":{"$binary":"binary","$type":"type"},"local":{"ip":"0.0.0.0","port":3000,"isSystemUser":true,"unix":"/var/run/mongodb/mongodb-27017.sock"},"remote":{"ip":"192.168.1.237","port":4000},"users":[{"user":"mongo_user","db":"my_db"}],"roles":[{"role":"test_role","db":"test_db"}],"result":40,"param":{"db":"db","mechanism":"mechanism","user":"name"}}`,
lr.Body().StringVal())
assert.Equal(t, "RAW MESSAGE", lr.Body().StringVal())
}

func TestMongoEventToAuditLogData4_2(t *testing.T) {
Expand Down Expand Up @@ -197,8 +224,7 @@ func TestMongoEventToAuditLogData4_2(t *testing.T) {
assert.Equal(t, pcommon.Timestamp(1663342012563000000), lr.Timestamp())
assert.Equal(t, plog.SeverityNumberInfo, lr.SeverityNumber())
assert.Equal(t, "INFO", lr.SeverityText())
assert.Equal(t, `{"atype":"authenticate","ts":{"$date":"2022-09-16T15:26:52.563+0000"},"local":{"ip":"0.0.0.0","port":3000},"remote":{"ip":"192.168.1.237","port":4000},"users":[{"user":"mongo_user","db":"my_db"}],"roles":[{"role":"test_role","db":"test_db"}],"result":40,"param":{"db":"db","mechanism":"mechanism","user":"name"}}`,
lr.Body().StringVal())
assert.Equal(t, "RAW MESSAGE", lr.Body().StringVal())
}

func GetTestEvent4_4() model.LogEntry {
Expand All @@ -212,6 +238,7 @@ func GetTestEvent4_4() model.LogEntry {
Context: "context",
Message: "Connection ended",
Attributes: map[string]interface{}{"connectionCount": 47, "connectionId": 9052, "remote": "192.168.253.105:59742", "id": "93a8f190-afd0-422d-9de6-f6c5e833e35f"},
Raw: "RAW MESSAGE",
}
}

Expand All @@ -224,6 +251,7 @@ func GetTestEvent4_2() model.LogEntry {
Timestamp: model.LogTimestamp{
Date: "2022-09-12T17:38:13.902+0000",
},
Raw: "RAW MESSAGE",
}
}

Expand Down Expand Up @@ -265,6 +293,7 @@ func GetTestAuditEvent5_0() model.AuditLog {
"db": "db",
"mechanism": "mechanism",
},
Raw: "RAW MESSAGE",
}
}

Expand Down Expand Up @@ -300,6 +329,7 @@ func GetTestAuditEvent4_2() model.AuditLog {
"db": "db",
"mechanism": "mechanism",
},
Raw: "RAW MESSAGE",
}
}

Expand Down
11 changes: 11 additions & 0 deletions unreleased/mongodb-atlas-actual-raw-logs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "breaking"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: mongodbatlasreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Retain actual raw log line as Body. The `raw` attribute is now removed. Use Body instead for the raw log line."

# One or more tracking issues related to the change
issues: [14178]

0 comments on commit 5da6313

Please sign in to comment.