Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate external transform wrappers using a script #29834

Merged
merged 72 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
f80f1ac
checkpoint
ahmedabu98 Dec 13, 2023
2517445
Merge branch 'master' of https://github.com/ahmedabu98/beam into gen_…
ahmedabu98 Dec 19, 2023
e5bf704
gen_xlang_wrappers workflow
ahmedabu98 Dec 20, 2023
9295861
add tests; integrate with setup.py
ahmedabu98 Dec 20, 2023
1ec6c11
remove duplicate changes; adjust transform positions in standard serv…
ahmedabu98 Dec 20, 2023
f9bbf42
undo deleted line
ahmedabu98 Dec 20, 2023
6a2d3d2
add more config modifications
ahmedabu98 Dec 20, 2023
23450fd
warn when generation script not found
ahmedabu98 Dec 20, 2023
01dbc63
add jinja2 dependency; include script in MANIFEST.in; create a xlang …
ahmedabu98 Dec 20, 2023
0bc4ff4
add MarkupSafe==2.1.3 because jinja2 needs it
ahmedabu98 Dec 20, 2023
390d8f8
lints and fixes
ahmedabu98 Dec 20, 2023
45a71ea
jinja template abides more by lint/format rules in case yapf doesn't …
ahmedabu98 Dec 21, 2023
8847562
lint and fixes
ahmedabu98 Dec 21, 2023
18a157d
no yapf; use random dir name for tests
ahmedabu98 Dec 21, 2023
c64d75b
lint
ahmedabu98 Dec 22, 2023
7049f80
lint
ahmedabu98 Dec 22, 2023
ddd5b2f
format fix
ahmedabu98 Dec 22, 2023
91fe758
address comments
ahmedabu98 Jan 17, 2024
29d50ac
commit the transform config yaml file; in setup, only generate from t…
ahmedabu98 Jan 22, 2024
fc739fd
Merge branch 'master' of https://github.com/ahmedabu98/beam into gen_…
ahmedabu98 Jan 22, 2024
3f843dd
format generated files
ahmedabu98 Jan 22, 2024
4347cd4
add yapf dep
ahmedabu98 Jan 22, 2024
8093519
template fixes
ahmedabu98 Jan 23, 2024
1c8a17f
set default inside init
ahmedabu98 Jan 23, 2024
f33e3f8
yapf best effort
ahmedabu98 Jan 23, 2024
da03ff3
add support for test suites to spin up multiple services; add PreComm…
ahmedabu98 Jan 24, 2024
c356ba8
Merge branch 'master' of https://github.com/ahmedabu98/beam into gen_…
ahmedabu98 Jan 25, 2024
2690d0a
address comments
ahmedabu98 Jan 26, 2024
0b6827a
try catch imports
ahmedabu98 Jan 26, 2024
cf04b3d
camel_case_to_snake_case in provider module
ahmedabu98 Jan 26, 2024
a20e2a3
Merge branch 'master' of https://github.com/ahmedabu98/beam into gen_…
ahmedabu98 Jan 26, 2024
2bd52e8
lint
ahmedabu98 Jan 26, 2024
cc60dfd
skip unit tests if importing out of apache_beam
ahmedabu98 Jan 27, 2024
6672d61
generate wrappers in transforms/_external_transforms.py subdir and im…
ahmedabu98 Jan 29, 2024
7d81370
add _external_transforms package
ahmedabu98 Jan 30, 2024
1c8cafc
lint
ahmedabu98 Jan 30, 2024
fe02266
fix precommit workflow name
ahmedabu98 Jan 30, 2024
37d844e
add workflow_dispatch option
ahmedabu98 Jan 30, 2024
dcc37fa
address comments; add description to CHANGES.md
ahmedabu98 Feb 1, 2024
346346f
resolve branch conflict
ahmedabu98 Feb 1, 2024
3da5f1f
Merge branch 'master' of https://github.com/ahmedabu98/beam into gen_…
ahmedabu98 Feb 1, 2024
d1f103e
python deps
ahmedabu98 Feb 1, 2024
92664fb
manually add greenlet dep
ahmedabu98 Feb 2, 2024
11fbd99
address comments
ahmedabu98 Feb 5, 2024
51c2086
add xlang __init__
ahmedabu98 Feb 5, 2024
19358f0
lint
ahmedabu98 Feb 6, 2024
128fdbc
Merge branch 'master' of https://github.com/ahmedabu98/beam into gen_…
ahmedabu98 Feb 6, 2024
1c32d2a
remove jinja from test dependencies; allow option to pass in extra de…
ahmedabu98 Feb 6, 2024
9cb5a98
address comments; always generate wrappers
ahmedabu98 Feb 7, 2024
a20154d
add greenlet back
ahmedabu98 Feb 7, 2024
f28b053
better link to docs
ahmedabu98 Feb 7, 2024
da8d51d
don't add unneeded io direct test suite
ahmedabu98 Feb 8, 2024
538b40a
add bounds to jinja2 install
ahmedabu98 Feb 8, 2024
27b7f6a
Merge branch 'master' of https://github.com/ahmedabu98/beam into gen_…
ahmedabu98 Feb 12, 2024
889b17c
try pass import; command only generates config; install pyyaml before…
ahmedabu98 Feb 12, 2024
5dd9477
lint
ahmedabu98 Feb 12, 2024
9568de5
let python automatically start up expansion services instead of manua…
ahmedabu98 Feb 15, 2024
1f49d75
remove merge conflict
ahmedabu98 Feb 15, 2024
bdf290f
Merge branch 'master' into gen_wrappers_script
ahmedabu98 Feb 15, 2024
c96ade5
touch postcommit files to trigger GHA
ahmedabu98 Feb 15, 2024
082295d
Merge branch 'gen_wrappers_script' of https://github.com/ahmedabu98/b…
ahmedabu98 Feb 15, 2024
28146bd
rename to external_provider_it_test.py to avoid running on unit test …
ahmedabu98 Feb 15, 2024
5f9a227
rename tests ..Test -> ..IT
ahmedabu98 Feb 15, 2024
06d5fd5
small adjustments to pass python unit tests: import script from file …
ahmedabu98 Feb 16, 2024
e42b3fe
run tests only when expansion jars are built
ahmedabu98 Feb 16, 2024
0d11059
load the script after importing
ahmedabu98 Feb 16, 2024
c1b29f3
skip test if jars not built
ahmedabu98 Feb 16, 2024
620344c
Merge branch 'master' of https://github.com/ahmedabu98/beam into gen_…
ahmedabu98 Feb 17, 2024
8fdb02a
Merge branch 'master' into gen_wrappers_script
ahmedabu98 Feb 21, 2024
35da8b9
touch postcommit files to trigger GHA
ahmedabu98 Feb 21, 2024
bc43578
Merge branch 'gen_wrappers_script' of https://github.com/ahmedabu98/b…
ahmedabu98 Feb 21, 2024
89e139e
correct command name (generateExternalTransformsConfig)
ahmedabu98 Feb 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add tests; integrate with setup.py
  • Loading branch information
ahmedabu98 committed Dec 20, 2023
commit 92958611d0163d772e578d9e50bbd68ba91f8b0c
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,30 @@
#
import logging
import os
import shutil
import unittest

import pytest

import apache_beam as beam
import yaml
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external_schematransform_provider import STANDARD_URN_PATTERN
from apache_beam.transforms.external_schematransform_provider import ExternalSchemaTransform
from apache_beam.transforms.external_schematransform_provider import ExternalSchemaTransformProvider
from apache_beam.transforms.external_schematransform_provider import camel_case_to_snake_case
from apache_beam.transforms.external_schematransform_provider import infer_name_from_identifier
from apache_beam.transforms.external_schematransform_provider import snake_case_to_lower_camel_case
from apache_beam.transforms.external_schematransform_provider import snake_case_to_upper_camel_case
from gen_xlang_wrappers import PYTHON_SUFFIX
from gen_xlang_wrappers import delete_generated_files
from gen_xlang_wrappers import generate_transform_configs
from gen_xlang_wrappers import get_wrappers_from_transform_configs
from gen_xlang_wrappers import run_script
from gen_xlang_wrappers import write_wrappers_to_destinations


class NameUtilsTest(unittest.TestCase):
Expand Down Expand Up @@ -135,6 +144,270 @@ def test_run_generate_sequence(self):
assert_that(numbers, equal_to([i for i in range(10)]))


@pytest.mark.uses_io_java_expansion_service
# @unittest.skipUnless(
# os.environ.get('EXPANSION_PORT'),
# "EXPANSION_PORT environment var is not provided.")
class AutoGenerationScriptTest(unittest.TestCase):
"""
This class tests the generation and regeneration operations in
`sdks/python/gen_xlang_wrappers.py`.
"""
TEST_DIR = os.path.join(
os.path.abspath(os.path.dirname(__file__)), 'test_gen_script')
SERVICE_CONFIG_PATH = os.path.join(
TEST_DIR, "test_expansion_service_config.yaml")
TRANSFORMS_CONFIG_PATH = os.path.join(TEST_DIR, "test_transform_config.yaml")
# tests cases will use GenerateSequence
GEN_SEQ_IDENTIFIER = \
'beam:schematransform:org.apache.beam:generate_sequence:v1'

def setUp(self):
shutil.rmtree(self.TEST_DIR, ignore_errors=True)
os.mkdir(self.TEST_DIR)

