🚀 Crafting Your First Real Airflow DAG
Hello everyone! 👋
Welcome back to Day 2 of my 15-day learning series on Apache Airflow! Today, we'll explore the anatomy of a DAG (Directed Acyclic Graph) and get hands-on by writing a simple three-step workflow: Step 1 extracts art gallery data by simulating the response of an API call, Step 2 transforms that data, and Step 3 loads it into a pandas DataFrame. Let's break down Chapter 2 of Data Pipelines with Apache Airflow and understand how Airflow's scheduling magic works.
✍️ Writing Your First Workflow (DAG)
As we've seen so far, in Airflow a DAG represents a workflow: a collection of tasks that are executed in a specific order. Here's the code we'll be using:
```python
from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="art_gallery_etl_2024",
    start_date=datetime(year=2024, month=1, day=1, hour=9, minute=0),
    schedule="@daily",
    catchup=True,
    max_active_runs=1,
    render_template_as_native_obj=True,
) as dag:

    def extract_art_data_callable():
        # Simulate data extraction from an art gallery API
        print("Extracting art piece data from gallery records")
        return {
            "date_acquired": "2022-09-15",
            "artist": "Vincent van Gogh",
            "title": "Starry Night",
            "details": {
                "type": "Painting",
                "dimensions": "73.7 cm x 92.1 cm",
            },
        }

    extract_art_data = PythonOperator(
        task_id="extract_art_data",
        python_callable=extract_art_data_callable,
    )

    def transform_art_data_callable(raw_data):
        # Transform raw data into a report format
        transformed_data = [
            [
                raw_data.get("date_acquired"),
                raw_data.get("artist"),
                raw_data.get("title"),
                raw_data.get("details").get("type"),
                raw_data.get("details").get("dimensions"),
            ]
        ]
        return transformed_data

    transform_art_data = PythonOperator(
        task_id="transform_art_data",
        python_callable=transform_art_data_callable,
        op_kwargs={"raw_data": "{{ ti.xcom_pull(task_ids='extract_art_data') }}"},
    )

    def load_art_data_callable(transformed_data):
        # Load the transformed rows into a pandas DataFrame
        loaded_data = pd.DataFrame(transformed_data)
        loaded_data.columns = [
            "date_acquired",
            "artist",
            "title",
            "art_type",
            "dimensions",
        ]
        print(loaded_data)

    load_art_data = PythonOperator(
        task_id="load_art_data",
        python_callable=load_art_data_callable,
        op_kwargs={"transformed_data": "{{ ti.xcom_pull(task_ids='transform_art_data') }}"},
    )

    extract_art_data >> transform_art_data >> load_art_data
```
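Because each step is a plain Python function, you can sanity-check the pipeline logic outside Airflow by calling the callables directly. This is just a stand-alone sketch: the real DAG passes data between tasks via XCom, and the pandas load step is replaced with a simple print for brevity:

```python
# Stand-alone sanity check of the ETL logic used in the DAG above.
# The two functions mirror the DAG's callables; Airflow is not needed.

def extract_art_data_callable():
    # Simulated API response
    return {
        "date_acquired": "2022-09-15",
        "artist": "Vincent van Gogh",
        "title": "Starry Night",
        "details": {"type": "Painting", "dimensions": "73.7 cm x 92.1 cm"},
    }

def transform_art_data_callable(raw_data):
    # Flatten the nested dict into a single report row
    return [[
        raw_data.get("date_acquired"),
        raw_data.get("artist"),
        raw_data.get("title"),
        raw_data.get("details").get("type"),
        raw_data.get("details").get("dimensions"),
    ]]

raw = extract_art_data_callable()
rows = transform_art_data_callable(raw)
print(rows[0])  # ['2022-09-15', 'Vincent van Gogh', 'Starry Night', 'Painting', '73.7 cm x 92.1 cm']
```

Running the callables by hand like this is a quick way to catch logic bugs before the scheduler ever picks up the DAG.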
📌 Breakdown of Key Components:
DAG: Defines the workflow (or data pipeline). It includes properties like:
- dag_id: A unique identifier for the DAG.
- start_date: The date from which the DAG is scheduled to run.
- schedule: Specifies when and how often the DAG should run (@daily in this case).
Tasks: In Airflow, each task performs a single action in the workflow. Here, we have:
- extract_art_data: Simulates extracting art gallery data from an API.
- transform_art_data: Transforms the raw data into a structured format.
- load_art_data: Loads the transformed data into a pandas DataFrame.
Operators: Operators define what kind of task is being executed. We used PythonOperator to run a Python function in each step.
Dependencies: Using >>, we define the execution order, ensuring that extract_art_data runs first, followed by transform_art_data, and then load_art_data.
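One property worth dwelling on: with start_date set to January 1, 2024, schedule="@daily", and catchup=True, Airflow will backfill one run for every missed day. Airflow's actual scheduler reasons in terms of data intervals, but the set of backfilled run dates can be sketched with plain datetime arithmetic (the end date below is just an illustrative example):

```python
from datetime import datetime, timedelta

# Simplified sketch of what catchup=True with schedule="@daily" implies:
# one run per day from start_date up to (but not including) the current day.
def daily_run_dates(start, end):
    dates = []
    current = start
    while current < end:
        dates.append(current)
        current += timedelta(days=1)
    return dates

runs = daily_run_dates(datetime(2024, 1, 1), datetime(2024, 1, 5))
print(len(runs))  # 4 backfilled runs: Jan 1 through Jan 4
```

This is why max_active_runs=1 is set in the DAG above: it stops those backfilled runs from all executing at once.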
👀 Examining the Airflow UI
Once you've written your first DAG, it's time to deploy it and observe it in the Airflow UI:
- Navigate to http://localhost:8080 to access the interface.
- Youโll see your DAG listed there. When you trigger the DAG, you can monitor task statuses, start times, and logs.
🔁 Handling Task Failures
Airflow's strength lies in its ability to handle task failures efficiently:
- Logs: If a task fails, you can view detailed logs from the UI to troubleshoot the issue.
- Selective Rerun: Instead of rerunning the entire pipeline, Airflow allows you to rerun just the failed task while keeping the successful ones intact.
This feature is particularly useful when you have long-running pipelines, as it saves time by allowing you to focus only on tasks that failed.
Under the hood, Airflow tracks the state of every task instance, so successful tasks don't need to be rerun unless you explicitly clear them. This capability makes Airflow highly efficient for error recovery in complex data pipelines.
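Beyond manual reruns, Airflow can also retry failed tasks automatically: the retries and retry_delay arguments on an operator tell the scheduler how many times to re-attempt a task and how long to wait between attempts. Conceptually, the behaviour resembles this plain-Python sketch (an illustration only, not Airflow's internals):

```python
import time

def run_with_retries(task_fn, retries=3, retry_delay=0.0):
    """Illustrative sketch of the retry behaviour an operator gets
    when configured with retries / retry_delay."""
    attempts = 0
    while True:
        try:
            return task_fn()
        except Exception:
            attempts += 1
            if attempts > retries:
                raise  # the task is marked failed after exhausting retries
            time.sleep(retry_delay)

# A hypothetical task that fails twice before succeeding.
calls = {"n": 0}
def flaky_task():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "success"

result = run_with_retries(flaky_task, retries=3)
print(result)  # success (after two transient failures)
```

In a real DAG you would simply pass retries=3 and a retry_delay (e.g. a timedelta) to the operator and let the scheduler handle the loop for you.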
👨‍💻 My Takeaways
In this chapter, I learned how powerful and flexible Airflow DAGs are. By breaking down complex workflows into independent tasks with clear dependencies, you can build scalable data pipelines. The UI makes monitoring and debugging seamless, especially with the logs and selective rerun features.
Stay tuned for Day 3, where we'll dive deeper into Airflow's architecture and its components! Let's continue this journey of mastering Apache Airflow together.
Cheers, Aditya