Skip to content

Commit

Permalink
Make sure we wait for stdout/stderr pipes to be closed before calling…
Browse files Browse the repository at this point in the history
… runner.Wait()
  • Loading branch information
gabivlj committed Feb 6, 2024
1 parent 586d14f commit 68e4f1f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
18 changes: 10 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ type Client struct {
// goroutines.
clientWaitGroup sync.WaitGroup

// stderrWaitGroup is used to prevent the command's Wait() function from
// being called before we've finished reading from the stderr pipe.
stderrWaitGroup sync.WaitGroup
// pipesWaitGroup is used to prevent the command's Wait() function from
// being called before we've finished reading from the stdout and stderr pipe.
pipesWaitGroup sync.WaitGroup

// processKilled is used for testing only, to flag when the process was
// forcefully killed.
Expand Down Expand Up @@ -756,8 +756,8 @@ func (c *Client) Start() (addr net.Addr, err error) {

// Start goroutine that logs the stderr
c.clientWaitGroup.Add(1)
c.stderrWaitGroup.Add(1)
// logStderr calls Done()
c.pipesWaitGroup.Add(1)
// logStderr calls c.pipesWaitGroup.Done()
go c.logStderr(runner.Name(), runner.Stderr())

c.clientWaitGroup.Add(1)
Expand All @@ -767,9 +767,9 @@ func (c *Client) Start() (addr net.Addr, err error) {

defer c.clientWaitGroup.Done()

// wait to finish reading from stderr since the stderr pipe reader
// wait to finish reading from stdout/stderr since the stdout/stderr pipe readers
// will be closed by the subsequent call to cmd.Wait().
c.stderrWaitGroup.Wait()
c.pipesWaitGroup.Wait()

// Wait for the command to end.
err := runner.Wait(context.Background())
Expand All @@ -792,8 +792,10 @@ func (c *Client) Start() (addr net.Addr, err error) {
// out of stdout
linesCh := make(chan string)
c.clientWaitGroup.Add(1)
c.pipesWaitGroup.Add(1)
go func() {
defer c.clientWaitGroup.Done()
defer c.pipesWaitGroup.Done()
defer close(linesCh)

scanner := bufio.NewScanner(runner.Stdout())
Expand Down Expand Up @@ -1159,7 +1161,7 @@ func (c *Client) getGRPCMuxer(addr net.Addr) (*grpcmux.GRPCClientMuxer, error) {

func (c *Client) logStderr(name string, r io.Reader) {
defer c.clientWaitGroup.Done()
defer c.stderrWaitGroup.Done()
defer c.pipesWaitGroup.Done()
l := c.logger.Named(filepath.Base(name))

reader := bufio.NewReaderSize(r, c.config.PluginLogBufferSize)
Expand Down
4 changes: 2 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1503,7 +1503,7 @@ this line is short

reader := strings.NewReader(msg)

c.stderrWaitGroup.Add(1)
c.pipesWaitGroup.Add(1)
c.logStderr(c.config.Cmd.Path, reader)
read := stderr.String()

Expand Down Expand Up @@ -1531,7 +1531,7 @@ func TestClient_logStderrParseJSON(t *testing.T) {
{"@message": "this is a large message that is more than 64 bytes long", "@level": "info"}`
reader := strings.NewReader(msg)

c.stderrWaitGroup.Add(1)
c.pipesWaitGroup.Add(1)
c.logStderr(c.config.Cmd.Path, reader)
logs := strings.Split(strings.TrimSpace(logBuf.String()), "\n")

Expand Down

0 comments on commit 68e4f1f

Please sign in to comment.