Skip to content

Commit

Permalink
Rename ProducerConsumerQueue to Queue (#8838)
Browse files Browse the repository at this point in the history
* Rename Stop to Shutdown, add context and return error.
* Fix test nits.
* Return errors from Shutdown instead of log them.

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Nov 10, 2023
1 parent 36a89bc commit fd79d43
Show file tree
Hide file tree
Showing 13 changed files with 163 additions and 232 deletions.
15 changes: 9 additions & 6 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte
import (
"context"

"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
Expand All @@ -17,7 +18,7 @@ import (
// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
type requestSender interface {
start(ctx context.Context, host component.Host, set exporter.CreateSettings) error
shutdown()
shutdown(ctx context.Context) error
send(req internal.Request) error
setNextSender(nextSender requestSender)
}
Expand All @@ -32,7 +33,9 @@ func (b *baseRequestSender) start(context.Context, component.Host, exporter.Crea
return nil
}

func (b *baseRequestSender) shutdown() {}
func (b *baseRequestSender) shutdown(context.Context) error {
return nil
}

func (b *baseRequestSender) send(req internal.Request) error {
return b.nextSender.send(req)
Expand Down Expand Up @@ -127,7 +130,7 @@ func WithQueue(config QueueSettings) Option {
if o.requestExporter {
panic("queueing is not available for the new request exporters yet")
}
var queue internal.ProducerConsumerQueue
var queue internal.Queue
if config.Enabled {
if config.StorageID == nil {
queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
Expand Down Expand Up @@ -233,13 +236,13 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error {

func (be *baseExporter) Shutdown(ctx context.Context) error {
// First shutdown the retry sender, so it can push any pending requests to back the queue.
be.retrySender.shutdown()
err := be.retrySender.shutdown(ctx)

// Then shutdown the queue sender.
be.queueSender.shutdown()
err = multierr.Append(err, be.queueSender.shutdown(ctx))

// Last shutdown the wrapped exporter itself.
return be.ShutdownFunc.Shutdown(ctx)
return multierr.Append(err, be.ShutdownFunc.Shutdown(ctx))
}

func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) {
Expand Down
8 changes: 4 additions & 4 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type boundedMemoryQueue struct {
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity. Capacity cannot be 0.
func NewBoundedMemoryQueue(capacity int, numConsumers int) ProducerConsumerQueue {
func NewBoundedMemoryQueue(capacity int, numConsumers int) Queue {
return &boundedMemoryQueue{
items: make(chan Request, capacity),
stopped: &atomic.Bool{},
Expand Down Expand Up @@ -65,12 +65,12 @@ func (q *boundedMemoryQueue) Produce(item Request) bool {
}
}

// Stop stops all consumers, as well as the length reporter if started,
// and releases the items channel. It blocks until all consumers have stopped.
func (q *boundedMemoryQueue) Stop() {
// Shutdown stops accepting items, and stops all consumers. It blocks until all consumers have stopped.
func (q *boundedMemoryQueue) Shutdown(context.Context) error {
q.stopped.Store(true) // disable producer
close(q.items)
q.stopWG.Wait()
return nil
}

// Size returns the current size of the queue
Expand Down
10 changes: 5 additions & 5 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestBoundedQueue(t *testing.T) {
consumerState.assertConsumed(expected)
}

q.Stop()
assert.NoError(t, q.Shutdown(context.Background()))
assert.False(t, q.Produce(newStringRequest("x")), "cannot push to closed queue")
}

Expand Down Expand Up @@ -127,7 +127,7 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
q.Produce(newStringRequest("i"))
q.Produce(newStringRequest("j"))

q.Stop()
assert.NoError(t, q.Shutdown(context.Background()))

assert.False(t, q.Produce(newStringRequest("x")), "cannot push to closed queue")
consumerState.assertConsumed(map[string]bool{
Expand Down Expand Up @@ -198,7 +198,7 @@ func queueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int)
for j := 0; j < numberOfItems; j++ {
q.Produce(newStringRequest(fmt.Sprintf("%d", j)))
}
q.Stop()
assert.NoError(b, q.Shutdown(context.Background()))
}
}

Expand Down Expand Up @@ -255,7 +255,7 @@ func TestZeroSizeWithConsumers(t *testing.T) {

assert.True(t, q.Produce(newStringRequest("a"))) // in process

q.Stop()
assert.NoError(t, q.Shutdown(context.Background()))
}

func TestZeroSizeNoConsumers(t *testing.T) {
Expand All @@ -266,5 +266,5 @@ func TestZeroSizeNoConsumers(t *testing.T) {

assert.False(t, q.Produce(newStringRequest("a"))) // in process

q.Stop()
assert.NoError(t, q.Shutdown(context.Background()))
}
38 changes: 15 additions & 23 deletions exporter/exporterhelper/internal/mock_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,42 @@ import (
type mockStorageExtension struct {
component.StartFunc
component.ShutdownFunc
st sync.Map
getClientError error
}

func (m mockStorageExtension) GetClient(_ context.Context, _ component.Kind, _ component.ID, _ string) (storage.Client, error) {
func (m *mockStorageExtension) GetClient(_ context.Context, _ component.Kind, _ component.ID, _ string) (storage.Client, error) {
if m.getClientError != nil {
return nil, m.getClientError
}
return &mockStorageClient{st: map[string][]byte{}}, nil
return &mockStorageClient{st: &m.st}, nil
}

func NewMockStorageExtension(getClientError error) storage.Extension {
return &mockStorageExtension{getClientError: getClientError}
}

type mockStorageClient struct {
st map[string][]byte
mux sync.Mutex
st *sync.Map
closeCounter uint64
}

func (m *mockStorageClient) Get(_ context.Context, s string) ([]byte, error) {
m.mux.Lock()
defer m.mux.Unlock()

val, found := m.st[s]
val, found := m.st.Load(s)
if !found {
return nil, nil
}

return val, nil
return val.([]byte), nil
}

func (m *mockStorageClient) Set(_ context.Context, s string, bytes []byte) error {
m.mux.Lock()
defer m.mux.Unlock()

m.st[s] = bytes
m.st.Store(s, bytes)
return nil
}

func (m *mockStorageClient) Delete(_ context.Context, s string) error {
m.mux.Lock()
defer m.mux.Unlock()

delete(m.st, s)
m.st.Delete(s)
return nil
}

Expand All @@ -69,17 +60,18 @@ func (m *mockStorageClient) Close(_ context.Context) error {
}

func (m *mockStorageClient) Batch(_ context.Context, ops ...storage.Operation) error {
m.mux.Lock()
defer m.mux.Unlock()

for _, op := range ops {
switch op.Type {
case storage.Get:
op.Value = m.st[op.Key]
val, found := m.st.Load(op.Key)
if !found {
break
}
op.Value = val.([]byte)
case storage.Set:
m.st[op.Key] = op.Value
m.st.Store(op.Key, op.Value)
case storage.Delete:
delete(m.st, op.Key)
m.st.Delete(op.Key)
default:
return errors.New("wrong operation type")
}
Expand Down
23 changes: 10 additions & 13 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import (

var (
// Monkey patching for unit test
stopStorage = func(queue *persistentQueue) {
if queue.storage != nil {
queue.storage.stop()
stopStorage = func(storage *persistentContiguousStorage, ctx context.Context) error {
if storage == nil {
return nil
}
return storage.stop(ctx)
}
errNoStorageClient = errors.New("no storage client extension found")
errWrongExtensionType = errors.New("requested extension is not a storage extension")
Expand All @@ -27,7 +28,6 @@ var (
// persistentQueue holds the queue backed by file storage
type persistentQueue struct {
stopWG sync.WaitGroup
stopOnce sync.Once
stopChan chan struct{}
storageID component.ID
storage *persistentContiguousStorage
Expand All @@ -45,7 +45,7 @@ func buildPersistentStorageName(name string, signal component.DataType) string {

// NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID, marshaler RequestMarshaler,
unmarshaler RequestUnmarshaler) ProducerConsumerQueue {
unmarshaler RequestUnmarshaler) Queue {
return &persistentQueue{
capacity: uint64(capacity),
numConsumers: numConsumers,
Expand Down Expand Up @@ -87,14 +87,11 @@ func (pq *persistentQueue) Produce(item Request) bool {
return err == nil
}

// Stop stops accepting items, shuts down the queue and closes the persistent queue
func (pq *persistentQueue) Stop() {
pq.stopOnce.Do(func() {
// stop the consumers before the storage or the successful processing result will fail to write to persistent storage
close(pq.stopChan)
pq.stopWG.Wait()
stopStorage(pq)
})
// Shutdown stops accepting items, shuts down the queue and closes the persistent queue
func (pq *persistentQueue) Shutdown(ctx context.Context) error {
close(pq.stopChan)
pq.stopWG.Wait()
return stopStorage(pq.storage, ctx)
}

// Size returns the current depth of the queue, excluding the item already in the storage channel (if any)
Expand Down
Loading

0 comments on commit fd79d43

Please sign in to comment.