Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Google Pubsub receiver for OTLP messages - PR1 #3552

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,6 @@ receiver/filelogreceiver/ @open-telemetry/collector-c
receiver/statsdreceiver/ @open-telemetry/collector-contrib-approvers @keitwb @jmacd
receiver/wavefrontreceiver/ @open-telemetry/collector-contrib-approvers @pjanotti
receiver/windowsperfcountersreceiver/ @open-telemetry/collector-contrib-approvers @dashpole
receiver/googlecloudpubsubexporter/ @open-telemetry/collector-contrib-approvers @alexvanboxel

tracegen/ @open-telemetry/collector-contrib-approvers @jpkrohling
8 changes: 8 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ updates:
directory: "/receiver/fluentforwardreceiver"
schedule:
interval: "weekly"
- package-ecosystem: "gomod"
directory: "/receiver/googlecloudpubsubreceiver"
schedule:
interval: "weekly"
- package-ecosystem: "gomod"
directory: "/receiver/influxdbreceiver"
schedule:
Expand Down Expand Up @@ -357,6 +361,10 @@ updates:
directory: "/receiver/tcplogreceiver"
schedule:
interval: "weekly"
- package-ecosystem: "gomod"
directory: "/receiver/udplogreceiver"
schedule:
interval: "weekly"
- package-ecosystem: "gomod"
directory: "/receiver/wavefrontreceiver"
schedule:
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/memca

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver => ./receiver/kafkametricsreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver => ./receiver/googlecloudpubsubreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor => ./processor/groupbyattrsprocessor

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor => ./processor/groupbytraceprocessor
Expand Down
1 change: 1 addition & 0 deletions receiver/googlecloudpubsubreceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
63 changes: 63 additions & 0 deletions receiver/googlecloudpubsubreceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Google Pubsub Receiver
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Alexvanboxel, nice to meet you! I'm a member of the Cloud Trace team at Google.

Cloud Trace supports the single receiver approach, with one caveat. Our public documentation will still instruct customers to create one PubSub topic per telemetry type. The multi-topic approach allows for a more fine-grained authorization model (whereby Trace can only publish to the trace topic, Logging can only publish to the logging topic, and so on). However, this shouldn't affect your implementation. You can still write a single receiver that has the ability to accept all telemetry types. Our public documentation can simply encourage "best" security practices. There would be no inherent technical limitation that would prevent the customer from ignoring our documentation and piping all telemetry types through a single PubSub topic anyways.

