-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Custom XCom GCS #681
Custom XCom GCS #681
Conversation
22c5d91
to
a144bb3
Compare
Codecov ReportBase: 98.47% // Head: 98.49% // Increases project coverage by
Additional details and impacted files@@ Coverage Diff @@
## main #681 +/- ##
==========================================
+ Coverage 98.47% 98.49% +0.02%
==========================================
Files 86 87 +1
Lines 4661 4724 +63
==========================================
+ Hits 4590 4653 +63
Misses 71 71
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. ☔ View full report at Codecov. |
347f904
to
97baf46
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would restructure the implementation like this:
# Only implements the required interface and "forward" logic.
class GCSXCom(BaseXCom):
def orm_deserialize_value(self) -> str:
...
@staticmethod
def serialize_value(value: Any, **kwargs) -> Any:
value = _GCSXComBackend().write_and_upload_value(value, **kwargs)
return BaseXCom.serialize_value(value)
@staticmethod
def deserialize_value(xcom: "XCom") -> Any:
result = BaseXCom.deserialize_value(xcom)
if isinstance(result, str) and result.startswith(GCSXComBackend.PREFIX):
result = _GCSXComBackend().download_and_read_value(result)
return result
# Implements the actual logic!
class _GCSXComBackend:
def write_and_upload_value(self, value, **kwargs):
...
def download_and_read_value(self, result):
...
The advantage to this is we can reference values with self
, which enables us to cache things (for example the GCSHook) and make the environment variable retrievals lazy properties. This reduces import-time logic (makes startup fast) and can also simplify testing.
Yes, i am doing it. Will need to modify test as well for the changes |
417c303
to
ed9c54b
Compare
@kaxil @uranusjr @josh-fell Need your review on this PR |
TRANSFORM_DATASET = """ | ||
7.0,3.2,4.7,1.4 | ||
6.4,3.2,4.5,1.5 | ||
""" | ||
TRAIN_DATASET = """ | ||
2,4.9,3.0,1.4,0.2 | ||
0,7.3,2.9,6.3,1.8 | ||
2,5.1,3.5,1.4,0.2 | ||
0,4.9,2.5,4.5,1.7 | ||
""" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated changes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rebased branch
a4358f0
to
a78928b
Compare
7f3b301
to
2516258
Compare
Co-authored-by: Tzu-ping Chung <[email protected]>
2516258
to
06c4d87
Compare
Custom XCOM to upload xcom values to the GCS bucket for the given name.
Adding ENV
AIRFLOW__CORE__XCOM_BACKEND: astronomer.providers.google.cloud.xcom_backends.gcs.GCSXComBackend
in thedocker-compose.yaml
or where the airflow is running will enable the custom XCOM for GCS.closes #596