Skip to content

Asgarde allows simplifying error handling with Apache Beam Python, with less code, more concise and expressive code.

License

Notifications You must be signed in to change notification settings

tosun-si/pasgarde

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

3 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Logo

Asgarde

This module allows simplifying error handling with Apache Beam Python.

Versions compatibility between Beam and Asgarde

Asgarde Beam
0.16.0 >= 2.37.0

Installation of project

The project is hosted on PyPi repository.
You can install it with all the build tools compatibles with PyPi and pip.

PyPi

Example with pip command line from bash
pip install asgarde==0.16.0
Example with requirements.txt

requirements.txt file

asgarde==0.16.0
pip install -r requirements.txt
Example with Pipenv

PipFile

[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
asgarde = "==0.16.0"

[requires]
python_version = "3.8"
pipenv shell

pipenv install
  • pipenv shell creates a virtual env
  • pipenv install installs all the packages specified in the Pipfile
  • A PipFile.lock is generated with a hash on installed packages

https://pipenv.pypa.io/en/latest/

Example of native error handling with Beam

The following example shows error handling in each step with usual Beam code.

@dataclass
class TeamInfo:
    name: str
    country: str
    city: str

@dataclass
class Failure:
    pipeline_step: str
    input_element: str
    exception: Exception

team_names = [
    'PSG',
    'OL',
    'Real',
    'ManU'
]
    
team_countries = {
    'PSG': 'France',
    'OL': 'France',
    'Real': 'Spain',
    'ManU': 'England'
}

team_cities = {
    'PSG': 'Paris',
    'OL': 'France',
    'Real': 'Madrid',
    'ManU': 'Manchester'
}

class MapToTeamWithCountry(DoFn):

    def process(self, element, *args, **kwargs):
        try:
            team_name: str = element

            yield TeamInfo(
                name=team_name,
                country=team_countries[team_name],
                city=''
            )
        except Exception as err:
            failure = Failure(
                pipeline_step="Map 1",
                input_element=element,
                exception=err
            )

            yield pvalue.TaggedOutput(FAILURES, failure)


class MapToTeamWithCity(DoFn):

    def process(self, element, *args, **kwargs):
        try:
            team_info: TeamInfo = element
            city: str = team_cities[team_info.name]

            yield TeamInfo(
                name=team_info.name,
                country=team_info.country,
                city=city
            )
        except Exception as err:
            failure = Failure(
                pipeline_step="Map 2",
                input_element=element,
                exception=err
            )

            yield pvalue.TaggedOutput(FAILURES, failure)


class FilterFranceTeams(DoFn):

    def process(self, element, *args, **kwargs):
        try:
            team_info: TeamInfo = element

            if team_info.country == 'France':
                yield element
        except Exception as err:
            failure = Failure(
                pipeline_step="Filter France teams",
                input_element=element,
                exception=err
            )

            yield pvalue.TaggedOutput(FAILURES, failure)

# In Beam pipeline.
input_teams: PCollection[str] = p | 'Read' >> beam.Create(team_names)

outputs_map1, failures_map1 = (input_teams | 'Map to team with country' >> ParDo(MapToTeamWithCountry())
                               .with_outputs(FAILURES, main='outputs'))

outputs_map2, failures_map2 = (outputs_map1 | 'Map to team with city' >> ParDo(MapToTeamWithCity())
                               .with_outputs(FAILURES, main='outputs'))

outputs_filter, failures_filter = (outputs_map2 | 'Filter France teams' >> ParDo(FilterFranceTeams())
                                   .with_outputs(FAILURES, main='outputs'))

all_failures = (failures_map1, failures_map2, failures_filter) | 'All Failures PCollections' >> beam.Flatten()

This example starts with an input PCollection containing team names.
Then 3 operations and steps are applied : 2 maps and 1 filter.

For each operation a custom DoFn class is proposed and must override process function containing the transformation logic.
A try except bloc is added to catch all the possible errors.
In the Except bloc a Failure object is built with input element and current exception. This object is then added on a tuple tag dedicated to errors.
This tag mechanism allows having multi sink in the pipeline and a dead letter queue for failures.

There are some inconveniences :

  • We have to repeat many technical codes and same logic like try except bloc, tuple tags, failure logic and all this logic can be centralized.
  • If we want to intercept all the possible errors in the pipeline, we have to repeat the recovery of output and failure in each step.
  • All the failures PCollection must be concatenated at end.
  • The code is verbose.

The repetition of technical codes is error-prone and less maintainable.

Example of error handling using Asgarde library

# Beam pipeline with Asgarde library.
input_teams: PCollection[str] = p | 'Read' >> beam.Create(team_names)

result = (CollectionComposer.of(input_teams)
            .map('Map with country', lambda tname: TeamInfo(name=tname, country=team_countries[tname], city=''))
            .map('Map with city', lambda tinfo: TeamInfo(name=tinfo.name, country=tinfo.country, city=team_cities[tinfo.name]))
            .filter('Filter french team', lambda tinfo: tinfo.country == 'France'))

result_outputs: PCollection[TeamInfo] = result.outputs
result_failures: PCollection[Failure] = result.failures

CollectionComposer class

Asgarde proposes a CollectionComposer wrapper class instantiated from a PCollection.

Operators exposed by CollectionComposer class

The CollectionComposer class exposes the following operators : map, flatMap and filter.

These classical operators takes a function, the implementation can be :

  • A lambda expression
  • A method having the same signature of the expected function

Failure object exposed by Asgarde

Behind the scene, for each step the CollectionComposer class adds try except bloc and tuple tag logic with output and failure sinks.

The bad sink is based on a Failure object proposed by the library :

@dataclass
class Failure:
    pipeline_step: str
    input_element: str
    exception: Exception

This object contains the current pipeline step name, input element with string form and current exception.

Input element on Failure object are built following these rules :

  • If the current element in the PCollection is a dict, the Json string form of this dict is retrieved
  • For all others types, the string form of object is retrieved. If developers want to bring their own serialization logic, they have to override __str__ method in the object, example for a dataclass :
import dataclasses
import json
from dataclasses import dataclass

@dataclass
class Team:
    name: str

    def __str__(self) -> str:
        return json.dumps(dataclasses.asdict(self))

Result of CollectionComposer flow

The CollectionComposer class after applying and chaining different operations, returns a tuple with :

  • Output PCollection
  • Failures PCollection
result = (CollectionComposer.of(input_teams)
            .map('Map with country', lambda tname: TeamInfo(name=tname, country=team_countries[tname], city=''))
            .map('Map with city', lambda tinfo: TeamInfo(name=tinfo.name, country=tinfo.country, city=team_cities[tinfo.name]))
            .filter('Filter french team', lambda tinfo: tinfo.country == 'France'))

result_outputs: PCollection[TeamInfo] = result.outputs
result_failures: PCollection[Failure] = result.failures

Example of a flow with side inputs

Asgarde allows applying transformations with error handling and passing side inputs.
The syntax is the same as usual Beam pipeline with AsDict or AsList passed as function parameters.

def to_team_with_city(self, team_name: str, team_countries: Dict[str, str]) -> TeamInfo:
    return TeamInfo(name=team_name, country=team_countries[team_name], city='')

team_countries = {
    'PSG': 'France',
    'OL': 'France',
    'Real': 'Spain',
    'ManU': 'England'
}

# Side inputs.
countries_side_inputs = p | 'Countries' >> beam.Create(team_countries)

# Beam Pipeline.
result = (CollectionComposer.of(input_teams)
            .map('Map with country', self.to_team_with_city, team_countries=AsDict(countries_side_inputs))
            .map('Map with city', lambda ti: TeamInfo(name=ti.name, country=ti.country, city=team_cities[ti.name]))
            .filter('Filter french team', lambda ti: ti.country == 'France'))

result_outputs: PCollection[str] = result.outputs
result_failures: PCollection[Failure] = result.failures

Asgarde and error handling with Beam DoFn lifecyle

Asgarde allows interacting with DoFn lifecycle while chaining transformation with error handling, example :

(CollectionComposer.of(input_teams)
     .map('Map to Team info',
          input_element_mapper=lambda team_name: TeamInfo(name=team_name, country='test', city='test'),
          setup_action=lambda: print('Setup action'),
          start_bundle_action=lambda: print('Start bundle action'),
          finish_bundle_action=lambda: print('Finish bundle action'),
          teardown_action=lambda: print('Teardown action'))
     )

The map and flat_map methods of CollectionComposer class propose the following functions to interact with DoFn lifecycle :

  • setup_action
  • start_bundle_action
  • finish_bundle_action
  • teardown_action

These functions take a function without input parameter and return None, it corresponds to an action executed in the dedicated lifecycle method :

https://beam.apache.org/documentation/transforms/python/elementwise/pardo/

Advantage of using Asgarde

Asgarde presents the following advantages :

  • Simplifies error handling with less code and more expressive and concise code
  • No need to repeat same technical code for error handling like try except bloc, tuple tags and concatenation of all the pipeline failures
  • Allows interacting with Beam lifecycle while chaining the transformation and error handling

About

Asgarde allows simplifying error handling with Apache Beam Python, with less code, more concise and expressive code.

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages