Skip to content

Commit

Permalink
Initial Cloudarmor typing. (aerleon#260)
Browse files Browse the repository at this point in the history
  • Loading branch information
ankenyr committed Mar 11, 2023
1 parent f428e20 commit 5db124d
Showing 1 changed file with 27 additions and 19 deletions.
46 changes: 27 additions & 19 deletions aerleon/lib/cloudarmor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,18 @@

import copy
import json
import sys
from typing import Dict, List, Set, Tuple

from absl import logging

from aerleon.lib import aclgenerator
from aerleon.lib.policy import Policy

if sys.version_info < (3, 8):
from typing_extensions import TypedDict
else:
from typing import TypedDict

# Generic error class
class Error(aclgenerator.Error):
Expand All @@ -28,6 +35,14 @@ class UnsupportedFilterTypeError(Error):
"""Raised when unsupported filter type (i.e address family) is specified."""


RuleMatchConfig = TypedDict("RuleMatchConfig", {"srcIpRanges": "list[str]"})
RuleMatch = TypedDict("RuleMatch", {"config": RuleMatchConfig, "versionedExpr": str})
PolicyRule = TypedDict(
"PolicyRule",
{"action": str, "description": str, "match": RuleMatch, "preview": bool, "priority": int},
)


class Term(aclgenerator.Term):
"""Generates the Term for CloudArmor."""

Expand All @@ -38,16 +53,16 @@ class Term(aclgenerator.Term):

_MAX_TERM_COMMENT_LENGTH = 64

def __init__(self, term, address_family='inet', verbose=True):
def __init__(self, term, address_family='inet', verbose=True) -> None:
super().__init__(term)
self.term = term
self.address_family = address_family
self.verbose = verbose

def __str__(self):
def __str__(self) -> str:
return ''

def ConvertToDict(self, priority_index):
def ConvertToDict(self, priority_index) -> List[PolicyRule]:
"""Converts term to dictionary representation of CloudArmor's JSON format.
Takes all of the attributes associated with a term (match, action, etc) and
Expand All @@ -58,15 +73,15 @@ def ConvertToDict(self, priority_index):
Args:
priority_index: An integer priority value assigned to the term. In case
the term is split into i sub-terms, the ith sub-term has
priority = priority_index + i
the term is split into i sub-terms, the ith sub-term has
priority = priority_index + i
Returns:
A list of dicts where each dict is a term
Raises:
UnsupportedFilterTypeError: Raised when an unsupported filter type is
specified
specified
"""
term_dict = {}
rules = []
Expand Down Expand Up @@ -137,14 +152,10 @@ def ConvertToDict(self, priority_index):
}
rules.append(rule)

# TODO(robankeny@): Review this log entry to make it cleaner/more useful.
# Right now, it prints the entire term which might be huge
if len(source_addr_chunks) > 1:
logging.debug(
'Current term [%s] was split into %d sub-terms since '
f'Current term {self.term.name} was split into {len(source_addr_chunks)} sub-terms since '
'_MAX_IP_RANGES_PER_TERM was exceeded',
str(term_dict),
len(source_addr_chunks),
)
return rules

Expand All @@ -165,7 +176,7 @@ class CloudArmor(aclgenerator.ACLGenerator):
# Maps indiviudal filter options to their index positions in the POL header
_FILTER_OPTIONS_MAP = {'filter_type': 0}

def _BuildTokens(self):
def _BuildTokens(self) -> Tuple[Set[str], Dict[str, Set[str]]]:
"""Build supported tokens for platform.
Returns:
Expand All @@ -190,7 +201,7 @@ def _BuildTokens(self):
supported_sub_tokens = {'action': {'accept', 'deny'}}
return supported_tokens, supported_sub_tokens

def _TranslatePolicy(self, pol, exp_info):
def _TranslatePolicy(self, pol: Policy, exp_info: int) -> None:
"""Translates a Aerleon policy into a CloudArmor-specific data structure.
Takes in a POL file, parses each term and populates the cloudarmor_policies
Expand All @@ -201,15 +212,12 @@ def _TranslatePolicy(self, pol, exp_info):
pol: A Policy() object representing a given POL file.
exp_info: An int that specifies number of weeks till policy expiry.
Returns:
N.A.
Raises:
ExceededMaxTermsError: Raised when the number of terms in a policy exceed
_MAX_RULES_PER_POLICY.
_MAX_RULES_PER_POLICY.
UnsupportedFilterTypeError: Raised when an unsupported filter type is
specified
specified
"""
self.cloudarmor_policies = []

Expand Down Expand Up @@ -261,7 +269,7 @@ def _TranslatePolicy(self, pol, exp_info):
self._MAX_RULES_PER_POLICY,
)

def __str__(self):
def __str__(self) -> str:
"""Return the JSON blob for CloudArmor."""

out = '%s\n\n' % (
Expand Down

0 comments on commit 5db124d

Please sign in to comment.