Skip to content

Commit

Permalink
[chore] polish sharedcomponent API (#9157)
Browse files Browse the repository at this point in the history
Attempt to simplify as much as possible the API exposed by the
sharedcomponent package ahead of exposing it as part of a published
module.

Relates to #9156 

Changes:
* Remove the map field in the struct, make the struct a map
* Remove the `NewSharedComponents` function, just initialize the map
instead.
* Rename GetOrAdd to LoadOrStore
  • Loading branch information
atoulme committed Dec 21, 2023
1 parent a1214d1 commit 1c84578
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 70 deletions.
81 changes: 40 additions & 41 deletions internal/sharedcomponent/sharedcomponent.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package sharedcomponent exposes util functionality for receivers and exporters
// that need to share state between different signal types instances such as net.Listener or os.File.
// Package sharedcomponent exposes functionality for components
// to register against a shared key, such as a configuration object, in order to be reused across signal types.
// This is particularly useful when the component relies on a shared resource such as os.File or http.Server.
package sharedcomponent // import "go.opentelemetry.io/collector/internal/sharedcomponent"

import (
Expand All @@ -12,23 +13,21 @@ import (
"go.opentelemetry.io/collector/component"
)

// SharedComponents a map that keeps reference of all created instances for a given configuration,
// and ensures that the shared state is started and stopped only once.
type SharedComponents[K comparable, V component.Component] struct {
comps map[K]*SharedComponent[V]
func NewMap[K comparable, V component.Component]() *Map[K, V] {
return &Map[K, V]{
components: map[K]*Component[V]{},
}
}

// NewSharedComponents returns a new empty SharedComponents.
func NewSharedComponents[K comparable, V component.Component]() *SharedComponents[K, V] {
return &SharedComponents[K, V]{
comps: make(map[K]*SharedComponent[V]),
}
// Map keeps reference of all created instances for a given shared key such as a component configuration.
type Map[K comparable, V component.Component] struct {
components map[K]*Component[V]
}

// GetOrAdd returns the already created instance if exists, otherwise creates a new instance
// LoadOrStore returns the already created instance if exists, otherwise creates a new instance
// and adds it to the map of references.
func (scs *SharedComponents[K, V]) GetOrAdd(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*SharedComponent[V], error) {
if c, ok := scs.comps[key]; ok {
func (m *Map[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*Component[V], error) {
if c, ok := m.components[key]; ok {
// If we haven't already seen this telemetry settings, this shared component represents
// another instance. Wrap ReportComponentStatus to report for all instances this shared
// component represents.
Expand All @@ -50,23 +49,23 @@ func (scs *SharedComponents[K, V]) GetOrAdd(key K, create func() (V, error), tel
return nil, err
}

newComp := &SharedComponent[V]{
newComp := &Component[V]{
component: comp,
removeFunc: func() {
delete(scs.comps, key)
delete(m.components, key)
},
telemetry: telemetrySettings,
seenSettings: map[*component.TelemetrySettings]struct{}{
telemetrySettings: {},
},
}
scs.comps[key] = newComp
m.components[key] = newComp
return newComp, nil
}

// SharedComponent ensures that the wrapped component is started and stopped only once.
// When stopped it is removed from the SharedComponents map.
type SharedComponent[V component.Component] struct {
// Component ensures that the wrapped component is started and stopped only once.
// When stopped it is removed from the Map.
type Component[V component.Component] struct {
component V

startOnce sync.Once
Expand All @@ -78,42 +77,42 @@ type SharedComponent[V component.Component] struct {
}

// Unwrap returns the original component.
func (r *SharedComponent[V]) Unwrap() V {
return r.component
func (c *Component[V]) Unwrap() V {
return c.component
}

// Start implements component.Component.
func (r *SharedComponent[V]) Start(ctx context.Context, host component.Host) error {
// Start starts the underlying component if it never started before.
func (c *Component[V]) Start(ctx context.Context, host component.Host) error {
var err error
r.startOnce.Do(func() {
// It's important that status for a sharedcomponent is reported through its
// telemetrysettings to keep status in sync and avoid race conditions. This logic duplicates
c.startOnce.Do(func() {
// It's important that status for a shared component is reported through its
// telemetry settings to keep status in sync and avoid race conditions. This logic duplicates
// and takes priority over the automated status reporting that happens in graph, making the
// status reporting in graph a no-op.
_ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting))
if err = r.component.Start(ctx, host); err != nil {
_ = r.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err))
_ = c.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting))
if err = c.component.Start(ctx, host); err != nil {
_ = c.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err))
}
})
return err
}

// Shutdown implements component.Component.
func (r *SharedComponent[V]) Shutdown(ctx context.Context) error {
// Shutdown shuts down the underlying component.
func (c *Component[V]) Shutdown(ctx context.Context) error {
var err error
r.stopOnce.Do(func() {
// It's important that status for a sharedcomponent is reported through its
// telemetrysettings to keep status in sync and avoid race conditions. This logic duplicates
c.stopOnce.Do(func() {
// It's important that status for a shared component is reported through its
// telemetry settings to keep status in sync and avoid race conditions. This logic duplicates
// and takes priority over the automated status reporting that happens in graph, making the
// the status reporting in graph a no-op.
_ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping))
err = r.component.Shutdown(ctx)
// status reporting in graph a no-op.
_ = c.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping))
err = c.component.Shutdown(ctx)
if err != nil {
_ = r.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err))
_ = c.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err))
} else {
_ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopped))
_ = c.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopped))
}
r.removeFunc()
c.removeFunc()
})
return err
}
50 changes: 25 additions & 25 deletions internal/sharedcomponent/sharedcomponent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,37 @@ type baseComponent struct {
telemetry *component.TelemetrySettings
}

