Add Google Pubsub receiver for OTLP messages
alexvanboxel committed May 28, 2021
1 parent c31f30c commit ba0075f
Showing 11 changed files with 2,169 additions and 0 deletions.
2 changes: 2 additions & 0 deletions go.mod
@@ -215,6 +215,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/memca

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver => ./receiver/kafkametricsreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver => ./receiver/googlecloudpubsubreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor => ./processor/groupbyattrsprocessor

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor => ./processor/groupbytraceprocessor
1 change: 1 addition & 0 deletions receiver/googlecloudpubsubreceiver/Makefile
@@ -0,0 +1 @@
include ../../Makefile.Common
61 changes: 61 additions & 0 deletions receiver/googlecloudpubsubreceiver/README.md
@@ -0,0 +1,61 @@
# Google Pubsub Receiver

This receiver gets OTLP messages from a Google Cloud [Pubsub](https://cloud.google.com/pubsub) subscription.

The following configuration options are supported:

* `project` (Required): The Google Cloud project that the client connects to.
* `subscription` (Required): The subscription name to receive OTLP data from. The subscription name must be a
  fully qualified resource name (e.g. `projects/otel-project/subscriptions/otlp`).
* `encoding` (Optional): The encoding used to decode data received from the subscription. This can be
  `otlp_proto_trace`, `otlp_proto_metric`, `otlp_proto_log`, `raw_text`, or `raw_json` (see the Encoding section).

```yaml
receivers:
  googlecloudpubsub:
    project: otel-project
    subscription: projects/otel-project/subscriptions/otlp-logs
    encoding: raw_json
```
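
The expected shape of the `subscription` value can be checked up front. The following is a minimal sketch modelled on the pattern this receiver uses in `config.go`. The helper name `isValidSubscription` is hypothetical, and anchoring the pattern at both ends is an assumption of the sketch that makes it stricter than the receiver's own check.

```go
package main

import (
	"fmt"
	"regexp"
)

// subscriptionPattern is modelled on the pattern in config.go. Anchoring it
// with ^ and $ and requiring a non-empty name are assumptions of this sketch.
var subscriptionPattern = regexp.MustCompile(`^projects/[a-z][a-z0-9\-]*/subscriptions/.+$`)

// isValidSubscription (hypothetical helper) reports whether name has the form
// projects/<project_id>/subscriptions/<name>.
func isValidSubscription(name string) bool {
	return subscriptionPattern.MatchString(name)
}

func main() {
	names := []string{
		"projects/otel-project/subscriptions/otlp-logs", // fully qualified
		"projects/000project/subscriptions/otlp-logs",   // project ID starts with a digit
		"projects/otel-project/topics/otlp-logs",        // a topic, not a subscription
	}
	for _, name := range names {
		fmt.Printf("%s valid=%t\n", name, isValidSubscription(name))
	}
}
```

Only the first name in the list is accepted; the other two mirror the rejected cases in the receiver's own configuration tests.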

## Encoding

You should not need to set the encoding of the subscription, as the receiver tries to discover the type of the data
by looking at the `ce-type` and `ce-datacontenttype` attributes of the message. Set the `encoding` field in the
configuration only when those attributes are not set.

| ce-type | ce-datacontenttype | encoding | description |
| --- | --- | --- | --- |
| org.opentelemetry.otlp.traces.v1 | application/x-protobuf | | Decode OTLP trace message |
| org.opentelemetry.otlp.metrics.v1 | application/x-protobuf | | Decode OTLP metric message |
| org.opentelemetry.otlp.logs.v1 | application/json | | Decode OTLP log message |
| - | - | otlp_proto_trace | Decode OTLP trace message |
| - | - | otlp_proto_metric | Decode OTLP metric message |
| - | - | otlp_proto_log | Decode OTLP log message |
| - | - | raw_text | Wrap in an OTLP log message |

When the `encoding` configuration is set, the attributes on the message are ignored.

The receiver can also ingest arbitrary text messages from a Pubsub subscription and wrap each of them in an OTLP log
message, making it a convenient way to ingest log lines from Pubsub.
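
The precedence described above can be sketched as follows. This is an illustration under stated assumptions, not the receiver's implementation: the helper name `resolveEncoding` is hypothetical, the attribute values are copied from the table as written, and returning an error for unrecognized attributes is an assumption.

```go
package main

import "fmt"

// resolveEncoding (hypothetical helper) picks the decoding for one message.
// A configured encoding always wins; otherwise the ce-type and
// ce-datacontenttype attributes are consulted, following the table above.
func resolveEncoding(configured string, attrs map[string]string) (string, error) {
	if configured != "" {
		// Attributes are ignored once the encoding is set in the configuration.
		return configured, nil
	}
	ceType := attrs["ce-type"]
	contentType := attrs["ce-datacontenttype"]
	switch {
	case ceType == "org.opentelemetry.otlp.traces.v1" && contentType == "application/x-protobuf":
		return "otlp_proto_trace", nil
	case ceType == "org.opentelemetry.otlp.metrics.v1" && contentType == "application/x-protobuf":
		return "otlp_proto_metric", nil
	case ceType == "org.opentelemetry.otlp.logs.v1" && contentType == "application/json":
		// Content type taken from the table as written.
		return "otlp_proto_log", nil
	}
	// Assumption of this sketch: unknown attributes are reported as an error.
	return "", fmt.Errorf("cannot detect encoding from ce-type %q and ce-datacontenttype %q", ceType, contentType)
}

func main() {
	attrs := map[string]string{
		"ce-type":            "org.opentelemetry.otlp.traces.v1",
		"ce-datacontenttype": "application/x-protobuf",
	}
	detected, _ := resolveEncoding("", attrs)
	forced, _ := resolveEncoding("raw_text", attrs)
	fmt.Println(detected, forced)
}
```

With the same attributes, the first call detects a trace payload while the second returns `raw_text`, because the configured value takes priority.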

## Pubsub subscription

The Google Cloud [Pubsub](https://cloud.google.com/pubsub) receiver does not automatically create subscriptions;
it expects the subscription to be created up front. Security-wise, it is best to give the collector its own
service account and grant that account the `Pub/Sub Subscriber` permission on the subscription.

The subscription should also be of delivery type `Pull`.

### Filtering

When the messages on the subscription carry the correct attributes and you only need a specific
type in your pipeline, the messages can be [filtered](https://cloud.google.com/pubsub/docs/filtering) on the
subscription, which saves on egress fees.

An example that filters on trace messages only:
```
attributes.ce-type = "org.opentelemetry.otlp.traces.v1"
AND
attributes.ce-datacontenttype = "application/x-protobuf"
```
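
When several subscriptions are provisioned, one per signal, the filter expression can be generated rather than typed by hand. The sketch below only builds the expression string in the form shown above; the helper name `ceTypeFilter` is hypothetical, and using Go's `%q` verb for quoting is an assumption that holds for simple attribute values like these.

```go
package main

import "fmt"

// ceTypeFilter (hypothetical helper) builds a subscription filter expression
// that matches a single ce-type and ce-datacontenttype combination.
func ceTypeFilter(ceType, contentType string) string {
	// %q wraps each value in double quotes; adequate for simple values.
	return fmt.Sprintf(
		"attributes.ce-type = %q AND attributes.ce-datacontenttype = %q",
		ceType, contentType,
	)
}

func main() {
	fmt.Println(ceTypeFilter("org.opentelemetry.otlp.metrics.v1", "application/x-protobuf"))
}
```

The resulting string is what would be supplied as the filter when the subscription is created.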
99 changes: 99 additions & 0 deletions receiver/googlecloudpubsubreceiver/config.go
@@ -0,0 +1,99 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package googlecloudpubsubreceiver

import (
	"fmt"
	"regexp"

	"go.opentelemetry.io/collector/config"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

var subscriptionMatcher = regexp.MustCompile(`projects/[a-z][a-z0-9\-]*/subscriptions/.*`)

// Config defines the configuration for the Google Cloud Pubsub receiver.
type Config struct {
	config.ReceiverSettings `mapstructure:",squash"`

	// Google Cloud Project ID where the Pubsub client will connect to
	ProjectID string `mapstructure:"project"`
	// User agent that will be used by the Pubsub client to connect to the service
	UserAgent string `mapstructure:"user_agent"`
	// Override of the Pubsub endpoint
	Endpoint string `mapstructure:"endpoint"`
	// Only has effect if Endpoint is not ""
	UseInsecure bool `mapstructure:"use_insecure"`
	// Timeout for all API calls. If not set, defaults to 12 seconds.
	exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.

	// The fully qualified resource name of the Pubsub subscription
	Subscription string `mapstructure:"subscription"`
	// Lock down the encoding of the payload, leave empty for attribute based detection
	Encoding string `mapstructure:"encoding"`

	// The client id that will be used by Pubsub to make load balancing decisions
	ClientID string `mapstructure:"client_id"`
}

// validateForLog checks the common settings and that the encoding, if set,
// is one that can be used in a logs pipeline.
func (cfg *Config) validateForLog() error {
	err := cfg.validate()
	if err != nil {
		return err
	}
	switch cfg.Encoding {
	case "":
	case "otlp_proto_log":
	case "raw_text":
	case "raw_json":
	default:
		return fmt.Errorf("if specified, log encoding should be either otlp_proto_log, raw_text or raw_json")
	}
	return nil
}

// validateForTrace checks the common settings and that the encoding, if set,
// is one that can be used in a traces pipeline.
func (cfg *Config) validateForTrace() error {
	err := cfg.validate()
	if err != nil {
		return err
	}
	switch cfg.Encoding {
	case "":
	case "otlp_proto_trace":
	default:
		return fmt.Errorf("if specified, trace encoding can only be otlp_proto_trace")
	}
	return nil
}

// validateForMetric checks the common settings and that the encoding, if set,
// is one that can be used in a metrics pipeline.
func (cfg *Config) validateForMetric() error {
	err := cfg.validate()
	if err != nil {
		return err
	}
	switch cfg.Encoding {
	case "":
	case "otlp_proto_metric":
	default:
		return fmt.Errorf("if specified, metric encoding can only be otlp_proto_metric")
	}
	return nil
}

// validate checks the settings shared by all signal types.
func (cfg *Config) validate() error {
	if !subscriptionMatcher.MatchString(cfg.Subscription) {
		return fmt.Errorf("subscription '%s' is not in a valid format, use 'projects/<project_id>/subscriptions/<name>'", cfg.Subscription)
	}
	return nil
}
131 changes: 131 additions & 0 deletions receiver/googlecloudpubsubreceiver/config_test.go
@@ -0,0 +1,131 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package googlecloudpubsubreceiver

import (
	"path"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/config"
	"go.opentelemetry.io/collector/config/configtest"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

func TestLoadConfig(t *testing.T) {
	factories, err := componenttest.NopFactories()
	require.NoError(t, err)

	factory := NewFactory()
	factories.Receivers[config.Type(typeStr)] = factory
	cfg, err := configtest.LoadConfigFile(
		t, path.Join(".", "testdata", "config.yaml"), factories,
	)

	require.NoError(t, err)
	require.NotNil(t, cfg)

	assert.Len(t, cfg.Receivers, 2)

	// testify expects the expected value first, then the actual value.
	defaultConfig := factory.CreateDefaultConfig().(*Config)
	assert.Equal(t, defaultConfig, cfg.Receivers[config.NewID(typeStr)])

	customConfig := factory.CreateDefaultConfig().(*Config)
	customConfig.SetIDName("customname")

	customConfig.ProjectID = "my-project"
	customConfig.UserAgent = "opentelemetry-collector-contrib {{version}}"
	customConfig.Endpoint = "test-endpoint"
	customConfig.UseInsecure = true
	customConfig.TimeoutSettings = exporterhelper.TimeoutSettings{
		Timeout: 20 * time.Second,
	}
	customConfig.Subscription = "projects/my-project/subscriptions/otlp-subscription"
	assert.Equal(t, customConfig, cfg.Receivers[config.NewIDWithName(typeStr, "customname")])
}

func TestConfigValidation(t *testing.T) {
	factory := NewFactory()
	config := factory.CreateDefaultConfig().(*Config)
	assert.Error(t, config.validateForTrace())
	assert.Error(t, config.validateForLog())
	assert.Error(t, config.validateForMetric())
	config.Subscription = "projects/000project/subscriptions/my-subscription"
	assert.Error(t, config.validate())
	config.Subscription = "projects/my-project/topics/my-topic"
	assert.Error(t, config.validate())
	config.Subscription = "projects/my-project/subscriptions/my-subscription"
	assert.NoError(t, config.validate())
}

func TestTraceConfigValidation(t *testing.T) {
	factory := NewFactory()
	config := factory.CreateDefaultConfig().(*Config)
	config.Subscription = "projects/my-project/subscriptions/my-subscription"
	assert.NoError(t, config.validateForTrace())

	config.Encoding = "otlp_proto_metric"
	assert.Error(t, config.validateForTrace())
	config.Encoding = "otlp_proto_log"
	assert.Error(t, config.validateForTrace())
	config.Encoding = "raw_text"
	assert.Error(t, config.validateForTrace())
	config.Encoding = "raw_json"
	assert.Error(t, config.validateForTrace())

	config.Encoding = "otlp_proto_trace"
	assert.NoError(t, config.validateForTrace())
}

func TestMetricConfigValidation(t *testing.T) {
	factory := NewFactory()
	config := factory.CreateDefaultConfig().(*Config)
	config.Subscription = "projects/my-project/subscriptions/my-subscription"
	assert.NoError(t, config.validateForMetric())

	config.Encoding = "otlp_proto_trace"
	assert.Error(t, config.validateForMetric())
	config.Encoding = "otlp_proto_log"
	assert.Error(t, config.validateForMetric())
	config.Encoding = "raw_text"
	assert.Error(t, config.validateForMetric())
	config.Encoding = "raw_json"
	assert.Error(t, config.validateForMetric())

	config.Encoding = "otlp_proto_metric"
	assert.NoError(t, config.validateForMetric())
}

func TestLogConfigValidation(t *testing.T) {
	factory := NewFactory()
	config := factory.CreateDefaultConfig().(*Config)
	config.Subscription = "projects/my-project/subscriptions/my-subscription"
	assert.NoError(t, config.validateForLog())

	config.Encoding = "otlp_proto_trace"
	assert.Error(t, config.validateForLog())
	config.Encoding = "otlp_proto_metric"
	assert.Error(t, config.validateForLog())

	config.Encoding = "raw_text"
	assert.NoError(t, config.validateForLog())
	config.Encoding = "raw_json"
	assert.NoError(t, config.validateForLog())
	config.Encoding = "otlp_proto_log"
	assert.NoError(t, config.validateForLog())
}