Commit
increased exception specificity
mattpodolak committed Sep 8, 2021
1 parent dd87151 commit f4b9f5a
Showing 7 changed files with 56 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -4,6 +4,7 @@
 - Implemented functional tests
 - Reduced `max_ids_per_request` to 500
 - Added automated testing
+- Increased exception handling specificity
 
 ## 1.1.0 (2021/05/27)
 
9 changes: 3 additions & 6 deletions pmaw/Cache.py
@@ -22,10 +22,7 @@ def __init__(self, payload, safe_exit, cache_dir=None):
 
         # create cache folder
         self.folder = str(cache_dir) if cache_dir else "./cache"
-        try:
-            Path(self.folder).mkdir(exist_ok=True, parents=True)
-        except Exception as exc:
-            log.debug(f'Folder creation failed - {exc}')
+        Path(self.folder).mkdir(exist_ok=True, parents=True)
 
         self.response_cache = []
         self.size = 0
@@ -51,15 +48,15 @@ def load_info(self):
         try:
             with gzip.open(f'{self.folder}/{self.key}_info.pickle.gz', 'rb') as handle:
                 return pickle.load(handle)
-        except Exception:
+        except FileNotFoundError:
             log.info('No previous requests to load')
 
     def load_resp(self, cache_num):
         filename = self.response_cache[cache_num]
         try:
             with gzip.open(f'{self.folder}/{filename}', 'rb') as handle:
                 return pickle.load(handle)
-        except Exception as exc:
+        except FileNotFoundError as exc:
             warnings.warn(f'Failed to load responses from {filename} - {exc}')
 
     def save_info(self, **kwargs):
12 changes: 7 additions & 5 deletions pmaw/PushshiftAPI.py
@@ -34,9 +34,9 @@ def search_submission_comment_ids(self, ids, **kwargs):
             Response generator object
         """
         kwargs['ids'] = ids
-        return self._search(kind='submission_comment_ids', **kwargs)
+        return self._search(filter_fn=None, kind='submission_comment_ids', **kwargs)
 
-    def search_comments(self, **kwargs):
+    def search_comments(self, filter_fn=None, **kwargs):
         """
         Method for searching comments, returns an array of comments
@@ -46,12 +46,13 @@ def search_comments(self, **kwargs):
             mem_safe (boolean, optional) - If True, stores responses in cache during operation, defaults to False
             search_window (int, optional) - Size in days for search window for submissions / comments in non-id based search, defaults to 365
             safe_exit (boolean, optional) - If True, will safely exit if interrupted by storing current responses and requests in the cache. Will also load previous requests / responses if found in cache, defaults to False
+            filter_fn (function, optional) - Function that filters results before saving them, accept a comment parameter, return False to filter the item, otherwise return True
         Output:
             Response generator object
         """
-        return self._search(kind='comment', **kwargs)
+        return self._search(filter_fn, kind='comment', **kwargs)
 
-    def search_submissions(self, **kwargs):
+    def search_submissions(self, filter_fn=None, **kwargs):
         """
         Method for searching submissions, returns an array of submissions
@@ -61,7 +62,8 @@ def search_submissions(self, **kwargs):
             mem_safe (boolean, optional) - If True, stores responses in cache during operation, defaults to False
             search_window (int, optional) - Size in days for search window for submissions / comments in non-id based search, defaults to 365
             safe_exit (boolean, optional) - If True, will safely exit if interrupted by storing current responses and requests in the cache. Will also load previous requests / responses if found in cache, defaults to False
+            filter_fn (function, optional) - Function that filters results before saving them, accept a submission parameter, return False to filter the item, otherwise return True
         Output:
             Response generator object
         """
-        return self._search(kind='submission', **kwargs)
+        return self._search(filter_fn, kind='submission', **kwargs)
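
The new `filter_fn` hook is easiest to see in use. A minimal sketch against the `search_comments` signature above; the subreddit, score threshold, and the assumption that Pushshift comment dicts carry a `score` key are illustrative:

```python
from pmaw import PushshiftAPI

api = PushshiftAPI()

# return False to drop an item, True to keep it
def keep_high_score(comment):
    return comment['score'] > 1

comments = api.search_comments(
    filter_fn=keep_high_score,
    subreddit='python',  # illustrative query parameters
    limit=1000,
)
```

Note that, per the `save_resp` comment further down in this commit, `limit` is decremented before filtering, so a filtered run can return fewer than `limit` items.
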
16 changes: 11 additions & 5 deletions pmaw/PushshiftAPIBase.py
@@ -1,5 +1,5 @@
 import time
-import requests
+from requests import HTTPError
 import json
 from concurrent.futures import ThreadPoolExecutor, as_completed
 import copy
@@ -70,7 +70,7 @@ def _get(self, url, payload={}):
 
             return r['data']
         else:
-            raise Exception(f"HTTP {status} - {reason}")
+            raise HTTPError(f"HTTP {status} - {reason}")
 
     @property
     def shards_are_down(self):
@@ -170,16 +170,17 @@ def _futures_handler(self, futures, check_total):
                        # generate payloads
                        self.req.gen_slices(
                            url, payload, after, before, num)
-            except Exception as exc:
+            except HTTPError as exc:
                 log.debug(f"Request Failed -- {exc}")
                 self._rate_limit._req_fail()
                 self.req.req_list.appendleft(url_pay)
 
     def _shutdown(self, exc, wait=False, cancel_futures=True):
         # shutdown executor
         try:
             # pass cancel_futures keywords avail in python 3.9
             exc.shutdown(wait=wait, cancel_futures=cancel_futures)
-        except Exception:
+        except TypeError:
             # TODO: manually cancel pending futures
             exc.shutdown(wait=wait)
 
@@ -194,13 +195,18 @@ def _print_stats(self, prefix):
             remaining = 0  # don't print a neg number
         print(
             f'{prefix}:: Success Rate: {rate:.2f}% - Requests: {self.num_req} - Batches: {self.num_batches} - Items Remaining: {remaining}')
+        if(self.req.praw and len(self.req.enrich_list) > 0):
+            # let the user know praw enrichment is still in progress so it doesnt appear to hang after
+            # finishing retrieval from Pushshift
+            print(f'Finishing enrichment for {len(self.req.enrich_list)} items')
 
     def _reset(self):
         self.num_suc = 0
         self.num_req = 0
         self.num_batches = 0
 
     def _search(self,
+                filter_fn,
                 kind,
                 max_ids_per_request=500,
                 max_results_per_request=100,
@@ -218,7 +224,7 @@
 
         self.metadata_ = {}
         self.resp_dict = {}
-        self.req = Request(copy.deepcopy(kwargs), kind,
+        self.req = Request(copy.deepcopy(kwargs), filter_fn, kind,
                            max_results_per_request, max_ids_per_request, mem_safe, safe_exit, cache_dir, self.praw)
 
         # reset stat tracking
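
The `except TypeError` in `_shutdown` works because `ThreadPoolExecutor.shutdown` only gained the `cancel_futures` keyword in Python 3.9; older interpreters reject it as an unexpected keyword argument. A standalone sketch of the same compatibility pattern:

```python
from concurrent.futures import ThreadPoolExecutor

def shutdown_compat(executor, wait=False):
    try:
        # cancel_futures is only accepted on Python 3.9+
        executor.shutdown(wait=wait, cancel_futures=True)
    except TypeError:
        # older interpreters raise TypeError for the unknown keyword
        executor.shutdown(wait=wait)

pool = ThreadPoolExecutor(max_workers=2)
shutdown_compat(pool)
```
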
2 changes: 1 addition & 1 deletion pmaw/RateLimit.py
@@ -80,7 +80,7 @@ def _average(self):
 
         try:
             self.cache.remove(first_req)
-        except Exception:
+        except ValueError:
             log.debug(f'{first_req} has already been removed RL cache')
 
         num_req = len(self.cache)
24 changes: 21 additions & 3 deletions pmaw/Request.py
@@ -7,8 +7,11 @@
 import signal
 import time
 
+from praw.exceptions import RedditAPIException
+
 from pmaw.Cache import Cache
 from pmaw.utils.slices import timeslice, mapslice
+from pmaw.utils.filter import apply_filter
 from pmaw.Response import Response
 
 
@@ -18,7 +21,7 @@
 class Request(object):
     """Request: Handles request information, response saving, and cache usage."""
 
-    def __init__(self, payload, kind, max_results_per_request, max_ids_per_request, mem_safe, safe_exit, cache_dir=None, praw=None):
+    def __init__(self, payload, filter_fn, kind, max_results_per_request, max_ids_per_request, mem_safe, safe_exit, cache_dir=None, praw=None):
         self.kind = kind
         self.max_ids_per_request = min(500, max_ids_per_request)
         self.max_results_per_request = min(100, max_results_per_request)
@@ -29,6 +32,15 @@ def __init__(self, payload, kind, max_results_per_request, max_ids_per_request,
         self.limit = payload.get('limit', None)
         self.exit = Event()
         self.praw = praw
+        self._filter = filter_fn
+
+        if safe_exit and self.payload.get('before', None) is None:
+            # warn the user not to use safe_exit without setting before,
+            # doing otherwise will make it impossible to resume without modifying
+            # future query to use before value from first run
+            before = int(dt.datetime.now().timestamp())
+            payload['before'] = before
+            warnings.warn(f'Using safe_exit without setting before value is not recommended. Setting before to {before}')
 
         if self.praw is not None:
             if safe_exit:
@@ -72,7 +84,7 @@ def check_sigs(self):
         try:
             getattr(signal, 'SIGHUP')
             sigs = ('TERM', 'HUP', 'INT')
-        except Exception:
+        except AttributeError:
             sigs = ('TERM', 'INT')
 
         for sig in sigs:
@@ -97,7 +109,7 @@ def _enrich_data(self):
                 praw_data = [vars(obj) for obj in resp_gen]
                 self.resp.responses.extend(praw_data)
 
-        except Exception as exc:
+        except RedditAPIException:
             self.enrich_list.extend(fullnames)
 
     def _idle_task(self, interval):
@@ -140,10 +152,16 @@ def _exit(self, signo, _frame):
         self.exit.set()
 
     def save_resp(self, results):
+        # dont filter results before updating limit: limit is the max number of results
+        # extracted from Pushshift, filtering can reduce the results < limit
         if self.kind == 'submission_comment_ids':
             self.limit -= 1
         else:
             self.limit -= len(results)
 
+        # apply user defined filter function before storing
+        if(self._filter is not None):
+            results = apply_filter(results, self._filter)
+
         if self.praw:
             # save fullnames of objects to be enriched with metadata by PRAW
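
The new warning in `__init__` exists because resuming from cache only works if the query window is pinned: without an explicit `before`, a rerun would search up to a fresh "now" and no longer match the first run's cached requests. A sketch of the recommended call, per the docstrings above; the subreddit and date are illustrative:

```python
import datetime as dt
from pmaw import PushshiftAPI

api = PushshiftAPI()

# pin the search window so an interrupted run can resume from cache
before = int(dt.datetime(2021, 9, 1).timestamp())
submissions = api.search_submissions(
    subreddit='python',  # illustrative parameters
    before=before,
    safe_exit=True,
)
```
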
12 changes: 12 additions & 0 deletions pmaw/utils/filter.py
@@ -0,0 +1,12 @@
+def apply_filter(array, filter_fn):
+    filtered_array = []
+    for item in array:
+        try:
+            if(filter_fn(item)):
+                filtered_array.append(item)
+        except TypeError as exc:
+            raise Exception('An error occured while filtering:\n', exc)
+        except KeyError as exc:
+            raise Exception(f'The {exc} key does not exist for the item you are filtering')
+
+    return filtered_array
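
A quick sketch of `apply_filter` on its own, assuming dict-shaped items like those Pushshift returns:

```python
from pmaw.utils.filter import apply_filter

items = [{'score': 5}, {'score': 0}, {'score': 12}]

# returning False drops an item; a missing key raises with the key name
kept = apply_filter(items, lambda item: item['score'] > 1)
print(kept)  # [{'score': 5}, {'score': 12}]
```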
