Skip to content

Commit

Permalink
[fileconsumer] Move fingerprint into internal package (open-telemetry…
Browse files Browse the repository at this point in the history
…#23998)

Incremental refactoring of the fileconsumer package creates an
`internal/fingerprint` package.
  • Loading branch information
djaglowski committed Jul 6, 2023
1 parent 806a59b commit 2b58e23
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 123 deletions.
20 changes: 20 additions & 0 deletions .chloggen/fileconsumer-internal.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Make fileconsumer.Fingerprint internal

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23998]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
5 changes: 3 additions & 2 deletions pkg/stanza/fileconsumer/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

Expand Down Expand Up @@ -108,7 +109,7 @@ func BenchmarkFileInput(b *testing.B) {
cfg.Include = []string{
"file*.log",
}
cfg.FingerprintSize = 10 * DefaultFingerprintSize
cfg.FingerprintSize = 10 * fingerprint.DefaultSize
return cfg
},
},
Expand All @@ -122,7 +123,7 @@ func BenchmarkFileInput(b *testing.B) {
cfg.Include = []string{
"file*.log",
}
cfg.FingerprintSize = DefaultFingerprintSize / 10
cfg.FingerprintSize = fingerprint.DefaultSize / 10
return cfg
},
},
Expand Down
7 changes: 4 additions & 3 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/featuregate"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

Expand Down Expand Up @@ -45,7 +46,7 @@ func NewConfig() *Config {
PollInterval: 200 * time.Millisecond,
Splitter: helper.NewSplitterConfig(),
StartAt: "end",
FingerprintSize: DefaultFingerprintSize,
FingerprintSize: fingerprint.DefaultSize,
MaxLogSize: defaultMaxLogSize,
MaxConcurrentFiles: defaultMaxConcurrentFiles,
MaxBatches: 0,
Expand Down Expand Up @@ -205,8 +206,8 @@ func (c Config) validate() error {
return fmt.Errorf("`max_concurrent_files` must be greater than 1")
}

if c.FingerprintSize < MinFingerprintSize {
return fmt.Errorf("`fingerprint_size` must be at least %d bytes", MinFingerprintSize)
if c.FingerprintSize < fingerprint.MinSize {
return fmt.Errorf("`fingerprint_size` must be at least %d bytes", fingerprint.MinSize)
}

if c.DeleteAfterRead && c.StartAt == "end" {
Expand Down
13 changes: 7 additions & 6 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

Expand All @@ -37,7 +38,7 @@ type Manager struct {
knownFiles []*Reader
seenPaths map[string]struct{}

currentFps []*Fingerprint
currentFps []*fingerprint.Fingerprint
}

func (m *Manager) Start(persister operator.Persister) error {
Expand Down Expand Up @@ -188,7 +189,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
m.clearCurrentFingerprints()
}

func (m *Manager) makeFingerprint(path string) (*Fingerprint, *os.File) {
func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) {
if _, ok := m.seenPaths[path]; !ok {
if m.readerFactory.fromBeginning {
m.Infow("Started watching file", "path", path)
Expand Down Expand Up @@ -221,7 +222,7 @@ func (m *Manager) makeFingerprint(path string) (*Fingerprint, *os.File) {
return fp, file
}

func (m *Manager) checkDuplicates(fp *Fingerprint) bool {
func (m *Manager) checkDuplicates(fp *fingerprint.Fingerprint) bool {
for i := 0; i < len(m.currentFps); i++ {
fp2 := m.currentFps[i]
if fp.StartsWith(fp2) || fp2.StartsWith(fp) {
Expand Down Expand Up @@ -260,7 +261,7 @@ func (m *Manager) makeReader(path string) *Reader {
}

func (m *Manager) clearCurrentFingerprints() {
m.currentFps = make([]*Fingerprint, 0)
m.currentFps = make([]*fingerprint.Fingerprint, 0)
}

// saveCurrent adds the readers from this polling interval to this list of
Expand All @@ -282,7 +283,7 @@ func (m *Manager) saveCurrent(readers []*Reader) {
}
}

func (m *Manager) newReader(file *os.File, fp *Fingerprint) (*Reader, error) {
func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader, error) {
// Check if the new path has the same fingerprint as an old path
if oldReader, ok := m.findFingerprintMatch(fp); ok {
return m.readerFactory.copy(oldReader, file)
Expand All @@ -292,7 +293,7 @@ func (m *Manager) newReader(file *os.File, fp *Fingerprint) (*Reader, error) {
return m.readerFactory.newReader(file, fp)
}

func (m *Manager) findFingerprintMatch(fp *Fingerprint) (*Reader, bool) {
func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*Reader, bool) {
// Iterate backwards to match newest first
for i := len(m.knownFiles) - 1; i >= 0; i-- {
oldReader := m.knownFiles[i]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"
package fingerprint // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"

import (
"bytes"
Expand All @@ -11,24 +11,18 @@ import (
"os"
)

// Deprecated: [v0.80.0] This will be made internal in a future release, tentatively v0.82.0.
const DefaultFingerprintSize = 1000 // bytes
const DefaultSize = 1000 // bytes

// Deprecated: [v0.80.0] This will be made internal in a future release, tentatively v0.82.0.
const MinFingerprintSize = 16 // bytes
const MinSize = 16 // bytes

// Fingerprint is used to identify a file
// A file's fingerprint is the first N bytes of the file
//
// Deprecated: [v0.80.0] This will be made internal in a future release, tentatively v0.82.0.
type Fingerprint struct {
FirstBytes []byte
}

// NewFingerprint creates a new fingerprint from an open file
//
// Deprecated: [v0.80.0] This will be made internal in a future release, tentatively v0.82.0.
func NewFingerprint(file *os.File, size int) (*Fingerprint, error) {
// New creates a new fingerprint from an open file
func New(file *os.File, size int) (*Fingerprint, error) {
buf := make([]byte, size)

n, err := file.ReadAt(buf, 0)
Expand Down
Loading

0 comments on commit 2b58e23

Please sign in to comment.