Skip to content

Commit

Permalink
[receiver/mongodbatlas] Fix issue where storage client can timeout if…
Browse files Browse the repository at this point in the history
… both events and alerts are configured (open-telemetry#19435)

* fix issue where alerts and events compete for storageClient access
  • Loading branch information
schmikei committed Mar 13, 2023
1 parent 5fda2a7 commit 0e6db1c
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 54 deletions.
16 changes: 16 additions & 0 deletions .chloggen/mongodbatlas-shared-storage-id.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: mongodbatlasreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fixes issue where filestorageextension usage with the mongodbatlasreceiver would cause a race condition/timeout

# One or more tracking issues related to the change
issues: [19434]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
17 changes: 4 additions & 13 deletions receiver/mongodbatlasreceiver/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/model"
)
Expand Down Expand Up @@ -91,8 +90,6 @@ type alertsReceiver struct {
pageSize int64
maxPages int64
doneChan chan bool
id component.ID // ID of the receiver component
storageID *component.ID // ID of the storage extension component
storageClient storage.Client
}

Expand Down Expand Up @@ -129,8 +126,6 @@ func newAlertsReceiver(params rcvr.CreateSettings, baseConfig *Config, consumer
pageSize: baseConfig.Alerts.PageSize,
doneChan: make(chan bool, 1),
logger: params.Logger,
id: params.ID,
storageID: baseConfig.StorageID,
}

