Writing Your First Apache Airflow DAG

A hands-on beginner's guide to creating your first DAG in Apache Airflow. We cover the core concepts — tasks, operators, scheduling, and dependencies — with complete, runnable code.

Prashant Singh


Senior Data Engineer

5 min read

If you've just installed Apache Airflow and are staring at an empty dags/ folder, this guide is for you. By the end, you'll have a real, working DAG that extracts data, transforms it, and loads it — the classic ETL pattern.

Prerequisites

  • Apache Airflow 2.8+ installed (locally or via Docker)
  • Python 3.8+
  • Basic Python knowledge

What is a DAG?

A DAG (Directed Acyclic Graph) is the core abstraction in Airflow. It's a Python file that defines:

  1. What to do — the tasks and their logic
  2. When to do it — the schedule
  3. In what order — the dependencies between tasks

"Directed" means dependencies flow in one direction. "Acyclic" means there are no loops. This structure guarantees Airflow can always determine which tasks to run next.
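The acyclicity property is what makes scheduling tractable: a DAG always admits a valid topological order, so there is always a well-defined "next task". As a plain-Python illustration (not Airflow code), here is how such an order can be derived with the standard library's graphlib (Python 3.9+), using the task names from the ETL example below:

```python
from graphlib import TopologicalSorter

# Dependencies expressed as {task: set of upstream tasks}
deps = {
    "transform": {"extract"},
    "load": {"transform"},
}

# static_order() yields tasks so that every task appears after its upstreams
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['extract', 'transform', 'load']
```

If the graph contained a cycle, graphlib would raise a CycleError — the same reason Airflow rejects DAG files with circular dependencies at parse time.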

Your First DAG: The TaskFlow Way

Since Airflow 2.0, the recommended way to write DAGs is using the TaskFlow API — it uses Python decorators and makes XCom passing implicit.

dags/my_first_dag.py
from __future__ import annotations
 
from datetime import datetime, timedelta
 
from airflow.decorators import dag, task
 
 
@dag(
    dag_id="my_first_etl",
    description="A simple ETL DAG to get started with Airflow",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    },
    tags=["beginner", "etl"],
)
def my_first_etl():
    """My first Airflow DAG — a simple ETL pipeline."""
 
    @task()
    def extract() -> list[dict]:
        """Simulate extracting data from an API."""
        print("Extracting data from source...")
        return [
            {"id": 1, "name": "Alice", "score": 95},
            {"id": 2, "name": "Bob", "score": 87},
            {"id": 3, "name": "Charlie", "score": 92},
        ]
 
    @task()
    def transform(records: list[dict]) -> list[dict]:
        """Add a grade field based on score."""
        def get_grade(score: int) -> str:
            if score >= 90:
                return "A"
            elif score >= 80:
                return "B"
            return "C"
 
        return [
            {**r, "grade": get_grade(r["score"])}
            for r in records
        ]
 
    @task()
    def load(records: list[dict]) -> None:
        """Simulate loading data to a data warehouse."""
        print(f"Loading {len(records)} records to warehouse...")
        for record in records:
            print(f"  ✓ {record['name']}: {record['score']} ({record['grade']})")
        print("Load complete!")
 
    # Wire up the pipeline — data flows automatically via XCom
    raw_data = extract()
    transformed = transform(raw_data)
    load(transformed)
 
 
# Instantiate the DAG
my_first_etl()

Save this file to your dags/ directory. Airflow's scheduler periodically scans that folder and will pick up the new DAG on its next scan (the interval is configurable via dag_dir_list_interval).
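If the DAG takes a while to appear, shortening the directory scan interval in airflow.cfg (or via the matching AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL environment variable) speeds up detection during local development — the exact default varies by Airflow version, so treat this as a sketch:

```ini
[scheduler]
# Seconds between scans of the dags/ folder for new files
dag_dir_list_interval = 30
```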

Understanding the Key Parameters

schedule

The schedule parameter accepts:

  • Cron expressions: "0 6 * * *" (daily at 6am)
  • Airflow presets: "@daily", "@hourly", "@weekly", "@monthly"
  • Timedelta: timedelta(hours=6) (every 6 hours)
  • Dataset objects: trigger on data availability (covered in an advanced guide)
  • None: manual trigger only

# These are effectively equivalent for a midnight start_date
# (timedelta schedules relative to start_date; cron aligns to clock time):
schedule="@daily"
schedule="0 0 * * *"
schedule=timedelta(days=1)

start_date

The start_date tells Airflow when to start scheduling runs. Combined with schedule, it determines the backfill window.

Important: Always use a fixed, static start_date. Never use datetime.now() — it changes on every DAG parse and will confuse the scheduler.
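To build intuition for what "start of scheduling" means: with @daily, each run covers a one-day data interval and fires only after that interval closes. A plain-Python sketch (not Airflow code) of the first few logical dates:

```python
from datetime import datetime, timedelta

start_date = datetime(2024, 1, 1)
interval = timedelta(days=1)

# Each run's logical date marks the START of its data interval;
# the run actually executes once the interval has ENDED.
logical_dates = [start_date + i * interval for i in range(3)]
run_times = [d + interval for d in logical_dates]

for logical, runs_at in zip(logical_dates, run_times):
    print(f"logical date {logical:%Y-%m-%d} -> runs at {runs_at:%Y-%m-%d}")
```

So the very first run, with logical date 2024-01-01, executes at midnight on 2024-01-02 — a common source of "why hasn't my DAG run yet?" confusion.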

catchup=False

By default, catchup=True — Airflow will try to run all missed DAG runs from start_date to now. For most pipelines, you want catchup=False to only run from the present forward.

# DON'T do this — will create hundreds of backfill runs
@dag(start_date=datetime(2020, 1, 1), catchup=True)
 
# DO this for new pipelines
@dag(start_date=datetime(2024, 1, 1), catchup=False)
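To see why catchup=True with an old start_date is dangerous, count the runs Airflow would try to backfill. A rough plain-Python estimate, using a fixed hypothetical "today" for illustration:

```python
from datetime import date

start = date(2020, 1, 1)
today = date(2024, 1, 1)  # hypothetical "now" for the example

# One daily run per elapsed day between start_date and now
missed_daily_runs = (today - start).days
print(f"catchup=True would schedule ~{missed_daily_runs} backfill runs")  # ~1461
```

Over a thousand queued runs on first enable can easily saturate a small Airflow deployment.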

Classic Operator Style (Pre-2.0)

If you're maintaining older DAGs or need to use operators that don't have TaskFlow wrappers, here's the equivalent classic style:

dags/classic_style_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
 
 
def extract_fn(**context):
    records = [
        {"id": 1, "name": "Alice", "score": 95},
        {"id": 2, "name": "Bob", "score": 87},
    ]
    # Push to XCom manually
    context["ti"].xcom_push(key="records", value=records)
 
 
def transform_fn(**context):
    records = context["ti"].xcom_pull(key="records", task_ids="extract")
    transformed = [{**r, "grade": "A" if r["score"] >= 90 else "B"}
                   for r in records]
    context["ti"].xcom_push(key="transformed", value=transformed)
 
 
def load_fn(**context):
    records = context["ti"].xcom_pull(key="transformed", task_ids="transform")
    print(f"Loading {len(records)} records")
 
 
with DAG(
    dag_id="classic_etl",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
) as dag:
 
    extract = PythonOperator(task_id="extract", python_callable=extract_fn)
    transform = PythonOperator(task_id="transform", python_callable=transform_fn)
    load = PythonOperator(task_id="load", python_callable=load_fn)
 
    extract >> transform >> load  # Set dependencies with >>

The >> operator sets task dependencies (left task must complete before right). You can also use << or set_downstream() / set_upstream().
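Under the hood, >> is ordinary Python operator overloading: BaseOperator implements __rshift__ (and __lshift__) to record the dependency and return the right-hand task so chains compose. A toy illustration of the mechanism — a simplified stand-in, not Airflow's actual implementation:

```python
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # a >> b records b as downstream of a; returning b lets chains work
        self.downstream.append(other)
        return other

    def __lshift__(self, other):
        # a << b is the mirror image: a depends on b
        other.downstream.append(self)
        return other

extract, transform, load = Task("extract"), Task("transform"), Task("load")
extract >> transform >> load

print([t.task_id for t in extract.downstream])    # ['transform']
print([t.task_id for t in transform.downstream])  # ['load']
```

Airflow's real operators additionally accept lists on either side (e.g. extract >> [clean, validate] >> load) for fan-out/fan-in patterns.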

Viewing Your DAG

  1. Open the Airflow UI at http://localhost:8080
  2. Toggle the DAG on (the toggle on the left of the DAG list)
  3. Click into the DAG to see the Graph View
  4. Trigger a manual run with the ▶ button
  5. Watch tasks turn green as they succeed

Common Beginner Mistakes

Sharing a mutable default_args dict

# ❌ Risky — a module-level dict shared across DAGs;
# changing it in one place silently affects every DAG that uses it
DEFAULT = {"retries": 3}

@dag(default_args=DEFAULT)
def my_dag(): ...

# ✅ Safer — give each DAG its own dict
@dag(default_args={"retries": 3})
def my_dag(): ...

Doing heavy work at the module level

# ❌ Wrong — this runs on EVERY parse of the DAG file,
# which the scheduler does every ~30 seconds by default
import pandas as pd  # Expensive import at parse time
df = pd.read_csv("s3://bucket/large-file.csv")  # Absolute disaster — I/O at parse time

@dag(...)
def my_dag(): ...

# ✅ Right — import inside the task function
@task()
def process():
    import pandas as pd  # Only imports when the task actually runs
    df = pd.read_csv("s3://bucket/large-file.csv")

Storing large data in XCom

XCom stores data in the Airflow metadata database. It's designed for small values (IDs, config, metadata) — not for DataFrames or large payloads.

# ❌ Wrong — writing a 1GB DataFrame to the metadata DB
@task()
def process() -> pd.DataFrame:
    return huge_dataframe  # Will crash your metadata DB
 
# ✅ Right — pass a path or reference
@task()
def process() -> str:
    huge_dataframe.to_parquet("s3://bucket/output/data.parquet")
    return "s3://bucket/output/data.parquet"  # Just the path
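Since the default XCom backend JSON-serializes return values into the metadata database, a quick sanity check is to measure a value's serialized size before returning it from a task. This is a hypothetical helper for illustration, not an Airflow API:

```python
import json

def xcom_size_bytes(value) -> int:
    """Rough size of a value as it would be JSON-serialized for XCom."""
    return len(json.dumps(value).encode("utf-8"))

small = {"run_id": "2024-01-01", "row_count": 1000}
print(xcom_size_bytes(small), "bytes")  # tens of bytes — fine for XCom

big = [{"id": i, "payload": "x" * 1000} for i in range(10_000)]
print(xcom_size_bytes(big) // 1_000_000, "MB")  # megabytes — pass a path instead
```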

Next Steps

Now that your first DAG is running, dig into sensors (waiting on external events), branching (conditional task paths), and dynamic task mapping (fanning out over runtime data).

Happy pipelining! 🚀


Prashant Singh


Senior Data Engineer

Senior Data Engineer with expertise in Apache Airflow, data orchestration, and building scalable data pipelines. Passionate about sharing knowledge and best practices with the data engineering community.