Skip to content

Commit

Permalink
[receiver/hostmetricsreceiver] Scrape process delay (open-telemetry#1…
Browse files Browse the repository at this point in the history
…2201)

Add a configuration option called scrape_process_delay that lets the user delay collecting process metrics with hostmetricsreceiver until a process has been running for a given amount of time.
  • Loading branch information
davidmirza408 committed Jul 9, 2022
1 parent b9444ea commit cab4e7a
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 46 deletions.
1 change: 1 addition & 0 deletions receiver/hostmetricsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ process:
names: [ <process name>, ... ]
match_type: <strict|regexp>
mute_process_name_error: <true|false>
scrape_process_delay: <time>
```

## Advanced Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package processscraper // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper"

import (
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterset"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata"
)
Expand All @@ -33,6 +35,10 @@ type Config struct {
// collector does not have permission for.
// See https://github.com/open-telemetry/opentelemetry-collector/issues/3004 for more information.
MuteProcessNameError bool `mapstructure:"mute_process_name_error,omitempty"`

// ScrapeProcessDelay is the minimum amount of time a process must have been running
// before metrics are scraped for it. The default value is 0 seconds (0s).
ScrapeProcessDelay time.Duration `mapstructure:"scrape_process_delay"`
}

type MatchConfig struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type processHandle interface {
Times() (*cpu.TimesStat, error)
MemoryInfo() (*process.MemoryInfoStat, error)
IOCounters() (*process.IOCountersStat, error)
CreateTime() (int64, error)
}

type gopsProcessHandles struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ const (

// scraper for Process Metrics
type scraper struct {
settings component.ReceiverCreateSettings
config *Config
mb *metadata.MetricsBuilder
includeFS filterset.FilterSet
excludeFS filterset.FilterSet

settings component.ReceiverCreateSettings
config *Config
mb *metadata.MetricsBuilder
includeFS filterset.FilterSet
excludeFS filterset.FilterSet
scrapeProcessDelay time.Duration
// for mocking
bootTime func() (uint64, error)
getProcessHandles func() (processHandles, error)
Expand All @@ -64,6 +64,7 @@ func newProcessScraper(settings component.ReceiverCreateSettings, cfg *Config) (
getProcessHandles: getProcessHandlesInternal,
emitMetricsWithDirectionAttribute: featuregate.GetRegistry().IsEnabled(internal.EmitMetricsWithDirectionAttributeFeatureGateID),
emitMetricsWithoutDirectionAttribute: featuregate.GetRegistry().IsEnabled(internal.EmitMetricsWithoutDirectionAttributeFeatureGateID),
scrapeProcessDelay: cfg.ScrapeProcessDelay,
}

var err error
Expand Down Expand Up @@ -170,6 +171,16 @@ func (s *scraper) getProcessMetadata() ([]*processMetadata, error) {
errs.AddPartial(0, fmt.Errorf("error reading username for process %q (pid %v): %w", executable.name, pid, err))
}

createTime, err := handle.CreateTime()
if err != nil {
errs.AddPartial(0, fmt.Errorf("error reading create time for process %q (pid %v): %w", executable.name, pid, err))
// set the start time to now to avoid including this when a scrape_process_delay is set
createTime = time.Now().UnixMilli()
}
if s.scrapeProcessDelay.Milliseconds() > (time.Now().UnixMilli() - createTime) {
continue
}

md := &processMetadata{
pid: pid,
executable: executable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"runtime"
"testing"
"time"

"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/process"
Expand Down Expand Up @@ -304,6 +305,11 @@ func (p *processHandleMock) IOCounters() (*process.IOCountersStat, error) {
return args.Get(0).(*process.IOCountersStat), args.Error(1)
}

// CreateTime is the mock implementation of the process handle's CreateTime
// method. It registers the call under the name "CreateTime" and hands back
// whatever the test configured for it: the first return value asserted as an
// int64 and the second as an error.
func (p *processHandleMock) CreateTime() (int64, error) {
	ret := p.MethodCalled("CreateTime")
	// The type assertion panics if the test configured a non-int64 first value.
	return ret.Get(0).(int64), ret.Error(1)
}

func newDefaultHandleMock() *processHandleMock {
handleMock := &processHandleMock{}
handleMock.On("Username").Return("username", nil)
Expand All @@ -319,51 +325,81 @@ func TestScrapeMetrics_Filtered(t *testing.T) {
skipTestOnUnsupportedOS(t)

type testCase struct {
name string
names []string
include []string
exclude []string
expectedNames []string
name string
names []string
include []string
exclude []string
upTimeMs []int64
scrapeProcessDelay string
expectedNames []string
}

testCases := []testCase{
{
name: "No Filter",
names: []string{"test1", "test2"},
include: []string{"test*"},
expectedNames: []string{"test1", "test2"},
name: "No Filter",
names: []string{"test1", "test2"},
include: []string{"test*"},
upTimeMs: []int64{5000, 5000},
scrapeProcessDelay: "0s",
expectedNames: []string{"test1", "test2"},
},
{
name: "Include All",
names: []string{"test1", "test2"},
include: []string{"test*"},
upTimeMs: []int64{5000, 5000},
scrapeProcessDelay: "0s",
expectedNames: []string{"test1", "test2"},
},
{
name: "Include All",
names: []string{"test1", "test2"},
include: []string{"test*"},
expectedNames: []string{"test1", "test2"},
name: "Include One",
names: []string{"test1", "test2"},
include: []string{"test1"},
upTimeMs: []int64{5000, 5000},
scrapeProcessDelay: "0s",
expectedNames: []string{"test1"},
},
{
name: "Include One",
names: []string{"test1", "test2"},
include: []string{"test1"},
expectedNames: []string{"test1"},
name: "Exclude All",
names: []string{"test1", "test2"},
exclude: []string{"test*"},
upTimeMs: []int64{5000, 5000},
scrapeProcessDelay: "0s",
expectedNames: []string{},
},
{
name: "Exclude All",
names: []string{"test1", "test2"},
exclude: []string{"test*"},
expectedNames: []string{},
name: "Include & Exclude",
names: []string{"test1", "test2"},
include: []string{"test*"},
exclude: []string{"test2"},
upTimeMs: []int64{5000, 5000},
scrapeProcessDelay: "0s",
expectedNames: []string{"test1"},
},
{
name: "Include & Exclude",
names: []string{"test1", "test2"},
include: []string{"test*"},
exclude: []string{"test2"},
expectedNames: []string{"test1"},
name: "Scrape Process Delay Keep One",
names: []string{"test1", "test2"},
include: []string{"test*"},
upTimeMs: []int64{5000, 50000},
scrapeProcessDelay: "10s",
expectedNames: []string{"test2"},
},
{
name: "Scrape Process Delay Keep Both",
names: []string{"test1", "test2"},
include: []string{"test*"},
upTimeMs: []int64{50000, 50000},
scrapeProcessDelay: "10s",
expectedNames: []string{"test1", "test2"},
},
}

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
scrapeProcessDelay, _ := time.ParseDuration(test.scrapeProcessDelay)
config := &Config{
Metrics: metadata.DefaultMetricsSettings(),
Metrics: metadata.DefaultMetricsSettings(),
ScrapeProcessDelay: scrapeProcessDelay,
}

if len(test.include) > 0 {
Expand All @@ -385,10 +421,11 @@ func TestScrapeMetrics_Filtered(t *testing.T) {
require.NoError(t, err, "Failed to initialize process scraper: %v", err)

handles := make([]*processHandleMock, 0, len(test.names))
for _, name := range test.names {
for i, name := range test.names {
handleMock := newDefaultHandleMock()
handleMock.On("Name").Return(name, nil)
handleMock.On("Exe").Return(name, nil)
handleMock.On("CreateTime").Return(time.Now().UnixMilli()-test.upTimeMs[i], nil)
handles = append(handles, handleMock)
}

Expand Down Expand Up @@ -422,6 +459,7 @@ func TestScrapeMetrics_ProcessErrors(t *testing.T) {
timesError error
memoryInfoError error
ioCountersError error
createTimeError error
expectedError string
}

Expand All @@ -447,33 +485,40 @@ func TestScrapeMetrics_ProcessErrors(t *testing.T) {
usernameError: errors.New("err3"),
expectedError: `error reading username for process "test" (pid 1): err3`,
},
{
name: "Create Time Error",
createTimeError: errors.New("err4"),
expectedError: `error reading create time for process "test" (pid 1): err4`,
},
{
name: "Times Error",
timesError: errors.New("err4"),
expectedError: `error reading cpu times for process "test" (pid 1): err4`,
timesError: errors.New("err5"),
expectedError: `error reading cpu times for process "test" (pid 1): err5`,
},
{
name: "Memory Info Error",
memoryInfoError: errors.New("err5"),
expectedError: `error reading memory info for process "test" (pid 1): err5`,
memoryInfoError: errors.New("err6"),
expectedError: `error reading memory info for process "test" (pid 1): err6`,
},
{
name: "IO Counters Error",
ioCountersError: errors.New("err6"),
expectedError: `error reading disk usage for process "test" (pid 1): err6`,
ioCountersError: errors.New("err7"),
expectedError: `error reading disk usage for process "test" (pid 1): err7`,
},
{
name: "Multiple Errors",
cmdlineError: errors.New("err2"),
usernameError: errors.New("err3"),
timesError: errors.New("err4"),
memoryInfoError: errors.New("err5"),
ioCountersError: errors.New("err6"),
createTimeError: errors.New("err4"),
timesError: errors.New("err5"),
memoryInfoError: errors.New("err6"),
ioCountersError: errors.New("err7"),
expectedError: `error reading command for process "test" (pid 1): err2; ` +
`error reading username for process "test" (pid 1): err3; ` +
`error reading cpu times for process "test" (pid 1): err4; ` +
`error reading memory info for process "test" (pid 1): err5; ` +
`error reading disk usage for process "test" (pid 1): err6`,
`error reading create time for process "test" (pid 1): err4; ` +
`error reading cpu times for process "test" (pid 1): err5; ` +
`error reading memory info for process "test" (pid 1): err6; ` +
`error reading disk usage for process "test" (pid 1): err7`,
},
}

Expand Down Expand Up @@ -502,6 +547,7 @@ func TestScrapeMetrics_ProcessErrors(t *testing.T) {
handleMock.On("Times").Return(&cpu.TimesStat{}, test.timesError)
handleMock.On("MemoryInfo").Return(&process.MemoryInfoStat{}, test.memoryInfoError)
handleMock.On("IOCounters").Return(&process.IOCountersStat{}, test.ioCountersError)
handleMock.On("CreateTime").Return(int64(0), test.createTimeError)

scraper.getProcessHandles = func() (processHandles, error) {
return &processHandlesMock{handles: []*processHandleMock{handleMock}}, nil
Expand Down
16 changes: 16 additions & 0 deletions unreleased/hostmetrics-receiver-scrape-process-delay.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: hostmetricsreceiver

# A brief description of the change
note: New config setting scrape_process_delay is used to indicate the minimum amount of time a process must be running before process metrics can be scraped for it. The default value is 0 seconds ("0s").


# One or more tracking issues related to the change
issues: [8976]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
subtext:

0 comments on commit cab4e7a

Please sign in to comment.