Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure we wait for stdout/stderr pipes to be closed before calling Wait() #299

Merged
merged 1 commit into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ type Client struct {
// goroutines.
clientWaitGroup sync.WaitGroup

// stderrWaitGroup is used to prevent the command's Wait() function from
// being called before we've finished reading from the stderr pipe.
stderrWaitGroup sync.WaitGroup
// pipesWaitGroup is used to prevent the command's Wait() function from
// being called before we've finished reading from the stdout and stderr pipe.
pipesWaitGroup sync.WaitGroup

// processKilled is used for testing only, to flag when the process was
// forcefully killed.
Expand Down Expand Up @@ -756,8 +756,8 @@ func (c *Client) Start() (addr net.Addr, err error) {

// Start goroutine that logs the stderr
c.clientWaitGroup.Add(1)
c.stderrWaitGroup.Add(1)
// logStderr calls Done()
c.pipesWaitGroup.Add(1)
// logStderr calls c.pipesWaitGroup.Done()
go c.logStderr(runner.Name(), runner.Stderr())

c.clientWaitGroup.Add(1)
Expand All @@ -767,9 +767,9 @@ func (c *Client) Start() (addr net.Addr, err error) {

defer c.clientWaitGroup.Done()

// wait to finish reading from stderr since the stderr pipe reader
// wait to finish reading from stdout/stderr since the stdout/stderr pipe readers
// will be closed by the subsequent call to cmd.Wait().
c.stderrWaitGroup.Wait()
c.pipesWaitGroup.Wait()

// Wait for the command to end.
err := runner.Wait(context.Background())
Expand All @@ -792,8 +792,10 @@ func (c *Client) Start() (addr net.Addr, err error) {
// out of stdout
linesCh := make(chan string)
c.clientWaitGroup.Add(1)
c.pipesWaitGroup.Add(1)
go func() {
defer c.clientWaitGroup.Done()
defer c.pipesWaitGroup.Done()
defer close(linesCh)

scanner := bufio.NewScanner(runner.Stdout())
Expand Down Expand Up @@ -1159,7 +1161,7 @@ func (c *Client) getGRPCMuxer(addr net.Addr) (*grpcmux.GRPCClientMuxer, error) {

func (c *Client) logStderr(name string, r io.Reader) {
defer c.clientWaitGroup.Done()
defer c.stderrWaitGroup.Done()
defer c.pipesWaitGroup.Done()
l := c.logger.Named(filepath.Base(name))

reader := bufio.NewReaderSize(r, c.config.PluginLogBufferSize)
Expand Down
4 changes: 2 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1503,7 +1503,7 @@ this line is short

reader := strings.NewReader(msg)

c.stderrWaitGroup.Add(1)
c.pipesWaitGroup.Add(1)
c.logStderr(c.config.Cmd.Path, reader)
read := stderr.String()

Expand Down Expand Up @@ -1531,7 +1531,7 @@ func TestClient_logStderrParseJSON(t *testing.T) {
{"@message": "this is a large message that is more than 64 bytes long", "@level": "info"}`
reader := strings.NewReader(msg)

c.stderrWaitGroup.Add(1)
c.pipesWaitGroup.Add(1)
c.logStderr(c.config.Cmd.Path, reader)
logs := strings.Split(strings.TrimSpace(logBuf.String()), "\n")

Expand Down