diff --git a/adsb-mqtt/adsb-mqtt.py b/adsb-mqtt/adsb-mqtt.py index 87c88da..5a73c42 100755 --- a/adsb-mqtt/adsb-mqtt.py +++ b/adsb-mqtt/adsb-mqtt.py @@ -43,6 +43,9 @@ import mqtt_wrapper import pandas as pd + +ID = str(random.randint(1,100001)) + # Clean out observations this often OBSERVATION_CLEAN_INTERVAL = 30 # Socket read timeout @@ -423,11 +426,15 @@ def dump1090Read(self) -> str: def run(self): """Run the flight tracker. """ - self.__mqtt_bridge = mqtt_wrapper.bridge(host = self.__mqtt_broker, port = self.__mqtt_port, client_id = "skyscan-adsb-mqtt-%d" % (os.getpid())) # TOOD: , user_id = args.mqtt_user, password = args.mqtt_password) + timeHeartbeat = 0 + self.__mqtt_bridge = mqtt_wrapper.bridge(host = self.__mqtt_broker, port = self.__mqtt_port, client_id = "skyscan-adsb-mqtt-%s" % (ID)) # TOOD: , user_id = args.mqtt_user, password = args.mqtt_password) #threading.Thread(target = self.__publish_thread, daemon = True).start() - + self.__mqtt_bridge.publish("skyscan/registration", "skyscan-adsb-mqtt-"+ID+" Registration", 0, False) while True: + if timeHeartbeat < time.mktime(time.gmtime()): + timeHeartbeat = time.mktime(time.gmtime()) + 10 + self.__mqtt_bridge.publish("skyscan/heartbeat", "skyscan-adsb-mqtt-"+ID+" Heartbeat", 0, False) if not self.dump1090Connect(): continue for data in self.dump1090Read(): @@ -446,6 +453,7 @@ def run(self): retain = False self.__mqtt_bridge.publish(self.__mqtt_topic, self.__observations[icao24].json(), 0, retain) #logging.info("%s alt %5d trk %3d spd %3d %s" % (self.__observations[icao24].getIcao24(), self.__observations[icao24].getAltitude(), self.__observations[icao24].getHeading(), self.__observations[icao24].getGroundSpeed(), self.__observations[icao24].getType())) + time.sleep(0.01) def cleanObservations(self): """Clean observations for planes not seen in a while diff --git a/axis-ptz/camera.py b/axis-ptz/camera.py index 332b3cd..b4e3c7c 100755 --- a/axis-ptz/camera.py +++ b/axis-ptz/camera.py @@ -24,7 +24,7 @@ from sensecam_control import vapix_control,vapix_config - +ID = str(random.randint(1,100001)) tiltCorrect = 15 args = None camera = None @@ -238,17 +238,23 @@ def main(): # Sleep for a bit so we're not hammering the HAT with updates time.sleep(0.005) print("connecting to MQTT broker at "+ args.mqtt_host+", channel '"+args.mqtt_topic+"'") - client = mqtt.Client("skyscan-axis-ptz-camera") #create new instance + client = mqtt.Client("skyscan-axis-ptz-camera-" + ID) #create new instance client.on_message=on_message #attach function to callback client.connect(args.mqtt_host) #connect to broker client.loop_start() #start the loop client.subscribe(args.mqtt_topic+"/#") + client.publish("skyscan/registration", "skyscan-axis-ptz-camera-"+ID+" Registration", 0, False) + ############################################# ## Main Loop ## ############################################# + timeHeartbeat = 0 while True: + if timeHeartbeat < time.mktime(time.gmtime()): + timeHeartbeat = time.mktime(time.gmtime()) + 10 + client.publish("skyscan/heartbeat", "skyscan-axis-ptz-camera-"+ID+" Heartbeat", 0, False) time.sleep(0.1) diff --git a/docker-compose.yml b/docker-compose.yml index 3ed4954..c41ba14 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -72,12 +72,9 @@ services: egi: build: ./egi - entrypoint: "/app/egi.py -m mqtt" - environment: - - LAT=${LAT} - - LONG=${LONG} - - ALT=${ALT} - - ROLL=${ROLL} - - PITCH=${PITCH} - - YAW=${YAW} + entrypoint: bash -c "gpsd ${GPS_SERIAL} -F /var/run/gpsd.sock && python3 egi_mqtt.py -m mqtt -l ${LAT} -L ${LONG} -a ${ALT} -r ${ROLL} -p ${PITCH} -y ${YAW}" + devices: + - /dev/ttyACM0 + depends_on: + - mqtt restart: unless-stopped \ No newline at end of file diff --git a/egi/Dockerfile b/egi/Dockerfile index d7a3349..348eec1 100644 --- a/egi/Dockerfile +++ b/egi/Dockerfile @@ -1,13 +1,15 @@ FROM debian RUN apt update && \ - apt install -y python3 python3-pip && \ - pip3 install paho-mqtt + apt install -y python3 python3-pip gpsd-clients gpsd RUN mkdir -p /app/ WORKDIR /app +ADD *.txt /app/ +RUN pip3 install -r requirements.txt ADD *.py /app/ -#ENTRYPOINT python3 /tmp/egi.py +ENTRYPOINT sh -c "gpsd /dev/ttyACM0 -F /var/run/gpsd.sock && bash" -#docker run -d --restart unless-stopped --network=host -v /home/pi/:/tmp/ --name lamp docker-registry.iqt.org/mission-capabilities/rpi-stats-reporting/lamp-control +#docker run -it --device=/dev/ttyACM0 skyscan_egi +#python3 egi_mqtt.py -h \ No newline at end of file diff --git a/egi/egi.py b/egi/egi.py deleted file mode 100755 index 2052d3d..0000000 --- a/egi/egi.py +++ /dev/null @@ -1,79 +0,0 @@ -#!/usr/bin/env python3 - -import paho.mqtt.client as mqtt #import the client1 -import time -import random -import json -import os -import argparse - -Active = True - -####################################################### -## Initialize Variables ## -####################################################### -config = {} -config['Local'] = ["127.0.0.1", "/egi/", "Local MQTT Bus"] -timeTrigger = 0 -timeHeartbeat = 0 -ID = str(random.randint(1,100001)) - -LLA = [ os.environ['LAT'], os.environ['LONG'], os.environ['ALT'] ] -RPY = [ os.environ['ROLL'], os.environ['PITCH'], os.environ['YAW'] ] - -state = {} -state['lat'] = LLA[0] -state['long'] = LLA[1] -state['alt'] = LLA[2] -state['roll'] = RPY[0] -state['pitch'] = RPY[1] -state['yaw'] = RPY[2] -state=json.dumps(state) - -parser = argparse.ArgumentParser(description='An MQTT based camera controller') - -parser.add_argument('-m', '--mqtt-host', help="MQTT broker hostname", default='127.0.0.1') - -args = parser.parse_args() - - -####################################################### -## Local MQTT Callback Function ## -####################################################### -def on_message_local(client, userdata, message): - payload = str(message.payload.decode("utf-8")) - print('Message Received: ' + message.topic + ' | ' + payload) - #if message.topic == local_topic+"/OFF": - # print("Turning Lamp OFF") - -def on_disconnect(client, userdata, rc): - global Active - Active = False - -############################################# -## Initialize Local MQTT Bus ## -############################################# -Unit = 'Local' -broker_address=config[Unit][0] -broker_address=args.mqtt_host -local_topic= config[Unit][1] -print("connecting to MQTT broker at "+broker_address+", channel '"+local_topic+"'") -clientLocal = mqtt.Client("EGI-"+ID) #create new instance -clientLocal.on_message = on_message_local #attach function to callback -clientLocal.on_disconnect = on_disconnect -clientLocal.connect(broker_address) #connect to broker -clientLocal.loop_start() #start the loop -clientLocal.subscribe(local_topic+"/#") #config/#") -clientLocal.publish(local_topic+"/registration","EGI-"+ID+" Registration") - -############################################# -## Main Loop ## -############################################# -while Active: - if timeHeartbeat < time.mktime(time.gmtime()): - timeHeartbeat = time.mktime(time.gmtime()) + 10 - clientLocal.publish(local_topic+"/Heartbeat","EGI-"+ID+" Heartbeat") - if timeTrigger < time.mktime(time.gmtime()): - timeTrigger = time.mktime(time.gmtime()) + 10 - clientLocal.publish(local_topic,state) - time.sleep(0.001) \ No newline at end of file diff --git a/egi/egi_mqtt.py b/egi/egi_mqtt.py new file mode 100644 index 0000000..e025fa6 --- /dev/null +++ b/egi/egi_mqtt.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 + +from gps import * +import paho.mqtt.client as mqtt #import the client1 +import time +import random +import json +import os +import argparse +import logging +import coloredlogs +import threading + +gpsd = None #seting the global variable +Active = True +styles = {'critical': {'bold': True, 'color': 'red'}, 'debug': {'color': 'green'}, 'error': {'color': 'red'}, 'info': {'color': 'white'}, 'notice': {'color': 'magenta'}, 'spam': {'color': 'green', 'faint': True}, 'success': {'bold': True, 'color': 'green'}, 'verbose': {'color': 'blue'}, 'warning': {'color': 'yellow'}} +level = logging.INFO +coloredlogs.install(level=level, fmt='%(asctime)s.%(msecs)03d \033[0;90m%(levelname)-8s ' + '' + '\033[0;36m%(filename)-18s%(lineno)3d\033[00m ' + '%(message)s', + level_styles = styles) +logging.info("Initializing EGI") + +####################################################### +## Initialize Variables ## +####################################################### +config = {} +config['Local'] = ["127.0.0.1", "skyscan/egi", "Local MQTT Bus"] # updated based on naming convention here: https://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices/ +timeTrigger = time.mktime(time.gmtime()) + 10 +timeHeartbeat = time.mktime(time.gmtime()) + 10 +ID = str(random.randint(1,100001)) + + +class GpsPoller(threading.Thread): + def __init__(self): + threading.Thread.__init__(self) + global gpsd #bring it in scope + gpsd = gps(mode=WATCH_ENABLE|WATCH_NEWSTYLE) + self.current_value = None + self.running = True #setting the thread running to true + + def run(self): + global gpsd + while self.running: + gpsd.next() #this will continue to loop and grab EACH set of gpsd info to clear the buffer + + +LLA = [38.9510808,-77.3841834,130.1337] +RPY = [0,0,0] + +parser = argparse.ArgumentParser(description='An MQTT based camera controller') +parser.add_argument('-m', '--mqtt-host', help="MQTT broker hostname", default='127.0.0.1') +parser.add_argument('-l', '--latitude', help="Latitude (decimal degrees)", default=LLA[0]) +parser.add_argument('-L', '--longitude', help="Longitude (decimal degrees)", default=LLA[1]) +parser.add_argument('-a', '--altitude', help="Altitude (meters)", default=LLA[2]) +parser.add_argument('-r', '--roll', help="Roll Angle of Camera (degrees)", default=RPY[0]) +parser.add_argument('-p', '--pitch', help="Pitch Angle of Camera (degrees)", default=RPY[1]) +parser.add_argument('-y', '--yaw', help="Yaw Angle of Camera (degrees from True North)", default=RPY[2]) +try: + args = parser.parse_args() +except: + logging.critical("Error in Command Line Argument Parsing. Are all environment variables set?", exc_info=True) + raise + + +state = {} +state['time'] = time.strftime("%Y-%m-%dT%H:%M:%SZ",time.gmtime()) +state['lat'] = float(args.latitude) +state['long'] = float(args.longitude) +state['alt'] = float(args.altitude) +state['roll'] = float(args.roll) +state['pitch'] = float(args.pitch) +state['yaw'] = float(args.yaw) +state['fix'] = 0 +logging.info("Initial State Array: " + str(state)) + + +####################################################### +## Local MQTT Callback Function ## +####################################################### +def on_message_local(client, userdata, message): + payload = str(message.payload.decode("utf-8")) + logging.info('Message Received: ' + message.topic + ' | ' + payload) + +def on_disconnect(client, userdata, rc): + global Active + Active = False + +############################################# +## Initialize Local MQTT Bus ## +############################################# +Unit = 'Local' +broker_address=config[Unit][0] +broker_address=args.mqtt_host +local_topic= config[Unit][1] +logging.info("connecting to MQTT broker at "+broker_address+", channel '"+local_topic+"'") +clientLocal = mqtt.Client("EGI-"+ID) #create new instance +clientLocal.on_message = on_message_local #attach function to callback +clientLocal.on_disconnect = on_disconnect +try: + clientLocal.connect(broker_address) #connect to broker +except: + logging.critical("Could not connect to MQTT Broker.", exc_info=True) + raise +clientLocal.loop_start() #start the loop +clientLocal.publish("skyscan/registration","EGI-"+ID+" Registration") + +gpsp = GpsPoller() # create the thread +try: + gpsp.start() # start it up + ############################################# + ## Main Loop ## + ############################################# + while Active: + state['fix'] = gpsd.fix.status + if gpsd.fix.status: + state['time'] = gpsd.fix.time + state['lat'] = gpsd.fix.latitude + state['long'] = gpsd.fix.longitude + state['alt'] = gpsd.fix.altitude + if timeTrigger < time.mktime(time.gmtime()): + timeTrigger = time.mktime(time.gmtime()) + 10 + clientLocal.publish(local_topic,json.dumps(state)) + if timeHeartbeat < time.mktime(time.gmtime()): + timeHeartbeat = time.mktime(time.gmtime()) + 30 + logging.info("Current EGI State: " + json.dumps(state)) + time.sleep(0.01) +except (KeyboardInterrupt, SystemExit): #when you press ctrl+c + logging.info("Killing GPS Thread...") + gpsp.running = False + gpsp.join(2) +except: + logging.critical("Error starting GPS.", exc_info=True) + gpsp.running = False + gpsp.join(2) + raise \ No newline at end of file diff --git a/egi/requirements.txt b/egi/requirements.txt new file mode 100644 index 0000000..45eb5aa --- /dev/null +++ b/egi/requirements.txt @@ -0,0 +1,3 @@ +paho-mqtt==1.5.0 +coloredlogs +gps \ No newline at end of file diff --git a/env-example b/env-example index ddc655f..f44efee 100644 --- a/env-example +++ b/env-example @@ -14,4 +14,5 @@ CAMERA_MOVE_SPEED=50 # The speed at which the Axis will move for Pan/Tilt CAMERA_DELAY=0.5 # How many seconds after issuing a Pan/Tilt command should a picture be taken CAMERA_ZOOM=9999 # The zoom setting for the camera (0-9999) CAMERA_LEAD=0.25 # How many seconds ahead of a plane's predicted location should the camera be positioned -RTL_DEV=1 # The device ID for the RTL-SDR - set using the rtl_eeprom program \ No newline at end of file +RTL_DEV=1 # The device ID for the RTL-SDR - set using the rtl_eeprom program +GPS_SERIAL=/dev/ttyACM0 # GPS module serial port \ No newline at end of file diff --git a/pan-tilt-pi/camera.py b/pan-tilt-pi/camera.py index 1850131..519a57b 100755 --- a/pan-tilt-pi/camera.py +++ b/pan-tilt-pi/camera.py @@ -22,6 +22,8 @@ import pantilthat from picamera import PiCamera +ID = str(random.randint(1,100001)) + camera = PiCamera() tiltCorrect = 15 @@ -183,17 +185,22 @@ def main(): # Sleep for a bit so we're not hammering the HAT with updates time.sleep(0.005) print("connecting to MQTT broker at "+ args.mqtt_host+", channel '"+args.mqtt_topic+"'") - client = mqtt.Client("pan-tilt-pi-camera") #create new instance + client = mqtt.Client("pan-tilt-pi-camera-" + ID) #create new instance client.on_message=on_message #attach function to callback client.connect(args.mqtt_host) #connect to broker client.loop_start() #start the loop client.subscribe(args.mqtt_topic+"/#") + client.publish("skyscan/registration", "pan-tilt-pi-camera-"+ID+" Registration", 0, False) ############################################# ## Main Loop ## ############################################# + timeHeartbeat = 0 while True: + if timeHeartbeat < time.mktime(time.gmtime()): + timeHeartbeat = time.mktime(time.gmtime()) + 10 + client.publish("Heartbeat", "pan-tilt-pi-camera-"+ID+" Heartbeat", 0, False) time.sleep(0.1) diff --git a/tracker/flighttracker.py b/tracker/flighttracker.py index 073404e..375d01b 100755 --- a/tracker/flighttracker.py +++ b/tracker/flighttracker.py @@ -29,7 +29,6 @@ import sys import os import logging -import logging import coloredlogs import calendar from datetime import datetime, timedelta @@ -45,6 +44,7 @@ import pandas as pd from queue import Queue +ID = str(random.randint(1,100001)) # Clean out observations this often OBSERVATION_CLEAN_INTERVAL = 30 @@ -267,7 +267,9 @@ def dict(self): def on_message(client, userdata, message): - + global camera_altitude + global camera_latitude + global camera_longitude command = str(message.payload.decode("utf-8")) # Assumes you will only be getting JSON on your subscribed messages try: @@ -282,8 +284,13 @@ def on_message(client, userdata, message): log.critical("onMessage - Caught it!") if message.topic == plane_topic: q.put(update) #put messages on queue - elif message.topic == "/egi/": - print(update) + elif message.topic == "skyscan/egi": + logging.info(update) + camera_longitude = float(update["long"]) + camera_latitude = float(update["lat"]) + camera_altitude = float(update["alt"]) + else: + logging.info("Topic not processed: " + message.topic) class FlightTracker(object): __mqtt_broker: str = "" @@ -326,7 +333,11 @@ def __publish_thread(self): """ MQTT publish closest observation every second, more often if the plane is closer """ + timeHeartbeat = 0 while True: + if timeHeartbeat < time.mktime(time.gmtime()): + timeHeartbeat = time.mktime(time.gmtime()) + 10 + self.__client.publish("skyscan/heartbeat", "skyscan-tracker-" +ID+" Heartbeat", 0, False) if not self.__tracking_icao24: time.sleep(1) else: @@ -367,7 +378,7 @@ def run(self): """Run the flight tracker. """ print("connecting to MQTT broker at "+ self.__mqtt_broker +", subcribing on channel '"+ self.__plane_topic+"'publising on: " + self.__tracking_topic) - self.__client = mqtt.Client("skyscan-tracker") #create new instance + self.__client = mqtt.Client("skyscan-tracker-" + ID) #create new instance self.__client.on_message = on_message #attach function to callback print("setup MQTT") @@ -376,7 +387,8 @@ def run(self): self.__client.loop_start() #start the loop print("start MQTT") self.__client.subscribe(self.__plane_topic) - self.__client.subscribe("/egi/") + self.__client.subscribe("skyscan/egi") + self.__client.publish("skyscan/registration", "skyscan-tracker-"+ID+" Registration", 0, False) print("subscribe mqtt") threading.Thread(target = self.__publish_thread, daemon = True).start() diff --git a/tracker/requirements copy.txt b/tracker/requirements copy.txt deleted file mode 100644 index bd35a73..0000000 --- a/tracker/requirements copy.txt +++ /dev/null @@ -1,5 +0,0 @@ -coloredlogs==14.0 -paho-mqtt==1.5.0 -python-dateutil==2.8.1 -requests==2.23.0 -pandas