[pkg/stanza] Unexport and organize fields on manager struct (open-telemetry#12793)

* [pkg/stanza] Unexport and organize fields on manager struct

These fields are currently meant only for internal use.

* Add changelog entry
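
The change leans on Go's visibility rule: an identifier that starts with an uppercase letter is exported and reachable from other packages, while a lowercase initial keeps it private to the declaring package. Below is a minimal sketch of that distinction using a hypothetical struct, not the actual fileconsumer types:

```go
package example

import "time"

// manager is a hypothetical struct used only to illustrate the visibility rule
// behind this commit; it is not the real fileconsumer manager.
type manager struct {
	// PollInterval is exported: code in other packages can read and write it.
	PollInterval time.Duration

	// pollInterval is unexported: only code inside this package can touch it,
	// which is the guarantee wanted for internal-only state.
	pollInterval time.Duration
}
```

Renaming a field such as PollInterval to pollInterval therefore removes it from the package's public API, which is why the change is marked as breaking in the changelog entry below.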
djaglowski committed Aug 1, 2022
1 parent 4467071 commit 7eecb92
Showing 5 changed files with 50 additions and 30 deletions.
20 changes: 10 additions & 10 deletions pkg/stanza/fileconsumer/config.go
@@ -117,16 +117,8 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Input, error)
}

return &Input{
SugaredLogger: logger.With("component", "fileconsumer"),
finder: c.Finder,
PollInterval: c.PollInterval.Raw(),
queuedMatches: make([]string, 0),
firstCheck: true,
cancel: func() {},
knownFiles: make([]*Reader, 0, 10),
roller: newRoller(),
MaxConcurrentFiles: c.MaxConcurrentFiles,
SeenPaths: make(map[string]struct{}, 100),
SugaredLogger: logger.With("component", "fileconsumer"),
cancel: func() {},
readerFactory: readerFactory{
SugaredLogger: logger.With("component", "fileconsumer"),
readerConfig: &readerConfig{
@@ -137,5 +129,13 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Input, error)
fromBeginning: startAtBeginning,
splitterConfig: c.Splitter,
},
finder: c.Finder,
roller: newRoller(),
pollInterval: c.PollInterval.Raw(),
maxConcurrentFiles: c.MaxConcurrentFiles,
knownFiles: make([]*Reader, 0, 10),
seenPaths: make(map[string]struct{}, 100),
firstCheck: true,
queuedMatches: make([]string, 0),
}, nil
}
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/config_test.go
@@ -415,7 +415,7 @@ func TestBuild(t *testing.T) {
require.NoError,
func(t *testing.T, f *Input) {
require.Equal(t, f.finder.Include, []string{"/var/log/testpath.*"})
require.Equal(t, f.PollInterval, 10*time.Millisecond)
require.Equal(t, f.pollInterval, 10*time.Millisecond)
},
},
{
32 changes: 15 additions & 17 deletions pkg/stanza/fileconsumer/file.go
@@ -33,24 +33,22 @@ type EmitFunc func(ctx context.Context, attrs *FileAttributes, token []byte)
// TODO rename this struct
type Input struct {
*zap.SugaredLogger
finder Finder
PollInterval time.Duration
wg sync.WaitGroup
cancel context.CancelFunc

MaxConcurrentFiles int
SeenPaths map[string]struct{}
readerFactory readerFactory
finder Finder
roller roller
persister operator.Persister

persister operator.Persister
pollInterval time.Duration
maxConcurrentFiles int
maxBatchFiles int

knownFiles []*Reader
seenPaths map[string]struct{}
firstCheck bool
queuedMatches []string
maxBatchFiles int
roller roller

firstCheck bool
wg sync.WaitGroup
cancel context.CancelFunc

readerFactory readerFactory
}

func (f *Input) Start(persister operator.Persister) error {
@@ -90,7 +88,7 @@ func (f *Input) startPoller(ctx context.Context) {
f.wg.Add(1)
go func() {
defer f.wg.Done()
globTicker := time.NewTicker(f.PollInterval)
globTicker := time.NewTicker(f.pollInterval)
defer globTicker.Stop()

for {
@@ -107,7 +105,7 @@ func (f *Input) startPoller(ctx context.Context) {

// poll checks all the watched paths for new entries
func (f *Input) poll(ctx context.Context) {
f.maxBatchFiles = f.MaxConcurrentFiles / 2
f.maxBatchFiles = f.maxConcurrentFiles / 2
var matches []string
if len(f.queuedMatches) > f.maxBatchFiles {
matches, f.queuedMatches = f.queuedMatches[:f.maxBatchFiles], f.queuedMatches[f.maxBatchFiles:]
@@ -166,13 +164,13 @@ func (f *Input) makeReaders(filesPaths []string) []*Reader {
// Open the files first to minimize the time between listing and opening
files := make([]*os.File, 0, len(filesPaths))
for _, path := range filesPaths {
if _, ok := f.SeenPaths[path]; !ok {
if _, ok := f.seenPaths[path]; !ok {
if f.readerFactory.fromBeginning {
f.Infow("Started watching file", "path", path)
} else {
f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path)
}
f.SeenPaths[path] = struct{}{}
f.seenPaths[path] = struct{}{}
}
file, err := os.Open(path) // #nosec - operator must read in files defined by user
if err != nil {
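
The flattened diff view above interleaves the removed and added struct fields, which makes the new layout hard to see. Reconstructed from the added lines, the reorganized Input struct likely reads roughly as follows; the exact grouping and blank-line placement are assumptions rather than a verbatim copy of file.go, and the group comments are added here for readability only:

```go
// TODO rename this struct
type Input struct {
	*zap.SugaredLogger

	// collaborators and helpers
	readerFactory readerFactory
	finder        Finder
	roller        roller
	persister     operator.Persister

	// configuration
	pollInterval       time.Duration
	maxConcurrentFiles int
	maxBatchFiles      int

	// state tracked across polls
	knownFiles    []*Reader
	seenPaths     map[string]struct{}
	firstCheck    bool
	queuedMatches []string

	// lifecycle
	wg     sync.WaitGroup
	cancel context.CancelFunc
}
```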
10 changes: 8 additions & 2 deletions pkg/stanza/operator/input/file/file_test.go
@@ -407,8 +407,11 @@ func TestReadExistingAndNewLogs(t *testing.T) {
// we don't read any entries that were in the file before startup
func TestStartAtEnd(t *testing.T) {
t.Parallel()

var pollInterval time.Duration
operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *Config) {
cfg.StartAt = "end"
pollInterval = cfg.PollInterval.Raw()
}, nil)

temp := openTemp(t, tempDir)
@@ -419,7 +422,7 @@ func TestStartAtEnd(t *testing.T) {
require.NoError(t, operator.Stop())
}()

time.Sleep(2 * operator.fileConsumer.PollInterval)
time.Sleep(2 * pollInterval)

expectNoMessages(t, logReceived)

@@ -433,16 +436,19 @@ func TestStartAtEnd(t *testing.T) {
// beginning
func TestStartAtEndNewFile(t *testing.T) {
t.Parallel()

var pollInterval time.Duration
operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *Config) {
cfg.StartAt = "end"
pollInterval = cfg.PollInterval.Raw()
}, nil)

require.NoError(t, operator.Start(testutil.NewMockPersister("test")))
defer func() {
require.NoError(t, operator.Stop())
}()

time.Sleep(2 * operator.fileConsumer.PollInterval)
time.Sleep(2 * pollInterval)

temp := openTemp(t, tempDir)
writeString(t, temp, "testlog1\ntestlog2\n")
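
With PollInterval unexported, the file input tests above can no longer read operator.fileConsumer.PollInterval, so they capture the interval inside the config-mutation callback before the consumer is built. The following is a self-contained sketch of that capture pattern, using hypothetical names that stand in for the newTestFileOperator helper and its Config:

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in for the file input Config; only the fields
// relevant to the pattern are included.
type config struct {
	StartAt      string
	PollInterval time.Duration
}

// buildWithMutation mimics the shape of the newTestFileOperator helper: it
// applies a caller-supplied mutation to a default config and then builds the
// component (elided here).
func buildWithMutation(mutate func(*config)) {
	cfg := &config{StartAt: "beginning", PollInterval: 200 * time.Millisecond}
	mutate(cfg)
	// ... build the operator from cfg ...
}

func main() {
	// Record the configured interval while the config is still in hand, since
	// the built consumer no longer exposes it as an exported field.
	var pollInterval time.Duration
	buildWithMutation(func(cfg *config) {
		cfg.StartAt = "end"
		pollInterval = cfg.PollInterval
	})
	fmt.Println("waiting for", 2*pollInterval) // e.g. two poll cycles, as in the tests
}
```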
16 changes: 16 additions & 0 deletions unreleased/pkg-stanza-fileconsumer-cleanup-names.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza/fileconsumer

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Unexport several fields that are meant for internal usage only

# One or more tracking issues related to the change
issues: [12793]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
