Skip to content

Commit

Permalink
OpenTelemetry Protocol with Apache Arrow Receiver component (open-tel…
Browse files Browse the repository at this point in the history
…emetry#32015)

Description:

This is the same code as OTel-Arrow at
[14c63d1eaac7c53585e6b9195d09f1f9703869ed](open-telemetry@c73fd92),
at which point lint fixes required for this repository were applied.
Only import statements change here, to match the host repository.

Link to tracking Issue:
open-telemetry#26491

Testing: Test coverage is approximately 90%.

Documentation: I double-checked and the existing README, no changes
required.

---------

Signed-off-by: Alex Boten <[email protected]>
Co-authored-by: Alex Boten <[email protected]>
  • Loading branch information
jmacd and codeboten committed Apr 24, 2024
1 parent 05e3e4b commit 9527fbc
Show file tree
Hide file tree
Showing 20 changed files with 2,974 additions and 73 deletions.
27 changes: 27 additions & 0 deletions .chloggen/otelarrowreceiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: OpenTelemetry Protocol with Apache Arrow Receiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implementation copied from opentelemetry/otel-arrow repository @v0.20.0.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26491]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
17 changes: 5 additions & 12 deletions receiver/otelarrowreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
// Protocols is the configuration for the supported protocols.
type Protocols struct {
GRPC configgrpc.ServerConfig `mapstructure:"grpc"`
Arrow ArrowSettings `mapstructure:"arrow"`
Arrow ArrowConfig `mapstructure:"arrow"`
}

// ArrowSettings support configuring the Arrow receiver.
type ArrowSettings struct {
// ArrowConfig support configuring the Arrow receiver.
type ArrowConfig struct {
// MemoryLimitMiB is the size of a shared memory region used
// by all Arrow streams, in MiB. When too much load is
// passing through, they will see ResourceExhausted errors.
Expand All @@ -35,16 +35,9 @@ type Config struct {
}

var _ component.Config = (*Config)(nil)
var _ component.ConfigValidator = (*ArrowConfig)(nil)

// Validate checks the receiver configuration is valid
func (cfg *Config) Validate() error {
if err := cfg.Arrow.Validate(); err != nil {
return err
}
return nil
}

func (cfg *ArrowSettings) Validate() error {
func (cfg *ArrowConfig) Validate() error {
if err := cfg.Zstd.Validate(); err != nil {
return fmt.Errorf("zstd decoder: invalid configuration: %w", err)
}
Expand Down
20 changes: 13 additions & 7 deletions receiver/otelarrowreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"testing"
"time"

"github.com/open-telemetry/otel-arrow/collector/compression/zstd"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -77,11 +76,8 @@ func TestUnmarshalConfig(t *testing.T) {
},
},
},
Arrow: ArrowSettings{
Arrow: ArrowConfig{
MemoryLimitMiB: 123,
Zstd: zstd.DecoderConfig{
MemoryLimitMiB: 8,
},
},
},
}, cfg)
Expand All @@ -104,13 +100,21 @@ func TestUnmarshalConfigUnix(t *testing.T) {
},
ReadBufferSize: 512 * 1024,
},
Arrow: ArrowSettings{
Arrow: ArrowConfig{
MemoryLimitMiB: defaultMemoryLimitMiB,
},
},
}, cfg)
}

func TestUnmarshalConfigTypoDefaultProtocol(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "typo_default_proto_config.yaml"))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.EqualError(t, component.UnmarshalConfig(cm, cfg), "1 error(s) decoding:\n\n* 'protocols' has invalid keys: htttp")
}

