4.2 Airflow UI and DAG Creation

4.2.1 Airflow UI

The Airflow web UI is the primary interface for managing and monitoring workflows. It provides visibility into DAG status, task execution, logs, and configuration โ€” all from a browser.

DAGs Page

The main landing page lists every DAG discovered in the DAG directory. For each DAG it shows the owner, schedule interval, last run timestamp, current status, and a mini status bar for individual tasks within the most recent run. From this page you can toggle DAGs on/off, trigger a manual run, or delete a DAG entirely.

DAG Detail Views

Clicking into a DAG opens a set of views that help you understand and debug pipeline runs:

ViewPurpose
GridMatrix of all runs with per-task status cells โ€” quickly spot patterns of failure
GraphVisual representation of task dependencies โ€” shows the DAG structure
GanttTimeline bars for each task โ€” identifies bottlenecks and long-running tasks
CodeThe Python source code that defines the DAG
LogsStdout/stderr output for each task instance โ€” primary debugging tool

4.2.2 Creating DAGs and Using Operators

DAGs are created using the airflow.DAG() context manager. A typical ETL pipeline defines separate tasks for each stage and wires them together with dependency operators.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# define the DAG using a context manager
with DAG(
    dag_id="my_first_dag",
    description="ETL pipeline",
    tags=["data_engineering_team"],
    schedule="@daily",                          # run once per day at midnight
    start_date=datetime(2024, 12, 1),
    catchup=False,                              # don't backfill missed runs
):
    # each task wraps a Python function
    task_1 = PythonOperator(
        task_id="extract",
        python_callable=extract_data,           # function defined elsewhere
    )
    task_2 = PythonOperator(
        task_id="transform",
        python_callable=transform_data,
    )
    task_3 = PythonOperator(
        task_id="load",
        python_callable=load_data,
    )

    # set dependencies -- extract runs first, then transform, then load
    task_1 >> task_2 >> task_3

Operators

Operators define what each task actually does. Airflow provides a rich library of built-in operators:

OperatorModulePurpose
PythonOperatorairflow.operators.pythonExecutes a Python callable
BashOperatorairflow.operators.bashRuns a bash command or script
EmptyOperatorairflow.operators.emptyNo-op task โ€” useful as a join point for complex dependencies
EmailOperatorairflow.operators.emailSends email notifications
S3KeySensorairflow.providers.amazon.aws.sensors.s3Waits for a file to appear in S3 before proceeding

Task Dependencies

The >> and << bit-shift operators define execution order between tasks. Use Python lists to express parallel execution and convergence:

DAG dependency patterns: linear, fan-out, fan-in, and complex chain DAG dependency patterns: linear, fan-out, fan-in, and complex chain

For complex many-to-many dependencies where list syntax wonโ€™t work, use the chain() utility:

from airflow.models.baseoperator import chain

# chain() connects each layer to the next -- equivalent to writing
# all individual t0>>t1, t0>>t2, t1>>t3, t1>>t4, ... arrows manually
chain(task0, [task1, task2], [task3, task4], task5)

4.2.3 XComs and Variables

XComs (Cross-Communication)

XComs allow tasks to exchange small pieces of data โ€” metadata, dates, single-value metrics, and simple computations. They are not designed for large objects like DataFrames.

The flow works through the metadata database: one task pushes a value with xcom_push, and another task retrieves it with xcom_pull. Both methods are accessed through the task instance object in the execution context.

XCom cross-communication flow between tasks via metadata database XCom cross-communication flow between tasks via metadata database
def extract_from_api(**context):
    """Extract data and push a computed metric via XCom."""
    import requests

    response = requests.get(
        "https://jobicy.com/api/v2/remote-jobs",
        params={"count": 40, "geo": "usa", "tag": "data engineer"},
    ).json()

    # compute a metric from the response
    senior_count = sum(1 for job in response["jobs"] if job["jobLevel"] == "Senior")
    ratio = senior_count / len(response["jobs"])

    # push the value to XCom -- stored with key, timestamp, DAG ID, and task ID
    context["ti"].xcom_push(key="ratio_senior_jobs", value=ratio)


