Skip to content

Commit

Permalink
Add uptime & rss self-observability metrics, and fix cpu time to work…
Browse files Browse the repository at this point in the history
… on non-Linux OSs (#1549)
  • Loading branch information
james-bebbington committed Aug 20, 2020
1 parent 1e65674 commit bc898bd
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 24 deletions.
56 changes: 45 additions & 11 deletions internal/collector/telemetry/process_telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,30 @@ import (
"runtime"
"time"

"github.com/prometheus/procfs"
"github.com/shirou/gopsutil/process"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
)

// ProcessMetricsViews is a struct that contains views related to process metrics (cpu, mem, etc)
type ProcessMetricsViews struct {
prevTimeUnixNano int64
ballastSizeBytes uint64
views []*view.View
done chan struct{}
proc *procfs.Proc
proc *process.Process
}

// mUptime is the float64 measure, in seconds, under which the process uptime
// is recorded.
var mUptime = stats.Float64("process/uptime", "Uptime of the process", stats.UnitSeconds)
// viewProcessUptime publishes mUptime under the measure's own name and
// description. It uses a Sum aggregation and carries no tag keys, so the
// view holds the total of all recorded samples in a single row.
var viewProcessUptime = &view.View{
	Measure:     mUptime,
	Name:        mUptime.Name(),
	Description: mUptime.Description(),
	Aggregation: view.Sum(),
}

var mRuntimeAllocMem = stats.Int64(
Expand Down Expand Up @@ -81,23 +94,37 @@ var viewCPUSeconds = &view.View{
TagKeys: nil,
}

// mRSSMemory is the int64 measure for the resident set size of the process.
// The value recorded for it is the RSS field of the process memory info,
// which is a byte count, so the measure is declared in bytes rather than as
// a dimensionless number.
var mRSSMemory = stats.Int64(
	"process/memory/rss",
	"Total physical memory (resident set size)",
	stats.UnitBytes)
// viewRSSMemory publishes mRSSMemory under the measure's own name and
// description. It uses a LastValue aggregation and carries no tag keys, so
// the view reports only the most recently recorded sample.
var viewRSSMemory = &view.View{
	Measure:     mRSSMemory,
	Name:        mRSSMemory.Name(),
	Description: mRSSMemory.Description(),
	Aggregation: view.LastValue(),
}

// NewProcessMetricsViews creates a new set of ProcessMetrics (mem, cpu) that can be used to measure
// basic information about this process.
func NewProcessMetricsViews(ballastSizeBytes uint64) *ProcessMetricsViews {
func NewProcessMetricsViews(ballastSizeBytes uint64) (*ProcessMetricsViews, error) {
pmv := &ProcessMetricsViews{
prevTimeUnixNano: time.Now().UnixNano(),
ballastSizeBytes: ballastSizeBytes,
views: []*view.View{viewAllocMem, viewTotalAllocMem, viewSysMem, viewCPUSeconds},
views: []*view.View{viewProcessUptime, viewAllocMem, viewTotalAllocMem, viewSysMem, viewCPUSeconds, viewRSSMemory},
done: make(chan struct{}),
}

// procfs.Proc is not available on windows and expected to fail.
pid := os.Getpid()
proc, err := procfs.NewProc(pid)
if err == nil {
pmv.proc = &proc

var err error
pmv.proc, err = process.NewProcess(int32(pid))
if err != nil {
return nil, err
}

return pmv
return pmv, nil
}

// StartCollection starts a ticker'd goroutine that will update the PMV measurements every 5 seconds
Expand Down Expand Up @@ -127,15 +154,22 @@ func (pmv *ProcessMetricsViews) StopCollection() {
}

func (pmv *ProcessMetricsViews) updateViews() {
now := time.Now().UnixNano()
stats.Record(context.Background(), mUptime.M(float64(now-pmv.prevTimeUnixNano)/1e9))
pmv.prevTimeUnixNano = now

ms := &runtime.MemStats{}
pmv.readMemStats(ms)
stats.Record(context.Background(), mRuntimeAllocMem.M(int64(ms.Alloc)))
stats.Record(context.Background(), mRuntimeTotalAllocMem.M(int64(ms.TotalAlloc)))
stats.Record(context.Background(), mRuntimeSysMem.M(int64(ms.Sys)))

if pmv.proc != nil {
if procStat, err := pmv.proc.Stat(); err == nil {
stats.Record(context.Background(), mCPUSeconds.M(int64(procStat.CPUTime())))
if times, err := pmv.proc.Times(); err == nil {
stats.Record(context.Background(), mCPUSeconds.M(int64(times.Total())))
}
if mem, err := pmv.proc.MemoryInfo(); err == nil {
stats.Record(context.Background(), mRSSMemory.M(int64(mem.RSS)))
}
}
}
Expand Down
26 changes: 14 additions & 12 deletions internal/collector/telemetry/process_telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package telemetry

import (
"runtime"
"testing"
"time"

Expand All @@ -27,17 +26,20 @@ import (
func TestProcessTelemetry(t *testing.T) {
const ballastSizeBytes uint64 = 0

pmv := NewProcessMetricsViews(ballastSizeBytes)
pmv, err := NewProcessMetricsViews(ballastSizeBytes)
require.NoError(t, err)
assert.NotNil(t, pmv)

expectedViews := []string{
// Changing a metric name is a breaking change.
// Adding new metrics is ok as long as it follows the conventions described at
// https://pkg.go.dev/go.opentelemetry.io/collector/obsreport?tab=doc#hdr-Naming_Convention_for_New_Metrics
"process/uptime",
"process/runtime/heap_alloc_bytes",
"process/runtime/total_alloc_bytes",
"process/runtime/total_sys_memory_bytes",
"process/cpu_seconds",
"process/memory/rss",
}
processViews := pmv.Views()
assert.Len(t, processViews, len(expectedViews))
Expand All @@ -50,26 +52,26 @@ func TestProcessTelemetry(t *testing.T) {
<-time.After(200 * time.Millisecond)

for _, viewName := range expectedViews {
if (runtime.GOOS == "windows" || runtime.GOOS == "darwin") && viewName == "process/cpu_seconds" {
// "process/cpu_seconds" is not supported on windows or darwin because there is
// no procfs which is used for reading that metric.
continue
}

rows, err := view.RetrieveData(viewName)
require.NoError(t, err, viewName)

require.Len(t, rows, 1, viewName)
row := rows[0]
assert.Len(t, row.Tags, 0)

lastValue := row.Data.(*view.LastValueData)
if viewName == "process/cpu_seconds" {
var value float64
if viewName == "process/uptime" {
value = row.Data.(*view.SumData).Value
} else {
value = row.Data.(*view.LastValueData).Value
}

if viewName == "process/uptime" || viewName == "process/cpu_seconds" {
// This likely will still be zero when running the test.
assert.True(t, lastValue.Value >= 0, viewName)
assert.True(t, value >= 0, viewName)
continue
}

assert.True(t, lastValue.Value > 0, viewName)
assert.True(t, value > 0, viewName)
}
}
6 changes: 5 additions & 1 deletion service/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,18 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u
return nil
}

processMetricsViews, err := telemetry.NewProcessMetricsViews(ballastSizeBytes)
if err != nil {
return err
}

var views []*view.View
views = append(views, obsreport.Configure(telemetry.UseLegacyMetrics(), telemetry.UseNewMetrics())...)
views = append(views, processor.MetricViews(level)...)
views = append(views, queuedprocessor.MetricViews(level)...)
views = append(views, batchprocessor.MetricViews(level)...)
views = append(views, tailsamplingprocessor.SamplingProcessorMetricViews(level)...)
views = append(views, kafkareceiver.MetricViews()...)
processMetricsViews := telemetry.NewProcessMetricsViews(ballastSizeBytes)
views = append(views, processMetricsViews.Views()...)
views = append(views, fluentobserv.Views(level)...)
tel.views = views
Expand Down

0 comments on commit bc898bd

Please sign in to comment.