Skip to content

Commit

Permalink
enhance memory ballast extension. set ballast size in percentage is s…
Browse files Browse the repository at this point in the history
…upported for containers and physical hosts (open-telemetry#3456)
  • Loading branch information
mxiamxia authored Jun 29, 2021
1 parent cbee3b3 commit cbd822e
Show file tree
Hide file tree
Showing 16 changed files with 238 additions and 55 deletions.
30 changes: 28 additions & 2 deletions extension/ballastextension/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,38 @@ Memory Ballast extension enables applications to configure memory ballast for th

The following settings can be configured:

- `size_mib` (default = 0, disabled): Is the memory ballast size, in MiB.
- `size_mib` (default = 0, disabled): Is the memory ballast size, in MiB.
Takes higher priority than `size_in_percentage` if both are specified at the same time.
- `size_in_percentage` (default = 0, disabled): Set the memory ballast based on the
total memory in percentage, value range is `1-100`.
It is supported in both containerized(eg, docker, k8s) and physical host environments.

**How ballast size is calculated with percentage configuration**
When `size_in_percentage` is enabled with the value(1-100), the absolute `ballast_size` will be calculated by
`size_in_percentage * totalMemory / 100`. The `totalMemory` can be retrieved for hosts and containers(in docker, k8s, etc) by the following steps,
1. Look up Memory Cgroup subsystem on the target host or container, find out if there is any total memory limitation has been set for the running collector process.
Check the value in `memory.limit_in_bytes` file under cgroup memory files (eg, `/sys/fs/cgroup/memory/memory.limit_in_bytes`).

2. If `memory.limit_in_bytes` is positive value other than `9223372036854771712`(`0x7FFFFFFFFFFFF000`). The `ballest_size`
will be calculated by `memory.limit_in_bytes * size_in_percentage / 100`.
If `memory.limit_in_bytes` value is `9223372036854771712`(`0x7FFFFFFFFFFFF000`), it indicates there is no memory limit has
been set for the collector process or the running container in cgroup. Then the `totalMemory` will be determined in next step.

3. if there is no memory limit set in cgroup for the collector process or container where the collector is running. The total memory will be
calculated by `github.com/shirou/gopsutil/mem`[[link]](https://github.com/shirou/gopsutil/) on `mem.VirtualMemory().total` which is supported in multiple OS systems.

Example:

Example:
Config that uses 64 Mib of memory for the ballast:
```yaml
extensions:
memory_ballast:
size_mib: 64
```
Config that uses 20% of the total memory for the ballast:
```yaml
extensions:
memory_ballast:
size_in_percentage: 20
```
19 changes: 17 additions & 2 deletions extension/ballastextension/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,29 @@
package ballastextension

import (
"fmt"

"go.opentelemetry.io/collector/config"
)

// Config has the configuration for the fluentbit extension.
// Config has the configuration for the ballast extension.
type Config struct {
config.ExtensionSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

// SizeMiB is the size, in MiB, of the memory ballast
// to be created for this process.
SizeMiB uint32 `mapstructure:"size_mib"`
SizeMiB uint64 `mapstructure:"size_mib"`

// SizeInPercentage is the maximum amount of memory ballast, in %, targeted to be
// allocated. The fixed memory settings SizeMiB has a higher precedence.
SizeInPercentage uint64 `mapstructure:"size_in_percentage"`
}

// Validate checks if the extension configuration is valid
func (cfg *Config) Validate() error {
// no need to validate less than 0 case for uint64
if cfg.SizeInPercentage > 100 {
return fmt.Errorf("size_in_percentage is not in range 0 to 100")
}
return nil
}
14 changes: 14 additions & 0 deletions extension/ballastextension/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,23 @@ func TestLoadConfig(t *testing.T) {
&Config{
ExtensionSettings: config.NewExtensionSettings(config.NewIDWithName(typeStr, "1")),
SizeMiB: 123,
SizeInPercentage: 20,
},
ext1)

assert.Equal(t, 1, len(cfg.Service.Extensions))
assert.Equal(t, config.NewIDWithName(typeStr, "1"), cfg.Service.Extensions[0])
}

func TestLoadInvalidConfig(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Extensions[typeStr] = factory
_, err = configtest.LoadConfigAndValidate(path.Join(".", "testdata", "config_invalid.yaml"), factories)

require.NotNil(t, err)
assert.Equal(t, err.Error(), "extension \"memory_ballast\" has invalid configuration: size_in_percentage is not in range 0 to 100")

}
6 changes: 5 additions & 1 deletion extension/ballastextension/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/extension/extensionhelper"
"go.opentelemetry.io/collector/internal/iruntime"
)

