Skip to content

Commit

Permalink
stats: Add optional locality label in cluster_impl picker (#7434)
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Jul 25, 2024
1 parent 9671c4a commit 1feeaec
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 18 deletions.
9 changes: 8 additions & 1 deletion stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,12 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
var labels *istats.Labels
if labels = istats.GetLabels(ctx); labels == nil {
labels = &istats.Labels{
TelemetryLabels: make(map[string]string),
// The defaults for all the per call labels from a plugin that
// executes on the callpath that this OpenTelemetry component
// currently supports.
TelemetryLabels: map[string]string{
"grpc.lb.locality": "",
},
}
ctx = istats.SetLabels(ctx, labels)
}
Expand Down Expand Up @@ -232,6 +237,8 @@ func (h *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo,
}

for _, o := range h.options.MetricsOptions.OptionalLabels {
// TODO: Add a filter for converting to unknown if not present in the
// CSM Plugin Option layer by adding an optional labels API.
if val, ok := ai.xdsLabels[o]; ok {
attributes = append(attributes, otelattribute.String(o, val))
}
Expand Down
4 changes: 4 additions & 0 deletions stats/opentelemetry/csm/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (o *perTargetDialOption) DialOptionForTarget(parsedTarget url.URL) grpc.Dia

func dialOptionWithCSMPluginOption(options opentelemetry.Options, po otelinternal.PluginOption) grpc.DialOption {
options.MetricsOptions.OptionalLabels = []string{"csm.service_name", "csm.service_namespace_name"} // Attach the two xDS Optional Labels for this component to not filter out.
return dialOptionSetCSM(options, po)
}

func dialOptionSetCSM(options opentelemetry.Options, po otelinternal.PluginOption) grpc.DialOption {
otelinternal.SetPluginOption.(func(options *opentelemetry.Options, po otelinternal.PluginOption))(&options, po)
return opentelemetry.DialOption(options)
}
Expand Down
16 changes: 11 additions & 5 deletions stats/opentelemetry/csm/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,12 @@ func (s) TestCSMPluginOptionStreaming(t *testing.T) {
func unaryInterceptorAttachXDSLabels(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx = istats.SetLabels(ctx, &istats.Labels{
TelemetryLabels: map[string]string{
// mock what the cluster impl would write here ("csm." xDS Labels)
// mock what the cluster impl would write here ("csm." xDS Labels
// and locality label)
"csm.service_name": "service_name_val",
"csm.service_namespace_name": "service_namespace_val",

"grpc.lb.locality": "grpc.lb.locality_val",
},
})

Expand All @@ -441,8 +444,9 @@ func unaryInterceptorAttachXDSLabels(ctx context.Context, method string, req, re
// Optional Labels turned on. It then configures an interceptor to attach
// labels, representing the cluster_impl picker. It then makes a unary RPC, and
// expects xDS Labels labels to be attached to emitted relevant metrics. Full
// xDS System alongside OpenTelemetry will be tested with interop. (there is
// a test for xDS -> Stats handler and this tests -> OTel -> emission).
// xDS System alongside OpenTelemetry will be tested with interop. (there is a
// test for xDS -> Stats handler and this tests -> OTel -> emission). It also
// tests the optional per call locality label in the same manner.
func (s) TestXDSLabels(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand All @@ -457,11 +461,11 @@ func (s) TestXDSLabels(t *testing.T) {
}

po := newPluginOption(ctx)
dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{
dopts := []grpc.DialOption{dialOptionSetCSM(opentelemetry.Options{
MetricsOptions: opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics(),
OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"},
OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name", "grpc.lb.locality"},
},
}, po), grpc.WithUnaryInterceptor(unaryInterceptorAttachXDSLabels)}
if err := ss.Start(nil, dopts...); err != nil {
Expand Down Expand Up @@ -489,6 +493,7 @@ func (s) TestXDSLabels(t *testing.T) {

serviceNameAttr := attribute.String("csm.service_name", "service_name_val")
serviceNamespaceAttr := attribute.String("csm.service_namespace_name", "service_namespace_val")
localityAttr := attribute.String("grpc.lb.locality", "grpc.lb.locality_val")
meshIDAttr := attribute.String("csm.mesh_id", "unknown")
workloadCanonicalServiceAttr := attribute.String("csm.workload_canonical_service", "unknown")
remoteWorkloadTypeAttr := attribute.String("csm.remote_workload_type", "unknown")
Expand All @@ -500,6 +505,7 @@ func (s) TestXDSLabels(t *testing.T) {
unaryStatusAttr,
serviceNameAttr,
serviceNamespaceAttr,
localityAttr,
meshIDAttr,
workloadCanonicalServiceAttr,
remoteWorkloadTypeAttr,
Expand Down
19 changes: 12 additions & 7 deletions test/xds/xds_telemetry_labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ import (
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/stats"

v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/types/known/structpb"
)

Expand All @@ -43,6 +44,9 @@ const serviceNamespaceKeyCSM = "csm.service_namespace_name"
const serviceNameValue = "grpc-service"
const serviceNamespaceValue = "grpc-service-namespace"

const localityKey = "grpc.lb.locality"
const localityValue = `{"region":"region-1","zone":"zone-1","subZone":"subzone-1"}`

// TestTelemetryLabels tests that telemetry labels from CDS make their way to
// the stats handler. The stats handler sets the mutable context value that the
// cluster impl picker will write telemetry labels to, and then the stats
Expand Down Expand Up @@ -126,13 +130,14 @@ func (fsh *fakeStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
// aren't started. All of these should have access to the desired telemetry
// labels.
case *stats.OutPayload, *stats.InPayload, *stats.End:
if label, ok := fsh.labels.TelemetryLabels[serviceNameKeyCSM]; !ok || label != serviceNameValue {
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNameKeyCSM, serviceNameValue, label)
want := map[string]string{
serviceNameKeyCSM: serviceNameValue,
serviceNamespaceKeyCSM: serviceNamespaceValue,
localityKey: localityValue,
}
if label, ok := fsh.labels.TelemetryLabels[serviceNamespaceKeyCSM]; !ok || label != serviceNamespaceValue {
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNamespaceKeyCSM, serviceNamespaceValue, label)
if diff := cmp.Diff(fsh.labels.TelemetryLabels, want); diff != "" {
fsh.t.Fatalf("fsh.labels.TelemetryLabels (-got +want): %v", diff)
}

default:
// Nothing to assert for the other stats.Handler callouts.
}
Expand Down
25 changes: 20 additions & 5 deletions xds/internal/balancer/clusterimpl/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package clusterimpl

import (
"context"

v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -96,14 +98,23 @@ func (b *clusterImplBalancer) newPicker(config *dropConfigs) *picker {
}
}

func telemetryLabels(ctx context.Context) map[string]string {
if ctx == nil {
return nil
}
labels := stats.GetLabels(ctx)
if labels == nil {
return nil
}
return labels.TelemetryLabels
}

func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// Unconditionally set labels if present, even dropped or queued RPC's can
// use these labels.
if info.Ctx != nil {
if labels := stats.GetLabels(info.Ctx); labels != nil && labels.TelemetryLabels != nil {
for key, value := range d.telemetryLabels {
labels.TelemetryLabels[key] = value
}
if labels := telemetryLabels(info.Ctx); labels != nil {
for key, value := range d.telemetryLabels {
labels[key] = value
}
}

Expand Down Expand Up @@ -156,6 +167,10 @@ func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
return pr, err
}

if labels := telemetryLabels(info.Ctx); labels != nil {
labels["grpc.lb.locality"] = lIDStr
}

if d.loadStore != nil {
d.loadStore.CallStarted(lIDStr)
oldDone := pr.Done
Expand Down

0 comments on commit 1feeaec

Please sign in to comment.