Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-9951] Adding integration tests for synthetic pipelines in Go #11870

Merged
merged 3 commits into from
Jun 11, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Fixup: Reverting unintended change.
  • Loading branch information
youngoli committed Jun 11, 2020
commit c79fdefb03054e221566f77ba8f91b781400cca3
18 changes: 9 additions & 9 deletions sdks/go/pkg/beam/io/synthetic/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ func init() {
func Step(s beam.Scope, cfg StepConfig, col beam.PCollection) beam.PCollection {
s = s.Scope("synthetic.Step")
if cfg.Splittable {
return beam.ParDo(s, &sdfStepFn{Cfg: cfg}, col)
return beam.ParDo(s, &sdfStepFn{cfg: cfg}, col)
} else {
return beam.ParDo(s, &stepFn{Cfg: cfg}, col)
return beam.ParDo(s, &stepFn{cfg: cfg}, col)
}
}

Expand All @@ -58,7 +58,7 @@ func Step(s beam.Scope, cfg StepConfig, col beam.PCollection) beam.PCollection {
// The stepFn is expected to be initialized with a cfg and will follow that
// config to determine its behavior when emitting elements.
type stepFn struct {
Cfg StepConfig
cfg StepConfig
rng randWrapper
}

Expand All @@ -71,9 +71,9 @@ func (fn *stepFn) Setup() {
// outputs identical to that input based on the outputs per input configuration
// in StepConfig.
func (fn *stepFn) ProcessElement(key, val []byte, emit func([]byte, []byte)) {
filtered := fn.Cfg.FilterRatio > 0 && fn.rng.Float64() < fn.Cfg.FilterRatio
filtered := fn.cfg.FilterRatio > 0 && fn.rng.Float64() < fn.cfg.FilterRatio

for i := 0; i < fn.Cfg.OutputPerInput; i++ {
for i := 0; i < fn.cfg.OutputPerInput; i++ {
if !filtered {
emit(key, val)
}
Expand All @@ -86,7 +86,7 @@ func (fn *stepFn) ProcessElement(key, val []byte, emit func([]byte, []byte)) {
// The sdfStepFn is expected to be initialized with a cfg and will follow
// that config to determine its behavior when splitting and emitting elements.
type sdfStepFn struct {
Cfg StepConfig
cfg StepConfig
rng randWrapper
}

Expand All @@ -96,7 +96,7 @@ type sdfStepFn struct {
func (fn *sdfStepFn) CreateInitialRestriction(key, val []byte) offsetrange.Restriction {
return offsetrange.Restriction{
Start: 0,
End: int64(fn.Cfg.OutputPerInput),
End: int64(fn.cfg.OutputPerInput),
}
}

Expand All @@ -105,7 +105,7 @@ func (fn *sdfStepFn) CreateInitialRestriction(key, val []byte) offsetrange.Restr
// method will contain at least one element, so the number of splits will not
// exceed the number of elements.
func (fn *sdfStepFn) SplitRestriction(key, val []byte, rest offsetrange.Restriction) (splits []offsetrange.Restriction) {
return rest.EvenSplits(int64(fn.Cfg.InitialSplits))
return rest.EvenSplits(int64(fn.cfg.InitialSplits))
}

// RestrictionSize outputs the size of the restriction as the number of elements
Expand All @@ -128,7 +128,7 @@ func (fn *sdfStepFn) Setup() {
// ProcessElement takes an input and either filters it or produces a number of
// outputs identical to that input based on the restriction size.
func (fn *sdfStepFn) ProcessElement(rt *offsetrange.Tracker, key, val []byte, emit func([]byte, []byte)) {
filtered := fn.Cfg.FilterRatio > 0 && fn.rng.Float64() < fn.Cfg.FilterRatio
filtered := fn.cfg.FilterRatio > 0 && fn.rng.Float64() < fn.cfg.FilterRatio

for i := rt.Rest.Start; rt.TryClaim(i) == true; i++ {
if !filtered {
Expand Down
Loading