🚀 Crafting Your First Real Airflow DAG

Hello everyone! 👋

Welcome back to Day 2 of my 15-day learning series on Apache Airflow! Today, we’ll explore the anatomy of a DAG (Directed Acyclic Graph) and get hands-on by writing a simple workflow with three steps: Step 1, extracting art gallery data by simulating the response of an API call; Step 2, transforming that data; and Step 3, loading it into a pandas DataFrame. Let’s break down Chapter 2 of Data Pipelines with Apache Airflow and understand how Airflow’s scheduling magic works.


📘 Writing Your First Workflow (DAG)

As we already know by now, in Airflow a DAG represents a workflow: a collection of tasks that are executed in a specific order. Here’s the code we’ll be using:

from airflow import DAG 
from datetime import datetime 
from airflow.operators.python import PythonOperator 
import pandas as pd 

with DAG( 
    dag_id="art_gallery_etl_2024", 
    start_date=datetime(year=2024, month=1, day=1, hour=9, minute=0), 
    schedule="@daily", 
    catchup=True, 
    max_active_runs=1, 
    render_template_as_native_obj=True 
) as dag:

    def extract_art_data_callable(): 
        # Simulate data extraction from an art gallery API
        print("Extracting art piece data from gallery records") 
        return { 
            "date_acquired": "2022-09-15", 
            "artist": "Vincent van Gogh", 
            "title": "Starry Night", 
            "details": { 
                "type": "Painting", 
                "dimensions": "73.7 cm x 92.1 cm" 
            } 
        } 

    extract_art_data = PythonOperator( 
        dag=dag, 
        task_id="extract_art_data", 
        python_callable=extract_art_data_callable 
    )

    def transform_art_data_callable(raw_data): 
        # Transform raw data into a report format
        transformed_data = [ 
            [ 
                raw_data.get("date_acquired"), 
                raw_data.get("artist"), 
                raw_data.get("title"), 
                raw_data.get("details").get("type"), 
                raw_data.get("details").get("dimensions") 
            ] 
        ] 
        return transformed_data 

    transform_art_data = PythonOperator( 
        dag=dag, 
        task_id="transform_art_data", 
        python_callable=transform_art_data_callable, 
        op_kwargs={"raw_data": "{{ ti.xcom_pull(task_ids='extract_art_data') }}"} 
    )

    def load_art_data_callable(transformed_data): 
        # Load the transformed rows into a pandas DataFrame and print it
        loaded_data = pd.DataFrame(transformed_data) 
        loaded_data.columns = [ 
            "date_acquired", 
            "artist", 
            "title", 
            "art_type", 
            "dimensions" 
        ] 
        print(loaded_data) 

    load_art_data = PythonOperator( 
        dag=dag, 
        task_id="load_art_data", 
        python_callable=load_art_data_callable, 
        op_kwargs={"transformed_data": "{{ ti.xcom_pull(task_ids='transform_art_data') }}"} 
    )

    extract_art_data >> transform_art_data >> load_art_data
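
A quick note on how data moves between these tasks: whatever a PythonOperator’s callable returns is pushed to XCom, and the Jinja templates in op_kwargs pull it back for the next task. Because render_template_as_native_obj=True is set on the DAG, those pulled values are rendered as real Python objects (first a dict, then a list of lists) rather than as strings, which is why transform_art_data_callable can safely call .get() on its input. Stripped of Airflow entirely, the data flow is roughly this plain-Python sketch (assuming the three callables above are available in a script):

# Plain-Python sketch of the same flow, with no scheduler or XCom involved
raw = extract_art_data_callable()        # dict that task 1 pushes to XCom
rows = transform_art_data_callable(raw)  # list of lists that task 2 pushes
load_art_data_callable(rows)             # builds and prints a one-row DataFrame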

๐Ÿ” Breakdown of Key Components:

DAG: Defines the workflow (or data pipeline). It includes properties like:

  • dag_id: A unique identifier for the DAG.
  • start_date: The date when the DAG starts running.
  • schedule: Specifies when and how often the DAG should run (@daily in this case).
  • catchup and max_active_runs: With catchup=True, Airflow backfills a run for every schedule interval missed since start_date; max_active_runs=1 keeps those backfill runs from executing concurrently.
  • render_template_as_native_obj: Renders templated values (such as our xcom_pull expressions) as native Python objects instead of strings.

Tasks: In Airflow, each task performs a single action in the workflow. Here, we have:

  • extract_art_data: This task simulates extracting art gallery data from an API.
  • transform_art_data: This task transforms the raw data into a structured format.
  • load_art_data: This task loads the transformed data into a pandas DataFrame.

Operators: Operators define what kind of task is being executed. We used PythonOperator to run Python functions in each step.

Dependencies: Using >>, we define the execution order, ensuring that extract_art_data runs first, followed by transform_art_data, and then load_art_data (an alternative syntax is sketched below).
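
The >> operator is just the most compact way to declare these dependencies. The same ordering can be written explicitly with set_downstream() or with Airflow’s chain() helper; here is a small sketch with the same three tasks (either style alone is sufficient, both are shown only for comparison):

from airflow.models.baseoperator import chain

# Method style: equivalent to extract_art_data >> transform_art_data >> load_art_data
extract_art_data.set_downstream(transform_art_data)
transform_art_data.set_downstream(load_art_data)

# Helper style: declares the same chain in a single call
chain(extract_art_data, transform_art_data, load_art_data)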

๐ŸŒ Examining the Airflow UI

Once you’ve written your first DAG, it’s time to deploy it and observe it in the Airflow UI:

  • Navigate to http://localhost:8080 to access the interface.
  • You’ll see your DAG listed there. When you trigger the DAG, you can monitor task statuses, start times, and logs.
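
Before (or instead of) triggering from the UI, you can also run the whole DAG in a single local process while you are still iterating on the code. A minimal sketch, assuming Airflow 2.5+ (which added DAG.test()), appended to the bottom of the DAG file:

# Optional: run the DAG end-to-end in one process for quick local debugging
# (requires Airflow 2.5 or newer; not a replacement for the scheduler).
if __name__ == "__main__":
    dag.test()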

🛠 Handling Task Failures

Airflow’s strength lies in its ability to handle task failures efficiently:

  • Logs: If a task fails, you can view detailed logs from the UI to troubleshoot the issue.
  • Selective Rerun: Instead of rerunning the entire pipeline, Airflow allows you to rerun just the failed task while keeping the successful ones intact.

This feature is particularly useful when you have long-running pipelines, as it saves time by allowing you to focus only on tasks that failed.

Airflow uses task instance state tracking, which means that successful tasks don’t need to be rerun unless explicitly cleared. When a task fails, you can view its logs, resolve the issue, and rerun just the failed task while leaving successful ones intact. This capability makes Airflow highly efficient for error recovery in complex data pipelines.
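
Beyond manual reruns, Airflow can also retry a flaky task automatically before it is ever marked as failed. This isn’t part of the example above, but as an illustration, the extract task could be declared with the standard retries and retry_delay operator arguments:

from datetime import timedelta

# Illustration only: retry the extract step up to 2 times, 5 minutes apart,
# before the task instance is finally marked as failed.
extract_art_data = PythonOperator(
    task_id="extract_art_data",
    python_callable=extract_art_data_callable,
    retries=2,
    retry_delay=timedelta(minutes=5),
)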


👨‍💻 My Takeaways

In this chapter, I learned how powerful and flexible Airflow DAGs are. By breaking down complex workflows into independent tasks with clear dependencies, you can build scalable data pipelines. The UI makes monitoring and debugging seamless, especially with the logs and selective rerun features.


Stay tuned for Day 3, where we’ll dive deeper into Airflow’s architecture and its components! Let’s continue this journey of mastering Apache Airflow together.

Cheers, Aditya

Aditya Paliwal
Data Engineer @ Telenet
