Skip to content

Commit

Permalink
[receiver/filelog] Add support for parsing header lines as log metada…
Browse files Browse the repository at this point in the history
…ta (open-telemetry#18921)

* add header metadata parsing to filelog receiver
  • Loading branch information
BinaryFissionGames committed Mar 6, 2023
1 parent d702d86 commit d4352ea
Show file tree
Hide file tree
Showing 16 changed files with 940 additions and 18 deletions.
13 changes: 13 additions & 0 deletions .chloggen/filelog-header-parsing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds ability to parse a file header and parse it as metadata to decorate log entries.

# One or more tracking issues related to the change
issues: [18198]

subtext: This feature must be activated with the `filelog.allowHeaderMetadataParsing` feature gate.
10 changes: 10 additions & 0 deletions pkg/stanza/docs/operators/file_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ The `file_input` operator reads logs from files. It will place the lines read in
| `delete_after_read` | `false` | If `true`, each log file will be read and then immediately deleted. Requires that the `filelog.allowFileDeletion` feature gate is enabled. |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
| `header` | nil | Specifies options for parsing header metadata. Requires that the `filelog.allowHeaderMetadataParsing` feature gate is enabled. See below for details. |
| `header.pattern` | required for header metadata parsing | A regex that matches every header line. |
| `header.metadata_operators` | required for header metadata parsing | A list of operators used to parse metadata from the header. |

Note that by default, no logs will be read unless the monitored file is actively being written to because `start_at` defaults to `end`.

Expand Down Expand Up @@ -65,6 +68,13 @@ To avoid the data loss, choose move/create rotation method and set `max_concurre

Other less common encodings are supported on a best-effort basis. See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml) for other encodings available.

### Header Metadata Parsing

To enable header metadata parsing, the `filelog.allowHeaderMetadataParsing` feature gate must be set, and `start_at` must be `beginning`.

If set, the file input operator will attempt to read a header from the start of the file. Each header line must match the `header.pattern` pattern. Each line is emitted into a pipeline defined by `header.metadata_operators`. Any attributes on the resultant entry from the embedded pipeline will be merged with the attributes from previous lines (attribute collisions will be resolved with an upsert strategy). After all header lines are read, the final merged header attributes will be present on every log line that is emitted for the file.

The header lines are not emitted to the output operator.

### Example Configurations

Expand Down
14 changes: 10 additions & 4 deletions pkg/stanza/fileconsumer/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@ import (
)

type FileAttributes struct {
Name string
Path string
NameResolved string
PathResolved string
Name string `json:"-"`
Path string `json:"-"`
NameResolved string `json:"-"`
PathResolved string `json:"-"`
HeaderAttributes map[string]any
}

// HeaderAttributesCopy gives a copy of the HeaderAttributes, in order to restrict mutation of the HeaderAttributes.
func (f *FileAttributes) HeaderAttributesCopy() map[string]any {
return mapCopy(f.HeaderAttributes)
}

// resolveFileAttributes resolves file attributes
Expand Down
39 changes: 39 additions & 0 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ var allowFileDeletion = featuregate.GlobalRegistry().MustRegister(
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16314"),
)

var AllowHeaderMetadataParsing = featuregate.GlobalRegistry().MustRegister(
"filelog.allowHeaderMetadataParsing",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled, allows usage of the `header` setting."),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18198"),
)

// NewConfig creates a new input config with default values
func NewConfig() *Config {
return &Config{
Expand Down Expand Up @@ -71,13 +78,19 @@ type Config struct {
MaxBatches int `mapstructure:"max_batches,omitempty"`
DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"`
Splitter helper.SplitterConfig `mapstructure:",squash,omitempty"`
Header *HeaderConfig `mapstructure:"header,omitempty"`
}

