Skip to content

Commit

Permalink
fix(server): serve artifacts directly from disk to support large arti…
Browse files Browse the repository at this point in the history
…facts (argoproj#4589)

Signed-off-by: Daniel Herman <[email protected]>
  • Loading branch information
dcherman committed Nov 24, 2020
1 parent e3aaf2f commit 9ee4d44
Showing 1 changed file with 29 additions and 25 deletions.
54 changes: 29 additions & 25 deletions server/artifacts/artifact_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"net/http"
"os"
"path"
"strconv"
"strings"
"time"

log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -62,15 +64,12 @@ func (a *ArtifactServer) GetArtifact(w http.ResponseWriter, r *http.Request) {
return
}

data, filename, err := a.getArtifact(ctx, wf, nodeId, artifactName)
err = a.returnArtifact(ctx, w, r, wf, nodeId, artifactName)

if err != nil {
a.serverInternalError(err, w)
return
}

w.Header().Add("Content-Disposition", fmt.Sprintf(`filename="%s"`, filename))
a.ok(w, data)
}

func (a *ArtifactServer) GetArtifactByUID(w http.ResponseWriter, r *http.Request) {
Expand All @@ -96,15 +95,12 @@ func (a *ArtifactServer) GetArtifactByUID(w http.ResponseWriter, r *http.Request
return
}

data, filename, err := a.getArtifact(ctx, wf, nodeId, artifactName)
err = a.returnArtifact(ctx, w, r, wf, nodeId, artifactName)

if err != nil {
a.serverInternalError(err, w)
return
}

w.Header().Add("Content-Disposition", fmt.Sprintf(`filename="%s"`, filename))
a.ok(w, data)
}

func (a *ArtifactServer) gateKeeping(r *http.Request) (context.Context, error) {
Expand All @@ -123,50 +119,58 @@ func (a *ArtifactServer) gateKeeping(r *http.Request) (context.Context, error) {
return a.gatekeeper.Context(ctx)
}

func (a *ArtifactServer) ok(w http.ResponseWriter, data []byte) {
w.WriteHeader(200)
_, err := w.Write(data)
if err != nil {
a.serverInternalError(err, w)
}
}

func (a *ArtifactServer) serverInternalError(err error, w http.ResponseWriter) {
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
}

func (a *ArtifactServer) getArtifact(ctx context.Context, wf *wfv1.Workflow, nodeId, artifactName string) ([]byte, string, error) {
func (a *ArtifactServer) returnArtifact(ctx context.Context, w http.ResponseWriter, r *http.Request, wf *wfv1.Workflow, nodeId, artifactName string) error {
kubeClient := auth.GetKubeClient(ctx)

art := wf.Status.Nodes[nodeId].Outputs.GetArtifactByName(artifactName)
if art == nil {
return nil, "", fmt.Errorf("artifact not found")
return fmt.Errorf("artifact not found")
}

driver, err := a.artDriverFactory(art, resources{kubeClient, wf.Namespace})
if err != nil {
return nil, "", err
return err
}
tmp, err := ioutil.TempFile("/tmp", "artifact")
if err != nil {
return nil, "", err
return err
}
tmpPath := tmp.Name()
defer func() { _ = os.Remove(tmpPath) }()

err = driver.Load(art, tmpPath)
if err != nil {
return nil, "", err
return err
}

file, err := os.Open(tmpPath)

if err != nil {
return err
}

file, err := ioutil.ReadFile(tmpPath)
defer file.Close()

stats, err := file.Stat()

if err != nil {
return nil, "", err
return err
}
log.WithFields(log.Fields{"size": len(file)}).Debug("Artifact file size")

return file, path.Base(art.GetKey()), nil
contentLength := strconv.FormatInt(stats.Size(), 10)
log.WithFields(log.Fields{"size": contentLength}).Debug("Artifact file size")

w.Header().Add("Content-Disposition", fmt.Sprintf(`filename="%s"`, path.Base(art.GetKey())))
w.WriteHeader(200)

http.ServeContent(w, r, "", time.Time{}, file)

return nil
}

func (a *ArtifactServer) getWorkflowAndValidate(ctx context.Context, namespace string, workflowName string) (*wfv1.Workflow, error) {
Expand Down

0 comments on commit 9ee4d44

Please sign in to comment.