From 784514fb0440f8cb29e36671e6ed1854c41b12fc Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Fri, 13 Oct 2023 10:59:45 -0700 Subject: [PATCH] [pkg/stanza] Cache event publishers: log warn once per provider (#27658) **Description:** Cache the publisher event to: 1. Avoid logging the same error message every time one event from the given source is logged. 2. Avoid opening and closing the event publisher for every single event. **Link to tracking Issue:** [Item 4 described on the investigation](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21491#issuecomment-1745627714) for issue #21491. **Testing:** * Go tests for `pkg/stanza` and `receiver/windowseventlogreceiver` on Windows box. * Ran the contrib build locally to validate the change. * Can't run the full make locally: misspell is failing on Windows because the command line is too long. **Documentation:** Let me know if changing the severity of the log message requires a changelog update. --- ...-warning-when-publisher-not-available.yaml | 27 ++++++++ pkg/stanza/operator/input/windows/operator.go | 17 +++-- .../operator/input/windows/publisher.go | 4 ++ .../operator/input/windows/publisher_test.go | 23 ++++++- .../operator/input/windows/publishercache.go | 50 ++++++++++++++ .../input/windows/publishercache_test.go | 68 +++++++++++++++++++ 6 files changed, 182 insertions(+), 7 deletions(-) create mode 100644 .chloggen/log-warning-when-publisher-not-available.yaml create mode 100644 pkg/stanza/operator/input/windows/publishercache.go create mode 100644 pkg/stanza/operator/input/windows/publishercache_test.go diff --git a/.chloggen/log-warning-when-publisher-not-available.yaml b/.chloggen/log-warning-when-publisher-not-available.yaml new file mode 100644 index 0000000000000..1228492e1f65e --- /dev/null +++ b/.chloggen/log-warning-when-publisher-not-available.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Log warning, instead of error, when Windows Event Log publisher metadata is not available and cache the successfully retrieved ones. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27658] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/pkg/stanza/operator/input/windows/operator.go b/pkg/stanza/operator/input/windows/operator.go index 5df98bb64cefd..c34a0db6086da 100644 --- a/pkg/stanza/operator/input/windows/operator.go +++ b/pkg/stanza/operator/input/windows/operator.go @@ -94,6 +94,7 @@ type Input struct { excludeProviders []string pollInterval time.Duration persister operator.Persister + publisherCache publisherCache cancel context.CancelFunc wg sync.WaitGroup } @@ -123,6 +124,8 @@ func (e *Input) Start(persister operator.Persister) error { return fmt.Errorf("failed to open subscription: %w", err) } + e.publisherCache = newPublisherCache() + e.wg.Add(1) go e.readOnInterval(ctx) return nil @@ -141,6 +144,10 @@ func (e *Input) Stop() error { return fmt.Errorf("failed to close bookmark: %w", err) } + if err := e.publisherCache.evictAll(); err != nil { + return fmt.Errorf("failed to close publishers: %w", err) + } + return nil } @@ -231,13 +238,15 @@ func (e *Input) processEvent(ctx context.Context, event Event) { } } - publisher := NewPublisher() - if err := publisher.Open(simpleEvent.Provider.Name); err != nil { - e.Errorf("Failed to open publisher: %s: writing log entry to pipeline without metadata", err) + publisher, openPublisherErr := e.publisherCache.get(simpleEvent.Provider.Name) + if openPublisherErr != nil { + e.Warnf("Failed to open the %q event source, respective log entries can't be formatted: %s", simpleEvent.Provider.Name, openPublisherErr) + } + + if !publisher.Valid() { e.sendEvent(ctx, simpleEvent) return } - defer publisher.Close() formattedEvent, err := event.RenderFormatted(e.buffer, publisher) if err != nil { diff --git a/pkg/stanza/operator/input/windows/publisher.go b/pkg/stanza/operator/input/windows/publisher.go index dc6e8668063c2..8000440362e5f 100644 --- a/pkg/stanza/operator/input/windows/publisher.go +++ b/pkg/stanza/operator/input/windows/publisher.go @@ -36,6 +36,10 @@ func (p *Publisher) Open(provider string) error { return nil } +func (p *Publisher) Valid() bool { + return p.handle != 0 +} + // Close will close the publisher handle. func (p *Publisher) Close() error { if p.handle == 0 { diff --git a/pkg/stanza/operator/input/windows/publisher_test.go b/pkg/stanza/operator/input/windows/publisher_test.go index f4d8ef2f6237f..5fb2dae0ef8c0 100644 --- a/pkg/stanza/operator/input/windows/publisher_test.go +++ b/pkg/stanza/operator/input/windows/publisher_test.go @@ -17,6 +17,7 @@ func TestPublisherOpenPreexisting(t *testing.T) { err := publisher.Open("") require.Error(t, err) require.Contains(t, err.Error(), "publisher handle is already open") + require.True(t, publisher.Valid()) } func TestPublisherOpenInvalidUTF8(t *testing.T) { @@ -25,44 +26,60 @@ func TestPublisherOpenInvalidUTF8(t *testing.T) { err := publisher.Open(invalidUTF8) require.Error(t, err) require.Contains(t, err.Error(), "failed to convert the provider name \"\\x00\" to utf16: invalid argument") + require.False(t, publisher.Valid()) } func TestPublisherOpenSyscallFailure(t *testing.T) { publisher := NewPublisher() provider := "provider" - openPublisherMetadataProc = SimpleMockProc(0, 0, ErrorNotSupported) + defer mockWithDeferredRestore(&openPublisherMetadataProc, SimpleMockProc(0, 0, ErrorNotSupported))() err := publisher.Open(provider) require.Error(t, err) require.Contains(t, err.Error(), "failed to open the metadata for the \"provider\" provider: The request is not supported.") + require.False(t, publisher.Valid()) } func TestPublisherOpenSuccess(t *testing.T) { publisher := NewPublisher() provider := "provider" - openPublisherMetadataProc = SimpleMockProc(5, 0, ErrorSuccess) + defer mockWithDeferredRestore(&openPublisherMetadataProc, SimpleMockProc(5, 0, ErrorSuccess))() err := publisher.Open(provider) require.NoError(t, err) require.Equal(t, uintptr(5), publisher.handle) + require.True(t, publisher.Valid()) } func TestPublisherCloseWhenAlreadyClosed(t *testing.T) { publisher := NewPublisher() err := publisher.Close() require.NoError(t, err) + require.False(t, publisher.Valid()) } func TestPublisherCloseSyscallFailure(t *testing.T) { publisher := Publisher{handle: 5} - closeProc = SimpleMockProc(0, 0, ErrorNotSupported) + defer mockWithDeferredRestore(&closeProc, SimpleMockProc(0, 0, ErrorNotSupported))() err := publisher.Close() require.Error(t, err) require.Contains(t, err.Error(), "failed to close publisher") + require.True(t, publisher.Valid()) } func TestPublisherCloseSuccess(t *testing.T) { publisher := Publisher{handle: 5} + originalCloseProc := closeProc closeProc = SimpleMockProc(1, 0, ErrorSuccess) + defer func() { closeProc = originalCloseProc }() err := publisher.Close() require.NoError(t, err) require.Equal(t, uintptr(0), publisher.handle) + require.False(t, publisher.Valid()) +} + +func mockWithDeferredRestore(call *SyscallProc, mockCall SyscallProc) func() { + original := *call + *call = mockCall + return func() { + *call = original + } } diff --git a/pkg/stanza/operator/input/windows/publishercache.go b/pkg/stanza/operator/input/windows/publishercache.go new file mode 100644 index 0000000000000..e45a0e6bfbc5e --- /dev/null +++ b/pkg/stanza/operator/input/windows/publishercache.go @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build windows +// +build windows + +package windows // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/windows" + +import ( + "errors" +) + +type publisherCache struct { + cache map[string]Publisher +} + +func newPublisherCache() publisherCache { + return publisherCache{ + cache: make(map[string]Publisher), + } +} + +func (c *publisherCache) get(provider string) (publisher Publisher, openPublisherErr error) { + publisher, ok := c.cache[provider] + if ok { + return publisher, nil + } + + publisher = NewPublisher() + err := publisher.Open(provider) + + // Always store the publisher even if there was an error opening it. + c.cache[provider] = publisher + + return publisher, err +} + +func (c *publisherCache) evictAll() error { + var errs error + for _, publisher := range c.cache { + if publisher.Valid() { + if err := publisher.Close(); err != nil { + errs = errors.Join(errs, err) + } + } + } + + c.cache = make(map[string]Publisher) + return errs +} diff --git a/pkg/stanza/operator/input/windows/publishercache_test.go b/pkg/stanza/operator/input/windows/publishercache_test.go new file mode 100644 index 0000000000000..63f29cb688651 --- /dev/null +++ b/pkg/stanza/operator/input/windows/publishercache_test.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build windows +// +build windows + +package windows + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGetValidPublisher(t *testing.T) { + publisherCache := newPublisherCache() + defer publisherCache.evictAll() + + // Provider "Application" exists in all Windows versions. + publisher, openPublisherErr := publisherCache.get("Application") + require.NoError(t, openPublisherErr) + require.True(t, publisher.Valid()) + + // Get the same publisher again. + publisher, openPublisherErr = publisherCache.get("Application") + require.NoError(t, openPublisherErr) + require.True(t, publisher.Valid()) +} + +func TestGetInvalidPublisher(t *testing.T) { + publisherCache := newPublisherCache() + defer publisherCache.evictAll() + + // Provider "InvalidProvider" does not exist in any Windows version. + publisher, openPublisherErr := publisherCache.get("InvalidProvider") + require.Error(t, openPublisherErr, "%v", publisherCache) + require.False(t, publisher.Valid()) + + // Get "InvalidProvider" publisher again. + publisher, openPublisherErr = publisherCache.get("InvalidProvider") + require.NoError(t, openPublisherErr) // It is cached, no error opening it. + require.False(t, publisher.Valid()) +} + +func TestValidAndInvalidPublishers(t *testing.T) { + publisherCache := newPublisherCache() + defer publisherCache.evictAll() + + // Provider "Application" exists in all Windows versions. + publisher, openPublisherErr := publisherCache.get("Application") + require.NoError(t, openPublisherErr) + require.True(t, publisher.Valid()) + + // Provider "InvalidProvider" does not exist in any Windows version. + publisher, openPublisherErr = publisherCache.get("InvalidProvider") + require.Error(t, openPublisherErr, "%v", publisherCache) + require.False(t, publisher.Valid()) + + // Get the existing publisher again. + publisher, openPublisherErr = publisherCache.get("Application") + require.NoError(t, openPublisherErr) + require.True(t, publisher.Valid()) + + // Get "InvalidProvider" publisher again. + publisher, openPublisherErr = publisherCache.get("InvalidProvider") + require.NoError(t, openPublisherErr) // It is cached, no error opening it. + require.False(t, publisher.Valid()) +}