-
Notifications
You must be signed in to change notification settings - Fork 0
/
monitor.py
96 lines (80 loc) · 2.49 KB
/
monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import os
import sys
import time
import ctypes
import configparser
from providers import *
# Importing the MessageBox function from the ctypes library
MessageBox = ctypes.windll.user32.MessageBoxW
class MetricHandler:
"""Class to handle metrics"""
def __init__(self, client, updateTime):
"""
Initialize MetricHandler object
Args:
client: MQTT client object
updateTime: Time interval for updating metrics
"""
self._providers = []
self._mqttClient = client
self._updateTime = updateTime
self._currentTimestamp = time.time()
self._prefix = "hwinfo/"
def addProvider(self, provider):
"""
Add a provider to the list of providers
Args:
provider: Provider object
"""
self._providers.append(provider)
def sendUpdate(self):
"""Send metric update to MQTT broker"""
for provider in self._providers:
if provider.isValid():
try:
self._mqttClient.publish(self._prefix + provider.getName(), provider.getValue())
except:
pass
def loop(self):
"""Main loop for updating metrics"""
if (time.time() - self._currentTimestamp) > self._updateTime:
self.sendUpdate()
self._currentTimestamp = time.time()
def main():
"""Main function"""
print("Starting...")
config = configparser.ConfigParser()
config.read('config.ini')
client = mqtt.Client()
client.username_pw_set(config.get('credentials', 'user'), config.get('credentials', 'pass'))
client.connect(config.get('credentials', 'host'), 1883, 60)
Metrics = MetricHandler(client, 15)
Metrics.addProvider(CPUUsageMetricProvider())
Metrics.addProvider(RAMUsageMetricProvider())
Metrics.addProvider(GPUTempMetricProvider())
while True:
Metrics.loop()
client.loop()
if __name__ == "__main__":
if "task" in sys.argv:
main()
else:
from taskscheduler import *
scheduler = TaskScheduler()
if "install" in sys.argv:
script_path = os.path.abspath(sys.argv[0])
try:
scheduler.create_or_update_task('PerfMonMQTT', script_path)
MessageBox(None, 'Scheduled task has been created', 'Info', 0)
except Exception as e:
MessageBox(None, 'Scheduled task creation failed!\n\n' + str(e), 'Info', 0)
else:
if "uninstall" in sys.argv:
try:
scheduler.delete_scheduled_task('PerfMonMQTT')
MessageBox(None, 'Scheduled task has been deleted', 'Info', 0)
except Exception as e:
MessageBox(None, 'Scheduled task deletion failed!\n\n' + str(e), 'Info', 0)
else:
MessageBox(None, 'No args specified!\n\n[WITH ADMING RIGHTS]\nUsage: python monitor.py install', 'Info', 0)
pass