const (
// The value of extension "type" in configuration.
typeStr = "memory_ballast"
)

// memHandler returns the total memory of the target host/vm
var memHandler = iruntime.TotalMemory

// NewFactory creates a factory for FluentBit extension.
func NewFactory() component.ExtensionFactory {
return extensionhelper.NewFactory(
Expand All @@ -42,5 +46,5 @@ func createDefaultConfig() config.Extension {
}

func createExtension(_ context.Context, set component.ExtensionCreateSettings, cfg config.Extension) (component.Extension, error) {
return newMemoryBallast(cfg.(*Config), set.Logger), nil
return newMemoryBallast(cfg.(*Config), set.Logger, memHandler), nil
}
32 changes: 24 additions & 8 deletions extension/ballastextension/memory_ballast.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,32 @@ import (
const megaBytes = 1024 * 1024

type memoryBallast struct {
cfg *Config
logger *zap.Logger
ballast []byte
cfg *Config
logger *zap.Logger
ballast []byte
getTotalMem func() (uint64, error)
}

func (m *memoryBallast) Start(_ context.Context, _ component.Host) error {
var ballastSizeBytes uint64
// absolute value supersedes percentage setting
if m.cfg.SizeMiB > 0 {
ballastSizeBytes := uint64(m.cfg.SizeMiB) * megaBytes
ballastSizeBytes = m.cfg.SizeMiB * megaBytes
} else {
totalMemory, err := m.getTotalMem()
if err != nil {
return err
}
ballastPercentage := m.cfg.SizeInPercentage
ballastSizeBytes = ballastPercentage * totalMemory / 100
}

if ballastSizeBytes > 0 {
m.ballast = make([]byte, ballastSizeBytes)
m.logger.Info("Using memory ballast", zap.Uint32("MiBs", m.cfg.SizeMiB))
}

m.logger.Info("Setting memory ballast", zap.Uint32("MiBs", uint32(ballastSizeBytes/megaBytes)))

return nil
}

Expand All @@ -44,9 +59,10 @@ func (m *memoryBallast) Shutdown(_ context.Context) error {
return nil
}

func newMemoryBallast(cfg *Config, logger *zap.Logger) *memoryBallast {
func newMemoryBallast(cfg *Config, logger *zap.Logger, getTotalMem func() (uint64, error)) *memoryBallast {
return &memoryBallast{
cfg: cfg,
logger: logger,
cfg: cfg,
logger: logger,
getTotalMem: getTotalMem,
}
}
72 changes: 51 additions & 21 deletions extension/ballastextension/memory_ballast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,64 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/internal/iruntime"
)

func TestMemoryBallast(t *testing.T) {
config := &Config{
SizeMiB: 13,
tests := []struct {
name string
config *Config
getTotalMem func() (uint64, error)
expect int
}{
{
name: "test_abs_ballast",
config: &Config{
SizeMiB: 13,
},
getTotalMem: iruntime.TotalMemory,
expect: 13 * megaBytes,
},
{
name: "test_abs_ballast_priority",
config: &Config{
SizeMiB: 13,
SizeInPercentage: 20,
},
getTotalMem: iruntime.TotalMemory,
expect: 13 * megaBytes,
},
{
name: "test_ballast_zero_val",
config: &Config{},
getTotalMem: iruntime.TotalMemory,
expect: 0,
},
{
name: "test_ballast_in_percentage",
config: &Config{
SizeInPercentage: 20,
},
getTotalMem: mockTotalMem,
expect: 20 * megaBytes,
},
}

mbExt := newMemoryBallast(config, zap.NewNop())
require.NotNil(t, mbExt)
assert.Nil(t, mbExt.ballast)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mbExt := newMemoryBallast(tt.config, zap.NewNop(), tt.getTotalMem)
require.NotNil(t, mbExt)
assert.Nil(t, mbExt.ballast)

assert.NoError(t, mbExt.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, 13*megaBytes, len(mbExt.ballast))
assert.NoError(t, mbExt.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, tt.expect, len(mbExt.ballast))

assert.NoError(t, mbExt.Shutdown(context.Background()))
assert.Nil(t, mbExt.ballast)
assert.NoError(t, mbExt.Shutdown(context.Background()))
assert.Nil(t, mbExt.ballast)
})
}
}

func TestMemoryBallast_ZeroSize(t *testing.T) {
config := &Config{}

mbExt := newMemoryBallast(config, zap.NewNop())
require.NotNil(t, mbExt)
assert.Nil(t, mbExt.ballast)

assert.NoError(t, mbExt.Start(context.Background(), componenttest.NewNopHost()))
assert.Nil(t, mbExt.ballast)

assert.NoError(t, mbExt.Shutdown(context.Background()))
assert.Nil(t, mbExt.ballast)
func mockTotalMem() (uint64, error) {
return uint64(100 * megaBytes), nil
}
1 change: 1 addition & 0 deletions extension/ballastextension/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ extensions:
memory_ballast:
memory_ballast/1:
size_mib: 123
size_in_percentage: 20

# Data pipeline is required to load the config.
receivers:
Expand Down
19 changes: 19 additions & 0 deletions extension/ballastextension/testdata/config_invalid.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
extensions:
memory_ballast:
size_in_percentage: 200

# Data pipeline is required to load the config.
receivers:
nop:
processors:
nop:
exporters:
nop:

service:
extensions: [memory_ballast]
pipelines:
traces:
receivers: [nop]
processors: [nop]
exporters: [nop]
2 changes: 1 addition & 1 deletion internal/cgroups/cgroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func NewCGroupsForCurrentProcess() (CGroups, error) {

// MemoryQuota returns the total memory a
// It is a result of `memory.limit_in_bytes`. If the value of
// `memory.limit_in_bytes` was not set (-1), the method returns `(-1, false, nil)`.
// `memory.limit_in_bytes` was not set (-1) or (9223372036854771712), the method returns `(-1, false, nil)`.
func (cg CGroups) MemoryQuota() (int64, bool, error) {
memCGroup, exists := cg[_cgroupSubsysMemory]
if !exists {
Expand Down
26 changes: 26 additions & 0 deletions internal/iruntime/mem_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package iruntime

import (
"github.com/shirou/gopsutil/mem"
)

// readMemInfo returns the total memory
// supports in linux, darwin and windows
func readMemInfo() (uint64, error) {
vmStat, err := mem.VirtualMemory()
return vmStat.Total, err
}
27 changes: 27 additions & 0 deletions internal/iruntime/mem_info_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package iruntime

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestReadMemInfo(t *testing.T) {
vmStat, err := readMemInfo()
assert.NoError(t, err)
assert.True(t, vmStat > 0)
}
17 changes: 15 additions & 2 deletions internal/iruntime/total_memory_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ package iruntime

import "go.opentelemetry.io/collector/internal/cgroups"

// unlimitedMemorySize defines the bytes size when memory limit is not set
// for the container and process with cgroups
const unlimitedMemorySize = 9223372036854771712

// TotalMemory returns total available memory.
// This implementation is meant for linux and uses cgroups to determine available memory.
func TotalMemory() (int64, error) {
func TotalMemory() (uint64, error) {
cgroups, err := cgroups.NewCGroupsForCurrentProcess()
if err != nil {
return 0, err
Expand All @@ -29,5 +33,14 @@ func TotalMemory() (int64, error) {
if err != nil || !defined {
return 0, err
}
return memoryQuota, nil

if memoryQuota == unlimitedMemorySize {
totalMem, err := readMemInfo()
if err != nil {
return 0, err
}
return totalMem, nil
}

return uint64(memoryQuota), nil
}
Loading

0 comments on commit cbd822e

Please sign in to comment.