-
Notifications
You must be signed in to change notification settings - Fork 74
/
link_rate.py
172 lines (149 loc) · 7.22 KB
/
link_rate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# coding=utf-8
from collections import namedtuple
from copy import deepcopy
from os import listdir, path
from random import randint
from re import match
from six import iteritems
from netutils_linux_monitoring.base_top import BaseTop
from netutils_linux_monitoring.colors import Color
from netutils_linux_monitoring.layout import make_table
from netutils_linux_monitoring.pci import PCI
# One monitored counter: `filename` is the file name read from
# /sys/class/net/<dev>/statistics/, `shortname` is the column title shown for it.
Stat = namedtuple('Stat', 'filename shortname')
class LinkRateTop(BaseTop):
    """ Utility for monitoring network devices' pps and error rate """

    # Default set of counters (and their column titles). This list lives on the
    # class and is shared by all instances, so it must never be edited in place;
    # per-instance variations are bound to `self.stats` instead.
    stats = [
        Stat('rx_packets', 'rx-packets'),
        Stat('rx_bytes', 'rx-bytes'),
        Stat('rx_errors', 'rx-errors'),
        Stat('rx_dropped', 'dropped'),
        Stat('rx_missed_errors', 'missed'),
        Stat('rx_fifo_errors', 'fifo'),
        Stat('rx_length_errors', 'length'),
        Stat('rx_over_errors', 'overrun'),
        Stat('rx_crc_errors', 'crc'),
        Stat('rx_frame_errors', 'frame'),
        Stat('tx_packets', 'tx-packets'),
        Stat('tx_bytes', 'tx-bytes'),
        Stat('tx_errors', 'tx-errors')
    ]
    # Column alignment passed to make_table(); filled in by post_optparse().
    align_map = None

    def __init__(self, pci=None):
        """ Initializes the base top and remembers an optional PCI helper (created lazily if None) """
        BaseTop.__init__(self)
        self.pci = pci

    def make_parser(self, parser=None):
        """ Adds link-rate specific options to `parser` (or to a fresh base parser) and returns it """
        if not parser:
            parser = BaseTop.make_base_parser()
        # NOTE(review): no action='store_true' here, so --assert expects a value
        # on the command line; left untouched to keep the CLI backward-compatible.
        parser.add_argument('--assert', '--assert-mode', default=False, dest='assert_mode',
                            help='Stops running after errors detected.')
        parser.add_argument('--dev', '--devices', default='', dest='devices',
                            help='Comma-separated list of devices to monitor.')
        parser.add_argument('--device-regex', default='^.*$',
                            help='Regex-mask for devices to monitor.')
        parser.add_argument('-s', '--simple', default=False, dest='simple_mode', action='store_true',
                            help='Hides different kinds of error, showing only general counters.')
        parser.add_argument('--rx', '--rx-only', dest='rx_only', default=False, action='store_true',
                            help='Hides tx-counters')
        parser.add_argument('--bits', default=False, action='store_true')
        parser.add_argument('--bytes', default=False, action='store_true')
        parser.add_argument('--kbits', default=False, action='store_true')
        # default=True: megabits is the unit used when no other unit flag is given.
        parser.add_argument('--mbits', default=True, action='store_true')
        return parser

    def parse(self):
        """ Returns {device: {Stat: value}} for every device in self.options.devices """
        return dict((dev, self.__parse_dev__(dev)) for dev in self.options.devices)

    def eval(self):
        """ Fills self.diff with current-minus-previous values (or random numbers in --random mode) """
        self.diff = deepcopy(self.current)
        for dev, data in iteritems(self.current):
            for stat in self.stats:
                if self.options.random:
                    self.diff[dev][stat] = randint(0, 10000)
                else:
                    self.diff[dev][stat] = data[stat] - \
                        self.previous[dev][stat]

    def make_header(self):
        """ Returns the table header: 'Device' followed by the short name of every stat """
        return ['Device'] + [stat.shortname for stat in self.stats]

    def colorize_stat(self, stat, value):
        """ Colorizes values of error/drop counters; other values are returned unchanged """
        return self.color.colorize(value, 1, 1) if 'errors' in stat.filename or 'dropped' in stat.filename else value

    def colorize_stats(self, dev, repr_source):
        """ Returns the (possibly colorized) values of all stats of `dev` taken from `repr_source` """
        return [self.colorize_stat(stat, repr_source[dev][stat]) for stat in self.stats]

    def make_rows(self):
        """ Yields one table row per monitored device: wrapped device name followed by its stats """
        # A PCI helper handed to __init__ may come without its device map.
        if not self.pci.devices:
            self.pci.devices = self.pci.node_dev_dict(self.options.devices, self.options.random)
        repr_source = self.repr_source()
        for dev in self.options.devices:
            dev_node = self.pci.devices.get(dev)
            dev_color = self.color.COLORS_NODE.get(dev_node) if self.options.color else ""
            _dev = self.color.wrap(dev, dev_color)
            yield [_dev] + self.colorize_stats(dev, repr_source)

    def __repr__(self):
        """ Builds the table from header, alignment map and rows and renders it """
        table = make_table(self.make_header(), self.align_map, list(self.make_rows()))
        return self.__repr_table__(table)

    def __parse_dev__(self, dev):
        """ Returns {Stat: value} for a single device """
        return dict((stat, self.__parse_dev_stat__(dev, stat)) for stat in self.stats)

    def __parse_dev_stat__(self, dev, stat):
        """ Reads one counter of `dev` from sysfs (random number in --random mode).

        Counters whose file name contains 'bytes' are converted to the selected unit
        and truncated to int.
        """
        if self.options.random:
            return randint(1, 10000)
        filename = '/sys/class/net/{0}/statistics/{1}'.format(dev, stat.filename)
        with open(filename) as devfile:
            file_value = int(devfile.read().strip())
        if 'bytes' in stat.filename:
            return int(self.__repr_bytes(file_value))
        return file_value

    def __repr_bytes(self, value):
        """ Converts a byte counter to the unit chosen by --bytes/--bits/--kbits (megabits otherwise) """
        if self.options.bytes:
            return value
        if self.options.bits:
            return value * 8
        elif self.options.kbits:
            return value * 8 / 1024
        return value * 8 / 1024 / 1024

    @staticmethod
    def __indent__(column, value, maxvalue=0):
        """ May be used for special indent for first column """
        return "{0:<14}".format(value) if column == maxvalue else "{0:>11}".format(value)

    def devices_list_validate(self, dev):
        """ True in --random mode or when /sys/class/net/<dev> is a directory """
        return self.options.random or path.isdir('/sys/class/net/{0}'.format(dev))

    def devices_list_regex(self):
        """ Returns list of network devices matching --device-regex """
        net_dev_list = listdir('/sys/class/net/')
        return [dev for dev in net_dev_list if match(self.options.device_regex, dev)]

    def devices_list(self):
        """ Return list of network devices regarding --devices / --device-regex """
        if self.options.devices:
            devices = self.options.devices.split(',')
        else:
            devices = self.devices_list_regex()
        return [dev for dev in devices if self.devices_list_validate(dev)]

    def post_optparse(self):
        """ Asserting and applying parsing options

        Raises ValueError when no device is left after validation.
        """
        self.options.devices = self.devices_list()
        if not self.options.devices:
            raise ValueError("No devices've been specified")
        if self.options.rx_only:
            self.stats = [
                stat for stat in self.stats if stat.filename.startswith('rx')]
        if self.options.simple_mode:
            simple_stats = ('rx_packets', 'rx_bytes', 'rx_errors', 'tx_packets', 'tx_bytes', 'tx_errors')
            self.stats = [
                stat for stat in self.stats if stat.filename in simple_stats]
        self.unit_change()
        self.header = self.make_header()
        self.align_map = ['l'] + ['r'] * (len(self.header) - 1)
        if not self.pci:
            self.pci = PCI()
            self.pci.devices = self.pci.node_dev_dict(self.options.devices, self.options.random)
        self.color = Color(topology=None, enabled=self.options.color)

    def unit_change(self):
        """ Renames the '*-bytes' columns according to the selected unit (bits/kbits/mbits).

        The renamed stats are collected into a new list bound to the instance.
        Assigning into self.stats item by item would, whenever neither --rx-only nor
        --simple replaced it, modify the class-level list shared by every LinkRateTop
        and leak the renamed columns into other instances.
        """
        options = self.options
        if not any([options.bits, options.kbits, options.mbits]):
            return
        # --bytes wins over any other unit flag: column titles stay as they are.
        if options.bytes:
            return
        if options.bits:
            unit = 'bits'
        elif options.kbits:
            unit = 'kbits'
        else:
            # any() above guarantees mbits is set when bits and kbits are not.
            unit = 'mbits'
        self.stats = [
            Stat(stat.filename, stat.shortname.replace('bytes', unit))
            if 'bytes' in stat.shortname else stat
            for stat in self.stats
        ]