Add Google Pubsub receiver for OTLP messages - PR1 #3552
Merged
bogdandrutu
merged 2 commits into
open-telemetry:main
from
alexvanboxel:feature/googlecloudpubsubreceiver-pr1
Jun 21, 2021
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include ../../Makefile.Common |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
# Google Pubsub Receiver | ||
|
||
> ⚠️ This is a community-provided module. It has been developed and extensively tested at Collibra, but it is not officially supported by GCP. | ||
|
||
This receiver gets OTLP messages from a Google Cloud [Pubsub](https://cloud.google.com/pubsub) subscription. | ||
|
||
The following configuration options are supported: | ||
|
||
* `project` (Optional): The Google Cloud project the client connects to. | ||
* `subscription` (Required): The subscription name to receive OTLP data from. The subscription name should be a | ||
fully qualified resource name (e.g. `projects/otel-project/subscriptions/otlp`). | ||
* `encoding` (Optional): The encoding that will be used to receive data from the subscription. This can be | ||
`otlp_proto_trace`, `otlp_proto_metric`, `otlp_proto_log`, `raw_text`, or `raw_json` (see the Encoding section below). | ||
|
||
```yaml | ||
receivers: | ||
googlecloudpubsub: | ||
project: otel-project | ||
subscription: projects/otel-project/subscriptions/otlp-logs | ||
encoding: raw_json | ||
``` | ||
|
||
## Encoding | ||
|
||
You should not need to set the encoding of the subscription as the receiver will try to discover the type of the data | ||
by looking at the `ce-type` and `ce-datacontenttype` attributes of the message. Only when those attributes are not set | ||
must the `encoding` field in the configuration be set. | ||
|
||
| ce-type | ce-datacontenttype | encoding | description | | ||
| --- | --- | --- | --- | | ||
| org.opentelemetry.otlp.traces.v1 | application/x-protobuf | | Decode OTLP trace message | | ||
| org.opentelemetry.otlp.metrics.v1 | application/x-protobuf | | Decode OTLP metric message | | ||
| org.opentelemetry.otlp.logs.v1 | application/json | | Decode OTLP log message | | ||
| - | - | otlp_proto_trace | Decode OTLP trace message | | ||
| - | - | otlp_proto_metric | Decode OTLP metric message | | ||
| - | - | otlp_proto_log | Decode OTLP log message | | ||
| - | - | raw_text | Wrap in an OTLP log message | | ||
|
||
When the `encoding` configuration is set, the attributes on the message are ignored. | ||
|
||
The receiver can also ingest arbitrary text messages from a Pubsub subscription and wrap them in OTLP log | ||
messages, making it a convenient way to ingest log lines from Pubsub. | ||
|
||
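The precedence described in this section (an explicit `encoding` wins, otherwise the `ce-type` attribute decides) can be sketched in Go as follows. This is a minimal illustration, not the receiver's actual code: `detectEncoding` is a hypothetical helper, and for brevity it keys only on `ce-type` and does not inspect `ce-datacontenttype`.

```go
package main

import (
	"errors"
	"fmt"
)

// detectEncoding is a hypothetical helper (not part of the receiver).
// It returns the configured encoding when one is set; otherwise it maps
// the ce-type attribute values from the table above to an encoding name.
func detectEncoding(configured string, attrs map[string]string) (string, error) {
	if configured != "" {
		// An explicit encoding takes precedence; attributes are ignored.
		return configured, nil
	}
	switch attrs["ce-type"] {
	case "org.opentelemetry.otlp.traces.v1":
		return "otlp_proto_trace", nil
	case "org.opentelemetry.otlp.metrics.v1":
		return "otlp_proto_metric", nil
	case "org.opentelemetry.otlp.logs.v1":
		return "otlp_proto_log", nil
	}
	// No configured encoding and no recognizable attribute.
	return "", errors.New("cannot determine encoding: set `encoding` or the ce-type attribute")
}

func main() {
	attrs := map[string]string{"ce-type": "org.opentelemetry.otlp.traces.v1"}
	enc, err := detectEncoding("", attrs)
	fmt.Println(enc, err)
}
```

Messages published without attributes fall through to the error case in this sketch, which corresponds to the situation where the `encoding` field must be set.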
## Pubsub subscription | ||
|
||
The Google Cloud [Pubsub](https://cloud.google.com/pubsub) receiver doesn't automatically create subscriptions; | ||
it expects the subscription to be created upfront. For security, it's best to give the collector its own | ||
service account and grant that account the `Pub/Sub Subscriber` role on the subscription. | ||
|
||
The subscription should also be of delivery type `Pull`. | ||
|
||
### Filtering | ||
|
||
When the messages on the subscription are accompanied by the correct attributes and you only need a specific | ||
type in your pipeline, the messages can be [filtered](https://cloud.google.com/pubsub/docs/filtering) on the | ||
subscription, saving on egress fees. | ||
|
||
An example filter that selects only trace messages: | ||
``` | ||
attributes.ce-type = "org.opentelemetry.otlp.traces.v1" | ||
AND | ||
attributes.ce-datacontenttype = "application/x-protobuf" | ||
``` |
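If filters for several telemetry types are needed, the expression can be generated rather than typed by hand. The Go sketch below renders a filter in the same form as the example above; `buildFilter` is a hypothetical helper introduced only for illustration, and creating the subscription itself is out of scope here.

```go
package main

import "fmt"

// buildFilter is a hypothetical helper that renders a subscription filter
// matching one ce-type / ce-datacontenttype pair, in the same form as the
// README example (written on a single line).
func buildFilter(ceType, contentType string) string {
	return fmt.Sprintf(
		"attributes.ce-type = %q AND attributes.ce-datacontenttype = %q",
		ceType, contentType,
	)
}

func main() {
	// Attribute values taken from the Encoding table.
	fmt.Println(buildFilter("org.opentelemetry.otlp.metrics.v1", "application/x-protobuf"))
}
```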
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package googlecloudpubsubreceiver | ||
|
||
import ( | ||
"fmt" | ||
"regexp" | ||
|
||
"go.opentelemetry.io/collector/config" | ||
"go.opentelemetry.io/collector/exporter/exporterhelper" | ||
) | ||
|
||
var subscriptionMatcher = regexp.MustCompile(`projects/[a-z][a-z0-9\-]*/subscriptions/`) | ||
|
||
type Config struct { | ||
config.ReceiverSettings `mapstructure:",squash"` | ||
|
||
// Google Cloud Project ID where the Pubsub client will connect to | ||
ProjectID string `mapstructure:"project"` | ||
// User agent that will be used by the Pubsub client to connect to the service | ||
UserAgent string `mapstructure:"user_agent"` | ||
// Override of the Pubsub endpoint | ||
Endpoint string `mapstructure:"endpoint"` | ||
// Only has effect if Endpoint is not "" | ||
Insecure bool `mapstructure:"insecure"` | ||
// Timeout for all API calls. If not set, defaults to 12 seconds. | ||
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. | ||
|
||
// The fully qualified resource name of the Pubsub subscription | ||
Subscription string `mapstructure:"subscription"` | ||
// Lock down the encoding of the payload, leave empty for attribute based detection | ||
Encoding string `mapstructure:"encoding"` | ||
|
||
// The client id that will be used by Pubsub to make load balancing decisions | ||
ClientID string `mapstructure:"client_id"` | ||
} | ||
|
||
func (config *Config) validateForLog() error { | ||
err := config.validate() | ||
if err != nil { | ||
return err | ||
} | ||
switch config.Encoding { | ||
case "": | ||
case "otlp_proto_log": | ||
case "raw_text": | ||
case "raw_json": | ||
default: | ||
return fmt.Errorf("if specified, log encoding should be either otlp_proto_log, raw_text or raw_json") | ||
} | ||
return nil | ||
} | ||
|
||
func (config *Config) validateForTrace() error { | ||
err := config.validate() | ||
if err != nil { | ||
return err | ||
} | ||
switch config.Encoding { | ||
case "": | ||
case "otlp_proto_trace": | ||
default: | ||
return fmt.Errorf("if specified, trace encoding can only be otlp_proto_trace") | ||
} | ||
return nil | ||
} | ||
|
||
func (config *Config) validateForMetric() error { | ||
err := config.validate() | ||
if err != nil { | ||
return err | ||
} | ||
switch config.Encoding { | ||
case "": | ||
case "otlp_proto_metric": | ||
default: | ||
return fmt.Errorf("if specified, metric encoding can only be otlp_proto_metric") | ||
} | ||
return nil | ||
} | ||
|
||
func (config *Config) validate() error { | ||
if !subscriptionMatcher.MatchString(config.Subscription) { | ||
return fmt.Errorf("subscription '%s' is not a valid format, use 'projects/<project_id>/subscriptions/<name>'", config.Subscription) | ||
} | ||
return nil | ||
} |
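To see what the `subscriptionMatcher` pattern accepts, the standalone Go sketch below reuses the same regular expression outside the receiver. `validSubscription` is a hypothetical wrapper added for illustration. Note that the pattern is unanchored and does not require anything after `subscriptions/`, so it is more permissive than the error message suggests.

```go
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as subscriptionMatcher in the config file above.
var subscriptionMatcher = regexp.MustCompile(`projects/[a-z][a-z0-9\-]*/subscriptions/`)

// validSubscription is a hypothetical wrapper around the matcher.
func validSubscription(name string) bool {
	return subscriptionMatcher.MatchString(name)
}

func main() {
	names := []string{
		"projects/my-project/subscriptions/my-subscription", // accepted
		"projects/000project/subscriptions/my-subscription", // rejected: project ID must start with a letter
		"projects/my-project/topics/my-topic",               // rejected: not a subscription
		"projects/my-project/subscriptions/",                // accepted: the pattern does not require a name
		"xprojects/my-project/subscriptions/s",              // accepted: the pattern is not anchored
	}
	for _, name := range names {
		fmt.Printf("%-52s %v\n", name, validSubscription(name))
	}
}
```

Anchoring the pattern and requiring a non-empty name would tighten the check, but that would be a behavior change beyond what this PR implements.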
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package googlecloudpubsubreceiver | ||
|
||
import ( | ||
"path" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/collector/component/componenttest" | ||
"go.opentelemetry.io/collector/config" | ||
"go.opentelemetry.io/collector/config/configtest" | ||
"go.opentelemetry.io/collector/exporter/exporterhelper" | ||
) | ||
|
||
func TestLoadConfig(t *testing.T) { | ||
factories, err := componenttest.NopFactories() | ||
assert.Nil(t, err) | ||
|
||
factory := NewFactory() | ||
factories.Receivers[config.Type(typeStr)] = factory | ||
cfg, err := configtest.LoadConfig( | ||
path.Join(".", "testdata", "config.yaml"), factories, | ||
) | ||
|
||
require.NoError(t, err) | ||
require.NotNil(t, cfg) | ||
|
||
assert.Equal(t, len(cfg.Receivers), 2) | ||
|
||
defaultConfig := factory.CreateDefaultConfig().(*Config) | ||
assert.Equal(t, cfg.Receivers[config.NewID(typeStr)], defaultConfig) | ||
|
||
customConfig := factory.CreateDefaultConfig().(*Config) | ||
customConfig.SetIDName("customname") | ||
|
||
customConfig.ProjectID = "my-project" | ||
customConfig.UserAgent = "opentelemetry-collector-contrib {{version}}" | ||
customConfig.Endpoint = "test-endpoint" | ||
customConfig.Insecure = true | ||
customConfig.TimeoutSettings = exporterhelper.TimeoutSettings{ | ||
Timeout: 20 * time.Second, | ||
} | ||
customConfig.Subscription = "projects/my-project/subscriptions/otlp-subscription" | ||
assert.Equal(t, cfg.Receivers[config.NewIDWithName(typeStr, "customname")], customConfig) | ||
} | ||
|
||
func TestConfigValidation(t *testing.T) { | ||
factory := NewFactory() | ||
config := factory.CreateDefaultConfig().(*Config) | ||
assert.Error(t, config.validateForTrace()) | ||
assert.Error(t, config.validateForLog()) | ||
assert.Error(t, config.validateForMetric()) | ||
config.Subscription = "projects/000project/subscriptions/my-subscription" | ||
assert.Error(t, config.validate()) | ||
config.Subscription = "projects/my-project/topics/my-topic" | ||
assert.Error(t, config.validate()) | ||
config.Subscription = "projects/my-project/subscriptions/my-subscription" | ||
assert.NoError(t, config.validate()) | ||
} | ||
|
||
func TestTraceConfigValidation(t *testing.T) { | ||
factory := NewFactory() | ||
config := factory.CreateDefaultConfig().(*Config) | ||
config.Subscription = "projects/my-project/subscriptions/my-subscription" | ||
assert.NoError(t, config.validateForTrace()) | ||
|
||
config.Encoding = "otlp_proto_metric" | ||
assert.Error(t, config.validateForTrace()) | ||
config.Encoding = "otlp_proto_log" | ||
assert.Error(t, config.validateForTrace()) | ||
config.Encoding = "raw_text" | ||
assert.Error(t, config.validateForTrace()) | ||
config.Encoding = "raw_json" | ||
assert.Error(t, config.validateForTrace()) | ||
|
||
config.Encoding = "otlp_proto_trace" | ||
assert.NoError(t, config.validateForTrace()) | ||
} | ||
|
||
func TestMetricConfigValidation(t *testing.T) { | ||
factory := NewFactory() | ||
config := factory.CreateDefaultConfig().(*Config) | ||
config.Subscription = "projects/my-project/subscriptions/my-subscription" | ||
assert.NoError(t, config.validateForMetric()) | ||
|
||
config.Encoding = "otlp_proto_trace" | ||
assert.Error(t, config.validateForMetric()) | ||
config.Encoding = "otlp_proto_log" | ||
assert.Error(t, config.validateForMetric()) | ||
config.Encoding = "raw_text" | ||
assert.Error(t, config.validateForMetric()) | ||
config.Encoding = "raw_json" | ||
assert.Error(t, config.validateForMetric()) | ||
|
||
config.Encoding = "otlp_proto_metric" | ||
assert.NoError(t, config.validateForMetric()) | ||
} | ||
|
||
func TestLogConfigValidation(t *testing.T) { | ||
factory := NewFactory() | ||
config := factory.CreateDefaultConfig().(*Config) | ||
config.Subscription = "projects/my-project/subscriptions/my-subscription" | ||
assert.NoError(t, config.validateForLog()) | ||
|
||
config.Encoding = "otlp_proto_trace" | ||
assert.Error(t, config.validateForLog()) | ||
config.Encoding = "otlp_proto_metric" | ||
assert.Error(t, config.validateForLog()) | ||
|
||
config.Encoding = "raw_text" | ||
assert.NoError(t, config.validateForLog()) | ||
config.Encoding = "raw_json" | ||
assert.NoError(t, config.validateForLog()) | ||
config.Encoding = "otlp_proto_log" | ||
assert.NoError(t, config.validateForLog()) | ||
} |
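The validation tests above exercise a small matrix of signal type versus allowed encoding. As a reading aid, that matrix can be written down as data. The Go sketch below is not the PR's implementation (which uses `switch` statements); `allowedEncodings` and `encodingAllowed` are hypothetical names, and the sketch leaves out the subscription check that the real validators run first.

```go
package main

import "fmt"

// allowedEncodings mirrors the switch statements in validateForTrace,
// validateForMetric and validateForLog. The empty string means
// "detect the encoding from message attributes".
var allowedEncodings = map[string][]string{
	"trace":  {"", "otlp_proto_trace"},
	"metric": {"", "otlp_proto_metric"},
	"log":    {"", "otlp_proto_log", "raw_text", "raw_json"},
}

// encodingAllowed reports whether encoding is valid for the given signal.
// An unknown signal has no allowed encodings and always yields false.
func encodingAllowed(signal, encoding string) bool {
	for _, e := range allowedEncodings[signal] {
		if e == encoding {
			return true
		}
	}
	return false
}

func main() {
	for _, signal := range []string{"trace", "metric", "log"} {
		fmt.Printf("%-6s raw_text allowed: %v\n", signal, encodingAllowed(signal, "raw_text"))
	}
}
```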
Hi Alexvanboxel, nice to meet you! I'm a member of the Cloud Trace team at Google.
Cloud Trace supports the single receiver approach, with one caveat. Our public documentation will still instruct customers to create one PubSub topic per telemetry type. The multi-topic approach allows for a more fine-grained authorization model (whereby Trace can only publish to the trace topic, Logging can only publish to the logging topic, and so on). However, this shouldn't affect your implementation. You can still write a single receiver that has the ability to accept all telemetry types. Our public documentation can simply encourage "best" security practices. There would be no inherent technical limitation that would prevent the customer from ignoring our documentation and piping all telemetry types through a single PubSub topic anyways.
We also favor this CloudEvents approach (as opposed to the OneOf() approach discussed in PR #157), implemented as Pub/Sub attributes. Our reasoning is that customers should be able to filter by telemetry type using message attributes before needing to deserialize the message.
One question I had, though, is your reasoning for using the CloudEvents spec? I'm unfamiliar with the spec, but reading through it, it appears rather thorough. Is the expectation that the receiver (and Trace, the publisher) will be fully compliant with the spec? And if not, and we are only using portions of it for inspiration, then does it make sense to give the PubSub attributes more familiar names such as (just as an example):
Finally, I wanted to thank you for your contributions! Let me know if you have any other questions for the Trace team.
Thanks,
Matt
Hey Matt, it may be worth having a meeting. The last time I had a meeting with "Eyamba Ita" there was no mention of OTLP over Pubsub. You can mail me at "alex.vanboxel", at the company I work for "collibra.com". <- email spam bot proof