In production data pipelines, failures are inevitable. Networks blip. APIs rate-limit. Databases time out. How you handle these failures is what separates a brittle pipeline from a reliable one.
This guide covers Airflow's retry system in depth — not just retries=3, but the full picture including exponential backoff, task-level vs DAG-level configuration, and alerting strategies.
The Full Retry Configuration
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.utils.email import send_email
def on_failure_alert(context):
    """Emit a failure notification with enough detail to locate the run.

    Intended as an Airflow ``on_failure_callback``; in production this
    would post to a Slack webhook or page via PagerDuty instead of
    printing.
    """
    ti = context["task_instance"]
    dag_id = context["dag"].dag_id
    run_id = context["run_id"]
    print(f"ALERT: {dag_id}.{ti.task_id} failed in run {run_id}")
    print(f"Logs: {ti.log_url}")
    # In production: call your Slack webhook or PagerDuty here
def on_retry_alert(context):
    """Record a retry attempt in the task log.

    Used as ``on_retry_callback`` so retries stay visible without paging
    anyone — full alerts are reserved for terminal failures.
    """
    ti = context["task_instance"]
    attempt = ti.try_number
    total_attempts = ti.max_tries + 1
    print(f"Retrying {ti.task_id} (attempt {attempt} of {total_attempts})")
@dag(
    dag_id="retry_patterns",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,  # don't backfill runs missed while the DAG was off
    # DAG-level defaults — inherited by all tasks
    default_args={
        "owner": "data-team",
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
        "retry_exponential_backoff": True,  # 5m → 10m → 20m
        "max_retry_delay": timedelta(hours=1),  # cap for the exponential growth
        "on_failure_callback": on_failure_alert,
        "on_retry_callback": on_retry_alert,
        "execution_timeout": timedelta(hours=2),  # hard stop for hung tasks
    },
)
def retry_patterns():
    """Demonstrate task-level retry overrides on top of DAG-level defaults."""

    @task(
        # Override at task level for tasks needing different behavior
        retries=5,
        retry_delay=timedelta(seconds=30),
        retry_exponential_backoff=True,
    )
    def call_flaky_api() -> dict:
        """An external API that sometimes rate-limits or 503s."""
        import requests
        response = requests.get(
            "https://api.example.com/data",
            timeout=30,
        )
        response.raise_for_status()  # Raises on 4xx/5xx → triggers retry
        return response.json()

    @task(
        # DB operations: fewer retries, longer delay
        retries=2,
        retry_delay=timedelta(minutes=10),
        retry_exponential_backoff=False,
        execution_timeout=timedelta(minutes=30),
    )
    def write_to_warehouse(data: dict) -> None:
        """Load to DW — longer delay lets DB recover from locks."""
        print(f"Writing {len(data)} records to warehouse")

    @task(
        # Critical task: immediately alert, no retries
        retries=0,
        on_failure_callback=on_failure_alert,
    )
    def send_business_report() -> None:
        """Time-sensitive report — no retries, immediate alert."""
        print("Sending report...")

    # Wire the pipeline: fetch → load; the report runs independently
    data = call_flaky_api()
    write_to_warehouse(data)
    send_business_report()
retry_patterns()

Understanding Exponential Backoff
With retry_exponential_backoff=True and retry_delay=timedelta(minutes=5):
| Attempt | Wait Before Retry |
|---|---|
| 1st retry | ~5 minutes |
| 2nd retry | ~10 minutes |
| 3rd retry | ~20 minutes |
| 4th retry | ~40 minutes (capped by max_retry_delay) |
Airflow uses jitter in the backoff calculation to prevent thundering herd issues when many tasks retry simultaneously.
Set `max_retry_delay` — without it, exponential backoff can lead to multi-day waits for the last retry attempt.
Retry Granularity: When to Override at Task Level
# Baseline retry behavior, applied DAG-wide via default_args
default_args = {
    "retries": 3,  # Sensible default for most tasks
    "retry_delay": timedelta(minutes=5),
}

# Short-lived tasks that can retry quickly
@task(retries=5, retry_delay=timedelta(seconds=10))
def poll_job_status(): ...

# Tasks touching external paid APIs — retry less aggressively
@task(retries=2, retry_delay=timedelta(minutes=30))
def call_expensive_api(): ...

# Idempotency-unsafe tasks — no automatic retries
@task(retries=0)
def send_notification_email(): ...

