forked from cc-archive/cccatalog
-
Notifications
You must be signed in to change notification settings - Fork 0
/
monthlyWorkflow.py
46 lines (36 loc) · 1.79 KB
/
monthlyWorkflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta
from pytz import timezone
import os
# Root of the Airflow deployment; the provider scripts launched below live
# under "<AIRFLOW_HOME>/dags". Raises KeyError if the variable is unset,
# which fails the DAG file fast at parse time.
airflowHome = os.environ['AIRFLOW_HOME']

# Task-level defaults shared by every operator in this DAG.
default_task_args = {
    'owner': 'data-eng-admin',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 15),
    #'email': '[email protected]', #not configured
    #'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(days=1),
}

# Monthly schedule: the 15th of every month at 16:00.
dag = DAG(
    dag_id='Monthly_Workflow',
    default_args=default_task_args,
    schedule_interval='0 16 15 * *',
    catchup=False,
)

# Root marker task; the three provider jobs fan out from here in parallel.
begin_task = BashOperator(
    task_id='Begin_Monthly_Tasks',
    bash_command='echo Begin monthly tasks',
    dag=dag,
)

# Reprocess the Cleveland Museum of Art collection.
cleveland_task = BashOperator(
    task_id='ClevelandMuseum',
    bash_command='python {0}/dags/api/ClevelandMuseum.py'.format(airflowHome),
    dag=dag,
)

# Sync the Common Crawl AWS ETL image data down to the OUTPUT_DIR.
s3_sync_task = BashOperator(
    task_id='Sync_Common_Crawl_Image_Data',
    bash_command='python {0}/dags/commoncrawl_s3_syncer/SyncImageProviders.py'.format(airflowHome),
    dag=dag,
)

# Reprocess the Rawpixel collection.
rawpixel_task = BashOperator(
    task_id='RawPixel',
    bash_command='python {0}/dags/api/RawPixel.py'.format(airflowHome),
    dag=dag,
)

# ALL_DONE trigger rule: the closing task runs even when one or more of the
# upstream provider jobs failed, so the workflow always terminates cleanly.
end_task = BashOperator(
    task_id='End',
    trigger_rule=TriggerRule.ALL_DONE,
    bash_command='echo Terminating monthly workflow',
    dag=dag,
)

# begin -> [three provider jobs in parallel] -> end
begin_task >> [cleveland_task, rawpixel_task, s3_sync_task] >> end_task