Skip to content

Commit

Permalink
[exporter/elasticsearch] Support Persistent queue for Elasticsearch e…
Browse files Browse the repository at this point in the history
…xporter (open-telemetry#19445)

Support Persistent queue for Elasticsearch exporter

Signed-off-by: Jared Tan <[email protected]>
  • Loading branch information
JaredTan95 committed Mar 27, 2023
1 parent 559fe46 commit ef5bfc7
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 16 deletions.
16 changes: 16 additions & 0 deletions .chloggen/support_queue_in_elasticsearchexporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support Persistent queue for Elasticsearch exporter.

# One or more tracking issues related to the change
issues: [19424]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
9 changes: 8 additions & 1 deletion exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ This exporter supports sending OpenTelemetry logs to [Elasticsearch](https://www
will reject documents that have duplicate fields.
- `dedot` (default=true): When enabled attributes with `.` will be split into
proper json objects.

- `sending_queue`
- `enabled` (default = false)
- `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`
- `queue_size` (default = 5000): Maximum number of batches kept in memory before data; ignored if `enabled` is `false`;
### HTTP settings

- `read_buffer_size` (default=0): Read buffer size.
Expand Down Expand Up @@ -99,6 +102,10 @@ exporters:
elasticsearch/log:
endpoints: [http:https://localhost:9200]
logs_index: my_log_index
sending_queue:
enabled: true
num_consumers: 20
queue_size: 1000
······
service:
pipelines:
Expand Down
3 changes: 2 additions & 1 deletion exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ import (

"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Config defines configuration for Elastic exporter.
type Config struct {

exporterhelper.QueueSettings `mapstructure:"sending_queue"`
// Endpoints holds the Elasticsearch URLs the exporter should send events to.
//
// This setting is required if CloudID is not set and if the
Expand Down
38 changes: 29 additions & 9 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

func TestLoad_DeprecatedIndexConfigOption(t *testing.T) {
Expand All @@ -36,6 +37,11 @@ func TestLoad_DeprecatedIndexConfigOption(t *testing.T) {
require.NoError(t, component.UnmarshalConfig(sub, cfg))

assert.Equal(t, cfg, &Config{
QueueSettings: exporterhelper.QueueSettings{
Enabled: false,
NumConsumers: 10,
QueueSize: 5000,
},
Endpoints: []string{"http:https://localhost:9200"},
CloudID: "TRNMxjXlNJEt",
Index: "my_log_index",
Expand Down Expand Up @@ -76,23 +82,28 @@ func TestLoad_DeprecatedIndexConfigOption(t *testing.T) {
func TestLoadConfig(t *testing.T) {
t.Parallel()

cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

defaultCfg := createDefaultConfig()
defaultCfg.(*Config).Endpoints = []string{"https://elastic.example.com:9200"}

tests := []struct {
id component.ID
expected component.Config
configFile string
id component.ID
expected component.Config
}{
{
id: component.NewIDWithName(typeStr, ""),
expected: defaultCfg,
id: component.NewIDWithName(typeStr, ""),
configFile: "config.yaml",
expected: defaultCfg,
},
{
id: component.NewIDWithName(typeStr, "trace"),
id: component.NewIDWithName(typeStr, "trace"),
configFile: "config.yaml",
expected: &Config{
QueueSettings: exporterhelper.QueueSettings{
Enabled: false,
NumConsumers: 10,
QueueSize: 5000,
},
Endpoints: []string{"https://elastic.example.com:9200"},
CloudID: "TRNMxjXlNJEt",
Index: "",
Expand Down Expand Up @@ -130,8 +141,14 @@ func TestLoadConfig(t *testing.T) {
},
},
{
id: component.NewIDWithName(typeStr, "log"),
id: component.NewIDWithName(typeStr, "log"),
configFile: "config.yaml",
expected: &Config{
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 10,
QueueSize: 5000,
},
Endpoints: []string{"http:https://localhost:9200"},
CloudID: "TRNMxjXlNJEt",
Index: "",
Expand Down Expand Up @@ -175,6 +192,9 @@ func TestLoadConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

cm, err := confmaptest.LoadConf(filepath.Join("testdata", tt.configFile))
require.NoError(t, err)

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, component.UnmarshalConfig(sub, cfg))
Expand Down
21 changes: 16 additions & 5 deletions exporter/elasticsearchexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ func NewFactory() exporter.Factory {
}

func createDefaultConfig() component.Config {
qs := exporterhelper.NewDefaultQueueSettings()
qs.Enabled = false
return &Config{
QueueSettings: qs,
HTTPClientSettings: HTTPClientSettings{
Timeout: 90 * time.Second,
},
Expand Down Expand Up @@ -73,11 +76,12 @@ func createLogsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Logs, error) {
if cfg.(*Config).Index != "" {
cf := cfg.(*Config)
if cf.Index != "" {
set.Logger.Warn("index option are deprecated and replaced with logs_index and traces_index.")
}

exporter, err := newLogsExporter(set.Logger, cfg.(*Config))
exporter, err := newLogsExporter(set.Logger, cf)
if err != nil {
return nil, fmt.Errorf("cannot configure Elasticsearch logs exporter: %w", err)
}
Expand All @@ -88,17 +92,24 @@ func createLogsExporter(
cfg,
exporter.pushLogsData,
exporterhelper.WithShutdown(exporter.Shutdown),
exporterhelper.WithQueue(cf.QueueSettings),
)
}

func createTracesExporter(ctx context.Context,
set exporter.CreateSettings,
cfg component.Config) (exporter.Traces, error) {

exporter, err := newTracesExporter(set.Logger, cfg.(*Config))
cf := cfg.(*Config)
exporter, err := newTracesExporter(set.Logger, cf)
if err != nil {
return nil, fmt.Errorf("cannot configure Elasticsearch traces exporter: %w", err)
}
return exporterhelper.NewTracesExporter(ctx, set, cfg, exporter.pushTraceData,
exporterhelper.WithShutdown(exporter.Shutdown))
return exporterhelper.NewTracesExporter(
ctx,
set,
cfg,
exporter.pushTraceData,
exporterhelper.WithShutdown(exporter.Shutdown),
exporterhelper.WithQueue(cf.QueueSettings))
}
2 changes: 2 additions & 0 deletions exporter/elasticsearchexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,5 @@ elasticsearch/log:
bytes: 10485760
retry:
max_requests: 5
sending_queue:
enabled: true

0 comments on commit ef5bfc7

Please sign in to comment.