kafka monitor lfw OK
svebk committed Jan 23, 2020
1 parent c6c209b commit 79b26ec
Showing 12 changed files with 93 additions and 35 deletions.
1 change: 1 addition & 0 deletions cufacesearch/cufacesearch/ingester/kinesis_ingester.py
@@ -6,6 +6,7 @@
import boto3
from datetime import datetime
from ..common.conf_reader import ConfReader

# Cannot be imported?
#from botocore.errorfactory import ExpiredIteratorException
#from json import JSONDecodeError
21 changes: 12 additions & 9 deletions cufacesearch/cufacesearch/pusher/kafka_pusher.py
@@ -124,15 +124,18 @@ def init_producer(self):
    self.topic_name = self.get_required_param("topic_name")

  def send(self, msg):
    """Push `msg` to `self.topic_name`
    """
    # Check msg has been JSON dumped
    if isinstance(msg, dict):
      msg = json.dumps(msg).encode('utf-8')
    self.producer.send(self.topic_name, msg)
    if self.verbose > 1:
      self.push_count += 1
      self.print_stats()
    """Push `msg` to `self.topic_name`
    :param msg: message to be pushed
    :type msg: str, dict
    """
    # Check msg has been JSON dumped
    if isinstance(msg, dict):
      msg = json.dumps(msg).encode('utf-8')
    self.producer.send(self.topic_name, msg)
    if self.verbose > 1:
      self.push_count += 1
      self.print_stats()

  def print_stats(self):
    """Print statistics of producer
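For reference, a minimal sketch of this push path, with a hypothetical broker, topic, and payload (kafka-python's KafkaProducer assumed): a dict message must be serialized with `json.dumps` and UTF-8 encoded before sending, since the producer transmits bytes; `json.dump` expects a file object and would raise a TypeError here.

import json
from kafka import KafkaProducer

# Hypothetical broker and topic, mirroring the local .env files below
producer = KafkaProducer(bootstrap_servers='kafka:9092')
msg = {'sha1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709'}
if isinstance(msg, dict):
  # Serialize to a JSON string, then encode to bytes for the producer
  msg = json.dumps(msg).encode('utf-8')
producer.send('test-local-caltech101', msg)
# Block until buffered messages are actually delivered
producer.flush()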
42 changes: 34 additions & 8 deletions cufacesearch/cufacesearch/pusher/kinesis_pusher.py
@@ -1,11 +1,15 @@
from __future__ import print_function

import sys
import time
import json
import boto3
from datetime import datetime
#from cufacesearch.common.conf_reader import ConfReader
from ..common.conf_reader import ConfReader

# Cannot be imported?
#from botocore.errorfactory import ExpiredIteratorException
from cufacesearch.common.conf_reader import ConfReader


def get_random_sha1():
from hashlib import sha1
@@ -14,6 +18,7 @@ def get_random_sha1():

# TODO: Should we have a GenericPusher class that exposes the `send` message method?


class KinesisPusher(ConfReader):
"""KinesisPusher
"""
@@ -45,6 +50,12 @@ def __init__(self, global_conf, prefix="", pid=None):
    self.shard_infos = dict()
    self.stream_name = self.get_required_param('stream_name')

    # Initialize stats attributes
    self.push_count = 0
    self.last_display = 0
    self.display_count = 1000
    self.start_time = time.time()

    # Initialize everything
    self.init_pusher()

@@ -100,22 +111,37 @@ def init_pusher(self):
        nb_shards = self.get_param('nb_shards', 2)
        self.client.create_stream(StreamName=self.stream_name, ShardCount=nb_shards)
      except Exception as inst:
        msg = "[{}: warning] Trial #{}: could not create kinesis stream : {}. {}"
        msg = "[{}: Warning] Trial #{}: could not create kinesis stream : {}. {}"
        print(msg.format(self.pp, tries, self.stream_name, inst))
        msg = "[{}: warning] Trial #{}: could not describe kinesis stream : {}. {}"
        msg = "[{}: Warning] Trial #{}: could not describe kinesis stream : {}. {}"
        print(msg.format(self.pp, tries, self.stream_name, inst))
        time.sleep(1)
    else:
      msg = "[{}: ERROR] Stream {} not active after {} trials. Aborting..."
      raise RuntimeError(msg.format(self.pp, self.stream_name, nb_trials))


  def send(self, msg):
    """Push `msg` to `self.stream_name`
    :param msg: message to be pushed
    :type msg: str, dict
    """
    # Check if msg was already json_dumped
    # Check if msg was already JSON dumped
    if isinstance(msg, dict):
      msg = json.dumps(msg).encode('utf-8')
    # TODO: what is a good partition key?
    # Use a random sha1 as partition key
    single_rec = [{'Data': msg, 'PartitionKey': get_random_sha1()}]
    self.client.put_records(Records=single_rec, StreamName=self.stream_name)
    self.push_count += 1
    if self.verbose > 1:
      self.print_stats()

  def print_stats(self):
    """Print statistics of producer
    """
    if self.push_count - self.last_display > self.display_count:
      display_time = datetime.today().strftime('%Y/%m/%d-%H:%M.%S')
      print_msg = "[{} at {}] Push count: {}"
      print(print_msg.format(self.pp, display_time, self.push_count))
      sys.stdout.flush()
      self.last_display = self.push_count
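For context, a minimal sketch of what a single push amounts to, with hypothetical stream and payload values; since the body of `get_random_sha1` is collapsed in the diff, it is approximated here as the SHA1 of random bytes, which spreads records roughly evenly across shards because Kinesis hashes the partition key to pick a shard.

import json
from hashlib import sha1
from os import urandom

import boto3

# Hypothetical region and stream, mirroring .env.lfwlocal.kinesis below
client = boto3.client('kinesis', region_name='us-east-1')
msg = json.dumps({'sha1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709'}).encode('utf-8')
# Random partition key: an approximation of get_random_sha1()
partition_key = sha1(urandom(64)).hexdigest()
client.put_records(Records=[{'Data': msg, 'PartitionKey': partition_key}],
                   StreamName='lfw-local-kinesis')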
4 changes: 2 additions & 2 deletions cufacesearch/cufacesearch/pusher/local_images_pusher.py
@@ -1,6 +1,4 @@
from __future__ import print_function

import sys
import json
import time
from argparse import ArgumentParser
@@ -17,6 +15,7 @@
skip_formats = ['SVG', 'RIFF']
valid_formats = ['JPEG', 'JPG', 'GIF', 'PNG']


class LocalImagePusher(ConfReader):
# Push the list of images to be processed from the folder 'input_path' containing the images

@@ -208,6 +207,7 @@ def process(self):
  download_file(lip.source_zip, local_zip)
  untar_file(local_zip, lip.input_path)

# Loop to automatically push images newly added to the folder in non-zip mode
while True:
  lip.process()
  time.sleep(60)
15 changes: 14 additions & 1 deletion cufacesearch/docs/source/ingester.rst
@@ -1,8 +1,21 @@
Ingester
========

.. automodule:: cufacesearch.ingester
Kafka Ingester
--------------

.. automodule:: cufacesearch.ingester.kafka_ingester
   :members:

Kinesis Ingester
----------------

.. automodule:: cufacesearch.ingester.kinesis_ingester
   :members:

Generic Kafka Processor
-----------------------

.. automodule:: cufacesearch.ingester.generic_kafka_processor
   :members:

7 changes: 4 additions & 3 deletions setup/ConfGenerator/create_conf_extractor.py
@@ -24,12 +24,13 @@
# - conf_name (required)
# - extr_type (required)
# - input_type (required)
# - image_ingestion_type (required)
# - extr_nb_threads (optional, default: 1)
# Kafka related environment variables
# - images_topic (required)
# - extr_check_consumer_group (required)
# - extr_proc_consumer_group (required)
# - updates_topic
# - updates_topic (required if update_ingestion_type is kafka)
# - extr_check_consumer_group (deprecated)
# - extr_proc_consumer_group (deprecated)
# - kafka_servers (optional, default: memex HG kafka brokers)
# - kafka_security (optional)
# Hbase related environment variables
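A sketch of how a conf generator can enforce the required/optional split documented above; the `get_env` helper is hypothetical, not part of the repo.

import os

def get_env(name, required=False, default=None):
  """Read an environment variable, failing loudly if a required one is unset."""
  value = os.getenv(name, default)
  if required and value is None:
    raise ValueError("Missing required environment variable: {}".format(name))
  return value

conf_name = get_env('conf_name', required=True)
image_ingestion_type = get_env('image_ingestion_type', required=True)
extr_nb_threads = int(get_env('extr_nb_threads', default=1))
# Only required when updates are ingested from Kafka
if get_env('update_ingestion_type') == 'kafka':
  updates_topic = get_env('updates_topic', required=True)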
6 changes: 4 additions & 2 deletions setup/ConfGenerator/create_conf_ingester.py
@@ -44,6 +44,7 @@
conf_name = os.environ['conf_name']
input_type = os.environ['input_type']
image_pushing_type = os.environ['image_pushing_type']
#image_pushing_type = os.getenv('image_pushing_type',None)
image_ingester_prefix = os.getenv("image_ingester_prefix", "IMG_ING_")
image_pusher_prefix = os.getenv("image_pusher_prefix", "IMG_PUSH_")
#producer_prefix = os.environ['producer_prefix']
@@ -80,11 +81,12 @@
  conf[image_ingester_prefix + 'aws_profile'] = os.environ['aws_profile']
else:
  raise ValueError("Unknown input type: {}".format(os.environ['input_type']))
# Is this actually used? For what?
conf[image_ingester_prefix + 'nb_threads'] = int(os.getenv('input_nb_threads', 4))
# This is actually not used. Could be used for multi-threaded ingestion...
#conf[image_ingester_prefix + 'nb_threads'] = int(os.getenv('input_nb_threads', 4))

if input_type == "kafka" or image_pushing_type == "kafka":
  env_kafka_security = os.getenv('kafka_security')
  kafka_security = ""
  if env_kafka_security:
    kafka_security = json.loads(env_kafka_security)

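As a usage note, `kafka_security` is expected to hold a JSON object when set; a hypothetical value could look like the following (the key name is illustrative, not confirmed by the repo).

import json
import os

# Hypothetical security settings; an unset or empty variable leaves kafka_security as ""
os.environ['kafka_security'] = '{"security_protocol": "SSL"}'
env_kafka_security = os.getenv('kafka_security')
kafka_security = ""
if env_kafka_security:
  kafka_security = json.loads(env_kafka_security)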
5 changes: 4 additions & 1 deletion setup/all-in-one/.env.catlech101local.kafka
@@ -23,12 +23,15 @@ extr_check_max_delay=120
# Kafka settings (local)
kafka_servers=["kafka:9092"]
kafka_security=

image_ingestion_type=kafka
images_topic=test-local-caltech101
updates_topic=test-local-caltech101-sbpycaffeimg-updates
extr_check_consumer_group=test-local-caltech101-sbpycaffeimg-extrchecker
extr_proc_consumer_group=test-local-caltech101-sbpycaffeimg-extrproc

# Updates ingestion should be from HBase
update_ingestion_type=hbase

# HBase settings
# (local)
hbase_host=hbase
8 changes: 6 additions & 2 deletions setup/all-in-one/.env.lfwlocal.kafka
@@ -9,7 +9,7 @@ indocker_repo_path=/home/ubuntu/memex/ColumbiaImageSearch
verbose=3

# Example source for LFW (to be used with DLib face detection/featurizer)
input_conf_name=lfw_local
input_conf_name=lfw_local_kafka
input_type=local
source_zip=https://vis-www.cs.umass.edu/lfw/lfw.tgz
# Should we mount a volume to make sure paths are consistent?
@@ -22,6 +22,7 @@ extr_nb_threads=4
extr_check_max_delay=120

# Kafka settings (local)
image_ingestion_type=kafka
kafka_servers=["kafka:9092"]
kafka_security=

@@ -33,6 +34,9 @@ updates_topic=lfw-local-dlibface-updates
extr_check_consumer_group=lfw-local-dlibface-extrchecker
extr_proc_consumer_group=lfw-local-dlibface-extrproc

# Updates ingestion should be from HBase
update_ingestion_type=hbase

# HBase settings
# (local)
hbase_host=hbase
@@ -53,7 +57,7 @@ lopq_M=32
lopq_subq=256
file_input=true
storer=local
reranking=true
reranking=1
wait_for_nbtrain=false

# API settings
5 changes: 3 additions & 2 deletions setup/all-in-one/.env.lfwlocal.kinesis
@@ -12,6 +12,7 @@ verbose=3
# Example source for LFW (to be used with DLib face detection/featurizer)
input_conf_name=lfw_local_kinesis
input_type=local
update_ingestion_type=hbase
source_zip=https://vis-www.cs.umass.edu/lfw/lfw.tgz
# Should we mount a volume to make sure paths are consistent?
input_path=./data/input_images_lfw
@@ -23,9 +24,9 @@ extr_nb_threads=4
extr_check_max_delay=120

# Kinesis pusher settings
image_ingestion_type=kinesis
images_stream=lfw-local-kinesis
create_stream=1
image_ingestion_type=kinesis
region_name=us-east-1
endpoint_url=https://kinesis:4567
verify_certificates=0
@@ -65,7 +66,7 @@ lopq_M=32
lopq_subq=256
file_input=true
storer=local
reranking=true
reranking=1
wait_for_nbtrain=false

# API settings
12 changes: 8 additions & 4 deletions setup/all-in-one/docker-compose_kafka_monitor.yml
@@ -141,6 +141,7 @@ services:
    environment:
      # General environment variables
      - input_type
      - "image_pushing_type=${image_ingestion_type}"
      - verbose
      - input_path
      - source_zip
@@ -180,25 +181,28 @@
    environment:
      # General environment variables
      - input_type
      - image_ingestion_type
      - update_ingestion_type
      - verbose
      - "conf_name=${extr_conf_name}"
      - "extr_type=${extr_type}"
      - "extr_nb_threads=${extr_nb_threads}"
      - "extr_check_max_delay=${extr_check_max_delay}"
      # Kafka related environment variables
      - images_topic
      - updates_topic
      - kafka_servers
      - kafka_security
      - "extr_check_consumer_group=${extr_check_consumer_group}"
      - "extr_proc_consumer_group=${extr_proc_consumer_group}"
      - "updates_topic=${updates_topic}"
      - images_consumer_group
      #- "extr_check_consumer_group=${extr_check_consumer_group}"
      #- "extr_proc_consumer_group=${extr_proc_consumer_group}"
      # Hbase related environment variables
      - hbase_host
      - table_sha1infos
      - table_updateinfos
      - batch_update_size
      - column_list_sha1s
      - extr_family_column
      - extr_column_family
      - image_info_column_family
      - image_buffer_column_family
      - image_buffer_column_name
2 changes: 1 addition & 1 deletion setup/all-in-one/docker-compose_kinesis_monitor.yml
@@ -103,8 +103,8 @@ services:
    environment:
      # General environment variables
      - input_type
      - "image_pushing_type=${image_ingestion_type}"
      - input_path
      - input_nb_threads
      - verbose
      - source_zip
      - "nb_workers=${input_nb_workers}"