// Build will build a file input operator from the supplied configuration
func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error) {
if c.DeleteAfterRead && !allowFileDeletion.IsEnabled() {
return nil, fmt.Errorf("`delete_after_read` requires feature gate `%s`", allowFileDeletion.ID())
}

if c.Header != nil && !AllowHeaderMetadataParsing.IsEnabled() {
return nil, fmt.Errorf("`header` requires feature gate `%s`", AllowHeaderMetadataParsing.ID())
}

if err := c.validate(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -124,6 +137,20 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory s
default:
return nil, fmt.Errorf("invalid start_at location '%s'", c.StartAt)
}

var hs *headerSettings
if c.Header != nil {
enc, err := c.Splitter.EncodingConfig.Build()
if err != nil {
return nil, fmt.Errorf("failed to create encoding: %w", err)
}

hs, err = c.Header.buildHeaderSettings(enc.Encoding)
if err != nil {
return nil, fmt.Errorf("failed to build header config: %w", err)
}
}

return &Manager{
SugaredLogger: logger.With("component", "fileconsumer"),
cancel: func() {},
Expand All @@ -137,6 +164,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory s
fromBeginning: startAtBeginning,
splitterFactory: factory,
encodingConfig: c.Splitter.EncodingConfig,
headerSettings: hs,
},
finder: c.Finder,
roller: newRoller(),
Expand Down Expand Up @@ -186,6 +214,10 @@ func (c Config) validate() error {
return fmt.Errorf("`delete_after_read` cannot be used with `start_at: end`")
}

if c.Header != nil && c.StartAt == "end" {
return fmt.Errorf("`header` cannot be specified with `start_at: end`")
}

if c.MaxBatches < 0 {
return errors.New("`max_batches` must not be negative")
}
Expand All @@ -194,5 +226,12 @@ func (c Config) validate() error {
if err != nil {
return err
}

if c.Header != nil {
if err := c.Header.validate(); err != nil {
return fmt.Errorf("invalid config for `header`: %w", err)
}
}

return nil
}
119 changes: 119 additions & 0 deletions pkg/stanza/fileconsumer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/featuregate"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

Expand Down Expand Up @@ -360,6 +363,22 @@ func TestUnmarshal(t *testing.T) {
return newMockOperatorConfig(cfg)
}(),
},
{
Name: "header_config",
Expect: func() *mockOperatorConfig {
cfg := NewConfig()
regexCfg := regex.NewConfig()
cfg.Header = &HeaderConfig{
Pattern: "^#",
MetadataOperators: []operator.Config{
{
Builder: regexCfg,
},
},
}
return newMockOperatorConfig(cfg)
}(),
},
},
}.Run(t)
}
Expand Down Expand Up @@ -518,6 +537,14 @@ func TestBuild(t *testing.T) {
require.Equal(t, 6, m.maxBatches)
},
},
{
"HeaderConfigNoFlag",
func(f *Config) {
f.Header = &HeaderConfig{}
},
require.Error,
nil,
},
}

for _, tc := range cases {
Expand Down Expand Up @@ -620,3 +647,95 @@ func TestBuildWithSplitFunc(t *testing.T) {
})
}
}

func TestBuildWithHeader(t *testing.T) {
require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), true))
t.Cleanup(func() {
require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), false))
})

basicConfig := func() *Config {
cfg := NewConfig()
cfg.Include = []string{"/var/log/testpath.*"}
cfg.Exclude = []string{"/var/log/testpath.ex*"}
cfg.PollInterval = 10 * time.Millisecond
return cfg
}

cases := []struct {
name string
modifyBaseConfig func(*Config)
errorRequirement require.ErrorAssertionFunc
validate func(*testing.T, *Manager)
}{
{
"InvalidHeaderConfig",
func(f *Config) {
f.Header = &HeaderConfig{}
f.StartAt = "beginning"
},
require.Error,
nil,
},
{
"HeaderConfigWithStartAtEnd",
func(f *Config) {
regexCfg := regex.NewConfig()
regexCfg.Regex = "^(?P<field>.*)"
f.Header = &HeaderConfig{
Pattern: "^#",
MetadataOperators: []operator.Config{
{
Builder: regexCfg,
},
},
}
f.StartAt = "end"
},
require.Error,
nil,
},
{
"ValidHeaderConfig",
func(f *Config) {
regexCfg := regex.NewConfig()
regexCfg.Regex = "^(?P<field>.*)"
f.Header = &HeaderConfig{
Pattern: "^#",
MetadataOperators: []operator.Config{
{
Builder: regexCfg,
},
},
}
f.StartAt = "beginning"
},
require.NoError,
func(t *testing.T, f *Manager) {
require.NotNil(t, f.readerFactory.headerSettings)
require.NotNil(t, f.readerFactory.headerSettings.matchRegex)
require.NotNil(t, f.readerFactory.headerSettings.splitFunc)
require.NotNil(t, f.readerFactory.headerSettings.config)
},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
tc := tc
t.Parallel()
cfg := basicConfig()
tc.modifyBaseConfig(cfg)

nopEmit := func(_ context.Context, _ *FileAttributes, _ []byte) {}

input, err := cfg.Build(testutil.Logger(t), nopEmit)
tc.errorRequirement(t, err)
if err != nil {
return
}

tc.validate(t, input)
})
}
}
Loading

0 comments on commit d4352ea

Please sign in to comment.