Skip to content

Commit

Permalink
Add new health check feature for collector pipeline (open-telemetry#5643
Browse files Browse the repository at this point in the history
)
  • Loading branch information
skyduo committed Nov 1, 2021
1 parent 8e5b3ac commit 12b27b7
Show file tree
Hide file tree
Showing 13 changed files with 393 additions and 15 deletions.
17 changes: 17 additions & 0 deletions extension/healthcheckextension/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,33 @@ Health Check extension enables an HTTP url that can be probed to check the
status of the OpenTelemetry Collector. This extension can be used as a
liveness and/or readiness probe on Kubernetes.

There is an optional configuration `check_collector_pipeline` which allows
users to enable a health check for the collector pipeline. This feature
monitors the number of times that components failed to send data to their destinations.
It currently only supports monitoring exporter failures; support for receivers and
processors will be added in the future.

The following settings are available:

- `endpoint` (default = 0.0.0.0:13133): Address to publish the health check status to
- `port` (default = 13133): [deprecated] What port to expose HTTP health information.
- `check_collector_pipeline:` (optional): Settings of collector pipeline health check
- `enabled` (default = false): Whether to enable the collector pipeline check
- `interval` (default = "5m"): Time interval to check the number of failures
- `exporter_failure_threshold` (default = 5): The number of exporter failures within
`interval` above which the collector is marked as unhealthy.

Example:

```yaml
extensions:
  health_check:
  health_check/1:
    endpoint: "localhost:13"
    check_collector_pipeline:
      enabled: true
      interval: "5m"
      exporter_failure_threshold: 5
```
The full list of settings exposed for this extension is documented [here](./config.go)
Expand Down
29 changes: 29 additions & 0 deletions extension/healthcheckextension/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package healthcheckextension

import (
"errors"
"time"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/confignet"
)
Expand All @@ -33,11 +36,37 @@ type Config struct {
// check status.
// The default endpoint is "0.0.0.0:13133".
TCPAddr confignet.TCPAddr `mapstructure:",squash"`

// CheckCollectorPipeline contains the list of settings of collector pipeline health check
CheckCollectorPipeline checkCollectorPipelineSettings `mapstructure:"check_collector_pipeline"`
}

// Compile-time assertion that *Config satisfies config.Extension.
var _ config.Extension = (*Config)(nil)
// Sentinel errors returned by Config.Validate.
var (
// errNoEndpointProvided is returned when the configured endpoint is empty.
errNoEndpointProvided = errors.New("bad config: endpoint must be specified")
// errInvalidExporterFailureThresholdProvided is returned when
// exporter_failure_threshold is zero or negative.
errInvalidExporterFailureThresholdProvided = errors.New("bad config: exporter_failure_threshold expects a positive number")
)

// Validate reports whether the extension configuration is usable.
//
// The checks run in this order: the pipeline check interval must parse as a
// duration (the parse error is returned as-is), the endpoint must be
// non-empty, and the exporter failure threshold must be positive.
func (cfg *Config) Validate() error {
	pipeline := cfg.CheckCollectorPipeline
	if _, parseErr := time.ParseDuration(pipeline.Interval); parseErr != nil {
		return parseErr
	}

	switch {
	case cfg.TCPAddr.Endpoint == "":
		return errNoEndpointProvided
	case pipeline.ExporterFailureThreshold <= 0:
		return errInvalidExporterFailureThresholdProvided
	default:
		return nil
	}
}

// checkCollectorPipelineSettings holds the settings of the collector pipeline
// health check (the `check_collector_pipeline` configuration section).
type checkCollectorPipelineSettings struct {
// Enabled indicates whether the collector pipeline check is enabled.
Enabled bool `mapstructure:"enabled"`
// Interval is the time range over which the health of the collector
// pipeline is checked. It is a duration string that must be accepted by
// time.ParseDuration (for example "5m").
Interval string `mapstructure:"interval"`
// ExporterFailureThreshold is the threshold of exporter failure numbers
// during the Interval. It must be a positive number.
ExporterFailureThreshold int `mapstructure:"exporter_failure_threshold"`
}
28 changes: 28 additions & 0 deletions extension/healthcheckextension/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,37 @@ func TestLoadConfig(t *testing.T) {
TCPAddr: confignet.TCPAddr{
Endpoint: "localhost:13",
},
CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(),
},
ext1)

