Skip to content

Commit

Permalink
[receiver/elasticsearch]: add missing datapoints for operation count …
Browse files Browse the repository at this point in the history
…and operation time metrics (open-telemetry#16170)

* feat: add missing operation count datapoints with total aggregation

* feat: add missing operation count datapoints with primary shards aggregation

* feat: add missing operation time datapoints with total shard aggregation

* feat: add missing operation time datapoints with primary shards aggregations
  • Loading branch information
aboguszewski-sumo committed Nov 18, 2022
1 parent 40e7b46 commit 3c15679
Show file tree
Hide file tree
Showing 8 changed files with 5,111 additions and 496 deletions.
16 changes: 16 additions & 0 deletions .chloggen/elastic-missing-operations.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add missing data points for operation count and operation time

# One or more tracking issues related to the change
issues: [14635]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
13 changes: 13 additions & 0 deletions receiver/elasticsearchreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,18 @@ This feature gate will eventually be enabled by default, and eventually the old
to give users time to migrate to the new implementation. The target release for this featuregate to be enabled by default
is 0.68.0.

**ALPHA**: `receiver.elasticsearch.emitAllIndexOperationMetrics`

The feature gate `receiver.elasticsearch.emitAllIndexOperationMetrics` once enabled starts emitting metrics `elasticsearch.index.operation.count`
and `elasticsearch.index.operation.time` with all possible data points - for every possible operation type and both shard aggregation types.

Because of the amount of added data points, this change might affect performance for existing users of this receiver.
It is recommended to migrate to the new implementation when possible.
Any new users planning to adopt this receiver should enable this feature gate to avoid risking unexpected slowdowns.

This feature gate will eventually be enabled by default, and eventually the old implementation will be removed. It aims
to give users time to migrate to the new implementation. The target release for this featuregate to be enabled by default
is 0.68.0.

[beta]:https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
139 changes: 138 additions & 1 deletion receiver/elasticsearchreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
const (
readmeURL = "https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/elasticsearchreceiver/README.md"
emitClusterHealthDetailedShardMetricsID = "receiver.elasticsearch.emitClusterHealthDetailedShardMetrics"
emitAllIndexOperationMetricsID = "receiver.elasticsearch.emitAllIndexOperationMetrics"
)

func init() {
Expand All @@ -53,6 +54,11 @@ func init() {
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled, the elasticsearch.cluster.shards metric will be emitted with two more datapoints."),
)
featuregate.GetRegistry().MustRegisterID(
emitAllIndexOperationMetricsID,
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled, the elasticsearch.index.operation.* metrics will be emitted with all possible datapoints."),
)
}

var errUnknownClusterStatus = errors.New("unknown cluster status")
Expand All @@ -67,6 +73,7 @@ type elasticsearchScraper struct {

// Feature gates
emitClusterHealthDetailedShardMetrics bool
emitAllIndexOperationMetrics bool
}

func newElasticSearchScraper(
Expand All @@ -78,6 +85,7 @@ func newElasticSearchScraper(
cfg: cfg,
mb: metadata.NewMetricsBuilder(cfg.Metrics, settings.BuildInfo),
emitClusterHealthDetailedShardMetrics: featuregate.GetRegistry().IsEnabled(emitClusterHealthDetailedShardMetricsID),
emitAllIndexOperationMetrics: featuregate.GetRegistry().IsEnabled(emitAllIndexOperationMetricsID),
}

if !e.emitClusterHealthDetailedShardMetrics {
Expand All @@ -86,6 +94,12 @@ func newElasticSearchScraper(
)
}

if !e.emitAllIndexOperationMetrics {
settings.Logger.Warn(
fmt.Sprintf("Feature gate %s is not enabled. Please see the README for more information: %s", emitAllIndexOperationMetricsID, readmeURL),
)
}

return e
}

Expand Down Expand Up @@ -385,7 +399,7 @@ func (r *elasticsearchScraper) scrapeIndicesMetrics(ctx context.Context, now pco
indexStats, err := r.client.IndexStats(ctx, r.cfg.Indices)

if err != nil {
errs.AddPartial(24, err)
errs.AddPartial(63, err)
return
}

Expand Down Expand Up @@ -415,6 +429,129 @@ func (r *elasticsearchScraper) scrapeOneIndexMetrics(now pcommon.Timestamp, name
now, stats.Total.MergeOperations.TotalTimeInMs, metadata.AttributeOperationMerge, metadata.AttributeIndexAggregationTypeTotal,
)

if r.emitAllIndexOperationMetrics {
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Total.IndexingOperations.IndexTotal, metadata.AttributeOperationIndex, metadata.AttributeIndexAggregationTypeTotal,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Total.IndexingOperations.DeleteTotal, metadata.AttributeOperationDelete, metadata.AttributeIndexAggregationTypeTotal,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Total.GetOperation.Total, metadata.AttributeOperationGet, metadata.AttributeIndexAggregationTypeTotal,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Total.SearchOperations.ScrollTotal, metadata.AttributeOperationScroll, metadata.AttributeIndexAggregationTypeTotal,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Total.SearchOperations.SuggestTotal, metadata.AttributeOperationSuggest, metadata.AttributeIndexAggregationTypeTotal,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Total.MergeOperations.Total, metadata.AttributeOperationMerge, metadata.AttributeIndexAggregationTypeTotal,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Total.RefreshOperations.Total, metadata.AttributeOperationRefresh, metadata.AttributeIndexAggregationTypeTotal,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Total.FlushOperations.Total, metadata.AttributeOperationFlush, metadata.AttributeIndexAggregationTypeTotal,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Total.WarmerOperations.Total, metadata.AttributeOperationWarmer, metadata.AttributeIndexAggregationTypeTotal,
)

r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Primaries.SearchOperations.FetchTotal, metadata.AttributeOperationFetch, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Primaries.SearchOperations.QueryTotal, metadata.AttributeOperationQuery, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Primaries.IndexingOperations.IndexTotal, metadata.AttributeOperationIndex, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Primaries.IndexingOperations.DeleteTotal, metadata.AttributeOperationDelete, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Primaries.GetOperation.Total, metadata.AttributeOperationGet, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Primaries.SearchOperations.ScrollTotal, metadata.AttributeOperationScroll, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Primaries.SearchOperations.SuggestTotal, metadata.AttributeOperationSuggest, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Primaries.MergeOperations.Total, metadata.AttributeOperationMerge, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Primaries.RefreshOperations.Total, metadata.AttributeOperationRefresh, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Primaries.FlushOperations.Total, metadata.AttributeOperationFlush, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsCompletedDataPoint(
now, stats.Primaries.WarmerOperations.Total, metadata.AttributeOperationWarmer, metadata.AttributeIndexAggregationTypePrimaryShards,
)

r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Total.IndexingOperations.IndexTimeInMs, metadata.AttributeOperationIndex, metadata.AttributeIndexAggregationTypeTotal,
)
r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Total.IndexingOperations.DeleteTimeInMs, metadata.AttributeOperationDelete, metadata.AttributeIndexAggregationTypeTotal,
)
r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Total.GetOperation.TotalTimeInMs, metadata.AttributeOperationGet, metadata.AttributeIndexAggregationTypeTotal,
)
r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Total.SearchOperations.ScrollTimeInMs, metadata.AttributeOperationScroll, metadata.AttributeIndexAggregationTypeTotal,
)
r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Total.SearchOperations.SuggestTimeInMs, metadata.AttributeOperationSuggest, metadata.AttributeIndexAggregationTypeTotal,
)
r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Total.RefreshOperations.TotalTimeInMs, metadata.AttributeOperationRefresh, metadata.AttributeIndexAggregationTypeTotal,
)
r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Total.FlushOperations.TotalTimeInMs, metadata.AttributeOperationFlush, metadata.AttributeIndexAggregationTypeTotal,
)
r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Total.WarmerOperations.TotalTimeInMs, metadata.AttributeOperationWarmer, metadata.AttributeIndexAggregationTypeTotal,
)

