-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Google Pubsub as an OTLP exporter/receiver #7172
Conversation
939aea2
to
38421b2
Compare
@dashpole do you want to be the sponsor for these components? If yes, please review :) |
Yes, I'll gladly sponsor the component |
Thanks @dashpole for the review, I'm on it. |
@@ -29,16 +28,15 @@ import ( | |||
|
|||
func TestLoadConfig(t *testing.T) { | |||
factories, err := componenttest.NopFactories() | |||
assert.Nil(t, err) | |||
assert.NoError(t, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be require
since it shouldn't allow the test to progress if this fails.
if ex.userAgent != "" { | ||
dialOpts = append(dialOpts, grpc.WithUserAgent(ex.userAgent)) | ||
} | ||
conn, _ := grpc.Dial(ex.config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this error able to be captured elsewhere?
Just worried it could have a dead conn or partially set connection and it "working" without actually doing work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this error able to be captured elsewhere?
I think this is the only thing I didn't fix because this is only for testing/mocking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it is only for testing/mocking, it probably shouldn't be part of the user-facing config (config can have unexported structs without mapstructure
tags, IIRC). Otherwise, we should consider it part of the user-facing config, and should handle errors.
38421b2
to
da1c247
Compare
0d327f5
to
ffa09a3
Compare
ffa09a3
to
d578677
Compare
@dashpole I think I covered most of your concerns now. I did rebase the PR on master (to fix the errors in the changelog. I put 2 commits on top for the receiver and exporter respectively. |
## 🚀 New components 🚀 | ||
## 💡 Enhancements 💡 | ||
|
||
- `googlecloudpubsubreceiver` Added implementation of Google Cloud Pubsub receiver. (#1802) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: duplicated line
topic: otlp-traces | ||
watermark: | ||
behavior: earliest | ||
allow_drift: 1h |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we default allow_drift to 1h? IIUC, it doesn't apply if behavior == current
, and if someone sets it to earliest
, it seems nice to have the recommended value be the default to simplify config.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I would set another default I would set it to MAX duration then? That would make more sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is MAX
duration? I'm not particularly concerned with what default you choose (you are the expert). Whatever you choose, it should work for most users.
|
||
const ( | ||
OtlpProtoTrace encoding = iota | ||
OtlpProtoMetric = iota |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Is this still need to be exported?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also unexport constants below as well?
if ex.userAgent != "" { | ||
dialOpts = append(dialOpts, grpc.WithUserAgent(ex.userAgent)) | ||
} | ||
conn, _ := grpc.Dial(ex.config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it is only for testing/mocking, it probably shouldn't be part of the user-facing config (config can have unexported structs without mapstructure
tags, IIRC). Otherwise, we should consider it part of the user-facing config, and should handle errors.
This PR was marked stale due to lack of activity. It will be closed in 14 days. |
@@ -9,6 +9,12 @@ The following configuration options are supported: | |||
* `project` (Optional): The Google Cloud Project of the topics. | |||
* `topic` (Required): The topic name to receive OTLP data over. The topic name should be a fully qualified resource | |||
name (eg: `projects/otel-project/topics/otlp`). | |||
* `compression` (Optional): Set the payload compression, only `gzip` is supported. Default is no compression. | |||
* `watermark` Behaviour of how the `ce-time` attribute is set (see watermark section for more info) | |||
* `behavior` (Optional): `current` sets the `ce-time` attribute to the system clock, `earlierst` sets the attribute to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* `behavior` (Optional): `current` sets the `ce-time` attribute to the system clock, `earlierst` sets the attribute to | |
* `behavior` (Optional): `current` sets the `ce-time` attribute to the system clock, `earliest` sets the attribute to |
@@ -42,6 +42,8 @@ type Config struct { | |||
Subscription string `mapstructure:"subscription"` | |||
// Lock down the encoding of the payload, leave empty for attribute based detection | |||
Encoding string `mapstructure:"encoding"` | |||
// Lock down the encoding of the payload, leave empty for attribute based detection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Lock down the encoding of the payload, leave empty for attribute based detection | |
// Lock down the compression of the payload, leave empty for attribute based detection |
Closed as inactive. Feel free to reopen if this PR is still being worked on. |
Sorry, to have let this slack. I have a re-opened a mergeable PR with all comments (also the suggested edits here: #8391) |
…en-telemetry#7172) * Remove Get prefix from consumer package functions * Provide deprecation path * Use generics to provide common implementation for structs * Update deprecation version * Experiment with `Data` accessor --------- Co-authored-by: Evan Bradley <[email protected]>
Description:
Add the implementation for Google Pubsub exporter and receiver for OTLP messages. The export is able to send OTEP messages to a Google Pubsub topic. The receiver and exporter are pushed in one PR as they work in pairs. A typical scenario is having an ingestion cluster (gRPC) pushing messages to Pubsub and then having receivers read again from Pubsub, having Pubsub as a buffer. That's why it doesn't make sense to have them reviewed separately.
This PR replaces the following PR:
#3879
The implementation has drifted and also includes several enhancements, it doesn't make sense to put it over the other PR.
The implementation is currently in use on production at Collibra.
Link to tracking Issue:
#1802
Testing:
exporter 93.1% test coverage
receiver 83.4% test coverage
Documentation:
Up-to-date README