Skip to content

Commit

Permalink
[Typing] Initial Ipset typing (aerleon#263)
Browse files Browse the repository at this point in the history
  • Loading branch information
ankenyr committed Mar 14, 2023
1 parent 84436c7 commit 1d41ec1
Showing 1 changed file with 25 additions and 10 deletions.
35 changes: 25 additions & 10 deletions aerleon/lib/ipset.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
"""

import string
from typing import Any, List, Tuple, Union

from aerleon.lib import iptables, nacaddr
from aerleon.lib.nacaddr import IPv4, IPv6


class Error(iptables.Error):
Expand All @@ -41,7 +43,7 @@ class Term(iptables.Term):
_COMMENT_FORMAT = string.Template('-A $filter -m comment --comment "$comment"')
_FILTER_TOP_FORMAT = string.Template('-A $filter')

def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
# This stores tuples of set name and set contents, keyed by direction.
# For example:
Expand All @@ -50,8 +52,17 @@ def __init__(self, *args, **kwargs):
self.addr_sets = {}

def _CalculateAddresses(
self, src_addr_list, src_addr_exclude_list, dst_addr_list, dst_addr_exclude_list
):
self,
src_addr_list: List[Union[IPv4, IPv6]],
src_addr_exclude_list: List[Union[IPv4, IPv6]],
dst_addr_list: List[Union[IPv4, IPv6]],
dst_addr_exclude_list: List[Union[IPv4, IPv6]],
) -> Tuple[
List[Union[IPv4, IPv6]],
List[Union[IPv4, IPv6]],
List[Union[IPv4, IPv6]],
List[Union[IPv4, IPv6]],
]:
"""Calculates source and destination address list for a term.
Since ipset is very efficient at matching large number of
Expand Down Expand Up @@ -90,7 +101,13 @@ def _CalculateAddresses(
)
return (src_addr_list, [], dst_addr_list, [])

def _CalculateAddrList(self, addr_list, addr_exclude_list, target_af, direction):
def _CalculateAddrList(
self,
addr_list: List[Union[IPv4, IPv6]],
addr_exclude_list: List[Any],
target_af: int,
direction: str,
) -> List[Union[IPv4, IPv6]]:
"""Calculates and stores address list for target AF and direction.
Args:
Expand Down Expand Up @@ -119,7 +136,7 @@ def _CalculateAddrList(self, addr_list, addr_exclude_list, target_af, direction)
addr_list = [self._all_ips]
return addr_list

def _GenerateAddressStatement(self, src_addr, dst_addr):
def _GenerateAddressStatement(self, src_addr: IPv4, dst_addr: IPv4) -> Tuple[str, str]:
"""Returns the address section of an individual iptables rule.
See _CalculateAddresses documentation. Three cases are possible here,
Expand Down Expand Up @@ -157,7 +174,7 @@ def _GenerateAddressStatement(self, src_addr, dst_addr):
dst_addr_stmt = '-d %s/%d' % (dst_addr.network_address, dst_addr.prefixlen)
return (src_addr_stmt, dst_addr_stmt)

def _GenerateSetName(self, term_name, suffix):
def _GenerateSetName(self, term_name: str, suffix: str) -> str:
if self.af == 'inet6':
suffix += '-v6'
if len(term_name) + len(suffix) + 1 > self._SET_MAX_LENGTH:
Expand All @@ -177,9 +194,7 @@ class Ipset(iptables.Iptables):
_MARKER_END = '# end:ipset-rules'
_GOOD_OPTIONS = ['nostate', 'abbreviateterms', 'truncateterms', 'noverbose', 'exists']

# TODO(vklimovs): some not trivial processing is happening inside this
# __str__, replace with explicit method
def __str__(self):
def __str__(self) -> str:
# Actual rendering happens in __str__, so it has to be called
# before we do set specific part.
iptables_output = super().__str__()
Expand All @@ -192,7 +207,7 @@ def __str__(self):
output.append(iptables_output)
return '\n'.join(output)

def _GenerateSetConfig(self, term):
def _GenerateSetConfig(self, term: Term) -> List[str]:
"""Generates set configuration for supplied term.
Args:
Expand Down

0 comments on commit 1d41ec1

Please sign in to comment.