Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-10235] Adding a CountElms transform to the Go SDK. #11986

Merged
merged 1 commit into from
Jun 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions sdks/go/pkg/beam/transforms/stats/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,36 @@ package stats

import (
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
)

// Count counts the number of elements in a collection. It expects a
// PCollection<T> as input and returns a PCollection<KV<T,int>>. T's encoding
// must be a well-defined injection.
// Count counts the number of appearances of each element in a collection. It
// expects a PCollection<T> as input and returns a PCollection<KV<T,int>>. T's
// encoding must be deterministic so it is valid as a key.
func Count(s beam.Scope, col beam.PCollection) beam.PCollection {
s = s.Scope("stats.Count")

// Pair every element with a count of 1, then add those counts up per key so
// each distinct element ends up with its number of appearances.
pre := beam.ParDo(s, mapFn, col)
pre := beam.ParDo(s, keyedCountFn, col)
return SumPerKey(s, pre)
}

// keyedCountFn (previously named mapFn) emits the element itself as the key
// together with a count of 1, so that Count can sum the counts per key.
func mapFn(elm beam.T) (beam.T, int) {
func keyedCountFn(elm beam.T) (beam.T, int) {
return elm, 1
}

// CountElms returns the total number of elements in col. The input is a
// PCollection<T>; the result is a PCollection<int> containing one element,
// the count. A KV-typed input is accepted as well: it is passed through
// beam.DropKey first, so only its values are counted.
func CountElms(s beam.Scope, col beam.PCollection) beam.PCollection {
	s = s.Scope("stats.CountElms")

	values := col
	if typex.IsKV(values.Type()) {
		values = beam.DropKey(s, values)
	}

	// Replace every element with 1 and sum the ones to obtain the count.
	return Sum(s, beam.ParDo(s, countFn, values))
}

// countFn ignores the element it receives and returns 1, so that summing its
// outputs gives the number of elements processed.
func countFn(beam.T) int {
	return 1
}
51 changes: 51 additions & 0 deletions sdks/go/pkg/beam/transforms/stats/count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package stats

import (
"fmt"
"testing"

"github.com/apache/beam/sdks/go/pkg/beam"
Expand Down Expand Up @@ -67,3 +68,53 @@ func TestCountInt(t *testing.T) {
}
}
}

// TestCountElms checks that CountElms reports the total number of elements
// for unkeyed inputs of different element types and for a keyed input.
func TestCountElms(t *testing.T) {
	// countCase describes one input pipeline and the count expected from it.
	type countCase struct {
		name  string
		in    func(s beam.Scope) beam.PCollection
		count int
	}

	singleInt := func(s beam.Scope) beam.PCollection {
		return beam.Create(s, 1)
	}
	threeStrings := func(s beam.Scope) beam.PCollection {
		return beam.Create(s, "one", "two", "three")
	}
	keyedFloats := func(s beam.Scope) beam.PCollection {
		vals := beam.Create(s, 1.0, 2.0, 3.0, 4.0, 5.0)
		return beam.AddFixedKey(s, vals)
	}

	cases := []countCase{
		{name: "single", in: singleInt, count: 1},
		{name: "multiple", in: threeStrings, count: 3},
		{name: "keyed", in: keyedFloats, count: 5},
	}

	for _, tc := range cases {
		tc := tc // per-iteration copy captured by the subtest closure
		t.Run(fmt.Sprintf("_%v", tc.name), func(t *testing.T) {
			p := beam.NewPipeline()
			s := p.Root()

			// Build the input, the expected single-element result, and the
			// assertion comparing the two; the check runs with the pipeline.
			in := tc.in(s)
			want := beam.Create(s, tc.count)
			passert.Equals(s, CountElms(s, in), want)

			if err := ptest.Run(p); err != nil {
				t.Errorf("CountElms != %v: %v", tc.count, err)
			}
		})
	}
}
30 changes: 29 additions & 1 deletion sdks/go/pkg/beam/transforms/stats/stats.shims.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/transforms/stats/util_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ package stats
// `go generate` executes go:generate commands in lexical file order, top to bottom.

//go:generate go install github.com/apache/beam/sdks/go/cmd/starcgen
//go:generate starcgen --package=stats --identifiers=mapFn,meanFn,maxIntFn,minIntFn,sumIntFn,maxInt8Fn,minInt8Fn,sumInt8Fn,maxInt16Fn,minInt16Fn,sumInt16Fn,maxInt32Fn,minInt32Fn,sumInt32Fn,maxInt64Fn,minInt64Fn,sumInt64Fn,maxUintFn,minUintFn,sumUintFn,maxUint8Fn,minUint8Fn,sumUint8Fn,maxUint16Fn,minUint16Fn,sumUint16Fn,maxUint32Fn,minUint32Fn,sumUint32Fn,maxUint64Fn,minUint64Fn,sumUint64Fn,maxFloat32Fn,minFloat32Fn,sumFloat32Fn,maxFloat64Fn,minFloat64Fn,sumFloat64Fn
//go:generate starcgen --package=stats --identifiers=countFn,keyedCountFn,meanFn,maxIntFn,minIntFn,sumIntFn,maxInt8Fn,minInt8Fn,sumInt8Fn,maxInt16Fn,minInt16Fn,sumInt16Fn,maxInt32Fn,minInt32Fn,sumInt32Fn,maxInt64Fn,minInt64Fn,sumInt64Fn,maxUintFn,minUintFn,sumUintFn,maxUint8Fn,minUint8Fn,sumUint8Fn,maxUint16Fn,minUint16Fn,sumUint16Fn,maxUint32Fn,minUint32Fn,sumUint32Fn,maxUint64Fn,minUint64Fn,sumUint64Fn,maxFloat32Fn,minFloat32Fn,sumFloat32Fn,maxFloat64Fn,minFloat64Fn,sumFloat64Fn
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/transforms/stats/util_gen.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ package stats

//go:generate go install github.com/apache/beam/sdks/go/cmd/starcgen
{{- with $x := .X }}
//go:generate starcgen --package=stats --identifiers=mapFn,meanFn{{- range $i, $t := $x -}},max{{$t.Name}}Fn,min{{$t.Name}}Fn,sum{{$t.Name}}Fn{{- end -}}
//go:generate starcgen --package=stats --identifiers=countFn,keyedCountFn,meanFn{{- range $i, $t := $x -}},max{{$t.Name}}Fn,min{{$t.Name}}Fn,sum{{$t.Name}}Fn{{- end -}}
{{end}}