Cleaner output
svebk committed Jan 27, 2020
1 parent d39f6e7 commit 54929b3
Showing 17 changed files with 157 additions and 39 deletions.
2 changes: 1 addition & 1 deletion cufacesearch/cufacesearch/extractor/generic_extractor.py
@@ -70,7 +70,7 @@ def run(self):
batch = self.q_in.get(timeout=self.qin_timeout)
except Exception:
# This may appear in the log when the following update is being processed.
if self.verbose > 5:
if self.verbose > 6:
print "[{}] Did not get a batch. Leaving".format(self.pp)
sys.stdout.flush()
empty = True
38 changes: 24 additions & 14 deletions cufacesearch/cufacesearch/ingester/kinesis_ingester.py
@@ -31,6 +31,7 @@ def __init__(self, global_conf, prefix="", pid=None):
# When running as daemon, save process id
self.pid = pid
self.verbose = 1
self.sleep_count = 0

super(KinesisIngester, self).__init__(global_conf, prefix)

@@ -196,7 +197,6 @@ def get_msg_json(self):
sleep_time = self.get_required_param('sleep_time')
sifn = self.get_shard_infos_filename()
nb_shards = len(self.shard_iters)
sleep_count = 0

# Invalidate any previous shard iterator first...
for sh_num in range(nb_shards):
@@ -250,7 +250,7 @@ def get_msg_json(self):
self.shard_iters[sh_id] = None
else:
# Update iterator. Is this working?
if self.verbose > 4:
if self.verbose > 5:
msg = "[{}: log] Found valid next shard iterator {} for shard {}"
print(msg.format(self.pp, sh_it, sh_id))
self.shard_iters[sh_id] = sh_it
@@ -266,7 +266,7 @@ def get_msg_json(self):
self.shard_iters[sh_id] = None

records = rec_response['Records']

read_records = 0
if len(records) > 0:
if self.verbose > 3:
# msg = "[{}: log] Found message at SequenceNumber {} in shard {}: {}"
@@ -277,7 +277,6 @@ def get_msg_json(self):
lag_ms = rec_response['MillisBehindLatest']
msg = "[{}: log] Lagging by {:.3f}s"
print(msg.format(self.pp, lag_ms/1000.0))
sleep_count = 0
for rec in records:
sqn = str(rec['SequenceNumber'].decode("utf-8"))
# This could throw a JSONDecodeError (subclass of ValueError): No JSON object could be decoded
@@ -293,7 +292,7 @@ def get_msg_json(self):
msg = "[{}: WARNING] Could not parse record at SequenceNumber {}. Record has no 'Data' field"
print(msg.format(self.pp, sqn))
continue
if self.verbose > 5:
if self.verbose > 6:
#msg = "[{}: log] Found message at SequenceNumber {} in shard {}: {}"
#print(msg.format(self.pp, sqn, sh_id, rec_json))
rec_ts = rec['ApproximateArrivalTimestamp']
@@ -304,24 +303,34 @@ def get_msg_json(self):
# Maybe number of records read for sanity check
# Start read time too?
if sh_id in self.shard_infos:
self.shard_infos[sh_id]['sqn'] = sqn
self.shard_infos[sh_id]['nb_read'] += 1
if sqn != self.shard_infos[sh_id]['sqn']:
self.shard_infos[sh_id]['sqn'] = sqn
self.shard_infos[sh_id]['nb_read'] += 1
read_records += 1
else:
# Skip already read record
if self.verbose > 5:
msg = "[{}: log] Skipping one already processed record"
print(msg.format(self.pp))
continue
else:
self.shard_infos[sh_id] = dict()
self.shard_infos[sh_id]['sqn'] = sqn
self.shard_infos[sh_id]['start_read'] = datetime.now().isoformat()
self.shard_infos[sh_id]['nb_read'] = 1
read_records += 1

self.sleep_count = 0
yield rec_json

if self.verbose > 5:
if self.verbose > 5 and read_records > 0:
msg = "[{}: log] Finished looping on {} records"
print(msg.format(self.pp,len(records)))
print(msg.format(self.pp, read_records))

#if self.shard_iters[sh_id] is None:
else:
empty += 1
if self.verbose > 3:
if self.verbose > 4:
msg = "[{}: log] Shard {} seems empty"
print(msg.format(self.pp, sh_id))

@@ -331,19 +340,20 @@ def get_msg_json(self):
# self.shard_iters[sh_id] = None

# Dump current self.shard_infos
if self.verbose > 1:
if self.verbose > 4:
msg = "[{}: log] shard_infos: {}"
print(msg.format(self.pp, self.shard_infos))
with open(sifn, 'w') as sif:
json.dump(self.shard_infos, sif)

# Sleep?
if empty == len(self.shard_iters):
if self.verbose > 1:
if self.verbose > 2:
msg = "[{}: log] All shards seem empty or fully processed."
print(msg.format(self.pp))
time.sleep(min(5*sleep_count + 1, sleep_time))
sleep_count += 1
time.sleep(min(5*self.sleep_count + 1, sleep_time))
self.sleep_count += 1
break
else:
time.sleep(1)
except Exception as inst:
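The commit moves sleep_count from a local variable to self.sleep_count, so the empty-poll back-off persists across calls to get_msg_json instead of resetting each time the generator restarts. A minimal standalone sketch of that back-off behaviour, assuming a 60-second sleep_time ceiling as in the default Kinesis configuration:

import time

def empty_poll_backoff(sleep_count, sleep_time=60):
    # Sleep 1s on the first empty poll, 5s longer on each following one,
    # capped at sleep_time; reset sleep_count to 0 whenever records arrive.
    time.sleep(min(5 * sleep_count + 1, sleep_time))
    return sleep_count + 1

# Successive empty polls would sleep 1s, 6s, 11s, ... up to 60s.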
1 change: 1 addition & 0 deletions cufacesearch/cufacesearch/pusher/kinesis_pusher.py
@@ -81,6 +81,7 @@ def init_client(self):
aws_profile = self.get_param('aws_profile', None)
# This is mostly to be able to test locally
endpoint_url = self.get_param('endpoint_url', None)
# This can trigger some errors
verify = self.get_param('verify_certificates', True)
use_ssl = self.get_param('use_ssl', True)

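The verify_certificates and use_ssl parameters read here typically feed straight into a boto3 Kinesis client, which is mainly useful when pointing at a local Kinesis mock with a self-signed endpoint. A hedged sketch of that client setup; the exact wiring is an assumption, not the repository's init_client code:

import boto3

def make_kinesis_client(region_name, aws_profile=None, endpoint_url=None,
                        verify=True, use_ssl=True):
    # Build a Kinesis client; endpoint_url, verify and use_ssl let the same
    # code talk to a local kinesalite/LocalStack endpoint during testing.
    session = boto3.Session(profile_name=aws_profile, region_name=region_name)
    return session.client('kinesis', endpoint_url=endpoint_url,
                          verify=verify, use_ssl=use_ssl)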
8 changes: 7 additions & 1 deletion cufacesearch/cufacesearch/pusher/local_images_pusher.py
@@ -135,7 +135,7 @@ def toc_process_failed(self, start_process, end_time=None):
def print_push_stats(self):
# How come self.process_time is negative?
avg_process_time = self.process_time / max(1, self.process_count + self.process_failed)
msg = "[{}: log] push count: {}, failed: {}, avg. time: {}"
msg = "[{}: log] Push count: {}, failed: {}, avg. time: {}"
print(msg.format(self.pp, self.process_count, self.process_failed, avg_process_time))

def process(self):
@@ -162,8 +162,14 @@ def process(self):
print(msg.format(self.pp, img_path))
try:
img_buffer = get_buffer_from_filepath(img_path)
if self.verbose > 6:
msg = "[{}.process: log] Got image buffer from: {}"
print(msg.format(self.pp, img_path))
if img_buffer:
sha1, img_type, width, height = get_SHA1_img_info_from_buffer(img_buffer)
if self.verbose > 5:
msg = "[{}.process: log] Got image info from: {}"
print(msg.format(self.pp, img_path))
dict_imgs[img_path] = {'img_buffer': img_buffer, 'sha1': sha1,
'img_info': {'format': img_type, 'width': width, 'height': height}}
self.toc_process_ok(start_process, end_time=time.time())
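The extra logging added above brackets two helpers, get_buffer_from_filepath and get_SHA1_img_info_from_buffer. A rough, hypothetical equivalent of what the second helper returns, judging only from how its result is unpacked here (sha1, format, width, height), using hashlib and Pillow:

import hashlib
from io import BytesIO
from PIL import Image

def sha1_img_info_from_buffer(img_buffer):
    # Hash the raw bytes and read basic image metadata without writing to disk.
    data = img_buffer.read() if hasattr(img_buffer, "read") else img_buffer
    sha1 = hashlib.sha1(data).hexdigest().upper()
    img = Image.open(BytesIO(data))
    width, height = img.size
    return sha1, img.format, width, height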
12 changes: 6 additions & 6 deletions cufacesearch/cufacesearch/searcher/searcher_lopqhbase.py
@@ -215,8 +215,8 @@ def init_searcher(self):
raise ValueError("Unknown 'lopq_searcher' type: {}".format(self.lopq_searcher))
# NB: an empty lopq_model would make sense only if we just want to detect...

def get_feats_from_lmbd(self, feats_db, nb_features, dtype):
"""Get features from LMBD database
def get_feats_from_lmdb(self, feats_db, nb_features, dtype):
"""Get features from LMDB database
:param feats_db: features database
:type feats_db: str
@@ -237,15 +237,15 @@ def get_feats_from_lmbd(self, feats_db, nb_features, dtype):
first_item = cursor.item()
first_feat = np.frombuffer(first_item[1], dtype=dtype)
feats = np.zeros((nb_feats_to_read, first_feat.shape[0]))
print("[get_feats_from_lmbd] Filling up features matrix: {}".format(feats.shape))
print("[get_feats_from_lmdb] Filling up features matrix: {}".format(feats.shape))
sys.stdout.flush()
for i, item in enumerate(cursor.iternext()):
if i >= nb_feats_to_read:
break
feats[i, :] = np.frombuffer(item[1], dtype=dtype)
return feats

def save_feats_to_lmbd(self, feats_db, samples_ids, np_features, max_feats=0):
def save_feats_to_lmdb(self, feats_db, samples_ids, np_features, max_feats=0):
"""Save features to LMDB database
:param feats_db: features database name
@@ -338,7 +338,7 @@ def get_train_features(self, nb_features, lopq_pca_model=None, nb_min_train=None
sys.stdout.flush()
# just appending like this does not account for duplicates...
# train_features.extend(np_features)
nb_saved_feats = self.save_feats_to_lmbd(feats_db, sids, np_features)
nb_saved_feats = self.save_feats_to_lmdb(feats_db, sids, np_features)
seen_updates.add(update_id)
else:
if self.verbose > 3:
@@ -385,7 +385,7 @@ def get_train_features(self, nb_features, lopq_pca_model=None, nb_min_train=None
sys.stdout.flush()
break

return self.get_feats_from_lmbd(feats_db, nb_features_to_read, dtype)
return self.get_feats_from_lmdb(feats_db, nb_features_to_read, dtype)

def train_index(self):
"""Train search index
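The rename from *_lmbd to *_lmdb fixes the spelling of LMDB throughout; the read path itself is unchanged. A minimal sketch of that read pattern with the lmdb package, assuming one fixed-size feature vector per key/value pair (the real method gets dtype and the number of features from its caller):

import lmdb
import numpy as np

def read_feats_from_lmdb(feats_db, nb_features, dtype=np.float32):
    # Open the environment read-only, size the matrix from the first vector,
    # then copy up to nb_features rows out of the database.
    env = lmdb.open(feats_db, readonly=True)
    with env.begin() as txn:
        cursor = txn.cursor()
        if not cursor.first():
            return np.zeros((0, 0), dtype=dtype)
        first_feat = np.frombuffer(cursor.value(), dtype=dtype)
        feats = np.zeros((nb_features, first_feat.shape[0]), dtype=dtype)
        for i, (_, value) in enumerate(cursor.iternext()):
            if i >= nb_features:
                break
            feats[i, :] = np.frombuffer(value, dtype=dtype)
    return feats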
50 changes: 38 additions & 12 deletions cufacesearch/cufacesearch/updater/extraction_checker.py
@@ -20,6 +20,8 @@
# Should we add a cufacesearch.common.defaults?
DEFAULT_UPDATE_INGESTION_TYPE = "hbase"
DEFAULT_MIN_LENGTH_CHECK = 100
#DEFAULT_MAX_DELAY = 3600
DEFAULT_MAX_DELAY = 600

# Simulates the way updates were generated from the spark workflows but reading from a kafka topic
# Should be run as a single process to ensure data integrity,
@@ -55,7 +57,7 @@ def __init__(self, global_conf, prefix=DEFAULT_EXTR_CHECK_PREFIX, pid=None):
self.input_type = self.get_required_param("input_type")

# Max delay
self.max_delay = int(self.get_param("max_delay", default=3600))
self.max_delay = int(self.get_param("max_delay", default=DEFAULT_MAX_DELAY))
self.min_len_check = int(self.get_param("min_len_check", default=DEFAULT_MIN_LENGTH_CHECK))

self.list_extr_prefix = [self.featurizer_type, "feat", self.detector_type, self.input_type]
@@ -269,10 +271,14 @@ def run(self, daemon=False):

try:
list_sha1s_to_process = []
list_check_sha1s = []
# TODO: create update_id here

if self.verbose > 1:
msg = "[{}: log] Start run main loop"
print(msg.format(self.pp))

while True:
list_check_sha1s = []

try:
# Accumulate images infos
@@ -321,25 +327,44 @@ def run(self, daemon=False):
print(msg)
sys.stdout.flush()

if self.verbose > 3:
msg = "[{}: log] Gathered {} images to check so far"
msg = msg.format(self.pp, len(list_check_sha1s))
msg2 = ""
if len(list_check_sha1s) > 0:
msg2 = " (first: {}, last: {})"
msg2 = msg2.format(list_check_sha1s[0], list_check_sha1s[-1])
print(msg+msg2)

# To be able to push one (non empty) update every max_delay
if not list_check_sha1s and (time.time() - self.last_push) < self.max_delay:
#if not list_check_sha1s and (time.time() - self.last_push) < self.max_delay:
if len(list_check_sha1s) < self.indexer.batch_update_size and (time.time() - self.last_push) < self.max_delay:
time.sleep(1)
continue

self.nb_imgs_check += len(list_check_sha1s)
push_delay = (time.time() - self.last_push) > max(int(self.max_delay / 60), 10)
if push_delay and self.nb_imgs_unproc_lastprint != self.nb_imgs_unproc:
msg = "[{}: log] Pushed {} unprocessed images so far"
print(msg.format(self.pp, self.nb_imgs_unproc, self.nb_imgs_check))
self.nb_imgs_unproc_lastprint = self.nb_imgs_unproc

if list_check_sha1s:
# Check which images have not been processed (or pushed in an update) yet
# This seems slow
start_check = time.time()
unprocessed_rows = self.get_unprocessed_rows(list_check_sha1s)
msg = "[{}: log] Found {}/{} unprocessed images in {:.2f}s"
print(msg.format(self.pp, len(unprocessed_rows), len(list_check_sha1s), time.time() - start_check))
if len(unprocessed_rows) != len(list_check_sha1s) and self.verbose > 5:
already_processed = list(set(list_check_sha1s) - set(unprocessed_rows))
msg = "[{}: log] Images ".format(self.pp)
for ap in already_processed:
msg += "{} ".format(ap)
msg += "were already processed."
print(msg)

#unprocessed_rows = self.get_unprocessed_rows(list_check_sha1s)
self.nb_imgs_check += len(list_check_sha1s)
push_delay = (time.time() - self.last_push) > self.max_delay / 60
if push_delay and self.nb_imgs_unproc_lastprint != self.nb_imgs_unproc:
msg = "[{}: log] Found {}/{} unprocessed images"
print(msg.format(self.pp, self.nb_imgs_unproc, self.nb_imgs_check))
self.nb_imgs_unproc_lastprint = self.nb_imgs_unproc

# TODO: we should mark those images as being 'owned' by the update we are constructing
# (only important if we are running multiple threads i.e. daemon is True)
@@ -352,6 +377,7 @@ def run(self, daemon=False):

# Remove potential duplicates
list_sha1s_to_process = list(set(list_sha1s_to_process))
list_check_sha1s = []

if list_sha1s_to_process:
# Push them to HBase by batch of 'batch_update_size'
@@ -375,7 +401,7 @@ def run(self, daemon=False):

# Push images
fam = self.indexer.get_dictcf_sha1_table()
if self.verbose > 4:
if self.verbose > 5:
msg = "[{}] Pushing images for update {} with fam {}"
print(msg.format(self.pp, update_id, fam))
sha1s_table = self.indexer.table_sha1infos_name
@@ -389,7 +415,7 @@ def run(self, daemon=False):
self.indexer.get_col_upcreate(): now_str}
# Push it
fam = self.indexer.get_dictcf_update_table()
if self.verbose > 4:
if self.verbose > 5:
msg = "[{}] Pushing update {} info with fam {}"
print(msg.format(self.pp, update_id, fam))
self.indexer.push_dict_rows(dict_updates_db, self.indexer.table_updateinfos_name,
@@ -422,7 +448,7 @@ def run(self, daemon=False):
# sanity check that len(list_sha1s_to_process) == len(self.dict_sha1_infos) ?

else:
if self.verbose > 4:
if self.verbose > 3:
msg = "[{}: at {}] Gathered {} images so far..."
now_str = datetime.now().strftime('%Y-%m-%d:%H.%M.%S')
print(msg.format(self.pp, now_str, len(list_sha1s_to_process)))
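The reworked loop no longer pushes as soon as any unprocessed image shows up: it keeps accumulating sha1s until the batch reaches the indexer's batch_update_size or max_delay (now 600s by default) has passed since the last push. A small sketch of that flush condition, with names simplified from the ones above:

import time

def should_flush(pending_sha1s, batch_update_size, last_push, max_delay=600):
    # Flush either when a full batch has been gathered or when the previous
    # push is older than max_delay seconds, so non-empty updates still go out.
    batch_full = len(pending_sha1s) >= batch_update_size
    overdue = (time.time() - last_push) >= max_delay
    return batch_full or overdue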
3 changes: 3 additions & 0 deletions scripts/run_images_pusher.sh
@@ -58,3 +58,6 @@ fi
#log="./logs/log_image_ingestion_"${conf_name}
#bash ./scripts/keep_alive_process.sh --cmd="${cmd}" --args="${args}" --log="${log}"
bash ./scripts/keep_alive_process.sh --cmd="${cmd}" --args="${args}"

echo "Push process failed. Restarting docker container..."
exit 1
2 changes: 1 addition & 1 deletion scripts/run_processing.sh
@@ -51,5 +51,5 @@ args=" -c ./conf/generated/conf_extraction_"${conf_name}".json"
#bash ./scripts/keep_alive_process.sh --cmd="${cmd}" --args="${args}" --log="${log}"
bash ./scripts/keep_alive_process.sh --cmd="${cmd}" --args="${args}"

echo "Extraction processor failed. Restarting docker container..."
echo "Extraction processor failed. Restarting docker container..."
exit 1
2 changes: 2 additions & 0 deletions scripts/run_search.sh
@@ -50,3 +50,5 @@ args=" -c ./conf/generated/conf_search_"${conf_name}".json -e "${endpoint}
#bash ./scripts/keep_alive_process.sh --cmd="${cmd}" --args="${args}" --log="${log}"
bash ./scripts/keep_alive_process.sh --cmd="${cmd}" --args="${args}"

echo "Search process failed. Restarting docker container..."
exit 1
3 changes: 3 additions & 0 deletions setup/ConfGenerator/create_conf_extractor.py
@@ -9,6 +9,7 @@
DEFAULT_POOL_THREADS=1
# KINESIS
DEFAULT_KINESIS_VERIFY_CERTIFICATES = 1
DEFAULT_KINESIS_USE_SSL = 1
DEFAULT_KINESIS_LIMIT_GET_REC = 100
DEFAULT_KINESIS_SLEEP_TIME = 60
DEFAULT_KINESIS_SHARD_ITERATOR_TYPE = "TRIM_HORIZON"
@@ -277,6 +278,7 @@
# Kinesis
verify_certificates = bool(int(os.getenv('verify_certificates',
DEFAULT_KINESIS_VERIFY_CERTIFICATES)))
use_ssl = bool(int(os.getenv('use_ssl', DEFAULT_KINESIS_USE_SSL)))
sleep_time = int(os.getenv('sleep_time', DEFAULT_KINESIS_SLEEP_TIME))
lim_get_rec = int(os.getenv('lim_get_rec', DEFAULT_KINESIS_LIMIT_GET_REC))
create_stream = bool(int(os.getenv('create_stream', DEFAULT_KINESIS_CREATE_STREAM)))
@@ -291,6 +293,7 @@
conf[check_ingester_prefix + 'sleep_time'] = sleep_time
conf[check_ingester_prefix + 'lim_get_rec'] = lim_get_rec
conf[check_ingester_prefix + 'verify_certificates'] = verify_certificates
conf[check_ingester_prefix + 'use_ssl'] = use_ssl
conf[check_ingester_prefix + 'shard_iterator_type'] = shard_iterator_type
conf[check_ingester_prefix + 'shard_infos_filename'] = os.getenv('image_shard_infos_filename')
# NB: does not make sense to create input stream
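The new use_ssl option follows the same parsing pattern as verify_certificates: the environment value is read as a string, cast to int, then to bool, so use_ssl=0 in the .env files below disables SSL. A tiny sketch of that round trip:

import os

os.environ['use_ssl'] = '0'  # as set in the kinesis .env files below
use_ssl = bool(int(os.getenv('use_ssl', 1)))
assert use_ssl is False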
1 change: 1 addition & 0 deletions setup/ConfGenerator/create_conf_ingester.py
@@ -118,6 +118,7 @@
conf[image_pusher_prefix + 'aws_profile'] = os.getenv('aws_profile')
conf[image_pusher_prefix + 'create_stream'] = bool(int(os.getenv('create_stream', 0)))
conf[image_pusher_prefix + 'verify_certificates'] = bool(int(os.getenv('verify_certificates', 1)))
conf[image_pusher_prefix + 'use_ssl'] = bool(int(os.getenv('use_ssl', 1)))

# - images_stream
# - aws_profile
1 change: 1 addition & 0 deletions setup/all-in-one/.env.catlech101local.kinesis
@@ -27,6 +27,7 @@ image_ingestion_type=kinesis
region_name=us-east-1
endpoint_url=https://kinesis:4567
verify_certificates=0
use_ssl=0
aws_profile=kinesis

# Updates ingestion should be from HBase
1 change: 1 addition & 0 deletions setup/all-in-one/.env.lfwlocal.kinesis
@@ -29,6 +29,7 @@ create_stream=1
region_name=us-east-1
endpoint_url=https://kinesis:4567
verify_certificates=0
use_ssl=0
aws_profile=kinesis
# This can be shared across extractions
images_topic=lfw-local-kinesis