Introduction

The pipe package offers Go programs an easy way to use other applications available on the system through a Unix-like pipeline mechanism. Input and output in such pipelines are real streams, so large content can pass through a pipeline without being loaded entirely into memory.

The following blog post introduces the concept:

API documentation

The API documentation may be accessed via the package path itself:

Building and installing

$ go get gopkg.in/pipe.v2

Examples

Simple pipeline

This simple example implements the equivalent of "cat article.ps | lpr":

    package main

    import (
        "fmt"

        "gopkg.in/pipe.v2"
    )

    func main() {
        // Feed the file into the standard input of lpr.
        p := pipe.Line(
            pipe.ReadFile("article.ps"),
            pipe.Exec("lpr"),
        )
        // Run the pipe and collect stdout and stderr together.
        output, err := pipe.CombinedOutput(p)
        if err != nil {
            fmt.Printf("%v\n", err)
        }
        fmt.Printf("%s", output)
    }

Rich pipeline

The following example is a bit more interesting. It grabs the free space information for the /boot partition, and writes it both to a file named "boot.txt" in the local directory, and to an in-memory buffer. It would be more easily implemented via pipe.TeeFile, but this shows more clearly the flexibility of the system.

    package main

    import (
        "bytes"
        "fmt"

        "gopkg.in/pipe.v2"
    )

    func main() {
        b := &bytes.Buffer{}
        p := pipe.Line(
            pipe.Exec("df"),
            // Keep only the line for the /boot partition.
            pipe.Filter(func(line []byte) bool {
                return bytes.HasSuffix(line, []byte(" /boot"))
            }),
            // Copy the stream into b while passing it along.
            pipe.Tee(b),
            pipe.WriteFile("boot.txt", 0644),
        )
        err := pipe.Run(p)
        if err != nil {
            fmt.Printf("%v\n", err)
        }
        fmt.Print(b.String())
    }

Using scripts

The examples so far demonstrated the use of pipelines, which connect the output of entry N to the input of entry N+1. In some cases, though, it is useful to run entries sequentially, each one starting only after the previous one has finished. For example, the equivalent of "cat article.ps | lpr; mv article.ps{,.done}" using the pipe package would be:

    p := pipe.Script(
        pipe.Line(
            pipe.ReadFile("article.ps"),
            pipe.Exec("lpr"),
        ),
        pipe.RenameFile("article.ps", "article.ps.done"),
    )

The following example demonstrates that concept being used in a richer pipe. It outputs a passwd line for the root user, and then streams all the content from the /etc/passwd file, except for the line starting with "root:". The result is then streamed to os.Stdout.

    package main

    import (
        "bytes"
        "fmt"
        "os"

        "gopkg.in/pipe.v2"
    )

    func main() {
        prefix := []byte("root:")
        // The script entries run one after the other, and their
        // outputs are concatenated in that order.
        script := pipe.Script(
            pipe.Println("root:x:0:0:root:/root:/bin/sh"),
            pipe.Line(
                pipe.ReadFile("/etc/passwd"),
                // Drop the existing root entry.
                pipe.Filter(func(line []byte) bool {
                    return !bytes.HasPrefix(line, prefix)
                }),
            ),
        )
        // Stream the combined script output to os.Stdout.
        p := pipe.Line(
            script,
            pipe.Write(os.Stdout),
        )
        err := pipe.Run(p)
        if err != nil {
            fmt.Printf("%v\n", err)
        }
    }

Timeouts

Each of the runner functions (Run, Output, etc.) has a counterpart with a Timeout suffix (RunTimeout, OutputTimeout, etc.) that accepts a time limit. If the pipe takes longer to run than the provided time limit, all the pending tasks are aborted.

Extending with custom logic

This is the implementation of pipe.MkDir:

    func MkDir(dir string, perm os.FileMode) Pipe {
        return func(s *State) error {
            return os.Mkdir(s.Path(dir), perm)
        }
    }

Note the use of State.Path to turn the provided directory into a path relative to the pipe's current directory.

This implements a trivial echo-like function:

    func Echo(str string) Pipe {
        return TaskFunc(func(s *State) error {
            _, err := s.Stdout.Write([]byte(str))
            return err
        })
    }

In this case, the TaskFunc helper is used to return a Pipe that registers a simple Task.

Source code and bug reports

Source code and bug reporting are handled in GitHub:

License

The pipe package is made available under the simplified BSD license.

Contact

Gustavo Niemeyer