Skip to content

Commit

Permalink
[chore] ensure all components tested under lifecycle don't panic if s…
Browse files Browse the repository at this point in the history
…hutdown is called first (open-telemetry#17199)

* ensure all components tested under lifecycle don't panic if shutdown is called first

* Update internal/components/extensions_test.go

Co-authored-by: Sean Marciniak <[email protected]>

* address code review

Co-authored-by: Sean Marciniak <[email protected]>
  • Loading branch information
atoulme and MovieStoreGuy committed Dec 27, 2022
1 parent 67b059e commit add0772
Show file tree
Hide file tree
Showing 20 changed files with 150 additions and 2 deletions.
3 changes: 3 additions & 0 deletions extension/fluentbitextension/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func (pm *processManager) Start(ctx context.Context, _ component.Host) error {

// Shutdown is invoked during service shutdown.
func (pm *processManager) Shutdown(context.Context) error {
if pm.cancel == nil {
return nil
}
pm.cancel()
t := time.NewTimer(5 * time.Second)

Expand Down
3 changes: 3 additions & 0 deletions extension/httpforwarder/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func (h *httpForwarder) Start(_ context.Context, host component.Host) error {
}

func (h *httpForwarder) Shutdown(_ context.Context) error {
if h.server == nil {
return nil
}
return h.server.Close()
}

Expand Down
3 changes: 3 additions & 0 deletions extension/storage/dbstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func (ds *databaseStorage) Start(context.Context, component.Host) error {

// Shutdown closes the connection to the database
func (ds *databaseStorage) Shutdown(context.Context) error {
if ds.db == nil {
return nil
}
return ds.db.Close()
}

Expand Down
27 changes: 27 additions & 0 deletions internal/components/exporters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ func TestDefaultExporters(t *testing.T) {
}

verifyExporterLifecycle(t, factory, tt.getConfigFn)
verifyExporterShutdown(t, factory, tt.getConfigFn)
})
}
}
Expand Down Expand Up @@ -508,6 +509,32 @@ func verifyExporterLifecycle(t *testing.T, factory exporter.Factory, getConfigFn
}
}

// verifyExporterShutdown is used to test if an exporter type can be shutdown without being started first.
func verifyExporterShutdown(tb testing.TB, factory exporter.Factory, getConfigFn getExporterConfigFn) {
ctx := context.Background()
expCreateSettings := exportertest.NewNopCreateSettings()

if getConfigFn == nil {
getConfigFn = factory.CreateDefaultConfig
}

createFns := []createExporterFn{
wrapCreateLogsExp(factory),
wrapCreateTracesExp(factory),
wrapCreateMetricsExp(factory),
}

for _, createFn := range createFns {
r, err := createFn(ctx, expCreateSettings, getConfigFn())
if errors.Is(err, component.ErrDataTypeIsNotSupported) {
continue
}
assert.NotPanics(tb, func() {
assert.NoError(tb, r.Shutdown(ctx))
})
}
}

type createExporterFn func(
ctx context.Context,
set exporter.CreateSettings,
Expand Down
17 changes: 17 additions & 0 deletions internal/components/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func TestDefaultExtensions(t *testing.T) {
}

verifyExtensionLifecycle(t, factory, tt.getConfigFn)
verifyExtensionShutdown(t, factory, tt.getConfigFn)
})
}
}
Expand Down Expand Up @@ -267,6 +268,22 @@ func verifyExtensionLifecycle(t *testing.T, factory extension.Factory, getConfig
require.NoError(t, secondExt.Shutdown(ctx))
}

// verifyExtensionShutdown is used to test if an extension type can be shutdown without being started first.
func verifyExtensionShutdown(tb testing.TB, factory extension.Factory, getConfigFn getExtensionConfigFn) {
ctx := context.Background()
extCreateSet := extensiontest.NewNopCreateSettings()

if getConfigFn == nil {
getConfigFn = factory.CreateDefaultConfig
}

e, _ := factory.CreateExtension(ctx, extCreateSet, getConfigFn())

assert.NotPanics(tb, func() {
assert.NoError(tb, e.Shutdown(ctx))
})
}

