forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Google Pubsub receiver for OTLP messages - PR1 (open-telemetry#3552)
* Add Google Pubsub receiver for OTLP messages * Update receiver/googlecloudpubsubreceiver/config.go Co-authored-by: Anthony Mirabella <[email protected]> Co-authored-by: Anthony Mirabella <[email protected]>
- Loading branch information
1 parent
def0054
commit 7b4f708
Showing
13 changed files
with
2,120 additions
and
0 deletions.
There are no files selected for viewing
Validating CODEOWNERS rules …
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include ../../Makefile.Common |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
# Google Pubsub Receiver | ||
|
||
> ⚠️ This is a community-provided module. It has been developed and extensively tested at Collibra, but it is not officially supported by GCP. | ||
This receiver gets OTLP messages from a Google Cloud [Pubsub](https://cloud.google.com/pubsub) subscription. | ||
|
||
The following configuration options are supported: | ||
|
||
* `project` (Optional): The Google Cloud Project of the client connects to. | ||
* `subscription` (Required): The subscription name to receive OTLP data from. The subscription name should be a | ||
fully qualified resource name (eg: `projects/otel-project/subscriptions/otlp`). | ||
* `encoding` (Optional): The encoding that will be used to received data from the subscription. This can either be | ||
`otlp_proto_trace`, `otlp_proto_metric`, `otlp_proto_log`, or `raw_text` (see `encoding`) | ||
|
||
```yaml | ||
receivers: | ||
googlecloudpubsub: | ||
project: otel-project | ||
subscription: projects/otel-project/subscriptions/otlp-logs | ||
encoding: raw_json | ||
``` | ||
|
||
## Encoding | ||
|
||
You should not need to set the encoding of the subscription as the receiver will try to discover the type of the data | ||
by looking at the `ce-type` and `ce-datacontenttype` attributes of the message. Only when those attributes are not set | ||
must the `encoding` field in the configuration be set. | ||
|
||
| ce-type] | ce-datacontenttype | encoding | description | | ||
| --- | --- | --- | --- | | ||
| org.opentelemetry.otlp.traces.v1 | application/x-protobuf | | Decode OTLP trace message | | ||
| org.opentelemetry.otlp.metrics.v1 | application/x-protobuf | | Decode OTLP metric message | | ||
| org.opentelemetry.otlp.logs.v1 | application/json | | Decode OTLP log message | | ||
| - | - | otlp_proto_trace | Decode OTLP trace message | | ||
| - | - | otlp_proto_metric | Decode OTLP trace message | | ||
| - | - | otlp_proto_log | Decode OTLP trace message | | ||
| - | - | raw_text | Wrap in an OTLP log message | | ||
|
||
When the `encoding` configuration is set, the attributes on the message are ignored. | ||
|
||
The receiver can be used for ingesting arbitrary text message on a Pubsub subscription and wrap them in OTLP Log | ||
message, making it a convenient way to ingest log lines from Pubsub. | ||
|
||
## Pubsub subscription | ||
|
||
The Google Cloud [Pubsub](https://cloud.google.com/pubsub) receiver doesn't automatically create subscriptions, | ||
it expects the subscription to be created upfront. Security wise it's best to give the collector its own | ||
service account and give the subscription `Pub/Sub Subscriber` permission. | ||
|
||
The subscription should also be of delivery type `Pull`. | ||
|
||
### Filtering | ||
|
||
When the messages on the subscription are accompanied by the correct attributes and you only need a specific | ||
type in your pipeline, the messages can be [filtered](https://cloud.google.com/pubsub/docs/filtering) on the | ||
subscription saving on egress fees. | ||
|
||
An example of filtering on trace message only: | ||
``` | ||
attributes.ce-type = "org.opentelemetry.otlp.traces.v1" | ||
AND | ||
attributes.ce-datacontenttype = "application/x-protobuf" | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http:https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package googlecloudpubsubreceiver | ||
|
||
import ( | ||
"fmt" | ||
"regexp" | ||
|
||
"go.opentelemetry.io/collector/config" | ||
"go.opentelemetry.io/collector/exporter/exporterhelper" | ||
) | ||
|
||
var subscriptionMatcher = regexp.MustCompile(`projects/[a-z][a-z0-9\-]*/subscriptions/`) | ||
|
||
type Config struct { | ||
config.ReceiverSettings `mapstructure:",squash"` | ||
|
||
// Google Cloud Project ID where the Pubsub client will connect to | ||
ProjectID string `mapstructure:"project"` | ||
// User agent that will be used by the Pubsub client to connect to the service | ||
UserAgent string `mapstructure:"user_agent"` | ||
// Override of the Pubsub endpoint | ||
Endpoint string `mapstructure:"endpoint"` | ||
// Only has effect if Endpoint is not "" | ||
Insecure bool `mapstructure:"insecure"` | ||
// Timeout for all API calls. If not set, defaults to 12 seconds. | ||
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. | ||
|
||
// The fully qualified resource name of the Pubsub subscription | ||
Subscription string `mapstructure:"subscription"` | ||
// Lock down the encoding of the payload, leave empty for attribute based detection | ||
Encoding string `mapstructure:"encoding"` | ||
|
||
// The client id that will be used by Pubsub to make load balancing decisions | ||
ClientID string `mapstructure:"client_id"` | ||
} | ||
|
||
func (config *Config) validateForLog() error { | ||
err := config.validate() | ||
if err != nil { | ||
return err | ||
} | ||
switch config.Encoding { | ||
case "": | ||
case "otlp_proto_log": | ||
case "raw_text": | ||
case "raw_json": | ||
default: | ||
return fmt.Errorf("if specified, log encoding should be either otlp_proto_log, raw_text or raw_json") | ||
} | ||
return nil | ||
} | ||
|
||
func (config *Config) validateForTrace() error { | ||
err := config.validate() | ||
if err != nil { | ||
return err | ||
} | ||
switch config.Encoding { | ||
case "": | ||
case "otlp_proto_trace": | ||
default: | ||
return fmt.Errorf("if specified, trace encoding can be be only otlp_proto_trace") | ||
} | ||
return nil | ||
} | ||
|
||
func (config *Config) validateForMetric() error { | ||
err := config.validate() | ||
if err != nil { | ||
return err | ||
} | ||
switch config.Encoding { | ||
case "": | ||
case "otlp_proto_metric": | ||
default: | ||
return fmt.Errorf("if specified, trace encoding can be be only otlp_proto_metric") | ||
} | ||
return nil | ||
} | ||
|
||
func (config *Config) validate() error { | ||
if !subscriptionMatcher.MatchString(config.Subscription) { | ||
return fmt.Errorf("subscription '%s' is not a valid format, use 'projects/<project_id>/subscriptions/<name>'", config.Subscription) | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http:https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package googlecloudpubsubreceiver | ||
|
||
import ( | ||
"path" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/collector/component/componenttest" | ||
"go.opentelemetry.io/collector/config" | ||
"go.opentelemetry.io/collector/config/configtest" | ||
"go.opentelemetry.io/collector/exporter/exporterhelper" | ||
) | ||
|
||
func TestLoadConfig(t *testing.T) { | ||
factories, err := componenttest.NopFactories() | ||
assert.Nil(t, err) | ||
|
||
factory := NewFactory() | ||
factories.Receivers[config.Type(typeStr)] = factory | ||
cfg, err := configtest.LoadConfig( | ||
path.Join(".", "testdata", "config.yaml"), factories, | ||
) | ||
|
||
require.NoError(t, err) | ||
require.NotNil(t, cfg) | ||
|
||
assert.Equal(t, len(cfg.Receivers), 2) | ||
|
||
defaultConfig := factory.CreateDefaultConfig().(*Config) | ||
assert.Equal(t, cfg.Receivers[config.NewID(typeStr)], defaultConfig) | ||
|
||
customConfig := factory.CreateDefaultConfig().(*Config) | ||
customConfig.SetIDName("customname") | ||
|
||
customConfig.ProjectID = "my-project" | ||
customConfig.UserAgent = "opentelemetry-collector-contrib {{version}}" | ||
customConfig.Endpoint = "test-endpoint" | ||
customConfig.Insecure = true | ||
customConfig.TimeoutSettings = exporterhelper.TimeoutSettings{ | ||
Timeout: 20 * time.Second, | ||
} | ||
customConfig.Subscription = "projects/my-project/subscriptions/otlp-subscription" | ||
assert.Equal(t, cfg.Receivers[config.NewIDWithName(typeStr, "customname")], customConfig) | ||
} | ||
|
||
func TestConfigValidation(t *testing.T) { | ||
factory := NewFactory() | ||
config := factory.CreateDefaultConfig().(*Config) | ||
assert.Error(t, config.validateForTrace()) | ||
assert.Error(t, config.validateForLog()) | ||
assert.Error(t, config.validateForMetric()) | ||
config.Subscription = "projects/000project/subscriptions/my-subscription" | ||
assert.Error(t, config.validate()) | ||
config.Subscription = "projects/my-project/topics/my-topic" | ||
assert.Error(t, config.validate()) | ||
config.Subscription = "projects/my-project/subscriptions/my-subscription" | ||
assert.NoError(t, config.validate()) | ||
} | ||
|
||
func TestTraceConfigValidation(t *testing.T) { | ||
factory := NewFactory() | ||
config := factory.CreateDefaultConfig().(*Config) | ||
config.Subscription = "projects/my-project/subscriptions/my-subscription" | ||
assert.NoError(t, config.validateForTrace()) | ||
|
||
config.Encoding = "otlp_proto_metric" | ||
assert.Error(t, config.validateForTrace()) | ||
config.Encoding = "otlp_proto_log" | ||
assert.Error(t, config.validateForTrace()) | ||
config.Encoding = "raw_text" | ||
assert.Error(t, config.validateForTrace()) | ||
config.Encoding = "raw_json" | ||
assert.Error(t, config.validateForTrace()) | ||
|
||
config.Encoding = "otlp_proto_trace" | ||
assert.NoError(t, config.validateForTrace()) | ||
} | ||
|
||
func TestMetricConfigValidation(t *testing.T) { | ||
factory := NewFactory() | ||
config := factory.CreateDefaultConfig().(*Config) | ||
config.Subscription = "projects/my-project/subscriptions/my-subscription" | ||
assert.NoError(t, config.validateForMetric()) | ||
|
||
config.Encoding = "otlp_proto_trace" | ||
assert.Error(t, config.validateForMetric()) | ||
config.Encoding = "otlp_proto_log" | ||
assert.Error(t, config.validateForMetric()) | ||
config.Encoding = "raw_text" | ||
assert.Error(t, config.validateForMetric()) | ||
config.Encoding = "raw_json" | ||
assert.Error(t, config.validateForMetric()) | ||
|
||
config.Encoding = "otlp_proto_metric" | ||
assert.NoError(t, config.validateForMetric()) | ||
} | ||
|
||
func TestLogConfigValidation(t *testing.T) { | ||
factory := NewFactory() | ||
config := factory.CreateDefaultConfig().(*Config) | ||
config.Subscription = "projects/my-project/subscriptions/my-subscription" | ||
assert.NoError(t, config.validateForLog()) | ||
|
||
config.Encoding = "otlp_proto_trace" | ||
assert.Error(t, config.validateForLog()) | ||
config.Encoding = "otlp_proto_metric" | ||
assert.Error(t, config.validateForLog()) | ||
|
||
config.Encoding = "raw_text" | ||
assert.NoError(t, config.validateForLog()) | ||
config.Encoding = "raw_json" | ||
assert.NoError(t, config.validateForLog()) | ||
config.Encoding = "otlp_proto_log" | ||
assert.NoError(t, config.validateForLog()) | ||
} |
Oops, something went wrong.