Skip to content

Commit

Permalink
Fixup: Reverting unintended change.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngoli committed Jun 10, 2020
1 parent c1c3513 commit 9fbfdd1
Showing 1 changed file with 9 additions and 9 deletions.
18 changes: 9 additions & 9 deletions sdks/go/pkg/beam/io/synthetic/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ func init() {
func Step(s beam.Scope, cfg StepConfig, col beam.PCollection) beam.PCollection {
s = s.Scope("synthetic.Step")
if cfg.Splittable {
return beam.ParDo(s, &sdfStepFn{Cfg: cfg}, col)
return beam.ParDo(s, &sdfStepFn{cfg: cfg}, col)
} else {
return beam.ParDo(s, &stepFn{Cfg: cfg}, col)
return beam.ParDo(s, &stepFn{cfg: cfg}, col)
}
}

Expand All @@ -58,7 +58,7 @@ func Step(s beam.Scope, cfg StepConfig, col beam.PCollection) beam.PCollection {
// The stepFn is expected to be initialized with a cfg and will follow that
// config to determine its behavior when emitting elements.
type stepFn struct {
Cfg StepConfig
cfg StepConfig
rng randWrapper
}

Expand All @@ -71,9 +71,9 @@ func (fn *stepFn) Setup() {
// outputs identical to that input based on the outputs per input configuration
// in StepConfig.
func (fn *stepFn) ProcessElement(key, val []byte, emit func([]byte, []byte)) {
filtered := fn.Cfg.FilterRatio > 0 && fn.rng.Float64() < fn.Cfg.FilterRatio
filtered := fn.cfg.FilterRatio > 0 && fn.rng.Float64() < fn.cfg.FilterRatio

for i := 0; i < fn.Cfg.OutputPerInput; i++ {
for i := 0; i < fn.cfg.OutputPerInput; i++ {
if !filtered {
emit(key, val)
}
Expand All @@ -86,7 +86,7 @@ func (fn *stepFn) ProcessElement(key, val []byte, emit func([]byte, []byte)) {
// The sdfStepFn is expected to be initialized with a cfg and will follow
// that config to determine its behavior when splitting and emitting elements.
type sdfStepFn struct {
Cfg StepConfig
cfg StepConfig
rng randWrapper
}

Expand All @@ -96,7 +96,7 @@ type sdfStepFn struct {
func (fn *sdfStepFn) CreateInitialRestriction(key, val []byte) offsetrange.Restriction {
return offsetrange.Restriction{
Start: 0,
End: int64(fn.Cfg.OutputPerInput),
End: int64(fn.cfg.OutputPerInput),
}
}

Expand All @@ -105,7 +105,7 @@ func (fn *sdfStepFn) CreateInitialRestriction(key, val []byte) offsetrange.Restr
// method will contain at least one element, so the number of splits will not
// exceed the number of elements.
func (fn *sdfStepFn) SplitRestriction(key, val []byte, rest offsetrange.Restriction) (splits []offsetrange.Restriction) {
return rest.EvenSplits(int64(fn.Cfg.InitialSplits))
return rest.EvenSplits(int64(fn.cfg.InitialSplits))
}

// RestrictionSize outputs the size of the restriction as the number of elements
Expand All @@ -128,7 +128,7 @@ func (fn *sdfStepFn) Setup() {
// ProcessElement takes an input and either filters it or produces a number of
// outputs identical to that input based on the restriction size.
func (fn *sdfStepFn) ProcessElement(rt *offsetrange.Tracker, key, val []byte, emit func([]byte, []byte)) {
filtered := fn.Cfg.FilterRatio > 0 && fn.rng.Float64() < fn.Cfg.FilterRatio
filtered := fn.cfg.FilterRatio > 0 && fn.rng.Float64() < fn.cfg.FilterRatio

for i := rt.Rest.Start; rt.TryClaim(i) == true; i++ {
if !filtered {
Expand Down

0 comments on commit 9fbfdd1

Please sign in to comment.