Skip to content

Commit

Permalink
clarify examples
Browse files Browse the repository at this point in the history
  • Loading branch information
dwreeves committed Nov 2, 2023
1 parent bbda70c commit 5588e1c
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 24 deletions.
13 changes: 1 addition & 12 deletions fivetran_provider_async/example_dags/example_fivetran.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from airflow import DAG

from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

default_args = {
"owner": "Airflow",
Expand All @@ -19,18 +18,8 @@

with dag:
fivetran_sync_start = FivetranOperator(
task_id="fivetran-task",
task_id="fivetran_task",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
deferrable=False,
)

fivetran_sync_wait = FivetranSensor(
task_id="fivetran-sensor",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
poke_interval=5,
deferrable=False,
)

fivetran_sync_start >> fivetran_sync_wait
21 changes: 9 additions & 12 deletions fivetran_provider_async/example_dags/example_fivetran_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from airflow import DAG

from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

default_args = {
"owner": "Airflow",
Expand All @@ -19,22 +18,20 @@
)

with dag:
# Both of these tasks will start a Fivetran sync,
# and will wait for the Fivetran sync to complete before marking
# the task as success.

# However, the async operator uses the triggerer instance to do this,
# which frees up a worker slot.

fivetran_async_op = FivetranOperator(
task_id="fivetran_async_op",
connector_id="bronzing_largely",
)

fivetran_sync_op = FivetranOperator(
task_id="fivetran_sync_op",
connector_id="bronzing_largely",
deferrable=False,
)

fivetran_async_sensor = FivetranSensor(
task_id="fivetran_async_sensor",
connector_id="bronzing_largely",
poke_interval=5,
completed_after_time="{{ task_instance.xcom_pull('fivetran_sync_op', key='return_value') }}",
task_id="fivetran_sync_op", connector_id="bronzing_largely", deferrable=False
)

fivetran_async_op >> fivetran_sync_op >> fivetran_async_sensor
fivetran_async_op >> fivetran_sync_op
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
task_id="fivetran-operator",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
wait_for_completion=False,
)

delay_task = PythonOperator(task_id="delay_python_task", python_callable=lambda: time.sleep(60))
Expand Down

0 comments on commit 5588e1c

Please sign in to comment.