// assertNoErrorHost implements a component.Host that asserts that there were no errors.
type assertNoErrorHost struct {
component.Host
Expand Down
30 changes: 29 additions & 1 deletion internal/components/processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func TestDefaultProcessors(t *testing.T) {
return
}
verifyProcessorLifecycle(t, factory, tt.getConfigFn)
verifyProcessorShutdown(t, factory, tt.getConfigFn)
})
}
}
Expand All @@ -165,7 +166,7 @@ func TestDefaultProcessors(t *testing.T) {
// default configuration.
type getProcessorConfigFn func() component.Config

// verifyProcessorLifecycle is used to test if an processor type can handle the typical
// verifyProcessorLifecycle is used to test if a processor type can handle the typical
// lifecycle of a component. The getConfigFn parameter only need to be specified if
// the test can't be done with the default configuration for the component.
func verifyProcessorLifecycle(t *testing.T, factory processor.Factory, getConfigFn getProcessorConfigFn) {
Expand Down Expand Up @@ -199,6 +200,33 @@ func verifyProcessorLifecycle(t *testing.T, factory processor.Factory, getConfig
}
}

// verifyProcessorShutdown is used to test if a processor type can be shutdown without being started first.
// We disregard errors being returned by shutdown, we're just making sure the processors don't panic.
func verifyProcessorShutdown(tb testing.TB, factory processor.Factory, getConfigFn getProcessorConfigFn) {
ctx := context.Background()
processorCreationSet := processortest.NewNopCreateSettings()

if getConfigFn == nil {
getConfigFn = factory.CreateDefaultConfig
}

createFns := []createProcessorFn{
wrapCreateLogsProc(factory),
wrapCreateTracesProc(factory),
wrapCreateMetricsProc(factory),
}

for _, createFn := range createFns {
p, err := createFn(ctx, processorCreationSet, getConfigFn())
if errors.Is(err, component.ErrDataTypeIsNotSupported) {
continue
}
assert.NotPanics(tb, func() {
_ = p.Shutdown(ctx)
})
}
}

type createProcessorFn func(
ctx context.Context,
set processor.CreateSettings,
Expand Down
27 changes: 27 additions & 0 deletions internal/components/receivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ func TestDefaultReceivers(t *testing.T) {
}

verifyReceiverLifecycle(t, factory, tt.getConfigFn)
verifyReceiverShutdown(t, factory, tt.getConfigFn)
})
}
}
Expand Down Expand Up @@ -480,6 +481,32 @@ func verifyReceiverLifecycle(t *testing.T, factory receiver.Factory, getConfigFn
}
}

// verifyReceiverShutdown is used to test if a receiver type can be shutdown without being started first.
func verifyReceiverShutdown(tb testing.TB, factory receiver.Factory, getConfigFn getReceiverConfigFn) {
ctx := context.Background()
receiverCreateSet := receivertest.NewNopCreateSettings()

if getConfigFn == nil {
getConfigFn = factory.CreateDefaultConfig
}

createFns := []createReceiverFn{
wrapCreateLogsRcvr(factory),
wrapCreateTracesRcvr(factory),
wrapCreateMetricsRcvr(factory),
}

for _, createFn := range createFns {
r, err := createFn(ctx, receiverCreateSet, getConfigFn())
if errors.Is(err, component.ErrDataTypeIsNotSupported) {
continue
}
assert.NotPanics(tb, func() {
assert.NoError(tb, r.Shutdown(ctx))
})
}
}

