Skip to content

Commit

Permalink
[BEAM-10235] Adding a CountElms transform to the Go SDK.
Browse files Browse the repository at this point in the history
This transform counts the number of elements in a PCollection.
  • Loading branch information
youngoli committed Jun 13, 2020
1 parent d5dd47b commit d870be8
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 8 deletions.
28 changes: 23 additions & 5 deletions sdks/go/pkg/beam/transforms/stats/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,36 @@ package stats

import (
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
)

// Count counts the number of elements in a collection. It expects a
// PCollection<T> as input and returns a PCollection<KV<T,int>>. T's encoding
// must be a well-defined injection.
// Count counts the number of appearances of each element in a collection. It
// expects a PCollection<T> as input and returns a PCollection<KV<T,int>>. T's
// encoding must be deterministic so it is valid as a key.
func Count(s beam.Scope, col beam.PCollection) beam.PCollection {
s = s.Scope("stats.Count")

pre := beam.ParDo(s, mapFn, col)
pre := beam.ParDo(s, keyedCountFn, col)
return SumPerKey(s, pre)
}

func mapFn(elm beam.T) (beam.T, int) {
func keyedCountFn(elm beam.T) (beam.T, int) {
return elm, 1
}

// CountElms computes the total number of elements in col. The input is a
// PCollection<T>; the result is a PCollection<int> holding a single element,
// the count. If col is a KV collection, its keys are dropped first and the
// remaining values are counted.
func CountElms(s beam.Scope, col beam.PCollection) beam.PCollection {
	s = s.Scope("stats.CountElms")

	// Keys play no part in a total count, so strip them when present; the
	// per-element mapper below expects a plain (non-KV) element.
	unkeyed := col
	if typex.IsKV(unkeyed.Type()) {
		unkeyed = beam.DropKey(s, unkeyed)
	}

	// Replace every element with 1 and add the ones together.
	return Sum(s, beam.ParDo(s, countFn, unkeyed))
}

// countFn ignores its input element and emits 1 for it, so that summing the
// outputs gives the number of input elements.
func countFn(beam.T) int {
	return 1
}
51 changes: 51 additions & 0 deletions sdks/go/pkg/beam/transforms/stats/count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package stats

import (
"fmt"
"testing"

"github.com/apache/beam/sdks/go/pkg/beam"
Expand Down Expand Up @@ -67,3 +68,53 @@ func TestCountInt(t *testing.T) {
}
}
}

// TestCountElms checks that CountElms reports the total number of elements
// for unkeyed inputs of different element types as well as for a keyed (KV)
// input.
func TestCountElms(t *testing.T) {
	// countCase describes one pipeline input and the count expected for it.
	type countCase struct {
		name  string
		in    func(s beam.Scope) beam.PCollection // builds the input collection
		count int                                 // expected number of elements
	}

	singleInt := func(s beam.Scope) beam.PCollection {
		return beam.Create(s, 1)
	}
	threeStrings := func(s beam.Scope) beam.PCollection {
		return beam.Create(s, "one", "two", "three")
	}
	// fiveKeyed wraps five floats in KVs to exercise the keyed path.
	fiveKeyed := func(s beam.Scope) beam.PCollection {
		vals := beam.Create(s, 1.0, 2.0, 3.0, 4.0, 5.0)
		return beam.AddFixedKey(s, vals)
	}

	cases := []countCase{
		{name: "single", in: singleInt, count: 1},
		{name: "multiple", in: threeStrings, count: 3},
		{name: "keyed", in: fiveKeyed, count: 5},
	}

	for _, tc := range cases {
		tc := tc // capture a per-iteration copy for the subtest closure
		t.Run(fmt.Sprintf("_%v", tc.name), func(t *testing.T) {
			pipeline := beam.NewPipeline()
			root := pipeline.Root()

			input := tc.in(root)
			want := beam.Create(root, tc.count)
			got := CountElms(root, input)
			passert.Equals(root, got, want)

			err := ptest.Run(pipeline)
			if err != nil {
				t.Errorf("CountElms != %v: %v", tc.count, err)
			}
		})
	}
}
30 changes: 29 additions & 1 deletion sdks/go/pkg/beam/transforms/stats/stats.shims.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/transforms/stats/util_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ package stats
// `go generate` executes go:generate commands in lexical file order, top to bottom.

//go:generate go install github.com/apache/beam/sdks/go/cmd/starcgen
//go:generate starcgen --package=stats --identifiers=mapFn,meanFn,maxIntFn,minIntFn,sumIntFn,maxInt8Fn,minInt8Fn,sumInt8Fn,maxInt16Fn,minInt16Fn,sumInt16Fn,maxInt32Fn,minInt32Fn,sumInt32Fn,maxInt64Fn,minInt64Fn,sumInt64Fn,maxUintFn,minUintFn,sumUintFn,maxUint8Fn,minUint8Fn,sumUint8Fn,maxUint16Fn,minUint16Fn,sumUint16Fn,maxUint32Fn,minUint32Fn,sumUint32Fn,maxUint64Fn,minUint64Fn,sumUint64Fn,maxFloat32Fn,minFloat32Fn,sumFloat32Fn,maxFloat64Fn,minFloat64Fn,sumFloat64Fn
//go:generate starcgen --package=stats --identifiers=countFn,keyedCountFn,meanFn,maxIntFn,minIntFn,sumIntFn,maxInt8Fn,minInt8Fn,sumInt8Fn,maxInt16Fn,minInt16Fn,sumInt16Fn,maxInt32Fn,minInt32Fn,sumInt32Fn,maxInt64Fn,minInt64Fn,sumInt64Fn,maxUintFn,minUintFn,sumUintFn,maxUint8Fn,minUint8Fn,sumUint8Fn,maxUint16Fn,minUint16Fn,sumUint16Fn,maxUint32Fn,minUint32Fn,sumUint32Fn,maxUint64Fn,minUint64Fn,sumUint64Fn,maxFloat32Fn,minFloat32Fn,sumFloat32Fn,maxFloat64Fn,minFloat64Fn,sumFloat64Fn
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/transforms/stats/util_gen.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ package stats

//go:generate go install github.com/apache/beam/sdks/go/cmd/starcgen
{{- with $x := .X }}
//go:generate starcgen --package=stats --identifiers=mapFn,meanFn{{- range $i, $t := $x -}},max{{$t.Name}}Fn,min{{$t.Name}}Fn,sum{{$t.Name}}Fn{{- end -}}
//go:generate starcgen --package=stats --identifiers=countFn,keyedCountFn,meanFn{{- range $i, $t := $x -}},max{{$t.Name}}Fn,min{{$t.Name}}Fn,sum{{$t.Name}}Fn{{- end -}}
{{end}}

0 comments on commit d870be8

Please sign in to comment.