Apache Airflow Introduction

Guangzhou, China

I have been looking into Python Ray to provision and update ML models for a prediction API. During my research I kept hearing about two alternative platforms that supposedly offer the same or similar benefits for MLOps workflows. So before diving into the nitty-gritty details of Ray, let's take a look at the alternatives: Apache Airflow and the Kubernetes-native Kubeflow. Let's start with Airflow.

Introduction

Apache Airflow is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows. Airflow’s extensible Python framework enables you to build workflows connecting with virtually any technology. A web interface helps manage the state of your workflows. Airflow is deployable in many ways, varying from a single process on your laptop to a distributed setup to support even the biggest workflows.

The main characteristic of Airflow workflows is that they are all defined in Python code, which lets us write dynamic data pipelines called DAGs (Directed Acyclic Graphs), a mathematical abstraction of a pipeline. “Workflows as code” serves several purposes:

  • Dynamic: Airflow pipelines are configured as Python code, allowing for dynamic pipeline generation.

  • Extensible: The Airflow framework contains operators to connect with numerous technologies. All Airflow components are extensible to easily adjust to your environment.

  • Flexible: Workflow parameterization is built-in leveraging the Jinja templating engine.
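
To make the DAG abstraction a bit more concrete, here is a minimal sketch in plain Python. It does not use Airflow's API. It only relies on the standard library's graphlib module to show how a set of tasks with dependencies resolves into a valid run order, and why a cycle makes a pipeline impossible to schedule. The task names are hypothetical and chosen purely for illustration.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical pipeline: each key is a task, each value is the set of
# tasks that must finish before that task may start.
pipeline = {
    "extract": set(),
    "transform": {"extract"},
    "train_model": {"transform"},
    "publish_report": {"transform"},
    "notify": {"train_model", "publish_report"},
}


def execution_order(graph):
    """Return one valid run order for the graph.

    Raises graphlib.CycleError if the dependencies contain a cycle.
    """
    return list(TopologicalSorter(graph).static_order())


def is_acyclic(graph):
    """Return True if the graph can be ordered, i.e. it is a valid DAG."""
    try:
        execution_order(graph)
    except CycleError:
        return False
    return True


order = execution_order(pipeline)
print(order)  # "extract" comes first, "notify" comes last

# Two tasks that wait on each other can never be scheduled.
print(is_acyclic({"a": {"b"}, "b": {"a"}}))  # False
```

In Airflow the same idea is expressed with operators and dependencies inside a DAG definition file, and the scheduler takes over the job of working out what can run next.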

Installation with Docker

Clone the GitHub repository (linked above) and enter the docker directory. Here you will find a Dockerfile and a shell script that we need to start Airflow inside a Docker container.

Note that the shell script contains instructions to create the default admin user. You can change the login credentials there - the default login is set to admin/admin.

We can build the Docker image by running the following command inside the directory:

docker build -t airflow-postgres .

This downloads the official Python Docker image as the base and installs the latest version of Airflow (currently v2.5.1 with support for Python v3.10).

You can now run the container exposing the WebUI port using:

docker run --rm -d -p 8080:8080 --name airflow airflow-postgres

You can visit the web UI at http://localhost:8080 and log in with the user that was created by the shell script. We now have access to the user interface and can take control of our workflows.
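
If you prefer to check from a script whether the container came up properly, the Airflow webserver exposes a /health endpoint that returns a small JSON document with one entry per component. The following is a minimal sketch, assuming the port mapping from the docker run command above. The helper names are my own, and the sample payload is a hand-written illustration rather than captured output.

```python
import json
import urllib.request

# Assumes the 8080:8080 port mapping used in the docker run command.
HEALTH_URL = "http://localhost:8080/health"


def unhealthy_components(payload):
    """Return the names of all components whose status is not 'healthy'.

    Components that report no status at all are listed as well.
    """
    return sorted(
        name
        for name, info in payload.items()
        if info.get("status") != "healthy"
    )


def fetch_health(url=HEALTH_URL, timeout=5):
    """Fetch and decode the health document from a running webserver."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.load(response)


# Hand-written sample in the shape the endpoint returns (illustration only).
sample = {
    "metadatabase": {"status": "healthy"},
    "scheduler": {"status": "unhealthy", "latest_scheduler_heartbeat": None},
}
print(unhealthy_components(sample))  # ['scheduler']
```

With the container running, unhealthy_components(fetch_health()) should return an empty list once the database and scheduler are both up.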

Command Line Interface

Besides the web interface, we can also interact with Airflow through its CLI. Since Airflow is running inside the airflow Docker container, we first have to access this environment:

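docker exec -ti airflow /bin/bash

Running airflow --help inside the container lists the available groups and commands:

airflow --help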
usage: airflow [-h] GROUP_OR_COMMAND ...

positional arguments:
  GROUP_OR_COMMAND

    Groups:
      celery         Celery components
      config         View configuration
      connections    Manage connections
      dags           Manage DAGs
      db             Database operations
      jobs           Manage jobs
      kubernetes     Tools to help run the KubernetesExecutor
      pools          Manage pools
      providers      Display providers
      roles          Manage roles
      tasks          Manage tasks
      users          Manage users
      variables      Manage variables

    Commands:
      cheat-sheet    Display cheat sheet
      dag-processor  Start a standalone Dag Processor instance
      info           Show information about current Airflow and environment
      kerberos       Start a kerberos ticket renewer
      plugins        Dump information about loaded plugins
      rotate-fernet-key
                     Rotate encrypted connection credentials and variables
      scheduler      Start a scheduler instance
      standalone     Run an all-in-one copy of Airflow
      sync-perm      Update permissions for existing roles and optionally DAGs
      triggerer      Start a triggerer instance
      version        Show the version
      webserver      Start a Airflow webserver instance
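
The same CLI can also be driven from the host without opening an interactive shell first. The sketch below wraps docker exec with Python's subprocess module. It assumes the container is named airflow, as in the docker run command above, and that the airflow executable is on the PATH of a non-interactive docker exec session. Both helper functions are hypothetical names introduced for illustration.

```python
import shlex
import subprocess

CONTAINER = "airflow"  # container name from the docker run command


def airflow_command(*args, container=CONTAINER):
    """Build the argv list that runs an Airflow CLI command in the container."""
    return ["docker", "exec", container, "airflow", *args]


def run_airflow(*args, container=CONTAINER):
    """Run the command and return its stdout.

    Raises subprocess.CalledProcessError if the command exits non-zero.
    """
    result = subprocess.run(
        airflow_command(*args, container=container),
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout


# Show the command line that would be executed, without running it.
print(shlex.join(airflow_command("dags", "list")))
# docker exec airflow airflow dags list
```

Calling run_airflow("version") with the container running should return the installed Airflow version as text.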

CLI Cheat Sheet

Command                               Description

Miscellaneous commands
airflow cheat-sheet                   Display cheat sheet
airflow dag-processor                 Start a standalone Dag Processor instance
airflow info                          Show information about current Airflow and environment
airflow kerberos                      Start a kerberos ticket renewer
airflow plugins                       Dump information about loaded plugins
airflow rotate-fernet-key             Rotate encrypted connection credentials and variables
airflow scheduler                     Start a scheduler instance
airflow standalone                    Run an all-in-one copy of Airflow
airflow sync-perm                     Update permissions for existing roles and optionally DAGs
airflow triggerer                     Start a triggerer instance
airflow version                       Show the version
airflow webserver                     Start a Airflow webserver instance

Celery components
airflow celery flower                 Start a Celery Flower
airflow celery stop                   Stop the Celery worker gracefully
airflow celery worker                 Start a Celery worker node

View configuration
airflow config get-value              Print the value of the configuration
airflow config list                   List options for the configuration

Manage connections
airflow connections add               Add a connection
airflow connections delete            Delete a connection
airflow connections export            Export all connections
airflow connections get               Get a connection
airflow connections import            Import connections from a file
airflow connections list              List connections

Manage DAGs
airflow dags backfill                 Run subsections of a DAG for a specified date range
airflow dags delete                   Delete all DB records related to the specified DAG
airflow dags list                     List all the DAGs
airflow dags list-import-errors       List all the DAGs that have import errors
airflow dags list-jobs                List the jobs
airflow dags list-runs                List DAG runs given a DAG id
airflow dags next-execution           Get the next execution datetimes of a DAG
airflow dags pause                    Pause a DAG
airflow dags report                   Show DagBag loading report
airflow dags reserialize              Reserialize all DAGs by parsing the DagBag files
airflow dags show                     Displays DAG's tasks with their dependencies
airflow dags show-dependencies        Displays DAGs with their dependencies
airflow dags state                    Get the status of a dag run
airflow dags test                     Execute one single DagRun
airflow dags trigger                  Trigger a DAG run
airflow dags unpause                  Resume a paused DAG

Database operations
airflow db check                      Check if the database can be reached
airflow db check-migrations           Check if migration have finished
airflow db clean                      Purge old records in metastore tables
airflow db downgrade                  Downgrade the schema of the metadata database.
airflow db init                       Initialize the metadata database
airflow db reset                      Burn down and rebuild the metadata database
airflow db shell                      Runs a shell to access the database
airflow db upgrade                    Upgrade the metadata database to latest version

Manage jobs
airflow jobs check                    Checks if job(s) are still alive

Tools to help run the KubernetesExecutor
airflow kubernetes cleanup-pods       Clean up Kubernetes pods (created by KubernetesExecutor/KubernetesPodOperator) in evicted/failed/succeeded/pending states
airflow kubernetes generate-dag-yaml  Generate YAML files for all tasks in DAG. Useful for debugging tasks without launching into a cluster

Manage pools
airflow pools delete                  Delete pool
airflow pools export                  Export all pools
airflow pools get                     Get pool size
airflow pools import                  Import pools
airflow pools list                    List pools
airflow pools set                     Configure pool

Display providers
airflow providers auth                Get information about API auth backends provided
airflow providers behaviours          Get information about registered connection types with custom behaviours
airflow providers get                 Get detailed information about a provider
airflow providers hooks               List registered provider hooks
airflow providers links               List extra links registered by the providers
airflow providers list                List installed providers
airflow providers logging             Get information about task logging handlers provided
airflow providers secrets             Get information about secrets backends provided
airflow providers widgets             Get information about registered connection form widgets

Manage roles
airflow roles add-perms               Add roles permissions
airflow roles create                  Create role
airflow roles del-perms               Delete roles permissions
airflow roles delete                  Delete role
airflow roles export                  Export roles (without permissions) from db to JSON file
airflow roles import                  Import roles (without permissions) from JSON file to db
airflow roles list                    List roles

Manage tasks
airflow tasks clear                   Clear a set of task instance, as if they never ran
airflow tasks failed-deps             Returns the unmet dependencies for a task instance
airflow tasks list                    List the tasks within a DAG
airflow tasks render                  Render a task instance's template(s)
airflow tasks run                     Run a single task instance
airflow tasks state                   Get the status of a task instance
airflow tasks states-for-dag-run      Get the status of all task instances in a dag run
airflow tasks test                    Test a task instance

Manage users
airflow users add-role                Add role to a user
airflow users create                  Create a user
airflow users delete                  Delete a user
airflow users export                  Export all users
airflow users import                  Import users
airflow users list                    List users
airflow users remove-role             Remove role from a user

Manage variables
airflow variables delete              Delete variable
airflow variables export              Export all variables
airflow variables get                 Get variable
airflow variables import              Import variables
airflow variables list                List variables
airflow variables set                 Set variable

Common Commands

  • airflow db init: Initialise the metadata database
  • airflow db reset: Reinitialize the metadata database (drops everything)
  • airflow db upgrade: Upgrade the metadata database (latest schemas, values, ...)
  • airflow webserver: Start Airflow’s webserver
  • airflow scheduler: Start Airflow’s scheduler
  • airflow celery worker: Start a Celery worker (useful in distributed mode to spread tasks across nodes, i.e. machines)
  • airflow dags list: List the known DAGs (either those in the examples folder or in the dags folder)
  • airflow dags trigger example_python_operator: Trigger the DAG example_python_operator with the current date as the execution date
  • airflow dags trigger example_python_operator -e 2023-01-01: Trigger the DAG example_python_operator with a date in the past as the execution date (the catchup option in the DAG definition only controls whether the scheduler creates runs for missed schedule intervals; a manually triggered run like this one is queued regardless of that setting)
  • airflow dags trigger example_python_operator -e '2023-01-01 19:04:00+00:00': Trigger the DAG example_python_operator with an execution date in the future (replace the date with one about 2 minutes later than the current time displayed in the Airflow UI). The DAG run will be scheduled for that time.
  • airflow dags list-runs -d example_python_operator: Display the history of example_python_operator’s DAG runs
  • airflow tasks list example_python_operator: List the tasks contained in the example_python_operator DAG
  • airflow tasks test example_python_operator print_the_context 2023-01-01: Test a single task (print_the_context) from a given DAG (example_python_operator here) without taking dependencies or past runs into account. Useful for debugging.
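
For the future-dated trigger in the list above, working out a timestamp "2 minutes from now" by hand is error-prone. The following sketch computes it in UTC and formats it like the example date used above. The function names are hypothetical, and it only builds the command line rather than running it.

```python
import shlex
from datetime import datetime, timedelta, timezone


def future_execution_date(minutes=2, now=None):
    """Return a UTC timestamp `minutes` from now, e.g. '2023-01-01 19:04:00+00:00'."""
    now = now or datetime.now(timezone.utc)
    target = (now + timedelta(minutes=minutes)).replace(microsecond=0)
    return target.isoformat(sep=" ")


def trigger_command(dag_id, execution_date=None):
    """Build the argv list for `airflow dags trigger`, optionally with -e."""
    command = ["airflow", "dags", "trigger", dag_id]
    if execution_date is not None:
        command += ["-e", execution_date]
    return command


# A fixed "now" keeps the example reproducible.
fixed_now = datetime(2023, 1, 1, 19, 2, tzinfo=timezone.utc)
when = future_execution_date(now=fixed_now)
print(when)  # 2023-01-01 19:04:00+00:00
print(shlex.join(trigger_command("example_python_operator", when)))
# airflow dags trigger example_python_operator -e '2023-01-01 19:04:00+00:00'
```

Calling future_execution_date() without arguments uses the current UTC time, so the printed command can be pasted straight into the container shell.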