Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-9951] Adding integration tests for synthetic pipelines in Go #11870

Merged
merged 3 commits into from
Jun 11, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sdks/go/pkg/beam/io/synthetic/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ package synthetic
import (
"fmt"
"math/rand"
"reflect"
"time"

"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/io/rtrackers/offsetrange"
)

func init() {
beam.RegisterType(reflect.TypeOf((*sourceFn)(nil)).Elem())
}

// Source creates a synthetic source transform that emits randomly
// generated KV<[]byte, []byte> elements.
//
Expand Down
24 changes: 15 additions & 9 deletions sdks/go/pkg/beam/io/synthetic/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@ package synthetic
import (
"fmt"
"math/rand"
"reflect"
"time"

"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/io/rtrackers/offsetrange"
)

func init() {
beam.RegisterType(reflect.TypeOf((*stepFn)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*sdfStepFn)(nil)).Elem())
}

// Step creates a synthetic step transform that receives KV<[]byte, []byte>
// elements from other synthetic transforms, and outputs KV<[]byte, []byte>
// elements based on its inputs.
Expand All @@ -40,9 +46,9 @@ import (
func Step(s beam.Scope, cfg StepConfig, col beam.PCollection) beam.PCollection {
	s = s.Scope("synthetic.Step")

	// The config must be stored in the exported Cfg field. Each branch used
	// to carry a second return that populated the unexported cfg field
	// instead; because it came first it was the one that executed, leaving
	// Cfg at its zero value and the Cfg-based return unreachable.
	if cfg.Splittable {
		// Splittable configs are served by the splittable DoFn.
		return beam.ParDo(s, &sdfStepFn{Cfg: cfg}, col)
	}
	return beam.ParDo(s, &stepFn{Cfg: cfg}, col)
}

Expand All @@ -52,7 +58,7 @@ func Step(s beam.Scope, cfg StepConfig, col beam.PCollection) beam.PCollection {
// The stepFn is expected to be initialized with a cfg and will follow that
// config to determine its behavior when emitting elements.
type stepFn struct {
	// NOTE(review): cfg and Cfg are both declared here with the same type.
	// This looks like residue of a cfg -> Cfg rename; confirm that only the
	// exported Cfg is meant to remain.
	cfg StepConfig
	// Cfg is the StepConfig this DoFn follows; ProcessElement reads its
	// FilterRatio and OutputPerInput values.
	Cfg StepConfig
	// rng supplies the random float that ProcessElement compares against
	// FilterRatio when deciding whether to filter an element.
	rng randWrapper
}

Expand All @@ -65,9 +71,9 @@ func (fn *stepFn) Setup() {
// outputs identical to that input based on the outputs per input configuration
// in StepConfig.
func (fn *stepFn) ProcessElement(key, val []byte, emit func([]byte, []byte)) {
filtered := fn.cfg.FilterRatio > 0 && fn.rng.Float64() < fn.cfg.FilterRatio
filtered := fn.Cfg.FilterRatio > 0 && fn.rng.Float64() < fn.Cfg.FilterRatio

for i := 0; i < fn.cfg.OutputPerInput; i++ {
for i := 0; i < fn.Cfg.OutputPerInput; i++ {
if !filtered {
emit(key, val)
}
Expand All @@ -80,7 +86,7 @@ func (fn *stepFn) ProcessElement(key, val []byte, emit func([]byte, []byte)) {
// The sdfStepFn is expected to be initialized with a cfg and will follow
// that config to determine its behavior when splitting and emitting elements.
type sdfStepFn struct {
	// NOTE(review): cfg and Cfg are both declared here with the same type.
	// This looks like residue of a cfg -> Cfg rename; confirm that only the
	// exported Cfg is meant to remain.
	cfg StepConfig
	// Cfg is the StepConfig this DoFn follows; its OutputPerInput,
	// InitialSplits and FilterRatio values are read by the methods below.
	Cfg StepConfig
	// rng supplies the random float that ProcessElement compares against
	// FilterRatio when deciding whether to filter an element.
	rng randWrapper
}

Expand All @@ -90,7 +96,7 @@ type sdfStepFn struct {
func (fn *sdfStepFn) CreateInitialRestriction(key, val []byte) offsetrange.Restriction {
	// The restriction always starts at 0 and ends at the configured number
	// of outputs per input; key and val do not influence it.
	return offsetrange.Restriction{
		Start: 0,
		// End is set exactly once, from the exported Cfg field. Naming End
		// twice in one composite literal (once from cfg, once from Cfg) is
		// a compile error.
		End: int64(fn.Cfg.OutputPerInput),
	}
}

Expand All @@ -99,7 +105,7 @@ func (fn *sdfStepFn) CreateInitialRestriction(key, val []byte) offsetrange.Restr
// method will contain at least one element, so the number of splits will not
// exceed the number of elements.
func (fn *sdfStepFn) SplitRestriction(key, val []byte, rest offsetrange.Restriction) (splits []offsetrange.Restriction) {
	// Read the split count from the exported Cfg field. A leading return
	// based on the unexported cfg field used to run first, which made the
	// Cfg-based return unreachable.
	return rest.EvenSplits(int64(fn.Cfg.InitialSplits))
}

// RestrictionSize outputs the size of the restriction as the number of elements
Expand All @@ -122,7 +128,7 @@ func (fn *sdfStepFn) Setup() {
// ProcessElement takes an input and either filters it or produces a number of
// outputs identical to that input based on the restriction size.
func (fn *sdfStepFn) ProcessElement(rt *offsetrange.Tracker, key, val []byte, emit func([]byte, []byte)) {
filtered := fn.cfg.FilterRatio > 0 && fn.rng.Float64() < fn.cfg.FilterRatio
filtered := fn.Cfg.FilterRatio > 0 && fn.rng.Float64() < fn.Cfg.FilterRatio

for i := rt.Rest.Start; rt.TryClaim(i) == true; i++ {
if !filtered {
Expand Down
52 changes: 52 additions & 0 deletions sdks/go/pkg/beam/testing/passert/count.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package passert

import (
"fmt"

"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
)

// Count checks that the given PCollection contains count elements. The name
// is only used to label the scope the assertion runs under.
func Count(s beam.Scope, col beam.PCollection, name string, count int) {
	s = s.Scope(fmt.Sprintf("passert.Count(%v)", name))

	// For KV collections the keys are dropped first, so only the values
	// are counted.
	if typex.IsKV(col.Type()) {
		col = beam.DropKey(s, col)
	}
	// elmCountCombineFn adds one per input element, so counted carries the
	// number of elements in col.
	counted := beam.Combine(s, &elmCountCombineFn{}, col)
	// NOTE(review): presumably Equals fails the pipeline when counted
	// differs from count — confirm against its definition.
	Equals(s, counted, count)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the Sum transform be re-used instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it could, and I considered it, but the error message isn't too helpful either (better than equals though). I added a ParDo0 with a custom error message which I think ends up being the most user-friendly.

Copy link
Contributor

@lostluck lostluck Jun 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant passert.Sum. Sorry that should have been clearer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought so, that's what I was referring to. The error message in passert.Sum (while better than passert.Equals) is still a bit unclear, just because it specifically mentions passert.Sum instead of passert.Count.

}

// elmCountCombineFn is a combine function that counts elements. Both its
// accumulator and its output are plain ints.
type elmCountCombineFn struct{}

// CreateAccumulator returns the starting count of zero.
func (fn *elmCountCombineFn) CreateAccumulator() int {
	return 0
}

// AddInput returns the running count increased by one; the element's value
// is ignored.
func (fn *elmCountCombineFn) AddInput(count int, _ beam.T) int {
	count++
	return count
}

// MergeAccumulators returns the sum of two partial counts.
func (fn *elmCountCombineFn) MergeAccumulators(left, right int) int {
	total := left + right
	return total
}

// ExtractOutput returns the accumulated count unchanged.
func (fn *elmCountCombineFn) ExtractOutput(count int) int {
	return count
}
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/testing/passert/passert.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

//go:generate go install github.com/apache/beam/sdks/go/cmd/starcgen
//go:generate starcgen --package=passert --identifiers=diffFn,failFn,failIfBadEntries,failKVFn,failGBKFn,hashFn,sumFn
//go:generate starcgen --package=passert --identifiers=diffFn,failFn,failIfBadEntries,failKVFn,failGBKFn,hashFn,sumFn,elmCountCombineFn
//go:generate go fmt

// Diff splits 2 incoming PCollections into 3: left only, both, right only. Duplicates are
Expand Down
120 changes: 120 additions & 0 deletions sdks/go/pkg/beam/testing/passert/passert.shims.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions sdks/go/test/integration/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/log"
"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/go/test/integration/primitives"
"github.com/apache/beam/sdks/go/test/integration/synthetic"
"github.com/apache/beam/sdks/go/test/integration/wordcount"
)

Expand Down Expand Up @@ -58,6 +59,8 @@ func main() {
pipelines := []namedPipeline{
{"wordcount:memfs", wordcount.WordCount(old_pond, "+Qj8iAnV5BI2A4sbzUbb6Q==", 8)},
{"wordcount:kinglear", wordcount.WordCount("gs:https://apache-beam-samples/shakespeare/kinglear.txt", "7ZCh5ih9m8IW1w+iS8sRKg==", 4749)},
{"synthetic:simple", synthetic.SimplePipeline()},
{"synthetic:splittable", synthetic.SplittablePipeline()},
{"pardo:multioutput", primitives.ParDoMultiOutput()},
{"pardo:sideinput", primitives.ParDoSideInput()},
{"pardo:kvsideinput", primitives.ParDoKVSideInput()},
Expand Down
Loading