Scheduling Our DAG in Airflow
Hello everyone!
Welcome to Day 3 of the series on mastering Apache Airflow. Today, we're diving into scheduling DAGs, an essential part of automating workflows in Airflow. I assume you are already familiar with the Airflow environment and the basic concepts covered in the previous posts. If not, I recommend going through those topics before we get into the details of scheduling a DAG.
⏰ Scheduling a DAG
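In Airflow, the schedule_interval parameter controls when and how often a DAG runs. (Starting with Airflow 2.4, the preferred name for this argument is schedule, and schedule_interval is deprecated.) There are several ways to schedule our DAGs, depending on our needs. Let's explore them!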
1- Unscheduled DAGs
Sometimes, we may want to create a DAG that doesn't run on a schedule but only when manually triggered. In such cases, we can set the schedule_interval to None. Here's an example:
from airflow import DAG
import datetime as dt

dag = DAG(
    dag_id="01_unscheduled",
    start_date=dt.datetime(2019, 1, 1),
    schedule_interval=None,  # no schedule: runs only when triggered manually
)
Here:
- schedule_interval=None: This means the DAG will only run when triggered manually and won’t execute automatically on a schedule.
2- Running at Regular Intervals
Airflow provides presets for frequently used scheduling intervals, such as @daily, @hourly, and @weekly. Here's an example that runs daily:
from airflow import DAG
import datetime as dt

dag = DAG(
    dag_id="03_with_end_date",
    schedule_interval="@daily",  # preset: once a day at midnight
    start_date=dt.datetime(2019, 1, 1),
    end_date=dt.datetime(2019, 1, 5),
)
In this example:
- start_date: The date from which Airflow starts scheduling runs of the DAG.
- end_date: The date after which Airflow stops scheduling new runs.
The end_date parameter is optional; if we omit it, Airflow keeps scheduling the DAG indefinitely.
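To see which runs this DAG would produce, here is a small pure-Python model of interval-based scheduling; it does not import Airflow. It assumes the Airflow 2.x interval-based behavior, where each run covers one interval [start, start + interval) and is triggered only after that interval has ended, and it assumes a run is created for every interval that starts on or before end_date. The interval_runs helper is hypothetical.

```python
import datetime as dt


def interval_runs(start_date, end_date, interval):
    """Return (interval_start, interval_end) pairs for a fixed-interval schedule.

    Simplified model (an assumption, not Airflow's own code): one run per
    interval, created while the interval start is on or before end_date.
    """
    runs = []
    current = start_date
    while current <= end_date:
        runs.append((current, current + interval))
        current += interval
    return runs


runs = interval_runs(dt.datetime(2019, 1, 1), dt.datetime(2019, 1, 5), dt.timedelta(days=1))
for interval_start, interval_end in runs:
    # Under this model, a run is triggered once its interval has ended
    print(f"covers {interval_start:%Y-%m-%d}, triggered after {interval_end:%Y-%m-%d %H:%M}")
```

Under these assumptions the DAG gets five runs, covering 2019-01-01 through 2019-01-05, and the first one starts only after midnight on 2019-01-02.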
3- Cron-based Intervals
Another popular way to schedule a DAG is using Cron expressions, which give us fine-grained control over scheduling.
For example, to run a DAG every day at midnight, we can use:
schedule_interval="0 0 * * *"
While Cron is powerful, some schedules are awkward or impossible to express with it, such as “every third day.” For example, the expression "0 0 */3 * *" matches days 1, 4, 7, ..., 28 and 31 of each month, so the DAG would run on both the 31st and the 1st of the next month, violating the intended three-day gap.
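The month-boundary problem can be checked without Airflow. This minimal sketch uses a hypothetical helper, matches_every_third_day, that mimics only the day-of-month field "*/3" (days 1, 4, 7, ..., 31) and lists the days it would fire in January and February 2019.

```python
import datetime as dt


def matches_every_third_day(day: dt.date) -> bool:
    """Mimic the cron day-of-month field "*/3": days 1, 4, 7, ..., 28, 31."""
    return (day.day - 1) % 3 == 0


# Every day from 2019-01-01 through 2019-02-28
days = [dt.date(2019, 1, 1) + dt.timedelta(days=i) for i in range(59)]
fired = [d for d in days if matches_every_third_day(d)]
gaps = [(later - earlier).days for earlier, later in zip(fired, fired[1:])]

print(fired[10], fired[11])  # 2019-01-31 2019-02-01
print(sorted(set(gaps)))     # [1, 3]: one gap is a single day
```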
To overcome this challenge, Airflow provides the option of frequency-based intervals.
⏳ 4- Frequency-based Intervals
If we need more flexibility, like running a task every 3 days or every 5 minutes, we can use timedelta from Python's datetime module.
from airflow import DAG
import datetime as dt

dag = DAG(
    dag_id="03_frequency_based",
    schedule_interval=dt.timedelta(days=3),  # every 3 days, counted from start_date
    start_date=dt.datetime(2019, 1, 1),
)
Here's how we can set a schedule for every 5 minutes:
schedule_interval=dt.timedelta(minutes=5)
Or, every 4 hours:
schedule_interval=dt.timedelta(hours=4)
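Because a timedelta interval is counted from the start_date (each run starts exactly one interval after the previous one) instead of being matched against calendar fields, it avoids the month-boundary problem of the Cron example above.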
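For comparison, this pure-Python sketch models the three-day timedelta schedule as start_date plus a whole number of intervals. That model is a simplifying assumption (it ignores details such as end_date) and does not use Airflow itself.

```python
import datetime as dt

start_date = dt.datetime(2019, 1, 1)
interval = dt.timedelta(days=3)

# Simplified model: run n starts exactly n intervals after start_date
runs = [start_date + n * interval for n in range(20)]
gaps = {later - earlier for earlier, later in zip(runs, runs[1:])}

print(gaps == {interval})                # True: every gap is exactly three days
print(runs[10].date(), runs[11].date())  # 2019-01-31 2019-02-03
```

Unlike the "*/3" cron field, the run after January 31 lands on February 3, keeping the three-day spacing across the month boundary.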
Processing Data Incrementally
Suppose we have a task that loads data into our system through an API call. One option is to load the entire dataset at once, but that way we can't build up a history of events over time. To build an incremental history of events in our database, Airflow lets us process data incrementally using the execution_date and next_execution_date template variables (since Airflow 2.2 these are deprecated in favor of data_interval_start and data_interval_end).
Using these, we can load data for the specific time window between a run's execution_date and next_execution_date, so that each run processes only the new data for its interval. This is particularly useful for large datasets, because we no longer have to reprocess everything from scratch on every run.
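To make the windowing concrete, here is a minimal pure-Python sketch that does not use Airflow. The in-memory EVENTS list and the fetch_events and run_for helpers are hypothetical stand-ins for the API and the task; the execution_date argument plays the role of the template variable of the same name, and each run reads the half-open window [execution_date, next_execution_date).

```python
import datetime as dt

# Hypothetical in-memory stand-in for the API's event feed
EVENTS = [
    {"id": 1, "ts": dt.datetime(2019, 1, 1, 9, 30)},
    {"id": 2, "ts": dt.datetime(2019, 1, 1, 23, 59)},
    {"id": 3, "ts": dt.datetime(2019, 1, 2, 0, 0)},
    {"id": 4, "ts": dt.datetime(2019, 1, 2, 14, 15)},
]


def fetch_events(window_start, window_end):
    """Return only events with window_start <= ts < window_end (half-open window)."""
    return [e for e in EVENTS if window_start <= e["ts"] < window_end]


def run_for(execution_date, interval=dt.timedelta(days=1)):
    """Simulate one daily run: load only the data for this run's interval."""
    next_execution_date = execution_date + interval
    return fetch_events(execution_date, next_execution_date)


jan1 = run_for(dt.datetime(2019, 1, 1))
jan2 = run_for(dt.datetime(2019, 1, 2))
print([e["id"] for e in jan1])  # [1, 2]
print([e["id"] for e in jan2])  # [3, 4]
```

The half-open window is a deliberate choice: an event stamped exactly at midnight belongs to exactly one run, so no event is loaded twice or skipped.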
Using Backfilling in Airflow
When we load data in intervals defined by a start and an end date, we may sometimes need to run a DAG for a past period that was either missed or not yet scheduled. This is called backfilling, and Airflow controls it with the catchup parameter. Let's look at the following code:
from airflow import DAG
import datetime as dt

dag = DAG(
    dag_id="09_no_catchup",
    schedule_interval="@daily",
    start_date=dt.datetime(year=2019, month=1, day=1),
    end_date=dt.datetime(year=2019, month=1, day=5),
    catchup=False,  # do not backfill past intervals
)
In the above code, pay attention to the parameter catchup. Here is how it works:
- catchup=True: Airflow schedules a run for every past interval between the start_date and the current date that has not been run yet.
- catchup=False: Airflow skips the missed past intervals and schedules only the most recent interval and future runs.
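The difference between the two settings can be modeled in a few lines of plain Python. This is a simplified sketch, not Airflow's scheduler: the runs_to_schedule helper is hypothetical, it assumes a run only becomes due once its interval has fully elapsed, and it ignores end_date and runs that already exist. Here the DAG starts on 2019-01-01 and is enabled at midday on 2019-01-04.

```python
import datetime as dt


def runs_to_schedule(start_date, now, interval, catchup):
    """Return the interval starts a scheduler would create (simplified model)."""
    completed = []
    current = start_date
    # An interval is only runnable once it has fully elapsed
    while current + interval <= now:
        completed.append(current)
        current += interval
    # With catchup, every missed interval is scheduled; without it, only the latest
    return completed if catchup else completed[-1:]


start = dt.datetime(2019, 1, 1)
now = dt.datetime(2019, 1, 4, 12, 0)
day = dt.timedelta(days=1)

print(runs_to_schedule(start, now, day, catchup=True))   # intervals starting Jan 1, 2 and 3
print(runs_to_schedule(start, now, day, catchup=False))  # only the interval starting Jan 3
```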
Best Practices for Designing Tasks
Designing tasks properly is key to ensuring our workflows are reliable and scalable. Here are two core principles:
- Atomicity: Each task should perform a single responsibility, keeping it simple and isolated.
- Idempotency: Running a task multiple times with the same inputs should produce the same result, which makes it safe to retry or backfill.
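Idempotency is easiest to see by simulating a retry. In this minimal pure-Python sketch, a dict and a list stand in for a database table, and the helpers load_overwrite and load_append are hypothetical: overwriting the partition for a given date leaves the same result after a rerun, while appending duplicates the rows.

```python
def load_overwrite(table, date_key, rows):
    """Idempotent: replace the partition for date_key, so a rerun yields the same table."""
    table[date_key] = list(rows)


def load_append(table, rows):
    """Not idempotent: every rerun appends the same rows again."""
    table.extend(rows)


rows = ["event-1", "event-2"]

partitioned = {}
load_overwrite(partitioned, "2019-01-01", rows)
load_overwrite(partitioned, "2019-01-01", rows)  # simulated retry

appended = []
load_append(appended, rows)
load_append(appended, rows)  # simulated retry

print(partitioned)  # {'2019-01-01': ['event-1', 'event-2']}
print(appended)     # ['event-1', 'event-2', 'event-1', 'event-2']
```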
My Takeaways
Understanding how to schedule a DAG is fundamental to building automated workflows. We can set schedules using presets such as @daily, cron expressions, or custom frequency-based intervals defined with timedelta. This flexibility lets us meet almost any scheduling requirement, whether that means running tasks every minute or every third day. Last but not least, we also learned how to handle backfilling and how to design tasks that are both atomic and idempotent.
Stay tuned for Day 4, where we'll cover chapter 4 of the book and discuss how DAGs can communicate with other systems. Feel free to share your thoughts and questions via LinkedIn or email. Let's keep learning together!
Cheers, Aditya