-
Notifications
You must be signed in to change notification settings - Fork 2.2k
/
roller_other.go
79 lines (67 loc) · 2.35 KB
/
roller_other.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
//go:build !windows
// +build !windows
package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"
import (
"context"
"sync"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
)
// detectLostFiles is the roller implementation compiled on non-Windows
// platforms. It remembers the readers handed to the most recent roll call so
// that the next readLostFiles call can compare them against the new set.
type detectLostFiles struct {
	// oldReaders holds the readers stored by the last call to roll.
	oldReaders []*reader

	// fingerprintSize is passed to fingerprint.New when a fingerprint is
	// re-read from an old file handle to check for truncation.
	fingerprintSize int
}
// newRoller returns a detectLostFiles roller that starts with an empty
// (non-nil) set of old readers and uses fingerprintSize when re-reading
// fingerprints.
func newRoller(fingerprintSize int) roller {
	lost := new(detectLostFiles)
	lost.oldReaders = make([]*reader, 0)
	lost.fingerprintSize = fingerprintSize
	return lost
}
// readLostFiles finds the old readers that are not represented among
// newReaders and reads each of them to the end.
//
// An old reader is skipped (not treated as lost) when either:
//   - some new reader's fingerprint StartsWith the old reader's fingerprint, or
//   - a new reader has the same file name and a fingerprint re-read from the
//     old handle either cannot be produced or no longer StartsWith the old
//     fingerprint.
//
// Every remaining reader is drained in its own goroutine via ReadToEnd(ctx);
// the call blocks until all of those goroutines have finished.
func (r *detectLostFiles) readLostFiles(ctx context.Context, newReaders []*reader) {
	// Detect files that have been rotated out of matching pattern
	lostReaders := make([]*reader, 0, len(r.oldReaders))
OUTER:
	for _, oldReader := range r.oldReaders {
		for _, newReader := range newReaders {
			if newReader.Fingerprint.StartsWith(oldReader.Fingerprint) {
				continue OUTER
			}

			if oldReader.fileName != newReader.fileName {
				continue
			}

			// At this point, we know that the file has been rotated. However, we do not know
			// if it was moved or truncated. If truncated, then both handles point to the same
			// file, in which case we should only read from it using the new reader.
			// We can detect truncation by recreating a fingerprint from the old handle.
			// If it matches the old fingerprint, then we know that the file was moved,
			// so we can consider the file lost and continue reading from the old handle.
			// If there's an error reading a new fingerprint from the old handle, we assume
			// the rest of it cannot be read either, so the reader is not treated as lost.
			refreshedFingerprint, err := fingerprint.New(oldReader.file, r.fingerprintSize)
			if err != nil || !refreshedFingerprint.StartsWith(oldReader.Fingerprint) {
				continue OUTER
			}
		}
		lostReaders = append(lostReaders, oldReader)
	}

	// Drain all lost readers concurrently and wait for every one to finish.
	var lostWG sync.WaitGroup
	for _, lostReader := range lostReaders {
		lostWG.Add(1)
		// The parameter is named lost (not r) so it does not shadow the receiver.
		go func(lost *reader) {
			defer lostWG.Done()
			lost.ReadToEnd(ctx)
		}(lostReader)
	}
	lostWG.Wait()
}
// roll closes every reader currently stored as an old reader and then stores
// newReaders as the old readers for the next cycle. The context argument is
// unused.
func (r *detectLostFiles) roll(_ context.Context, newReaders []*reader) {
	for i := range r.oldReaders {
		r.oldReaders[i].Close()
	}
	r.oldReaders = newReaders
}
// cleanup closes every reader currently stored as an old reader. The stored
// slice itself is left in place.
func (r *detectLostFiles) cleanup() {
	for i := range r.oldReaders {
		r.oldReaders[i].Close()
	}
}