API Changes to FivetranSensor for supporting backfills (#58)
Co-authored-by: Wei Lee <[email protected]>
dwreeves and Lee-W committed Nov 6, 2023
1 parent e527f30 commit 9fa930d
Showing 9 changed files with 445 additions and 165 deletions.
58 changes: 49 additions & 9 deletions README.md
@@ -33,32 +33,72 @@ The sensor assumes the `Conn Id` is set to `fivetran`, however if you are managi

### [Fivetran Operator Async](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/operators.py)

`FivetranOperator` submits a Fivetran sync job and monitors it on trigger for completion.
It requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).

Import into your DAG via:
```python
from fivetran_provider_async.operators import FivetranOperator
```

`FivetranOperator` submits a Fivetran sync job and monitors it on trigger for completion.

`FivetranOperator` requires that you specify the `connector_id` of the Fivetran connector you wish to trigger. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).

The `FivetranOperator` will wait for the sync to complete so long as `wait_for_completion=True` (this is the default). It is recommended that
you run in deferrable mode (this is also the default). If `wait_for_completion=False`, the operator will return the timestamp for the last sync.

Import into your DAG via:

### [Fivetran Sensor Async](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/sensors.py)

```python
from fivetran_provider_async.sensors import FivetranSensor
```

`FivetranSensor` monitors a Fivetran sync job for completion.
Monitoring with `FivetranSensor` allows you to trigger downstream processes only when the Fivetran sync jobs have completed, ensuring data consistency.


`FivetranSensor` requires that you specify the `connector_id` of the Fivetran connector you want to wait for. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).

You can use multiple instances of `FivetranSensor` to monitor multiple Fivetran connectors.

If used in this way,
`FivetranSensor` is most commonly useful in two scenarios:

`FivetranSensor` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).
1. Fivetran syncs run on a schedule that is separate from the Airflow scheduler.
2. You set `wait_for_completion=False` in the `FivetranOperator`, and you need to await the `FivetranOperator` task later. (You may want to do this if you want to arrange your DAG such that some tasks are dependent on _starting_ a sync and other tasks are dependent on _completing_ a sync).

If you are doing the 1st pattern, you may find it useful to set the `completed_after_time` to `data_interval_end`, or `data_interval_end` with some buffer:

Import into your DAG via:
```python
from fivetran_provider_async.sensors import FivetranSensor
fivetran_sensor = FivetranSensor(
    task_id="wait_for_fivetran_externally_scheduled_sync",
    connector_id="bronzing_largely",
    poke_interval=5,
    completed_after_time="{{ data_interval_end + macros.timedelta(minutes=1) }}",
)
```
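
To make the buffer concrete, here is a minimal standard-library sketch of the arithmetic behind the templated expression above. The helper name `buffered_target`, the sample dates, and the strict `>` comparison are assumptions made for illustration; in a real DAG, Airflow renders the Jinja template and the sensor does the comparison.

```python
from datetime import datetime, timedelta, timezone


def buffered_target(data_interval_end: datetime, buffer: timedelta) -> datetime:
    """Return the earliest sync-completion time the sensor should accept (hypothetical helper)."""
    return data_interval_end + buffer


# Hypothetical daily run whose data interval ends at midnight UTC on 2023-11-06.
interval_end = datetime(2023, 11, 6, tzinfo=timezone.utc)
target = buffered_target(interval_end, timedelta(minutes=1))

# A sync that finished before the interval ended does not satisfy the target...
early_sync = datetime(2023, 11, 5, 23, 50, tzinfo=timezone.utc)
# ...whereas one that finished after the buffered target does.
late_sync = datetime(2023, 11, 6, 0, 5, tzinfo=timezone.utc)

print(early_sync > target)  # False
print(late_sync > target)   # True
```

The buffer guards against a sync that completes at almost exactly the interval boundary being treated as covering the whole interval.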

If you are doing the 2nd pattern, you can use XComs to pass the target completed time to the sensor:

```python
fivetran_op = FivetranOperator(
    task_id="fivetran_sync_my_db",
    connector_id="bronzing_largely",
    wait_for_completion=False,
)

fivetran_sensor = FivetranSensor(
    task_id="wait_for_fivetran_db_sync",
    connector_id="bronzing_largely",
    poke_interval=5,
    # The task ID passed to xcom_pull must match the operator's task_id above.
    completed_after_time="{{ task_instance.xcom_pull('fivetran_sync_my_db', key='return_value') }}",
)

fivetran_op >> fivetran_sensor
```

You may also specify the `FivetranSensor` without a `completed_after_time`.
In this case, the sensor will make note of when the last completed time was, and will wait for a new completed time.
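
The two modes can be modelled in a few lines of plain Python. This is a conceptual sketch only, not the provider's implementation: `SensorModel`, `poke`, `utc`, and the timestamps are hypothetical names chosen for illustration, and the real sensor obtains the connector's last completion time from Fivetran rather than from an argument.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class SensorModel:
    """Toy model of waiting for a sync that completes after some target time."""

    completed_after_time: Optional[datetime] = None

    def poke(self, last_completed: datetime) -> bool:
        # Without an explicit target, remember the completion time seen on the
        # first poke and wait for a strictly newer one.
        if self.completed_after_time is None:
            self.completed_after_time = last_completed
            return False
        return last_completed > self.completed_after_time


def utc(hour: int, minute: int = 0) -> datetime:
    """Build a timezone-aware timestamp on an arbitrary sample day."""
    return datetime(2023, 11, 6, hour, minute, tzinfo=timezone.utc)


# Mode 1: explicit target. Only a sync finishing after 09:00 counts.
explicit = SensorModel(completed_after_time=utc(9))
print(explicit.poke(utc(8, 30)))  # False
print(explicit.poke(utc(9, 15)))  # True

# Mode 2: no target. The first poke records 08:30 as the baseline.
implicit = SensorModel()
print(implicit.poke(utc(8, 30)))  # False (baseline recorded)
print(implicit.poke(utc(8, 30)))  # False (no new sync yet)
print(implicit.poke(utc(9, 15)))  # True
```

In the second mode the baseline depends on when the sensor first runs, so an explicit `completed_after_time` is the more predictable choice for backfills.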

Import into your DAG via:

## Examples

See the [**examples**](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/example_dags) directory for an example DAG.
13 changes: 1 addition & 12 deletions fivetran_provider_async/example_dags/example_fivetran.py
@@ -3,7 +3,6 @@
 from airflow import DAG

 from fivetran_provider_async.operators import FivetranOperator
-from fivetran_provider_async.sensors import FivetranSensor

 default_args = {
     "owner": "Airflow",
@@ -19,18 +18,8 @@

 with dag:
     fivetran_sync_start = FivetranOperator(
-        task_id="fivetran-task",
+        task_id="fivetran_task",
         fivetran_conn_id="fivetran_default",
         connector_id="{{ var.value.connector_id }}",
         deferrable=False,
     )
-
-    fivetran_sync_wait = FivetranSensor(
-        task_id="fivetran-sensor",
-        fivetran_conn_id="fivetran_default",
-        connector_id="{{ var.value.connector_id }}",
-        poke_interval=5,
-        deferrable=False,
-    )
-
-    fivetran_sync_start >> fivetran_sync_wait
21 changes: 9 additions & 12 deletions fivetran_provider_async/example_dags/example_fivetran_async.py
@@ -3,7 +3,6 @@
 from airflow import DAG

 from fivetran_provider_async.operators import FivetranOperator
-from fivetran_provider_async.sensors import FivetranSensor

 default_args = {
     "owner": "Airflow",
@@ -19,22 +18,20 @@
 )

 with dag:
+    # Both of these tasks will start a Fivetran sync,
+    # and will wait for the Fivetran sync to complete before marking
+    # the task as success.
+
+    # However, the async operator uses the triggerer instance to do this,
+    # which frees up a worker slot.
+
     fivetran_async_op = FivetranOperator(
         task_id="fivetran_async_op",
         connector_id="bronzing_largely",
     )

     fivetran_sync_op = FivetranOperator(
-        task_id="fivetran_sync_op",
-        connector_id="bronzing_largely",
-        deferrable=False,
-    )
-
-    fivetran_async_sensor = FivetranSensor(
-        task_id="fivetran_async_sensor",
-        connector_id="bronzing_largely",
-        poke_interval=5,
-        xcom="{{ task_instance.xcom_pull('fivetran_sync_op', key='return_value') }}",
+        task_id="fivetran_sync_op", connector_id="bronzing_largely", deferrable=False
     )

-    fivetran_async_op >> fivetran_sync_op >> fivetran_async_sensor
+    fivetran_async_op >> fivetran_sync_op
17 changes: 1 addition & 16 deletions fivetran_provider_async/example_dags/example_fivetran_dbt.py
@@ -4,7 +4,6 @@
 from airflow.providers.ssh.operators.ssh import SSHOperator

 from fivetran_provider_async.operators import FivetranOperator
-from fivetran_provider_async.sensors import FivetranSensor

 default_args = {
     "owner": "Airflow",
@@ -22,29 +21,15 @@
         connector_id="{{ var.value.linkedin_connector_id }}",
     )

-    linkedin_sensor = FivetranSensor(
-        task_id="linkedin-sensor",
-        connector_id="{{ var.value.linkedin_connector_id }}",
-        poke_interval=600,
-    )
-
     twitter_sync = FivetranOperator(
         task_id="twitter-ads-sync",
         connector_id="{{ var.value.twitter_connector_id }}",
     )

-    twitter_sensor = FivetranSensor(
-        task_id="twitter-sensor",
-        connector_id="{{ var.value.twitter_connector_id }}",
-        poke_interval=600,
-    )
-
     dbt_run = SSHOperator(
         task_id="dbt_ad_reporting",
         command="cd dbt_ad_reporting ; ~/.local/bin/dbt run -m +ad_reporting",
         ssh_conn_id="dbtvm",
     )

-    linkedin_sync >> linkedin_sensor
-    twitter_sync >> twitter_sensor
-    [linkedin_sensor, twitter_sensor] >> dbt_run
+    [linkedin_sync, twitter_sync] >> dbt_run
@@ -25,6 +25,7 @@
         task_id="fivetran-operator",
         fivetran_conn_id="fivetran_default",
         connector_id="{{ var.value.connector_id }}",
+        wait_for_completion=False,
     )

     delay_task = PythonOperator(task_id="delay_python_task", python_callable=lambda: time.sleep(60))
@@ -34,7 +35,7 @@
         fivetran_conn_id="fivetran_default",
         connector_id="{{ var.value.connector_id }}",
         poke_interval=5,
-        xcom="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}",
+        completed_after_time="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}",
     )

     fivetran_operator >> delay_task >> fivetran_sensor