Skip to content

Commit

Permalink
[BEAM-9951] Adding integration tests for synthetic pipelines in Go (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
youngoli committed Jun 11, 2020
1 parent d5dd47b commit 5e31d3a
Show file tree
Hide file tree
Showing 9 changed files with 352 additions and 1 deletion.
5 changes: 5 additions & 0 deletions sdks/go/pkg/beam/io/synthetic/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ package synthetic
import (
"fmt"
"math/rand"
"reflect"
"time"

"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/io/rtrackers/offsetrange"
)

func init() {
beam.RegisterType(reflect.TypeOf((*sourceFn)(nil)).Elem())
}

// Source creates a synthetic source transform that emits randomly
// generated KV<[]byte, []byte> elements.
//
Expand Down
6 changes: 6 additions & 0 deletions sdks/go/pkg/beam/io/synthetic/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@ package synthetic
import (
"fmt"
"math/rand"
"reflect"
"time"

"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/io/rtrackers/offsetrange"
)

func init() {
beam.RegisterType(reflect.TypeOf((*stepFn)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*sdfStepFn)(nil)).Elem())
}

// Step creates a synthetic step transform that receives KV<[]byte, []byte>
// elements from other synthetic transforms, and outputs KV<[]byte, []byte>
// elements based on its inputs.
Expand Down
66 changes: 66 additions & 0 deletions sdks/go/pkg/beam/testing/passert/count.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package passert

import (
"fmt"

"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)

// Count verifies the given PCollection<T> has the specified number of elements.
func Count(s beam.Scope, col beam.PCollection, name string, count int) {
s = s.Scope(fmt.Sprintf("passert.Count(%v)", name))

if typex.IsKV(col.Type()) {
col = beam.DropKey(s, col)
}
counted := beam.Combine(s, &elmCountCombineFn{}, col)
beam.ParDo0(s, &errFn{Name: name, Count: count}, counted)
}

type elmCountCombineFn struct {
}

func (f *elmCountCombineFn) CreateAccumulator() int {
return 0
}

func (f *elmCountCombineFn) AddInput(a int, _ beam.T) int {
return a + 1
}

func (f *elmCountCombineFn) MergeAccumulators(a, b int) int {
return a + b
}

func (f *elmCountCombineFn) ExtractOutput(a int) int {
return a
}

type errFn struct {
Name string `json:"name,omitempty"`
Count int `json:"count,omitempty"`
}

func (f *errFn) ProcessElement(count int) error {
if f.Count != count {
return errors.Errorf("passert.Count(%v) = %v, want %v", f.Name, count, f.Count)
}
return nil
}
45 changes: 45 additions & 0 deletions sdks/go/pkg/beam/testing/passert/count_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package passert

import (
"testing"

"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
)

func TestCount_Good(t *testing.T) {
p, s := beam.NewPipelineWithRoot()
col := beam.Create(s, "a", "b", "c", "d", "e")
count := 5

Count(s, col, "TestCount_Good", count)
if err := ptest.Run(p); err != nil {
t.Errorf("Pipeline failed: %v", err)
}
}

func TestCount_Bad(t *testing.T) {
p, s := beam.NewPipelineWithRoot()
col := beam.Create(s, "a", "b", "c", "d", "e")
count := 10

Count(s, col, "TestCount_Bad", count)
if err := ptest.Run(p); err == nil {
t.Errorf("pipeline SUCCEEDED but should have failed")
}
}
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/testing/passert/passert.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

//go:generate go install github.com/apache/beam/sdks/go/cmd/starcgen
//go:generate starcgen --package=passert --identifiers=diffFn,failFn,failIfBadEntries,failKVFn,failGBKFn,hashFn,sumFn
//go:generate starcgen --package=passert --identifiers=diffFn,failFn,failIfBadEntries,failKVFn,failGBKFn,hashFn,sumFn,errFn,elmCountCombineFn
//go:generate go fmt

// Diff splits 2 incoming PCollections into 3: left only, both, right only. Duplicates are
Expand Down
156 changes: 156 additions & 0 deletions sdks/go/pkg/beam/testing/passert/passert.shims.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions sdks/go/test/integration/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/log"
"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/go/test/integration/primitives"
"github.com/apache/beam/sdks/go/test/integration/synthetic"
"github.com/apache/beam/sdks/go/test/integration/wordcount"
)

Expand Down Expand Up @@ -58,6 +59,8 @@ func main() {
pipelines := []namedPipeline{
{"wordcount:memfs", wordcount.WordCount(old_pond, "+Qj8iAnV5BI2A4sbzUbb6Q==", 8)},
{"wordcount:kinglear", wordcount.WordCount("gs:https://apache-beam-samples/shakespeare/kinglear.txt", "7ZCh5ih9m8IW1w+iS8sRKg==", 4749)},
{"synthetic:simple", synthetic.SimplePipeline()},
{"synthetic:splittable", synthetic.SplittablePipeline()},
{"pardo:multioutput", primitives.ParDoMultiOutput()},
{"pardo:sideinput", primitives.ParDoSideInput()},
{"pardo:kvsideinput", primitives.ParDoKVSideInput()},
Expand Down
Loading

0 comments on commit 5e31d3a

Please sign in to comment.