Best Practices for Task Retries in Apache Airflow

Learn how to configure robust retry strategies in Airflow using retries, retry_delay, exponential backoff, and on_failure_callback. Includes real-world patterns for handling flaky APIs, database timeouts, and more.

Neha Agarwal

Data Platform Engineer

5 min read

In production data pipelines, failures are inevitable. Networks blip. APIs rate-limit. Databases time out. How you handle these failures is what separates a brittle pipeline from a reliable one.

This guide covers Airflow's retry system in depth — not just retries=3, but the full picture including exponential backoff, task-level vs DAG-level configuration, and alerting strategies.

The Full Retry Configuration

dags/retry_best_practices.py
from datetime import datetime, timedelta
from airflow.decorators import dag, task
 
 
def on_failure_alert(context):
    """Send a Slack message (or email) on task failure."""
    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    run_id = context["run_id"]
    log_url = context["task_instance"].log_url
 
    print(f"ALERT: {dag_id}.{task_id} failed in run {run_id}")
    print(f"Logs: {log_url}")
    # In production: call your Slack webhook or PagerDuty here
 
 
def on_retry_alert(context):
    """Log retries for observability (avoid alert fatigue on retries)."""
    ti = context["task_instance"]
    print(
        f"Retrying {ti.task_id} "
        f"(attempt {ti.try_number} of {ti.max_tries + 1})"
    )
 
 
@dag(
    dag_id="retry_patterns",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    # DAG-level defaults — inherited by all tasks
    default_args={
        "owner": "data-team",
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
        "retry_exponential_backoff": True,   # 5m → 10m → 20m
        "max_retry_delay": timedelta(hours=1),
        "on_failure_callback": on_failure_alert,
        "on_retry_callback": on_retry_alert,
        "execution_timeout": timedelta(hours=2),
    },
)
def retry_patterns():
 
    @task(
        # Override at task level for tasks needing different behavior
        retries=5,
        retry_delay=timedelta(seconds=30),
        retry_exponential_backoff=True,
    )
    def call_flaky_api() -> dict:
        """An external API that sometimes rate-limits or 503s."""
        import requests
 
        response = requests.get(
            "https://api.example.com/data",
            timeout=30,
        )
        response.raise_for_status()  # Raises on 4xx/5xx → triggers retry
        return response.json()
 
    @task(
        # DB operations: fewer retries, longer delay
        retries=2,
        retry_delay=timedelta(minutes=10),
        retry_exponential_backoff=False,
        execution_timeout=timedelta(minutes=30),
    )
    def write_to_warehouse(data: dict) -> None:
        """Load to DW — longer delay lets DB recover from locks."""
        print(f"Writing payload with {len(data)} top-level fields to warehouse")
 
    @task(
        # Critical task: immediately alert, no retries
        retries=0,
        on_failure_callback=on_failure_alert,
    )
    def send_business_report() -> None:
        """Time-sensitive report — no retries, immediate alert."""
        print("Sending report...")
 
    data = call_flaky_api()
    write_to_warehouse(data)
    send_business_report()
 
 
retry_patterns()

Understanding Exponential Backoff

With retry_exponential_backoff=True and retry_delay=timedelta(minutes=5):

| Attempt | Wait before retry |
| --- | --- |
| 1st retry | ~5 minutes |
| 2nd retry | ~10 minutes |
| 3rd retry | ~20 minutes |
| 4th retry | ~40 minutes |
| 5th retry | ~60 minutes (capped from ~80 by max_retry_delay) |

Airflow uses jitter in the backoff calculation to prevent thundering herd issues when many tasks retry simultaneously.

Always set max_retry_delay — without it, a high retry count combined with exponential backoff can push the final attempts out by many hours or even days.
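Ignoring jitter, the wait before the n-th retry is roughly retry_delay * 2^(n-1), capped at max_retry_delay. A quick sketch of that arithmetic (illustrative only; Airflow's real calculation adds randomized jitter, so actual delays vary):

```python
from datetime import timedelta


def backoff_delays(
    retry_delay: timedelta, retries: int, max_retry_delay: timedelta
) -> list[timedelta]:
    """Approximate wait before each retry: delay doubles per attempt, then caps."""
    return [
        min(retry_delay * (2 ** attempt), max_retry_delay)
        for attempt in range(retries)
    ]


# Matches the table above: 5m → 10m → 20m → 40m → 60m (capped from 80m)
delays = backoff_delays(timedelta(minutes=5), 5, timedelta(hours=1))
```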

Retry Granularity: When to Override at Task Level

default_args = {
    "retries": 3,              # Sensible default for most tasks
    "retry_delay": timedelta(minutes=5),
}
 
# Short-lived tasks that can retry quickly
@task(retries=5, retry_delay=timedelta(seconds=10))
def poll_job_status(): ...
 
# Tasks touching external paid APIs — retry less aggressively
@task(retries=2, retry_delay=timedelta(minutes=30))
def call_expensive_api(): ...
 
# Idempotency-unsafe tasks — no automatic retries
@task(retries=0)
def send_notification_email(): ...

Making Tasks Safe to Retry

The most important retry principle: tasks must be idempotent. Running a task twice with the same inputs must produce the same result.

Idempotent database writes

@task()
def upsert_records(records: list[dict]) -> None:
    from airflow.providers.postgres.hooks.postgres import PostgresHook
 
    hook = PostgresHook(postgres_conn_id="warehouse")
 
    # ✅ UPSERT — safe to run multiple times
    sql = """
        INSERT INTO orders (id, amount, status)
        VALUES (%(id)s, %(amount)s, %(status)s)
        ON CONFLICT (id) DO UPDATE
            SET amount = EXCLUDED.amount,
                status = EXCLUDED.status,
                updated_at = NOW()
    """
    for record in records:
        hook.run(sql, parameters=record)
 
    # ❌ INSERT — creates duplicates on retry
    # hook.run("INSERT INTO orders VALUES ...", parameters=records)

Idempotent file writes

@task()
def write_output(data: list[dict], **context) -> str:
    import json
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 
    # Use logical_date in the path — same date always writes same path
    logical_date = context["logical_date"].strftime("%Y/%m/%d")
    s3_path = f"s3://bucket/output/{logical_date}/data.json"
 
    hook = S3Hook(aws_conn_id="aws_default")
    hook.load_string(
        json.dumps(data),
        key=s3_path,
        replace=True,  # ✅ Overwrite on retry
    )
    return s3_path

Handling Specific Exception Types

Sometimes you only want to retry on certain errors. Note that a plain AirflowException still triggers retries; to fail immediately and skip the remaining retry attempts, raise AirflowFailException:

import requests
from airflow.exceptions import AirflowFailException
 
 
@task(retries=3)
def smart_api_call() -> dict:
    try:
        response = requests.get("https://api.example.com/data", timeout=30)
        response.raise_for_status()
        return response.json()
 
    except requests.exceptions.Timeout:
        # Network timeout — definitely retry
        raise
 
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 429:
            # Rate limited — retry
            raise
        elif e.response.status_code == 404:
            # Resource not found — fail immediately, skipping remaining retries
            raise AirflowFailException(f"Resource not found: {e}") from e
        else:
            raise

SLA Misses — Different From Failures

An SLA miss is when a task doesn't complete within a defined time window. It's separate from failure and doesn't trigger retries.

from datetime import datetime, timedelta
from airflow.decorators import dag, task
 
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Called when a task misses its SLA."""
    print(f"SLA missed for tasks: {task_list}")
    # Alert your on-call team
 
@dag(
    dag_id="sla_example",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    sla_miss_callback=sla_miss_callback,
)
def sla_pipeline():
 
    @task(
        sla=timedelta(hours=2),   # Must finish within 2h of the scheduled run time
    )
    def critical_transform(): ...
 
    critical_transform()
 
sla_pipeline()

Observability: Tracking Retry Rates

Use the Airflow metrics integration to track retry rates in Prometheus/Grafana:

# airflow.cfg
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

Key metrics to monitor:

  • airflow.dag.<dag_id>.<task_id>.duration — per-task run duration
  • airflow.ti.finish.<dag_id>.<task_id>.up_for_retry — task instances entering the retry state
  • airflow.dagrun.duration.failed.<dag_id> — failed DAG run durations

A spike in the up_for_retry counter for a specific task is often the first sign of a degraded downstream system.
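If your deployment does not expose a per-task retry counter directly, you can emit one yourself from an on_retry_callback. A minimal sketch, using an in-memory Counter as a stand-in for a real metrics client; the metric name custom.retries.* and the stub task instance are illustrative, not part of Airflow's API (in production you would call a StatsD or Prometheus client here instead):

```python
from collections import Counter

# Stand-in for a real metrics client (e.g. statsd.StatsClient in production)
retry_counter: Counter = Counter()


def on_retry_metric(context):
    """on_retry_callback that bumps a per-task retry counter."""
    ti = context["task_instance"]
    # Illustrative metric name — adapt to your own naming convention
    retry_counter[f"custom.retries.{ti.dag_id}.{ti.task_id}"] += 1


# Simulate Airflow invoking the callback twice with a stub task instance
class _StubTI:
    dag_id = "retry_patterns"
    task_id = "call_flaky_api"


on_retry_metric({"task_instance": _StubTI()})
on_retry_metric({"task_instance": _StubTI()})
```

Wire it in via default_args={"on_retry_callback": on_retry_metric} so every task in the DAG reports its retries.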

Quick Reference

| Scenario | Recommended config |
| --- | --- |
| External API calls | retries=3, retry_exponential_backoff=True, retry_delay=30s |
| Database operations | retries=2, retry_delay=10m, retry_exponential_backoff=False |
| File I/O | retries=3, retry_delay=1m |
| Notifications/emails | retries=0 (not idempotent) |
| Critical business tasks | retries=0, immediate on_failure_callback |



Neha Agarwal

Data Platform Engineer specializing in workflow orchestration, data architecture, and cloud infrastructure. Focused on helping teams build reliable and maintainable data platforms.