Skip to content

Commit

Permalink
1. more careful treatment of database exceptions, including logging
Browse files Browse the repository at this point in the history
2. take more time to retry mark_published_synda() upon failures (normally
"database is locked" exceptions)
  • Loading branch information
painter1 committed Mar 29, 2021
1 parent c70365b commit 11a9a09
Showing 1 changed file with 126 additions and 57 deletions.
183 changes: 126 additions & 57 deletions mark_published.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pprint import pprint
import sqlite3, debug
import logging
global conn, curs, dryrun
global conn, dryrun
from retrying import retry
import pdb

Expand All @@ -23,7 +23,7 @@

def setup(db):
"""Initializes the connection to the database, etc."""
global conn, curs
global conn
# normal:
conn = sqlite3.connect(db) # typical db: '/var/lib/synda/sdt/sdt.db'
# or test db: '/home/painter/db/sdt.db'
Expand All @@ -32,7 +32,7 @@ def setup(db):

def finish():
"""Closes connections to databases, etc."""
global conn, curs
global conn
conn.commit()
conn.close()

Expand Down Expand Up @@ -68,41 +68,62 @@ def next_suffix():
# 2^x * 1000 milliseconds between retries, a maximum of 120 seconds (2 minutes), and gives up
# after 900 seconds (15 minutes)
#@retry(wait_exponential_multiplier=1000, wait_exponential_max=120000, stop_max_delay=900000)
# 2021.02.19: give up more easily. This will spread the work over more days, so there will
# be fewer failures of other Synda jobs on days when this one runs: max wait between retries
# is now 1 minute, and give up entirely after 3 minutes.
@retry(wait_exponential_multiplier=1000, wait_exponential_max=60000, stop_max_delay=180000)
# 2021.03.29: Wait longer. The 2021.02.29 retry profile was failing every day.
# Double the time between retries, maximum of 10 minutes, give up after 1 hour.
@retry(wait_exponential_multiplier=1000, wait_exponential_max=600000, stop_max_delay=3600000)
def mark_published_synda( dataset_functional_id, filenotfound ):
"""The specified dataset has its status changed to 'published' if its status is 'complete'
or 'staged'. Its files become 'published' if they are 'done' or 'staged'. If a file is
not found in the file system, then a message will be written to the supplied open text
file, filenotfound.
"""
logging.info( "entering mark_published_synda for dataset %s" %dataset_functional_id )
if dataset_functional_id is None:
return
num_found, latest_version = dataset_published( dataset_functional_id, filenotfound )
try:
num_found, latest_version = dataset_published( dataset_functional_id, filenotfound )
except Exception as e:
logging.error( "mark_published_synda caught an exception from dataset_published: %s" % e )
raise(e)
if num_found==0:
return

cmd = "SELECT dataset_id FROM dataset WHERE dataset_functional_id='%s'" % dataset_functional_id
curs = conn.cursor()
curs.execute(cmd)
results = curs.fetchall()
try:
cmd = "SELECT dataset_id FROM dataset WHERE dataset_functional_id='%s'" % dataset_functional_id
curs = conn.cursor()
curs.execute(cmd)
results = curs.fetchall()
except Exception as e:
logging.error( "Exception in mark_published_synda 1: %s" % e )
raise(e)
finally:
curs.close()

if len(results)!=1:
logging.debug( "dataset %s found %s times"%(dataset_functional_id,len(results)) )
curs.close()
return
dataset_id = results[0][0]

cmd = "SELECT file_functional_id FROM file WHERE dataset_id=%s" % dataset_id
curs.execute(cmd)
results = curs.fetchall()
curs.close()
try:
cmd = "SELECT file_functional_id FROM file WHERE dataset_id=%s" % dataset_id
curs = conn.cursor()
curs.execute(cmd)
results = curs.fetchall()
except Exception as e:
logging.error( "Exception in mark_published_synda 2: %s" % e )
raise(e)
finally:
curs.close()

all_successful = True
for file_functional_id_tuple in results:
this_successful = file_published( file_functional_id_tuple[0], filenotfound, latest_version )
all_successful = all_successful and this_successful
try:
for file_functional_id_tuple in results:
this_successful = file_published( file_functional_id_tuple[0], filenotfound, latest_version )
all_successful = all_successful and this_successful
except Exception as e:
logging.error( "mark_published_synda caught an exception from file_published: %s" % e )
raise(e)

if not all_successful:
# Print a blank line so that the files in filenotfound will be visibly grouped by dataset.
filenotfound.write(dataset_functional_id+"\n")
Expand All @@ -112,19 +133,26 @@ def mark_published_synda( dataset_functional_id, filenotfound ):

def dataset_published( dataset_functional_id, filenotfound ):
"""the dataset portion of mark_published_synda()"""
global conn, curs, dryrun
global conn, dryrun
global datasets_already_published, datasets_not_published_not_marked, datasets_marked_published
cmd = "SELECT status FROM dataset WHERE dataset_functional_id='%s'" % dataset_functional_id
curs = conn.cursor()
curs.execute( cmd )
results = curs.fetchall()
curs.close()

try:
cmd = "SELECT status FROM dataset WHERE dataset_functional_id='%s'" % dataset_functional_id
curs = conn.cursor()
curs.execute( cmd )
results = curs.fetchall()
except Exception as e:
logging.debug( "Exception in dataset_published 1: %s" %e )
raise e
finally:
curs.close()

if len(results)==0:
logging.debug( "no dataset found matching %s" % dataset_functional_id )
logging.info( "no dataset found matching %s" % dataset_functional_id )
return 0, None
elif len(results)>1:
msg = "%s datasets found matching %s" % (len(results),dataset_functional_id )
logging.error( "Exception: "+msg )
logging.error( "Exception generated in dataset_published due to: "+msg )
raise Exception( msg )