if recv.mode == alertModePoll {
Expand All @@ -145,21 +140,17 @@ func newAlertsReceiver(params rcvr.CreateSettings, baseConfig *Config, consumer
return recv, nil
}

func (a *alertsReceiver) Start(ctx context.Context, host component.Host) error {
func (a *alertsReceiver) Start(ctx context.Context, host component.Host, storageClient storage.Client) error {
if a.mode == alertModePoll {
return a.startPolling(ctx, host)
return a.startPolling(ctx, storageClient)
}
return a.startListening(ctx, host)
}

func (a *alertsReceiver) startPolling(ctx context.Context, host component.Host) error {
func (a *alertsReceiver) startPolling(ctx context.Context, storageClient storage.Client) error {
a.logger.Debug("starting alerts receiver in retrieval mode")
storageClient, err := adapter.GetStorageClient(ctx, host, a.storageID, a.id)
if err != nil {
return fmt.Errorf("failed to set up storage: %w", err)
}
a.storageClient = storageClient
err = a.syncPersistence(ctx)
err := a.syncPersistence(ctx)
if err != nil {
a.logger.Error("there was an error syncing the receiver with checkpoint", zap.Error(err))
}
Expand Down
5 changes: 3 additions & 2 deletions receiver/mongodbatlasreceiver/alerts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver/receivertest"
Expand Down Expand Up @@ -600,7 +601,7 @@ func TestAlertsRetrieval(t *testing.T) {
require.NoError(t, err)
alertsRcvr.client = tc.client()

err = alertsRcvr.Start(context.Background(), componenttest.NewNopHost())
err = alertsRcvr.Start(context.Background(), componenttest.NewNopHost(), storage.NewNopClient())
require.NoError(t, err)

require.Eventually(t, func() bool {
Expand Down Expand Up @@ -635,7 +636,7 @@ func TestAlertPollingExclusions(t *testing.T) {
require.NoError(t, err)
alertsRcvr.client = testClient()

err = alertsRcvr.Start(context.Background(), componenttest.NewNopHost())
err = alertsRcvr.Start(context.Background(), componenttest.NewNopHost(), storage.NewNopClient())
require.NoError(t, err)

require.Never(t, func() bool {
Expand Down
20 changes: 15 additions & 5 deletions receiver/mongodbatlasreceiver/combined_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,34 @@ package mongodbatlasreceiver // import "github.com/open-telemetry/opentelemetry-

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.uber.org/multierr"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
)

// combinedLogsReceiver wraps the alerts, logs, and events receivers in a single log receiver to be consumed by the factory
type combinedLogsReceiver struct {
alerts *alertsReceiver
logs *logsReceiver
events *eventsReceiver
alerts *alertsReceiver
logs *logsReceiver
events *eventsReceiver
storageID *component.ID
id component.ID
}

// Start starts up the combined MongoDB Atlas logs, alerts, and events receiver
func (c *combinedLogsReceiver) Start(ctx context.Context, host component.Host) error {
var errs error

storageClient, err := adapter.GetStorageClient(ctx, host, c.storageID, c.id)
if err != nil {
return fmt.Errorf("failed to get storage client: %w", err)
}

if c.alerts != nil {
if err := c.alerts.Start(ctx, host); err != nil {
if err := c.alerts.Start(ctx, host, storageClient); err != nil {
errs = multierr.Append(errs, err)
}
}
Expand All @@ -45,7 +55,7 @@ func (c *combinedLogsReceiver) Start(ctx context.Context, host component.Host) e
}

if c.events != nil {
if err := c.events.Start(ctx, host); err != nil {
if err := c.events.Start(ctx, host, storageClient); err != nil {
errs = multierr.Append(errs, err)
}
}
Expand Down
11 changes: 1 addition & 10 deletions receiver/mongodbatlasreceiver/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
rcvr "go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal"
)

Expand All @@ -49,8 +48,6 @@ type eventsClient interface {
type eventsReceiver struct {
client eventsClient
logger *zap.Logger
id component.ID // ID of the receiver component
storageID *component.ID // ID of the storage extension component
storageClient storage.Client
cfg *Config
consumer consumer.Logs
Expand All @@ -72,8 +69,6 @@ func newEventsReceiver(settings rcvr.CreateSettings, c *Config, consumer consume
client: internal.NewMongoDBAtlasClient(c.PublicKey, c.PrivateKey, c.RetrySettings, settings.Logger),
cfg: c,
logger: settings.Logger,
id: settings.ID,
storageID: c.StorageID,
consumer: consumer,
pollInterval: c.Events.PollInterval,
wg: &sync.WaitGroup{},
Expand All @@ -97,14 +92,10 @@ func newEventsReceiver(settings rcvr.CreateSettings, c *Config, consumer consume
return r
}

func (er *eventsReceiver) Start(ctx context.Context, host component.Host) error {
func (er *eventsReceiver) Start(ctx context.Context, host component.Host, storageClient storage.Client) error {
er.logger.Debug("Starting up events receiver")
cancelCtx, cancel := context.WithCancel(ctx)
er.cancel = cancel
storageClient, err := adapter.GetStorageClient(cancelCtx, host, er.storageID, er.id)
if err != nil {
return fmt.Errorf("failed to get storage client: %w", err)
}
er.storageClient = storageClient
er.loadCheckpoint(cancelCtx)

Expand Down
28 changes: 5 additions & 23 deletions receiver/mongodbatlasreceiver/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package mongodbatlasreceiver
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
Expand All @@ -28,9 +27,9 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.mongodb.org/atlas/mongodbatlas"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.opentelemetry.io/collector/receiver/receivertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden"
Expand Down Expand Up @@ -60,29 +59,12 @@ func TestStartAndShutdown(t *testing.T) {
return cfg
},
},
{
desc: "invalid storage config",
getConfig: func() *Config {
cfg := createDefaultConfig().(*Config)
cfg.StorageID = &component.ID{}
cfg.Events = &EventsConfig{
Projects: []*ProjectConfig{
{
Name: testProjectName,
},
},
PollInterval: time.Minute,
}
return cfg
},
expectedStartErr: errors.New("failed to get storage client"),
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
sink := &consumertest.LogsSink{}
r := newEventsReceiver(receivertest.NewNopCreateSettings(), tc.getConfig(), sink)
err := r.Start(context.Background(), componenttest.NewNopHost())
err := r.Start(context.Background(), componenttest.NewNopHost(), storage.NewNopClient())
if tc.expectedStartErr != nil {
require.ErrorContains(t, err, tc.expectedStartErr.Error())
} else {
Expand Down Expand Up @@ -115,7 +97,7 @@ func TestContextDone(t *testing.T) {
r.client = mClient

ctx, cancel := context.WithCancel(context.Background())
err := r.Start(ctx, componenttest.NewNopHost())
err := r.Start(ctx, componenttest.NewNopHost(), storage.NewNopClient())
require.NoError(t, err)
cancel()

Expand Down Expand Up @@ -144,7 +126,7 @@ func TestPoll(t *testing.T) {
mClient.setupMock(t)
r.client = mClient

err := r.Start(context.Background(), componenttest.NewNopHost())
err := r.Start(context.Background(), componenttest.NewNopHost(), storage.NewNopClient())
require.NoError(t, err)

require.Eventually(t, func() bool {
Expand Down Expand Up @@ -177,7 +159,7 @@ func TestProjectGetFailure(t *testing.T) {
mClient := &mockEventsClient{}
mClient.On("GetProject", mock.Anything, "fake-project").Return(nil, fmt.Errorf("unable to get project: %d", http.StatusUnauthorized))

err := r.Start(context.Background(), componenttest.NewNopHost())
err := r.Start(context.Background(), componenttest.NewNopHost(), storage.NewNopClient())
require.NoError(t, err)

require.Never(t, func() bool {
Expand Down
5 changes: 4 additions & 1 deletion receiver/mongodbatlasreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ func createCombinedLogReceiver(
}

var err error
recv := &combinedLogsReceiver{}
recv := &combinedLogsReceiver{
id: params.ID,
storageID: cfg.StorageID,
}

if cfg.Alerts.Enabled {
recv.alerts, err = newAlertsReceiver(params, cfg, consumer)
Expand Down
23 changes: 23 additions & 0 deletions receiver/mongodbatlasreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ package mongodbatlasreceiver // import "github.com/open-telemetry/opentelemetry-
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"
Expand All @@ -44,3 +47,23 @@ func TestBadAlertsReceiver(t *testing.T) {
require.Error(t, err)
require.ErrorContains(t, err, "unable to create a MongoDB Atlas Alerts Receiver")
}

// TestBadStorageExtension builds a combined log receiver whose config carries
// a zero-value storage component ID and an events section, then verifies that
// starting it against the nop test host returns an error containing
// "failed to get storage client".
func TestBadStorageExtension(t *testing.T) {
	ctx := context.Background()

	// Default config plus an events section and an empty storage ID.
	conf := createDefaultConfig().(*Config)
	conf.StorageID = new(component.ID)
	conf.Events = &EventsConfig{
		PollInterval: time.Minute,
		Projects:     []*ProjectConfig{{Name: testProjectName}},
	}

	// Creation is expected to succeed; the failure should only surface on Start.
	logsRcvr, createErr := createCombinedLogReceiver(ctx, receivertest.NewNopCreateSettings(), conf, consumertest.NewNop())
	require.NoError(t, createErr)

	startErr := logsRcvr.Start(ctx, componenttest.NewNopHost())
	require.ErrorContains(t, startErr, "failed to get storage client")
}

0 comments on commit 0e6db1c

Please sign in to comment.