The Airflow + dbt combination has become the de facto standard for data teams building modern data pipelines. Airflow handles ingestion, orchestration, and cross-system coordination. dbt handles SQL-based transformation inside your warehouse. Together, they cover the full ELT lifecycle.
This guide covers three integration approaches, from simple to powerful, so you can pick the right fit for your team.
The Three Integration Patterns
| Approach | Best For | Complexity |
|---|---|---|
| dbt Cloud provider | Teams already on dbt Cloud | Low |
| dbt CLI via BashOperator | Quick setup, simple pipelines | Low |
| Astronomer Cosmos | Full DAG-level visibility & control | Medium |
Approach 1: dbt Cloud Provider
If your team uses dbt Cloud, the apache-airflow-providers-dbt-cloud package gives you first-class integration.
pip install apache-airflow-providers-dbt-cloud

from datetime import datetime, timedelta
from airflow.decorators import dag
from airflow.providers.dbt.cloud.operators.dbt import (
DbtCloudRunJobOperator,
DbtCloudGetJobRunArtifactOperator,
)
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
@dag(
dag_id="dbt_cloud_pipeline",
schedule="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
default_args={
"retries": 1,
"retry_delay": timedelta(minutes=5),
"dbt_cloud_conn_id": "dbt_cloud_default",
},
)
def dbt_cloud_pipeline():
"""
Trigger a dbt Cloud job and wait for completion.
Uses async deferrable operator to free up the worker slot.
"""
# Trigger the dbt Cloud job
trigger_job = DbtCloudRunJobOperator(
task_id="trigger_dbt_job",
job_id=12345, # Your dbt Cloud job ID
check_interval=10, # Poll every 10s
timeout=3600, # Fail after 1 hour
# Optionally override job settings:
# additional_run_config={"threads_override": 8},
        deferrable=True,  # Uses the Triggerer; non-blocking
)
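    # Alternative pattern (sketch, not used in this pipeline): pass
    # wait_for_termination=False to the operator above and block in a
    # dedicated sensor task instead; this is what the DbtCloudJobRunSensor
    # import is for.
    # wait_for_job = DbtCloudJobRunSensor(
    #     task_id="wait_for_dbt_job",
    #     run_id=trigger_job.output,
    #     timeout=3600,
    # )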
# Download the manifest for lineage tracking
get_manifest = DbtCloudGetJobRunArtifactOperator(
task_id="get_manifest",
run_id=trigger_job.output,
path="manifest.json",
output_file_name="/tmp/dbt_manifest.json",
)
trigger_job >> get_manifest
dbt_cloud_pipeline()

Set up the connection:
# In Airflow UI: Admin → Connections → Add
# Or via environment variable:
export AIRFLOW_CONN_DBT_CLOUD_DEFAULT='dbt-cloud://:your-api-token@/your-account-id'

Approach 2: dbt CLI via BashOperator
For teams using dbt Core (self-hosted), the simplest integration is running dbt commands directly.
import os
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
DBT_DIR = "/opt/airflow/dbt" # Path to your dbt project
DBT_PROFILES_DIR = "/opt/airflow/.dbt" # Profile dir with credentials
DBT_CMD = f"cd {DBT_DIR} && dbt"
@dag(
dag_id="dbt_bash_pipeline",
schedule="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
default_args={"retries": 1},
)
def dbt_bash_pipeline():
dbt_deps = BashOperator(
task_id="dbt_deps",
bash_command=f"{DBT_CMD} deps --profiles-dir {DBT_PROFILES_DIR}",
)
dbt_run = BashOperator(
task_id="dbt_run",
bash_command=(
f"{DBT_CMD} run "
f"--profiles-dir {DBT_PROFILES_DIR} "
f"--target prod "
f"--vars '{{\"execution_date\": \"{{{{ ds }}}}\"}}' "
),
)
dbt_test = BashOperator(
task_id="dbt_test",
bash_command=(
f"{DBT_CMD} test "
f"--profiles-dir {DBT_PROFILES_DIR} "
f"--target prod "
),
)
# Only run tests after models succeed
dbt_deps >> dbt_run >> dbt_test
dbt_bash_pipeline()

Limitation: The BashOperator approach gives you a single "dbt run" task. All models run inside it, so if one model fails, you can't retry just that model from Airflow. This is where Cosmos shines.
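If you need to stay on the BashOperator pattern for now, a partial mitigation is to split the single run into selector-scoped tasks, so a failed layer can be retried without re-running everything before it. A minimal sketch that reuses DBT_CMD and DBT_PROFILES_DIR from above; the path selectors are assumptions about your project layout:

    # Inside dbt_bash_pipeline(), replacing the single dbt_run task:
    dbt_run_staging = BashOperator(
        task_id="dbt_run_staging",
        bash_command=(
            f"{DBT_CMD} run "
            f"--select path:models/staging "
            f"--profiles-dir {DBT_PROFILES_DIR} "
        ),
    )
    dbt_run_marts = BashOperator(
        task_id="dbt_run_marts",
        bash_command=(
            f"{DBT_CMD} run "
            f"--select path:models/marts "
            f"--profiles-dir {DBT_PROFILES_DIR} "
        ),
    )
    # Layer-level granularity only: marts can be retried separately from staging,
    # but you still cannot retry an individual model.
    dbt_deps >> dbt_run_staging >> dbt_run_marts >> dbt_test

That last gap, per-model retries, is exactly what Cosmos closes.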
Approach 3: Astronomer Cosmos (Recommended)
Cosmos converts your dbt project into a native Airflow DAG: each dbt model becomes an individual Airflow task. This gives you full granularity, retry control, and task-level observability.
pip install astronomer-cosmos[dbt-postgres]
# Or for BigQuery:
pip install astronomer-cosmos[dbt-bigquery]

from datetime import datetime
from pathlib import Path
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
profile_config = ProfileConfig(
profile_name="jaffle_shop",
target_name="prod",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="postgres_warehouse",
profile_args={"schema": "analytics"},
),
)
dbt_dag = DbtDag(
dag_id="dbt_cosmos_full",
project_config=ProjectConfig(
dbt_project_path=Path("/opt/airflow/dbt/jaffle_shop"),
),
profile_config=profile_config,
execution_config=ExecutionConfig(
dbt_executable_path="/usr/local/bin/dbt",
),
schedule="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
    # Optionally pass extra arguments to every generated dbt task:
    # operator_args={"install_deps": True},
)

This generates a DAG that looks like your dbt lineage graph, with each model as a task!
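DbtDag gives dbt a DAG of its own. If you would rather embed dbt inside a larger pipeline, say downstream of an ingestion task, Cosmos also provides DbtTaskGroup. A minimal sketch, reusing the profile_config defined above; load_raw_orders is a hypothetical placeholder for your real loader:

from datetime import datetime
from pathlib import Path

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from cosmos import DbtTaskGroup, ProjectConfig

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def ingest_and_transform():
    # Hypothetical upstream ingestion task (stand-in for your real loader)
    load_raw_orders = EmptyOperator(task_id="load_raw_orders")

    # Render the whole dbt project as a task group inside this DAG
    dbt_transform = DbtTaskGroup(
        group_id="dbt_transform",
        project_config=ProjectConfig(
            dbt_project_path=Path("/opt/airflow/dbt/jaffle_shop"),
        ),
        profile_config=profile_config,  # same ProfileConfig as defined above
    )

    load_raw_orders >> dbt_transform

ingest_and_transform()

The models still render as individual tasks; they are just nested under a task group instead of forming a standalone DAG.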
Selective Model Runs with Cosmos
from cosmos import DbtDag, ProjectConfig, ProfileConfig, RenderConfig
from cosmos.constants import LoadMode, TestBehavior
dbt_dag = DbtDag(
dag_id="dbt_staging_only",
project_config=ProjectConfig(
dbt_project_path="/opt/airflow/dbt/jaffle_shop",
),
render_config=RenderConfig(
# Only render staging models
select=["path:models/staging"],
# Exclude specific models
exclude=["tag:deprecated"],
# Run tests after every model (not just at the end)
test_behavior=TestBehavior.AFTER_EACH,
load_method=LoadMode.DBT_LS,
),
profile_config=profile_config,
schedule="@hourly",
start_date=datetime(2024, 1, 1),
)

Passing Context from Airflow to dbt
A common need: pass the Airflow execution_date to dbt as a variable.
from datetime import datetime

from airflow.decorators import dag, task
@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def dbt_with_airflow_context():
@task()
def get_date_range(**context) -> dict:
ds = context["ds"] # "2024-03-15"
ds_nodash = context["ds_nodash"] # "20240315"
return {"start_date": ds, "partition": ds_nodash}
@task()
def run_dbt(config: dict) -> None:
import subprocess
result = subprocess.run(
[
"dbt", "run",
"--vars", f'{{"start_date": "{config["start_date"]}", "partition": "{config["partition"]}"}}',
"--select", "tag:incremental",
],
capture_output=True, text=True, check=True,
cwd="/opt/airflow/dbt/jaffle_shop",
)
print(result.stdout)
config = get_date_range()
run_dbt(config)
dbt_with_airflow_context()

In your dbt model, reference the variable:
-- models/staging/stg_orders.sql
SELECT *
FROM {{ source('raw', 'orders') }}
{% if var('start_date', none) %}
WHERE DATE(created_at) = '{{ var("start_date") }}'
{% endif %}

Handling dbt Test Failures
By default, dbt test failures return a non-zero exit code, which fails the Airflow task. You might want softer failure handling:
@task()
def run_dbt_tests(**context) -> dict:
import subprocess
result = subprocess.run(
["dbt", "test", "--store-failures"],
capture_output=True, text=True,
cwd="/opt/airflow/dbt/jaffle_shop",
)
if result.returncode != 0:
# Parse output to get failure details
failures = [
line for line in result.stdout.splitlines()
if "FAIL" in line or "ERROR" in line
]
        # Log the failures (or send them to Slack) instead of failing the task
        for failure in failures:
            print(f"⚠️ dbt test failure: {failure}")
# Optionally: raise only for critical test failures
critical = [f for f in failures if "critical" in f.lower()]
if critical:
raise RuntimeError(f"Critical dbt test failures: {critical}")
return {"exit_code": result.returncode, "passed": result.returncode == 0}Recommended Project Structure
airflow-project/
├── dags/
│   ├── ingestion/              # Source data loading DAGs
│   │   ├── load_orders.py
│   │   └── load_customers.py
│   └── transformation/         # dbt-powered DAGs
│       ├── dbt_staging.py      # Hourly staging models
│       └── dbt_marts.py        # Daily mart models (depends on staging)
├── dbt/
│   └── jaffle_shop/            # Your dbt project
│       ├── dbt_project.yml
│       ├── models/
│       └── tests/
└── docker/
    └── Dockerfile              # Custom image with dbt pre-installed
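One way to wire the staging-to-marts dependency implied by that layout is Airflow Datasets, so the marts DAG runs whenever staging finishes instead of on a fixed offset. A minimal sketch, assuming Airflow 2.4+; the dataset URI is just a label, and the paths and selectors are illustrative:

from datetime import datetime

from airflow.datasets import Dataset
from airflow.decorators import dag
from airflow.operators.bash import BashOperator

DBT_STAGING_DONE = Dataset("dbt://jaffle_shop/staging")  # arbitrary URI label

@dag(dag_id="dbt_staging", schedule="@hourly", start_date=datetime(2024, 1, 1), catchup=False)
def dbt_staging():
    BashOperator(
        task_id="dbt_run_staging",
        bash_command="cd /opt/airflow/dbt/jaffle_shop && dbt run --select path:models/staging",
        outlets=[DBT_STAGING_DONE],  # marks the dataset as updated on success
    )

@dag(dag_id="dbt_marts", schedule=[DBT_STAGING_DONE], start_date=datetime(2024, 1, 1), catchup=False)
def dbt_marts():
    BashOperator(
        task_id="dbt_run_marts",
        bash_command="cd /opt/airflow/dbt/jaffle_shop && dbt run --select path:models/marts",
    )

dbt_staging()
dbt_marts()

With this handshake, dbt_marts triggers as soon as dbt_staging succeeds, with no schedule guessing.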
The Cosmos approach is the gold standard for Airflow + dbt in 2025. You get full lineage visibility, per-model retries, and all of dbt's built-in testing, without sacrificing Airflow's orchestration power.