Skip to content

Commit

Permalink
Access ballast size from Ballast extension via Host.GetExtensions int…
Browse files Browse the repository at this point in the history
…erface (open-telemetry#3634)

* Remove the global ballast size variable in ballast extension
* Access Ballast extension in Memory Limiter processor via Host.GetExtensions 

**Link to tracking Issue:** 
open-telemetry#2516
  • Loading branch information
mxiamxia authored Jul 21, 2021
1 parent f46871a commit 062e64f
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 29 deletions.
35 changes: 17 additions & 18 deletions extension/ballastextension/memory_ballast.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,51 +24,50 @@ import (

const megaBytes = 1024 * 1024

var ballastSizeBytes uint64

type memoryBallast struct {
cfg *Config
logger *zap.Logger
ballast []byte
getTotalMem func() (uint64, error)
type MemoryBallast struct {
cfg *Config
logger *zap.Logger
ballast []byte
ballastSizeBytes uint64
getTotalMem func() (uint64, error)
}

func (m *memoryBallast) Start(_ context.Context, _ component.Host) error {
func (m *MemoryBallast) Start(_ context.Context, _ component.Host) error {
// absolute value supersedes percentage setting
if m.cfg.SizeMiB > 0 {
ballastSizeBytes = m.cfg.SizeMiB * megaBytes
m.ballastSizeBytes = m.cfg.SizeMiB * megaBytes
} else {
totalMemory, err := m.getTotalMem()
if err != nil {
return err
}
ballastPercentage := m.cfg.SizeInPercentage
ballastSizeBytes = ballastPercentage * totalMemory / 100
m.ballastSizeBytes = ballastPercentage * totalMemory / 100
}

if ballastSizeBytes > 0 {
m.ballast = make([]byte, ballastSizeBytes)
if m.ballastSizeBytes > 0 {
m.ballast = make([]byte, m.ballastSizeBytes)
}

m.logger.Info("Setting memory ballast", zap.Uint32("MiBs", uint32(ballastSizeBytes/megaBytes)))
m.logger.Info("Setting memory ballast", zap.Uint32("MiBs", uint32(m.ballastSizeBytes/megaBytes)))

return nil
}

func (m *memoryBallast) Shutdown(_ context.Context) error {
func (m *MemoryBallast) Shutdown(_ context.Context) error {
m.ballast = nil
return nil
}

func newMemoryBallast(cfg *Config, logger *zap.Logger, getTotalMem func() (uint64, error)) *memoryBallast {
return &memoryBallast{
func newMemoryBallast(cfg *Config, logger *zap.Logger, getTotalMem func() (uint64, error)) *MemoryBallast {
return &MemoryBallast{
cfg: cfg,
logger: logger,
getTotalMem: getTotalMem,
}
}

// GetBallastSize returns the current ballast memory setting in bytes
func GetBallastSize() uint64 {
return ballastSizeBytes
func (m *MemoryBallast) GetBallastSize() uint64 {
return m.ballastSizeBytes
}
1 change: 1 addition & 0 deletions processor/memorylimiter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func createTracesProcessor(
nextConsumer,
ml.processTraces,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithStart(ml.start),
processorhelper.WithShutdown(ml.shutdown))
}

Expand Down
28 changes: 18 additions & 10 deletions processor/memorylimiter/memorylimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/extension/ballastextension"
"go.opentelemetry.io/collector/internal/iruntime"
Expand Down Expand Up @@ -89,9 +90,6 @@ const minGCIntervalWhenSoftLimited = 10 * time.Second

// newMemoryLimiter returns a new memorylimiter processor.
func newMemoryLimiter(logger *zap.Logger, cfg *Config) (*memoryLimiter, error) {
// Get ballast size in bytes from ballastextension
ballastSize := ballastextension.GetBallastSize()

if cfg.CheckInterval <= 0 {
return nil, errCheckIntervalOutOfRange
}
Expand All @@ -105,14 +103,13 @@ func newMemoryLimiter(logger *zap.Logger, cfg *Config) (*memoryLimiter, error) {
}

logger.Info("Memory limiter configured",
zap.Uint64("limit_mib", usageChecker.memAllocLimit),
zap.Uint64("spike_limit_mib", usageChecker.memSpikeLimit),
zap.Uint64("limit_mib", usageChecker.memAllocLimit/mibBytes),
zap.Uint64("spike_limit_mib", usageChecker.memSpikeLimit/mibBytes),
zap.Duration("check_interval", cfg.CheckInterval))

ml := &memoryLimiter{
usageChecker: *usageChecker,
memCheckWait: cfg.CheckInterval,
ballastSize: ballastSize,
ticker: time.NewTicker(cfg.CheckInterval),
readMemStatsFn: runtime.ReadMemStats,
logger: logger,
Expand All @@ -122,8 +119,6 @@ func newMemoryLimiter(logger *zap.Logger, cfg *Config) (*memoryLimiter, error) {
}),
}

ml.startMonitoring()

return ml, nil
}

Expand All @@ -138,12 +133,25 @@ func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, erro
return nil, fmt.Errorf("failed to get total memory, use fixed memory settings (limit_mib): %w", err)
}
logger.Info("Using percentage memory limiter",
zap.Uint64("total_memory", totalMemory),
zap.Uint64("total_memory_mib", totalMemory/mibBytes),
zap.Uint32("limit_percentage", cfg.MemoryLimitPercentage),
zap.Uint32("spike_limit_percentage", cfg.MemorySpikePercentage))
return newPercentageMemUsageChecker(totalMemory, uint64(cfg.MemoryLimitPercentage), uint64(cfg.MemorySpikePercentage))
}

func (ml *memoryLimiter) start(_ context.Context, host component.Host) error {
extensions := host.GetExtensions()
for _, extension := range extensions {
if ext, ok := extension.(*ballastextension.MemoryBallast); ok {
ml.ballastSize = ext.GetBallastSize()
break
}
}

ml.startMonitoring()
return nil
}

func (ml *memoryLimiter) shutdown(context.Context) error {
ml.ticker.Stop()
return nil
Expand Down Expand Up @@ -245,7 +253,7 @@ func (ml *memoryLimiter) setForcingDrop(b bool) {
}

func memstatToZapField(ms *runtime.MemStats) zap.Field {
return zap.Uint64("cur_mem_mib", ms.Alloc/1024/1024)
return zap.Uint64("cur_mem_mib", ms.Alloc/mibBytes)
}

func (ml *memoryLimiter) doGCandReadMemStats() *runtime.MemStats {
Expand Down
2 changes: 1 addition & 1 deletion processor/memorylimiter/memorylimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func TestBallastSizeMiB(t *testing.T) {
ballastExtCfg.SizeMiB = tt.ballastExtBallastSizeSetting
ballastExt, _ := ballastExtFactory.CreateExtension(ctx, extCreateSet, ballastExtCfg)
ballastExt.Start(ctx, nil)
assert.Equal(t, tt.expectResult, tt.expectedMemLimiterBallastSize*mibBytes == ballastextension.GetBallastSize())
assert.Equal(t, tt.expectResult, tt.expectedMemLimiterBallastSize*mibBytes == ballastExt.(*ballastextension.MemoryBallast).GetBallastSize())
})
}
}

0 comments on commit 062e64f

Please sign in to comment.