If you've just installed Apache Airflow and are staring at an empty dags/ folder, this guide is for you. By the end, you'll have a real, working DAG that extracts data, transforms it, and loads it — the classic ETL pattern.
Prerequisites
- Apache Airflow 2.8+ installed (locally or via Docker)
- Python 3.8+
- Basic Python knowledge
What is a DAG?
A DAG (Directed Acyclic Graph) is the core abstraction in Airflow. It's a Python file that defines:
- What to do — the tasks and their logic
- When to do it — the schedule
- In what order — the dependencies between tasks
"Directed" means dependencies flow in one direction. "Acyclic" means there are no loops. This structure guarantees Airflow can always determine which tasks to run next.
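To see why acyclicity matters, here is a plain-Python sketch (not Airflow code) using the standard library's `graphlib` to compute a valid execution order for a toy three-task DAG. With a cycle in the graph, `static_order()` would raise a `CycleError` instead of producing an order.

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

# A toy DAG: each key lists the tasks it depends on
deps = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

# A topological sort yields an order where every task
# comes after all of its dependencies
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['extract', 'transform', 'load']
```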
Your First DAG: The TaskFlow Way
Since Airflow 2.0, the recommended way to write DAGs is using the TaskFlow API — it uses Python decorators and makes XCom passing implicit.
```python
from __future__ import annotations

from datetime import datetime, timedelta

from airflow.decorators import dag, task


@dag(
    dag_id="my_first_etl",
    description="A simple ETL DAG to get started with Airflow",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    },
    tags=["beginner", "etl"],
)
def my_first_etl():
    """My first Airflow DAG — a simple ETL pipeline."""

    @task()
    def extract() -> list[dict]:
        """Simulate extracting data from an API."""
        print("Extracting data from source...")
        return [
            {"id": 1, "name": "Alice", "score": 95},
            {"id": 2, "name": "Bob", "score": 87},
            {"id": 3, "name": "Charlie", "score": 92},
        ]

    @task()
    def transform(records: list[dict]) -> list[dict]:
        """Add a grade field based on score."""

        def get_grade(score: int) -> str:
            if score >= 90:
                return "A"
            elif score >= 80:
                return "B"
            return "C"

        return [{**r, "grade": get_grade(r["score"])} for r in records]

    @task()
    def load(records: list[dict]) -> None:
        """Simulate loading data to a data warehouse."""
        print(f"Loading {len(records)} records to warehouse...")
        for record in records:
            print(f"  ✓ {record['name']}: {record['score']} ({record['grade']})")
        print("Load complete!")

    # Wire up the pipeline — data flows automatically via XCom
    raw_data = extract()
    transformed = transform(raw_data)
    load(transformed)


# Instantiate the DAG
my_first_etl()
```

Save this file to your `dags/` directory. The scheduler rescans that directory periodically for new files (every 300 seconds by default, configurable via `dag_dir_list_interval`), so the DAG will appear within a few minutes.
Understanding the Key Parameters
schedule
The schedule parameter accepts:
- Cron expressions: `"0 6 * * *"` (daily at 6 AM)
- Airflow presets: `"@daily"`, `"@hourly"`, `"@weekly"`, `"@monthly"`
- Timedeltas: `timedelta(hours=6)` (every 6 hours)
- Dataset objects: trigger on data availability (covered in an advanced guide)
- `None`: manual trigger only

```python
# These are all equivalent:
schedule="@daily"
schedule="0 0 * * *"
schedule=timedelta(days=1)
```

start_date
The start_date tells Airflow when to start scheduling runs. Combined with schedule, it determines the backfill window.
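Note that Airflow fires a run at the end of its data interval, not at the beginning. A quick plain-Python sketch (dates are illustrative) of when the first `@daily` run would actually execute for this `start_date`:

```python
from datetime import datetime, timedelta

start_date = datetime(2024, 1, 1)
interval = timedelta(days=1)  # what schedule="@daily" amounts to

# Airflow waits until an interval's data is complete before running it,
# so the first run fires once the 2024-01-01 interval closes:
first_run = start_date + interval
print(first_run)  # 2024-01-02 00:00:00
```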
**Important:** Always use a fixed, static `start_date`. Never use `datetime.now()` — it changes on every DAG parse and will confuse the scheduler.
catchup=False
By default, catchup=True — Airflow will try to run all missed DAG runs from start_date to now. For most pipelines, you want catchup=False to only run from the present forward.
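To make that concrete, here is a quick count (assuming, for illustration, that today is 2024-01-01) of how many daily runs a 2020 `start_date` would queue with catchup left on:

```python
from datetime import date

# Each day between start_date and "today" becomes one backfill run
missed = (date(2024, 1, 1) - date(2020, 1, 1)).days
print(missed)  # 1461
```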
```python
# DON'T do this — will create hundreds of backfill runs
@dag(start_date=datetime(2020, 1, 1), catchup=True)

# DO this for new pipelines
@dag(start_date=datetime(2024, 1, 1), catchup=False)
```

Classic Operator Style (Pre-2.0)
If you're maintaining older DAGs or need to use operators that don't have TaskFlow wrappers, here's the equivalent classic style:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract_fn(**context):
    records = [
        {"id": 1, "name": "Alice", "score": 95},
        {"id": 2, "name": "Bob", "score": 87},
    ]
    # Push to XCom manually
    context["ti"].xcom_push(key="records", value=records)


def transform_fn(**context):
    records = context["ti"].xcom_pull(key="records", task_ids="extract")
    transformed = [{**r, "grade": "A" if r["score"] >= 90 else "B"}
                   for r in records]
    context["ti"].xcom_push(key="transformed", value=transformed)


def load_fn(**context):
    records = context["ti"].xcom_pull(key="transformed", task_ids="transform")
    print(f"Loading {len(records)} records")


with DAG(
    dag_id="classic_etl",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
) as dag:
    extract = PythonOperator(task_id="extract", python_callable=extract_fn)
    transform = PythonOperator(task_id="transform", python_callable=transform_fn)
    load = PythonOperator(task_id="load", python_callable=load_fn)

    extract >> transform >> load  # Set dependencies with >>
```

The `>>` operator sets task dependencies (the left task must complete before the right one starts). You can also use `<<`, or the `set_downstream()` / `set_upstream()` methods.
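Under the hood, `>>` is ordinary Python operator overloading. A toy sketch (not Airflow's actual implementation) of how `__rshift__` can record dependencies while still allowing chains:

```python
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # t1 >> t2 records t2 as downstream of t1 and returns t2,
        # which is what makes chains like a >> b >> c work
        self.downstream.append(other)
        return other


extract, transform, load = Task("extract"), Task("transform"), Task("load")
extract >> transform >> load
print(extract.downstream[0].task_id)    # transform
print(transform.downstream[0].task_id)  # load
```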
Viewing Your DAG
- Open the Airflow UI at `http://localhost:8080`
- Toggle the DAG on (the toggle on the left of the DAG list)
- Click into the DAG to see the Graph View
- Trigger a manual run with the ▶ button
- Watch tasks turn green as they succeed
Common Beginner Mistakes
Using mutable default arguments
```python
# ❌ Wrong — default_args dict is shared and mutated
DEFAULT = {"retries": 3}

@dag(default_args=DEFAULT)
def my_dag(): ...

# ✅ Right — new dict each time
@dag(default_args={"retries": 3})
def my_dag(): ...
```

Importing at the module level
```python
# ❌ Wrong — this runs every time the scheduler parses the file
import pandas as pd  # Expensive import at parse time

df = pd.read_csv("s3://bucket/large-file.csv")  # Absolute disaster

@dag(...)
def my_dag(): ...
```

```python
# ✅ Right — import inside the task function
@task()
def process():
    import pandas as pd  # Only imports when task runs

    df = pd.read_csv("s3://bucket/large-file.csv")
```

Storing large data in XCom
XCom stores data in the Airflow metadata database. It's designed for small values (IDs, config, metadata) — not for DataFrames or large payloads.
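To get a feel for how fast XCom payloads grow, serialize even a modest result set the way it would land in the metadata database (sizes here are illustrative):

```python
import json

# 10,000 small rows of roughly 100 bytes each
rows = [{"id": i, "value": "x" * 100} for i in range(10_000)]
payload = json.dumps(rows)
print(f"{len(payload) / 1_000_000:.1f} MB")  # already over 1 MB
```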
```python
# ❌ Wrong — writing a 1 GB DataFrame to the metadata DB
@task()
def process() -> pd.DataFrame:
    return huge_dataframe  # Will crash your metadata DB

# ✅ Right — pass a path or reference
@task()
def process() -> str:
    huge_dataframe.to_parquet("s3://bucket/output/data.parquet")
    return "s3://bucket/output/data.parquet"  # Just the path
```

Next Steps
Now that your first DAG is running, here's where to go next:
- Best Practices for Task Retries — Configure robust error handling
- Dynamic Task Mapping — Scale tasks based on runtime data
- Airflow + dbt Integration — Orchestrate dbt models with Airflow
Happy pipelining! 🚀