diff --git a/aerleon/lib/cloudarmor.py b/aerleon/lib/cloudarmor.py index 96e3d840..ce68f49e 100644 --- a/aerleon/lib/cloudarmor.py +++ b/aerleon/lib/cloudarmor.py @@ -9,11 +9,18 @@ import copy import json +import sys +from typing import Dict, List, Set, Tuple from absl import logging from aerleon.lib import aclgenerator +from aerleon.lib.policy import Policy +if sys.version_info < (3, 8): + from typing_extensions import TypedDict +else: + from typing import TypedDict # Generic error class class Error(aclgenerator.Error): @@ -28,6 +35,14 @@ class UnsupportedFilterTypeError(Error): """Raised when unsupported filter type (i.e address family) is specified.""" +RuleMatchConfig = TypedDict("RuleMatchConfig", {"srcIpRanges": "list[str]"}) +RuleMatch = TypedDict("RuleMatch", {"config": RuleMatchConfig, "versionedExpr": str}) +PolicyRule = TypedDict( + "PolicyRule", + {"action": str, "description": str, "match": RuleMatch, "preview": bool, "priority": int}, +) + + class Term(aclgenerator.Term): """Generates the Term for CloudArmor.""" @@ -38,16 +53,16 @@ class Term(aclgenerator.Term): _MAX_TERM_COMMENT_LENGTH = 64 - def __init__(self, term, address_family='inet', verbose=True): + def __init__(self, term, address_family='inet', verbose=True) -> None: super().__init__(term) self.term = term self.address_family = address_family self.verbose = verbose - def __str__(self): + def __str__(self) -> str: return '' - def ConvertToDict(self, priority_index): + def ConvertToDict(self, priority_index) -> List[PolicyRule]: """Converts term to dictionary representation of CloudArmor's JSON format. Takes all of the attributes associated with a term (match, action, etc) and @@ -58,15 +73,15 @@ def ConvertToDict(self, priority_index): Args: priority_index: An integer priority value assigned to the term. In case - the term is split into i sub-terms, the ith sub-term has - priority = priority_index + i + the term is split into i sub-terms, the ith sub-term has + priority = priority_index + i Returns: A list of dicts where each dict is a term Raises: UnsupportedFilterTypeError: Raised when an unsupported filter type is - specified + specified """ term_dict = {} rules = [] @@ -137,14 +152,10 @@ def ConvertToDict(self, priority_index): } rules.append(rule) - # TODO(robankeny@): Review this log entry to make it cleaner/more useful. - # Right now, it prints the entire term which might be huge if len(source_addr_chunks) > 1: logging.debug( - 'Current term [%s] was split into %d sub-terms since ' + f'Current term {self.term.name} was split into {len(source_addr_chunks)} sub-terms since ' '_MAX_IP_RANGES_PER_TERM was exceeded', - str(term_dict), - len(source_addr_chunks), ) return rules @@ -165,7 +176,7 @@ class CloudArmor(aclgenerator.ACLGenerator): # Maps indiviudal filter options to their index positions in the POL header _FILTER_OPTIONS_MAP = {'filter_type': 0} - def _BuildTokens(self): + def _BuildTokens(self) -> Tuple[Set[str], Dict[str, Set[str]]]: """Build supported tokens for platform. Returns: @@ -190,7 +201,7 @@ def _BuildTokens(self): supported_sub_tokens = {'action': {'accept', 'deny'}} return supported_tokens, supported_sub_tokens - def _TranslatePolicy(self, pol, exp_info): + def _TranslatePolicy(self, pol: Policy, exp_info: int) -> None: """Translates a Aerleon policy into a CloudArmor-specific data structure. Takes in a POL file, parses each term and populates the cloudarmor_policies @@ -201,15 +212,12 @@ def _TranslatePolicy(self, pol, exp_info): pol: A Policy() object representing a given POL file. exp_info: An int that specifies number of weeks till policy expiry. - Returns: - N.A. - Raises: ExceededMaxTermsError: Raised when the number of terms in a policy exceed - _MAX_RULES_PER_POLICY. + _MAX_RULES_PER_POLICY. UnsupportedFilterTypeError: Raised when an unsupported filter type is - specified + specified """ self.cloudarmor_policies = [] @@ -261,7 +269,7 @@ def _TranslatePolicy(self, pol, exp_info): self._MAX_RULES_PER_POLICY, ) - def __str__(self): + def __str__(self) -> str: """Return the JSON blob for CloudArmor.""" out = '%s\n\n' % (