Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Google Pubsub as an OTLP exporter/receiver #7172

Closed

Conversation

alexvanboxel
Copy link
Contributor

Description:
Add the implementation for Google Pubsub exporter and receiver for OTLP messages. The export is able to send OTEP messages to a Google Pubsub topic. The receiver and exporter are pushed in one PR as they work in pairs. A typical scenario is having an ingestion cluster (gRPC) pushing messages to Pubsub and then having receivers read again from Pubsub, having Pubsub as a buffer. That's why it doesn't make sense to have them reviewed separately.

This PR replaces the following PR:
#3879

The implementation has drifted and also includes several enhancements, it doesn't make sense to put it over the other PR.

The implementation is currently in use on production at Collibra.

Link to tracking Issue:
#1802

Testing:
exporter 93.1% test coverage
receiver 83.4% test coverage

Documentation:
Up-to-date README

@alexvanboxel alexvanboxel requested a review from a team as a code owner January 14, 2022 07:49
@alexvanboxel alexvanboxel force-pushed the feature/pubsub-full branch 15 times, most recently from 939aea2 to 38421b2 Compare January 20, 2022 06:30
@bogdandrutu
Copy link
Member

@dashpole do you want to be the sponsor for these components? If yes, please review :)

@dashpole
Copy link
Contributor

Yes, I'll gladly sponsor the component

@dashpole dashpole assigned dashpole and unassigned bogdandrutu Jan 20, 2022
CHANGELOG.md Outdated Show resolved Hide resolved
exporter/googlecloudpubsubexporter/README.md Outdated Show resolved Hide resolved
exporter/googlecloudpubsubexporter/README.md Outdated Show resolved Hide resolved
exporter/googlecloudpubsubexporter/config.go Show resolved Hide resolved
exporter/googlecloudpubsubexporter/config.go Outdated Show resolved Hide resolved
receiver/googlecloudpubsubreceiver/internal/handler.go Outdated Show resolved Hide resolved
receiver/googlecloudpubsubreceiver/internal/handler.go Outdated Show resolved Hide resolved
receiver/googlecloudpubsubreceiver/internal/handler.go Outdated Show resolved Hide resolved
receiver/googlecloudpubsubreceiver/internal/handler.go Outdated Show resolved Hide resolved
receiver/googlecloudpubsubreceiver/receiver.go Outdated Show resolved Hide resolved
@alexvanboxel
Copy link
Contributor Author

Thanks @dashpole for the review, I'm on it.

@@ -29,16 +28,15 @@ import (

func TestLoadConfig(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.Nil(t, err)
assert.NoError(t, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be require since it shouldn't allow the test to progress if this fails.

if ex.userAgent != "" {
dialOpts = append(dialOpts, grpc.WithUserAgent(ex.userAgent))
}
conn, _ := grpc.Dial(ex.config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this error able to be captured elsewhere?

Just worried it could have a dead conn or partially set connection and it "working" without actually doing work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this error able to be captured elsewhere?

I think this is the only thing I didn't fix because this is only for testing/mocking.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is only for testing/mocking, it probably shouldn't be part of the user-facing config (config can have unexported structs without mapstructure tags, IIRC). Otherwise, we should consider it part of the user-facing config, and should handle errors.

@alexvanboxel
Copy link
Contributor Author

@dashpole I think I covered most of your concerns now. I did rebase the PR on master (to fix the errors in the changelog. I put 2 commits on top for the receiver and exporter respectively.

## 🚀 New components 🚀
## 💡 Enhancements 💡

- `googlecloudpubsubreceiver` Added implementation of Google Cloud Pubsub receiver. (#1802)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: duplicated line

topic: otlp-traces
watermark:
behavior: earliest
allow_drift: 1h
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we default allow_drift to 1h? IIUC, it doesn't apply if behavior == current, and if someone sets it to earliest, it seems nice to have the recommended value be the default to simplify config.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I would set another default I would set it to MAX duration then? That would make more sense.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is MAX duration? I'm not particularly concerned with what default you choose (you are the expert). Whatever you choose, it should work for most users.


const (
OtlpProtoTrace encoding = iota
OtlpProtoMetric = iota
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Is this still need to be exported?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also unexport constants below as well?

if ex.userAgent != "" {
dialOpts = append(dialOpts, grpc.WithUserAgent(ex.userAgent))
}
conn, _ := grpc.Dial(ex.config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is only for testing/mocking, it probably shouldn't be part of the user-facing config (config can have unexported structs without mapstructure tags, IIRC). Otherwise, we should consider it part of the user-facing config, and should handle errors.

@github-actions
Copy link
Contributor

This PR was marked stale due to lack of activity. It will be closed in 14 days.

@github-actions github-actions bot added the Stale label Feb 22, 2022
@@ -9,6 +9,12 @@ The following configuration options are supported:
* `project` (Optional): The Google Cloud Project of the topics.
* `topic` (Required): The topic name to receive OTLP data over. The topic name should be a fully qualified resource
name (eg: `projects/otel-project/topics/otlp`).
* `compression` (Optional): Set the payload compression, only `gzip` is supported. Default is no compression.
* `watermark` Behaviour of how the `ce-time` attribute is set (see watermark section for more info)
* `behavior` (Optional): `current` sets the `ce-time` attribute to the system clock, `earlierst` sets the attribute to

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* `behavior` (Optional): `current` sets the `ce-time` attribute to the system clock, `earlierst` sets the attribute to
* `behavior` (Optional): `current` sets the `ce-time` attribute to the system clock, `earliest` sets the attribute to

@@ -42,6 +42,8 @@ type Config struct {
Subscription string `mapstructure:"subscription"`
// Lock down the encoding of the payload, leave empty for attribute based detection
Encoding string `mapstructure:"encoding"`
// Lock down the encoding of the payload, leave empty for attribute based detection

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Lock down the encoding of the payload, leave empty for attribute based detection
// Lock down the compression of the payload, leave empty for attribute based detection

@github-actions
Copy link
Contributor

Closed as inactive. Feel free to reopen if this PR is still being worked on.

@alexvanboxel
Copy link
Contributor Author

Sorry, to have let this slack. I have a re-opened a mergeable PR with all comments (also the suggested edits here: #8391)

animetauren pushed a commit to animetauren/opentelemetry-collector-contrib that referenced this pull request Apr 4, 2023
…en-telemetry#7172)

* Remove Get prefix from consumer package functions

* Provide deprecation path

* Use generics to provide common implementation for structs

* Update deprecation version

* Experiment with `Data` accessor

---------

Co-authored-by: Evan Bradley <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants