Renamed poll_file_limit to max_batches and changed logic to match the intent

Signed-off-by: Corbin Phelps <[email protected]>
Corbin Phelps committed Feb 9, 2023
1 parent 37c98cc commit f8a6813
Showing 8 changed files with 46 additions and 61 deletions.
@@ -5,7 +5,7 @@ change_type: 'enhancement'
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Added `poll_file_limit` configuration parameter to limit the number of files consumed in a polling interval.
note: Added `max_batches` configuration parameter to limit the number of batches processed in a polling interval.

# One or more tracking issues related to the change
issues: [18476]
2 changes: 1 addition & 1 deletion pkg/stanza/docs/operators/file_input.md
@@ -11,7 +11,6 @@ The `file_input` operator reads logs from files. It will place the lines read in
| `include` | required | A list of file glob patterns that match the file paths to be read. |
| `exclude` | [] | A list of file glob patterns to exclude from reading. |
| `poll_interval` | 200ms | The duration between filesystem polls. |
| `poll_file_limit` | 0 | A limit on the number of files that can be consumed for a single polling cycle. A value of 0 implies an unlimited number of files. The value must be greater than or equal to `max_concurrent_files`. Ideally the value should be a multiple of `max_concurrent_file` or else the limit may be exceeded before it is enforced. |
| `multiline` | | A `multiline` configuration block. See below for details. |
| `force_flush_period` | `500ms` | Time since last read of data from file, after which currently buffered log should be sent to pipeline. Takes `time.Duration` as value. Zero means waiting for new data forever. |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options. |
@@ -23,6 +22,7 @@ The `file_input` operator reads logs from files. It will place the lines read in
| `fingerprint_size` | `1kb` | The number of bytes with which to identify a file. The first bytes in the file are used as the fingerprint. Decreasing this value at any point will cause existing fingerprints to be forgotten, meaning that all files will be read from the beginning (one time). |
| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory. |
| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently (minimum = 2). If the number of files matched in the `include` pattern exceeds half of this number, then files will be processed in batches. |
| `max_batches` | 0 | Only applicable when files must be batched in order to respect max_concurrent_files. This value limits the number of batches that will be processed during a single poll interval. A value of 0 indicates no limit. |
| `delete_after_read` | `false` | If `true`, each log file will be read and then immediately deleted. Requires that the `filelog.allowFileDeletion` feature gate is enabled. |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
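For illustration, a minimal `file_input` operator configuration using the new parameter could look like the sketch below; the glob pattern and the specific values are hypothetical, not taken from this change.

```yaml
- type: file_input
  include:
    - ./logs/*.log           # hypothetical glob
  poll_interval: 200ms
  max_concurrent_files: 20   # batches of 10 files (max_concurrent_files / 2)
  max_batches: 2             # at most 2 batches (20 files) per poll; 0 = no limit
```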
13 changes: 6 additions & 7 deletions pkg/stanza/fileconsumer/config.go
@@ -16,6 +16,7 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collecto

import (
"bufio"
"errors"
"fmt"
"time"

@@ -46,12 +47,12 @@ func NewConfig() *Config {
IncludeFileNameResolved: false,
IncludeFilePathResolved: false,
PollInterval: 200 * time.Millisecond,
PollFileLimit: 0,
Splitter: helper.NewSplitterConfig(),
StartAt: "end",
FingerprintSize: DefaultFingerprintSize,
MaxLogSize: defaultMaxLogSize,
MaxConcurrentFiles: defaultMaxConcurrentFiles,
MaxBatches: 0,
}
}

@@ -63,11 +64,11 @@ type Config struct {
IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty"`
IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty"`
PollInterval time.Duration `mapstructure:"poll_interval,omitempty"`
PollFileLimit int `mapstructure:"poll_file_limit,omitempty"`
StartAt string `mapstructure:"start_at,omitempty"`
FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"`
MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty"`
MaxBatches int `mapstructure:"max_batches,omitempty"`
DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"`
Splitter helper.SplitterConfig `mapstructure:",squash,omitempty"`
}
@@ -141,7 +142,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory s
roller: newRoller(),
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
pollFileLimit: c.PollFileLimit,
maxBatches: c.MaxBatches,
deleteAfterRead: c.DeleteAfterRead,
knownFiles: make([]*Reader, 0, 10),
seenPaths: make(map[string]struct{}, 100),
@@ -185,10 +186,8 @@ func (c Config) validate() error {
return fmt.Errorf("`delete_after_read` cannot be used with `start_at: end`")
}

// Poll file limit can be 0 to signal unlimited or it must be greater than or equal to
// max concurrent files to ensure we can read at least the configured max concurrent files in a polling cycle.
if c.PollFileLimit < 0 || (c.PollFileLimit < c.MaxConcurrentFiles && c.PollFileLimit != 0) {
return fmt.Errorf("`poll_file_limit` must be 0 or greater than or equal to `max_concurrent_files`")
if c.MaxBatches < 0 {
return errors.New("`max_batches` must not be negative")
}

_, err := c.Splitter.EncodingConfig.Build()
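The validation becomes simpler with the rename: because `max_batches` counts batches rather than files, it no longer needs to be reconciled against `max_concurrent_files`; only negative values are rejected. A self-contained sketch of that rule follows — the wrapper function and the example values are illustrative only, not part of the diff.

```go
package main

import (
	"errors"
	"fmt"
)

// validateMaxBatches mirrors the check added to Config.validate above.
// Zero means "no limit"; only negative values are rejected.
func validateMaxBatches(maxBatches int) error {
	if maxBatches < 0 {
		return errors.New("`max_batches` must not be negative")
	}
	return nil
}

func main() {
	fmt.Println(validateMaxBatches(0))  // <nil> (unlimited)
	fmt.Println(validateMaxBatches(2))  // <nil>
	fmt.Println(validateMaxBatches(-1)) // `max_batches` must not be negative
}
```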
24 changes: 7 additions & 17 deletions pkg/stanza/fileconsumer/config_test.go
@@ -353,10 +353,10 @@ func TestUnmarshal(t *testing.T) {
}(),
},
{
Name: "poll_file_limit_1",
Name: "max_batches_1",
Expect: func() *mockOperatorConfig {
cfg := NewConfig()
cfg.PollFileLimit = 1
cfg.MaxBatches = 1
return newMockOperatorConfig(cfg)
}(),
},
@@ -501,31 +501,21 @@ func TestBuild(t *testing.T) {
nil,
},
{
"InvalidNegativePollFileLimit",
"InvalidMaxBatches",
func(f *Config) {
f.PollFileLimit = -1
f.MaxBatches = -1
},
require.Error,
nil,
},
{
"InvalidTooSmallPollFileLimit",
"ValidMaxBatches",
func(f *Config) {
f.MaxConcurrentFiles = 5
f.PollFileLimit = 4
},
require.Error,
nil,
},
{
"ValidPollFileLimit",
func(f *Config) {
f.MaxConcurrentFiles = 5
f.PollFileLimit = 6
f.MaxBatches = 6
},
require.NoError,
func(t *testing.T, m *Manager) {
require.Equal(t, 6, m.pollFileLimit)
require.Equal(t, 6, m.maxBatches)
},
},
}
17 changes: 8 additions & 9 deletions pkg/stanza/fileconsumer/file.go
@@ -41,7 +41,7 @@ type Manager struct {
persister operator.Persister

pollInterval time.Duration
pollFileLimit int
maxBatches int
maxBatchFiles int
deleteAfterRead bool

@@ -113,19 +113,18 @@ func (m *Manager) poll(ctx context.Context) {
m.knownFiles[i].generation++
}

// Used to keep track of the number of files consumed in this poll cycle
filesConsumed := 0
// Used to keep track of the number of batches processed in this poll cycle
batchesProcessed := 0

// Get the list of paths on disk
matches := m.finder.FindFiles()
for len(matches) > m.maxBatchFiles {
matchesToConsume := matches[:m.maxBatchFiles]
m.consume(ctx, matchesToConsume)
m.consume(ctx, matches[:m.maxBatchFiles])

// If a pollFileLimit is set, check if we have consumed enough files
if m.pollFileLimit != 0 {
filesConsumed += len(matchesToConsume)
if filesConsumed >= m.pollFileLimit {
// If a maxBatches is set, check if we have hit the limit
if m.maxBatches != 0 {
batchesProcessed++
if batchesProcessed >= m.maxBatches {
return
}
}
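To make the control flow easier to follow, here is a simplified, self-contained sketch of the batching loop in `poll` after this change. It drops the reader bookkeeping and logging of the real implementation, and the slice advance plus the trailing consume of the final partial batch are assumptions based on the surrounding code rather than lines visible in the hunk above.

```go
package main

import "fmt"

// consumeInBatches sketches how poll walks the matched paths in batches of
// maxBatchFiles, stopping early once maxBatches batches have been processed.
// A maxBatches of 0 means no limit; consume stands in for Manager.consume.
func consumeInBatches(matches []string, maxBatchFiles, maxBatches int, consume func([]string)) {
	batchesProcessed := 0
	for len(matches) > maxBatchFiles {
		consume(matches[:maxBatchFiles])

		// If a limit is configured, stop once that many batches have been processed.
		if maxBatches != 0 {
			batchesProcessed++
			if batchesProcessed >= maxBatches {
				return
			}
		}
		matches = matches[maxBatchFiles:] // assumed: advance past the consumed batch
	}
	consume(matches) // assumed: final (possibly partial) batch
}

func main() {
	paths := []string{"a", "b", "c", "d", "e"}
	consumeInBatches(paths, 2, 2, func(batch []string) {
		fmt.Println("consuming", batch)
	})
	// With maxBatchFiles=2 and maxBatches=2, only "a b" and "c d" are consumed;
	// "e" is left for a later poll cycle.
}
```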
41 changes: 19 additions & 22 deletions pkg/stanza/fileconsumer/file_test.go
@@ -856,16 +856,16 @@ func TestFileBatching(t *testing.T) {
linesPerFile := 10
maxConcurrentFiles := 20
maxBatchFiles := maxConcurrentFiles / 2
// Explicitly setting pollFileLimit to ensure a value of 0 does not enforce a limit
pollFileLimit := 0
// Explicitly setting maxBatches to ensure a value of 0 does not enforce a limit
maxBatches := 0

expectedBatches := files / maxBatchFiles // assumes no remainder

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.MaxConcurrentFiles = maxConcurrentFiles
cfg.PollFileLimit = pollFileLimit
cfg.MaxBatches = maxBatches
emitCalls := make(chan *emitParams, files*linesPerFile)
operator := buildTestManagerWithEmit(t, cfg, emitCalls)
operator.persister = testutil.NewMockPersister("test")
@@ -1236,54 +1236,53 @@ func TestDeleteAfterRead(t *testing.T) {
}
}

func TestPollCycleLimiting(t *testing.T) {
func TestMaxBatching(t *testing.T) {
t.Parallel()

files := 50
linesPerFile := 10
maxConcurrentFiles := 20
maxBatchFiles := maxConcurrentFiles / 2
pollFileLimit := 40
maxBatches := 2

expectedBatches := pollFileLimit / maxBatchFiles // assumes no remainder
expectedBatches := maxBatches
expectedMaxFilesPerPoll := maxBatches * maxBatchFiles

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.MaxConcurrentFiles = maxConcurrentFiles
cfg.PollFileLimit = pollFileLimit
cfg.MaxBatches = maxBatches
emitCalls := make(chan *emitParams, files*linesPerFile)
operator := buildTestManagerWithEmit(t, cfg, emitCalls)
operator.persister = testutil.NewMockPersister("test")

core, observedLogs := observer.New(zap.DebugLevel)
operator.SugaredLogger = zap.New(core).Sugar()

// We only expect that pollFileLimit files are consumed
temps := make([]*os.File, 0, pollFileLimit)
for i := 0; i < pollFileLimit; i++ {
temps := make([]*os.File, 0, files)
for i := 0; i < files; i++ {
temps = append(temps, openTemp(t, tempDir))
}

// Write logs to each file
expectedTokens := make([][]byte, 0, files*linesPerFile)
numExpectedTokens := expectedMaxFilesPerPoll * linesPerFile
for i, temp := range temps {
for j := 0; j < linesPerFile; j++ {
message := fmt.Sprintf("%s %d %d", tokenWithLength(100), i, j)
_, err := temp.WriteString(message + "\n")
require.NoError(t, err)
expectedTokens = append(expectedTokens, []byte(message))
}
}

// Poll and wait for all lines
operator.poll(context.Background())
actualTokens := make([][]byte, 0, files*linesPerFile)
actualTokens = append(actualTokens, waitForNTokens(t, emitCalls, len(expectedTokens))...)
require.ElementsMatch(t, expectedTokens, actualTokens)
actualTokens := make([][]byte, 0, numExpectedTokens)
actualTokens = append(actualTokens, waitForNTokens(t, emitCalls, numExpectedTokens)...)
require.Len(t, actualTokens, numExpectedTokens)

// During the first poll, we expect one log per batch and one log per file
require.Equal(t, pollFileLimit+expectedBatches, observedLogs.Len())
require.Equal(t, expectedMaxFilesPerPoll+expectedBatches, observedLogs.Len())
logNum := 0
for b := 0; b < expectedBatches; b++ {
log := observedLogs.All()[logNum]
@@ -1300,24 +1299,22 @@ func TestPollCycleLimiting(t *testing.T) {
}

// Write more logs to each file so we can validate that all files are still known
expectedTokens = make([][]byte, 0, files*linesPerFile)
for i, temp := range temps {
for j := 0; j < linesPerFile; j++ {
message := fmt.Sprintf("%s %d %d", tokenWithLength(20), i, j)
_, err := temp.WriteString(message + "\n")
require.NoError(t, err)
expectedTokens = append(expectedTokens, []byte(message))
}
}

// Poll again and wait for all new lines
operator.poll(context.Background())
actualTokens = make([][]byte, 0, files*linesPerFile)
actualTokens = append(actualTokens, waitForNTokens(t, emitCalls, len(expectedTokens))...)
require.ElementsMatch(t, expectedTokens, actualTokens)
actualTokens = make([][]byte, 0, numExpectedTokens)
actualTokens = append(actualTokens, waitForNTokens(t, emitCalls, numExpectedTokens)...)
require.Len(t, actualTokens, numExpectedTokens)

// During the second poll, we only expect one log per batch
require.Equal(t, pollFileLimit+expectedBatches*2, observedLogs.Len())
require.Equal(t, expectedMaxFilesPerPoll+expectedBatches*2, observedLogs.Len())
for b := logNum; b < observedLogs.Len(); b++ {
log := observedLogs.All()[logNum]
require.Equal(t, "Consuming files", log.Message)
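For clarity, the numbers in `TestMaxBatching` work out as follows: with `maxConcurrentFiles = 20`, each batch holds `maxBatchFiles = 20 / 2 = 10` files; with `maxBatches = 2`, a single poll therefore consumes at most `expectedMaxFilesPerPoll = 2 × 10 = 20` of the 50 files, and since each file contains 10 lines the test waits for `numExpectedTokens = 20 × 10 = 200` tokens per poll, so the remaining 30 files are not consumed in that poll.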
4 changes: 2 additions & 2 deletions pkg/stanza/fileconsumer/testdata/config.yaml
@@ -154,6 +154,6 @@ poll_interval_no_units:
start_at_string:
type: mock
start_at: "beginning"
poll_file_limit_1:
max_batches_1:
type: mock
poll_file_limit: 1
max_batches: 1
4 changes: 2 additions & 2 deletions receiver/filelogreceiver/README.md
@@ -5,7 +5,7 @@
| Stability | [beta] |
| Supported pipeline types | logs |
| Distributions | [contrib] |

Tails and parses logs from files.

## Configuration
@@ -23,10 +23,10 @@ Tails and parses logs from files.
| `include_file_name_resolved` | `false` | Whether to add the file name after symlinks resolution as the attribute `log.file.name_resolved`. |
| `include_file_path_resolved` | `false` | Whether to add the file path after symlinks resolution as the attribute `log.file.path_resolved`. |
| `poll_interval` | 200ms | The duration between filesystem polls |
| `poll_file_limit` | 0 | A limit on the number of files that can be consumed for a single polling cycle. A value of 0 implies an unlimited number of files. The value must be greater than or equal to `max_concurrent_files`. Ideally the value should be a multiple of `max_concurrent_file` or else the limit may be exceeded before it is enforced. |
| `fingerprint_size` | `1kb` | The number of bytes with which to identify a file. The first bytes in the file are used as the fingerprint. Decreasing this value at any point will cause existing fingerprints to be forgotten, meaning that all files will be read from the beginning (one time) |
| `max_log_size` | `1MiB` | The maximum size of a log entry to read. A log entry will be truncated if it is larger than `max_log_size`. Protects against reading large amounts of data into memory |
| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently. If the number of files matched in the `include` pattern exceeds this number, then files will be processed in batches. |
| `max_batches` | 0 | Only applicable when files must be batched in order to respect max_concurrent_files. This value limits the number of batches that will be processed during a single poll interval. A value of 0 indicates no limit. |
| `delete_after_read` | `false` | If `true`, each log file will be read and then immediately deleted. Requires that the `filelog.allowFileDeletion` feature gate is enabled. |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
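As a usage sketch, a hypothetical filelog receiver configuration that caps each poll at two batches might look like the following; the include path and the specific values are illustrative, not part of this change.

```yaml
receivers:
  filelog:
    include:
      - /var/log/app/*.log    # hypothetical path
    start_at: beginning
    max_concurrent_files: 20  # batches of 10 files each
    max_batches: 2            # at most 2 batches (20 files) per poll; 0 = no limit
```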
