forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
provider.go
122 lines (97 loc) · 3.16 KB
/
provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package provider contains source providers and helpers for chaining and caching them.
package provider // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/hostmetadata/provider"
import (
"context"
"fmt"
"sync"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"go.uber.org/zap"
)
// Compile-time check that *chainProvider implements source.Provider.
var _ source.Provider = (*chainProvider)(nil)
// chainProvider is a source.Provider that combines several named providers
// and consults them according to a priority order.
type chainProvider struct {
// logger receives the debug and info messages emitted while resolving a source.
logger *zap.Logger
// providers maps a provider name to its implementation.
providers map[string]source.Provider
// priorityList holds provider names (keys of providers), highest priority first.
priorityList []string
}
// Source queries every provider named in the priority list concurrently and
// returns the source reported by the highest-priority provider that succeeded.
//
// Replies are consumed in priority order, so a lower-priority provider is only
// used once all providers ahead of it have failed. The context passed to the
// providers is cancelled as soon as Source returns.
//
// An error is returned when no provider succeeds, which includes the case of
// an empty priority list.
func (p *chainProvider) Source(ctx context.Context) (source.Source, error) {
	// Auxiliary type for storing source provider replies.
	type reply struct {
		src source.Source
		err error
	}

	// Cancel all providers when exiting.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Run all providers in parallel.
	replies := make([]chan reply, len(p.priorityList))
	for i, name := range p.priorityList {
		// Each channel carries exactly one reply. The buffer of one lets the
		// goroutine deliver it and exit even when nobody reads it anymore
		// (Source returns as soon as a higher-priority provider succeeds);
		// with an unbuffered channel those goroutines would block forever.
		ch := make(chan reply, 1)
		replies[i] = ch

		go func(name string, provider source.Provider, out chan<- reply) {
			zapProvider := zap.String("provider", name)
			p.logger.Debug("Trying out source provider", zapProvider)

			src, err := provider.Source(ctx)
			if err != nil {
				p.logger.Debug("Unavailable source provider", zapProvider, zap.Error(err))
			}
			out <- reply{src: src, err: err}
		}(name, p.providers[name], ch)
	}

	// Check provider responses in order to ensure priority.
	for i, ch := range replies {
		r := <-ch
		if r.err != nil {
			// Provider was unavailable, error was logged on goroutine.
			continue
		}

		p.logger.Info("Resolved source",
			zap.String("provider", p.priorityList[i]), zap.Any("source", r.src),
		)
		return r.src, nil
	}

	return source.Source{}, fmt.Errorf("no source provider was available")
}
// Chain combines the given providers into a single source.Provider that
// resolves the source from the first provider, in priorityList order, that is
// available.
//
// Every name in priorityList must be a key of providers; otherwise an error
// naming the first missing entry is returned and no provider is built.
func Chain(logger *zap.Logger, providers map[string]source.Provider, priorityList []string) (source.Provider, error) {
	for idx := range priorityList {
		name := priorityList[idx]
		_, found := providers[name]
		if found {
			continue
		}
		return nil, fmt.Errorf("%q source is not available in providers", name)
	}

	chained := &chainProvider{
		logger:       logger,
		providers:    providers,
		priorityList: priorityList,
	}
	return chained, nil
}
// Compile-time check that *configProvider implements source.Provider.
var _ source.Provider = (*configProvider)(nil)
// configProvider is a source.Provider backed by a fixed hostname string.
type configProvider struct {
// hostname is the value reported as the source identifier; empty means unavailable.
hostname string
}
// Source reports the stored hostname as a source of kind source.HostnameKind.
// It fails when the stored hostname is the empty string. The context argument
// is ignored.
func (p *configProvider) Source(context.Context) (source.Source, error) {
	host := p.hostname
	if len(host) == 0 {
		var none source.Source
		return none, fmt.Errorf("empty configuration hostname")
	}

	resolved := source.Source{
		Kind:       source.HostnameKind,
		Identifier: host,
	}
	return resolved, nil
}
// Config builds a source.Provider that always answers with the given hostname.
// The returned provider reports an error from Source when hostname is empty.
func Config(hostname string) source.Provider {
	fixed := configProvider{hostname: hostname}
	return &fixed
}
// Compile-time check that *onceProvider implements source.Provider.
var _ source.Provider = (*onceProvider)(nil)
// onceProvider is a source.Provider that calls the wrapped provider a single
// time and remembers its result.
type onceProvider struct {
// once guards the single call to provider.Source.
once sync.Once
// src is the source returned by that single call.
src source.Source
// err is the error returned by that single call.
err error
// provider is the wrapped provider.
provider source.Provider
}
// Source invokes the wrapped provider on the first call only, using that
// call's context, and stores both the source and the error it returned.
// Every call, including later ones, returns the stored pair.
func (p *onceProvider) Source(ctx context.Context) (source.Source, error) {
	resolve := func() {
		src, err := p.provider.Source(ctx)
		p.src, p.err = src, err
	}
	p.once.Do(resolve)

	return p.src, p.err
}
// Once returns a source.Provider that delegates to provider at most one time
// and serves the remembered result on every call.
func Once(provider source.Provider) source.Provider {
	wrapped := new(onceProvider)
	wrapped.provider = provider
	return wrapped
}