merge from kinesis
svebk committed Jan 14, 2020
2 parents 86a0698 + 95796b8 commit 2b4af5d
Showing 44 changed files with 3,146 additions and 434 deletions.
8 changes: 6 additions & 2 deletions conf/aws_credentials/credentials.sample
@@ -1,3 +1,7 @@
 [cuimagesearch]
-aws_access_key_id = FILLME
-aws_secret_access_key = FILLME
+aws_access_key_id=FILLME
+aws_secret_access_key=FILLME
+
+[kinesis]
+aws_access_key_id=FILLME
+aws_secret_access_key=FILLME
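
Each bracketed section in this sample file is a named AWS profile. A minimal sketch of how the new [kinesis] profile could be consumed with boto3, assuming the file is exposed via the standard AWS_SHARED_CREDENTIALS_FILE variable (the client code below is illustrative, not part of this commit):

    import os
    import boto3

    # Point the AWS SDK at the repo's credentials file instead of ~/.aws/credentials.
    os.environ["AWS_SHARED_CREDENTIALS_FILE"] = "conf/aws_credentials/credentials"

    # Each [section] of the file is addressable as a profile name.
    session = boto3.Session(profile_name="kinesis")
    kinesis = session.client("kinesis")
    # e.g. kinesis.list_streams(), once real keys replace FILLME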
152 changes: 76 additions & 76 deletions conf/hue/hue.ini
@@ -33,7 +33,7 @@
 http_port=8000

 # Choose whether to enable the new Hue 4 interface.
-## is_hue_4=true
+is_hue_4=true

 # Choose whether to still allow users to enable the old Hue 3 interface.
 ## disable_hue_3=false
@@ -181,7 +181,7 @@

 # Comma separated list of apps to not load at server startup.
 # e.g.: pig,zookeeper
-## app_blacklist=
+app_blacklist=hive,impala,pig

 # Id of the cluster where Hue is located.
 ## cluster_id='default'
@@ -636,16 +636,16 @@

 # Configuration options for connecting to an external SMTP server
 # ------------------------------------------------------------------------
-[[smtp]]
+#[[smtp]]

 # The SMTP server information for email notification delivery
-host=localhost
-port=25
-user=
-password=
+#host=localhost
+#port=25
+#user=
+#password=

 # Whether to use a TLS (secure) connection when talking to the SMTP server
-tls=no
+#tls=no

 # Default email address to use for various automated notification from Hue
 ## default_from_email=hue@localhost
@@ -708,7 +708,7 @@
 [notebook]

 ## Show the notebook menu or not
-# show_notebooks=true
+show_notebooks=false

 ## Flag to enable the selection of queries from files, saved queries into the editor or as snippet.
 # enable_external_statements=true
@@ -720,10 +720,10 @@
 # enable_sql_indexer=false

 ## Flag to turn on the Presentation mode of the editor.
-# enable_presentation=true
+#enable_presentation=false

 ## Flag to enable the SQL query builder of the table assist.
-# enable_query_builder=true
+#enable_query_builder=false

 ## Flag to enable the creation of a coordinator for the current SQL query.
 # enable_query_scheduling=false
@@ -742,97 +742,97 @@
 [[interpreters]]
 # Define the name and how to connect and execute the language.

-[[[hive]]]
-# The name of the snippet.
-name=Hive
-# The backend connection to use to communicate with the server.
-interface=hiveserver2
+# [[[hive]]]
+# # The name of the snippet.
+# name=Hive
+# # The backend connection to use to communicate with the server.
+# interface=hiveserver2

-[[[impala]]]
-name=Impala
-interface=hiveserver2
+# [[[impala]]]
+# name=Impala
+# interface=hiveserver2

-# [[[sparksql]]]
-# name=SparkSql
-# interface=hiveserver2
+# # [[[sparksql]]]
+# # name=SparkSql
+# # interface=hiveserver2

-[[[spark]]]
-name=Scala
-interface=livy
+# [[[spark]]]
+# name=Scala
+# interface=livy

-[[[pyspark]]]
-name=PySpark
-interface=livy
+# [[[pyspark]]]
+# name=PySpark
+# interface=livy

-[[[r]]]
-name=R
-interface=livy
+# [[[r]]]
+# name=R
+# interface=livy

-[[[jar]]]
-name=Spark Submit Jar
-interface=livy-batch
+# [[[jar]]]
+# name=Spark Submit Jar
+# interface=livy-batch

-[[[py]]]
-name=Spark Submit Python
-interface=livy-batch
+# [[[py]]]
+# name=Spark Submit Python
+# interface=livy-batch

 [[[text]]]
 name=Text
 interface=text

-[[[markdown]]]
-name=Markdown
-interface=text
+# [[[markdown]]]
+# name=Markdown
+# interface=text

-[[[mysql]]]
-name = MySQL
-interface=rdbms
+# [[[mysql]]]
+# name = MySQL
+# interface=rdbms

-[[[sqlite]]]
-name = SQLite
-interface=rdbms
+# [[[sqlite]]]
+# name = SQLite
+# interface=rdbms

-[[[postgresql]]]
-name = PostgreSQL
-interface=rdbms
+# [[[postgresql]]]
+# name = PostgreSQL
+# interface=rdbms

-[[[oracle]]]
-name = Oracle
-interface=rdbms
+# [[[oracle]]]
+# name = Oracle
+# interface=rdbms

-[[[solr]]]
-name = Solr SQL
-interface=solr
+# [[[solr]]]
+# name = Solr SQL
+# interface=solr
 ## Name of the collection handler
 # options='{"collection": "default"}'

-[[[pig]]]
-name=Pig
-interface=oozie
+# [[[pig]]]
+# name=Pig
+# interface=oozie

-[[[java]]]
-name=Java
-interface=oozie
+# [[[java]]]
+# name=Java
+# interface=oozie

-[[[spark2]]]
-name=Spark
-interface=oozie
+# [[[spark2]]]
+# name=Spark
+# interface=oozie

-[[[mapreduce]]]
-name=MapReduce
-interface=oozie
+# [[[mapreduce]]]
+# name=MapReduce
+# interface=oozie

-[[[sqoop1]]]
-name=Sqoop1
-interface=oozie
+# [[[sqoop1]]]
+# name=Sqoop1
+# interface=oozie

-[[[distcp]]]
-name=Distcp
-interface=oozie
+# [[[distcp]]]
+# name=Distcp
+# interface=oozie

-[[[shell]]]
-name=Shell
-interface=oozie
+# [[[shell]]]
+# name=Shell
+# interface=oozie

 # [[[mysql]]]
 # name=MySql JDBC
31 changes: 20 additions & 11 deletions cufacesearch/cufacesearch/api/api.py
@@ -1,6 +1,6 @@
 """Flask API to expose the search index.
 """
-
+from __future__ import print_function
 import os
 import sys
 import time
@@ -99,10 +99,11 @@ def put_post(self, mode):
     """
     pid = os.getpid()
     form = request.form
-    print("[put/post.{}] received parameters: {}".format(pid, form.keys()))
+    #print("[put/post.{}] received parameters: {}".format(pid, form.keys()))
     if 'data' not in request.form.keys():
       print("[put/post.{}] trying to parse input".format(pid))
-      form = json.loads(request.form.keys())
+      form = json.loads(request.form.keys()[0])
+      print("[put/post.{}] received parameters: {}".format(pid, form.keys()))
     print("[put/post.{}] received request: {}".format(pid, request))
     query = form['data']
     try:
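
The request.form.keys()[0] fix above covers clients that POST a raw JSON body with a form content type: werkzeug then parses the whole body as a single form key, so the first (and only) key is the JSON payload itself. Note the indexing relies on Python 2 semantics, where .keys() returns a list. A minimal client sketch of that case (hypothetical endpoint URL; payloads containing form-special characters such as '=' or '&' would need escaping):

    import json
    import requests  # illustrative client, not part of this repo

    payload = {"data": "<sha1 or image URL query>"}
    # No 'data' form field is sent: the serialized JSON arrives server-side
    # as the lone key of request.form, which put_post() then json.loads().
    resp = requests.post(
        "http://localhost:5000/cuimgsearch/byURL",  # hypothetical endpoint
        data=json.dumps(payload),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    print(resp.status_code)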
@@ -294,6 +295,9 @@ def search_byB64(self, query, options=None):
     :rtype: dict
     """
     query_b64s = [str(x) for x in query.split(',') if not x.startswith('data:')]
+    print("Received {} B64 queries with length of: {}".format(len(query_b64s), [len(x) for x in query_b64s]))
+    #for i in range(len(query_b64s)):
+    #  print(i,query_b64s[i])
     options_dict, errors = self.get_options_dict(options)
     outp = self.searcher.search_imageB64_list(query_b64s, options_dict)
     outp_we = self.append_errors(outp, errors)
@@ -400,7 +404,7 @@ def get_clean_urls_from_query(query):
         query_urls.append(x[:-1])
       else:
         query_urls.append(x)
-    print("[get_clean_urls_from_query: info] {}".format(query_urls))
+    print("[api.get_clean_urls_from_query: info] {}".format(query_urls))
     return query_urls

   def view_similar_query_response(self, query_type, query, query_response, options=None):
@@ -447,16 +451,17 @@ def view_similar_query_response(self, query_type, query, query_response, options=None):
       # URLs should already be in query response
       pass
     else:
-      print("[view_similar_query_response: error] Unknown query_type: {}".format(query_type))
+      print("[api.view_similar_query_response: error] Unknown query_type: {}".format(query_type))
       return None

     # Get errors
     options_dict, errors_options = self.get_options_dict(options)

     # Parse similar faces response
+    # TODO: remove 'face' in variable names
     all_sim_faces = query_response[self.searcher.do.map['all_similar_'+self.input_type+'s']]
     search_results = []
-    print "[view_similar_query_response: log] len(sim_images): {}".format(len(all_sim_faces))
+    print("[api.view_similar_query_response: log] len(sim_images): {}".format(len(all_sim_faces)))
     for i in range(len(all_sim_faces)):
       # Parse query face, and build face tuple (sha1, url/b64 img, face bounding box)
       query_face = all_sim_faces[i]
@@ -486,10 +491,14 @@ def view_similar_query_response(self, query_type, query, query_response, options=None):
         osface_sha1 = similar_faces[self.searcher.do.map['image_sha1s']][j]
         #if query_type == "PATH":
         if self.searcher.file_input:
-          with open(similar_faces[self.searcher.do.map['cached_image_urls']][j], 'rb') as img_buffer:
-            img_info = get_SHA1_img_info_from_buffer(img_buffer)
-            img_B64 = buffer_to_B64(img_buffer)
-            osface_url = "data:" + ImageMIMETypes[img_info[1]] + ";base64," + str(img_B64)
+          try:
+            with open(similar_faces[self.searcher.do.map['cached_image_urls']][j], 'rb') as img_buffer:
+              img_info = get_SHA1_img_info_from_buffer(img_buffer)
+              img_B64 = buffer_to_B64(img_buffer)
+              osface_url = "data:" + ImageMIMETypes[img_info[1]] + ";base64," + str(img_B64)
+          except Exception as inst:
+            msg = "[api.view_similar_query_response: WARNING] Could not load image {} from {}"
+            print(msg.format(osface_sha1, similar_faces[self.searcher.do.map['cached_image_urls']][j]))
         else:
           osface_url = similar_faces[self.searcher.do.map['cached_image_urls']][j]
         osface_bbox_compstr = None
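
The new try block inlines a file-to-data-URI conversion so that one unreadable cached image no longer aborts the whole response. A standalone sketch of the same pattern, with plain base64 standing in for the repo's buffer_to_B64 and ImageMIMETypes helpers (the helper name is illustrative):

    import base64

    def image_file_to_data_uri(path, mime="image/jpeg"):  # illustrative helper
      """Read an image file and return it as a browser-displayable data: URI."""
      with open(path, "rb") as img_buffer:
        b64 = base64.b64encode(img_buffer.read()).decode("ascii")
      return "data:" + mime + ";base64," + b64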
9 changes: 6 additions & 3 deletions cufacesearch/cufacesearch/extractor/generic_extractor.py
@@ -115,9 +115,12 @@ def run(self):
         print(err_msg.format(self.pp, img_id, sha1, type(inst), inst, fulltb))
         sys.stdout.flush()
         # len(img_buffer_b64)
-        err_msg = "[{}: warning] Extraction failed for img #{} {}. img_buffer_b64 length was {}"
-        print(err_msg.format(self.pp, img_id, sha1, len(len(img_buffer_b64))))
-        sys.stdout.flush()
+        try:
+          err_msg = "[{}: warning] Extraction failed for img #{} {}. img_buffer_b64 length was {}"
+          print(err_msg.format(self.pp, img_id, sha1, len(img_buffer_b64)))
+          sys.stdout.flush()
+        except:
+          pass
         # Mark this image as corrupted
         # But how and when could it be overwritten if we manage to run the extraction later?
         out_dict = self.extractor.failed_out_dict()
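
The deleted logging line could never succeed: len(len(img_buffer_b64)) calls len() on an int, which always raises TypeError, so the error handler itself crashed; the new try/except also covers the case where img_buffer_b64 was never assigned. A two-line illustration of the original bug:

    buf = "aGVsbG8="  # any base64 string
    len(len(buf))     # TypeError: object of type 'int' has no len()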
13 changes: 12 additions & 1 deletion cufacesearch/cufacesearch/ingester/generic_kafka_processor.py
@@ -2,6 +2,7 @@

 import sys
 import time
+import json
 import socket
 from datetime import datetime
 from kafka import KafkaConsumer, KafkaProducer
@@ -14,6 +15,9 @@
 # Should we consider using Kafka Streams ?
 base_path_keys = "../../data/keys/hg-kafka-"

+
+# TODO: Split into ingester/producer
+
 class GenericKafkaProcessor(ConfReader):
   """GenericKafkaProcessor
   """
@@ -205,7 +209,6 @@ def init_consumer(self):
       print(msg.format(self.pp))
       return

-
     # NB: topic could be a list
     if type(topic) == list:
       topic = [str(t) for t in topic]
@@ -255,3 +258,11 @@ def init_producer(self):
     except Exception as inst:
       # Would be OK for ingester that do not output to kafka...
       print("[{}: warning] Could not initialize producer with arguments {}. Error was: {}".format(self.pp, dict_args, inst))
+
+  def get_msg_json(self):
+    """Generator of JSON messages from the consumer.
+
+    :yield: JSON message
+    """
+    for msg_json in self.consumer:
+      yield json.loads(msg_json.value)
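
A minimal usage sketch for the new generator, assuming a processor whose configuration enables a consumer (the constructor argument below is hypothetical):

    # init_consumer() must have populated self.consumer.
    proc = GenericKafkaProcessor(global_conf_in="conf/global_conf.json")  # hypothetical conf path
    for msg in proc.get_msg_json():
      # msg is the deserialized JSON payload of one Kafka record.
      print(msg.get("sha1"))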
[Diff truncated: the remaining changed files are not shown.]
