-
Notifications
You must be signed in to change notification settings - Fork 0
/
PySparkJob2.py
50 lines (39 loc) · 1.65 KB
/
PySparkJob2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import argparse
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
def process(spark, flights_path, result_path):
    """
    Main processing step of the job.

    Reads the flights dataset from parquet, keeps only rows whose
    TAIL_NUMBER is not null and, for every
    (ORIGIN_AIRPORT, DESTINATION_AIRPORT) pair, computes:

    * ``tail_count``   - number of TAIL_NUMBER values in the group;
    * ``avg_air_time`` - mean of AIR_TIME in the group.

    The ten pairs with the largest ``tail_count`` are written to
    ``result_path`` in parquet format.

    :param spark: SparkSession
    :param flights_path: path to the flights dataset
    :param result_path: path where the transformation results are written
    """
    flights = spark.read.parquet(flights_path)

    # Rows without a tail number are excluded before any aggregation.
    with_tail_number = flights.filter(F.col('TAIL_NUMBER').isNotNull())

    # One output row per origin/destination pair.
    per_route = with_tail_number.groupBy('ORIGIN_AIRPORT', 'DESTINATION_AIRPORT')
    route_stats = per_route.agg(
        F.count('TAIL_NUMBER').alias('tail_count'),
        F.avg('AIR_TIME').alias('avg_air_time'),
    )

    # Fix the output column order explicitly, then keep the ten rows with
    # the highest tail_count.
    top_routes = (
        route_stats
        .select('ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', 'tail_count', 'avg_air_time')
        .orderBy(F.desc('tail_count'))
        .limit(10)
    )

    top_routes.write.parquet(result_path)
def main(flights_path, result_path):
    """
    Entry point of the job: obtain a SparkSession and run ``process``.

    :param flights_path: path to the flights dataset
    :param result_path: path where the transformation results are written
    """
    process(_spark_session(), flights_path, result_path)
def _spark_session():
    """
    Create the SparkSession used by the job (via ``getOrCreate``).

    :return: SparkSession
    """
    # NOTE(review): the application name is 'PySparkJob1' although this file
    # appears to be PySparkJob2.py - confirm whether that is intentional.
    session_builder = SparkSession.builder.appName('PySparkJob1')
    return session_builder.getOrCreate()
if __name__ == "__main__":
    # Command-line interface: both options are optional and fall back to
    # the defaults declared below.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        '--flights_path',
        type=str,
        default='flights.parquet',
        help='Please set flights datasets path.',
    )
    cli.add_argument(
        '--result_path',
        type=str,
        default='result',
        help='Please set result path.',
    )
    options = cli.parse_args()
    main(options.flights_path, options.result_path)