Dynamic Task Mapping, introduced in Airflow 2.3, solves one of the most common data engineering problems: you don't know how many tasks you need until runtime.
Before Dynamic Task Mapping, you had to either hardcode task counts, use SubDAGs (fragile and deprecated), or use TaskGroup with ugly workarounds. Now, it's clean, Pythonic, and powerful.
The Core Concept
Instead of defining N tasks at DAG parse time, you define one task template and tell Airflow to expand it over a list at runtime.
# Old way: hardcoded, brittle — one hand-written operator per table,
# so adding a table means editing the DAG file.
process_table_1 = PythonOperator(task_id="process_users", ...)
process_table_2 = PythonOperator(task_id="process_orders", ...)
process_table_3 = PythonOperator(task_id="process_products", ...)


# New way: runtime-determined, elegant — define ONE task template and
# expand it over a list; Airflow creates one mapped instance per item.
@task()
def process_table(table: str) -> int:
    # Runs once per element of the expanded list at runtime.
    return load_table(table)


tables = ["users", "orders", "products"]
process_table.expand(table=tables)

Basic Usage: .expand()
from datetime import datetime
from airflow.decorators import dag, task
@dag(
    dag_id="dynamic_mapping_intro",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def dynamic_mapping_intro():
    """Minimal dynamic-mapping DAG: fan out one task per table, then fan in."""

    @task()
    def get_tables() -> list[str]:
        """Return the list of tables to process. Can query a DB at runtime."""
        return ["users", "orders", "products", "events", "sessions"]

    @task()
    def process_table(table: str) -> int:
        """Process a single table. Called once per item in the list."""
        print(f"Processing {table}...")
        rows = load_and_transform(table)
        return rows

    @task()
    def summarize(row_counts: list[int]) -> None:
        """Automatically receives the aggregated results from all mapped tasks."""
        total = sum(row_counts)
        print(f"Processed {total:,} rows across {len(row_counts)} tables")

    tables = get_tables()
    # Airflow creates one task instance per table
    counts = process_table.expand(table=tables)
    # summarize() receives a list of all results
    summarize(counts)
def load_and_transform(table: str) -> int:
    """Stand-in for real ETL work: return a fake row count for *table*."""
    from random import randint

    return randint(1000, 100000)
dynamic_mapping_intro()

In the Airflow UI, you'll see process_table with a [0], [1], [2]... index notation for each mapped instance.
.partial() + .expand() for Mixed Arguments
Use .partial() for arguments that are the same for all instances, and .expand() for the argument that varies:
from datetime import datetime
from airflow.decorators import dag, task
@dag(schedule="@daily", start_date=datetime(2024, 1, 1))
def partial_expand_demo():
    """Combine .partial() (kwargs shared by all instances) with .expand() (the varying kwarg)."""

    @task()
    def get_tables() -> list[str]:
        """Tables to sync; could equally be computed at runtime."""
        return ["users", "orders", "payments"]

    @task()
    def sync_to_warehouse(
        table: str,
        target_schema: str,
        batch_size: int,
    ) -> dict:
        """Sync one table into the warehouse; returns a small summary dict (goes to XCom)."""
        print(f"Syncing {table} → {target_schema} (batch: {batch_size})")
        return {"table": table, "rows": 5000, "schema": target_schema}

    tables = get_tables()
    # target_schema and batch_size are fixed for all mapped instances
    # Only 'table' varies
    results = sync_to_warehouse.partial(
        target_schema="analytics",
        batch_size=5000,
    ).expand(table=tables)
partial_expand_demo()

.expand_kwargs() for Multiple Varying Arguments
When multiple arguments vary together (row-by-row), use .expand_kwargs():
from datetime import datetime
from airflow.decorators import dag, task
@dag(schedule="@daily", start_date=datetime(2024, 1, 1))
def expand_kwargs_demo():
    """Demonstrate .expand_kwargs(): several arguments varying together, row by row."""

    @task()
    def get_pipeline_configs() -> list[dict]:
        """Each config varies both source and destination."""
        return [
            {"source": "mysql://db1/orders", "dest": "bq://project/staging.orders"},
            {"source": "postgres://db2/users", "dest": "bq://project/staging.users"},
            {"source": "mongo://db3/events", "dest": "bq://project/staging.events"},
        ]

    @task()
    def run_pipeline(source: str, dest: str) -> str:
        """Run one pipeline; each dict's keys map onto these keyword arguments."""
        print(f"Running pipeline: {source} → {dest}")
        return f"Success: {dest}"

    configs = get_pipeline_configs()
    # Each dict in the list maps to the function's keyword arguments
    results = run_pipeline.expand_kwargs(configs)
expand_kwargs_demo()

Real-World Pattern: Parallel API Ingestion
A common production use case — fetch data from multiple API endpoints in parallel:
from __future__ import annotations
from datetime import datetime, timedelta
from airflow.decorators import dag, task
import requests
@dag(
    dag_id="parallel_api_ingestion",
    schedule="@hourly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=2),
        "retry_exponential_backoff": True,
    },
)
def parallel_api_ingestion():
    """Fan out one fetch task per API endpoint in parallel, then consolidate to S3."""

    @task()
    def get_endpoints(**context) -> list[dict]:
        """
        Build the list of API endpoints to fetch.
        Could read from a config file, DB, or environment.
        """
        ds = context["ds"]  # logical date of this run as YYYY-MM-DD
        return [
            {
                "endpoint": "/api/v2/orders",
                "params": {"date": ds, "page_size": 1000},
                "output_key": "orders",
            },
            {
                "endpoint": "/api/v2/customers",
                "params": {"updated_since": ds},
                "output_key": "customers",
            },
            {
                "endpoint": "/api/v2/products",
                "params": {},
                "output_key": "products",
            },
        ]

    @task()
    def fetch_from_api(endpoint: str, params: dict, output_key: str) -> dict:
        """Fetch data from a single API endpoint.

        Returns a dict that is pushed through XCom — keep `records` small
        (paths/counts scale better than full payloads).
        """
        # BUG FIX: Jinja templates such as "{{ var.value.api_token }}" are NOT
        # rendered inside a TaskFlow function body — the literal braces would
        # have been sent as the bearer token. Resolve the Airflow Variable
        # explicitly at runtime instead.
        from airflow.models import Variable

        base_url = "https://api.example.com"
        response = requests.get(
            f"{base_url}{endpoint}",
            params=params,
            headers={"Authorization": f"Bearer {Variable.get('api_token')}"},
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
        records = data.get("results", [])
        print(f"Fetched {len(records)} records from {endpoint}")
        return {
            "key": output_key,
            "count": len(records),
            "records": records,
        }

    @task()
    def consolidate(fetch_results: list[dict]) -> None:
        """Receive all fetch results and write each batch to S3."""
        import json

        import boto3

        s3 = boto3.client("s3")
        for result in fetch_results:
            s3.put_object(
                Bucket="my-data-lake",
                Key=f"raw/{result['key']}/latest.json",
                Body=json.dumps(result["records"]),
            )
            print(f"Saved {result['count']} {result['key']} records to S3")

    endpoints = get_endpoints()
    # All API calls run in parallel — one task per endpoint
    fetch_results = fetch_from_api.expand_kwargs(endpoints)
    consolidate(fetch_results)
parallel_api_ingestion()

Controlling Concurrency
Too many parallel tasks can overwhelm downstream systems. Use these controls:
# ① Limit via max_active_tis_per_dag on the task
@task(max_active_tis_per_dag=10)  # Max 10 concurrent mapped instances
def process_table(table: str) -> None: ...


# ② Limit via Airflow Pools — each mapped instance claims pool_slots slots
@task(pool="database_pool", pool_slots=2)
def write_to_db(data: dict) -> None: ...


# In Airflow UI: Admin → Pools → Add pool "database_pool" with 5 slots
# Each instance holds 2 of the pool's 5 slots → max 5 // 2 = 2 concurrent DB-writing tasks

Accessing the Mapped Index
Each mapped task instance knows its own index via {{ task_instance.map_index }}:
@task()
def process_item(item: dict, **context) -> dict:
    """Log this instance's position within the mapped set, then pass *item* through."""
    map_index = context["task_instance"].map_index
    # NOTE(review): max_map_index is claimed here as Airflow 2.9+ — confirm
    # the attribute exists on TaskInstance in your Airflow version.
    total = context["task_instance"].max_map_index + 1  # Airflow 2.9+
    print(f"Processing item {map_index + 1} of {total}: {item}")
    return item

Limitations to Know
- XCom size: Mapped results are stored in XCom. Each individual result should be small (a path or count, not a DataFrame).
- Map index visibility: The UI shows indexed task instances ([0], [1], ...). Task IDs in alerts will include the index.
- Dependencies: You can chain mapped tasks. The downstream receives a list of all upstream results automatically.
- No branching inside mapped tasks: You can't use BranchPythonOperator on a mapped task's output. Structure your branching before the mapping.
Dynamic Task Mapping is one of the most powerful features in modern Airflow. Once you start using it, you'll find it replaces a huge category of "how do I process N things in parallel" problems cleanly and idiomatically.
Keep exploring: