Skip to content

Commit

Permalink
[BEAM-9951] Wrapping up synthetic integration tests.
Browse files Browse the repository at this point in the history
Adding testing, and adjusting the error message for the count transform.
  • Loading branch information
youngoli committed Jun 10, 2020
1 parent 843bf37 commit c1c3513
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 545 deletions.
16 changes: 15 additions & 1 deletion sdks/go/pkg/beam/testing/passert/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ import (

"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)

// Count verifies the given PCollection<T> has the specified number of elements.
func Count(s beam.Scope, col beam.PCollection, name string, count int) {
s = s.Scope(fmt.Sprintf("passert.Count(%v)", name))

if typex.IsKV(col.Type()) {
col = beam.DropKey(s, col)
}
counted := beam.Combine(s, &elmCountCombineFn{}, col)
Equals(s, counted, count)
beam.ParDo0(s, &errFn{Name: name, Count: count}, counted)
}

type elmCountCombineFn struct {
Expand All @@ -50,3 +52,15 @@ func (f *elmCountCombineFn) MergeAccumulators(a, b int) int {
func (f *elmCountCombineFn) ExtractOutput(a int) int {
return a
}

type errFn struct {
Name string `json:"name,omitempty"`
Count int `json:"count,omitempty"`
}

func (f *errFn) ProcessElement(count int) error {
if f.Count != count {
return errors.Errorf("passert.Count(%v) = %v, want %v", f.Name, count, f.Count)
}
return nil
}
45 changes: 45 additions & 0 deletions sdks/go/pkg/beam/testing/passert/count_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package passert

import (
"testing"

"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
)

func TestCount_Good(t *testing.T) {
p, s := beam.NewPipelineWithRoot()
col := beam.Create(s, "a", "b", "c", "d", "e")
count := 5

Count(s, col, "TestCount_Good", count)
if err := ptest.Run(p); err != nil {
t.Errorf("Pipeline failed: %v", err)
}
}

func TestCount_Bad(t *testing.T) {
p, s := beam.NewPipelineWithRoot()
col := beam.Create(s, "a", "b", "c", "d", "e")
count := 10

Count(s, col, "TestCount_Bad", count)
if err := ptest.Run(p); err == nil {
t.Errorf("pipeline SUCCEEDED but should have failed")
}
}
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/testing/passert/passert.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

//go:generate go install github.com/apache/beam/sdks/go/cmd/starcgen
//go:generate starcgen --package=passert --identifiers=diffFn,failFn,failIfBadEntries,failKVFn,failGBKFn,hashFn,sumFn,elmCountCombineFn
//go:generate starcgen --package=passert --identifiers=diffFn,failFn,failIfBadEntries,failKVFn,failGBKFn,hashFn,sumFn,errFn,elmCountCombineFn
//go:generate go fmt

// Diff splits 2 incoming PCollections into 3: left only, both, right only. Duplicates are
Expand Down
Loading

0 comments on commit c1c3513

Please sign in to comment.