
Orchestrating dbt with Apache Airflow: A Complete Integration Guide

Learn how to integrate Apache Airflow with dbt (data build tool) using the dbt Cloud provider and Cosmos. Covers scheduling dbt runs, handling failures, passing artifacts between runs, and best practices for the dbt + Airflow stack.

Neha Agarwal

Data Platform Engineer

6 min read

The Airflow + dbt combination has become the de facto standard for data teams building modern data pipelines. Airflow handles ingestion, orchestration, and cross-system coordination. dbt handles SQL-based transformation inside your warehouse. Together, they cover the full ELT lifecycle.

This guide covers three integration approaches — from simple to powerful — so you can pick the right fit for your team.

The Three Integration Patterns

Approach                    Best For                              Complexity
dbt Cloud provider          Teams already on dbt Cloud            Low
dbt CLI via BashOperator    Quick setup, simple pipelines         Low
Astronomer Cosmos           Full DAG-level visibility & control   Medium

Approach 1: dbt Cloud Provider

If your team uses dbt Cloud, the apache-airflow-providers-dbt-cloud package gives you first-class integration.

pip install apache-airflow-providers-dbt-cloud
dags/dbt_cloud_dag.py
from datetime import datetime, timedelta
from airflow.decorators import dag
from airflow.providers.dbt.cloud.operators.dbt import (
    DbtCloudRunJobOperator,
    DbtCloudGetJobRunArtifactOperator,
)
 
 
@dag(
    dag_id="dbt_cloud_pipeline",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        "dbt_cloud_conn_id": "dbt_cloud_default",
    },
)
def dbt_cloud_pipeline():
    """
    Trigger a dbt Cloud job and wait for completion.
    Uses async deferrable operator to free up the worker slot.
    """
 
    # Trigger the dbt Cloud job
    trigger_job = DbtCloudRunJobOperator(
        task_id="trigger_dbt_job",
        job_id=12345,             # Your dbt Cloud job ID
        check_interval=10,        # Poll every 10s
        timeout=3600,             # Fail after 1 hour
        # Optionally override job settings:
        # additional_run_config={"threads_override": 8},
        deferrable=True,          # Uses Triggerer — non-blocking
    )
 
    # Download the manifest for lineage tracking
    get_manifest = DbtCloudGetJobRunArtifactOperator(
        task_id="get_manifest",
        run_id=trigger_job.output,
        path="manifest.json",
        output_file_name="/tmp/dbt_manifest.json",
    )
 
    trigger_job >> get_manifest
 
 
dbt_cloud_pipeline()

Set up the connection:

# In Airflow UI: Admin → Connections → Add
# Or via environment variable:
export AIRFLOW_CONN_DBT_CLOUD_DEFAULT='dbt-cloud://your-account-id:your-api-token@'

Approach 2: dbt CLI via BashOperator

For teams using dbt Core (self-hosted), the simplest integration is running dbt commands directly.

dags/dbt_bash_dag.py
import os
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
 
DBT_DIR = "/opt/airflow/dbt"       # Path to your dbt project
DBT_PROFILES_DIR = "/opt/airflow/.dbt"  # Profile dir with credentials
 
DBT_CMD = f"cd {DBT_DIR} && dbt"
 
 
@dag(
    dag_id="dbt_bash_pipeline",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={"retries": 1},
)
def dbt_bash_pipeline():
 
    dbt_deps = BashOperator(
        task_id="dbt_deps",
        bash_command=f"{DBT_CMD} deps --profiles-dir {DBT_PROFILES_DIR}",
    )
 
    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command=(
            f"{DBT_CMD} run "
            f"--profiles-dir {DBT_PROFILES_DIR} "
            f"--target prod "
            f"--vars '{{\"execution_date\": \"{{{{ ds }}}}\"}}'  "
        ),
    )
 
    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command=(
            f"{DBT_CMD} test "
            f"--profiles-dir {DBT_PROFILES_DIR} "
            f"--target prod "
        ),
    )
 
    # Only run tests after models succeed
    dbt_deps >> dbt_run >> dbt_test
 
 
dbt_bash_pipeline()

Limitation: The BashOperator approach gives you a single "dbt run" task. All models run inside it, so if one model fails, you can't retry just that model from Airflow. This is where Cosmos shines.
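One partial workaround while staying on the BashOperator: after `dbt run`, parse the `run_results.json` artifact in `target/` to see which models failed, then feed them to a follow-up `dbt run --select ...`. A minimal sketch — the `failed_models` helper is hypothetical, and it assumes the `results[].status` / `unique_id` fields present in recent dbt artifact schemas:

```python
def failed_models(run_results: dict) -> list[str]:
    """Return model names whose run status was 'error' in run_results.json."""
    failed = []
    for result in run_results.get("results", []):
        if result.get("status") == "error":
            # unique_id looks like "model.jaffle_shop.stg_orders"
            failed.append(result["unique_id"].split(".")[-1])
    return failed


# Trimmed example of the artifact shape:
sample = {
    "results": [
        {"unique_id": "model.jaffle_shop.stg_orders", "status": "success"},
        {"unique_id": "model.jaffle_shop.stg_payments", "status": "error"},
    ]
}
print(failed_models(sample))  # ['stg_payments']
```

Even so, this is bolt-on plumbing rather than first-class retry support.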

Approach 3: Astronomer Cosmos

Cosmos converts your dbt project into a native Airflow DAG — each dbt model becomes an individual Airflow task. This gives you full granularity, retry control, and task-level observability.

pip install astronomer-cosmos[dbt-postgres]
# Or for BigQuery:
pip install astronomer-cosmos[dbt-bigquery]
dags/dbt_cosmos_dag.py
from datetime import datetime
from pathlib import Path
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
 
 
profile_config = ProfileConfig(
    profile_name="jaffle_shop",
    target_name="prod",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="postgres_warehouse",
        profile_args={"schema": "analytics"},
    ),
)
 
dbt_dag = DbtDag(
    dag_id="dbt_cosmos_full",
    project_config=ProjectConfig(
        dbt_project_path=Path("/opt/airflow/dbt/jaffle_shop"),
    ),
    profile_config=profile_config,
    execution_config=ExecutionConfig(
        dbt_executable_path="/usr/local/bin/dbt",
    ),
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    # Optionally install dbt package dependencies before each run:
    # operator_args={"install_deps": True},
)

This generates a DAG that looks like your dbt lineage graph, with each model as a task!

Selective Model Runs with Cosmos

dags/dbt_cosmos_selective.py
from datetime import datetime
from cosmos import DbtDag, ProjectConfig, ProfileConfig, RenderConfig
from cosmos.constants import LoadMode, TestBehavior
 
# profile_config as defined in the previous example
dbt_dag = DbtDag(
    dag_id="dbt_staging_only",
    project_config=ProjectConfig(
        dbt_project_path="/opt/airflow/dbt/jaffle_shop",
    ),
    render_config=RenderConfig(
        # Only render staging models
        select=["path:models/staging"],
        # Exclude specific models
        exclude=["tag:deprecated"],
        # Run tests after every model (not just at the end)
        test_behavior=TestBehavior.AFTER_EACH,
        load_method=LoadMode.DBT_LS,
    ),
    profile_config=profile_config,
    schedule="@hourly",
    start_date=datetime(2024, 1, 1),
)

Passing Context from Airflow to dbt

A common need: pass the Airflow execution_date to dbt as a variable.

dags/dbt_with_context.py
from datetime import datetime
from airflow.decorators import dag, task
 
 
@dag(schedule="@daily", start_date=datetime(2024, 1, 1))
def dbt_with_airflow_context():
 
    @task()
    def get_date_range(**context) -> dict:
        ds = context["ds"]          # "2024-03-15"
        ds_nodash = context["ds_nodash"]  # "20240315"
        return {"start_date": ds, "partition": ds_nodash}
 
    @task()
    def run_dbt(config: dict) -> None:
        import subprocess
        result = subprocess.run(
            [
                "dbt", "run",
                "--vars", f'{{"start_date": "{config["start_date"]}", "partition": "{config["partition"]}"}}',
                "--select", "tag:incremental",
            ],
            capture_output=True, text=True, check=True,
            cwd="/opt/airflow/dbt/jaffle_shop",
        )
        print(result.stdout)
 
    config = get_date_range()
    run_dbt(config)
 
 
dbt_with_airflow_context()
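A note on building that `--vars` string: hand-rolled f-string JSON is easy to get wrong once values contain quotes or special characters. A safer sketch using `json.dumps` — the `dbt_vars_args` helper is hypothetical, not part of dbt or Airflow:

```python
import json


def dbt_vars_args(vars_dict: dict) -> list[str]:
    """Build a correctly quoted --vars argument list for subprocess calls."""
    # json.dumps handles escaping, so values with quotes won't break the CLI
    return ["--vars", json.dumps(vars_dict)]


args = dbt_vars_args({"start_date": "2024-03-15", "partition": "20240315"})
print(args)  # ['--vars', '{"start_date": "2024-03-15", "partition": "20240315"}']
```

Pass the resulting list straight into `subprocess.run(["dbt", "run", *args, ...])`.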

In your dbt model, reference the variable:

-- models/staging/stg_orders.sql
SELECT *
FROM {{ source('raw', 'orders') }}
{% if var('start_date', none) %}
WHERE DATE(created_at) = '{{ var("start_date") }}'
{% endif %}

Handling dbt Test Failures

By default, dbt test failures return a non-zero exit code, which fails the Airflow task. You might want softer failure handling:

@task()
def run_dbt_tests(**context) -> dict:
    import subprocess
 
    result = subprocess.run(
        ["dbt", "test", "--store-failures"],
        capture_output=True, text=True,
        cwd="/opt/airflow/dbt/jaffle_shop",
    )
 
    if result.returncode != 0:
        # Parse output to get failure details
        failures = [
            line for line in result.stdout.splitlines()
            if "FAIL" in line or "ERROR" in line
        ]
        # Send to Slack instead of failing the task
        for failure in failures:
            print(f"āš ļø dbt test failure: {failure}")
 
        # Optionally: raise only for critical test failures
        critical = [f for f in failures if "critical" in f.lower()]
        if critical:
            raise RuntimeError(f"Critical dbt test failures: {critical}")
 
    return {"exit_code": result.returncode, "passed": result.returncode == 0}
Recommended Project Structure

airflow-project/
ā”œā”€ā”€ dags/
│   ā”œā”€ā”€ ingestion/          # Source data loading DAGs
│   │   ā”œā”€ā”€ load_orders.py
│   │   └── load_customers.py
│   └── transformation/     # dbt-powered DAGs
│       ā”œā”€ā”€ dbt_staging.py  # Hourly staging models
│       └── dbt_marts.py    # Daily mart models (depends on staging)
ā”œā”€ā”€ dbt/
│   └── jaffle_shop/        # Your dbt project
│       ā”œā”€ā”€ dbt_project.yml
│       ā”œā”€ā”€ models/
│       └── tests/
└── docker/
    └── Dockerfile          # Custom image with dbt pre-installed
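With this layout, a small parse-time guard can fail fast if the dbt directory is mis-mounted in the container. A sketch — `validate_dbt_project` is a hypothetical helper, not part of Airflow or dbt:

```python
from pathlib import Path


def validate_dbt_project(project_dir: str) -> Path:
    """Raise at DAG-parse time if the dbt project is missing or incomplete."""
    path = Path(project_dir)
    if not (path / "dbt_project.yml").is_file():
        raise FileNotFoundError(f"no dbt_project.yml under {path}")
    return path
```

Call it at module import in your DAG files so a broken volume mount surfaces as a DAG import error instead of a mid-run failure.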

The Cosmos approach is the gold standard for Airflow + dbt in 2025. You get full lineage visibility, per-model retries, and all of dbt's built-in testing — without sacrificing Airflow's orchestration power.


Neha Agarwal

Data Platform Engineer

Data Platform Engineer specializing in workflow orchestration, data architecture, and cloud infrastructure. Focused on helping teams build reliable and maintainable data platforms.