// assertNoErrorHost implements a component.Host that asserts that there were no errors.
type createReceiverFn func(
ctx context.Context,
Expand Down
3 changes: 3 additions & 0 deletions internal/coreinternal/timeutils/ticker_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func (pt *PolicyTicker) OnTick() {
}

func (pt *PolicyTicker) Stop() {
if pt.StopCh == nil {
return
}
close(pt.StopCh)
pt.Ticker.Stop()
}
4 changes: 4 additions & 0 deletions pkg/stanza/adapter/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ func (r *receiver) consumerLoop(ctx context.Context) {

// Shutdown is invoked during service shutdown
func (r *receiver) Shutdown(ctx context.Context) error {
if r.cancel == nil {
return nil
}

r.logger.Info("Stopping stanza receiver")
pipelineErr := r.pipe.Stop()
r.converter.Stop()
Expand Down
3 changes: 3 additions & 0 deletions pkg/stanza/operator/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ func (t *Input) goHandleMessages(ctx context.Context, conn net.Conn, cancel cont

// Stop will stop listening for log entries over TCP.
func (t *Input) Stop() error {
if t.cancel == nil {
return nil
}
t.cancel()

if t.listener != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/stanza/operator/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ func (u *Input) readMessage() ([]byte, net.Addr, error) {

// Stop will stop listening for udp messages.
func (u *Input) Stop() error {
if u.cancel == nil {
return nil
}
u.cancel()
if u.connection != nil {
if err := u.connection.Close(); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions receiver/awsfirehosereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ func (fmr *firehoseReceiver) Start(_ context.Context, host component.Host) error
// giving it a chance to perform any necessary clean-up and
// shutting down its HTTP server.
func (fmr *firehoseReceiver) Shutdown(context.Context) error {
if fmr.server == nil {
return nil
}
err := fmr.server.Close()
fmr.shutdownWG.Wait()
return err
Expand Down
3 changes: 3 additions & 0 deletions receiver/fluentforwardreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ func (r *fluentReceiver) Start(ctx context.Context, _ component.Host) error {
}

func (r *fluentReceiver) Shutdown(context.Context) error {
if r.listener == nil {
return nil
}
r.listener.Close()
r.cancel()
return nil
Expand Down
3 changes: 3 additions & 0 deletions receiver/googlecloudspannerreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ func (r *googleCloudSpannerReceiver) Shutdown(context.Context) error {
projectReader.Shutdown()
}

if r.metricsBuilder == nil {
return nil
}
err := r.metricsBuilder.Shutdown()
if err != nil {
return err
Expand Down
5 changes: 4 additions & 1 deletion receiver/influxdbreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ func (r *metricsReceiver) Start(_ context.Context, host component.Host) error {
return nil
}

func (r *metricsReceiver) Shutdown(ctx context.Context) error {
func (r *metricsReceiver) Shutdown(_ context.Context) error {
if r.server == nil {
return nil
}
if err := r.server.Close(); err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions receiver/receivercreator/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,8 @@ func (rc *receiverCreator) Shutdown(context.Context) error {
for _, observable := range rc.observables {
observable.Unsubscribe(rc.observerHandler)
}
if rc.observerHandler == nil {
return nil
}
return rc.observerHandler.shutdown()
}
3 changes: 3 additions & 0 deletions receiver/sapmreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ func (sr *sapmReceiver) Start(_ context.Context, host component.Host) error {

// Shutdown stops the the sapmReceiver's server.
func (sr *sapmReceiver) Shutdown(context.Context) error {
if sr.server == nil {
return nil
}
err := sr.server.Close()
sr.shutdownWG.Wait()
return err
Expand Down
3 changes: 3 additions & 0 deletions receiver/signalfxreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ func (r *sfxReceiver) Start(_ context.Context, host component.Host) error {
// Shutdown tells the receiver that should stop reception,
// giving it a chance to perform any necessary clean-up.
func (r *sfxReceiver) Shutdown(context.Context) error {
if r.server == nil {
return nil
}
err := r.server.Close()
r.shutdownWG.Wait()
return err
Expand Down
3 changes: 3 additions & 0 deletions receiver/simpleprometheusreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,5 +148,8 @@ func getPrometheusConfig(cfg *Config) (*prometheusreceiver.Config, error) {

// Shutdown stops the underlying Prometheus receiver.
func (prw *prometheusReceiverWrapper) Shutdown(ctx context.Context) error {
if prw.prometheusRecever == nil {
return nil
}
return prw.prometheusRecever.Shutdown(ctx)
}
3 changes: 3 additions & 0 deletions receiver/statsdreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ func (r *statsdReceiver) Start(ctx context.Context, host component.Host) error {

// Shutdown stops the StatsD receiver.
func (r *statsdReceiver) Shutdown(context.Context) error {
if r.cancel == nil {
return nil
}
err := r.server.Close()
r.cancel()
return err
Expand Down

0 comments on commit add0772

Please sign in to comment.