Apache Airflow Introduction
I have been looking into Python Ray to provision and update ML models behind a prediction API. During my research I kept hearing about two alternative platforms that supposedly offer the same or similar benefits for MLOps workflows. So before diving into the nitty-gritty details of Ray, let's take a look at the alternatives: Apache Airflow and the Kubernetes-native Kubeflow. Let's start with Airflow.
Introduction
Apache Airflow is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows. Airflow’s extensible Python framework enables you to build workflows connecting with virtually any technology. A web interface helps manage the state of your workflows. Airflow is deployable in many ways, varying from a single process on your laptop to a distributed setup to support even the biggest workflows.
The main characteristic of Airflow is that all workflows are defined in Python code, allowing us to write dynamic data pipelines called DAGs (Directed Acyclic Graphs), a mathematical abstraction of a pipeline. “Workflows as code” serves several purposes:
- Dynamic: Airflow pipelines are configured as Python code, allowing for dynamic pipeline generation.
- Extensible: The Airflow framework contains operators to connect with numerous technologies. All Airflow components are extensible to easily adjust to your environment.
- Flexible: Workflow parameterization is built in, leveraging the Jinja templating engine.
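To make this concrete, here is a minimal sketch of what such a DAG file looks like. The DAG id, schedule, and task callables are made up for illustration; the imports match the Airflow 2.x API:

```python
# minimal_etl.py - a hypothetical two-task DAG for illustration
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def extract():
    print("extracting data")

def transform():
    print("transforming data")

with DAG(
    dag_id="minimal_etl",              # hypothetical DAG id
    start_date=datetime(2023, 1, 1),
    schedule="@daily",                 # run once per day
    catchup=False,                     # don't backfill past runs
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    transform_task = PythonOperator(task_id="transform", python_callable=transform)

    # ">>" defines an edge of the directed acyclic graph: extract runs before transform
    extract_task >> transform_task
```

Dropping a file like this into Airflow's dags folder is enough for the scheduler to pick it up.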
Installation with Docker
Clone the GitHub repository (linked above) and enter the docker directory. There you will find a Dockerfile and a shell script that we will need to start Airflow inside a Docker container.
Note that the shell script contains instructions to create the default admin user. You can change the login credentials there; the default login is set to admin/admin.
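Inside such a script, the user-creation step typically boils down to a single airflow users create call; the exact flags in the repository's script may differ:

```bash
# hypothetical user-creation step - adjust credentials as needed
airflow users create \
    --username admin \
    --password admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com
```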
We can build the Docker image by running the following command inside the directory:
docker build -t airflow-postgres .
This will download the official Python Docker image as the base and install the latest version of Airflow (at the time of writing v2.5.1, with support for Python v3.10).
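For reference, a minimal Dockerfile for such an image could look like the sketch below. This is an assumption for illustration, not the repository's actual file; the constraints URL follows the pattern Airflow's installation docs recommend for reproducible installs:

```dockerfile
# Hypothetical sketch - the repository's actual Dockerfile may differ
FROM python:3.10-slim

ENV AIRFLOW_HOME=/opt/airflow

# Pin Airflow and its dependencies via the official constraints file
RUN pip install --no-cache-dir "apache-airflow==2.5.1" \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.10.txt"

# The shell script initializes the DB, creates the admin user, and starts Airflow
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh

EXPOSE 8080
ENTRYPOINT ["/entrypoint.sh"]
```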
You can now run the container exposing the WebUI port using:
docker run --rm -d -p 8080:8080 --name airflow airflow-postgres
You can visit the WebUI at http://localhost:8080 and log in with the user that was created by the shell script. We now have access to the user interface and can take control of our workflows.
Command Line Interface
Besides the web interface, we can also interact with Airflow through its CLI. Since Airflow is running inside the airflow Docker container, we first have to enter this environment:
docker exec -ti airflow /bin/bash
Running airflow -h inside the container prints an overview of all available command groups and commands:
usage: airflow [-h] GROUP_OR_COMMAND ...
positional arguments:
GROUP_OR_COMMAND
Groups:
celery Celery components
config View configuration
connections Manage connections
dags Manage DAGs
db Database operations
jobs Manage jobs
kubernetes Tools to help run the KubernetesExecutor
pools Manage pools
providers Display providers
roles Manage roles
tasks Manage tasks
users Manage users
variables Manage variables
Commands:
cheat-sheet Display cheat sheet
dag-processor Start a standalone Dag Processor instance
info Show information about current Airflow and environment
kerberos Start a kerberos ticket renewer
plugins Dump information about loaded plugins
rotate-fernet-key
Rotate encrypted connection credentials and variables
scheduler Start a scheduler instance
standalone Run an all-in-one copy of Airflow
sync-perm Update permissions for existing roles and optionally DAGs
triggerer Start a triggerer instance
version Show the version
webserver Start an Airflow webserver instance
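For example, we can confirm the installed version from inside the container:

```bash
airflow version
# 2.5.1
```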
CLI Cheat Sheet
Commands | Description |
---|---|
Miscellaneous commands | |
airflow cheat-sheet | Display cheat sheet |
airflow dag-processor | Start a standalone Dag Processor instance |
airflow info | Show information about current Airflow and environment |
airflow kerberos | Start a kerberos ticket renewer |
airflow plugins | Dump information about loaded plugins |
airflow rotate-fernet-key | Rotate encrypted connection credentials and variables |
airflow scheduler | Start a scheduler instance |
airflow standalone | Run an all-in-one copy of Airflow |
airflow sync-perm | Update permissions for existing roles and optionally DAGs |
airflow triggerer | Start a triggerer instance |
airflow version | Show the version |
airflow webserver | Start an Airflow webserver instance |
Celery components | |
airflow celery flower | Start a Celery Flower |
airflow celery stop | Stop the Celery worker gracefully |
airflow celery worker | Start a Celery worker node |
View configuration | |
airflow config get-value | Print the value of the configuration |
airflow config list | List options for the configuration |
Manage connections | |
airflow connections add | Add a connection |
airflow connections delete | Delete a connection |
airflow connections export | Export all connections |
airflow connections get | Get a connection |
airflow connections import | Import connections from a file |
airflow connections list | List connections |
Manage DAGs | |
airflow dags backfill | Run subsections of a DAG for a specified date range |
airflow dags delete | Delete all DB records related to the specified DAG |
airflow dags list | List all the DAGs |
airflow dags list-import-errors | List all the DAGs that have import errors |
airflow dags list-jobs | List the jobs |
airflow dags list-runs | List DAG runs given a DAG id |
airflow dags next-execution | Get the next execution datetimes of a DAG |
airflow dags pause | Pause a DAG |
airflow dags report | Show DagBag loading report |
airflow dags reserialize | Reserialize all DAGs by parsing the DagBag files |
airflow dags show | Display a DAG's tasks with their dependencies |
airflow dags show-dependencies | Display DAGs with their dependencies |
airflow dags state | Get the status of a dag run |
airflow dags test | Execute one single DagRun |
airflow dags trigger | Trigger a DAG run |
airflow dags unpause | Resume a paused DAG |
Database operations | |
airflow db check | Check if the database can be reached |
airflow db check-migrations | Check if migrations have finished |
airflow db clean | Purge old records in metastore tables |
airflow db downgrade | Downgrade the schema of the metadata database |
airflow db init | Initialize the metadata database |
airflow db reset | Burn down and rebuild the metadata database |
airflow db shell | Runs a shell to access the database |
airflow db upgrade | Upgrade the metadata database to latest version |
Manage jobs | |
airflow jobs check | Checks if job(s) are still alive |
Tools to help run the KubernetesExecutor | |
airflow kubernetes cleanup-pods | Clean up Kubernetes pods (created by KubernetesExecutor/KubernetesPodOperator) in evicted/failed/succeeded/pending states |
airflow kubernetes generate-dag-yaml | Generate YAML files for all tasks in DAG. Useful for debugging tasks without launching into a cluster |
Manage pools | |
airflow pools delete | Delete pool |
airflow pools export | Export all pools |
airflow pools get | Get pool size |
airflow pools import | Import pools |
airflow pools list | List pools |
airflow pools set | Configure pool |
Display providers | |
airflow providers auth | Get information about API auth backends provided |
airflow providers behaviours | Get information about registered connection types with custom behaviours |
airflow providers get | Get detailed information about a provider |
airflow providers hooks | List registered provider hooks |
airflow providers links | List extra links registered by the providers |
airflow providers list | List installed providers |
airflow providers logging | Get information about task logging handlers provided |
airflow providers secrets | Get information about secrets backends provided |
airflow providers widgets | Get information about registered connection form widgets |
Manage roles | |
airflow roles add-perms | Add roles permissions |
airflow roles create | Create role |
airflow roles del-perms | Delete roles permissions |
airflow roles delete | Delete role |
airflow roles export | Export roles (without permissions) from db to JSON file |
airflow roles import | Import roles (without permissions) from JSON file to db |
airflow roles list | List roles |
Manage tasks | |
airflow tasks clear | Clear a set of task instances, as if they never ran |
airflow tasks failed-deps | Returns the unmet dependencies for a task instance |
airflow tasks list | List the tasks within a DAG |
airflow tasks render | Render a task instance's template(s) |
airflow tasks run | Run a single task instance |
airflow tasks state | Get the status of a task instance |
airflow tasks states-for-dag-run | Get the status of all task instances in a dag run |
airflow tasks test | Test a task instance |
Manage users | |
airflow users add-role | Add role to a user |
airflow users create | Create a user |
airflow users delete | Delete a user |
airflow users export | Export all users |
airflow users import | Import users |
airflow users list | List users |
airflow users remove-role | Remove role from a user |
Manage variables | |
airflow variables delete | Delete variable |
airflow variables export | Export all variables |
airflow variables get | Get variable |
airflow variables import | Import variables |
airflow variables list | List variables |
airflow variables set | Set variable |
Common Commands
- airflow db init: Initialize the metadata database
- airflow db reset: Reinitialize the metadata database (drops everything)
- airflow db upgrade: Upgrade the metadata database (latest schemas, values, ...)
- airflow webserver: Start Airflow's webserver
- airflow scheduler: Start Airflow's scheduler
- airflow celery worker: Start a Celery worker (useful in distributed mode to spread tasks among nodes/machines)
- airflow dags list: List the known DAGs (either those in the examples folder or in the dags folder)
- airflow dags trigger example_python_operator: Trigger the DAG example_python_operator with the current date as execution date
- airflow dags trigger example_python_operator -e 2023-01-01: Trigger the DAG example_python_operator with a date in the past as execution date (this won't trigger the tasks of that DAG unless you set catchup=True in the DAG definition)
- airflow dags trigger example_python_operator -e '2023-01-01 19:04:00+00:00': Trigger the DAG example_python_operator with a date in the future (pick a date about two minutes later than the current date displayed in the Airflow UI); the DAG will be scheduled at that date
- airflow dags list-runs -d example_python_operator: Display the history of example_python_operator's DAG runs
- airflow tasks list example_python_operator: List the tasks contained in the example_python_operator DAG
- airflow tasks test example_python_operator print_the_context 2023-01-01: Test a single task (print_the_context) from a given DAG (example_python_operator here) without considering dependencies and past runs; useful for debugging
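Putting a few of these together, a typical session inside the container could look like this, using the bundled example_python_operator example DAG:

```bash
# unpause the example DAG so the scheduler will pick it up
airflow dags unpause example_python_operator

# trigger a run with the current date as execution date
airflow dags trigger example_python_operator

# inspect the run history
airflow dags list-runs -d example_python_operator

# test a single task in isolation, without touching the DAG's state
airflow tasks test example_python_operator print_the_context 2023-01-01
```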