[receiver/elasticsearch]: add scraping indices (open-telemetry#14748)
aboguszewski-sumo authored and shalper2 committed Dec 6, 2022
1 parent 85170bf commit 42003a1
Showing 24 changed files with 2,581 additions and 82 deletions.
5 changes: 5 additions & 0 deletions .chloggen/elasticsearch-receiver-add-indices.yaml
@@ -0,0 +1,5 @@
change_type: enhancement
component: elasticsearchreceiver
note: Add scraping index stats and generate search metrics
issues: [14635]

4 changes: 3 additions & 1 deletion receiver/elasticsearchreceiver/README.md
@@ -6,7 +6,7 @@
| Supported pipeline types | metrics |
| Distributions | [contrib] |

This receiver queries the Elasticsearch [node stats](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-stats.html) and [cluster health](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html) endpoints in order to scrape metrics from a running elasticsearch cluster.
This receiver queries the Elasticsearch [node stats](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-stats.html), [cluster health](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html) and [index stats](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-stats.html) endpoints in order to scrape metrics from a running elasticsearch cluster.

## Prerequisites

@@ -21,6 +21,7 @@ The following settings are optional:
- `metrics` (default: see `DefaultMetricsSettings` [here](./internal/metadata/generated_metrics.go): Allows enabling and disabling specific metrics from being collected in this receiver.
- `nodes` (default: `["_all"]`): Allows specifying node filters that define which nodes are scraped for node-level metrics. See [the Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/7.9/cluster.html#cluster-nodes) for allowed filters. If this option is left explicitly empty, then no node-level metrics will be scraped.
- `skip_cluster_metrics` (default: `false`): If true, cluster-level metrics will not be scraped.
- `indices` (default: `["_all"]`): Allows specifying index filters that define which indices are scraped for index-level metrics. See [the Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-stats.html#index-stats-api-path-params) for allowed filters. If this option is left explicitly empty, then no index-level metrics will be scraped.
- `endpoint` (default = `http://localhost:9200`): The base URL of the Elasticsearch API for the cluster to monitor.
- `username` (no default): Specifies the username used to authenticate with Elasticsearch using basic auth. Must be specified if password is specified.
- `password` (no default): Specifies the password used to authenticate with Elasticsearch using basic auth. Must be specified if username is specified.
@@ -36,6 +37,7 @@ receivers:
enabled: false
nodes: ["_local"]
skip_cluster_metrics: true
indices: [".geoip_databases"]
endpoint: http://localhost:9200
username: otel
password: password
30 changes: 27 additions & 3 deletions receiver/elasticsearchreceiver/client.go
@@ -40,7 +40,8 @@ var (
type elasticsearchClient interface {
NodeStats(ctx context.Context, nodes []string) (*model.NodeStats, error)
ClusterHealth(ctx context.Context) (*model.ClusterHealth, error)
Version(ctx context.Context) (*model.VersionResponse, error)
IndexStats(ctx context.Context, indices []string) (*model.IndexStats, error)
ClusterMetadata(ctx context.Context) (*model.ClusterMetadataResponse, error)
}

// defaultElasticsearchClient is the main implementation of elasticsearchClient.
@@ -88,6 +89,8 @@ const nodeStatsMetrics = "breaker,indices,process,jvm,thread_pool,transport,http
// nodeStatsIndexMetrics is a comma separated list of index metrics that will be gathered from NodeStats.
const nodeStatsIndexMetrics = "store,docs,indexing,get,search,merge,refresh,flush,warmer,query_cache,fielddata,translog"

const indexStatsMetrics = "search"

func (c defaultElasticsearchClient) NodeStats(ctx context.Context, nodes []string) (*model.NodeStats, error) {
var nodeSpec string
if len(nodes) > 0 {
@@ -119,13 +122,34 @@ func (c defaultElasticsearchClient) ClusterHealth(ctx context.Context) (*model.C
return &clusterHealth, err
}

func (c defaultElasticsearchClient) Version(ctx context.Context) (*model.VersionResponse, error) {
func (c defaultElasticsearchClient) IndexStats(ctx context.Context, indices []string) (*model.IndexStats, error) {
var indexSpec string
if len(indices) > 0 {
indexSpec = strings.Join(indices, ",")
} else {
indexSpec = "_all"
}

indexStatsPath := fmt.Sprintf("%s/_stats/%s", indexSpec, indexStatsMetrics)

body, err := c.doRequest(ctx, indexStatsPath)
if err != nil {
return nil, err
}

indexStats := model.IndexStats{}
err = json.Unmarshal(body, &indexStats)

return &indexStats, err
}

func (c defaultElasticsearchClient) ClusterMetadata(ctx context.Context) (*model.ClusterMetadataResponse, error) {
body, err := c.doRequest(ctx, "")
if err != nil {
return nil, err
}

versionResponse := model.VersionResponse{}
versionResponse := model.ClusterMetadataResponse{}
err = json.Unmarshal(body, &versionResponse)
return &versionResponse, err
}
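
The request path that `IndexStats` builds can be sketched in isolation. This is a minimal, standalone illustration, not the receiver's code: `buildIndexStatsPath` is a hypothetical helper name, while the `_all` fallback and the `search` metric group are taken from the diff above.

```go
package main

import (
	"fmt"
	"strings"
)

// indexStatsMetrics mirrors the constant added in the diff: only the
// "search" metric group is requested from the index stats endpoint.
const indexStatsMetrics = "search"

// buildIndexStatsPath is a hypothetical helper (not part of the commit) that
// reproduces the path construction in IndexStats: index filters are joined
// with commas, and an empty or nil list falls back to "_all".
func buildIndexStatsPath(indices []string) string {
	indexSpec := "_all"
	if len(indices) > 0 {
		indexSpec = strings.Join(indices, ",")
	}
	return fmt.Sprintf("%s/_stats/%s", indexSpec, indexStatsMetrics)
}

func main() {
	fmt.Println(buildIndexStatsPath(nil))
	fmt.Println(buildIndexStatsPath([]string{".geoip_databases", "logs-*"}))
}
```

Note that a nil or empty slice queries every index at this level, so any "scrape no indices" behavior has to be decided by the caller before the client is invoked.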
157 changes: 138 additions & 19 deletions receiver/elasticsearchreceiver/client_test.go
@@ -236,12 +236,12 @@ func TestClusterHealthNoAuthorization(t *testing.T) {
require.ErrorIs(t, err, errUnauthorized)
}

func TestVersionNoPassword(t *testing.T) {
versionJSON, err := os.ReadFile("./testdata/sample_payloads/version.json")
func TestMetadataNoPassword(t *testing.T) {
metadataJSON, err := os.ReadFile("./testdata/sample_payloads/metadata.json")
require.NoError(t, err)

actualVersion := model.VersionResponse{}
require.NoError(t, json.Unmarshal(versionJSON, &actualVersion))
actualMetadata := model.ClusterMetadataResponse{}
require.NoError(t, json.Unmarshal(metadataJSON, &actualMetadata))

elasticsearchMock := mockServer(t, "", "")
defer elasticsearchMock.Close()
@@ -254,18 +254,18 @@ func TestVersionNoPassword(t *testing.T) {
require.NoError(t, err)

ctx := context.Background()
version, err := client.Version(ctx)
metadata, err := client.ClusterMetadata(ctx)
require.NoError(t, err)

require.Equal(t, &actualVersion, version)
require.Equal(t, &actualMetadata, metadata)
}

func TestVersionAuthentication(t *testing.T) {
versionJSON, err := os.ReadFile("./testdata/sample_payloads/version.json")
func TestMetadataAuthentication(t *testing.T) {
metadataJSON, err := os.ReadFile("./testdata/sample_payloads/metadata.json")
require.NoError(t, err)

actualVersion := model.VersionResponse{}
require.NoError(t, json.Unmarshal(versionJSON, &actualVersion))
actualMetadata := model.ClusterMetadataResponse{}
require.NoError(t, json.Unmarshal(metadataJSON, &actualMetadata))

username := "user"
password := "pass"
@@ -283,13 +283,13 @@ func TestVersionAuthentication(t *testing.T) {
require.NoError(t, err)

ctx := context.Background()
version, err := client.Version(ctx)
metadata, err := client.ClusterMetadata(ctx)
require.NoError(t, err)

require.Equal(t, &actualVersion, version)
require.Equal(t, &actualMetadata, metadata)
}

func TestVersionNoAuthentication(t *testing.T) {
func TestMetadataNoAuthentication(t *testing.T) {
elasticsearchMock := mockServer(t, "user", "pass")
defer elasticsearchMock.Close()

@@ -301,11 +301,11 @@ func TestVersionNoAuthentication(t *testing.T) {
require.NoError(t, err)

ctx := context.Background()
_, err = client.Version(ctx)
_, err = client.ClusterMetadata(ctx)
require.ErrorIs(t, err, errUnauthenticated)
}

func TestVersionNoAuthorization(t *testing.T) {
func TestMetadataNoAuthorization(t *testing.T) {
elasticsearchMock := mockServer(t, "user", "pass")
defer elasticsearchMock.Close()

@@ -319,7 +319,7 @@ func TestVersionNoAuthorization(t *testing.T) {
require.NoError(t, err)

ctx := context.Background()
_, err = client.Version(ctx)
_, err = client.ClusterMetadata(ctx)
require.ErrorIs(t, err, errUnauthorized)
}

@@ -366,14 +366,126 @@ func TestDoRequest404(t *testing.T) {
require.Contains(t, err.Error(), "404")
}

func TestIndexStatsNoPassword(t *testing.T) {
indexJSON, err := os.ReadFile("./testdata/sample_payloads/indices.json")
require.NoError(t, err)

actualIndexStats := model.IndexStats{}
require.NoError(t, json.Unmarshal(indexJSON, &actualIndexStats))

elasticsearchMock := mockServer(t, "", "")
defer elasticsearchMock.Close()

client, err := newElasticsearchClient(componenttest.NewNopTelemetrySettings(), Config{
HTTPClientSettings: confighttp.HTTPClientSettings{
Endpoint: elasticsearchMock.URL,
},
}, componenttest.NewNopHost())
require.NoError(t, err)
ctx := context.Background()
indexStats, err := client.IndexStats(ctx, []string{"_all"})
require.NoError(t, err)

require.Equal(t, &actualIndexStats, indexStats)
}

func TestIndexStatsNilNodes(t *testing.T) {
indexJSON, err := os.ReadFile("./testdata/sample_payloads/indices.json")
require.NoError(t, err)

actualIndexStats := model.IndexStats{}
require.NoError(t, json.Unmarshal(indexJSON, &actualIndexStats))

elasticsearchMock := mockServer(t, "", "")
defer elasticsearchMock.Close()

client, err := newElasticsearchClient(componenttest.NewNopTelemetrySettings(), Config{
HTTPClientSettings: confighttp.HTTPClientSettings{
Endpoint: elasticsearchMock.URL,
},
}, componenttest.NewNopHost())
require.NoError(t, err)

ctx := context.Background()
indexStats, err := client.IndexStats(ctx, nil)
require.NoError(t, err)

require.Equal(t, &actualIndexStats, indexStats)
}

func TestIndexStatsAuthentication(t *testing.T) {
indexJSON, err := os.ReadFile("./testdata/sample_payloads/indices.json")
require.NoError(t, err)

actualIndexStats := model.IndexStats{}
require.NoError(t, json.Unmarshal(indexJSON, &actualIndexStats))

username := "user"
password := "pass"

elasticsearchMock := mockServer(t, username, password)
defer elasticsearchMock.Close()

client, err := newElasticsearchClient(componenttest.NewNopTelemetrySettings(), Config{
HTTPClientSettings: confighttp.HTTPClientSettings{
Endpoint: elasticsearchMock.URL,
},
Username: username,
Password: password,
}, componenttest.NewNopHost())
require.NoError(t, err)

ctx := context.Background()
indexStats, err := client.IndexStats(ctx, []string{"_all"})
require.NoError(t, err)

require.Equal(t, &actualIndexStats, indexStats)
}

func TestIndexStatsNoAuthentication(t *testing.T) {
elasticsearchMock := mockServer(t, "user", "pass")
defer elasticsearchMock.Close()

client, err := newElasticsearchClient(componenttest.NewNopTelemetrySettings(), Config{
HTTPClientSettings: confighttp.HTTPClientSettings{
Endpoint: elasticsearchMock.URL,
},
}, componenttest.NewNopHost())
require.NoError(t, err)

ctx := context.Background()
_, err = client.IndexStats(ctx, []string{"_all"})
require.ErrorIs(t, err, errUnauthenticated)
}

func TestIndexStatsBadAuthentication(t *testing.T) {
elasticsearchMock := mockServer(t, "user", "pass")
defer elasticsearchMock.Close()

client, err := newElasticsearchClient(componenttest.NewNopTelemetrySettings(), Config{
HTTPClientSettings: confighttp.HTTPClientSettings{
Endpoint: elasticsearchMock.URL,
},
Username: "bad_user",
Password: "bad_pass",
}, componenttest.NewNopHost())
require.NoError(t, err)

ctx := context.Background()
_, err = client.IndexStats(ctx, []string{"_all"})
require.ErrorIs(t, err, errUnauthorized)
}

// mockServer gives a mock elasticsearch server for testing; if username or password is included, they will be required for the client.
// otherwise, authorization is ignored.
func mockServer(t *testing.T, username, password string) *httptest.Server {
nodes, err := os.ReadFile("./testdata/sample_payloads/nodes_linux.json")
require.NoError(t, err)
indices, err := os.ReadFile("./testdata/sample_payloads/indices.json")
require.NoError(t, err)
health, err := os.ReadFile("./testdata/sample_payloads/health.json")
require.NoError(t, err)
version, err := os.ReadFile("./testdata/sample_payloads/version.json")
metadata, err := os.ReadFile("./testdata/sample_payloads/metadata.json")
require.NoError(t, err)

elasticsearchMock := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
@@ -395,17 +507,24 @@ func mockServer(t *testing.T, username, password string) *httptest.Server {
return
}

if strings.HasPrefix(req.URL.Path, "/_all/_stats") {
rw.WriteHeader(200)
_, err = rw.Write(indices)
require.NoError(t, err)
return
}

if strings.HasPrefix(req.URL.Path, "/_cluster/health") {
rw.WriteHeader(200)
_, err = rw.Write(health)
require.NoError(t, err)
return
}

// version check
// metadata check
if req.URL.Path == "/" {
rw.WriteHeader(200)
_, err = rw.Write(version)
_, err = rw.Write(metadata)
require.NoError(t, err)
return
}
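
The prefix-based routing used by `mockServer` can be tried outside the test suite with a small stub. The sketch below is an assumption-laden stand-in, not the test helper itself: `newStubServer` and `fetch` are hypothetical names, authentication is omitted, and the 404 default is this sketch's choice rather than something shown in the excerpt.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// newStubServer is a hypothetical, trimmed-down stand-in for mockServer.
// It serves the index stats payload under /_all/_stats and the cluster
// metadata payload at the root path; anything else gets a 404 (an
// assumption of this sketch).
func newStubServer(indicesPayload, metadataPayload []byte) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		switch {
		case strings.HasPrefix(req.URL.Path, "/_all/_stats"):
			rw.WriteHeader(http.StatusOK)
			_, _ = rw.Write(indicesPayload)
		case req.URL.Path == "/":
			rw.WriteHeader(http.StatusOK)
			_, _ = rw.Write(metadataPayload)
		default:
			rw.WriteHeader(http.StatusNotFound)
		}
	}))
}

// fetch performs a GET and returns the status code and response body.
func fetch(url string) (int, string, error) {
	resp, err := http.Get(url)
	if err != nil {
		return 0, "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return resp.StatusCode, string(body), err
}

func main() {
	srv := newStubServer([]byte(`{"indices":{}}`), []byte(`{"cluster_name":"example"}`))
	defer srv.Close()

	for _, path := range []string{"/_all/_stats/search", "/", "/logs/_stats/search"} {
		code, body, err := fetch(srv.URL + path)
		fmt.Println(path, code, body, err)
	}
}
```

Because the handler matches only the `/_all/_stats` prefix, a request for a specific index name is not served by a stub like this; tests that pass anything other than `_all` (or an empty list) would need an extra route.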
5 changes: 5 additions & 0 deletions receiver/elasticsearchreceiver/config.go
@@ -49,6 +49,11 @@ type Config struct {
Nodes []string `mapstructure:"nodes"`
// SkipClusterMetrics indicates whether cluster level metrics from /_cluster/health should be scraped or not.
SkipClusterMetrics bool `mapstructure:"skip_cluster_metrics"`
// Indices defines the indices to scrape.
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-stats.html#index-stats-api-path-params
// for which names are viable.
// If Indices is empty, no indices will be scraped.
Indices []string `mapstructure:"indices"`
// Username is the username used when making REST calls to elasticsearch. Must be specified if Password is. Not required.
Username string `mapstructure:"username"`
// Password is the password used when making REST calls to elasticsearch. Must be specified if Username is. Not required.
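
The "empty means skip" rule documented for `Indices` interacts with the client, which falls back to `_all` when it receives an empty list. One way to honor the documented behavior is to decide before calling the client. The scraper code is not part of this excerpt, so the following is only a sketch under that assumption; `stubConfig` and `indexFilters` are hypothetical names.

```go
package main

import "fmt"

// stubConfig is a hypothetical subset of the receiver's Config, holding only
// the field relevant to this sketch.
type stubConfig struct {
	Indices []string
}

// indexFilters returns the filters to pass to the client and whether index
// stats should be scraped at all. An empty or nil list means "scrape
// nothing", matching the comment on Config.Indices; the check has to happen
// here because the client itself treats an empty list as "_all".
func indexFilters(cfg stubConfig) (filters []string, scrape bool) {
	if len(cfg.Indices) == 0 {
		return nil, false
	}
	return cfg.Indices, true
}

func main() {
	configs := []stubConfig{
		{Indices: []string{"_all"}},
		{Indices: []string{".geoip_databases"}},
		{Indices: []string{}},
	}
	for _, cfg := range configs {
		filters, scrape := indexFilters(cfg)
		fmt.Println(scrape, filters)
	}
}
```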
1 change: 1 addition & 0 deletions receiver/elasticsearchreceiver/config_test.go
@@ -168,6 +168,7 @@ func TestLoadConfig(t *testing.T) {
expected: &Config{
SkipClusterMetrics: true,
Nodes: []string{"_local"},
Indices: []string{".geoip_databases"},
ScraperControllerSettings: scraperhelper.ScraperControllerSettings{
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
CollectionInterval: 2 * time.Minute,
8 changes: 6 additions & 2 deletions receiver/elasticsearchreceiver/documentation.md
@@ -22,6 +22,8 @@ These are the metrics available for this scraper.
| **elasticsearch.cluster.state_queue** | Number of cluster states in queue. | 1 | Sum(Int) | <ul> <li>cluster_state_queue_state</li> </ul> |
| **elasticsearch.cluster.state_update.count** | The number of cluster state update attempts that changed the cluster state since the node started. | 1 | Sum(Int) | <ul> <li>cluster_state_update_state</li> </ul> |
| **elasticsearch.cluster.state_update.time** | The cumulative amount of time updating the cluster state since the node started. | ms | Sum(Int) | <ul> <li>cluster_state_update_state</li> <li>cluster_state_update_type</li> </ul> |
| **elasticsearch.index.operations.completed** | The number of operations completed for an index. | {operations} | Sum(Int) | <ul> <li>operation</li> <li>index_aggregation_type</li> </ul> |
| **elasticsearch.index.operations.time** | Time spent on operations for an index. | ms | Sum(Int) | <ul> <li>operation</li> <li>index_aggregation_type</li> </ul> |
| **elasticsearch.indexing_pressure.memory.limit** | Configured memory limit, in bytes, for the indexing requests. | By | Gauge(Int) | <ul> </ul> |
| **elasticsearch.indexing_pressure.memory.total.primary_rejections** | Cumulative number of indexing requests rejected in the primary stage. | 1 | Sum(Int) | <ul> </ul> |
| **elasticsearch.indexing_pressure.memory.total.replica_rejections** | Number of indexing requests rejected in the replica stage. | 1 | Sum(Int) | <ul> </ul> |
@@ -44,8 +46,8 @@ These are the metrics available for this scraper.
| **elasticsearch.node.ingest.documents.current** | Total number of documents currently being ingested. | {documents} | Sum(Int) | <ul> </ul> |
| **elasticsearch.node.ingest.operations.failed** | Total number of failed ingest operations during the lifetime of this node. | {operation} | Sum(Int) | <ul> </ul> |
| **elasticsearch.node.open_files** | The number of open file descriptors held by the node. | {files} | Sum(Int) | <ul> </ul> |
| **elasticsearch.node.operations.completed** | The number of operations completed. | {operations} | Sum(Int) | <ul> <li>operation</li> </ul> |
| **elasticsearch.node.operations.time** | Time spent on operations. | ms | Sum(Int) | <ul> <li>operation</li> </ul> |
| **elasticsearch.node.operations.completed** | The number of operations completed by a node. | {operations} | Sum(Int) | <ul> <li>operation</li> </ul> |
| **elasticsearch.node.operations.time** | Time spent on operations by a node. | ms | Sum(Int) | <ul> <li>operation</li> </ul> |
| **elasticsearch.node.pipeline.ingest.documents.current** | Total number of documents currently being ingested by a pipeline. | {documents} | Sum(Int) | <ul> <li>ingest_pipeline_name</li> </ul> |
| **elasticsearch.node.pipeline.ingest.documents.preprocessed** | Number of documents preprocessed by the ingest pipeline. | {documents} | Sum(Int) | <ul> <li>ingest_pipeline_name</li> </ul> |
| **elasticsearch.node.pipeline.ingest.operations.failed** | Total number of failed operations for the ingest pipeline. | {operation} | Sum(Int) | <ul> <li>ingest_pipeline_name</li> </ul> |
@@ -92,6 +94,7 @@ metrics:
| Name | Description | Type |
| ---- | ----------- | ---- |
| elasticsearch.cluster.name | The name of the elasticsearch cluster. | String |
| elasticsearch.index.name | The name of the elasticsearch index. | String |
| elasticsearch.node.name | The name of the elasticsearch node. | String |

## Metric attributes
@@ -109,6 +112,7 @@ metrics:
| document_state (state) | The state of the document. | active, deleted |
| fs_direction (direction) | The direction of filesystem IO. | read, write |
| health_status (status) | The health status of the cluster. | green, yellow, red |
| index_aggregation_type (aggregation) | Type of shard aggregation for index statistics | primary_shards, total |
| indexing_memory_state (state) | State of the indexing memory | current, total |
| indexing_pressure_stage (stage) | Stage of the indexing pressure | coordinating, primary, replica |
| ingest_pipeline_name (name) | Name of the ingest pipeline. | |
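
The `index_aggregation_type` values correspond to the `primaries` and `total` sections that the Elasticsearch index stats response reports for each index. The sketch below shows one way to decode those sections and pair them with the attribute values. It is illustrative only: the struct and function names are hypothetical (the commit's `model.IndexStats` is not shown in this excerpt), and mapping `query_total` and `query_time_in_millis` to the two new metrics is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// The types below are hypothetical and cover only the fields used here.
type searchStats struct {
	QueryTotal        int64 `json:"query_total"`
	QueryTimeInMillis int64 `json:"query_time_in_millis"`
}

type shardGroupStats struct {
	Search searchStats `json:"search"`
}

type indexStatsEntry struct {
	Primaries shardGroupStats `json:"primaries"`
	Total     shardGroupStats `json:"total"`
}

type indexStatsResponse struct {
	Indices map[string]indexStatsEntry `json:"indices"`
}

// dataPoint pairs one index and aggregation type with its search counters.
type dataPoint struct {
	Index       string
	Aggregation string
	Completed   int64
	TimeMs      int64
}

// parseIndexStats decodes an index stats response body.
func parseIndexStats(body []byte) (indexStatsResponse, error) {
	var resp indexStatsResponse
	err := json.Unmarshal(body, &resp)
	return resp, err
}

// toDataPoints emits two points per index, one per aggregation type,
// ordered by index name so the output is deterministic.
func toDataPoints(resp indexStatsResponse) []dataPoint {
	names := make([]string, 0, len(resp.Indices))
	for name := range resp.Indices {
		names = append(names, name)
	}
	sort.Strings(names)

	points := make([]dataPoint, 0, 2*len(names))
	for _, name := range names {
		entry := resp.Indices[name]
		points = append(points,
			dataPoint{name, "primary_shards", entry.Primaries.Search.QueryTotal, entry.Primaries.Search.QueryTimeInMillis},
			dataPoint{name, "total", entry.Total.Search.QueryTotal, entry.Total.Search.QueryTimeInMillis},
		)
	}
	return points
}

func main() {
	// Made-up sample payload, not taken from the commit's testdata.
	body := []byte(`{"indices":{"logs":{
		"primaries":{"search":{"query_total":3,"query_time_in_millis":12}},
		"total":{"search":{"query_total":6,"query_time_in_millis":20}}}}}`)

	resp, err := parseIndexStats(body)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	for _, p := range toDataPoints(resp) {
		fmt.Printf("%s %s completed=%d time_ms=%d\n", p.Index, p.Aggregation, p.Completed, p.TimeMs)
	}
}
```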
1 change: 1 addition & 0 deletions receiver/elasticsearchreceiver/factory.go
@@ -58,6 +58,7 @@ func createDefaultConfig() config.Receiver {
},
Metrics: metadata.DefaultMetricsSettings(),
Nodes: []string{"_all"},
Indices: []string{"_all"},
}
}