func TestUnmarshalConfigInvalidProtocol(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "bad_proto_config.yaml"))
require.NoError(t, err)
Expand All @@ -121,5 +125,7 @@ func TestUnmarshalConfigInvalidProtocol(t *testing.T) {

func TestUnmarshalConfigNoProtocols(t *testing.T) {
cfg := Config{}
assert.Error(t, component.ValidateConfig(cfg))
// This now produces an error due to breaking change.
// https://github.com/open-telemetry/opentelemetry-collector/pull/9385
assert.ErrorContains(t, component.ValidateConfig(cfg), "invalid transport type")
}
33 changes: 15 additions & 18 deletions receiver/otelarrowreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ package otelarrowreceiver // import "github.com/open-telemetry/opentelemetry-col
import (
"context"

"github.com/open-telemetry/otel-arrow/collector/sharedcomponent"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata"
)

Expand Down Expand Up @@ -44,7 +44,7 @@ func createDefaultConfig() component.Config {
// We almost write 0 bytes, so no need to tune WriteBufferSize.
ReadBufferSize: 512 * 1024,
},
Arrow: ArrowSettings{
Arrow: ArrowConfig{
MemoryLimitMiB: defaultMemoryLimitMiB,
},
},
Expand All @@ -57,17 +57,16 @@ func createTraces(
set receiver.CreateSettings,
cfg component.Config,
nextConsumer consumer.Traces,
) (_ receiver.Traces, err error) {
) (receiver.Traces, error) {
oCfg := cfg.(*Config)
r := receivers.GetOrAdd(oCfg, func() (comp component.Component) {
comp, err = newOTelArrowReceiver(oCfg, set)
return
r, err := receivers.GetOrAdd(oCfg, func() (*otelArrowReceiver, error) {
return newOTelArrowReceiver(oCfg, set)
})
if err != nil {
return nil, err
}

r.Unwrap().(*otelArrowReceiver).registerTraceConsumer(nextConsumer)
r.Unwrap().registerTraceConsumer(nextConsumer)
return r, nil
}

Expand All @@ -77,17 +76,16 @@ func createMetrics(
set receiver.CreateSettings,
cfg component.Config,
consumer consumer.Metrics,
) (_ receiver.Metrics, err error) {
) (receiver.Metrics, error) {
oCfg := cfg.(*Config)
r := receivers.GetOrAdd(oCfg, func() (comp component.Component) {
comp, err = newOTelArrowReceiver(oCfg, set)
return comp
r, err := receivers.GetOrAdd(oCfg, func() (*otelArrowReceiver, error) {
return newOTelArrowReceiver(oCfg, set)
})
if err != nil {
return nil, err
}

r.Unwrap().(*otelArrowReceiver).registerMetricsConsumer(consumer)
r.Unwrap().registerMetricsConsumer(consumer)
return r, nil
}

Expand All @@ -97,17 +95,16 @@ func createLog(
set receiver.CreateSettings,
cfg component.Config,
consumer consumer.Logs,
) (_ receiver.Logs, err error) {
) (receiver.Logs, error) {
oCfg := cfg.(*Config)
r := receivers.GetOrAdd(oCfg, func() (comp component.Component) {
comp, err = newOTelArrowReceiver(oCfg, set)
return comp
r, err := receivers.GetOrAdd(oCfg, func() (*otelArrowReceiver, error) {
return newOTelArrowReceiver(oCfg, set)
})
if err != nil {
return nil, err
}

r.Unwrap().(*otelArrowReceiver).registerLogsConsumer(consumer)
r.Unwrap().registerLogsConsumer(consumer)
return r, nil
}

Expand All @@ -117,4 +114,4 @@ func createLog(
// create separate objects, they must use one otelArrowReceiver object per configuration.
// When the receiver is shutdown it should be removed from this map so the same configuration
// can be recreated successfully.
var receivers = sharedcomponent.NewSharedComponents()
var receivers = sharedcomponent.NewSharedComponents[*Config, *otelArrowReceiver]()
16 changes: 7 additions & 9 deletions receiver/otelarrowreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,29 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelar
go 1.21.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.99.0
github.com/open-telemetry/otel-arrow v0.18.0
github.com/open-telemetry/otel-arrow/collector v0.22.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector v0.99.0
go.opentelemetry.io/collector/component v0.99.0
go.opentelemetry.io/collector/config/configauth v0.99.0
go.opentelemetry.io/collector/config/configgrpc v0.99.0
go.opentelemetry.io/collector/config/confignet v0.99.0
go.opentelemetry.io/collector/config/configtelemetry v0.99.0
go.opentelemetry.io/collector/config/configtls v0.99.0
go.opentelemetry.io/collector/confmap v0.99.0
go.opentelemetry.io/collector/consumer v0.99.0
go.opentelemetry.io/collector/extension/auth v0.99.0
go.opentelemetry.io/collector/pdata v1.6.0
go.opentelemetry.io/collector/receiver v0.99.0
go.opentelemetry.io/otel v1.25.0
go.opentelemetry.io/otel/metric v1.25.0
go.opentelemetry.io/otel/trace v1.25.0
go.uber.org/goleak v1.3.0
go.uber.org/mock v0.4.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/net v0.24.0
google.golang.org/grpc v1.63.2
)

Expand Down Expand Up @@ -61,23 +67,17 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/collector v0.99.0 // indirect
go.opentelemetry.io/collector/config/configauth v0.99.0 // indirect
go.opentelemetry.io/collector/config/configcompression v1.6.0 // indirect
go.opentelemetry.io/collector/config/configopaque v1.6.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.99.0 // indirect
go.opentelemetry.io/collector/config/internal v0.99.0 // indirect
go.opentelemetry.io/collector/exporter v0.99.0 // indirect
go.opentelemetry.io/collector/extension v0.99.0 // indirect
go.opentelemetry.io/collector/featuregate v1.6.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.50.0 // indirect
go.opentelemetry.io/otel v1.25.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.47.0 // indirect
go.opentelemetry.io/otel/sdk v1.25.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.25.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.14.0 // indirect
Expand All @@ -86,5 +86,3 @@ require (
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent
2 changes: 2 additions & 0 deletions receiver/otelarrowreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 9527fbc

Please sign in to comment.