status = results[0][0]
Expand All @@ -142,26 +170,45 @@ def dataset_published( dataset_functional_id, filenotfound ):
if dryrun:
logging.info( " changing %s\t from '%s' to 'published'" % (dataset_functional_id,status) )
else:
cmd = "UPDATE dataset SET status='published' WHERE dataset_functional_id='%s'" %\
dataset_functional_id
curs = conn.cursor()
curs.execute( cmd )
conn.commit()
curs.close()
try:
cmd = "UPDATE dataset SET status='published' WHERE dataset_functional_id='%s'" %\
dataset_functional_id
curs = conn.cursor()
curs.execute( cmd )
conn.commit()
except Exception as e:
logging.debug( "Exception in dataset_published 2: %s" %e )
raise e
finally:
curs.close()

# For files_published(), we'll need to know whether this is the latest version.
cmd = "SELECT path_without_version,version FROM dataset WHERE dataset_functional_id='%s'" %\
dataset_functional_id
curs = conn.cursor()
curs.execute( cmd )
results = curs.fetchall()
try:
cmd = "SELECT path_without_version,version FROM dataset WHERE dataset_functional_id='%s'" %\
dataset_functional_id
curs = conn.cursor()
curs.execute( cmd )
results = curs.fetchall()
except Exception as e:
logging.debug( "Exception in dataset_published 3: %s" %e )
raise e
finally:
curs.close()

path_without_version = results[0][0]
version = results[0][1]
cmd = "SELECT version FROM dataset WHERE path_without_version='%s'" %\
path_without_version
curs.execute( cmd )
results = curs.fetchall()
curs.close()
try:
cmd = "SELECT version FROM dataset WHERE path_without_version='%s'" %\
path_without_version
curs = conn.cursor()
curs.execute( cmd )
results = curs.fetchall()
except Exception as e:
logging.debug( "Exception in dataset_published 4: %s" %e )
raise e
finally:
curs.close()

versions = [r[0] for r in results]
if version==max(versions):
latest_version = True
Expand All @@ -181,19 +228,25 @@ def file_published( file_functional_id, filenotfound, latest_version ):
Usually returns True if the file was changed to 'published' status or didn't need to be changed.
Returns False if there was a problem, e.g. file doesn't exist where expected.
"""
global conn, curs, dryrun
global conn, dryrun

cmd = "SELECT status,local_path FROM file WHERE file_functional_id='%s'" % file_functional_id
curs = conn.cursor()
curs.execute( cmd )
results = curs.fetchall()
curs.close()
try:
curs = conn.cursor()
curs.execute( cmd )
results = curs.fetchall()
except Exception as e:
logging.debug( "Exception in file_published 1: %s" %e )
raise e
finally:
curs.close()

if len(results)==0:
filenotfound.write( " no file in database matching %s" % file_functional_id )
return False
elif len(results)>1:
msg = "%s files in database matching %s" % (len(results),file_functional_id )
logging.error( "Exception: "+msg )
logging.error( "Exception generated in file_published due to: "+msg )
raise Exception(msg)

status = results[0][0]
Expand Down Expand Up @@ -230,10 +283,15 @@ def file_published( file_functional_id, filenotfound, latest_version ):
if not dryrun:
cmd = "UPDATE file SET status='published' WHERE file_functional_id='%s'" %\
file_functional_id
curs = conn.cursor()
curs.execute( cmd )
conn.commit()
curs.close()
try:
curs = conn.cursor()
curs.execute( cmd )
conn.commit()
except Exception as e:
logging.debug( "Exception in file_published 2: %s" %e )
raise e
finally:
curs.close()
#print "new status for %s is %s" % (file_functional_id,'published')
return True

Expand Down Expand Up @@ -306,9 +364,13 @@ def mark_published_all( listing_file, db, filenotfound_nom="files_not_found.txt"
if functional_id is None or functional_id.find('mapfile_run_')==0:
# e.g. mapfile_run_1554479110.txt; it's just a mapfile run log
continue
mark_published_synda( functional_id, filenotfound )
try:
mark_published_synda( functional_id, filenotfound )
except Exception as e:
logging.error( "mark_published_all caught an exception from mark_published_synda: %s" %e )
raise(e)
except Exception as e:
logging.error( "Exception: %s" % e )
logging.error( "Exception caught in mark_published_all: %s" % e )
raise(e)
finally:
writeme = "number of datasets already published = %s" % datasets_already_published
Expand All @@ -327,6 +389,7 @@ def mark_published_all( listing_file, db, filenotfound_nom="files_not_found.txt"

if __name__ == '__main__':
logfile = '/p/css03/scratch/logs/mark_published.log'
# Set level to logging.DEBUG to get logs of all database exceptions.
logging.basicConfig( filename=logfile, level=logging.INFO, format='%(asctime)s %(message)s' )
logging.info( "starting" )

Expand Down Expand Up @@ -372,7 +435,13 @@ def mark_published_all( listing_file, db, filenotfound_nom="files_not_found.txt"
if files_not_found is None:
files_not_found = "/p/css03/scratch/logs/files_not_found_"+suffix

mark_published_all( published_datasets, args.database, files_not_found )
try:
mark_published_all( published_datasets, args.database, files_not_found )
except:
# The exception should have been logged already; quit. The file
# last_suffix_f won't be updated, so the same mapfiles will be retried
# the next time this script is run.
sys.exit(1)

if args.suffix is None:
last_suffix_f = '/p/css03/scratch/publishing/CMIP6_last_suffix'
Expand Down

0 comments on commit 11a9a09

Please sign in to comment.