Apache Airflow DAG Writing Guide
A hands-on guide covering everything from setting up Airflow locally with Docker Compose to understanding DAG structure, using Operators, XCom data passing, error handling, and production deployment.
Problem
Required Tools
Apache Airflow: Workflow orchestration platform. Defines task dependencies as DAGs (Directed Acyclic Graphs) and manages scheduling, monitoring, and retries.
Docker Compose: Easily sets up a multi-container local environment with the Airflow Webserver, Scheduler, Worker, PostgreSQL, and Redis.
Python: The language for writing DAG files and task logic. All Airflow configurations and workflows are defined in Python code.
Solution Steps
Set Up Local Airflow Environment (Docker Compose)
Using the official Apache Airflow Docker Compose file, you can set up the Webserver, Scheduler, Worker, PostgreSQL, and Redis all at once. The recommended workflow is to write and test DAGs in a local dev environment, then deploy to production. Mount the dags/, logs/, and plugins/ directories to the host so code changes are reflected immediately.
# Create project directory
mkdir airflow-project && cd airflow-project
mkdir -p dags logs plugins config
# Download the official Docker Compose file
curl -LfO "https://airflow.apache.org/docs/apache-airflow/2.10.5/docker-compose.yaml"
# Set environment variables
cat > .env << 'EOF'
AIRFLOW_UID=50000
AIRFLOW_GID=0
_AIRFLOW_WWW_USER_USERNAME=admin
_AIRFLOW_WWW_USER_PASSWORD=admin
EOF
# Initialize Airflow (DB migration + admin account creation)
docker compose up airflow-init
# Start all services in the background
docker compose up -d
# Check status
docker compose ps
# Access Webserver: http://localhost:8080
# Username: admin / Password: admin
Write Your First DAG (Context Manager, default_args, Schedule)
A DAG (Directed Acyclic Graph) is a workflow that defines the execution order and dependencies of tasks. Use Python's context manager (with statement) to define a DAG, and default_args to apply common settings to all tasks. The schedule parameter specifies the execution interval using cron expressions or presets (@daily, @hourly, etc.). start_date is the reference date for the first DAG run. Without catchup=False, Airflow will backfill all missed intervals from the start_date to the present.
# dags/my_first_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# Default settings applied to all tasks
default_args = {
"owner": "data-team",
"depends_on_past": False,
"email": ["alert@example.com"],
"email_on_failure": True,
"email_on_retry": False,
"retries": 2,
"retry_delay": timedelta(minutes=5),
}
# DAG definition (context manager pattern)
with DAG(
dag_id="my_first_dag",
default_args=default_args,
description="First Airflow DAG - ETL pipeline example",
schedule="0 9 * * *", # Daily at 9:00 AM (UTC)
start_date=datetime(2026, 1, 1),
catchup=False, # Don't backfill missed intervals
tags=["example", "etl"],
) as dag:
def extract(**kwargs):
"""Extract data"""
print(f"Extracting data for {kwargs['ds']}")
return {"records": 1000, "source": "api"}
def transform(**kwargs):
"""Transform data"""
print(f"Transforming data for {kwargs['ds']}")
return {"processed": 950}
def load(**kwargs):
"""Load data"""
print(f"Loading data for {kwargs['ds']}")
# Task definitions
task_extract = PythonOperator(
task_id="extract",
python_callable=extract,
)
task_transform = PythonOperator(
task_id="transform",
python_callable=transform,
)
task_load = PythonOperator(
task_id="load",
python_callable=load,
)
# Task dependencies: extract -> transform -> load
task_extract >> task_transform >> task_load
Use Operators (PythonOperator, BashOperator, Dependency Patterns)
Operators are the building blocks that define a single task. Choose the right Operator for each type of work. PythonOperator executes Python functions, and BashOperator runs shell commands. Use the >> operator to declare task dependencies, and use lists for parallel execution. The TaskFlow API (@task decorator) enables more concise DAG authoring.
# dags/operator_examples.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.decorators import task
default_args = {
"owner": "data-team",
"retries": 1,
"retry_delay": timedelta(minutes=3),
}
with DAG(
dag_id="operator_examples",
default_args=default_args,
schedule="@daily",
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
# 1) BashOperator - Run shell commands
check_source = BashOperator(
task_id="check_source",
bash_command="echo 'Checking data source...' && curl -sf https://api.example.com/health",
)
# 2) PythonOperator - Run Python functions
def process_data(ds, **kwargs):
"""Process data using execution_date (ds)"""
print(f"Processing data for date: {ds}")
# ds = "2026-01-15" (execution date string)
return {"status": "success", "date": ds}
process = PythonOperator(
task_id="process_data",
python_callable=process_data,
)
# 3) TaskFlow API (@task decorator) - Concise syntax
@task
def validate_output():
"""Validate output data"""
print("Validating output data...")
return True
@task
def send_notification(is_valid: bool):
"""Send notification"""
if is_valid:
print("Pipeline completed successfully!")
# 4) EmptyOperator - Branch/merge markers
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")
# 5) Dependency patterns
validation = validate_output()
notification = send_notification(validation)
# Sequential: start -> check_source -> process
start >> check_source >> process
# Then: process -> validation -> notification -> end
process >> validation >> notification >> end
Pass Data Between Tasks with XCom
XCom (Cross-Communication) is the mechanism for passing small amounts of data between tasks. In PythonOperator, the return value is automatically pushed to XCom, and other tasks can retrieve it with xcom_pull. With the TaskFlow API, function return values are automatically passed as arguments to the next task. Caution: XCom data is stored in the metadata database, so you should never pass large data (more than a few MB). For large datasets, pass the S3/GCS file path instead.
# dags/xcom_example.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task, dag
# === Method 1: Traditional xcom_push / xcom_pull ===
def extract_data(**kwargs):
"""Extract data and push to XCom"""
data = {
"records": [
{"id": 1, "name": "Alice", "score": 85},
{"id": 2, "name": "Bob", "score": 92},
],
"total_count": 2,
"extracted_at": str(datetime.now()),
}
# Return value is automatically pushed to XCom
return data
def transform_data(**kwargs):
"""Pull data from XCom and transform"""
ti = kwargs["ti"]
# Get the return value of the previous task
extracted = ti.xcom_pull(task_ids="extract")
print(f"Received {extracted['total_count']} records")
# Transformation logic
transformed = {
"records": [
{**r, "grade": "A" if r["score"] >= 90 else "B"}
for r in extracted["records"]
],
"transform_count": extracted["total_count"],
}
return transformed
def load_data(**kwargs):
"""Load transformed data"""
ti = kwargs["ti"]
data = ti.xcom_pull(task_ids="transform")
print(f"Loading {data['transform_count']} records")
for record in data["records"]:
print(f" -> {record['name']}: {record['grade']}")
with DAG(
dag_id="xcom_traditional",
schedule="@daily",
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag1:
t1 = PythonOperator(task_id="extract", python_callable=extract_data)
t2 = PythonOperator(task_id="transform", python_callable=transform_data)
t3 = PythonOperator(task_id="load", python_callable=load_data)
t1 >> t2 >> t3
# === Method 2: TaskFlow API (Recommended) ===
@dag(
dag_id="xcom_taskflow",
schedule="@daily",
start_date=datetime(2026, 1, 1),
catchup=False,
)
def xcom_taskflow_pipeline():
@task
def extract():
return {"records": [{"id": 1, "value": 100}], "count": 1}
@task
def transform(data: dict):
# data is automatically passed from extract()'s return value
data["records"] = [{**r, "processed": True} for r in data["records"]]
return data
@task
def load(data: dict):
print(f"Loaded {data['count']} records")
# Function call chaining expresses both dependencies and data passing
raw = extract()
processed = transform(raw)
load(processed)
xcom_taskflow_pipeline()
Error Handling and Retries (retries, on_failure_callback, SLA)
In production pipelines, automatic retries and alerting for transient failures (network errors, temporary API outages, etc.) are essential. Configure retries with retries and retry_delay, and send alerts on failure with on_failure_callback. Setting an SLA (Service Level Agreement) triggers an SLA-miss alert when a task has not completed within the given time after the scheduled DAG run start. Use execution_timeout to kill runaway tasks, and trigger_rule for fine-grained control over when a task runs based on upstream results.
# dags/error_handling_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
import logging
logger = logging.getLogger(__name__)
# Callback function invoked on task failure
def on_failure(context):
"""Send Slack/email alert on task failure"""
task_instance = context["task_instance"]
exception = context.get("exception", "Unknown")
dag_id = context["dag"].dag_id
execution_date = context["execution_date"]
message = (
f"Task Failed!\n"
f"DAG: {dag_id}\n"
f"Task: {task_instance.task_id}\n"
f"Execution Date: {execution_date}\n"
f"Error: {exception}"
)
logger.error(message)
# Send alert via Slack webhook, PagerDuty, email, etc.
# requests.post(SLACK_WEBHOOK_URL, json={"text": message})
def on_success(context):
"""Called on task success"""
logger.info(f"Task {context['task_instance'].task_id} succeeded!")
def on_retry(context):
"""Called on retry"""
task_instance = context["task_instance"]
logger.warning(
f"Retrying {task_instance.task_id}, "
f"attempt {task_instance.try_number}"
)
default_args = {
"owner": "data-team",
"retries": 3,
"retry_delay": timedelta(minutes=2),
"retry_exponential_backoff": True, # 2min -> 4min -> 8min
"max_retry_delay": timedelta(minutes=30),
"execution_timeout": timedelta(hours=1), # Kill if exceeds 1 hour
"on_failure_callback": on_failure,
"on_success_callback": on_success,
"on_retry_callback": on_retry,
"sla": timedelta(hours=2), # Must complete within 2 hours
}
with DAG(
dag_id="error_handling_dag",
default_args=default_args,
schedule="0 6 * * *",
start_date=datetime(2026, 1, 1),
catchup=False,
dagrun_timeout=timedelta(hours=4), # Overall DAG run timeout
) as dag:
def risky_api_call(**kwargs):
"""External API call that may fail"""
import random
if random.random() < 0.3:
raise ConnectionError("API temporarily unavailable")
return {"data": "success"}
extract = PythonOperator(
task_id="extract_from_api",
python_callable=risky_api_call,
# Task-level retry settings (override default_args)
retries=5,
retry_delay=timedelta(minutes=1),
)
transform = PythonOperator(
task_id="transform",
python_callable=lambda: print("Transforming..."),
)
load = PythonOperator(
task_id="load",
python_callable=lambda: print("Loading..."),
)
# trigger_rule: execution condition based on upstream results
cleanup = PythonOperator(
task_id="cleanup",
python_callable=lambda: print("Cleaning up temp files..."),
trigger_rule=TriggerRule.ALL_DONE, # Always run regardless of success/failure
)
notify_complete = EmptyOperator(
task_id="notify_complete",
trigger_rule=TriggerRule.ALL_SUCCESS, # Run only if all succeed
)
extract >> transform >> load >> [cleanup, notify_complete]
Production Deployment (Variables, Connections, DAG Testing)
In production environments, avoid hardcoding and externalize configurations using Variables and Connections. Variables store key-value settings, while Connections manage database/API access credentials via the Airflow UI or CLI. Always run DAG syntax validation and unit tests before deployment, and integrate them into your CI/CD pipeline. Use TaskGroup to logically group complex DAGs, greatly improving readability in the Airflow UI.
# dags/production_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.hooks.base import BaseHook
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.decorators import task
import json
default_args = {
"owner": "data-team",
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="production_etl",
default_args=default_args,
schedule="0 2 * * *",
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["production", "etl"],
doc_md="""
## Production ETL Pipeline
ETL pipeline that runs daily at 2:00 AM.
Extracts data from source DB and loads it into the Data Warehouse.
""",
) as dag:
# --- Using Variables (set via UI or CLI) ---
# airflow variables set etl_config '{"batch_size":1000,"debug":false}'
# airflow variables set alert_email "team@example.com"
@task
def load_config():
config = json.loads(Variable.get("etl_config", default_var='{"batch_size":500}'))
alert_email = Variable.get("alert_email", default_var="default@example.com")
return {"config": config, "alert_email": alert_email}
# --- Using Connections (set via UI or CLI) ---
# airflow connections add source_db \
# --conn-type postgres \
# --conn-host db.example.com \
# --conn-port 5432 \
# --conn-login readonly \
# --conn-password secret \
# --conn-schema analytics
@task
def extract_from_db(config: dict):
conn = BaseHook.get_connection("source_db")
print(f"Connecting to {conn.host}:{conn.port}/{conn.schema}")
batch_size = config["config"]["batch_size"]
print(f"Extracting with batch_size={batch_size}")
return {"extracted_rows": 5000, "source": conn.host}
# --- TaskGroup for grouping transform tasks ---
with TaskGroup(group_id="transform_group") as transform_group:
@task(task_group=transform_group)
def clean_data(extract_result: dict):
print(f"Cleaning {extract_result['extracted_rows']} rows")
return {"cleaned_rows": 4800}
@task(task_group=transform_group)
def enrich_data(cleaned: dict):
print(f"Enriching {cleaned['cleaned_rows']} rows")
return {"enriched_rows": 4800}
@task(task_group=transform_group)
def validate_data(enriched: dict):
print(f"Validating {enriched['enriched_rows']} rows")
if enriched["enriched_rows"] == 0:
raise ValueError("No data to load!")
return {"valid_rows": enriched["enriched_rows"]}
@task
def load_to_warehouse(validated: dict):
conn = BaseHook.get_connection("target_warehouse")
print(f"Loading {validated['valid_rows']} rows to {conn.host}")
# Pipeline wiring
cfg = load_config()
raw = extract_from_db(cfg)
cleaned = clean_data(raw)
enriched = enrich_data(cleaned)
validated = validate_data(enriched)
load_to_warehouse(validated)
# ============================
# DAG Testing (pre-deployment validation)
# ============================
# 1) DAG syntax validation (check for import errors)
# python dags/production_dag.py
# 2) Test individual tasks with Airflow CLI
# airflow tasks test production_etl load_config 2026-01-15
# 3) Test full DAG backfill
# airflow dags test production_etl 2026-01-15
# 4) Unit test DAG structure with pytest
# tests/test_dags.py
# ----------------------------------------
# import pytest
# from airflow.models import DagBag
#
# @pytest.fixture
# def dagbag():
# return DagBag(dag_folder="dags/", include_examples=False)
#
# def test_no_import_errors(dagbag):
# assert len(dagbag.import_errors) == 0
#
# def test_production_etl_tasks(dagbag):
# dag = dagbag.get_dag("production_etl")
# assert dag is not None
# task_ids = [t.task_id for t in dag.tasks]
# assert "load_config" in task_ids
# assert "extract_from_db" in task_ids
# ----------------------------------------
Common Mistakes
Running heavy code at the top level of DAG files (DB queries, API calls, etc.)
The Airflow Scheduler parses DAG files periodically (every 30 seconds by default). Heavy code at the top level severely degrades Scheduler performance. Always write data processing logic inside Operator callable functions, and use lazy imports (import inside functions).
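A minimal sketch of the lazy-import pattern (json stands in for a heavy dependency such as pandas, which is assumed here for illustration):

```python
# BAD: module-level work runs on every Scheduler parse (roughly every 30s),
# even when no task is executing:
#
#   import pandas as pd                      # heavy import at parse time
#   df = pd.read_sql("SELECT ...", conn)     # DB query at parse time!

def process_data(**kwargs):
    # GOOD: import inside the callable, so the cost is paid only
    # when the task actually runs on a Worker.
    import json  # stand-in for a heavy library such as pandas
    result = {"rows_processed": 42}
    return json.dumps(result)
```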
Processing large datasets directly inside Operators (memory overflow)
Loading hundreds of MBs of data in a PythonOperator can cause Workers to terminate with OOM errors. Delegate large data processing to external engines like Spark or dbt, and use Airflow only for orchestration (triggering execution, tracking state).
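One way to sketch this "orchestrate, don't compute" pattern: the task only builds and launches a Spark job, never touching the data itself. The bucket names, script path, and arguments below are illustrative placeholders, not a real deployment.

```python
# Airflow only *launches* the heavy job; Spark does the data work.
def build_spark_submit_cmd(ds: str) -> str:
    """Build the spark-submit command for a given execution date."""
    return (
        "spark-submit --deploy-mode cluster jobs/daily_aggregation.py "
        f"--input s3://my-bucket/raw/{ds}/ "
        f"--output s3://my-bucket/aggregated/{ds}/"
    )

# In a DAG, this string would be handed to a BashOperator, e.g.:
#   BashOperator(task_id="run_spark_job",
#                bash_command=build_spark_submit_cmd("{{ ds }}"))
# (or use the SparkSubmitOperator from the apache-spark provider package).
```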
Not using TaskGroup, resulting in dozens of tasks displayed flat in the UI
Grouping related tasks with TaskGroup enables collapse/expand in the Airflow UI, greatly improving readability. Organize by ETL stages such as extract_group, transform_group, and load_group.
Storing large data in XCom, causing metadata DB overload
XCom is stored in the metadata DB (PostgreSQL/MySQL), so only pass metadata of a few KB or less. Store large data in S3/GCS and pass only the file path (s3://bucket/path) via XCom. You can also configure a Custom XCom Backend to automatically store data in S3.
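A small sketch of the path-passing pattern: only a short path string goes through XCom, while the dataset itself lives in object storage. The bucket and key layout are illustrative assumptions.

```python
def extract(**kwargs):
    """Extract data, write it to object storage, push only the path."""
    path = f"s3://my-bucket/raw/{kwargs['ds']}/data.parquet"
    # ... write the (large) extracted dataset to `path` with boto3/S3Hook ...
    return path  # a few dozen bytes in the metadata DB, not the data itself

def transform(**kwargs):
    """Pull the path from XCom and read the dataset from storage."""
    ti = kwargs["ti"]
    path = ti.xcom_pull(task_ids="extract")
    print(f"Reading dataset from {path}")
    # ... load from object storage and process ...
```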
Not realizing catchup=True (default) causes massive backfill from past dates
If start_date is in the past and catchup=True (the default), Airflow will run all missed intervals at once up to the present. Generally, set catchup=False explicitly, and use the airflow dags backfill command for manual backfill when needed.
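A manual backfill for a bounded date range might look like this (the DAG id matches the first example in this guide; run against a live Airflow installation):

```shell
# Re-run my_first_dag for one week of past intervals, on demand
airflow dags backfill my_first_dag \
  --start-date 2026-01-01 \
  --end-date 2026-01-07
```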