Skip to content

Commit

Permalink
Add tips and more structure
Browse files Browse the repository at this point in the history
  • Loading branch information
hgrif committed Jul 24, 2017
1 parent 3addeea commit 28d6456
Showing 1 changed file with 92 additions and 49 deletions.
141 changes: 92 additions & 49 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,28 @@ This tutorial is loosely based on the Airflow tutorial in the [official document
It will walk you through the basics of setting up Airflow and creating an Airflow job.


## Setup
## 1. Setup

You can skip this section if Airflow is already set up.
Make sure that you run airflow commands, know where to put your dags and have access to the web UI.


### Install Airflow

Airflow is installable with `pip` via a simple `pip install apache-airflow`.
It's prepackaged in the virtual environment defined in `environment.yml` or you can install it yourself.

Either use the virtual environment as defined in `environment.yml` or install it yourself.

To use the conda virtual environment as defined in `environment.yml`:

* Install [miniconda](http:https://conda.pydata.org/miniconda.html).
* Make sure that conda is on the path:
* Make sure that conda is on your path:

```{bash}
$ which conda
~/miniconda2/bin/conda
```

* Install the virtual environment:
* Create the virtual environment from `environment.yml`:

```{bash}
$ conda env create -f environment.yml
Expand All @@ -35,35 +37,39 @@ $ conda env create -f environment.yml
$ source activate airflow-tutorial
```

You should now have a (almost) working Airflow installation.

Alternatively, install Airflow yourself by running:

```{bash}
$ pip install apache-airflow
```

Airflow used to be packaged as `airflow` but is packaged as `apache-airflow` since version 1.8.1.
Make sure that you install extra packages with the right Python package: e.g. use `pip install apache-airflow[dask]` if you've installed `apache-airflow` and don't use `pip install airflow[dask]`.
Make sure that you install any extra packages with the right Python package: e.g. use `pip install apache-airflow[dask]` if you've installed `apache-airflow` and DO NOT use `pip install airflow[dask]`.
The latter will install an old version of Airflow next to your current version, leading to a world of hurt.

You may run into problems if you don't have the right binaries or Python packages installed for certain backends or operators.
When specifying support for a database when installing extra airflow packages, make sure it's installed.
For example, do a `brew install postgresql` or `apt-get install postgresql` before the `pip install apache-airflow[postgres]`, and when running into HiveOperator errors, do a `pip install apache-airflow[hive]` and make sure you can use Hive.
When specifying support for e.g PostgreSQL when installing extra airflow packages, make sure the database is installed; do a `brew install postgresql` or `apt-get install postgresql` before the `pip install apache-airflow[postgres]`.
Similarly, when running into HiveOperator errors, do a `pip install apache-airflow[hive]` and make sure you can use Hive.


#### Run Airflow
### Run Airflow

Before we can use Airflow, we have to initialize its database.
The database contains information about historical and running jobs, connections to external data sources,
We have to initialize its database before we can use Airflow.
The database contains information about historical & running jobs, connections to external data sources,
user management, etc.
Once the database is set up, we'll access Airflow's UI by running a web server and we can start running jobs.

The default database is a SQLite database, which is fine for this tutorial.
In a production setting you'll probably be using something like MySQL or PostgreSQL.
As this database stores the state of everything related to Airflow, you'll probably want to back it up.

Airflow will use the directory set in the environment variable `$AIRFLOW_HOME` to store its configuration and our SQlite databse.
Airflow will use the directory set in the environment variable `AIRFLOW_HOME` to store its configuration and our SQlite databse.
This directory will be used after your first Airflow command.
If you don't set the environment variable `AIRFLOW_HOME`, Airflow will create a directory `~/airflow` to put its files in.
If you don't set the environment variable `AIRFLOW_HOME`, Airflow will create the directory `~/airflow/` to put its files in.

Set environment variable `AIRFLOW_HOME` to your current directory `$(pwd)`:
Set environment variable `AIRFLOW_HOME` to e.g. your current directory `$(pwd)`:

```{bash}
$ export AIRFLOW_HOME="$(pwd)"
Expand All @@ -77,7 +83,7 @@ Next, initialize the database:
$ airflow initdb
```

Start the web server and go to [localhost:8080](http:https://localhost:8080/) to check out the UI:
Now start the web server and go to [localhost:8080](http:https://localhost:8080/) to check out the UI:

```{bash}
$ airflow webserver --port 8080
Expand All @@ -95,6 +101,8 @@ $ source activate airflow-tutorial
$ export AIRFLOW_HOME="$(pwd)"
```

Make sure that you're an in the same directory as before when using `$(pwd)`.

Run a supplied example:

```{bash}
Expand All @@ -107,24 +115,36 @@ This concludes all the setting up that we need for this tutorial.
For more information on configuration check the sections on [Configuration](https://airflow.incubator.apache.org/configuration.html) and [Security](https://airflow.incubator.apache.org/security.html) of the Airflow documentation.
Check the [Airflow repository](https://github.com/apache/incubator-airflow/tree/master/scripts) for `upstart` and `systemd` templates.

Tips:
* Python 3 doesn't really seem to be supported by Airflow, so go for Python 2.
* Airflow logs extensively, so pick your log folder carefully.
* Set the timezone of your production to be UTC: Airflow assumes it's UTC.


## 2. Jobs

## Job
We'll create a job by specifying actions as a Directed Acyclic Graph (DAG) in Python, test it and let Airflow run it.
The tasks of a job make up a Graph; the graph is Directed because the tasks are ordered; and we don't want to get stuck in an eternal loop so the graph also has to be Acyclic.

We'll first create a job by specifying actions as a Directed Acyclic Graph (DAG) in Python and then submit it.
Tasks make up a Graph; the graph is Directed because the tasks are ordered; and we don't want to get stuck in an eternal loop so the graph also has to be Acyclic.
The figure below shows an example of a DAG:

<img src="https://airflow.incubator.apache.org/_images/subdag_before.png" style="width: 70%;"/>

### File

Go to the folder that you've designated to be your `AIRFLOW_HOME`, go to the subfolder `dags/` and open a Python file with the name `airflow_tutorial.py` that will contain your DAG.
### Create a DAG file

We'll first configure settings that are shared by all our tasks.
Go to the folder that you've designated to be your `AIRFLOW_HOME` and find the DAGs folder located in subfolder `dags/`.
Create a Python file with the name `airflow_tutorial.py` that will contain your DAG.


First we'll configure settings that are shared by all our tasks.
Settings for tasks can be passed as arguments when creating them, but we can also pass a dictionary with default values to the DAG.
This allows us to share default arguments for multiple tasks.
Add the following import and dictionary to `airflow_tutorial.py` to specify the start time, email and retry settings that are shared by our tasks:
This allows us to share default arguments for all the tasks in our DAG is the best place to set e.g. the owner and start date of our DAG.

Add the following import and dictionary to `airflow_tutorial.py` to specify the owner, start time, and retry settings that are shared by our tasks:


### Settings
### Configure common settings

```{python}
import datetime as dt
Expand All @@ -137,49 +157,61 @@ default_args = {
}
```

These settings tell Airflow that this job is valid since June 1st of 2015, it should not send emails and is allowed to retry the job once if it tails.
These settings tell Airflow that this job is owned by `'me'`, that the job is valid since June 1st of 2015, it should not send emails and it is allowed to retry the job once if it fails with a delay of 5 minutes.
Other common default arguments are email settings on failure and the end time.


### Job
### Create the DAG

We'll now create a DAG object that will contain our tasks:
We'll now create a DAG object that will contain our tasks.

Name it `airflow_tutorial_v01` and pass `default_args`:

```{python}
from airflow import DAG
with DAG('airflow_tutorial_v01',
default_args=default_args,
schedule_interval='0 * * * *'
schedule_interval='0 * * * *',
) as dag:
```

For our DAG we've specified a run at every hour 0 with '0 * * * *': the DAG will run each day at 00:00.
See [crontab.guru](https://crontab.guru/#0_*_*_*_*) if you for help deciphering a cron schedule expression.
Alternatively, you can use strings like `@daily` and `hourly`.
With `schedule_interval='0 * * * *'` we've specified a run at every hour 0; the DAG will run each day at 00:00.
See [crontab.guru](https://crontab.guru/#0_*_*_*_*) for help deciphering cron schedule expressions.
Alternatively, you can use strings like `'@daily'` and `'@hourly'`.
I prefer the cron notation because it's a bit more flexible than `'@daily'` and `'@hourly'`.

We've used a [decorator](https://jeffknupp.com/blog/2016/03/07/python-with-context-managers/) to create a DAG (new since 1.8).
We've used a [context manager](https://jeffknupp.com/blog/2016/03/07/python-with-context-managers/) to create a DAG (new since 1.8).
All the tasks for the DAG should be indented to indicate that they are part of this DAG.

Airflow will generate DAGs runs from the `start_date` with the specified `schedule_interval`.
A run will start _after_ the time for the run has passed; the daily job for 2016-06-02 will run after 2016-06-02 23:59 and the hourly job for 2016-07-03 01:00 will start after 2016-07-03 01:59.
Airflow continuously checks if new DAG runs should be scheduled.
Airflow will generate DAG runs from the `start_date` with the specified `schedule_interval`.
Once a DAG is active, Airflow continuously checks in the database if all the DAG runs have successfully ran since the `start_date`.
Any missing DAG runs are automatically scheduled.
When you initialize on 2016-01-04 a DAG with a `start_date` at 2016-01-01 and a daily `schedule_interval`, Airflow will schedule DAG runs for all the days between 2016-01-01 and 2016-01-04.

Because Airflow saves all the (scheduled) DAG runs in its database, you generally should not change the `start_date` and `schedule_interval` of a DAG.
Instead, up the version number of the DAG (e.g. `airflow_tutorial_v02`) and use the web interface of command line tools to avoid running unnecessary tasks.
A run starts _after_ the time for the run has passed.
The time for which the job runs is called the `execution_date`.
The daily job for 2016-06-02 runs after 2016-06-02 23:59 and the hourly job for 2016-07-03 01:00 starts after 2016-07-03 01:59.

From the ETL viewpoint this makes sense: you can only process the daily data for a day after it has passed.
This can, however, ask for some juggling with date for other jobs.
For Machine Learning models you may want to use all the data up to a given date, you'll have to add the `schedule_interval` to your `execution_date` somewhere in the job logic.j

Because Airflow saves all the (scheduled) DAG runs in its database, you should not change the `start_date` and `schedule_interval` of a DAG.
Instead, up the version number of the DAG (e.g. `airflow_tutorial_v02`) and avoid running unnecessary tasks by using the web interface or command line tools

Timezones and especially daylight savings can mean trouble when scheduling things, so keep your Airflow machine in UTC.
You don't want to skip an hour because daylight savings kicks in (or out).


### Tasks
### Create the tasks

Tasks are represented by operators that either perform an action, transfer data, or sense if something has been done.
Examples of actions are running a bash script or calling a Python function; of transfers are copying tables between databases or uploading a file; and of sensors are checking if a file exists or data has been added to a database.

We'll run a job consisting of three tasks: we'll print 'hello', wait for 10 seconds and finally print 'world'.
We'll create a job consisting of three tasks: we'll print 'hello', wait for 10 seconds and finally print 'world'.
The first two are done with the `BashOperator` and the latter with the `PythonOperator`.
Give each operator an unique ID, a bash command and our `dag` object (the parameter `retries` is overwritten by 3 for the third operator).
Give each operator an unique task ID and something to do:

```{python}
from airflow.operators.bash_operator import BashOperator
Expand All @@ -195,17 +227,20 @@ Give each operator an unique ID, a bash command and our `dag` object (the parame
python_callable=print_world)
```

Note how we can pass bash commands in the `BashOperator` and that the `PythonOperator` asks for a Python function that can be called.

Dependencies in tasks are added by setting other actions as upstream (or downstream).
Link the operations in a chain so that `sleep` will be run after `print_hellp` and is followed by `print_world`; `print_hello` -> `sleep` -> `print_world`:
Link the operations in a chain so that `sleep` will be run after `print_hello` and is followed by `print_world`; `print_hello` -> `sleep` -> `print_world`:

```{python}
print_hello >> sleep >> print_world
```

Your final DAG should look something like:
After rearranging the code your final DAG should look something like:

```{python}
import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
Expand All @@ -225,7 +260,7 @@ default_args = {
with DAG('airflow_tutorial_v01',
default_args=default_args,
schedule_interval='0 * * * *'
schedule_interval='0 * * * *',
) as dag:
print_hello = BashOperator(task_id='print_hello',
Expand All @@ -234,18 +269,19 @@ with DAG('airflow_tutorial_v01',
print_world = PythonOperator(task_id='print_world',
python_callable=print_world)
print_hello >> sleep >> print_world
```


### Test the DAG

Check that the DAG is valid by executing the file with python:

```{bash}
$ python airflow_tutorial.py
```


### Run the job

Airflow checks for DAGs in its `$AIRFLOW_HOME/dags/` folder.
Move `airflow_tutorial.py` to the folder `dags/` (or `~/airflow/dags if you didn't set `$AIRFLOW_HOME`).
Your job is automatically picked up and scheduled to run.
Expand All @@ -255,8 +291,11 @@ You can manually test a single task with `airflow test`:
```{bash}
airflow test airflow_tutorial_v01 print_world 2016-07-01
```

This runs the task locally as if it was for the given date, ignoring other tasks and without communication to the database.
You should see


### Run the DAG

Use `airflow run` to manually run a task with its dependencies for a given date.

Expand All @@ -265,14 +304,18 @@ airflow run airflow_tutorial_v01 print_world 2016-07-01
```


## Conclusion
## 3. UI


## 4. Conclusion

You've set up Airflow, created a DAG and ran a job.
Try changing the interval to every minute, implement templating as in the [original tutorial](https://airflow.incubator.apache.org/tutorial.html#testing) and checking our more [example DAGs](https://github.com/apache/incubator-airflow/tree/master/airflow/example_dags).
Read the [docs](https://pythonhosted.org/airflow/index.html) before really using Airflow.


## Resources
## 5. Resources

* [Airflow documentation](https://pythonhosted.org/airflow/tutorial.html)
* [ETL best practices with Airflow](https://gtoonstra.github.io/etl-with-airflow/)
* [Airflow: Tips, Tricks, and Pitfalls](https://medium.com/handy-tech/airflow-tips-tricks-and-pitfalls-9ba53fba14eb)

0 comments on commit 28d6456

Please sign in to comment.