Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/mongodbatlas] Fix log parsing for v4.2 clusters #14088

Merged
merged 7 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
de-abstract decoding
  • Loading branch information
BinaryFissionGames committed Sep 12, 2022
commit e3944d54e33a9c4a2068ecf06cd7aacbf31d2a90
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ func TestDecode4_2(t *testing.T) {
require.NoError(t, err)
require.NoError(t, gzipWriter.Close())

decoder := decoderForVersion(zaptest.NewLogger(t), "4.2")

entries, err := decoder.Decode(zippedBuffer)
entries, err := decodeLogs(zaptest.NewLogger(t), "4.2", zippedBuffer)
require.NoError(t, err)

require.Equal(t, []model.LogEntry{
Expand Down Expand Up @@ -74,9 +72,7 @@ func TestDecode4_2InvalidLog(t *testing.T) {
require.NoError(t, err)
require.NoError(t, gzipWriter.Close())

decoder := decoderForVersion(zaptest.NewLogger(t), "4.2")

entries, err := decoder.Decode(zippedBuffer)
entries, err := decodeLogs(zaptest.NewLogger(t), "4.2", zippedBuffer)
require.NoError(t, err)

require.Equal(t, []model.LogEntry{
Expand Down Expand Up @@ -104,8 +100,7 @@ func TestDecode4_2InvalidLog(t *testing.T) {
}

func TestDecode4_2NotGzip(t *testing.T) {
decoder := decoderForVersion(zaptest.NewLogger(t), "4.2")
entries, err := decoder.Decode(bytes.NewBuffer([]byte("Not compressed log")))
entries, err := decodeLogs(zaptest.NewLogger(t), "4.2", bytes.NewBuffer([]byte("Not compressed log")))
require.ErrorContains(t, err, "gzip: invalid header")
require.Nil(t, entries)
}
Expand All @@ -121,9 +116,7 @@ func TestDecode5_0(t *testing.T) {
require.NoError(t, err)
require.NoError(t, gzipWriter.Close())

decoder := decoderForVersion(zaptest.NewLogger(t), "5.0")

entries, err := decoder.Decode(zippedBuffer)
entries, err := decodeLogs(zaptest.NewLogger(t), "5.0", zippedBuffer)
require.NoError(t, err)

require.Equal(t, []model.LogEntry{
Expand Down Expand Up @@ -186,9 +179,7 @@ func TestDecode5_0InvalidLog(t *testing.T) {
require.NoError(t, err)
require.NoError(t, gzipWriter.Close())

decoder := decoderForVersion(zaptest.NewLogger(t), "5.0")

entries, err := decoder.Decode(zippedBuffer)
entries, err := decodeLogs(zaptest.NewLogger(t), "5.0", zippedBuffer)
assert.ErrorContains(t, err, "entry could not be decoded into LogEntry")

assert.Equal(t, []model.LogEntry{
Expand All @@ -212,8 +203,7 @@ func TestDecode5_0InvalidLog(t *testing.T) {
}

func TestDecode5_0NotGzip(t *testing.T) {
decoder := decoderForVersion(zaptest.NewLogger(t), "5.0")
entries, err := decoder.Decode(bytes.NewBuffer([]byte("Not compressed log")))
entries, err := decodeLogs(zaptest.NewLogger(t), "5.0", bytes.NewBuffer([]byte("Not compressed log")))
require.ErrorContains(t, err, "gzip: invalid header")
require.Nil(t, entries)
}
Expand Down
46 changes: 7 additions & 39 deletions receiver/mongodbatlasreceiver/log_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,38 +13,18 @@ import (
"go.uber.org/zap"
)

// logDecoder is an interface that decodes logs from an io.Reader into LogEntry structs.
type logDecoder interface {
Decode(r io.Reader) ([]model.LogEntry, error)
}

func decoderForVersion(logger *zap.Logger, clusterMajorVersion string) logDecoder {
var decoder logDecoder
func decodeLogs(logger *zap.Logger, clusterMajorVersion string, r io.Reader) ([]model.LogEntry, error) {
switch clusterMajorVersion {
case mongoDBMajorVersion4_2:
// 4.2 clusters use a console log format
decoder = newConsoleLogDecoder(logger.Named("consoledecoder"))
return decodeConsole(logger.Named("console_decoder"), r)
default:
// All other versions use JSON logging
decoder = newJSONLogDecoder(logger.Named("jsondecoder"))
return decodeJSON(r)
}

return decoder
}

// jsonLogDecoder is a logDecoder that decodes JSON formatted mongodb logs.
// This is the format used for mongodb 4.4+
type jsonLogDecoder struct {
logger *zap.Logger
}

func newJSONLogDecoder(logger *zap.Logger) *jsonLogDecoder {
return &jsonLogDecoder{
logger: logger,
}
}

func (j *jsonLogDecoder) Decode(r io.Reader) ([]model.LogEntry, error) {
func decodeJSON(r io.Reader) ([]model.LogEntry, error) {
// Pass this into a gzip reader for decoding
reader, err := gzip.NewReader(r)
if err != nil {
Expand All @@ -71,19 +51,7 @@ func (j *jsonLogDecoder) Decode(r io.Reader) ([]model.LogEntry, error) {

var consoleLogRegex = regexp.MustCompile(`^(?P<timestamp>\S+)\s+(?P<severity>\w+)\s+(?P<component>[\w-]+)\s+\[(?P<context>\S+)\]\s+(?P<message>.*)$`)

// consoleLogDecoder is a logDecoder that decodes "console" formatted mongodb logs.
// This is the format used for mongodb 4.2
type consoleLogDecoder struct {
logger *zap.Logger
}

func newConsoleLogDecoder(logger *zap.Logger) *consoleLogDecoder {
return &consoleLogDecoder{
logger: logger,
}
}

func (c *consoleLogDecoder) Decode(r io.Reader) ([]model.LogEntry, error) {
func decodeConsole(logger *zap.Logger, r io.Reader) ([]model.LogEntry, error) {

// Pass this into a gzip reader for decoding
gzipReader, err := gzip.NewReader(r)
Expand All @@ -101,8 +69,8 @@ func (c *consoleLogDecoder) Decode(r io.Reader) ([]model.LogEntry, error) {

submatches := consoleLogRegex.FindSubmatch(scanner.Bytes())
if submatches == nil || len(submatches) != 6 {
// Match failed for line
c.logger.Error("Entry did not match regex", zap.String("entry", scanner.Text()))
// Match failed for line; We will skip this line and continue processing others.
logger.Error("Entry did not match regex", zap.String("entry", scanner.Text()))
continue
}

Expand Down
7 changes: 3 additions & 4 deletions receiver/mongodbatlasreceiver/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,14 @@ func filterClusters(clusters []mongodbatlas.Cluster, clusterNames []string, incl
return filtered
}

func (s *logsReceiver) getHostLogs(groupID, hostname, logName string, decoder logDecoder) ([]model.LogEntry, error) {
func (s *logsReceiver) getHostLogs(groupID, hostname, logName string, clusterMajorVersion string) ([]model.LogEntry, error) {
// Get gzip bytes buffer from API
buf, err := s.client.GetLogs(context.Background(), groupID, hostname, logName, s.start, s.end)
if err != nil {
return nil, err
}

return decoder.Decode(buf)
return decodeLogs(s.log, clusterMajorVersion, buf)
}

func (s *logsReceiver) getHostAuditLogs(groupID, hostname, logName string) ([]model.AuditLog, error) {
Expand Down Expand Up @@ -242,8 +242,7 @@ func (s *logsReceiver) getHostAuditLogs(groupID, hostname, logName string) ([]mo
}

func (s *logsReceiver) collectLogs(pc ProjectContext, hostname, logName, clusterName, clusterMajorVersion string) {
decoder := decoderForVersion(s.log, clusterMajorVersion)
logs, err := s.getHostLogs(pc.Project.ID, hostname, logName, decoder)
logs, err := s.getHostLogs(pc.Project.ID, hostname, logName, clusterMajorVersion)
if err != nil && !errors.Is(err, io.EOF) {
s.log.Warn("Failed to retrieve logs from: "+logName, zap.Error(err))
}
Expand Down