Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[extension/opamp]: Add mechanism to detect whether the collector has been orphaned #32564

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
WIP monitoring parent process for orphan detection
  • Loading branch information
BinaryFissionGames committed May 3, 2024
commit 27cdca2e43b61c4be329dbecf5a63fa04f245b60
5 changes: 5 additions & 0 deletions extension/opampextension/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ type Config struct {

// AgentDescription contains options to modify the AgentDescription message.
AgentDescription AgentDescription `mapstructure:"agent_description"`

// PPID is the process ID of the parent for the collector. If the PPID is specified,
// the extension will continuously poll for the status of the parent process, and emit a fatal error
// when the parent process is closed.
PPID int `mapstructure:"ppid"`
}

type AgentDescription struct {
Expand Down
5 changes: 5 additions & 0 deletions extension/opampextension/monitor_ppid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package opampextension

import "time"

// orphanPollInterval is the delay monitorPPID waits between successive checks
// of the parent process. It is a variable rather than a constant so that tests
// can temporarily shorten it (see setOrphanPollInterval).
var orphanPollInterval = 5 * time.Second
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
34 changes: 34 additions & 0 deletions extension/opampextension/monitor_ppid_others.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//go:build !windows

package opampextension

import (
"context"
"fmt"
"os"
"time"

"go.opentelemetry.io/collector/component"
)

// getppid returns the parent process ID of the current process. It defaults to
// os.Getppid and is kept as a package-level variable so that tests can replace
// it with a stub.
var getppid = os.Getppid

// monitorPPID blocks for as long as the current parent process ID equals ppid,
// re-checking it every orphanPollInterval.
//
// When the parent process ID stops matching ppid, a fatal error event is passed
// to reportStatus and the function returns. If ctx is cancelled first, the
// function returns without reporting anything.
func monitorPPID(ctx context.Context, ppid int, reportStatus func(*component.StatusEvent)) {
	// Unix-like systems re-parent an orphaned process to the init system
	// (ppid becomes 1), so a parent PID that differs from the expected one
	// means the original parent has gone away.
	for getppid() == ppid {
		select {
		case <-ctx.Done():
			// Monitoring was cancelled; leave quietly.
			return
		case <-time.After(orphanPollInterval):
			// Interval elapsed; go around and check the parent PID again.
		}
	}

	orphanErr := fmt.Errorf("collector was orphaned, parent pid is no longer %d", ppid)
	reportStatus(component.NewFatalErrorEvent(orphanErr))
}
60 changes: 60 additions & 0 deletions extension/opampextension/monitor_ppid_others_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//go:build !windows

package opampextension

import (
"context"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
)

// TestMonitorPPIDOthers exercises the non-Windows monitorPPID: it must stay
// silent while the parent PID matches, and emit exactly one fatal status event
// once the observed parent PID changes.
func TestMonitorPPIDOthers(t *testing.T) {
	t.Run("Does not trigger if ppid stays as specified", func(t *testing.T) {
		// Cancel up front so monitorPPID does a single check and then returns.
		ctx, cancel := context.WithCancel(context.Background())
		cancel()

		failOnReport := func(*component.StatusEvent) {
			require.FailNow(t, "status report function should not be called")
		}

		monitorPPID(ctx, os.Getppid(), failOnReport)
	})

	t.Run("Emits fatal status if ppid changes", func(t *testing.T) {
		setOrphanPollInterval(t, 10*time.Millisecond)

		// The first poll reports the real parent; every later poll reports
		// PID 1, simulating re-parenting to init.
		pollCount := 0
		setGetPPID(t, func() int {
			pollCount++
			if pollCount <= 1 {
				return os.Getppid()
			}
			return 1
		})

		var reported *component.StatusEvent
		recordOnce := func(evt *component.StatusEvent) {
			if reported != nil {
				require.FailNow(t, "status report function should not be called twice")
			}
			reported = evt
		}

		monitorPPID(context.Background(), os.Getppid(), recordOnce)
		require.NotNil(t, reported)
		require.Equal(t, component.StatusFatalError, reported.Status())
	})
}

// setGetPPID swaps the package-level getppid function for newFunc and
// registers a cleanup on t that puts the previous function back when the test
// finishes.
func setGetPPID(t *testing.T, newFunc func() int) {
	previous := getppid
	t.Cleanup(func() { getppid = previous })
	getppid = newFunc
}
14 changes: 14 additions & 0 deletions extension/opampextension/monitor_ppid_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package opampextension

import (
"testing"
"time"
)

// setOrphanPollInterval overrides the package-level orphanPollInterval with
// newInterval and registers a cleanup on t that restores the previous value
// when the test finishes.
func setOrphanPollInterval(t *testing.T, newInterval time.Duration) {
	previous := orphanPollInterval
	t.Cleanup(func() { orphanPollInterval = previous })
	orphanPollInterval = newInterval
}
55 changes: 55 additions & 0 deletions extension/opampextension/monitor_ppid_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//go:build windows

package opampextension

import (
"context"
"fmt"
"os"
"time"

"go.opentelemetry.io/collector/component"
)

// monitorPPID watches the process identified by ppid and passes a fatal error
// event to reportStatus when that process cannot be found, cannot be waited
// on, or has exited. The call blocks until one of those outcomes occurs or
// until ctx is cancelled; on cancellation it returns without reporting.
func monitorPPID(ctx context.Context, ppid int, reportStatus func(*component.StatusEvent)) {
	// waitResult carries the outcome of process.Wait out of the helper goroutine.
	type waitResult struct {
		state *os.ProcessState
		err   error
	}

	reportOrphaned := func(err error) {
		reportStatus(component.NewFatalErrorEvent(err))
	}

	// On Windows systems, we can look up and wait for the process to exit.
	// This is not possible on other systems, since Wait doesn't work on most systems unless the
	// process is a child of the current one (see doc on process.Wait).
	for {
		process, err := os.FindProcess(ppid)
		if err != nil {
			reportOrphaned(fmt.Errorf("collector was orphaned, error finding process %d: %w", ppid, err))
			return
		}

		if process == nil {
			reportOrphaned(fmt.Errorf("collector was orphaned, process %d does not exist", ppid))
			return
		}

		// Wait blocks until the process exits and cannot be interrupted, so it
		// runs in its own goroutine to let context cancellation be honored.
		// The channel is buffered so the goroutine can always deliver its
		// result and finish, even if this function has already returned; in
		// that case the goroutine lives until the monitored process exits.
		waitCh := make(chan waitResult, 1)
		go func() {
			state, waitErr := process.Wait()
			waitCh <- waitResult{state: state, err: waitErr}
		}()

		var res waitResult
		select {
		case res = <-waitCh:
		case <-ctx.Done():
			return
		}

		if res.err != nil {
			reportOrphaned(fmt.Errorf("collector was orphaned, error while waiting on process %d to exit: %w", ppid, res.err))
			return
		}

		if res.state.Exited() {
			// No underlying error exists on this path, so nothing is wrapped.
			reportOrphaned(fmt.Errorf("collector was orphaned, process %d exited", ppid))
			return
		}

		select {
		case <-time.After(orphanPollInterval):
		case <-ctx.Done():
			return
		}
	}
}
35 changes: 35 additions & 0 deletions extension/opampextension/monitor_ppid_windows_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//go:build windows

package opampextension

import (
"context"
"os/exec"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
)

// TestMonitorPPIDWindows checks that monitorPPID reports no status while the
// monitored process is still running, and that it returns once its context is
// cancelled.
func TestMonitorPPIDWindows(t *testing.T) {
	cmdContext, cmdContextCancel := context.WithCancel(context.Background())

	// A long-running ping stands in for the parent process; /bin/sh and sleep
	// are not available on a stock Windows installation.
	cmd := exec.CommandContext(cmdContext, "ping", "-n", "1000", "127.0.0.1")
	t.Cleanup(func() {
		// Kill the helper process, then reap it so it does not outlive the test.
		cmdContextCancel()
		if cmd.Process != nil {
			// The error is expected here because the process was killed.
			_ = cmd.Wait()
		}
	})

	err := cmd.Start()
	require.NoError(t, err)

	statusReportFunc := func(*component.StatusEvent) {
		require.FailNow(t, "status report function should not be called")
	}

	monitorCtx, monitorCtxCancel := context.WithCancel(context.Background())

	// Cancel monitoring after a short while; monitorPPID is expected to return
	// at that point without having reported anything.
	go func() {
		time.Sleep(1 * time.Second)
		monitorCtxCancel()
	}()

	monitorPPID(monitorCtx, cmd.Process.Pid, statusReportFunc)
}