Custom XCom GCS #681

Merged
merged 10 commits into from
Nov 29, 2022

Conversation

rajaths010494
Contributor

Adds a custom XCom backend that uploads XCom values to the GCS bucket with the given name.
To enable it, set the environment variable AIRFLOW__CORE__XCOM_BACKEND to astronomer.providers.google.cloud.xcom_backends.gcs.GCSXComBackend in docker-compose.yaml, or wherever Airflow is running.
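
For readers unfamiliar with the variable name: Airflow reads environment variables of the form AIRFLOW__{SECTION}__{KEY} as configuration overrides, so the setting above targets the xcom_backend key in the core section. The snippet below is only a rough sketch of that naming convention, not Airflow's own parsing code; the helper names split_airflow_env_var and configured_xcom_backend are made up for illustration.

```python
from typing import Mapping, Optional, Tuple

# Class path taken from the PR description above.
BACKEND_PATH = "astronomer.providers.google.cloud.xcom_backends.gcs.GCSXComBackend"


def split_airflow_env_var(name: str) -> Optional[Tuple[str, str]]:
    """Map AIRFLOW__SECTION__KEY to (section, key); return None if the name does not match.

    Hypothetical helper: it mirrors the naming convention only.
    """
    parts = name.split("__", 2)
    if len(parts) != 3 or parts[0] != "AIRFLOW" or not parts[1] or not parts[2]:
        return None
    return parts[1].lower(), parts[2].lower()


def configured_xcom_backend(environ: Mapping[str, str]) -> Optional[str]:
    """Return the XCom backend class path set through the environment, if any."""
    for name, value in environ.items():
        if split_airflow_env_var(name) == ("core", "xcom_backend"):
            return value
    return None


# Example environment, as it might be passed in from docker-compose.yaml.
example_env = {"AIRFLOW__CORE__XCOM_BACKEND": BACKEND_PATH, "PATH": "/usr/bin"}
selected_backend = configured_xcom_backend(example_env)
```

In a real deployment, pass os.environ instead of example_env to see which backend the running process would pick up.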

closes #596

@rajaths010494 rajaths010494 force-pushed the gcs-custom-xcom branch 2 times, most recently from 22c5d91 to a144bb3 Compare October 3, 2022 10:38
@codecov
Copy link

codecov bot commented Oct 3, 2022

Codecov Report

Base: 98.47% // Head: 98.49% // Increases project coverage by +0.02% 🎉

Coverage data is based on head (06c4d87) compared to base (99dd72d).
Patch coverage: 100.00% of modified lines in pull request are covered.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #681      +/-   ##
==========================================
+ Coverage   98.47%   98.49%   +0.02%     
==========================================
  Files          86       87       +1     
  Lines        4661     4724      +63     
==========================================
+ Hits         4590     4653      +63     
  Misses         71       71              
Impacted Files Coverage Δ
...onomer/providers/google/cloud/xcom_backends/gcs.py 100.00% <100.00%> (ø)

Member

@uranusjr uranusjr left a comment

I would restructure the implementation like this:

from typing import Any

from airflow.models.xcom import BaseXCom


# Only implements the required interface and "forward" logic.
class GCSXCom(BaseXCom):
    def orm_deserialize_value(self) -> str:
        ...

    @staticmethod
    def serialize_value(value: Any, **kwargs: Any) -> Any:
        # Upload the real value, then store only the returned reference.
        value = _GCSXComBackend().write_and_upload_value(value, **kwargs)
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(xcom: "XCom") -> Any:
        result = BaseXCom.deserialize_value(xcom)
        # Only strings carrying the backend prefix are references to download.
        if isinstance(result, str) and result.startswith(_GCSXComBackend.PREFIX):
            result = _GCSXComBackend().download_and_read_value(result)
        return result

# Implements the actual logic!
# (PREFIX is expected to be defined on this class.)
class _GCSXComBackend:
    def write_and_upload_value(self, value, **kwargs):
        ...

    def download_and_read_value(self, result):
        ...

The advantage is that we can reference values through self, which lets us cache things (for example, the GCSHook) and turn the environment variable lookups into lazy properties. This reduces import-time logic (so startup is faster) and can also simplify testing.
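
To make the caching and lazy-property idea concrete, here is a minimal, self-contained sketch. It is not the code from this PR: FakeStorageHook is a hypothetical in-memory stand-in for a real hook such as GCSHook, and the class name LazyBackend, the environment variable EXAMPLE_XCOM_BUCKET, and the prefix value are all assumptions chosen for illustration.

```python
import json
import os
import uuid
from functools import cached_property
from typing import Any, Dict


class FakeStorageHook:
    """Hypothetical in-memory stand-in for a real storage hook such as GCSHook."""

    def __init__(self) -> None:
        self.objects: Dict[str, bytes] = {}

    def upload(self, bucket: str, name: str, data: bytes) -> None:
        self.objects[f"{bucket}/{name}"] = data

    def download(self, bucket: str, name: str) -> bytes:
        return self.objects[f"{bucket}/{name}"]


class LazyBackend:
    """Holds the storage logic; settings and the hook are resolved on first use."""

    PREFIX = "gcs_xcom_"  # assumed marker for values stored in the bucket

    @cached_property
    def bucket_name(self) -> str:
        # Read when first accessed, not at import time (variable name is hypothetical).
        return os.environ.get("EXAMPLE_XCOM_BUCKET", "example-bucket")

    @cached_property
    def hook(self) -> FakeStorageHook:
        # Created once per backend instance, then reused.
        return FakeStorageHook()

    def write_and_upload_value(self, value: Any) -> str:
        """Upload the JSON-encoded value and return the reference key."""
        key = f"{self.PREFIX}{uuid.uuid4()}"
        self.hook.upload(self.bucket_name, key, json.dumps(value).encode("utf-8"))
        return key

    def download_and_read_value(self, key: str) -> Any:
        """Fetch the object behind a reference key and decode it."""
        return json.loads(self.hook.download(self.bucket_name, key).decode("utf-8"))


backend = LazyBackend()
reference = backend.write_and_upload_value({"rows": 3})
restored = backend.download_and_read_value(reference)
```

Because cached_property stores its result on the instance, the caching only helps while the same backend object is reused; a test can also assign backend.hook directly to substitute a fake without patching module-level state.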

astronomer/providers/google/cloud/xcom_backends/gcs.py (outdated, resolved)
@rajaths010494
Contributor Author

Yes, I'm making that change. I'll need to update the tests as well.

@bharanidharan14
Contributor

@kaxil @uranusjr @josh-fell This PR needs your review.

Comment on lines 44 to 65
TRANSFORM_DATASET = """
7.0,3.2,4.7,1.4
6.4,3.2,4.5,1.5
"""
TRAIN_DATASET = """
2,4.9,3.0,1.4,0.2
0,7.3,2.9,6.3,1.8
2,5.1,3.5,1.4,0.2
0,4.9,2.5,4.5,1.7
"""

Collaborator

Unrelated changes?

Contributor Author

Rebased branch

@kaxil kaxil merged commit 9f9fd12 into main Nov 29, 2022
@kaxil kaxil deleted the gcs-custom-xcom branch November 29, 2022 20:07
Successfully merging this pull request may close these issues.

Add custom XCOM backends - GCS
5 participants