Skip to content

Commit

Permalink
[receiver/loki] Added new component (#19399)
Browse files Browse the repository at this point in the history
Co-authored-by: Alex Boten <[email protected]>
  • Loading branch information
mar4uk and codeboten committed Mar 19, 2023
1 parent 0aea76a commit 25cbf9d
Show file tree
Hide file tree
Showing 17 changed files with 1,041 additions and 5 deletions.
16 changes: 16 additions & 0 deletions .chloggen/add-loki-receiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: lokireceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: The Loki receiver implements the [Loki push api](https://grafana.com/docs/loki/latest/clients/promtail/#loki-push-api) as specified [here](https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki)

# One or more tracking issues related to the change
issues: [18635]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ receiver/k8sobjectsreceiver/ @open-telemetry/collect
receiver/kafkametricsreceiver/ @open-telemetry/collector-contrib-approvers @dmitryax
receiver/kafkareceiver/ @open-telemetry/collector-contrib-approvers @pavolloffay @MovieStoreGuy
receiver/kubeletstatsreceiver/ @open-telemetry/collector-contrib-approvers @dmitryax
receiver/lokireceiver/ @open-telemetry/collector-contrib-approvers @mar4uk @kovrus @jpkrohling
receiver/memcachedreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski
receiver/mongodbreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski @schmikei
receiver/mongodbatlasreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski @schmikei
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ body:
- receiver/kafka
- receiver/kafkametrics
- receiver/kubeletstats
- receiver/loki
- receiver/memcached
- receiver/mongodb
- receiver/mongodbatlas
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ body:
- receiver/kafka
- receiver/kafkametrics
- receiver/kubeletstats
- receiver/loki
- receiver/memcached
- receiver/mongodb
- receiver/mongodbatlas
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ body:
- receiver/kafka
- receiver/kafkametrics
- receiver/kubeletstats
- receiver/loki
- receiver/memcached
- receiver/mongodb
- receiver/mongodbatlas
Expand Down
10 changes: 5 additions & 5 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,11 @@ updates:
schedule:
interval: "weekly"
day: "wednesday"
- package-ecosystem: "gomod"
directory: "/receiver/lokireceiver"
schedule:
interval: "weekly"
day: "wednesday"
- package-ecosystem: "gomod"
directory: "/receiver/memcachedreceiver"
schedule:
Expand Down Expand Up @@ -1097,8 +1102,3 @@ updates:
schedule:
interval: "weekly"
day: "wednesday"
- package-ecosystem: "gomod"
directory: "/testbed"
schedule:
interval: "weekly"
day: "wednesday"
1 change: 1 addition & 0 deletions receiver/lokireceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
32 changes: 32 additions & 0 deletions receiver/lokireceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Loki Receiver

| Status | |
| ------------------------ |---------------|
| Stability | [development] |
| Supported pipeline types | logs |
| Distributions | [contrib] |

The Loki receiver implements the [Loki push api](https://grafana.com/docs/loki/latest/clients/promtail/#loki-push-api) as specified [here](https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki).
It allows Promtail instances to specify the open telemetry collector as their lokiAddress.

This receiver runs HTTP and GRPC servers to ingest log entries in Loki format.

## Getting Started

The settings are:

- `endpoint` (required, default = 0.0.0.0:3500 for grpc protocol, 0.0.0.0:3600 http protocol): host:port to which the receiver is going to receive data.
- `use_incoming_timestamp` (optional, default = false) if set `true` the timestamp from Loki log entry is used

Example:
```yaml
receivers:
loki:
protocols:
http:
grpc:
use_incoming_timestamp: true
```

[development]:https://github.com/open-telemetry/opentelemetry-collector#development
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
72 changes: 72 additions & 0 deletions receiver/lokireceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lokireceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/lokireceiver"

import (
"errors"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/confmap"
)

const (
// Protocol values.
protoGRPC = "protocols::grpc"
protoHTTP = "protocols::http"
)

// Protocols is the configuration for the supported protocols.
type Protocols struct {
GRPC *configgrpc.GRPCServerSettings `mapstructure:"grpc"`
HTTP *confighttp.HTTPServerSettings `mapstructure:"http"`
}

// Config defines configuration for the lokireceiver receiver.
type Config struct {
// Protocols is the configuration for the supported protocols, currently gRPC and HTTP (Proto and JSON).
Protocols `mapstructure:"protocols"`
KeepTimestamp bool `mapstructure:"use_incoming_timestamp"`
}

var _ component.Config = (*Config)(nil)
var _ confmap.Unmarshaler = (*Config)(nil)

// Validate checks the receiver configuration is valid
func (cfg *Config) Validate() error {
if cfg.GRPC == nil && cfg.HTTP == nil {
return errors.New("must specify at least one protocol when using the Loki receiver")
}
return nil
}

// Unmarshal a confmap.Conf into the config struct.
func (cfg *Config) Unmarshal(conf *confmap.Conf) error {
err := conf.Unmarshal(cfg, confmap.WithErrorUnused())
if err != nil {
return err
}

if !conf.IsSet(protoGRPC) {
cfg.GRPC = nil
}

if !conf.IsSet(protoHTTP) {
cfg.HTTP = nil
}

return nil
}
143 changes: 143 additions & 0 deletions receiver/lokireceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lokireceiver

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/confmap/confmaptest"
)

func TestLoadConfig(t *testing.T) {
t.Parallel()

cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

tests := []struct {
id component.ID
expected component.Config
}{
{
id: component.NewIDWithName(typeStr, "defaults"),
expected: &Config{
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "0.0.0.0:3600",
Transport: "tcp",
},
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:3500",
},
},
},
},
{
id: component.NewIDWithName(typeStr, "mixed"),
expected: &Config{
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "localhost:4600",
Transport: "tcp",
},
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "localhost:4500",
},
},
KeepTimestamp: true,
},
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, component.UnmarshalConfig(sub, cfg))

assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, tt.expected, cfg)
})
}
}

func TestInvalidConfig(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

tests := []struct {
id component.ID
err string
}{
{
id: component.NewIDWithName(typeStr, "empty"),
err: "must specify at least one protocol when using the Loki receiver",
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, component.UnmarshalConfig(sub, cfg))

err = component.ValidateConfig(cfg)
assert.Contains(t, err.Error(), tt.err)
})
}
}

func TestConfigWithUnknownKeysConfig(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

tests := []struct {
id component.ID
err string
}{
{
id: component.NewIDWithName(typeStr, "extra_keys"),
err: "'' has invalid keys: foo",
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
assert.Contains(t, component.UnmarshalConfig(sub, cfg).Error(), tt.err)
})
}
}
71 changes: 71 additions & 0 deletions receiver/lokireceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lokireceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/lokireceiver"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)

const (
// The value of "type" key in configuration.
typeStr = "loki"
// The stability level of the receiver.
stability = component.StabilityLevelDevelopment

defaultGRPCBindEndpoint = "0.0.0.0:3600"
defaultHTTPBindEndpoint = "0.0.0.0:3500"
)

// NewFactory return a new receiver.Factory for loki receiver.
func NewFactory() receiver.Factory {
return receiver.NewFactory(
typeStr,
createDefaultConfig,
receiver.WithLogs(createLogsReceiver, stability))
}

func createDefaultConfig() component.Config {
return &Config{
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: defaultGRPCBindEndpoint,
Transport: "tcp",
},
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: defaultHTTPBindEndpoint,
},
},
}
}

func createLogsReceiver(
_ context.Context,
settings receiver.CreateSettings,
cfg component.Config,
consumer consumer.Logs,
) (receiver.Logs, error) {

rCfg := cfg.(*Config)
return newLokiReceiver(rCfg, consumer, settings)
}
Loading

0 comments on commit 25cbf9d

Please sign in to comment.