forked from cc-archive/cccatalog
-
Notifications
You must be signed in to change notification settings - Fork 0
/
loaderWorkflow.py
146 lines (109 loc) · 6.25 KB
/
loaderWorkflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta
from pytz import timezone
import os
# Airflow connection id of the upstream (intermediary) Postgres database.
DB_NAME = 'postgres_openledger_upstream'
def verifyTable():
    """Ensure the intermediary table and its indices exist.

    Creates ``public.provider_image_data`` if it is absent, assigns
    ownership to the ``deploy`` role, and builds the indices that the
    loader's upsert relies on (provider, foreign_identifier, url).
    """
    hook = PostgresHook(postgres_conn_id=DB_NAME)
    create_table = """CREATE TABLE IF NOT EXISTS public.provider_image_data (
    foreign_identifier character varying(3000),
    foreign_landing_url character varying(1000),
    url character varying(3000),
    thumbnail character varying(3000),
    width integer,
    height integer,
    filesize character varying(100),
    license character varying(50),
    license_version character varying(25),
    creator character varying(2000),
    creator_url character varying(2000),
    title character varying(5000),
    meta_data jsonb,
    tags jsonb,
    watermarked boolean,
    provider character varying(80),
    source character varying(80)
);"""
    statements = [
        create_table,
        'ALTER TABLE public.provider_image_data OWNER TO deploy;',
        # Indices used by the ON CONFLICT clause of the upsert in importData.
        'CREATE INDEX IF NOT EXISTS provider_image_data_provider_key ON public.provider_image_data USING btree (provider);',
        'CREATE INDEX IF NOT EXISTS provider_image_data_foreign_identifier_key ON public.provider_image_data USING btree (provider, md5((foreign_identifier)::text));',
        'CREATE INDEX IF NOT EXISTS provider_image_data_url_key ON public.provider_image_data USING btree (provider, md5((url)::text));',
    ]
    for statement in statements:
        hook.run(statement)
def loadData():
    """Load every ready TSV file from OUTPUT_DIR into the upstream DB.

    Ensures the intermediary table and its indices exist, collects the
    eligible TSV files, and imports each one in turn. Returns ``None``
    when there is nothing to load.
    """
    # Create intermediary table and indices if they do not exist.
    verifyTable()
    # get all tsv files that are in the output directory.
    # NOTE: getTextFiles may return None or a lazy iterable; materialize it
    # before testing emptiness — in Python 3 a map/filter object is always
    # truthy, so the original `if tsvFiles:` check could never detect an
    # empty result.
    tsvFiles = list(getTextFiles() or [])
    if not tsvFiles:
        return None
    print('Preparing to load the following file(s): {}'.format(','.join(tsvFiles)))
    # Load each file into the DB.
    for filepath in tsvFiles:
        importData(filepath)
def getTextFiles():
    """Return full paths of TSV files in OUTPUT_DIR that are ready to load.

    A file qualifies when it has a ``.tsv`` extension and its mtime is at
    least 15 minutes old (files newer than that may still be written by a
    crawler). Returns a possibly-empty list of paths.

    Fixes over the original:
    - the ``if fileList:`` guards tested filter objects, which are always
      truthy in Python 3, so the emptiness checks were dead code;
    - paths were built with ``'{}{}'.format(outputDir, fp)``, which breaks
      when OUTPUT_DIR has no trailing slash; ``os.path.join`` (already used
      for the stat call) is used consistently instead.
    """
    outputDir = os.environ['OUTPUT_DIR']
    cutoff = datetime.now() - timedelta(minutes=15)
    readyFiles = []
    for name in os.listdir(outputDir):
        # Filter TSV files.
        if not name.endswith('.tsv'):
            continue
        path = os.path.join(outputDir, name)
        # Keep only files that were not modified in the last 15 minutes.
        if datetime.fromtimestamp(os.stat(path).st_mtime) <= cutoff:
            readyFiles.append(path)
    return readyFiles
