fix docker logs tailer with file rotation #4860
Changes from 6 commits: 536675e, 9d05a7d, e5440e0, 9d68f6c, de622b8, fb97bc1, ea5d600
@@ -28,7 +28,11 @@ import (
)

const defaultSleepDuration = 1 * time.Second
const readTimeout = 30 * time.Second
const defaultReadTimeout = 30 * time.Second

type dockerContainerLogInterface interface {
	ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error)
}

// Tailer tails logs coming from stdout and stderr of a docker container
// Logs from stdout and stderr are multiplexed into a single channel and need to be demultiplexed later on.
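The new `dockerContainerLogInterface` decouples the tailer from the concrete `*client.Client`, which is what makes the rotation behaviour testable. As a rough illustration (not code from this PR; the type, fields, and package placement below are assumptions), a test fake could satisfy the interface and replay canned log payloads, hitting EOF at the end of each one much like a rotated log stream does:

```go
package docker // assumed: a test double like this would live alongside the tailer

import (
	"bytes"
	"context"
	"io"
	"io/ioutil"

	"github.com/docker/docker/api/types"
)

// fakeLogClient is a hypothetical test double satisfying dockerContainerLogInterface.
// Each ContainerLogs call serves the next canned payload; the returned reader reaches
// EOF at the end of that payload, which is the condition the tailer now recovers from.
type fakeLogClient struct {
	payloads [][]byte
	calls    int
}

func (f *fakeLogClient) ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
	if len(f.payloads) == 0 {
		return ioutil.NopCloser(bytes.NewReader(nil)), nil
	}
	payload := f.payloads[f.calls%len(f.payloads)]
	f.calls++
	return ioutil.NopCloser(bytes.NewReader(payload)), nil
}
```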
@@ -38,10 +42,11 @@ type Tailer struct {
	outputChan chan *message.Message
	decoder *decoder.Decoder
	reader *safeReader
	cli *client.Client
	cli dockerContainerLogInterface
	source *config.LogSource
	tagProvider tag.Provider

	readTimeout time.Duration
	sleepDuration time.Duration
	shouldStop bool
	stop chan struct{}
@@ -61,6 +66,7 @@ func NewTailer(cli *client.Client, containerID string, source *config.LogSource,
		source: source,
		tagProvider: tag.NewProvider(dockerutil.ContainerIDToTaggerEntityName(containerID)),
		cli: cli,
		readTimeout: defaultReadTimeout,
		sleepDuration: defaultSleepDuration,
		stop: make(chan struct{}, 1),
		done: make(chan struct{}, 1),
@@ -132,9 +138,16 @@ func (t *Tailer) setupReader() error {
	reader, err := t.cli.ContainerLogs(ctx, t.ContainerID, options)
	t.reader.setUnsafeReader(reader)
	t.cancelFunc = cancelFunc

	return err
}

func (t *Tailer) restartReader() error {
	backoffDuration := t.reader.getBackoffAndIncrement()
	time.Sleep(backoffDuration)
	return t.setupReader()
}

// tail sets up and starts the tailer
func (t *Tailer) tail(since string) error {
	t.setLastSince(since)
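`restartReader` leans on `safeReader.getBackoffAndIncrement`, which is not part of this hunk. Purely as a sketch of one plausible shape for it (the struct fields, constants, and reset-on-success behaviour below are assumptions, not the PR's actual implementation), a capped exponential backoff could look like this:

```go
package sketch // standalone illustration; the real safeReader lives elsewhere in this PR

import (
	"sync"
	"time"
)

// Hypothetical stand-in for the PR's safeReader, reduced to the backoff bookkeeping.
type safeReader struct {
	mu      sync.Mutex
	backoff time.Duration
}

const (
	initialBackoff = 500 * time.Millisecond // assumed starting delay
	maxBackoff     = 30 * time.Second       // assumed upper bound
)

// getBackoffAndIncrement returns the delay to sleep before the next restart attempt
// and doubles it for the following attempt, up to maxBackoff.
func (s *safeReader) getBackoffAndIncrement() time.Duration {
	s.mu.Lock()
	defer s.mu.Unlock()
	d := s.backoff
	if d == 0 {
		d = initialBackoff
	}
	next := 2 * d
	if next > maxBackoff {
		next = maxBackoff
	}
	s.backoff = next
	return d
}

// Success resets the backoff once a read completes without error,
// mirroring the t.reader.Success() call in readForever.
func (s *safeReader) Success() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.backoff = 0
}
```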
@@ -159,13 +172,20 @@ func (t *Tailer) tail(since string) error {
func (t *Tailer) readForever() {
	defer t.decoder.Stop()
	for {
		if t.reader.err != nil {
			err := t.restartReader()
			if err != nil {
				log.Debugf("unable to restart the Reader for container %v, ", ShortContainerID(t.ContainerID))
				continue
			}
		}
		select {
		case <-t.stop:
			// stop reading new logs from container
			return
		default:
			inBuf := make([]byte, 4096)
			n, err := t.read(inBuf, readTimeout)
			n, err := t.read(inBuf, t.readTimeout)
			if err != nil { // an error occurred, stop from reading new logs
				switch {
				case isReaderClosed(err):
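The `t.read(inBuf, t.readTimeout)` helper itself is outside this hunk. For readers unfamiliar with the pattern, one common way to bound a blocking `Read` with a timeout is to run it in a goroutine and select on a timer; the sketch below is illustrative only (the names and exact error handling are assumptions, not the tailer's real `read`):

```go
package sketch // standalone illustration of a timed read, not the agent's implementation

import (
	"errors"
	"io"
	"time"
)

var errReadTimeout = errors.New("read timeout")

type readResult struct {
	n   int
	err error
}

// timedRead blocks for at most timeout. The result channel is buffered so the reading
// goroutine can always deliver its result and exit, even after a timeout. Note that on
// timeout the goroutine may still write into buf later, so callers should not reuse buf
// (the tailer's loop allocates a fresh inBuf on every iteration).
func timedRead(r io.Reader, buf []byte, timeout time.Duration) (int, error) {
	results := make(chan readResult, 1)
	go func() {
		n, err := r.Read(buf)
		results <- readResult{n: n, err: err}
	}()
	select {
	case res := <-results:
		return res.n, res.err
	case <-time.After(timeout):
		return 0, errReadTimeout
	}
}
```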
@@ -185,19 +205,25 @@ func (t *Tailer) readForever() {
					// This error is raised when the agent is stopping
					return
				case err == io.EOF:
Review discussion on this change:

- Here I would have reused the same logic as in …
- Should we try only once to set up the reader, then ask the launcher to restart the tailer?
- In general, does it make sense to have different retry strategies depending on the error? Otherwise I feel all errors that require a retry could use the same overall logic: retry setting up the reader, and if that failed ask the launcher to restart the tailer.
- I have the same feeling.
- Agreed.
					// This error is raised when the container is stopping
					// or when the container has not started to output logs yet.
					// Retry to read to make sure all logs are collected
					// or stop reading on the next iteration
					// if the tailer has been stopped.
					log.Debugf("No new logs are available for container %v", ShortContainerID(t.ContainerID))
					// This error is raised when:
					// * the container is stopping.
					// * the container has not started to output logs yet.
					// * a file rotation occurs.
					// Restart the reader (by providing the error to t.reader);
					// the reader will be restarted with a backoff policy.
					t.source.Status.Error(fmt.Errorf("log decoder returns an EOF error that will trigger a Reader restart"))
					log.Debugf("log decoder returns an EOF error that will trigger a Reader restart for container %v", ShortContainerID(t.ContainerID))
					t.reader.err = err
					continue
				default:
					t.source.Status.Error(err)
					log.Errorf("Could not tail logs for container %v: %v", ShortContainerID(t.ContainerID), err)
					t.erroredContainerID <- t.ContainerID
					return
				}
			}
			t.reader.Success()
			t.source.Status.Success()

			if n == 0 {
				// wait for new data to come
				t.wait()
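The review thread above asks whether a single restart attempt followed by escalation to the launcher would be simpler than per-error retry strategies. As a rough sketch of that alternative (a hypothetical helper, not what this PR implements), the EOF branch could delegate to something like:

```go
// handleEOF is a hypothetical helper illustrating the reviewers' suggestion:
// try to set the reader up once, and on failure hand the container back to the
// launcher (via erroredContainerID) so the whole tailer gets restarted.
func (t *Tailer) handleEOF() (keepReading bool) {
	if err := t.setupReader(); err != nil {
		t.source.Status.Error(err)
		log.Debugf("could not restart the reader for container %v: %v", ShortContainerID(t.ContainerID), err)
		t.erroredContainerID <- t.ContainerID
		return false
	}
	return true
}
```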
Additional review discussion (on the mutex handling elsewhere in this PR):

- https://github.com/DataDog/datadog-agent/pull/4860/files#diff-5142632c5dd9990aa6feb0918f9334f7L85 is in the critical path, but I think it should not be, because any method that acquires the mutex can be called concurrently with the run method.
- You could probably create setup and reset methods which are protected by the mutex.
- I guess there's technically a chance of a deadlock if one of these tickers ticks right after Stop here has locked the mutex, since cleanupRegistry and flushRegistry both need to hold the lock. The chances of running into it are likely extremely low, so I'm not sure if it's worth addressing. To avoid this we could have a separate mutex for registry.
- I was also thinking of moving it into the New() function.
- Makes sense. I added the new mutex.
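Following the thread above, a dedicated lock for the registry keeps the `cleanupRegistry`/`flushRegistry` tickers from contending with `Stop` on the main mutex. Below is a minimal sketch of that idea with hypothetical type and field names (the real launcher/registry code is not shown in this excerpt):

```go
package sketch // standalone illustration of the "separate registry mutex" suggestion

import "sync"

type launcher struct {
	mu         sync.Mutex        // guards the launcher's lifecycle state (Stop, etc.)
	registryMu sync.Mutex        // dedicated lock for registry reads/writes
	registry   map[string]string // hypothetical: containerID -> last committed offset
}

// cleanupRegistry and flushRegistry only take registryMu, so a ticker firing while
// Stop holds mu can no longer deadlock or delay shutdown.
func (l *launcher) cleanupRegistry(expired []string) {
	l.registryMu.Lock()
	defer l.registryMu.Unlock()
	for _, id := range expired {
		delete(l.registry, id)
	}
}

func (l *launcher) flushRegistry() map[string]string {
	l.registryMu.Lock()
	defer l.registryMu.Unlock()
	// copy under the lock so callers can persist the snapshot without holding it
	snapshot := make(map[string]string, len(l.registry))
	for k, v := range l.registry {
		snapshot[k] = v
	}
	return snapshot
}

func (l *launcher) Stop() {
	l.mu.Lock()
	defer l.mu.Unlock()
	// lifecycle shutdown; registry work in flight only needs registryMu
}
```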