assert.Equal(t, 1, len(cfg.Service.Extensions))
assert.Equal(t, config.NewComponentIDWithName(typeStr, "1"), cfg.Service.Extensions[0])
}

// TestLoadConfigError verifies that Validate rejects each invalid extension
// entry in testdata/config_bad.yaml with the expected sentinel error.
func TestLoadConfigError(t *testing.T) {
	factories, err := componenttest.NopFactories()
	require.NoError(t, err)
	factories.Extensions[typeStr] = NewFactory()

	// Every case reads the same file, so load it once and fail fast on a
	// load error instead of dereferencing a nil config later.
	cfg, err := configtest.LoadConfig(path.Join(".", "testdata", "config_bad.yaml"), factories)
	require.NoError(t, err)

	tests := []struct {
		configName  string
		expectedErr error
	}{
		{
			configName:  "missingendpoint",
			expectedErr: errNoEndpointProvided,
		},
		{
			configName:  "invalidthreshold",
			expectedErr: errInvalidExporterFailureThresholdProvided,
		},
	}
	for _, tt := range tests {
		tt := tt
		t.Run(tt.configName, func(t *testing.T) {
			// A missing entry would yield a nil interface and panic on
			// Validate, so check for its presence explicitly.
			extension, ok := cfg.Extensions[config.NewComponentIDWithName(typeStr, tt.configName)]
			require.True(t, ok, "extension %q not found in config_bad.yaml", tt.configName)
			require.ErrorIs(t, extension.Validate(), tt.expectedErr)
		})
	}
}
69 changes: 69 additions & 0 deletions extension/healthcheckextension/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package healthcheckextension

import (
"sync"
"time"

"go.opencensus.io/stats/view"
)

const (
// exporterFailureView is the name of the view whose data ExportView
// records; data for views with any other name is ignored.
exporterFailureView = "exporter/send_failed_requests"
)

// healthCheckExporter collects view data for the exporter failure view.
// Its ExportView method is meant to satisfy the OpenCensus view exporter
// interface so that it can receive exported metrics.
// NOTE(review): where it is registered with OpenCensus is not visible here.
type healthCheckExporter struct {
// mu guards exporterFailureQueue.
mu sync.Mutex
// exporterFailureQueue holds the recorded failure view data, in the order
// it was exported.
exporterFailureQueue []*view.Data
}

// newHealthCheckExporter returns a healthCheckExporter with an empty failure
// queue.
func newHealthCheckExporter() *healthCheckExporter {
	return new(healthCheckExporter)
}

// ExportView appends vd to the failure queue when it belongs to the exporter
// failure view; data for any other view is dropped.
func (e *healthCheckExporter) ExportView(vd *view.Data) {
	e.mu.Lock()
	defer e.mu.Unlock()

	if vd.View.Name != exporterFailureView {
		return
	}
	e.exporterFailureQueue = append(e.exporterFailureQueue, vd)
}

// checkHealthStatus reports whether the number of queued exporter failures
// is at most exporterFailureThreshold.
func (e *healthCheckExporter) checkHealthStatus(exporterFailureThreshold int) bool {
	e.mu.Lock()
	failures := len(e.exporterFailureQueue)
	e.mu.Unlock()

	return failures <= exporterFailureThreshold
}

// rotate removes expired entries from the failure queue. An entry is kept
// only while its Start time plus interval is still after the current time;
// kept entries retain their original relative order.
func (e *healthCheckExporter) rotate(interval time.Duration) {
	e.mu.Lock()
	defer e.mu.Unlock()

	now := time.Now()

	// Filter in place: surviving entries are compacted to the front of the
	// existing backing array instead of being popped and re-appended, which
	// avoids repeated reallocation.
	kept := e.exporterFailureQueue[:0]
	for _, vd := range e.exporterFailureQueue {
		if vd.Start.Add(interval).After(now) {
			kept = append(kept, vd)
		}
	}

	// Clear the now-unused tail so expired entries are not kept alive by the
	// backing array.
	for i := len(kept); i < len(e.exporterFailureQueue); i++ {
		e.exporterFailureQueue[i] = nil
	}
	e.exporterFailureQueue = kept
}
61 changes: 61 additions & 0 deletions extension/healthcheckextension/exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package healthcheckextension

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opencensus.io/stats/view"
)

