From 96a0a37ff8f14f0367c36d7410cef66963962cbd Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 12 Sep 2022 15:48:20 -0400 Subject: [PATCH 1/7] fix invalid parsing of v4.2 logs --- .../internal/model/logs.go | 17 +- receiver/mongodbatlasreceiver/log_decoder.go | 124 ++++++++++ .../mongodbatlasreceiver/log_decoder_test.go | 223 ++++++++++++++++++ receiver/mongodbatlasreceiver/logs.go | 38 +-- .../mongodb_event_to_logdata.go | 38 ++- .../mongodb_event_to_logdata_test.go | 60 ++++- .../testdata/logs/sample-payloads/4.2.log | 3 + .../logs/sample-payloads/4.2_invalid_log.log | 3 + .../testdata/logs/sample-payloads/5.0.log | 3 + .../logs/sample-payloads/5.0_invalid_log.log | 3 + unreleased/mongodb-atlas-4_2-logging.yaml | 11 + 11 files changed, 476 insertions(+), 47 deletions(-) create mode 100644 receiver/mongodbatlasreceiver/log_decoder.go create mode 100644 receiver/mongodbatlasreceiver/log_decoder_test.go create mode 100644 receiver/mongodbatlasreceiver/testdata/logs/sample-payloads/4.2.log create mode 100644 receiver/mongodbatlasreceiver/testdata/logs/sample-payloads/4.2_invalid_log.log create mode 100644 receiver/mongodbatlasreceiver/testdata/logs/sample-payloads/5.0.log create mode 100644 receiver/mongodbatlasreceiver/testdata/logs/sample-payloads/5.0_invalid_log.log create mode 100755 unreleased/mongodb-atlas-4_2-logging.yaml diff --git a/receiver/mongodbatlasreceiver/internal/model/logs.go b/receiver/mongodbatlasreceiver/internal/model/logs.go index f396be80a37e3..efbc7f399524b 100644 --- a/receiver/mongodbatlasreceiver/internal/model/logs.go +++ b/receiver/mongodbatlasreceiver/internal/model/logs.go @@ -14,7 +14,9 @@ package model // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/model" -// LogEntry represents a MongoDB Atlas JSON log entry +import ( + "encoding/json" +) // LogEntry represents a MongoDB Atlas JSON log entry type LogEntry struct { Timestamp LogTimestamp `json:"t"` Severity string 
`json:"s"` @@ -23,6 +25,19 @@ type LogEntry struct { Context string `json:"ctx"` Message string `json:"msg"` Attributes map[string]interface{} `json:"attr"` + Raw *string `json:"-"` +} + +// RawLog returns a raw representation of the log entry. +// In the case of console logs, this is the actual log line. +// In the case of JSON logs, it is reconstructed (re-marshaled) after being unmarshalled +func (l LogEntry) RawLog() (string, error) { + if l.Raw != nil { + return *l.Raw, nil + } + + data, err := json.Marshal(l) + return string(data), err } // AuditLog represents a MongoDB Atlas JSON audit log entry diff --git a/receiver/mongodbatlasreceiver/log_decoder.go b/receiver/mongodbatlasreceiver/log_decoder.go new file mode 100644 index 0000000000000..7701059f5cca9 --- /dev/null +++ b/receiver/mongodbatlasreceiver/log_decoder.go @@ -0,0 +1,124 @@ +package mongodbatlasreceiver + +import ( + "bufio" + "compress/gzip" + "encoding/json" + "errors" + "fmt" + "io" + "regexp" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/model" + "go.uber.org/zap" +) + +// logDecoder is an interface that decodes logs from an io.Reader into LogEntry structs. +type logDecoder interface { + Decode(r io.Reader) ([]model.LogEntry, error) +} + +func decoderForVersion(logger *zap.Logger, clusterMajorVersion string) logDecoder { + var decoder logDecoder + switch clusterMajorVersion { + case mongoDBMajorVersion4_2: + // 4.2 clusters use a console log format + decoder = newConsoleLogDecoder(logger.Named("consoledecoder")) + default: + // All other versions use JSON logging + decoder = newJSONLogDecoder(logger.Named("jsondecoder")) + } + + return decoder +} + +// jsonLogDecoder is a logDecoder that decodes JSON formatted mongodb logs. 
+// This is the format used for mongodb 4.4+ +type jsonLogDecoder struct { + logger *zap.Logger +} + +func newJSONLogDecoder(logger *zap.Logger) *jsonLogDecoder { + return &jsonLogDecoder{ + logger: logger, + } +} + +func (j *jsonLogDecoder) Decode(r io.Reader) ([]model.LogEntry, error) { + // Pass this into a gzip reader for decoding + reader, err := gzip.NewReader(r) + if err != nil { + return nil, err + } + + // Logs are in JSON format so create a JSON decoder to process them + dec := json.NewDecoder(reader) + + var entries []model.LogEntry + for { + var entry model.LogEntry + err := dec.Decode(&entry) + if errors.Is(err, io.EOF) { + return entries, nil + } + if err != nil { + return entries, fmt.Errorf("entry could not be decoded into LogEntry: %w", err) + } + + entries = append(entries, entry) + } +} + +var consoleLogRegex = regexp.MustCompile(`^(?P\S+)\s+(?P\w+)\s+(?P[\w-]+)\s+\[(?P\S+)\]\s+(?P.*)$`) + +// consoleLogDecoder is a logDecoder that decodes "console" formatted mongodb logs. +// This is the format used for mongodb 4.2 +type consoleLogDecoder struct { + logger *zap.Logger +} + +func newConsoleLogDecoder(logger *zap.Logger) *consoleLogDecoder { + return &consoleLogDecoder{ + logger: logger, + } +} + +func (c *consoleLogDecoder) Decode(r io.Reader) ([]model.LogEntry, error) { + + // Pass this into a gzip reader for decoding + gzipReader, err := gzip.NewReader(r) + if err != nil { + return nil, err + } + + scanner := bufio.NewScanner(gzipReader) + var entries []model.LogEntry + for { + if !scanner.Scan() { + // Scan failed; This might just be EOF, in which case Err will be nil, or it could be some other IO error. 
+ return entries, scanner.Err() + } + + submatches := consoleLogRegex.FindSubmatch(scanner.Bytes()) + if submatches == nil || len(submatches) != 6 { + // Match failed for line + c.logger.Error("Entry did not match regex", zap.String("entry", scanner.Text())) + continue + } + + rawLog := string(submatches[0]) + + entry := model.LogEntry{ + Timestamp: model.LogTimestamp{ + Date: string(submatches[1]), + }, + Severity: string(submatches[2]), + Component: string(submatches[3]), + Context: string(submatches[4]), + Message: string(submatches[5]), + Raw: &rawLog, + } + + entries = append(entries, entry) + } +} diff --git a/receiver/mongodbatlasreceiver/log_decoder_test.go b/receiver/mongodbatlasreceiver/log_decoder_test.go new file mode 100644 index 0000000000000..d74a7f08b0bf2 --- /dev/null +++ b/receiver/mongodbatlasreceiver/log_decoder_test.go @@ -0,0 +1,223 @@ +package mongodbatlasreceiver + +import ( + "bytes" + "compress/gzip" + "os" + "path/filepath" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +func TestDecode4_2(t *testing.T) { + b, err := os.ReadFile(filepath.Join("testdata", "logs", "sample-payloads", "4.2.log")) + require.NoError(t, err) + + zippedBuffer := &bytes.Buffer{} + gzipWriter := gzip.NewWriter(zippedBuffer) + + _, err = gzipWriter.Write(b) + require.NoError(t, err) + require.NoError(t, gzipWriter.Close()) + + decoder := decoderForVersion(zaptest.NewLogger(t), "4.2") + + entries, err := decoder.Decode(zippedBuffer) + require.NoError(t, err) + + require.Equal(t, []model.LogEntry{ + { + Timestamp: model.LogTimestamp{ + Date: "2022-09-11T18:53:02.541+0000", + }, + Severity: "I", + Component: "NETWORK", + Context: "listener", + Message: "connection accepted from 192.168.248.5:51972 #25288 (31 connections now open)", + Raw: strp("2022-09-11T18:53:02.541+0000 I NETWORK 
[listener] connection accepted from 192.168.248.5:51972 #25288 (31 connections now open)"), + }, + { + Timestamp: model.LogTimestamp{ + Date: "2022-09-11T18:53:02.541+0000", + }, + Severity: "I", + Component: "NETWORK", + Context: "listener", + Message: "connection accepted from 192.168.248.5:51974 #25289 (32 connections now open)", + Raw: strp("2022-09-11T18:53:02.541+0000 I NETWORK [listener] connection accepted from 192.168.248.5:51974 #25289 (32 connections now open)"), + }, + { + Timestamp: model.LogTimestamp{ + Date: "2022-09-11T18:53:02.563+0000", + }, + Severity: "I", + Component: "NETWORK", + Context: "conn25289", + Message: `received client metadata from 192.168.248.5:51974 conn25289: { driver: { name: "mongo-go-driver", version: "v1.7.2+prerelease" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.18.2", application: { name: "MongoDB Automation Agent v12.3.4.7674 (git: 4c7df3ac1d15ef3269d44aa38b17376ca00147eb)" } }`, + Raw: strp(`2022-09-11T18:53:02.563+0000 I NETWORK [conn25289] received client metadata from 192.168.248.5:51974 conn25289: { driver: { name: "mongo-go-driver", version: "v1.7.2+prerelease" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.18.2", application: { name: "MongoDB Automation Agent v12.3.4.7674 (git: 4c7df3ac1d15ef3269d44aa38b17376ca00147eb)" } }`), + }, + }, entries) +} + +func TestDecode4_2InvalidLog(t *testing.T) { + b, err := os.ReadFile(filepath.Join("testdata", "logs", "sample-payloads", "4.2_invalid_log.log")) + require.NoError(t, err) + + zippedBuffer := &bytes.Buffer{} + gzipWriter := gzip.NewWriter(zippedBuffer) + + _, err = gzipWriter.Write(b) + require.NoError(t, err) + require.NoError(t, gzipWriter.Close()) + + decoder := decoderForVersion(zaptest.NewLogger(t), "4.2") + + entries, err := decoder.Decode(zippedBuffer) + require.NoError(t, err) + + require.Equal(t, []model.LogEntry{ + { + Timestamp: model.LogTimestamp{ + Date: "2022-09-11T18:53:02.541+0000", + }, + Severity: "I", + Component: 
"NETWORK", + Context: "listener", + Message: "connection accepted from 192.168.248.5:51972 #25288 (31 connections now open)", + Raw: strp("2022-09-11T18:53:02.541+0000 I NETWORK [listener] connection accepted from 192.168.248.5:51972 #25288 (31 connections now open)"), + }, + { + Timestamp: model.LogTimestamp{ + Date: "2022-09-11T18:53:02.563+0000", + }, + Severity: "I", + Component: "NETWORK", + Context: "conn25289", + Message: `received client metadata from 192.168.248.5:51974 conn25289: { driver: { name: "mongo-go-driver", version: "v1.7.2+prerelease" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.18.2", application: { name: "MongoDB Automation Agent v12.3.4.7674 (git: 4c7df3ac1d15ef3269d44aa38b17376ca00147eb)" } }`, + Raw: strp(`2022-09-11T18:53:02.563+0000 I NETWORK [conn25289] received client metadata from 192.168.248.5:51974 conn25289: { driver: { name: "mongo-go-driver", version: "v1.7.2+prerelease" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.18.2", application: { name: "MongoDB Automation Agent v12.3.4.7674 (git: 4c7df3ac1d15ef3269d44aa38b17376ca00147eb)" } }`), + }, + }, entries) +} + +func TestDecode4_2NotGzip(t *testing.T) { + decoder := decoderForVersion(zaptest.NewLogger(t), "4.2") + entries, err := decoder.Decode(bytes.NewBuffer([]byte("Not compressed log"))) + require.ErrorContains(t, err, "gzip: invalid header") + require.Nil(t, entries) +} + +func TestDecode5_0(t *testing.T) { + b, err := os.ReadFile(filepath.Join("testdata", "logs", "sample-payloads", "5.0.log")) + require.NoError(t, err) + + zippedBuffer := &bytes.Buffer{} + gzipWriter := gzip.NewWriter(zippedBuffer) + + _, err = gzipWriter.Write(b) + require.NoError(t, err) + require.NoError(t, gzipWriter.Close()) + + decoder := decoderForVersion(zaptest.NewLogger(t), "5.0") + + entries, err := decoder.Decode(zippedBuffer) + require.NoError(t, err) + + require.Equal(t, []model.LogEntry{ + { + Timestamp: model.LogTimestamp{ + Date: 
"2022-09-11T18:53:14.675+00:00", + }, + Severity: "I", + Component: "NETWORK", + ID: 22944, + Context: "conn35107", + Message: "Connection ended", + Attributes: map[string]interface{}{ + "remote": "192.168.248.2:52066", + "uuid": "d3f4641a-14ca-4a24-b5bb-7d7b391a02e7", + "connectionId": float64(35107), + "connectionCount": float64(33), + }, + }, + { + Timestamp: model.LogTimestamp{ + Date: "2022-09-11T18:53:14.676+00:00", + }, + Severity: "I", + Component: "NETWORK", + ID: 22944, + Context: "conn35109", + Message: "Connection ended", + Attributes: map[string]interface{}{ + "remote": "192.168.248.2:52070", + "uuid": "dcdb08ac-981d-41ea-9d6b-f85fe0475bd1", + "connectionId": float64(35109), + "connectionCount": float64(32), + }, + }, + { + Timestamp: model.LogTimestamp{ + Date: "2022-09-11T18:53:37.727+00:00", + }, + Severity: "I", + Component: "SHARDING", + ID: 20997, + Context: "conn9957", + Message: "Refreshed RWC defaults", + Attributes: map[string]interface{}{ + "newDefaults": map[string]interface{}{}, + }, + }, + }, entries) +} + +func TestDecode5_0InvalidLog(t *testing.T) { + b, err := os.ReadFile(filepath.Join("testdata", "logs", "sample-payloads", "5.0_invalid_log.log")) + require.NoError(t, err) + + zippedBuffer := &bytes.Buffer{} + gzipWriter := gzip.NewWriter(zippedBuffer) + + _, err = gzipWriter.Write(b) + require.NoError(t, err) + require.NoError(t, gzipWriter.Close()) + + decoder := decoderForVersion(zaptest.NewLogger(t), "5.0") + + entries, err := decoder.Decode(zippedBuffer) + assert.ErrorContains(t, err, "entry could not be decoded into LogEntry") + + assert.Equal(t, []model.LogEntry{ + { + Timestamp: model.LogTimestamp{ + Date: "2022-09-11T18:53:14.675+00:00", + }, + Severity: "I", + Component: "NETWORK", + ID: 22944, + Context: "conn35107", + Message: "Connection ended", + Attributes: map[string]interface{}{ + "remote": "192.168.248.2:52066", + "uuid": "d3f4641a-14ca-4a24-b5bb-7d7b391a02e7", + "connectionId": float64(35107), + "connectionCount": 
float64(33), + }, + }, + }, entries) +} + +func TestDecode5_0NotGzip(t *testing.T) { + decoder := decoderForVersion(zaptest.NewLogger(t), "5.0") + entries, err := decoder.Decode(bytes.NewBuffer([]byte("Not compressed log"))) + require.ErrorContains(t, err, "gzip: invalid header") + require.Nil(t, entries) +} + +func strp(s string) *string { + return &s +} diff --git a/receiver/mongodbatlasreceiver/logs.go b/receiver/mongodbatlasreceiver/logs.go index e9c4733c8c50c..52c650df751d3 100644 --- a/receiver/mongodbatlasreceiver/logs.go +++ b/receiver/mongodbatlasreceiver/logs.go @@ -34,6 +34,8 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/model" ) +const mongoDBMajorVersion4_2 = "4.2" + type logsReceiver struct { log *zap.Logger cfg *Config @@ -174,8 +176,8 @@ func (s *logsReceiver) collectClusterLogs(clusters []mongodbatlas.Cluster, proje for _, cluster := range clusters { hostnames := parseHostNames(cluster.ConnectionStrings.Standard, s.log) for _, hostname := range hostnames { - s.collectLogs(pc, hostname, "mongodb.gz", cluster.Name) - s.collectLogs(pc, hostname, "mongos.gz", cluster.Name) + s.collectLogs(pc, hostname, "mongodb.gz", cluster.Name, cluster.MongoDBMajorVersion) + s.collectLogs(pc, hostname, "mongos.gz", cluster.Name, cluster.MongoDBMajorVersion) if projectCfg.EnableAuditLogs { s.collectAuditLogs(pc, hostname, "mongodb-audit-log.gz", cluster.Name) @@ -201,34 +203,14 @@ func filterClusters(clusters []mongodbatlas.Cluster, clusterNames []string, incl return filtered } -func (s *logsReceiver) getHostLogs(groupID, hostname, logName string) ([]model.LogEntry, error) { +func (s *logsReceiver) getHostLogs(groupID, hostname, logName string, decoder logDecoder) ([]model.LogEntry, error) { // Get gzip bytes buffer from API buf, err := s.client.GetLogs(context.Background(), groupID, hostname, logName, s.start, s.end) if err != nil { return nil, err } - // Pass this into a gzip reader for decoding - 
reader, err := gzip.NewReader(buf) - if err != nil { - return nil, err - } - // Logs are in JSON format so create a JSON decoder to process them - dec := json.NewDecoder(reader) - - var entries []model.LogEntry - for { - var entry model.LogEntry - err := dec.Decode(&entry) - if errors.Is(err, io.EOF) { - return entries, nil - } - if err != nil { - s.log.Error("Entry could not be decoded into LogEntry", zap.Error(err)) - } - - entries = append(entries, entry) - } + return decoder.Decode(buf) } func (s *logsReceiver) getHostAuditLogs(groupID, hostname, logName string) ([]model.AuditLog, error) { @@ -259,8 +241,9 @@ func (s *logsReceiver) getHostAuditLogs(groupID, hostname, logName string) ([]mo } } -func (s *logsReceiver) collectLogs(pc ProjectContext, hostname, logName, clusterName string) { - logs, err := s.getHostLogs(pc.Project.ID, hostname, logName) +func (s *logsReceiver) collectLogs(pc ProjectContext, hostname, logName, clusterName, clusterMajorVersion string) { + decoder := decoderForVersion(s.log, clusterMajorVersion) + logs, err := s.getHostLogs(pc.Project.ID, hostname, logName, decoder) if err != nil && !errors.Is(err, io.EOF) { s.log.Warn("Failed to retrieve logs from: "+logName, zap.Error(err)) } @@ -270,7 +253,8 @@ func (s *logsReceiver) collectLogs(pc ProjectContext, hostname, logName, cluster pc, hostname, logName, - clusterName) + clusterName, + clusterMajorVersion) err = s.consumer.ConsumeLogs(context.Background(), plog) if err != nil { s.log.Error("Failed to consume logs", zap.Error(err)) diff --git a/receiver/mongodbatlasreceiver/mongodb_event_to_logdata.go b/receiver/mongodbatlasreceiver/mongodb_event_to_logdata.go index eece5158f063e..b47e9496a380a 100644 --- a/receiver/mongodbatlasreceiver/mongodb_event_to_logdata.go +++ b/receiver/mongodbatlasreceiver/mongodb_event_to_logdata.go @@ -33,8 +33,9 @@ const ( totalResourceAttributes = 4 ) -// layout for the timestamp format in the plog.Logs structure -const layout = "2006-01-02T15:04:05.000-07:00" 
+// jsonTimestampLayout for the timestamp format in the plog.Logs structure +const jsonTimestampLayout = "2006-01-02T15:04:05.000-07:00" +const consoleTimestampLayout = "2006-01-02T15:04:05.000-0700" // Severity mapping of the mongodb atlas logs var severityMap = map[string]plog.SeverityNumber{ @@ -71,7 +72,7 @@ func mongodbAuditEventToLogData(logger *zap.Logger, logs []model.AuditLog, pc Pr if err != nil { logger.Warn("failed to marshal", zap.Error(err)) } - t, err := time.Parse(layout, log.Timestamp.Date) + t, err := time.Parse(jsonTimestampLayout, log.Timestamp.Date) if err != nil { logger.Warn("Time failed to parse correctly", zap.Error(err)) } @@ -107,7 +108,7 @@ func mongodbAuditEventToLogData(logger *zap.Logger, logs []model.AuditLog, pc Pr } // mongoEventToLogRecord converts model.LogEntry event to plog.LogRecordSlice and adds the resource attributes. -func mongodbEventToLogData(logger *zap.Logger, logs []model.LogEntry, pc ProjectContext, hostname, logName, clusterName string) plog.Logs { +func mongodbEventToLogData(logger *zap.Logger, logs []model.LogEntry, pc ProjectContext, hostname, logName, clusterName, clusterMajorVersion string) plog.Logs { ld := plog.NewLogs() rl := ld.ResourceLogs().AppendEmpty() sl := rl.ScopeLogs().AppendEmpty() @@ -121,20 +122,25 @@ func mongodbEventToLogData(logger *zap.Logger, logs []model.LogEntry, pc Project resourceAttrs.UpsertString("mongodb_atlas.cluster", clusterName) resourceAttrs.UpsertString("mongodb_atlas.host.name", hostname) + logTsFormat := tsLayout(clusterMajorVersion) + for _, log := range logs { lr := sl.LogRecords().AppendEmpty() - data, err := json.Marshal(log) + + rawLog, err := log.RawLog() if err != nil { - logger.Warn("failed to marshal", zap.Error(err)) + logger.Warn("Failed to determine raw log", zap.Error(err)) } - t, err := time.Parse(layout, log.Timestamp.Date) + + t, err := time.Parse(logTsFormat, log.Timestamp.Date) if err != nil { logger.Warn("Time failed to parse correctly", zap.Error(err)) } + 
lr.SetTimestamp(pcommon.NewTimestampFromTime(t)) lr.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) // Insert Raw Log message into Body of LogRecord - lr.Body().SetStringVal(string(data)) + lr.Body().SetStringVal(rawLog) // Set the "SeverityNumber" and "SeverityText" if a known type of // severity is found. if severityNumber, ok := severityMap[log.Severity]; ok { @@ -149,10 +155,22 @@ func mongodbEventToLogData(logger *zap.Logger, logs []model.LogEntry, pc Project attrs.UpsertString("message", log.Message) attrs.UpsertString("component", log.Component) attrs.UpsertString("context", log.Context) - attrs.UpsertInt("id", log.ID) + // log ID is not present on MongoDB 4.2 systems + if clusterMajorVersion != mongoDBMajorVersion4_2 { + attrs.UpsertInt("id", log.ID) + } attrs.UpsertString("log_name", logName) - attrs.UpsertString("raw", string(data)) + attrs.UpsertString("raw", rawLog) } return ld } + +func tsLayout(clusterVersion string) string { + switch clusterVersion { + case mongoDBMajorVersion4_2: + return consoleTimestampLayout + default: + return jsonTimestampLayout + } +} diff --git a/receiver/mongodbatlasreceiver/mongodb_event_to_logdata_test.go b/receiver/mongodbatlasreceiver/mongodb_event_to_logdata_test.go index 6daa66810d486..9b0bc6133a22b 100644 --- a/receiver/mongodbatlasreceiver/mongodb_event_to_logdata_test.go +++ b/receiver/mongodbatlasreceiver/mongodb_event_to_logdata_test.go @@ -19,42 +19,69 @@ import ( "github.com/stretchr/testify/assert" "go.mongodb.org/atlas/mongodbatlas" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" + "go.uber.org/zap/zaptest" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/model" ) -func TestMongoeventToLogData(t *testing.T) { - mongoevent := GetTestEvent() +func TestMongoeventToLogData4_4(t *testing.T) { + mongoevent := GetTestEvent4_4() pc := ProjectContext{ orgName: "Org", Project: 
mongodbatlas.Project{Name: "Project"}, } - ld := mongodbEventToLogData(zap.NewNop(), []model.LogEntry{mongoevent}, pc, "hostname", "clusterName", "logName") + ld := mongodbEventToLogData(zap.NewNop(), []model.LogEntry{mongoevent}, pc, "hostname", "clusterName", "logName", "4.4") rl := ld.ResourceLogs().At(0) resourceAttrs := rl.Resource().Attributes() - lr := rl.ScopeLogs().At(0) - attrs := lr.LogRecords().At(0).Attributes() + sl := rl.ScopeLogs().At(0) + lr := sl.LogRecords().At(0) + attrs := lr.Attributes() assert.Equal(t, ld.ResourceLogs().Len(), 1) assert.Equal(t, resourceAttrs.Len(), 4) assert.Equal(t, attrs.Len(), 9) + assert.Equal(t, pcommon.Timestamp(1663006227215000000), lr.Timestamp()) + _, exists := attrs.Get("id") + assert.True(t, exists, "expected attribute id to exist, but it didn't") // Count attribute will not be present in the LogData - ld = mongodbEventToLogData(zap.NewNop(), []model.LogEntry{mongoevent}, pc, "hostname", "clusterName", "logName") + ld = mongodbEventToLogData(zap.NewNop(), []model.LogEntry{mongoevent}, pc, "hostname", "clusterName", "logName", "4.4") assert.Equal(t, ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().Len(), 9) } +func TestMongoeventToLogData4_2(t *testing.T) { + mongoevent := GetTestEvent4_2() + pc := ProjectContext{ + orgName: "Org", + Project: mongodbatlas.Project{Name: "Project"}, + } + + ld := mongodbEventToLogData(zaptest.NewLogger(t), []model.LogEntry{mongoevent}, pc, "hostname", "clusterName", "logName", "4.2") + rl := ld.ResourceLogs().At(0) + resourceAttrs := rl.Resource().Attributes() + sl := rl.ScopeLogs().At(0) + lr := sl.LogRecords().At(0) + attrs := lr.Attributes() + assert.Equal(t, ld.ResourceLogs().Len(), 1) + assert.Equal(t, resourceAttrs.Len(), 4) + assert.Equal(t, attrs.Len(), 5) + assert.Equal(t, pcommon.Timestamp(1663004293902000000), lr.Timestamp()) + _, exists := attrs.Get("id") + assert.False(t, exists, "expected attribute id to not exist, but it did") +} + func 
TestUnknownSeverity(t *testing.T) { - mongoevent := GetTestEvent() + mongoevent := GetTestEvent4_4() mongoevent.Severity = "Unknown" pc := ProjectContext{ orgName: "Org", Project: mongodbatlas.Project{Name: "Project"}, } - ld := mongodbEventToLogData(zap.NewNop(), []model.LogEntry{mongoevent}, pc, "hostname", "clusterName", "logName") + ld := mongodbEventToLogData(zap.NewNop(), []model.LogEntry{mongoevent}, pc, "hostname", "clusterName", "logName", "4.4") rl := ld.ResourceLogs().At(0) logEntry := rl.ScopeLogs().At(0).LogRecords().At(0) @@ -82,8 +109,11 @@ func TestMongoEventToAuditLogData(t *testing.T) { assert.Equal(t, 12, ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().Len()) } -func GetTestEvent() model.LogEntry { +func GetTestEvent4_4() model.LogEntry { return model.LogEntry{ + Timestamp: model.LogTimestamp{ + Date: "2022-09-12T18:10:27.215+00:00", + }, Severity: "I", Component: "NETWORK", ID: 12312, @@ -93,6 +123,18 @@ func GetTestEvent() model.LogEntry { } } +func GetTestEvent4_2() model.LogEntry { + return model.LogEntry{ + Severity: "I", + Component: "NETWORK", + Context: "context", + Message: "Connection ended", + Timestamp: model.LogTimestamp{ + Date: "2022-09-12T17:38:13.902+0000", + }, + } +} + func GetTestAuditEvent() model.AuditLog { return model.AuditLog{ AuthType: "authtype", diff --git a/receiver/mongodbatlasreceiver/testdata/logs/sample-payloads/4.2.log b/receiver/mongodbatlasreceiver/testdata/logs/sample-payloads/4.2.log new file mode 100644 index 0000000000000..599073312fdb1 --- /dev/null +++ b/receiver/mongodbatlasreceiver/testdata/logs/sample-payloads/4.2.log @@ -0,0 +1,3 @@ +2022-09-11T18:53:02.541+0000 I NETWORK [listener] connection accepted from 192.168.248.5:51972 #25288 (31 connections now open) +2022-09-11T18:53:02.541+0000 I NETWORK [listener] connection accepted from 192.168.248.5:51974 #25289 (32 connections now open) +2022-09-11T18:53:02.563+0000 I NETWORK [conn25289] received client metadata from 
192.168.248.5:51974 conn25289: { driver: { name: "mongo-go-driver", version: "v1.7.2+prerelease" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.18.2", application: { name: "MongoDB Automation Agent v12.3.4.7674 (git: 4c7df3ac1d15ef3269d44aa38b17376ca00147eb)" } } diff --git a/receiver/mongodbatlasreceiver/testdata/logs/sample-payloads/4.2_invalid_log.log b/receiver/mongodbatlasreceiver/testdata/logs/sample-payloads/4.2_invalid_log.log new file mode 100644 index 0000000000000..bb194932ae405 --- /dev/null +++ b/receiver/mongodbatlasreceiver/testdata/logs/sample-payloads/4.2_invalid_log.log @@ -0,0 +1,3 @@ +2022-09-11T18:53:02.541+0000 I NETWORK [listener] connection accepted from 192.168.248.5:51972 #25288 (31 connections now open) +2022-09-11T18:53:02.541+0000 I NETWORK connection accepted from 192.168.248.5:51974 #25289 (32 connections now open) +2022-09-11T18:53:02.563+0000 I NETWORK [conn25289] received client metadata from 192.168.248.5:51974 conn25289: { driver: { name: "mongo-go-driver", version: "v1.7.2+prerelease" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.18.2", application: { name: "MongoDB Automation Agent v12.3.4.7674 (git: 4c7df3ac1d15ef3269d44aa38b17376ca00147eb)" } } diff --git a/receiver/mongodbatlasreceiver/testdata/logs/sample-payloads/5.0.log b/receiver/mongodbatlasreceiver/testdata/logs/sample-payloads/5.0.log new file mode 100644 index 0000000000000..fc7ddf11961e2 --- /dev/null +++ b/receiver/mongodbatlasreceiver/testdata/logs/sample-payloads/5.0.log @@ -0,0 +1,3 @@ +{"t":{"$date":"2022-09-11T18:53:14.675+00:00"},"s":"I", "c":"NETWORK", "id":22944, "ctx":"conn35107","msg":"Connection ended","attr":{"remote":"192.168.248.2:52066","uuid":"d3f4641a-14ca-4a24-b5bb-7d7b391a02e7","connectionId":35107,"connectionCount":33}} +{"t":{"$date":"2022-09-11T18:53:14.676+00:00"},"s":"I", "c":"NETWORK", "id":22944, "ctx":"conn35109","msg":"Connection 
ended","attr":{"remote":"192.168.248.2:52070","uuid":"dcdb08ac-981d-41ea-9d6b-f85fe0475bd1","connectionId":35109,"connectionCount":32}} +{"t":{"$date":"2022-09-11T18:53:37.727+00:00"},"s":"I", "c":"SHARDING", "id":20997, "ctx":"conn9957","msg":"Refreshed RWC defaults","attr":{"newDefaults":{}}} diff --git a/receiver/mongodbatlasreceiver/testdata/logs/sample-payloads/5.0_invalid_log.log b/receiver/mongodbatlasreceiver/testdata/logs/sample-payloads/5.0_invalid_log.log new file mode 100644 index 0000000000000..f23be899c9ed1 --- /dev/null +++ b/receiver/mongodbatlasreceiver/testdata/logs/sample-payloads/5.0_invalid_log.log @@ -0,0 +1,3 @@ +{"t":{"$date":"2022-09-11T18:53:14.675+00:00"},"s":"I", "c":"NETWORK", "id":22944, "ctx":"conn35107","msg":"Connection ended","attr":{"remote":"192.168.248.2:52066","uuid":"d3f4641a-14ca-4a24-b5bb-7d7b391a02e7","connectionId":35107,"connectionCount":33}} +"t":{"$date":"2022-09-11T18:53:14.676+00:00"},"s":"I", "c":"NETWORK", "id":22944, "ctx":"conn35109","msg":"Connection ended","attr":{"remote":"192.168.248.2:52070","uuid":"dcdb08ac-981d-41ea-9d6b-f85fe0475bd1","connectionId":35109,"connectionCount":32}} +{"t":{"$date":"2022-09-11T18:53:37.727+00:00"},"s":"I", "c":"SHARDING", "id":20997, "ctx":"conn9957","msg":"Refreshed RWC defaults","attr":{"newDefaults":{}}} diff --git a/unreleased/mongodb-atlas-4_2-logging.yaml b/unreleased/mongodb-atlas-4_2-logging.yaml new file mode 100755 index 0000000000000..a4c89d7b01f87 --- /dev/null +++ b/unreleased/mongodb-atlas-4_2-logging.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: mongodbatlasreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). 
+note: fix log parsing for clusters using major version 4.2 + +# One or more tracking issues related to the change +issues: [14008] From e3944d54e33a9c4a2068ecf06cd7aacbf31d2a90 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 12 Sep 2022 15:57:47 -0400 Subject: [PATCH 2/7] de-abstract decoding --- ...log_decoder_test.go => log_decode_test.go} | 22 +++------ receiver/mongodbatlasreceiver/log_decoder.go | 46 +++---------------- receiver/mongodbatlasreceiver/logs.go | 7 ++- 3 files changed, 16 insertions(+), 59 deletions(-) rename receiver/mongodbatlasreceiver/{log_decoder_test.go => log_decode_test.go} (90%) diff --git a/receiver/mongodbatlasreceiver/log_decoder_test.go b/receiver/mongodbatlasreceiver/log_decode_test.go similarity index 90% rename from receiver/mongodbatlasreceiver/log_decoder_test.go rename to receiver/mongodbatlasreceiver/log_decode_test.go index d74a7f08b0bf2..157888bf4da70 100644 --- a/receiver/mongodbatlasreceiver/log_decoder_test.go +++ b/receiver/mongodbatlasreceiver/log_decode_test.go @@ -24,9 +24,7 @@ func TestDecode4_2(t *testing.T) { require.NoError(t, err) require.NoError(t, gzipWriter.Close()) - decoder := decoderForVersion(zaptest.NewLogger(t), "4.2") - - entries, err := decoder.Decode(zippedBuffer) + entries, err := decodeLogs(zaptest.NewLogger(t), "4.2", zippedBuffer) require.NoError(t, err) require.Equal(t, []model.LogEntry{ @@ -74,9 +72,7 @@ func TestDecode4_2InvalidLog(t *testing.T) { require.NoError(t, err) require.NoError(t, gzipWriter.Close()) - decoder := decoderForVersion(zaptest.NewLogger(t), "4.2") - - entries, err := decoder.Decode(zippedBuffer) + entries, err := decodeLogs(zaptest.NewLogger(t), "4.2", zippedBuffer) require.NoError(t, err) require.Equal(t, []model.LogEntry{ @@ -104,8 +100,7 @@ func TestDecode4_2InvalidLog(t *testing.T) { } func TestDecode4_2NotGzip(t *testing.T) { - decoder := decoderForVersion(zaptest.NewLogger(t), "4.2") - entries, err := decoder.Decode(bytes.NewBuffer([]byte("Not compressed 
log"))) + entries, err := decodeLogs(zaptest.NewLogger(t), "4.2", bytes.NewBuffer([]byte("Not compressed log"))) require.ErrorContains(t, err, "gzip: invalid header") require.Nil(t, entries) } @@ -121,9 +116,7 @@ func TestDecode5_0(t *testing.T) { require.NoError(t, err) require.NoError(t, gzipWriter.Close()) - decoder := decoderForVersion(zaptest.NewLogger(t), "5.0") - - entries, err := decoder.Decode(zippedBuffer) + entries, err := decodeLogs(zaptest.NewLogger(t), "5.0", zippedBuffer) require.NoError(t, err) require.Equal(t, []model.LogEntry{ @@ -186,9 +179,7 @@ func TestDecode5_0InvalidLog(t *testing.T) { require.NoError(t, err) require.NoError(t, gzipWriter.Close()) - decoder := decoderForVersion(zaptest.NewLogger(t), "5.0") - - entries, err := decoder.Decode(zippedBuffer) + entries, err := decodeLogs(zaptest.NewLogger(t), "5.0", zippedBuffer) assert.ErrorContains(t, err, "entry could not be decoded into LogEntry") assert.Equal(t, []model.LogEntry{ @@ -212,8 +203,7 @@ func TestDecode5_0InvalidLog(t *testing.T) { } func TestDecode5_0NotGzip(t *testing.T) { - decoder := decoderForVersion(zaptest.NewLogger(t), "5.0") - entries, err := decoder.Decode(bytes.NewBuffer([]byte("Not compressed log"))) + entries, err := decodeLogs(zaptest.NewLogger(t), "5.0", bytes.NewBuffer([]byte("Not compressed log"))) require.ErrorContains(t, err, "gzip: invalid header") require.Nil(t, entries) } diff --git a/receiver/mongodbatlasreceiver/log_decoder.go b/receiver/mongodbatlasreceiver/log_decoder.go index 7701059f5cca9..8dcfbc62a89f3 100644 --- a/receiver/mongodbatlasreceiver/log_decoder.go +++ b/receiver/mongodbatlasreceiver/log_decoder.go @@ -13,38 +13,18 @@ import ( "go.uber.org/zap" ) -// logDecoder is an interface that decodes logs from an io.Reader into LogEntry structs. 
-type logDecoder interface { - Decode(r io.Reader) ([]model.LogEntry, error) -} - -func decoderForVersion(logger *zap.Logger, clusterMajorVersion string) logDecoder { - var decoder logDecoder +func decodeLogs(logger *zap.Logger, clusterMajorVersion string, r io.Reader) ([]model.LogEntry, error) { switch clusterMajorVersion { case mongoDBMajorVersion4_2: // 4.2 clusters use a console log format - decoder = newConsoleLogDecoder(logger.Named("consoledecoder")) + return decodeConsole(logger.Named("console_decoder"), r) default: // All other versions use JSON logging - decoder = newJSONLogDecoder(logger.Named("jsondecoder")) + return decodeJSON(r) } - - return decoder } -// jsonLogDecoder is a logDecoder that decodes JSON formatted mongodb logs. -// This is the format used for mongodb 4.4+ -type jsonLogDecoder struct { - logger *zap.Logger -} - -func newJSONLogDecoder(logger *zap.Logger) *jsonLogDecoder { - return &jsonLogDecoder{ - logger: logger, - } -} - -func (j *jsonLogDecoder) Decode(r io.Reader) ([]model.LogEntry, error) { +func decodeJSON(r io.Reader) ([]model.LogEntry, error) { // Pass this into a gzip reader for decoding reader, err := gzip.NewReader(r) if err != nil { @@ -71,19 +51,7 @@ func (j *jsonLogDecoder) Decode(r io.Reader) ([]model.LogEntry, error) { var consoleLogRegex = regexp.MustCompile(`^(?P<timestamp>\S+)\s+(?P<severity>\w+)\s+(?P<component>[\w-]+)\s+\[(?P<context>\S+)\]\s+(?P<message>.*)$`) -// consoleLogDecoder is a logDecoder that decodes "console" formatted mongodb logs. 
-// This is the format used for mongodb 4.2 -type consoleLogDecoder struct { - logger *zap.Logger -} - -func newConsoleLogDecoder(logger *zap.Logger) *consoleLogDecoder { - return &consoleLogDecoder{ - logger: logger, - } -} - -func (c *consoleLogDecoder) Decode(r io.Reader) ([]model.LogEntry, error) { +func decodeConsole(logger *zap.Logger, r io.Reader) ([]model.LogEntry, error) { // Pass this into a gzip reader for decoding gzipReader, err := gzip.NewReader(r) @@ -101,8 +69,8 @@ func (c *consoleLogDecoder) Decode(r io.Reader) ([]model.LogEntry, error) { submatches := consoleLogRegex.FindSubmatch(scanner.Bytes()) if submatches == nil || len(submatches) != 6 { - // Match failed for line - c.logger.Error("Entry did not match regex", zap.String("entry", scanner.Text())) + // Match failed for line; We will skip this line and continue processing others. + logger.Error("Entry did not match regex", zap.String("entry", scanner.Text())) continue } diff --git a/receiver/mongodbatlasreceiver/logs.go b/receiver/mongodbatlasreceiver/logs.go index 52c650df751d3..2bb61574cbdc2 100644 --- a/receiver/mongodbatlasreceiver/logs.go +++ b/receiver/mongodbatlasreceiver/logs.go @@ -203,14 +203,14 @@ func filterClusters(clusters []mongodbatlas.Cluster, clusterNames []string, incl return filtered } -func (s *logsReceiver) getHostLogs(groupID, hostname, logName string, decoder logDecoder) ([]model.LogEntry, error) { +func (s *logsReceiver) getHostLogs(groupID, hostname, logName string, clusterMajorVersion string) ([]model.LogEntry, error) { // Get gzip bytes buffer from API buf, err := s.client.GetLogs(context.Background(), groupID, hostname, logName, s.start, s.end) if err != nil { return nil, err } - return decoder.Decode(buf) + return decodeLogs(s.log, clusterMajorVersion, buf) } func (s *logsReceiver) getHostAuditLogs(groupID, hostname, logName string) ([]model.AuditLog, error) { @@ -242,8 +242,7 @@ func (s *logsReceiver) getHostAuditLogs(groupID, hostname, logName string) ([]mo } func 
(s *logsReceiver) collectLogs(pc ProjectContext, hostname, logName, clusterName, clusterMajorVersion string) { - decoder := decoderForVersion(s.log, clusterMajorVersion) - logs, err := s.getHostLogs(pc.Project.ID, hostname, logName, decoder) + logs, err := s.getHostLogs(pc.Project.ID, hostname, logName, clusterMajorVersion) if err != nil && !errors.Is(err, io.EOF) { s.log.Warn("Failed to retrieve logs from: "+logName, zap.Error(err)) } From 588b2fd5f8a3e45ae63bcb0cfb475e6de6678aaa Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 12 Sep 2022 16:00:27 -0400 Subject: [PATCH 3/7] fix comments in log model --- receiver/mongodbatlasreceiver/internal/model/logs.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/receiver/mongodbatlasreceiver/internal/model/logs.go b/receiver/mongodbatlasreceiver/internal/model/logs.go index efbc7f399524b..9793f0abbe475 100644 --- a/receiver/mongodbatlasreceiver/internal/model/logs.go +++ b/receiver/mongodbatlasreceiver/internal/model/logs.go @@ -16,7 +16,9 @@ package model // import "github.com/open-telemetry/opentelemetry-collector-contr import ( "encoding/json" -) // LogEntry represents a MongoDB Atlas JSON log entry +) + +// LogEntry represents a MongoDB Atlas JSON log entry type LogEntry struct { Timestamp LogTimestamp `json:"t"` Severity string `json:"s"` @@ -25,7 +27,8 @@ type LogEntry struct { Context string `json:"ctx"` Message string `json:"msg"` Attributes map[string]interface{} `json:"attr"` - Raw *string `json:"-"` + // Raw, if it is present, is the original log line. It is not a part of the payload, but transient data added during decoding. + Raw *string `json:"-"` } // RawLog returns a raw representation of the log entry. 
From 68ad38d0d4c9edd90e58fedebb9b3f0125c76e36 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 12 Sep 2022 17:09:38 -0400 Subject: [PATCH 4/7] fix lint errors --- receiver/mongodbatlasreceiver/log_decode_test.go | 15 +++++++++++++++ receiver/mongodbatlasreceiver/log_decoder.go | 15 +++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/receiver/mongodbatlasreceiver/log_decode_test.go b/receiver/mongodbatlasreceiver/log_decode_test.go index 157888bf4da70..caee50d6d0b88 100644 --- a/receiver/mongodbatlasreceiver/log_decode_test.go +++ b/receiver/mongodbatlasreceiver/log_decode_test.go @@ -1,3 +1,17 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package mongodbatlasreceiver import ( @@ -8,6 +22,7 @@ import ( "testing" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/model" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" diff --git a/receiver/mongodbatlasreceiver/log_decoder.go b/receiver/mongodbatlasreceiver/log_decoder.go index 8dcfbc62a89f3..5aef55e02a29c 100644 --- a/receiver/mongodbatlasreceiver/log_decoder.go +++ b/receiver/mongodbatlasreceiver/log_decoder.go @@ -1,3 +1,17 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package mongodbatlasreceiver import ( @@ -10,6 +24,7 @@ import ( "regexp" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/model" + "go.uber.org/zap" ) From dbda3f687d3905f732305768b9f0f74a7d5ff4fb Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 12 Sep 2022 17:10:41 -0400 Subject: [PATCH 5/7] make goporto --- receiver/mongodbatlasreceiver/log_decoder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/mongodbatlasreceiver/log_decoder.go b/receiver/mongodbatlasreceiver/log_decoder.go index 5aef55e02a29c..e0d7979e86f88 100644 --- a/receiver/mongodbatlasreceiver/log_decoder.go +++ b/receiver/mongodbatlasreceiver/log_decoder.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package mongodbatlasreceiver +package mongodbatlasreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver" import ( "bufio" From f17d106fc9779b6b2b987823d9db9a2e39ba8783 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 13 Sep 2022 09:14:44 -0400 Subject: [PATCH 6/7] fix import order --- receiver/mongodbatlasreceiver/log_decode_test.go | 4 ++-- receiver/mongodbatlasreceiver/log_decoder.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/receiver/mongodbatlasreceiver/log_decode_test.go b/receiver/mongodbatlasreceiver/log_decode_test.go index caee50d6d0b88..8a54aaeca6709 100644 --- a/receiver/mongodbatlasreceiver/log_decode_test.go +++ b/receiver/mongodbatlasreceiver/log_decode_test.go @@ -21,11 +21,11 @@ import ( "path/filepath" "testing" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/model" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/model" ) func TestDecode4_2(t *testing.T) { diff --git a/receiver/mongodbatlasreceiver/log_decoder.go b/receiver/mongodbatlasreceiver/log_decoder.go index e0d7979e86f88..36efe2194256c 100644 --- a/receiver/mongodbatlasreceiver/log_decoder.go +++ b/receiver/mongodbatlasreceiver/log_decoder.go @@ -23,9 +23,9 @@ import ( "io" "regexp" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/model" - "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/model" ) func decodeLogs(logger *zap.Logger, clusterMajorVersion string, r io.Reader) ([]model.LogEntry, error) { From 3cb23b19cdbbbb24b3f001224958733b4298d114 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 13 Sep 2022 09:54:47 -0400 Subject: [PATCH 7/7] use slightly more efficient 
method for regex matching --- receiver/mongodbatlasreceiver/log_decoder.go | 23 +++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/receiver/mongodbatlasreceiver/log_decoder.go b/receiver/mongodbatlasreceiver/log_decoder.go index 36efe2194256c..f57ec8b538b9f 100644 --- a/receiver/mongodbatlasreceiver/log_decoder.go +++ b/receiver/mongodbatlasreceiver/log_decoder.go @@ -32,7 +32,7 @@ func decodeLogs(logger *zap.Logger, clusterMajorVersion string, r io.Reader) ([] switch clusterMajorVersion { case mongoDBMajorVersion4_2: // 4.2 clusters use a console log format - return decodeConsole(logger.Named("console_decoder"), r) + return decode4_2(logger.Named("console_decoder"), r) default: // All other versions use JSON logging return decodeJSON(r) @@ -64,10 +64,9 @@ func decodeJSON(r io.Reader) ([]model.LogEntry, error) { } } -var consoleLogRegex = regexp.MustCompile(`^(?P<timestamp>\S+)\s+(?P<severity>\w+)\s+(?P<component>[\w-]+)\s+\[(?P<context>\S+)\]\s+(?P<message>.*)$`) - -func decodeConsole(logger *zap.Logger, r io.Reader) ([]model.LogEntry, error) { +var mongo4_2LogRegex = regexp.MustCompile(`^(?P<timestamp>\S+)\s+(?P<severity>\w+)\s+(?P<component>[\w-]+)\s+\[(?P<context>\S+)\]\s+(?P<message>.*)$`) +func decode4_2(logger *zap.Logger, r io.Reader) ([]model.LogEntry, error) { // Pass this into a gzip reader for decoding gzipReader, err := gzip.NewReader(r) if err != nil { @@ -82,24 +81,22 @@ func decodeConsole(logger *zap.Logger, r io.Reader) ([]model.LogEntry, error) { return entries, scanner.Err() } - submatches := consoleLogRegex.FindSubmatch(scanner.Bytes()) + submatches := mongo4_2LogRegex.FindStringSubmatch(scanner.Text()) if submatches == nil || len(submatches) != 6 { // Match failed for line; We will skip this line and continue processing others. 
logger.Error("Entry did not match regex", zap.String("entry", scanner.Text())) continue } - rawLog := string(submatches[0]) - entry := model.LogEntry{ Timestamp: model.LogTimestamp{ - Date: string(submatches[1]), + Date: submatches[1], }, - Severity: string(submatches[2]), - Component: string(submatches[3]), - Context: string(submatches[4]), - Message: string(submatches[5]), - Raw: &rawLog, + Severity: submatches[2], + Component: submatches[3], + Context: submatches[4], + Message: submatches[5], + Raw: &submatches[0], } entries = append(entries, entry)