-
-
Notifications
You must be signed in to change notification settings - Fork 45
/
lqTools.py
executable file
·156 lines (144 loc) · 5.29 KB
/
lqTools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#!/usr/bin/python3
import csv
import io
import ipaddress
import json
import os
import os.path
import subprocess
import warnings
import argparse
import logging
from ispConfig import interfaceA, interfaceB, enableActualShellCommands, upstreamBandwidthCapacityDownloadMbps, upstreamBandwidthCapacityUploadMbps, generatedPNDownloadMbps, generatedPNUploadMbps
def shell(command):
    """Run a space-separated command line and echo its standard output.

    When ``enableActualShellCommands`` is true the command is logged with
    ``logging.info``, executed directly (no shell), and each line of its
    standard output is printed. Otherwise nothing is executed and the
    command text itself is printed (dry run).

    Args:
        command: Command line as a single string. It is split on single
            spaces to build the argument vector, so individual arguments
            cannot contain spaces.
    """
    if not enableActualShellCommands:
        print(command)
        return
    logging.info(command)
    argv = command.split(' ')
    # The context manager closes the pipe and waits for the child on exit,
    # so no zombie process or open file descriptor is left behind.
    with subprocess.Popen(argv, stdout=subprocess.PIPE, encoding="utf-8") as proc:
        for line in proc.stdout:
            # Each line keeps its own newline, so print() emits a blank line
            # after it; this matches the historical output format.
            print(line)
def safeShell(command):
    """Run a space-separated command line and report whether it ran cleanly.

    When ``enableActualShellCommands`` is true the command is executed
    directly (no shell); every output line is printed and scanned for the
    kernel/netlink failure markers. Otherwise nothing is executed, the
    command text is printed, and the call counts as successful.

    Args:
        command: Command line as a single string. It is split on single
            spaces to build the argument vector, so individual arguments
            cannot contain spaces.

    Returns:
        False if any output line contains "RTNETLINK answers" or
        "We have an error talking to the kernel"; True otherwise.
    """
    if not enableActualShellCommands:
        print(command)
        return True
    failureMarkers = ("RTNETLINK answers", "We have an error talking to the kernel")
    safelyRan = True
    argv = command.split(' ')
    # stderr is merged into the scanned stream: previously only stdout was
    # inspected, so a marker written to stderr could never be seen.
    # NOTE(review): iproute2 is understood to emit these diagnostics on
    # stderr - confirm; merging keeps detection working on either stream.
    # The context manager also closes the pipe and reaps the child.
    with subprocess.Popen(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        encoding="utf-8",
    ) as proc:
        for line in proc.stdout:
            print(line)
            if any(marker in line for marker in failureMarkers):
                safelyRan = False
    return safelyRan
def getQdiscForIPaddress(ipAddress):
    """Look up the tc handle recorded for an IP address.

    Reads ``statsByCircuit.json`` from the current working directory and
    scans every device of every circuit for the address.

    Args:
        ipAddress: Address string to look for. IPv4 entries in the file are
            compared with any '/32' removed; IPv6 entries are compared both
            verbatim and with any '/128' removed.

    Returns:
        The handle stored on the last circuit that lists the address, or
        None when no circuit lists it.
    """
    with open('statsByCircuit.json', 'r') as statsFile:
        subscriberCircuits = json.load(statsFile)

    qDiscID = None
    for circuit in subscriberCircuits:
        for device in circuit['devices']:
            ipv4Hosts = [ipv4.replace('/32', '') for ipv4 in device['ipv4s']]
            if ipAddress in ipv4Hosts:
                qDiscID = circuit['classid']
            for ipv6 in device['ipv6s']:
                # Accept host-prefixed entries the same way the IPv4 branch
                # does, while still honouring an exact textual match.
                if ipAddress in (ipv6, ipv6.replace('/128', '')):
                    # NOTE(review): the IPv4 branch reads 'classid' but this
                    # branch historically read 'qdisc'. 'qdisc' still wins
                    # when present; otherwise fall back to 'classid' instead
                    # of raising KeyError. Confirm which key the writer of
                    # statsByCircuit.json actually emits.
                    qDiscID = circuit['qdisc'] if 'qdisc' in circuit else circuit['classid']
    return qDiscID
def printStatsFromIP(ipAddress):
    """Print ``tc -s qdisc show`` output for the queue serving an IP address.

    The handle is resolved with ``getQdiscForIPaddress``; for each of
    ``interfaceA`` and ``interfaceB`` the command
    ``tc -s qdisc show dev <interface> parent <handle>`` is run and its
    standard output printed line by line.

    Args:
        ipAddress: Address string to look up.

    Prints "Invalid IP address provided" when no handle is found.
    """
    qDiscID = getQdiscForIPaddress(ipAddress)
    if qDiscID is None:
        print("Invalid IP address provided")
        return
    for interface in (interfaceA, interfaceB):
        # Build argv directly rather than concatenating and re-splitting a
        # string, so each value always stays a single argument.
        argv = ['tc', '-s', 'qdisc', 'show', 'dev', interface, 'parent', qDiscID]
        # The context manager closes the pipe and reaps the child process.
        with subprocess.Popen(argv, stdout=subprocess.PIPE, encoding="utf-8") as proc:
            for line in proc.stdout:
                print(line.replace('\n', ''))
def _htbField(line, keyword):
    """Return the token that follows ``keyword`` in a tc class line, or ''."""
    _, found, remainder = line.partition(' ' + keyword + ' ')
    if not found:
        return ''
    tokens = remainder.split()
    return tokens[0] if tokens else ''


