Skip to content

Commit

Permalink
Move Runner API protos to portability/runners/api
Browse files Browse the repository at this point in the history
This fixes a circular import issue between transforms/ and runners/
  • Loading branch information
charlesccychen authored and chamikaramj committed Jun 8, 2017
1 parent bc2f97c commit 7689e43
Show file tree
Hide file tree
Showing 24 changed files with 60 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ sdks/python/**/*.egg
sdks/python/LICENSE
sdks/python/NOTICE
sdks/python/README.md
sdks/python/apache_beam/runners/api/*pb2*.*
sdks/python/apache_beam/portability/runners/api/*pb2*.*

# Ignore IntelliJ files.
.idea/
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/coders/coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import google.protobuf

from apache_beam.coders import coder_impl
from apache_beam.portability.runners.api import beam_runner_api_pb2
from apache_beam.utils import urns
from apache_beam.utils import proto_utils

Expand Down Expand Up @@ -205,7 +206,6 @@ def to_runner_api(self, context):
"""For internal use only; no backwards-compatibility guarantees.
"""
# TODO(BEAM-115): Use specialized URNs and components.
from apache_beam.runners.api import beam_runner_api_pb2
return beam_runner_api_pb2.Coder(
spec=beam_runner_api_pb2.SdkFunctionSpec(
spec=beam_runner_api_pb2.FunctionSpec(
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ def visit_value(self, value, _):
def to_runner_api(self):
"""For internal use only; no backwards-compatibility guarantees."""
from apache_beam.runners import pipeline_context
from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.portability.runners.api import beam_runner_api_pb2
context = pipeline_context.PipelineContext()
# Mutates context; placing inline would force dependence on
# argument evaluation order.
Expand Down Expand Up @@ -525,7 +525,7 @@ def named_outputs(self):
if isinstance(output, pvalue.PCollection)}

def to_runner_api(self, context):
from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.portability.runners.api import beam_runner_api_pb2

def transform_to_runner_api(transform, context):
if transform is None:
Expand Down
18 changes: 18 additions & 0 deletions sdks/python/apache_beam/portability/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http:https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""For internal use only; no backwards-compatibility guarantees."""
18 changes: 18 additions & 0 deletions sdks/python/apache_beam/portability/runners/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http:https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""For internal use only; no backwards-compatibility guarantees."""
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/pvalue.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def __reduce_ex__(self, unused_version):
return _InvalidUnpickledPCollection, ()

def to_runner_api(self, context):
from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.portability.runners.api import beam_runner_api_pb2
from apache_beam.internal import pickler
return beam_runner_api_pb2.PCollection(
unique_name='%d%s.%s' % (
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ def run__NativeWrite(self, transform_node):
@classmethod
def serialize_windowing_strategy(cls, windowing):
from apache_beam.runners import pipeline_context
from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.portability.runners.api import beam_runner_api_pb2
context = pipeline_context.PipelineContext()
windowing_proto = windowing.to_runner_api(context)
return cls.byte_array_to_json_string(
Expand All @@ -745,7 +745,7 @@ def deserialize_windowing_strategy(cls, serialized_data):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.runners import pipeline_context
from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.portability.runners.api import beam_runner_api_pb2
from apache_beam.transforms.core import Windowing
proto = beam_runner_api_pb2.MessageWithComponents()
proto.ParseFromString(cls.json_string_to_byte_array(serialized_data))
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/pipeline_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from apache_beam import pipeline
from apache_beam import pvalue
from apache_beam import coders
from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.portability.runners.api import beam_runner_api_pb2
from apache_beam.transforms import core


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from apache_beam.internal import pickler
from apache_beam.io import iobase
from apache_beam.transforms.window import GlobalWindows
from apache_beam.runners.api import beam_fn_api_pb2
from apache_beam.portability.runners.api import beam_fn_api_pb2
from apache_beam.runners.portability import maptask_executor_runner
from apache_beam.runners.worker import data_plane
from apache_beam.runners.worker import operation_specs
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/worker/data_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import threading

from apache_beam.coders import coder_impl
from apache_beam.runners.api import beam_fn_api_pb2
from apache_beam.portability.runners.api import beam_fn_api_pb2
import grpc

# This module is experimental. No backwards-compatibility guarantees.
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/worker/data_plane_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from concurrent import futures
import grpc

from apache_beam.runners.api import beam_fn_api_pb2
from apache_beam.portability.runners.api import beam_fn_api_pb2
from apache_beam.runners.worker import data_plane


Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/worker/log_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import Queue as queue
import threading

from apache_beam.runners.api import beam_fn_api_pb2
from apache_beam.portability.runners.api import beam_fn_api_pb2
import grpc

# This module is experimental. No backwards-compatibility guarantees.
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/worker/log_handler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from concurrent import futures
import grpc

from apache_beam.runners.api import beam_fn_api_pb2
from apache_beam.portability.runners.api import beam_fn_api_pb2
from apache_beam.runners.worker import log_handler


Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/worker/sdk_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from apache_beam.io import iobase
from apache_beam.runners.dataflow.native_io import iobase as native_iobase
from apache_beam.utils import counters
from apache_beam.runners.api import beam_fn_api_pb2
from apache_beam.portability.runners.api import beam_fn_api_pb2
from apache_beam.runners.worker import operation_specs
from apache_beam.runners.worker import operations

Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import grpc
from google.protobuf import text_format

from apache_beam.runners.api import beam_fn_api_pb2
from apache_beam.portability.runners.api import beam_fn_api_pb2
from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler
from apache_beam.runners.worker.sdk_worker import SdkHarness

Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/worker/sdk_worker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from apache_beam.io.concat_source_test import RangeSource
from apache_beam.io.iobase import SourceBundle
from apache_beam.runners.api import beam_fn_api_pb2
from apache_beam.portability.runners.api import beam_fn_api_pb2
from apache_beam.runners.worker import data_plane
from apache_beam.runners.worker import sdk_worker

Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from apache_beam import typehints
from apache_beam.coders import typecoders
from apache_beam.internal import util
from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.portability.runners.api import beam_runner_api_pb2
from apache_beam.transforms import ptransform
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.transforms.display import HasDisplayData
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/transforms/ptransform.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ def register_urn(cls, urn, parameter_type, constructor):
cls._known_urns[urn] = parameter_type, constructor

def to_runner_api(self, context):
from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.portability.runners.api import beam_runner_api_pb2
urn, typed_param = self.to_runner_api_parameter(context)
return beam_runner_api_pb2.FunctionSpec(
urn=urn,
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/transforms/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from apache_beam.transforms.window import TimestampCombiner
from apache_beam.transforms.window import WindowedValue
from apache_beam.transforms.window import WindowFn
from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.portability.runners.api import beam_runner_api_pb2
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP

Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/transforms/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
from google.protobuf import timestamp_pb2

from apache_beam.coders import coders
from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.runners.api import standard_window_fns_pb2
from apache_beam.portability.runners.api import beam_runner_api_pb2
from apache_beam.portability.runners.api import standard_window_fns_pb2
from apache_beam.transforms import timeutil
from apache_beam.utils import proto_utils
from apache_beam.utils import urns
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/utils/urns.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def to_runner_api(self, context):
Prefer overriding self.to_runner_api_parameter.
"""
from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.portability.runners.api import beam_runner_api_pb2
urn, typed_param = self.to_runner_api_parameter(context)
return beam_runner_api_pb2.SdkFunctionSpec(
spec=beam_runner_api_pb2.FunctionSpec(
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/gen_protos.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
os.path.join('..', 'common', 'fn-api', 'src', 'main', 'proto')
]

PYTHON_OUTPUT_PATH = os.path.join('apache_beam', 'runners', 'api')
PYTHON_OUTPUT_PATH = os.path.join('apache_beam', 'portability', 'runners', 'api')


def generate_proto_files():
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/run_pylint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ EXCLUDED_GENERATED_FILES=(
"apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py"
"apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py"
"apache_beam/coders/proto2_coder_test_messages_pb2.py"
apache_beam/runners/api/*pb2*.py
apache_beam/portability/runners/api/*pb2*.py
)

FILES_TO_IGNORE=""
Expand Down

0 comments on commit 7689e43

Please sign in to comment.