target_data_loading.py
from pyspark.sql import SparkSession
import yaml
import os.path
import aws_utils as ut

if __name__ == '__main__':
    # Create the SparkSession
    spark = SparkSession \
        .builder \
        .appName("Read ingestion enterprise applications") \
        .getOrCreate()
    spark.sparkContext.setLogLevel('ERROR')

    # Resolve the config and secrets files relative to this script
    current_dir = os.path.abspath(os.path.dirname(__file__))
    app_config_path = os.path.abspath(os.path.join(current_dir, "..", "application.yml"))
    app_secrets_path = os.path.abspath(os.path.join(current_dir, "..", ".secrets"))
    # Load the application config and the secrets as dictionaries,
    # closing the file handles once parsed
    with open(app_config_path) as conf:
        app_conf = yaml.load(conf, Loader=yaml.FullLoader)
    with open(app_secrets_path) as secret:
        app_secret = yaml.load(secret, Loader=yaml.FullLoader)
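
    # Assumed shape of the two config files, inferred from the keys this
    # script reads below (a sketch; the real files may carry more):
    #
    #   application.yml           .secrets
    #   target_list:              s3_conf:
    #     - REGIS_DIM               access_key: ...
    #   REGIS_DIM:                  secret_access_key: ...
    #     source_data: [...]      # plus whatever Redshift connection
    #     loadingQuery: ...       # details get_redshift_jdbc_url expects
    #     tableName: ...
    #   s3_conf:
    #     s3_bucket: ...
    #     staging_location: ...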
    # Point the S3A filesystem at the credentials from the secrets file
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.access.key", app_secret["s3_conf"]["access_key"])
    hadoop_conf.set("fs.s3a.secret.key", app_secret["s3_conf"]["secret_access_key"])
    tgt_list = app_conf['target_list']
    for tgt in tgt_list:
        print('Preparing', tgt, 'data.')
        tgt_conf = app_conf[tgt]
        if tgt == 'REGIS_DIM':
            print('Loading the source data.')
            for src in tgt_conf['source_data']:
                file_path = "s3a://" + app_conf["s3_conf"]["s3_bucket"] + "/" \
                            + app_conf["s3_conf"]["staging_location"] + "/" + src
                src_df = ut.read_parquet_from_s3(spark, file_path)
                # Register each source as a temp view so loadingQuery
                # can refer to it by name
                src_df.createOrReplaceTempView(src)
                src_df.printSchema()
                src_df.show(5, False)
            print('Preparing the', tgt, 'data.')
            regis_dim_df = spark.sql(tgt_conf['loadingQuery'])
            regis_dim_df.show()

            jdbc_url = ut.get_redshift_jdbc_url(app_secret)
            print(jdbc_url)

            # The Redshift connector stages the data in S3 before the COPY,
            # hence the temp directory under the same bucket
            ut.write_data_to_redshift(regis_dim_df,
                                      jdbc_url,
                                      "s3a://" + app_conf["s3_conf"]["s3_bucket"] + "/temp",
                                      tgt_conf['tableName'])

    print("Completed <<<<<<<<<")
# spark-submit --packages "io.github.spark-redshift-community:spark-redshift_2.11:4.0.1,org.apache.spark:spark-avro_2.11:2.4.2,org.apache.hadoop:hadoop-aws:2.7.4"\
# --jars "/usr/share/aws/redshift/jdbc/RedshiftJDBC.jar,/usr/share/aws/redshift/spark-redshift/lib/spark-redshift.jar,/usr/share/aws/redshift/spark-redshift/lib/spark-avro.jar,/usr/share/aws/redshift/spark-redshift/lib/minimal-json.jar" \
# com.test/target_data_loading.py
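
# ---------------------------------------------------------------------------
# For reference: a minimal sketch of the aws_utils helpers used above. This is
# an assumption reconstructed from the call sites, not the actual module; the
# real implementations and the 'redshift_conf' secret keys may differ.
# ---------------------------------------------------------------------------
from pyspark.sql import DataFrame


def read_parquet_from_s3(spark: SparkSession, file_path: str) -> DataFrame:
    # Plain Parquet read; S3A credentials were already set on the Hadoop conf
    return spark.read.parquet(file_path)


def get_redshift_jdbc_url(app_secret: dict) -> str:
    # Assumes a hypothetical 'redshift_conf' block in .secrets
    rs = app_secret["redshift_conf"]
    return "jdbc:redshift://{}:{}/{}?user={}&password={}".format(
        rs["host"], rs["port"], rs["database"], rs["username"], rs["password"])


def write_data_to_redshift(df: DataFrame, jdbc_url: str, temp_dir: str, table_name: str):
    # The spark-redshift community connector stages df under temp_dir on S3,
    # then issues a Redshift COPY into table_name
    df.write \
        .format("io.github.spark_redshift_community.spark.redshift") \
        .option("url", jdbc_url) \
        .option("dbtable", table_name) \
        .option("tempdir", temp_dir) \
        .option("forward_spark_s3_credentials", "true") \
        .mode("append") \
        .save()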