[Feature Request] Support DateTimeSensor-like behavior for FivetranSensor #49
Comments
Thank you @dwreeves for providing detailed requirements. Appreciate it.
@phanikumv Yeah, I can get to it in the next couple of days.
Hold on a second... 🤔❗ It looks like the […]

First of all, the name […]

Second, the behavior and feature is not very well documented, and could use a description or example in the […].

Still, there are differences between what I am proposing and what currently exists. With […]

For example, imagine:

[…]
In this case, the DAG will look like this:

[…]

Whereas the behavior we would like is:

[…]
Obviously this is a very different behavior than what we would want when using […]. The reason for this is that the order of operations is:

[…]

Note that because […]

Existing flow:

[…]

And so the failures for all the previous dates occur because the latest state being a failure always takes precedence over any successes. Essentially, the existing control flow treats […]

Whereas the order of operations I am proposing is:

Proposed flow:

[…]

There is no way to emulate the exact behavior of my proposal in the original post unless I add another kwarg, something like "[…]".
Note that if you do the following:

[…]

Then this is almost perfectly a refactor, with the exception that the "restart and wait" step is flipped with the "if […]" step.

In practice I am not convinced it is a big deal to flip these for the existing […]. However, flipping these does matter for the behavior I am proposing, because if the connector is rescheduled but has still succeeded historically, the user should be able to pass the backfill up to the latest success.

I am bringing all of this up because it raises the question of what should be done. Essentially, my proposal right now could be implemented into […].

In any event, I would recommend renaming the […]. Renaming […]
Alright, I thought about it a bit... you can mostly ignore my word vomit above. I think the biggest question I have is: Is there a downside with the current design to swapping from (essentially) this...

```python
if failed_at > target_completed_time:
    raise AirflowException
elif succeeded_at > target_completed_time:
    return True
else:
    return False
```

... to (essentially) this?

```python
if succeeded_at > target_completed_time:
    return True
elif failed_at > target_completed_time:
    raise AirflowException
else:
    return False
```

I think for 99% of users, the answer should be that there is essentially no difference. The reason is that if the timestamp is being set based on the last completed time, then it is unlikely that you ever run into a situation where both a `failed_at` and a `succeeded_at` are more recent than the target time.

So, it should be safe to switch this logic. And this change in the logic would allow for using the […]

So overall, I think this change in logic would be extremely beneficial, and would allow the […] What do you think? I will be putting this logic into a PR for further consideration.
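To make the question concrete, here is the proposed success-first ordering as a small self-contained function (a sketch only: the function and variable names are illustrative, not the provider's actual API):

```python
from datetime import datetime

from airflow.exceptions import AirflowException


def completed_after(
    succeeded_at: datetime, failed_at: datetime, target_completed_time: datetime
) -> bool:
    """Success-first check: a success newer than the target always passes,
    even if an even newer failure exists."""
    if succeeded_at > target_completed_time:
        return True
    if failed_at > target_completed_time:
        raise AirflowException("Latest completion after the target time is a failure")
    return False  # nothing has completed after the target yet; keep poking
```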
## Overview of existing `FivetranSensor` behavior

The `FivetranSensor` does the following:

[…]

I feel that this control flow is not what a majority of users will find optimal, for a couple of reasons.

First, any user triggering Fivetran via the `FivetranOperator` should probably prefer an `ExternalTaskSensor` over the `FivetranSensor`. This means the `FivetranSensor` is at its best when waiting on jobs scheduled by Fivetran automatically, rather than jobs scheduled by Airflow (and thus scheduled in Fivetran "manually").

For that reason, all examples and issues described below assume a setup where the user is using the sensor to wait on Fivetran jobs scheduled by Fivetran rather than jobs scheduled by Airflow. Many of the issues I will be describing still apply when scheduling Fivetran jobs via Airflow (see my Implementation proposal section, where I discuss a feature that would make sense to add to the `FivetranOperator`), but these issues are more pronounced and easier to understand when thinking about the Fivetran scheduler interoperating with the Airflow scheduler.

## The main issue: backfilling (with 2 examples)
The main issue is that when waiting on Fivetran-scheduled jobs, a DAG that is being backfilled does not necessarily need to wait for a new Fivetran job.

### Scenario A (Last fail > Last success)

Imagine the following situation (Scenario A):

- […] an `@daily` schedule starting January 1st
- […]

The user wants the following behavior to occur:

- […] (`depends_on_past=True` is similar, but there is a subtle difference between that and this.)

This will allow the user to backfill data up to January 10th, at which point the DAG will fail:

[…]
The user in this case implements the `FivetranSensor`, and instead the way their DAG works is that it waits for the next Fivetran job to trigger, even though the backfilled runs don't need to wait around for anything.

For this small backfill, and with Fivetran jobs that aren't failing, this isn't a huge deal; but when backfilling years of data in daily chunks, this can unnecessarily slow down a backfill a lot. Imagine you sync your Fivetran data once a day, and your Airflow DAG's max active DagRuns is 10. In this case, implementing a `FivetranSensor` caps your backfill at 10 DagRuns per day, and the only ways around it are to implement complex control flow logic (e.g. `BranchDateTimeOperator`; see the sketch below), to write a custom implementation, or to implement the backfill as a separate DAG.
### Scenario B (Last success > Last fail)

Scenario A was designed to show the full range of behaviors, but a far more typical scenario is one where the last success occurred much more recently than the last failure.

Imagine the following situation (Scenario B):

- […] an `@daily` schedule starting January 1st
- […]

The user wants the following behavior to occur:

[…]

The desired behavior when the last fail time > the last success time was open to interpretation, but here it is a little more straightforward: we should just be waiting.
## Other issues

### Fault tolerance

The `FivetranSensor` is not fault tolerant in the sense that, if the sensor is restarted, the new instance of the sensor may end up waiting on a different datetime than the previous instance did.

For example, imagine a user hits some sort of error that causes the task to fail (it doesn't even need to come from the sensor itself; it could be, e.g., an OOM error on the worker running the job). If a Fivetran job completes between the time the task failed and the time it was resumed, the sensor will now be waiting on a different Fivetran job to complete.
### Race condition (into cascading failures)

Imagine a user syncs the Fivetran job once every hour on the hour, and their Airflow DAG also runs once every hour on the hour. One of the tasks within the DAG is a `FivetranSensor`.

Imagine the Fivetran job typically finishes about 55 seconds after the hour. This means that if the `FivetranSensor` is not executed by `00:00:55`, then the sensor will end up waiting a whole hour, i.e. it completes at around `01:00:55`.

It gets worse from here. Imagine the whole DAG is configured with `default_args={"depends_on_past": True}`. The `FivetranSensor` with execution date `00:00:00` that got stuck for an hour will block the next sensor task from getting scheduled, meaning that the `FivetranSensor` with execution date `01:00:00` won't finish until `02:00:55`. This is a cascading failure!

## Implementation proposal
### `FivetranDateTimeSensor`

The core functionality I am proposing is a `FivetranDateTimeSensor`:

- […] `FivetranSensor`.
- […] how `DateTimeSensor` works, which is that it waits for a time to pass but passes immediately on backfills.

The majority of the functionality can just become a new method in the hook, as sketched below.
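For instance (a sketch, assuming the hook's existing `get_connector` method and the `succeeded_at` / `failed_at` fields returned by the Fivetran connectors API; the method name and null-handling here are my own):

```python
import pendulum


# Hypothetical new method on FivetranHook.
def get_last_sync_times(self, connector_id: str):
    """Return (succeeded_at, failed_at) for a connector as datetimes.

    Either field may be null for a connector that has never
    succeeded/failed, so fall back to the epoch.
    """
    connector = self.get_connector(connector_id)
    epoch = "1970-01-01T00:00:00+00:00"
    succeeded_at = pendulum.parse(connector["succeeded_at"] or epoch)
    failed_at = pendulum.parse(connector["failed_at"] or epoch)
    return succeeded_at, failed_at
```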
Additional control flow kwargs:

- `target_time: datetime | str = "{{ data_interval_end }}"`. (Note: this is a templated field.) This kwarg name comes directly from `DateTimeSensor`, albeit here it is optional. This is the timestamp that the Fivetran completed time is compared to.
- `propagate_failures_forward: bool = True` - The behavior of this should be: when this flag is True, the sensor fails when `context["data_interval_end"] > fivetran_failed_at > fivetran_succeeded_at`. If the flag is False, it will instead wait around until there is a completed-at time.
- `always_wait_when_syncing: bool = False` - Fivetran syncs in chunks, and this can cause issues when reading from a database currently being synced in some situations. Imagine, for example, a transform job that "increments" using `select max(write_time) from tbl`. Fivetran writes in chunks not necessarily ordered by `write_time`, meaning doing work while Fivetran is syncing can cause you to skip data. (This is not the best example, because you probably wouldn't have a backfill job touching the most recent `write_time` data, but this can still happen in other contexts.)

I see it as uncommon in most Airflow provider packages to create large inheritance trees, so it is reasonable to simply implement all of this as a subclass of `BaseSensorOperator` and just allow the `FivetranHook` abstraction to do most of the heavy lifting. A sketch of how these pieces could fit together follows.
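Here is a rough sketch of the proposed sensor (assumptions: the `get_last_sync_times` hook method sketched above, the `fivetran_provider` import path, and the connection id default; none of this is final API):

```python
from __future__ import annotations

from datetime import datetime

import pendulum
from airflow.exceptions import AirflowException
from airflow.sensors.base import BaseSensorOperator
from fivetran_provider.hooks.fivetran import FivetranHook


class FivetranDateTimeSensor(BaseSensorOperator):
    """Waits until the connector has completed after ``target_time``,
    so backfilled runs pass immediately."""

    template_fields = ["target_time"]

    def __init__(
        self,
        connector_id: str,
        fivetran_conn_id: str = "fivetran_default",
        target_time: datetime | str = "{{ data_interval_end }}",
        propagate_failures_forward: bool = True,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.connector_id = connector_id
        self.fivetran_conn_id = fivetran_conn_id
        self.target_time = target_time
        self.propagate_failures_forward = propagate_failures_forward

    def poke(self, context) -> bool:
        hook = FivetranHook(fivetran_conn_id=self.fivetran_conn_id)
        # get_last_sync_times is the hypothetical hook method sketched above.
        succeeded_at, failed_at = hook.get_last_sync_times(self.connector_id)
        target = self.target_time
        if isinstance(target, str):  # a rendered template arrives as a string
            target = pendulum.parse(target)
        # Success-first ordering, per the discussion in the comments.
        if succeeded_at > target:
            return True
        if failed_at > target and self.propagate_failures_forward:
            raise AirflowException(f"Connector {self.connector_id} failed after {target}")
        return False
```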
### Additional kwarg for `FivetranOperator`
On the topic of supporting backfills in a sensible manner, the `FivetranOperator` should also have a kwarg that skips running the Fivetran job when `fivetran_succeeded_at > context["data_interval_end"]` (see the sketch below).

I'm not sure what a good name for this kwarg may be. `skip_if_succeeded_after_data_interval_end` is a little on the verbose side, but it is an accurate description. I'd love to hear if anyone has ideas for a snappier name, though.

For backwards compatibility, it should be introduced with a default of `False`, albeit I do believe `True` would be a good default in a future release.
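As a sketch of what that could look like at the top of the operator's `execute()` (the kwarg name is the verbose placeholder from above; `get_connector` and `succeeded_at` come from the Fivetran connectors API, while the surrounding structure is assumed):

```python
import pendulum
from airflow.exceptions import AirflowSkipException
from fivetran_provider.hooks.fivetran import FivetranHook


# Hypothetical guard inside FivetranOperator.execute(self, context):
def execute(self, context):
    hook = FivetranHook(fivetran_conn_id=self.fivetran_conn_id)
    if self.skip_if_succeeded_after_data_interval_end:
        connector = hook.get_connector(self.connector_id)
        # succeeded_at may be null for a connector that has never succeeded.
        succeeded_at = pendulum.parse(
            connector["succeeded_at"] or "1970-01-01T00:00:00+00:00"
        )
        if succeeded_at > context["data_interval_end"]:
            # A sync already covers this data interval; don't trigger another.
            raise AirflowSkipException(
                f"Connector {self.connector_id} already succeeded at {succeeded_at}"
            )
    # ...otherwise fall through to the normal "start sync" behavior.
```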
## Other notes

I already have a version of this implemented in a production system, and I am willing to implement a version of this functionality in this open source library.
Regardless of what happens, the documentation for the existing `FivetranSensor` should be clearer about what is happening.

EDIT: I realize I should be using the `data_interval_end` and not the `logical_date`. The logical date would be sensible if we knew when Fivetran syncs start, but we do not have that info available; we only know when a sync ends.