Introduction to Airflow

What is Airflow?

Workflow: A set of steps to accomplish a given data engineering task, such as downloading files, copying data, filtering information, writing to a database, etc.

Apache Airflow is a platform to program workflows, including their creation, scheduling and monitoring. Workflows are implemented as DAGs (Directed Acyclic Graphs) and authored in Python. Airflow may be accessed via code, the CLI or a web interface.

DAGs

from airflow.models import DAG
from datetime import datetime

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'emilegill',
    'email': 'emilegill@email.com',
    'start_date': datetime(2020, 1, 20),
    'retries': 3
}

etl_dag = DAG(
    dag_id='etl_workflow',
    default_args=default_args
)

DAGs on the command line:

The airflow CLI contains numerous subcommands, many of which relate to DAGs:

airflow list_dags : show all recognised DAGs

airflow run <dag_id> <task_id> <execution_date> : run a specific task from a DAG
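A couple of other Airflow 1.x subcommands are useful while developing (the DAG id, task id and date are placeholders):

airflow list_tasks <dag_id> : list the tasks within a given DAG

airflow test <dag_id> <task_id> <execution_date> : run a single task instance without recording its state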

Implementing Airflow DAGs

Airflow Operators

The most common type of task in Airflow is the Operator. Operators represent a single, well-defined piece of work within a workflow and usually run independently of other tasks.

Bash Operator

from airflow.operators.bash_operator import BashOperator

example_task = BashOperator(task_id='bash_example',
                            bash_command='echo "Example!"',
                            dag=ml_dag)

# task_ids must be unique within a DAG
shell_task = BashOperator(task_id='run_cleanup',
                          bash_command='runcleanup.sh',
                          dag=ml_dag)

bash_task = BashOperator(task_id='clean_addresses',
                         bash_command='cat addresses.txt | awk "NF==10" > cleaned.txt',
                         dag=ml_dag)

Considerations:

- The BashOperator runs its command in a temporary directory, so it is not guaranteed to run in any particular location or user environment.
- Environment variables may need to be set explicitly for the command to behave as expected.
- Commands requiring elevated privileges can be tricky to run safely.

Airflow Tasks

Tasks are instances of operators, usually assigned to a Python variable and referred to elsewhere by their task_id:

example_task = BashOperator(task_id='bash_example',
                            bash_command='echo "Example!"',
                            dag=dag)

Task dependencies:

# Define tasks
task1 = BashOperator(task_id='first_task',
                     bash_command='echo 1',
                     dag=example_dag)

task2 = BashOperator(task_id='second_task',
                     bash_command='echo 2',
                     dag=example_dag)

# Set task1 to run before task2
task1 >> task2 # or task2 << task1
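Dependencies can also be chained in a single statement; a minimal sketch with a hypothetical third task on the same DAG:

task3 = BashOperator(task_id='third_task',
                     bash_command='echo 3',
                     dag=example_dag)

# task1 runs first, then task2, then task3
task1 >> task2 >> task3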

Additional Operators

PythonOperator:

from airflow.operators.python_operator import PythonOperator

def printme():
    print("This goes in the logs!")

python_task = PythonOperator(
    task_id='simple_print',
    python_callable=printme,
    dag=example_dag
)

Keyword arguments can be passed to the callable via op_kwargs:

import time

def sleep(length_of_time):
    time.sleep(length_of_time)

sleep_task = PythonOperator(
    task_id='sleep',
    python_callable=sleep,
    op_kwargs={'length_of_time': 5},
    dag=example_dag
)

EmailOperator:

from airflow.operators.email_operator import EmailOperator

email_task = EmailOperator(
    task_id='email_sales_report',
    to='sales_manager@example.com',
    subject='Automated Sales Report',
    html_content='Attached is the latest sales report',
    files=['latest_sales.xlsx'],
    dag=example_dag
)

Airflow Scheduling

DAG Runs:

A DAG run is a specific instance of a workflow at a point in time. Runs may be started manually or via the schedule_interval, and each run maintains state for the workflow and its tasks (running, failed, success).

Schedule details:

- start_date : the date/time to initially schedule the DAG run
- end_date : optional date/time to stop running new DAG instances
- max_tries : optional number of attempts to make before stopping
- schedule_interval : how often to run the DAG

Schedule interval syntax:

The schedule_interval parameter may be specified using cron syntax:

# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12)
# │ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday;
# │ │ │ │ │                                   7 is also Sunday on some systems)
# │ │ │ │ │
# │ │ │ │ │
# * * * * * <command to execute>

e.g.

0 12 * * *              # Run daily at noon

* * 25 2 *              # Run once per minute on February 25th

0,15,30,45 * * * *      # Run every 15 minutes

Alternatively, the interval may be defined using one of Airflow's preset values: @hourly, @daily, @weekly.

Special presets:

- None : don't schedule; the DAG is only ever triggered manually
- @once : schedule once and only once

Note: When scheduling a DAG, Airflow will wait one schedule_interval from its start_date before triggering the DAG for the first time. Therefore, a DAG with

'start_date': datetime(2020, 2, 25),
'schedule_interval': '@daily'

will trigger for the first time on February 26th 2020.
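Putting this together, a minimal sketch of a daily-scheduled DAG (the DAG id and arguments are illustrative):

from airflow.models import DAG
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 2, 25)
}

# First run triggers on 2020-02-26, one full interval after start_date
report_dag = DAG(
    dag_id='daily_report',
    default_args=default_args,
    schedule_interval='@daily'
)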

Maintaining and monitoring Airflow workflows

Airflow Sensors

Sensor:

An operator that waits for a certain condition to be true, e.g. the creation of a file, the upload of a database record, or a certain response from a web request. Sensors are checked repeatedly until they succeed or time out.

Sensor arguments:

- mode : how to check for the condition, either 'poke' (the default: check repeatedly while keeping a worker slot) or 'reschedule' (give up the slot and try again later)
- poke_interval : how often to check for the condition, in seconds
- timeout : how long to wait, in seconds, before marking the task as failed

File Sensor:

from airflow.contrib.sensors.file_sensor import FileSensor

file_sensor_task = FileSensor(task_id='file_sense',
                              filepath='salesdata.csv',
                              poke_interval=300,
                              dag=sales_report_dag)

init_sales_cleanup >> file_sensor_task >> generate_report

Other Sensors:

- ExternalTaskSensor : waits for a task in another DAG to complete
- HttpSensor : requests a web URL and checks for content
- SqlSensor : runs a SQL query to check for content

Airflow Executors

Executors run tasks in Airflow; different executors handle the running of tasks differently. Common examples are the SequentialExecutor (the default, running one task at a time), the LocalExecutor (running tasks in parallel on a single host) and the CeleryExecutor (distributing tasks across a cluster of workers).

The executor being used by an Airflow configuration can be found in its airflow.cfg file, under the entry executor=.
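A quick way to check this from the shell (assuming a default install where airflow.cfg lives in ~/airflow):

# Show the configured executor
grep "executor = " ~/airflow/airflow.cfg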

Debugging and troubleshooting Airflow

DAG won’t run on schedule:

- Check that the scheduler is running; if it isn't, start it with airflow scheduler.
- Check that at least one schedule_interval has passed since the start_date.
- Check that the executor has free slots available to run tasks.

DAG won’t load (not in web UI/airflow list_dags):

- Verify that the DAG file is in the correct dags_folder (the path is set in airflow.cfg).
- Check the file for Python syntax errors, as shown below.
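Running the DAG file directly with Python is a quick way to surface syntax and import errors (the filename is a placeholder):

# Any syntax or import error is printed to the console; silence means the file parses
python3 my_dag_file.py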

SLAs and reporting in Airflow

SLA: Service Level Agreement. In Airflow, this is the amount of time a task or DAG should require to run; an SLA miss is any occasion where the task or DAG does not meet that expected timing. Misses are logged and can be viewed in the web UI.

Defining SLAs:

An SLA may be set on an individual task via the sla argument, or for every task in a DAG via default_args:

from datetime import timedelta

task1 = BashOperator(task_id='sla_misses',
                     bash_command='runcode.sh',
                     sla=timedelta(seconds=30),
                     dag=dag)

# Or for every task in the DAG:
default_args = {
  'sla': timedelta(minutes=20),
  'start_date': datetime(2020, 2, 20)
}

dag = DAG('sla_dag', default_args=default_args)

General Reporting:

Options for sending alerting emails can also be set via keys in default_args:

default_args = {
  'email': ['emile@emilegill.com'],
  'email_on_failure': True,
  'email_on_retry': False,
  'email_on_success': True
}

Building production pipelines in Airflow

Working with templates

Templates:

Non-templated example:

t1 = BashOperator(
      task_id='first_task',
      bash_command='echo "Reading file1.txt"',
      dag=dag)

t2 = BashOperator(
      task_id='second_task',
      bash_command='echo "Reading file2.txt"',
      dag=dag)

Templated example:

Simple example-

templated_command="""
  echo "Reading {{params.filename}}"
"""

t1 = BashOperator(task_id='template_task',
                  bash_command=templated_command,
                  params={'filename': 'file1.txt'},
                  dag=example_dag)

t2 = BashOperator(task_id='template_task_2',
                  bash_command=templated_command,
                  params={'filename': 'file2.txt'},
                  dag=example_dag)

More advanced example-

templated_command = """
  {% for filename in params.filenames %}
    echo "Reading {{filename}}"
  {% endfor %}
"""

t1 = BashOperator(task_id='templated_task',
                  bash_command=templated_command,
                  params={'filenames': ['file1.txt', 'file2.txt']},
                  dag=example_dag)

Variables:

Airflow provides several built-in runtime variables, providing information about DAG runs, tasks, system configuration, etc.

Examples:

| variable | description |
| --- | --- |
| {{ ds }} | Execution date (YYYY-MM-DD) |
| {{ ds_nodash }} | Execution date, no dashes (YYYYMMDD) |
| {{ prev_ds }} | Previous execution date (YYYY-MM-DD) |
| {{ prev_ds_nodash }} | Previous execution date, no dashes (YYYYMMDD) |
| {{ dag }} | DAG object |
| {{ conf }} | Airflow config object |
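For example, a BashOperator can reference the execution date directly in its templated command (the task and DAG names here are illustrative):

date_task = BashOperator(task_id='print_run_date',
                         bash_command='echo "Run date: {{ ds }}"',
                         dag=example_dag)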

Macros:

In addition to the other variables provided, airflow also provides a macros variable referencing the Airflow macros package. This provides various useful objects/methods for Airflow templates.

Examples:

| macro | description |
| --- | --- |
| {{ macros.datetime }} | The datetime.datetime object |
| {{ macros.timedelta }} | The datetime.timedelta object |
| {{ macros.uuid }} | Python's uuid module |
| {{ macros.ds_add('2020-04-15', 5) }} | Modify days from a date, e.g. this will return 2020-04-20 |
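Macros can also be combined with runtime variables inside a template; a minimal sketch (task and DAG names are illustrative):

lookahead_task = BashOperator(task_id='print_lookahead',
                              bash_command='echo "Five days after {{ ds }} is {{ macros.ds_add(ds, 5) }}"',
                              dag=example_dag)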

Branching

def branch_test(**kwargs):
  # ds_nodash is the execution date as YYYYMMDD, so this alternates by day
  if int(kwargs['ds_nodash']) % 2 == 0:
    return 'even_day_task'
  else:
    return 'odd_day_task'

branch_task = BranchPythonOperator(task_id='branch_task',
                                   provide_context=True,
                                   python_callable=branch_test,
                                   dag=dag)

start_task >> branch_task >> even_day_task >> even_day_task2
branch_task >> odd_day_task >> odd_day_task2

Creating a production pipeline

Running tasks and DAGs:

# Run a specific task from the command line
airflow run <dag_id> <task_id> <execution_date>

# Run a full DAG
airflow trigger_dag -e <execution_date> <dag_id>

Pipeline Example:

from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email_operator import EmailOperator
from datetime import date, datetime, timedelta

def process_data(**context):
  # Write a timestamped marker file to record when processing ran
  with open('/home/repl/workspace/processed_data.tmp', 'w') as file:
    file.write(f'Data processed on {date.today()}')

default_args = {
  'start_date': datetime(2019,1,1),
  'sla': timedelta(minutes=90)
}
    
dag = DAG(dag_id='etl_update', default_args=default_args)

sensor = FileSensor(task_id='sense_file', 
                    filepath='/home/repl/workspace/startprocess.txt',
                    poke_interval=45,
                    dag=dag)

bash_task = BashOperator(task_id='cleanup_tempfiles', 
                         bash_command='rm -f /home/repl/*.tmp',
                         dag=dag)

python_task = PythonOperator(task_id='run_processing', 
                             python_callable=process_data,
                             provide_context=True,
                             dag=dag)

email_subject="""
  Email report for {{ params.department }} on {{ ds_nodash }}
"""


email_report_task = EmailOperator(task_id='email_report_task',
                                  to='sales@mycompany.com',
                                  subject=email_subject,
                                  html_content='',
                                  params={'department': 'Data subscription services'},
                                  dag=dag)


no_email_task = DummyOperator(task_id='no_email_task', dag=dag)


def check_weekend(**kwargs):
    # 'ds' is the execution date as a 'YYYY-MM-DD' string
    dt = datetime.strptime(kwargs['ds'], "%Y-%m-%d")
    # If dt.weekday() is 0-4, it's Monday - Friday. If 5 or 6, it's Sat / Sun.
    if dt.weekday() < 5:
        return 'email_report_task'
    else:
        return 'no_email_task'
    
    
branch_task = BranchPythonOperator(task_id='check_if_weekend',
                                   provide_context=True,
                                   python_callable=check_weekend,
                                   dag=dag)

    
sensor >> bash_task >> python_task

python_task >> branch_task >> [email_report_task, no_email_task]