Commit

Add tutorial
hgrif committed Oct 24, 2016
1 parent 7a763a2 commit 1b08a02
Showing 4 changed files with 351 additions and 0 deletions.
96 changes: 96 additions & 0 deletions .gitignore
@@ -0,0 +1,96 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# IPython Notebook
.ipynb_checkpoints

# pyenv
.python-version

# celery beat schedule file
celerybeat-schedule

# dotenv
.env

# virtualenv
.venv/
venv/
ENV/

# Spyder project settings
.spyderproject

# Rope project settings
.ropeproject

airflow.cfg
airflow.db
unittests.cfg
airflow-webserver.pid
logs/
191 changes: 191 additions & 0 deletions README.md
@@ -0,0 +1,191 @@
# Airflow tutorial

This tutorial is loosely based on the Airflow tutorial in the [official documentation](https://pythonhosted.org/airflow/tutorial.html). It will walk you through the basics of starting up Airflow and creating a job.


## Setup


### Install Airflow

Airflow is installable with `pip`. It comes prepackaged in the conda environment defined in `environment.yml`, or you can install it yourself. You may run into problems later if you don't have the right binaries or Python packages installed; for example, if you run into `HiveOperator` errors, do a `pip install airflow[hive]`. If you add database support via `pip` (e.g. `pip install airflow[postgres]`), make sure the database itself is installed (e.g. with `brew install postgresql` or `apt-get install postgresql`).
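
For example, adding Postgres support might look like this (assuming Homebrew on macOS; use your system's package manager otherwise):
```
$ brew install postgresql
$ pip install airflow[postgres]
```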

Use the conda virtual environment as defined in `environment.yml`:
* Install [miniconda](https://conda.pydata.org/miniconda.html)
* Make sure that conda is on the path:

```
$ which conda
~/miniconda2/envs/ns/bin/conda
```

* Install the virtual environment:

```
$ conda env create -f environment.yml
```

* Activate the virtual environment:

```
$ source activate airflow
```

Alternatively, install Airflow yourself by running:
```
$ pip install airflow
```


### Run Airflow

Before we can actually use Airflow, we have to initialize its database (the default is a SQLite database). Once that's done, we can access the UI by running a web server and we can start scheduling jobs.

First, set the environment variable `AIRFLOW_HOME` to the current directory (if you don't set this, Airflow will create a directory `~/airflow` to put its files in):
```
$ export AIRFLOW_HOME="$(pwd)"
```

Initialize the database:
```
$ airflow initdb
```


Start the web server and go to [localhost:8080](http://localhost:8080/) to check out the UI:
```
$ airflow webserver -p 8080
```

With the webserver running, we'll start a job from a new terminal window. Open a new terminal, activate the virtual environment and set the environment variable for this terminal:
```
$ source activate airflow
$ export AIRFLOW_HOME="$(pwd)"
```
Run a supplied example and check in the UI that it's running:
```
$ airflow run example_bash_operator runme_0 2015-01-01
```
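
If you'd rather stay in the terminal, you can also peek at the task's log on disk. With the default configuration the logs typically end up under `$AIRFLOW_HOME/logs/<dag_id>/<task_id>/` (the exact layout can differ between Airflow versions), so something like this should show the run:
```
$ ls "$AIRFLOW_HOME"/logs/example_bash_operator/runme_0/
```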


## Job

We'll first create a job by specifying actions as a Directed Acyclic Graph (DAG) in Python and then submit it.


### Create the DAG

Create a file called `airflow_tutorial.py` for your DAG.

Settings for tasks can be passed as arguments when creating them. You can either set them on each task individually or pass a dictionary with default values, which lets multiple tasks share the same default arguments. Add the following dictionary to `airflow_tutorial.py` to specify time, email and retry settings that are shared by our tasks:
```{python}
from datetime import datetime
from datetime import timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
```


We'll create a DAG object that will contain our tasks. This DAG will run every day, starting at the `start_date` specified in `default_args`. A run starts _after_ its date has passed (i.e. the workflow for 2016-05-01 runs after 2016-05-01 23:59). All times in Airflow are in UTC.
```{python}
from airflow import DAG
dag = DAG('airflow_tutorial', default_args=default_args,
          schedule_interval=timedelta(days=1))
```
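
If you prefer cron syntax, `schedule_interval` also accepts a cron expression; because our `start_date` falls on midnight, the following should be equivalent to the daily `timedelta` above:
```{python}
# same daily schedule, expressed as a cron string instead of a timedelta
dag = DAG('airflow_tutorial', default_args=default_args,
          schedule_interval='0 0 * * *')
```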


We'll run a job consisting of three tasks: we'll print 'hello', wait for 5 seconds and finally print 'world', all done using BashOperators. Give each operator a unique ID, a bash command and our `dag` object (for the third operator the default `retries` from `default_args` is overridden with 3).
```{python}
from airflow.operators.bash_operator import BashOperator
task_hello = BashOperator(task_id='print_hello',
                          bash_command='echo "hello"', dag=dag)
task_sleep = BashOperator(task_id='sleep', bash_command='sleep 5',
                          dag=dag)
task_world = BashOperator(task_id='print_world',
                          bash_command='echo "world"', retries=3,
                          dag=dag)
```


Dependencies between tasks are added by setting one task as upstream (or downstream) of another. Link the operations in a chain so that `task_sleep` runs after `task_hello` and is followed by `task_world`: `task_hello` -> `task_sleep` -> `task_world`:
```{python}
task_sleep.set_upstream(task_hello)
task_world.set_upstream(task_sleep)
```

Your final DAG should look something like:
```{python}
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
from datetime import timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG('airflow_tutorial', default_args=default_args,
          schedule_interval=timedelta(days=1))
task_hello = BashOperator(task_id='print_hello',
                          bash_command='echo "hello"', dag=dag)
task_sleep = BashOperator(task_id='sleep', bash_command='sleep 5',
                          dag=dag)
task_world = BashOperator(task_id='print_world',
                          bash_command='echo "world"', retries=3,
                          dag=dag)
task_sleep.set_upstream(task_hello)
task_world.set_upstream(task_sleep)
```

Check that the DAG is valid by executing the file with python:
```
$ python airflow_tutorial.py
```


### Run the job

Airflow checks for DAGs in its `$AIRFLOW_HOME/dags/` folder. Move `airflow_tutorial.py` to the folder `dags/` (or `~/airflow/dags` if you didn't set `$AIRFLOW_HOME`). Your job is automatically picked up and scheduled to run.
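
Assuming you're in the directory containing `airflow_tutorial.py` and `$AIRFLOW_HOME` is set, a possible sequence is the following; `airflow list_dags` lets you verify that the DAG was picked up:
```
$ mkdir -p "$AIRFLOW_HOME/dags"
$ mv airflow_tutorial.py "$AIRFLOW_HOME/dags/"
$ airflow list_dags
```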

You can manually test a single task with `airflow test`:
```
airflow test airflow_tutorial print_world 2016-07-01
```
This runs the task locally as if it were for the given date, ignoring other tasks and without communicating with the database. You should see the task's output in your terminal.

Use `airflow run` to manually run a task together with its dependencies for a given date:
```
airflow run airflow_tutorial print_world 2016-07-01
```

If you want to backfill jobs over a period, specify a start and end date with `-s` and `-e`:
```
airflow backfill airflow_tutorial -s 2016-10-01 -e 2016-10-05
```


## Conclusion

You've set up Airflow, created a DAG and run a job. Try changing the interval to every minute, implementing templating as in the [original tutorial](https://airflow.incubator.apache.org/tutorial.html#testing) and checking out more [example DAGs](https://github.com/apache/incubator-airflow/tree/master/airflow/example_dags). Read the [docs](https://pythonhosted.org/airflow/index.html) before really using Airflow.
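
As a teaser for the templating exercise, a sketch of what a templated task could look like is given below; it assumes the imports and the `dag` object from the final DAG above, and the task id `templated` and the `greeting` parameter are made up for this example:
```{python}
# Jinja templating sketch: a hypothetical extra task added to the tutorial DAG.
# {{ ds }} is the execution date; {{ params.greeting }} comes from the params dict.
templated_command = """
echo "run date: {{ ds }}"
echo "{{ params.greeting }}"
"""
task_templated = BashOperator(task_id='templated',
                              bash_command=templated_command,
                              params={'greeting': 'hello world'},
                              dag=dag)
```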
Empty file added dags/.gitignore
Empty file.
64 changes: 64 additions & 0 deletions environment.yml
@@ -0,0 +1,64 @@
name: airflow
channels:
- defaults
dependencies:
- certifi=2016.8.31=py35_0
- ncurses=5.9=9
- openssl=1.0.2h=2
- pip=8.1.2=py35_0
- python=3.5.2=2
- readline=6.2=0
- setuptools=26.1.1=py35_0
- sqlite=3.13.0=1
- tk=8.5.19=0
- wheel=0.29.0=py35_0
- xz=5.2.2=0
- zlib=1.2.8=3
- pip:
  - airflow==1.7.1.3
  - alembic==0.8.8
  - babel==1.3
  - bitarray==0.8.1
  - certifi==2016.8.31
  - chartkick==0.4.2
  - croniter==0.3.12
  - dill==0.2.5
  - docutils==0.12
  - flask==0.10.1
  - flask-admin==1.4.0
  - flask-cache==0.13.1
  - flask-login==0.2.11
  - flask-wtf==0.12
  - funcsigs==0.4
  - future==0.15.2
  - gunicorn==19.3.0
  - hive-thrift-py==0.0.1
  - impyla==0.13.8
  - itsdangerous==0.24
  - jinja2==2.8
  - lockfile==0.12.2
  - mako==1.0.4
  - markdown==2.6.7
  - markupsafe==0.23
  - numpy==1.11.2
  - pandas==0.19.0
  - pip==8.1.2
  - ply==3.9
  - pygments==2.1.3
  - pyhive==0.2.1
  - python-daemon==2.1.1
  - python-dateutil==2.5.3
  - python-editor==1.0.1
  - pytz==2016.7
  - requests==2.11.1
  - setproctitle==1.1.10
  - setuptools==26.1.1
  - six==1.10.0
  - sqlalchemy==1.1.2
  - thrift==0.9.3
  - thriftpy==0.3.9
  - unicodecsv==0.14.1
  - werkzeug==0.11.11
  - wheel==0.29.0
  - wtforms==2.1
