
[Feature Request] Support DateTimeSensor-like behavior for FivetranSensor #49

Closed
dwreeves opened this issue Aug 12, 2023 · 4 comments · Fixed by #58
dwreeves commented Aug 12, 2023

Overview of existing FivetranSensor behavior

The FivetranSensor does the following:

  • At start, retrieves the last completed time of a Fivetran sync job
  • Waits around for a new completed time of a Fivetran sync job

I feel that this control flow is not what the majority of users will find optimal, for a couple of reasons.

First, any user triggering Fivetran via the FivetranOperator should probably prefer an ExternalTaskSensor over the FivetranSensor. This means the FivetranSensor is at its best when waiting on jobs scheduled automatically by Fivetran, rather than jobs scheduled by Airflow (and thus scheduled in Fivetran "manually").

For that reason, all examples and issues described below assume the user is employing the sensor to wait on Fivetran jobs scheduled by Fivetran rather than by Airflow. Many of the issues I describe still apply when scheduling Fivetran jobs via Airflow (see my Implementation proposal section, where I discuss a feature that would also make sense for the FivetranOperator), but these issues are more pronounced and easier to understand when thinking about the Fivetran scheduler interoperating with the Airflow scheduler.

The main issue: backfilling (with 2 examples)

The main issue is that when waiting on Fivetran scheduled jobs, a DAG that is being backfilled does not necessarily need to wait for a new Fivetran job.

Scenario A (Last fail > Last success)

Imagine the following situation (Scenario A):

  • Today is January 15th
  • The last successful Fivetran sync was January 10th
  • The last unsuccessful Fivetran sync was January 12th
  • We are backfilling an Airflow DAG with a @daily schedule starting January 1st

The user wants the following behavior to occur:

  • Successes up through January 10th because the data has been successfully synced as of those dates.
  • Failures on Jan 11th and 12th because a sync was attempted, but the job failed.
  • The ability to select between one of two control flow behaviors:
    • (1) Waiting on Jan 14th and 15th because the system has not received a completed status since the Jan 12th failure.
    • (2) Failures on Jan 14th and 15th because previous Fivetran sensor tasks failed, and they don't want their DAG to have any potential gaps that come out of a temporal inconsistency of when the jobs were run. (The user can also do depends_on_past=True, but there is a subtle difference between that and this.)

This will allow the user to backfill data up to January 10th, at which point the DAG will fail:

[calendar graphic illustrating these run states omitted]

The user in this case implements the FivetranSensor, and instead their DAG waits for the next Fivetran job to complete on every run, even though the backfilled runs don't need to wait for anything.

For this small backfill, and with Fivetran jobs that aren't failing, this isn't a huge deal, but when backfilling years of data in daily chunks, it can slow a backfill down considerably. Imagine you sync your Fivetran data once a day, and your Airflow DAG's maximum number of active DAG runs is 10. In this case, the FivetranSensor caps your backfill at 10 DAG runs per day, and the only ways around it are to implement complex control flow logic (e.g. BranchDateTimeOperator), to write a custom implementation, or to run the backfill as a separate DAG.

Scenario B (Last success > Last fail)

Scenario A was designed to exercise the full range of behaviors, but a far more typical scenario is that the last success occurred much more recently than the last failure.

Imagine the following situation (Scenario B):

  • Today is January 15th
  • The last successful Fivetran sync was January 10th
  • The last unsuccessful Fivetran sync was January 2nd
  • We are backfilling an Airflow dag with a @daily schedule starting January 1st

The user wants the following behavior to occur:

  • Successes up through January 10th because the data has been successfully synced as of those dates.
  • Waiting from Jan 11th through Jan 15th because there have been no further completed syncs since then.

The desired behavior when the last fail time > the last success time was open to interpretation, but here it is more straightforward: we should just be waiting.

[calendar graphic illustrating these run states omitted]

Other issues

Fault tolerance

The FivetranSensor is not fault tolerant in the sense that, if the FivetranSensor is restarted, the new instance of the sensor may end up waiting on a different datetime than the previous instance.

For example, imagine a user has some sort of error that causes the task to fail (it doesn't even need to come from the sensor itself; it could be, e.g., an OOM error on the worker running the task). If a Fivetran job completes between the time the task failed and the time it is retried, the sensor will end up waiting on a different Fivetran job to complete.
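The mechanism, roughly, is that the baseline timestamp lives only in the task instance's memory. A caricature of the pattern (the attribute and method names here are illustrative, not the provider's actual code):

def poke(self, context):
    # On a fresh start (including a retry after a crash), the baseline is
    # re-fetched from the Fivetran API, so a retried task may capture a
    # newer baseline than the original attempt did.
    if self.previous_completed_at is None:
        self.previous_completed_at = hook.get_last_sync_completed_at(self.connector_id)
    return hook.get_last_sync_completed_at(self.connector_id) > self.previous_completed_at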

Race condition (into cascading failures)

Imagine a user syncs the Fivetran job once every hour on the hour, and their Airflow DAG also syncs once every hour on the hour. One of the tasks within the DAG is a FivetranSensor.

Imagine the Fivetran job typically finishes about 55 seconds after the hour. This means that if the FivetranSensor has not started by 00:00:55, the sensor will end up waiting a whole hour, i.e. it completes at around 01:00:55.

It gets worse from here. Imagine the whole DAG is configured with default_args={"depends_on_past": True}. The FivetranSensor with execution date 00:00:00 that got stuck for an hour will end up blocking the next Fivetran sensor task from getting scheduled, meaning that the FivetranSensor with execution date 01:00:00 won't finish until 02:00:55. This is a cascading failure!

Implementation proposal

FivetranDateTimeSensor

The core functionality I am proposing is a FivetranDateTimeSensor.

  • For backwards compatibility reasons, this should be a separate sensor and should not replace the existing FivetranSensor.
  • Please provide feedback on the name, as I am not sure what it should be called. I do believe it is a reasonable name, because this sensor behaves much like the DateTimeSensor: it waits for a time to pass, but passes immediately on backfills.

The majority of the functionality can just become a new method in the hook.

Additional control flow kwargs:

  • target_time: datetime | str = "{{ data_interval_end }}". (Note: this is a templated field). This kwarg name comes directly from DateTimeSensor, albeit here it is optional. This is the timestamp that the Fivetran completed time is compared to.
  • propagate_failures_forward: bool = True - When this flag is True, the sensor fails when context["data_interval_end"] > fivetran_failed_at > fivetran_succeeded_at. When the flag is False, the sensor instead waits until there is a new completed time.
    • By default this should be True because (1) Fivetran shows the status of a connector as its latest status, and (2) it is more likely in practice to be a good "better safe than sorry" option for people.
  • always_wait_when_syncing: bool = False - Fivetran syncs in chunks and this can cause issues when reading from a database currently being synced in some situations. Imagine for example a transform job that "increments" using select max(write_time) from tbl. Fivetran writes in chunks not necessarily ordered by write_time, meaning doing stuff while Fivetran is syncing can cause you to skip data. (This is not the best example because you probably wouldn't have a backfill job touching the most recent write_time data, but this can still happen in other contexts).
    • By default this should be False because (1) it is not an obvious behavior / it is an added layer of complexity over the default and simple advertised behavior of the sensor (2) it is not typical or ideal that data being backfilled should be impacted by most recent syncs.

I see it as uncommon in most Airflow provider packages to create large inheritance trees, so it is reasonable to simply implement all of this as a subclass of BaseSensorOperator and let the FivetranHook abstraction do most of the heavy lifting.
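To make this concrete, here is a minimal sketch of what such a subclass might look like. Treat it as illustrative only: it assumes the hook exposes a get_connector() method returning the raw Fivetran connector payload (with succeeded_at, failed_at, and status.sync_state fields), and the hook's import path may differ depending on the provider package.

from airflow.exceptions import AirflowException
from airflow.sensors.base import BaseSensorOperator
from airflow.utils import timezone


class FivetranDateTimeSensor(BaseSensorOperator):
    """Waits until the connector has a successful sync covering target_time."""

    template_fields = ("target_time",)

    def __init__(
        self,
        *,
        connector_id: str,
        fivetran_conn_id: str = "fivetran_default",
        target_time: str = "{{ data_interval_end }}",
        propagate_failures_forward: bool = True,
        always_wait_when_syncing: bool = False,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.connector_id = connector_id
        self.fivetran_conn_id = fivetran_conn_id
        self.target_time = target_time
        self.propagate_failures_forward = propagate_failures_forward
        self.always_wait_when_syncing = always_wait_when_syncing

    def poke(self, context) -> bool:
        # Assumed import path; adjust for the actual provider package.
        from fivetran_provider.hooks.fivetran import FivetranHook

        details = FivetranHook(fivetran_conn_id=self.fivetran_conn_id).get_connector(self.connector_id)
        epoch = timezone.datetime(1970, 1, 1)  # sentinel for "never happened"
        succeeded_at = timezone.parse(details["succeeded_at"]) if details.get("succeeded_at") else epoch
        failed_at = timezone.parse(details["failed_at"]) if details.get("failed_at") else epoch
        target = timezone.parse(self.target_time) if isinstance(self.target_time, str) else self.target_time

        if self.always_wait_when_syncing and details["status"]["sync_state"] == "syncing":
            return False  # never pass or fail mid-sync
        if succeeded_at >= target:
            return True  # data is synced through the target time
        if failed_at > target:
            raise AirflowException(f"Sync for {self.connector_id} failed after the target time.")
        if failed_at > succeeded_at and self.propagate_failures_forward:
            raise AirflowException(f"Latest completed sync for {self.connector_id} is a failure.")
        return False  # no sync covers the target time yet; keep waiting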

Additional kwarg for FivetranOperator

On the topic of supporting backfills in a sensible manner, the FivetranOperator should also have a kwarg that skips running the Fivetran job when fivetran_succeeded_at > context["data_interval_end"].

I'm not sure what a good name for this kwarg may be. skip_if_succeeded_after_data_interval_end is a little on the verbose side, but is an accurate description. I'd love to hear if anyone has ideas for a snappier name though.

For backwards compatibility, it should be introduced with a default of False, although I do believe defaulting to True would be sensible in a future release.
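As a rough sketch (the kwarg name, self.hook, and the surrounding execute() body are all assumptions rather than the provider's actual code), the check could sit at the top of execute():

from airflow.exceptions import AirflowSkipException
from airflow.utils import timezone


def execute(self, context):
    if self.skip_if_succeeded_after_data_interval_end:
        details = self.hook.get_connector(self.connector_id)
        succeeded_at = details.get("succeeded_at")
        if succeeded_at and timezone.parse(succeeded_at) > context["data_interval_end"]:
            raise AirflowSkipException(
                "Connector already synced successfully past this data interval."
            )
    # ... otherwise fall through to the normal sync-triggering behavior

Raising AirflowSkipException marks the task instance as skipped rather than failed, which keeps backfills moving without re-triggering syncs.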

Other notes

  • I already have a version of this implemented in a production system, and am willing to implement a version of this functionality in this open source library.

  • Regardless of what happens, the documentation for the existing FivetranSensor should be more clear about what is happening.

  • EDIT: I realize I should be using the data_interval_end and not the logical_date. The logical date would be sensible if we knew when Fivetran's syncs start, but we do not have that info available; we only know when a sync ends.

@phanikumv

Thank you @dwreeves for providing detailed requirements. Appreciate it.
Would you like to create a PR for this proposal?

@dwreeves

@phanikumv Yeah, I can get to it in the next couple days.

dwreeves commented Sep 3, 2023

Hold on a second... 🤔❗

It looks like the xcom parameter sort of does the thing I want it to do... only sort of.

First of all, the name xcom is highly misleading. The timestamp doesn't necessarily need to come from xcom; it just comes from Jinja templating. For example, {{ data_interval_end }} would be a perfectly valid input that doesn't use xcom. I would propose renaming this to target_time (in line with the convention of DateTimeSensor) or target_completed_time.

Second, the behavior and the feature are not very well documented, and could use a description or an example in the README.


Still, there are differences between what I am proposing and what currently exists. With FivetranSensor(..., xcom="{{ data_interval_end }}"), it will always fail so long as there is any failure after "{{ data_interval_end }}", even if the latest sync was a success.

For example, imagine:

  • Today is January 15th
  • The last successful Fivetran sync was January 12th
  • The last unsuccessful Fivetran sync was January 10th
  • We are backfilling an Airflow DAG with a @daily schedule starting January 1st

In this case, the DAG will look like this:

[calendar graphic illustrating the actual run states omitted]

Whereas the behavior we would like is:

[calendar graphic illustrating the desired run states omitted]

Obviously this is very different from the behavior we would want when using xcom="{{ data_interval_end }}". And in my opinion, it is not sensible behavior, at least in the context of using a "target time" in the sense of how DateTimeSensor works.

The reason is the existing order of operations:

  1. If failed_at > previous_completed_at, then fail.
  2. If sync_state == "rescheduled" and connector_details["schedule_type"] == "manual", then wait.
  3. If max([failed_at, succeeded_at]) > previous_completed_at, then pass.
  4. Else, wait.

Note that because max([failed_at, succeeded_at]) > target_completed_time is checked only after failed_at > target_completed_time has already been shown to be false, the third condition reduces to simply checking succeeded_at > target_completed_time. (I am also renaming previous_completed_at to target_completed_time.) So I restate the existing control flow, equivalently, as follows:

Existing flow:

  1. If failed_at > target_completed_time, then fail.
  2. If sync_state == "rescheduled" and schedule_type == "manual", then restart and wait.
  3. If succeeded_at > target_completed_time, then pass.
  4. Else, wait.

And so the failures for all the previous dates occur because a latest-state failure always takes precedence over any success. Essentially, the existing control flow treats failed_at > target_completed_time > succeeded_at and succeeded_at > failed_at > target_completed_time the same way and fails both.

Whereas the order of operations I am proposing is:

Proposed flow:

There is no way to emulate the exact behavior of my proposal in the original post unless I add another kwarg, something like always_fail_if_latest_status_is_failed. This gives us the following flow:

  1. If always_wait_when_syncing and sync_state == "syncing", then wait.
  2. If failed_at > target_completed_time and always_fail_if_latest_status_is_failed, then fail.
  3. If succeeded_at > target_completed_time, then pass.
  4. If sync_state == "rescheduled" and schedule_type == "manual", then restart and wait.
  5. If failed_at > target_completed_time > succeeded_at, then fail.
  6. If target_completed_time > failed_at > succeeded_at and propagate_failures_forward, then fail.
  7. Else, wait.
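Translated directly into poke() logic, purely for illustration (succeeded_at, failed_at, sync_state, schedule_type, and target_completed_time are assumed to be parsed from the connector payload and rendered kwargs as in the sketch earlier in this thread, and start_fivetran_sync is an assumed hook method name):

def poke(self, context) -> bool:
    if self.always_wait_when_syncing and sync_state == "syncing":
        return False  # (1) wait
    if failed_at > target_completed_time and self.always_fail_if_latest_status_is_failed:
        raise AirflowException("A sync failed after the target time.")  # (2) fail
    if succeeded_at > target_completed_time:
        return True  # (3) pass
    if sync_state == "rescheduled" and schedule_type == "manual":
        hook.start_fivetran_sync(self.connector_id)  # (4) restart...
        return False  # ...and wait
    if failed_at > target_completed_time > succeeded_at:
        raise AirflowException("The sync covering the target time failed.")  # (5) fail
    if target_completed_time > failed_at > succeeded_at and self.propagate_failures_forward:
        raise AirflowException("Propagating the latest failure forward.")  # (6) fail
    return False  # (7) wait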

Note that if you do the following:

  • always_fail_if_latest_status_is_failed=True
  • always_wait_when_syncing=False

Then this is almost perfectly a refactor, with the exception that the "restart and wait" step is swapped with the "if succeeded_at > target_completed_time, then pass" step.

In practice I am not convinced it is a big deal to flip these for the existing FivetranSensor. If you use the FivetranSensor completely unadorned, the target_completed_time is equal to the previous last sync time, and if the connector is ever "rescheduled" between the last run and the next run, then the next run cannot have completed yet.

However, flipping these does matter for the behavior I am proposing because if the connector is rescheduled but has still succeeded historically, the user should be able to pass the backfill up to the latest success.


I am bringing all of this up because it raises the question of what should be done. Essentially, my proposal right now could be implemented in FivetranSensor by adding 3 additional kwargs (always_wait_when_syncing, always_fail_if_latest_status_is_failed, and propagate_failures_forward). But that seems very complex. I am going to sit on this a bit and get back.

In any event, I would recommend renaming the xcom parameter to target_completed_time, and parsing it the same way that DateTimeSensor parses datetimes (meaning, allow not only str inputs but also datetime inputs). This would make it clearer to users that the value doesn't technically need to come from XCom to be a valid input, and it would make the API of the FivetranSensor more general and more similar to other Airflow APIs.
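For reference, DateTimeSensor handles the str-or-datetime input by normalizing it to an ISO string in __init__ (so Jinja templating still works on the string form) and parsing it only at poke time; FivetranSensor could do the same for the renamed parameter:

import datetime

from airflow.utils import timezone

# In __init__, mirroring DateTimeSensor's normalization:
if isinstance(target_completed_time, datetime.datetime):
    self.target_completed_time = target_completed_time.isoformat()
elif isinstance(target_completed_time, str):
    self.target_completed_time = target_completed_time
else:
    raise TypeError(
        "Expected str or datetime.datetime for target_completed_time, "
        f"got {type(target_completed_time)}"
    )

# Later, in poke(), after any template has been rendered:
target = timezone.parse(self.target_completed_time)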

Renaming xcom is still not sufficient on its own, I just think it is a start as I think through the full API...

dwreeves commented Sep 3, 2023

Alright, I thought about it a bit... you can mostly ignore my word vomit above.

I think the biggest question I have is: Is there a downside with the current design to swapping from (essentially) this...

if failed_at > target_completed_time:
    raise AirflowException
elif succeeded_at > target_completed_time:
    return True
else:
    return False

... to (essentially) this?

if succeeded_at > target_completed_time:
    return True
elif failed_at > target_completed_time:
    raise AirflowException
else:
    return False

I think for 99% of users, the answer is that there is essentially no difference. The reason is that if the timestamp is being set based on the last completed time, then it is unlikely you will ever run into a situation where both succeeded_at and failed_at are greater than target_completed_time.

So, it should be safe to switch this logic. And this change would allow for setting the xcom kwarg (rename pending...) to {{ data_interval_end }} and having it pass all backfill DAG runs up to the latest success date.
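For example (the connector id here is hypothetical), a backfill-friendly usage would then be as simple as:

wait_for_sync = FivetranSensor(
    task_id="wait_for_fivetran_sync",
    fivetran_conn_id="fivetran_default",
    connector_id="my_connector_id",  # hypothetical
    xcom="{{ data_interval_end }}",  # rename to target_completed_time pending
    poke_interval=60,
)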

So overall, I think this change in logic would be extremely beneficial: it would allow the FivetranSensor to work for backfill DAG runs with minimal extra complexity, and it should not have any real impact on existing users. (This repo only has 12 stars for now, anyway, and probably isn't mass adopted yet.)

What do you think? I will be putting this logic into a PR for further consideration.
