From fbe16e156cd90e11cbe5b70c59a83d94dc923cf4 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Mon, 26 Jun 2023 09:24:56 -0400 Subject: [PATCH] [pkg/stanza/fileconsumer] Fix issue where buffer size could cause incorrect fingerprint update (#23183) The logic for determining when to update a fingerprint previously failed to account for the case where the buffer size is smaller than the fingerprint. This allows the fingerprint to be truncated in some cases. This PR rewrites the logic to explicitly handle each expected case, with optimization for handling the most common cases first. --- .chloggen/fileconsumer-fix-offset.yaml | 20 ++ pkg/stanza/fileconsumer/config.go | 2 + pkg/stanza/fileconsumer/file_test.go | 4 +- pkg/stanza/fileconsumer/reader.go | 87 ++++-- pkg/stanza/fileconsumer/reader_test.go | 321 ++++++++++++++++++++++- pkg/stanza/fileconsumer/rotation_test.go | 1 + pkg/stanza/fileconsumer/scanner.go | 8 +- pkg/stanza/fileconsumer/scanner_test.go | 2 +- pkg/stanza/fileconsumer/util_test.go | 4 +- 9 files changed, 413 insertions(+), 36 deletions(-) create mode 100755 .chloggen/fileconsumer-fix-offset.yaml diff --git a/.chloggen/fileconsumer-fix-offset.yaml b/.chloggen/fileconsumer-fix-offset.yaml new file mode 100755 index 0000000000000..633301dd6aaed --- /dev/null +++ b/.chloggen/fileconsumer-fix-offset.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix issue where large fingerprint_size could cause duplication of logs + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [22936] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 6178368cadec0..b0cfe941299f5 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -19,6 +19,7 @@ import ( const ( defaultMaxLogSize = 1024 * 1024 defaultMaxConcurrentFiles = 1024 + defaultBufSize = 16 * 1024 ) var allowFileDeletion = featuregate.GlobalRegistry().MustRegister( @@ -148,6 +149,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory s readerConfig: &readerConfig{ fingerprintSize: int(c.FingerprintSize), maxLogSize: int(c.MaxLogSize), + bufferSize: defaultBufSize, emit: emit, }, fromBeginning: startAtBeginning, diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 52d55064b0e21..ad82be9046494 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -617,12 +617,12 @@ func TestIgnoreEmptyFiles(t *testing.T) { temp4 := openTemp(t, tempDir) writeString(t, temp, "testlog1\n") - writeString(t, temp3, "testlog2\n") + writeString(t, temp2, "testlog2\n") operator.poll(context.Background()) waitForTokens(t, emitCalls, [][]byte{[]byte("testlog1"), []byte("testlog2")}) - writeString(t, temp2, "testlog3\n") + writeString(t, temp3, "testlog3\n") writeString(t, temp4, "testlog4\n") operator.poll(context.Background()) diff --git a/pkg/stanza/fileconsumer/reader.go b/pkg/stanza/fileconsumer/reader.go index e4e18ecbb868f..639c8cafb03eb 100644 --- a/pkg/stanza/fileconsumer/reader.go +++ b/pkg/stanza/fileconsumer/reader.go @@ -19,6 +19,7 @@ import ( type readerConfig struct { fingerprintSize int maxLogSize int + bufferSize int emit EmitFunc } @@ -65,7 +66,11 @@ func (r *Reader) ReadToEnd(ctx context.Context) { return } - scanner := NewPositionalScanner(r, r.maxLogSize, r.Offset, r.splitFunc) + bufferSize := r.bufferSize + if r.bufferSize < r.fingerprintSize { + bufferSize = r.fingerprintSize + } + scanner := NewPositionalScanner(r, r.maxLogSize, bufferSize, r.Offset, r.splitFunc) // Iterate over the tokenized file, emitting entries as we go for { @@ -104,7 +109,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) { return } - scanner = NewPositionalScanner(r, r.maxLogSize, r.Offset, r.splitFunc) + scanner = NewPositionalScanner(r, r.maxLogSize, r.bufferSize, r.Offset, r.splitFunc) } r.Offset = scanner.Pos() @@ -172,32 +177,64 @@ func (r *Reader) Close() { } // Read from the file and update the fingerprint if necessary -func (r *Reader) Read(dst []byte) (int, error) { - // Skip if fingerprint is already built - // or if fingerprint is behind Offset - if len(r.Fingerprint.FirstBytes) == r.fingerprintSize || int(r.Offset) > len(r.Fingerprint.FirstBytes) { - return r.file.Read(dst) - } - n, err := r.file.Read(dst) - appendCount := min0(n, r.fingerprintSize-int(r.Offset)) - // return for n == 0 or r.Offset >= r.fileInput.fingerprintSize - if appendCount == 0 { - return n, err - } - - // for appendCount==0, the following code would add `0` to fingerprint - r.Fingerprint.FirstBytes = append(r.Fingerprint.FirstBytes[:r.Offset], dst[:appendCount]...) - return n, err -} +func (r *Reader) Read(dst []byte) (n int, err error) { + n, err = r.file.Read(dst) + + if len(r.Fingerprint.FirstBytes) == r.fingerprintSize { + // Steady state. Just return data to scanner. + return + } -func min0(a, b int) int { - if a < 0 || b < 0 { - return 0 + if len(r.Fingerprint.FirstBytes) > r.fingerprintSize { + // Oversized fingerprint. The component was restarted with a decreased 'fingerprint_size'. + // Just return data to scanner. + return } - if a < b { - return a + + if int(r.Offset) > len(r.Fingerprint.FirstBytes) { + // Undersized fingerprint. The component was restarted with an increased 'fingerprint_size. + // However, we've already read past the fingerprint. Just keep reading. + return + } + + if len(r.Fingerprint.FirstBytes) == int(r.Offset) { + // The fingerprint is incomplete but is exactly aligned with the offset. + // Take advantage of the simple case and avoid some computation. + appendCount := r.fingerprintSize - len(r.Fingerprint.FirstBytes) + if appendCount > n { + appendCount = n + } + r.Fingerprint.FirstBytes = append(r.Fingerprint.FirstBytes, dst[:appendCount]...) + } + + // The fingerprint is incomplete and is NOT aligned with the offset. This means the fingerprint + // contains data that hasn't yet been emitted. Either we observed an incomplete token at the end of the + // file, or we are running with 'start_at: beginning' in which case the fingerprint is initialized + // independently of the Reader. + + // Allowing the fingerprint to run ahead of tokenization improves our ability to uniquely identify files. + // However, it also means we must compensate for the misalignment when appending to the fingerprint. + + // WE MUST ASSUME that the fingerprint will never contain a token longer than the 'dst' buffer. + // The easiest way to enforce this is to ensure the buffer is at least as large as the fingerprint. + // Unfortunately, this must be enforced outside of this function. + // Without this guarantee, the scanner may call this function consecutively before we are able to update + // the offset, which means we cannot trust the offset to tell us which data in the 'dst' buffer has + // already been appended to the fingerprint. + + newBytesIndex := len(r.Fingerprint.FirstBytes) - int(r.Offset) + if n <= newBytesIndex { + // Already have this data in the fingerprint. Just return data to scanner. + return + } + + appendCount := r.fingerprintSize - len(r.Fingerprint.FirstBytes) + if appendCount > n-newBytesIndex { + // Not enough new data to complete the fingerprint, but append what we have. + appendCount = n - newBytesIndex } - return b + r.Fingerprint.FirstBytes = append(r.Fingerprint.FirstBytes, dst[newBytesIndex:newBytesIndex+appendCount]...) + return } // mapCopy deep copies the provided attributes map. diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go index 2fd341d0c2bd5..ec9b49ccddcb4 100644 --- a/pkg/stanza/fileconsumer/reader_test.go +++ b/pkg/stanza/fileconsumer/reader_test.go @@ -7,6 +7,7 @@ import ( "bytes" "context" "encoding/json" + "fmt" "testing" "time" @@ -192,7 +193,7 @@ func TestHeaderFingerprintIncluded(t *testing.T) { r.ReadToEnd(context.Background()) - require.Equal(t, []byte("#header-line\naaa\n"), r.Fingerprint.FirstBytes) + require.Equal(t, fileContent, r.Fingerprint.FirstBytes) } func testReaderFactory(t *testing.T) (*readerFactory, chan *emitParams) { @@ -292,3 +293,321 @@ func TestEncodingDecode(t *testing.T) { assert.Empty(t, decodedReader.FileAttributes.NameResolved) assert.Empty(t, decodedReader.FileAttributes.PathResolved) } + +func TestReaderUpdateFingerprint(t *testing.T) { + bufferSizes := []int{2, 3, 5, 8, 10, 13, 20, 50} + testCases := []updateFingerprintTest{ + { + name: "new_file", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte(""), + moreBytes: []byte("1234567890\n"), + expectTokens: [][]byte{[]byte("1234567890")}, + expectOffset: 11, + expectFingerprint: []byte("1234567890"), + }, + { + name: "existing_partial_line_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte("foo"), + moreBytes: []byte("1234567890\n"), + expectTokens: [][]byte{[]byte("foo1234567890")}, + expectOffset: 14, + expectFingerprint: []byte("foo1234567"), + }, + { + name: "existing_partial_line", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte("foo"), + moreBytes: []byte("1234567890\n"), + expectTokens: [][]byte{[]byte("1234567890")}, + expectOffset: 14, + expectFingerprint: []byte("foo1234567"), + }, + { + name: "existing_full_line_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte("foo\n"), + moreBytes: []byte("1234567890\n"), + expectTokens: [][]byte{[]byte("foo"), []byte("1234567890")}, + expectOffset: 15, + expectFingerprint: []byte("foo\n123456"), + }, + { + name: "existing_full_line", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte("foo\n"), + moreBytes: []byte("1234567890\n"), + expectTokens: [][]byte{[]byte("1234567890")}, + expectOffset: 15, + expectFingerprint: []byte("foo\n123456"), + }, + { + name: "split_none_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte("foo"), + moreBytes: []byte("1234567890"), + expectTokens: [][]byte{}, + expectOffset: 0, + expectFingerprint: []byte("foo1234567"), + }, + { + name: "split_none", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte("foo"), + moreBytes: []byte("1234567890"), + expectTokens: [][]byte{}, + expectOffset: 3, + expectFingerprint: []byte("foo1234567"), + }, + { + name: "split_mid_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte("foo"), + moreBytes: []byte("12345\n67890"), + expectTokens: [][]byte{[]byte("foo12345")}, + expectOffset: 9, + expectFingerprint: []byte("foo12345\n6"), + }, + { + name: "split_mid", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte("foo"), + moreBytes: []byte("12345\n67890"), + expectTokens: [][]byte{[]byte("12345")}, + expectOffset: 9, + expectFingerprint: []byte("foo12345\n6"), + }, + { + name: "clean_end_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte("foo"), + moreBytes: []byte("12345\n67890\n"), + expectTokens: [][]byte{[]byte("foo12345"), []byte("67890")}, + expectOffset: 15, + expectFingerprint: []byte("foo12345\n6"), + }, + { + name: "clean_end", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte("foo"), + moreBytes: []byte("12345\n67890\n"), + expectTokens: [][]byte{[]byte("12345"), []byte("67890")}, + expectOffset: 15, + expectFingerprint: []byte("foo12345\n6"), + }, + { + name: "full_lines_only_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte("foo\n"), + moreBytes: []byte("12345\n67890\n"), + expectTokens: [][]byte{[]byte("foo"), []byte("12345"), []byte("67890")}, + expectOffset: 16, + expectFingerprint: []byte("foo\n12345\n"), + }, + { + name: "full_lines_only", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte("foo\n"), + moreBytes: []byte("12345\n67890\n"), + expectTokens: [][]byte{[]byte("12345"), []byte("67890")}, + expectOffset: 16, + expectFingerprint: []byte("foo\n12345\n"), + }, + { + name: "tiny_max_log_size_from_start", + fingerprintSize: 10, + maxLogSize: 2, + fromBeginning: true, + initBytes: []byte("foo"), + moreBytes: []byte("12345\n67890"), + expectTokens: [][]byte{[]byte("fo"), []byte("o1"), []byte("23"), []byte("45"), []byte("67"), []byte("89")}, + expectOffset: 13, + expectFingerprint: []byte("foo12345\n6"), + }, + { + name: "tiny_max_log_size", + fingerprintSize: 10, + maxLogSize: 2, + initBytes: []byte("foo"), + moreBytes: []byte("12345\n67890"), + expectTokens: [][]byte{[]byte("12"), []byte("34"), []byte("5"), []byte("67"), []byte("89")}, + expectOffset: 13, + expectFingerprint: []byte("foo12345\n6"), + }, + { + name: "small_max_log_size_from_start", + fingerprintSize: 20, + maxLogSize: 4, + fromBeginning: true, + initBytes: []byte("foo"), + moreBytes: []byte("1234567890\nbar\nhelloworld\n"), + expectTokens: [][]byte{[]byte("foo1"), []byte("2345"), []byte("6789"), []byte("0"), []byte("bar"), []byte("hell"), []byte("owor"), []byte("ld")}, + expectOffset: 29, + expectFingerprint: []byte("foo1234567890\nbar\nhe"), + }, + { + name: "small_max_log_size", + fingerprintSize: 20, + maxLogSize: 4, + initBytes: []byte("foo"), + moreBytes: []byte("1234567890\nbar\nhelloworld\n"), + expectTokens: [][]byte{[]byte("1234"), []byte("5678"), []byte("90"), []byte("bar"), []byte("hell"), []byte("owor"), []byte("ld")}, + expectOffset: 29, + expectFingerprint: []byte("foo1234567890\nbar\nhe"), + }, + { + name: "leading_empty_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte(""), + moreBytes: []byte("\n12345\n67890\n"), + expectTokens: [][]byte{[]byte(""), []byte("12345"), []byte("67890")}, + expectOffset: 13, + expectFingerprint: []byte("\n12345\n678"), + }, + { + name: "leading_empty", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte(""), + moreBytes: []byte("\n12345\n67890\n"), + expectTokens: [][]byte{[]byte(""), []byte("12345"), []byte("67890")}, + expectOffset: 13, + expectFingerprint: []byte("\n12345\n678"), + }, + { + name: "multiple_empty_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte(""), + moreBytes: []byte("\n\n12345\n\n67890\n\n"), + expectTokens: [][]byte{[]byte(""), []byte(""), []byte("12345"), []byte(""), []byte("67890"), []byte("")}, + expectOffset: 16, + expectFingerprint: []byte("\n\n12345\n\n6"), + }, + { + name: "multiple_empty", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte(""), + moreBytes: []byte("\n\n12345\n\n67890\n\n"), + expectTokens: [][]byte{[]byte(""), []byte(""), []byte("12345"), []byte(""), []byte("67890"), []byte("")}, + expectOffset: 16, + expectFingerprint: []byte("\n\n12345\n\n6"), + }, + { + name: "multiple_empty_partial_end_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte(""), + moreBytes: []byte("\n\n12345\n\n67890"), + expectTokens: [][]byte{[]byte(""), []byte(""), []byte("12345"), []byte("")}, + expectOffset: 9, + expectFingerprint: []byte("\n\n12345\n\n6"), + }, + { + name: "multiple_empty_partial_end", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte(""), + moreBytes: []byte("\n\n12345\n\n67890"), + expectTokens: [][]byte{[]byte(""), []byte(""), []byte("12345"), []byte("")}, + expectOffset: 9, + expectFingerprint: []byte("\n\n12345\n\n6"), + }, + } + + for _, tc := range testCases { + for _, bufferSize := range bufferSizes { + t.Run(fmt.Sprintf("%s/bufferSize:%d", tc.name, bufferSize), tc.run(bufferSize)) + } + } +} + +type updateFingerprintTest struct { + name string + fingerprintSize int + maxLogSize int + fromBeginning bool + initBytes []byte + moreBytes []byte + expectTokens [][]byte + expectOffset int64 + expectFingerprint []byte +} + +func (tc updateFingerprintTest) run(bufferSize int) func(*testing.T) { + return func(t *testing.T) { + splitterConfig := helper.NewSplitterConfig() + emitChan := make(chan *emitParams, 100) + f := &readerFactory{ + SugaredLogger: testutil.Logger(t), + readerConfig: &readerConfig{ + fingerprintSize: tc.fingerprintSize, + maxLogSize: tc.maxLogSize, + bufferSize: bufferSize, + emit: testEmitFunc(emitChan), + }, + fromBeginning: tc.fromBeginning, + splitterFactory: newMultilineSplitterFactory(splitterConfig), + encodingConfig: splitterConfig.EncodingConfig, + } + + temp := openTemp(t, t.TempDir()) + _, err := temp.Write(tc.initBytes) + require.NoError(t, err) + + fi, err := temp.Stat() + require.NoError(t, err) + require.Equal(t, int64(len(tc.initBytes)), fi.Size()) + + fp, err := NewFingerprint(temp, tc.fingerprintSize) + require.NoError(t, err) + r, err := f.newReader(temp, fp) + require.NoError(t, err) + require.Same(t, temp, r.file) + + if tc.fromBeginning { + assert.Equal(t, int64(0), r.Offset) + } else { + assert.Equal(t, int64(len(tc.initBytes)), r.Offset) + } + assert.Equal(t, tc.initBytes, r.Fingerprint.FirstBytes) + + i, err := temp.Write(tc.moreBytes) + require.NoError(t, err) + require.Equal(t, i, len(tc.moreBytes)) + + r.ReadToEnd(context.Background()) + + for _, token := range tc.expectTokens { + tk := readToken(t, emitChan) + require.Equal(t, token, tk) + } + assert.Equal(t, tc.expectOffset, r.Offset) + assert.Equal(t, tc.expectFingerprint, r.Fingerprint.FirstBytes) + } +} diff --git a/pkg/stanza/fileconsumer/rotation_test.go b/pkg/stanza/fileconsumer/rotation_test.go index 5c02fddd86e45..59ecfc52f0bd0 100644 --- a/pkg/stanza/fileconsumer/rotation_test.go +++ b/pkg/stanza/fileconsumer/rotation_test.go @@ -252,6 +252,7 @@ func (rt rotationTest) run(tc rotationTest, copyTruncate, sequential bool) func( cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" cfg.PollInterval = tc.pollInterval + cfg.FingerprintSize = 128 emitCalls := make(chan *emitParams, tc.totalLines) operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls)) diff --git a/pkg/stanza/fileconsumer/scanner.go b/pkg/stanza/fileconsumer/scanner.go index 410c868080a13..798355edf7e88 100644 --- a/pkg/stanza/fileconsumer/scanner.go +++ b/pkg/stanza/fileconsumer/scanner.go @@ -11,8 +11,6 @@ import ( stanzaerrors "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/errors" ) -const defaultBufSize = 16 * 1024 - // PositionalScanner is a scanner that maintains position // // Deprecated: [v0.80.0] This will be made internal in a future release, tentatively v0.82.0. @@ -24,18 +22,18 @@ type PositionalScanner struct { // NewPositionalScanner creates a new positional scanner // // Deprecated: [v0.80.0] This will be made internal in a future release, tentatively v0.82.0. -func NewPositionalScanner(r io.Reader, maxLogSize int, startOffset int64, splitFunc bufio.SplitFunc) *PositionalScanner { +func NewPositionalScanner(r io.Reader, maxLogSize int, bufferCap int, startOffset int64, splitFunc bufio.SplitFunc) *PositionalScanner { ps := &PositionalScanner{ pos: startOffset, Scanner: bufio.NewScanner(r), } - buf := make([]byte, 0, defaultBufSize) + buf := make([]byte, 0, bufferCap) ps.Scanner.Buffer(buf, maxLogSize*2) scanFunc := func(data []byte, atEOF bool) (advance int, token []byte, err error) { advance, token, err = splitFunc(data, atEOF) - if (advance == 0 && token == nil && err == nil) && len(data) >= 2*maxLogSize { + if (advance == 0 && token == nil && err == nil) && len(data) >= maxLogSize { // reference: https://pkg.go.dev/bufio#SplitFunc // splitFunc returns (0, nil, nil) to signal the Scanner to read more data but the buffer is full. // Truncate the log entry. diff --git a/pkg/stanza/fileconsumer/scanner_test.go b/pkg/stanza/fileconsumer/scanner_test.go index fa0c4a7b7ae20..402af98421745 100644 --- a/pkg/stanza/fileconsumer/scanner_test.go +++ b/pkg/stanza/fileconsumer/scanner_test.go @@ -72,7 +72,7 @@ func TestScanner(t *testing.T) { t.Run(tc.name, func(t *testing.T) { reader := bytes.NewReader(tc.stream) splitter := simpleSplit(tc.delimiter) - scanner := NewPositionalScanner(reader, tc.maxSize, tc.startOffset, splitter) + scanner := NewPositionalScanner(reader, tc.maxSize, defaultBufSize, tc.startOffset, splitter) for i, p := 0, 0; scanner.Scan(); i++ { require.NoError(t, scanner.getError()) diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index 5f5dc386032d5..9324c0d29b4fe 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -205,8 +205,8 @@ func expectNoTokens(t *testing.T, c chan *emitParams) { func expectNoTokensUntil(t *testing.T, c chan *emitParams, d time.Duration) { select { - case token := <-c: - require.FailNow(t, "Received unexpected message", "Message: %s", token) + case call := <-c: + require.FailNow(t, "Received unexpected message", "Message: %s", call.token) case <-time.After(d): } }