
Apache Airflow DAG Scheduling


Basic Scheduling

As we have seen before, every pipeline we build is defined with a start_date and a schedule interval that determines when the pipeline should run. The catchup parameter tells Airflow whether it should run all the non-triggered DAG runs since the start date. Another, optional, parameter is end_date:

with DAG("example_dag", start_date=datetime(2023,2,5), schedule="@daily", catchup=False):
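
Since end_date is only mentioned above but not shown, here is a minimal sketch of how it could be set - the DAG id and dates are just placeholders:

from airflow import DAG
from datetime import datetime

## stop scheduling new runs after end_date
with DAG(
    "example_dag_with_end_date",
    start_date=datetime(2023,2,5),
    end_date=datetime(2023,12,31),
    schedule="@daily",
    catchup=False
):
    ## tasks would be defined here
    ...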

Use Dataset Updates as Workflow Trigger

You can use the dataset scheduler, for example, when you are working with multiple Airflow DAGs that depend on the results of a previous pipeline, so that pipeline 2 is automatically triggered once pipeline 1 has finished updating a dataset.

So we first write a DAG that pre-processes our dataset - simulated by adding some text to a simple text file:

from airflow import DAG, Dataset
from airflow.decorators import task

from datetime import datetime

dataset = Dataset('/tmp/preprocessed_data.txt')
start_date = datetime(2023,2,5)
description = "Data pre-processing"
tags = ['scheduler', 'producer']
dag_id = 'producer'
schedule = '@daily'

with DAG(
    dag_id=dag_id,
    start_date=start_date,
    schedule=schedule,
    catchup=False,
    description=description,
    tags=tags
):

    ## decorator with outlet so updates
    ## to dataset can be used as trigger
    @task(outlets=[dataset])
    ## preprocess data and write to file
    def preprocess_dataset():
        with open(dataset.uri, 'a+') as file:
            file.write('preprocessed data ready for consumption')

    preprocess_dataset()

The @task decorator now has an outlet that tells Airflow that once this task has run, the dataset was updated. Airflow can use this event to trigger the following DAG:

from airflow import DAG, Dataset
from airflow.decorators import task

from datetime import datetime

dataset = Dataset('/tmp/preprocessed_data.txt')
start_date = datetime(2023,2,5)
description = "Start this pipeline when new data arrives"
tags = ['scheduler', 'consumer']
dag_id = 'consumer'

with DAG(
    start_date=start_date,
    dag_id=dag_id,
    schedule=[dataset],
    catchup=False,
    description=description,
    tags=tags
):

    @task
    ## read and print the prepared data
    def process_dataset():
        with open(dataset.uri, 'r') as file:
            print(file.read())

    process_dataset()


Now, as soon as we trigger the Producer DAG, the dataset - in the form of the text file - will be updated, which in turn triggers the Consumer DAG.
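
To try this out without waiting for the daily schedule, you can, for example, unpause both DAGs and start a manual run of the producer from the CLI inside the Airflow container (the DAG ids match the ones defined above):

# unpause both DAGs and trigger the producer manually
airflow dags unpause producer
airflow dags unpause consumer
airflow dags trigger producer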


Debugging DAGs

I am certain that I was able to find the execution logs in the Airflow UI before... hmm not sure where they are hiding now. But you can enter the Airflow container and get the task log from there:

docker exec -ti airflow /bin/bash
airflow info

Apache Airflow
version | 2.5.1
executor | SequentialExecutor
task_logging_handler | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn | sqlite:////opt/airflow/airflow.db
dags_folder | /opt/airflow/dags
plugins_folder | /opt/airflow/plugins
base_log_folder | /opt/airflow/logs

...

I was able to find the log file in:

/opt/airflow/logs/dag_id=producer/run_id=manual__2023-02-05T13:10:07.002907+00:00/task_id=preprocess_dataset

And catch the Python error that was crashing my flow.
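
Besides reading the log files, you can also run a single task in isolation from the CLI, which prints its log output straight to the terminal - a general Airflow debugging aid; the execution date below is just an example:

# run the preprocess task of the producer DAG without the scheduler
airflow tasks test producer preprocess_dataset 2023-02-05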

Importing Datasets

In the example above we needed to define the dataset in both DAGs. A cleaner solution is to have a dataset definition file instead:

mkdir /opt/airflow/dags/include
nano /opt/airflow/dags/include/datasets.py

from airflow import Dataset

DATASET_01 = Dataset('/tmp/preprocessed_data.txt')

Now we can import the dataset wherever we need it:

from airflow import DAG
from airflow.decorators import task

from datetime import datetime

from include.datasets import DATASET_01


start_date = datetime(2023,2,5)
description = "Data pre-processing"
tags = ['scheduler', 'producer']
dag_id = 'producer'
schedule = '@daily'

with DAG(
    dag_id=dag_id,
    start_date=start_date,
    schedule=schedule,
    catchup=False,
    description=description,
    tags=tags
):

    ## decorator with outlet so updates
    ## to dataset can be used as trigger
    @task(outlets=[DATASET_01])
    ## preprocess data and write to file
    def preprocess_dataset():
        with open(DATASET_01.uri, 'a+') as file:
            file.write('preprocessed data ready for consumption')

    preprocess_dataset()
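
The consumer DAG can import the shared dataset in the same way - a sketch of the adjusted version, otherwise unchanged from the consumer DAG above:

from airflow import DAG
from airflow.decorators import task

from datetime import datetime

from include.datasets import DATASET_01


start_date = datetime(2023,2,5)
description = "Start this pipeline when new data arrives"
tags = ['scheduler', 'consumer']
dag_id = 'consumer'

with DAG(
    start_date=start_date,
    dag_id=dag_id,
    schedule=[DATASET_01],
    catchup=False,
    description=description,
    tags=tags
):

    @task
    ## read and print the prepared data
    def process_dataset():
        with open(DATASET_01.uri, 'r') as file:
            print(file.read())

    process_dataset()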

All your datasets, and the DAGs that depend on them, can be monitored under the Datasets tab in the Airflow UI.
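
Depending on your Airflow version there is also a datasets subcommand in the CLI; assuming it is available in your installation, you can list all registered datasets with:

airflow datasets list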


Airflow Sensors

With dataset scheduling, Airflow monitors the successful completion of a task to trigger the next DAG. If we want to react to other programs updating datasets, we need to use Airflow Sensors instead. Sensors are a special type of Operator designed to do exactly one thing - wait for something to occur. That can be time-based, waiting for a file, or waiting for an external event, but all they do is wait until something happens and then succeed so their downstream tasks can run.

In the following example we receive data from three producers, have to pre-process it, and, once all data is collected, process the prepared data:

Data Producer A -> Pre-Processing ->
Data Producer B -> Pre-Processing -> Processing
Data Producer C -> Pre-Processing ->

from airflow.models import DAG
from airflow.sensors.filesystem import FileSensor

from datetime import datetime

start_date = datetime(2023,2,5)
description = "Data pre-processing"
tags = ['sensor']
schedule = '@daily'


with DAG('dag_sensor',
    schedule=schedule,
    start_date=start_date,
    description=description,
    tags=tags,
    catchup=False
):

    sensor_task = FileSensor(
        task_id='waiting_for_change',
        poke_interval=30,
        # kill the sensor if data does
        # not arrive in time
        timeout=60 * 5,
        # 'reschedule' or 'poke' -
        # the first releases the
        # worker slot in between pokes
        mode='reschedule',
        # skip the sensor task after
        # the timeout instead of failing
        soft_fail=True,
        filepath='/tmp/input_data.txt'
    )

...
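
The rest of the DAG is omitted above. Purely as an illustration of how the sensor could gate a downstream step - the processing task, its name and its logic are hypothetical, and the FileSensor is assumed to use the default filesystem connection - a complete minimal version might look like this:

from airflow.models import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor

from datetime import datetime

with DAG('dag_sensor_example',
    schedule='@daily',
    start_date=datetime(2023,2,5),
    catchup=False
):

    sensor_task = FileSensor(
        task_id='waiting_for_change',
        poke_interval=30,
        timeout=60 * 5,
        mode='reschedule',
        soft_fail=True,
        filepath='/tmp/input_data.txt'
    )

    @task
    def process_prepared_data():
        ## hypothetical processing step that reads
        ## the file the sensor was waiting for
        with open('/tmp/input_data.txt', 'r') as file:
            print(file.read())

    ## the processing task only runs after the sensor succeeds
    sensor_task >> process_prepared_data()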