def printCircuitClassInfo(ipAddress):
    """Print the HTB rate/ceil and burst settings of the class serving an IP.

    The handle is resolved with ``getQdiscForIPaddress``. For each of
    ``interfaceA`` and ``interfaceB`` the command
    ``tc class show dev <interface> classid <handle>`` is run and every
    output line containing "htb" is parsed: values seen on ``interfaceA``
    are reported as download (and supply burst/cburst), values seen on the
    other interface are reported as upload.

    Args:
        ipAddress: Address string to look up.

    When no handle is found, a message is printed that includes the default
    limit: the smaller of the upstream capacity and the generated parent
    node bandwidth, for download and upload respectively.
    """
    qDiscID = getQdiscForIPaddress(ipAddress)
    if qDiscID is None:
        download = min(upstreamBandwidthCapacityDownloadMbps, generatedPNDownloadMbps)
        upload = min(upstreamBandwidthCapacityUploadMbps, generatedPNUploadMbps)
        bwString = str(download) + '/' + str(upload)
        print("Invalid IP address provided (default queue limit is " + bwString + " Mbps)")
        return

    print("IP: " + ipAddress + " | Class ID: " + qDiscID)
    print()
    downloadMin = downloadMax = ''
    uploadMin = uploadMax = ''
    burst = cburst = ''
    for interface in (interfaceA, interfaceB):
        argv = ['tc', 'class', 'show', 'dev', interface, 'classid', qDiscID]
        # The context manager closes the pipe and reaps the child process.
        with subprocess.Popen(argv, stdout=subprocess.PIPE, encoding="utf-8") as proc:
            for line in proc.stdout:
                if "htb" not in line:
                    continue
                # A field missing from the line yields '' rather than raising
                # IndexError, matching the blank defaults above.
                if interface == interfaceA:
                    downloadMin = _htbField(line, 'rate')
                    downloadMax = _htbField(line, 'ceil')
                    burst = _htbField(line, 'burst')
                    cburst = _htbField(line, 'cburst')
                else:
                    uploadMin = _htbField(line, 'rate')
                    uploadMax = _htbField(line, 'ceil')
    print("Download rate/ceil: " + downloadMin + "/" + downloadMax)
    print("Upload rate/ceil: " + uploadMin + "/" + uploadMax)
    print("burst/cburst: " + burst + "/" + cburst)
def findClassIDForCircuitByIP(data, inputIP, classID):
    """Search a nested node mapping for the circuit that owns an IP address.

    Each value of ``data`` may hold a 'circuits' list and a 'children'
    mapping of the same shape; children are searched recursively. Addresses
    are compared verbatim against each device's 'ipv4s' and 'ipv6s' lists.

    Args:
        data: Mapping of node name to node entry.
        inputIP: Address string to look for.
        classID: Value to return when no circuit matches.

    Returns:
        The 'qdisc' value of the last matching circuit in traversal order,
        or ``classID`` unchanged when nothing matches.
    """
    for entry in data.values():
        if 'circuits' in entry:
            for circuit in entry['circuits']:
                for device in circuit['devices']:
                    # Missing, empty or None address lists are all treated
                    # as "no addresses".
                    ipv4s = device.get('ipv4s') or ()
                    ipv6s = device.get('ipv6s') or ()
                    if inputIP in ipv4s or inputIP in ipv6s:
                        classID = circuit['qdisc']
        # Recurse into the child nodes attached to this node.
        if 'children' in entry:
            classID = findClassIDForCircuitByIP(entry['children'], inputIP, classID)
    return classID
def findClassIDForCircuitByID(data, inputID, classID):
    """Search a nested node mapping for a circuit by its circuit ID.

    Each value of ``data`` may hold a 'circuits' list and a 'children'
    mapping of the same shape; children are searched recursively.

    Args:
        data: Mapping of node name to node entry.
        inputID: Value compared against each circuit's 'circuitID'.
        classID: Value to return when no circuit matches.

    Returns:
        The 'qdisc' value of the last matching circuit in traversal order,
        or ``classID`` unchanged when nothing matches.
    """
    result = classID
    for branch in data.values():
        if 'circuits' in branch:
            hits = [
                candidate['qdisc']
                for candidate in branch['circuits']
                if candidate['circuitID'] == inputID
            ]
            if hits:
                # Later matches override earlier ones.
                result = hits[-1]
        # Descend into the child nodes attached to this node.
        if 'children' in branch:
            result = findClassIDForCircuitByID(branch['children'], inputID, result)
    return result
if __name__ == '__main__':
    # Command-line entry point: each subcommand takes one positional 'ip'
    # argument and is routed to the function that prints the matching report.
    cliParser = argparse.ArgumentParser()
    subcommands = cliParser.add_subparsers(dest='command')
    for subcommandName, helpText in (
        ('show-active-plan-from-ip', "Provide tc class info by IP"),
        ('tc-statistics-from-ip', "Provide tc qdisc stats by IP"),
    ):
        subcommands.add_parser(subcommandName, help=helpText).add_argument('ip', type=str)
    args = cliParser.parse_args()

    handlers = {
        'tc-statistics-from-ip': printStatsFromIP,
        'show-active-plan-from-ip': printCircuitClassInfo,
    }
    handler = handlers.get(args.command)
    if handler is None:
        # No subcommand was given on the command line.
        print("Invalid parameters. Use --help to learn more.")
    else:
        handler(args.ip)