def tearDown(self):
shutil.rmtree(self.TEST_DIR)

def test_script_workflow(self):
expansion_service_config = {
"gradle_target": 'sdks:java:io:expansion-service:shadowJar',
'destinations': {
'python': 'apache_beam/transforms/test_gen_script'
}
}
with open(self.SERVICE_CONFIG_PATH, 'w') as f:
yaml.dump([expansion_service_config], f)

# test that transform config YAML file is created
generate_transform_configs(
self.SERVICE_CONFIG_PATH, self.TRANSFORMS_CONFIG_PATH)
self.assertTrue(os.path.exists(self.TRANSFORMS_CONFIG_PATH))

# test that transform config is populated correctly
with open(self.TRANSFORMS_CONFIG_PATH) as f:
transforms = yaml.safe_load(f)
gen_seq_config = None
for transform in transforms:
if transform['identifier'] == self.GEN_SEQ_IDENTIFIER:
gen_seq_config = transform
self.assertIsNotNone(gen_seq_config)
self.assertEqual(
gen_seq_config['default_service'],
expansion_service_config['gradle_target'])
self.assertEqual(gen_seq_config['name'], 'GenerateSequence')
self.assertEqual(
gen_seq_config['destinations']['python'],
'apache_beam/transforms/test_gen_script/generate_sequence')
self.assertIn("end", gen_seq_config['fields'])
self.assertIn("start", gen_seq_config['fields'])
self.assertIn("rate", gen_seq_config['fields'])

# test that the code for GenerateSequence is set to the right destination
grouped_wrappers = get_wrappers_from_transform_configs(
self.TRANSFORMS_CONFIG_PATH)
self.assertIn(
'apache_beam/transforms/test_gen_script/generate_sequence',
grouped_wrappers)
# only the GenerateSequence wrapper is set to this destination
self.assertEqual(
len(
grouped_wrappers[
'apache_beam/transforms/test_gen_script/generate_sequence']),
1)

# test that the correct destination is created
write_wrappers_to_destinations(grouped_wrappers)
self.assertTrue(
os.path.exists(
os.path.join(self.TEST_DIR, 'generate_sequence' + PYTHON_SUFFIX)))
# check the wrapper exists in this destination and has correct properties
from .test_gen_script.generate_sequence_et import GenerateSequence
self.assertTrue(
isinstance(GenerateSequence(start=0), ExternalSchemaTransform))
self.assertEqual(GenerateSequence.identifier, self.GEN_SEQ_IDENTIFIER)

# test that we successfully delete the destination
delete_generated_files(self.TEST_DIR)
self.assertFalse(
os.path.exists(
os.path.join(self.TEST_DIR, 'generate_sequence' + PYTHON_SUFFIX)))

def test_script_workflow_with_modified_transforms(self):
modified_name = 'ModifiedSequence'
modified_dest = \
'apache_beam/transforms/test_gen_script/new_dir/modified_gen_seq'
expansion_service_config = {
"gradle_target": 'sdks:java:io:expansion-service:shadowJar',
'destinations': {
'python': 'apache_beam/transforms/test_gen_script'
},
'transforms': {
'beam:schematransform:org.apache.beam:generate_sequence:v1': {
'name': modified_name,
'destinations': {
'python': modified_dest
}
}
}
}
os.mkdir(os.path.join(self.TEST_DIR, 'new_dir'))

with open(self.SERVICE_CONFIG_PATH, 'w') as f:
yaml.dump([expansion_service_config], f)

# test that transform config YAML file is successfully created
generate_transform_configs(
self.SERVICE_CONFIG_PATH, self.TRANSFORMS_CONFIG_PATH)
self.assertTrue(os.path.exists(self.TRANSFORMS_CONFIG_PATH))

# test that transform config is populated correctly
with open(self.TRANSFORMS_CONFIG_PATH) as f:
transforms = yaml.safe_load(f)
gen_seq_config = None
for transform in transforms:
if transform['identifier'] == self.GEN_SEQ_IDENTIFIER:
gen_seq_config = transform
self.assertIsNotNone(gen_seq_config)
self.assertEqual(
gen_seq_config['default_service'],
expansion_service_config['gradle_target'])
self.assertEqual(gen_seq_config['name'], modified_name)
self.assertEqual(gen_seq_config['destinations']['python'], modified_dest)

# test that the code for 'ModifiedSequence' is set to the right destination
grouped_wrappers = get_wrappers_from_transform_configs(
self.TRANSFORMS_CONFIG_PATH)
self.assertIn(modified_dest, grouped_wrappers)
self.assertIn(modified_name, grouped_wrappers[modified_dest][0])
# only one wrapper is set to this destination
self.assertEqual(len(grouped_wrappers[modified_dest]), 1)

