Skip to content

Commit

Permalink
fix: send periodic keepalive packets on eventstream connections (argo…
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherman committed Feb 22, 2021
1 parent 0f9b22b commit d33b5cc
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 4 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/aliyun/aliyun-oss-go-sdk v2.1.5+incompatible
github.com/antonmedv/expr v1.8.8
github.com/argoproj/argo-events v1.2.0
github.com/argoproj/pkg v0.3.0
github.com/argoproj/pkg v0.4.0
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect
github.com/blushft/go-diagrams v0.0.0-20201006005127-c78c821223d9
github.com/colinmarc/hdfs v1.1.4-0.20180805212432-9746310a4d31
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAF
github.com/ardielle/ardielle-tools v1.5.4/go.mod h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk=
github.com/argoproj/argo-events v1.2.0 h1:CjF8hVUkeflhaOt9uWjJK6ai6b4pw0CCUUmOnmhWnNY=
github.com/argoproj/argo-events v1.2.0/go.mod h1:eY+egQNBLXAz/AF4mqgHsMMa4Aur7frHjUfBg+RpX04=
github.com/argoproj/pkg v0.3.0 h1:0xxNHc9duXZt1TlDVVsBzcGm8S/V5Bc3eNqawz3wFo8=
github.com/argoproj/pkg v0.3.0/go.mod h1:F4TZgInLUEjzsWFB/BTJBsewoEy0ucnKSq6vmQiD/yc=
github.com/argoproj/pkg v0.4.0 h1:tLd6PcI0Z5H00xlXv582xTDEqU1A8bMPoNBHczCZtEE=
github.com/argoproj/pkg v0.4.0/go.mod h1:pR2iuBmUVmNI3HoAf8I6+N87BuihvGuFrOQzK782RjU=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
Expand Down Expand Up @@ -267,6 +267,8 @@ github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL
github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BHsljHzVlRcyQhjrss6TZTdY2VfCqZPbv5k3iBFa2ZQ=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
Expand Down
9 changes: 9 additions & 0 deletions pkg/apiclient/logs-intermediary.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package apiclient
import (
"context"

"google.golang.org/grpc/metadata"

workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
)

Expand All @@ -25,6 +27,13 @@ func (c *logsIntermediary) Recv() (*workflowpkg.LogEntry, error) {
}
}

func (c *logsIntermediary) SendHeader(metadata.MD) error {
// We invoke `SendHeader` in order to eagerly flush headers to allow us to send period
// keepalives when using HTTP/1 and Server Sent Events, so we need to implement this here,
// though we don't use the meta for anything.
return nil
}

func newLogsIntermediary(ctx context.Context) *logsIntermediary {
return &logsIntermediary{newAbstractIntermediary(ctx), make(chan *workflowpkg.LogEntry)}
}
9 changes: 9 additions & 0 deletions pkg/apiclient/watch-intermediary.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package apiclient
import (
"context"

"google.golang.org/grpc/metadata"

workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
)

Expand All @@ -25,6 +27,13 @@ func (w watchIntermediary) Recv() (*workflowpkg.WorkflowWatchEvent, error) {
}
}

func (w *watchIntermediary) SendHeader(metadata.MD) error {
// We invoke `SendHeader` in order to eagerly flush headers to allow us to send period
// keepalives when using HTTP/1 and Server Sent Events, so we need to implement this here,
// though we don't use the meta for anything.
return nil
}

func newWatchIntermediary(ctx context.Context) *watchIntermediary {
return &watchIntermediary{newAbstractIntermediary(ctx), make(chan *workflowpkg.WorkflowWatchEvent)}
}
2 changes: 1 addition & 1 deletion server/workflow/test_server_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (t testServerStream) SetHeader(md metadata.MD) error {
}

func (t testServerStream) SendHeader(md metadata.MD) error {
panic("implement me")
return nil
}

func (t testServerStream) SetTrailer(md metadata.MD) {
Expand Down
23 changes: 23 additions & 0 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
corev1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -204,6 +205,15 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest,
log.Debug("Piping events to channel")
defer log.Debug("Result channel done")

// Eagerly send the headers so that we can begin our keepalive loop if no results are received
// immediately. Without this, we cannot detect a streaming response, and we can't write to the
// response since a subsequent write by the stream causes an error.
err = ws.SendHeader(metadata.MD{})

if err != nil {
return err
}

for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -250,6 +260,12 @@ func (s *workflowServer) WatchEvents(req *workflowpkg.WatchEventsRequest, ws wor
log.Debug("Piping events to channel")
defer log.Debug("Result channel done")

err = ws.SendHeader(metadata.MD{})

if err != nil {
return err
}

for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -507,6 +523,13 @@ func (s *workflowServer) PodLogs(req *workflowpkg.WorkflowLogRequest, ws workflo
return err
}
req.Name = wf.Name

err = ws.SendHeader(metadata.MD{})

if err != nil {
return err
}

return logs.WorkflowLogs(ctx, wfClient, kubeClient, req, ws)
}

Expand Down

0 comments on commit d33b5cc

Please sign in to comment.