-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
persistent_queue.go
132 lines (115 loc) · 3.99 KB
/
persistent_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"
import (
"context"
"errors"
"fmt"
"sync"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/experimental/storage"
)
var (
// Monkey patching for unit test
stopStorage = func(storage *persistentContiguousStorage, ctx context.Context) error {
if storage == nil {
return nil
}
return storage.stop(ctx)
}
errNoStorageClient = errors.New("no storage client extension found")
errWrongExtensionType = errors.New("requested extension is not a storage extension")
)
// persistentQueue holds the queue backed by file storage
type persistentQueue struct {
stopWG sync.WaitGroup
stopChan chan struct{}
storageID component.ID
storage *persistentContiguousStorage
capacity uint64
numConsumers int
marshaler RequestMarshaler
unmarshaler RequestUnmarshaler
}
// buildPersistentStorageName returns a name that is constructed out of queue name and signal type. This is done
// to avoid conflicts between different signals, which require unique persistent storage name
func buildPersistentStorageName(name string, signal component.DataType) string {
return fmt.Sprintf("%s-%s", name, signal)
}
// NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID, marshaler RequestMarshaler,
unmarshaler RequestUnmarshaler) Queue {
return &persistentQueue{
capacity: uint64(capacity),
numConsumers: numConsumers,
storageID: storageID,
marshaler: marshaler,
unmarshaler: unmarshaler,
stopChan: make(chan struct{}),
}
}
// Start starts the persistentQueue with the given number of consumers.
func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set QueueSettings) error {
storageClient, err := toStorageClient(ctx, pq.storageID, host, set.ID, set.DataType)
if err != nil {
return err
}
storageName := buildPersistentStorageName(set.ID.Name(), set.DataType)
pq.storage = newPersistentContiguousStorage(ctx, storageName, storageClient, set.Logger, pq.capacity, pq.marshaler, pq.unmarshaler)
for i := 0; i < pq.numConsumers; i++ {
pq.stopWG.Add(1)
go func() {
defer pq.stopWG.Done()
for {
select {
case req := <-pq.storage.get():
set.Callback(req)
case <-pq.stopChan:
return
}
}
}()
}
return nil
}
// Produce adds an item to the queue and returns true if it was accepted
func (pq *persistentQueue) Produce(item Request) bool {
err := pq.storage.put(item)
return err == nil
}
// Shutdown stops accepting items, shuts down the queue and closes the persistent queue
func (pq *persistentQueue) Shutdown(ctx context.Context) error {
close(pq.stopChan)
pq.stopWG.Wait()
return stopStorage(pq.storage, ctx)
}
// Size returns the current depth of the queue, excluding the item already in the storage channel (if any)
func (pq *persistentQueue) Size() int {
return int(pq.storage.size())
}
func (pq *persistentQueue) Capacity() int {
return int(pq.capacity)
}
func (pq *persistentQueue) IsPersistent() bool {
return true
}
func toStorageClient(ctx context.Context, storageID component.ID, host component.Host, ownerID component.ID, signal component.DataType) (storage.Client, error) {
extension, err := getStorageExtension(host.GetExtensions(), storageID)
if err != nil {
return nil, err
}
client, err := extension.GetClient(ctx, component.KindExporter, ownerID, string(signal))
if err != nil {
return nil, err
}
return client, err
}
func getStorageExtension(extensions map[component.ID]component.Component, storageID component.ID) (storage.Extension, error) {
if ext, found := extensions[storageID]; found {
if storageExt, ok := ext.(storage.Extension); ok {
return storageExt, nil
}
return nil, errWrongExtensionType
}
return nil, errNoStorageClient
}