forked from cc-archive/cccatalog
-
Notifications
You must be signed in to change notification settings - Fork 0
/
common_api_workflows.py
80 lines (63 loc) · 2.07 KB
/
common_api_workflows.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import logging
import os
from airflow import DAG
from croniter import croniter
import util.config as conf
from util.operator_util import get_runner_operator, get_log_operator
# Configure logging for this loader: INFO level, with every record carrying
# a timestamp, its level name and a fixed "DAG Loader" tag.
logging.basicConfig(
format='%(asctime)s: [%(levelname)s - DAG Loader] %(message)s',
level=logging.INFO)
# Keys that load_dag_conf looks up inside each source's entry of
# DAG_VARIABLES (crontab schedule string and script path respectively).
CRONTAB_STR = conf.CRONTAB_STR
SCRIPT = conf.SCRIPT
# Used as the default for the ``default_args`` parameter of create_dag.
DAG_DEFAULT_ARGS = conf.DAG_DEFAULT_ARGS
# Indexed by source name; the loop at the bottom of this module iterates its
# keys and attempts to build one DAG per source.
DAG_VARIABLES = conf.DAG_VARIABLES
def load_dag_conf(source, DAG_VARIABLES):
    """Validate and load configuration variables for one source.

    Args:
        source: Key into ``DAG_VARIABLES`` identifying the workflow.
        DAG_VARIABLES: Object indexed by source whose entries are read with
            ``.get`` using the module-level ``SCRIPT`` and ``CRONTAB_STR``
            keys.

    Returns:
        A ``(script_location, dag_id, crontab_str)`` tuple. ``dag_id`` is
        always ``'<source>_workflow'``. ``script_location`` is ``None`` when
        the configured path is missing or does not exist on disk, and
        ``crontab_str`` is ``None`` when ``croniter`` refuses to parse it.
        Both failures are logged as warnings instead of being raised.

    Raises:
        KeyError: If ``DAG_VARIABLES`` is a dict and has no entry for
            ``source``.
    """
    logging.info('Loading configuration for {}'.format(source))
    logging.debug('DAG_VARIABLES: {}'.format(DAG_VARIABLES))

    dag_id = '{}_workflow'.format(source)
    source_conf = DAG_VARIABLES[source]

    script_location = source_conf.get(SCRIPT)
    # Explicit existence check rather than ``assert``: assertions are removed
    # when Python runs with -O, which would let a nonexistent path through
    # and produce a DAG pointing at a script that is not there.
    script_error = None
    try:
        if not os.path.exists(script_location):
            script_error = 'path does not exist'
    except Exception as e:
        # Deliberately broad: a malformed entry (e.g. the SCRIPT key is
        # absent, so the value is None) must not abort loading other DAGs.
        script_error = e
    if script_error is not None:
        logging.warning(
            'Invalid script location: {}. Error: {}'
            .format(script_location, script_error)
        )
        script_location = None

    crontab_str = source_conf.get(CRONTAB_STR)
    try:
        # Constructing a croniter is used purely as a syntax check; the
        # resulting object is discarded.
        croniter(crontab_str)
    except Exception as e:
        logging.warning(
            'Invalid crontab string: {}. Error: {}'.format(crontab_str, e)
        )
        crontab_str = None

    return script_location, dag_id, crontab_str
def create_dag(
        source,
        script_location,
        dag_id,
        crontab_str=None,
        default_args=DAG_DEFAULT_ARGS):
    """Build the three-step workflow DAG for a single source.

    Args:
        source: Source name, forwarded to every operator helper.
        script_location: Forwarded to ``get_runner_operator`` for the
            middle task.
        dag_id: Identifier given to the new ``DAG``.
        crontab_str: Passed as the DAG's ``schedule_interval``; ``None``
            by default.
        default_args: Passed as the DAG's ``default_args``; defaults to the
            module-level ``DAG_DEFAULT_ARGS``.

    Returns:
        The ``DAG`` (created with ``catchup=False``) containing a
        'starting' log task, the runner task and a 'finished' log task,
        chained in that order.
    """
    workflow = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule_interval=crontab_str,
        catchup=False,
    )

    with workflow:
        begin = get_log_operator(workflow, source, 'starting')
        runner = get_runner_operator(workflow, source, script_location)
        finish = get_log_operator(workflow, source, 'finished')

        # Same ordering as a single chained expression, spelled out per edge.
        begin >> runner
        runner >> finish

    return workflow
# Build one DAG per configured source and bind it to a module-level name equal
# to its dag_id. Sources without a usable script location are skipped.
for source in DAG_VARIABLES:
    script_location, dag_id, crontab_str = load_dag_conf(source, DAG_VARIABLES)
    if not script_location:
        # load_dag_conf returned no script location for this source, so
        # there is nothing to run and no DAG is registered.
        continue
    globals()[dag_id] = create_dag(
        source,
        script_location,
        dag_id,
        crontab_str,
    )