Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/elasticsearch]: add scraping indices #14748

Merged
merged 10 commits into from
Oct 7, 2022
Prev Previous commit
Next Next commit
feat: update tests
  • Loading branch information
aboguszewski-sumo committed Oct 7, 2022
commit 502cf8058b257bf27e2075f2e35f1ae574891ff0
2 changes: 2 additions & 0 deletions receiver/elasticsearchreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The following settings are optional:
- `metrics` (default: see `DefaultMetricsSettings` [here](./internal/metadata/generated_metrics.go)): Allows enabling and disabling specific metrics from being collected in this receiver.
- `nodes` (default: `["_all"]`): Allows specifying node filters that define which nodes are scraped for node-level metrics. See [the Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/7.9/cluster.html#cluster-nodes) for allowed filters. If this option is left explicitly empty, then no node-level metrics will be scraped.
- `skip_cluster_metrics` (default: `false`): If true, cluster-level metrics will not be scraped.
- `indices` (default: `["_all"]`): Allows specifying index filters that define which indices are scraped for index-level metrics. See [the Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-stats.html#index-stats-api-path-params) for allowed filters. If this option is left explicitly empty, then no index-level metrics will be scraped.
- `endpoint` (default = `http://localhost:9200`): The base URL of the Elasticsearch API for the cluster to monitor.
- `username` (no default): Specifies the username used to authenticate with Elasticsearch using basic auth. Must be specified if password is specified.
- `password` (no default): Specifies the password used to authenticate with Elasticsearch using basic auth. Must be specified if username is specified.
Expand All @@ -36,6 +37,7 @@ receivers:
enabled: false
nodes: ["_local"]
skip_cluster_metrics: true
indices: [".geoip_databases"]
    endpoint: http://localhost:9200
username: otel
password: password
Expand Down
119 changes: 119 additions & 0 deletions receiver/elasticsearchreceiver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,11 +366,123 @@ func TestDoRequest404(t *testing.T) {
require.Contains(t, err.Error(), "404")
}

// TestIndexStatsNoPassword verifies that IndexStats succeeds against a mock
// server that requires no authentication, returning the fixture payload
// unmodified.
func TestIndexStatsNoPassword(t *testing.T) {
	indexJSON, err := os.ReadFile("./testdata/sample_payloads/indices.json")
	require.NoError(t, err)

	// The fixture JSON is the canonical expected result; the mock server
	// serves these same bytes, so the client must round-trip them exactly.
	// (Renamed from actualIndexStats: this value is the *expected* side of
	// the final require.Equal, not the actual one.)
	expectedIndexStats := model.IndexStats{}
	require.NoError(t, json.Unmarshal(indexJSON, &expectedIndexStats))

	elasticsearchMock := mockServer(t, "", "")
	defer elasticsearchMock.Close()

	client, err := newElasticsearchClient(componenttest.NewNopTelemetrySettings(), Config{
		HTTPClientSettings: confighttp.HTTPClientSettings{
			Endpoint: elasticsearchMock.URL,
		},
	}, componenttest.NewNopHost())
	require.NoError(t, err)
	ctx := context.Background()
	indexStats, err := client.IndexStats(ctx, []string{"_all"})
	require.NoError(t, err)

	require.Equal(t, &expectedIndexStats, indexStats)
}

// TestIndexStatsNilNodes verifies that IndexStats tolerates a nil index
// filter and still returns the full fixture payload.
//
// NOTE(review): the name says "NilNodes" but the nil argument here is the
// *indices* filter passed to IndexStats — the name was likely copied from the
// analogous NodeStats test; consider renaming to TestIndexStatsNilIndices.
func TestIndexStatsNilNodes(t *testing.T) {
	indexJSON, err := os.ReadFile("./testdata/sample_payloads/indices.json")
	require.NoError(t, err)

	// Renamed from actualIndexStats: this is the *expected* side of the
	// final require.Equal, built from the same bytes the mock server serves.
	expectedIndexStats := model.IndexStats{}
	require.NoError(t, json.Unmarshal(indexJSON, &expectedIndexStats))

	elasticsearchMock := mockServer(t, "", "")
	defer elasticsearchMock.Close()

	client, err := newElasticsearchClient(componenttest.NewNopTelemetrySettings(), Config{
		HTTPClientSettings: confighttp.HTTPClientSettings{
			Endpoint: elasticsearchMock.URL,
		},
	}, componenttest.NewNopHost())
	require.NoError(t, err)

	ctx := context.Background()
	indexStats, err := client.IndexStats(ctx, nil)
	require.NoError(t, err)

	require.Equal(t, &expectedIndexStats, indexStats)
}

// TestIndexStatsAuthentication verifies that IndexStats succeeds when the
// client is configured with the basic-auth credentials the mock server
// requires.
func TestIndexStatsAuthentication(t *testing.T) {
	indexJSON, err := os.ReadFile("./testdata/sample_payloads/indices.json")
	require.NoError(t, err)

	// Renamed from actualIndexStats: this is the *expected* side of the
	// final require.Equal, built from the same bytes the mock server serves.
	expectedIndexStats := model.IndexStats{}
	require.NoError(t, json.Unmarshal(indexJSON, &expectedIndexStats))

	username := "user"
	password := "pass"

	elasticsearchMock := mockServer(t, username, password)
	defer elasticsearchMock.Close()

	client, err := newElasticsearchClient(componenttest.NewNopTelemetrySettings(), Config{
		HTTPClientSettings: confighttp.HTTPClientSettings{
			Endpoint: elasticsearchMock.URL,
		},
		Username: username,
		Password: password,
	}, componenttest.NewNopHost())
	require.NoError(t, err)

	ctx := context.Background()
	indexStats, err := client.IndexStats(ctx, []string{"_all"})
	require.NoError(t, err)

	require.Equal(t, &expectedIndexStats, indexStats)
}

// TestIndexStatsNoAuthentication verifies that calling IndexStats with a
// client configured without credentials, against a server that requires
// basic auth, surfaces errUnauthenticated.
func TestIndexStatsNoAuthentication(t *testing.T) {
	srv := mockServer(t, "user", "pass")
	defer srv.Close()

	cfg := Config{
		HTTPClientSettings: confighttp.HTTPClientSettings{
			Endpoint: srv.URL,
		},
	}
	client, err := newElasticsearchClient(componenttest.NewNopTelemetrySettings(), cfg, componenttest.NewNopHost())
	require.NoError(t, err)

	_, err = client.IndexStats(context.Background(), []string{"_all"})
	require.ErrorIs(t, err, errUnauthenticated)
}

// TestIndexStatsBadAuthentication verifies that calling IndexStats with
// credentials that do not match the server's expected username/password
// surfaces errUnauthorized.
func TestIndexStatsBadAuthentication(t *testing.T) {
	srv := mockServer(t, "user", "pass")
	defer srv.Close()

	cfg := Config{
		HTTPClientSettings: confighttp.HTTPClientSettings{
			Endpoint: srv.URL,
		},
		Username: "bad_user",
		Password: "bad_pass",
	}
	client, err := newElasticsearchClient(componenttest.NewNopTelemetrySettings(), cfg, componenttest.NewNopHost())
	require.NoError(t, err)

	_, err = client.IndexStats(context.Background(), []string{"_all"})
	require.ErrorIs(t, err, errUnauthorized)
}

// mockServer gives a mock elasticsearch server for testing; if username or password is included, they will be required for the client.
// otherwise, authorization is ignored.
func mockServer(t *testing.T, username, password string) *httptest.Server {
nodes, err := os.ReadFile("./testdata/sample_payloads/nodes_linux.json")
require.NoError(t, err)
indices, err := os.ReadFile("./testdata/sample_payloads/indices.json")
require.NoError(t, err)
health, err := os.ReadFile("./testdata/sample_payloads/health.json")
require.NoError(t, err)
version, err := os.ReadFile("./testdata/sample_payloads/version.json")
Expand All @@ -395,6 +507,13 @@ func mockServer(t *testing.T, username, password string) *httptest.Server {
return
}

if strings.HasPrefix(req.URL.Path, "/_all/_stats") {
rw.WriteHeader(200)
_, err = rw.Write(indices)
require.NoError(t, err)
return
}

if strings.HasPrefix(req.URL.Path, "/_cluster/health") {
rw.WriteHeader(200)
_, err = rw.Write(health)
Expand Down
5 changes: 5 additions & 0 deletions receiver/elasticsearchreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ type Config struct {
Nodes []string `mapstructure:"nodes"`
// SkipClusterMetrics indicates whether cluster level metrics from /_cluster/health should be scraped or not.
SkipClusterMetrics bool `mapstructure:"skip_cluster_metrics"`
// Indices defines the indices to scrape.
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-stats.html#index-stats-api-path-params
// for which names are viable.
// If Indices is empty, no indices will be scraped.
Indices []string `mapstructure:"indices"`
// Username is the username used when making REST calls to elasticsearch. Must be specified if Password is. Not required.
Username string `mapstructure:"username"`
// Password is the password used when making REST calls to elasticsearch. Must be specified if Username is. Not required.
Expand Down
1 change: 1 addition & 0 deletions receiver/elasticsearchreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func TestLoadConfig(t *testing.T) {
expected: &Config{
SkipClusterMetrics: true,
Nodes: []string{"_local"},
Indices: []string{".geoip_databases"},
ScraperControllerSettings: scraperhelper.ScraperControllerSettings{
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
CollectionInterval: 2 * time.Minute,
Expand Down
1 change: 1 addition & 0 deletions receiver/elasticsearchreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func createDefaultConfig() config.Receiver {
},
Metrics: metadata.DefaultMetricsSettings(),
Nodes: []string{"_all"},
Indices: []string{"_all"},
}
}

Expand Down
9 changes: 6 additions & 3 deletions receiver/elasticsearchreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,11 +367,14 @@ func (r *elasticsearchScraper) scrapeClusterMetrics(ctx context.Context, now pco
}

func (r *elasticsearchScraper) scrapeIndicesMetrics(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) {
// TODO: config
indexStats, err := r.client.IndexStats(ctx, []string{})
if len(r.cfg.Indices) == 0 {
return
}

indexStats, err := r.client.IndexStats(ctx, r.cfg.Indices)

if err != nil {
errs.AddPartial(4, err)
errs.AddPartial(0, err)
return
}

Expand Down
19 changes: 19 additions & 0 deletions receiver/elasticsearchreceiver/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func TestScraper(t *testing.T) {
mockClient.On("Version", mock.Anything).Return(versionNumber(t), nil)
mockClient.On("ClusterHealth", mock.Anything).Return(clusterHealth(t), nil)
mockClient.On("NodeStats", mock.Anything, []string{"_all"}).Return(nodeStats(t), nil)
mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil)

sc.client = &mockClient

Expand All @@ -77,6 +78,7 @@ func TestScraperMetricsWithoutDirection(t *testing.T) {
mockClient.On("Version", mock.Anything).Return(versionNumber(t), nil)
mockClient.On("ClusterHealth", mock.Anything).Return(clusterHealth(t), nil)
mockClient.On("NodeStats", mock.Anything, []string{"_all"}).Return(nodeStats(t), nil)
mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil)

sc.client = &mockClient

Expand Down Expand Up @@ -104,6 +106,7 @@ func TestScraperSkipClusterMetrics(t *testing.T) {
mockClient.On("Version", mock.Anything).Return(versionNumber(t), nil)
mockClient.On("ClusterHealth", mock.Anything).Return(clusterHealth(t), nil)
mockClient.On("NodeStats", mock.Anything, []string{"_all"}).Return(nodeStats(t), nil)
mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil)

sc.client = &mockClient

Expand Down Expand Up @@ -131,6 +134,7 @@ func TestScraperNoNodesMetrics(t *testing.T) {
mockClient.On("Version", mock.Anything).Return(versionNumber(t), nil)
mockClient.On("ClusterHealth", mock.Anything).Return(clusterHealth(t), nil)
mockClient.On("NodeStats", mock.Anything, []string{}).Return(nodeStats(t), nil)
mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil)

sc.client = &mockClient

Expand Down Expand Up @@ -182,6 +186,7 @@ func TestScrapingError(t *testing.T) {
mockClient.On("Version", mock.Anything).Return(versionNumber(t), nil)
mockClient.On("NodeStats", mock.Anything, []string{"_all"}).Return(nil, err404)
mockClient.On("ClusterHealth", mock.Anything).Return(clusterHealth(t), nil)
mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil)

sc := newElasticSearchScraper(componenttest.NewNopReceiverCreateSettings(), createDefaultConfig().(*Config))
err := sc.start(context.Background(), componenttest.NewNopHost())
Expand All @@ -206,6 +211,7 @@ func TestScrapingError(t *testing.T) {
mockClient.On("Version", mock.Anything).Return(versionNumber(t), nil)
mockClient.On("NodeStats", mock.Anything, []string{"_all"}).Return(nodeStats(t), nil)
mockClient.On("ClusterHealth", mock.Anything).Return(nil, err404)
mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil)

sc := newElasticSearchScraper(componenttest.NewNopReceiverCreateSettings(), createDefaultConfig().(*Config))
err := sc.start(context.Background(), componenttest.NewNopHost())
Expand All @@ -231,6 +237,7 @@ func TestScrapingError(t *testing.T) {
mockClient.On("Version", mock.Anything).Return(versionNumber(t), nil)
mockClient.On("NodeStats", mock.Anything, []string{"_all"}).Return(nil, err500)
mockClient.On("ClusterHealth", mock.Anything).Return(nil, err404)
mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil)

sc := newElasticSearchScraper(componenttest.NewNopReceiverCreateSettings(), createDefaultConfig().(*Config))
err := sc.start(context.Background(), componenttest.NewNopHost())
Expand All @@ -256,6 +263,7 @@ func TestScrapingError(t *testing.T) {
mockClient.On("Version", mock.Anything).Return(nil, err404)
mockClient.On("NodeStats", mock.Anything, []string{"_all"}).Return(nodeStats(t), nil)
mockClient.On("ClusterHealth", mock.Anything).Return(clusterHealth(t), nil)
mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil)

sc := newElasticSearchScraper(componenttest.NewNopReceiverCreateSettings(), createDefaultConfig().(*Config))
err := sc.start(context.Background(), componenttest.NewNopHost())
Expand All @@ -280,6 +288,7 @@ func TestScrapingError(t *testing.T) {
mockClient.On("Version", mock.Anything).Return(nil, err404)
mockClient.On("NodeStats", mock.Anything, []string{"_all"}).Return(nil, err500)
mockClient.On("ClusterHealth", mock.Anything).Return(nil, err404)
mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil)

sc := newElasticSearchScraper(componenttest.NewNopReceiverCreateSettings(), createDefaultConfig().(*Config))
err := sc.start(context.Background(), componenttest.NewNopHost())
Expand All @@ -306,6 +315,7 @@ func TestScrapingError(t *testing.T) {
mockClient.On("Version", mock.Anything).Return(versionNumber(t), nil)
mockClient.On("NodeStats", mock.Anything, []string{"_all"}).Return(nodeStats(t), nil)
mockClient.On("ClusterHealth", mock.Anything).Return(ch, nil)
mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil)

sc := newElasticSearchScraper(componenttest.NewNopReceiverCreateSettings(), createDefaultConfig().(*Config))
err := sc.start(context.Background(), componenttest.NewNopHost())
Expand Down Expand Up @@ -344,6 +354,15 @@ func nodeStats(t *testing.T) *model.NodeStats {
return &nodeStats
}

// indexStats loads the sample index-stats payload used as the mock
// IndexStats return value in scraper tests.
func indexStats(t *testing.T) *model.IndexStats {
	raw, err := os.ReadFile("./testdata/sample_payloads/indices.json")
	require.NoError(t, err)

	stats := model.IndexStats{}
	require.NoError(t, json.Unmarshal(raw, &stats))
	return &stats
}

func versionNumber(t *testing.T) *model.VersionResponse {
versionJSON, err := os.ReadFile("./testdata/sample_payloads/version.json")
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions receiver/elasticsearchreceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ elasticsearch:
enabled: false
nodes: [ "_local" ]
skip_cluster_metrics: true
indices: [ ".geoip_databases" ]
    endpoint: http://example.com:9200
username: otel
password: password
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1143,7 +1143,7 @@
"unit": "{files}"
},
{
"description": "The number of operations completed.",
"description": "The number of operations completed for a node.",
"name": "elasticsearch.node.operations.completed",
"sum": {
"aggregationTemporality": "AGGREGATION_TEMPORALITY_CUMULATIVE",
Expand Down Expand Up @@ -1297,7 +1297,7 @@
"unit": "{operations}"
},
{
"description": "Time spent on operations.",
"description": "Time spent on operations for a node.",
"name": "elasticsearch.node.operations.time",
"sum": {
"aggregationTemporality": "AGGREGATION_TEMPORALITY_CUMULATIVE",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1143,7 +1143,7 @@
"unit": "{files}"
},
{
"description": "The number of operations completed.",
"description": "The number of operations completed for a node.",
"name": "elasticsearch.node.operations.completed",
"sum": {
"aggregationTemporality": "AGGREGATION_TEMPORALITY_CUMULATIVE",
Expand Down Expand Up @@ -1297,7 +1297,7 @@
"unit": "{operations}"
},
{
"description": "Time spent on operations.",
"description": "Time spent on operations for a node.",
"name": "elasticsearch.node.operations.time",
"sum": {
"aggregationTemporality": "AGGREGATION_TEMPORALITY_CUMULATIVE",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,7 @@
"unit": "{files}"
},
{
"description": "The number of operations completed.",
"description": "The number of operations completed for a node.",
"name": "elasticsearch.node.operations.completed",
"sum": {
"aggregationTemporality": "AGGREGATION_TEMPORALITY_CUMULATIVE",
Expand Down Expand Up @@ -1292,7 +1292,7 @@
"unit": "{operations}"
},
{
"description": "Time spent on operations.",
"description": "Time spent on operations for a node.",
"name": "elasticsearch.node.operations.time",
"sum": {
"aggregationTemporality": "AGGREGATION_TEMPORALITY_CUMULATIVE",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,7 @@
"unit": "{files}"
},
{
"description": "The number of operations completed.",
"description": "The number of operations completed for a node.",
"name": "elasticsearch.node.operations.completed",
"sum": {
"aggregationTemporality": "AGGREGATION_TEMPORALITY_CUMULATIVE",
Expand Down Expand Up @@ -1300,7 +1300,7 @@
"unit": "{operations}"
},
{
"description": "Time spent on operations.",
"description": "Time spent on operations for a node.",
"name": "elasticsearch.node.operations.time",
"sum": {
"aggregationTemporality": "AGGREGATION_TEMPORALITY_CUMULATIVE",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@
"unit": "{files}"
},
{
"description": "The number of operations completed.",
"description": "The number of operations completed for a node.",
"name": "elasticsearch.node.operations.completed",
"sum": {
"aggregationTemporality": "AGGREGATION_TEMPORALITY_CUMULATIVE",
Expand Down Expand Up @@ -956,7 +956,7 @@
"unit": "{operations}"
},
{
"description": "Time spent on operations.",
"description": "Time spent on operations for a node.",
"name": "elasticsearch.node.operations.time",
"sum": {
"aggregationTemporality": "AGGREGATION_TEMPORALITY_CUMULATIVE",
Expand Down
Loading