Skip to content

Commit

Permalink
Init Elasticsearch exporter (open-telemetry#2324)
Browse files Browse the repository at this point in the history
* Init Elasticsearch exporter

This is the first step in adding the Elasticsearch exporter. Initially
we will only support the Logs exporter interface, and potentially will
add metrics in the future as well.

This change only provides some boilerplate initializing the exporter.
But the exporter is not yet usable (or part of) any opentelemetry
collector distribution.

The elasticsearch exporter is based on the official
[go-elasticsearch](https://github.com/elastic/go-elasticsearch) client.
We will use the BulkIndexer provided by the client for event publishing.
The client and BulkIndexer provide some support for retrying already.
The Elasticsearch Bulk API can report errors at the HTTP level, but uses
selective ACKs for individual events. This allows us to retry only
failed events and/or reject events that can not be indexed (e.g. due to
an mapping error). The 429 error code might even inidcate that we should
backoff a little before retrying.

**Link to tracking Issue:** open-telemetry#1800

**Testing:** Only configuration loading and validation tests have been
added so far. The exporter currently panics when trying to publish
events. More unit and integration tests will be added in the future.

**Documentation:** All settings that will be available initially are
documented in the README.md file.

* Rename urls setting to endpoints

* fix lint

* Add factory and exporter initialization tests

* Lint checks

* Error lint

* Fix import order

* fix typo

* Do not "shadow" err in a deferred func

* review

* fix typo in exporter_test.go

* use const for environmant variable name in tests

* fix format after gorename

* typo
  • Loading branch information
Steffen Siering committed Feb 18, 2021
1 parent 28d721d commit b5500c8
Show file tree
Hide file tree
Showing 11 changed files with 2,403 additions and 0 deletions.
1 change: 1 addition & 0 deletions exporter/elasticsearchexporter/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
82 changes: 82 additions & 0 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Elasticsearch Exporter

This exporter supports sending OpenTelemetry logs to [Elasticsearch](https://www.elastic.co/elasticsearch).

## Configuration options

- `endpoints`: List of Elasticsearch URLs. If endpoints and cloudid is missing, the
ELASTICSEARCH_URL environment variable will be used.
- `cloudid` (optional):
[ID](https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html) of the
Elastic Cloud Cluster to publish events to. The `cloudid` can be used instead
of `endpoints`.
- `num_workers` (optional): Number of workers publishing bulk requests concurrently.
- `index`: The
[index](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices.html)
or [datastream](https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html)
name to publish events to. The default value is `logs-generic-default`.
- `pipeline` (optional): Optional [Ingest Node](https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html)
pipeline ID used for processing documents published by the exporter.
- `flush`: Event bulk buffer flush settings
- `bytes` (default=5242880): Write buffer flush limit.
- `interval` (default=30s): Write buffer time limit.
- `retry`: Event retry settings
- `enabled` (default=true): Enable/Disable event retry on error. Retry
support is enabled by default.
- `max_requests` (default=3): Number of HTTP request retries.
- `initial_interval` (default=100ms): Initial waiting time if a HTTP request failed.
- `max_interval` (default=1m): Max waiting time if a HTTP request failed.
- `mapping`: Events are encoded to JSON. The `mapping` allows users to
configure additional mapping rules.
- `mode` (default=ecs): The fields naming mode. valid modes are:
- `none`: Use original fields and event structure from the OTLP event.
- `ecs`: Try to map fields defined in the
[OpenTelemetry Semantic Conventions](https://github.com/open-telemetry/opentelemetry-specification/tree/main/semantic_conventions)
to [Elastic Common Schema (ECS)](https://www.elastic.co/guide/en/ecs/current/index.html).
- `fields` (optional): Configure additional fields mappings.
- `file` (optional): Read additional field mappings from the provided YAML file.
- `dedup` (default=true): Try to find and remove duplicate fields/attributes
from events before publishing to Elasticsearch. Some structured logging
libraries can produce duplicate fields (for example zap). Elasticsearch
will reject documents that have duplicate fields.
- `dedot` (default=true): When enabled attributes with `.` will be split into
proper json objects.

### HTTP settings

- `read_buffer_size` (default=0): Read buffer size.
- `write_buffer_size` (default=0): Write buffer size used when.
- `timeout` (default=90s): HTTP request time limit.
- `headers` (optional): Headers to be send with each HTTP request.

### Security and Authentication settings

- `user` (optional): Username used for HTTP Basic Authentication.
- `password` (optional): Password used for HTTP Basic Authentication.
- `api_key` (optional): Authorization [API Key](https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html).
- `ca_file` (optional): Root Certificate Authority (CA) certificate, for
verifying the server's identity, if TLS is enabled.
- `cert_file` (optional): Client TLS certificate.
- `key_file` (optional): Client TLS key.
- `insecure` (optional): Disable verification of the server's identity, if TLS
is enabled.

### Node Discovery

The Elasticsearch Exporter will check Elasticsearch regularly for available
nodes and updates the list of hosts if discovery is enabled. Newly discovered
nodes will automatically be used for load balancing.

- `discover`:
- `on_start` (optional): If enabled the exporter queries Elasticsearch
for all known nodes in the cluster on startup.
- `interval` (optional): Interval to update the list of Elasticsearch nodes.

## Example

```yaml
exporters:
elasticsearch:
endpoints:
- "https://localhost:9200"
```
228 changes: 228 additions & 0 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package elasticsearchexporter

import (
"errors"
"fmt"
"os"
"strings"
"time"

"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtls"
)

// Config defines configuration for Elastic exporter.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"`

// Endpoints holds the Elasticsearch URLs the exporter should send events to.
//
// This setting is required if CloudID is not set and if the
// ELASTICSEARCH_URL environment variable is not set.
Endpoints []string `mapstructure:"endpoints"`

// CloudID holds the cloud ID to identify the Elastic Cloud cluster to send events to.
// https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html
//
// This setting is required if no URL is configured.
CloudID string `mapstructure:"cloudid"`

// NumWorkers configures the number of workers publishing bulk requests.
NumWorkers int `mapstructure:"num_workers"`

// Index configures the index, index alias, or data stream name events should be indexed in.
//
// https://www.elastic.co/guide/en/elasticsearch/reference/current/indices.html
// https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html
//
// This setting is required.
Index string `mapstructure:"index"`

// Pipeline configures the ingest node pipeline name that should be used to process the
// events.
//
// https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html
Pipeline string `mapstructure:"pipeline"`

HTTPClientSettings `mapstructure:",squash"`
Discovery DiscoverySettings `mapstructure:"discover"`
Retry RetrySettings `mapstructure:"retry"`
Flush FlushSettings `mapstructure:"flush"`
Mapping MappingsSettings `mapstructure:"mapping"`
}

type HTTPClientSettings struct {
Authentication AuthenticationSettings `mapstructure:",squash"`

// ReadBufferSize for HTTP client. See http.Transport.ReadBufferSize.
ReadBufferSize int `mapstructure:"read_buffer_size"`

// WriteBufferSize for HTTP client. See http.Transport.WriteBufferSize.
WriteBufferSize int `mapstructure:"write_buffer_size"`

// Timeout configures the HTTP request timeout.
Timeout time.Duration `mapstructure:"timeout"`

// Headers allows users to configure optional HTTP headers that
// will be send with each HTTP request.
Headers map[string]string `mapstructure:"headers,omitempty"`

configtls.TLSClientSetting `mapstructure:",squash"`
}

// AuthenticationSettings defines user authentication related settings.
type AuthenticationSettings struct {
// User is used to configure HTTP Basic Authentication.
User string `mapstructure:"user"`

// Password is used to configure HTTP Basic Authentication.
Password string `mapstructure:"password"`

// APIKey is used to configure ApiKey based Authentication.
//
// https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html
APIKey string `mapstructure:"api_key"`
}

// DiscoverySettings defines Elasticsearch node discovery related settings.
// The exporter will check Elasticsearch regularly for available nodes
// and updates the list of hosts if discovery is enabled. Newly discovered
// nodes will automatically be used for load balancing.
//
// DiscoverySettings should not be enabled when operating Elasticsearch behind a proxy
// or load balancer.
//
// https://www.elastic.co/blog/elasticsearch-sniffing-best-practices-what-when-why-how
type DiscoverySettings struct {
// OnStart, if set, instructs the exporter to look for available Elasticsearch
// nodes the first time the exporter connects to the cluster.
OnStart bool `mapstructure:"on_start"`

// Interval instructs the exporter to renew the list of Elasticsearch URLs
// with the given interval. URLs will not be updated if Interval is <=0.
Interval time.Duration `mapstructure:"interval"`
}

// FlushSettings defines settings for configuring the write buffer flushing
// policy in the Elasticsearch exporter. The exporter sends a bulk request with
// all events already serialized into the send-buffer.
type FlushSettings struct {
// Bytes sets the send buffer flushing limit.
Bytes int `mapstructure:"bytes"`

// Interval configures the max age of a document in the send buffer.
Interval time.Duration `mapstructure:"interval"`
}

// RetrySettings defines settings for the HTTP request retries in the Elasticsearch exporter.
// Failed sends are retried with exponential backoff.
type RetrySettings struct {
// Enabled allows users to disable retry without having to comment out all settings.
Enabled bool `mapstructure:"enabled"`

// MaxRequests configures how often an HTTP request is retried before it is assumed to be failed.
MaxRequests int `mapstructure:"max_requests"`

// InitialInterval configures the initial waiting time if a request failed.
InitialInterval time.Duration `mapstructure:"initial_interval"`

// MaxInterval configures the max waiting time if consecutive requests failed.
MaxInterval time.Duration `mapstructure:"max_interval"`
}

type MappingsSettings struct {
// Mode configures the field mappings.
Mode string `mapstructure:"mode"`

// Additional field mappings.
Fields map[string]string `mapstructure:"fields"`

// File to read additional fields mappings from.
File string `mapstructure:"file"`

// Try to find and remove duplicate fields
Dedup bool `mapstructure:"dedup"`

Dedot bool `mapstructure:"dedot"`
}

type MappingMode int

const (
MappingNone MappingMode = iota
MappingECS
)

var (
errConfigNoEndpoint = errors.New("endpoints or cloudid must be specified")
errConfigEmptyEndpoint = errors.New("endpoints must not include empty entries")
errConfigNoIndex = errors.New("index must be specified")
)

func (m MappingMode) String() string {
switch m {
case MappingNone:
return ""
case MappingECS:
return "ecs"
default:
return ""
}
}

var mappingModes = func() map[string]MappingMode {
table := map[string]MappingMode{}
for _, m := range []MappingMode{
MappingNone,
MappingECS,
} {
table[strings.ToLower(m.String())] = m
}

// config aliases
table["no"] = MappingNone
table["none"] = MappingNone

return table
}()

const defaultElasticsearchEnvName = "ELASTICSEARCH_URL"

// Validate validates the elasticsearch server configuration.
func (cfg *Config) Validate() error {
if len(cfg.Endpoints) == 0 && cfg.CloudID == "" {
if os.Getenv(defaultElasticsearchEnvName) == "" {
return errConfigNoEndpoint
}
}

for _, endpoint := range cfg.Endpoints {
if endpoint == "" {
return errConfigEmptyEndpoint
}
}

if cfg.Index == "" {
return errConfigNoIndex
}

if _, ok := mappingModes[cfg.Mapping.Mode]; !ok {
return fmt.Errorf("unknown mapping mode %v", cfg.Mapping.Mode)
}

return nil
}
Loading

0 comments on commit b5500c8

Please sign in to comment.