Skip to content

Commit

Permalink
[chore] move pulsar exporter to generated lifecycle tests (open-telem…
Browse files Browse the repository at this point in the history
…etry#30542)

Relates to
open-telemetry#27849

lifecycle test can't pass, encounter endless loop:
```
=== RUN   TestComponentLifecycle
=== RUN   TestComponentLifecycle/logs-shutdown
time="2024-01-13T16:17:22+08:00" level=info msg="[Connecting to broker]" remote_addr="pulsar:https://localhost:6650"
time="2024-01-13T16:17:22+08:00" level=warning msg="[Failed to connect to broker.]" error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar:https://localhost:6650"
time="2024-01-13T16:17:22+08:00" level=info msg="Removed connection from pool key=localhost:665045 0 logical_addr=pulsar:https://localhost:6650 physical_addr=pulsar:https://localhost:6650"
time="2024-01-13T16:17:22+08:00" level=info msg="[Connecting to broker]" remote_addr="pulsar:https://localhost:6650"
time="2024-01-13T16:17:22+08:00" level=warning msg="[Failed to connect to broker.]" error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar:https://localhost:6650"
time="2024-01-13T16:17:23+08:00" level=info msg="Removed connection from pool key=localhost:665045 0 logical_addr=pulsar:https://localhost:6650 physical_addr=pulsar:https://localhost:6650"
```

---------

Co-authored-by: Antoine Toulme <[email protected]>
  • Loading branch information
2 people authored and cparkins committed Feb 1, 2024
1 parent b9f6fee commit 8674820
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 24 deletions.
3 changes: 3 additions & 0 deletions exporter/pulsarexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (f *pulsarExporterFactory) createTracesExporter(
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(oCfg.BackOffConfig),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.Close))
}

Expand Down Expand Up @@ -136,6 +137,7 @@ func (f *pulsarExporterFactory) createMetricsExporter(
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(oCfg.BackOffConfig),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.Close))
}

Expand Down Expand Up @@ -166,5 +168,6 @@ func (f *pulsarExporterFactory) createLogsExporter(
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(oCfg.BackOffConfig),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.Close))
}
17 changes: 11 additions & 6 deletions exporter/pulsarexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/exportertest"
Expand Down Expand Up @@ -39,9 +40,10 @@ func TestWithTracesMarshalers_err(t *testing.T) {
tracesMarshaler := &customTraceMarshaler{encoding: "unknown"}
f := NewFactory(withTracesMarshalers(tracesMarshaler))
r, err := f.CreateTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
require.NoError(t, err)
err = r.Start(context.Background(), componenttest.NewNopHost())
// no available broker
require.Error(t, err)
assert.Nil(t, r)
}

func TestCreateTracesExporter_err(t *testing.T) {
Expand All @@ -50,27 +52,30 @@ func TestCreateTracesExporter_err(t *testing.T) {

f := pulsarExporterFactory{tracesMarshalers: tracesMarshalers()}
r, err := f.createTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
require.NoError(t, err)
err = r.Start(context.Background(), componenttest.NewNopHost())
// no available broker
require.Error(t, err)
assert.Nil(t, r)
}

func TestCreateMetricsExporter_err(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Endpoint = ""

mf := pulsarExporterFactory{metricsMarshalers: metricsMarshalers()}
mr, err := mf.createMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
r, err := mf.createMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
require.NoError(t, err)
err = r.Start(context.Background(), componenttest.NewNopHost())
require.Error(t, err)
assert.Nil(t, mr)
}

func TestCreateLogsExporter_err(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Endpoint = ""

mf := pulsarExporterFactory{logsMarshalers: logsMarshalers()}
mr, err := mf.createLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
r, err := mf.createLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
require.NoError(t, err)
err = r.Start(context.Background(), componenttest.NewNopHost())
require.Error(t, err)
assert.Nil(t, mr)
}
113 changes: 113 additions & 0 deletions exporter/pulsarexporter/generated_component_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions exporter/pulsarexporter/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ status:
distributions: [contrib, splunk]
codeowners:
active: [dmitryax, dao-jun]

tests:
config:
skip_lifecycle: true
64 changes: 46 additions & 18 deletions exporter/pulsarexporter/pulsar_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"

"github.com/apache/pulsar-client-go/pulsar"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -20,6 +21,7 @@ import (
var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")

type PulsarTracesProducer struct {
cfg Config
client pulsar.Client
producer pulsar.Producer
topic string
Expand Down Expand Up @@ -48,12 +50,26 @@ func (e *PulsarTracesProducer) tracesPusher(ctx context.Context, td ptrace.Trace
}

func (e *PulsarTracesProducer) Close(context.Context) error {
if e.producer == nil {
return nil
}
e.producer.Close()
e.client.Close()
return nil
}

func (e *PulsarTracesProducer) start(_ context.Context, _ component.Host) error {
client, producer, err := newPulsarProducer(e.cfg)
if err != nil {
return err
}
e.client = client
e.producer = producer
return nil
}

type PulsarMetricsProducer struct {
cfg Config
client pulsar.Client
producer pulsar.Producer
topic string
Expand Down Expand Up @@ -82,12 +98,26 @@ func (e *PulsarMetricsProducer) metricsDataPusher(ctx context.Context, md pmetri
}

func (e *PulsarMetricsProducer) Close(context.Context) error {
if e.producer == nil {
return nil
}
e.producer.Close()
e.client.Close()
return nil
}

func (e *PulsarMetricsProducer) start(_ context.Context, _ component.Host) error {
client, producer, err := newPulsarProducer(e.cfg)
if err != nil {
return err
}
e.client = client
e.producer = producer
return nil
}

type PulsarLogsProducer struct {
cfg Config
client pulsar.Client
producer pulsar.Producer
topic string
Expand Down Expand Up @@ -116,11 +146,24 @@ func (e *PulsarLogsProducer) logsDataPusher(ctx context.Context, ld plog.Logs) e
}

func (e *PulsarLogsProducer) Close(context.Context) error {
if e.producer == nil {
return nil
}
e.producer.Close()
e.client.Close()
return nil
}

func (e *PulsarLogsProducer) start(_ context.Context, _ component.Host) error {
client, producer, err := newPulsarProducer(e.cfg)
if err != nil {
return err
}
e.client = client
e.producer = producer
return nil
}

func newPulsarProducer(config Config) (pulsar.Client, pulsar.Producer, error) {
options := config.clientOptions()

Expand All @@ -146,14 +189,9 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
client, producer, err := newPulsarProducer(config)
if err != nil {
return nil, err
}

return &PulsarMetricsProducer{
client: client,
producer: producer,
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
Expand All @@ -166,13 +204,8 @@ func newTracesExporter(config Config, set exporter.CreateSettings, marshalers ma
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
client, producer, err := newPulsarProducer(config)
if err != nil {
return nil, err
}
return &PulsarTracesProducer{
client: client,
producer: producer,
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
Expand All @@ -184,14 +217,9 @@ func newLogsExporter(config Config, set exporter.CreateSettings, marshalers map[
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
client, producer, err := newPulsarProducer(config)
if err != nil {
return nil, err
}

return &PulsarLogsProducer{
client: client,
producer: producer,
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
Expand Down

0 comments on commit 8674820

Please sign in to comment.