diff --git a/aerleon/lib/arista.py b/aerleon/lib/arista.py index 27b81366..0b6b262f 100644 --- a/aerleon/lib/arista.py +++ b/aerleon/lib/arista.py @@ -16,6 +16,8 @@ """Arista generator.""" +from typing import List + from aerleon.lib import cisco @@ -41,7 +43,7 @@ class Arista(cisco.Cisco): _PROTO_INT = False # Arista omits the "extended" access-list argument. - def _AppendTargetByFilterType(self, filter_name, filter_type): + def _AppendTargetByFilterType(self, filter_name: str, filter_type: str) -> List[str]: """Takes in the filter name and type and appends headers. Args: diff --git a/aerleon/lib/cisco.py b/aerleon/lib/cisco.py index 7f1e30b5..777b806f 100644 --- a/aerleon/lib/cisco.py +++ b/aerleon/lib/cisco.py @@ -17,11 +17,14 @@ """Cisco generator.""" import ipaddress -from typing import Union, cast +from typing import Any, Dict, List, Optional, Set, Tuple, Union, cast from absl import logging from aerleon.lib import aclgenerator, addressbook, nacaddr, summarizer +from aerleon.lib.nacaddr import IPv4, IPv6 +from aerleon.lib.policy import Policy, Term +from aerleon.lib.summarizer import DSMNet _ACTION_TABLE = { 'accept': 'permit', @@ -62,7 +65,9 @@ class ExtendedACLTermError(Error): class TermStandard: """A single standard ACL Term.""" - def __init__(self, term, filter_name, platform='cisco', verbose=True): + def __init__( + self, term: Term, filter_name: str, platform: str = 'cisco', verbose: bool = True + ) -> None: self.term = term self.filter_name = filter_name self.platform = platform @@ -105,7 +110,7 @@ def __init__(self, term, filter_name, platform='cisco', verbose=True): ) self.dscpstring = ' dscp' + self.term.dscp_match - def __str__(self): + def __str__(self) -> str: ret_str = [] # Term verbatim output - this will skip over normal term creation @@ -174,22 +179,22 @@ class ObjectGroup: exit """ - def __init__(self): + def __init__(self) -> None: self.filter_name = '' self.addressbook = addressbook.Addressbook() self.terms = [] - def AddTerm(self, term): + def AddTerm(self, term: Term) -> None: if term.source_address: self.addressbook.AddAddresses('', term.source_address) if term.destination_address: self.addressbook.AddAddresses('', term.destination_address) self.terms.append(term) - def AddName(self, filter_name): + def AddName(self, filter_name: str) -> None: self.filter_name = filter_name - def __str__(self): + def __str__(self) -> str: ret_str = ['\n'] # netgroups will contain two-tuples of group name string and family int. netgroups = set() @@ -414,14 +419,14 @@ class Term(aclgenerator.Term): def __init__( self, - term, - af=4, - proto_int=True, - enable_dsmo=False, - term_remark=True, - platform='cisco', - verbose=True, - ): + term: Term, + af: int = 4, + proto_int: bool = True, + enable_dsmo: bool = False, + term_remark: bool = True, + platform: str = 'cisco', + verbose: bool = True, + ) -> None: self.term = term self.proto_int = proto_int self.options = [] @@ -437,7 +442,7 @@ def __init__( else: self.text_af = 'inet6' - def __str__(self): + def __str__(self) -> str: ret_str = ['\n'] # Don't render icmpv6 protocol terms under inet, or icmp under inet6 @@ -658,7 +663,7 @@ def __str__(self): return '\n'.join(ret_str) - def _GetIpString(self, addr): + def _GetIpString(self, addr: Union[IPv6, IPv4, DSMNet]) -> str: """Formats the address object for printing in the ACL. Args: @@ -685,15 +690,15 @@ def _GetIpString(self, addr): return 'host %s' % (addr.network_address) return addr - def _FormatPort(self, port, proto): + def _FormatPort(self, port: Union[Tuple[()], Tuple[int, int]], proto: Union[int, str]) -> str: """Returns a formatted port string for the range. Args: - port: str list or none, the port range. - proto: str representing proto (tcp, udp, etc). + port: The port range represented as a tuple.. + proto: String or int representing a protocol. Returns: - A string suitable for the ACL. + A string used to filter a single or range of ports.. """ if not port: return '' @@ -707,7 +712,9 @@ def _FormatPort(self, port, proto): return 'range %s %s' % (port0, port1) return 'eq %s' % (port0) - def _FixOptions(self, proto, option): + def _FixOptions( + self, proto: Union[int, str], option: List[Union[str, Any]] + ) -> List[Union[str, Any]]: """Returns a set of options suitable for the given protocol. Fix done: @@ -727,8 +734,17 @@ def _FixOptions(self, proto, option): return sane_options def _TermletToStr( - self, action, proto, saddr, sport, daddr, dport, icmp_type, icmp_code, option - ): + self, + action: str, + proto: Union[int, str], + saddr: str, + sport: str, + daddr: str, + dport: str, + icmp_type: Union[int, str], + icmp_code: Union[int, str], + option: List[str], + ) -> List[str]: """Take the various compenents and turn them into a cisco acl line. Args: @@ -740,7 +756,7 @@ def _TermletToStr( dport: str, the destination port icmp_type: icmp-type numeric specification (if any) icmp_code: icmp-code numeric specification (if any) - option: list or none, optional, eg. 'logging' tokens. + option: list of options, eg. 'logging' tokens. Returns: string of the cisco acl line, suitable for printing. @@ -765,7 +781,7 @@ def _TermletToStr( non_empty_elements = [x for x in all_elements if x] return [' ' + ' '.join(non_empty_elements)] - def _FixConsecutivePorts(self, port_list): + def _FixConsecutivePorts(self, port_list: List[Tuple[int, int]]) -> List[Tuple[int, int]]: """Takes a list of tuples and expands the tuple if the range is two. http://www.cisco.com/warp/public/cc/pd/si/casi/ca6000/tech/65acl_wp.pdf @@ -805,7 +821,7 @@ class ObjectGroupTerm(Term): in the acl. """ - def _FormatPort(self, port, proto): + def _FormatPort(self, port: Union[Tuple[()], Tuple[int, int]], proto: str) -> str: """Returns a formatted port string for the range. Args: @@ -819,7 +835,7 @@ def _FormatPort(self, port, proto): return '' return f'port-group {port[0]}-{port[1]}' - def _GetIpString(self, addr): + def _GetIpString(self, addr: Union[IPv4, IPv6]) -> str: """Formats the address object for printing in the ACL. Args: @@ -842,7 +858,7 @@ class Cisco(aclgenerator.ACLGenerator): _PROTO_INT = True _TERM_REMARK = True - def _BuildTokens(self): + def _BuildTokens(self) -> Tuple[Set[str], Dict[str, Set[str]]]: """Build supported tokens for platform. Returns: @@ -869,7 +885,7 @@ def _BuildTokens(self): ) return supported_tokens, supported_sub_tokens - def _TranslatePolicy(self, pol, exp_info): + def _TranslatePolicy(self, pol: Policy, exp_info: int) -> None: self.cisco_policies = [] # a mixed filter outputs both ipv4 and ipv6 acls in the same output file good_filters = ['extended', 'standard', 'object-group', 'inet6', 'mixed', 'enable_dsmo'] @@ -978,11 +994,11 @@ def _TranslatePolicy(self, pol, exp_info): (header, filter_name, [next_filter], new_terms, obj_target) ) - def _GetObjectGroupTerm(self, term, verbose=True): + def _GetObjectGroupTerm(self, term: Term, verbose: bool = True) -> ObjectGroupTerm: """Returns an ObjectGroupTerm object.""" return ObjectGroupTerm(term, verbose=verbose) - def _AppendTargetByFilterType(self, filter_name, filter_type): + def _AppendTargetByFilterType(self, filter_name: str, filter_type: str) -> List[str]: """Takes in the filter name and type and appends headers. Args: @@ -1017,7 +1033,9 @@ def _AppendTargetByFilterType(self, filter_name, filter_type): ) return target - def _RepositoryTagsHelper(self, target=None, filter_type='', filter_name=''): + def _RepositoryTagsHelper( + self, target: Optional[List[str]] = None, filter_type: str = '', filter_name: str = '' + ) -> List[str]: if target is None: target = [] if filter_type == 'standard' and filter_name.isdigit(): @@ -1030,7 +1048,7 @@ def _RepositoryTagsHelper(self, target=None, filter_type='', filter_name=''): target.extend(aclgenerator.AddRepositoryTags(' remark ', date=False, revision=False)) return target - def __str__(self): + def __str__(self) -> str: target_header = [] target = [] # add the p4 tags diff --git a/aerleon/lib/ciscoasa.py b/aerleon/lib/ciscoasa.py index 813818c8..10f2237a 100644 --- a/aerleon/lib/ciscoasa.py +++ b/aerleon/lib/ciscoasa.py @@ -18,11 +18,13 @@ import ipaddress import re -from typing import cast +from typing import Any, Dict, List, Set, Tuple, Union, cast from absl import logging from aerleon.lib import aclgenerator, cisco, nacaddr, summarizer +from aerleon.lib.nacaddr import IPv4, IPv6 +from aerleon.lib.policy import Policy, Term _ACTION_TABLE = { 'accept': 'permit', @@ -53,7 +55,9 @@ class NoCiscoPolicyError(Error): class Term(cisco.ExtendedTerm): """A single ACL Term.""" - def __init__(self, term, filter_name, af=4, enable_dsmo=False): + def __init__( + self, term: Term, filter_name: str, af: int = 4, enable_dsmo: bool = False + ) -> None: self.term = term self.filter_name = filter_name self.options = [] @@ -61,7 +65,7 @@ def __init__(self, term, filter_name, af=4, enable_dsmo=False): self.af = af self.enable_dsmo = enable_dsmo - def __str__(self): + def __str__(self) -> str: ret_str = ['\n'] # Don't render icmpv6 protocol terms under inet, or icmp under inet6 @@ -193,8 +197,17 @@ def __str__(self): return '\n'.join(ret_str) def _TermletToStr( - self, filter_name, action, proto, saddr, sport, daddr, dport, icmp_type, option - ): + self, + filter_name: str, + action: str, + proto: str, + saddr: Union[str, IPv4, IPv6], + sport: Union[Tuple[()], Tuple[int, int]], + daddr: Union[str, IPv4, IPv6], + dport: Union[Tuple[()], Tuple[int, int]], + icmp_type: str, + option: List[str], + ) -> List[str]: """Take the various compenents and turn them into a cisco acl line. Args: @@ -206,7 +219,7 @@ def _TermletToStr( daddr: str or ipaddress, the destination address dport: str list or none, the destination port icmp_type: icmp-type numeric specification (if any) - option: list or none, optional, eg. 'logging' tokens. + option: list of options, eg. 'logging' tokens. Returns: string of the cisco acl line, suitable for printing. @@ -304,7 +317,7 @@ class CiscoASA(aclgenerator.ACLGenerator): _DEFAULT_PROTOCOL = 'ip' SUFFIX = '.asa' - def _BuildTokens(self): + def _BuildTokens(self) -> Tuple[Set[str], Dict[str, Set[str]]]: """Build supported tokens for platform. Returns: @@ -324,7 +337,7 @@ def _BuildTokens(self): ) return supported_tokens, supported_sub_tokens - def _TranslatePolicy(self, pol, exp_info): + def _TranslatePolicy(self, pol: Policy, exp_info: int) -> None: self.ciscoasa_policies = [] for header, terms in self.policy.filters: filter_name = header.FilterName(self._PLATFORM) @@ -338,7 +351,7 @@ def _TranslatePolicy(self, pol, exp_info): self.ciscoasa_policies.append((header, filter_name, new_terms)) - def __str__(self): + def __str__(self) -> str: target = [] for (header, filter_name, terms) in self.ciscoasa_policies: diff --git a/aerleon/lib/cisconx.py b/aerleon/lib/cisconx.py index 7cdf44ee..b26d49c3 100644 --- a/aerleon/lib/cisconx.py +++ b/aerleon/lib/cisconx.py @@ -15,6 +15,8 @@ # """CiscoNX generator.""" +from typing import List, Optional + from aerleon.lib import aclgenerator, cisco @@ -38,14 +40,16 @@ class CiscoNX(cisco.Cisco): # Protocols should be emitted as they were in the policy (names). _PROTO_INT = False - def _RepositoryTagsHelper(self, target=None, filter_type='', filter_name=''): + def _RepositoryTagsHelper( + self, target: Optional[List[str]] = None, filter_type: str = '', filter_name: str = '' + ) -> List[str]: if target is None: target = [] target.extend(aclgenerator.AddRepositoryTags(' remark ', rid=False, wrap=True)) return target # CiscoNX omits the "extended" access-list argument. - def _AppendTargetByFilterType(self, filter_name, filter_type): + def _AppendTargetByFilterType(self, filter_name: str, filter_type: str) -> List[str]: """Takes in the filter name and type and appends headers. Args: diff --git a/aerleon/lib/ciscoxr.py b/aerleon/lib/ciscoxr.py index 461be9ef..8ee65567 100644 --- a/aerleon/lib/ciscoxr.py +++ b/aerleon/lib/ciscoxr.py @@ -15,8 +15,12 @@ # """Cisco IOS-XR filter renderer.""" +from __future__ import annotations + +from typing import Dict, List, Set, Tuple from aerleon.lib import cisco +from aerleon.lib.policy import Term class CiscoXR(cisco.Cisco): @@ -27,7 +31,7 @@ class CiscoXR(cisco.Cisco): SUFFIX = '.xacl' _PROTO_INT = False - def _AppendTargetByFilterType(self, filter_name, filter_type): + def _AppendTargetByFilterType(self, filter_name: str, filter_type: str) -> List[str]: """Takes in the filter name and type and appends headers. Args: @@ -46,7 +50,7 @@ def _AppendTargetByFilterType(self, filter_name, filter_type): target.append('ipv4 access-list %s' % filter_name) return target - def _BuildTokens(self): + def _BuildTokens(self) -> Tuple[Set[str], Dict[str, Set[str]]]: """Build supported tokens for platform. Returns: @@ -58,7 +62,7 @@ def _BuildTokens(self): return supported_tokens, supported_sub_tokens - def _GetObjectGroupTerm(self, term, verbose=True): + def _GetObjectGroupTerm(self, term: Term, verbose: bool = True) -> CiscoXRObjectGroupTerm: """Returns an ObjectGroupTerm object.""" return CiscoXRObjectGroupTerm(term, platform=self._PLATFORM, verbose=verbose)