Skip to content

Commit

Permalink
Data pipeline for concurrently processing images
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya43 committed May 31, 2021
1 parent 3a1589a commit cdbc80a
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 1 deletion.
181 changes: 181 additions & 0 deletions 07-data-pipelines/04-image-processing-pipeline/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package main

import (
"fmt"
"image"
"log"
"net/http"
"os"
"path/filepath"
"sync"
"time"

"github.com/disintegration/imaging"
)

// Pipeline
// walkfile ----------> processImage -----------> saveImage
// (paths) (results)

type result struct {
srcImagePath string
thumbnailImage *image.NRGBA
err error
}

// Image processing - Pipeline
// Input - directory with images.
// output - thumbnail images
func main() {
if len(os.Args) < 2 {
log.Fatal("need to send directory path of images")
}
start := time.Now()
err := setupPipeLine(os.Args[1])

if err != nil {
log.Fatal(err)
}
fmt.Printf("Time taken: %s\n", time.Since(start))
}

func setupPipeLine(root string) error {
done := make(chan struct{})
defer close(done)

// do the file walk
paths, errc := walkFiles(done, root)

// process the image
results := processImage(done, paths)

// save thumbnail images
for r := range results {
if r.err != nil {
return r.err
}
saveThumbnail(r.srcImagePath, r.thumbnailImage)
}

// check for error on the channel, from walkfiles stage.
if err := <-errc; err != nil {
return err
}
return nil
}

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {

// create output channels
paths := make(chan string)
errc := make(chan error, 1)

go func() {
defer close(paths)
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {

// filter out error
if err != nil {
return err
}

// check if it is file
if !info.Mode().IsRegular() {
return nil
}

// check if it is image/jpeg
contentType, _ := getFileContentType(path)
if contentType != "image/jpeg" {
return nil
}

// send file path to next stage
select {
case paths <- path:
case <-done:
return fmt.Errorf("walk cancelled")
}
return nil
})
}()
return paths, errc
}

func processImage(done <-chan struct{}, paths <-chan string) <-chan result {
results := make(chan result)
var wg sync.WaitGroup

thumbnailer := func() {
for srcImagePath := range paths {
srcImage, err := imaging.Open(srcImagePath)
if err != nil {
select {
case results <- result{srcImagePath, nil, err}:
case <-done:
return
}
}
thumbnailImage := imaging.Thumbnail(srcImage, 100, 100, imaging.Lanczos)

select {
case results <- result{srcImagePath, thumbnailImage, err}:
case <-done:
return
}
}
}

const numThumbnailer = 5
for i := 0; i < numThumbnailer; i++ {
wg.Add(1)
go func() {
thumbnailer()
wg.Done()
}()
}

go func() {
wg.Wait()
close(results)
}()
return results
}

// saveThumbnail - save the thumnail image to folder
func saveThumbnail(srcImagePath string, thumbnailImage *image.NRGBA) error {
filename := filepath.Base(srcImagePath)
dstImagePath := "thumbnail/" + filename

// save the image in the thumbnail folder.
err := imaging.Save(thumbnailImage, dstImagePath)
if err != nil {
return err
}
fmt.Printf("%s -> %s\n", srcImagePath, dstImagePath)
return nil
}

// getFileContentType - return content type and error status
func getFileContentType(file string) (string, error) {

out, err := os.Open(file)
if err != nil {
return "", err
}
defer out.Close()

// Only the first 512 bytes are used to sniff the content type.
buffer := make([]byte, 512)

_, err = out.Read(buffer)
if err != nil {
return "", err
}

// Use the net/http package's handy DetectContentType function. Always returns a valid
// content-type by returning "application/octet-stream" if no others seemed to match.
contentType := http.DetectContentType(buffer)

return contentType, nil
}
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@ module github.com/aditya43/golang_concurrency

go 1.16

require golang.org/x/net v0.0.0-20210525063256-abc453219eb5 // indirect
require (
github.com/disintegration/imaging v1.6.2 // indirect
golang.org/x/image v0.0.0-20210504121937-7319ad40d33e // indirect
golang.org/x/net v0.0.0-20210525063256-abc453219eb5 // indirect
)
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
github.com/disintegration/imaging v1.6.2 h1:w1LecBlG2Lnp8B3jk5zSuNqd7b4DXhcjwek1ei82L+c=
github.com/disintegration/imaging v1.6.2/go.mod h1:44/5580QXChDfwIclfc/PCwrr44amcmDAg8hxG0Ewe4=
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/image v0.0.0-20210504121937-7319ad40d33e h1:PzJMNfFQx+QO9hrC1GwZ4BoPGeNGhfeQEgcQFArEjPk=
golang.org/x/image v0.0.0-20210504121937-7319ad40d33e/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5 h1:wjuX4b5yYQnEQHzd+CBcrcC6OVR2J1CN6mUy0oSxIPo=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

0 comments on commit cdbc80a

Please sign in to comment.