Use a temporary directory to manage temporary files #117

Merged · 1 commit · Aug 16, 2023

118 changes: 62 additions & 56 deletions datapusher/jobs.py
@@ -10,6 +10,7 @@
import subprocess
import tempfile
import time
import decimal
from urllib.parse import urlsplit
import logging

@@ -366,6 +367,13 @@ def push_to_datastore(task_id, input, dry_run=False):
:type dry_run: boolean

"""

# Ensure temporary files are removed after run
with tempfile.TemporaryDirectory() as temp_dir:
return _push_to_datastore(task_id, input, dry_run=dry_run, temp_dir=temp_dir)


def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
handler = util.StoringHandler(task_id, input)
logger = logging.getLogger(task_id)
logger.addHandler(handler)
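
This hunk is the core of the PR: push_to_datastore becomes a thin wrapper that opens a tempfile.TemporaryDirectory and hands the directory to a private _push_to_datastore, so every workfile created below is deleted when the context manager exits, even if the job raises. A minimal sketch of the same pattern (run_job/_run_job are illustrative names, not part of the diff):

    import os
    import tempfile

    def run_job(job_id, payload):
        # The context manager removes temp_dir and everything inside it on
        # exit, whether _run_job returns normally or raises.
        with tempfile.TemporaryDirectory() as temp_dir:
            return _run_job(job_id, payload, temp_dir=temp_dir)

    def _run_job(job_id, payload, temp_dir=None):
        # Workfiles are plain paths inside temp_dir; no per-file cleanup needed.
        workfile = os.path.join(temp_dir, "data.csv")
        with open(workfile, "w") as f:
            f.write(str(payload))
        return workfile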
@@ -492,7 +500,7 @@ def push_to_datastore(task_id, input, dry_run=False):
else:
logger.info("File format: {}".format(resource_format))

tmp = tempfile.NamedTemporaryFile(suffix="." + resource_format)
tmp = os.path.join(temp_dir, 'tmp.' + resource_format)
length = 0
m = hashlib.md5()

@@ -502,16 +510,17 @@ def push_to_datastore(task_id, input, dry_run=False):
else:
logger.info("Downloading file of unknown size...")

for chunk in response.iter_content(int(config.get("CHUNK_SIZE"))):
length += len(chunk)
if length > max_content_length and not preview_rows:
raise util.JobError(
"Resource too large to process: {cl} > max ({max_cl}).".format(
cl=length, max_cl=max_content_length
with open(tmp, 'wb') as tmp_file:
for chunk in response.iter_content(int(config.get("CHUNK_SIZE"))):
length += len(chunk)
if length > max_content_length and not preview_rows:
raise util.JobError(
"Resource too large to process: {cl} > max ({max_cl}).".format(
cl=length, max_cl=max_content_length
)
)
)
tmp.write(chunk)
m.update(chunk)
tmp_file.write(chunk)
m.update(chunk)

except requests.HTTPError as e:
raise HTTPError(
@@ -527,7 +536,6 @@ def push_to_datastore(task_id, input, dry_run=False):
)

file_hash = m.hexdigest()
tmp.seek(0)
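
With temp_dir in play, the download loop streams chunks into a plain file opened with open(tmp, 'wb'), updating the MD5 hash and enforcing the size cap as it goes; tmp.seek(0) is dropped because tmp is now a path that later steps reopen, not a live file object. A condensed sketch, with hypothetical constants standing in for the CHUNK_SIZE and max_content_length settings:

    import hashlib
    import os

    import requests

    CHUNK_SIZE = 16384            # stand-in for config.get("CHUNK_SIZE")
    MAX_CONTENT_LENGTH = 10 ** 8  # stand-in for the max_content_length setting

    def download_to(url, temp_dir):
        tmp = os.path.join(temp_dir, "tmp.csv")
        length, m = 0, hashlib.md5()
        response = requests.get(url, stream=True)
        response.raise_for_status()
        with open(tmp, "wb") as tmp_file:
            for chunk in response.iter_content(CHUNK_SIZE):
                length += len(chunk)
                if length > MAX_CONTENT_LENGTH:
                    raise ValueError("resource too large: {} bytes".format(length))
                tmp_file.write(chunk)
                m.update(chunk)
        return tmp, m.hexdigest()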

# check if the resource metadata (like data dictionary data types)
# has been updated since the last fetch
@@ -591,24 +599,23 @@ def push_to_datastore(task_id, input, dry_run=False):
# first, we need a temporary spreadsheet filename with the right file extension
# we only need the filename though, that's why we remove it
# and create a hardlink to the file we got from CKAN
qsv_spreadsheet = tempfile.NamedTemporaryFile(suffix="." + resource_format)
os.remove(qsv_spreadsheet.name)
os.link(tmp.name, qsv_spreadsheet.name)
qsv_spreadsheet = os.path.join(temp_dir, 'qsv_spreadsheet.' + resource_format)
os.link(tmp, qsv_spreadsheet)

# run `qsv excel` and export it to a CSV
# use --trim option to trim column names and the data
qsv_excel_csv = tempfile.NamedTemporaryFile(suffix=".csv")
qsv_excel_csv = os.path.join(temp_dir, 'qsv_excel.csv')
try:
qsv_excel = subprocess.run(
[
qsv_bin,
"excel",
qsv_spreadsheet.name,
qsv_spreadsheet,
"--sheet",
str(default_excel_sheet),
"--trim",
"--output",
qsv_excel_csv.name,
qsv_excel_csv,
],
check=True,
capture_output=True,
@@ -624,7 +631,7 @@ def push_to_datastore(task_id, input, dry_run=False):
# just in case the file is not actually a spreadsheet or is encrypted
# so the user has some actionable info
file_format = subprocess.run(
[file_bin, qsv_spreadsheet.name],
[file_bin, qsv_spreadsheet],
check=True,
capture_output=True,
text=True,
@@ -637,7 +644,6 @@ def push_to_datastore(task_id, input, dry_run=False):
)

return
qsv_spreadsheet.close()
excel_export_msg = qsv_excel.stderr
logger.info("{}...".format(excel_export_msg))
tmp = qsv_excel_csv
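
qsv excel infers the spreadsheet format from the file extension, so the job needs a correctly suffixed name for the downloaded file. The old code created a NamedTemporaryFile only to reserve such a name, deleted it, then hardlinked over it; inside a private temp_dir the name can simply be chosen outright, and os.link still avoids copying the data. Roughly:

    import os

    def link_with_extension(src_path, temp_dir, resource_format):
        # Same inode as src_path, new name with the extension qsv expects;
        # no data is copied and no collision dance is needed inside temp_dir.
        dst = os.path.join(temp_dir, "qsv_spreadsheet." + resource_format)
        os.link(src_path, dst)
        return dst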
@@ -649,7 +655,7 @@ def push_to_datastore(task_id, input, dry_run=False):
# Note that we only change the workfile, the resource file itself is unchanged.

# ------------------- Normalize to CSV ---------------------
qsv_input_csv = tempfile.NamedTemporaryFile(suffix=".csv")
qsv_input_csv = os.path.join(temp_dir, 'qsv_input.csv')
# if resource_format is CSV we don't need to normalize
if resource_format.upper() == "CSV":
logger.info("Normalizing/UTF-8 transcoding {}...".format(resource_format))
@@ -663,10 +669,10 @@ def push_to_datastore(task_id, input, dry_run=False):
[
qsv_bin,
"input",
tmp.name,
tmp,
"--trim-headers",
"--output",
qsv_input_csv.name,
qsv_input_csv,
],
check=True,
)
@@ -687,7 +693,7 @@ def push_to_datastore(task_id, input, dry_run=False):
logger.info("Validating CSV...")
try:
subprocess.run(
[qsv_bin, "validate", tmp.name], check=True, capture_output=True, text=True
[qsv_bin, "validate", tmp], check=True, capture_output=True, text=True
)
except subprocess.CalledProcessError as e:
# return as we can't push an invalid CSV file
@@ -707,7 +713,7 @@ def push_to_datastore(task_id, input, dry_run=False):
logger.info("Checking for duplicates and if the CSV is sorted...")
try:
qsv_sortcheck = subprocess.run(
[qsv_bin, "sortcheck", tmp.name, "--json"],
[qsv_bin, "sortcheck", tmp, "--json"],
capture_output=True,
text=True,
)
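
qsv sortcheck --json prints a JSON report on stdout; the job reads it to learn whether the CSV is already sorted (so dedup can take the faster --sorted path) and how many duplicate rows exist. A sketch of the parsing, assuming the sorted and dupe_count keys of qsv's JSON output:

    import json
    import subprocess

    qsv_bin = "qsv"  # assumed to be on PATH for this sketch

    def sortcheck(tmp):
        result = subprocess.run(
            [qsv_bin, "sortcheck", tmp, "--json"],
            capture_output=True,
            text=True,
        )
        info = json.loads(result.stdout)
        # "sorted" and "dupe_count" are the fields the job relies on.
        return info["sorted"], int(info["dupe_count"])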
@@ -729,9 +735,9 @@ def push_to_datastore(task_id, input, dry_run=False):
# --------------- Do we need to dedup? ------------------
# note that deduping also ends up creating a sorted CSV
if dedup and dupe_count > 0:
qsv_dedup_csv = tempfile.NamedTemporaryFile(suffix=".csv")
qsv_dedup_csv = os.path.join(temp_dir, 'qsv_dedup.csv')
logger.info("{:.} duplicate rows found. Deduping...".format(dupe_count))
qsv_dedup_cmd = [qsv_bin, "dedup", tmp.name, "--output", qsv_dedup_csv.name]
qsv_dedup_cmd = [qsv_bin, "dedup", tmp, "--output", qsv_dedup_csv]

# if the file is already sorted,
# we can save a lot of time by passing the --sorted flag
@@ -763,7 +769,7 @@ def push_to_datastore(task_id, input, dry_run=False):
# should we need to change the column name to make it "db-safe"
try:
qsv_headers = subprocess.run(
[qsv_bin, "headers", "--just-names", tmp.name],
[qsv_bin, "headers", "--just-names", tmp],
capture_output=True,
check=True,
text=True,
@@ -779,14 +785,14 @@ def push_to_datastore(task_id, input, dry_run=False):
# i.e. valid postgres/CKAN Datastore identifiers
unsafe_prefix = config.get("UNSAFE_PREFIX", "unsafe_")
reserved_colnames = config.get("RESERVED_COLNAMES", "_id")
qsv_safenames_csv = tempfile.NamedTemporaryFile(suffix=".csv")
qsv_safenames_csv = os.path.join(temp_dir, 'qsv_safenames.csv')
logger.info('Checking for "database-safe" header names...')
try:
qsv_safenames = subprocess.run(
[
qsv_bin,
"safenames",
tmp.name,
tmp,
"--mode",
"json",
"--prefix",
@@ -813,15 +819,15 @@ def push_to_datastore(task_id, input, dry_run=False):
[
qsv_bin,
"safenames",
tmp.name,
tmp,
"--mode",
"conditional",
"--prefix",
unsafe_prefix,
"--reserved",
reserved_colnames,
"--output",
qsv_safenames_csv.name,
qsv_safenames_csv,
],
capture_output=True,
text=True,
@@ -836,8 +842,8 @@ def push_to_datastore(task_id, input, dry_run=False):
# first, index csv for speed - count, stats and slice
# are all accelerated/multithreaded when an index is present
try:
qsv_index_file = tmp.name + ".idx"
subprocess.run([qsv_bin, "index", tmp.name], check=True)
qsv_index_file = tmp + ".idx"
subprocess.run([qsv_bin, "index", tmp], check=True)
except subprocess.CalledProcessError as e:
raise util.JobError("Cannot index CSV: {}".format(e))

Expand All @@ -847,7 +853,7 @@ def push_to_datastore(task_id, input, dry_run=False):
# get record count, this is instantaneous with an index
try:
qsv_count = subprocess.run(
[qsv_bin, "count", tmp.name], capture_output=True, check=True, text=True
[qsv_bin, "count", tmp], capture_output=True, check=True, text=True
)
except subprocess.CalledProcessError as e:
raise util.JobError("Cannot count records in CSV: {}".format(e))
@@ -871,16 +877,16 @@ def push_to_datastore(task_id, input, dry_run=False):
headers_min = []
headers_max = []
headers_cardinality = []
qsv_stats_csv = tempfile.NamedTemporaryFile(suffix=".csv")
qsv_stats_csv = os.path.join(temp_dir, 'qsv_stats.csv')
qsv_stats_cmd = [
qsv_bin,
"stats",
tmp.name,
tmp,
"--infer-dates",
"--dates-whitelist",
"all",
"--output",
qsv_stats_csv.name,
qsv_stats_csv,
]
prefer_dmy = config.get("PREFER_DMY")
if prefer_dmy:
@@ -899,7 +905,7 @@ def push_to_datastore(task_id, input, dry_run=False):
"Cannot infer data types and compile statistics: {}".format(e)
)

with open(qsv_stats_csv.name, mode="r") as inp:
with open(qsv_stats_csv, mode="r") as inp:
reader = csv.DictReader(inp)
for row in reader:
headers.append(row["field"])
@@ -1003,17 +1009,17 @@ def push_to_datastore(task_id, input, dry_run=False):
if preview_rows > 0:
# PREVIEW_ROWS is positive, slice from the beginning
logger.info("Preparing {:,}-row preview...".format(preview_rows))
qsv_slice_csv = tempfile.NamedTemporaryFile(suffix=".csv")
qsv_slice_csv = os.path.join(temp_dir, 'qsv_slice.csv')
try:
subprocess.run(
[
qsv_bin,
"slice",
"--len",
str(preview_rows),
tmp.name,
tmp,
"--output",
qsv_slice_csv.name,
qsv_slice_csv,
],
check=True,
)
@@ -1027,7 +1033,7 @@ def push_to_datastore(task_id, input, dry_run=False):
# to slice from the end
slice_len = abs(preview_rows)
logger.info("Preparing {:,}-row preview from the end...".format(slice_len))
qsv_slice_csv = tempfile.NamedTemporaryFile(suffix=".csv")
qsv_slice_csv = os.path.join(temp_dir, 'qsv_slice.csv')
try:
subprocess.run(
[
@@ -1037,9 +1043,9 @@ def push_to_datastore(task_id, input, dry_run=False):
"-1",
"--len",
str(slice_len),
tmp.name,
tmp,
"--output",
qsv_slice_csv.name,
qsv_slice_csv,
],
check=True,
)
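
preview_rows does double duty: a positive value slices the first N rows, while a negative value slices the last |N| rows via qsv slice --start -1 --len N. Both branches target the same qsv_slice.csv path in temp_dir, since only one can run. Condensed:

    import os
    import subprocess

    qsv_bin = "qsv"  # assumed to be on PATH for this sketch

    def make_preview(tmp, temp_dir, preview_rows):
        out = os.path.join(temp_dir, "qsv_slice.csv")
        cmd = [qsv_bin, "slice"]
        if preview_rows > 0:
            cmd += ["--len", str(preview_rows)]  # first N rows
        else:
            # negative preview_rows: take the last |N| rows
            cmd += ["--start", "-1", "--len", str(abs(preview_rows))]
        subprocess.run(cmd + [tmp, "--output", out], check=True)
        return out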
@@ -1054,17 +1060,17 @@ def push_to_datastore(task_id, input, dry_run=False):
# if there are any datetime fields, normalize them to RFC3339 format
# so we can readily insert them as timestamps into postgresql with COPY
if datetimecols_list:
qsv_applydp_csv = tempfile.NamedTemporaryFile(suffix=".csv")
qsv_applydp_csv = os.path.join(temp_dir, 'qsv_applydp.csv')
datecols = ",".join(datetimecols_list)

qsv_applydp_cmd = [
qsv_bin,
"applydp",
"datefmt",
datecols,
tmp.name,
tmp,
"--output",
qsv_applydp_csv.name,
qsv_applydp_csv,
]
if prefer_dmy:
qsv_applydp_cmd.append("--prefer-dmy")
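
When stats inferred datetime columns, qsv applydp datefmt rewrites them to RFC3339 so PostgreSQL's COPY can ingest them directly as timestamps. A sketch mirroring the command built in this hunk:

    import os
    import subprocess

    qsv_bin = "qsv"  # assumed to be on PATH for this sketch

    def normalize_dates(tmp, temp_dir, datetimecols_list, prefer_dmy=False):
        out = os.path.join(temp_dir, "qsv_applydp.csv")
        cmd = [qsv_bin, "applydp", "datefmt", ",".join(datetimecols_list),
               tmp, "--output", out]
        if prefer_dmy:
            cmd.append("--prefer-dmy")  # read ambiguous dates as day-month-year
        subprocess.run(cmd, check=True)
        return out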
@@ -1140,7 +1146,7 @@ def push_to_datastore(task_id, input, dry_run=False):
"--ignore-case",
"--quick",
pii_regex_fname,
tmp.name,
tmp,
],
capture_output=True,
text=True,
@@ -1153,7 +1159,7 @@ def push_to_datastore(task_id, input, dry_run=False):

else:
logger.info("Scanning for PII using {}...".format(pii_regex_file))
qsv_searchset_csv = tempfile.NamedTemporaryFile(suffix=".csv")
qsv_searchset_csv = os.path.join(temp_dir, 'qsv_searchset.csv')
try:
qsv_searchset = subprocess.run(
[
Expand All @@ -1165,9 +1171,9 @@ def push_to_datastore(task_id, input, dry_run=False):
"--flag-matches-only",
"--json",
pii_regex_file,
tmp.name,
tmp,
"--output",
qsv_searchset_csv.name,
qsv_searchset_csv,
],
capture_output=True,
text=True,
@@ -1242,7 +1248,7 @@ def push_to_datastore(task_id, input, dry_run=False):
qsv_bin,
"stats",
"--typesonly",
qsv_searchset_csv.name,
qsv_searchset_csv,
],
capture_output=True,
check=True,
@@ -1295,7 +1301,7 @@ def push_to_datastore(task_id, input, dry_run=False):
column_names,
)

with open(qsv_searchset_csv.name, "rb") as f:
with open(qsv_searchset_csv, "rb") as f:
try:
cur_pii.copy_expert(copy_sql, f)
except psycopg2.Error as e:
@@ -1395,7 +1401,7 @@ def push_to_datastore(task_id, input, dry_run=False):
sql.Identifier(resource_id),
column_names,
)
with open(tmp.name, "rb") as f:
with open(tmp, "rb") as f:
try:
cur.copy_expert(copy_sql, f)
except psycopg2.Error as e:
@@ -1563,7 +1569,7 @@ def push_to_datastore(task_id, input, dry_run=False):
qsv_bin,
"stats",
"--typesonly",
qsv_stats_csv.name,
qsv_stats_csv,
],
capture_output=True,
check=True,
@@ -1618,7 +1624,7 @@ def push_to_datastore(task_id, input, dry_run=False):
column_names,
)

with open(qsv_stats_csv.name, "rb") as f:
with open(qsv_stats_csv, "rb") as f:
try:
cur.copy_expert(copy_sql, f)
except psycopg2.Error as e:
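
Every COPY site follows the same shape after this PR: open the plain path in temp_dir in binary mode and stream it through psycopg2's copy_expert. A minimal sketch of that pattern with a hypothetical connection and simplified COPY options (the job's actual statement carries more options than shown here):

    import psycopg2
    from psycopg2 import sql

    def copy_csv(conn, resource_id, column_names, csv_path):
        # Compose the COPY statement with properly quoted identifiers.
        copy_sql = sql.SQL(
            "COPY {} ({}) FROM STDIN WITH (FORMAT CSV, HEADER TRUE)"
        ).format(
            sql.Identifier(resource_id),
            sql.SQL(", ").join(sql.Identifier(c) for c in column_names),
        )
        # The CSV is a plain file in temp_dir, so a fresh binary handle is
        # all copy_expert needs; the directory cleanup removes it afterwards.
        with conn.cursor() as cur, open(csv_path, "rb") as f:
            cur.copy_expert(copy_sql, f)
        conn.commit()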