Skip to content

Commit

Permalink
[receiver/elasticsearch] Implement Client (open-telemetry#7019)
Browse files Browse the repository at this point in the history
* implement Elasticsearch client

* Add status to cluster health struct

* add jvm uptime to nodestats

* remove unused sample payload (for now)

* Allow client to specify nodes to get stats from

* add empty nodes check, add test for nil/empty nodes

* go mod tidy

* minor wording tweaks

* add in max heap mem to nodestats model

* changelog

* fix lint errors

* addlicense

* re-order imports in client.go

* make porto

* add newline to sample health payload

* fix disjoint client error handling

* take in config struct for client creation

* omitempty on IOStats

* add header to request 7.x compatible response
  • Loading branch information
BinaryFissionGames authored Jan 6, 2022
1 parent 88683b9 commit 62a1fd4
Show file tree
Hide file tree
Showing 8 changed files with 1,425 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
## 💡 Enhancements 💡

- `couchdbreceiver`: Add couchdb client (#6880)
- `elasticsearchreceiver`: Implement scraper client (#7019)
- `prometheusremotewriteexporter`: Handling Staleness flag from OTLP (#6679)
- `prometheusexporter`: Handling Staleness flag from OTLP (#6805)
- `prometheusreceiver`: Set OTLP no-data-present flag for stale scraped metrics. (#7043)
Expand Down
167 changes: 167 additions & 0 deletions receiver/elasticsearchreceiver/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package elasticsearchreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver"

import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver/internal/model"
)

var (
errUnauthenticated = errors.New("status 401, unauthenticated")
errUnauthorized = errors.New("status 403, unauthorized")
)

// elasticsearchClient defines the interface to retrieve metrics from an Elasticsearch cluster.
type elasticsearchClient interface {
NodeStats(ctx context.Context, nodes []string) (*model.NodeStats, error)
ClusterHealth(ctx context.Context) (*model.ClusterHealth, error)
}

// defaultElasticsearchClient is the main implementation of elasticsearchClient.
// It retrieves the required metrics from Elasticsearch's REST api.
type defaultElasticsearchClient struct {
client *http.Client
endpoint *url.URL
authHeader string
logger *zap.Logger
}

var _ elasticsearchClient = (*defaultElasticsearchClient)(nil)

func newElasticsearchClient(logger *zap.Logger, c Config, h component.Host) (*defaultElasticsearchClient, error) {
client, err := c.HTTPClientSettings.ToClient(h.GetExtensions())
if err != nil {
return nil, err
}

endpoint, err := url.Parse(c.Endpoint)
if err != nil {
return nil, err
}

var authHeader string
if c.Username != "" && c.Password != "" {
userPass := fmt.Sprintf("%s:%s", c.Username, c.Password)
authb64 := base64.StdEncoding.EncodeToString([]byte(userPass))
authHeader = fmt.Sprintf("Basic %s", authb64)
}

return &defaultElasticsearchClient{
client: client,
authHeader: authHeader,
endpoint: endpoint,
logger: logger,
}, nil
}

// nodeStatsMetrics is a comma separated list of metrics that will be gathered from NodeStats.
// The available metrics are documented here for Elasticsearch 7.9:
// https://www.elastic.co/guide/en/elasticsearch/reference/7.9/cluster-nodes-stats.html#cluster-nodes-stats-api-path-params
const nodeStatsMetrics = "indices,process,jvm,thread_pool,transport,http,fs"

// nodeStatsIndexMetrics is a comma separated list of index metrics that will be gathered from NodeStats.
const nodeStatsIndexMetrics = "store,docs,indexing,get,search,merge,refresh,flush,warmer,query_cache,fielddata"

func (c defaultElasticsearchClient) NodeStats(ctx context.Context, nodes []string) (*model.NodeStats, error) {
var nodeSpec string
if len(nodes) > 0 {
nodeSpec = strings.Join(nodes, ",")
} else {
nodeSpec = "_all"
}

nodeStatsPath := fmt.Sprintf("_nodes/%s/stats/%s/%s", nodeSpec, nodeStatsMetrics, nodeStatsIndexMetrics)

body, err := c.doRequest(ctx, nodeStatsPath)
if err != nil {
return nil, err
}

nodeStats := model.NodeStats{}
err = json.Unmarshal(body, &nodeStats)
return &nodeStats, err
}

func (c defaultElasticsearchClient) ClusterHealth(ctx context.Context) (*model.ClusterHealth, error) {
body, err := c.doRequest(ctx, "_cluster/health")
if err != nil {
return nil, err
}

clusterHealth := model.ClusterHealth{}
err = json.Unmarshal(body, &clusterHealth)
return &clusterHealth, err
}

func (c defaultElasticsearchClient) doRequest(ctx context.Context, path string) ([]byte, error) {
endpoint, err := c.endpoint.Parse(path)
if err != nil {
return nil, err
}

req, err := http.NewRequestWithContext(ctx, "GET", endpoint.String(), nil)
if err != nil {
return nil, err
}

if c.authHeader != "" {
req.Header.Add("Authorization", c.authHeader)
}

// See https://www.elastic.co/guide/en/elasticsearch/reference/8.0/api-conventions.html#api-compatibility
// the compatible-with=7 should signal to newer version of Elasticsearch to use the v7.x API format
req.Header.Add("Accept", "application/vnd.elasticsearch+json; compatible-with=7")

resp, err := c.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode == 200 {
return io.ReadAll(resp.Body)
}

body, err := io.ReadAll(resp.Body)
c.logger.Debug(
"Failed to make request to Elasticsearch",
zap.String("path", path),
zap.Int("status_code", resp.StatusCode),
zap.ByteString("body", body),
zap.NamedError("body_read_error", err),
)

switch resp.StatusCode {
case 401:
return nil, errUnauthenticated
case 403:
return nil, errUnauthorized
default:
return nil, fmt.Errorf("got non 200 status code %d", resp.StatusCode)
}
}
Loading

0 comments on commit 62a1fd4

Please sign in to comment.