def importData(_filename):
    """Load one TSV file into the intermediary table and upsert into image.

    Steps: bulk-load the file, purge rows missing required columns, drop
    duplicate (provider, foreign_identifier) rows, upsert everything into
    the ``image`` table, truncate the intermediary table, and finally
    delete the source file.
    """
    print('Processing: {}'.format(_filename))
    pgHook = PostgresHook(postgres_conn_id=DB_NAME)
    # LOAD the data INTO the intermediary table.
    pgHook.bulk_load('provider_image_data', _filename)
    # DELETE invalid records — rows missing any required column.
    pgHook.run('DELETE FROM provider_image_data WHERE url IS NULL;')
    pgHook.run('DELETE FROM provider_image_data WHERE license IS NULL;')
    pgHook.run('DELETE FROM provider_image_data WHERE foreign_landing_url IS NULL;')
    pgHook.run('DELETE FROM provider_image_data WHERE foreign_identifier IS NULL;')
    # DELETE duplicate records, keeping the row with the highest ctid.
    pgHook.run('DELETE FROM provider_image_data p1 USING provider_image_data p2 WHERE p1.ctid < p2.ctid and p1.provider = p2.provider and p1.foreign_identifier = p2.foreign_identifier;')
    # UPSERT the records into the production image table.
    query = """INSERT INTO image (created_on, updated_on, provider, source, foreign_identifier, foreign_landing_url, url, thumbnail, width, height, license, license_version, creator, creator_url, title, last_synced_with_source, removed_from_source, meta_data, tags, watermarked)
SELECT NOW(), NOW(), provider, source, foreign_identifier, foreign_landing_url, url, thumbnail, width, height, license, license_version, creator, creator_url, title, NOW(), 'f', meta_data, tags, watermarked
FROM provider_image_data
ON CONFLICT (provider, md5((foreign_identifier)::text), md5((url)::text))
DO UPDATE SET updated_on = now(), foreign_landing_url = EXCLUDED.foreign_landing_url, url = EXCLUDED.url, thumbnail = EXCLUDED.thumbnail, width = EXCLUDED.width, height = EXCLUDED.height, license = EXCLUDED.license, license_version = EXCLUDED.license_version, creator = EXCLUDED.creator, creator_url = EXCLUDED.creator_url, title = EXCLUDED.title, last_synced_with_source = NOW(), removed_from_source = 'f', meta_data = EXCLUDED.meta_data, watermarked = EXCLUDED.watermarked
WHERE image.foreign_identifier = EXCLUDED.foreign_identifier and image.provider = EXCLUDED.provider;"""
    pgHook.run(query)
    # Delete rows from the intermediary table.
    # If the above task was NOT successful, the task state switches to
    # 'up for retry' and the steps below are not performed (i.e. they are
    # queued until the preceding step succeeds).
    pgHook.run('DELETE FROM provider_image_data;')
    # REMOVE the file that was loaded. try/except replaces the original
    # `os.remove(f) if os.path.exists(f) else None` side-effecting
    # conditional expression, which also had a check-then-act race.
    try:
        os.remove(_filename)
    except FileNotFoundError:
        pass
# Default task arguments shared by every task in this DAG.
args = {
    'owner': 'data-eng-admin',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 15),
    #'email': '[email protected]', #not configured
    #'email_on_failure': True,
    'email_on_retry': False,
    # Retry a failed task 3 times, waiting 30 minutes between attempts.
    'retries': 3,
    'retry_delay': timedelta(minutes=30),
}
dag = DAG(dag_id='DB_Loader', default_args=args, schedule_interval='0 */4 * * *', catchup=False) #run every 4 hours
# Begin/End are marker tasks; Load performs the actual DB import.
beginTask = BashOperator(task_id='Begin', dag=dag, bash_command='echo Begin DB Loader')
endTask = BashOperator(task_id='End', dag=dag, bash_command='echo Terminating DB Loader')
loadTask = PythonOperator(task_id='Load', provide_context=False, python_callable=loadData, dag=dag)
# Task ordering: Begin -> Load -> End.
beginTask >> loadTask >> endTask