From 9527fbc6defe225a2e43b60a84ef1f6398792549 Mon Sep 17 00:00:00 2001
From: Joshua MacDonald
Date: Wed, 24 Apr 2024 12:51:00 -0700
Subject: [PATCH] OpenTelemetry Protocol with Apache Arrow Receiver component
 (#32015)

Description: This is the same code as OTel-Arrow at
[14c63d1eaac7c53585e6b9195d09f1f9703869ed](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/32015/commits/c73fd9243d6b375b27782ee5d0a1e84b5d89e509),
at which point lint fixes required for this repository were applied. Only
import statements change here, to match the host repository.

Link to tracking Issue:
https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/26491

Testing: Test coverage is approximately 90%.

Documentation: I double-checked the existing README; no changes required.

---------

Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com>
Co-authored-by: Alex Boten <223565+codeboten@users.noreply.github.com>
---
 .chloggen/otelarrowreceiver.yaml | 27 +
 receiver/otelarrowreceiver/config.go | 17 +-
 receiver/otelarrowreceiver/config_test.go | 20 +-
 receiver/otelarrowreceiver/factory.go | 33 +-
 receiver/otelarrowreceiver/go.mod | 16 +-
 receiver/otelarrowreceiver/go.sum | 2 +
 .../otelarrowreceiver/internal/arrow/arrow.go | 520 ++++++-
 .../internal/arrow/arrow_test.go | 1230 +++++++++++++++++
 .../internal/arrow/mock/auth.go | 84 ++
 .../internal/arrow/mock/consumer.go | 174 +++
 .../otelarrowreceiver/internal/logs/otlp.go | 17 +-
 .../internal/logs/otlp_test.go | 95 ++
 .../internal/metrics/otlp.go | 17 +-
 .../internal/metrics/otlp_test.go | 96 ++
 .../otelarrowreceiver/internal/trace/otlp.go | 18 +-
 .../internal/trace/otlp_test.go | 93 ++
 receiver/otelarrowreceiver/otelarrow.go | 6 +-
 receiver/otelarrowreceiver/otelarrow_test.go | 576 ++++++++
 .../otelarrowreceiver/testdata/config.yaml | 3 -
 .../testdata/typo_default_proto_config.yaml | 3 +
 20 files changed, 2974 insertions(+), 73 deletions(-)
 create mode 100644 .chloggen/otelarrowreceiver.yaml
 create mode 100644 receiver/otelarrowreceiver/internal/arrow/arrow_test.go
 create mode 100644 receiver/otelarrowreceiver/internal/arrow/mock/auth.go
 create mode 100644 receiver/otelarrowreceiver/internal/arrow/mock/consumer.go
 create mode 100644 receiver/otelarrowreceiver/internal/logs/otlp_test.go
 create mode 100644 receiver/otelarrowreceiver/internal/metrics/otlp_test.go
 create mode 100644 receiver/otelarrowreceiver/internal/trace/otlp_test.go
 create mode 100644 receiver/otelarrowreceiver/otelarrow_test.go
 create mode 100644 receiver/otelarrowreceiver/testdata/typo_default_proto_config.yaml

diff --git a/.chloggen/otelarrowreceiver.yaml b/.chloggen/otelarrowreceiver.yaml
new file mode 100644
index 0000000000000..c71f3b425ec52
--- /dev/null
+++ b/.chloggen/otelarrowreceiver.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: new_component
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: OpenTelemetry Protocol with Apache Arrow Receiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Implementation copied from opentelemetry/otel-arrow repository @v0.20.0.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [26491]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

diff --git a/receiver/otelarrowreceiver/config.go b/receiver/otelarrowreceiver/config.go
index cd3d4190dc148..38b8e2a1aa2ca 100644
--- a/receiver/otelarrowreceiver/config.go
+++ b/receiver/otelarrowreceiver/config.go
@@ -14,11 +14,11 @@ import (
 // Protocols is the configuration for the supported protocols.
 type Protocols struct {
 	GRPC configgrpc.ServerConfig `mapstructure:"grpc"`
-	Arrow ArrowSettings `mapstructure:"arrow"`
+	Arrow ArrowConfig `mapstructure:"arrow"`
 }

-// ArrowSettings support configuring the Arrow receiver.
-type ArrowSettings struct {
+// ArrowConfig supports configuring the Arrow receiver.
+type ArrowConfig struct {
 	// MemoryLimitMiB is the size of a shared memory region used
 	// by all Arrow streams, in MiB. When too much load is
 	// passing through, they will see ResourceExhausted errors.
@@ -35,16 +35,9 @@ type Config struct {
 }

 var _ component.Config = (*Config)(nil)
+var _ component.ConfigValidator = (*ArrowConfig)(nil)

-// Validate checks the receiver configuration is valid
-func (cfg *Config) Validate() error {
-	if err := cfg.Arrow.Validate(); err != nil {
-		return err
-	}
-	return nil
-}
-
-func (cfg *ArrowSettings) Validate() error {
+func (cfg *ArrowConfig) Validate() error {
 	if err := cfg.Zstd.Validate(); err != nil {
 		return fmt.Errorf("zstd decoder: invalid configuration: %w", err)
 	}
diff --git a/receiver/otelarrowreceiver/config_test.go b/receiver/otelarrowreceiver/config_test.go
index 3b3ca739c5c47..9cb16dab0ecfe 100644
--- a/receiver/otelarrowreceiver/config_test.go
+++ b/receiver/otelarrowreceiver/config_test.go
@@ -8,7 +8,6 @@ import (
 	"testing"
 	"time"

-	"github.com/open-telemetry/otel-arrow/collector/compression/zstd"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component"
@@ -77,11 +76,8 @@ func TestUnmarshalConfig(t *testing.T) {
 				},
 			},
 		},
-		Arrow: ArrowSettings{
+		Arrow: ArrowConfig{
 			MemoryLimitMiB: 123,
-			Zstd: zstd.DecoderConfig{
-				MemoryLimitMiB: 8,
-			},
 		},
 	},
 }, cfg)
@@ -104,13 +100,21 @@ func TestUnmarshalConfigUnix(t *testing.T) {
 			},
 			ReadBufferSize: 512 * 1024,
 		},
-		Arrow: ArrowSettings{
+		Arrow: ArrowConfig{
 			MemoryLimitMiB: defaultMemoryLimitMiB,
 		},
 	},
 }, cfg)
 }

+func TestUnmarshalConfigTypoDefaultProtocol(t *testing.T) {
+	cm, err := confmaptest.LoadConf(filepath.Join("testdata", "typo_default_proto_config.yaml"))
+	require.NoError(t, err)
+	factory := NewFactory()
+	cfg := factory.CreateDefaultConfig()
+	assert.EqualError(t, component.UnmarshalConfig(cm, cfg), "1 error(s) decoding:\n\n* 'protocols' has invalid keys: htttp")
+}
+
 func TestUnmarshalConfigInvalidProtocol(t *testing.T) {
 	cm, err := confmaptest.LoadConf(filepath.Join("testdata", "bad_proto_config.yaml"))
 	require.NoError(t, err)
@@ -121,5 +125,7 @@ func
TestUnmarshalConfigNoProtocols(t *testing.T) {
 	cfg := Config{}
-	assert.Error(t, component.ValidateConfig(cfg))
+	// This now produces an error due to a breaking change.
+	// https://github.com/open-telemetry/opentelemetry-collector/pull/9385
+	assert.ErrorContains(t, component.ValidateConfig(cfg), "invalid transport type")
 }
diff --git a/receiver/otelarrowreceiver/factory.go b/receiver/otelarrowreceiver/factory.go
index 4e29b3fdf38db..2751340831517 100644
--- a/receiver/otelarrowreceiver/factory.go
+++ b/receiver/otelarrowreceiver/factory.go
@@ -6,13 +6,13 @@ package otelarrowreceiver // import "github.com/open-telemetry/opentelemetry-col
 import (
 	"context"

+	"github.com/open-telemetry/otel-arrow/collector/sharedcomponent"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/config/configgrpc"
 	"go.opentelemetry.io/collector/config/confignet"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/receiver"

-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata"
 )

@@ -44,7 +44,7 @@ func createDefaultConfig() component.Config {
 			// We almost write 0 bytes, so no need to tune WriteBufferSize.
 			ReadBufferSize: 512 * 1024,
 		},
-		Arrow: ArrowSettings{
+		Arrow: ArrowConfig{
 			MemoryLimitMiB: defaultMemoryLimitMiB,
 		},
 	},
@@ -57,17 +57,16 @@ func createTraces(
 	set receiver.CreateSettings,
 	cfg component.Config,
 	nextConsumer consumer.Traces,
-) (_ receiver.Traces, err error) {
+) (receiver.Traces, error) {
 	oCfg := cfg.(*Config)
-	r := receivers.GetOrAdd(oCfg, func() (comp component.Component) {
-		comp, err = newOTelArrowReceiver(oCfg, set)
-		return
+	r, err := receivers.GetOrAdd(oCfg, func() (*otelArrowReceiver, error) {
+		return newOTelArrowReceiver(oCfg, set)
 	})
 	if err != nil {
 		return nil, err
 	}

-	r.Unwrap().(*otelArrowReceiver).registerTraceConsumer(nextConsumer)
+	r.Unwrap().registerTraceConsumer(nextConsumer)
 	return r, nil
 }

@@ -77,17 +76,16 @@ func createMetrics(
 	set receiver.CreateSettings,
 	cfg component.Config,
 	consumer consumer.Metrics,
-) (_ receiver.Metrics, err error) {
+) (receiver.Metrics, error) {
 	oCfg := cfg.(*Config)
-	r := receivers.GetOrAdd(oCfg, func() (comp component.Component) {
-		comp, err = newOTelArrowReceiver(oCfg, set)
-		return comp
+	r, err := receivers.GetOrAdd(oCfg, func() (*otelArrowReceiver, error) {
+		return newOTelArrowReceiver(oCfg, set)
 	})
 	if err != nil {
 		return nil, err
 	}

-	r.Unwrap().(*otelArrowReceiver).registerMetricsConsumer(consumer)
+	r.Unwrap().registerMetricsConsumer(consumer)
 	return r, nil
 }

@@ -97,17 +95,16 @@ func createLog(
 	set receiver.CreateSettings,
 	cfg component.Config,
 	consumer consumer.Logs,
-) (_ receiver.Logs, err error) {
+) (receiver.Logs, error) {
 	oCfg := cfg.(*Config)
-	r := receivers.GetOrAdd(oCfg, func() (comp component.Component) {
-		comp, err = newOTelArrowReceiver(oCfg, set)
-		return comp
+	r, err := receivers.GetOrAdd(oCfg, func() (*otelArrowReceiver, error) {
+		return newOTelArrowReceiver(oCfg, set)
 	})
 	if err != nil {
 		return nil, err
 	}

-	r.Unwrap().(*otelArrowReceiver).registerLogsConsumer(consumer)
+	r.Unwrap().registerLogsConsumer(consumer)
 	return r, nil
 }

@@ -117,4 +114,4 @@ func createLog(
 // create separate objects, they must use one otelArrowReceiver object per configuration.
 // When the receiver is shutdown it should be removed from this map so the same configuration
 // can be recreated successfully.
-var receivers = sharedcomponent.NewSharedComponents() +var receivers = sharedcomponent.NewSharedComponents[*Config, *otelArrowReceiver]() diff --git a/receiver/otelarrowreceiver/go.mod b/receiver/otelarrowreceiver/go.mod index 2a8432cfee7e3..27da1b28c4847 100644 --- a/receiver/otelarrowreceiver/go.mod +++ b/receiver/otelarrowreceiver/go.mod @@ -3,23 +3,29 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelar go 1.21.0 require ( - github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.99.0 github.com/open-telemetry/otel-arrow v0.18.0 github.com/open-telemetry/otel-arrow/collector v0.22.0 github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/collector v0.99.0 go.opentelemetry.io/collector/component v0.99.0 + go.opentelemetry.io/collector/config/configauth v0.99.0 go.opentelemetry.io/collector/config/configgrpc v0.99.0 go.opentelemetry.io/collector/config/confignet v0.99.0 + go.opentelemetry.io/collector/config/configtelemetry v0.99.0 go.opentelemetry.io/collector/config/configtls v0.99.0 go.opentelemetry.io/collector/confmap v0.99.0 go.opentelemetry.io/collector/consumer v0.99.0 go.opentelemetry.io/collector/extension/auth v0.99.0 go.opentelemetry.io/collector/pdata v1.6.0 go.opentelemetry.io/collector/receiver v0.99.0 + go.opentelemetry.io/otel v1.25.0 go.opentelemetry.io/otel/metric v1.25.0 go.opentelemetry.io/otel/trace v1.25.0 go.uber.org/goleak v1.3.0 + go.uber.org/mock v0.4.0 + go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 + golang.org/x/net v0.24.0 google.golang.org/grpc v1.63.2 ) @@ -61,23 +67,17 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect - go.opentelemetry.io/collector v0.99.0 // indirect - go.opentelemetry.io/collector/config/configauth v0.99.0 // indirect go.opentelemetry.io/collector/config/configcompression v1.6.0 // indirect go.opentelemetry.io/collector/config/configopaque v1.6.0 // indirect - go.opentelemetry.io/collector/config/configtelemetry v0.99.0 // indirect go.opentelemetry.io/collector/config/internal v0.99.0 // indirect go.opentelemetry.io/collector/exporter v0.99.0 // indirect go.opentelemetry.io/collector/extension v0.99.0 // indirect go.opentelemetry.io/collector/featuregate v1.6.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.50.0 // indirect - go.opentelemetry.io/otel v1.25.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.47.0 // indirect go.opentelemetry.io/otel/sdk v1.25.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.25.0 // indirect - go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.13.0 // indirect - golang.org/x/net v0.24.0 // indirect golang.org/x/sys v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.14.0 // indirect @@ -86,5 +86,3 @@ require ( google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - -replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent diff --git a/receiver/otelarrowreceiver/go.sum b/receiver/otelarrowreceiver/go.sum index c29f2d0cd19d5..a1ec0398076d9 100644 --- a/receiver/otelarrowreceiver/go.sum +++ b/receiver/otelarrowreceiver/go.sum @@ -167,6 +167,8 @@ go.opentelemetry.io/otel/trace v1.25.0 h1:tqukZGLwQYRIFtSQM2u2+yfMVTgGVeqRLPUYx1 go.opentelemetry.io/otel/trace v1.25.0/go.mod h1:hCCs70XM/ljO+BeQkyFnbK28SBIJ/Emuha+ccrCRT7I= go.uber.org/goleak v1.3.0 
h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow.go b/receiver/otelarrowreceiver/internal/arrow/arrow.go index b7495aa061baf..62dbcb9995402 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -4,18 +4,45 @@ package arrow // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/arrow" import ( + "context" + "errors" "fmt" + "io" + "strings" arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" "github.com/open-telemetry/otel-arrow/collector/netstats" arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" + "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/extension/auth" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.opentelemetry.io/otel" + otelcodes "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" + "go.uber.org/multierr" + "go.uber.org/zap" + "golang.org/x/net/http2/hpack" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +const ( + streamFormat = "arrow" + hpackMaxDynamicSize = 4096 + scopeName = "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver" ) var ( @@ -38,13 +65,16 @@ type Receiver struct { arrowpb.UnsafeArrowLogsServiceServer arrowpb.UnsafeArrowMetricsServiceServer - telemetry component.TelemetrySettings - tracer trace.Tracer - obsrecv *receiverhelper.ObsReport - gsettings configgrpc.ServerConfig - authServer auth.Server - newConsumer func() arrowRecord.ConsumerAPI - netReporter netstats.Interface + telemetry component.TelemetrySettings + tracer trace.Tracer + obsrecv *receiverhelper.ObsReport + gsettings configgrpc.ServerConfig + authServer auth.Server + newConsumer func() arrowRecord.ConsumerAPI + netReporter netstats.Interface + recvInFlightBytes metric.Int64UpDownCounter + recvInFlightItems metric.Int64UpDownCounter + recvInFlightRequests metric.Int64UpDownCounter } // New creates a new Receiver reference. 
@@ -56,9 +86,10 @@ func New( authServer auth.Server, newConsumer func() arrowRecord.ConsumerAPI, netReporter netstats.Interface, -) *Receiver { +) (*Receiver, error) { tracer := set.TelemetrySettings.TracerProvider.Tracer("otel-arrow-receiver") - return &Receiver{ + var errors, err error + recv := &Receiver{ Consumers: cs, obsrecv: obsrecv, telemetry: set.TelemetrySettings, @@ -68,16 +99,477 @@ func New( gsettings: gsettings, netReporter: netReporter, } + + meter := recv.telemetry.MeterProvider.Meter(scopeName) + recv.recvInFlightBytes, err = meter.Int64UpDownCounter( + "otel_arrow_receiver_in_flight_bytes", + metric.WithDescription("Number of bytes in flight"), + metric.WithUnit("By"), + ) + errors = multierr.Append(errors, err) + + recv.recvInFlightItems, err = meter.Int64UpDownCounter( + "otel_arrow_receiver_in_flight_items", + metric.WithDescription("Number of items in flight"), + ) + errors = multierr.Append(errors, err) + + recv.recvInFlightRequests, err = meter.Int64UpDownCounter( + "otel_arrow_receiver_in_flight_requests", + metric.WithDescription("Number of requests in flight"), + ) + errors = multierr.Append(errors, err) + + if errors != nil { + return nil, errors + } + + return recv, nil } -func (r *Receiver) ArrowTraces(_ arrowpb.ArrowTracesService_ArrowTracesServer) error { - return nil +// headerReceiver contains the state necessary to decode per-request metadata +// from an arrow stream. +type headerReceiver struct { + // decoder maintains state across the stream. + decoder *hpack.Decoder + + // includeMetadata as configured by gRPC settings. + includeMetadata bool + + // hasAuthServer indicates that headers must be produced + // independent of includeMetadata. + hasAuthServer bool + + // client connection info from the stream context, (optionally + // if includeMetadata) to be extended with per-request metadata. + connInfo client.Info + + // streamHdrs was translated from the incoming context, will be + // merged with per-request metadata. Note that the contents of + // this map are equivalent to connInfo.Metadata, however that + // library does not let us iterate over the map so we recalculate + // this from the gRPC incoming stream context. + streamHdrs map[string][]string + + // tmpHdrs is used by the decoder's emit function during Write. + tmpHdrs map[string][]string } -func (r *Receiver) ArrowLogs(_ arrowpb.ArrowLogsService_ArrowLogsServer) error { - return nil +func newHeaderReceiver(streamCtx context.Context, as auth.Server, includeMetadata bool) *headerReceiver { + hr := &headerReceiver{ + includeMetadata: includeMetadata, + hasAuthServer: as != nil, + connInfo: client.FromContext(streamCtx), + } + + // Note that we capture the incoming context if there is an + // Auth plugin configured or includeMetadata is set. + if hr.includeMetadata || hr.hasAuthServer { + if smd, ok := metadata.FromIncomingContext(streamCtx); ok { + hr.streamHdrs = smd + } + } + + // Note the hpack decoder supports additional protections, + // such as SetMaxStringLength(), but as we already have limits + // on stream request size, this seems unnecessary. + hr.decoder = hpack.NewDecoder(hpackMaxDynamicSize, hr.tmpHdrsAppend) + + return hr +} + +// combineHeaders calculates per-request Metadata by combining the stream's +// client.Info with additional key:values associated with the arrow batch. 
+func (h *headerReceiver) combineHeaders(ctx context.Context, hdrsBytes []byte) (context.Context, map[string][]string, error) { + if len(hdrsBytes) == 0 && len(h.streamHdrs) == 0 { + return ctx, nil, nil + } + + if len(hdrsBytes) == 0 { + return h.newContext(ctx, h.streamHdrs), h.streamHdrs, nil + } + + // Note that we will parse the headers even if they are not + // used, to check for validity and/or trace context. Also + // note this code was once optimized to avoid the following + // map allocation in cases where the return value would not be + // used. This logic was "is metadata present" or "is auth + // server used". Then we added to this, "is trace propagation + // in use" and simplified this function to always store the + // headers into a temporary map. + h.tmpHdrs = map[string][]string{} + + // Write calls the emitFunc, appending directly into `tmpHdrs`. + if _, err := h.decoder.Write(hdrsBytes); err != nil { + return ctx, nil, err + } + + // Get the global propagator, to extract context. When there + // are no fields, it's a no-op propagator implementation and + // we can skip the allocations inside this block. + carrier := otel.GetTextMapPropagator() + if len(carrier.Fields()) != 0 { + // When there are no fields, it's a no-op + // implementation and we can skip the allocations. + flat := map[string]string{} + for _, key := range carrier.Fields() { + have := h.tmpHdrs[key] + if len(have) > 0 { + flat[key] = have[0] + delete(h.tmpHdrs, key) + } + } + + ctx = carrier.Extract(ctx, propagation.MapCarrier(flat)) + } + + // Add streamHdrs that were not carried in the per-request headers. + for k, v := range h.streamHdrs { + // Note: This is done after the per-request metadata is defined + // in recognition of a potential for duplicated values stemming + // from the Arrow exporter's independent call to the Auth + // extension's GetRequestMetadata(). This paired with the + // headersetter's return of empty-string values means, we would + // end up with an empty-string element for any headersetter + // `from_context` rules b/c the stream uses background context. + // This allows static headers through. + // + // See https://github.com/open-telemetry/opentelemetry-collector/issues/6965 + lk := strings.ToLower(k) + if _, ok := h.tmpHdrs[lk]; !ok { + h.tmpHdrs[lk] = v + } + } + + // Release the temporary copy used in emitFunc(). + newHdrs := h.tmpHdrs + h.tmpHdrs = nil + + // Note: newHdrs is passed to the Auth plugin. Whether + // newHdrs is set in the context depends on h.includeMetadata. + return h.newContext(ctx, newHdrs), newHdrs, nil +} + +// tmpHdrsAppend appends to tmpHdrs, from decoder's emit function. +func (h *headerReceiver) tmpHdrsAppend(hf hpack.HeaderField) { + if h.tmpHdrs != nil { + // We force strings.ToLower to ensure consistency. gRPC itself + // does this and would do the same. + hn := strings.ToLower(hf.Name) + h.tmpHdrs[hn] = append(h.tmpHdrs[hn], hf.Value) + } +} + +func (h *headerReceiver) newContext(ctx context.Context, hdrs map[string][]string) context.Context { + // Retain the Addr/Auth of the stream connection, update the + // per-request metadata from the Arrow batch. + var md client.Metadata + if h.includeMetadata && hdrs != nil { + md = client.NewMetadata(hdrs) + } + return client.NewContext(ctx, client.Info{ + Addr: h.connInfo.Addr, + Auth: h.connInfo.Auth, + Metadata: md, + }) +} + +// logStreamError decides how to log an error. 
+func (r *Receiver) logStreamError(err error) { + var code codes.Code + var msg string + // gRPC tends to supply status-wrapped errors, so we always + // unpack them. A wrapped Canceled code indicates intentional + // shutdown, which can be due to normal causes (EOF, e.g., + // max-stream-lifetime reached) or unusual causes (Canceled, + // e.g., because the other stream direction reached an error). + if status, ok := status.FromError(err); ok { + code = status.Code() + msg = status.Message() + } else if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + code = codes.Canceled + msg = err.Error() + } else { + code = codes.Internal + msg = err.Error() + } + + if code == codes.Canceled { + r.telemetry.Logger.Debug("arrow stream shutdown", zap.String("message", msg)) + } else { + r.telemetry.Logger.Error("arrow stream error", zap.String("message", msg), zap.Int("code", int(code))) + } +} + +func gRPCName(desc grpc.ServiceDesc) string { + return netstats.GRPCStreamMethodName(desc, desc.Streams[0]) } -func (r *Receiver) ArrowMetrics(_ arrowpb.ArrowMetricsService_ArrowMetricsServer) error { +var ( + arrowTracesMethod = gRPCName(arrowpb.ArrowTracesService_ServiceDesc) + arrowMetricsMethod = gRPCName(arrowpb.ArrowMetricsService_ServiceDesc) + arrowLogsMethod = gRPCName(arrowpb.ArrowLogsService_ServiceDesc) +) + +func (r *Receiver) ArrowTraces(serverStream arrowpb.ArrowTracesService_ArrowTracesServer) error { + return r.anyStream(serverStream, arrowTracesMethod) +} + +func (r *Receiver) ArrowLogs(serverStream arrowpb.ArrowLogsService_ArrowLogsServer) error { + return r.anyStream(serverStream, arrowLogsMethod) +} + +func (r *Receiver) ArrowMetrics(serverStream arrowpb.ArrowMetricsService_ArrowMetricsServer) error { + return r.anyStream(serverStream, arrowMetricsMethod) +} + +type anyStreamServer interface { + Send(*arrowpb.BatchStatus) error + Recv() (*arrowpb.BatchArrowRecords, error) + grpc.ServerStream +} + +func (r *Receiver) anyStream(serverStream anyStreamServer, method string) (retErr error) { + streamCtx := serverStream.Context() + ac := r.newConsumer() + hrcv := newHeaderReceiver(serverStream.Context(), r.authServer, r.gsettings.IncludeMetadata) + + defer func() { + if err := recover(); err != nil { + // When this happens, the stacktrace is + // important and lost if we don't capture it + // here. + r.telemetry.Logger.Debug("panic detail in otel-arrow-adapter", + zap.Reflect("recovered", err), + zap.Stack("stacktrace"), + ) + retErr = fmt.Errorf("panic in otel-arrow-adapter: %v", err) + } + if err := ac.Close(); err != nil { + r.telemetry.Logger.Error("arrow stream close", zap.Error(err)) + } + }() + + for { + // Receive a batch corresponding with one ptrace.Traces, pmetric.Metrics, + // or plog.Logs item. + req, err := serverStream.Recv() + if err != nil { + // This includes the case where a client called CloseSend(), in + // which case we see an EOF error here. + r.logStreamError(err) + + if errors.Is(err, io.EOF) { + return status.Error(codes.Canceled, "client stream shutdown") + } else if errors.Is(err, context.Canceled) { + return status.Error(codes.Canceled, "server stream shutdown") + } + return err + } + + // Check for optional headers and set the incoming context. + thisCtx, authHdrs, err := hrcv.combineHeaders(streamCtx, req.GetHeaders()) + if err != nil { + // Failing to parse the incoming headers breaks the stream. 
+ r.telemetry.Logger.Error("arrow metadata error", zap.Error(err)) + return err + } + + var authErr error + if r.authServer != nil { + var newCtx context.Context + if newCtx, err = r.authServer.Authenticate(thisCtx, authHdrs); err != nil { + authErr = err + } else { + thisCtx = newCtx + } + } + + if err := r.processAndConsume(thisCtx, method, ac, req, serverStream, authErr); err != nil { + return err + } + } +} + +func (r *Receiver) processAndConsume(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, req *arrowpb.BatchArrowRecords, serverStream anyStreamServer, authErr error) (retErr error) { + var err error + + ctx, span := r.tracer.Start(ctx, "otel_arrow_stream_recv") + defer span.End() + + r.recvInFlightRequests.Add(ctx, 1) + defer func() { + r.recvInFlightRequests.Add(ctx, -1) + // Set span status if an error is returned. + if retErr != nil { + span := trace.SpanFromContext(ctx) + span.SetStatus(otelcodes.Error, retErr.Error()) + } + }() + + // Process records: an error in this code path does + // not necessarily break the stream. + if authErr != nil { + err = authErr + } else { + err = r.processRecords(ctx, method, arrowConsumer, req) + } + + // Note: Statuses can be batched, but we do not take + // advantage of this feature. + status := &arrowpb.BatchStatus{ + BatchId: req.GetBatchId(), + } + if err == nil { + status.StatusCode = arrowpb.StatusCode_OK + } else { + status.StatusMessage = err.Error() + switch { + case errors.Is(err, arrowRecord.ErrConsumerMemoryLimit): + r.telemetry.Logger.Error("arrow resource exhausted", zap.Error(err)) + status.StatusCode = arrowpb.StatusCode_RESOURCE_EXHAUSTED + case consumererror.IsPermanent(err): + r.telemetry.Logger.Error("arrow data error", zap.Error(err)) + status.StatusCode = arrowpb.StatusCode_INVALID_ARGUMENT + default: + r.telemetry.Logger.Debug("arrow consumer error", zap.Error(err)) + status.StatusCode = arrowpb.StatusCode_UNAVAILABLE + } + } + + err = serverStream.Send(status) + if err != nil { + r.logStreamError(err) + return err + } return nil } + +// processRecords returns an error and a boolean indicating whether +// the error (true) was from processing the data (i.e., invalid +// argument) or (false) from the consuming pipeline. The boolean is +// not used when success (nil error) is returned. +func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, records *arrowpb.BatchArrowRecords) error { + payloads := records.GetArrowPayloads() + if len(payloads) == 0 { + return nil + } + var uncompSize int64 + if r.telemetry.MetricsLevel > configtelemetry.LevelNormal { + defer func() { + // The netstats code knows that uncompressed size is + // unreliable for arrow transport, so we instrument it + // directly here. Only the primary direction of transport + // is instrumented this way. 
+ var sized netstats.SizesStruct + sized.Method = method + if r.telemetry.MetricsLevel > configtelemetry.LevelNormal { + sized.Length = uncompSize + } + r.netReporter.CountReceive(ctx, sized) + r.netReporter.SetSpanSizeAttributes(ctx, sized) + }() + } + + switch payloads[0].Type { + case arrowpb.ArrowPayloadType_UNIVARIATE_METRICS: + if r.Metrics() == nil { + return status.Error(codes.Unimplemented, "metrics service not available") + } + var sizer pmetric.ProtoMarshaler + var numPts int + + ctx = r.obsrecv.StartMetricsOp(ctx) + + data, err := arrowConsumer.MetricsFrom(records) + if err != nil { + err = consumererror.NewPermanent(err) + } else { + for _, metrics := range data { + items := metrics.DataPointCount() + sz := int64(sizer.MetricsSize(metrics)) + + r.recvInFlightBytes.Add(ctx, sz) + r.recvInFlightItems.Add(ctx, int64(items)) + + numPts += items + uncompSize += sz + err = multierr.Append(err, + r.Metrics().ConsumeMetrics(ctx, metrics), + ) + } + // entire request has been processed, decrement counter. + r.recvInFlightBytes.Add(ctx, -uncompSize) + r.recvInFlightItems.Add(ctx, int64(-numPts)) + } + r.obsrecv.EndMetricsOp(ctx, streamFormat, numPts, err) + return err + + case arrowpb.ArrowPayloadType_LOGS: + if r.Logs() == nil { + return status.Error(codes.Unimplemented, "logs service not available") + } + var sizer plog.ProtoMarshaler + var numLogs int + ctx = r.obsrecv.StartLogsOp(ctx) + + data, err := arrowConsumer.LogsFrom(records) + if err != nil { + err = consumererror.NewPermanent(err) + } else { + for _, logs := range data { + items := logs.LogRecordCount() + sz := int64(sizer.LogsSize(logs)) + + r.recvInFlightBytes.Add(ctx, sz) + r.recvInFlightItems.Add(ctx, int64(items)) + numLogs += items + uncompSize += sz + err = multierr.Append(err, + r.Logs().ConsumeLogs(ctx, logs), + ) + } + // entire request has been processed, decrement counter. + r.recvInFlightBytes.Add(ctx, -uncompSize) + r.recvInFlightItems.Add(ctx, int64(-numLogs)) + } + r.obsrecv.EndLogsOp(ctx, streamFormat, numLogs, err) + return err + + case arrowpb.ArrowPayloadType_SPANS: + if r.Traces() == nil { + return status.Error(codes.Unimplemented, "traces service not available") + } + var sizer ptrace.ProtoMarshaler + var numSpans int + ctx = r.obsrecv.StartTracesOp(ctx) + + data, err := arrowConsumer.TracesFrom(records) + if err != nil { + err = consumererror.NewPermanent(err) + } else { + for _, traces := range data { + items := traces.SpanCount() + sz := int64(sizer.TracesSize(traces)) + + r.recvInFlightBytes.Add(ctx, sz) + r.recvInFlightItems.Add(ctx, int64(items)) + + numSpans += items + uncompSize += sz + err = multierr.Append(err, + r.Traces().ConsumeTraces(ctx, traces), + ) + } + + // entire request has been processed, decrement counter. 
+ r.recvInFlightBytes.Add(ctx, -uncompSize) + r.recvInFlightItems.Add(ctx, int64(-numSpans)) + } + r.obsrecv.EndTracesOp(ctx, streamFormat, numSpans, err) + return err + + default: + return ErrUnrecognizedPayload + } +} diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go new file mode 100644 index 0000000000000..67350750ec8eb --- /dev/null +++ b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go @@ -0,0 +1,1230 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package arrow + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "strings" + "sync" + "testing" + + arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" + arrowCollectorMock "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1/mock" + "github.com/open-telemetry/otel-arrow/collector/netstats" + "github.com/open-telemetry/otel-arrow/collector/testdata" + arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" + arrowRecordMock "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record/mock" + otelAssert "github.com/open-telemetry/otel-arrow/pkg/otel/assert" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/client" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/extension/auth" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" + "golang.org/x/net/http2/hpack" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/arrow/mock" +) + +type compareJSONTraces struct{ ptrace.Traces } +type compareJSONMetrics struct{ pmetric.Metrics } +type compareJSONLogs struct{ plog.Logs } + +func (c compareJSONTraces) MarshalJSON() ([]byte, error) { + var m ptrace.JSONMarshaler + return m.MarshalTraces(c.Traces) +} + +func (c compareJSONMetrics) MarshalJSON() ([]byte, error) { + var m pmetric.JSONMarshaler + return m.MarshalMetrics(c.Metrics) +} + +func (c compareJSONLogs) MarshalJSON() ([]byte, error) { + var m plog.JSONMarshaler + return m.MarshalLogs(c.Logs) +} + +type consumeResult struct { + Ctx context.Context + Data any +} + +type commonTestCase struct { + *testing.T + + ctrl *gomock.Controller + cancel context.CancelFunc + telset component.TelemetrySettings + consumers mockConsumers + stream *arrowCollectorMock.MockArrowTracesService_ArrowTracesServer + receive chan recvResult + consume chan consumeResult + streamErr chan error + + // testProducer is for convenience -- not thread safe, see copyBatch(). 
+ testProducer *arrowRecord.Producer + + ctxCall *gomock.Call + recvCall *gomock.Call +} + +type testChannel interface { + onConsume() error +} + +type healthyTestChannel struct{} + +func (healthyTestChannel) onConsume() error { + return nil +} + +type unhealthyTestChannel struct{} + +func (unhealthyTestChannel) onConsume() error { + return fmt.Errorf("consumer unhealthy") +} + +type recvResult struct { + payload *arrowpb.BatchArrowRecords + err error +} + +type mockConsumers struct { + traces *mock.MockTraces + logs *mock.MockLogs + metrics *mock.MockMetrics + + tracesCall *gomock.Call + logsCall *gomock.Call + metricsCall *gomock.Call +} + +func newTestTelemetry(t *testing.T) component.TelemetrySettings { + telset := componenttest.NewNopTelemetrySettings() + telset.Logger = zaptest.NewLogger(t) + return telset +} + +func (ctc *commonTestCase) putBatch(payload *arrowpb.BatchArrowRecords, err error) { + ctc.receive <- recvResult{ + payload: payload, + err: err, + } +} + +func (ctc *commonTestCase) doAndReturnGetBatch(ctx context.Context) func() (*arrowpb.BatchArrowRecords, error) { + return func() (*arrowpb.BatchArrowRecords, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case r, ok := <-ctc.receive: + if !ok { + return nil, io.EOF + } + return r.payload, r.err + } + } +} + +func (ctc *commonTestCase) doAndReturnConsumeTraces(tc testChannel) func(ctx context.Context, traces ptrace.Traces) error { + return func(ctx context.Context, traces ptrace.Traces) error { + ctc.consume <- consumeResult{ + Ctx: ctx, + Data: traces, + } + return tc.onConsume() + } +} + +func (ctc *commonTestCase) doAndReturnConsumeMetrics(tc testChannel) func(ctx context.Context, metrics pmetric.Metrics) error { + return func(ctx context.Context, metrics pmetric.Metrics) error { + ctc.consume <- consumeResult{ + Ctx: ctx, + Data: metrics, + } + return tc.onConsume() + } +} + +func (ctc *commonTestCase) doAndReturnConsumeLogs(tc testChannel) func(ctx context.Context, logs plog.Logs) error { + return func(ctx context.Context, logs plog.Logs) error { + ctc.consume <- consumeResult{ + Ctx: ctx, + Data: logs, + } + return tc.onConsume() + } +} + +func newMockConsumers(ctrl *gomock.Controller) mockConsumers { + mc := mockConsumers{ + traces: mock.NewMockTraces(ctrl), + logs: mock.NewMockLogs(ctrl), + metrics: mock.NewMockMetrics(ctrl), + } + mc.traces.EXPECT().Capabilities().Times(0) + mc.tracesCall = mc.traces.EXPECT().ConsumeTraces( + gomock.Any(), + gomock.Any(), + ).Times(0) + mc.logs.EXPECT().Capabilities().Times(0) + mc.logsCall = mc.logs.EXPECT().ConsumeLogs( + gomock.Any(), + gomock.Any(), + ).Times(0) + mc.metrics.EXPECT().Capabilities().Times(0) + mc.metricsCall = mc.metrics.EXPECT().ConsumeMetrics( + gomock.Any(), + gomock.Any(), + ).Times(0) + return mc +} + +func (m mockConsumers) Traces() consumer.Traces { + return m.traces +} + +func (m mockConsumers) Logs() consumer.Logs { + return m.logs +} +func (m mockConsumers) Metrics() consumer.Metrics { + return m.metrics +} + +var _ Consumers = mockConsumers{} + +func newCommonTestCase(t *testing.T, tc testChannel) *commonTestCase { + ctrl := gomock.NewController(t) + stream := arrowCollectorMock.NewMockArrowTracesService_ArrowTracesServer(ctrl) + + ctx, cancel := context.WithCancel(context.Background()) + ctx = metadata.NewIncomingContext(ctx, metadata.MD{ + "stream_ctx": []string{"per-request"}, + }) + + ctc := &commonTestCase{ + T: t, + ctrl: ctrl, + cancel: cancel, + telset: newTestTelemetry(t), + consumers: newMockConsumers(ctrl), + stream: 
stream, + receive: make(chan recvResult), + consume: make(chan consumeResult), + streamErr: make(chan error), + testProducer: arrowRecord.NewProducer(), + ctxCall: stream.EXPECT().Context().Times(0), + recvCall: stream.EXPECT().Recv().Times(0), + } + + ctc.ctxCall.AnyTimes().Return(ctx) + ctc.recvCall.AnyTimes().DoAndReturn(ctc.doAndReturnGetBatch(ctx)) + ctc.consumers.tracesCall.AnyTimes().DoAndReturn(ctc.doAndReturnConsumeTraces(tc)) + ctc.consumers.logsCall.AnyTimes().DoAndReturn(ctc.doAndReturnConsumeLogs(tc)) + ctc.consumers.metricsCall.AnyTimes().DoAndReturn(ctc.doAndReturnConsumeMetrics(tc)) + return ctc +} + +func (ctc *commonTestCase) cancelAndWait() error { + ctc.cancel() + return ctc.wait() +} + +func (ctc *commonTestCase) wait() error { + return <-ctc.streamErr +} + +func statusOKFor(batchID int64) *arrowpb.BatchStatus { + return &arrowpb.BatchStatus{ + BatchId: batchID, + StatusCode: arrowpb.StatusCode_OK, + } +} + +func statusUnavailableFor(batchID int64, msg string) *arrowpb.BatchStatus { + return &arrowpb.BatchStatus{ + BatchId: batchID, + StatusCode: arrowpb.StatusCode_UNAVAILABLE, + StatusMessage: msg, + } +} + +func statusInvalidFor(batchID int64, msg string) *arrowpb.BatchStatus { + return &arrowpb.BatchStatus{ + BatchId: batchID, + StatusCode: arrowpb.StatusCode_INVALID_ARGUMENT, + StatusMessage: msg, + } +} + +func statusExhaustedFor(batchID int64, msg string) *arrowpb.BatchStatus { + return &arrowpb.BatchStatus{ + BatchId: batchID, + StatusCode: arrowpb.StatusCode_RESOURCE_EXHAUSTED, + StatusMessage: msg, + } +} + +func (ctc *commonTestCase) newRealConsumer() arrowRecord.ConsumerAPI { + mock := arrowRecordMock.NewMockConsumerAPI(ctc.ctrl) + cons := arrowRecord.NewConsumer() + + mock.EXPECT().Close().Times(1).Return(nil) + mock.EXPECT().TracesFrom(gomock.Any()).AnyTimes().DoAndReturn(cons.TracesFrom) + mock.EXPECT().MetricsFrom(gomock.Any()).AnyTimes().DoAndReturn(cons.MetricsFrom) + mock.EXPECT().LogsFrom(gomock.Any()).AnyTimes().DoAndReturn(cons.LogsFrom) + + return mock +} + +func (ctc *commonTestCase) newErrorConsumer() arrowRecord.ConsumerAPI { + mock := arrowRecordMock.NewMockConsumerAPI(ctc.ctrl) + + mock.EXPECT().Close().Times(1).Return(nil) + mock.EXPECT().TracesFrom(gomock.Any()).AnyTimes().Return(nil, fmt.Errorf("test invalid error")) + mock.EXPECT().MetricsFrom(gomock.Any()).AnyTimes().Return(nil, fmt.Errorf("test invalid error")) + mock.EXPECT().LogsFrom(gomock.Any()).AnyTimes().Return(nil, fmt.Errorf("test invalid error")) + + return mock +} + +func (ctc *commonTestCase) newOOMConsumer() arrowRecord.ConsumerAPI { + mock := arrowRecordMock.NewMockConsumerAPI(ctc.ctrl) + + mock.EXPECT().Close().Times(1).Return(nil) + mock.EXPECT().TracesFrom(gomock.Any()).AnyTimes().Return(nil, fmt.Errorf("test oom error %w", arrowRecord.ErrConsumerMemoryLimit)) + mock.EXPECT().MetricsFrom(gomock.Any()).AnyTimes().Return(nil, fmt.Errorf("test oom error %w", arrowRecord.ErrConsumerMemoryLimit)) + mock.EXPECT().LogsFrom(gomock.Any()).AnyTimes().Return(nil, fmt.Errorf("test oom error %w", arrowRecord.ErrConsumerMemoryLimit)) + + return mock +} + +func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, opts ...func(*configgrpc.ServerConfig, *auth.Server)) { + var authServer auth.Server + var gsettings configgrpc.ServerConfig + for _, gf := range opts { + gf(&gsettings, &authServer) + } + rc := receiver.CreateSettings{ + TelemetrySettings: ctc.telset, + BuildInfo: component.NewDefaultBuildInfo(), + } + obsrecv, err := 
receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: component.NewID(component.MustNewType("arrowtest")), + Transport: "grpc", + ReceiverCreateSettings: rc, + }) + require.NoError(ctc.T, err) + + rcvr, err := New( + ctc.consumers, + rc, + obsrecv, + gsettings, + authServer, + newConsumer, + netstats.Noop{}, + ) + require.NoError(ctc.T, err) + go func() { + ctc.streamErr <- rcvr.ArrowTraces(ctc.stream) + }() +} + +func requireCanceledStatus(t *testing.T, err error) { + require.Error(t, err) + status, ok := status.FromError(err) + require.True(t, ok, "is status-wrapped %v", err) + require.Equal(t, codes.Canceled, status.Code()) +} + +func TestReceiverTraces(t *testing.T) { + tc := healthyTestChannel{} + ctc := newCommonTestCase(t, tc) + + td := testdata.GenerateTraces(2) + batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td) + require.NoError(t, err) + + ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) + + ctc.start(ctc.newRealConsumer) + ctc.putBatch(batch, nil) + + assert.EqualValues(t, td, (<-ctc.consume).Data) + + err = ctc.cancelAndWait() + requireCanceledStatus(t, err) +} + +func TestReceiverLogs(t *testing.T) { + tc := healthyTestChannel{} + ctc := newCommonTestCase(t, tc) + + ld := testdata.GenerateLogs(2) + batch, err := ctc.testProducer.BatchArrowRecordsFromLogs(ld) + require.NoError(t, err) + + ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) + + ctc.start(ctc.newRealConsumer) + ctc.putBatch(batch, nil) + + assert.EqualValues(t, []json.Marshaler{compareJSONLogs{ld}}, []json.Marshaler{compareJSONLogs{(<-ctc.consume).Data.(plog.Logs)}}) + + err = ctc.cancelAndWait() + requireCanceledStatus(t, err) +} + +func TestReceiverMetrics(t *testing.T) { + tc := healthyTestChannel{} + ctc := newCommonTestCase(t, tc) + stdTesting := otelAssert.NewStdUnitTest(t) + + md := testdata.GenerateMetrics(2) + batch, err := ctc.testProducer.BatchArrowRecordsFromMetrics(md) + require.NoError(t, err) + + ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) + + ctc.start(ctc.newRealConsumer) + ctc.putBatch(batch, nil) + + otelAssert.Equiv(stdTesting, []json.Marshaler{ + compareJSONMetrics{md}, + }, []json.Marshaler{ + compareJSONMetrics{(<-ctc.consume).Data.(pmetric.Metrics)}, + }) + + err = ctc.cancelAndWait() + requireCanceledStatus(t, err) +} + +func TestReceiverRecvError(t *testing.T) { + tc := healthyTestChannel{} + ctc := newCommonTestCase(t, tc) + + ctc.start(ctc.newRealConsumer) + + ctc.putBatch(nil, fmt.Errorf("test recv error")) + + err := ctc.wait() + require.Error(t, err) + require.Contains(t, err.Error(), "test recv error") +} + +func TestReceiverSendError(t *testing.T) { + tc := healthyTestChannel{} + ctc := newCommonTestCase(t, tc) + + ld := testdata.GenerateLogs(2) + batch, err := ctc.testProducer.BatchArrowRecordsFromLogs(ld) + require.NoError(t, err) + + ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(fmt.Errorf("test send error")) + + ctc.start(ctc.newRealConsumer) + ctc.putBatch(batch, nil) + + assert.EqualValues(t, ld, (<-ctc.consume).Data) + + err = ctc.wait() + require.Error(t, err) + require.Contains(t, err.Error(), "test send error") +} + +func TestReceiverConsumeError(t *testing.T) { + stdTesting := otelAssert.NewStdUnitTest(t) + + data := []any{ + testdata.GenerateTraces(2), + testdata.GenerateMetrics(2), + testdata.GenerateLogs(2), + } + + for _, item := range data { + tc := unhealthyTestChannel{} + ctc := newCommonTestCase(t, tc) + + var batch 
*arrowpb.BatchArrowRecords + var err error + switch input := item.(type) { + case ptrace.Traces: + batch, err = ctc.testProducer.BatchArrowRecordsFromTraces(input) + case plog.Logs: + batch, err = ctc.testProducer.BatchArrowRecordsFromLogs(input) + case pmetric.Metrics: + batch, err = ctc.testProducer.BatchArrowRecordsFromMetrics(input) + default: + panic(input) + } + require.NoError(t, err) + + batch = copyBatch(batch) + + ctc.stream.EXPECT().Send(statusUnavailableFor(batch.BatchId, "consumer unhealthy")).Times(1).Return(nil) + + ctc.start(ctc.newRealConsumer) + + ctc.putBatch(batch, nil) + + switch input := item.(type) { + case ptrace.Traces: + otelAssert.Equiv(stdTesting, []json.Marshaler{ + compareJSONTraces{input}, + }, []json.Marshaler{ + compareJSONTraces{(<-ctc.consume).Data.(ptrace.Traces)}, + }) + case plog.Logs: + otelAssert.Equiv(stdTesting, []json.Marshaler{ + compareJSONLogs{input}, + }, []json.Marshaler{ + compareJSONLogs{(<-ctc.consume).Data.(plog.Logs)}, + }) + case pmetric.Metrics: + otelAssert.Equiv(stdTesting, []json.Marshaler{ + compareJSONMetrics{input}, + }, []json.Marshaler{ + compareJSONMetrics{(<-ctc.consume).Data.(pmetric.Metrics)}, + }) + } + + err = ctc.cancelAndWait() + requireCanceledStatus(t, err) + } +} + +func TestReceiverInvalidData(t *testing.T) { + data := []any{ + testdata.GenerateTraces(2), + testdata.GenerateMetrics(2), + testdata.GenerateLogs(2), + } + + for _, item := range data { + tc := unhealthyTestChannel{} + ctc := newCommonTestCase(t, tc) + + var batch *arrowpb.BatchArrowRecords + var err error + switch input := item.(type) { + case ptrace.Traces: + batch, err = ctc.testProducer.BatchArrowRecordsFromTraces(input) + case plog.Logs: + batch, err = ctc.testProducer.BatchArrowRecordsFromLogs(input) + case pmetric.Metrics: + batch, err = ctc.testProducer.BatchArrowRecordsFromMetrics(input) + default: + panic(input) + } + require.NoError(t, err) + + batch = copyBatch(batch) + + ctc.stream.EXPECT().Send(statusInvalidFor(batch.BatchId, "Permanent error: test invalid error")).Times(1).Return(nil) + + ctc.start(ctc.newErrorConsumer) + ctc.putBatch(batch, nil) + + err = ctc.cancelAndWait() + requireCanceledStatus(t, err) + } +} + +func TestReceiverMemoryLimit(t *testing.T) { + data := []any{ + testdata.GenerateTraces(2), + testdata.GenerateMetrics(2), + testdata.GenerateLogs(2), + } + + for _, item := range data { + tc := healthyTestChannel{} + ctc := newCommonTestCase(t, tc) + + var batch *arrowpb.BatchArrowRecords + var err error + switch input := item.(type) { + case ptrace.Traces: + batch, err = ctc.testProducer.BatchArrowRecordsFromTraces(input) + case plog.Logs: + batch, err = ctc.testProducer.BatchArrowRecordsFromLogs(input) + case pmetric.Metrics: + batch, err = ctc.testProducer.BatchArrowRecordsFromMetrics(input) + default: + panic(input) + } + require.NoError(t, err) + + batch = copyBatch(batch) + + ctc.stream.EXPECT().Send(statusExhaustedFor(batch.BatchId, "Permanent error: test oom error "+arrowRecord.ErrConsumerMemoryLimit.Error())).Times(1).Return(nil) + + ctc.start(ctc.newOOMConsumer) + ctc.putBatch(batch, nil) + + err = ctc.cancelAndWait() + requireCanceledStatus(t, err) + } +} + +func copyBatch(in *arrowpb.BatchArrowRecords) *arrowpb.BatchArrowRecords { + // Because Arrow-IPC uses zero copy, we have to copy inside the test + // instead of sharing pointers to BatchArrowRecords. 
+ + hcpy := make([]byte, len(in.Headers)) + copy(hcpy, in.Headers) + + pays := make([]*arrowpb.ArrowPayload, len(in.ArrowPayloads)) + + for i, inp := range in.ArrowPayloads { + rcpy := make([]byte, len(inp.Record)) + copy(rcpy, inp.Record) + pays[i] = &arrowpb.ArrowPayload{ + SchemaId: inp.SchemaId, + Type: inp.Type, + Record: rcpy, + } + } + + return &arrowpb.BatchArrowRecords{ + BatchId: in.BatchId, + Headers: hcpy, + ArrowPayloads: pays, + } +} + +func TestReceiverEOF(t *testing.T) { + tc := healthyTestChannel{} + ctc := newCommonTestCase(t, tc) + + // send a sequence of data then simulate closing the connection. + const times = 10 + + var actualData []ptrace.Traces + var expectData []ptrace.Traces + + ctc.stream.EXPECT().Send(gomock.Any()).Times(times).Return(nil) + + ctc.start(ctc.newRealConsumer) + + go func() { + for i := 0; i < times; i++ { + td := testdata.GenerateTraces(2) + expectData = append(expectData, td) + + batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td) + require.NoError(t, err) + + batch = copyBatch(batch) + + ctc.putBatch(batch, nil) + } + close(ctc.receive) + }() + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + err := ctc.wait() + // EOF is treated the same as Canceled. + requireCanceledStatus(t, err) + wg.Done() + }() + + for i := 0; i < times; i++ { + actualData = append(actualData, (<-ctc.consume).Data.(ptrace.Traces)) + } + + assert.EqualValues(t, expectData, actualData) + + wg.Wait() +} + +func TestReceiverHeadersNoAuth(t *testing.T) { + t.Run("include", func(t *testing.T) { testReceiverHeaders(t, true) }) + t.Run("noinclude", func(t *testing.T) { testReceiverHeaders(t, false) }) +} + +func testReceiverHeaders(t *testing.T, includeMeta bool) { + tc := healthyTestChannel{} + ctc := newCommonTestCase(t, tc) + + expectData := []map[string][]string{ + {"k1": []string{"v1"}}, + nil, + {"k2": []string{"v2"}, "k3": []string{"v3"}}, + nil, + {"k1": []string{"v5"}}, + {"k1": []string{"v1"}, "k3": []string{"v2", "v3", "v4"}}, + nil, + } + + ctc.stream.EXPECT().Send(gomock.Any()).Times(len(expectData)).Return(nil) + + ctc.start(ctc.newRealConsumer, func(gsettings *configgrpc.ServerConfig, _ *auth.Server) { + gsettings.IncludeMetadata = includeMeta + }) + + go func() { + var hpb bytes.Buffer + hpe := hpack.NewEncoder(&hpb) + + for _, md := range expectData { + td := testdata.GenerateTraces(2) + + batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td) + require.NoError(t, err) + + batch = copyBatch(batch) + + if len(md) != 0 { + hpb.Reset() + for key, vals := range md { + for _, val := range vals { + err := hpe.WriteField(hpack.HeaderField{ + Name: key, + Value: val, + }) + require.NoError(t, err) + } + } + + batch.Headers = make([]byte, hpb.Len()) + copy(batch.Headers, hpb.Bytes()) + } + ctc.putBatch(batch, nil) + } + close(ctc.receive) + }() + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + err := ctc.wait() + // EOF is treated the same as Canceled. + requireCanceledStatus(t, err) + wg.Done() + }() + + for _, expect := range expectData { + info := client.FromContext((<-ctc.consume).Ctx) + + // The static stream context contains one extra variable. 
+ if expect == nil { + expect = map[string][]string{} + } + expect["stream_ctx"] = []string{"per-request"} + + for key, vals := range expect { + if includeMeta { + require.Equal(t, vals, info.Metadata.Get(key)) + } else { + require.Equal(t, []string(nil), info.Metadata.Get(key)) + } + } + } + + wg.Wait() +} + +func TestReceiverCancel(t *testing.T) { + tc := healthyTestChannel{} + ctc := newCommonTestCase(t, tc) + + ctc.cancel() + ctc.start(ctc.newRealConsumer) + + err := ctc.wait() + requireCanceledStatus(t, err) +} + +func requireContainsAll(t *testing.T, md client.Metadata, exp map[string][]string) { + for key, vals := range exp { + require.Equal(t, vals, md.Get(key)) + } +} + +func requireContainsNone(t *testing.T, md client.Metadata, exp map[string][]string) { + for key := range exp { + require.Equal(t, []string(nil), md.Get(key)) + } +} + +func TestHeaderReceiverStreamContextOnly(t *testing.T) { + expect := map[string][]string{ + "K": {"k1", "k2"}, + "L": {"l1"}, + } + + ctx := metadata.NewIncomingContext(context.Background(), metadata.MD(expect)) + + h := newHeaderReceiver(ctx, nil, true) + + for i := 0; i < 3; i++ { + cc, _, err := h.combineHeaders(ctx, nil) + + require.NoError(t, err) + requireContainsAll(t, client.FromContext(cc).Metadata, expect) + } +} + +func TestHeaderReceiverNoIncludeMetadata(t *testing.T) { + noExpect := map[string][]string{ + "K": {"k1", "k2"}, + "L": {"l1"}, + } + + ctx := metadata.NewIncomingContext(context.Background(), metadata.MD(noExpect)) + + h := newHeaderReceiver(ctx, nil, false) + + for i := 0; i < 3; i++ { + cc, _, err := h.combineHeaders(ctx, nil) + + require.NoError(t, err) + requireContainsNone(t, client.FromContext(cc).Metadata, noExpect) + } +} + +func TestHeaderReceiverAuthServerNoIncludeMetadata(t *testing.T) { + expectForAuth := map[string][]string{ + "L": {"k1", "k2"}, + "K": {"l1"}, + } + + ctx := metadata.NewIncomingContext(context.Background(), metadata.MD(expectForAuth)) + + ctrl := gomock.NewController(t) + as := mock.NewMockServer(ctrl) + + // The auth server is not called, it just needs to be non-nil. + as.EXPECT().Authenticate(gomock.Any(), gomock.Any()).Times(0) + + h := newHeaderReceiver(ctx, as, false) + + for i := 0; i < 3; i++ { + cc, hdrs, err := h.combineHeaders(ctx, nil) + + // The incoming metadata keys are not in the context. + require.NoError(t, err) + requireContainsNone(t, client.FromContext(cc).Metadata, expectForAuth) + + // Headers are returned for the auth server, though + // names have been forced to lower case. 
+		require.Equal(t, len(hdrs), len(expectForAuth))
+		for k, v := range expectForAuth {
+			require.Equal(t, hdrs[strings.ToLower(k)], v)
+		}
+	}
+}
+
+func TestHeaderReceiverRequestNoStreamMetadata(t *testing.T) {
+	expect := map[string][]string{
+		"K": {"k1", "k2"},
+		"L": {"l1"},
+	}
+
+	var hpb bytes.Buffer
+
+	hpe := hpack.NewEncoder(&hpb)
+
+	ctx := context.Background()
+
+	h := newHeaderReceiver(ctx, nil, true)
+
+	for i := 0; i < 3; i++ {
+		hpb.Reset()
+
+		for key, vals := range expect {
+			for _, val := range vals {
+				err := hpe.WriteField(hpack.HeaderField{
+					Name: strings.ToLower(key),
+					Value: val,
+				})
+				require.NoError(t, err)
+			}
+		}
+
+		cc, _, err := h.combineHeaders(ctx, hpb.Bytes())
+
+		require.NoError(t, err)
+		requireContainsAll(t, client.FromContext(cc).Metadata, expect)
+	}
+}
+
+func TestHeaderReceiverAuthServerIsSetNoIncludeMetadata(t *testing.T) {
+	expect := map[string][]string{
+		"K": {"k1", "k2"},
+		"L": {"l1"},
+	}
+
+	var hpb bytes.Buffer
+
+	hpe := hpack.NewEncoder(&hpb)
+
+	ctx := context.Background()
+
+	ctrl := gomock.NewController(t)
+	as := mock.NewMockServer(ctrl)
+
+	// The auth server is not called, it just needs to be non-nil.
+	as.EXPECT().Authenticate(gomock.Any(), gomock.Any()).Times(0)
+
+	h := newHeaderReceiver(ctx, as, true)
+
+	for i := 0; i < 3; i++ {
+		hpb.Reset()
+
+		for key, vals := range expect {
+			for _, val := range vals {
+				err := hpe.WriteField(hpack.HeaderField{
+					Name: strings.ToLower(key),
+					Value: val,
+				})
+				require.NoError(t, err)
+			}
+		}
+
+		cc, hdrs, err := h.combineHeaders(ctx, hpb.Bytes())
+
+		require.NoError(t, err)
+
+		// Note: The call to client.Metadata.Get() inside
+		// requireContainsAll() actually modifies the metadata
+		// map (this is weird, but true and possibly
+		// valid) in cases where the input has incorrect
+		// case. It's not safe to check that the map sizes
+		// are equal after calling Get() below, so we assert
+		// the same size first.
+		require.Equal(t, len(hdrs), len(expect))
+
+		requireContainsAll(t, client.FromContext(cc).Metadata, expect)
+
+		// Headers passed to the auth server are equivalent,
+		// with names forced to lower case.
+
+		for k, v := range expect {
+			require.Equal(t, hdrs[strings.ToLower(k)], v, "for %v", k)
+		}
+	}
+}
+
+func TestHeaderReceiverBothMetadata(t *testing.T) {
+	expectK := map[string][]string{
+		"K": {"k1", "k2"},
+	}
+	expectL := map[string][]string{
+		"L": {"l1"},
+		"M": {"m1", "m2"},
+	}
+	expect := map[string][]string{
+		"K": {"k1", "k2"},
+		"L": {"l1"},
+		"M": {"m1", "m2"},
+	}
+
+	var hpb bytes.Buffer
+
+	hpe := hpack.NewEncoder(&hpb)
+
+	ctx := metadata.NewIncomingContext(context.Background(), metadata.MD(expectK))
+
+	h := newHeaderReceiver(ctx, nil, true)
+
+	for i := 0; i < 3; i++ {
+		hpb.Reset()
+
+		for key, vals := range expectL {
+			for _, val := range vals {
+				err := hpe.WriteField(hpack.HeaderField{
+					Name: strings.ToLower(key),
+					Value: val,
+				})
+				require.NoError(t, err)
+			}
+		}
+
+		cc, _, err := h.combineHeaders(ctx, hpb.Bytes())
+
+		require.NoError(t, err)
+		requireContainsAll(t, client.FromContext(cc).Metadata, expect)
+	}
+}
+
+func TestHeaderReceiverDuplicateMetadata(t *testing.T) {
+	expectStream := map[string][]string{
+		"K": {"k1", "k2"},
+
+		// "M" value does not appear b/c the same header
+		// appears in per-request metadata.
+ "M": {""}, + } + expectRequest := map[string][]string{ + "L": {"l1"}, + "M": {"m1", "m2"}, + } + expectCombined := map[string][]string{ + "K": {"k1", "k2"}, + "L": {"l1"}, + "M": {"m1", "m2"}, + } + + var hpb bytes.Buffer + + hpe := hpack.NewEncoder(&hpb) + + ctx := metadata.NewIncomingContext(context.Background(), metadata.MD(expectStream)) + + h := newHeaderReceiver(ctx, nil, true) + + for i := 0; i < 3; i++ { + hpb.Reset() + + for key, vals := range expectRequest { + for _, val := range vals { + err := hpe.WriteField(hpack.HeaderField{ + Name: strings.ToLower(key), + Value: val, + }) + require.NoError(t, err) + } + } + + cc, _, err := h.combineHeaders(ctx, hpb.Bytes()) + + require.NoError(t, err) + requireContainsAll(t, client.FromContext(cc).Metadata, expectCombined) + } +} + +func TestReceiverAuthHeadersStream(t *testing.T) { + t.Run("no-metadata", func(t *testing.T) { testReceiverAuthHeaders(t, false, false) }) + t.Run("per-stream", func(t *testing.T) { testReceiverAuthHeaders(t, true, false) }) + t.Run("per-data", func(t *testing.T) { testReceiverAuthHeaders(t, true, true) }) +} + +func testReceiverAuthHeaders(t *testing.T, includeMeta bool, dataAuth bool) { + tc := healthyTestChannel{} + ctc := newCommonTestCase(t, tc) + + expectData := []map[string][]string{ + {"auth": []string{"true"}}, + nil, + {"auth": []string{"false"}}, + nil, + } + + var recvBatches []*arrowpb.BatchStatus + + ctc.stream.EXPECT().Send(gomock.Any()).Times(len(expectData)).DoAndReturn(func(batch *arrowpb.BatchStatus) error { + recvBatches = append(recvBatches, batch) + return nil + }) + + var authCall *gomock.Call + ctc.start(ctc.newRealConsumer, func(gsettings *configgrpc.ServerConfig, authPtr *auth.Server) { + gsettings.IncludeMetadata = includeMeta + + as := mock.NewMockServer(ctc.ctrl) + *authPtr = as + + authCall = as.EXPECT().Authenticate(gomock.Any(), gomock.Any()).AnyTimes() + }) + + dataCount := 0 + + authCall.DoAndReturn(func(ctx context.Context, hdrs map[string][]string) (context.Context, error) { + dataCount++ + if !dataAuth { + return ctx, nil + } + + ok := false + for _, val := range hdrs["auth"] { + ok = ok || (val == "true") + } + + if ok { + newmd := map[string][]string{} + for k, v := range hdrs { + newmd[k] = v + } + newmd["has_auth"] = []string{":+1:", ":100:"} + return client.NewContext(ctx, client.Info{ + Metadata: client.NewMetadata(newmd), + }), nil + } + return ctx, fmt.Errorf("not authorized") + }) + + go func() { + var hpb bytes.Buffer + hpe := hpack.NewEncoder(&hpb) + + for _, md := range expectData { + td := testdata.GenerateTraces(2) + + batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td) + require.NoError(t, err) + + batch = copyBatch(batch) + + if len(md) != 0 { + + hpb.Reset() + for key, vals := range md { + for _, val := range vals { + err := hpe.WriteField(hpack.HeaderField{ + Name: strings.ToLower(key), + Value: val, + }) + require.NoError(t, err) + } + } + + batch.Headers = make([]byte, hpb.Len()) + copy(batch.Headers, hpb.Bytes()) + } + ctc.putBatch(batch, nil) + } + close(ctc.receive) + }() + + var expectErrs []bool + + for _, testInput := range expectData { + // The static stream context contains one extra variable. 
+ cpy := map[string][]string{}
+ cpy["stream_ctx"] = []string{"per-request"}
+
+ for k, v := range testInput {
+ cpy[k] = v
+ }
+
+ expectErr := false
+ if dataAuth {
+ hasAuth := false
+ for _, val := range cpy["auth"] {
+ hasAuth = hasAuth || (val == "true")
+ }
+ if hasAuth {
+ cpy["has_auth"] = []string{":+1:", ":100:"}
+ } else {
+ expectErr = true
+ }
+ }
+
+ expectErrs = append(expectErrs, expectErr)
+
+ if expectErr {
+ continue
+ }
+
+ info := client.FromContext((<-ctc.consume).Ctx)
+
+ for key, vals := range cpy {
+ if includeMeta {
+ require.Equal(t, vals, info.Metadata.Get(key))
+ } else {
+ require.Equal(t, []string(nil), info.Metadata.Get(key))
+ }
+ }
+ }
+
+ err := ctc.wait()
+ // EOF is treated the same as Canceled
+ requireCanceledStatus(t, err)
+
+ // Append one more entry to expectErrs for the EOF the
+ // receiver sees at stream end; its status code will not be
+ // arrowpb.StatusCode_OK.
+ expectErrs = append(expectErrs, true)
+
+ require.Equal(t, len(expectData), dataCount)
+ require.Equal(t, len(recvBatches), dataCount)
+
+ for idx, batch := range recvBatches {
+ if expectErrs[idx] {
+ require.NotEqual(t, arrowpb.StatusCode_OK, batch.StatusCode)
+ } else {
+ require.Equal(t, arrowpb.StatusCode_OK, batch.StatusCode)
+ }
+ }
+}
+
+func TestHeaderReceiverIsTraced(t *testing.T) {
+ streamHeaders := map[string][]string{
+ "K": {"k1", "k2"},
+ }
+ requestHeaders := map[string][]string{
+ "L": {"l1"},
+ "traceparent": {"00-00112233445566778899aabbccddeeff-0011223344556677-01"},
+ }
+ expectCombined := map[string][]string{
+ "K": {"k1", "k2"},
+ "L": {"l1"},
+ }
+
+ var hpb bytes.Buffer
+
+ otel.SetTextMapPropagator(propagation.TraceContext{})
+
+ hpe := hpack.NewEncoder(&hpb)
+
+ ctx := metadata.NewIncomingContext(context.Background(), metadata.MD(streamHeaders))
+
+ h := newHeaderReceiver(ctx, nil, true)
+
+ for i := 0; i < 3; i++ {
+ hpb.Reset()
+
+ for key, vals := range requestHeaders {
+ for _, val := range vals {
+ err := hpe.WriteField(hpack.HeaderField{
+ Name: strings.ToLower(key),
+ Value: val,
+ })
+ require.NoError(t, err)
+ }
+ }
+
+ newCtx, _, err := h.combineHeaders(ctx, hpb.Bytes())
+
+ require.NoError(t, err)
+ requireContainsAll(t, client.FromContext(newCtx).Metadata, expectCombined)
+
+ // Check for hard-coded trace and span IDs from `traceparent` header above.
+ spanCtx := trace.SpanContextFromContext(newCtx)
+ require.Equal(
+ t,
+ trace.TraceID{0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff},
+ spanCtx.TraceID())
+ require.Equal(
+ t,
+ trace.SpanID{0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77},
+ spanCtx.SpanID())
+ }
+}
diff --git a/receiver/otelarrowreceiver/internal/arrow/mock/auth.go b/receiver/otelarrowreceiver/internal/arrow/mock/auth.go
new file mode 100644
index 0000000000000..9fa3c833451b3
--- /dev/null
+++ b/receiver/otelarrowreceiver/internal/arrow/mock/auth.go
@@ -0,0 +1,84 @@
+// Code generated by MockGen. DO NOT EDIT.
+// Source: go.opentelemetry.io/collector/extension/auth (interfaces: Server)
+//
+// Generated by this command:
+//
+// mockgen -package mock go.opentelemetry.io/collector/extension/auth Server
+//
+
+// Package mock is a generated GoMock package.
+package mock
+
+import (
+ context "context"
+ reflect "reflect"
+
+ component "go.opentelemetry.io/collector/component"
+ gomock "go.uber.org/mock/gomock"
+)
+
+// MockServer is a mock of Server interface.
+type MockServer struct {
+ ctrl *gomock.Controller
+ recorder *MockServerMockRecorder
+}
+
+// MockServerMockRecorder is the mock recorder for MockServer.
+type MockServerMockRecorder struct { + mock *MockServer +} + +// NewMockServer creates a new mock instance. +func NewMockServer(ctrl *gomock.Controller) *MockServer { + mock := &MockServer{ctrl: ctrl} + mock.recorder = &MockServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockServer) EXPECT() *MockServerMockRecorder { + return m.recorder +} + +// Authenticate mocks base method. +func (m *MockServer) Authenticate(arg0 context.Context, arg1 map[string][]string) (context.Context, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Authenticate", arg0, arg1) + ret0, _ := ret[0].(context.Context) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Authenticate indicates an expected call of Authenticate. +func (mr *MockServerMockRecorder) Authenticate(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Authenticate", reflect.TypeOf((*MockServer)(nil).Authenticate), arg0, arg1) +} + +// Shutdown mocks base method. +func (m *MockServer) Shutdown(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Shutdown", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Shutdown indicates an expected call of Shutdown. +func (mr *MockServerMockRecorder) Shutdown(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Shutdown", reflect.TypeOf((*MockServer)(nil).Shutdown), arg0) +} + +// Start mocks base method. +func (m *MockServer) Start(arg0 context.Context, arg1 component.Host) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Start", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Start indicates an expected call of Start. +func (mr *MockServerMockRecorder) Start(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockServer)(nil).Start), arg0, arg1) +} diff --git a/receiver/otelarrowreceiver/internal/arrow/mock/consumer.go b/receiver/otelarrowreceiver/internal/arrow/mock/consumer.go new file mode 100644 index 0000000000000..7654f091c17d7 --- /dev/null +++ b/receiver/otelarrowreceiver/internal/arrow/mock/consumer.go @@ -0,0 +1,174 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: go.opentelemetry.io/collector/consumer (interfaces: Traces,Metrics,Logs) +// +// Generated by this command: +// +// mockgen -package mock go.opentelemetry.io/collector/consumer Traces,Metrics,Logs +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + consumer "go.opentelemetry.io/collector/consumer" + plog "go.opentelemetry.io/collector/pdata/plog" + pmetric "go.opentelemetry.io/collector/pdata/pmetric" + ptrace "go.opentelemetry.io/collector/pdata/ptrace" + gomock "go.uber.org/mock/gomock" +) + +// MockTraces is a mock of Traces interface. +type MockTraces struct { + ctrl *gomock.Controller + recorder *MockTracesMockRecorder +} + +// MockTracesMockRecorder is the mock recorder for MockTraces. +type MockTracesMockRecorder struct { + mock *MockTraces +} + +// NewMockTraces creates a new mock instance. +func NewMockTraces(ctrl *gomock.Controller) *MockTraces { + mock := &MockTraces{ctrl: ctrl} + mock.recorder = &MockTracesMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. 
+func (m *MockTraces) EXPECT() *MockTracesMockRecorder { + return m.recorder +} + +// Capabilities mocks base method. +func (m *MockTraces) Capabilities() consumer.Capabilities { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Capabilities") + ret0, _ := ret[0].(consumer.Capabilities) + return ret0 +} + +// Capabilities indicates an expected call of Capabilities. +func (mr *MockTracesMockRecorder) Capabilities() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Capabilities", reflect.TypeOf((*MockTraces)(nil).Capabilities)) +} + +// ConsumeTraces mocks base method. +func (m *MockTraces) ConsumeTraces(arg0 context.Context, arg1 ptrace.Traces) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ConsumeTraces", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// ConsumeTraces indicates an expected call of ConsumeTraces. +func (mr *MockTracesMockRecorder) ConsumeTraces(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConsumeTraces", reflect.TypeOf((*MockTraces)(nil).ConsumeTraces), arg0, arg1) +} + +// MockMetrics is a mock of Metrics interface. +type MockMetrics struct { + ctrl *gomock.Controller + recorder *MockMetricsMockRecorder +} + +// MockMetricsMockRecorder is the mock recorder for MockMetrics. +type MockMetricsMockRecorder struct { + mock *MockMetrics +} + +// NewMockMetrics creates a new mock instance. +func NewMockMetrics(ctrl *gomock.Controller) *MockMetrics { + mock := &MockMetrics{ctrl: ctrl} + mock.recorder = &MockMetricsMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMetrics) EXPECT() *MockMetricsMockRecorder { + return m.recorder +} + +// Capabilities mocks base method. +func (m *MockMetrics) Capabilities() consumer.Capabilities { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Capabilities") + ret0, _ := ret[0].(consumer.Capabilities) + return ret0 +} + +// Capabilities indicates an expected call of Capabilities. +func (mr *MockMetricsMockRecorder) Capabilities() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Capabilities", reflect.TypeOf((*MockMetrics)(nil).Capabilities)) +} + +// ConsumeMetrics mocks base method. +func (m *MockMetrics) ConsumeMetrics(arg0 context.Context, arg1 pmetric.Metrics) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ConsumeMetrics", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// ConsumeMetrics indicates an expected call of ConsumeMetrics. +func (mr *MockMetricsMockRecorder) ConsumeMetrics(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConsumeMetrics", reflect.TypeOf((*MockMetrics)(nil).ConsumeMetrics), arg0, arg1) +} + +// MockLogs is a mock of Logs interface. +type MockLogs struct { + ctrl *gomock.Controller + recorder *MockLogsMockRecorder +} + +// MockLogsMockRecorder is the mock recorder for MockLogs. +type MockLogsMockRecorder struct { + mock *MockLogs +} + +// NewMockLogs creates a new mock instance. +func NewMockLogs(ctrl *gomock.Controller) *MockLogs { + mock := &MockLogs{ctrl: ctrl} + mock.recorder = &MockLogsMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockLogs) EXPECT() *MockLogsMockRecorder { + return m.recorder +} + +// Capabilities mocks base method. 
+func (m *MockLogs) Capabilities() consumer.Capabilities { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Capabilities") + ret0, _ := ret[0].(consumer.Capabilities) + return ret0 +} + +// Capabilities indicates an expected call of Capabilities. +func (mr *MockLogsMockRecorder) Capabilities() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Capabilities", reflect.TypeOf((*MockLogs)(nil).Capabilities)) +} + +// ConsumeLogs mocks base method. +func (m *MockLogs) ConsumeLogs(arg0 context.Context, arg1 plog.Logs) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ConsumeLogs", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// ConsumeLogs indicates an expected call of ConsumeLogs. +func (mr *MockLogsMockRecorder) ConsumeLogs(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConsumeLogs", reflect.TypeOf((*MockLogs)(nil).ConsumeLogs), arg0, arg1) +} diff --git a/receiver/otelarrowreceiver/internal/logs/otlp.go b/receiver/otelarrowreceiver/internal/logs/otlp.go index 47a76ac7c6545..72f62dc9d28d7 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp.go @@ -11,6 +11,8 @@ import ( "go.opentelemetry.io/collector/receiver/receiverhelper" ) +const dataFormatProtobuf = "protobuf" + // Receiver is the type used to handle logs from OpenTelemetry exporters. type Receiver struct { plogotlp.UnimplementedGRPCServer @@ -27,9 +29,18 @@ func New(nextConsumer consumer.Logs, obsrecv *receiverhelper.ObsReport) *Receive } // Export implements the service Export logs func. -func (r *Receiver) Export(_ context.Context, _ plogotlp.ExportRequest) (plogotlp.ExportResponse, error) { - // TODO: Implementation. 
- return plogotlp.NewExportResponse(), nil
+func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plogotlp.ExportResponse, error) {
+ ld := req.Logs()
+ numRecords := ld.LogRecordCount()
+ if numRecords == 0 {
+ return plogotlp.NewExportResponse(), nil
+ }
+
+ ctx = r.obsrecv.StartLogsOp(ctx)
+ err := r.nextConsumer.ConsumeLogs(ctx, ld)
+ r.obsrecv.EndLogsOp(ctx, dataFormatProtobuf, numRecords, err)
+
+ return plogotlp.NewExportResponse(), err
 }
 
 func (r *Receiver) Consumer() consumer.Logs {
diff --git a/receiver/otelarrowreceiver/internal/logs/otlp_test.go b/receiver/otelarrowreceiver/internal/logs/otlp_test.go
new file mode 100644
index 0000000000000..8519d715848b8
--- /dev/null
+++ b/receiver/otelarrowreceiver/internal/logs/otlp_test.go
@@ -0,0 +1,95 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package logs
+
+import (
+ "context"
+ "errors"
+ "net"
+ "testing"
+
+ "github.com/open-telemetry/otel-arrow/collector/testdata"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "go.opentelemetry.io/collector/component"
+ "go.opentelemetry.io/collector/consumer"
+ "go.opentelemetry.io/collector/consumer/consumertest"
+ "go.opentelemetry.io/collector/pdata/plog/plogotlp"
+ "go.opentelemetry.io/collector/receiver/receiverhelper"
+ "go.opentelemetry.io/collector/receiver/receivertest"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+func TestExport(t *testing.T) {
+ ld := testdata.GenerateLogs(1)
+ req := plogotlp.NewExportRequestFromLogs(ld)
+
+ logSink := new(consumertest.LogsSink)
+ logClient := makeLogsServiceClient(t, logSink)
+ resp, err := logClient.Export(context.Background(), req)
+ require.NoError(t, err, "Failed to export logs: %v", err)
+ require.NotNil(t, resp, "The response is missing")
+
+ lds := logSink.AllLogs()
+ require.Len(t, lds, 1)
+ assert.EqualValues(t, ld, lds[0])
+}
+
+func TestExport_EmptyRequest(t *testing.T) {
+ logSink := new(consumertest.LogsSink)
+
+ logClient := makeLogsServiceClient(t, logSink)
+ resp, err := logClient.Export(context.Background(), plogotlp.NewExportRequest())
+ assert.NoError(t, err, "Failed to export logs: %v", err)
+ assert.NotNil(t, resp, "The response is missing")
+}
+
+func TestExport_ErrorConsumer(t *testing.T) {
+ ld := testdata.GenerateLogs(1)
+ req := plogotlp.NewExportRequestFromLogs(ld)
+
+ logClient := makeLogsServiceClient(t, consumertest.NewErr(errors.New("my error")))
+ resp, err := logClient.Export(context.Background(), req)
+ assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
+ assert.Equal(t, plogotlp.ExportResponse{}, resp)
+}
+
+func makeLogsServiceClient(t *testing.T, lc consumer.Logs) plogotlp.GRPCClient {
+ addr := otlpReceiverOnGRPCServer(t, lc)
+ cc, err := grpc.Dial(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
+ require.NoError(t, err, "Failed to create the LogsServiceClient: %v", err)
+ t.Cleanup(func() {
+ require.NoError(t, cc.Close())
+ })
+
+ return plogotlp.NewGRPCClient(cc)
+}
+
+func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) net.Addr {
+ ln, err := net.Listen("tcp", "localhost:")
+ require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err)
+
+ t.Cleanup(func() {
+ require.NoError(t, ln.Close())
+ })
+
+ set := receivertest.NewNopCreateSettings()
+ set.ID = component.NewIDWithName(component.MustNewType("otlp"), "log")
+ obsrecv, err := 
receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: set.ID, + Transport: "grpc", + ReceiverCreateSettings: set, + }) + require.NoError(t, err) + r := New(lc, obsrecv) + // Now run it as a gRPC server + srv := grpc.NewServer() + plogotlp.RegisterGRPCServer(srv, r) + go func() { + _ = srv.Serve(ln) + }() + + return ln.Addr() +} diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp.go b/receiver/otelarrowreceiver/internal/metrics/otlp.go index fe72d316708cd..77e12a86ce14d 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp.go @@ -11,6 +11,8 @@ import ( "go.opentelemetry.io/collector/receiver/receiverhelper" ) +const dataFormatProtobuf = "protobuf" + // Receiver is the type used to handle metrics from OpenTelemetry exporters. type Receiver struct { pmetricotlp.UnimplementedGRPCServer @@ -27,9 +29,18 @@ func New(nextConsumer consumer.Metrics, obsrecv *receiverhelper.ObsReport) *Rece } // Export implements the service Export metrics func. -func (r *Receiver) Export(_ context.Context, _ pmetricotlp.ExportRequest) (pmetricotlp.ExportResponse, error) { - // TODO: Implementation. - return pmetricotlp.NewExportResponse(), nil +func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (pmetricotlp.ExportResponse, error) { + md := req.Metrics() + dataPointCount := md.DataPointCount() + if dataPointCount == 0 { + return pmetricotlp.NewExportResponse(), nil + } + + ctx = r.obsrecv.StartMetricsOp(ctx) + err := r.nextConsumer.ConsumeMetrics(ctx, md) + r.obsrecv.EndMetricsOp(ctx, dataFormatProtobuf, dataPointCount, err) + + return pmetricotlp.NewExportResponse(), err } func (r *Receiver) Consumer() consumer.Metrics { diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go new file mode 100644 index 0000000000000..61d4c0412ff10 --- /dev/null +++ b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go @@ -0,0 +1,96 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "context" + "errors" + "net" + "testing" + + "github.com/open-telemetry/otel-arrow/collector/testdata" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.opentelemetry.io/collector/receiver/receivertest" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func TestExport(t *testing.T) { + md := testdata.GenerateMetrics(1) + req := pmetricotlp.NewExportRequestFromMetrics(md) + + metricSink := new(consumertest.MetricsSink) + metricsClient := makeMetricsServiceClient(t, metricSink) + resp, err := metricsClient.Export(context.Background(), req) + + require.NoError(t, err, "Failed to export metrics: %v", err) + require.NotNil(t, resp, "The response is missing") + + mds := metricSink.AllMetrics() + require.Len(t, mds, 1) + assert.EqualValues(t, md, mds[0]) +} + +func TestExport_EmptyRequest(t *testing.T) { + metricSink := new(consumertest.MetricsSink) + metricsClient := makeMetricsServiceClient(t, metricSink) + resp, err := metricsClient.Export(context.Background(), pmetricotlp.NewExportRequest()) + require.NoError(t, err) + require.NotNil(t, resp) +} + +func 
TestExport_ErrorConsumer(t *testing.T) { + md := testdata.GenerateMetrics(1) + req := pmetricotlp.NewExportRequestFromMetrics(md) + + metricsClient := makeMetricsServiceClient(t, consumertest.NewErr(errors.New("my error"))) + resp, err := metricsClient.Export(context.Background(), req) + assert.EqualError(t, err, "rpc error: code = Unknown desc = my error") + assert.Equal(t, pmetricotlp.ExportResponse{}, resp) +} + +func makeMetricsServiceClient(t *testing.T, mc consumer.Metrics) pmetricotlp.GRPCClient { + addr := otlpReceiverOnGRPCServer(t, mc) + + cc, err := grpc.Dial(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + require.NoError(t, err, "Failed to create the MetricsServiceClient: %v", err) + t.Cleanup(func() { + require.NoError(t, cc.Close()) + }) + + return pmetricotlp.NewGRPCClient(cc) +} + +func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) net.Addr { + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) + + t.Cleanup(func() { + require.NoError(t, ln.Close()) + }) + + set := receivertest.NewNopCreateSettings() + set.ID = component.NewIDWithName(component.MustNewType("otlp"), "metrics") + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: set.ID, + Transport: "grpc", + ReceiverCreateSettings: set, + }) + require.NoError(t, err) + r := New(mc, obsrecv) + // Now run it as a gRPC server + srv := grpc.NewServer() + pmetricotlp.RegisterGRPCServer(srv, r) + go func() { + _ = srv.Serve(ln) + }() + + return ln.Addr() +} diff --git a/receiver/otelarrowreceiver/internal/trace/otlp.go b/receiver/otelarrowreceiver/internal/trace/otlp.go index 089ea9a3f5d07..82d836ed8b7e3 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp.go @@ -11,6 +11,8 @@ import ( "go.opentelemetry.io/collector/receiver/receiverhelper" ) +const dataFormatProtobuf = "protobuf" + // Receiver is the type used to handle spans from OpenTelemetry exporters. type Receiver struct { ptraceotlp.UnimplementedGRPCServer @@ -27,9 +29,19 @@ func New(nextConsumer consumer.Traces, obsrecv *receiverhelper.ObsReport) *Recei } // Export implements the service Export traces func. -func (r *Receiver) Export(_ context.Context, _ ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) { - // TODO: Implementation. 
- return ptraceotlp.NewExportResponse(), nil +func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) { + td := req.Traces() + // We need to ensure that it propagates the receiver name as a tag + numSpans := td.SpanCount() + if numSpans == 0 { + return ptraceotlp.NewExportResponse(), nil + } + + ctx = r.obsrecv.StartTracesOp(ctx) + err := r.nextConsumer.ConsumeTraces(ctx, td) + r.obsrecv.EndTracesOp(ctx, dataFormatProtobuf, numSpans, err) + + return ptraceotlp.NewExportResponse(), err } func (r *Receiver) Consumer() consumer.Traces { diff --git a/receiver/otelarrowreceiver/internal/trace/otlp_test.go b/receiver/otelarrowreceiver/internal/trace/otlp_test.go new file mode 100644 index 0000000000000..826aa28bbc30e --- /dev/null +++ b/receiver/otelarrowreceiver/internal/trace/otlp_test.go @@ -0,0 +1,93 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package trace + +import ( + "context" + "errors" + "net" + "testing" + + "github.com/open-telemetry/otel-arrow/collector/testdata" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.opentelemetry.io/collector/receiver/receivertest" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func TestExport(t *testing.T) { + td := testdata.GenerateTraces(1) + req := ptraceotlp.NewExportRequestFromTraces(td) + + traceSink := new(consumertest.TracesSink) + traceClient := makeTraceServiceClient(t, traceSink) + resp, err := traceClient.Export(context.Background(), req) + require.NoError(t, err, "Failed to export trace: %v", err) + require.NotNil(t, resp, "The response is missing") + + require.Len(t, traceSink.AllTraces(), 1) + assert.EqualValues(t, td, traceSink.AllTraces()[0]) +} + +func TestExport_EmptyRequest(t *testing.T) { + traceSink := new(consumertest.TracesSink) + traceClient := makeTraceServiceClient(t, traceSink) + resp, err := traceClient.Export(context.Background(), ptraceotlp.NewExportRequest()) + assert.NoError(t, err, "Failed to export trace: %v", err) + assert.NotNil(t, resp, "The response is missing") +} + +func TestExport_ErrorConsumer(t *testing.T) { + td := testdata.GenerateTraces(1) + req := ptraceotlp.NewExportRequestFromTraces(td) + + traceClient := makeTraceServiceClient(t, consumertest.NewErr(errors.New("my error"))) + resp, err := traceClient.Export(context.Background(), req) + assert.EqualError(t, err, "rpc error: code = Unknown desc = my error") + assert.Equal(t, ptraceotlp.ExportResponse{}, resp) +} + +func makeTraceServiceClient(t *testing.T, tc consumer.Traces) ptraceotlp.GRPCClient { + addr := otlpReceiverOnGRPCServer(t, tc) + cc, err := grpc.Dial(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err) + t.Cleanup(func() { + require.NoError(t, cc.Close()) + }) + + return ptraceotlp.NewGRPCClient(cc) +} + +func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) net.Addr { + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) + + t.Cleanup(func() { + require.NoError(t, ln.Close()) + }) + + set := 
receivertest.NewNopCreateSettings() + set.ID = component.NewIDWithName(component.MustNewType("otlp"), "trace") + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: set.ID, + Transport: "grpc", + ReceiverCreateSettings: set, + }) + require.NoError(t, err) + r := New(tc, obsrecv) + // Now run it as a gRPC server + srv := grpc.NewServer() + ptraceotlp.RegisterGRPCServer(srv, r) + go func() { + _ = srv.Serve(ln) + }() + + return ln.Addr() +} diff --git a/receiver/otelarrowreceiver/otelarrow.go b/receiver/otelarrowreceiver/otelarrow.go index 044fc844cba6f..6d909a6059e0b 100644 --- a/receiver/otelarrowreceiver/otelarrow.go +++ b/receiver/otelarrowreceiver/otelarrow.go @@ -114,7 +114,7 @@ func (r *otelArrowReceiver) startProtocolServers(host component.Host) error { } } - r.arrowReceiver = arrow.New(arrow.Consumers(r), r.settings, r.obsrepGRPC, r.cfg.GRPC, authServer, func() arrowRecord.ConsumerAPI { + r.arrowReceiver, err = arrow.New(arrow.Consumers(r), r.settings, r.obsrepGRPC, r.cfg.GRPC, authServer, func() arrowRecord.ConsumerAPI { var opts []arrowRecord.Option if r.cfg.Arrow.MemoryLimitMiB != 0 { // in which case the default is selected in the arrowRecord package. @@ -126,6 +126,10 @@ func (r *otelArrowReceiver) startProtocolServers(host component.Host) error { return arrowRecord.NewConsumer(opts...) }, r.netReporter) + if err != nil { + return err + } + if r.tracesReceiver != nil { ptraceotlp.RegisterGRPCServer(r.serverGRPC, r.tracesReceiver) diff --git a/receiver/otelarrowreceiver/otelarrow_test.go b/receiver/otelarrowreceiver/otelarrow_test.go new file mode 100644 index 0000000000000..9a480e6b44ba8 --- /dev/null +++ b/receiver/otelarrowreceiver/otelarrow_test.go @@ -0,0 +1,576 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelarrowreceiver + +import ( + "bytes" + "context" + "errors" + "fmt" + "net" + "sync" + "testing" + "time" + + arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" + "github.com/open-telemetry/otel-arrow/collector/testdata" + "github.com/open-telemetry/otel-arrow/collector/testutil" + arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/client" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configauth" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confignet" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/extension/auth" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/mock/gomock" + "golang.org/x/net/http2/hpack" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/arrow/mock" + componentMetadata 
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata" +) + +const otlpReceiverName = "receiver_test" + +var testReceiverID = component.NewIDWithName(componentMetadata.Type, otlpReceiverName) + +func TestGRPCNewPortAlreadyUsed(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + ln, err := net.Listen("tcp", addr) + require.NoError(t, err, "failed to listen on %q: %v", addr, err) + t.Cleanup(func() { + assert.NoError(t, ln.Close()) + }) + tt := componenttest.NewNopTelemetrySettings() + r := newGRPCReceiver(t, addr, tt, consumertest.NewNop(), consumertest.NewNop()) + require.NotNil(t, r) + + require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) +} + +// TestOTLPReceiverGRPCTracesIngestTest checks that the gRPC trace receiver +// is returning the proper response (return and metrics) when the next consumer +// in the pipeline reports error. The test changes the responses returned by the +// next trace consumer, checks if data was passed down the pipeline and if +// proper metrics were recorded. It also uses all endpoints supported by the +// trace receiver. +func TestOTLPReceiverGRPCTracesIngestTest(t *testing.T) { + type ingestionStateTest struct { + okToIngest bool + expectedCode codes.Code + } + + expectedReceivedBatches := 2 + ingestionStates := []ingestionStateTest{ + { + okToIngest: true, + expectedCode: codes.OK, + }, + { + okToIngest: false, + expectedCode: codes.Unknown, + }, + { + okToIngest: true, + expectedCode: codes.OK, + }, + } + + addr := testutil.GetAvailableLocalAddress(t) + td := testdata.GenerateTraces(1) + + tt, err := componenttest.SetupTelemetry(testReceiverID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + sink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)} + + ocr := newGRPCReceiver(t, addr, tt.TelemetrySettings(), sink, nil) + require.NotNil(t, ocr) + require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { require.NoError(t, ocr.Shutdown(context.Background())) }) + + cc, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + require.NoError(t, err) + defer func() { + assert.NoError(t, cc.Close()) + }() + + for _, ingestionState := range ingestionStates { + if ingestionState.okToIngest { + sink.SetConsumeError(nil) + } else { + sink.SetConsumeError(errors.New("consumer error")) + } + + _, err = ptraceotlp.NewGRPCClient(cc).Export(context.Background(), ptraceotlp.NewExportRequestFromTraces(td)) + errStatus, ok := status.FromError(err) + require.True(t, ok) + assert.Equal(t, ingestionState.expectedCode, errStatus.Code()) + } + + require.Equal(t, expectedReceivedBatches, len(sink.AllTraces())) + + expectedIngestionBlockedRPCs := 1 + require.NoError(t, tt.CheckReceiverTraces("grpc", int64(expectedReceivedBatches), int64(expectedIngestionBlockedRPCs))) +} + +func TestGRPCInvalidTLSCredentials(t *testing.T) { + cfg := &Config{ + Protocols: Protocols{ + GRPC: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + Transport: confignet.TransportTypeTCP, + }, + TLSSetting: &configtls.ServerConfig{ + Config: configtls.Config{ + CertFile: "willfail", + }, + }, + }, + }, + } + + r, err := NewFactory().CreateTracesReceiver( + context.Background(), + receivertest.NewNopCreateSettings(), + cfg, + consumertest.NewNop()) + + require.NoError(t, err) + assert.NotNil(t, r) + + 
assert.EqualError(t, + r.Start(context.Background(), componenttest.NewNopHost()), + `failed to load TLS config: failed to load TLS cert and key: for auth via TLS, provide both certificate and key, or neither`) +} + +func TestGRPCMaxRecvSize(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + sink := new(consumertest.TracesSink) + + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPC.NetAddr.Endpoint = addr + tt := componenttest.NewNopTelemetrySettings() + ocr := newReceiver(t, factory, tt, cfg, testReceiverID, sink, nil) + + require.NotNil(t, ocr) + require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost())) + + cc, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + require.NoError(t, err) + + td := testdata.GenerateTraces(50000) + require.Error(t, exportTraces(cc, td)) + assert.NoError(t, cc.Close()) + require.NoError(t, ocr.Shutdown(context.Background())) + + cfg.GRPC.MaxRecvMsgSizeMiB = 100 + + ocr = newReceiver(t, factory, tt, cfg, testReceiverID, sink, nil) + + require.NotNil(t, ocr) + require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { require.NoError(t, ocr.Shutdown(context.Background())) }) + + cc, err = grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + require.NoError(t, err) + defer func() { + assert.NoError(t, cc.Close()) + }() + + td = testdata.GenerateTraces(50000) + require.NoError(t, exportTraces(cc, td)) + require.Len(t, sink.AllTraces(), 1) + assert.Equal(t, td, sink.AllTraces()[0]) +} + +func newGRPCReceiver(t *testing.T, endpoint string, settings component.TelemetrySettings, tc consumer.Traces, mc consumer.Metrics) component.Component { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPC.NetAddr.Endpoint = endpoint + return newReceiver(t, factory, settings, cfg, testReceiverID, tc, mc) +} + +func newReceiver(t *testing.T, factory receiver.Factory, settings component.TelemetrySettings, cfg *Config, id component.ID, tc consumer.Traces, mc consumer.Metrics) component.Component { + set := receivertest.NewNopCreateSettings() + set.TelemetrySettings = settings + set.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal + set.ID = id + var r component.Component + var err error + if tc != nil { + r, err = factory.CreateTracesReceiver(context.Background(), set, cfg, tc) + require.NoError(t, err) + } + if mc != nil { + r, err = factory.CreateMetricsReceiver(context.Background(), set, cfg, mc) + require.NoError(t, err) + } + return r +} + +type senderFunc func(td ptrace.Traces) + +func TestShutdown(t *testing.T) { + endpointGrpc := testutil.GetAvailableLocalAddress(t) + + nextSink := new(consumertest.TracesSink) + + // Create OTLP receiver + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPC.NetAddr.Endpoint = endpointGrpc + set := receivertest.NewNopCreateSettings() + set.ID = testReceiverID + r, err := NewFactory().CreateTracesReceiver( + context.Background(), + set, + cfg, + nextSink) + require.NoError(t, err) + require.NotNil(t, r) + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) + + conn, err := grpc.Dial(endpointGrpc, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + require.NoError(t, err) + defer conn.Close() + + doneSignalGrpc := make(chan bool) + + senderGrpc := func(td ptrace.Traces) { + // Ignore error, may be executed after the 
receiver shutdown. + _ = exportTraces(conn, td) + } + + // Send traces to the receiver until we signal via done channel, and then + // send one more trace after that. + go generateTraces(senderGrpc, doneSignalGrpc) + + // Wait until the receiver outputs anything to the sink. + assert.Eventually(t, func() bool { + return nextSink.SpanCount() > 0 + }, time.Second, 10*time.Millisecond) + + // Now shutdown the receiver, while continuing sending traces to it. + ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) + defer cancelFn() + err = r.Shutdown(ctx) + assert.NoError(t, err) + + // Remember how many spans the sink received. This number should not change after this + // point because after Shutdown() returns the component is not allowed to produce + // any more data. + sinkSpanCountAfterShutdown := nextSink.SpanCount() + + // Now signal to generateTraces to exit the main generation loop, then send + // one more trace and stop. + doneSignalGrpc <- true + + // Wait until all follow up traces are sent. + <-doneSignalGrpc + + // The last, additional trace should not be received by sink, so the number of spans in + // the sink should not change. + assert.EqualValues(t, sinkSpanCountAfterShutdown, nextSink.SpanCount()) +} + +func generateTraces(senderFn senderFunc, doneSignal chan bool) { + // Continuously generate spans until signaled to stop. +loop: + for { + select { + case <-doneSignal: + break loop + default: + } + senderFn(testdata.GenerateTraces(1)) + } + + // After getting the signal to stop, send one more span and then + // finally stop. We should never receive this last span. + senderFn(testdata.GenerateTraces(1)) + + // Indicate that we are done. + close(doneSignal) +} + +func exportTraces(cc *grpc.ClientConn, td ptrace.Traces) error { + acc := ptraceotlp.NewGRPCClient(cc) + req := ptraceotlp.NewExportRequestFromTraces(td) + _, err := acc.Export(context.Background(), req) + + return err +} + +type errOrSinkConsumer struct { + *consumertest.TracesSink + *consumertest.MetricsSink + mu sync.Mutex + consumeError error // to be returned by ConsumeTraces, if set +} + +// SetConsumeError sets an error that will be returned by the Consume function. +func (esc *errOrSinkConsumer) SetConsumeError(err error) { + esc.mu.Lock() + defer esc.mu.Unlock() + esc.consumeError = err +} + +func (esc *errOrSinkConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +// ConsumeTraces stores traces to this sink. +func (esc *errOrSinkConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + esc.mu.Lock() + defer esc.mu.Unlock() + + if esc.consumeError != nil { + return esc.consumeError + } + + return esc.TracesSink.ConsumeTraces(ctx, td) +} + +// ConsumeMetrics stores metrics to this sink. +func (esc *errOrSinkConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + esc.mu.Lock() + defer esc.mu.Unlock() + + if esc.consumeError != nil { + return esc.consumeError + } + + return esc.MetricsSink.ConsumeMetrics(ctx, md) +} + +// Reset deletes any stored in the sinks, resets error to nil. 
+func (esc *errOrSinkConsumer) Reset() {
+ esc.mu.Lock()
+ defer esc.mu.Unlock()
+
+ esc.consumeError = nil
+ if esc.TracesSink != nil {
+ esc.TracesSink.Reset()
+ }
+ if esc.MetricsSink != nil {
+ esc.MetricsSink.Reset()
+ }
+}
+
+type tracesSinkWithMetadata struct {
+ consumertest.TracesSink
+ MDs []client.Metadata
+}
+
+func (ts *tracesSinkWithMetadata) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
+ info := client.FromContext(ctx)
+ ts.MDs = append(ts.MDs, info.Metadata)
+ return ts.TracesSink.ConsumeTraces(ctx, td)
+}
+
+type anyStreamClient interface {
+ Send(*arrowpb.BatchArrowRecords) error
+ Recv() (*arrowpb.BatchStatus, error)
+ grpc.ClientStream
+}
+
+func TestGRPCArrowReceiver(t *testing.T) {
+ addr := testutil.GetAvailableLocalAddress(t)
+ sink := new(tracesSinkWithMetadata)
+
+ factory := NewFactory()
+ cfg := factory.CreateDefaultConfig().(*Config)
+ cfg.GRPC.NetAddr.Endpoint = addr
+ cfg.GRPC.IncludeMetadata = true
+ id := component.NewID(component.MustNewType("arrow"))
+ tt := componenttest.NewNopTelemetrySettings()
+ ocr := newReceiver(t, factory, tt, cfg, id, sink, nil)
+
+ require.NotNil(t, ocr)
+ require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()))
+
+ cc, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
+ require.NoError(t, err)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ var stream anyStreamClient
+ client := arrowpb.NewArrowTracesServiceClient(cc)
+ stream, err = client.ArrowTraces(ctx, grpc.WaitForReady(true))
+ require.NoError(t, err)
+ producer := arrowRecord.NewProducer()
+
+ var headerBuf bytes.Buffer
+ hpd := hpack.NewEncoder(&headerBuf)
+
+ var expectTraces []ptrace.Traces
+ var expectMDs []metadata.MD
+
+ // Repeatedly send traces via Arrow, recording the traces and
+ // metadata we expect to receive.
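+ // Each iteration encodes the per-request headers with hpack
+ // and attaches them to the batch via batch.Headers.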
+ for i := 0; i < 10; i++ { + td := testdata.GenerateTraces(2) + expectTraces = append(expectTraces, td) + + headerBuf.Reset() + err := hpd.WriteField(hpack.HeaderField{ + Name: "seq", + Value: fmt.Sprint(i), + }) + require.NoError(t, err) + err = hpd.WriteField(hpack.HeaderField{ + Name: "test", + Value: "value", + }) + require.NoError(t, err) + expectMDs = append(expectMDs, metadata.MD{ + "seq": []string{fmt.Sprint(i)}, + "test": []string{"value"}, + }) + + batch, err := producer.BatchArrowRecordsFromTraces(td) + require.NoError(t, err) + + batch.Headers = headerBuf.Bytes() + + err = stream.Send(batch) + + require.NoError(t, err) + + resp, err := stream.Recv() + require.NoError(t, err) + require.Equal(t, batch.BatchId, resp.BatchId) + require.Equal(t, arrowpb.StatusCode_OK, resp.StatusCode) + } + + assert.NoError(t, cc.Close()) + require.NoError(t, ocr.Shutdown(context.Background())) + + assert.Equal(t, expectTraces, sink.AllTraces()) + + assert.Equal(t, len(expectMDs), len(sink.MDs)) + // gRPC adds its own metadata keys, so we check for only the + // expected ones below: + for idx := range expectMDs { + for key, vals := range expectMDs[idx] { + require.Equal(t, vals, sink.MDs[idx].Get(key), "for key %s", key) + } + } +} + +type hostWithExtensions struct { + component.Host + exts map[component.ID]component.Component +} + +func newHostWithExtensions(exts map[component.ID]component.Component) component.Host { + return &hostWithExtensions{ + Host: componenttest.NewNopHost(), + exts: exts, + } +} + +func (h *hostWithExtensions) GetExtensions() map[component.ID]component.Component { + return h.exts +} + +func newTestAuthExtension(t *testing.T, authFunc func(ctx context.Context, hdrs map[string][]string) (context.Context, error)) auth.Server { + ctrl := gomock.NewController(t) + as := mock.NewMockServer(ctrl) + as.EXPECT().Authenticate(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(authFunc) + return as +} + +func TestGRPCArrowReceiverAuth(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + sink := new(tracesSinkWithMetadata) + + authID := component.NewID(component.MustNewType("testauth")) + + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPC.NetAddr.Endpoint = addr + cfg.GRPC.IncludeMetadata = true + cfg.GRPC.Auth = &configauth.Authentication{ + AuthenticatorID: authID, + } + id := component.NewID(component.MustNewType("arrow")) + tt := componenttest.NewNopTelemetrySettings() + ocr := newReceiver(t, factory, tt, cfg, id, sink, nil) + + require.NotNil(t, ocr) + + const errorString = "very much not authorized" + + type inStreamCtx struct{} + + host := newHostWithExtensions( + map[component.ID]component.Component{ + authID: newTestAuthExtension(t, func(ctx context.Context, _ map[string][]string) (context.Context, error) { + if ctx.Value(inStreamCtx{}) != nil { + return ctx, fmt.Errorf(errorString) + } + return context.WithValue(ctx, inStreamCtx{}, t), nil + }), + }, + ) + + require.NoError(t, ocr.Start(context.Background(), host)) + + cc, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client := arrowpb.NewArrowTracesServiceClient(cc) + stream, err := client.ArrowTraces(ctx, grpc.WaitForReady(true)) + require.NoError(t, err) + producer := arrowRecord.NewProducer() + + // Repeatedly send traces via arrow. Expect an auth error. 
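+ // The stream-level Authenticate call succeeds and marks the
+ // context; each per-data Authenticate call then sees the mark
+ // and fails with errorString.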
+ for i := 0; i < 10; i++ { + td := testdata.GenerateTraces(2) + + batch, err := producer.BatchArrowRecordsFromTraces(td) + require.NoError(t, err) + + err = stream.Send(batch) + require.NoError(t, err) + + resp, err := stream.Recv() + require.NoError(t, err) + // The stream has to be successful to get this far. The + // authenticator fails every data item: + require.Equal(t, batch.BatchId, resp.BatchId) + require.Equal(t, arrowpb.StatusCode_UNAVAILABLE, resp.StatusCode) + require.Equal(t, errorString, resp.StatusMessage) + } + + assert.NoError(t, cc.Close()) + require.NoError(t, ocr.Shutdown(context.Background())) + + assert.Equal(t, 0, len(sink.AllTraces())) +} diff --git a/receiver/otelarrowreceiver/testdata/config.yaml b/receiver/otelarrowreceiver/testdata/config.yaml index 074992aac57e9..0db443736428d 100644 --- a/receiver/otelarrowreceiver/testdata/config.yaml +++ b/receiver/otelarrowreceiver/testdata/config.yaml @@ -27,6 +27,3 @@ protocols: permit_without_stream: true arrow: memory_limit_mib: 123 - zstd: - memory_limit_mib: 8 - diff --git a/receiver/otelarrowreceiver/testdata/typo_default_proto_config.yaml b/receiver/otelarrowreceiver/testdata/typo_default_proto_config.yaml new file mode 100644 index 0000000000000..61ffd2b7a5d2c --- /dev/null +++ b/receiver/otelarrowreceiver/testdata/typo_default_proto_config.yaml @@ -0,0 +1,3 @@ +protocols: + grpc: + htttp: