LIN-531 Implement PythonOperatorPerArtifact #748

Merged
merged 31 commits into from
Aug 19, 2022

Conversation

mingjerli
Contributor

@mingjerli mingjerli commented Aug 17, 2022

Description

  • Rebase from PR-738 and add a PythonOperatorPerArtifact option for the Airflow pipeline output.
  • Wrap each calculation block with pickle.load and pickle.dump when the calculation has input variables and return variables.
  • Wrap DAG tasks within the context manager.
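
A minimal sketch of the pickle wrapping described in the second bullet, not the code merged in this PR. The helper name `wrap_block_with_pickle` and the `var_dir` parameter are hypothetical; only the `variable_<name>.pickle` file naming follows the diff excerpt quoted later in the review. Each generated task body loads its input variables, runs the calculation block, then dumps its return variables so a downstream task can pick them up.

```python
import pickle
import tempfile
from pathlib import Path


def wrap_block_with_pickle(block, input_vars, return_vars, var_dir):
    """Return source that loads inputs, runs `block`, then dumps outputs.

    Hypothetical helper for illustration only.
    """
    lines = ["import pickle"]
    for var in input_vars:
        path = str(Path(var_dir) / f"variable_{var}.pickle")
        lines.append(f"with open({path!r}, 'rb') as f:")
        lines.append(f"    {var} = pickle.load(f)")
    lines.append(block)
    for var in return_vars:
        path = str(Path(var_dir) / f"variable_{var}.pickle")
        lines.append(f"with open({path!r}, 'wb') as f:")
        lines.append(f"    pickle.dump({var}, f)")
    return "\n".join(lines)


def run_demo():
    """Run two wrapped blocks in isolated namespaces, like two tasks."""
    with tempfile.TemporaryDirectory() as var_dir:
        # Upstream task: no input variables, one return variable.
        exec(wrap_block_with_pickle("x = 20 + 1", [], ["x"], var_dir), {})
        # Downstream task: reads x from disk, writes y.
        exec(wrap_block_with_pickle("y = x * 2", ["x"], ["y"], var_dir), {})
        with open(Path(var_dir) / "variable_y.pickle", "rb") as f:
            return pickle.load(f)
```

Because the two blocks run in separate namespaces, the only thing they share is the pickle file, which is the property that lets each artifact become its own task.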

There is some potential for refactoring the PythonOperatorPerSession and PythonOperatorPerArtifact; however, I think it will make more sense to do it when implementing the DockerOperator.

Fixes # (issue)

LIN-531

Type of change

  • New feature (non-breaking change which adds functionality)

How Has This Been Tested?

  • All local pipelines have been tested with local airflow

yoonspark and others added 28 commits August 5, 2022 12:48
@mingjerli mingjerli marked this pull request as ready for review August 17, 2022 22:42
@@ -252,7 +252,7 @@ def _get_input_variable_sources(self, pred_nodes) -> Dict[str, Set[str]]:

     def _get_common_variables(
         self, curr_nc: NodeCollection, pred_nc: NodeCollection
-    ) -> Tuple[Set[str], Set[LineaID]]:
+    ) -> Tuple[List[str], Set[LineaID]]:
Contributor

Is there a chance of duplication now that you are returning a list instead of a set? Do you want to return a list of unique inner vars, or are repeats OK?

Contributor Author

Not in this function: we only convert the set to a list and sort it at the return statement.
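
A small sketch of why converting at the return statement cannot introduce duplicates, assuming the values are collected in a set first. The function name `common_variables` and its arguments are hypothetical, not the signature from the diff.

```python
from typing import Iterable, List


def common_variables(curr_vars: Iterable[str], pred_vars: Iterable[str]) -> List[str]:
    """Collect shared names in a set, then return them as a sorted list."""
    # The set intersection removes repeats before any list exists.
    shared = set(curr_vars) & set(pred_vars)
    # sorted() returns a new list with a deterministic order.
    return sorted(shared)
```

Sorting also makes the generated output stable from run to run, which a plain set does not guarantee.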

"dag_flavor", AirflowDagFlavor.PythonOperatorPerSession
)
dag_flavor = AirflowDagFlavor[
self.dag_config.get("dag_flavor", "PythonOperatorPerSession")
Contributor

@lionsardesai lionsardesai Aug 18, 2022

Why did we go to a string instead of the previous enum?

I think you can use strenums here to jump between a string and a number when referring to an option.

Contributor Author

This is more of a UI reason, just like when we specify the framework for to_pipeline. It's easier for a user to specify the string value PythonOperatorPerSession than to find the class object.
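
A sketch of the string-to-enum lookup discussed here, assuming a plain `Enum` whose member names match the user-facing strings. `DagFlavor` and `resolve_flavor` are hypothetical stand-ins, not the project's `AirflowDagFlavor` definition; the error message is also an illustration.

```python
from enum import Enum


class DagFlavor(Enum):
    """Hypothetical stand-in for the DAG flavor enum."""
    PythonOperatorPerSession = 1
    PythonOperatorPerArtifact = 2


def resolve_flavor(dag_config: dict) -> DagFlavor:
    """Map the user-supplied string to an enum member by name."""
    name = dag_config.get("dag_flavor", "PythonOperatorPerSession")
    try:
        # Enum classes support lookup by member name via subscription.
        return DagFlavor[name]
    except KeyError:
        valid = ", ".join(member.name for member in DagFlavor)
        raise ValueError(
            f"Unknown dag_flavor {name!r}; expected one of: {valid}"
        ) from None
```

With this shape the user only ever types a string, while the rest of the code still works with enum members.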

Contributor Author

Are strenums in the 3.7 standard library?
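
For context on this question: `enum.StrEnum` was added to the standard library in Python 3.11, so it is not available on 3.7. A common alternative that does work on 3.7 is mixing `str` into a regular `Enum`. The sketch below assumes that approach; the class and member names are hypothetical.

```python
from enum import Enum


class DagFlavorName(str, Enum):
    """Hypothetical str-mixin enum; members are also str instances."""
    PER_SESSION = "PythonOperatorPerSession"
    PER_ARTIFACT = "PythonOperatorPerArtifact"


def parse_flavor(raw: str) -> DagFlavorName:
    """Look a member up by its string value; raises ValueError if unknown."""
    return DagFlavorName(raw)
```

Members compare equal to their string values, so `DagFlavorName.PER_SESSION == "PythonOperatorPerSession"` holds, and `.value` gives back the plain string.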

definition.
"""
input_var_loading_block = [
f"{var} = pickle.load(open('/tmp/{pipeline_name}/variable_{var}.pickle','rb'))"
Contributor

If this is a public method, we'll need to validate pipeline_name. One way to do this is to use aubhro's slugify method that he added to utils and ensure that the result is the same as the input. If not, raise an exception saying that the name is invalid or something.
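
A sketch of the check suggested above, assuming a slugify function that lowercases the name and collapses anything outside `[a-z0-9_]` into underscores. The `slugify` below is a hypothetical stand-in written for this example, not the helper in the project's utils, and `validate_pipeline_name` is likewise an invented name.

```python
import re


def slugify(value: str) -> str:
    """Hypothetical slugify: lowercase, non [a-z0-9_] runs become '_'."""
    return re.sub(r"[^a-z0-9_]+", "_", value.lower()).strip("_")


def validate_pipeline_name(pipeline_name: str) -> str:
    """Accept the name only if slugifying it leaves it unchanged."""
    if not pipeline_name or slugify(pipeline_name) != pipeline_name:
        raise ValueError(
            f"Invalid pipeline name {pipeline_name!r}; "
            f"try {slugify(pipeline_name)!r} instead"
        )
    return pipeline_name
```

Rejecting names that change under slugify keeps path separators and `..` out of any file path built from the pipeline name, which is the concern with interpolating it into a pickle path.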

Contributor

@lionsardesai lionsardesai left a comment

looks good!

@mingjerli mingjerli merged commit 2915a74 into main Aug 19, 2022
@lionsardesai lionsardesai deleted the LIN-531 branch September 19, 2022 16:26
3 participants