// TestHealthCheckExporter_ExportView verifies that ExportView queues data for
// the exporter failure view and ignores data for any other view.
func TestHealthCheckExporter_ExportView(t *testing.T) {
	exporter := &healthCheckExporter{}

	failureView := view.View{Name: exporterFailureView}
	exporter.ExportView(&view.Data{
		View:  &failureView,
		Start: time.Time{},
		End:   time.Time{},
		Rows:  nil,
	})
	assert.Equal(t, 1, len(exporter.exporterFailureQueue))

	// Data for an unrelated view must not be queued.
	otherView := view.View{Name: "some/other_view"}
	exporter.ExportView(&view.Data{
		View:  &otherView,
		Start: time.Time{},
		End:   time.Time{},
		Rows:  nil,
	})
	assert.Equal(t, 1, len(exporter.exporterFailureQueue))
}

// TestHealthCheckExporter_rotate verifies that rotate drops entries older
// than the interval and keeps the ones that are still inside it.
func TestHealthCheckExporter_rotate(t *testing.T) {
	exporter := &healthCheckExporter{}
	now := time.Now()
	failureView := view.View{Name: exporterFailureView}

	// Started 10 minutes ago: outside the 5 minute interval used below.
	expired := &view.Data{
		View:  &failureView,
		Start: now.Add(-10 * time.Minute),
		End:   now,
		Rows:  nil,
	}
	// Started 3 minutes ago: still inside the interval.
	recent := &view.Data{
		View:  &failureView,
		Start: now.Add(-3 * time.Minute),
		End:   now,
		Rows:  nil,
	}

	exporter.ExportView(expired)
	exporter.ExportView(recent)
	assert.Equal(t, 2, len(exporter.exporterFailureQueue))

	exporter.rotate(5 * time.Minute)
	// Check not only the count but also that the surviving entry is the
	// recent one.
	if assert.Equal(t, 1, len(exporter.exporterFailureQueue)) {
		assert.Same(t, recent, exporter.exporterFailureQueue[0])
	}
}
10 changes: 10 additions & 0 deletions extension/healthcheckextension/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func createDefaultConfig() config.Extension {
TCPAddr: confignet.TCPAddr{
Endpoint: defaultEndpoint,
},
CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(),
}
}

Expand All @@ -54,3 +55,12 @@ func createExtension(_ context.Context, set component.ExtensionCreateSettings, c

return newServer(*config, set.Logger), nil
}

// defaultCheckCollectorPipelineSettings builds the default CheckCollectorPipeline
// settings: the check is disabled, the interval is "5m" and the exporter
// failure threshold is 5.
func defaultCheckCollectorPipelineSettings() checkCollectorPipelineSettings {
	var settings checkCollectorPipelineSettings
	settings.Enabled = false
	settings.Interval = "5m"
	settings.ExporterFailureThreshold = 5
	return settings
}
1 change: 1 addition & 0 deletions extension/healthcheckextension/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestFactory_CreateDefaultConfig(t *testing.T) {
TCPAddr: confignet.TCPAddr{
Endpoint: defaultEndpoint,
},
CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(),
}, cfg)

assert.NoError(t, configtest.CheckConfigStruct(cfg))
Expand Down
1 change: 1 addition & 0 deletions extension/healthcheckextension/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/jaegertracing/jaeger v1.27.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.38.0
github.com/stretchr/testify v1.7.0
go.opencensus.io v0.23.0
go.opentelemetry.io/collector v0.38.0
go.uber.org/zap v1.19.1

Expand Down
2 changes: 2 additions & 0 deletions extension/healthcheckextension/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 12b27b7

Please sign in to comment.