From d4352eab1f689d8b5dc33c16b73eafc0e055c920 Mon Sep 17 00:00:00 2001
From: Brandon Johnson
Date: Mon, 6 Mar 2023 09:35:18 -0500
Subject: [PATCH] [receiver/filelog] Add support for parsing header lines as log metadata (#18921)

* add header metadata parsing to filelog receiver

---
 .chloggen/filelog-header-parsing.yaml | 13 ++
 pkg/stanza/docs/operators/file_input.md | 10 +
 pkg/stanza/fileconsumer/attributes.go | 14 +-
 pkg/stanza/fileconsumer/config.go | 39 ++++
 pkg/stanza/fileconsumer/config_test.go | 119 ++++++++++++
 pkg/stanza/fileconsumer/file_test.go | 117 ++++++++++++
 pkg/stanza/fileconsumer/header.go | 158 +++++++++++++++
 pkg/stanza/fileconsumer/header_test.go | 190 +++++++++++++++++++
 pkg/stanza/fileconsumer/reader.go | 100 +++++++++-
 pkg/stanza/fileconsumer/reader_factory.go | 80 +++++++-
 pkg/stanza/fileconsumer/reader_test.go | 64 +++++++
 pkg/stanza/fileconsumer/testdata/config.yaml | 6 +
 pkg/stanza/fileconsumer/util_test.go | 28 +++
 pkg/stanza/operator/input/file/config.go | 5 +
 pkg/stanza/operator/input/file/file.go | 4 +
 receiver/filelogreceiver/README.md | 11 ++
 16 files changed, 940 insertions(+), 18 deletions(-)
 create mode 100755 .chloggen/filelog-header-parsing.yaml
 create mode 100644 pkg/stanza/fileconsumer/header.go
 create mode 100644 pkg/stanza/fileconsumer/header_test.go

diff --git a/.chloggen/filelog-header-parsing.yaml b/.chloggen/filelog-header-parsing.yaml
new file mode 100755
index 0000000000000..12d8c6a02c6c6
--- /dev/null
+++ b/.chloggen/filelog-header-parsing.yaml
@@ -0,0 +1,13 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: filelogreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Adds the ability to parse a file header and use it as metadata to decorate log entries.
+
+# One or more tracking issues related to the change
+issues: [18198]
+
+subtext: This feature must be activated with the `filelog.allowHeaderMetadataParsing` feature gate.
diff --git a/pkg/stanza/docs/operators/file_input.md b/pkg/stanza/docs/operators/file_input.md
index 575877f856dd0..d039c88baf14c 100644
--- a/pkg/stanza/docs/operators/file_input.md
+++ b/pkg/stanza/docs/operators/file_input.md
@@ -28,6 +28,9 @@ The `file_input` operator reads logs from files. It will place the lines read in
 | `delete_after_read` | `false` | If `true`, each log file will be read and then immediately deleted. Requires that the `filelog.allowFileDeletion` feature gate is enabled. |
 | `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
 | `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
+| `header` | nil | Specifies options for parsing header metadata. Requires that the `filelog.allowHeaderMetadataParsing` feature gate is enabled. See below for details. |
+| `header.pattern` | required for header metadata parsing | A regex that matches every header line. |
+| `header.metadata_operators` | required for header metadata parsing | A list of operators used to parse metadata from the header. |

 Note that by default, no logs will be read unless the monitored file is actively being written to because `start_at` defaults to `end`.

@@ -65,6 +68,13 @@ To avoid the data loss, choose move/create rotation method and set `max_concurre

 Other less common encodings are supported on a best-effort basis.
 See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml) for other encodings available.

+### Header Metadata Parsing
+
+To enable header metadata parsing, the `filelog.allowHeaderMetadataParsing` feature gate must be set, and `start_at` must be `beginning`.
+
+If set, the file input operator will attempt to read a header from the start of the file. Each header line must match the `header.pattern` pattern. Each line is emitted into a pipeline defined by `header.metadata_operators`. Any attributes on the resultant entry from the embedded pipeline will be merged with the attributes from previous lines (attribute collisions will be resolved with an upsert strategy). After all header lines are read, the final merged header attributes will be present on every log line that is emitted for the file.
+
+The header lines are not emitted to the output operator.

 ### Example Configurations
diff --git a/pkg/stanza/fileconsumer/attributes.go b/pkg/stanza/fileconsumer/attributes.go
index db77044224fc0..c03a8d923aa83 100644
--- a/pkg/stanza/fileconsumer/attributes.go
+++ b/pkg/stanza/fileconsumer/attributes.go
@@ -21,10 +21,16 @@ import (
 )

 type FileAttributes struct {
-	Name string
-	Path string
-	NameResolved string
-	PathResolved string
+	Name string `json:"-"`
+	Path string `json:"-"`
+	NameResolved string `json:"-"`
+	PathResolved string `json:"-"`
+	HeaderAttributes map[string]any
+}
+
+// HeaderAttributesCopy returns a copy of the HeaderAttributes map, so that callers cannot mutate the original.
+func (f *FileAttributes) HeaderAttributesCopy() map[string]any {
+	return mapCopy(f.HeaderAttributes)
 }

 // resolveFileAttributes resolves file attributes
diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go
index 9ef87b49b033b..b179cdb8199ad 100644
--- a/pkg/stanza/fileconsumer/config.go
+++ b/pkg/stanza/fileconsumer/config.go
@@ -39,6 +39,13 @@ var allowFileDeletion = featuregate.GlobalRegistry().MustRegister(
 	featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16314"),
 )

+var AllowHeaderMetadataParsing = featuregate.GlobalRegistry().MustRegister(
+	"filelog.allowHeaderMetadataParsing",
+	featuregate.StageAlpha,
+	featuregate.WithRegisterDescription("When enabled, allows usage of the `header` setting."),
+	featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18198"),
+)
+
 // NewConfig creates a new input config with default values
 func NewConfig() *Config {
 	return &Config{
@@ -71,6 +78,7 @@ type Config struct {
 	MaxBatches int `mapstructure:"max_batches,omitempty"`
 	DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"`
 	Splitter helper.SplitterConfig `mapstructure:",squash,omitempty"`
+	Header *HeaderConfig `mapstructure:"header,omitempty"`
 }

 // Build will build a file input operator from the supplied configuration
@@ -78,6 +86,11 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error
 	if c.DeleteAfterRead && !allowFileDeletion.IsEnabled() {
 		return nil, fmt.Errorf("`delete_after_read` requires feature gate `%s`", allowFileDeletion.ID())
 	}
+
+	if c.Header != nil && !AllowHeaderMetadataParsing.IsEnabled() {
+		return nil, fmt.Errorf("`header` requires feature gate `%s`", AllowHeaderMetadataParsing.ID())
+	}
+
 	if err := c.validate(); err != nil {
 		return nil, err
 	}
@@ -124,6 +137,20 @@ func (c Config) buildManager(logger 
*zap.SugaredLogger, emit EmitFunc, factory s default: return nil, fmt.Errorf("invalid start_at location '%s'", c.StartAt) } + + var hs *headerSettings + if c.Header != nil { + enc, err := c.Splitter.EncodingConfig.Build() + if err != nil { + return nil, fmt.Errorf("failed to create encoding: %w", err) + } + + hs, err = c.Header.buildHeaderSettings(enc.Encoding) + if err != nil { + return nil, fmt.Errorf("failed to build header config: %w", err) + } + } + return &Manager{ SugaredLogger: logger.With("component", "fileconsumer"), cancel: func() {}, @@ -137,6 +164,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory s fromBeginning: startAtBeginning, splitterFactory: factory, encodingConfig: c.Splitter.EncodingConfig, + headerSettings: hs, }, finder: c.Finder, roller: newRoller(), @@ -186,6 +214,10 @@ func (c Config) validate() error { return fmt.Errorf("`delete_after_read` cannot be used with `start_at: end`") } + if c.Header != nil && c.StartAt == "end" { + return fmt.Errorf("`header` cannot be specified with `start_at: end`") + } + if c.MaxBatches < 0 { return errors.New("`max_batches` must not be negative") } @@ -194,5 +226,12 @@ func (c Config) validate() error { if err != nil { return err } + + if c.Header != nil { + if err := c.Header.validate(); err != nil { + return fmt.Errorf("invalid config for `header`: %w", err) + } + } + return nil } diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go index 7e255b24dc90f..12b232747342f 100644 --- a/pkg/stanza/fileconsumer/config_test.go +++ b/pkg/stanza/fileconsumer/config_test.go @@ -21,9 +21,12 @@ import ( "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/featuregate" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" ) @@ -360,6 +363,22 @@ func TestUnmarshal(t *testing.T) { return newMockOperatorConfig(cfg) }(), }, + { + Name: "header_config", + Expect: func() *mockOperatorConfig { + cfg := NewConfig() + regexCfg := regex.NewConfig() + cfg.Header = &HeaderConfig{ + Pattern: "^#", + MetadataOperators: []operator.Config{ + { + Builder: regexCfg, + }, + }, + } + return newMockOperatorConfig(cfg) + }(), + }, }, }.Run(t) } @@ -518,6 +537,14 @@ func TestBuild(t *testing.T) { require.Equal(t, 6, m.maxBatches) }, }, + { + "HeaderConfigNoFlag", + func(f *Config) { + f.Header = &HeaderConfig{} + }, + require.Error, + nil, + }, } for _, tc := range cases { @@ -620,3 +647,95 @@ func TestBuildWithSplitFunc(t *testing.T) { }) } } + +func TestBuildWithHeader(t *testing.T) { + require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), true)) + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), false)) + }) + + basicConfig := func() *Config { + cfg := NewConfig() + cfg.Include = []string{"/var/log/testpath.*"} + cfg.Exclude = []string{"/var/log/testpath.ex*"} + cfg.PollInterval = 10 * time.Millisecond + return cfg + } + + cases := []struct { + name string + modifyBaseConfig func(*Config) + errorRequirement require.ErrorAssertionFunc + validate func(*testing.T, *Manager) + }{ + { + "InvalidHeaderConfig", 
+			func(f *Config) {
+				f.Header = &HeaderConfig{}
+				f.StartAt = "beginning"
+			},
+			require.Error,
+			nil,
+		},
+		{
+			"HeaderConfigWithStartAtEnd",
+			func(f *Config) {
+				regexCfg := regex.NewConfig()
+				regexCfg.Regex = "^(?P<header>.*)"
+				f.Header = &HeaderConfig{
+					Pattern: "^#",
+					MetadataOperators: []operator.Config{
+						{
+							Builder: regexCfg,
+						},
+					},
+				}
+				f.StartAt = "end"
+			},
+			require.Error,
+			nil,
+		},
+		{
+			"ValidHeaderConfig",
+			func(f *Config) {
+				regexCfg := regex.NewConfig()
+				regexCfg.Regex = "^(?P<header>.*)"
+				f.Header = &HeaderConfig{
+					Pattern: "^#",
+					MetadataOperators: []operator.Config{
+						{
+							Builder: regexCfg,
+						},
+					},
+				}
+				f.StartAt = "beginning"
+			},
+			require.NoError,
+			func(t *testing.T, f *Manager) {
+				require.NotNil(t, f.readerFactory.headerSettings)
+				require.NotNil(t, f.readerFactory.headerSettings.matchRegex)
+				require.NotNil(t, f.readerFactory.headerSettings.splitFunc)
+				require.NotNil(t, f.readerFactory.headerSettings.config)
+			},
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			tc := tc
+			t.Parallel()
+			cfg := basicConfig()
+			tc.modifyBaseConfig(cfg)
+
+			nopEmit := func(_ context.Context, _ *FileAttributes, _ []byte) {}
+
+			input, err := cfg.Build(testutil.Logger(t), nopEmit)
+			tc.errorRequirement(t, err)
+			if err != nil {
+				return
+			}
+
+			tc.validate(t, input)
+		})
+	}
+}
diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go
index 98cc33bb43c4c..a7243238523f3 100644
--- a/pkg/stanza/fileconsumer/file_test.go
+++ b/pkg/stanza/fileconsumer/file_test.go
@@ -1323,6 +1323,36 @@ func TestMaxBatching(t *testing.T) {
 	}
 }

+// TestReadExistingLogsWithHeader tests that, when starting from beginning, we
+// read all the lines that are already there, and parse the header
+func TestReadExistingLogsWithHeader(t *testing.T) {
+	require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), true))
+	t.Cleanup(func() {
+		require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), false))
+	})
+
+	tempDir := t.TempDir()
+	cfg := NewConfig().includeDir(tempDir)
+	cfg.StartAt = "beginning"
+	cfg = cfg.withHeader("^#", "(?P<header_key>[A-z]+): (?P<header_value>[A-z]+)")
+
+	operator, emitCalls := buildTestManager(t, cfg)
+
+	// Create a file, then start
+	temp := openTemp(t, tempDir)
+	writeString(t, temp, "#headerField: headerValue\ntestlog\n")
+
+	require.NoError(t, operator.Start(testutil.NewMockPersister("test")))
+	defer func() {
+		require.NoError(t, operator.Stop())
+	}()
+
+	waitForTokenHeaderAttributes(t, emitCalls, []byte("testlog"), map[string]any{
+		"header_key": "headerField",
+		"header_value": "headerValue",
+	})
+}
+
 func TestDeleteAfterRead_SkipPartials(t *testing.T) {
 	bytesPerLine := 100
 	shortFileLine := tokenWithLength(bytesPerLine - 1)
@@ -1394,3 +1424,90 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
 	require.Greater(t, reader.Offset, int64(0))
 	require.Less(t, reader.Offset, int64(longFileSize))
 }
+
+func TestHeaderPersistence(t *testing.T) {
+	require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), true))
+	t.Cleanup(func() {
+		require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), false))
+	})
+
+	tempDir := t.TempDir()
+	cfg := NewConfig().includeDir(tempDir)
+	cfg.StartAt = "beginning"
+	cfg = cfg.withHeader("^#", "(?P<header_key>[A-z]+): (?P<header_value>[A-z]+)")
+
+	op1, emitCalls1 := buildTestManager(t, cfg)
+
+	// Create a file, then start
+	temp := openTemp(t, tempDir)
+	writeString(t, temp, "#headerField: headerValue\nlog line\n")
+
+	persister := testutil.NewUnscopedMockPersister()
+	require.NoError(t, op1.Start(persister))
+
+	waitForTokenHeaderAttributes(t, emitCalls1, []byte("log line"), map[string]any{
+		"header_key": "headerField",
+		"header_value": "headerValue",
+	})
+
+	require.NoError(t, op1.Stop())
+
+	writeString(t, temp, "log line 2\n")
+
+	op2, emitCalls2 := buildTestManager(t, cfg)
+
+	require.NoError(t, op2.Start(persister))
+
+	waitForTokenHeaderAttributes(t, emitCalls2, []byte("log line 2"), map[string]any{
+		"header_key": "headerField",
+		"header_value": "headerValue",
+	})
+
+	require.NoError(t, op2.Stop())
+
+}
+
+func TestHeaderPersistenceInHeader(t *testing.T) {
+	require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), true))
+	t.Cleanup(func() {
+		require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), false))
+	})
+
+	tempDir := t.TempDir()
+	cfg1 := NewConfig().includeDir(tempDir)
+	cfg1.StartAt = "beginning"
+	cfg1 = cfg1.withHeader(`^\|`, "headerField1: (?P<header_value_1>[A-z0-9]+)")
+
+	op1, _ := buildTestManager(t, cfg1)
+
+	// Create a file, then start
+	temp := openTemp(t, tempDir)
+	writeString(t, temp, "|headerField1: headerValue1\n")
+
+	persister := testutil.NewUnscopedMockPersister()
+	require.NoError(t, op1.Start(persister))
+
+	// The operator will poll at fixed time intervals, but we just want to make sure at least
+	// one poll operation occurs between now and when we stop.
+	op1.poll(context.Background())
+
+	require.NoError(t, op1.Stop())
+
+	writeString(t, temp, "|headerField2: headerValue2\nlog line\n")
+
+	cfg2 := NewConfig().includeDir(tempDir)
+	cfg2.StartAt = "beginning"
+	cfg2 = cfg2.withHeader(`^\|`, "headerField2: (?P<header_value_2>[A-z0-9]+)")
+
+	op2, emitCalls := buildTestManager(t, cfg2)
+
+	require.NoError(t, op2.Start(persister))
+
+	waitForTokenHeaderAttributes(t, emitCalls, []byte("log line"), map[string]any{
+		"header_value_1": "headerValue1",
+		"header_value_2": "headerValue2",
+	})
+
+	require.NoError(t, op2.Stop())
+
+}
diff --git a/pkg/stanza/fileconsumer/header.go b/pkg/stanza/fileconsumer/header.go
new file mode 100644
index 0000000000000..98f47bc569524
--- /dev/null
+++ b/pkg/stanza/fileconsumer/header.go
@@ -0,0 +1,158 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"
+
+import (
+	"bufio"
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"regexp"
+
+	"go.uber.org/zap"
+	"golang.org/x/text/encoding"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
+)
+
+const headerPipelineOutputType = "header_log_emitter"
+
+type HeaderConfig struct {
+	Pattern string `mapstructure:"pattern"`
+	MetadataOperators []operator.Config `mapstructure:"metadata_operators"`
+}
+
+// validate returns an error describing why the configuration is invalid, or nil if the configuration is valid.
+func (hc *HeaderConfig) validate() error {
+	if len(hc.MetadataOperators) == 0 {
+		return errors.New("at least one operator must be specified for `metadata_operators`")
+	}
+
+	nopLogger := zap.NewNop().Sugar()
+	outOp := newHeaderPipelineOutput(nopLogger)
+	p, err := pipeline.Config{
+		Operators: hc.MetadataOperators,
+		DefaultOutput: outOp,
+	}.Build(nopLogger)
+
+	if err != nil {
+		return fmt.Errorf("failed to build pipelines: %w", err)
+	}
+
+	for _, op := range p.Operators() {
+		// This is the default output we created, it's always valid
+		if op.Type() == headerPipelineOutputType {
+			continue
+		}
+
+		if !op.CanProcess() {
+			return fmt.Errorf("operator '%s' in `metadata_operators` cannot process entries", op.ID())
+		}
+
+		if !op.CanOutput() {
+			return fmt.Errorf("operator '%s' in `metadata_operators` does not propagate entries", op.ID())
+		}
+
+		// The filter operator may also fail to propagate some entries
+		if op.Type() == "filter" {
+			return fmt.Errorf("operator of type filter is not allowed in `metadata_operators`")
+		}
+	}
+
+	return nil
+}
+
+func (hc *HeaderConfig) buildHeaderSettings(enc encoding.Encoding) (*headerSettings, error) {
+	var err error
+	matchRegex, err := regexp.Compile(hc.Pattern)
+	if err != nil {
+		return nil, fmt.Errorf("failed to compile `pattern`: %w", err)
+	}
+
+	splitFunc, err := helper.NewNewlineSplitFunc(enc, false, func(b []byte) []byte {
+		return bytes.Trim(b, "\r\n")
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed to create split func: %w", err)
+	}
+
+	return &headerSettings{
+		matchRegex: matchRegex,
+		splitFunc: splitFunc,
+		config: hc,
+	}, nil
+}
+
+// headerSettings contains compiled objects defined by a HeaderConfig
+type headerSettings struct {
+	matchRegex *regexp.Regexp
+	splitFunc bufio.SplitFunc
+	config *HeaderConfig
+}
+
+// headerPipelineOutput is a stanza operator that emits log entries to a channel
+type headerPipelineOutput struct {
+	helper.OutputOperator
+	logChan chan *entry.Entry
+}
+
+// newHeaderPipelineOutput creates a new headerPipelineOutput with the given logger
+func newHeaderPipelineOutput(logger *zap.SugaredLogger) *headerPipelineOutput {
+	return &headerPipelineOutput{
+		OutputOperator: helper.OutputOperator{
+			BasicOperator: helper.BasicOperator{
+				OperatorID: headerPipelineOutputType,
+				OperatorType: headerPipelineOutputType,
+				SugaredLogger: logger,
+			},
+		},
+		logChan: make(chan *entry.Entry, 1),
+	}
+}
+
+// Start is a no-op; this operator requires no background goroutines
+func (e *headerPipelineOutput) Start(_ operator.Persister) error {
+	return nil
+}
+
+// Stop is a no-op; there are no goroutines or channels to clean up
+func (e *headerPipelineOutput) Stop() error {
+	return nil
+}
+
+func (e *headerPipelineOutput) Process(_ context.Context, ent *entry.Entry) error {
+	// Drop the entry if logChan is full, in order to avoid this operator blocking.
+	// This protects against a case where an operator could return an error, but continue propagating a log entry,
+	// leaving an unexpected entry in the output channel.
+	select {
+	case e.logChan <- ent:
+	default:
+	}
+
+	return nil
+}
+
+func (e *headerPipelineOutput) WaitForEntry(ctx context.Context) (*entry.Entry, error) {
+	select {
+	case <-ctx.Done():
+		return nil, fmt.Errorf("got context cancellation while waiting for entry: %w", ctx.Err())
+	case ent := <-e.logChan:
+		return ent, nil
+	}
+}
diff --git a/pkg/stanza/fileconsumer/header_test.go b/pkg/stanza/fileconsumer/header_test.go
new file mode 100644
index 0000000000000..e701ca0105aac
--- /dev/null
+++ b/pkg/stanza/fileconsumer/header_test.go
@@ -0,0 +1,190 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileconsumer
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"golang.org/x/text/encoding"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/generate"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/output/stdout"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/filter"
+)
+
+func TestHeaderConfig_validate(t *testing.T) {
+	regexConf := regex.NewConfig()
+	regexConf.Regex = "^#(?P<header>.*)"
+
+	invalidRegexConf := regex.NewConfig()
+	invalidRegexConf.Regex = "("
+
+	generateConf := generate.NewConfig("")
+	stdoutConf := stdout.NewConfig("")
+	filterConf := filter.NewConfig()
+	filterConf.Expression = "true"
+
+	testCases := []struct {
+		name string
+		conf HeaderConfig
+		expectedErr string
+	}{
+		{
+			name: "Valid config",
+			conf: HeaderConfig{
+				Pattern: "^#",
+				MetadataOperators: []operator.Config{
+					{
+						Builder: regexConf,
+					},
+				},
+			},
+		},
+		{
+			name: "Valid without specified header size",
+			conf: HeaderConfig{
+				Pattern: "^#",
+				MetadataOperators: []operator.Config{
+					{
+						Builder: regexConf,
+					},
+				},
+			},
+		},
+		{
+			name: "No operators specified",
+			conf: HeaderConfig{
+				Pattern: "^#",
+				MetadataOperators: []operator.Config{},
+			},
+			expectedErr: "at least one operator must be specified for `metadata_operators`",
+		},
+		{
+			name: "Invalid operator specified",
+			conf: HeaderConfig{
+				Pattern: "^#",
+				MetadataOperators: []operator.Config{
+					{
+						Builder: invalidRegexConf,
+					},
+				},
+			},
+			expectedErr: "failed to build pipelines:",
+		},
+		{
+			name: "first operator cannot process",
+			conf: HeaderConfig{
+				Pattern: "^#",
+				MetadataOperators: []operator.Config{
+					{
+						Builder: generateConf,
+					},
+				},
+			},
+			expectedErr: "operator 'generate_input' in `metadata_operators` cannot process entries",
+		},
+		{
+			name: "operator cannot output",
+			conf: HeaderConfig{
+				Pattern: "^#",
+				MetadataOperators: []operator.Config{
+					{
+						Builder: stdoutConf,
+					},
+				},
+			},
+			expectedErr: "operator 'stdout' in `metadata_operators` does not propagate entries",
+		},
+		{
+			name: "filter operator present",
+			conf: HeaderConfig{
+				Pattern: "^#",
+				MetadataOperators: []operator.Config{
+					{
+						Builder: filterConf,
+					},
+				},
+			},
+			expectedErr: "operator of type filter is not allowed in `metadata_operators`",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			err := tc.conf.validate()
+			if tc.expectedErr != "" {
+				require.ErrorContains(t, err, tc.expectedErr)
+			} else {
+				require.NoError(t, err)
+			}
+		})
+	}
+}
+
+func TestHeaderConfig_buildHeaderSettings(t *testing.T) {
+	regexConf := regex.NewConfig()
+	regexConf.Regex = "^#(?P<header>.*)"
+
+	invalidRegexConf := regex.NewConfig()
+	invalidRegexConf.Regex = "("
+
+	testCases := []struct {
+		name string
+		enc encoding.Encoding
+		conf HeaderConfig
+		expectedErr string
+	}{
+		{
+			name: "valid config",
+			enc: encoding.Nop,
+			conf: HeaderConfig{
+				Pattern: "^#",
+				MetadataOperators: []operator.Config{
+					{
+						Builder: regexConf,
+					},
+				},
+			},
+		},
+		{
+			name: "Invalid pattern",
+			conf: HeaderConfig{
+				Pattern: "(",
+				MetadataOperators: []operator.Config{
+					{
+						Builder: regexConf,
+					},
+				},
+			},
+			expectedErr: "failed to compile `pattern`:",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			h, err := tc.conf.buildHeaderSettings(tc.enc)
+			if tc.expectedErr != "" {
+				require.ErrorContains(t, err, tc.expectedErr)
+			} else {
+				require.NoError(t, err)
+				require.NotNil(t, h)
+			}
+
+		})
+	}
+}
diff --git a/pkg/stanza/fileconsumer/reader.go b/pkg/stanza/fileconsumer/reader.go
index 133fc3255355d..307f02ea29ba7 100644
--- a/pkg/stanza/fileconsumer/reader.go
+++ b/pkg/stanza/fileconsumer/reader.go
@@ -22,7 +22,9 @@ import (
 	"go.uber.org/zap"

+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
 )

 type readerConfig struct {
@@ -35,15 +37,24 @@ type readerConfig struct {
 type Reader struct {
 	*zap.SugaredLogger `json:"-"` // json tag excludes embedded fields from storage
 	*readerConfig
-	splitFunc bufio.SplitFunc
-	encoding helper.Encoding
+	lineSplitFunc bufio.SplitFunc
+	splitFunc bufio.SplitFunc
+	encoding helper.Encoding
+	processFunc EmitFunc

 	Fingerprint *Fingerprint
 	Offset int64
 	generation int
 	file *os.File
-	fileAttributes *FileAttributes
+	FileAttributes *FileAttributes
 	eof bool
+
+	HeaderFinalized bool
+	recreateScanner bool
+
+	headerSettings *headerSettings
+	headerPipeline pipeline.Pipeline
+	headerPipelineOutput *headerPipelineOutput
 }

 // offsetToEnd sets the starting offset
@@ -88,13 +99,72 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
 		if err != nil {
 			r.Errorw("decode: %w", zap.Error(err))
 		} else {
-			r.emit(ctx, r.fileAttributes, token)
+			r.processFunc(ctx, r.FileAttributes, token)
+		}
+
+		if r.recreateScanner {
+			r.recreateScanner = false
+			// recreate the scanner with the log-line's split func.
+			// We do not use the updated offset from the scanner,
+			// as the log line we just read could be multiline, and would be
+			// split differently with the new splitter.
+			if _, err := r.file.Seek(r.Offset, 0); err != nil {
+				r.Errorw("Failed to seek post-header", zap.Error(err))
+				return
+			}
+
+			scanner = NewPositionalScanner(r, r.maxLogSize, r.Offset, r.splitFunc)
 		}

 		r.Offset = scanner.Pos()
 	}
 }

+// consumeHeaderLine checks whether the given token is a header line and consumes it if it is.
+// If the token does not match the header pattern, the header is considered fully read: the header
+// pipeline is shut down and the reader switches over to emitting log lines.
+func (r *Reader) consumeHeaderLine(ctx context.Context, attrs *FileAttributes, token []byte) {
+	if !r.headerSettings.matchRegex.Match(token) {
+		// Finalize and cleanup the pipeline
+		r.HeaderFinalized = true
+
+		// Stop and drop the header pipeline.
+		if err := r.headerPipeline.Stop(); err != nil {
+			r.Errorw("Failed to stop header pipeline during finalization", zap.Error(err))
+		}
+		r.headerPipeline = nil
+		r.headerPipelineOutput = nil
+
+		// Use the line split func instead of the header split func
+		r.splitFunc = r.lineSplitFunc
+		r.processFunc = r.emit
+		// Mark that we should recreate the scanner, since we changed the split function
+		r.recreateScanner = true
+		return
+	}
+
+	firstOperator := r.headerPipeline.Operators()[0]
+
+	newEntry := entry.New()
+	newEntry.Body = string(token)
+
+	if err := firstOperator.Process(ctx, newEntry); err != nil {
+		r.Errorw("Failed to process header entry", zap.Error(err))
+		return
+	}
+
+	ent, err := r.headerPipelineOutput.WaitForEntry(ctx)
+	if err != nil {
+		r.Errorw("Error while waiting for header entry", zap.Error(err))
+		return
+	}
+
+	// Copy resultant attributes over current set of attributes (upsert)
+	for k, v := range ent.Attributes {
+		r.FileAttributes.HeaderAttributes[k] = v
+	}
+}
+
 // Close will close the file
 func (r *Reader) Close() {
 	if r.file != nil {
@@ -102,6 +172,12 @@ func (r *Reader) Close() {
 			r.Debugw("Problem closing reader", zap.Error(err))
 		}
 	}
+
+	if r.headerPipeline != nil {
+		if err := r.headerPipeline.Stop(); err != nil {
+			r.Errorw("Failed to stop header pipeline", zap.Error(err))
+		}
+	}
 }

 // Read from the file and update the fingerprint if necessary
@@ -132,3 +208,19 @@ func min0(a, b int) int {
 	}
 	return b
 }
+
+// mapCopy deep copies the provided attributes map.
+func mapCopy(m map[string]any) map[string]any {
+	newMap := make(map[string]any, len(m))
+	for k, v := range m {
+		switch typedVal := v.(type) {
+		case map[string]any:
+			newMap[k] = mapCopy(typedVal)
+		default:
+			// Assume any other values are safe to directly copy.
+ // Struct types and slice types shouldn't appear in attribute maps from pipelines + newMap[k] = v + } + } + return newMap +} diff --git a/pkg/stanza/fileconsumer/reader_factory.go b/pkg/stanza/fileconsumer/reader_factory.go index b35ab3030ea79..b8cf7f67c1332 100644 --- a/pkg/stanza/fileconsumer/reader_factory.go +++ b/pkg/stanza/fileconsumer/reader_factory.go @@ -16,11 +16,14 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collecto import ( "bufio" + "fmt" "os" + "go.opentelemetry.io/collector/extension/experimental/storage" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" ) type readerFactory struct { @@ -29,6 +32,7 @@ type readerFactory struct { fromBeginning bool splitterFactory splitterFactory encodingConfig helper.EncodingConfig + headerSettings *headerSettings } func (f *readerFactory) newReader(file *os.File, fp *Fingerprint) (*Reader, error) { @@ -44,7 +48,9 @@ func (f *readerFactory) copy(old *Reader, newFile *os.File) (*Reader, error) { withFile(newFile). withFingerprint(old.Fingerprint.Copy()). withOffset(old.Offset). - withSplitterFunc(old.splitFunc). + withSplitterFunc(old.lineSplitFunc). + withHeaderAttributes(mapCopy(old.FileAttributes.HeaderAttributes)). + withHeaderFinalized(old.HeaderFinalized). build() } @@ -58,10 +64,12 @@ func (f *readerFactory) newFingerprint(file *os.File) (*Fingerprint, error) { type readerBuilder struct { *readerFactory - file *os.File - fp *Fingerprint - offset int64 - splitFunc bufio.SplitFunc + file *os.File + fp *Fingerprint + offset int64 + splitFunc bufio.SplitFunc + headerFinalized bool + headerAttributes map[string]any } func (f *readerFactory) newReaderBuilder() *readerBuilder { @@ -88,21 +96,40 @@ func (b *readerBuilder) withOffset(offset int64) *readerBuilder { return b } +func (b *readerBuilder) withHeaderFinalized(finalized bool) *readerBuilder { + b.headerFinalized = finalized + return b +} + +func (b *readerBuilder) withHeaderAttributes(attrs map[string]any) *readerBuilder { + b.headerAttributes = attrs + return b +} + func (b *readerBuilder) build() (r *Reader, err error) { r = &Reader{ - readerConfig: b.readerConfig, - Offset: b.offset, + readerConfig: b.readerConfig, + Offset: b.offset, + headerSettings: b.headerSettings, + HeaderFinalized: b.headerFinalized, } if b.splitFunc != nil { - r.splitFunc = b.splitFunc + r.lineSplitFunc = b.splitFunc } else { - r.splitFunc, err = b.splitterFactory.Build(b.readerConfig.maxLogSize) + r.lineSplitFunc, err = b.splitterFactory.Build(b.readerConfig.maxLogSize) if err != nil { return } } + if b.headerSettings != nil && !b.headerFinalized { + // If we are reading the header, we should start with the header split func + r.splitFunc = b.headerSettings.splitFunc + } else { + r.splitFunc = r.lineSplitFunc + } + enc, err := b.encodingConfig.Build() if err != nil { return @@ -112,7 +139,7 @@ func (b *readerBuilder) build() (r *Reader, err error) { if b.file != nil { r.file = b.file r.SugaredLogger = b.SugaredLogger.With("path", b.file.Name()) - r.fileAttributes, err = resolveFileAttributes(b.file.Name()) + r.FileAttributes, err = resolveFileAttributes(b.file.Name()) if err != nil { b.Errorf("resolve attributes: %w", err) } @@ -125,6 +152,13 @@ func (b *readerBuilder) build() (r *Reader, err error) { } } else { r.SugaredLogger = b.SugaredLogger.With("path", "uninitialized") + r.FileAttributes = &FileAttributes{} + } + + if 
b.headerAttributes != nil {
+		r.FileAttributes.HeaderAttributes = b.headerAttributes
+	} else {
+		r.FileAttributes.HeaderAttributes = map[string]any{}
 	}

 	if b.fp != nil {
@@ -137,5 +171,31 @@ func (b *readerBuilder) build() (r *Reader, err error) {
 		r.Fingerprint = fp
 	}

+	// Create the header pipeline if we need it
+	// (if we are doing header parsing (headerSettings != nil), and if the header is not yet finalized)
+	if b.headerSettings != nil && !b.headerFinalized {
+		outOp := newHeaderPipelineOutput(b.SugaredLogger)
+		p, err := pipeline.Config{
+			Operators: b.headerSettings.config.MetadataOperators,
+			DefaultOutput: outOp,
+		}.Build(b.SugaredLogger)
+
+		if err != nil {
+			return nil, fmt.Errorf("failed to build pipeline: %w", err)
+		}
+
+		if err := p.Start(storage.NewNopClient()); err != nil {
+			return nil, fmt.Errorf("failed to start header pipeline: %w", err)
+		}
+
+		r.headerPipeline = p
+		r.headerPipelineOutput = outOp
+
+		// Set initial emit func to header function
+		r.processFunc = r.consumeHeaderLine
+	} else {
+		r.processFunc = b.readerConfig.emit
+	}
+
 	return r, nil
 }
diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go
index 9e7e67ba71557..f78533abfe46d 100644
--- a/pkg/stanza/fileconsumer/reader_test.go
+++ b/pkg/stanza/fileconsumer/reader_test.go
@@ -19,9 +19,12 @@ import (
 	"testing"
 	"time"

+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
 )

@@ -161,6 +164,46 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) {
 	}
 }

+func TestHeaderFingerprintIncluded(t *testing.T) {
+	fileContent := []byte("#header-line\naaa\n")
+
+	f, _ := testReaderFactory(t)
+	f.readerConfig.maxLogSize = 10
+
+	regexConf := regex.NewConfig()
+	regexConf.Regex = "^#(?P<header>
.*)" + + headerConf := &HeaderConfig{ + Pattern: "^#", + MetadataOperators: []operator.Config{ + { + Builder: regexConf, + }, + }, + } + + enc, err := helper.EncodingConfig{ + Encoding: "utf-8", + }.Build() + require.NoError(t, err) + + h, err := headerConf.buildHeaderSettings(enc.Encoding) + require.NoError(t, err) + f.headerSettings = h + + temp := openTemp(t, t.TempDir()) + + r, err := f.newReaderBuilder().withFile(temp).build() + require.NoError(t, err) + + _, err = temp.Write(fileContent) + require.NoError(t, err) + + r.ReadToEnd(context.Background()) + + require.Equal(t, []byte("#header-line\naaa\n"), r.Fingerprint.FirstBytes) +} + func testReaderFactory(t *testing.T) (*readerFactory, chan *emitParams) { emitChan := make(chan *emitParams, 100) splitterConfig := helper.NewSplitterConfig() @@ -186,3 +229,24 @@ func readToken(t *testing.T, c chan *emitParams) []byte { } return nil } + +func TestMapCopy(t *testing.T) { + initMap := map[string]any{ + "mapVal": map[string]any{ + "nestedVal": "value1", + }, + "intVal": 1, + "strVal": "OrigStr", + } + + copyMap := mapCopy(initMap) + // Mutate values on the copied map + copyMap["mapVal"].(map[string]any)["nestedVal"] = "overwrittenValue" + copyMap["intVal"] = 2 + copyMap["strVal"] = "CopyString" + + // Assert that the original map should have the same values + assert.Equal(t, "value1", initMap["mapVal"].(map[string]any)["nestedVal"]) + assert.Equal(t, 1, initMap["intVal"]) + assert.Equal(t, "OrigStr", initMap["strVal"]) +} diff --git a/pkg/stanza/fileconsumer/testdata/config.yaml b/pkg/stanza/fileconsumer/testdata/config.yaml index aa81cd7f351c2..911868cfe09cc 100644 --- a/pkg/stanza/fileconsumer/testdata/config.yaml +++ b/pkg/stanza/fileconsumer/testdata/config.yaml @@ -157,3 +157,9 @@ start_at_string: max_batches_1: type: mock max_batches: 1 +header_config: + type: mock + header: + pattern: "^#" + metadata_operators: + - type: "regex_parser" diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index d10a51087f824..2d3748045b679 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -30,6 +30,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" ) @@ -47,6 +48,23 @@ func (c *Config) includeDir(dir string) *Config { return c } +// withHeader is a builder-like helper for quickly setting up a test config header +func (c *Config) withHeader(headerMatchPattern, extractRegex string) *Config { + regexOpConfig := regex.NewConfig() + regexOpConfig.Regex = extractRegex + + c.Header = &HeaderConfig{ + Pattern: headerMatchPattern, + MetadataOperators: []operator.Config{ + { + Builder: regexOpConfig, + }, + }, + } + + return c +} + func emitOnChan(received chan []byte) EmitFunc { return func(_ context.Context, _ *FileAttributes, token []byte) { received <- token @@ -156,6 +174,16 @@ func waitForToken(t *testing.T, c chan *emitParams, expected []byte) { } } +func waitForTokenHeaderAttributes(t *testing.T, c chan *emitParams, expected []byte, headerAttributes map[string]any) { + select { + case call := <-c: + require.Equal(t, expected, call.token) + require.Equal(t, headerAttributes, call.attrs.HeaderAttributes) + case <-time.After(3 * time.Second): + require.FailNow(t, 
fmt.Sprintf("Timed out waiting for token: %s", expected)) + } +} + func waitForTokens(t *testing.T, c chan *emitParams, expected [][]byte) { actual := make([][]byte, 0, len(expected)) LOOP: diff --git a/pkg/stanza/operator/input/file/config.go b/pkg/stanza/operator/input/file/config.go index 0ed45eeda4cc4..d21a4ba7fd06f 100644 --- a/pkg/stanza/operator/input/file/config.go +++ b/pkg/stanza/operator/input/file/config.go @@ -55,6 +55,11 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { } var preEmitOptions []preEmitOption + + if fileconsumer.AllowHeaderMetadataParsing.IsEnabled() { + preEmitOptions = append(preEmitOptions, setHeaderMetadata) + } + if c.IncludeFileName { preEmitOptions = append(preEmitOptions, setFileName) } diff --git a/pkg/stanza/operator/input/file/file.go b/pkg/stanza/operator/input/file/file.go index 6605c5fa7dfb5..f43bf5763a9a8 100644 --- a/pkg/stanza/operator/input/file/file.go +++ b/pkg/stanza/operator/input/file/file.go @@ -67,6 +67,10 @@ func (f *Input) emit(ctx context.Context, attrs *fileconsumer.FileAttributes, to type preEmitOption func(*fileconsumer.FileAttributes, *entry.Entry) error +func setHeaderMetadata(attrs *fileconsumer.FileAttributes, ent *entry.Entry) error { + return ent.Set(entry.NewAttributeField(), attrs.HeaderAttributesCopy()) +} + func setFileName(attrs *fileconsumer.FileAttributes, ent *entry.Entry) error { return ent.Set(entry.NewAttributeField("log.file.name"), attrs.Name) } diff --git a/receiver/filelogreceiver/README.md b/receiver/filelogreceiver/README.md index 75d5175b1662f..9b2ae8455c770 100644 --- a/receiver/filelogreceiver/README.md +++ b/receiver/filelogreceiver/README.md @@ -34,6 +34,9 @@ Tails and parses logs from files. | `resource` | {} | A map of `key: value` pairs to add to the entry's resource | | `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details | | `storage` | none | The ID of a storage extension to be used to store file checkpoints. File checkpoints allow the receiver to pick up where it left off in the case of a collector restart. If no storage extension is used, the receiver will manage checkpoints in memory only. | +| `header` | nil | Specifies options for parsing header metadata. Requires that the `filelog.allowHeaderMetadataParsing` feature gate is enabled. See below for details. | +| `header.pattern` | required for header metadata parsing | A regex that matches every header line. | +| `header.metadata_operators` | required for header metadata parsing | A list of operators used to parse metadata from the header. | Note that _by default_, no logs will be read from a file that is not actively being written to because `start_at` defaults to `end`. @@ -66,6 +69,14 @@ match either the beginning of a new log entry, or the end of a log entry. Other less common encodings are supported on a best-effort basis. See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml) for other encodings available. +### Header Metadata Parsing + +To enable header metadata parsing, the `filelog.allowHeaderMetadataParsing` feature gate must be set, and `start_at` must be `beginning`. + +If set, the file input operator will attempt to read a header from the start of the file. Each header line must match the `header.pattern` pattern. Each line is emitted into a pipeline defined by `header.metadata_operators`. 
Any attributes on the resultant entry from the embedded pipeline will be merged with the attributes from previous lines (attribute collisions will be resolved with an upsert strategy). After all header lines are read, the final merged header attributes will be present on every log line that is emitted for the file. + +The header lines are not emitted by the receiver. + ## Additional Terminology and Features - An [entry](../../pkg/stanza/docs/types/entry.md) is the base representation of log data as it moves through a pipeline. All operators either create, modify, or consume entries.
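
As an illustrative sketch (not part of the patch itself), a receiver configuration using the new `header` option might look like the following. The file path and regex are placeholders, and the collector must be started with the `filelog.allowHeaderMetadataParsing` feature gate enabled (e.g. via `--feature-gates=filelog.allowHeaderMetadataParsing`):

```yaml
receivers:
  filelog:
    include: [ /var/log/myservice/service.log ]
    start_at: beginning
    header:
      # Every line matching this pattern is treated as part of the header.
      pattern: "^#"
      # Each header line is run through this embedded pipeline;
      # named capture groups become header attributes.
      metadata_operators:
        - type: regex_parser
          regex: "^#(?P<header_key>[^:]+): (?P<header_value>.+)"
```

Note that with this single pattern, every header line writes the same `header_key`/`header_value` pair, so a multi-line header would retain only the last line's values under the upsert strategy described above; distinct capture group names (or multiple operators) are needed to keep more than one field.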
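To make the merge behavior concrete, here is a hypothetical input file and the attributes one could expect on each emitted entry under the configuration sketched above (assuming the default `include_file_name: true`; the attribute names come from the example regex, not from the receiver itself):

```yaml
# File contents:
#   #schema: v2
#   first log line
#
# Expected attributes on the entry emitted for "first log line":
log.file.name: service.log
header_key: schema
header_value: v2
```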