def load_results(**context):
    """Pull the metric from a previous task via XCom."""
    # retrieve by key and source task ID
    ratio = context["ti"].xcom_pull(
        key="ratio_senior_jobs",
        task_ids="extract",
    )
    print(f"Senior job ratio: {ratio}")

XComs are visible in the Airflow UI under Admin > XComs. For large objects like DataFrames, save them to intermediate storage (e.g., S3) and read the path in the downstream task instead.


Airflow Variables

Variables let you avoid hard-coding configuration values inside tasks. Create them in the Airflow UI (Admin > Variables) or as environment variables prefixed with AIRFLOW_VAR_.

from airflow.models import Variable

# retrieve a simple string variable
num_posts = Variable.get(key="number_post")

# retrieve a JSON variable and access a nested key
# passing deserialize_json=True parses the stored JSON string into a dict
locations = Variable.get(key="locations", deserialize_json=True)
geos = locations["geo"]    # e.g. ["usa", "canada", "france", "australia"]

Variables are global โ€” they apply across the entire Airflow installation. Use them for installation-wide configuration (API endpoints, bucket names, connection strings). To pass data between tasks within a single DAG run, use XComs instead.


Built-in Variables and Jinja Templating

Airflow provides a set of built-in variables with information about the current DAG run, such as the logical date (ds), execution date, and task instance. You can access them in two ways:

# option 1: via the context dictionary inside a task function
def my_task(**context):
    logical_date = context["ds"]                # "2024-12-01"
    print(f"Processing data for {logical_date}")


# option 2: via Jinja templating in operator arguments
task_load_s3 = PythonOperator(
    task_id="load_to_s3",
    python_callable=load_to_s3,
    # {{ ds }} is replaced at runtime with the logical date of the DAG run
    op_kwargs={"file_name": "data/created_{{ ds }}/file.csv"},
)

4.2.4 DAG Best Practices

Well-written DAGs are efficient, readable, idempotent, and reproducible.

PracticeDetails
Keep tasks atomicEach task should represent a single operation. An ETL pipeline needs at least three tasks (extract, transform, load) โ€” not one monolithic task that does everything. Atomic tasks improve visibility, enable targeted retries, and support idempotency.
Avoid top-level codeAny code outside of DAG/operator definitions is parsed by the scheduler every ~30 seconds. API calls, database queries, or heavy computations at the module level cause performance issues. Keep all logic inside operator callables.
Use variablesDonโ€™t hard-code values like bucket names, API URLs, or thresholds. Store them as Airflow Variables or environment variables so they can be updated without code changes.
Use task groupsOrganize related tasks visually in the UI using TaskGroup. This improves readability for complex DAGs without changing execution behavior.
Airflow is an orchestrator, not an executorHeavy processing belongs in execution frameworks like Spark, dbt, or AWS Glue. Airflow should trigger and monitor these jobs, not run the computation itself.
Donโ€™t pass large data via XComsXComs store data in the metadata database. For large objects, write to intermediate storage (S3, a staging table) and pass the reference path instead.
Keep task code in separate filesImport your Python functions from a module rather than defining them inline in the DAG file. This improves readability and testability.

Task Groups Example

from airflow.utils.task_group import TaskGroup

with DAG(...):
    start = EmptyOperator(task_id="start")

    # group related tasks -- they appear as a collapsible block in the UI
    with TaskGroup("transform_group") as transform_group:
        clean = PythonOperator(task_id="clean", python_callable=clean_data)
        enrich = PythonOperator(task_id="enrich", python_callable=enrich_data)
        clean >> enrich

    end = EmptyOperator(task_id="end")

    start >> transform_group >> end

Top-Level Code Anti-Pattern

# BAD: these run every time the scheduler parses this file (~every 30s)
call_some_function()           # avoid -- runs at parse time
perform_computation()          # avoid -- runs at parse time

with DAG(dag_id="example", start_date=datetime(2024, 3, 13),
         schedule="@daily", catchup=False):
    # GOOD: this code only runs when the task is actually executed
    task_1 = PythonOperator(task_id="extract", python_callable=extract_api)
    task_2 = PythonOperator(task_id="load", python_callable=load)
    task_1 >> task_2