Introduction
The pipe Go package offers an easy way for Go programs to make use of other applications available in the system via a Unix-like pipeline mechanism. The input and output of each pipeline entry are real streams, so large content can flow through the pipeline without being loaded entirely into memory.
The following blog post introduces the concept:
API documentation
The API documentation may be accessed via the package path itself:
Building and installing
$ go get gopkg.in/pipe.v2
Examples
Simple pipeline
This simple example implements the equivalent of "cat article.ps | lpr":
    package main

    import (
        "fmt"

        "gopkg.in/pipe.v2"
    )

    func main() {
        p := pipe.Line(
            pipe.ReadFile("article.ps"),
            pipe.Exec("lpr"),
        )
        output, err := pipe.CombinedOutput(p)
        if err != nil {
            fmt.Printf("%v\n", err)
        }
        fmt.Printf("%s", output)
    }
Rich pipeline
The following example is a bit more interesting. It grabs the free space information for the /boot partition and writes it both to a file named "boot.txt" in the local directory and to an in-memory buffer. It would be more easily implemented via pipe.TeeWriteFile, but this version shows the flexibility of the system more clearly.
    package main

    import (
        "bytes"
        "fmt"

        "gopkg.in/pipe.v2"
    )

    func main() {
        b := &bytes.Buffer{}
        p := pipe.Line(
            pipe.Exec("df"),
            pipe.Filter(func(line []byte) bool {
                return bytes.HasSuffix(line, []byte(" /boot"))
            }),
            pipe.Tee(b),
            pipe.WriteFile("boot.txt", 0644),
        )
        err := pipe.Run(p)
        if err != nil {
            fmt.Printf("%v\n", err)
        }
        fmt.Print(b.String())
    }
Using scripts
The examples so far demonstrated the use of pipelines, which connect the output of entry N to the input of entry N+1. In some cases, though, it is useful to run entries sequentially. For example, the equivalent of "cat article.ps | lpr; mv article.ps{,.done}" using the pipe package would be:
    p := pipe.Script(
        pipe.Line(
            pipe.ReadFile("article.ps"),
            pipe.Exec("lpr"),
        ),
        pipe.RenameFile("article.ps", "article.ps.done"),
    )
The following example demonstrates that concept being used in a richer pipe. It outputs a passwd line for the root user, and then streams all the content from the /etc/passwd file, except for the line starting with "root:". The result is then streamed to os.Stdout.
    package main

    import (
        "bytes"
        "fmt"
        "os"

        "gopkg.in/pipe.v2"
    )

    func main() {
        prefix := []byte("root:")
        script := pipe.Script(
            pipe.Println("root:x:0:0:root:/root:/bin/sh"),
            pipe.Line(
                pipe.ReadFile("/etc/passwd"),
                pipe.Filter(func(line []byte) bool {
                    return !bytes.HasPrefix(line, prefix)
                }),
            ),
        )
        p := pipe.Line(
            script,
            pipe.Write(os.Stdout),
        )
        err := pipe.Run(p)
        if err != nil {
            fmt.Printf("%v\n", err)
        }
    }
Timeouts
Each of the runner functions (Run, Output, etc.) has a counterpart with a Timeout suffix (RunTimeout, OutputTimeout, etc.) that takes a time limit. If the pipe takes longer to run than the provided limit, all pending tasks are aborted.
Extending with custom logic
This is the implementation of pipe.MkDir:
    func MkDir(dir string, perm os.FileMode) Pipe {
        return func(s *State) error {
            return os.Mkdir(s.Path(dir), perm)
        }
    }
Note the use of State.Path to turn the provided directory into a path relative to the pipe's current directory.
This implements a trivial echo-like function:
    func Echo(str string) Pipe {
        return TaskFunc(func(s *State) error {
            _, err := s.Stdout.Write([]byte(str))
            return err
        })
    }
In this case, the TaskFunc helper is used to return a Pipe that registers a simple Task.
Source code and bug reports
Source code and bug reports are handled on GitHub:
License
The pipe package is made available under the simplified BSD license.
Contact
Gustavo Niemeyer