We also favor this CloudEvents approach (as opposed to the OneOf() approach discussed in PR #157), implemented as Pub/Sub attributes. Our reasoning is that customers should be able to filter by telemetry type using message attributes before needing to deserialize the message.

One question I had, though, is your reasoning for using the CloudEvents spec? I'm unfamiliar with the spec, but reading through it, it appears rather thorough. Is the expectation that the receiver (and Trace, the publisher) will be fully compliant with the spec? And if not, and we are only using portions of it for inspiration, then does it make sense to give the PubSub attributes more familiar names such as (just as an example):

Finally, I wanted to thank you for your contributions! Let me know if you have any other questions for the Trace team.

Thanks,
Matt

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, Matt: it's maybe worth having a meeting, last time I had a meeting with "Eyamba Ita" there was no mention on OTLP over Pubsub. You can mail me at "alex.vanboxel", at the company I work for "collibra.com". <- email spam bot proof


> ⚠️ This is a community-provided module. It has been developed and extensively tested at Collibra, but it is not officially supported by GCP.

This receiver gets OTLP messages from a Google Cloud [Pubsub](https://cloud.google.com/pubsub) subscription.

The following configuration options are supported:

* `project` (Optional): The Google Cloud Project of the client connects to.
* `subscription` (Required): The subscription name to receive OTLP data from. The subscription name should be a
fully qualified resource name (eg: `projects/otel-project/subscriptions/otlp`).
* `encoding` (Optional): The encoding that will be used to received data from the subscription. This can either be
`otlp_proto_trace`, `otlp_proto_metric`, `otlp_proto_log`, or `raw_text` (see `encoding`)

```yaml
receivers:
googlecloudpubsub:
project: otel-project
subscription: projects/otel-project/subscriptions/otlp-logs
encoding: raw_json
```

## Encoding

You should not need to set the encoding of the subscription as the receiver will try to discover the type of the data
by looking at the `ce-type` and `ce-datacontenttype` attributes of the message. Only when those attributes are not set
must the `encoding` field in the configuration be set.

| ce-type] | ce-datacontenttype | encoding | description |
| --- | --- | --- | --- |
| org.opentelemetry.otlp.traces.v1 | application/x-protobuf | | Decode OTLP trace message |
| org.opentelemetry.otlp.metrics.v1 | application/x-protobuf | | Decode OTLP metric message |
| org.opentelemetry.otlp.logs.v1 | application/json | | Decode OTLP log message |
| - | - | otlp_proto_trace | Decode OTLP trace message |
| - | - | otlp_proto_metric | Decode OTLP trace message |
| - | - | otlp_proto_log | Decode OTLP trace message |
| - | - | raw_text | Wrap in an OTLP log message |

When the `encoding` configuration is set, the attributes on the message are ignored.

The receiver can be used for ingesting arbitrary text message on a Pubsub subscription and wrap them in OTLP Log
message, making it a convenient way to ingest log lines from Pubsub.

## Pubsub subscription

The Google Cloud [Pubsub](https://cloud.google.com/pubsub) receiver doesn't automatically create subscriptions,
it expects the subscription to be created upfront. Security wise it's best to give the collector its own
service account and give the subscription `Pub/Sub Subscriber` permission.

The subscription should also be of delivery type `Pull`.

### Filtering

When the messages on the subscription are accompanied by the correct attributes and you only need a specific
type in your pipeline, the messages can be [filtered](https://cloud.google.com/pubsub/docs/filtering) on the
subscription saving on egress fees.

An example of filtering on trace message only:
```
attributes.ce-type = "org.opentelemetry.otlp.traces.v1"
AND
attributes.ce-datacontenttype = "application/x-protobuf"
```
99 changes: 99 additions & 0 deletions receiver/googlecloudpubsubreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package googlecloudpubsubreceiver

import (
"fmt"
"regexp"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

var subscriptionMatcher = regexp.MustCompile(`projects/[a-z][a-z0-9\-]*/subscriptions/`)

type Config struct {
config.ReceiverSettings `mapstructure:",squash"`

// Google Cloud Project ID where the Pubsub client will connect to
ProjectID string `mapstructure:"project"`
// User agent that will be used by the Pubsub client to connect to the service
UserAgent string `mapstructure:"user_agent"`
// Override of the Pubsub endpoint
Endpoint string `mapstructure:"endpoint"`
// Only has effect if Endpoint is not ""
Insecure bool `mapstructure:"insecure"`
// Timeout for all API calls. If not set, defaults to 12 seconds.
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.

// The fully qualified resource name of the Pubsub subscription
Subscription string `mapstructure:"subscription"`
// Lock down the encoding of the payload, leave empty for attribute based detection
Encoding string `mapstructure:"encoding"`

// The client id that will be used by Pubsub to make load balancing decisions
ClientID string `mapstructure:"client_id"`
}

func (config *Config) validateForLog() error {
err := config.validate()
if err != nil {
return err
}
switch config.Encoding {
case "":
case "otlp_proto_log":
case "raw_text":
case "raw_json":
default:
return fmt.Errorf("if specified, log encoding should be either otlp_proto_log, raw_text or raw_json")
}
return nil
}

func (config *Config) validateForTrace() error {
err := config.validate()
if err != nil {
return err
}
switch config.Encoding {
case "":
case "otlp_proto_trace":
default:
return fmt.Errorf("if specified, trace encoding can be be only otlp_proto_trace")
}
return nil
}

func (config *Config) validateForMetric() error {
err := config.validate()
if err != nil {
return err
}
switch config.Encoding {
case "":
case "otlp_proto_metric":
default:
return fmt.Errorf("if specified, trace encoding can be be only otlp_proto_metric")
}
return nil
}

func (config *Config) validate() error {
if !subscriptionMatcher.MatchString(config.Subscription) {
return fmt.Errorf("subscription '%s' is not a valid format, use 'projects/<project_id>/subscriptions/<name>'", config.Subscription)
}
return nil
}
131 changes: 131 additions & 0 deletions receiver/googlecloudpubsubreceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package googlecloudpubsubreceiver

import (
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

func TestLoadConfig(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.Nil(t, err)

factory := NewFactory()
factories.Receivers[config.Type(typeStr)] = factory
cfg, err := configtest.LoadConfig(
path.Join(".", "testdata", "config.yaml"), factories,
)

require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, len(cfg.Receivers), 2)

defaultConfig := factory.CreateDefaultConfig().(*Config)
assert.Equal(t, cfg.Receivers[config.NewID(typeStr)], defaultConfig)

customConfig := factory.CreateDefaultConfig().(*Config)
customConfig.SetIDName("customname")

customConfig.ProjectID = "my-project"
customConfig.UserAgent = "opentelemetry-collector-contrib {{version}}"
customConfig.Endpoint = "test-endpoint"
customConfig.Insecure = true
customConfig.TimeoutSettings = exporterhelper.TimeoutSettings{
Timeout: 20 * time.Second,
}
customConfig.Subscription = "projects/my-project/subscriptions/otlp-subscription"
assert.Equal(t, cfg.Receivers[config.NewIDWithName(typeStr, "customname")], customConfig)
}

func TestConfigValidation(t *testing.T) {
factory := NewFactory()
config := factory.CreateDefaultConfig().(*Config)
assert.Error(t, config.validateForTrace())
assert.Error(t, config.validateForLog())
assert.Error(t, config.validateForMetric())
config.Subscription = "projects/000project/subscriptions/my-subscription"
assert.Error(t, config.validate())
config.Subscription = "projects/my-project/topics/my-topic"
assert.Error(t, config.validate())
config.Subscription = "projects/my-project/subscriptions/my-subscription"
assert.NoError(t, config.validate())
}

func TestTraceConfigValidation(t *testing.T) {
factory := NewFactory()
config := factory.CreateDefaultConfig().(*Config)
config.Subscription = "projects/my-project/subscriptions/my-subscription"
assert.NoError(t, config.validateForTrace())

config.Encoding = "otlp_proto_metric"
assert.Error(t, config.validateForTrace())
config.Encoding = "otlp_proto_log"
assert.Error(t, config.validateForTrace())
config.Encoding = "raw_text"
assert.Error(t, config.validateForTrace())
config.Encoding = "raw_json"
assert.Error(t, config.validateForTrace())

config.Encoding = "otlp_proto_trace"
assert.NoError(t, config.validateForTrace())
}

func TestMetricConfigValidation(t *testing.T) {
factory := NewFactory()
config := factory.CreateDefaultConfig().(*Config)
config.Subscription = "projects/my-project/subscriptions/my-subscription"
assert.NoError(t, config.validateForMetric())

config.Encoding = "otlp_proto_trace"
assert.Error(t, config.validateForMetric())
config.Encoding = "otlp_proto_log"
assert.Error(t, config.validateForMetric())
config.Encoding = "raw_text"
assert.Error(t, config.validateForMetric())
config.Encoding = "raw_json"
assert.Error(t, config.validateForMetric())

config.Encoding = "otlp_proto_metric"
assert.NoError(t, config.validateForMetric())
}

func TestLogConfigValidation(t *testing.T) {
factory := NewFactory()
config := factory.CreateDefaultConfig().(*Config)
config.Subscription = "projects/my-project/subscriptions/my-subscription"
assert.NoError(t, config.validateForLog())

config.Encoding = "otlp_proto_trace"
assert.Error(t, config.validateForLog())
config.Encoding = "otlp_proto_metric"
assert.Error(t, config.validateForLog())

config.Encoding = "raw_text"
assert.NoError(t, config.validateForLog())
config.Encoding = "raw_json"
assert.NoError(t, config.validateForLog())
config.Encoding = "otlp_proto_log"
assert.NoError(t, config.validateForLog())
}
Loading