Skip to content

Commit

Permalink
Decode kafka's returned byte object to a string string
Browse files Browse the repository at this point in the history
  • Loading branch information
Schwartz-Matthew-bah committed Apr 8, 2019
1 parent ab4b9ac commit 375d5db
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 13 deletions.
1 change: 1 addition & 0 deletions qa/test-harness/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
git+https://github.com/usdot-jpo-ode/ode-output-validator-library@master#egg=odevalidator
python-dateutil
kafka-python
requests
20 changes: 7 additions & 13 deletions qa/test-harness/test-harness.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from argparse import ArgumentParser
from kafka import KafkaConsumer
from pathlib import Path
from odevalidator.validator import TestCase
import os
from odevalidator import TestCase
import os

KAFKA_CONSUMER_TIMEOUT = 10000
KAFKA_PORT = '9092'
Expand All @@ -25,14 +25,13 @@ def upload_file(filepath):
return requests.post(destination_url, files={'name':'file', 'file':file}, timeout=2)

def listen_to_kafka_topic(topic, msg_queue):
consumer=KafkaConsumer(topic, bootstrap_servers=DOCKER_HOST_IP+':'+KAFKA_PORT, consumer_timeout_ms=KAFKA_CONSUMER_TIMEOUT)
for msg in consumer:
msg_queue.put(msg.value)
consumer=KafkaConsumer(topic, bootstrap_servers=DOCKER_HOST_IP+':'+KAFKA_PORT, consumer_timeout_ms=KAFKA_CONSUMER_TIMEOUT)
for msg in consumer:
msg_queue.put(str(msg.value, 'utf-8'))

# main function using old functionality
def main():
dir_path = os.path.dirname(os.path.realpath(__file__))
print("CWD: %s" % dir_path)

parser = ArgumentParser()
parser.add_argument("--data-file", dest="data_file_path", help="Path to log data file that will be sent to the ODE for validation.", metavar="DATAFILEPATH", required=True)
Expand All @@ -42,12 +41,7 @@ def main():
args = parser.parse_args()

# Parse test config and create test case
if args.config_file_path:
validator = TestCase(args.config_file_path)
else:
validator = TestCase("ode-output-validator-library/odevalidator/config.ini")

print("[START] Beginning test routine referencing configuration file '%s'." % args.config_file_path)
validator = TestCase(args.config_file_path)

# Setup logger and set output to file if specified
logger = logging.getLogger('test-harness')
Expand All @@ -72,7 +66,7 @@ def main():
else:
print("[ERROR] Aborting test routine! Test file failed to upload, response code %d" % upload_response.status_code)
return
except requests.exceptions.ConnectTimeout as e:
except (requests.exceptions.ConnectionError, requests.exceptions.ConnectTimeout) as e:
print("[ERROR] Aborting test routine! Test file upload failed (unable to reach to ODE). Error: '%s'" % str(e))
return

Expand Down

0 comments on commit 375d5db

Please sign in to comment.