[receiver/hostmetrics] remove direction for disk scraper (open-telemetry#12172)

This is the last hostmetrics scraper that needs updating.

Fixes open-telemetry#11816
Alex Boten committed Jul 13, 2022
1 parent 081edac commit d4674d1
Showing 9 changed files with 807 additions and 61 deletions.
13 changes: 13 additions & 0 deletions receiver/hostmetricsreceiver/README.md
@@ -139,6 +139,19 @@
Some host metrics reported are transitioning from being reported with a `direction` attribute to being reported with the
direction included in the metric name to adhere to the OpenTelemetry specification
(https://github.com/open-telemetry/opentelemetry-specification/pull/2617):

- `disk` scraper metrics:
- `system.disk.io` will become:
- `system.disk.io.read`
- `system.disk.io.write`
- `system.disk.operations` will become:
- `system.disk.operations.read`
- `system.disk.operations.write`
- `system.disk.operation_time` will become:
- `system.disk.operation_time.read`
- `system.disk.operation_time.write`
- `system.disk.merged` will become:
- `system.disk.merged.read`
- `system.disk.merged.write`
- `network` scraper metrics:
- `system.network.dropped` will become:
- `system.network.dropped.receive`
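
The renaming above follows one simple rule: the `direction` attribute value is folded into the metric name as a `.read`/`.write` suffix. As a purely illustrative sketch (not part of the receiver; the helper name is made up), the mapping can be expressed in a few lines of Go:

```go
package main

import "fmt"

// newDiskMetricName maps an old direction-attributed disk metric name plus its
// "direction" attribute value to the new suffixed metric name.
// Illustrative only; this helper does not exist in the hostmetrics receiver.
func newDiskMetricName(oldName, direction string) (string, bool) {
	renamed := map[string]bool{
		"system.disk.io":             true,
		"system.disk.operations":     true,
		"system.disk.operation_time": true,
		"system.disk.merged":         true,
	}
	if !renamed[oldName] || (direction != "read" && direction != "write") {
		return "", false
	}
	return oldName + "." + direction, true
}

func main() {
	name, ok := newDiskMetricName("system.disk.io", "read")
	fmt.Println(name, ok) // Output: system.disk.io.read true
}
```
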
@@ -28,8 +28,10 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/scrapererror"
"go.opentelemetry.io/collector/service/featuregate"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterset"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/diskscraper/internal/metadata"
)

@@ -48,13 +50,17 @@ type scraper struct {
excludeFS filterset.FilterSet

// for mocking
bootTime func() (uint64, error)
ioCounters func(names ...string) (map[string]disk.IOCountersStat, error)
bootTime func() (uint64, error)
ioCounters func(names ...string) (map[string]disk.IOCountersStat, error)
emitMetricsWithDirectionAttribute bool
emitMetricsWithoutDirectionAttribute bool
}

// newDiskScraper creates a Disk Scraper
func newDiskScraper(_ context.Context, settings component.ReceiverCreateSettings, cfg *Config) (*scraper, error) {
scraper := &scraper{settings: settings, config: cfg, bootTime: host.BootTime, ioCounters: disk.IOCounters}
scraper.emitMetricsWithDirectionAttribute = featuregate.GetRegistry().IsEnabled(internal.EmitMetricsWithDirectionAttributeFeatureGateID)
scraper.emitMetricsWithoutDirectionAttribute = featuregate.GetRegistry().IsEnabled(internal.EmitMetricsWithoutDirectionAttributeFeatureGateID)

var err error

@@ -110,15 +116,27 @@ func (s *scraper) scrape(_ context.Context) (pmetric.Metrics, error) {

func (s *scraper) recordDiskIOMetric(now pcommon.Timestamp, ioCounters map[string]disk.IOCountersStat) {
for device, ioCounter := range ioCounters {
s.mb.RecordSystemDiskIoDataPoint(now, int64(ioCounter.ReadBytes), device, metadata.AttributeDirectionRead)
s.mb.RecordSystemDiskIoDataPoint(now, int64(ioCounter.WriteBytes), device, metadata.AttributeDirectionWrite)
if s.emitMetricsWithDirectionAttribute {
s.mb.RecordSystemDiskIoDataPoint(now, int64(ioCounter.ReadBytes), device, metadata.AttributeDirectionRead)
s.mb.RecordSystemDiskIoDataPoint(now, int64(ioCounter.WriteBytes), device, metadata.AttributeDirectionWrite)
}
if s.emitMetricsWithoutDirectionAttribute {
s.mb.RecordSystemDiskIoReadDataPoint(now, int64(ioCounter.ReadBytes), device)
s.mb.RecordSystemDiskIoWriteDataPoint(now, int64(ioCounter.WriteBytes), device)
}
}
}

func (s *scraper) recordDiskOperationsMetric(now pcommon.Timestamp, ioCounters map[string]disk.IOCountersStat) {
for device, ioCounter := range ioCounters {
s.mb.RecordSystemDiskOperationsDataPoint(now, int64(ioCounter.ReadCount), device, metadata.AttributeDirectionRead)
s.mb.RecordSystemDiskOperationsDataPoint(now, int64(ioCounter.WriteCount), device, metadata.AttributeDirectionWrite)
if s.emitMetricsWithDirectionAttribute {
s.mb.RecordSystemDiskOperationsDataPoint(now, int64(ioCounter.ReadCount), device, metadata.AttributeDirectionRead)
s.mb.RecordSystemDiskOperationsDataPoint(now, int64(ioCounter.WriteCount), device, metadata.AttributeDirectionWrite)
}
if s.emitMetricsWithoutDirectionAttribute {
s.mb.RecordSystemDiskOperationsReadDataPoint(now, int64(ioCounter.ReadCount), device)
s.mb.RecordSystemDiskOperationsWriteDataPoint(now, int64(ioCounter.WriteCount), device)
}
}
}

@@ -130,8 +148,14 @@ func (s *scraper) recordDiskIOTimeMetric(now pcommon.Timestamp, ioCounters map[string]disk.IOCountersStat) {

func (s *scraper) recordDiskOperationTimeMetric(now pcommon.Timestamp, ioCounters map[string]disk.IOCountersStat) {
for device, ioCounter := range ioCounters {
s.mb.RecordSystemDiskOperationTimeDataPoint(now, float64(ioCounter.ReadTime)/1e3, device, metadata.AttributeDirectionRead)
s.mb.RecordSystemDiskOperationTimeDataPoint(now, float64(ioCounter.WriteTime)/1e3, device, metadata.AttributeDirectionWrite)
if s.emitMetricsWithDirectionAttribute {
s.mb.RecordSystemDiskOperationTimeDataPoint(now, float64(ioCounter.ReadTime)/1e3, device, metadata.AttributeDirectionRead)
s.mb.RecordSystemDiskOperationTimeDataPoint(now, float64(ioCounter.WriteTime)/1e3, device, metadata.AttributeDirectionWrite)
}
if s.emitMetricsWithoutDirectionAttribute {
s.mb.RecordSystemDiskOperationTimeReadDataPoint(now, float64(ioCounter.ReadTime)/1e3, device)
s.mb.RecordSystemDiskOperationTimeWriteDataPoint(now, float64(ioCounter.WriteTime)/1e3, device)
}
}
}

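
The constructor above reads two feature gates, `internal.EmitMetricsWithDirectionAttributeFeatureGateID` and `internal.EmitMetricsWithoutDirectionAttributeFeatureGateID`, whose registration lives in the receiver's `internal` package and is not part of this diff. The sketch below is a plausible shape for that registration; the gate ID strings, descriptions, and defaults are assumptions, and it presumes the `featuregate.Gate` struct and `Registry.MustRegister` API of collector releases from this period:

```go
// Hypothetical sketch of the gate registration in the receiver's internal
// package. IDs, descriptions, and defaults are assumptions, not taken from
// this commit.
package internal

import "go.opentelemetry.io/collector/service/featuregate"

const (
	EmitMetricsWithDirectionAttributeFeatureGateID    = "receiver.hostmetricsreceiver.emitMetricsWithDirectionAttribute"
	EmitMetricsWithoutDirectionAttributeFeatureGateID = "receiver.hostmetricsreceiver.emitMetricsWithoutDirectionAttribute"
)

func init() {
	// Legacy direction-attributed metrics stay on by default; the renamed
	// metrics are opt-in until the transition completes.
	featuregate.GetRegistry().MustRegister(featuregate.Gate{
		ID:          EmitMetricsWithDirectionAttributeFeatureGateID,
		Description: "Emit hostmetrics with a direction attribute (legacy metric names).",
		Enabled:     true,
	})
	featuregate.GetRegistry().MustRegister(featuregate.Gate{
		ID:          EmitMetricsWithoutDirectionAttributeFeatureGateID,
		Description: "Emit hostmetrics with direction encoded in the metric name.",
		Enabled:     false,
	})
}
```
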
@@ -39,7 +39,13 @@ func (s *scraper) recordDiskWeightedIOTimeMetric(now pcommon.Timestamp, ioCounters map[string]disk.IOCountersStat) {

func (s *scraper) recordDiskMergedMetric(now pcommon.Timestamp, ioCounters map[string]disk.IOCountersStat) {
for device, ioCounter := range ioCounters {
s.mb.RecordSystemDiskMergedDataPoint(now, int64(ioCounter.MergedReadCount), device, metadata.AttributeDirectionRead)
s.mb.RecordSystemDiskMergedDataPoint(now, int64(ioCounter.MergedWriteCount), device, metadata.AttributeDirectionWrite)
if s.emitMetricsWithDirectionAttribute {
s.mb.RecordSystemDiskMergedDataPoint(now, int64(ioCounter.MergedReadCount), device, metadata.AttributeDirectionRead)
s.mb.RecordSystemDiskMergedDataPoint(now, int64(ioCounter.MergedWriteCount), device, metadata.AttributeDirectionWrite)
}
if s.emitMetricsWithoutDirectionAttribute {
s.mb.RecordSystemDiskMergedReadDataPoint(now, int64(ioCounter.MergedReadCount), device)
s.mb.RecordSystemDiskMergedWriteDataPoint(now, int64(ioCounter.MergedWriteCount), device)
}
}
}
@@ -17,6 +17,7 @@ package diskscraper
import (
"context"
"errors"
"runtime"
"testing"

"github.com/stretchr/testify/assert"
@@ -39,6 +40,12 @@ func TestScrape(t *testing.T) {
initializationErr string
expectMetrics int
expectedStartTime pcommon.Timestamp
mutateScraper func(*scraper)
}

metricsWithDirection := 3
if runtime.GOOS == "linux" {
metricsWithDirection++
}

testCases := []testCase{
@@ -47,6 +54,15 @@
config: Config{Metrics: metadata.DefaultMetricsSettings()},
expectMetrics: metricsLen,
},
{
name: "With direction removed",
config: Config{Metrics: metadata.DefaultMetricsSettings()},
expectMetrics: metricsLen + metricsWithDirection,
mutateScraper: func(s *scraper) {
s.emitMetricsWithDirectionAttribute = false
s.emitMetricsWithoutDirectionAttribute = true
},
},
{
name: "Validate Start Time",
config: Config{Metrics: metadata.DefaultMetricsSettings()},
@@ -99,6 +115,9 @@
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
scraper, err := newDiskScraper(context.Background(), componenttest.NewNopReceiverCreateSettings(), &test.config)
if test.mutateScraper != nil {
test.mutateScraper(scraper)
}
if test.newErrRegex != "" {
require.Error(t, err)
require.Regexp(t, test.newErrRegex, err)
@@ -134,17 +153,33 @@
reportedMetricsCount[metric.Name()]++
switch metric.Name() {
case "system.disk.io":
assertInt64DiskMetricValid(t, metric, test.expectedStartTime)
assertInt64DiskMetricValid(t, metric, true, test.expectedStartTime)
case "system.disk.io.read":
assertInt64DiskMetricValid(t, metric, false, test.expectedStartTime)
case "system.disk.io.write":
assertInt64DiskMetricValid(t, metric, false, test.expectedStartTime)
case "system.disk.io_time":
assertDoubleDiskMetricValid(t, metric, false, test.expectedStartTime)
case "system.disk.operation_time":
assertDoubleDiskMetricValid(t, metric, true, test.expectedStartTime)
case "system.disk.operation_time.read":
assertDoubleDiskMetricValid(t, metric, false, test.expectedStartTime)
case "system.disk.operation_time.write":
assertDoubleDiskMetricValid(t, metric, false, test.expectedStartTime)
case "system.disk.operations":
assertInt64DiskMetricValid(t, metric, test.expectedStartTime)
assertInt64DiskMetricValid(t, metric, true, test.expectedStartTime)
case "system.disk.operations.read":
assertInt64DiskMetricValid(t, metric, false, test.expectedStartTime)
case "system.disk.operations.write":
assertInt64DiskMetricValid(t, metric, false, test.expectedStartTime)
case "system.disk.weighted.io.time":
assertDoubleDiskMetricValid(t, metric, false, test.expectedStartTime)
case "system.disk.merged":
assertInt64DiskMetricValid(t, metric, test.expectedStartTime)
assertInt64DiskMetricValid(t, metric, true, test.expectedStartTime)
case "system.disk.merged.read":
assertInt64DiskMetricValid(t, metric, false, test.expectedStartTime)
case "system.disk.merged.write":
assertInt64DiskMetricValid(t, metric, false, test.expectedStartTime)
case "system.disk.pending_operations":
assertDiskPendingOperationsMetricValid(t, metric)
case "system.disk.weighted_io_time":
@@ -162,18 +197,24 @@
}
}

func assertInt64DiskMetricValid(t *testing.T, metric pmetric.Metric, startTime pcommon.Timestamp) {
func assertInt64DiskMetricValid(t *testing.T, metric pmetric.Metric, expectDirectionLabels bool, startTime pcommon.Timestamp) {
if startTime != 0 {
internal.AssertSumMetricStartTimeEquals(t, metric, startTime)
}

assert.GreaterOrEqual(t, metric.Sum().DataPoints().Len(), 2)
expectedDataPointsLen := 2
if !expectDirectionLabels {
expectedDataPointsLen = 1
}
assert.GreaterOrEqual(t, metric.Sum().DataPoints().Len(), expectedDataPointsLen)

internal.AssertSumMetricHasAttribute(t, metric, 0, "device")
internal.AssertSumMetricHasAttributeValue(t, metric, 0, "direction",
pcommon.NewValueString(metadata.AttributeDirectionRead.String()))
internal.AssertSumMetricHasAttributeValue(t, metric, 1, "direction",
pcommon.NewValueString(metadata.AttributeDirectionWrite.String()))
if expectDirectionLabels {
internal.AssertSumMetricHasAttributeValue(t, metric, 0, "direction",
pcommon.NewValueString(metadata.AttributeDirectionRead.String()))
internal.AssertSumMetricHasAttributeValue(t, metric, 1, "direction",
pcommon.NewValueString(metadata.AttributeDirectionWrite.String()))
}
}

func assertDoubleDiskMetricValid(t *testing.T, metric pmetric.Metric, expectDirectionLabels bool, startTime pcommon.Timestamp) {
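
Note the design choice in the test above: gate state is injected through the new `mutateScraper` hook rather than by toggling the shared feature-gate registry, which keeps each test case hermetic. Outside the tests, the same switch would be flipped on the global registry; a minimal sketch, assuming the gate IDs from the previous sketch and that `Registry.Apply` accepts a map of gate IDs to desired states:

```go
package main

import "go.opentelemetry.io/collector/service/featuregate"

func main() {
	// Illustrative only: enable the renamed metrics and disable the legacy
	// direction-attributed ones. The gate IDs are assumptions.
	featuregate.GetRegistry().Apply(map[string]bool{
		"receiver.hostmetricsreceiver.emitMetricsWithDirectionAttribute":    false,
		"receiver.hostmetricsreceiver.emitMetricsWithoutDirectionAttribute": true,
	})
}
```

In a deployed collector the same toggle is normally done with the `--feature-gates` command-line flag rather than in code.
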
@@ -24,8 +24,10 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/scrapererror"
"go.opentelemetry.io/collector/service/featuregate"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterset"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/perfcounters"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/diskscraper/internal/metadata"
)
@@ -61,12 +63,16 @@ type scraper struct {
perfCounterScraper perfcounters.PerfCounterScraper

// for mocking
bootTime func() (uint64, error)
bootTime func() (uint64, error)
emitMetricsWithDirectionAttribute bool
emitMetricsWithoutDirectionAttribute bool
}

// newDiskScraper creates a Disk Scraper
func newDiskScraper(_ context.Context, settings component.ReceiverCreateSettings, cfg *Config) (*scraper, error) {
scraper := &scraper{settings: settings, config: cfg, perfCounterScraper: &perfcounters.PerfLibScraper{}, bootTime: host.BootTime}
scraper.emitMetricsWithDirectionAttribute = featuregate.GetRegistry().IsEnabled(internal.EmitMetricsWithDirectionAttributeFeatureGateID)
scraper.emitMetricsWithoutDirectionAttribute = featuregate.GetRegistry().IsEnabled(internal.EmitMetricsWithoutDirectionAttributeFeatureGateID)

var err error

@@ -133,15 +139,27 @@ func (s *scraper) scrape(ctx context.Context) (pmetric.Metrics, error) {

func (s *scraper) recordDiskIOMetric(now pcommon.Timestamp, logicalDiskCounterValues []*perfcounters.CounterValues) {
for _, logicalDiskCounter := range logicalDiskCounterValues {
s.mb.RecordSystemDiskIoDataPoint(now, logicalDiskCounter.Values[readBytesPerSec], logicalDiskCounter.InstanceName, metadata.AttributeDirectionRead)
s.mb.RecordSystemDiskIoDataPoint(now, logicalDiskCounter.Values[writeBytesPerSec], logicalDiskCounter.InstanceName, metadata.AttributeDirectionWrite)
if s.emitMetricsWithDirectionAttribute {
s.mb.RecordSystemDiskIoDataPoint(now, logicalDiskCounter.Values[readBytesPerSec], logicalDiskCounter.InstanceName, metadata.AttributeDirectionRead)
s.mb.RecordSystemDiskIoDataPoint(now, logicalDiskCounter.Values[writeBytesPerSec], logicalDiskCounter.InstanceName, metadata.AttributeDirectionWrite)
}
if s.emitMetricsWithoutDirectionAttribute {
s.mb.RecordSystemDiskIoReadDataPoint(now, logicalDiskCounter.Values[readBytesPerSec], logicalDiskCounter.InstanceName)
s.mb.RecordSystemDiskIoWriteDataPoint(now, logicalDiskCounter.Values[writeBytesPerSec], logicalDiskCounter.InstanceName)
}
}
}

func (s *scraper) recordDiskOperationsMetric(now pcommon.Timestamp, logicalDiskCounterValues []*perfcounters.CounterValues) {
for _, logicalDiskCounter := range logicalDiskCounterValues {
s.mb.RecordSystemDiskOperationsDataPoint(now, logicalDiskCounter.Values[readsPerSec], logicalDiskCounter.InstanceName, metadata.AttributeDirectionRead)
s.mb.RecordSystemDiskOperationsDataPoint(now, logicalDiskCounter.Values[writesPerSec], logicalDiskCounter.InstanceName, metadata.AttributeDirectionWrite)
if s.emitMetricsWithDirectionAttribute {
s.mb.RecordSystemDiskOperationsDataPoint(now, logicalDiskCounter.Values[readsPerSec], logicalDiskCounter.InstanceName, metadata.AttributeDirectionRead)
s.mb.RecordSystemDiskOperationsDataPoint(now, logicalDiskCounter.Values[writesPerSec], logicalDiskCounter.InstanceName, metadata.AttributeDirectionWrite)
}
if s.emitMetricsWithoutDirectionAttribute {
s.mb.RecordSystemDiskOperationsReadDataPoint(now, logicalDiskCounter.Values[readsPerSec], logicalDiskCounter.InstanceName)
s.mb.RecordSystemDiskOperationsWriteDataPoint(now, logicalDiskCounter.Values[writesPerSec], logicalDiskCounter.InstanceName)
}
}
}

@@ -154,8 +172,14 @@ func (s *scraper) recordDiskIOTimeMetric(now pcommon.Timestamp, logicalDiskCounterValues []*perfcounters.CounterValues) {

func (s *scraper) recordDiskOperationTimeMetric(now pcommon.Timestamp, logicalDiskCounterValues []*perfcounters.CounterValues) {
for _, logicalDiskCounter := range logicalDiskCounterValues {
s.mb.RecordSystemDiskOperationTimeDataPoint(now, float64(logicalDiskCounter.Values[avgDiskSecsPerRead])/1e7, logicalDiskCounter.InstanceName, metadata.AttributeDirectionRead)
s.mb.RecordSystemDiskOperationTimeDataPoint(now, float64(logicalDiskCounter.Values[avgDiskSecsPerWrite])/1e7, logicalDiskCounter.InstanceName, metadata.AttributeDirectionWrite)
if s.emitMetricsWithDirectionAttribute {
s.mb.RecordSystemDiskOperationTimeDataPoint(now, float64(logicalDiskCounter.Values[avgDiskSecsPerRead])/1e7, logicalDiskCounter.InstanceName, metadata.AttributeDirectionRead)
s.mb.RecordSystemDiskOperationTimeDataPoint(now, float64(logicalDiskCounter.Values[avgDiskSecsPerWrite])/1e7, logicalDiskCounter.InstanceName, metadata.AttributeDirectionWrite)
}
if s.emitMetricsWithoutDirectionAttribute {
s.mb.RecordSystemDiskOperationTimeReadDataPoint(now, float64(logicalDiskCounter.Values[avgDiskSecsPerRead])/1e7, logicalDiskCounter.InstanceName)
s.mb.RecordSystemDiskOperationTimeWriteDataPoint(now, float64(logicalDiskCounter.Values[avgDiskSecsPerWrite])/1e7, logicalDiskCounter.InstanceName)
}
}
}

@@ -9,10 +9,18 @@ These are the metrics available for this scraper.
| Name | Description | Unit | Type | Attributes |
| ---- | ----------- | ---- | ---- | ---------- |
| **system.disk.io** | Disk bytes transferred. | By | Sum(Int) | <ul> <li>device</li> <li>direction</li> </ul> |
| **system.disk.io.read** | Disk bytes read. | By | Sum(Int) | <ul> <li>device</li> </ul> |
| **system.disk.io.write** | Disk bytes written. | By | Sum(Int) | <ul> <li>device</li> </ul> |
| **system.disk.io_time** | Time disk spent activated. On Windows, this is calculated as the inverse of disk idle time. | s | Sum(Double) | <ul> <li>device</li> </ul> |
| **system.disk.merged** | The number of disk reads merged into single physical disk access operations. | {operations} | Sum(Int) | <ul> <li>device</li> <li>direction</li> </ul> |
| **system.disk.merged** | The number of disk reads/writes merged into single physical disk access operations. | {operations} | Sum(Int) | <ul> <li>device</li> <li>direction</li> </ul> |
| **system.disk.merged.read** | The number of disk reads merged into single physical disk access operations. | {operations} | Sum(Int) | <ul> <li>device</li> </ul> |
| **system.disk.merged.write** | The number of disk writes merged into single physical disk access operations. | {operations} | Sum(Int) | <ul> <li>device</li> </ul> |
| **system.disk.operation_time** | Time spent in disk operations. | s | Sum(Double) | <ul> <li>device</li> <li>direction</li> </ul> |
| **system.disk.operation_time.read** | Time spent in disk reads. | s | Sum(Double) | <ul> <li>device</li> </ul> |
| **system.disk.operation_time.write** | Time spent in disk writes. | s | Sum(Double) | <ul> <li>device</li> </ul> |
| **system.disk.operations** | Disk operations count. | {operations} | Sum(Int) | <ul> <li>device</li> <li>direction</li> </ul> |
| **system.disk.operations.read** | Disk reads count. | {operations} | Sum(Int) | <ul> <li>device</li> </ul> |
| **system.disk.operations.write** | Disk writes count. | {operations} | Sum(Int) | <ul> <li>device</li> </ul> |
| **system.disk.pending_operations** | The queue size of pending I/O operations. | {operations} | Sum(Int) | <ul> <li>device</li> </ul> |
| **system.disk.weighted_io_time** | Time disk spent activated multiplied by the queue length. | s | Sum(Double) | <ul> <li>device</li> </ul> |
