
Dynamic Task Mapping in Airflow 2.x: Scale Tasks at Runtime

Master Airflow's Dynamic Task Mapping feature to create a variable number of task instances at runtime. Covers .expand(), .partial(), expand_kwargs(), and real-world patterns for parallel data processing.

Prashant Singh

Senior Data Engineer

5 min read

Dynamic Task Mapping, introduced in Airflow 2.3, solves one of the most common data engineering problems: you don't know how many tasks you need until runtime.

Before Dynamic Task Mapping, you had to either hardcode task counts, use SubDAGs (fragile and deprecated), or use TaskGroup with ugly workarounds. Now, it's clean, Pythonic, and powerful.

The Core Concept

Instead of defining N tasks at DAG parse time, you define one task template and tell Airflow to expand it over a list at runtime.

# Old way: hardcoded, brittle
process_table_1 = PythonOperator(task_id="process_users", ...)
process_table_2 = PythonOperator(task_id="process_orders", ...)
process_table_3 = PythonOperator(task_id="process_products", ...)
 
# New way: runtime-determined, elegant
@task()
def process_table(table: str) -> int:
    return load_table(table)  # load_table: your own loading logic
 
tables = ["users", "orders", "products"]
process_table.expand(table=tables)

Basic Usage: .expand()

dags/dynamic_mapping_intro.py
from datetime import datetime
from airflow.decorators import dag, task
 
 
@dag(
    dag_id="dynamic_mapping_intro",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def dynamic_mapping_intro():
 
    @task()
    def get_tables() -> list[str]:
        """Return the list of tables to process. Can query a DB at runtime."""
        return ["users", "orders", "products", "events", "sessions"]
 
    @task()
    def process_table(table: str) -> int:
        """Process a single table. Called once per item in the list."""
        print(f"Processing {table}...")
        rows = load_and_transform(table)
        return rows
 
    @task()
    def summarize(row_counts: list[int]) -> None:
        """Automatically receives the aggregated results from all mapped tasks."""
        total = sum(row_counts)
        print(f"Processed {total:,} rows across {len(row_counts)} tables")
 
    tables = get_tables()
 
    # Airflow creates one task instance per table
    counts = process_table.expand(table=tables)
 
    # summarize() receives a list of all results
    summarize(counts)
 
 
def load_and_transform(table: str) -> int:
    """Stand-in for real ETL work; returns a simulated row count."""
    import random
    return random.randint(1000, 100000)
 
 
dynamic_mapping_intro()

In the Airflow UI, you'll see process_table with a [0], [1], [2]... index notation for each mapped instance.
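One behavior worth knowing before the next sections (a plain-Python sketch, not Airflow code): if you pass more than one list to `.expand()`, Airflow maps over the *cross product* of the inputs, not a zip. The instance count can be illustrated like this:

```python
from itertools import product

# Sketch of .expand() semantics with multiple mapped keywords:
# Airflow creates one task instance per element of the cross product.
tables = ["users", "orders"]
chunks = [0, 1, 2]

# process.expand(table=tables, chunk=chunks) would create 2 * 3 = 6 instances:
mapped_kwargs = [{"table": t, "chunk": c} for t, c in product(tables, chunks)]

for idx, kwargs in enumerate(mapped_kwargs):
    print(f"[{idx}] {kwargs}")
```

If you need arguments that vary *together* rather than in all combinations, that's what `.expand_kwargs()` (covered below) is for.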

.partial() + .expand() for Mixed Arguments

Use .partial() for arguments that are the same for all instances, and .expand() for the argument that varies:

dags/partial_expand.py
from datetime import datetime
from airflow.decorators import dag, task
 
 
@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def partial_expand_demo():
 
    @task()
    def get_tables() -> list[str]:
        return ["users", "orders", "payments"]
 
    @task()
    def sync_to_warehouse(
        table: str,
        target_schema: str,
        batch_size: int,
    ) -> dict:
        print(f"Syncing {table} → {target_schema} (batch: {batch_size})")
        return {"table": table, "rows": 5000, "schema": target_schema}
 
    tables = get_tables()
 
    # target_schema and batch_size are fixed for all mapped instances
    # Only 'table' varies
    results = sync_to_warehouse.partial(
        target_schema="analytics",
        batch_size=5000,
    ).expand(table=tables)
 
 
partial_expand_demo()

.expand_kwargs() for Multiple Varying Arguments

When multiple arguments vary together (row-by-row), use .expand_kwargs():

dags/expand_kwargs.py
from datetime import datetime
from airflow.decorators import dag, task
 
 
@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def expand_kwargs_demo():
 
    @task()
    def get_pipeline_configs() -> list[dict]:
        """Each config varies both source and destination."""
        return [
            {"source": "mysql://db1/orders", "dest": "bq://project/staging.orders"},
            {"source": "postgres://db2/users", "dest": "bq://project/staging.users"},
            {"source": "mongo://db3/events", "dest": "bq://project/staging.events"},
        ]
 
    @task()
    def run_pipeline(source: str, dest: str) -> str:
        print(f"Running pipeline: {source}{dest}")
        return f"Success: {dest}"
 
    configs = get_pipeline_configs()
 
    # Each dict in the list maps to the function's keyword arguments
    results = run_pipeline.expand_kwargs(configs)
 
 
expand_kwargs_demo()

Real-World Pattern: Parallel API Ingestion

A common production use case — fetch data from multiple API endpoints in parallel:

dags/parallel_api_ingestion.py
from __future__ import annotations
 
from datetime import datetime, timedelta
from airflow.decorators import dag, task
import requests
 
 
@dag(
    dag_id="parallel_api_ingestion",
    schedule="@hourly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=2),
        "retry_exponential_backoff": True,
    },
)
def parallel_api_ingestion():
 
    @task()
    def get_endpoints(**context) -> list[dict]:
        """
        Build the list of API endpoints to fetch.
        Could read from a config file, DB, or environment.
        """
        ds = context["ds"]
        return [
            {
                "endpoint": "/api/v2/orders",
                "params": {"date": ds, "page_size": 1000},
                "output_key": "orders",
            },
            {
                "endpoint": "/api/v2/customers",
                "params": {"updated_since": ds},
                "output_key": "customers",
            },
            {
                "endpoint": "/api/v2/products",
                "params": {},
                "output_key": "products",
            },
        ]
 
    @task()
    def fetch_from_api(endpoint: str, params: dict, output_key: str) -> dict:
        """Fetch data from a single API endpoint."""
        base_url = "https://api.example.com"
        # Jinja templates are not rendered inside @task function bodies,
        # so read the Variable directly instead of "{{ var.value.api_token }}"
        from airflow.models import Variable
        api_token = Variable.get("api_token")

        response = requests.get(
            f"{base_url}{endpoint}",
            params=params,
            headers={"Authorization": f"Bearer {api_token}"},
            timeout=30,
        )
        response.raise_for_status()
 
        data = response.json()
        records = data.get("results", [])
 
        print(f"Fetched {len(records)} records from {endpoint}")
        return {
            "key": output_key,
            "count": len(records),
            "records": records,
        }
 
    @task()
    def consolidate(fetch_results: list[dict]) -> None:
        """Receive all fetch results and write to S3."""
        import json
        import boto3
 
        s3 = boto3.client("s3")
        for result in fetch_results:
            s3.put_object(
                Bucket="my-data-lake",
                Key=f"raw/{result['key']}/latest.json",
                Body=json.dumps(result["records"]),
            )
            print(f"Saved {result['count']} {result['key']} records to S3")
 
    endpoints = get_endpoints()
    # All API calls run in parallel — one task per endpoint
    fetch_results = fetch_from_api.expand_kwargs(endpoints)
    consolidate(fetch_results)
 
 
parallel_api_ingestion()

Controlling Concurrency

Too many parallel tasks can overwhelm downstream systems. Use these controls:

# ① Limit via max_active_tis_per_dag on the task
@task(max_active_tis_per_dag=10)  # Max 10 concurrent mapped instances
def process_table(table: str) -> None: ...
 
# ② Limit via Airflow Pools
@task(pool="database_pool", pool_slots=2)
def write_to_db(data: dict) -> None: ...
# In Airflow UI: Admin → Pools → add "database_pool" with 5 slots
# Each task takes 2 slots, so at most 5 // 2 = 2 run concurrently
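The pool-slot arithmetic can be sketched in plain Python (illustrative only, not Airflow code): the scheduler admits a queued task only while the pool still has enough free slots for it.

```python
def admitted_tasks(pool_slots: int, slots_per_task: int, queued: int) -> int:
    """How many of the queued tasks can run at once in a pool of this size."""
    running = 0
    free = pool_slots
    # Admit tasks while enough free slots remain for one more
    while running < queued and free >= slots_per_task:
        free -= slots_per_task
        running += 1
    return running

# database_pool from the example: 5 slots, 2 slots per task
print(admitted_tasks(5, 2, queued=10))  # 2
```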

Accessing the Mapped Index

Each mapped task instance knows its own index via {{ task_instance.map_index }}:

@task()
def process_item(item: dict, **context) -> dict:
    map_index = context["task_instance"].map_index  # 0-based index of this instance
    print(f"Processing item {map_index}: {item}")
    return item

Limitations to Know

  1. XCom size: Mapped results are stored in XCom. Each individual result should be small (a path or count, not a DataFrame).

  2. Map index visibility: The UI shows indexed task instances ([0], [1], ...). Task IDs in alerts will include the index.

  3. Dependencies: You can chain mapped tasks. The downstream receives a list of all upstream results automatically.

  4. No branching inside mapped tasks: You can't use BranchPythonOperator on a mapped task's output. Structure your branching before the mapping.
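For limitation 1, a common mitigation is to land heavy payloads in object storage and pass only a small reference (a key or path) through XCom. A minimal sketch, with an in-memory dict standing in for S3 and hypothetical helper names:

```python
import json

# Hypothetical in-memory stand-in for object storage (e.g. S3);
# in a real DAG each function below would be an @task.
STORAGE: dict[str, str] = {}

def fetch_and_stage(endpoint: str) -> str:
    """Fetch records and write them to storage; return ONLY the key (small XCom)."""
    records = [{"endpoint": endpoint, "row": i} for i in range(3)]  # fake payload
    key = f"raw/{endpoint.strip('/')}/latest.json"
    STORAGE[key] = json.dumps(records)
    return key  # a short string, safe to pass through XCom

def consolidate(keys: list[str]) -> int:
    """Downstream task reads the heavy data back from storage by key."""
    total = 0
    for key in keys:
        total += len(json.loads(STORAGE[key]))
    return total

keys = [fetch_and_stage(e) for e in ["orders", "customers"]]
print(consolidate(keys))  # prints 6
```

In Airflow terms, `fetch_and_stage` would be the mapped task and `consolidate` the downstream task receiving the list of keys; only the short strings transit XCom.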


Dynamic Task Mapping is one of the most powerful features in modern Airflow. Once you start using it, you'll find it replaces a huge category of "how do I process N things in parallel" problems cleanly and idiomatically.


Prashant Singh

Senior Data Engineer

Senior Data Engineer with expertise in Apache Airflow, data orchestration, and building scalable data pipelines. Passionate about sharing knowledge and best practices with the data engineering community.