Making Tasks Safe to Retry
The most important retry principle: tasks must be idempotent. Running a task twice with the same inputs must produce the same result.
Idempotent database writes
@task()
def upsert_records(records: list[dict]) -> None:
    """Idempotently upsert order rows — safe to re-run on a retry.

    Each record must carry ``id``, ``amount`` and ``status`` keys matching
    the named placeholders below.
    """
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    hook = PostgresHook(postgres_conn_id="warehouse")
    # ✅ UPSERT — safe to run multiple times
    # Fixed: the VALUES tuple needs parentheses, and the named %(...)s
    # placeholders bind ONE mapping per execution — so run per record
    # rather than passing the whole list at once.
    sql = """
        INSERT INTO orders (id, amount, status)
        VALUES (%(id)s, %(amount)s, %(status)s)
        ON CONFLICT (id) DO UPDATE
        SET amount = EXCLUDED.amount,
            status = EXCLUDED.status,
            updated_at = NOW()
    """
    for record in records:
        hook.run(sql, parameters=record)
# ❌ INSERT — creates duplicates on retry
# hook.run("INSERT INTO orders VALUES ...", parameters=records)

Idempotent file writes
@task()
def write_output(data: list[dict], **context) -> str:
    """Write *data* to a deterministic, date-partitioned S3 key.

    The key is derived from logical_date, so a retry of the same run
    overwrites the same object instead of creating duplicates.
    """
    import json
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    # Deterministic path: one logical date ↔ one S3 object
    date_part = context["logical_date"].strftime("%Y/%m/%d")
    s3_path = f"s3://bucket/output/{date_part}/data.json"
    payload = json.dumps(data)
    S3Hook(aws_conn_id="aws_default").load_string(
        payload,
        key=s3_path,
        replace=True,  # ✅ Overwrite on retry
    )
    return s3_path

Handling Specific Exception Types
Sometimes you only want to retry on certain errors:
import requests
from airflow.exceptions import AirflowFailException

@task(retries=3)
def smart_api_call() -> dict:
    """Fetch data, retrying transient errors and hard-failing permanent ones.

    Timeouts and HTTP 429/5xx re-raise so Airflow retries them. A 404 is
    permanent: it raises AirflowFailException, which fails the task
    immediately WITHOUT consuming retries. (Plain AirflowException would
    still be retried — the original code's "non-retryable" comment was
    wrong about that.)
    """
    try:
        response = requests.get("https://api.example.com/data", timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.Timeout:
        # Network timeout — definitely retry
        raise
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 404:
            # Resource not found — permanent; fail without retrying
            raise AirflowFailException(f"Resource not found: {e}") from e
        # 429 (rate limit) and any other HTTP error — transient, retry
        raise

SLA Misses — Different From Failures
An SLA miss is when a task doesn't complete within a defined time window. It's separate from failure and doesn't trigger retries.
from airflow.models import DAG
from airflow.utils.email import send_email
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Handle an SLA miss by surfacing the affected tasks.

    Airflow invokes this at the DAG level whenever a task exceeds its
    configured ``sla`` window — independent of failures and retries.
    """
    # Alert your on-call team
    print(f"SLA missed for tasks: {task_list}")
@dag(
    dag_id="sla_example",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    # DAG-level hook: fires when any task in this DAG misses its SLA
    sla_miss_callback=sla_miss_callback,
)
def sla_pipeline():
    """DAG demonstrating a per-task SLA with a DAG-level miss callback."""
    @task(
        sla=timedelta(hours=2),  # This task must complete within 2h of DAG start
    )
    def critical_transform(): ...

    critical_transform()
sla_pipeline()

Observability: Tracking Retry Rates
Use the Airflow metrics integration to track retry rates in Prometheus/Grafana:
# airflow.cfg
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

Key metrics to monitor:
- `airflow.task_instance.duration` — task duration by state
- `airflow.ti.retries` — retry count per task
- `airflow.dagrun.duration.failed` — failed DAG run durations
A spike in airflow.ti.retries for a specific task is often the first sign of a degraded downstream system.
Quick Reference
| Scenario | Recommended Config |
|---|---|
| External API calls | retries=3, retry_exponential_backoff=True, retry_delay=30s |
| Database operations | retries=2, retry_delay=10m, retry_exponential_backoff=False |
| File I/O | retries=3, retry_delay=1m |
| Notifications/emails | retries=0 (not idempotent) |
| Critical business tasks | retries=0, immediate on_failure_callback |
Related reading: