diff --git a/README.md b/README.md index af315f0..46e482a 100644 --- a/README.md +++ b/README.md @@ -33,32 +33,72 @@ The sensor assumes the `Conn Id` is set to `fivetran`, however if you are managi ### [Fivetran Operator Async](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/operators.py) -`FivetranOperator` submits a Fivetran sync job and monitors it on trigger for completion. -It requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors). - -Import into your DAG via: ```python from fivetran_provider_async.operators import FivetranOperator ``` +`FivetranOperator` submits a Fivetran sync job and monitors it on trigger for completion. + +`FivetranOperator` requires that you specify the `connector_id` of the Fivetran connector you wish to trigger. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors). + +The `FivetranOperator` will wait for the sync to complete so long as `wait_for_completion=True` (this is the default). It is recommended that +you run in deferrable mode (this is also the default). If `wait_for_completion=False`, the operator will return the timestamp for the last sync. + +Import into your DAG via: + ### [Fivetran Sensor Async](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/sensors.py) +```python +from fivetran_provider_async.sensors import FivetranSensor +``` + `FivetranSensor` monitors a Fivetran sync job for completion. Monitoring with `FivetranSensor` allows you to trigger downstream processes only when the Fivetran sync jobs have completed, ensuring data consistency. - +`FivetranSensor` requires that you specify the `connector_id` of the Fivetran connector you want to wait for. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors). You can use multiple instances of `FivetranSensor` to monitor multiple Fivetran connectors. -If used in this way, +`FivetranSensor` is most commonly useful in two scenarios: -`FivetranSensor` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors). +1. Fivetran is using a separate scheduler than the Airflow scheduler. +2. You set `wait_for_completion=False` in the `FivetranOperator`, and you need to await the `FivetranOperator` task later. (You may want to do this if you want to arrange your DAG such that some tasks are dependent on _starting_ a sync and other tasks are dependent on _completing_ a sync). + +If you are doing the 1st pattern, you may find it useful to set the `completed_after_time` to `data_interval_end`, or `data_interval_end` with some buffer: -Import into your DAG via: ```python -from fivetran_provider_async.sensors import FivetranSensor +fivetran_sensor = FivetranSensor( + task_id="wait_for_fivetran_externally_scheduled_sync", + connector_id="bronzing_largely", + poke_interval=5, + completed_after_time="{{ data_interval_end + macros.timedelta(minutes=1) }}", +) +``` + +If you are doing the 2nd pattern, you can use XComs to pass the target completed time to the sensor: + +```python +fivetran_op = FivetranOperator( + task_id="fivetran_sync_my_db", + connector_id="bronzing_largely", + wait_for_completion=False, +) + +fivetran_sensor = FivetranSensor( + task_id="wait_for_fivetran_db_sync", + connector_id="bronzing_largely", + poke_interval=5, + completed_after_time="{{ task_instance.xcom_pull('fivetran_sync_op', key='return_value') }}", +) + +fivetran_op >> fivetran_sensor ``` +You may also specify the `FivetranSensor` without a `completed_after_time`. +In this case, the sensor will make note of when the last completed time was, and will wait for a new completed time. + +Import into your DAG via: + ## Examples See the [**examples**](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/example_dags) directory for an example DAG. diff --git a/fivetran_provider_async/example_dags/example_fivetran.py b/fivetran_provider_async/example_dags/example_fivetran.py index c5dde05..db92157 100644 --- a/fivetran_provider_async/example_dags/example_fivetran.py +++ b/fivetran_provider_async/example_dags/example_fivetran.py @@ -3,7 +3,6 @@ from airflow import DAG from fivetran_provider_async.operators import FivetranOperator -from fivetran_provider_async.sensors import FivetranSensor default_args = { "owner": "Airflow", @@ -19,18 +18,8 @@ with dag: fivetran_sync_start = FivetranOperator( - task_id="fivetran-task", + task_id="fivetran_task", fivetran_conn_id="fivetran_default", connector_id="{{ var.value.connector_id }}", deferrable=False, ) - - fivetran_sync_wait = FivetranSensor( - task_id="fivetran-sensor", - fivetran_conn_id="fivetran_default", - connector_id="{{ var.value.connector_id }}", - poke_interval=5, - deferrable=False, - ) - - fivetran_sync_start >> fivetran_sync_wait diff --git a/fivetran_provider_async/example_dags/example_fivetran_async.py b/fivetran_provider_async/example_dags/example_fivetran_async.py index 4f12b75..3b6f7d9 100644 --- a/fivetran_provider_async/example_dags/example_fivetran_async.py +++ b/fivetran_provider_async/example_dags/example_fivetran_async.py @@ -3,7 +3,6 @@ from airflow import DAG from fivetran_provider_async.operators import FivetranOperator -from fivetran_provider_async.sensors import FivetranSensor default_args = { "owner": "Airflow", @@ -19,22 +18,20 @@ ) with dag: + # Both of these tasks will start a Fivetran sync, + # and will wait for the Fivetran sync to complete before marking + # the task as success. + + # However, the async operator uses the triggerer instance to do this, + # which frees up a worker slot. + fivetran_async_op = FivetranOperator( task_id="fivetran_async_op", connector_id="bronzing_largely", ) fivetran_sync_op = FivetranOperator( - task_id="fivetran_sync_op", - connector_id="bronzing_largely", - deferrable=False, - ) - - fivetran_async_sensor = FivetranSensor( - task_id="fivetran_async_sensor", - connector_id="bronzing_largely", - poke_interval=5, - xcom="{{ task_instance.xcom_pull('fivetran_sync_op', key='return_value') }}", + task_id="fivetran_sync_op", connector_id="bronzing_largely", deferrable=False ) - fivetran_async_op >> fivetran_sync_op >> fivetran_async_sensor + fivetran_async_op >> fivetran_sync_op diff --git a/fivetran_provider_async/example_dags/example_fivetran_dbt.py b/fivetran_provider_async/example_dags/example_fivetran_dbt.py index 4933b17..6660b7f 100644 --- a/fivetran_provider_async/example_dags/example_fivetran_dbt.py +++ b/fivetran_provider_async/example_dags/example_fivetran_dbt.py @@ -4,7 +4,6 @@ from airflow.providers.ssh.operators.ssh import SSHOperator from fivetran_provider_async.operators import FivetranOperator -from fivetran_provider_async.sensors import FivetranSensor default_args = { "owner": "Airflow", @@ -22,29 +21,15 @@ connector_id="{{ var.value.linkedin_connector_id }}", ) - linkedin_sensor = FivetranSensor( - task_id="linkedin-sensor", - connector_id="{{ var.value.linkedin_connector_id }}", - poke_interval=600, - ) - twitter_sync = FivetranOperator( task_id="twitter-ads-sync", connector_id="{{ var.value.twitter_connector_id }}", ) - twitter_sensor = FivetranSensor( - task_id="twitter-sensor", - connector_id="{{ var.value.twitter_connector_id }}", - poke_interval=600, - ) - dbt_run = SSHOperator( task_id="dbt_ad_reporting", command="cd dbt_ad_reporting ; ~/.local/bin/dbt run -m +ad_reporting", ssh_conn_id="dbtvm", ) - linkedin_sync >> linkedin_sensor - twitter_sync >> twitter_sensor - [linkedin_sensor, twitter_sensor] >> dbt_run + [linkedin_sync, twitter_sync] >> dbt_run diff --git a/fivetran_provider_async/example_dags/example_fivetran_xcom.py b/fivetran_provider_async/example_dags/example_fivetran_xcom.py index 4946e82..27f6590 100644 --- a/fivetran_provider_async/example_dags/example_fivetran_xcom.py +++ b/fivetran_provider_async/example_dags/example_fivetran_xcom.py @@ -25,6 +25,7 @@ task_id="fivetran-operator", fivetran_conn_id="fivetran_default", connector_id="{{ var.value.connector_id }}", + wait_for_completion=False, ) delay_task = PythonOperator(task_id="delay_python_task", python_callable=lambda: time.sleep(60)) @@ -34,7 +35,7 @@ fivetran_conn_id="fivetran_default", connector_id="{{ var.value.connector_id }}", poke_interval=5, - xcom="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}", + completed_after_time="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}", ) fivetran_operator >> delay_task >> fivetran_sensor diff --git a/fivetran_provider_async/hooks.py b/fivetran_provider_async/hooks.py index c31b9ef..1e77d33 100644 --- a/fivetran_provider_async/hooks.py +++ b/fivetran_provider_async/hooks.py @@ -359,11 +359,19 @@ def get_last_sync(self, connector_id: str, xcom: str = "") -> pendulum.DateTime: :param connector_id: Fivetran connector_id, found in connector settings page in the Fivetran user interface. - :param xcom: Timestamp as string pull from FivetranOperator via XCOM + :param xcom: Deprecated. Timestamp as string pull from FivetranOperator via XCOM :return: Timestamp of last completed sync :rtype: Pendulum.DateTime """ if xcom: + import warnings + + warnings.warn( + "Param `xcom` is deprecated. Timestamps should be rendered in the Operator, Sensor," + " or Triggerer instead", + stacklevel=2, + ) + last_sync = self._parse_timestamp(xcom) else: connector_details = self.get_connector(connector_id) @@ -373,56 +381,105 @@ def get_last_sync(self, connector_id: str, xcom: str = "") -> pendulum.DateTime: return last_sync def get_sync_status( + self, connector_id: str, previous_completed_at: pendulum.DateTime, reschedule_time: int | None = None + ) -> bool: + import warnings + + warnings.warn( + "`get_sync_status()` is deprecated. Please use `is_synced_after_target_time()` instead.", + stacklevel=2, + ) + + return self.is_synced_after_target_time( + connector_id=connector_id, + completed_after_time=previous_completed_at, + reschedule_wait_time=reschedule_time, + always_wait_when_syncing=False, + propagate_failures_forward=False, + ) + + def is_synced_after_target_time( self, connector_id: str, - previous_completed_at: pendulum.DateTime, + completed_after_time: datetime, reschedule_wait_time: int | None = None, - *, - reschedule_time: int | None = None, # deprecated! + always_wait_when_syncing: bool = False, + propagate_failures_forward: bool = True, ) -> bool: """ For sensor, return True if connector's 'succeeded_at' field has updated. :param connector_id: Fivetran connector_id, found in connector settings page in the Fivetran user interface. - :param previous_completed_at: The last time the connector ran, collected on Sensor - initialization. - :param reschedule_wait_time: Optional, if connector is in reset state - number of seconds to wait before restarting, else Fivetran suggestion used - :param reschedule_time: Deprecated + :param completed_after_time: The time we are comparing the Fivetran + `succeeded_at` and `failed_at` timestamps to. The method returns + True when both the last `succeeded_at` exceeds the + `completed_after_time` and other conditions (determined by other + params to this method) are met. + :param reschedule_wait_time: Optional. If connector is in a + rescheduled state, this will be the number of seconds to wait + before restarting. If None, then Fivetran's suggestion is used + instead. + :param always_wait_when_syncing: If True, then this method will + always return False when the connector's sync_state is "syncing", + no matter what. + :param propagate_failures_forward: If True, then this method will always + raise an AirflowException when the most recent connector status is + a failure and there are no successes dated after the target time. + Specifically, this makes it so that + `completed_after_time > failed_at > succeeded_at` is considered a + fail condition. """ - if reschedule_time is not None: - import warnings - - warnings.warn( - "`reschedule_time` arg is deprecated. Please use `reschedule_wait_time` instead.", - stacklevel=2, - ) - if reschedule_wait_time is None: - reschedule_wait_time = reschedule_time - # @todo Need logic here to tell if the sync is not running at all and not # likely to run in the near future. connector_details = self.get_connector(connector_id) + return self._determine_if_synced_from_connector_details( + connector_id=connector_id, + connector_details=connector_details, + completed_after_time=completed_after_time, + reschedule_wait_time=reschedule_wait_time, + always_wait_when_syncing=always_wait_when_syncing, + propagate_failures_forward=propagate_failures_forward, + ) + + def _determine_if_synced_from_connector_details( + self, + connector_id: str, + connector_details: dict[str, Any], + completed_after_time: datetime, + reschedule_wait_time: int | None = None, + always_wait_when_syncing: bool = False, + propagate_failures_forward: bool = True, + ) -> bool: succeeded_at = self._parse_timestamp(connector_details["succeeded_at"]) failed_at = self._parse_timestamp(connector_details["failed_at"]) - current_completed_at = succeeded_at if succeeded_at > failed_at else failed_at + + sync_state = connector_details["status"]["sync_state"] + self.log.info("Connector %s: sync_state = %s", connector_id, sync_state) + + if always_wait_when_syncing and sync_state == "syncing": + return False # The only way to tell if a sync failed is to check if its latest # failed_at value is greater than then last known "sync completed at" value. - if failed_at > previous_completed_at: + if failed_at > completed_after_time > succeeded_at or ( + completed_after_time > failed_at > succeeded_at and propagate_failures_forward + ): service_name = connector_details["service"] schema_name = connector_details["schema"] raise AirflowException( - f'Fivetran sync for connector "{connector_id}" failed; ' + f"Fivetran sync for connector {connector_id} failed; " f"please see logs at " f"{self._connector_ui_url_logs(service_name, schema_name)}" ) - sync_state = connector_details["status"]["sync_state"] - self.log.info("Connector %s: sync_state = %s", connector_id, sync_state) + # Check if sync started by FivetranOperator has finished + # indicated by new 'succeeded_at' timestamp + if succeeded_at > completed_after_time: + self.log.info("Connector %s: succeeded_at: %s", connector_id, succeeded_at.to_iso8601_string()) + return True - # if sync in resheduled start, wait for time recommended by Fivetran + # if sync in rescheduled start, wait for time recommended by Fivetran # or manually specified, then restart sync if sync_state == "rescheduled" and connector_details["schedule_type"] == "manual": self.log.info('Connector is in "rescheduled" state and needs to be manually restarted') @@ -433,13 +490,7 @@ def get_sync_status( ) return False - # Check if sync started by FivetranOperator has finished - # indicated by new 'succeeded_at' timestamp - if current_completed_at > previous_completed_at: - self.log.info("Connector %s: succeeded_at: %s", connector_id, succeeded_at.to_iso8601_string()) - return True - else: - return False + return False def pause_and_restart( self, @@ -617,57 +668,67 @@ async def get_sync_status_async( connector_id: str, previous_completed_at: pendulum.DateTime, reschedule_wait_time: int | None = None, - ): + ) -> str: + import warnings + + warnings.warn( + "`get_sync_status_async()` is deprecated." + " Please use `is_synced_after_target_time_async()` instead.", + stacklevel=2, + ) + + return await self.is_synced_after_target_time_async( + connector_id=connector_id, + completed_after_time=previous_completed_at, + reschedule_wait_time=reschedule_wait_time, + always_wait_when_syncing=False, + propagate_failures_forward=False, + ) + + async def is_synced_after_target_time_async( + self, + connector_id: str, + completed_after_time: datetime, + reschedule_wait_time: int | None = None, + always_wait_when_syncing: bool = False, + propagate_failures_forward: bool = True, + ) -> str: """ For sensor, return True if connector's 'succeeded_at' field has updated. :param connector_id: Fivetran connector_id, found in connector settings page in the Fivetran user interface. - :param previous_completed_at: The last time the connector ran, collected on Sensor - initialization. - :param reschedule_wait_time: Optional, if connector is in reset state, - number of seconds to wait before restarting the sync. + :param completed_after_time: The time we are comparing the Fivetran + `succeeded_at` and `failed_at` timestamps to. The method returns + True when both the last `succeeded_at` exceeds the + `completed_after_time` and other conditions (determined by other + params to this method) are met. + :param reschedule_wait_time: Optional. If connector is in a + rescheduled state, this will be the number of seconds to wait + before restarting. If None, then Fivetran's suggestion is used + instead. + :param always_wait_when_syncing: If True, then this method will + always return False when the connector's sync_state is "syncing", + no matter what. + :param propagate_failures_forward: If True, then this method will always + raise an AirflowException when the most recent connector status is + a failure and there are no successes dated after the target time. + Specifically, this makes it so that + `completed_after_time > failed_at > succeeded_at` is considered a + fail condition. """ connector_details = await self.get_connector_async(connector_id) - succeeded_at = self._parse_timestamp(connector_details["succeeded_at"]) - failed_at = self._parse_timestamp(connector_details["failed_at"]) - current_completed_at = succeeded_at if succeeded_at > failed_at else failed_at - - # The only way to tell if a sync failed is to check if its latest - # failed_at value is greater than then last known "sync completed at" value. - if failed_at > previous_completed_at: - service_name = connector_details["service"] - schema_name = connector_details["schema"] - raise AirflowException( - f"Fivetran sync for connector {connector_id} failed; " - f"please see logs at " - f"{self._connector_ui_url_logs(service_name, schema_name)}" - ) - - sync_state = connector_details["status"]["sync_state"] - self.log.info('Connector "%s": sync_state = "%s"', connector_id, sync_state) - - # if sync in rescheduled start, wait for time recommended by Fivetran - # or manually specified, then restart sync - if sync_state == "rescheduled" and connector_details["schedule_type"] == "manual": - self.log.info('Connector is in "rescheduled" state and needs to be manually restarted') - self.pause_and_restart( - connector_id=connector_id, - reschedule_for=connector_details["status"]["rescheduled_for"], - reschedule_wait_time=reschedule_wait_time, - ) - - # Check if sync started by airflow has finished - # indicated by new 'succeeded_at' timestamp - if current_completed_at > previous_completed_at: - self.log.info( - 'Connector "%s": succeeded_at = "%s"', connector_id, succeeded_at.to_iso8601_string() - ) - job_status = "success" - return job_status - else: - job_status = "pending" - return job_status + is_completed = self._determine_if_synced_from_connector_details( + connector_id=connector_id, + connector_details=connector_details, + completed_after_time=completed_after_time, + reschedule_wait_time=reschedule_wait_time, + always_wait_when_syncing=always_wait_when_syncing, + propagate_failures_forward=propagate_failures_forward, + ) + if is_completed: + return "success" + return "pending" async def get_last_sync_async(self, connector_id: str, xcom: str = "") -> pendulum.DateTime: """ diff --git a/fivetran_provider_async/operators.py b/fivetran_provider_async/operators.py index 61df582..bc67cdb 100644 --- a/fivetran_provider_async/operators.py +++ b/fivetran_provider_async/operators.py @@ -1,8 +1,10 @@ from __future__ import annotations from functools import cached_property +from time import sleep from typing import TYPE_CHECKING, Any, Dict, Optional +import pendulum from airflow.exceptions import AirflowException from airflow.models import BaseOperator, BaseOperatorLink @@ -49,6 +51,7 @@ class FivetranOperator(BaseOperator): :param reschedule_wait_time: Optional, if connector is in reset state, number of seconds to wait before restarting the sync. :param deferrable: Run operator in deferrable mode. Default is True. + :param wait_for_completion: Wait for Fivetran sync to complete to finish the task. """ operator_extra_links = (RegistryLink(),) @@ -59,7 +62,6 @@ def __init__( self, connector_id: str, run_name: Optional[str] = None, - timeout_seconds: Optional[int] = None, fivetran_conn_id: str = "fivetran", fivetran_retry_limit: int = 3, fivetran_retry_delay: int = 1, @@ -67,17 +69,30 @@ def __init__( schedule_type: str = "manual", reschedule_wait_time: int = 0, deferrable: bool = True, + wait_for_completion: bool = True, **kwargs, - ): + ) -> None: self.connector_id = connector_id self.fivetran_conn_id = fivetran_conn_id self.run_name = run_name - self.timeout_seconds = timeout_seconds + + if "timeout_seconds" in kwargs: + import warnings + + warnings.warn( + "kwarg `timeout_seconds` is deprecated. Please use `execution_timeout` instead.", + DeprecationWarning, + stacklevel=2, + ) + if "execution_timeout" not in kwargs: + kwargs["execution_timeout"] = kwargs.pop("timeout_seconds", None) + self.fivetran_retry_limit = fivetran_retry_limit self.fivetran_retry_delay = fivetran_retry_delay self.poll_frequency = poll_frequency self.schedule_type = schedule_type self.reschedule_wait_time = reschedule_wait_time + self.wait_for_completion = wait_for_completion self.deferrable = deferrable super().__init__(**kwargs) @@ -87,11 +102,20 @@ def execute(self, context: Context) -> None | str: hook.prep_connector(self.connector_id, self.schedule_type) last_sync = hook.start_fivetran_sync(self.connector_id) - if not self.deferrable: + if not self.wait_for_completion: return last_sync + + if not self.deferrable: + self._wait_synchronously(pendulum.parse(last_sync)) # type: ignore[arg-type] + return None else: previous_completed_at = hook.get_last_sync(self.connector_id) - completed = hook.get_sync_status(self.connector_id, previous_completed_at) + completed = hook.is_synced_after_target_time( + self.connector_id, + previous_completed_at, + propagate_failures_forward=False, + always_wait_when_syncing=True, + ) if not completed: self.defer( timeout=self.execution_timeout, @@ -106,6 +130,25 @@ def execute(self, context: Context) -> None | str: ) return None + def _wait_synchronously(self, last_sync: pendulum.DateTime) -> None: + """ + Wait for the task synchronously. + + It is recommended that you do not use this, and instead set + `deferrable=True` and use a Triggerer if you want to wait for the task + to complete. + """ + while True: + is_completed = self.hook.is_synced_after_target_time( + self.connector_id, last_sync, propagate_failures_forward=False, always_wait_when_syncing=True + ) + if is_completed: + return + else: + self.log.info("sync is still running...") + self.log.info("sleeping for %s seconds.", self.poll_frequency) + sleep(self.poll_frequency) + @cached_property def hook(self) -> FivetranHook: """Create and return a FivetranHook.""" diff --git a/fivetran_provider_async/sensors.py b/fivetran_provider_async/sensors.py index ea1357c..6a8aba8 100644 --- a/fivetran_provider_async/sensors.py +++ b/fivetran_provider_async/sensors.py @@ -1,10 +1,12 @@ from __future__ import annotations +from datetime import datetime from functools import cached_property from typing import TYPE_CHECKING, Any from airflow.exceptions import AirflowException from airflow.sensors.base import BaseSensorOperator +from airflow.utils import timezone if TYPE_CHECKING: from airflow.utils.context import Context @@ -24,9 +26,16 @@ class FivetranSensor(BaseSensorOperator): use multiple instances of `FivetranSensor` to monitor multiple Fivetran connectors. - `FivetranSensor` starts monitoring for a new sync to complete starting from - the clock time the sensor is triggered. This sensor does not take into - account the DagRun's `logical_date` or `data_interval_end`. + By default, `FivetranSensor` starts monitoring for a new sync to complete + starting from the clock time the sensor is triggered. Alternatively, you can + specify the `completed_after_time`, in which case the sensor will wait for + a success which occurred after this timestamp. + + This sensor does not take into account when a Fivetran sync job is started; + it only looks at when it completes. For example, if the Fivetran sync job + starts at `2020-01-01 02:55:00` and ends at `2020-01-01 03:05:00`, and the + `completed_after_time` is `2020-01-01 03:00:00`, then the sensor will + stop waiting at `03:05:00`. `FivetranSensor` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the @@ -41,15 +50,32 @@ class FivetranSensor(BaseSensorOperator): :param connector_id: ID of the Fivetran connector to sync, found on the Connector settings page in the Fivetran Dashboard. :param poke_interval: Time in seconds that the job should wait in - between each tries + between each try :param fivetran_retry_limit: # of retries when encountering API errors :param fivetran_retry_delay: Time to wait before retrying API request - :param reschedule_wait_time: Optional, if connector is in reset state - number of seconds to wait before restarting, else Fivetran suggestion used + :param completed_after_time: Optional. The time we are comparing the + Fivetran `succeeded_at` and `failed_at` timestamps to. This field + is templated; common use cases may be to use XCOM or to use + "{{ data_interval_end }}". If left as None, then the Sensor will + set the `completed_after_time` to be the latest completed time, + meaning the Sensor will pass only when a new sync succeeds after + the time this Sensor started executing. + :param reschedule_wait_time: Optional. If connector is in a rescheduled + state, this will be the number of seconds to wait before restarting. If + None, then Fivetran's suggestion is used instead. + :param always_wait_when_syncing: If True, then this method will + always return False when the connector's sync_state is "syncing", + no matter what. + :param propagate_failures_forward: If True, then this method will always + raise an AirflowException when the most recent connector status is + a failure and there are no successes dated after the target time. + Specifically, this makes it so that + `completed_after_time > failed_at > succeeded_at` is considered a + fail condition. :param deferrable: Run sensor in deferrable mode. default is True. """ - template_fields = ["connector_id", "xcom"] + template_fields = ["connector_id", "completed_after_time"] def __init__( self, @@ -58,8 +84,10 @@ def __init__( poke_interval: int = 60, fivetran_retry_limit: int = 3, fivetran_retry_delay: int = 1, - xcom: str = "", + completed_after_time: str | datetime | None = None, reschedule_wait_time: int | None = None, + always_wait_when_syncing: bool = False, + propagate_failures_forward: bool = False, deferrable: bool = True, **kwargs: Any, ) -> None: @@ -69,13 +97,39 @@ def __init__( self.previous_completed_at: pendulum.DateTime | None = None self.fivetran_retry_limit = fivetran_retry_limit self.fivetran_retry_delay = fivetran_retry_delay - self.xcom = xcom + + if "xcom" in kwargs: + import warnings + + warnings.warn( + "kwarg `xcom` is deprecated. Please use `completed_after_time` instead.", + DeprecationWarning, + stacklevel=2, + ) + if completed_after_time is None: + completed_after_time = kwargs.pop("xcom", None) + + # (Similar parsing logic as airflow.sensors.date_time.DateTimeSensor, except allow None.) + # self.completed_after_time can't be a datetime object as it is a template_field + if isinstance(completed_after_time, datetime): + self.completed_after_time = completed_after_time.isoformat() + elif isinstance(completed_after_time, str): + self.completed_after_time = completed_after_time + elif completed_after_time is None: + self.completed_after_time = "" + else: + raise TypeError( + "Expected None, str, or datetime.datetime type for completed_after_time." + f" Got {type(completed_after_time)}" + ) + + self._completed_after_time_rendered: datetime | None = None if "reschedule_time" in kwargs: import warnings warnings.warn( - "kwarg `reschedule_time` is deprecated." " Please use `reschedule_wait_time` instead.", + "kwarg `reschedule_time` is deprecated. Please use `reschedule_wait_time` instead.", DeprecationWarning, stacklevel=2, ) @@ -83,6 +137,8 @@ def __init__( reschedule_wait_time = kwargs.pop("reschedule_time", None) self.reschedule_wait_time = reschedule_wait_time + self.always_wait_when_syncing = always_wait_when_syncing + self.propagate_failures_forward = propagate_failures_forward self.deferrable = deferrable super().__init__(**kwargs) @@ -98,7 +154,7 @@ def execute(self, context: Context) -> None: fivetran_conn_id=self.fivetran_conn_id, connector_id=self.connector_id, previous_completed_at=self.previous_completed_at, - xcom=self.xcom, + xcom=self.completed_after_time, poke_interval=self.poke_interval, reschedule_wait_time=self.reschedule_wait_time, ), @@ -114,12 +170,41 @@ def hook(self) -> FivetranHook: retry_delay=self.fivetran_retry_delay, ) - def poke(self, context: Context): - if self.previous_completed_at is None: - self.previous_completed_at = self.hook.get_last_sync(self.connector_id, self.xcom) + @property + def xcom(self) -> str: + import warnings - return self.hook.get_sync_status( - self.connector_id, self.previous_completed_at, self.reschedule_wait_time + warnings.warn( + "`xcom` attribute is deprecated. Use `completed_after_time` attribute instead.", + DeprecationWarning, + stacklevel=2, + ) + return self.completed_after_time + + @xcom.setter + def xcom(self, value: str) -> None: + import warnings + + warnings.warn( + "`xcom` attribute is deprecated. Use `completed_after_time` attribute instead.", + DeprecationWarning, + stacklevel=2, + ) + self.completed_after_time = value + + def poke(self, context: Context): + if self._completed_after_time_rendered is None: + if not self.completed_after_time: + self._completed_after_time_rendered = self.hook.get_last_sync(self.connector_id) + else: + self._completed_after_time_rendered = timezone.parse(self.completed_after_time) + + return self.hook.is_synced_after_target_time( + connector_id=self.connector_id, + completed_after_time=self._completed_after_time_rendered, + reschedule_wait_time=self.reschedule_wait_time, + always_wait_when_syncing=self.always_wait_when_syncing, + propagate_failures_forward=self.propagate_failures_forward, ) def execute_complete(self, context: Context, event: dict[Any, Any] | None = None) -> None: diff --git a/tests/hooks/test_fivetran.py b/tests/hooks/test_fivetran.py index 0e65ffc..4a3496c 100644 --- a/tests/hooks/test_fivetran.py +++ b/tests/hooks/test_fivetran.py @@ -186,30 +186,20 @@ async def test_fivetran_hook_get_sync_status_async( result = await hook.get_sync_status_async( connector_id="interchangeable_revenge", previous_completed_at=mock_previous_completed_at, - reschedule_wait_time=60, + reschedule_wait_time=5, ) assert result == expected_result @pytest.mark.asyncio - @pytest.mark.parametrize( - "mock_previous_completed_at, expected_result", - [ - ( - pendulum.datetime(2021, 3, 23), # current_completed_at > previous_completed_at - "success", - ), - ( - pendulum.datetime(2021, 3, 23, 21, 55), # current_completed_at < previous_completed_at - "pending", - ), - ], - ) @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") async def test_fivetran_hook_get_sync_status_async_with_reschedule_mode_error_for_wait_time( - self, mock_api_call_async_response, mock_previous_completed_at, expected_result + self, mock_api_call_async_response ): """Tests that get_sync_status_async method return error with rescheduled_for in Fivetran API response along with schedule_type as manual and negative wait time.""" + + # current_completed_at < previous_completed_at + mock_previous_completed_at = pendulum.datetime(2021, 3, 23, 21, 55) hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS_RESCHEDULE_MODE with pytest.raises(ValueError, match="Sync connector manually."): @@ -218,6 +208,29 @@ async def test_fivetran_hook_get_sync_status_async_with_reschedule_mode_error_fo previous_completed_at=mock_previous_completed_at, ) + @pytest.mark.asyncio + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") + async def test_fivetran_hook_get_sync_status_async_with_reschedule_mode_returns_success( + self, + mock_api_call_async_response, + ): + """ + Tests that get_sync_status_async method returns success when + current_completed_at > previous_completed_at. + (The hook returns success because data is not being blocked up to the target completed time.) + """ + + # current_completed_at > previous_completed_at + mock_previous_completed_at = pendulum.datetime(2021, 3, 23) + + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS_RESCHEDULE_MODE + result = await hook.get_sync_status_async( + connector_id="interchangeable_revenge", + previous_completed_at=mock_previous_completed_at, + ) + assert result == "success" + @pytest.mark.asyncio @pytest.mark.parametrize( "mock_previous_completed_at, expected_result", @@ -250,7 +263,7 @@ async def test_fivetran_hook_get_sync_status_async_with_reschedule_mode( result = await hook.get_sync_status_async( connector_id="interchangeable_revenge", previous_completed_at=mock_previous_completed_at, - reschedule_wait_time=10, + reschedule_wait_time=3, ) assert result == expected_result @@ -294,10 +307,20 @@ async def test_fivetran_hook_get_sync_status_async_with_reschedule_for_and_sched @pytest.mark.asyncio @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") async def test_fivetran_hook_get_sync_status_async_exception(self, mock_api_call_async_response): - """Tests that get_sync_status_async method raises exception when failed_at > previous_completed_at""" + """ + Tests that get_sync_status_async method raises exception + when failed_at > previous_completed_at > succeeded_at + """ mock_previous_completed_at = pendulum.datetime(2021, 3, 21, 21, 55) hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") - mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS + + # Set `failed_at` value so that failed_at > completed_after_time > succeeded_at + mock_fivetran_payload_sheets_modified = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS.copy() + mock_fivetran_payload_sheets_modified["data"] = mock_fivetran_payload_sheets_modified["data"].copy() + mock_fivetran_payload_sheets_modified["data"]["failed_at"] = "2021-03-23T20:55:12.670390Z" + mock_fivetran_payload_sheets_modified["data"]["succeeded_at"] = "2021-03-19T20:55:12.670390Z" + + mock_api_call_async_response.return_value = mock_fivetran_payload_sheets_modified with pytest.raises(AirflowException) as exc: await hook.get_sync_status_async( @@ -305,6 +328,62 @@ async def test_fivetran_hook_get_sync_status_async_exception(self, mock_api_call ) assert "Fivetran sync for connector interchangeable_revenge failed" in str(exc.value) + @pytest.mark.asyncio + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") + async def test_fivetran_hook_is_synced_async_propagate_errors_forward_exception( + self, mock_api_call_async_response + ): + """ + Tests that get_sync_status_async method raises exception + when completed_after_time > failed_at > succeeded_at + and propagate_failures_forward=True + """ + mock_completed_after_time = pendulum.datetime(2021, 3, 21, 21, 55) + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + + # Set `failed_at` value so that completed_after_time > failed_at > succeeded_at + mock_fivetran_payload_sheets_modified = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS.copy() + mock_fivetran_payload_sheets_modified["data"] = mock_fivetran_payload_sheets_modified["data"].copy() + mock_fivetran_payload_sheets_modified["data"]["failed_at"] = "2021-03-20T20:55:12.670390Z" + mock_fivetran_payload_sheets_modified["data"]["succeeded_at"] = "2021-03-19T20:55:12.670390Z" + + mock_api_call_async_response.return_value = mock_fivetran_payload_sheets_modified + + with pytest.raises(AirflowException) as exc: + await hook.is_synced_after_target_time_async( + connector_id="interchangeable_revenge", + completed_after_time=mock_completed_after_time, + propagate_failures_forward=True, + ) + assert "Fivetran sync for connector interchangeable_revenge failed" in str(exc.value) + + @pytest.mark.asyncio + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") + async def test_fivetran_hook_is_synced_async_propagate_errors_forward_is_false( + self, mock_api_call_async_response + ): + """ + Tests that get_sync_status_async method returns "pending" + when completed_after_time > failed_at > succeeded_at + and propagate_failures_forward=False + """ + mock_completed_after_time = pendulum.datetime(2021, 3, 24, 21, 55) + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + + # Set `failed_at` value so that completed_after_time > failed_at > succeeded_at + mock_fivetran_payload_sheets_modified = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS.copy() + mock_fivetran_payload_sheets_modified["data"] = mock_fivetran_payload_sheets_modified["data"].copy() + mock_fivetran_payload_sheets_modified["data"]["failed_at"] = "2021-03-23T20:59:12.670390Z" + + mock_api_call_async_response.return_value = mock_fivetran_payload_sheets_modified + + result = await hook.is_synced_after_target_time_async( + connector_id="interchangeable_revenge", + completed_after_time=mock_completed_after_time, + propagate_failures_forward=False, + ) + assert result == "pending" + @pytest.mark.asyncio @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.start_fivetran_sync") @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async")