[processor/servicegraph] add virtual node feature (open-telemetry#17350)
Add virtual node feature
JaredTan95 committed Feb 27, 2023
1 parent a5aee67 commit 224d7a3
Showing 6 changed files with 183 additions and 26 deletions.
16 changes: 16 additions & 0 deletions .chloggen/service_graph_processor_add_virtual_node.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: servicegraphprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: servicegraph supports virtual/peer nodes

# One or more tracking issues related to the change
issues: [17196]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
4 changes: 4 additions & 0 deletions processor/servicegraphprocessor/README.md
@@ -137,5 +137,9 @@ service:
exporters: [prometheus/servicegraph]
```

## Features and Feature-Gates

See the [Collector feature gates](https://github.com/open-telemetry/opentelemetry-collector/blob/main/featuregate/README.md#collector-feature-gates) for an overview of feature gates in the collector.

[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
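
To make the gate concrete, here is an illustrative sketch (not part of this commit) of enabling it programmatically through the global registry; the collector's `--feature-gates=processor.servicegraph.virtualNode` startup flag, described in the linked document, is the usual route. `Set` returns an error if the gate was never registered, so this assumes the servicegraph factory's `init()` (shown below) has already run:

```go
package main

import (
	"log"

	"go.opentelemetry.io/collector/featuregate"
)

func main() {
	// Hypothetical: flip the virtual node gate in code. The ID string
	// matches virtualNodeFeatureGateID registered in factory.go below.
	if err := featuregate.GlobalRegistry().Set("processor.servicegraph.virtualNode", true); err != nil {
		log.Fatal(err) // gate not registered
	}
}
```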
17 changes: 15 additions & 2 deletions processor/servicegraphprocessor/factory.go
@@ -22,17 +22,30 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/processor"
)

const (
// The value of "type" key in configuration.
typeStr = "servicegraph"
// The stability level of the processor.
stability                = component.StabilityLevelAlpha
connectorStability       = component.StabilityLevelDevelopment
virtualNodeFeatureGateID = "processor.servicegraph.virtualNode"
)

var virtualNodeFeatureGate *featuregate.Gate

func init() {
virtualNodeFeatureGate = featuregate.GlobalRegistry().MustRegister(
virtualNodeFeatureGateID,
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled, the processor checks each expiring edge for peer attributes (`db.name`, `net.sock.peer.addr`, `net.peer.name`, `rpc.service`, `http.url`, `http.target`) and, if present, aggregates the metrics with a virtual node."),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/17196"),
)
}

// NewFactory creates a factory for the servicegraph processor.
func NewFactory() processor.Factory {
// TODO: Handle this err
4 changes: 4 additions & 0 deletions processor/servicegraphprocessor/internal/store/edge.go
@@ -26,6 +26,7 @@ const (
Unknown ConnectionType = ""
MessagingSystem ConnectionType = "messaging_system"
Database ConnectionType = "database"
VirtualNode ConnectionType = "virtual_node"
)

// Edge is an Edge between two nodes in the graph
@@ -46,13 +47,16 @@ type Edge struct {

// expiration is the time at which the Edge expires, expressed as Unix time
expiration time.Time

Peer map[string]string
}

func newEdge(key Key, ttl time.Duration) *Edge {
return &Edge{
key: key,
Dimensions: make(map[string]string),
expiration: time.Now().Add(ttl),
Peer: make(map[string]string),
}
}

47 changes: 46 additions & 1 deletion processor/servicegraphprocessor/processor.go
@@ -31,7 +31,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
semconv "go.opentelemetry.io/collector/semconv/v1.6.1"
semconv "go.opentelemetry.io/collector/semconv/v1.13.0"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor/internal/store"
@@ -47,6 +47,9 @@ var (
defaultLatencyHistogramBucketsMs = []float64{
2, 4, 6, 8, 10, 50, 100, 200, 400, 800, 1000, 1400, 2000, 5000, 10_000, 15_000,
}
// PeerAttributes is the ordered list of peer attributes to match; entries earlier in the list take higher priority.
// TODO: Consider making this configurable.
PeerAttributes = []string{semconv.AttributeDBName, semconv.AttributeNetSockPeerAddr, semconv.AttributeNetPeerName, semconv.AttributeRPCService, semconv.AttributeHTTPURL, semconv.AttributeHTTPTarget}
)

type metricSeries struct {
@@ -222,6 +225,10 @@ func (p *serviceGraphProcessor) aggregateMetrics(ctx context.Context, td ptrace.
e.Failed = e.Failed || span.Status().Code() == ptrace.StatusCodeError
p.upsertDimensions(clientKind, e.Dimensions, rAttributes, span.Attributes())

if virtualNodeFeatureGate.IsEnabled() {
p.upsertPeerAttributes(PeerAttributes, e.Peer, span.Attributes())
}

// A database request will only have one span; we don't wait for the server
// span but just copy details from the client span
if dbName, ok := findAttributeValue(semconv.AttributeDBName, rAttributes, span.Attributes()); ok {
@@ -278,6 +285,15 @@ }
}
}

func (p *serviceGraphProcessor) upsertPeerAttributes(m []string, peers map[string]string, spanAttr pcommon.Map) {
for _, s := range m {
if v, ok := findAttributeValue(s, spanAttr); ok {
peers[s] = v
break
}
}
}
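
To illustrate the priority rule (a hypothetical, self-contained sketch; `pick` stands in for `upsertPeerAttributes`): a span carrying both `net.peer.name` and `http.url` records only `net.peer.name`, since the loop breaks on the first match in the ordered list.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pcommon"
)

// pick mirrors upsertPeerAttributes: keep only the first attribute
// from the priority list that is present on the span.
func pick(priority []string, attrs pcommon.Map) map[string]string {
	peers := map[string]string{}
	for _, key := range priority {
		if v, ok := attrs.Get(key); ok {
			peers[key] = v.AsString()
			break
		}
	}
	return peers
}

func main() {
	attrs := pcommon.NewMap()
	attrs.PutStr("net.peer.name", "db.example.com")
	attrs.PutStr("http.url", "http://db.example.com/query")

	priority := []string{"db.name", "net.sock.peer.addr", "net.peer.name", "rpc.service", "http.url", "http.target"}
	fmt.Println(pick(priority, attrs)) // map[net.peer.name:db.example.com]
}
```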

func (p *serviceGraphProcessor) onComplete(e *store.Edge) {
p.logger.Debug(
"edge completed",
@@ -297,7 +313,25 @@ func (p *serviceGraphProcessor) onExpire(e *store.Edge) {
zap.String("connection_type", string(e.ConnectionType)),
zap.Stringer("trace_id", e.TraceID),
)

stats.Record(context.Background(), statExpiredEdges.M(1))

if virtualNodeFeatureGate.IsEnabled() {
// Speculate a virtual node for the expiring edge before it is dropped.
// TODO: We could add some logic to check if the server span is an orphan.
// https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/17350#discussion_r1099949579
if len(e.ClientService) == 0 {
e.ClientService = "user"
}

if len(e.ServerService) == 0 {
e.ServerService = p.getPeerHost(PeerAttributes, e.Peer)
}

e.ConnectionType = store.VirtualNode

p.onComplete(e)
}
}
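
As a worked example of this branch (hypothetical values; local stand-ins replace the internal `store` types): an orphan client span whose server side never reported expires with an empty `ServerService`, and the gate completes the edge against a virtual node named after the best available peer attribute.

```go
package main

import "fmt"

// edge is a stand-in for store.Edge with only the fields used here.
type edge struct {
	ClientService  string
	ServerService  string
	ConnectionType string
	Peer           map[string]string
}

var peerAttributes = []string{"db.name", "net.sock.peer.addr", "net.peer.name", "rpc.service", "http.url", "http.target"}

// getPeerHost mirrors the method below: first match in priority order, else "unknown".
func getPeerHost(priority []string, peers map[string]string) string {
	for _, k := range priority {
		if v, ok := peers[k]; ok {
			return v
		}
	}
	return "unknown"
}

func main() {
	// Orphan client span: only a peer address was captured.
	e := edge{ClientService: "some-client-service", Peer: map[string]string{"net.sock.peer.addr": "127.10.10.1"}}

	// The speculation performed in onExpire when the gate is enabled.
	if e.ClientService == "" {
		e.ClientService = "user"
	}
	if e.ServerService == "" {
		e.ServerService = getPeerHost(peerAttributes, e.Peer)
	}
	e.ConnectionType = "virtual_node"

	fmt.Printf("%s -> %s (%s)\n", e.ClientService, e.ServerService, e.ConnectionType)
	// Output: some-client-service -> 127.10.10.1 (virtual_node)
}
```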

func (p *serviceGraphProcessor) aggregateMetricsForEdge(e *store.Edge) {
@@ -484,6 +518,17 @@ func (p *serviceGraphProcessor) storeExpirationLoop(d time.Duration) {
}
}

func (p *serviceGraphProcessor) getPeerHost(m []string, peers map[string]string) string {
peerStr := "unknown"
for _, s := range m {
if peer, ok := peers[s]; ok {
peerStr = peer
break
}
}
return peerStr
}

// cacheLoop periodically cleans the cache
func (p *serviceGraphProcessor) cacheLoop(d time.Duration) {
t := time.NewTicker(d)
121 changes: 98 additions & 23 deletions processor/servicegraphprocessor/processor_test.go
@@ -31,11 +31,12 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor/processortest"
semconv "go.opentelemetry.io/collector/semconv/v1.6.1"
semconv "go.opentelemetry.io/collector/semconv/v1.13.0"
"go.uber.org/zap/zaptest"
)

@@ -132,34 +133,64 @@ func TestConnectorShutdown(t *testing.T) {
}

func TestProcessorConsume(t *testing.T) {
	metricsExporter := newMockMetricsExporter(func(md pmetric.Metrics) error {
		return verifyMetrics(t, md)
	})
	// set virtual node feature
	_ = featuregate.GlobalRegistry().Set(virtualNodeFeatureGate.ID(), true)

	for _, tc := range []struct {
		name         string
		cfg          Config
		sampleTraces ptrace.Traces
	}{
		{
			name: "traces with client and server span",
			cfg: Config{
				MetricsExporter: "mock",
				Dimensions:      []string{"some-attribute", "non-existing-attribute"},
			},
			sampleTraces: buildSampleTrace("val"),
		},
		{
			name: "incomplete traces with server span lost",
			cfg: Config{
				MetricsExporter: "mock",
				Dimensions:      []string{"some-attribute", "non-existing-attribute"},
			},
			sampleTraces: incompleteClientTraces(),
		},
		{
			name: "incomplete traces with client span lost",
			cfg: Config{
				MetricsExporter: "mock",
				Dimensions:      []string{"some-attribute", "non-existing-attribute"},
			},
			sampleTraces: incompleteServerTraces(),
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			// Prepare
			processor := newProcessor(zaptest.NewLogger(t), &tc.cfg)
			processor.tracesConsumer = consumertest.NewNop()
			mHost := newMockHost(map[component.DataType]map[component.ID]component.Component{
				component.DataTypeMetrics: {
					component.NewID("mock"): metricsExporter,
				},
			})

			assert.NoError(t, processor.Start(context.Background(), mHost))

			// Test & verify
			// The assertion is part of verifyMetrics func.
			assert.NoError(t, processor.ConsumeTraces(context.Background(), tc.sampleTraces))
			time.Sleep(time.Second * 2)

			// Shutdown the processor
			assert.NoError(t, processor.Shutdown(context.Background()))
		})
	}

	// unset virtual node feature
	_ = featuregate.GlobalRegistry().Set(virtualNodeFeatureGate.ID(), false)
}
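
A possible refinement of the enable/reset pattern above (a sketch, not part of this commit; `setGateForTest` is hypothetical): registering the reset via `t.Cleanup` restores the gate even if a subtest fails midway.

```go
// setGateForTest enables or disables a gate for the duration of one test
// and restores its previous state when the test (and its subtests) finish.
func setGateForTest(t *testing.T, gate *featuregate.Gate, enabled bool) {
	prev := gate.IsEnabled()
	assert.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), enabled))
	t.Cleanup(func() {
		_ = featuregate.GlobalRegistry().Set(gate.ID(), prev)
	})
}
```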

func TestConnectorConsume(t *testing.T) {
@@ -217,7 +248,7 @@ func verifyCount(t *testing.T, m pmetric.Metric) {
assert.Equal(t, int64(1), dp.IntValue())

attributes := dp.Attributes()
assert.Equal(t, 4, attributes.Len())
assert.Equal(t, 6, attributes.Len())
verifyAttr(t, attributes, "client", "some-service")
verifyAttr(t, attributes, "server", "some-service")
verifyAttr(t, attributes, "failed", "false")
@@ -289,6 +320,50 @@ func buildSampleTrace(attrValue string) ptrace.Traces {
return traces
}

func incompleteClientTraces() ptrace.Traces {
tStart := time.Date(2022, 1, 2, 3, 4, 5, 6, time.UTC)
tEnd := time.Date(2022, 1, 2, 3, 4, 6, 6, time.UTC)

traces := ptrace.NewTraces()

resourceSpans := traces.ResourceSpans().AppendEmpty()
resourceSpans.Resource().Attributes().PutStr(semconv.AttributeServiceName, "some-client-service")

scopeSpans := resourceSpans.ScopeSpans().AppendEmpty()
anotherTraceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
anotherClientSpanID := pcommon.SpanID([8]byte{1, 2, 3, 4, 4, 3, 2, 1})
clientSpanNoServerSpan := scopeSpans.Spans().AppendEmpty()
clientSpanNoServerSpan.SetName("client span")
clientSpanNoServerSpan.SetSpanID(anotherClientSpanID)
clientSpanNoServerSpan.SetTraceID(anotherTraceID)
clientSpanNoServerSpan.SetKind(ptrace.SpanKindClient)
clientSpanNoServerSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(tStart))
clientSpanNoServerSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(tEnd))
clientSpanNoServerSpan.Attributes().PutStr(semconv.AttributeNetSockPeerAddr, "127.10.10.1") // Peer attribute used to name the virtual server node

return traces
}

func incompleteServerTraces() ptrace.Traces {
tStart := time.Date(2022, 1, 2, 3, 4, 5, 6, time.UTC)
tEnd := time.Date(2022, 1, 2, 3, 4, 6, 6, time.UTC)

traces := ptrace.NewTraces()

resourceSpans := traces.ResourceSpans().AppendEmpty()
resourceSpans.Resource().Attributes().PutStr(semconv.AttributeServiceName, "some-server-service")
scopeSpans := resourceSpans.ScopeSpans().AppendEmpty()
anotherTraceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 8, 7, 6, 5, 4, 3, 2, 1})
serverSpanNoClientSpan := scopeSpans.Spans().AppendEmpty()
serverSpanNoClientSpan.SetName("server span")
serverSpanNoClientSpan.SetSpanID([8]byte{0x19, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26})
serverSpanNoClientSpan.SetTraceID(anotherTraceID)
serverSpanNoClientSpan.SetKind(ptrace.SpanKindServer)
serverSpanNoClientSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(tStart))
serverSpanNoClientSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(tEnd))
return traces
}

func newOTLPExporters(t *testing.T) (component.ID, exporter.Metrics, exporter.Traces) {
otlpExpFactory := otlpexporter.NewFactory()
otlpID := component.NewID("otlp")
