[exporter/file] Add group_by configuration #31396

Merged
10 commits merged on Mar 12, 2024
27 changes: 27 additions & 0 deletions .chloggen/file-exporter-group-by-attr.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: fileexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Added the option to write telemetry data into multiple files, where the file path is based on a resource attribute.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24654]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
16 changes: 15 additions & 1 deletion exporter/fileexporter/README.md
@@ -25,9 +25,9 @@ Exporter supports the following features:

+ Support for compressing the telemetry data before exporting.

+ Support for writing into multiple files, where the file path is determined by a resource attribute.

Please note that there is no guarantee that exact field names will remain stable.
This is intended primarily for debugging the Collector without setting up backends.

The official [opentelemetry-collector-contrib container](https://hub.docker.com/r/otel/opentelemetry-collector-contrib/tags#!) does not have a writable filesystem by default since it's built on the `scratch` layer.
As such, you will need to create a writable directory for the path, potentially by mounting writable volumes or creating a custom image.
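
As a sketch of one way to provide such a directory, the following Docker Compose snippet bind-mounts a writable host path; the host paths are illustrative, and the in-container config location assumes the contrib image's default of `/etc/otelcol-contrib/config.yaml`:

```yaml
services:
  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    volumes:
      # Collector configuration that uses the file exporter.
      - ./collector-config.yaml:/etc/otelcol-contrib/config.yaml
      # Writable directory backing the exporter's `path` setting.
      - ./otel-output:/data
```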
@@ -51,6 +51,11 @@ The following settings are optional:
- `flush_interval` [default: 1s]: `time.Duration` interval between flushes. See [time.ParseDuration](https://pkg.go.dev/time#ParseDuration) for valid formats.
NOTE: a value without a unit is interpreted as nanoseconds. `flush_interval` is ignored, and writes are not buffered, if `rotation` is set.

- `group_by` enables writing to separate files based on a resource attribute (see the example below).
  - `enabled` [default: false]: enables `group_by`. When `group_by` is enabled, the `rotation` setting is ignored.
  - `resource_attribute` [default: `fileexporter.path_segment`]: specifies the name of the resource attribute that contains the path segment of the file to write to. The final path will be the `path` config value, with the `*` replaced with the value of this resource attribute.
  - `max_open_files` [default: 100]: specifies the maximum number of open file descriptors for the output files.
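
As a sketch, a configuration using these settings might look as follows; the path and attribute name are illustrative, and the values shown for `resource_attribute` and `max_open_files` are the documented defaults:

```yaml
exporters:
  file:
    path: ./telemetry/*.json
    group_by:
      enabled: true
      resource_attribute: fileexporter.path_segment
      max_open_files: 100
```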

## File Rotation
Telemetry data is exported to a single file by default.
`fileexporter` only enables file rotation when the user specifies `rotation:` in the config. However, if specified, related default settings would apply.
@@ -78,6 +83,15 @@ When `format` is json and `compression` is none, telemetry data is written to files in JSON format.

Otherwise, when using the `proto` format or any kind of encoding, each encoded object is preceded by 4 bytes (an unsigned 32-bit integer) representing the number of bytes in the encoded object. When reading the messages back in, we read the size, then read that many bytes into a separate buffer, then parse from that buffer.
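
For illustration, a minimal Go sketch of reading such a file back; the file name is hypothetical, and the big-endian byte order is an assumption not spelled out above:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"os"
)

func main() {
	// Hypothetical file written by the exporter with the proto format.
	f, err := os.Open("telemetry.proto")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	for {
		// Read the 4-byte unsigned length prefix (big-endian assumed).
		var size uint32
		if err := binary.Read(f, binary.BigEndian, &size); err != nil {
			if err == io.EOF {
				break
			}
			panic(err)
		}
		// Read exactly `size` bytes into a separate buffer.
		buf := make([]byte, size)
		if _, err := io.ReadFull(f, buf); err != nil {
			panic(err)
		}
		// buf now holds one encoded message, ready to be unmarshaled.
		fmt.Printf("read a %d-byte record\n", len(buf))
	}
}
```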

## Group by attribute

By specifying `group_by.resource_attribute` in the config, the exporter determines a file path for each telemetry record by substituting the value of that resource attribute into the `path` configuration value.

The final path is guaranteed to start with the prefix part of the `path` config value (the part before the `*` character). For example, if `path` is `/data/*.json` and the resource attribute value is `../etc/my_config`, the final path will be sanitized to `/data/etc/my_config.json`.

The final path can contain path separators (`/`). The exporter will create missing directories recursively (similarly to `mkdir -p`).

Grouping by attribute currently supports only a **single resource** attribute. If you would like to use multiple attributes, please use the [Transform processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor) to create a routing key. If you would like to use a non-resource-level attribute (e.g. a log, metric, or data point attribute), please use the [Group by Attributes processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/groupbyattrsprocessor) first.
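
As a sketch of the multi-attribute workaround, a transform processor stage could concatenate several resource attributes into the routing attribute before the exporter runs. The attribute names below are illustrative, and the statement assumes the transform processor's OTTL `set` and `Concat` functions:

```yaml
processors:
  transform:
    log_statements:
      - context: resource
        statements:
          # Build the path segment from two resource attributes, e.g. "ns-svc".
          - set(attributes["fileexporter.path_segment"], Concat([attributes["service.namespace"], attributes["service.name"]], "-"))

exporters:
  file:
    path: ./logs/*.json
    group_by:
      enabled: true
```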

## Example:

Expand Down
38 changes: 37 additions & 1 deletion exporter/fileexporter/config.go
@@ -5,6 +5,7 @@ package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter"

import (
"errors"
"strings"
"time"

"go.opentelemetry.io/collector/component"
@@ -22,7 +23,8 @@ type Config struct {
// Path of the file to write to. Path is relative to current directory.
Path string `mapstructure:"path"`

// Rotation defines an option about rotation of telemetry files
// Rotation defines an option about rotation of telemetry files. Ignored
// when GroupBy is enabled.
Rotation *Rotation `mapstructure:"rotation"`

// FormatType define the data format of encoded telemetry data
@@ -38,6 +40,9 @@ type Config struct {
// FlushInterval is the duration between flushes.
// See time.ParseDuration for valid values.
FlushInterval time.Duration `mapstructure:"flush_interval"`

// GroupBy enables writing to separate files based on a resource attribute.
GroupBy *GroupBy `mapstructure:"group_by"`
}

// Rotation an option to rolling log files
@@ -63,6 +68,21 @@ type Rotation struct {
LocalTime bool `mapstructure:"localtime"`
}

type GroupBy struct {
// Enables group_by. When group_by is enabled, rotation setting is ignored. Default is false.
Enabled bool `mapstructure:"enabled"`

// ResourceAttribute specifies the name of the resource attribute that
// contains the path segment of the file to write to. The final path will be
// the Path config value, with the * replaced with the value of this resource
// attribute. Default is "fileexporter.path_segment".
ResourceAttribute string `mapstructure:"resource_attribute"`

// MaxOpenFiles specifies the maximum number of open file descriptors for the output files.
// The default is 100.
MaxOpenFiles int `mapstructure:"max_open_files"`
}

var _ component.Config = (*Config)(nil)

// Validate checks if the exporter configuration is valid
@@ -79,6 +99,22 @@ func (cfg *Config) Validate() error {
if cfg.FlushInterval < 0 {
return errors.New("flush_interval must be larger than zero")
}

if cfg.GroupBy != nil && cfg.GroupBy.Enabled {
pathParts := strings.Split(cfg.Path, "*")
if len(pathParts) != 2 {
return errors.New("path must contain exatcly one * when group_by is enabled")
}

if len(pathParts[0]) == 0 {
return errors.New("path must not start with * when group_by is enabled")
}

if cfg.GroupBy.ResourceAttribute == "" {
return errors.New("resource_attribute must not be empty when group_by is enabled")
}
}

return nil
}

66 changes: 66 additions & 0 deletions exporter/fileexporter/config_test.go
@@ -39,6 +39,10 @@ func TestLoadConfig(t *testing.T) {
},
FormatType: formatTypeJSON,
FlushInterval: time.Second,
GroupBy: &GroupBy{
MaxOpenFiles: defaultMaxOpenFiles,
ResourceAttribute: defaultResourceAttribute,
},
},
},
{
@@ -54,6 +58,10 @@
FormatType: formatTypeProto,
Compression: compressionZSTD,
FlushInterval: time.Second,
GroupBy: &GroupBy{
MaxOpenFiles: defaultMaxOpenFiles,
ResourceAttribute: defaultResourceAttribute,
},
},
},
{
@@ -65,6 +73,10 @@
MaxBackups: defaultMaxBackups,
},
FlushInterval: time.Second,
GroupBy: &GroupBy{
MaxOpenFiles: defaultMaxOpenFiles,
ResourceAttribute: defaultResourceAttribute,
},
},
},
{
@@ -77,6 +89,10 @@
},
FormatType: formatTypeJSON,
FlushInterval: time.Second,
GroupBy: &GroupBy{
MaxOpenFiles: defaultMaxOpenFiles,
ResourceAttribute: defaultResourceAttribute,
},
},
},
{
@@ -93,6 +109,10 @@
Path: "./flushed",
FlushInterval: 5,
FormatType: formatTypeJSON,
GroupBy: &GroupBy{
MaxOpenFiles: defaultMaxOpenFiles,
ResourceAttribute: defaultResourceAttribute,
},
},
},
{
@@ -101,6 +121,10 @@
Path: "./flushed",
FlushInterval: 5 * time.Second,
FormatType: formatTypeJSON,
GroupBy: &GroupBy{
MaxOpenFiles: defaultMaxOpenFiles,
ResourceAttribute: defaultResourceAttribute,
},
},
},
{
@@ -109,6 +133,10 @@
Path: "./flushed",
FlushInterval: 500 * time.Millisecond,
FormatType: formatTypeJSON,
GroupBy: &GroupBy{
MaxOpenFiles: defaultMaxOpenFiles,
ResourceAttribute: defaultResourceAttribute,
},
},
},
{
@@ -119,6 +147,44 @@
id: component.NewIDWithName(metadata.Type, ""),
errorMessage: "path must be non-empty",
},
{
id: component.NewIDWithName(metadata.Type, "group_by"),
expected: &Config{
Path: "./group_by/*.json",
FlushInterval: time.Second,
FormatType: formatTypeJSON,
GroupBy: &GroupBy{
Enabled: true,
MaxOpenFiles: 10,
ResourceAttribute: "dummy",
},
},
},
{
id: component.NewIDWithName(metadata.Type, "group_by_defaults"),
expected: &Config{
Path: "./group_by/*.json",
FlushInterval: time.Second,
FormatType: formatTypeJSON,
GroupBy: &GroupBy{
Enabled: true,
MaxOpenFiles: defaultMaxOpenFiles,
ResourceAttribute: defaultResourceAttribute,
},
},
},
{
id: component.NewIDWithName(metadata.Type, "group_by_invalid_path"),
errorMessage: "path must contain exatcly one * when group_by is enabled",
},
{
id: component.NewIDWithName(metadata.Type, "group_by_invalid_path2"),
errorMessage: "path must not start with * when group_by is enabled",
},
{
id: component.NewIDWithName(metadata.Type, "group_by_empty_resource_attribute"),
errorMessage: "resource_attribute must not be empty when group_by is enabled",
},
}

for _, tt := range tests {
33 changes: 25 additions & 8 deletions exporter/fileexporter/factory.go
@@ -16,6 +16,7 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"gopkg.in/natefinch/lumberjack.v2"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter/internal/metadata"
@@ -32,6 +33,10 @@

// the type of compression codec
compressionZSTD = "zstd"

defaultMaxOpenFiles = 100

defaultResourceAttribute = "fileexporter.path_segment"
)

type FileExporter interface {
@@ -55,6 +60,10 @@ func createDefaultConfig() component.Config {
return &Config{
FormatType: formatTypeJSON,
Rotation: &Rotation{MaxBackups: defaultMaxBackups},
GroupBy: &GroupBy{
ResourceAttribute: defaultResourceAttribute,
MaxOpenFiles: defaultMaxOpenFiles,
},
}
}

@@ -63,7 +72,7 @@ func createTracesExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Traces, error) {
fe := getOrCreateFileExporter(cfg)
fe := getOrCreateFileExporter(cfg, set.Logger)
return exporterhelper.NewTracesExporter(
ctx,
set,
@@ -80,7 +89,7 @@ func createMetricsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Metrics, error) {
fe := getOrCreateFileExporter(cfg)
fe := getOrCreateFileExporter(cfg, set.Logger)
return exporterhelper.NewMetricsExporter(
ctx,
set,
@@ -97,7 +106,7 @@ func createLogsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Logs, error) {
fe := getOrCreateFileExporter(cfg)
fe := getOrCreateFileExporter(cfg, set.Logger)
return exporterhelper.NewLogsExporter(
ctx,
set,
@@ -113,20 +122,28 @@ func createLogsExporter(
// or returns the already cached one. Caching is required because the factory is asked for trace,
// metric and log exporters separately, when it gets CreateTracesExporter(), CreateMetricsExporter()
// and CreateLogsExporter(), but they must not create separate objects; they must use one exporter
// object per configuration.
func getOrCreateFileExporter(cfg component.Config) FileExporter {
func getOrCreateFileExporter(cfg component.Config, logger *zap.Logger) FileExporter {
conf := cfg.(*Config)
fe := exporters.GetOrAdd(cfg, func() component.Component {
return newFileExporter(conf)
return newFileExporter(conf, logger)
})

c := fe.Unwrap()
return c.(FileExporter)
}

func newFileExporter(conf *Config) FileExporter {
return &fileExporter{
conf: conf,
func newFileExporter(conf *Config, logger *zap.Logger) FileExporter {
if conf.GroupBy == nil || !conf.GroupBy.Enabled {
return &fileExporter{
conf: conf,
}
}

return &groupingFileExporter{
conf: conf,
logger: logger,
}
}

func newFileWriter(path string, rotation *Rotation, flushInterval time.Duration, export exportFunc) (*fileWriter, error) {
9 changes: 1 addition & 8 deletions exporter/fileexporter/file_exporter.go
@@ -45,14 +45,7 @@ func (e *fileExporter) consumeLogs(_ context.Context, ld plog.Logs) error {

// Start starts the flush timer if set.
func (e *fileExporter) Start(_ context.Context, _ component.Host) error {
e.marshaller = &marshaller{
formatType: e.conf.FormatType,
tracesMarshaler: tracesMarshalers[e.conf.FormatType],
metricsMarshaler: metricsMarshalers[e.conf.FormatType],
logsMarshaler: logsMarshalers[e.conf.FormatType],
compression: e.conf.Compression,
compressor: buildCompressor(e.conf.Compression),
}
e.marshaller = newMarshaller(e.conf)
export := buildExportFunc(e.conf)

var err error
5 changes: 3 additions & 2 deletions exporter/fileexporter/file_exporter_test.go
@@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"gopkg.in/natefinch/lumberjack.v2"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
@@ -126,7 +127,7 @@ func TestFileTracesExporter(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conf := tt.args.conf
feI := newFileExporter(conf)
feI := newFileExporter(conf, zap.NewNop())
require.IsType(t, &fileExporter{}, feI)
fe := feI.(*fileExporter)

@@ -632,7 +633,7 @@ func TestFlushing(t *testing.T) {
// Wrap the buffer with the buffered writer closer that implements flush() method.
bwc := newBufferedWriteCloser(buf)
// Create a file exporter with flushing enabled.
feI := newFileExporter(cfg)
feI := newFileExporter(cfg, zap.NewNop())
assert.IsType(t, &fileExporter{}, feI)
fe := feI.(*fileExporter)
