Skip to content

Commit

Permalink
[Typing] Initial Pcap typing (aerleon#269)
Browse files Browse the repository at this point in the history
* Initial pcap typing.

* .

* Fixing some typing in pcap.py

* Reformatted pcap.py.

* Removing more Any usage in typing.

* Formatting.
  • Loading branch information
ankenyr committed Mar 16, 2023
1 parent 5982440 commit cc4ece5
Showing 1 changed file with 21 additions and 13 deletions.
34 changes: 21 additions & 13 deletions aerleon/lib/pcap.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@
"""


from typing import Dict, List, Set, Tuple, Union

from absl import logging

from aerleon.lib import aclgenerator
from aerleon.lib.nacaddr import IPv4, IPv6
from aerleon.lib.policy import Policy, Term


class Error(aclgenerator.Error):
Expand Down Expand Up @@ -86,7 +90,9 @@ class Term(aclgenerator.Term):
'hopopt': 'ip6 protochain 0',
}

def __init__(self, term, filter_name, af='inet', direction=''):
def __init__(
self, term: Term, filter_name: str, af: str = 'inet', direction: str = ''
) -> None:
"""Setup a new term.
Args:
Expand All @@ -106,7 +112,7 @@ def __init__(self, term, filter_name, af='inet', direction=''):
self.af = af
self.direction = direction

def __str__(self):
def __str__(self) -> str:
"""Render config output from this term object."""
conditions = []

Expand Down Expand Up @@ -192,7 +198,7 @@ def __str__(self):

return cond + '\n'

def _CheckAddressAf(self, addrs):
def _CheckAddressAf(self, addrs: List[Union[IPv4, IPv6]]) -> List[Union[str, IPv4, IPv6]]:
"""Verify that the requested address-family matches the address's family."""
if not addrs:
return ['any']
Expand All @@ -206,7 +212,7 @@ def _CheckAddressAf(self, addrs):
return af_addrs

@staticmethod
def JoinConditionals(condition_list, operator):
def JoinConditionals(condition_list: List[str], operator: str) -> str:
"""Join conditionals using the specified operator.
Filters out empty elements and blank strings.
Expand All @@ -228,7 +234,9 @@ def JoinConditionals(condition_list, operator):
res = '(%s)' % (op.join(condition_list))
return res

def _GenerateAddrStatement(self, addrs, exclude_addrs):
def _GenerateAddrStatement(
self, addrs: List[Union[str, IPv6, IPv4]], exclude_addrs: List[Union[str, IPv6, IPv4]]
) -> str:
addrlist = []
for d in addrs:
if d != 'any' and str(d) != '::/0':
Expand All @@ -251,10 +259,10 @@ def _GenerateAddrStatement(self, addrs, exclude_addrs):
else:
return Term.JoinConditionals(addrlist, 'or')

def _GenerateProtoStatement(self, protocols):
def _GenerateProtoStatement(self, protocols: List[str]) -> str:
return Term.JoinConditionals([self._PROTO_TABLE[p] for p in protocols], 'or')

def _GeneratePortStatement(self, ports, direction):
def _GeneratePortStatement(self, ports: List[Tuple[int, int]], direction: str) -> str:
conditions = []
# term.destination_port is a list of tuples containing the start and end
# ports of the port range. In the event it is a single port, the start
Expand All @@ -266,7 +274,7 @@ def _GeneratePortStatement(self, ports, direction):
conditions.append('%s portrange %s-%s' % (direction, port_tuple[0], port_tuple[1]))
return Term.JoinConditionals(conditions, 'or')

def _GenerateTcpOptions(self, options):
def _GenerateTcpOptions(self, options: List[str]) -> str:
opts = [str(x) for x in options]
tcp_flags_set = []
tcp_flags_check = []
Expand All @@ -289,7 +297,7 @@ def _GenerateTcpOptions(self, options):
)
return ''

def _GenerateIcmpType(self, icmp_types, icmp_code):
def _GenerateIcmpType(self, icmp_types: List[int], icmp_code: List[int]) -> str:
rtr_str = ''
if icmp_types:
code_strings = ['']
Expand All @@ -314,7 +322,7 @@ class PcapFilter(aclgenerator.ACLGenerator):
SUFFIX = '.pcap'
_TERM = Term

def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs) -> None:
"""Initialize a PcapFilter generator.
Takes standard ACLGenerator arguments, as well as an 'invert' kwarg. If
Expand All @@ -332,7 +340,7 @@ def __init__(self, *args, **kwargs):
del kwargs['invert']
super().__init__(*args, **kwargs)

def _BuildTokens(self):
def _BuildTokens(self) -> Tuple[Set[str], Dict[str, Set[str]]]:
"""Build supported tokens for platform.
Returns:
Expand Down Expand Up @@ -363,7 +371,7 @@ def _BuildTokens(self):

return supported_tokens, supported_sub_tokens

def _TranslatePolicy(self, pol, exp_info):
def _TranslatePolicy(self, pol: Policy, exp_info: int) -> None:
self.pcap_policies = []
good_afs = ['inet', 'inet6', 'mixed']
good_options = ['in', 'out']
Expand Down Expand Up @@ -430,7 +438,7 @@ def _TranslatePolicy(self, pol, exp_info):

self.pcap_policies.append((header, filter_name, filter_type, accept_terms, deny_terms))

def __str__(self):
def __str__(self) -> str:
"""Render the output of the PF policy into config."""
target = []

Expand Down

0 comments on commit cc4ece5

Please sign in to comment.