Apache Airflow

Apache Airflow is an open source job scheduler made for data pipelines.


While the installation is pretty straightforward, getting Airflow fully working takes a few more steps:

export AIRFLOW_HOME=~/airflow
pip install apache-airflow
airflow initdb
airflow webserver -p 8080

pip install 'apache-airflow[devel]'
pip install 'apache-airflow[async]'
pip install 'apache-airflow[crypto]'
pip install 'apache-airflow[mysql]'
pip install 'apache-airflow[rabbitmq]'


A good Docker container setup is here:




I had issues with the rabbitmq extra, so I built librabbitmq from source:

git clone https://github.com/celery/librabbitmq.git
cd librabbitmq
make install

Set up Gunicorn:

pip install gunicorn==19.3.0  # needed for Python 3

Then add this to your ~/.bashrc so that pip's user-level binaries in ~/.local/bin are on your PATH:

export PATH=$PATH:~/.local/bin

Self-Signed Certificates

Generating an RSA public/private key pair

openssl genrsa -out private.pem 2048

Generating a self-signed certificate

openssl req -new -x509 -key private.pem -out cacert.pem -days 1095

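In your airflow.cfg, under [webserver], point to the certificate and key:

web_server_ssl_cert = path/to/cacert.pem
web_server_ssl_key = path/to/private.pem

Then you can access the web GUI at https://localhost:8080 (expect a browser warning, since the certificate is self-signed).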

Important Configs

In airflow.cfg, there are a few important settings, including:

Airflow Admin Connections

In the Airflow Admin (e.g. https://localhost:8079/admin/connection/), select your connection (e.g. ‘mysql_default’) or define your own, then edit its fields to match your database. For example:

Host : my_db
Schema : airflow
Login : root
Password : root

You can also define this connection with an environment variable so that it’s available by default (the line below uses Dockerfile ENV syntax; in a shell, use export instead):

ENV AIRFLOW_CONN_MYSQL_DEFAULT="mysql://root:root@my_db:3306/airflow"
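Airflow looks up environment variables named AIRFLOW_CONN_<CONN_ID> and reads them as connection URIs. The sketch below uses only Python’s urllib.parse to show how that URI maps onto the Host, Schema, Login, and Password fields. It illustrates the URI layout only and is not Airflow’s own parser. The helper name describe_conn_uri is hypothetical.

```python
from urllib.parse import urlparse


def describe_conn_uri(uri):
    """Split a connection URI into the fields shown in the Admin UI.

    Hypothetical helper for illustration only; Airflow has its own parser.
    """
    parsed = urlparse(uri)
    return {
        'conn_type': parsed.scheme,         # e.g. 'mysql'
        'host': parsed.hostname,            # e.g. 'my_db'
        'schema': parsed.path.lstrip('/'),  # database name after the slash
        'login': parsed.username,
        'password': parsed.password,
        'port': parsed.port,                # parsed as an int, e.g. 3306
    }


if __name__ == '__main__':
    print(describe_conn_uri('mysql://root:root@my_db:3306/airflow'))
```

For the URI above, this yields host my_db, schema airflow, login and password root, and port 3306. These match the fields entered in the Admin UI.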

Now you can run ad hoc queries! Don’t use the root user on a production system. You should be able to run a command like this:

mysql> show tables in airflow;
| Tables_in_airflow |
| alembic_version   |
| chart             |
| connection        |
| dag               |
| dag_pickle        |
| dag_run           |
| dag_stats         |
| import_error      |
| job               |
| known_event       |
| known_event_type  |
| log               |
| sla_miss          |
| slot_pool         |
| task_fail         |
| task_instance     |
| users             |
| variable          |
| xcom              |


If you set up the LocalExecutor and point sql_alchemy_conn at your database, you can get going pretty quickly with Airflow. In airflow.cfg it might look like this:

executor = LocalExecutor
sql_alchemy_conn = mysql://myusername:mypassword@mydb:3306/airflow

Run Webserver and Scheduler

airflow webserver  # shows the GUI
airflow scheduler  # sends tasks (and picks up tasks if you’re using the LocalExecutor)
airflow worker     # picks up tasks; only needed if you’re using Celery

Distributed Mode

Set up distributed mode using the CeleryExecutor.

With distributed mode you need a message broker like RabbitMQ. We’ll modify the airflow.cfg file to:

# Use CeleryExecutor instead of SequentialExecutor
executor = CeleryExecutor

celery_result_backend = db+mysql://root:root@my_db:3306/airflow

# By default, airflow.cfg sets `default_queue = default` under [celery]
# Make sure to specify which `queue` to use (a BaseOperator argument)

# Remember to kick off some workers then with:
airflow worker

# To monitor your workers, check out 'Celery Flower' with
airflow flower


Airflow can integrate with systemd-based systems, letting systemd watch the Airflow daemons and restart them on failure. The scripts/systemd directory contains unit files that you can copy to /usr/lib/systemd/system. Airflow tasks will run as airflow:airflow (user:group).

Environment configuration is picked up from /etc/sysconfig/airflow. Make sure to specify SCHEDULER_RUNS, AIRFLOW_HOME, and AIRFLOW_CONFIG.



A DAG (Directed Acyclic Graph) is a collection of all the tasks you want to run, organized by their relationships and dependencies. A sample DAG can be three tasks: A, B, C. A DAG describes HOW you want to carry out your workflow, but it doesn’t say anything about WHAT we actually want to do (A, B, C can be anything). DAGs are defined in Python files placed in the DAG_FOLDER.

I think of it this way: a DAG can only move forward through its tasks. Tasks may split so that some run in parallel, but they can’t circle back to an earlier task, because that would create an infinite loop for that task. Since the DAG is also scheduled to run at regular intervals (say every 15 minutes), every scheduled run would start yet another infinite loop. That’s why DAGs only go forward instead of circling back.
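To make the "no circling back" rule concrete, here is a minimal sketch that computes a run order for tasks like A, B, C using Kahn’s topological sort. It is not Airflow’s scheduler code. If the dependencies contain a cycle, no task in that cycle ever becomes ready, so no valid order exists. The function name execution_order is hypothetical.

```python
from collections import deque


def execution_order(dependencies):
    """Return a valid run order, or raise ValueError if there is a cycle.

    `dependencies` maps each task to the list of tasks that must run before it.
    Kahn's topological sort, used only to illustrate why a DAG must be acyclic.
    """
    # Collect every task mentioned as a key or as an upstream dependency.
    tasks = set(dependencies)
    for upstream in dependencies.values():
        tasks.update(upstream)

    # Count unmet upstream dependencies and record downstream edges.
    remaining = {task: 0 for task in tasks}
    downstream = {task: [] for task in tasks}
    for task, upstream in dependencies.items():
        for parent in upstream:
            remaining[task] += 1
            downstream[parent].append(task)

    # Start with the tasks that have no upstream dependencies.
    ready = deque(sorted(t for t, n in remaining.items() if n == 0))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in sorted(downstream[task]):
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)

    if len(order) != len(tasks):
        # Some tasks never became ready: they are waiting on each other in a cycle.
        raise ValueError('cycle detected; no valid execution order exists')
    return order


if __name__ == '__main__':
    print(execution_order({'B': ['A'], 'C': ['A']}))
```

Here A runs first, and then B and C, which could run in parallel. Making A depend on C as well would close a loop, and the function raises instead of returning an order.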

Important note: the tasks in a DAG should be atomic, idempotent items (running one again for the same period should give the same result).

DAGs and default arguments

When we create tasks, we can define a dictionary of default arguments that gets passed to each of them. Which arguments apply depends on the type of Operator you’re choosing. Remember that all our tasks are nested into a DAG object. Here we create the DAG and pass in a default argument dictionary:

from datetime import datetime, timedelta

from airflow import DAG

# Define a dictionary of default arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

# Instantiate a DAG that runs once a day
dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

DAG schedule_interval

You can run a DAG on a schedule using schedule_interval, which also accepts cron-style expressions:

dag = DAG(dag_id='dag_do_stuff',
          default_args=default_args,
          schedule_interval='*/5 * * * *')  # Run every 5 minutes
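To see what `*/5` in the minute field means, here is a small sketch that expands a single cron field into the values it matches. It is deliberately simplified: it supports only `*`, `*/step`, single numbers, and comma lists, not full cron syntax. Airflow itself relies on the croniter library for cron parsing. The helper name expand_cron_field is hypothetical.

```python
def expand_cron_field(field, low, high):
    """Expand one cron field into the sorted list of values it matches.

    Simplified sketch: supports '*', '*/step', single numbers and
    comma-separated lists only (no ranges such as '1-5', no names like 'MON').
    """
    values = set()
    for part in field.split(','):
        if part == '*':
            values.update(range(low, high + 1))
        elif part.startswith('*/'):
            step = int(part[2:])
            if step <= 0:
                raise ValueError('step must be positive')
            values.update(range(low, high + 1, step))
        else:
            value = int(part)
            if not low <= value <= high:
                raise ValueError(f'{value} is outside {low}-{high}')
            values.add(value)
    return sorted(values)


if __name__ == '__main__':
    # The minute field of '*/5 * * * *' (minutes run from 0 to 59)
    print(expand_cron_field('*/5', 0, 59))
```

So '*/5 * * * *' matches minutes 0, 5, 10, ..., 55 of every hour, on every day.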

My advice is to include a version in your dag_id (e.g. my_dag_v1). When certain things about your DAG change (like schedule_interval), bump up the version (e.g. from v1 to v2) so that the scheduler treats it as a new DAG with its own run history.

You can also pass presets into the schedule_interval, including:


Keep in mind that if you’re running a distributed system, your DAG is sent to a /tmp folder. So if your code uses, say, os.path, make sure that path exists on all your servers.

How the DAG schedule runs

The airflow scheduler monitors all tasks and all DAGs, triggering the task instances whose dependencies have been met.

If you run a DAG on a schedule_interval of one day, then the run stamped 2016-01-01 will trigger after 2016-01-01T23:59. This means that the job instance is started once the period it covers has ended.

It’s very important to note: The scheduler runs your job one schedule_interval AFTER the start date, at the END of the period.
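The arithmetic behind this rule can be sketched with plain datetime objects, assuming a fixed timedelta schedule. Cron schedules, timezones, and the scheduler’s polling delay are ignored. The helper scheduled_runs is hypothetical. For each run, it lists the execution date (the start of its period) next to the earliest time that run can be triggered (the end of that period).

```python
from datetime import datetime, timedelta


def scheduled_runs(start_date, schedule_interval, now):
    """List (execution_date, earliest_trigger_time) pairs up to `now`.

    A run is stamped with the START of its period but can only be
    triggered once that period has ended.
    """
    runs = []
    execution_date = start_date
    while execution_date + schedule_interval <= now:
        runs.append((execution_date, execution_date + schedule_interval))
        execution_date += schedule_interval
    return runs


if __name__ == '__main__':
    for stamp, trigger in scheduled_runs(datetime(2016, 1, 1),
                                         timedelta(days=1),
                                         datetime(2016, 1, 3, 12)):
        print(f'run stamped {stamp:%Y-%m-%d} triggers at {trigger}')
```

With a daily interval, the run stamped 2016-01-01 only becomes eligible at 2016-01-02 00:00, which matches the "after 2016-01-01T23:59" behavior described above.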

DAG Views

You can view DAGs through the GUI, with the following views:


While DAGs describe HOW to run a workflow, Operators determine WHAT actually gets done. An operator describes a single task in a workflow; common operators include:

When to split out multiple operators

Operators are usually atomic; they should be able to stand on their own without sharing resources with any other operators. At scale, two operators may run on completely different machines, so you need to be careful about how you share information. If two operators need to share information, like a file or some data, consider combining them into a single operator. If that truly can’t be avoided, consider using XCom, Airflow’s cross-communication between operators.


When an operator is instantiated, it becomes a task. The parameterized task becomes a node in the DAG. A task instance is a specific run of a task: a combination of a DAG, a task, and a point in time. Task instances have a state, which could be ‘running’, ‘success’, ‘failed’, ‘skipped’, etc.

The arguments an operator accepts depend on its type (e.g. bash_command comes from BashOperator, while retries comes from BaseOperator, which every operator inherits from).

Example tasks might look like:

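from airflow.operators.bash_operator import BashOperator

t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag)

t2 = BashOperator(task_id='sleep', bash_command='sleep 5', retries=3, dag=dag)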

The precedence rules are:

  1. Explicitly passed arguments
  2. Values that exist in the default_args dictionary
  3. The operator’s default value, if one exists
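Airflow applies these rules when an operator is constructed. The sketch below models only the precedence, using plain dicts and a hypothetical resolve_task_args helper. The operator_defaults values in the example are made up for illustration.

```python
def resolve_task_args(explicit, default_args, operator_defaults):
    """Merge task arguments using the precedence rules above.

    In a dict merge, later entries win, so we merge from lowest to highest
    precedence: operator defaults, then default_args, then explicit args.
    """
    return {**operator_defaults, **default_args, **explicit}


if __name__ == '__main__':
    operator_defaults = {'retries': 0, 'trigger_rule': 'all_success'}  # made-up defaults
    default_args = {'owner': 'airflow', 'retries': 1}
    explicit = {'task_id': 'sleep', 'bash_command': 'sleep 5', 'retries': 3}
    resolved = resolve_task_args(explicit, default_args, operator_defaults)
    print(resolved['retries'])  # prints 3: the explicitly passed value wins
```

Drop retries from explicit and the result falls back to 1 from default_args. Drop it there too and you get the operator default.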

The following fields are required:

Run even if previous task failed

You can set an Operator to run even if an upstream task failed by using trigger_rule='all_done':

t2 = BashOperator(task_id='sleep', bash_command='sleep 5',
                  trigger_rule='all_done', dag=dag)  # runs once all upstream tasks finish
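To see the difference between the default rule (all_success) and all_done, here is a simplified model covering just those two rules. The should_trigger helper and the set of "finished" states are assumptions for illustration. Airflow itself supports more trigger rules and task states.

```python
def should_trigger(trigger_rule, upstream_states):
    """Decide whether a task may run, given the states of its upstream tasks.

    Simplified model of two trigger rules:
      all_success - every upstream task succeeded (the default)
      all_done    - every upstream task finished, whatever the outcome
    """
    finished = {'success', 'failed', 'upstream_failed', 'skipped'}
    if trigger_rule == 'all_success':
        return all(state == 'success' for state in upstream_states)
    if trigger_rule == 'all_done':
        return all(state in finished for state in upstream_states)
    raise ValueError(f'rule not covered by this sketch: {trigger_rule}')


if __name__ == '__main__':
    states = ['success', 'failed']
    print(should_trigger('all_success', states))  # prints False
    print(should_trigger('all_done', states))     # prints True
```

With one upstream failure, an all_success task will not run, while an all_done task still does. An all_done task does wait, though, if any upstream task is still running.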

Task Running and Clearing

Run - Run a single task instance (can only be done if using something other than SequentialExecutor)


Clear - Clear a set of task instances, as if they never ran

Say a DAG needs to be completely rerun for a date range; then just run clear:

airflow clear my_dag -s 2018-09-21 -e 2018-09-22

Clear only the task instances that have failed

airflow clear my_dag_id -f


Hooks are interfaces to external platforms and databases like S3, MySQL, Postgres, HDFS. Hooks keep authentication code and information out of pipelines, centralized in the metadata database. Hooks use the airflow.models.Connection model.

For example:

from airflow.hooks.mysql_hook import MySqlHook

hook = MySqlHook(mysql_conn_id='my_db')
hook.bulk_load(table_name, local_filepath)  # table_name and local_filepath are your own values

Hooks and Connections

If you want to store a password for a hook (encrypted with the Fernet key), you can do the following:

Create a Connection (with encrypted password) like so:

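from airflow import settings
from airflow.models import Connection
def create_conn(username, password, host=None):
    new_conn = Connection(conn_id=f'{username}_connection', login=username,
                          password=password, host=host)  # password is Fernet-encrypted
    session = settings.Session()
    session.add(new_conn)
    session.commit()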

Access the Connection (and password) like so:

from airflow.hooks.base_hook import BaseHook

connection = BaseHook.get_connection("username_connection")
# this connection gives you the host, login, and password for your db connection

password = connection.password # This is a getter that returns the unencrypted password.

We use a Connection here rather than an Airflow Variable so that the password is stored encrypted.


Some systems can get overwhelmed when too many processes hit them at the same time. Use an Airflow pool to limit the execution parallelism on specific tasks. A task is associated with a pool through the pool parameter when you create the task (i.e. when setting up the Operator, e.g. BashOperator).

You can also define priority_weight to define priorities in the queue and which tasks get executed first as slots open up in the pool. The default priority_weight is 1.
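Here is a toy simulation of that behavior. All tasks are queued at once into a pool with a fixed number of slots, and whenever slots free up, the tasks with the highest priority_weight go first. Several details are assumptions: the pool_schedule helper, the task names, and the idea that every task finishes within one round. The sketch also uses each priority_weight as-is, whereas Airflow’s effective weight can also account for downstream tasks.

```python
import heapq


def pool_schedule(tasks, slots):
    """Simulate which tasks start in each round for a pool with `slots` slots.

    `tasks` is a list of (task_id, priority_weight) pairs, all queued at once.
    Each round, the highest-weight waiting tasks fill the free slots; every
    task is assumed to finish within one round, freeing its slot.
    """
    # heapq is a min-heap, so negate weights to pop the highest first;
    # the queue position breaks ties so equal weights keep their order.
    queue = [(-weight, position, task_id)
             for position, (task_id, weight) in enumerate(tasks)]
    heapq.heapify(queue)
    rounds = []
    while queue:
        batch = [heapq.heappop(queue)[2] for _ in range(min(slots, len(queue)))]
        rounds.append(batch)
    return rounds


if __name__ == '__main__':
    print(pool_schedule([('a', 1), ('b', 10), ('c', 5), ('d', 1)], slots=2))
```

With two slots, b and c (weights 10 and 5) run first. The two weight-1 tasks, a and d, wait for the next round.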


XComs let tasks exchange messages, allowing you to have more control and share state across systems. An XCom is defined by a key, a value, and a timestamp, and it also tracks attributes such as the task and DAG that created it. XComs can be “pushed” (sent) or “pulled” (received).

XComs Pushing and Pulling

Tasks can push XComs at any time by calling the xcom_push() method, and pull them at any time by calling the xcom_pull() method, optionally filtering on criteria such as key, source task_ids, and source dag_id.
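Here is a toy, in-memory model of that push/pull flow. It shows how the key and the task/DAG filters narrow down which message comes back. ToyXComStore and its methods are hypothetical, and so are the task and DAG ids (extract, etl). In a real task you would call xcom_push and xcom_pull on the task instance, e.g. context['ti'].xcom_push(key='row_count', value=42).

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class XComRecord:
    """One message: a key/value pair plus who created it and when."""
    key: str
    value: object
    task_id: str
    dag_id: str
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc))


class ToyXComStore:
    """In-memory stand-in for Airflow's XCom table (illustration only)."""

    def __init__(self):
        self.records = []

    def push(self, key, value, task_id, dag_id):
        """Record a message sent by `task_id` in `dag_id`."""
        self.records.append(XComRecord(key, value, task_id, dag_id))

    def pull(self, key=None, task_ids=None, dag_id=None):
        """Return the most recently pushed value matching the filters, or None."""
        if isinstance(task_ids, str):
            task_ids = [task_ids]  # accept a single task id as well as a list
        matches = [
            record for record in self.records
            if (key is None or record.key == key)
            and (task_ids is None or record.task_id in task_ids)
            and (dag_id is None or record.dag_id == dag_id)
        ]
        return matches[-1].value if matches else None


if __name__ == '__main__':
    store = ToyXComStore()
    store.push('row_count', 42, task_id='extract', dag_id='etl')
    print(store.pull(key='row_count', task_ids='extract'))  # prints 42
```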


Variables are a generic way to store and retrieve arbitrary content or settings as a simple key value store. Variables can be listed, created, updated, and deleted from the UI, code, or CLI. Note that under the hood, a Variable is a SQLAlchemy Model.

from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True)


If you need to set up a workflow that goes down a certain path based on an arbitrary condition (e.g. whether something happened in an upstream task), use branching.
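In Airflow, BranchPythonOperator implements this pattern. Its python_callable returns the task_id of the branch to follow, and tasks on the other branches are skipped. Below, only the decision function is executable. The task ids (full_load, incremental_load) and the row_count/threshold inputs are hypothetical. The operator wiring is shown as comments, using the Airflow 1.x import path.

```python
def choose_branch(row_count, threshold=1000):
    """Return the task_id of the branch to follow (hypothetical task ids)."""
    if row_count > threshold:
        return 'full_load'
    return 'incremental_load'


# In a DAG file (Airflow 1.x import path), the wiring might look like:
#
#   from airflow.operators.python_operator import BranchPythonOperator
#
#   branch = BranchPythonOperator(
#       task_id='choose_branch',
#       python_callable=lambda: choose_branch(row_count=5000),
#       dag=dag)
#   branch.set_downstream([full_load, incremental_load])

if __name__ == '__main__':
    print(choose_branch(5000))  # prints full_load
```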

TriggerDagRunOperator - Trigger another Dag from current Dag

If branching isn’t enough (i.e. you want to execute a whole set of tasks based on some conditional logic), consider the TriggerDagRunOperator, which lets you kick off entire DAGs from your current DAG. To do this, you need:

You might want to use a trigger if you want to run some safety checks/tests before running your real operation.

Example Controller DAG

The Controller DAG conditionally executes the trigger.

"""This example illustrates the use of the TriggerDagRunOperator. There are 2
entities at work in this scenario:
1. The Controller DAG - the DAG that conditionally executes the trigger
2. The Target DAG - DAG being triggered (in example_trigger_target_dag.py)
This example illustrates the following features :
1. A TriggerDagRunOperator that takes:
  a. A python callable that decides whether or not to trigger the Target DAG
  b. An optional params dict passed to the python callable to help in
     evaluating whether or not to trigger the Target DAG
  c. The id (name) of the Target DAG
  d. The python callable can add contextual info to the DagRun created by
     way of adding a Pickleable payload (e.g. dictionary of primitives). This
     state is then made available to the TargetDag
2. A Target DAG : c.f. example_trigger_target_dag.py
"""

import pprint
from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

pp = pprint.PrettyPrinter(indent=4)

def conditionally_trigger(context, dag_run_obj):
    """This function decides whether or not to Trigger the remote DAG"""
    c_p = context['params']['condition_param']
    print("Controller DAG : conditionally_trigger = {}".format(c_p))
    if context['params']['condition_param']:
        dag_run_obj.payload = {'message': context['params']['message']}
        return dag_run_obj

# Define the DAG
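dag = DAG(
    dag_id='example_trigger_controller_dag',
    default_args={"owner": "airflow", "start_date": datetime.utcnow()},
    schedule_interval='@once')

# Define the single task in this controller example DAG
trigger = TriggerDagRunOperator(
    task_id='test_trigger_dagrun',
    trigger_dag_id="example_trigger_target_dag",
    python_callable=conditionally_trigger,
    params={'condition_param': True, 'message': 'Hello World'},
    dag=dag)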

The Target DAG

The DAG that is triggered externally (instead of being scheduled regularly) and then run:

import pprint
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

pp = pprint.PrettyPrinter(indent=4)

# This example illustrates the use of the TriggerDagRunOperator. There are 2
# entities at work in this scenario:
# 1. The Controller DAG - the DAG that conditionally executes the trigger
#    (in example_trigger_controller.py)
# 2. The Target DAG - DAG being triggered
# This example illustrates the following features :
# 1. A TriggerDagRunOperator that takes:
#   a. A python callable that decides whether or not to trigger the Target DAG
#   b. An optional params dict passed to the python callable to help in
#      evaluating whether or not to trigger the Target DAG
#   c. The id (name) of the Target DAG
#   d. The python callable can add contextual info to the DagRun created by
#      way of adding a Pickleable payload (e.g. dictionary of primitives). This
#      state is then made available to the TargetDag
# 2. A Target DAG : c.f. example_trigger_target_dag.py

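args = {
    'start_date': datetime.utcnow(),
    'owner': 'airflow',
}

# schedule_interval=None: this DAG only runs when it is triggered
dag = DAG(
    dag_id='example_trigger_target_dag',
    default_args=args,
    schedule_interval=None)

def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for key=message".
          format(kwargs['dag_run'].conf['message']))

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag)

# You can also access the DagRun object in templates
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: '
                 '{{ dag_run.conf["message"] if dag_run else "" }}" ',
    dag=dag)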


SubDAGs are used for repeating patterns; defining a function that returns a DAG object is a nice design pattern for building them. When loading data, a common pattern with Airflow is stage-check-exchange. Data is first staged in a temporary table, and data quality checks are run against that table. Once all the checks pass, the partition is moved into the production table.


A Service Level Agreement (SLA) is the time by which a task or DAG should have succeeded, set at the task level as a timedelta. If one or more instances haven’t succeeded by that time, an alert email can be sent out. You can set an SLA by adding sla to your arguments, as in this example:


from datetime import timedelta

import airflow
from airflow import DAG

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc':False,
    'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG')


If you want to setup sending email, make sure to edit your airflow.cfg file under the [smtp] section:

smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = YOUR_EMAIL_ADDRESS
# Use a 16-character Google App Password, or your raw password for testing if you don't want to generate one
smtp_password = 16_CHARACTER_APP_PASSWORD
smtp_port = 587
smtp_mail_from = YOUR_EMAIL_ADDRESS

Trigger Rules

The normal workflow is to trigger tasks when all their directly upstream tasks have succeeded. However, an operator can set a trigger_rule argument to change this, for example to:

Latest Run Only

Standard workflows involve running jobs for a particular date/time range. Some workflows perform tasks that are independent of last run time, but need to be run on a schedule, basically like a cron job; for these situations, use the LatestOnlyOperator to skip tasks that are not being run during the most recent scheduled run for a DAG.
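Here is a simplified sketch of that check, assuming a fixed timedelta schedule. A run counts as the latest only while the current time falls between the end of its own period and the end of the next one. Anything older, such as a backfilled run, would have its downstream tasks skipped. The is_latest_run helper is hypothetical and is not the operator’s actual code.

```python
from datetime import datetime, timedelta


def is_latest_run(execution_date, schedule_interval, now):
    """Return True if the run stamped `execution_date` is the latest scheduled run.

    The run for a period is triggered when that period ends (left_window);
    it stays the latest run until the next period ends (right_window),
    after which a newer run has been scheduled.
    """
    left_window = execution_date + schedule_interval
    right_window = left_window + schedule_interval
    return left_window < now <= right_window


if __name__ == '__main__':
    day = timedelta(days=1)
    now = datetime(2018, 8, 3, 10, 0)
    for stamp in (datetime(2018, 8, 1), datetime(2018, 8, 2)):
        action = 'run' if is_latest_run(stamp, day, now) else 'skip'
        print(f'{stamp:%Y-%m-%d}: {action}')
```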

Zombies and Undeads

Zombies are tasks that have a running status in the database but no longer emit a heartbeat (which the job sends periodically). They occur when a worker node can’t reach the database, when Airflow processes are killed externally, or when a node gets rebooted. Zombies are killed periodically by the scheduler’s process.

Undead processes have a process and a matching heartbeat, but Airflow isn’t aware of the task as running in the database. This mismatch occurs when the state of the database is altered (e.g. by deleting rows in the ‘Task Instances’ view in the UI). During the heartbeat routine, tasks verify their state and terminate themselves if they are in this ‘undead’ state.
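Both conditions come down to comparing what the database says with what the processes are doing. Here is a toy model of each check. The helper names are hypothetical, and the five-minute heartbeat threshold is an assumed value chosen for illustration.

```python
from datetime import datetime, timedelta


def find_zombies(task_instances, now, threshold=timedelta(minutes=5)):
    """Tasks the database marks as running whose heartbeat has gone stale.

    `task_instances` maps task_id -> (db_state, last_heartbeat).
    """
    return sorted(
        task_id
        for task_id, (state, last_heartbeat) in task_instances.items()
        if state == 'running' and now - last_heartbeat > threshold
    )


def find_undead(live_processes, task_instances):
    """Tasks with a live, heartbeating process that the database doesn't mark as running.

    `live_processes` is a set of task_ids that still have a running process;
    a task missing from `task_instances` (e.g. its row was deleted) counts too.
    """
    return sorted(
        task_id for task_id in live_processes
        if task_instances.get(task_id, (None, None))[0] != 'running'
    )


if __name__ == '__main__':
    now = datetime(2018, 8, 3, 16, 0)
    tis = {'a': ('running', now - timedelta(minutes=1)),
           'b': ('running', now - timedelta(minutes=10))}
    print(find_zombies(tis, now), find_undead({'a', 'd'}, tis))
```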

Real Life Scenario

Running a DAG

Let’s list the active DAGs:

# print the list of active DAGs
airflow list_dags

# print the list of tasks from the DAG that has a dag_id of "tutorial"
airflow list_tasks tutorial

# print the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree

Testing a DAG

You can test a single task in a DAG with airflow test, using this layout:

command subcommand dag_id task_id date

An example is:

# test the print_date task from the tutorial DAG
airflow test tutorial print_date 2016-01-01

airflow test tutorial sleep 2016-01-01


You can run a backfill to rerun part of a DAG over a past time period. For example, say you have a DAG of 5 tasks that runs once a day, and only the last 2 tasks failed. You can backfill just those two tasks for the last 7 days (airflow backfill accepts -t/--task_regex to select tasks by name).

airflow backfill my_dag -s 2018-09-10 -e 2018-09-11


By default, if a DAG run has not been run for a specific interval (or has been cleared), the scheduler kicks off a run for that interval; this is called catchup. An exception is when your DAG handles its own catchup (e.g. its work is not limited to the interval but uses "now" instead). In that case you’ll want to turn catchup off, either on the DAG itself with dag.catchup = False, or in the configuration file with catchup_by_default = False.

With catchup turned off, you tell the scheduler to only create a DAG run for the most current instance of the DAG interval series. You should only turn this off if your DAG runs perform backfill internally.
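The difference between catchup on and off can be sketched as follows, assuming a fixed timedelta schedule. The runs_to_create helper is hypothetical. It lists the completed intervals since start_date and drops the ones that already have a DAG run. With catchup off, it keeps only the most recent interval.

```python
from datetime import datetime, timedelta


def runs_to_create(start_date, interval, now, existing, catchup=True):
    """Return the execution dates the scheduler would create a DAG run for.

    With catchup on, every completed interval since start_date that has no
    run (or was cleared) gets one; with catchup off, only the latest one.
    """
    completed = []
    execution_date = start_date
    while execution_date + interval <= now:
        completed.append(execution_date)
        execution_date += interval
    if not catchup:
        completed = completed[-1:]
    return [d for d in completed if d not in existing]


if __name__ == '__main__':
    start, day, now = datetime(2018, 9, 1), timedelta(days=1), datetime(2018, 9, 5, 6)
    existing = {datetime(2018, 9, 2)}
    print(runs_to_create(start, day, now, existing))
    print(runs_to_create(start, day, now, existing, catchup=False))
```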


The airflow engine passes a few variables by default that are accessible in templates.

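You might need, say, the execution date in the command you’re running. It might look like the following, where we have a templated command (using the macros.ds_add macro):

templated_command = """
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
"""

t3 = BashOperator(task_id='templated', bash_command=templated_command,
                  params={'my_param': 'Parameter I passed in'}, dag=dag)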


Params is a dictionary that you can pass in. This is useful if you have, say, a constant like MY_CONSTANT that is used in multiple Operators: pass it in as a param and reference it from the template. E.g.

t3 = BashOperator(
    task_id='run_myprogram',
    params={'timezone': TIMEZONE_SETTING},
    bash_command="python myprogram --timezone_setting {{ params.timezone }}",
    dag=dag)

User Defined Macros

You can create your own macros from custom functions by passing them to the DAG’s user_defined_macros argument. One advantage is that when you call the function from a template, it receives the actual rendered values (e.g. the execution_date).

def compute_next_execution_date(dag, execution_date):
    return dag.following_schedule(execution_date)

dag = DAG('my_simple_dag', default_args=default_args, schedule_interval='@hourly',
          user_defined_macros={
              'next_execution_date': compute_next_execution_date,
          })

task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date(dag, execution_date) }}"',
    dag=dag)

Note that since an item like next_execution_date is a python object (datetime), you can also call methods on it. For example:

echo ''  # '2018-08-04 16:05:00'
echo ''  # '2018-08-03T16:05:00'
echo ''

User Defined Libraries

Say you install a library of your own with:

python setup.py build
python setup.py install

When you run your library through, say, a BashOperator command, make sure your files have an entry-point guard:

if __name__ == '__main__':
    main()  # your program's entry-point function

Otherwise, for some reason, your files might not get picked up.
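For example, the myprogram script called from the BashOperator in the Params section might be laid out like this minimal sketch. Only the --timezone_setting flag comes from that command. The main function, the argparse setup, and the UTC default are assumptions.

```python
import argparse


def main(argv=None):
    """Parse command-line flags and run the job; returns the timezone used."""
    parser = argparse.ArgumentParser(description='Example job run from Airflow')
    parser.add_argument('--timezone_setting', default='UTC',
                        help='timezone passed in from the DAG via params')
    args = parser.parse_args(argv)
    print(f'Running with timezone {args.timezone_setting}')
    return args.timezone_setting


if __name__ == '__main__':
    # Runs only when the file is executed as a script (e.g. by a BashOperator),
    # not when it is imported as a module.
    main()
```

Accepting an optional argv list keeps main() easy to call from tests. When argv is None, argparse falls back to the real command line.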

Command Line Interface

If you’re running Airflow on a server, sometimes it might be easier to just jump into the command line. You can run command line airflow with:

airflow <command>

Commands include:

resetdb - burn down and rebuild the metadata database
render - render a task instance's template(s)
variables - CRUD operations on variables (e.g. --set, --get, --delete)
connections - list/add/delete connections
pause - pause a DAG
unpause - unpause a DAG
trigger_dag - trigger a DAG run
dag_state - get the status of a DAG run
task_state - get the status of a task instance
run - run a single task instance
list_tasks - list the tasks within a DAG
list_dags - list all the DAGs
initdb - initialize the metadata database
test - test a task instance (run a task without checking for dependencies or recording state in db)
scheduler - run scheduler as a persistent service
clear - clear a previous run

Example Bash Command


Database Tables

If you’re using a relational database like MySQL as Airflow’s metadata database, the main tables might look like this:

mysql> use airflow

mysql> show tables;
| Tables_in_airflow  |
| alembic_version    |
| celery_taskmeta    |
| celery_tasksetmeta |
| chart              |
| connection         |
| dag                |
| dag_pickle         |
| dag_run            |
| dag_stats          |
| import_error       |
| job                |
| known_event        |
| known_event_type   |
| log                |
| sla_miss           |
| slot_pool          |
| task_fail          |
| task_instance      |
| users              |
| variable           |
| xcom               |
21 rows in set (0.00 sec)


The task_instance table shows you which tasks have been executed, when, their state, etc.

mysql> describe task_instance;
| Field           | Type          | Null | Key | Default | Extra |
| task_id         | varchar(250)  | NO   | PRI | NULL    |       |
| dag_id          | varchar(250)  | NO   | PRI | NULL    |       |
| execution_date  | datetime(6)   | NO   | PRI | NULL    |       |
| start_date      | datetime(6)   | YES  |     | NULL    |       |
| end_date        | datetime(6)   | YES  |     | NULL    |       |
| duration        | float         | YES  |     | NULL    |       |
| state           | varchar(20)   | YES  | MUL | NULL    |       |
| try_number      | int(11)       | YES  |     | NULL    |       |
| hostname        | varchar(1000) | YES  |     | NULL    |       |
| unixname        | varchar(1000) | YES  |     | NULL    |       |
| job_id          | int(11)       | YES  | MUL | NULL    |       |
| pool            | varchar(50)   | YES  | MUL | NULL    |       |
| queue           | varchar(50)   | YES  |     | NULL    |       |
| priority_weight | int(11)       | YES  |     | NULL    |       |
| operator        | varchar(1000) | YES  |     | NULL    |       |
| queued_dttm     | datetime(6)   | YES  |     | NULL    |       |
| pid             | int(11)       | YES  |     | NULL    |       |
| max_tries       | int(11)       | YES  |     | -1      |       |

mysql> select * from task_instance;
| task_id              | dag_id              | execution_date             | start_date                 | end_date                   | duration | state   | try_number | hostname     | unixname | job_id | pool | queue   | priority_weight | operator     | queued_dttm                | pid  | max_tries |
| my_task_id_abcdefghi | dag_download_abcdef | 2018-08-02 00:05:00.000000 | 2018-08-03 15:46:14.788703 | 2018-08-03 15:46:18.545962 |  3.75726 | success |          1 | 3d3a39269fbd | root     |      2 | NULL | default |               2 | BashOperator | 2018-08-03 15:46:13.419580 |  965 |         2 |
| my_task_id_abcdefghi | dag_download_abcdef | 2018-08-02 01:05:00.000000 | 2018-08-03 15:46:23.427211 | 2018-08-03 15:46:26.416335 |  2.98912 | success |          1 | 3d3a39269fbd | root     |      4 | NULL | default |               2 | BashOperator | 2018-08-03 15:46:22.171445 | 1000 |         2 |
| my_task_id_abcdefghi | dag_download_abcdef | 2018-08-02 02:05:00.000000 | 2018-08-03 15:46:32.067906 | 2018-08-03 15:46:35.113323 |  3.04542 | success |          1 | 3d3a39269fbd | root     |      5 | NULL | default |               2 | BashOperator | 2018-08-03 15:46:30.859648 | 1043 |         2 |
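To answer questions like "how many task instances are in each state?", you can query task_instance directly. Below is a self-contained sketch that uses Python’s built-in sqlite3 as a stand-in for MySQL. It builds a trimmed-down task_instance table (only a few of the columns above) loaded with the three rows shown. The helper names build_sample_db and state_counts are hypothetical.

```python
import sqlite3

# Three rows copied from the task_instance output above (a subset of columns).
SAMPLE_ROWS = [
    ('my_task_id_abcdefghi', 'dag_download_abcdef', '2018-08-02 00:05:00', 3.75726, 'success'),
    ('my_task_id_abcdefghi', 'dag_download_abcdef', '2018-08-02 01:05:00', 2.98912, 'success'),
    ('my_task_id_abcdefghi', 'dag_download_abcdef', '2018-08-02 02:05:00', 3.04542, 'success'),
]


def build_sample_db():
    """Create an in-memory stand-in for the task_instance table."""
    conn = sqlite3.connect(':memory:')
    conn.execute(
        'CREATE TABLE task_instance ('
        'task_id TEXT, dag_id TEXT, execution_date TEXT, '
        'duration REAL, state TEXT)'
    )
    conn.executemany('INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)', SAMPLE_ROWS)
    return conn


def state_counts(conn, dag_id):
    """Count task instances per state for one DAG."""
    cursor = conn.execute(
        'SELECT state, COUNT(*) FROM task_instance '
        'WHERE dag_id = ? GROUP BY state ORDER BY state',
        (dag_id,),
    )
    return dict(cursor.fetchall())


if __name__ == '__main__':
    print(state_counts(build_sample_db(), 'dag_download_abcdef'))
```

Against MySQL you would run the same SQL through the mysql client or a DB-API driver. Those drivers typically use %s placeholders instead of ?.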


The default log layout is a little too granular for my tastes. I like to log into one file and just grep from there. In airflow.cfg, just modify log_filename_template. The default is:

log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log

I prefer something like:

log_filename_template = ///loggy.log

It makes it easier to grep and clean out logs.


You can run your Airflow commands in something like screen or nohup for quick testing, but for a production environment you might want to use systemd (via systemctl) to handle Airflow restarts.

For the unit files, check out: https://github.com/apache/airflow/tree/master/scripts/systemd

As an example, on a CentOS machine I have the following (where will is my user and Airflow is installed in a virtualenv under /home/will):

# This file is the environment file for Airflow. Put this file in /etc/sysconfig/airflow per default
# configuration of the systemd unit files.

tmpfiles.d entry, which creates /run/airflow:

D /run/airflow 0755 airflow airflow

airflow webserver

[Unit]
Description=Airflow webserver daemon

[Service]
ExecStart=/bin/sh -c 'source /home/will/.virtualenvs/airflow/bin/activate && airflow webserver'

airflow scheduler

[Unit]
Description=Airflow scheduler daemon

[Service]
ExecStart=/bin/sh -c 'source /home/will/.virtualenvs/airflow/bin/activate && /home/will/.virtualenvs/airflow/bin/airflow scheduler'



