
Apache Airflow DAG Writing Guide

A hands-on guide covering everything from setting up Airflow locally with Docker Compose to understanding DAG structure, using Operators, XCom data passing, error handling, and production deployment.

Tags: Airflow DAG, Apache Airflow, Data Pipeline, ETL, DAG Scheduling, Airflow Operator, XCom, Workflow Automation

Problem

You are managing data pipelines with crontab and shell scripts, leading to multiple issues. When scripts fail, there are no alerts, and data gaps are only discovered days later. Task dependencies are controlled solely by cron time gaps, so if an upstream step runs late, downstream steps execute on incomplete data. There is no automatic retry on failure, requiring manual re-execution every time, and there is no history of when tasks succeeded or failed. You need to adopt Apache Airflow to implement systematic workflow management, monitoring, and automatic retries.

Required Tools

Apache Airflow 2.x

Workflow orchestration platform. Defines task dependencies as DAGs (Directed Acyclic Graphs) and manages scheduling, monitoring, and retries.

Docker Compose

Easily sets up a multi-container local environment with Airflow Webserver, Scheduler, Worker, PostgreSQL, and Redis.

Python 3.9+

The language for writing DAG files and task logic. All Airflow configurations and workflows are defined in Python code.

Solution Steps

1. Set Up Local Airflow Environment (Docker Compose)

Using the official Apache Airflow Docker Compose file, you can set up the Webserver, Scheduler, Worker, PostgreSQL, and Redis all at once. The recommended workflow is to write and test DAGs in a local dev environment, then deploy to production. Mount the dags/, logs/, and plugins/ directories to the host so code changes are reflected immediately.

# Create project directory
mkdir airflow-project && cd airflow-project
mkdir -p dags logs plugins config

# Download the official Docker Compose file
curl -LfO "https://airflow.apache.org/docs/apache-airflow/2.10.5/docker-compose.yaml"

# Set environment variables
cat > .env << 'EOF'
AIRFLOW_UID=50000
AIRFLOW_GID=0
_AIRFLOW_WWW_USER_USERNAME=admin
_AIRFLOW_WWW_USER_PASSWORD=admin
EOF

# Initialize Airflow (DB migration + admin account creation)
docker compose up airflow-init

# Start all services in the background
docker compose up -d

# Check status
docker compose ps

# Access Webserver: http://localhost:8080
# Username: admin / Password: admin
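One caveat about the fixed AIRFLOW_UID=50000 above: on Linux hosts, the official Docker quick start recommends setting AIRFLOW_UID to your own user id instead, so files the containers write to the mounted dags/, logs/, and plugins/ directories are owned by you rather than an unrelated UID:

```shell
# On Linux, use your host user id as AIRFLOW_UID in .env
# (replace the AIRFLOW_UID=50000 line before running airflow-init)
echo "AIRFLOW_UID=$(id -u)"
# prints a line like AIRFLOW_UID=1000 — copy it into .env
```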
2. Write Your First DAG (Context Manager, default_args, Schedule)

A DAG (Directed Acyclic Graph) is a workflow that defines the execution order and dependencies of tasks. Use Python's context manager (with statement) to define a DAG, and default_args to apply common settings to all tasks. The schedule parameter specifies the execution interval using cron expressions or presets (@daily, @hourly, etc.). start_date is the reference date for the first DAG run. Without catchup=False, Airflow will backfill all missed intervals from the start_date to the present.

# dags/my_first_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# Default settings applied to all tasks
default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "email": ["alert@example.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

# DAG definition (context manager pattern)
with DAG(
    dag_id="my_first_dag",
    default_args=default_args,
    description="First Airflow DAG - ETL pipeline example",
    schedule="0 9 * * *",  # Daily at 9:00 AM (UTC)
    start_date=datetime(2026, 1, 1),
    catchup=False,  # Don't backfill missed intervals
    tags=["example", "etl"],
) as dag:

    def extract(**kwargs):
        """Extract data"""
        print(f"Extracting data for {kwargs['ds']}")
        return {"records": 1000, "source": "api"}

    def transform(**kwargs):
        """Transform data"""
        print(f"Transforming data for {kwargs['ds']}")
        return {"processed": 950}

    def load(**kwargs):
        """Load data"""
        print(f"Loading data for {kwargs['ds']}")

    # Task definitions
    task_extract = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )

    task_transform = PythonOperator(
        task_id="transform",
        python_callable=transform,
    )

    task_load = PythonOperator(
        task_id="load",
        python_callable=load,
    )

    # Task dependencies: extract -> transform -> load
    task_extract >> task_transform >> task_load
3. Use Operators (PythonOperator, BashOperator, Dependency Patterns)

Operators are the building blocks that define a single task. Choose the right Operator for each type of work. PythonOperator executes Python functions, and BashOperator runs shell commands. Use the >> operator to declare task dependencies, and use lists for parallel execution. The TaskFlow API (@task decorator) enables more concise DAG authoring.

# dags/operator_examples.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.decorators import task

default_args = {
    "owner": "data-team",
    "retries": 1,
    "retry_delay": timedelta(minutes=3),
}

with DAG(
    dag_id="operator_examples",
    default_args=default_args,
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:

    # 1) BashOperator - Run shell commands
    check_source = BashOperator(
        task_id="check_source",
        bash_command="echo 'Checking data source...' && curl -sf https://api.example.com/health",
    )

    # 2) PythonOperator - Run Python functions
    def process_data(ds, **kwargs):
        """Process data using the logical date string (ds)"""
        print(f"Processing data for date: {ds}")
        # ds = "2026-01-15" (the run's logical date as YYYY-MM-DD)
        return {"status": "success", "date": ds}

    process = PythonOperator(
        task_id="process_data",
        python_callable=process_data,
    )

    # 3) TaskFlow API (@task decorator) - Concise syntax
    @task
    def validate_output():
        """Validate output data"""
        print("Validating output data...")
        return True

    @task
    def send_notification(is_valid: bool):
        """Send notification"""
        if is_valid:
            print("Pipeline completed successfully!")

    # 4) EmptyOperator - Branch/merge markers
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    # 5) Dependency patterns
    validation = validate_output()
    notification = send_notification(validation)

    # Sequential: start -> check_source -> process
    start >> check_source >> process

    # Merge: process -> validation -> notification -> end
    process >> validation >> notification >> end
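The `>>` syntax above is ordinary Python operator overloading, and the fan-out/fan-in list pattern mentioned earlier can be illustrated with a tiny stand-in class. This is not Airflow code — `Node` and its attributes are hypothetical, purely to show how an expression like `start >> [a, b] >> join` builds dependency edges:

```python
class Node:
    """Minimal stand-in for an Airflow operator's dependency syntax."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Handles `self >> other` and `self >> [a, b]` (fan-out)
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            self.downstream.append(t.task_id)
        return other

    def __rrshift__(self, other):
        # Handles `[a, b] >> self` (fan-in); Python falls back to this
        # because list has no __rshift__ of its own
        for t in other:
            t.downstream.append(self.task_id)
        return self


start, a, b, join = Node("start"), Node("a"), Node("b"), Node("join")
start >> [a, b] >> join  # fan out to a and b in parallel, then fan in

print(start.downstream)  # -> ['a', 'b']
print(a.downstream)      # -> ['join']
```

Real Airflow operators behave analogously: `>>` returns its right-hand operand, which is why chained expressions read left to right as the execution order.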
4. Pass Data Between Tasks with XCom

XCom (Cross-Communication) is the mechanism for passing small amounts of data between tasks. In PythonOperator, the return value is automatically pushed to XCom, and other tasks can retrieve it with xcom_pull. With the TaskFlow API, function return values are automatically passed as arguments to the next task. Caution: XCom values are stored in the metadata database, so keep payloads to a few KB of metadata and never push large datasets through XCom. For large datasets, store the data in S3/GCS and pass only the file path.

# dags/xcom_example.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task, dag

# === Method 1: Traditional xcom_push / xcom_pull ===

def extract_data(**kwargs):
    """Extract data and push to XCom"""
    data = {
        "records": [
            {"id": 1, "name": "Alice", "score": 85},
            {"id": 2, "name": "Bob", "score": 92},
        ],
        "total_count": 2,
        "extracted_at": str(datetime.now()),
    }
    # Return value is automatically pushed to XCom
    return data


def transform_data(**kwargs):
    """Pull data from XCom and transform"""
    ti = kwargs["ti"]

    # Get the return value of the previous task
    extracted = ti.xcom_pull(task_ids="extract")
    print(f"Received {extracted['total_count']} records")

    # Transformation logic
    transformed = {
        "records": [
            {**r, "grade": "A" if r["score"] >= 90 else "B"}
            for r in extracted["records"]
        ],
        "transform_count": extracted["total_count"],
    }
    return transformed


def load_data(**kwargs):
    """Load transformed data"""
    ti = kwargs["ti"]
    data = ti.xcom_pull(task_ids="transform")
    print(f"Loading {data['transform_count']} records")
    for record in data["records"]:
        print(f"  -> {record['name']}: {record['grade']}")


with DAG(
    dag_id="xcom_traditional",
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag1:
    t1 = PythonOperator(task_id="extract", python_callable=extract_data)
    t2 = PythonOperator(task_id="transform", python_callable=transform_data)
    t3 = PythonOperator(task_id="load", python_callable=load_data)
    t1 >> t2 >> t3


# === Method 2: TaskFlow API (Recommended) ===

@dag(
    dag_id="xcom_taskflow",
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def xcom_taskflow_pipeline():

    @task
    def extract():
        return {"records": [{"id": 1, "value": 100}], "count": 1}

    @task
    def transform(data: dict):
        # data is automatically passed from extract()'s return value
        data["records"] = [{**r, "processed": True} for r in data["records"]]
        return data

    @task
    def load(data: dict):
        print(f"Loaded {data['count']} records")

    # Function call chaining expresses both dependencies and data passing
    raw = extract()
    processed = transform(raw)
    load(processed)

xcom_taskflow_pipeline()
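To make the "keep XCom small" rule concrete, here is a hedged sketch of the path-passing pattern plus a size guard. The helper names, the 48 KB threshold, and the bucket name are illustrative choices, not Airflow constants — the only Airflow fact relied on is that the default XCom backend serializes values into the metadata database:

```python
import json


def xcom_size_bytes(value) -> int:
    """Approximate serialized size of an XCom value (the default backend
    stores values serialized in the metadata database)."""
    return len(json.dumps(value).encode("utf-8"))


def assert_xcom_safe(value, max_bytes: int = 48 * 1024):
    """Fail fast if a payload is too big for XCom. The 48 KB cap is an
    illustrative project convention, not an Airflow limit."""
    size = xcom_size_bytes(value)
    if size > max_bytes:
        raise ValueError(
            f"XCom payload is {size} bytes; upload it to object storage "
            f"and return the path (e.g. an s3:// URI) instead"
        )
    return value


# The pattern itself: return a small pointer, not the dataset
def extract(**kwargs):
    path = f"s3://my-bucket/raw/{kwargs['ds']}.parquet"  # hypothetical bucket
    # ... upload the actual dataset to `path` here (e.g. with boto3) ...
    return assert_xcom_safe(path)  # only the short path string enters XCom
```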
5. Error Handling and Retries (retries, on_failure_callback, SLA)

In production pipelines, retries and alerting for network errors, temporary API outages, etc. are essential. Configure automatic retries with retries and retry_delay, and send alerts on failure with on_failure_callback. Setting SLA (Service Level Agreement) triggers a warning when task execution takes longer than expected. Use execution_timeout to prevent infinite runs, and trigger_rule for fine-grained control over task execution conditions.

# dags/error_handling_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
import logging

logger = logging.getLogger(__name__)


# Callback function invoked on task failure
def on_failure(context):
    """Send Slack/email alert on task failure"""
    task_instance = context["task_instance"]
    exception = context.get("exception", "Unknown")
    dag_id = context["dag"].dag_id
    logical_date = context["logical_date"]  # "execution_date" is deprecated in Airflow 2.x

    message = (
        f"Task Failed!\n"
        f"DAG: {dag_id}\n"
        f"Task: {task_instance.task_id}\n"
        f"Logical Date: {logical_date}\n"
        f"Error: {exception}"
    )
    logger.error(message)
    # Send alert via Slack webhook, PagerDuty, email, etc.
    # requests.post(SLACK_WEBHOOK_URL, json={"text": message})


def on_success(context):
    """Called on task success"""
    logger.info(f"Task {context['task_instance'].task_id} succeeded!")


def on_retry(context):
    """Called on retry"""
    task_instance = context["task_instance"]
    logger.warning(
        f"Retrying {task_instance.task_id}, "
        f"attempt {task_instance.try_number}"
    )


default_args = {
    "owner": "data-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=2),
    "retry_exponential_backoff": True,  # 2min -> 4min -> 8min
    "max_retry_delay": timedelta(minutes=30),
    "execution_timeout": timedelta(hours=1),  # Kill if exceeds 1 hour
    "on_failure_callback": on_failure,
    "on_success_callback": on_success,
    "on_retry_callback": on_retry,
    "sla": timedelta(hours=2),  # Must complete within 2 hours
}

with DAG(
    dag_id="error_handling_dag",
    default_args=default_args,
    schedule="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    dagrun_timeout=timedelta(hours=4),  # Overall DAG run timeout
) as dag:

    def risky_api_call(**kwargs):
        """External API call that may fail"""
        import random
        if random.random() < 0.3:
            raise ConnectionError("API temporarily unavailable")
        return {"data": "success"}

    extract = PythonOperator(
        task_id="extract_from_api",
        python_callable=risky_api_call,
        # Task-level retry settings (override default_args)
        retries=5,
        retry_delay=timedelta(minutes=1),
    )

    transform = PythonOperator(
        task_id="transform",
        python_callable=lambda: print("Transforming..."),
    )

    load = PythonOperator(
        task_id="load",
        python_callable=lambda: print("Loading..."),
    )

    # trigger_rule: execution condition based on upstream results
    cleanup = PythonOperator(
        task_id="cleanup",
        python_callable=lambda: print("Cleaning up temp files..."),
        trigger_rule=TriggerRule.ALL_DONE,  # Always run regardless of success/failure
    )

    notify_complete = EmptyOperator(
        task_id="notify_complete",
        trigger_rule=TriggerRule.ALL_SUCCESS,  # Run only if all succeed
    )

    extract >> transform >> load >> [cleanup, notify_complete]
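The retry_exponential_backoff comment above (2min -> 4min -> 8min) can be sketched as follows. Treat this as an idealized illustration: Airflow's actual implementation adds jitter to each delay, so real waits vary around these values.

```python
from datetime import timedelta


def backoff_schedule(retries, retry_delay, max_retry_delay):
    """Idealized exponential backoff: the delay doubles on each retry and
    is capped at max_retry_delay (Airflow adds jitter on top of this)."""
    delays, delay = [], retry_delay
    for _ in range(retries):
        delays.append(min(delay, max_retry_delay))
        delay *= 2
    return delays


# With the default_args above: retries=3, retry_delay=2min, cap 30min
schedule = backoff_schedule(3, timedelta(minutes=2), timedelta(minutes=30))
print([d // timedelta(minutes=1) for d in schedule])  # -> [2, 4, 8]
```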
6. Production Deployment (Variables, Connections, DAG Testing)

In production environments, avoid hardcoding and externalize configurations using Variables and Connections. Variables store key-value settings, while Connections manage database/API access credentials via the Airflow UI or CLI. Always run DAG syntax validation and unit tests before deployment, and integrate them into your CI/CD pipeline. Use TaskGroup to logically group complex DAGs, greatly improving readability in the Airflow UI.

# dags/production_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.hooks.base import BaseHook
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.decorators import task
import json

default_args = {
    "owner": "data-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="production_etl",
    default_args=default_args,
    schedule="0 2 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["production", "etl"],
    doc_md="""
    ## Production ETL Pipeline
    ETL pipeline that runs daily at 2:00 AM.
    Extracts data from source DB and loads it into the Data Warehouse.
    """,
) as dag:

    # --- Using Variables (set via UI or CLI) ---
    # airflow variables set etl_config '{"batch_size":1000,"debug":false}'
    # airflow variables set alert_email "team@example.com"

    @task
    def load_config():
        config = json.loads(Variable.get("etl_config", default_var='{"batch_size":500}'))
        alert_email = Variable.get("alert_email", default_var="default@example.com")
        return {"config": config, "alert_email": alert_email}

    # --- Using Connections (set via UI or CLI) ---
    # airflow connections add source_db \
    #   --conn-type postgres \
    #   --conn-host db.example.com \
    #   --conn-port 5432 \
    #   --conn-login readonly \
    #   --conn-password secret \
    #   --conn-schema analytics

    @task
    def extract_from_db(config: dict):
        conn = BaseHook.get_connection("source_db")
        print(f"Connecting to {conn.host}:{conn.port}/{conn.schema}")
        batch_size = config["config"]["batch_size"]
        print(f"Extracting with batch_size={batch_size}")
        return {"extracted_rows": 5000, "source": conn.host}

    # --- TaskGroup for grouping transform tasks ---
    # Note: these TaskFlow tasks are *called* outside the `with` block (in the
    # pipeline wiring below), so task_group=transform_group is what actually
    # assigns them to the group.
    with TaskGroup(group_id="transform_group") as transform_group:

        @task(task_group=transform_group)
        def clean_data(extract_result: dict):
            print(f"Cleaning {extract_result['extracted_rows']} rows")
            return {"cleaned_rows": 4800}

        @task(task_group=transform_group)
        def enrich_data(cleaned: dict):
            print(f"Enriching {cleaned['cleaned_rows']} rows")
            return {"enriched_rows": 4800}

        @task(task_group=transform_group)
        def validate_data(enriched: dict):
            print(f"Validating {enriched['enriched_rows']} rows")
            if enriched["enriched_rows"] == 0:
                raise ValueError("No data to load!")
            return {"valid_rows": enriched["enriched_rows"]}

    @task
    def load_to_warehouse(validated: dict):
        conn = BaseHook.get_connection("target_warehouse")
        print(f"Loading {validated['valid_rows']} rows to {conn.host}")

    # Pipeline wiring
    cfg = load_config()
    raw = extract_from_db(cfg)
    cleaned = clean_data(raw)
    enriched = enrich_data(cleaned)
    validated = validate_data(enriched)
    load_to_warehouse(validated)


# ============================
# DAG Testing (pre-deployment validation)
# ============================

# 1) DAG syntax validation (check for import errors)
# python dags/production_dag.py

# 2) Test individual tasks with Airflow CLI
# airflow tasks test production_etl load_config 2026-01-15

# 3) Test full DAG backfill
# airflow dags test production_etl 2026-01-15

# 4) Unit test DAG structure with pytest
# tests/test_dags.py
# ----------------------------------------
# import pytest
# from airflow.models import DagBag
#
# @pytest.fixture
# def dagbag():
#     return DagBag(dag_folder="dags/", include_examples=False)
#
# def test_no_import_errors(dagbag):
#     assert len(dagbag.import_errors) == 0
#
# def test_production_etl_tasks(dagbag):
#     dag = dagbag.get_dag("production_etl")
#     assert dag is not None
#     task_ids = [t.task_id for t in dag.tasks]
#     assert "load_config" in task_ids
#     assert "extract_from_db" in task_ids
# ----------------------------------------

Common Mistakes

Running heavy code at the top level of DAG files (DB queries, API calls, etc.)

The Airflow Scheduler parses DAG files periodically (every 30 seconds by default). Heavy code at the top level severely degrades Scheduler performance. Always write data processing logic inside Operator callable functions, and use lazy imports (import inside functions).
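The lazy-import pattern looks like this in practice: the module top level runs at every scheduler parse, while the callable body only runs when the task executes. In this sketch `csv` stands in for a genuinely heavy dependency such as pandas or sqlalchemy:

```python
# Module top level: executed on EVERY scheduler parse (~every 30s by
# default), so keep it to cheap imports and DAG/task definitions only.


def transform(**context):
    # Task run time: heavy imports and all I/O belong inside the callable
    import csv
    import io

    buf = io.StringIO("id,name\n1,Alice\n2,Bob\n")
    rows = list(csv.DictReader(buf))
    return len(rows)


print(transform())  # -> 2
```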

Processing large datasets directly inside Operators (memory overflow)

Loading hundreds of MBs of data in a PythonOperator can cause Workers to terminate with OOM errors. Delegate large data processing to external engines like Spark or dbt, and use Airflow only for orchestration (triggering execution, tracking state).

Not using TaskGroup, resulting in dozens of tasks displayed flat in the UI

Grouping related tasks with TaskGroup enables collapse/expand in the Airflow UI, greatly improving readability. Organize by ETL stages such as extract_group, transform_group, and load_group.

Storing large data in XCom, causing metadata DB overload

XCom is stored in the metadata DB (PostgreSQL/MySQL), so only pass metadata of a few KB or less. Store large data in S3/GCS and pass only the file path (s3://bucket/path) via XCom. You can also configure a Custom XCom Backend to automatically store data in S3.

Not realizing catchup=True (default) causes massive backfill from past dates

If start_date is in the past and catchup=True (the default), Airflow will run all missed intervals at once up to the present. Generally, set catchup=False explicitly, and use the airflow dags backfill command for manual backfill when needed.
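A quick way to see the blast radius of catchup=True is to count the intervals between start_date and now. This back-of-the-envelope helper ignores schedule nuances (data-interval semantics, timezones) but shows the order of magnitude:

```python
from datetime import datetime


def missed_daily_runs(start_date, now):
    """Roughly how many daily runs catchup=True would queue: one per
    completed day between start_date and now (interval details ignored)."""
    return max(0, (now - start_date).days)


# A @daily DAG with a start_date three months in the past queues ~90 runs
print(missed_daily_runs(datetime(2026, 1, 1), datetime(2026, 4, 1)))  # -> 90
```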
