K8s Processor: logs support (#1051)
pmm-sumo committed Sep 21, 2020
1 parent bd1d51b commit 7730017
Showing 5 changed files with 135 additions and 22 deletions.
36 changes: 18 additions & 18 deletions processor/k8sprocessor/doc.go
@@ -12,15 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package k8sprocessor allows automatic tagging of spans and metrics with k8s metadata.
// Package k8sprocessor allows automatic tagging of spans, metrics and logs with k8s metadata.
//
// The processor automatically discovers k8s resources (pods), extracts metadata from them and adds the
// extracted metadata to the relevant spans and metrics. The processor uses the kubernetes API to discover all pods
// running in a cluster, and keeps a record of their IP addresses and interesting metadata. Upon receiving spans,
// the processor tries to identify the source IP address of the service that sent the spans and matches
// it with the in-memory data. To find a k8s pod producing metrics, the processor looks at the "host.hostname"
// resource attribute, which is set by the prometheus receiver and some metrics instrumentation libraries.
// If a match is found, the cached metadata is added to the spans and metrics as resource attributes.
// extracted metadata to the relevant spans, metrics and logs. The processor uses the kubernetes API to discover all pods
// running in a cluster, and keeps a record of their IP addresses and interesting metadata. Upon receiving telemetry data,
// the processor looks for the presence of well-known resource attributes that might contain an IP address ("ip" and
// "k8s.pod.ip" for logs, metrics or traces, and "host.hostname" for metrics). If no such attribute is present, or it
// does not contain a valid IP address, the processor tries to identify the source IP address of the service
// that sent the telemetry data.
// If a match is found, the cached metadata is added to the data as resource attributes.
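//
// As a rough sketch of the attribute-based part of that lookup (ipFromResource is a hypothetical
// helper shown only for illustration; the actual extractor is k8sIPFromAttributes):
//
//  func ipFromResource(attrs pdata.AttributeMap) string {
//      // Check the well-known attributes in order of precedence.
//      for _, key := range []string{"ip", "k8s.pod.ip"} {
//          if v, ok := attrs.Get(key); ok {
//              return v.StringVal()
//          }
//      }
//      return "" // empty result: fall back to the connection's source IP
//  }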
//
// RBAC
//
@@ -36,8 +37,8 @@
//
// As an agent
//
// When running as an agent, the processor detects IP addresses of pods sending spans or metrics to the agent and uses
// this information to extract metadata from pods. When running as an agent, it is important to apply
// When running as an agent, the processor detects IP addresses of pods sending spans, metrics or logs to the agent
// and uses this information to extract metadata from pods. When running as an agent, it is important to apply
// a discovery filter so that the processor only discovers pods from the same host that it is running on. Not using
// such a filter can result in unnecessary resource usage, especially on very large clusters. Once the filter is applied,
// each processor will only query the k8s API for pods running on its own node.
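//
// For example, a node-scoped discovery filter might look like the following (a sketch; it assumes
// the agent pod exposes its node name through the KUBE_NODE_NAME environment variable, e.g. via
// the downward API):
//
//  k8s_tagger:
//    filter:
//      node_from_env_var: KUBE_NODE_NAME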
@@ -75,10 +76,11 @@
// The processor can be deployed both as an agent or as a collector.
//
// When running as a collector, the processor cannot correctly detect the IP address of the pods generating
// the spans when it receives the spans from an agent instead of receiving them directly from the pods. To
// the telemetry data when none of the well-known IP attributes is present and it receives the data
// from an agent instead of receiving it directly from the pods. To
// work around this issue, agents deployed with the k8s_tagger processor can be configured to detect
// the IP addresses and forward them along with the span resources. The collector can then match this IP address
// with k8s pods and enrich the spans with the metadata. In order to set this up, you'll need to complete the
// the IP addresses and forward them along with the telemetry data resources. The collector can then match this IP address
// with k8s pods and enrich the records with the metadata. In order to set this up, you'll need to complete the
// following steps:
//
// 1. Setup agents in passthrough mode
@@ -88,16 +90,13 @@
// k8s_tagger:
// passthrough: true
//
// This will ensure that the agents detect the IP address and add it as an attribute to all span resources.
// This will ensure that the agents detect the IP address and add it as an attribute to all telemetry resources.
// Agents will not make any k8s API calls, do any discovery of pods or extract any metadata.
//
// 2. Configure the collector as usual
// No special configuration changes need to be made on the collector. It'll automatically detect
// the IP address of spans sent by the agents as well as directly by other services/pods.
// the IP address of spans, logs and metrics sent by the agents as well as directly by other services/pods.
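//
// A minimal collector pipeline with logs support enabled might look like the following sketch
// (the receiver and exporter names are placeholders):
//
//  service:
//    pipelines:
//      logs:
//        receivers: [otlp]
//        processors: [k8s_tagger]
//        exporters: [logging]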
//
// This approach is also relevant for metrics data since it's not guaranteed that all the metric formats
// used to send data from agent to collector preserve the "host.hostname" attribute. We need to rely on an
// additional attribute carrying the k8s pod IP value in passthrough mode.
//
// Caveats
//
@@ -107,7 +106,8 @@
// Host networking mode
//
// The processor cannot correctly identify pods running in the host network mode and
// enriching spans generated by such pods is not supported at the moment.
// enriching telemetry data generated by such pods is not supported at the moment, unless the attributes contain
// information about the source IP.
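//
// In that case the source IP can be supplied explicitly, for example by setting the "k8s.pod.ip"
// resource attribute from the pod spec (a sketch using the downward API; it assumes the
// instrumentation honors the OTEL_RESOURCE_ATTRIBUTES environment variable):
//
//  env:
//    - name: POD_IP
//      valueFrom:
//        fieldRef:
//          fieldPath: status.podIP
//    - name: OTEL_RESOURCE_ATTRIBUTES
//      value: k8s.pod.ip=$(POD_IP)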
//
// As a sidecar
//
34 changes: 33 additions & 1 deletion processor/k8sprocessor/factory.go
@@ -40,7 +40,9 @@ func NewFactory() component.ProcessorFactory {
typeStr,
createDefaultConfig,
processorhelper.WithTraces(createTraceProcessor),
processorhelper.WithMetrics(createMetricsProcessor))
processorhelper.WithMetrics(createMetricsProcessor),
processorhelper.WithLogs(createLogsProcessor),
)
}

func createDefaultConfig() configmodels.Processor {
@@ -62,6 +64,15 @@ func createTraceProcessor(
return createTraceProcessorWithOptions(ctx, params, cfg, nextTraceConsumer)
}

func createLogsProcessor(
ctx context.Context,
params component.ProcessorCreateParams,
cfg configmodels.Processor,
nextLogsConsumer consumer.LogsConsumer,
) (component.LogsProcessor, error) {
return createLogsProcessorWithOptions(ctx, params, cfg, nextLogsConsumer)
}

func createMetricsProcessor(
ctx context.Context,
params component.ProcessorCreateParams,
@@ -113,6 +124,27 @@ func createMetricsProcessorWithOptions(
processorhelper.WithShutdown(kp.Shutdown))
}

func createLogsProcessorWithOptions(
_ context.Context,
params component.ProcessorCreateParams,
cfg configmodels.Processor,
nextLogsConsumer consumer.LogsConsumer,
options ...Option,
) (component.LogsProcessor, error) {
kp, err := createKubernetesProcessor(params, cfg, options...)
if err != nil {
return nil, err
}

return processorhelper.NewLogsProcessor(
cfg,
nextLogsConsumer,
kp,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithStart(kp.Start),
processorhelper.WithShutdown(kp.Shutdown))
}

func createKubernetesProcessor(
params component.ProcessorCreateParams,
cfg configmodels.Processor,
8 changes: 8 additions & 0 deletions processor/k8sprocessor/factory_test.go
@@ -49,6 +49,10 @@ func TestCreateProcessor(t *testing.T) {
assert.NotNil(t, mp)
assert.NoError(t, err)

lp, err := factory.CreateLogsProcessor(context.Background(), params, cfg, exportertest.NewNopLogsExporter())
assert.NotNil(t, lp)
assert.NoError(t, err)

oCfg := cfg.(*Config)
oCfg.Passthrough = true

@@ -60,6 +64,10 @@
assert.NotNil(t, mp)
assert.NoError(t, err)

lp, err = factory.CreateLogsProcessor(context.Background(), params, cfg, exportertest.NewNopLogsExporter())
assert.NotNil(t, lp)
assert.NoError(t, err)

// Switch it back so other tests run afterwards will not fail on unexpected state
kubeClientProvider = realClient
}
15 changes: 15 additions & 0 deletions processor/k8sprocessor/processor.go
@@ -98,6 +98,21 @@ func (kp *kubernetesprocessor) ProcessMetrics(ctx context.Context, md pdata.Metr
return md, nil
}

// ProcessLogs processes logs and adds k8s metadata, using the resource IP, hostname or incoming connection IP as the pod origin.
func (kp *kubernetesprocessor) ProcessLogs(ctx context.Context, ld pdata.Logs) (pdata.Logs, error) {
rl := ld.ResourceLogs()
for i := 0; i < rl.Len(); i++ {
ls := rl.At(i)
if ls.IsNil() {
continue
}

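// Enrich the resource in place. k8sIPFromAttributes returns an extractor that reads the pod IP
// from the well-known resource attributes; when those are absent, processResource falls back to
// the source IP of the incoming connection, as described in the comment above.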
kp.processResource(ctx, ls.Resource(), k8sIPFromAttributes())
}

return ld, nil
}

func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pdata.Resource, attributeExtractors ...ipExtractor) {
var podIP string

64 changes: 61 additions & 3 deletions processor/k8sprocessor/processor_test.go
@@ -58,6 +58,17 @@ func newMetricsProcessor(cfg configmodels.Processor, nextMetricsConsumer consume
)
}

func newLogsProcessor(cfg configmodels.Processor, nextLogsConsumer consumer.LogsConsumer, options ...Option) (component.LogsProcessor, error) {
opts := append(options, withKubeClientProvider(newFakeClient))
return createLogsProcessorWithOptions(
context.Background(),
component.ProcessorCreateParams{Logger: zap.NewNop()},
cfg,
nextLogsConsumer,
opts...,
)
}

// withKubeClientProvider sets the specific implementation for getting K8s Client instances
func withKubeClientProvider(kcp kube.ClientProvider) Option {
return func(p *kubernetesprocessor) error {
@@ -78,12 +89,15 @@ type multiTest struct {

tp component.TraceProcessor
mp component.MetricsProcessor
lp component.LogsProcessor

nextTrace *exportertest.SinkTraceExporter
nextMetrics *exportertest.SinkMetricsExporter
nextLogs *exportertest.SinkLogsExporter

kpTrace *kubernetesprocessor
kpMetrics *kubernetesprocessor
kpTrace *kubernetesprocessor
kpLogs *kubernetesprocessor
}

func newMultiTest(
@@ -96,6 +110,7 @@
t: t,
nextTrace: &exportertest.SinkTraceExporter{},
nextMetrics: &exportertest.SinkMetricsExporter{},
nextLogs: &exportertest.SinkLogsExporter{},
}

tp, err := newTraceProcessor(cfg, m.nextTrace, append(options, withExtractKubernetesProcessorInto(&m.kpTrace))...)
@@ -116,20 +131,32 @@
errFunc(err)
}

lp, err := newLogsProcessor(cfg, m.nextLogs, append(options, withExtractKubernetesProcessorInto(&m.kpLogs))...)
if errFunc == nil {
assert.NotNil(t, lp)
require.NoError(t, err)
} else {
assert.Nil(t, lp)
errFunc(err)
}

m.tp = tp
m.mp = mp
m.lp = lp
return m
}

func (m *multiTest) testConsume(
ctx context.Context,
traces pdata.Traces,
metrics pdata.Metrics,
logs pdata.Logs,
errFunc func(err error),
) {
errs := []error{
m.tp.ConsumeTraces(ctx, traces),
m.mp.ConsumeMetrics(ctx, metrics),
m.lp.ConsumeLogs(ctx, logs),
}

for _, err := range errs {
@@ -142,21 +169,25 @@
func (m *multiTest) kubernetesProcessorOperation(kpOp func(kp *kubernetesprocessor)) {
kpOp(m.kpTrace)
kpOp(m.kpMetrics)
kpOp(m.kpLogs)
}

func (m *multiTest) assertBatchesLen(batchesLen int) {
require.Len(m.t, m.nextTrace.AllTraces(), batchesLen)
require.Len(m.t, m.nextMetrics.AllMetrics(), batchesLen)
require.Len(m.t, m.nextLogs.AllLogs(), batchesLen)
}

func (m *multiTest) assertResourceObjectLen(batchNo int, rsObjectLen int) {
assert.Equal(m.t, m.nextTrace.AllTraces()[batchNo].ResourceSpans().Len(), rsObjectLen)
assert.Equal(m.t, m.nextMetrics.AllMetrics()[batchNo].ResourceMetrics().Len(), rsObjectLen)
assert.Equal(m.t, m.nextLogs.AllLogs()[batchNo].ResourceLogs().Len(), rsObjectLen)
}

func (m *multiTest) assertResourceAttributesLen(batchNo int, resourceObjectNo int, attrsLen int) {
assert.Equal(m.t, m.nextTrace.AllTraces()[batchNo].ResourceSpans().At(resourceObjectNo).Resource().Attributes().Len(), attrsLen)
assert.Equal(m.t, m.nextMetrics.AllMetrics()[batchNo].ResourceMetrics().At(resourceObjectNo).Resource().Attributes().Len(), attrsLen)
assert.Equal(m.t, m.nextLogs.AllLogs()[batchNo].ResourceLogs().At(resourceObjectNo).Resource().Attributes().Len(), attrsLen)
}

func (m *multiTest) assertResource(batchNum int, resourceObjectNum int, resourceFunc func(res pdata.Resource)) {
@@ -236,6 +267,25 @@ func generateMetrics(resourceFunc ...generateResourceFunc) pdata.Metrics {
return m
}

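// generateLogs builds a pdata.Logs payload containing a single log record and applies the given
// resource-mutating functions to its resource, mirroring generateTraces and generateMetrics above.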
func generateLogs(resourceFunc ...generateResourceFunc) pdata.Logs {
l := pdata.NewLogs()
ls := l.ResourceLogs()
ls.Resize(1)
ls.At(0).InitEmpty()
ls.At(0).InstrumentationLibraryLogs().Resize(1)
ls.At(0).InstrumentationLibraryLogs().At(0).Logs().Resize(1)
for _, resFun := range resourceFunc {
res := ls.At(0).Resource()
if res.IsNil() {
res.InitEmpty()
}
resFun(res)
}
log := ls.At(0).InstrumentationLibraryLogs().At(0).Logs().At(0)
log.SetName("foobar")
return l
}

func withPassthroughIP(passthroughIP string) generateResourceFunc {
return func(res pdata.Resource) {
res.Attributes().InsertString(k8sIPLabelName, passthroughIP)
@@ -256,6 +306,7 @@ func TestIPDetectionFromContext(t *testing.T) {
ctx,
generateTraces(),
generateMetrics(),
generateLogs(),
func(err error) {
assert.NoError(t, err)
})
@@ -274,6 +325,7 @@ func TestNilBatch(t *testing.T) {
context.Background(),
pdata.NewTraces(),
pdata.NewMetrics(),
generateLogs(),
func(err error) {
assert.NoError(t, err)
})
@@ -300,6 +352,7 @@ func TestProcessorNoAttrs(t *testing.T) {
ctx,
generateTraces(),
generateMetrics(),
generateLogs(),
func(err error) {
assert.NoError(t, err)
})
@@ -324,6 +377,7 @@
ctx,
generateTraces(),
generateMetrics(),
generateLogs(),
func(err error) {
assert.NoError(t, err)
})
@@ -340,6 +394,7 @@
ctx,
generateTraces(),
generateMetrics(),
generateLogs(),
func(err error) {
assert.NoError(t, err)
})
@@ -356,7 +411,7 @@ func TestNoIP(t *testing.T) {
nil,
)

m.testConsume(context.Background(), generateTraces(), generateMetrics(), nil)
m.testConsume(context.Background(), generateTraces(), generateMetrics(), generateLogs(), nil)

m.assertBatchesLen(1)
m.assertResourceObjectLen(0, 1)
@@ -406,6 +461,7 @@ func TestIPSource(t *testing.T) {

traces := generateTraces()
metrics := generateMetrics()
logs := generateLogs()

resources := []pdata.Resource{
traces.ResourceSpans().At(0).Resource(),
@@ -424,7 +480,7 @@
}
}

m.testConsume(ctx, traces, metrics, nil)
m.testConsume(ctx, traces, metrics, logs, nil)
m.assertBatchesLen(i + 1)
m.assertResource(i, 0, func(res pdata.Resource) {
require.False(t, res.IsNil())
@@ -464,6 +520,7 @@ func TestProcessorAddLabels(t *testing.T) {
ctx,
generateTraces(),
generateMetrics(),
generateLogs(),
func(err error) {
assert.NoError(t, err)
})
@@ -503,6 +560,7 @@ func TestProcessorPicksUpPassthoughPodIp(t *testing.T) {
context.Background(),
generateTraces(withPassthroughIP("2.2.2.2")),
generateMetrics(withPassthroughIP("2.2.2.2")),
generateLogs(withPassthroughIP("2.2.2.2")),
func(err error) {
assert.NoError(t, err)
})
