Skip to content

Commit

Permalink
[loadbalancingexporter] Support the timeout period of k8s resolver li…
Browse files Browse the repository at this point in the history
…st watch can be configured (open-telemetry#31904)

**Link to tracking Issue:** close
open-telemetry#31757

---------

Signed-off-by: Jared Tan <[email protected]>
  • Loading branch information
JaredTan95 committed Mar 26, 2024
1 parent e677a5a commit e360afe
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 9 deletions.
27 changes: 27 additions & 0 deletions .chloggen/support_timeout_configurable_for_k8s_resolver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: loadbalancingexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support the timeout period of k8s resolver list watch can be configured.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31757]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions exporter/loadbalancingexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th
* The `k8s` node accepts the following optional properties:
* `service` Kubernetes service to resolve, e.g. `lb-svc.lb-ns`. If no namespace is specified, an attempt will be made to infer the namespace for this collector, and if this fails it will fall back to the `default` namespace.
* `ports` port to be used for exporting the traces to the addresses resolved from `service`. If `ports` is not specified, the default port 4317 is used. When multiple ports are specified, two backends are added to the load balancer as if they were at different pods.
* `timeout` resolver timeout in go-Duration format, e.g. `5s`, `1d`, `30m`. If not specified, `1s` will be used.
* The `awsCloudMap` node accepts the following properties:
* `namespace` The CloudMap namespace where the service is register, e.g. `cloudmap`. If no `namespace` is specified, this will fail to start the Load Balancer exporter.
* `serviceName` The name of the service that you specified when you registered the instance, e.g. `otelcollectors`. If no `serviceName` is specified, this will fail to start the Load Balancer exporter.
Expand Down
5 changes: 3 additions & 2 deletions exporter/loadbalancingexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ type DNSResolver struct {

// K8sSvcResolver defines the configuration for the DNS resolver
type K8sSvcResolver struct {
Service string `mapstructure:"service"`
Ports []int32 `mapstructure:"ports"`
Service string `mapstructure:"service"`
Ports []int32 `mapstructure:"ports"`
Timeout time.Duration `mapstructure:"timeout"`
}

type AWSCloudMapResolver struct {
Expand Down
2 changes: 1 addition & 1 deletion exporter/loadbalancingexporter/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, facto
if err != nil {
return nil, err
}
res, err = newK8sResolver(clt, k8sLogger, oCfg.Resolver.K8sSvc.Service, oCfg.Resolver.K8sSvc.Ports)
res, err = newK8sResolver(clt, k8sLogger, oCfg.Resolver.K8sSvc.Service, oCfg.Resolver.K8sSvc.Ports, oCfg.Resolver.K8sSvc.Timeout)
if err != nil {
return nil, err
}
Expand Down
21 changes: 17 additions & 4 deletions exporter/loadbalancingexporter/resolver_k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"go.opencensus.io/stats"
"go.opencensus.io/tag"
Expand All @@ -37,6 +38,10 @@ var (
k8sResolverSuccessFalseMutators = []tag.Mutator{k8sResolverMutator, successFalseMutator}
)

const (
defaultListWatchTimeout = 1 * time.Second
)

type k8sResolver struct {
logger *zap.Logger
svcName string
Expand All @@ -48,6 +53,8 @@ type k8sResolver struct {
epsListWatcher cache.ListerWatcher
endpointsStore *sync.Map

lwTimeout time.Duration

endpoints []string
onChangeCallbacks []func([]string)

Expand All @@ -60,12 +67,16 @@ type k8sResolver struct {
func newK8sResolver(clt kubernetes.Interface,
logger *zap.Logger,
service string,
ports []int32) (*k8sResolver, error) {
ports []int32, timeout time.Duration) (*k8sResolver, error) {

if len(service) == 0 {
return nil, errNoSvc
}

if timeout == 0 {
timeout = defaultListWatchTimeout
}

nAddr := strings.SplitN(service, ".", 2)
name, namespace := nAddr[0], "default"
if len(nAddr) > 1 {
Expand All @@ -84,12 +95,12 @@ func newK8sResolver(clt kubernetes.Interface,
epsListWatcher := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = epsSelector
options.TimeoutSeconds = ptr.To[int64](1)
options.TimeoutSeconds = ptr.To[int64](int64(timeout.Seconds()))
return clt.CoreV1().Endpoints(namespace).List(context.Background(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = epsSelector
options.TimeoutSeconds = ptr.To[int64](1)
options.TimeoutSeconds = ptr.To[int64](int64(timeout.Seconds()))
return clt.CoreV1().Endpoints(namespace).Watch(context.Background(), options)
},
}
Expand All @@ -106,6 +117,7 @@ func newK8sResolver(clt kubernetes.Interface,
epsListWatcher: epsListWatcher,
handler: h,
stopCh: make(chan struct{}),
lwTimeout: timeout,
}
h.callback = r.resolve

Expand Down Expand Up @@ -134,7 +146,8 @@ func (r *k8sResolver) start(_ context.Context) error {
r.logger.Debug("K8s service resolver started",
zap.String("service", r.svcName),
zap.String("namespace", r.svcNs),
zap.Int32s("ports", r.port))
zap.Int32s("ports", r.port),
zap.Duration("timeout", r.lwTimeout))
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions exporter/loadbalancingexporter/resolver_k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestK8sResolve(t *testing.T) {
}

cl := fake.NewSimpleClientset(endpoint)
res, err := newK8sResolver(cl, zap.NewNop(), service, ports)
res, err := newK8sResolver(cl, zap.NewNop(), service, ports, defaultListWatchTimeout)
require.NoError(t, err)

require.NoError(t, res.start(context.Background()))
Expand Down Expand Up @@ -241,7 +241,7 @@ func Test_newK8sResolver(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := newK8sResolver(fake.NewSimpleClientset(), tt.args.logger, tt.args.service, tt.args.ports)
got, err := newK8sResolver(fake.NewSimpleClientset(), tt.args.logger, tt.args.service, tt.args.ports, defaultListWatchTimeout)
if tt.wantErr != nil {
require.Error(t, err, tt.wantErr)
} else {
Expand Down

0 comments on commit e360afe

Please sign in to comment.