This tutorial is loosely based on the Airflow tutorial in the official documentation. It will walk you through the basics of starting up Airflow and creating a job.
Airflow is installable with `pip`. It's prepackaged in the virtual environment in `environment.yml`, or you can install it yourself. You may run into problems later if you don't have the right binaries or Python packages installed. If you run into HiveOperator errors, do a `pip install airflow[hive]`. Make sure you have the database installed if you're specifying database support with `pip` (e.g. do a `brew install postgresql` or `apt-get install postgresql` if you did `pip install airflow[postgres]`).
Use the conda virtual environment as defined in `environment.yml`:
- Install miniconda
- Make sure that conda is on the path:
$ which conda
~/miniconda2/envs/ns/bin/conda
- Install the virtual environment:
$ conda env create -f environment.yml
- Activate the virtual environment:
$ source activate airflow
Alternatively, install Airflow yourself by running:
$ pip install airflow
Before we can actually use Airflow, we have to initialize its database (the default is a SQLite database). Once that's done, we can access the UI by running a web server and we can start scheduling jobs.
First, set the environment variable `AIRFLOW_HOME` to the current directory (if you don't set this, Airflow will create a directory `~/airflow` to put its files in):
$ export AIRFLOW_HOME="$(pwd)"
Initialize the database:
$ airflow initdb
Start the web server and go to localhost:8080 to check out the UI:
$ airflow webserver -p 8080
With the web server running, we'll start a job from a new terminal window. Open a new terminal, activate the virtual environment and set the environment variable for this terminal:
$ source activate airflow
$ export AIRFLOW_HOME="$(pwd)"
Run a supplied example and check in the UI that it's running:
$ airflow run example_bash_operator runme_0 2015-01-01
We'll first create a job by specifying its tasks as a Directed Acyclic Graph (DAG) in Python and then submit it. Open a file with the name `airflow_tutorial.py` for your DAG.
Settings for tasks can be passed as arguments when creating them. This can be done by setting each argument on every task or by passing a dictionary with default values, allowing us to share default arguments between tasks. Add the following dictionary to `airflow_tutorial.py` to specify time, email and retry settings that are shared by our tasks:
from datetime import datetime
from datetime import timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
We'll create a DAG object that will contain our tasks. This DAG will run every day, starting with the `start_date` specified in `default_args`. A run only starts after its date has passed (i.e. the workflow for 2016-05-01 will run after 2016-05-01 23:59). All times in Airflow are in UTC.
from airflow import DAG
dag = DAG('airflow_tutorial', default_args=default_args,
schedule_interval=timedelta(days=1))
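As a side note, `schedule_interval` doesn't have to be a `timedelta`: Airflow also accepts cron expressions and presets such as `'@daily'`. A minimal sketch of the same daily schedule, equivalent to the `timedelta` above:
# Sketch: the daily schedule written as a preset instead of a timedelta.
dag = DAG('airflow_tutorial', default_args=default_args,
          schedule_interval='@daily')  # or the cron string '0 0 * * *'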
We'll run a job consisting of three tasks: we'll print 'hello', wait for 5 seconds and finally print 'world', all done using BashOperators. Give each operator a unique ID, a bash command and our `dag` object (for the third operator the parameter `retries` is overridden with 3).
from airflow.operators.bash_operator import BashOperator
task_hello = BashOperator(task_id='print_hello',
bash_command='echo "hello"', dag=dag)
task_sleep = BashOperator(task_id='sleep', bash_command='sleep 5',
dag=dag)
task_world = BashOperator(task_id='print_world',
bash_command='echo "world"', retries=3,
dag=dag)
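The BashOperator is just one of Airflow's operators; for comparison, a similar 'hello' step could be written with a PythonOperator. This is a sketch with our own names, shown for illustration rather than as part of the tutorial's chain:
from airflow.operators.python_operator import PythonOperator

def hello():
    print('hello')

# Sketch only: a Python-based counterpart of task_hello; adding it to the file
# would create an extra, independent task in the DAG.
task_hello_py = PythonOperator(task_id='print_hello_py',
                               python_callable=hello, dag=dag)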
Dependencies between tasks are added by setting other tasks as upstream (or downstream). Link the operators in a chain so that `task_sleep` runs after `task_hello` and is followed by `task_world`; `task_hello` -> `task_sleep` -> `task_world`:
task_sleep.set_upstream(task_hello)
task_world.set_upstream(task_sleep)
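Depending on your Airflow version, the same chain can also be written with the bitshift operators, which read left to right:
# Equivalent to the two set_upstream calls above.
task_hello >> task_sleep >> task_world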
Your final DAG should look something like:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
from datetime import timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('airflow_tutorial', default_args=default_args,
schedule_interval=timedelta(days=1))
task_hello = BashOperator(task_id='print_hello',
bash_command='echo "hello"', dag=dag)
task_sleep = BashOperator(task_id='sleep', bash_command='sleep 5',
dag=dag)
task_world = BashOperator(task_id='print_world',
bash_command='echo "world"', retries=3,
dag=dag)
task_sleep.set_upstream(task_hello)
task_world.set_upstream(task_sleep)
Check that the DAG is valid by executing the file with python:
$ python airflow_tutorial.py
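If the file runs without raising an exception, the DAG definition is at least syntactically valid. For a check that's closer to what Airflow itself does, you can load the folder through a `DagBag` and inspect the result; this is a sketch, not part of the original tutorial:
# Sketch: parse the current folder the way Airflow would and list the tasks.
from airflow.models import DagBag

dagbag = DagBag('.', include_examples=False)
print(dagbag.import_errors)                      # should be an empty dict
print(dagbag.get_dag('airflow_tutorial').tasks)  # our three BashOperators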
Airflow checks for DAGs in its `$AIRFLOW_HOME/dags/` folder. Move `airflow_tutorial.py` to the folder `dags/` (or `~/airflow/dags` if you didn't set `$AIRFLOW_HOME`). Your job is automatically picked up and scheduled to run.
You can manually test a single task with `airflow test`:
airflow test airflow_tutorial print_world 2016-07-01
This runs the task locally as if it were for the given date, ignoring other tasks and without communicating with the database. You should see the task print 'world' in its output.
Use `airflow run` to manually run a task with its dependencies for a given date.
airflow run airflow_tutorial print_world 2016-07-01
If you want to backfill jobs over a period, specify a start and end date with `-s` and `-e`:
airflow backfill airflow_tutorial -s 2016-10-01 -e 2016-10-05
You've set up Airflow, created a DAG and run a job. Try changing the interval to every minute, implementing templating as in the original tutorial and checking out more example DAGs. Read the docs before really using Airflow.