MLTransform #26795

Merged
merged 68 commits into from
Jul 6, 2023

Conversation

Contributor

@AnandInguva AnandInguva commented May 19, 2023

Design doc: https://docs.google.com/document/d/1rQkSm_8tseLqDQaLohtlCGqt5pvMaP0XIpPi5UD0LCQ/edit

This PR introduces TFTProcessHandlerSchema, which takes a PCollection and performs user-provided data processing transforms. TFTProcessHandler uses tensorflow_transform's Beam implementation to compute results.

This PR aims to provide a simple-to-use API that shields the user from the complexities of using tensorflow_transform directly, such as providing a TensorFlow feature spec.
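For readers unfamiliar with tensorflow_transform, the work being wrapped is a two-phase computation: a full pass over the data that produces an artifact, followed by a per-element pass that applies it. The sketch below illustrates that idea in plain Python for a vocabulary transform. It does not use Beam or TFT, and the names `compute_vocab` and `apply_vocab` are hypothetical, not part of this PR.

```python
from collections import Counter
from typing import Dict, Iterable, List


def compute_vocab(rows: Iterable[Dict[str, List[str]]],
                  column: str) -> Dict[str, int]:
    """Analyze phase: one full pass that builds a token -> index mapping."""
    counts = Counter(token for row in rows for token in row[column])
    # Most frequent token gets index 0. Ties are broken by string order,
    # which is an arbitrary choice made for this sketch.
    ordered = sorted(counts, key=lambda tok: (-counts[tok], tok))
    return {tok: idx for idx, tok in enumerate(ordered)}


def apply_vocab(row: Dict[str, List[str]], column: str,
                vocab: Dict[str, int], default: int = -1) -> Dict[str, list]:
    """Transform phase: replace each token with its vocabulary index."""
    out = dict(row)
    out[column] = [vocab.get(tok, default) for tok in row[column]]
    return out


data = [{'x': ['I', 'love', 'Beam']}, {'x': ['Beam', 'is', 'fun']}]
vocab = compute_vocab(data, 'x')  # the "artifact"
encoded = [apply_vocab(row, 'x', vocab) for row in data]
```

The vocabulary plays the role of the artifact discussed later in this conversation: once computed, it can be stored and reapplied without repeating the analyze pass.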

Not considered/implemented in this PR

Alternative design - Alternative way to pass columns to the transforms

Instead of passing columns to every data transform, we can let MLTransform take a columns param and assume that all the data transforms within the scope of that MLTransform apply only to the input columns passed.

But this has a downside: if the user calls multiple MLTransforms, there will be a fusion break (due to TFT's internal implementation).

with beam.Pipeline() as p:
  transforms = [ComputeAndApplyVocab(...), TFIDF(...)]
  result = (
      p
      | "Create" >> beam.Create(input_data)
      | beam.Map(lambda x: ComputeAndVocabBatchedInputType(**x)
                 ).with_output_types(ComputeAndVocabBatchedInputType)
      | "MLTransform" >>
      base.MLTransform(columns=['x'], transforms=transforms))


@codecov

codecov bot commented May 19, 2023

Codecov Report

Merging #26795 (a315091) into master (33b3437) will decrease coverage by 0.45%.
The diff coverage is 14.70%.

@@            Coverage Diff             @@
##           master   #26795      +/-   ##
==========================================
- Coverage   71.50%   71.05%   -0.45%     
==========================================
  Files         858      858              
  Lines      104809   104076     -733     
==========================================
- Hits        74944    73955     -989     
- Misses      28317    28573     +256     
  Partials     1548     1548              
Flag      Coverage Δ
python    80.25% <14.70%> (-0.59%) ⬇️

Impacted Files                                        Coverage Δ
sdks/python/apache_beam/ml/transforms/utils.py        0.00% <0.00%> (ø)
sdks/python/apache_beam/ml/transforms/tft.py          6.97% <6.97%> (ø)
sdks/python/apache_beam/ml/transforms/handlers.py     7.00% <7.00%> (ø)
sdks/python/apache_beam/ml/transforms/base.py         81.08% <81.08%> (ø)

... and 24 files with indirect coverage changes


Contributor

@damccorm damccorm left a comment

Didn't do a thorough review, but the approach here looks pretty good to me

fields = row_type._inner_types()
return Dict[str, Union[tuple(fields)]]

def _get_artifact_location(self, pipeline: beam.Pipeline):
Contributor

Instead of dumping this in the staging directory or a temp directory, should we require users to provide an output directory? Presumably, users will want a well defined location for retrieving their artifacts

Contributor Author

Added it as optional. If the user doesn't provide it, I fall back to this approach.

Contributor

I don't think the staging/temp directory makes sense as a default--if there are artifacts to be produced/consumed, this should be a required argument.

@AnandInguva AnandInguva marked this pull request as ready for review June 1, 2023 21:27
@AnandInguva AnandInguva changed the title [DRAFT] MLTransform and TFTProcessHandler MLTransform and TFTProcessHandler Jun 1, 2023
github-actions bot commented Jun 2, 2023

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

github-actions bot commented Jun 2, 2023

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @jrmccluskey for label python.

@AnandInguva
Contributor Author

R: @tvalentyn

github-actions bot commented Jun 5, 2023

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@@ -0,0 +1,340 @@
#
Contributor Author

@AnandInguva AnandInguva Jun 5, 2023

I think a better name for this file would be transforms/tft.py.

Contributor

Can we rename (here or in a follow on PR)?

@AnandInguva AnandInguva changed the title MLTransform and TFTProcessHandler MLTransform Jun 26, 2023

import apache_beam as beam

# TODO: Abstract methods are not getting pickled with dill.
Contributor

Does this TODO still apply? What are the consequences?

Contributor Author

Not relevant anymore. I tried today and I wasn't able to reproduce it now

@@ -326,6 +326,12 @@ commands =
# Run all DataFrame API unit tests
bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/dataframe'

[testenv:py{38,39}-tft-113]
Contributor

Any reason to limit this to 3.8/9 (and not 3.10/11)?

Contributor

And should we trigger any non-3.8 versions in a precommit? Maybe 3.11 to get lowest/highest?

Contributor Author

@AnandInguva AnandInguva Jun 29, 2023

tfx_bsl is only available for Python 3.8 and 3.9: https://pypi.org/project/tfx-bsl/#files.

I have asked for an update on support for other Python versions.

Contributor

Right, I forgot this was known.
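Because the TFT dependencies are unavailable on some Python versions, tests that need them have to be skipped rather than fail on import. The commit list mentions adding skip conditions and a try/except around the import. A minimal sketch of that pattern with the standard `unittest` module follows. The test class name and `run_suite` helper are hypothetical and are not taken from the PR's test files.

```python
import unittest

try:
    import tensorflow_transform as tft  # optional dependency
except ImportError:
    tft = None


@unittest.skipIf(tft is None, 'tensorflow_transform is not installed')
class TFTDependentTest(unittest.TestCase):
    """Hypothetical test case that only makes sense when TFT is present."""

    def test_module_is_importable(self):
        self.assertIsNotNone(tft)


def run_suite() -> unittest.TestResult:
    # Load and run only the test case above; a skipped test still counts
    # as a successful run.
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(TFTDependentTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

With this arrangement the same test module can be collected on every Python version, while the tox environment shown above only needs to exist for the versions where the dependency installs.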

# we will convert scalar values to list values and TFT will output
# numpy array all the time.
if not self.is_input_record_batches:
  raw_data |= beam.ParDo(ConvertScalarValuesToListValues())
Contributor

Rather than having the is_input_record_batches param here, could we:

  1. do a type check in ConvertScalarValuesToListValues and no-op if it's a record batch
  2. introspect the schema to determine if it needs the TensorAdapter

Not totally sure about the second piece, but if we can do something like that it would be very helpful. I don't like the idea of having is_input_record_batches as a top level config, especially once we extend this to other frameworks.

Contributor

The main advantage is that if we can kill this, it defers the problem of having framework specific inputs. With that said, I think we need an idea of how we'll solve that regardless

Contributor

Actually, I guess we'll need to solve this now for the output_record_batches param anyways

Contributor Author

We need to know before pipeline runtime whether it is a record batch or dict to determine the right schema.

So during pipeline construction, I use this flag to construct respective schema. I don't think this can be inferable later
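The first suggestion in this thread, a runtime type check that leaves non-dict inputs untouched, can be sketched as a plain function. This is an illustration only: the function name is hypothetical, the real ConvertScalarValuesToListValues is a Beam DoFn whose body is not shown in this conversation, and, as the author notes, a runtime check alone does not settle which schema to build at pipeline construction time.

```python
from typing import Any


def convert_scalars_to_lists(element: Any) -> Any:
    """Wrap scalar dict values in a list; pass other inputs through."""
    if not isinstance(element, dict):
        # Hypothetical no-op branch for inputs such as record batches.
        return element
    return {
        key: value if isinstance(value, (list, tuple)) else [value]
        for key, value in element.items()
    }
```

For example, `convert_scalars_to_lists({'x': 1, 'y': [2, 3]})` leaves `y` as it is and wraps `x` into a one-element list.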

Comment on lines 94 to 95
is_input_record_batches: bool = False,
output_record_batches: bool = False,
Contributor

If we need something like this in MLTransform itself, I'd significantly prefer a public TFTProcessHandler to take in that config. These options will be meaningless for most other frameworks we might support.

Contributor

One option would be to make these properties of the operations instead of the top level transform. Chained TFT operations could then just use the values from the first/last operation.

I also expect that we're going to run into this problem with other frameworks in the future, so I think we need a way for adding additional framework or transform specific parameters.

Contributor

I think that would be my preferred experience. So users could do something like:

MLTransform(
    artifact_location=args.artifact_location,
    artifact_mode=ArtifactMode.PRODUCE,
).with_transform(
    ComputeAndApplyVocabulary(columns=['x'], is_input_record_batches=True)
).with_transform(
    TFIDF(columns=['x'], output_record_batches=True))

For TFT, we'd then fuse together any consecutive TFT transforms into a single TFTProcessHandler, resolve any conflicting arguments (e.g. throw if one transform says output_record_batches and the next doesn't take recordBatches or something), and construct the graph
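The fusing step described in this comment can be sketched in plain Python: group runs of consecutive operations that share a framework, then check that record-batch flags agree at each boundary inside a run. Everything here is an assumption made for illustration (the `Operation` dataclass, the `framework` field, and the exact conflict rule). The PR does not implement this, and the option was deferred to a later iteration.

```python
from dataclasses import dataclass, field
from itertools import groupby
from typing import Any, Dict, List, Tuple


@dataclass
class Operation:
    """Hypothetical stand-in for a data processing operation."""
    name: str
    framework: str
    options: Dict[str, Any] = field(default_factory=dict)


def fuse_by_framework(
        ops: List[Operation]) -> List[Tuple[str, List[Operation]]]:
    """Group runs of consecutive operations that share a framework."""
    return [(fw, list(run))
            for fw, run in groupby(ops, key=lambda op: op.framework)]


def resolve_batch_options(run: List[Operation]) -> Dict[str, bool]:
    """Take the input flag from the first op and the output flag from the last.

    Raises ValueError when one op emits record batches but the next op in
    the same run does not declare that it accepts them (or vice versa).
    """
    for prev, nxt in zip(run, run[1:]):
        emits = bool(prev.options.get('output_record_batches', False))
        accepts = bool(nxt.options.get('is_input_record_batches', False))
        if emits != accepts:
            raise ValueError(
                'Conflicting record batch options between %s and %s' %
                (prev.name, nxt.name))
    return {
        'is_input_record_batches':
            bool(run[0].options.get('is_input_record_batches', False)),
        'output_record_batches':
            bool(run[-1].options.get('output_record_batches', False)),
    }
```

Each fused run would then map to a single handler configured with the resolved flags, which is the behavior the comment proposes for consecutive TFT transforms.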

Contributor Author

Info about record batches lives within the context of TFTProcessHandler. I don't like the idea of passing this arg via operations, since the operations themselves never use it. It would just be a different way of inferring this arg in TFTProcessHandler.

An alternative would be for MLTransform to take a config: Dict[str, Any], in which we instruct users to pass certain kinds of args such as {'out_record_batch' : True}. We could pass this config to the respective process handler and discard the args that are irrelevant to that ProcessHandler.
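A minimal sketch of this config-dict alternative, assuming each handler declares which keys it understands and drops the rest. The class names and the `accepted_keys` attribute are hypothetical. Only the `out_record_batch` key comes from the comment above.

```python
from typing import Any, Dict, FrozenSet


class ProcessHandlerSketch:
    """Hypothetical base class: subclasses list the config keys they use."""
    accepted_keys: FrozenSet[str] = frozenset()

    def __init__(self, config: Dict[str, Any]):
        # Keep only the keys relevant to this handler and discard the rest.
        self.config = {
            key: value for key, value in config.items()
            if key in self.accepted_keys
        }


class TFTHandlerSketch(ProcessHandlerSketch):
    accepted_keys = frozenset({'out_record_batch'})


handler = TFTHandlerSketch({
    'out_record_batch': True,
    'flag_for_some_other_framework': 3,  # ignored by this handler
})
```

One trade-off of silently discarding keys is that a misspelled key goes unnoticed, so a real implementation might prefer to warn about keys that no handler accepts.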

Contributor

"we don't use this arg anywhere in the operation"

I think from a user's point of view we use it in the first one; is_input_record_batches=True just means "I'm expecting that it's ok to pass record batches into the first transform", right?

Contributor Author

Right now, the columns are specified at operation level instead of transform level. The entry point for column x could be at the beginning, but the entry point for column y could be in the middle of the list. Users might pass input_record_batch=True for the y column if they don't understand exactly what we are instructing.

If we ask the user to provide it like this, I feel like it could get a little complicated:

transforms = [
   op1(columns=['x']),
   op2(columns=['y'], input_record_batch=True),
   op3(columns=['z']),
]

We can also iterate on this in v2, since I guess this needs another discussion, and remove the option for record batches for now. We would support Dict[str, Any] in v1. What do you think?

Contributor

I'm comfortable iterating on it in v2

if not os.path.exists(os.path.join(
self.artifact_location, RAW_DATA_METADATA_DIR, SCHEMA_FILE)):
raise FileNotFoundError(
"Raw data metadata not found at %s" %
Contributor

We should be more descriptive in these errors (something along the lines of "you're running in consume mode" what that means, and "have you run this in produce mode?")

Contributor Author

Changed wording
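The final wording of the error is not shown in this conversation. As an illustration of the kind of message requested, the sketch below checks for the schema file and explains what consume mode expects. The constant values, the function name, and the message text are all assumptions. Only the constant names and the FileNotFoundError come from the snippet under review.

```python
import os
import tempfile

# Assumed values for illustration; the PR names these constants but their
# values do not appear in this conversation.
RAW_DATA_METADATA_DIR = 'raw_data_metadata'
SCHEMA_FILE = 'schema.pbtxt'


def check_consume_mode_artifacts(artifact_location: str) -> str:
    """Return the schema path, or raise with a hint about produce mode."""
    schema_path = os.path.join(
        artifact_location, RAW_DATA_METADATA_DIR, SCHEMA_FILE)
    if not os.path.exists(schema_path):
        raise FileNotFoundError(
            'Raw data metadata not found at %s. The transform is running '
            'in consume mode, which reads artifacts written by an earlier '
            'run. Has a pipeline been run in produce mode with the same '
            'artifact_location?' % schema_path)
    return schema_path
```

Naming the expected path and the mode in the message lets a user tell a wrong artifact_location apart from a produce run that never happened.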

Comment on lines 29 to 32
should be wrapped inside a TFTProcessHandler object before being passed
to the beam.ml.MLTransform class. The TFTProcessHandler will let MLTransform
know which type of input is expected and infers the relevant schema required
for the TFT library.
Contributor

This is outdated

Contributor Author

Updated

@AnandInguva
Contributor Author

@damccorm removed pyarrow record batch. Addressed other comments as well. PTAL

ref: #26640 (comment)

Contributor

@damccorm damccorm left a comment

I think this LGTM for our initial pass and we can follow up on everything else in future PRs. Known work that remains that I'm aware of:

  1. Figure out how to support record batch types (and more generally, ProcessHandler specific parameters)
  2. Follow up with tfx_bsl team to get 3.10+ support
  3. (minor) rename tft_transforms.py to tft.py
  4. Support remaining operations
  5. Future work from https://docs.google.com/document/d/1rQkSm_8tseLqDQaLohtlCGqt5pvMaP0XIpPi5UD0LCQ/edit#heading=h.2jxsxqh

Let's make sure we are tracking all of those somewhere (could be an issue or a doc, please just share whatever you choose).

@AnandInguva
Contributor Author

R: @rszper

@AnandInguva
Contributor Author

R: @rszper

Actually, I am thinking I will iterate on the docs in the following PRs. Docs will include pydocs edits and doc pages as well for users

@AnandInguva AnandInguva merged commit b529efe into apache:master Jul 6, 2023
59 of 61 checks passed
andreydevyatkin pushed a commit to akvelon/beam that referenced this pull request Jul 7, 2023
* Initial work on MLTransform and ProcessHandler

* Support for containers: List, Dict[str, np.ndarray]
pass types
Support Pyarrow schema
Artifact WIP

* Add  min, max, artifacts for scale_0_to_1

* Add more transform functions and artifacts
WIP on inferring types
Remove pyarrow implementation
Add MLTransformOutput
Refactor files

* Add generic type annotations

* Add unit tests
Fix artifacts code
Add more tests
fix lint errors
Change namespaces from ml_transform to transforms
Add doc strings
Add tests and refactor

* Add support for saving intermediate results for a transform
Sort imports
Add metrics namespaces
Refactor

* Add schema to the output PCollection

* Remove MLTransformOutput and return Row instead with schema

* Convert primitive type to list using a DoFn. Remove FixedLenFeatureSpec

Make VarLenFeatureSpec as default
Refactoring

* Add append_transform to the ProcessHandler
Some more refactoring

* Remove param self.has_artifacts, add artifact_location to handler..and address PR comments
Add skip conditions for tests
Add test suite for tft tests

* Move tensorflow import into the try except catch
Try except in __init__.py
Remove imports from __init__
Add docstrings, refactor

* Add type annotations for the data transforms

* Add tft test in tox.ini

Mock tensorflow_transform in pydocs
fix tft pypi name

Skip a test
Add step name
Update  supported versions of TFT

* Add step name for TFTProcessHandler

* Remove unsupported tft versions

* Fix mypy

* Refactor TFTProcessHandlerDict to TFTProcessHandlerSchema

* Update doc for data processing transforms

* Fix checking the typing container types

* Refactor code

* Fail TFTProcessHandler on a non-global window PColl

* Remove underscore

* Remove high level functions

* Add TFIDF

* Fix tests with new changes[WIP]

* Fix tests

* Refactor class name to CamelCase and remove kwargs

* use is_default instead of isinstance

* Remove falling back to staging location for artifact location

* Add TFIDF tests

* Remove __str__

* Refactor skip statement

* Add utils for fetching artifacts on compute and apply vocab

* Make ProcessHandler internal class

* Only run analyze stage when transform_fn(artifacts) is not computed before.

* Fail if pipeline has non default window during artifact producing stage

* Add support for Dict, recordbatch and introduce artifact_mode

* Hide process_handler from user. Make TFTProcessHandler as default

* Refactor few tests

* Comment a test

* Save raw_data_meta_data so that it can be used during consume stage

* Refactor code

* Add test on artifacts

* Fix imports

* Add tensorflow_metadata to pydocs

* Fix test

* Add TFIDF to import

* Add basic example

* Remove redundant logging statements

* Add test for multiple columns on MLTransform

* Add todo about what to do when new process handler is introduced

* Add abstractmethod decorator

* Edit Error message

* Update docs, error messages

* Remove record batch input/output arg

* Modify generic types

* Fix import sort

* Fix mypy errors - best effort

* Fix tests

* Add TFTOperation doc

* Rename tft_transform  to tft

* Fix handler_test

* Fix base_test

* Fix pydocs
aleksandr-dudko pushed a commit to aleksandr-dudko/beam that referenced this pull request Jul 10, 2023
aleksandr-dudko pushed a commit to aleksandr-dudko/beam that referenced this pull request Jul 17, 2023
bullet03 pushed a commit to akvelon/beam that referenced this pull request Aug 11, 2023
cushon pushed a commit to cushon/beam that referenced this pull request May 24, 2024