Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API Changes to FivetranSensor for supporting backfills #58

Merged
merged 10 commits into from
Nov 6, 2023

Conversation

dwreeves
Copy link
Contributor

@dwreeves dwreeves commented Sep 4, 2023

Resolves #49

Changes

FivetranOperator

High level API:

  • Deprecate execution_timeout kwarg. This isn't even being used. It is pretty straightforward to remove this.
  • (Breaking) Make default behavior when deferrable=False to wait for Fivetran sync to complete.
    • Right now there is a fundamental divergence between how the deferrable and non-deferrable modes work.
    • I would argue that:
      • (1) Typically in Airflow, the deferrable param is a parameter that defines the runtime of the operator (i.e. in normal worker nodes vs a triggerer instance), not the fundamental behavior of the operator. For that reason, the deferrable=? kwarg should not impact the behavior as dramatically as it currently does.
      • (2) Adding code to make it wait synchronously is not tricky.
      • (3) The existing behavior when deferrable=False is significantly worse than waiting, as it leads to the FivetranSensor being necessary to await the FivetranOperator, whereas the FivetranSensor would be more interesting if it were a choice rather than a strict necessity. I updated the README.md to clarify what the FivetranSensor can/should be used for going forward.
  • Add wait_for_completion kwarg. (I copied this kwarg name from EcsRunTaskOperator for whatever that is worth.) This combined with deferrable splits what deferrable does into two separate options (waiting for Fivetran vs running on Triggerer instance) rather than having them combined into one option.

FivetranSensor

I start by describing the changes to FivetranOperator because understanding these changes paves the way to understanding why FivetranSensor also changes.

It appears right now that FivetranSensor is designed just to await a FivetranOperator(deferrable=False), out of necessity due to lack of functionality when non-deferrable.

However, a much more relevant use case for the FivetranSensor is when Fivetran is running on its own schedule, and we want to await a sync scheduled outside of Airflow. The main motivation in designing these API changes is to support that use case. By making FivetranOperator always wait by default even when deferrable=False, then FivetranSensor's primary purpose can focus on more interesting use cases.

High level API:

  • Replace xcom with completed_after_time. xcom is a misleading name. The value does not need to necessarily come from xcom. This kwarg was designed originally to be used in the case of FivetranOperator >> FivetranSensor, with the intent that the FivetranSensor was always awaiting a manually scheduled job via the completed at time passed through xcom.
    • I originally wanted to name this target_completed_time or something similar to emulate the DateTimeSensor API. However, this is not a very descriptive name, and it feels especially confusing when using the old xcom_pull() pattern (target_completed_time="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}" looks strange and could be interpreted as "targetting" the last sync value. But in that case it would be unclear why this pattern even works!) I found this name to be a lot clearer.
  • Add always_wait_when_syncing kwarg. So the operator never passes while Fivetran is syncing when this is true.
    • Setting this as a default True won't matter for most users.
    • This is at its most useful for backfills. Fivetran syncs are not atomic and it's possible for table A to be updated while table B is not, or alternatively some rows in table A are written while others are still being written. So running downstream operations while Fivetran is syncing can be a little dangerous, and this operates as an imperfect* but reasonable check to see if the coast is clear.
  • Added propagate_failures_forward kwarg.
    • This makes it so target_completed_time > failed_at > succeeded_at raises an error. By default this is False, matching the existing API.
    • A justification why you may want this to be true is because a fail > success indicates an issue that may eventually impact future syncs.

(* Imperfect because for a long running DAG, or for concurrent task runs, Fivetran can just resync again.)

FivetranHook

High level API

  • Add is_synced_after_target_time() and is_synced_after_target_time_async() to replace and get_sync_status_async() (which are deprecated).
    • It was too risky to replace the entire API with something slightly new, so instead I just created a new method name.
    • This name is also a little more descriptive. "get sync status" implies that the value being returned is some sort of enum representing, but in the sync hook implementation it is just a boolean. In reality, it is a mix-- get_sync_status_async returns a string but get_sync_status returns boolean. Here the name is_synced_after_target_time() implies a boolean return value, which is what you get.
  • **Refactor connector_details parsing logic so that it is in _determine_if_synced_from_connector_details(). This means that both the sync and async operator use the same logic, and it gets to be DRY.
  • Check success before fail in get last sync. I explain why this is a beneficial change here: [Feature Request] Support DateTimeSensor-like behavior for FivetranSensor #49 (comment)

Misc.

I did my best to keep the API in tact, which you can see by the fact that there are very few changes in tests/.