r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Primaries.SearchOperations.FetchTimeInMs, metadata.AttributeOperationFetch, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Primaries.SearchOperations.QueryTimeInMs, metadata.AttributeOperationQuery, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Primaries.IndexingOperations.IndexTimeInMs, metadata.AttributeOperationIndex, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Primaries.IndexingOperations.DeleteTimeInMs, metadata.AttributeOperationDelete, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Primaries.GetOperation.TotalTimeInMs, metadata.AttributeOperationGet, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Primaries.SearchOperations.ScrollTimeInMs, metadata.AttributeOperationScroll, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Primaries.SearchOperations.SuggestTimeInMs, metadata.AttributeOperationSuggest, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Primaries.MergeOperations.TotalTimeInMs, metadata.AttributeOperationMerge, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Primaries.RefreshOperations.TotalTimeInMs, metadata.AttributeOperationRefresh, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Primaries.FlushOperations.TotalTimeInMs, metadata.AttributeOperationFlush, metadata.AttributeIndexAggregationTypePrimaryShards,
)
r.mb.RecordElasticsearchIndexOperationsTimeDataPoint(
now, stats.Primaries.WarmerOperations.TotalTimeInMs, metadata.AttributeOperationWarmer, metadata.AttributeIndexAggregationTypePrimaryShards,
)
}

r.mb.RecordElasticsearchIndexOperationsMergeSizeDataPoint(
now, stats.Total.MergeOperations.TotalSizeInBytes, metadata.AttributeIndexAggregationTypeTotal,
)
Expand Down
5 changes: 4 additions & 1 deletion receiver/elasticsearchreceiver/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ const noNodesExpectedMetricsPath = "./testdata/expected_metrics/noNodes.json"

func TestMain(m *testing.M) {
// Enable the feature gates before all tests to avoid flaky tests.
_ = featuregate.GetRegistry().Apply(map[string]bool{emitClusterHealthDetailedShardMetricsID: true})
_ = featuregate.GetRegistry().Apply(map[string]bool{
emitClusterHealthDetailedShardMetricsID: true,
emitAllIndexOperationMetricsID: true,
})
code := m.Run()
os.Exit(code)
}
Expand Down
Loading

0 comments on commit 3c15679

Please sign in to comment.