Skip to content

Commit

Permalink
[BEAM-10179] Clean-up URNJavaDoFn workaround in Go SDK. (apache#11899)
Browse files Browse the repository at this point in the history
Now that the SDK has migrated to use the Dataflow runner v2, this is no longer required.
  • Loading branch information
lukecwik authored and yirutang committed Jul 23, 2020
1 parent 9927d75 commit 125ead1
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 10 deletions.
1 change: 0 additions & 1 deletion sdks/go/pkg/beam/core/runtime/exec/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,6 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
var u Node
switch urn {
case graphx.URNParDo,
graphx.URNJavaDoFn,
urnPerKeyCombinePre,
urnPerKeyCombineMerge,
urnPerKeyCombineExtract,
Expand Down
13 changes: 4 additions & 9 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,7 @@ const (
URNSessionsWindowFn = "beam:window_fn:session_windows:v1"

// SDK constants

// URNJavaDoFn is the legacy constant for marking a DoFn.
// TODO: remove URNJavaDoFN when the Dataflow runner
// uses the model pipeline and no longer falls back to Java.
URNJavaDoFn = "beam:dofn:javasdk:0.1"
URNDoFn = "beam:go:transform:dofn:v1"
URNDoFn = "beam:go:transform:dofn:v1"

URNIterableSideInputKey = "beam:go:transform:iterablesideinputkey:v1"
URNReshuffleInput = "beam:go:transform:reshuffleinput:v1"
Expand Down Expand Up @@ -239,7 +234,7 @@ func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pipepb.PT
acID := m.coders.Add(edge.AccumCoder)
payload := &pipepb.CombinePayload{
CombineFn: &pipepb.FunctionSpec{
Urn: URNJavaDoFn,
Urn: URNDoFn,
Payload: []byte(mustEncodeMultiEdgeBase64(edge)),
},
AccumulatorCoderId: acID,
Expand Down Expand Up @@ -341,7 +336,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {

payload := &pipepb.ParDoPayload{
DoFn: &pipepb.FunctionSpec{
Urn: URNJavaDoFn,
Urn: URNDoFn,
Payload: []byte(mustEncodeMultiEdgeBase64(edge.Edge)),
},
SideInputs: si,
Expand All @@ -355,7 +350,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
case graph.Combine:
payload := &pipepb.ParDoPayload{
DoFn: &pipepb.FunctionSpec{
Urn: URNJavaDoFn,
Urn: URNDoFn,
Payload: []byte(mustEncodeMultiEdgeBase64(edge.Edge)),
},
}
Expand Down

0 comments on commit 125ead1

Please sign in to comment.