Skip to content

Commit

Permalink
Add Airflow example with scale-out workers
Browse files Browse the repository at this point in the history
  • Loading branch information
mmdriley committed Jun 5, 2018
1 parent 53cb231 commit 38e239f
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 0 deletions.
2 changes: 2 additions & 0 deletions aws-ts-airflow/Pulumi.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Pulumi project file: project name and the language runtime used to execute it.
name: airflow
runtime: nodejs
1 change: 1 addition & 0 deletions aws-ts-airflow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
A Pulumi program that deploys an RDS Postgres instance and containerized Airflow (webserver, scheduler, Flower, and scale-out Celery workers) on AWS.
3 changes: 3 additions & 0 deletions aws-ts-airflow/airflow-container/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Start from the community Airflow image, which provides Airflow and its entrypoint.
FROM puckel/docker-airflow

# Bake our DAG definitions into the image at the folder Airflow scans by default.
ADD dags /usr/local/airflow/dags
54 changes: 54 additions & 0 deletions aws-ts-airflow/airflow-container/dags/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)

t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)

templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
98 changes: 98 additions & 0 deletions aws-ts-airflow/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
import * as awscloud from "@pulumi/cloud-aws";

// Read the database password from the stack's "airflow" config namespace.
const config = new pulumi.Config("airflow");
const dbPassword = config.require("dbPassword");

// Reuse the ECS cluster's security group so every resource below shares network access.
const securityGroupIds = [ awscloud.getCluster()!.securityGroupId! ];

let dbSubnets = new aws.rds.SubnetGroup("dbsubnets", {
subnetIds: awscloud.getNetwork().subnetIds,
});

let db = new aws.rds.Instance("postgresdb", {
engine: "postgres",

instanceClass: "db.t2.micro",
allocatedStorage: 20,

dbSubnetGroupName: dbSubnets.id,
vpcSecurityGroupIds: securityGroupIds,

name: "airflow",
username: "airflow",
password: dbPassword,

skipFinalSnapshot: true,
});

let cacheSubnets = new aws.elasticache.SubnetGroup("cachesubnets", {
subnetIds: awscloud.getNetwork().subnetIds,
});

let cacheCluster = new aws.elasticache.Cluster("cachecluster", {
clusterId: "cache-" + pulumi.getStack(),
engine: "redis",

nodeType: "cache.t2.micro",
numCacheNodes: 1,

subnetGroupName: cacheSubnets.id,
securityGroupIds: securityGroupIds,
});

// Environment shared by all Airflow containers below.
const environment = {
    // RDS reports its endpoint as "host:port"; the image expects just the host.
    "POSTGRES_HOST": db.endpoint.apply(endpoint => endpoint.split(":")[0]),
    // NOTE(review): the password travels as a plain environment variable —
    // consider a secrets mechanism for non-demo use.
    "POSTGRES_PASSWORD": dbPassword,

    "REDIS_HOST": cacheCluster.cacheNodes.apply(nodes => nodes[0].address),

    // Run with the Celery executor so work can fan out to separate workers.
    "EXECUTOR": "Celery",
};

// One service hosting both the Airflow webserver (public HTTP) and the scheduler.
const airflowController = new awscloud.Service("airflowcontroller", {
    replicas: 1,
    containers: {
        "webserver": {
            build: "./airflow-container",
            ports: [{ port: 8080, external: true, protocol: "http" }],
            environment: environment,
            command: [ "webserver" ],
        },

        "scheduler": {
            build: "./airflow-container",
            environment: environment,
            command: [ "scheduler" ],
        },
    },
});

// Flower, the Celery monitoring UI, as its own service.
const airflower = new awscloud.Service("airflower", {
    containers: {
        // Deliberately NOT named "flower": a container named "flower" gets
        // `FLOWER_`-prefixed environment variables generated for it, which
        // Flower then tries — and fails — to parse as configuration.
        "notflower": {
            build: "./airflow-container",
            ports: [{ port: 5555, external: true, protocol: "http" }],
            environment: environment,
            command: [ "flower" ],
        },
    },
});

// Pool of Celery workers; scale out by adjusting `replicas`.
const airflowWorkers = new awscloud.Service("airflowworkers", {
    replicas: 3,
    containers: {
        "worker": {
            build: "./airflow-container",
            environment: environment,
            command: [ "worker" ],
            memory: 1024,
        },
    },
});

// Stack outputs: hostnames for the Airflow web UI and the Flower UI.
export const airflowEndpoint = airflowController.defaultEndpoint.apply(endpoint => endpoint.hostname);
export const flowerEndpoint = airflower.defaultEndpoint.apply(endpoint => endpoint.hostname);
19 changes: 19 additions & 0 deletions aws-ts-airflow/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"name": "airflow",
"version": "0.1",
"main": "bin/index.js",
"typings": "bin/index.d.ts",
"scripts": {
"build": "tsc"
},
"devDependencies": {
"@types/node": "^10.1.2",
"typescript": "^2.8.3"
},
"dependencies": {
"@pulumi/aws": "^0.13.0",
"@pulumi/cloud": "^0.13.1",
"@pulumi/cloud-aws": "^0.13.1",
"@pulumi/pulumi": "^0.12.2"
}
}
24 changes: 24 additions & 0 deletions aws-ts-airflow/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
    "compilerOptions": {
        // Compiled JS (referenced by package.json "main") lands in bin/.
        "outDir": "bin",
        "target": "es6",
        "lib": [
            "es6"
        ],
        "module": "commonjs",
        "moduleResolution": "node",
        // Emit .d.ts files (package.json "typings" points at bin/index.d.ts).
        "declaration": true,
        "sourceMap": true,
        "stripInternal": true,
        "experimentalDecorators": true,
        "pretty": true,
        "noFallthroughCasesInSwitch": true,
        "noImplicitAny": true,
        "noImplicitReturns": true,
        "forceConsistentCasingInFileNames": true,
        // Required for the non-null assertions (`!`) used in index.ts.
        "strictNullChecks": true
    },
    "files": [
        "index.ts"
    ]
}

0 comments on commit 38e239f

Please sign in to comment.