# test that the modified destination is successfully created
write_wrappers_to_destinations(grouped_wrappers)
self.assertTrue(
os.path.exists(
os.path.join(
self.TEST_DIR, 'new_dir', 'modified_gen_seq' + PYTHON_SUFFIX)))
# check the modified wrapper exists in the modified destination
# and check it has the correct properties
from .test_gen_script.new_dir.modified_gen_seq_et import ModifiedSequence
self.assertTrue(
isinstance(ModifiedSequence(start=0), ExternalSchemaTransform))
self.assertEqual(ModifiedSequence.identifier, self.GEN_SEQ_IDENTIFIER)

# test that we successfully delete the destination
delete_generated_files(self.TEST_DIR)
self.assertFalse(
os.path.exists(
os.path.join(
self.TEST_DIR, 'new_dir', 'modified_gen_seq' + PYTHON_SUFFIX)))

def test_script_workflow_with_multiple_wrappers_same_destination(self):
modified_dest = 'apache_beam/transforms/test_gen_script/my_wrappers'
expansion_service_config = {
"gradle_target": 'sdks:java:io:expansion-service:shadowJar',
'destinations': {
'python': 'apache_beam/transforms/test_gen_script'
},
'transforms': {
'beam:schematransform:org.apache.beam:generate_sequence:v1': {
'destinations': {
'python': modified_dest
}
},
'beam:schematransform:org.apache.beam:kafka_read:v1': {
'destinations': {
'python': modified_dest
}
},
'beam:schematransform:org.apache.beam:kafka_write:v1': {
'destinations': {
'python': modified_dest
}
}
}
}

with open(self.SERVICE_CONFIG_PATH, 'w') as f:
yaml.dump([expansion_service_config], f)

# test that transform config YAML file is successfully created
generate_transform_configs(
self.SERVICE_CONFIG_PATH, self.TRANSFORMS_CONFIG_PATH)
self.assertTrue(os.path.exists(self.TRANSFORMS_CONFIG_PATH))

# test that our transform configs have the same destination
with open(self.TRANSFORMS_CONFIG_PATH) as f:
transforms = yaml.safe_load(f)
for transform in transforms:
if transform['identifier'] in expansion_service_config['transforms']:
self.assertEqual(transform['destinations']['python'], modified_dest)

grouped_wrappers = get_wrappers_from_transform_configs(
self.TRANSFORMS_CONFIG_PATH)
# check all 3 wrappers are set to this destination
self.assertEqual(len(grouped_wrappers[modified_dest]), 3)

# write wrappers to destination then check that all 3 exist there
write_wrappers_to_destinations(grouped_wrappers)
from .test_gen_script import my_wrappers_et
self.assertTrue(hasattr(my_wrappers_et, 'GenerateSequence'))
self.assertTrue(hasattr(my_wrappers_et, 'KafkaWrite'))
self.assertTrue(hasattr(my_wrappers_et, 'KafkaRead'))

def test_script_workflow_with_ignored_transform(self):
expansion_service_config = {
"gradle_target": 'sdks:java:io:expansion-service:shadowJar',
'destinations': {
'python': 'apache_beam/io'
},
'ignore': ['beam:schematransform:org.apache.beam:generate_sequence:v1']
}

with open(self.SERVICE_CONFIG_PATH, 'w') as f:
yaml.dump([expansion_service_config], f)

generate_transform_configs(
self.SERVICE_CONFIG_PATH, self.TRANSFORMS_CONFIG_PATH)

# test that transform config is populated correctly
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved
with open(self.TRANSFORMS_CONFIG_PATH) as f:
transforms = yaml.safe_load(f)
gen_seq_config = None
for transform in transforms:
if transform['identifier'] == self.GEN_SEQ_IDENTIFIER:
gen_seq_config = transform
self.assertIsNone(gen_seq_config)

def test_run_pipeline_with_script_generated_transform(self):
expansion_service_config = {
"gradle_target": 'sdks:java:io:expansion-service:shadowJar',
'destinations': {
'python': 'apache_beam/transforms/test_gen_script'
},
'transforms': {
'beam:schematransform:org.apache.beam:generate_sequence:v1': {
'name': 'MyGenSeq',
'destinations': {
'python': 'apache_beam/transforms/test_gen_script/gen_seq'
}
}
}
}
with open(self.SERVICE_CONFIG_PATH, 'w') as f:
yaml.dump([expansion_service_config], f)

run_script(True, self.SERVICE_CONFIG_PATH, self.TRANSFORMS_CONFIG_PATH)
from .test_gen_script.gen_seq_et import MyGenSeq

with beam.Pipeline() as p:
numbers = (
p | MyGenSeq(start=0, end=10) | beam.Map(lambda row: row.value))

assert_that(numbers, equal_to([i for i in range(10)]))


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
Loading
Loading