func TestNewSharedComponents(t *testing.T) {
comps := NewSharedComponents[component.ID, *baseComponent]()
assert.Len(t, comps.comps, 0)
func TestNewMap(t *testing.T) {
comps := NewMap[component.ID, *baseComponent]()
assert.Len(t, comps.components, 0)
}

func TestNewSharedComponentsCreateError(t *testing.T) {
comps := NewSharedComponents[component.ID, *baseComponent]()
assert.Len(t, comps.comps, 0)
comps := NewMap[component.ID, *baseComponent]()
assert.Len(t, comps.components, 0)
myErr := errors.New("my error")
_, err := comps.GetOrAdd(
_, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { return nil, myErr },
newNopTelemetrySettings(),
)
assert.ErrorIs(t, err, myErr)
assert.Len(t, comps.comps, 0)
assert.Len(t, comps.components, 0)
}

func TestSharedComponentsGetOrAdd(t *testing.T) {
func TestSharedComponentsLoadOrStore(t *testing.T) {
nop := &baseComponent{}

comps := NewSharedComponents[component.ID, *baseComponent]()
got, err := comps.GetOrAdd(
comps := NewMap[component.ID, *baseComponent]()
got, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { return nop, nil },
newNopTelemetrySettings(),
)
require.NoError(t, err)
assert.Len(t, comps.comps, 1)
assert.Len(t, comps.components, 1)
assert.Same(t, nop, got.Unwrap())
gotSecond, err := comps.GetOrAdd(
gotSecond, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { panic("should not be called") },
newNopTelemetrySettings(),
Expand All @@ -64,8 +64,8 @@ func TestSharedComponentsGetOrAdd(t *testing.T) {

// Shutdown nop will remove
assert.NoError(t, got.Shutdown(context.Background()))
assert.Len(t, comps.comps, 0)
gotThird, err := comps.GetOrAdd(
assert.Len(t, comps.components, 0)
gotThird, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { return nop, nil },
newNopTelemetrySettings(),
Expand All @@ -88,8 +88,8 @@ func TestSharedComponent(t *testing.T) {
return wantErr
}}

comps := NewSharedComponents[component.ID, *baseComponent]()
got, err := comps.GetOrAdd(
comps := NewMap[component.ID, *baseComponent]()
got, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { return comp, nil },
newNopTelemetrySettings(),
Expand All @@ -100,13 +100,13 @@ func TestSharedComponent(t *testing.T) {
// Second time is not called anymore.
assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, 1, calledStart)
// first time, shutdown is called.
assert.Equal(t, wantErr, got.Shutdown(context.Background()))
assert.Equal(t, 1, calledStop)
// Second time is not called anymore.
assert.NoError(t, got.Shutdown(context.Background()))
assert.Equal(t, 1, calledStop)
}

func TestSharedComponentsReportStatus(t *testing.T) {
reportedStatuses := make(map[*component.InstanceID][]component.Status)
newStatusFunc := func() func(*component.StatusEvent) error {
Expand All @@ -122,31 +122,31 @@ func TestSharedComponentsReportStatus(t *testing.T) {
}

comp := &baseComponent{}
comps := NewSharedComponents[component.ID, *baseComponent]()
comps := NewMap[component.ID, *baseComponent]()
var telemetrySettings *component.TelemetrySettings

// make a shared component that represents three instances
for i := 0; i < 3; i++ {
telemetrySettings = newNopTelemetrySettings()
telemetrySettings.ReportComponentStatus = newStatusFunc()
// The initial settings for the shared component need to match the ones passed to the first
// invocation of GetOrAdd so that underlying telemetry settings reference can be used to
// invocation of LoadOrStore so that underlying telemetry settings reference can be used to
// wrap ReportComponentStatus for subsequently added "instances".
if i == 0 {
comp.telemetry = telemetrySettings
}
got, err := comps.GetOrAdd(
got, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { return comp, nil },
telemetrySettings,
)
require.NoError(t, err)
assert.Len(t, comps.comps, 1)
assert.Len(t, comps.components, 1)
assert.Same(t, comp, got.Unwrap())
}

// make sure we don't try to represent a fourth instance if we reuse a telemetrySettings
_, _ = comps.GetOrAdd(
_, _ = comps.LoadOrStore(
id,
func() (*baseComponent, error) { return comp, nil },
telemetrySettings,
Expand Down Expand Up @@ -245,16 +245,16 @@ func TestReportStatusOnStartShutdown(t *testing.T) {
return tc.shutdownErr
}
}
comps := NewSharedComponents[component.ID, *baseComponent]()
var comp *SharedComponent[*baseComponent]
comps := NewMap[component.ID, *baseComponent]()
var comp *Component[*baseComponent]
var err error
for i := 0; i < 3; i++ {
telemetrySettings := newNopTelemetrySettings()
telemetrySettings.ReportComponentStatus = newStatusFunc()
if i == 0 {
base.telemetry = telemetrySettings
}
comp, err = comps.GetOrAdd(
comp, err = comps.LoadOrStore(
id,
func() (*baseComponent, error) { return base, nil },
telemetrySettings,
Expand Down
8 changes: 4 additions & 4 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func createTraces(
nextConsumer consumer.Traces,
) (receiver.Traces, error) {
oCfg := cfg.(*Config)
r, err := receivers.GetOrAdd(
r, err := receivers.LoadOrStore(
oCfg,
func() (*otlpReceiver, error) {
return newOtlpReceiver(oCfg, &set)
Expand All @@ -93,7 +93,7 @@ func createMetrics(
consumer consumer.Metrics,
) (receiver.Metrics, error) {
oCfg := cfg.(*Config)
r, err := receivers.GetOrAdd(
r, err := receivers.LoadOrStore(
oCfg,
func() (*otlpReceiver, error) {
return newOtlpReceiver(oCfg, &set)
Expand All @@ -118,7 +118,7 @@ func createLog(
consumer consumer.Logs,
) (receiver.Logs, error) {
oCfg := cfg.(*Config)
r, err := receivers.GetOrAdd(
r, err := receivers.LoadOrStore(
oCfg,
func() (*otlpReceiver, error) {
return newOtlpReceiver(oCfg, &set)
Expand All @@ -141,4 +141,4 @@ func createLog(
// create separate objects, they must use one otlpReceiver object per configuration.
// When the receiver is shutdown it should be removed from this map so the same configuration
// can be recreated successfully.
var receivers = sharedcomponent.NewSharedComponents[*Config, *otlpReceiver]()
var receivers = sharedcomponent.NewMap[*Config, *otlpReceiver]()

0 comments on commit 1c84578

Please sign in to comment.