The main test change I made was so that "succeeded_at > failed_at > completed_after_time" returns a success rather than a failure.

@dwreeves
Copy link
Contributor Author

dwreeves commented Sep 7, 2023

We have run these code changes in our company's Airflow deployment like this:

FivetranSensor(
    task_id=...,
    connector_id=...,
    completed_after_time="{{ data_interval_end + macros.timedelta(minutes=5) }}"
)

And we confirmed it works as intended in these two cases:

  • When the completed_after_time is long in the past and Fivetran has already run much more recently (so completed_at > completed_after_time), it doesn't spend any time waiting around.
  • When the completed_after_time is near the present and a completed_at hasn't yet occurred greater than the completed_after_time, it does end up waiting around.

@dwreeves dwreeves changed the title [Draft] API Changes to FivetranSensor for supporting backfills API Changes to FivetranSensor for supporting backfills Sep 12, 2023
@dwreeves
Copy link
Contributor Author

dwreeves commented Sep 30, 2023

Will anyone be able to review this? @phanikumv @pankajastro @sunank200.

I know these are big changes, but I think it's worth looking at them. Overall I think these changes really elevate the package to the next level, allowing folks to really customize their experience with Fivetran connection into Airflow. I've been using Airflow for multiple years for data engineering and am an OSS contributor to Airflow. I consider these changes to make for a more idiomatic experience, and also addresses many of the sort of real-world nuances that you want to have control over when designing data pipelines that can guarantee data integrity.

Other reasons why I think this should get reviewed:

  • We have been using this for the past few weeks at my company, and it works great!
    • I did learn the hard way that always_wait_when_syncing should be defaulted to False. 😬 It's a very useful control flow feature in many circumstnaces, but it's not always safe. There are pros and cons to turning it on or off, but I think "off" is a better default.
  • It is nearly fully backwards compatible (with deprecation warnings). The only exception is there is a slight change in behavior when deferrable=False for the FivetranOperator (although, I think this change in behavior is extremely sensible).
  • I think all the API changes are totally defensible and make for a better experience.
  • It's better to get these sort of changes in earlier than later, while the package has fewer dependents.

Let me know if there is anything you need from me to assist in reviewing the changes.

Thank you for all your work on this!

@phanikumv
Copy link
Contributor

Will anyone be able to review this? @phanikumv @pankajastro @sunank200.

I know these are big changes, but I think it's worth looking at them. Overall I think these changes really elevate the package to the next level, allowing folks to really customize their experience with Fivetran connection into Airflow. I've been using Airflow for multiple years for data engineering and am an OSS contributor to Airflow. I consider these changes to make for a more idiomatic experience, and also addresses many of the sort of real-world nuances that you want to have control over when designing data pipelines that can guarantee data integrity.

Other reasons why I think this should get reviewed:

  • We have been using this for the past few weeks at my company, and it works great!

    • I did learn the hard way that always_wait_when_syncing should be defaulted to False. 😬 It's a very useful control flow feature in many circumstnaces, but it's not always safe. There are pros and cons to turning it on or off, but I think "off" is a better default.
  • It is nearly fully backwards compatible (with deprecation warnings). The only exception is there is a slight change in behavior when deferrable=False for the FivetranOperator (although, I think this change in behavior is extremely sensible).

  • I think all the API changes are totally defensible and make for a better experience.

  • It's better to get these sort of changes in earlier than later, while the package has fewer dependents.

Let me know if there is anything you need from me to assist in reviewing the changes.

Thank you for all your work on this!

Sure @dwreeves we will review it.Thank you so much for your contributions

Copy link
Collaborator

@Lee-W Lee-W left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Left a minor nitpick

fivetran_provider_async/operators.py Outdated Show resolved Hide resolved
Copy link
Contributor

@pankajastro pankajastro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

fivetran_provider_async/hooks.py Outdated Show resolved Hide resolved
@dwreeves
Copy link
Contributor Author

dwreeves commented Oct 10, 2023

Thanks! I will change tonight.

@dwreeves
Copy link
Contributor Author

All requested changes made.

@kaxil
Copy link
Contributor

kaxil commented Oct 16, 2023

Thanks @dwreeves , someone will review it tomorrow

Copy link
Collaborator

@Lee-W Lee-W left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left a few minor suggestions but overall it looks good to me. @dwreeves thanks for your contribution!

fivetran_provider_async/hooks.py Outdated Show resolved Hide resolved
fivetran_provider_async/operators.py Outdated Show resolved Hide resolved
if not self.completed_after_time:
self._completed_after_time_rendered = self.hook.get_last_sync(self.connector_id)
else:
self._completed_after_time_rendered = timezone.parse(self.completed_after_time)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a quick run on the old example but it failed here
Example: https://github.com/astronomer/airflow-provider-fivetran-async/blob/main/fivetran_provider_async/example_dags/example_fivetran_async.py#L33
Log

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/pendulum/parsing/__init__.py", line 131, in _parse
    dt = parser.parse(
         ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dateutil/parser/_parser.py", line 1368, in parse
    return DEFAULTPARSER.parse(timestr, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dateutil/parser/_parser.py", line 643, in parse
    raise ParserError("Unknown string format: %s", timestr)
dateutil.parser._parser.ParserError: Unknown string format: None
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/airflow/airflow_provider_fivetran_async/fivetran_provider_async/sensors.py", line 149, in execute
    elif not self.poke(context):
             ^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/airflow_provider_fivetran_async/fivetran_provider_async/sensors.py", line 200, in poke
    self._completed_after_time_rendered = timezone.parse(self.completed_after_time)
                                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/timezone.py", line 207, in parse
    return pendulum.parse(string, tz=timezone or TIMEZONE, strict=strict)  # type: ignore
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pendulum/parser.py", line 29, in parse
    return _parse(text, **options)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pendulum/parser.py", line 45, in _parse
    parsed = base_parse(text, **options)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pendulum/parsing/__init__.py", line 74, in parse
    return _normalize(_parse(text, **_options), **_options)
                      ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pendulum/parsing/__init__.py", line 135, in _parse
    raise ParserError("Invalid date string: {}".format(text))
pendulum.parsing.exceptions.ParserError: Invalid date string: None
[2023-10-17, 07:27:15 UTC] {taskinstance.py:1398} INFO - Marking task as FAILED. dag_id=example_fivetran_async, task_id=fivetran_async_sensor, execution_date=20231017T072549, start_date=20231017T072715, end_date=20231017T072715
[2023-10-17, 07:27:15 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 414 for task fivetran_async_sensor (Invalid date string: None; 324)
[2023-10-17, 07:27:15 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2023-10-17, 07:27:15 UTC] {taskinstance.py:2776} INFO - 0 downstream tasks scheduled from follow-on schedule check

Is this expected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for delays, I'd had a very busy last few weeks.

I started a new job recently and don't have Airflow + Fivetran set up just yet (will be setting it up soon).

That said, I think I can see what happened here.


The error message shows the following:

Invalid date string: None

So it looks like what happened is the FivetranOperator returned None. Looking at the code, we see (simplified):

    def execute(self, context: Context) -> None | str:
        ...

        if not self.wait_for_completion:
            return last_sync

        if not self.deferrable:
            self._wait_synchronously(pendulum.parse(last_sync))  # type: ignore[arg-type]
            return None
        else:
            ...
            return None

So it only returns last_sync (str) when when wait_for_completion is False. This makes sense, since in this case you'd be implementing a sort of async-await pattern with operator+sensor and the timestamp is used as the "future" so to speak.

The issue therefore is that the example DAG needs to show wait_for_completion=False.


Reading these examples, I actually found them a little confusing.

I now wrote them a little bit differently to clarify the new behaviors:

  • example_fivetran.py shows a sync example. It is unnecessary to use a sensor now by default, so I removed the sensor.
  • example_fivetran_async.py also does not need to use the sensor, since the FivetranOperator awaits the Fivetran sync (both in sync and async moe).
  • example_fivetran_xcom.py needed wait_for_completion=False to be set for that example to make any sense, which I didn't have.

That said, there are still some possible issues with the Bigquery and Bqml examples? Since they both uses the unnecessary FivetranSensor. (It is unnecessary because the FivetranOperator does the job of waiting). Maybe we remove the FivetranSensors from there too?

We can also potentially provide an example of when Fivetran is being externally scheduled and the {{ data_interval_end }} is being used as the completed time?

@dwreeves
Copy link
Contributor Author

dwreeves commented Oct 23, 2023

Sorry all, I have been on vacation and essentially offline the past week. Will get to this soon!

@dwreeves dwreeves mentioned this pull request Nov 2, 2023
@phanikumv phanikumv merged commit 9fa930d into astronomer:main Nov 6, 2023
6 checks passed
@dwreeves dwreeves deleted the api-changes branch November 30, 2023 01:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature Request] Support DateTimeSensor-like behavior for FivetranSensor
6 participants