fix docker logs tailer with file rotation #4860

Merged: 7 commits merged on Feb 13, 2020. Changes shown are from 6 commits.
4 changes: 4 additions & 0 deletions pkg/logs/auditor/auditor.go
@@ -68,15 +68,18 @@ func New(runPath string, health *health.Handle) *Auditor {

// Start starts the Auditor
func (a *Auditor) Start() {
a.mu.Lock()
a.inputChan = make(chan *message.Message, config.ChanSize)
a.done = make(chan struct{})
a.mu.Unlock()
a.registry = a.recoverRegistry()
a.cleanupRegistry()
go a.run()
}

// Stop stops the Auditor
func (a *Auditor) Stop() {
a.mu.Lock()
Contributor:

https://github.com/DataDog/datadog-agent/pull/4860/files#diff-5142632c5dd9990aa6feb0918f9334f7L85 is in the critical path, but I think it should not be, because any method that acquires the mutex can be called concurrently with the run method.

Contributor:

You could probably create setup and reset methods that are protected by the mutex.

Member:

I guess there's technically a chance of a deadlock if one of these tickers ticks right after Stop has locked the mutex here, since cleanupRegistry and flushRegistry both need to hold the lock. The chances of running into it are likely extremely low, so I'm not sure it's worth addressing. To avoid this, we could have a separate mutex for the registry.

Contributor (author):

I was also thinking of moving it into the New() function.

Contributor (author):

Makes sense. I added the new mutex.

if a.inputChan != nil {
close(a.inputChan)
}
@@ -86,6 +89,7 @@ func (a *Auditor) Stop() {
a.done = nil
}
a.inputChan = nil
a.mu.Unlock()

a.cleanupRegistry()
err := a.flushRegistry()
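The review thread above weighs whether registry maintenance (cleanupRegistry, flushRegistry) should contend on the same mutex that guards the channel lifecycle. Below is a minimal, hypothetical sketch of the "separate mutex for the registry" option the reviewer mentions; the field types and method bodies are simplified assumptions, not the real Auditor.

```go
package auditor

import "sync"

// auditorSketch is a simplified stand-in for the real Auditor.
type auditorSketch struct {
	mu         sync.Mutex // guards the channel lifecycle (inputChan, done)
	registryMu sync.Mutex // guards the registry used by ticker-driven maintenance

	inputChan chan struct{}
	done      chan struct{}
	registry  map[string]string
}

// cleanupRegistry can be called from a ticker in run() without touching mu.
func (a *auditorSketch) cleanupRegistry() {
	a.registryMu.Lock()
	defer a.registryMu.Unlock()
	// prune expired entries here
}

// Stop closes the channel under mu, then does registry work under registryMu,
// so a ticker firing at the same moment cannot deadlock against mu.
func (a *auditorSketch) Stop() {
	a.mu.Lock()
	if a.inputChan != nil {
		close(a.inputChan)
		a.inputChan = nil
	}
	a.mu.Unlock()

	a.cleanupRegistry()
}
```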
22 changes: 17 additions & 5 deletions pkg/logs/input/docker/launcher.go
@@ -132,9 +132,15 @@ func (l *Launcher) Start() {
func (l *Launcher) Stop() {
l.stop <- struct{}{}
stopper := restart.NewParallelStopper()
l.lock.Lock()
var containerIDs []string
for _, tailer := range l.tailers {
stopper.Add(tailer)
l.removeTailer(tailer.ContainerID)
containerIDs = append(containerIDs, tailer.ContainerID)
}
l.lock.Unlock()
for _, containerID := range containerIDs {
l.removeTailer(containerID)
}
stopper.Stop()
}
@@ -236,7 +242,7 @@ func (l *Launcher) overrideSource(container *Container, source *config.LogSource
// startTailer starts a new tailer for the container matching with the source.
func (l *Launcher) startTailer(container *Container, source *config.LogSource) {
containerID := container.service.Identifier
if _, isTailed := l.tailers[containerID]; isTailed {
if _, isTailed := l.getTailer(containerID); isTailed {
log.Warnf("Can't tail twice the same container: %v", ShortContainerID(containerID))
return
}
@@ -270,7 +276,7 @@ func (l *Launcher) startTailer(container *Container, source *config.LogSource) {

// stopTailer stops the tailer matching the containerID.
func (l *Launcher) stopTailer(containerID string) {
if tailer, isTailed := l.tailers[containerID]; isTailed {
if tailer, isTailed := l.getTailer(containerID); isTailed {
// No-op if the tailer source came from AD
if l.collectAllSource != nil {
l.collectAllSource.RemoveInput(containerID)
@@ -285,8 +291,7 @@ func (l *Launcher) restartTailer(containerID string) {
cumulatedBackoff := 0 * time.Second
var source *config.LogSource

oldTailer, exists := l.tailers[containerID]
if exists {
if oldTailer, exists := l.getTailer(containerID); exists {
source = oldTailer.source
if l.collectAllSource != nil {
l.collectAllSource.RemoveInput(containerID)
@@ -343,3 +348,10 @@ func (l *Launcher) removeTailer(containerID string) {
delete(l.tailers, containerID)
l.lock.Unlock()
}

func (l *Launcher) getTailer(containerID string) (*Tailer, bool) {
l.lock.Lock()
defer l.lock.Unlock()
tailer, exist := l.tailers[containerID]
return tailer, exist
}
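Stop() now snapshots the container IDs while holding the lock and only afterwards calls removeTailer, which takes the lock itself; doing the removal inside the locked loop would self-deadlock because sync.Mutex is not reentrant. A standalone, hypothetical illustration of that copy-under-lock pattern (the map value is a placeholder, not the real Tailer):

```go
package main

import (
	"fmt"
	"sync"
)

type launcherSketch struct {
	lock    sync.Mutex
	tailers map[string]string // containerID -> placeholder tailer
}

func (l *launcherSketch) Stop() {
	// Snapshot the keys while holding the lock...
	l.lock.Lock()
	ids := make([]string, 0, len(l.tailers))
	for id := range l.tailers {
		ids = append(ids, id)
	}
	l.lock.Unlock()

	// ...then mutate the map through a method that takes the lock itself,
	// which would deadlock if we were still holding it here.
	for _, id := range ids {
		l.removeTailer(id)
	}
}

func (l *launcherSketch) removeTailer(id string) {
	l.lock.Lock()
	delete(l.tailers, id)
	l.lock.Unlock()
}

func main() {
	l := &launcherSketch{tailers: map[string]string{"abc123": "tailer"}}
	l.Stop()
	fmt.Println(len(l.tailers)) // 0
}
```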
34 changes: 33 additions & 1 deletion pkg/logs/input/docker/reader.go
@@ -10,16 +10,48 @@ package docker
import (
"errors"
"io"
"time"
)

var errReaderNotInitialized = errors.New("reader not initialized")

const defaultBackoffDuration = time.Second
const maxBackoffDuration = 30 * time.Second

type safeReader struct {
reader io.ReadCloser

err error

backoffRetry int
backoffWaitDuration time.Duration
backoffDefaultDuration time.Duration
}

func newSafeReader() *safeReader {
return &safeReader{}
return &safeReader{
backoffDefaultDuration: defaultBackoffDuration,
}
}

func (s *safeReader) Success() {
s.err = nil
s.backoffRetry = 0
s.backoffWaitDuration = 0
}

func (s *safeReader) getBackoffAndIncrement() time.Duration {
if s.backoffWaitDuration == maxBackoffDuration {
return s.backoffWaitDuration
}
duration := s.backoffWaitDuration
s.backoffRetry++
s.backoffWaitDuration += time.Duration(s.backoffRetry) * s.backoffDefaultDuration
if s.backoffWaitDuration > maxBackoffDuration {
s.backoffWaitDuration = maxBackoffDuration
}

return duration
}

func (s *safeReader) setUnsafeReader(reader io.ReadCloser) {
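The increment above grows by the retry count on every call (a triangular progression rather than exponential doubling) and is capped at maxBackoffDuration. A small, self-contained sketch of the resulting schedule, assuming the 1-second default and the 30-second cap; it re-implements the arithmetic for illustration instead of calling the real safeReader:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		defaultBackoff = time.Second
		maxBackoff     = 30 * time.Second
	)
	var (
		retry int
		wait  time.Duration
	)
	for i := 0; i < 10; i++ {
		returned := wait // what getBackoffAndIncrement would hand back
		if wait != maxBackoff {
			retry++
			wait += time.Duration(retry) * defaultBackoff
			if wait > maxBackoff {
				wait = maxBackoff
			}
		}
		fmt.Printf("attempt %d: sleep %v (next wait %v)\n", i+1, returned, wait)
	}
	// Prints the triangular progression 0s, 1s, 3s, 6s, 10s, 15s, 21s, 28s,
	// then 30s once the cap is reached.
}
```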
68 changes: 68 additions & 0 deletions pkg/logs/input/docker/reader_test.go
@@ -12,6 +12,7 @@ import (
"errors"
"io"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
@@ -97,3 +98,70 @@ func TestSafeReaderClose(t *testing.T) {
err = reader.Close()
assert.Equal(t, errReaderNotInitialized, err)
}

func Test_safeReader_getBackoffAndIncrement(t *testing.T) {
type fields struct {
backoffRetry int
backoffWaitDuration time.Duration
backoffDefaultDuration time.Duration
}
tests := []struct {
name string
fields fields
want time.Duration
wantRetry int
wantWaitDuration time.Duration
}{
{
name: "init backoff, should return 0",
fields: fields{
backoffRetry: 0,
backoffWaitDuration: 0,
backoffDefaultDuration: time.Second,
},
want: 0,
wantRetry: 1,
wantWaitDuration: time.Second,
},
{
name: "second backoff, should return 1",
fields: fields{
backoffRetry: 1,
backoffWaitDuration: time.Second,
backoffDefaultDuration: time.Second,
},
want: time.Second,
wantRetry: 2,
wantWaitDuration: 3 * time.Second,
},
{
name: "third backoff, should return 3",
fields: fields{
backoffRetry: 2,
backoffWaitDuration: 3 * time.Second,
backoffDefaultDuration: time.Second,
},
want: 3 * time.Second,
wantRetry: 3,
wantWaitDuration: 6 * time.Second,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &safeReader{
backoffRetry: tt.fields.backoffRetry,
backoffWaitDuration: tt.fields.backoffWaitDuration,
backoffDefaultDuration: tt.fields.backoffDefaultDuration,
}
if got := s.getBackoffAndIncrement(); got != tt.want {
t.Errorf("safeReader.getBackoffAndIncrement() = %v, want %v", got, tt.want)
}
if s.backoffRetry != tt.wantRetry {
t.Errorf("safeReader.backoffRetry = %v, want %v", s.backoffRetry, tt.wantRetry)
}
if s.backoffWaitDuration != tt.wantWaitDuration {
t.Errorf("safeReader.backoffWaitDuration = %v, want %v", s.backoffWaitDuration, tt.wantWaitDuration)
}
})
}
}
9 changes: 9 additions & 0 deletions pkg/logs/input/docker/since.go
@@ -21,6 +21,8 @@ func Since(registry auditor.Registry, identifier string, creationTime service.Cr
var err error
offset := registry.GetOffset(identifier)
switch {
case isEOFCorruptedOffset(offset):
since = time.Time{}
case offset != "":
// an offset was registered, tail from the offset
since, err = time.Parse(config.DateFormat, offset)
@@ -36,3 +38,10 @@ func Since(registry auditor.Registry, identifier string, creationTime service.Cr
}
return since, err
}

// isEOFCorruptedOffset return true if the offset doesn't contain a
// valid timestamp value due to a file rotation.
func isEOFCorruptedOffset(offset string) bool {
// check if the offset value is equal to EOF char
return len(offset) > 0 && offset[0] == 0x03
}
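A quick, hypothetical illustration of how this check feeds into Since: an offset whose first byte is the 0x03 control character is discarded and the tailer falls back to the zero time, which makes Docker return logs from the container's start. The empty-offset and creation-time branches of the real function are simplified away here, and the parse layout is an assumption standing in for config.DateFormat.

```go
package main

import (
	"fmt"
	"time"
)

// isEOFCorruptedOffset mirrors the helper added above.
func isEOFCorruptedOffset(offset string) bool {
	return len(offset) > 0 && offset[0] == 0x03
}

// sinceFromOffset is a simplified stand-in for Since: it only shows the
// corrupted-offset fallback, not the service-creation-time branches.
func sinceFromOffset(offset string) time.Time {
	if offset == "" || isEOFCorruptedOffset(offset) {
		return time.Time{} // zero time: tail from the beginning
	}
	since, err := time.Parse(time.RFC3339Nano, offset) // assumed layout
	if err != nil {
		return time.Time{}
	}
	return since
}

func main() {
	fmt.Println(sinceFromOffset("2020-02-13T10:00:00.000000000Z")) // parsed timestamp
	fmt.Println(sinceFromOffset("\x03garbage"))                    // zero time: corrupted offset
}
```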
44 changes: 35 additions & 9 deletions pkg/logs/input/docker/tailer.go
@@ -28,7 +28,11 @@ import (
)

const defaultSleepDuration = 1 * time.Second
const readTimeout = 30 * time.Second
const defaultReadTimeout = 30 * time.Second

type dockerContainerLogInterface interface {
ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error)
}

// Tailer tails logs coming from stdout and stderr of a docker container
// Logs from stdout and stderr are multiplexed into a single channel and needs to be demultiplexed later one.
@@ -38,10 +42,11 @@ type Tailer struct {
outputChan chan *message.Message
decoder *decoder.Decoder
reader *safeReader
cli *client.Client
cli dockerContainerLogInterface
source *config.LogSource
tagProvider tag.Provider

readTimeout time.Duration
sleepDuration time.Duration
shouldStop bool
stop chan struct{}
@@ -61,6 +66,7 @@ func NewTailer(cli *client.Client, containerID string, source *config.LogSource,
source: source,
tagProvider: tag.NewProvider(dockerutil.ContainerIDToTaggerEntityName(containerID)),
cli: cli,
readTimeout: defaultReadTimeout,
sleepDuration: defaultSleepDuration,
stop: make(chan struct{}, 1),
done: make(chan struct{}, 1),
@@ -132,9 +138,16 @@ func (t *Tailer) setupReader() error {
reader, err := t.cli.ContainerLogs(ctx, t.ContainerID, options)
t.reader.setUnsafeReader(reader)
t.cancelFunc = cancelFunc

return err
}

func (t *Tailer) restartReader() error {
backoffDuration := t.reader.getBackoffAndIncrement()
time.Sleep(backoffDuration)
return t.setupReader()
}

// tail sets up and starts the tailer
func (t *Tailer) tail(since string) error {
t.setLastSince(since)
@@ -159,13 +172,20 @@ func (t *Tailer) readForever() {
func (t *Tailer) readForever() {
defer t.decoder.Stop()
for {
if t.reader.err != nil {
err := t.restartReader()
if err != nil {
log.Debugf("unable to restart the Reader for container %v, ", ShortContainerID(t.ContainerID))
continue
}
}
select {
case <-t.stop:
// stop reading new logs from container
return
default:
inBuf := make([]byte, 4096)
n, err := t.read(inBuf, readTimeout)
n, err := t.read(inBuf, t.readTimeout)
if err != nil { // an error occurred, stop from reading new logs
switch {
case isReaderClosed(err):
Expand All @@ -185,19 +205,25 @@ func (t *Tailer) readForever() {
// This error is raised when the agent is stopping
return
case err == io.EOF:
Contributor:

Here I would have reused the same logic as in case isContextCanceled(err): and reused the wait() method to avoid hammering the Docker socket and driving CPU usage up.

Contributor (author), @clamoriniere, Feb 13, 2020:

Should we try only once to set up the reader, then ask the launcher to restart the tailer?

Member:

In general, does it make sense to have different retry strategies depending on the error (isContextCanceled, io.EOF, and default)? If so, why? (This is an honest question; I don't know these error types well enough to answer.)

Otherwise, I feel all errors that require a retry could use the same overall logic: retry setting up the reader, and if that fails, ask the launcher to restart the tailer.

Contributor (author):

I have the same feeling. PR updated in this direction.

Contributor:

Agreed.

// This error is raised when the container is stopping
// or when the container has not started to output logs yet.
// Retry to read to make sure all logs are collected
// or stop reading on the next iteration
// if the tailer has been stopped.
log.Debugf("No new logs are available for container %v", ShortContainerID(t.ContainerID))
// This error is raised when:
// * the container is stopping.
// * when the container has not started to output logs yet.
// * during a file rotation.
// restart the reader (by providing the error the t.reader)
// the reader will be restarted with a backoff policy.
t.source.Status.Error(fmt.Errorf("log decoder returns an EOF error that will trigger a Reader restart"))
log.Debugf("log decoder returns an EOF error that will trigger a Reader restart for container %v", ShortContainerID(t.ContainerID))
t.reader.err = err
continue
default:
t.source.Status.Error(err)
log.Errorf("Could not tail logs for container %v: %v", ShortContainerID(t.ContainerID), err)
t.erroredContainerID <- t.ContainerID
return
}
}
t.reader.Success()
t.source.Status.Success()
if n == 0 {
// wait for new data to come
t.wait()
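Tying the pieces together, readForever now treats io.EOF as recoverable: the error is stored on the safeReader and the top of the loop restarts the reader after the backoff sleep, while unrecoverable errors still hand the container back to the launcher. A hypothetical, stripped-down version of that control flow, where the reader, read call, and decoder are stand-ins for the real types:

```go
package main

import (
	"errors"
	"io"
	"log"
	"time"
)

// fakeReader stands in for safeReader: it records the last error and how long
// to back off before the next restart.
type fakeReader struct {
	err  error
	wait time.Duration
}

func (r *fakeReader) restart() error {
	time.Sleep(r.wait) // the real code sleeps for getBackoffAndIncrement()
	r.err = nil
	return nil
}

// readOnce stands in for t.read; here it always reports EOF.
func readOnce() (int, error) { return 0, io.EOF }

func readLoop(stop <-chan struct{}) {
	r := &fakeReader{wait: time.Millisecond}
	for {
		// If the previous iteration flagged an error, restart the reader first.
		if r.err != nil {
			if err := r.restart(); err != nil {
				log.Print("unable to restart the reader")
				continue
			}
		}
		select {
		case <-stop:
			return
		default:
			if _, err := readOnce(); errors.Is(err, io.EOF) {
				// EOF may mean "container stopping", "no logs yet", or a
				// file rotation: flag it so the next iteration restarts.
				r.err = err
				continue
			} else if err != nil {
				return // unrecoverable: the real code asks the launcher to restart the tailer
			}
			// success path: reset the backoff and feed the decoder in the real code
		}
	}
}

func main() {
	stop := make(chan struct{})
	go func() { time.Sleep(20 * time.Millisecond); close(stop) }()
	readLoop(stop)
}
```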