Airflow tutorial

This tutorial is loosely based on the Airflow tutorial in the official documentation. It will walk you through the basics of starting up Airflow and creating a job.

Setup

Install Airflow

Airflow is installable with pip. It's included in the conda virtual environment defined in environment.yml, or you can install it yourself. You may run into problems later if you don't have the right binaries or Python packages installed. If you run into HiveOperator errors, do a pip install airflow[hive]. Make sure the database itself is installed if you're specifying database support with pip (e.g. do a brew install postgresql or apt-get install postgresql if you did pip install airflow[postgres]).

Use the conda virtual environment as defined in environment.yml:

  • Install miniconda
  • Make sure that conda is on the path:
$ which conda
~/miniconda2/envs/ns/bin/conda
  • Install the virtual environment:
$ conda env create -f environment.yml
  • Activate the virtual environment:
$ source activate airflow

Alternatively, install Airflow yourself by running:

$ pip install airflow

Run Airflow

Before we can actually use Airflow, we have to initialize its database (the default is a SQLite database). Once that's done, we can access the UI by running a web server and we can start scheduling jobs.

First, set environment variable AIRFLOW_HOME to the current directory (if you don't set this, Airflow will create a directory ~/airflow to put its files in):

$ export AIRFLOW_HOME="$(pwd)"

Initialize the database:

$ airflow initdb

Start the web server and go to localhost:8080 to check out the UI:

$ airflow webserver -p 8080

With the webserver running, we'll start a job from a new terminal window. Open a new terminal, activate the virtual environment and set the environment variable for this terminal:

$ source activate airflow
$ export AIRFLOW_HOME="$(pwd)"

Run a supplied example and check in the UI that it's running:

$ airflow run example_bash_operator runme_0 2015-01-01

Job

We'll first create a job by specifying actions as a Directed Acyclic Graph (DAG) in Python and then submit it.

Create the DAG

Open a file with the name airflow_tutorial.py for your DAG.

Settings for tasks can be passed as arguments when creating them. You can either set them on each task individually or pass a dictionary with default values, which lets multiple tasks share the same defaults. Add the following dictionary to airflow_tutorial.py to specify the time, email and retry settings shared by our tasks:

from datetime import datetime
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

We'll create a DAG object that will contain our actions. This DAG will run every day, starting with the start_date specified in default_args. The run will start after a given date has passed (i.e. the workflow for 2016-05-01 will run after 2016-05-01 23:59). All times in Airflow are in UTC.

from airflow import DAG
dag = DAG('airflow_tutorial', default_args=default_args, 
          schedule_interval=timedelta(days=1))
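
As an aside, schedule_interval also accepts a cron expression (or a preset such as '@daily') instead of a timedelta; a sketch of the same daily schedule written as a cron string:

dag = DAG('airflow_tutorial', default_args=default_args,
          schedule_interval='0 0 * * *')  # daily at midnight UTC, equivalent to timedelta(days=1)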

We'll run a job consisting of three tasks: we'll print 'hello', wait for 5 seconds and finally print 'world', all done using BashOperators. Give each operator a unique ID, a bash command and our dag object (the default retries is overridden with 3 for the third operator).

from airflow.operators.bash_operator import BashOperator

task_hello = BashOperator(task_id='print_hello', 
                          bash_command='echo "hello"', dag=dag)
task_sleep = BashOperator(task_id='sleep', bash_command='sleep 5',
                          dag=dag)
task_world = BashOperator(task_id='print_world',
                          bash_command='echo "world"', retries=3,
                          dag=dag)

Dependencies between tasks are added by setting other tasks as upstream (or downstream). Link the tasks in a chain so that task_sleep runs after task_hello and is followed by task_world: task_hello -> task_sleep -> task_world:

task_sleep.set_upstream(task_hello)
task_world.set_upstream(task_sleep)
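
In newer Airflow versions the same chain can also be written with the bitshift operators; this is just alternative notation for the set_upstream calls above:

# equivalent to the two set_upstream calls above
task_hello >> task_sleep >> task_world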

Your final DAG should look something like:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


dag = DAG('airflow_tutorial', default_args=default_args,
          schedule_interval=timedelta(days=1))


task_hello = BashOperator(task_id='print_hello',
                          bash_command='echo "hello"', dag=dag)
task_sleep = BashOperator(task_id='sleep', bash_command='sleep 5',
                          dag=dag)
task_world = BashOperator(task_id='print_world',
                          bash_command='echo "world"', retries=3,
                          dag=dag)


task_sleep.set_upstream(task_hello)
task_world.set_upstream(task_sleep)

Check that the DAG is valid by executing the file with python:

$ python airflow_tutorial.py
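
If the file imports without errors, the DAG definition is valid. As an optional extra check (a sketch; DagBag parses whatever is in your dags folder), you can ask Airflow itself whether your DAGs load cleanly:

from airflow.models import DagBag

dag_bag = DagBag()            # parses the DAGs in $AIRFLOW_HOME/dags/
print(dag_bag.import_errors)  # an empty dict means every DAG file imported cleanly
print(dag_bag.dags.keys())    # 'airflow_tutorial' should appear once the file is in dags/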

Run the job

Airflow checks for DAGs in its $AIRFLOW_HOME/dags/ folder. Move airflow_tutorial.py to the folder dags/ (or ~/airflow/dags if you didn't set $AIRFLOW_HOME). Your job is automatically picked up and scheduled to run.

You can manually test a single task with airflow test:

airflow test airflow_tutorial print_world 2016-07-01

This runs the task locally as if it were run for the given date, ignoring other tasks and without communicating with the database. You should see "world" echoed in the task's output.

Use airflow run to manually run a task with its dependencies for a given date.

airflow run airflow_tutorial print_world 2016-07-01

If you want to backfill jobs over a period, specify a start and end date with -s and -e:

airflow backfill airflow_tutorial -s 2016-10-01 -e 2016-10-05

Conclusion

You've set up Airflow, created a DAG and run a job. Try changing the interval to every minute, implementing templating as in the original tutorial, and checking out more example DAGs. Read the docs before really using Airflow.
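
As a starting point for the templating exercise, here's a minimal sketch: BashOperator commands are rendered with Jinja, so the built-in ds macro inserts each run's execution date (the task_id print_date here is just an example name, not part of the tutorial above):

task_date = BashOperator(task_id='print_date',
                         bash_command='echo "run date: {{ ds }}"',  # Jinja template, rendered per run
                         dag=dag)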
