forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
inmemory.go
109 lines (90 loc) · 3.16 KB
/
inmemory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package ackextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension"
import (
"context"
"sync/atomic"
lru "github.com/hashicorp/golang-lru/v2"
"go.opentelemetry.io/collector/component"
)
// inMemoryAckExtension is the in-memory implementation of the AckExtension.
// Ack state is kept in an LRU cache of partitions, each of which owns an LRU cache of acks:
//   - When MaxNumPartition is reached, the acks associated with the least recently used partition are evicted.
//   - When MaxNumPendingAcksPerPartition is reached, the least recently used ack is evicted.
type inMemoryAckExtension struct {
// partitionMap maps a partition ID to the ack state tracked for that partition.
partitionMap *lru.Cache[string, *ackPartition]
// maxNumPendingAcksPerPartition is the size passed to newAckPartition for every partition created by ProcessEvent.
maxNumPendingAcksPerPartition uint64
}
// newInMemoryAckExtension builds an inMemoryAckExtension from conf.
// The partition cache is sized by conf.MaxNumPartition, and
// conf.MaxNumPendingAcksPerPartition is remembered so that each partition
// created later gets an ack cache of that size.
func newInMemoryAckExtension(conf *Config) *inMemoryAckExtension {
	ext := new(inMemoryAckExtension)
	// NOTE(review): the error from lru.New is discarded. The library reports an
	// error for a non-positive size, which would leave partitionMap nil;
	// presumably conf is validated before it reaches this point - confirm.
	ext.partitionMap, _ = lru.New[string, *ackPartition](int(conf.MaxNumPartition))
	ext.maxNumPendingAcksPerPartition = conf.MaxNumPendingAcksPerPartition
	return ext
}
// ackPartition holds the ack state of a single partition: a counter used to
// generate ack IDs and an LRU cache recording the status of the IDs handed out.
type ackPartition struct {
// id is the most recently issued ack ID; nextAck increments it atomically.
id atomic.Uint64
// ackMap maps an issued ack ID to its status: false while pending, true once acknowledged.
ackMap *lru.Cache[uint64, bool]
}
// newAckPartition returns an empty partition whose ack cache holds at most
// maxPendingAcks entries. The ID counter starts at its zero value.
func newAckPartition(maxPendingAcks uint64) *ackPartition {
	partition := new(ackPartition)
	// NOTE(review): the error from lru.New is discarded; a non-positive size
	// would leave ackMap nil. Presumably the limit is validated upstream - confirm.
	partition.ackMap, _ = lru.New[uint64, bool](int(maxPendingAcks))
	return partition
}
// nextAck issues a new ack ID for this partition. The counter is advanced
// atomically and the new ID is recorded in the ack cache as pending (false)
// before it is returned.
func (as *ackPartition) nextAck() uint64 {
	issued := as.id.Add(1)
	// A freshly issued ID has not been acknowledged yet.
	as.ackMap.Add(issued, false)
	return issued
}
// ack marks key as acknowledged, but only if key is currently present in the
// ack cache. IDs that are not in the cache are ignored rather than inserted.
func (as *ackPartition) ack(key uint64) {
	_, tracked := as.ackMap.Get(key)
	if !tracked {
		return
	}
	// Overwrite the pending entry with the acknowledged status.
	as.ackMap.Add(key, true)
}
// computeAcks reports the status of every ID in ackIDs for this partition.
//
// An ID maps to true only when it is present in the ack cache and has been
// acknowledged; such an ID is removed from the cache as it is reported, so a
// later call reports it as false. IDs that are absent from the cache or still
// pending map to false and are left in place.
//
// ackIDs may contain duplicates. Once an ID has been reported as true within a
// call, later occurrences of the same ID are skipped: the entry has already
// been removed, so looking it up again would overwrite the true status with
// false and the acknowledgement would be lost to the caller.
func (as *ackPartition) computeAcks(ackIDs []uint64) map[uint64]bool {
	result := make(map[uint64]bool, len(ackIDs))
	for _, ackID := range ackIDs {
		if result[ackID] {
			// Duplicate of an ID already consumed in this call; keep it true.
			continue
		}

		isAcked, ok := as.ackMap.Get(ackID)
		if !ok || !isAcked {
			result[ackID] = false
			continue
		}

		result[ackID] = true
		// An acknowledged ID is reported exactly once, then forgotten.
		as.ackMap.Remove(ackID)
	}
	return result
}
// Start does nothing and always returns nil; the context and host arguments are ignored.
func (i *inMemoryAckExtension) Start(_ context.Context, _ component.Host) error {
return nil
}
// Shutdown does nothing and always returns nil; the context argument is ignored.
// The partition and ack caches are left as they are.
func (i *inMemoryAckExtension) Shutdown(_ context.Context) error {
return nil
}
// ProcessEvent marks the beginning of processing an event. It generates an ack ID for the associated partition ID.
//
// If the partition is not tracked yet, a new one is created and stored. The
// returned ack ID is recorded as pending in that partition until Ack is called
// for it.
func (i *inMemoryAckExtension) ProcessEvent(partitionID string) (ackID uint64) {
	// Fast path: the partition already exists. Get also refreshes its recency.
	if partition, ok := i.partitionMap.Get(partitionID); ok {
		return partition.nextAck()
	}

	// Slow path: insert a new partition unless another caller added one first.
	// PeekOrAdd hands back whichever partition is stored, so the partition is
	// never looked up a second time. A second lookup could miss if the entry
	// were evicted in between, and calling nextAck on the resulting nil
	// partition would panic.
	fresh := newAckPartition(i.maxNumPendingAcksPerPartition)
	if existing, found, _ := i.partitionMap.PeekOrAdd(partitionID, fresh); found {
		return existing.nextAck()
	}
	return fresh.nextAck()
}
// Ack acknowledges that the event identified by ackID in the given partition
// has been processed. The call is a no-op when the partition is not tracked.
func (i *inMemoryAckExtension) Ack(partitionID string, ackID uint64) {
	partition, found := i.partitionMap.Get(partitionID)
	if !found {
		return
	}
	partition.ack(ackID)
}
// QueryAcks checks the statuses of given ackIDs for a partition.
// ackIDs that are not generated from ProcessEvent or have been removed as a result of previous calls to QueryAcks will return false.
// When the partition itself is not tracked, every requested ackID is reported as false.
func (i *inMemoryAckExtension) QueryAcks(partitionID string, ackIDs []uint64) map[uint64]bool {
	partition, found := i.partitionMap.Get(partitionID)
	if found {
		return partition.computeAcks(ackIDs)
	}

	// Unknown partition: nothing can have been acknowledged.
	statuses := make(map[uint64]bool, len(ackIDs))
	for idx := range ackIDs {
		statuses[ackIDs[idx]] = false
	}
	return statuses
}