Skip to content

Commit

Permalink
[receiver/mongodbatlas] Fix log parsing for v4.2 clusters (open-telem…
Browse files Browse the repository at this point in the history
…etry#14088)

* fix invalid parsing of v4.2 logs

* de-abstract decoding

* fix comments in log model

* fix lint errors

* make goporto

* fix import order

* use slightly more efficient method for regex matching
  • Loading branch information
BinaryFissionGames committed Sep 13, 2022
1 parent 981bb15 commit 52ba7d5
Show file tree
Hide file tree
Showing 11 changed files with 462 additions and 46 deletions.
18 changes: 18 additions & 0 deletions receiver/mongodbatlasreceiver/internal/model/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

package model // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/model"

import (
"encoding/json"
)

// LogEntry represents a MongoDB Atlas JSON log entry
type LogEntry struct {
Timestamp LogTimestamp `json:"t"`
Expand All @@ -23,6 +27,20 @@ type LogEntry struct {
Context string `json:"ctx"`
Message string `json:"msg"`
Attributes map[string]interface{} `json:"attr"`
// Raw, if it is present, is the original log line. It is not a part of the payload, but transient data added during decoding.
Raw *string `json:"-"`
}

// RawLog returns a textual representation of the log entry.
// If Raw is set, the string it points to is returned unchanged.
// Otherwise the entry is marshaled to JSON, and any marshaling error is
// returned together with the (possibly empty) result.
func (l LogEntry) RawLog() (string, error) {
	if raw := l.Raw; raw != nil {
		return *raw, nil
	}

	// No original line was recorded, so rebuild one from the decoded fields.
	encoded, err := json.Marshal(l)
	return string(encoded), err
}

// AuditLog represents a MongoDB Atlas JSON audit log entry
Expand Down
228 changes: 228 additions & 0 deletions receiver/mongodbatlasreceiver/log_decode_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mongodbatlasreceiver

import (
"bytes"
"compress/gzip"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/model"
)

// TestDecode4_2 verifies that a gzip-compressed 4.2 console log payload is
// decoded into entries carrying both the parsed fields and the original line in Raw.
func TestDecode4_2(t *testing.T) {
	payload, err := os.ReadFile(filepath.Join("testdata", "logs", "sample-payloads", "4.2.log"))
	require.NoError(t, err)

	// decodeLogs expects gzip input, so compress the fixture first.
	var compressed bytes.Buffer
	zw := gzip.NewWriter(&compressed)
	_, err = zw.Write(payload)
	require.NoError(t, err)
	require.NoError(t, zw.Close())

	got, err := decodeLogs(zaptest.NewLogger(t), "4.2", &compressed)
	require.NoError(t, err)

	want := []model.LogEntry{
		{
			Timestamp: model.LogTimestamp{
				Date: "2022-09-11T18:53:02.541+0000",
			},
			Severity:  "I",
			Component: "NETWORK",
			Context:   "listener",
			Message:   "connection accepted from 192.168.248.5:51972 #25288 (31 connections now open)",
			Raw:       strp("2022-09-11T18:53:02.541+0000 I NETWORK [listener] connection accepted from 192.168.248.5:51972 #25288 (31 connections now open)"),
		},
		{
			Timestamp: model.LogTimestamp{
				Date: "2022-09-11T18:53:02.541+0000",
			},
			Severity:  "I",
			Component: "NETWORK",
			Context:   "listener",
			Message:   "connection accepted from 192.168.248.5:51974 #25289 (32 connections now open)",
			Raw:       strp("2022-09-11T18:53:02.541+0000 I NETWORK [listener] connection accepted from 192.168.248.5:51974 #25289 (32 connections now open)"),
		},
		{
			Timestamp: model.LogTimestamp{
				Date: "2022-09-11T18:53:02.563+0000",
			},
			Severity:  "I",
			Component: "NETWORK",
			Context:   "conn25289",
			Message:   `received client metadata from 192.168.248.5:51974 conn25289: { driver: { name: "mongo-go-driver", version: "v1.7.2+prerelease" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.18.2", application: { name: "MongoDB Automation Agent v12.3.4.7674 (git: 4c7df3ac1d15ef3269d44aa38b17376ca00147eb)" } }`,
			Raw:       strp(`2022-09-11T18:53:02.563+0000 I NETWORK [conn25289] received client metadata from 192.168.248.5:51974 conn25289: { driver: { name: "mongo-go-driver", version: "v1.7.2+prerelease" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.18.2", application: { name: "MongoDB Automation Agent v12.3.4.7674 (git: 4c7df3ac1d15ef3269d44aa38b17376ca00147eb)" } }`),
		},
	}
	require.Equal(t, want, got)
}

// TestDecode4_2InvalidLog verifies that decoding the "4.2_invalid_log.log"
// fixture succeeds without an error and yields only the two expected entries.
// Presumably the fixture contains a line that does not match the console
// format and is therefore skipped — confirm against the fixture.
func TestDecode4_2InvalidLog(t *testing.T) {
	payload, err := os.ReadFile(filepath.Join("testdata", "logs", "sample-payloads", "4.2_invalid_log.log"))
	require.NoError(t, err)

	// decodeLogs expects gzip input, so compress the fixture first.
	var compressed bytes.Buffer
	zw := gzip.NewWriter(&compressed)
	_, err = zw.Write(payload)
	require.NoError(t, err)
	require.NoError(t, zw.Close())

	got, err := decodeLogs(zaptest.NewLogger(t), "4.2", &compressed)
	require.NoError(t, err)

	want := []model.LogEntry{
		{
			Timestamp: model.LogTimestamp{
				Date: "2022-09-11T18:53:02.541+0000",
			},
			Severity:  "I",
			Component: "NETWORK",
			Context:   "listener",
			Message:   "connection accepted from 192.168.248.5:51972 #25288 (31 connections now open)",
			Raw:       strp("2022-09-11T18:53:02.541+0000 I NETWORK [listener] connection accepted from 192.168.248.5:51972 #25288 (31 connections now open)"),
		},
		{
			Timestamp: model.LogTimestamp{
				Date: "2022-09-11T18:53:02.563+0000",
			},
			Severity:  "I",
			Component: "NETWORK",
			Context:   "conn25289",
			Message:   `received client metadata from 192.168.248.5:51974 conn25289: { driver: { name: "mongo-go-driver", version: "v1.7.2+prerelease" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.18.2", application: { name: "MongoDB Automation Agent v12.3.4.7674 (git: 4c7df3ac1d15ef3269d44aa38b17376ca00147eb)" } }`,
			Raw:       strp(`2022-09-11T18:53:02.563+0000 I NETWORK [conn25289] received client metadata from 192.168.248.5:51974 conn25289: { driver: { name: "mongo-go-driver", version: "v1.7.2+prerelease" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.18.2", application: { name: "MongoDB Automation Agent v12.3.4.7674 (git: 4c7df3ac1d15ef3269d44aa38b17376ca00147eb)" } }`),
		},
	}
	require.Equal(t, want, got)
}

// TestDecode4_2NotGzip verifies that input which is not gzip-compressed is
// rejected with a gzip header error and that no entries are returned.
func TestDecode4_2NotGzip(t *testing.T) {
	plain := bytes.NewBuffer([]byte("Not compressed log"))

	got, err := decodeLogs(zaptest.NewLogger(t), "4.2", plain)

	require.ErrorContains(t, err, "gzip: invalid header")
	require.Nil(t, got)
}

// TestDecode5_0 verifies that a gzip-compressed 5.0 JSON log payload is
// decoded into the expected entries, including their attribute maps.
func TestDecode5_0(t *testing.T) {
	payload, err := os.ReadFile(filepath.Join("testdata", "logs", "sample-payloads", "5.0.log"))
	require.NoError(t, err)

	// decodeLogs expects gzip input, so compress the fixture first.
	var compressed bytes.Buffer
	zw := gzip.NewWriter(&compressed)
	_, err = zw.Write(payload)
	require.NoError(t, err)
	require.NoError(t, zw.Close())

	got, err := decodeLogs(zaptest.NewLogger(t), "5.0", &compressed)
	require.NoError(t, err)

	// Numeric attributes are expected as float64 values.
	want := []model.LogEntry{
		{
			Timestamp: model.LogTimestamp{
				Date: "2022-09-11T18:53:14.675+00:00",
			},
			Severity:  "I",
			Component: "NETWORK",
			ID:        22944,
			Context:   "conn35107",
			Message:   "Connection ended",
			Attributes: map[string]interface{}{
				"remote":          "192.168.248.2:52066",
				"uuid":            "d3f4641a-14ca-4a24-b5bb-7d7b391a02e7",
				"connectionId":    float64(35107),
				"connectionCount": float64(33),
			},
		},
		{
			Timestamp: model.LogTimestamp{
				Date: "2022-09-11T18:53:14.676+00:00",
			},
			Severity:  "I",
			Component: "NETWORK",
			ID:        22944,
			Context:   "conn35109",
			Message:   "Connection ended",
			Attributes: map[string]interface{}{
				"remote":          "192.168.248.2:52070",
				"uuid":            "dcdb08ac-981d-41ea-9d6b-f85fe0475bd1",
				"connectionId":    float64(35109),
				"connectionCount": float64(32),
			},
		},
		{
			Timestamp: model.LogTimestamp{
				Date: "2022-09-11T18:53:37.727+00:00",
			},
			Severity:  "I",
			Component: "SHARDING",
			ID:        20997,
			Context:   "conn9957",
			Message:   "Refreshed RWC defaults",
			Attributes: map[string]interface{}{
				"newDefaults": map[string]interface{}{},
			},
		},
	}
	require.Equal(t, want, got)
}

// TestDecode5_0InvalidLog verifies that decoding the "5.0_invalid_log.log"
// fixture reports a decode error while still returning the single entry that
// is expected to have been decoded before the failure.
func TestDecode5_0InvalidLog(t *testing.T) {
	payload, err := os.ReadFile(filepath.Join("testdata", "logs", "sample-payloads", "5.0_invalid_log.log"))
	require.NoError(t, err)

	// decodeLogs expects gzip input, so compress the fixture first.
	var compressed bytes.Buffer
	zw := gzip.NewWriter(&compressed)
	_, err = zw.Write(payload)
	require.NoError(t, err)
	require.NoError(t, zw.Close())

	got, err := decodeLogs(zaptest.NewLogger(t), "5.0", &compressed)
	// assert (not require) so the returned entries are checked even if the error check fails.
	assert.ErrorContains(t, err, "entry could not be decoded into LogEntry")

	want := []model.LogEntry{
		{
			Timestamp: model.LogTimestamp{
				Date: "2022-09-11T18:53:14.675+00:00",
			},
			Severity:  "I",
			Component: "NETWORK",
			ID:        22944,
			Context:   "conn35107",
			Message:   "Connection ended",
			Attributes: map[string]interface{}{
				"remote":          "192.168.248.2:52066",
				"uuid":            "d3f4641a-14ca-4a24-b5bb-7d7b391a02e7",
				"connectionId":    float64(35107),
				"connectionCount": float64(33),
			},
		},
	}
	assert.Equal(t, want, got)
}

// TestDecode5_0NotGzip verifies that input which is not gzip-compressed is
// rejected with a gzip header error and that no entries are returned.
func TestDecode5_0NotGzip(t *testing.T) {
	plain := bytes.NewBuffer([]byte("Not compressed log"))

	got, err := decodeLogs(zaptest.NewLogger(t), "5.0", plain)

	require.ErrorContains(t, err, "gzip: invalid header")
	require.Nil(t, got)
}

// strp returns a pointer to a string holding the value of s.
// It exists so tests can fill *string fields from literals.
func strp(s string) *string {
	p := new(string)
	*p = s
	return p
}
104 changes: 104 additions & 0 deletions receiver/mongodbatlasreceiver/log_decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mongodbatlasreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver"

import (
"bufio"
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
"regexp"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/model"
)

// decodeLogs decodes the gzip-compressed log payload read from r into log entries,
// choosing the decoder based on clusterMajorVersion.
// Clusters on version 4.2 emit console-format logs and are handled by decode4_2
// (with a logger named "console_decoder"); every other version is treated as JSON.
func decodeLogs(logger *zap.Logger, clusterMajorVersion string, r io.Reader) ([]model.LogEntry, error) {
	if clusterMajorVersion == mongoDBMajorVersion4_2 {
		return decode4_2(logger.Named("console_decoder"), r)
	}

	return decodeJSON(r)
}

// decodeJSON reads a gzip-compressed stream of JSON log entries from r.
// It returns an error without entries if r does not start with a valid gzip header.
// If an entry fails to decode, the entries decoded so far are returned together
// with a wrapped error.
func decodeJSON(r io.Reader) ([]model.LogEntry, error) {
	gz, err := gzip.NewReader(r)
	if err != nil {
		return nil, err
	}

	var (
		decoded []model.LogEntry
		decoder = json.NewDecoder(gz)
	)
	for {
		var next model.LogEntry
		switch decodeErr := decoder.Decode(&next); {
		case decodeErr == nil:
			decoded = append(decoded, next)
		case errors.Is(decodeErr, io.EOF):
			// End of stream: everything was consumed successfully.
			return decoded, nil
		default:
			return decoded, fmt.Errorf("entry could not be decoded into LogEntry: %w", decodeErr)
		}
	}
}

// mongo4_2LogRegex matches one complete console-format log line and captures, in order:
// timestamp, severity, component, context (the text inside the square brackets) and message.
// A successful match therefore yields 6 submatches: the full match followed by the 5 groups.
var mongo4_2LogRegex = regexp.MustCompile(`^(?P<timestamp>\S+)\s+(?P<severity>\w+)\s+(?P<component>[\w-]+)\s+\[(?P<context>\S+)\]\s+(?P<message>.*)$`)

// decode4_2 reads gzip-compressed console-format logs from r, one entry per line.
//
// Each line is matched against mongo4_2LogRegex. Lines that do not match are
// reported through logger at error level and skipped; they do not fail the decode.
// Matching lines become entries whose Raw field points at the original line.
//
// It returns an error without entries if r does not start with a valid gzip header.
// If reading fails part-way, the entries decoded so far are returned together with
// the scanner's error.
func decode4_2(logger *zap.Logger, r io.Reader) ([]model.LogEntry, error) {
	// Pass this into a gzip reader for decoding
	gzipReader, err := gzip.NewReader(r)
	if err != nil {
		return nil, err
	}

	// NOTE: bufio.Scanner's default token limit applies; a line longer than
	// bufio.MaxScanTokenSize stops the scan and surfaces as bufio.ErrTooLong below.
	scanner := bufio.NewScanner(gzipReader)

	var entries []model.LogEntry
	for scanner.Scan() {
		// Text allocates a new string on every call, so fetch the line once.
		line := scanner.Text()

		// A match always has 6 elements (full match + 5 groups); a failed match
		// returns nil, whose length is 0, so the length check covers both cases.
		submatches := mongo4_2LogRegex.FindStringSubmatch(line)
		if len(submatches) != 6 {
			// Match failed for line; We will skip this line and continue processing others.
			logger.Error("Entry did not match regex", zap.String("entry", line))
			continue
		}

		entries = append(entries, model.LogEntry{
			Timestamp: model.LogTimestamp{
				Date: submatches[1],
			},
			Severity:  submatches[2],
			Component: submatches[3],
			Context:   submatches[4],
			Message:   submatches[5],
			// The regex is anchored at both ends, so the full match equals the line.
			// Pointing at the per-iteration variable avoids keeping the submatch slice alive.
			Raw: &line,
		})
	}

	// Scan stopped; Err is nil on a clean EOF and non-nil on any other read failure.
	return entries, scanner.Err()
}
Loading

0 comments on commit 52ba7d5

Please sign in to comment.