
Fix: Apache Airflow Not Working — DAG Not Found, Task Failures, and Scheduler Issues

FixDevs ·

Quick Answer

How to fix Apache Airflow errors — DAG not appearing in UI, ImportError preventing DAG load, task stuck in running or queued, scheduler not scheduling, XCom too large, connection not found, and database migration errors.

The Error

Your DAG file is in the right folder but it never appears in the Airflow UI. Or it appears but the scheduler ignores it. Or a task runs for hours showing “running” in the UI with nothing actually executing. Or the whole webserver fails to start:

airflow.exceptions.AirflowException: Database schema mismatch.
Please run `airflow db migrate` to upgrade your schema.

Or a task silently moves to “failed” with just:

ERROR - Lost connection to MySQL server during query

Airflow is a distributed system — scheduler, webserver, workers, and a metadata database all run as separate processes. When any layer breaks, the error surface is wide. This guide covers the most common failure modes and how to diagnose each one.

Why This Happens

Airflow separates concerns across components: the scheduler parses DAG files and queues task instances, the webserver serves the UI, the executor runs tasks (sequentially, via Celery, Kubernetes, etc.), and the metadata database tracks all state. A failure in any one component can produce misleading symptoms in another.

The most common root causes: DAG files that fail to import, a scheduler process that isn’t running, task state stuck in the database after a worker crash, and mismatches between database schema and Airflow version.

Fix 1: DAG Not Appearing in the UI

This is the most common Airflow issue. The DAG file is in the dags folder but the UI shows nothing.

Step 1: Check import errors directly:

# List all DAGs that loaded successfully
airflow dags list

# List all DAG files that failed to import
airflow dags list-import-errors

Any Python error in your DAG file — syntax error, missing import, wrong variable name — prevents the entire file from loading. The UI shows an “Import Errors” tab at the top if any files failed.

Step 2: Verify the DAG folder path:

airflow config get-value core dags_folder
# or
echo $AIRFLOW__CORE__DAGS_FOLDER

The DAG file must be inside this directory or one of its subdirectories — subdirectories are scanned recursively by default. Files matched by a .airflowignore file in the DAGs folder are excluded from parsing.
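A .airflowignore file in the DAGs folder excludes files from parsing; its patterns are regexps by default, with a glob mode available via configuration. A simplified sketch of glob-mode matching (illustrative only, not Airflow's actual implementation):

```python
from fnmatch import fnmatch

def is_ignored(rel_path: str, ignore_patterns: list[str]) -> bool:
    """Simplified sketch of glob-mode .airflowignore matching."""
    return any(fnmatch(rel_path, pattern) for pattern in ignore_patterns)

patterns = ["helpers/*", "*_wip.py"]
print(is_ignored("helpers/db_utils.py", patterns))  # True: never parsed
print(is_ignored("sync_orders.py", patterns))       # False: parsed normally
```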

Step 3: Check dag_discovery_safe_mode. When enabled (the default), Airflow only parses files that contain both of the strings "dag" and "airflow" somewhere in the file (case-insensitively in recent releases). A utility module that mentions neither won’t be parsed — which is intentional — but a DAG file that happens to lack one of the two strings won’t load either:

airflow config get-value core dag_discovery_safe_mode
# If True, your .py file must contain both 'dag' and 'airflow' somewhere in the text
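The heuristic can be sketched in a few lines (a simplification of the real check, which lives in airflow.utils.file and operates on the raw file bytes):

```python
def might_contain_dag(content: str) -> bool:
    """Simplified sketch of the safe-mode heuristic: only files mentioning
    both 'dag' and 'airflow' are handed to the parser."""
    lowered = content.lower()
    return "dag" in lowered and "airflow" in lowered

print(might_contain_dag("from airflow import DAG"))   # True: parsed
print(might_contain_dag("def helper(): return 42"))   # False: skipped
```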

Step 4: Verify your DAG object is accessible at module level. The scheduler imports your file and looks for DAG instances. The DAG must be assigned to a variable at the module’s top level:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# WRONG — DAG defined inside a function; scheduler can't find it
def create_dag():
    with DAG('my_dag', start_date=datetime(2024, 1, 1)) as dag:
        ...
    return dag

# CORRECT — DAG at module level
with DAG(
    dag_id='my_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
) as dag:
    task = PythonOperator(task_id='say_hello', python_callable=lambda: print("hello"))

Step 5: Wait for the scheduler to pick it up. New files are discovered every dag_dir_list_interval seconds (default: 300), and already-known files are re-parsed every min_file_process_interval seconds (default: 30). After adding a new file, it can take up to five minutes to appear. Force a refresh:

airflow dags reserialize

Fix 2: ImportError Prevents DAG From Loading

DagFileProcessorProcess: Failed to import: /opt/airflow/dags/my_dag.py
Traceback (most recent call last):
  File "/opt/airflow/dags/my_dag.py", line 3, in <module>
    from airflow.providers.postgres.operators.postgres import PostgresOperator
ModuleNotFoundError: No module named 'airflow.providers.postgres'

Any unhandled exception during import — including a missing Airflow provider — silently removes the DAG from the UI.

Install missing providers:

# Airflow 2.x uses provider packages for external integrations
# (BashOperator and PythonOperator ship with core Airflow itself)
pip install apache-airflow-providers-postgres      # PostgresOperator
pip install apache-airflow-providers-amazon        # S3, EMR, etc.
pip install apache-airflow-providers-google        # GCP operators
pip install apache-airflow-providers-http          # SimpleHttpOperator

# List installed providers
airflow providers list

Airflow 2.x import paths changed from 1.x. If you’re migrating from Airflow 1.x:

# OLD — Airflow 1.x
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.http_sensor import HttpSensor

# NEW — Airflow 2.x (providers package)
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.http.sensors.http import HttpSensor

Test your DAG file imports in isolation:

# Run the file directly — any import error appears immediately
python /opt/airflow/dags/my_dag.py

# Or load the DagBag programmatically to see every import error
python -c "
from airflow.models import DagBag
dagbag = DagBag(dag_folder='/opt/airflow/dags', include_examples=False)
for dag_id, errors in dagbag.import_errors.items():
    print(f'{dag_id}: {errors}')
"

Fix 3: Task Stuck in “Running” or “Queued”

A task shows “running” in the UI for hours with nothing actually executing — the worker process crashed, leaving orphaned state in the database.

Clear the task state to reschedule it:

# Mark a specific task instance as cleared (will re-run)
airflow tasks clear my_dag -t my_task_id --yes

# Clear all tasks in a DAG over a date range
airflow tasks clear my_dag --start-date 2025-01-01 --end-date 2025-01-01 --yes

# Check the current state of a task instance (read-only)
airflow tasks state my_dag my_task_id 2025-01-01

# To force a stuck task to "failed" without re-running it, use the UI
# (Graph View -> task -> "Mark Failed"), then trigger a fresh run:
airflow dags trigger my_dag

If a task is stuck in “queued”, the executor is failing to pick it up. Check the executor:

# Check which executor is configured
airflow config get-value core executor
# SequentialExecutor, LocalExecutor, CeleryExecutor, KubernetesExecutor

# For CeleryExecutor — check that workers are alive and accepting tasks.
# The app path below is for Airflow 2.x before the executor moved into the
# Celery provider; newer versions use
# airflow.providers.celery.executors.celery_executor.app
celery -A airflow.executors.celery_executor.app inspect active
celery -A airflow.executors.celery_executor.app inspect reserved

CeleryExecutor broker unreachable is a common cause of stuck queued tasks. The Celery broker (Redis or RabbitMQ) must be accessible:

# Test Redis broker connectivity (if AIRFLOW__CELERY__BROKER_URL uses redis://)
redis-cli -h your-redis-host ping
# Expected: PONG

For Redis connection errors, see Redis connection refused. For Celery worker issues, see Celery task not executing.

Zombie tasks — tasks whose worker process died without reporting a final state — are automatically detected and marked as failed once their heartbeat is older than scheduler_zombie_task_threshold (default: 300 seconds). If a task stays “running” well beyond that without resolving, check the worker logs:

# Worker log location (configurable)
cat $AIRFLOW_HOME/logs/scheduler/latest/*.log

# Or read a specific task instance's log file directly
cat $AIRFLOW_HOME/logs/dag_id=my_dag/run_id=manual__2025-01-01T00:00:00/task_id=my_task_id/attempt=1.log
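The detection logic is, in essence, a heartbeat age check. A simplified sketch with illustrative names (the real implementation lives inside the scheduler):

```python
from datetime import datetime, timedelta, timezone

# scheduler_zombie_task_threshold defaults to 300 seconds
ZOMBIE_THRESHOLD = timedelta(seconds=300)

def is_zombie(state: str, last_heartbeat: datetime, now: datetime) -> bool:
    """A running task whose job stopped heartbeating past the threshold."""
    return state == "running" and (now - last_heartbeat) > ZOMBIE_THRESHOLD

now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
print(is_zombie("running", now - timedelta(minutes=12), now))  # True: zombie
print(is_zombie("running", now - timedelta(minutes=2), now))   # False
```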

Fix 4: Scheduler Not Running or Not Scheduling Tasks

The most common silent failure — tasks don’t run because no scheduler process is active, but the UI still loads (the webserver runs independently of the scheduler).

Check if the scheduler is running:

# Check Airflow's internal health endpoint
airflow jobs check --job-type SchedulerJob

# Or check the process directly
ps aux | grep "airflow scheduler"

# Or via the health API
curl http://localhost:8080/health
# {"metadatabase": {"status": "healthy"}, "scheduler": {"status": "healthy", ...}}
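Programmatically, the health payload can be checked for both status and heartbeat freshness, since a "healthy" status with a stale heartbeat still means trouble. A minimal sketch against a sample payload (field names follow the Airflow 2.x /health response; the timestamps are fixed for illustration):

```python
import json
from datetime import datetime, timezone

# Sample /health payload with fixed timestamps
payload = json.loads("""
{"metadatabase": {"status": "healthy"},
 "scheduler": {"status": "healthy",
               "latest_scheduler_heartbeat": "2025-01-01T00:00:25+00:00"}}
""")

scheduler_ok = payload["scheduler"]["status"] == "healthy"

# Also verify the heartbeat is recent
heartbeat = datetime.fromisoformat(payload["scheduler"]["latest_scheduler_heartbeat"])
now = datetime(2025, 1, 1, 0, 1, 0, tzinfo=timezone.utc)  # fixed "now"
stale = (now - heartbeat).total_seconds() > 60

print(scheduler_ok, stale)  # True False
```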

Start the scheduler if it’s not running:

airflow scheduler
# Or in the background
airflow scheduler -D   # Daemon mode

In Docker Compose, the scheduler service must start after the database is ready and initialized. A common issue is the scheduler starting before airflow db migrate completes. Use health checks on the database service:

# docker-compose.yml
services:
  airflow-scheduler:
    image: apache/airflow:2.9.0
    command: scheduler
    depends_on:
      airflow-init:
        condition: service_completed_successfully
      postgres:
        condition: service_healthy
    environment:
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow

For Docker Compose dependency ordering issues, see docker-compose depends_on not working.

DAGs are paused by default on first load. Check and unpause:

airflow dags list | grep paused
airflow dags unpause my_dag

Common Mistake: Omitting catchup=False when start_date is in the past. Without it, Airflow schedules a run for every missed interval since start_date, which can generate hundreds of unexpected task instances:

with DAG(
    dag_id='my_dag',
    start_date=datetime(2023, 1, 1),   # Far in the past
    schedule='@daily',
    catchup=False,   # Without this: ~730 runs queued immediately
) as dag:
    ...
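The arithmetic behind that comment: with a daily schedule, catchup queues one run per completed interval between start_date and now. With fixed dates for illustration:

```python
from datetime import date

# Dates fixed for illustration, matching the start_date in the example above
start_date = date(2023, 1, 1)
today = date(2025, 1, 1)

# One run per completed daily interval
missed_runs = (today - start_date).days
print(missed_runs)  # 731 (2024 was a leap year, hence slightly over ~730)
```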

Fix 5: XCom Data Too Large

ERROR - Error pushing to XCom
sqlalchemy.exc.DataError: (psycopg2.errors.StringDataRightTruncation)
value too long for type character varying

XCom (cross-communication) stores task output in the metadata database. It’s designed for small values: task IDs, file paths, record counts. It’s not designed for DataFrames, large JSON blobs, or binary data.

The database backend limits:

  • SQLite: ~2GB hard limit on a single BLOB
  • MySQL: 64KB (the value column is a plain BLOB)
  • PostgreSQL: ~1GB per value, but performance degrades long before that
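One defensive option is to cap payload size before pushing. A minimal sketch; the 48KB cap is an arbitrary margin under the MySQL limit, and the helper name is illustrative, not an Airflow API:

```python
import json

# Arbitrary cap: a safety margin below MySQL's 64KB BLOB limit
MAX_XCOM_BYTES = 48 * 1024

def safe_xcom_value(value):
    """Illustrative helper (not an Airflow API): reject oversized payloads."""
    payload = json.dumps(value).encode("utf-8")
    if len(payload) > MAX_XCOM_BYTES:
        raise ValueError(
            f"XCom payload is {len(payload)} bytes; push a file path instead"
        )
    return value

safe_xcom_value({"output_path": "s3://bucket/out.parquet"})  # fine: tiny dict
```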

Common mistake — pushing a DataFrame through XCom:

# WRONG — DataFrame pushed to database causes size errors and performance issues
def process_data(**context):
    df = pd.read_csv('s3://bucket/large_file.csv')
    df = df[df['status'] == 'active']
    context['ti'].xcom_push(key='result', value=df.to_json())   # Can be megabytes

# CORRECT — push a path, not the data
def process_data(**context):
    df = pd.read_csv('s3://bucket/large_file.csv')
    filtered = df[df['status'] == 'active']
    output_path = 's3://bucket/processed/active_users.parquet'
    filtered.to_parquet(output_path)
    context['ti'].xcom_push(key='output_path', value=output_path)   # Just a string

def next_task(**context):
    path = context['ti'].xcom_pull(key='output_path', task_ids='process_data')
    df = pd.read_parquet(path)

For production Airflow with large data, configure a custom XCom backend that stores values in S3 or GCS and keeps only a reference in the metadata database. You can subclass BaseXCom yourself, or — with the common.io provider installed on recent Airflow versions — use the object-storage backend (the exact class path and settings depend on your provider version):

# airflow.cfg or environment variables
AIRFLOW__CORE__XCOM_BACKEND=airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH=s3://your-xcom-bucket/xcom

Check what’s in XCom for a specific task via the UI: Admin → XComs, or open the task instance and use its “XCom” tab.

Fix 6: ConnectionNotFound — Missing conn_id

airflow.exceptions.AirflowNotFoundException: The conn_id `postgres_default` isn't defined

Airflow operators that interact with external systems require a connection registered in the metadata database. The conn_id in your operator code must match a connection stored in Airflow.

Register connections via the UI: Admin → Connections → ”+” → fill in conn_id, type, host, login, password, port.

Register via CLI:

airflow connections add 'postgres_production' \
    --conn-type 'postgres' \
    --conn-host 'db.example.com' \
    --conn-login 'airflow_user' \
    --conn-password 'secret' \
    --conn-port '5432' \
    --conn-schema 'warehouse'

Register via environment variable (best for Docker/Kubernetes — no UI needed):

# Format: AIRFLOW_CONN_{CONN_ID_UPPERCASE} = URI
export AIRFLOW_CONN_POSTGRES_PRODUCTION="postgres://airflow_user:secret@db.example.com:5432/warehouse"
export AIRFLOW_CONN_AWS_DEFAULT="aws://access_key:secret_key@?region_name=us-east-1"
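The URI packs every connection field into one string, which is why special characters in passwords must be percent-encoded. A sketch of how such a URI decomposes, using only the standard library:

```python
from urllib.parse import quote, urlsplit

# Percent-encode the password so '@' and '/' don't break URI parsing
password = quote("p@ss/word", safe="")
uri = f"postgres://airflow_user:{password}@db.example.com:5432/warehouse"

parts = urlsplit(uri)
print(parts.scheme)    # postgres -> becomes the conn type
print(parts.hostname)  # db.example.com
print(parts.port)      # 5432
print(parts.path)      # /warehouse -> becomes the schema
```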

Variables work the same way:

# Set via CLI
airflow variables set MY_ENV_KEY "production"

# Set via environment variable
export AIRFLOW_VAR_MY_ENV_KEY="production"

# Read in a DAG
from airflow.models import Variable
env = Variable.get("MY_ENV_KEY", default_var="staging")

Pro Tip: Prefer environment variables for connections and variables in containerized deployments. It avoids the “connection works on my laptop but not in prod” problem — the connection definition is part of the deployment config, not stored in a database that might not be migrated.

Fix 7: Database Issues — Schema Mismatch and Migration Errors

airflow.exceptions.AirflowException: Database schema mismatch.
Please run `airflow db migrate` to upgrade your schema.

This fires when you upgrade Airflow without running the schema migration. Always run the migration command before starting any Airflow components after an upgrade:

# Airflow 2.7+ (preferred)
airflow db migrate

# Older Airflow 2.x
airflow db upgrade

# Verify the database is reachable
airflow db check

# Verify all migrations have been applied
airflow db check-migrations

Initialize a fresh database (first-time setup):

airflow db init        # Older Airflow (deprecated alias in 2.7+)
airflow db migrate     # Airflow 2.7+ — also handles first-time initialization

Neither command destroys data; the one that does is airflow db reset, which drops and recreates everything. Don’t confuse them.

SQLite is for development only. The default connection (sqlite:///$AIRFLOW_HOME/airflow.db) doesn’t support concurrent writes. With multiple workers or the CeleryExecutor, SQLite causes database lock errors. Switch to PostgreSQL for production:

# airflow.cfg or environment variable
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://user:password@host:5432/airflow

# Install the postgres dependency
pip install "apache-airflow[postgres]"
# or
pip install apache-airflow-providers-postgres

For PostgreSQL connection errors when Airflow can’t reach the metadata database, see PostgreSQL connection refused.

Fix 8: Common Operator Mistakes

BashOperator — environment variables not available:

from airflow.operators.bash import BashOperator

# WRONG — assumes environment variables from the shell session
task = BashOperator(
    task_id='run_script',
    bash_command='python my_script.py',   # MY_SECRET_KEY not available
)

# CORRECT — pass env explicitly
task = BashOperator(
    task_id='run_script',
    bash_command='python my_script.py',
    env={
        'MY_SECRET_KEY': '{{ var.value.my_secret }}',   # Jinja template
        'PATH': '/usr/local/bin:/usr/bin:/bin',          # Preserve PATH
    },
)

PythonOperator — can’t pickle the callable:

# RISKY — lambdas can't be pickled, which breaks if DAG/task pickling is
# enabled, and they make task logs and tracebacks harder to read
task = PythonOperator(
    task_id='process',
    python_callable=lambda: print("hello"),
)

# CORRECT — define a proper function at module level
def process_task(**context):
    print("hello")
    return "done"

task = PythonOperator(
    task_id='process',
    python_callable=process_task,
)

start_date must be static — dynamic dates cause the DAG to be treated as different each parse:

from datetime import datetime, timedelta

# WRONG — changes every time the file is parsed; causes schedule instability
default_args = {'start_date': datetime.now() - timedelta(days=1)}

# CORRECT — fixed past date
default_args = {'start_date': datetime(2024, 1, 1)}
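The instability is easy to demonstrate: every evaluation of a dynamic expression yields a different value, so the scheduler never sees the same DAG twice:

```python
import time
from datetime import datetime, timedelta

# Each parse of the DAG file re-evaluates the expression...
first_parse = datetime.now() - timedelta(days=1)
time.sleep(0.01)
second_parse = datetime.now() - timedelta(days=1)

# ...so the scheduler sees a different start_date every time
print(first_parse != second_parse)  # True
```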

@task decorator (TaskFlow API) — the modern Airflow 2.x pattern for Python tasks:

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False)
def my_pipeline():
    @task
    def extract() -> dict:
        return {'value': 42}

    @task
    def transform(data: dict) -> str:
        return f"Processed: {data['value']}"

    @task
    def load(result: str):
        print(result)

    # XCom passing happens automatically via return values
    load(transform(extract()))

dag_instance = my_pipeline()

The TaskFlow API automatically handles XCom passing through return values — no xcom_push/xcom_pull needed for simple Python pipelines.

Still Not Working?

DAG Runs on Wrong Schedule

Airflow uses UTC for all scheduling. If your start_date is datetime(2024, 1, 1) and you’re in UTC-5, the first run happens at UTC midnight (7pm local time). Use pendulum for timezone-aware dates:

import pendulum

with DAG(
    dag_id='my_dag',
    start_date=pendulum.datetime(2024, 1, 1, tz='America/New_York'),
    schedule='@daily',
    catchup=False,
) as dag:
    ...

Important: The first DAG run occurs at start_date + schedule_interval. A DAG with start_date=2024-01-01 and schedule='@daily' first runs at 2024-01-02 00:00 UTC. This surprises many developers.
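The rule is mechanical: the run covering the data interval [start_date, start_date + interval) executes once that interval closes. As plain datetime arithmetic:

```python
from datetime import datetime, timedelta, timezone

start_date = datetime(2024, 1, 1, tzinfo=timezone.utc)
interval = timedelta(days=1)  # '@daily'

# The first run covers [start_date, start_date + interval) and fires
# only after that interval has closed
first_run_at = start_date + interval
print(first_run_at.isoformat())  # 2024-01-02T00:00:00+00:00
```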

Dynamic DAG Generation

Dynamically generated DAGs (e.g., one DAG per database table) must still produce stable dag_id values. A dag_id that changes between parses — for example, one that embeds a timestamp — makes the scheduler register a new DAG entry on every parse:

# Generate multiple DAGs from a config
tables = ['orders', 'customers', 'products']

for table in tables:
    with DAG(
        dag_id=f'sync_{table}',           # Stable ID per table
        start_date=datetime(2024, 1, 1),
        schedule='@hourly',
        catchup=False,
    ) as dag:
        sync_task = PythonOperator(
            task_id=f'sync_{table}_data',
            python_callable=sync_table,
            op_kwargs={'table': table},
        )
    globals()[f'dag_{table}'] = dag       # Must register in globals()

Task Dependency and Trigger Rules

By default, a task only runs if all upstream tasks succeeded. Use trigger_rule for different behavior:

from airflow.utils.trigger_rule import TriggerRule

cleanup = PythonOperator(
    task_id='cleanup',
    python_callable=cleanup_fn,
    trigger_rule=TriggerRule.ALL_DONE,   # Runs even if upstream failed
)

Other useful rules: ONE_SUCCESS, ONE_FAILED, NONE_FAILED, NONE_SKIPPED.
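The semantics of these rules are straightforward to state in code. A simplified sketch — real evaluation happens inside the scheduler and covers more rules and edge cases:

```python
def can_run(rule: str, upstream_states: list[str]) -> bool:
    """Simplified trigger-rule evaluation over terminal upstream states."""
    if rule == "all_success":
        return all(s == "success" for s in upstream_states)
    if rule == "all_done":
        return all(s in ("success", "failed", "skipped") for s in upstream_states)
    if rule == "one_success":
        return any(s == "success" for s in upstream_states)
    if rule == "one_failed":
        return any(s == "failed" for s in upstream_states)
    raise ValueError(f"unknown rule: {rule}")

states = ["success", "failed"]
print(can_run("all_success", states))  # False: the default rule blocks the task
print(can_run("all_done", states))     # True: a cleanup task still runs
```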

Checking Logs and Diagnosing Silently Failed Tasks

Tasks can fail silently (no exception, just wrong output) or with errors buried in logs. Navigate directly to task logs in the UI: click the task square in Graph View → “Log” tab. Or via CLI:

cat $AIRFLOW_HOME/logs/dag_id=my_dag/run_id=scheduled__2025-01-01T00:00:00+00:00/task_id=my_task_id/attempt=1.log

For Docker Compose deployments where the scheduler, webserver, and workers need proper service dependencies and health checks, see docker-compose depends_on not working.


FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
