Fix: Apache Airflow Not Working — DAG Not Found, Task Failures, and Scheduler Issues
Quick Answer
How to fix Apache Airflow errors — DAG not appearing in UI, ImportError preventing DAG load, task stuck in running or queued, scheduler not scheduling, XCom too large, connection not found, and database migration errors.
The Error
Your DAG file is in the right folder but it never appears in the Airflow UI. Or it appears but the scheduler ignores it. Or a task runs for hours showing “running” in the UI with nothing actually executing. Or the whole webserver fails to start:
airflow.exceptions.AirflowException: Database schema mismatch.
Please run `airflow db migrate` to upgrade your schema.
Or a task silently moves to “failed” with just:
ERROR - Lost connection to MySQL server during query
Airflow is a distributed system: the scheduler, webserver, workers, and metadata database all run as separate processes. When any layer breaks, the error surface is wide. This guide covers the most common failure modes and how to diagnose each one.
Why This Happens
Airflow separates concerns across components: the scheduler parses DAG files and queues task instances, the webserver serves the UI, the executor runs tasks (sequentially, via Celery, Kubernetes, etc.), and the metadata database tracks all state. A failure in any one component can produce misleading symptoms in another.
The most common root causes: DAG files that fail to import, a scheduler process that isn’t running, task state stuck in the database after a worker crash, and mismatches between database schema and Airflow version.
Fix 1: DAG Not Appearing in the UI
This is the most common Airflow issue. The DAG file is in the dags folder but the UI shows nothing.
Step 1: Check import errors directly:
# List all DAGs that loaded successfully
airflow dags list
# List all DAG files that failed to import
airflow dags list-import-errors
Any Python error in your DAG file (a syntax error, a missing import, a wrong variable name) prevents the entire file from loading. The UI shows an “Import Errors” notice at the top if any files failed.
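Syntax errors are the cheapest class of import failure to catch before a file ever reaches the scheduler. The following is a minimal sketch using only the standard library; the function name `find_syntax_errors` is illustrative, not part of Airflow. It catches syntax errors only, not missing modules or runtime errors raised during import.

```python
import ast
from pathlib import Path


def find_syntax_errors(dags_folder: str) -> dict[str, str]:
    """Return {file path: error message} for every .py file that fails to parse."""
    errors: dict[str, str] = {}
    for path in sorted(Path(dags_folder).rglob("*.py")):
        try:
            # ast.parse compiles the source to a syntax tree without executing it
            ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        except SyntaxError as exc:
            errors[str(path)] = f"line {exc.lineno}: {exc.msg}"
    return errors
```

Calling `find_syntax_errors("/opt/airflow/dags")` (substitute your own dags_folder) in a pre-commit hook or CI step returns an empty dict when every file parses.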
Step 2: Verify the DAG folder path:
airflow config get-value core dags_folder
# or
echo $AIRFLOW__CORE__DAGS_FOLDER
The DAG file must be inside this directory or one of its subdirectories. Airflow scans the folder recursively and skips anything matched by a .airflowignore file.
Step 3: Check dag_discovery_safe_mode. When enabled (the default), Airflow only parses files that contain both of the strings "dag" and "airflow" (matched case-insensitively) somewhere in the file. A utility module that doesn’t mention them is skipped, which is intentional, but a DAG file that happens to be missing either string won’t load:
airflow config get-value core dag_discovery_safe_mode
# If True, your .py file must contain both 'dag' and 'airflow' (any case) somewhere in the text
Step 4: Verify your DAG object is accessible at module level. The scheduler imports your file and looks for DAG instances. The DAG must be assigned to a variable at the module’s top level:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# WRONG: DAG defined inside a function that is never called at import time; scheduler can't find it
def create_dag():
    with DAG('my_dag', start_date=datetime(2024, 1, 1)) as dag:
        ...
    return dag
# CORRECT: DAG at module level
with DAG(
    dag_id='my_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
) as dag:
    task = PythonOperator(task_id='say_hello', python_callable=lambda: print("hello"))
Step 5: Wait for the scheduler to pick it up. The scheduler lists the DAG folder for new files every dag_dir_list_interval seconds (default: 300) and re-parses each known file every min_file_process_interval seconds (default: 30). A newly added file can therefore take a few minutes to appear. Force a refresh:
airflow dags reserialize
Fix 2: ImportError Prevents DAG From Loading
DagFileProcessorProcess: Failed to import: /opt/airflow/dags/my_dag.py
Traceback (most recent call last):
File "/opt/airflow/dags/my_dag.py", line 3, in <module>
from airflow.providers.postgres.operators.postgres import PostgresOperator
ModuleNotFoundError: No module named 'airflow.providers.postgres'
Any unhandled exception during import, including a missing Airflow provider, silently removes the DAG from the UI.
Install missing providers:
# Airflow 2.x uses provider packages for external integrations
# (BashOperator and PythonOperator ship with core Airflow 2.x and need no extra package)
pip install apache-airflow-providers-postgres # PostgresOperator
pip install apache-airflow-providers-amazon # S3, EMR, etc.
pip install apache-airflow-providers-google # GCP operators
pip install apache-airflow-providers-http # SimpleHttpOperator
# List installed providers
airflow providers list
Airflow 2.x import paths changed from 1.x. If you’re migrating from Airflow 1.x:
# OLD — Airflow 1.x
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.http_sensor import HttpSensor
# NEW: Airflow 2.x (core operators moved to new modules; HttpSensor now lives in the http provider package)
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.http.sensors.http import HttpSensor
Test your DAG file imports in isolation:
# Run the file directly — any import error appears immediately
python /opt/airflow/dags/my_dag.py
# Or load the folder through DagBag in Python to print every import error in full
python -c "
from airflow.models import DagBag
dagbag = DagBag(dag_folder='/opt/airflow/dags', include_examples=False)
for file_path, error in dagbag.import_errors.items():
    print(f'{file_path}: {error}')
"
Fix 3: Task Stuck in “Running” or “Queued”
A task shows “running” in the UI for hours with nothing actually executing — the worker process crashed, leaving orphaned state in the database.
Clear the task state to reschedule it:
# Mark a specific task instance as cleared (will re-run)
airflow tasks clear my_dag -t my_task_id --yes
# Clear all tasks in the DAG runs whose logical date falls in a date range
airflow tasks clear my_dag -s 2025-01-01 -e 2025-01-01 --yes
# Inspect the current state of a task instance (read-only; to force a task to failed, use "Mark Failed" in the UI)
airflow tasks state my_dag my_task_id 2025-01-01
airflow dags trigger my_dag # Then trigger a fresh run
For tasks stuck in “queued”, the executor is unable to pick up the task. Check the executor:
# Check which executor is configured
airflow config get-value core executor
# SequentialExecutor, LocalExecutor, CeleryExecutor, KubernetesExecutor
# For CeleryExecutor: check worker status
# (on Airflow 2.7+ the Celery app path is airflow.providers.celery.executors.celery_executor.app)
airflow celery worker --help
celery -A airflow.executors.celery_executor.app inspect active
celery -A airflow.executors.celery_executor.app inspect reserved
An unreachable Celery broker is a common cause of tasks stuck in “queued”. The broker (Redis or RabbitMQ) must be accessible:
# Test Redis broker connectivity (if AIRFLOW__CELERY__BROKER_URL uses redis://)
redis-cli -h your-redis-host ping
# Expected: PONG
For Redis connection errors, check that the broker host is reachable from the worker container and that the Redis password matches the URL. For Celery worker issues, see Celery task not executing.
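The same reachability test can be run from Python inside the worker container, which is where it matters. This is a minimal sketch using only the standard library; `broker_reachable` and the default-port table are illustrative assumptions, not part of Airflow or Celery. It verifies only that a TCP connection opens, not that the credentials are accepted.

```python
import socket
from urllib.parse import urlparse

# Assumed default ports for broker URLs that omit the port
DEFAULT_PORTS = {"redis": 6379, "rediss": 6379, "amqp": 5672, "amqps": 5671}


def broker_reachable(broker_url: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to the broker's host and port succeeds."""
    parsed = urlparse(broker_url)
    host = parsed.hostname
    port = parsed.port or DEFAULT_PORTS.get(parsed.scheme)
    if host is None or port is None:
        raise ValueError("cannot determine broker host and port from the URL")
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures
        return False


# Example: broker_reachable(os.environ["AIRFLOW__CELERY__BROKER_URL"])
```

A False result from inside the worker container while the same call succeeds on your laptop points at container networking or DNS rather than at Airflow.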
Zombie tasks (tasks whose worker process the scheduler determines has died) are detected automatically and marked as failed once they have gone scheduler_zombie_task_threshold seconds (default 300) without a heartbeat. If a task stays “running” well beyond that without resolving, check the logs:
# Scheduler log location (configurable)
cat $AIRFLOW_HOME/logs/scheduler/latest/*.log
# Or read the log file of a specific task instance (default layout on Airflow 2.3+)
cat "$AIRFLOW_HOME/logs/dag_id=my_dag/run_id=manual__2025-01-01T00:00:00+00:00/task_id=my_task_id/attempt=1.log"
Fix 4: Scheduler Not Running or Not Scheduling Tasks
The most common silent failure — tasks don’t run because no scheduler process is active, but the UI still loads (the webserver runs independently of the scheduler).
Check if the scheduler is running:
# Check for a live scheduler job (based on heartbeats in the metadata database)
airflow jobs check --job-type SchedulerJob
# Or check the process directly
ps aux | grep "airflow scheduler"
# Or via the health API
curl http://localhost:8080/health
# {"metadatabase": {"status": "healthy"}, "scheduler": {"status": "healthy", ...}}
Start the scheduler if it’s not running:
airflow scheduler
# Or in the background
airflow scheduler -D # Daemon mode
In Docker Compose, the scheduler service must start after the database is ready and initialized. A common issue is the scheduler starting before airflow db migrate completes. Gate the scheduler on the init job completing and on the database health check:
# docker-compose.yml
services:
  airflow-scheduler:
    image: apache/airflow:2.9.0
    command: scheduler
    depends_on:
      airflow-init:
        condition: service_completed_successfully
      postgres:
        condition: service_healthy
    environment:
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
For Docker Compose dependency ordering issues, see docker-compose depends_on not working.
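For a container health check or a monitoring script, the /health response shown earlier can be evaluated in Python. This is a minimal sketch, assuming the webserver listens on http://localhost:8080 and that only the metadatabase and scheduler components are required; both function names are illustrative.

```python
import json
from urllib.request import urlopen


def unhealthy_components(health: dict, required=("metadatabase", "scheduler")) -> list[str]:
    """Return the required components that are missing or not reporting 'healthy'."""
    return [
        name
        for name in required
        if (health.get(name) or {}).get("status") != "healthy"
    ]


def fetch_health(base_url: str = "http://localhost:8080") -> dict:
    """Fetch and decode the webserver's /health response."""
    with urlopen(f"{base_url}/health", timeout=5) as response:
        return json.load(response)


# Example: exit non-zero when the scheduler is down
# import sys; sys.exit(1 if unhealthy_components(fetch_health()) else 0)
```

Keeping the check limited to a list of required components avoids false alarms from optional components, such as a triggerer that is not deployed.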
DAGs are paused by default on first load. Check and unpause:
airflow dags list # check the paused column for your DAG
airflow dags unpause my_dag
Common Mistake: Omitting catchup=False when start_date is in the past. Without it, Airflow schedules a run for every missed interval since start_date, which can generate hundreds of unexpected task instances:
with DAG(
    dag_id='my_dag',
    start_date=datetime(2023, 1, 1), # Far in the past
    schedule='@daily',
    catchup=False, # Without this: ~730 runs queued immediately
) as dag:
    ...
Fix 5: XCom Data Too Large
ERROR - Error pushing to XCom
sqlalchemy.exc.DataError: (psycopg2.errors.StringDataRightTruncation)
value too long for type character varying
XCom (cross-communication) stores task output in the metadata database. It’s designed for small values: task IDs, file paths, record counts. It’s not designed for DataFrames, large JSON blobs, or binary data.
The database backend limits:
- SQLite: 2 GB
- MySQL: 64 KB (BLOB column)
- PostgreSQL: 1 GB (but performance degrades long before that)
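A size guard in front of xcom_push makes oversized payloads fail with a clear message instead of a database error. This is a minimal sketch, assuming values are JSON-serializable; the 48 KB ceiling and the name `check_xcom_size` are illustrative choices, and Airflow's own serialization may produce a slightly different size.

```python
import json

# Illustrative ceiling that stays below the smallest backend limit listed above
MAX_XCOM_BYTES = 48 * 1024


def check_xcom_size(value, limit: int = MAX_XCOM_BYTES) -> int:
    """Return the JSON-encoded size of value in bytes; raise if it exceeds limit."""
    size = len(json.dumps(value).encode("utf-8"))
    if size > limit:
        raise ValueError(
            f"XCom payload is {size} bytes, above the {limit}-byte limit; "
            "push a file path or object key instead"
        )
    return size
```

Calling `check_xcom_size(value)` immediately before pushing keeps the limit in one place, so it can be tightened or relaxed per deployment.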
Common mistake — pushing a DataFrame through XCom:
import pandas as pd
# WRONG: DataFrame pushed to the database causes size errors and performance issues
def process_data(**context):
    df = pd.read_csv('s3://bucket/large_file.csv')
    df = df[df['status'] == 'active']
    context['ti'].xcom_push(key='result', value=df.to_json()) # Can be megabytes
# CORRECT: push a path, not the data
def process_data(**context):
    df = pd.read_csv('s3://bucket/large_file.csv')
    filtered = df[df['status'] == 'active']
    output_path = 's3://bucket/processed/active_users.parquet'
    filtered.to_parquet(output_path)
    context['ti'].xcom_push(key='output_path', value=output_path) # Just a string
def next_task(**context):
    path = context['ti'].xcom_pull(key='output_path', task_ids='process_data')
    df = pd.read_parquet(path)
For production Airflow with large data, configure a custom XCom backend that stores data in S3 or GCS:
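# airflow.cfg or environment variables
# (object storage backend from apache-airflow-providers-common-io, Airflow 2.9+)
AIRFLOW__CORE__XCOM_BACKEND=airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH=s3://aws_default@your-xcom-bucket/xcom
# Values larger than this many bytes go to object storage; smaller ones stay in the database
AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD=1048576
Check what’s in XCom for a specific task: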
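# Airflow 2.x has no dedicated CLI subcommand for listing XComs; use Admin → XComs in the UI
# or the stable REST API (requires an API auth backend; the credential variables here are placeholders)
curl -u "$AIRFLOW_USER:$AIRFLOW_PASSWORD" "http://localhost:8080/api/v1/dags/my_dag/dagRuns/manual__2025-01-01T00:00:00/taskInstances/my_task/xcomEntries"
Fix 6: ConnectionNotFound (Missing conn_id)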
airflow.exceptions.AirflowNotFoundException: The conn_id `postgres_default` isn't defined
Airflow operators that interact with external systems require a connection registered in the metadata database. The conn_id in your operator code must match a connection stored in Airflow.
Register connections via the UI: Admin → Connections → “+” → fill in conn_id, type, host, login, password, port.
Register via CLI:
airflow connections add 'postgres_production' \
--conn-type 'postgres' \
--conn-host 'db.example.com' \
--conn-login 'airflow_user' \
--conn-password 'secret' \
--conn-port '5432' \
--conn-schema 'warehouse'
Register via environment variable (best for Docker/Kubernetes, where no UI is needed):
# Format: AIRFLOW_CONN_{CONN_ID_UPPERCASE} = URI
export AIRFLOW_CONN_POSTGRES_PRODUCTION="postgresql://airflow_user:secret@db.example.com:5432/warehouse"
export AIRFLOW_CONN_AWS_DEFAULT="aws://access_key:secret_key@?region_name=us-east-1"
Variables work the same way:
# Set via CLI
airflow variables set MY_ENV_KEY "production"
# Set via environment variable
export AIRFLOW_VAR_MY_ENV_KEY="production"
# Read in a DAG
from airflow.models import Variable
env = Variable.get("MY_ENV_KEY", default_var="staging")
Pro Tip: Prefer environment variables for connections and variables in containerized deployments. It avoids the “connection works on my laptop but not in prod” problem, because the connection definition is part of the deployment config rather than stored in a database that might not be migrated.
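Connection URIs break when a login or password contains characters such as `@` or `/`, so each component should be percent-encoded before it is placed in the environment variable. This is a minimal sketch using only the standard library; the helper name `connection_env_var` and the sample credentials are illustrative.

```python
from urllib.parse import quote


def connection_env_var(conn_id, conn_type, login, password, host, port, schema=""):
    """Build the (variable name, URI) pair for an AIRFLOW_CONN_* environment variable."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    # safe="" forces reserved characters such as '@' and '/' to be percent-encoded
    uri = (
        f"{conn_type}://{quote(login, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(schema, safe='')}"
    )
    return name, uri


name, uri = connection_env_var(
    "postgres_production", "postgres", "airflow_user", "p@ss/word",
    "db.example.com", 5432, "warehouse",
)
print(f'export {name}="{uri}"')
```

Generating the value once in a deployment script and injecting it as a secret avoids hand-encoding mistakes.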
Fix 7: Database Issues — Schema Mismatch and Migration Errors
airflow.exceptions.AirflowException: Database schema mismatch.
Please run `airflow db migrate` to upgrade your schema.
This fires when you upgrade Airflow without running the schema migration. Always run the migration command before starting any Airflow components after an upgrade:
# Airflow 2.7+ (preferred)
airflow db migrate
# Older Airflow 2.x
airflow db upgrade
# Check that the metadata database is reachable
airflow db check
# Verify that all migrations have been applied
airflow db check-migrations
Initialize a fresh database (first-time setup only):
airflow db init # Older Airflow
airflow db migrate # Airflow 2.7+
SQLite is for development only. The default connection (sqlite:///$AIRFLOW_HOME/airflow.db) doesn’t support concurrent writes. With multiple workers or the CeleryExecutor, SQLite causes database lock errors. Switch to PostgreSQL for production:
# airflow.cfg or environment variable
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://user:password@host:5432/airflow
# Install the postgres dependency
pip install "apache-airflow[postgres]"
# or
pip install apache-airflow-providers-postgres
For PostgreSQL connection errors when Airflow can’t reach the metadata database, confirm the pg_hba.conf rules allow the Airflow host and that the credentials in AIRFLOW__DATABASE__SQL_ALCHEMY_CONN match a real role.
Fix 8: Common Operator Mistakes
BashOperator: environment variables not available:
from airflow.operators.bash import BashOperator
# WRONG: assumes environment variables from the shell session
task = BashOperator(
    task_id='run_script',
    bash_command='python my_script.py', # MY_SECRET_KEY not available
)
# CORRECT: pass env explicitly
task = BashOperator(
    task_id='run_script',
    bash_command='python my_script.py',
    env={
        'MY_SECRET_KEY': '{{ var.value.my_secret }}', # Jinja template
        'PATH': '/usr/local/bin:/usr/bin:/bin', # Preserve PATH
    },
)
PythonOperator: can’t pickle the callable:
# WRONG: lambdas and closures can't always be pickled for CeleryExecutor
task = PythonOperator(
    task_id='process',
    python_callable=lambda: print("hello"), # Pickling error with Celery
)
# CORRECT: define a proper function at module level
def process_task(**context):
    print("hello")
    return "done"
task = PythonOperator(
    task_id='process',
    python_callable=process_task,
)
start_date must be static. A dynamic date changes on every parse, so the DAG is treated as different each time:
from datetime import datetime, timedelta
# WRONG: changes every time the file is parsed; causes schedule instability
default_args = {'start_date': datetime.now() - timedelta(days=1)}
# CORRECT: fixed past date
default_args = {'start_date': datetime(2024, 1, 1)}
@task decorator (TaskFlow API), the modern Airflow 2.x pattern for Python tasks:
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False)
def my_pipeline():
    @task
    def extract() -> dict:
        return {'value': 42}
    @task
    def transform(data: dict) -> str:
        return f"Processed: {data['value']}"
    @task
    def load(result: str):
        print(result)
    # XCom passing happens automatically via return values
    load(transform(extract()))
dag_instance = my_pipeline()
The TaskFlow API automatically handles XCom passing through return values, so simple Python pipelines need no xcom_push/xcom_pull calls.
Still Not Working?
DAG Runs on Wrong Schedule
Airflow uses UTC for all scheduling. If your start_date is datetime(2024, 1, 1) and you’re in UTC-5, the first run happens at UTC midnight (7pm local time). Use pendulum for timezone-aware dates:
import pendulum
with DAG(
    dag_id='my_dag',
    start_date=pendulum.datetime(2024, 1, 1, tz='America/New_York'),
    schedule='@daily',
    catchup=False,
) as dag:
    ...
Important: The first DAG run occurs at start_date + schedule_interval. A DAG with start_date=2024-01-01 and schedule='@daily' first runs at 2024-01-02 00:00 UTC. This surprises many developers.
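The arithmetic behind that rule is plain date math, and the same calculation explains the catch-up backlog described under Fix 4. This is a minimal sketch using only the standard library; the function names are illustrative and it models fixed-length intervals only, not cron expressions or custom timetables.

```python
from datetime import datetime, timedelta, timezone


def first_trigger_time(start_date: datetime, interval: timedelta) -> datetime:
    """A run is triggered when its data interval ends, one interval after it starts."""
    return start_date + interval


def missed_intervals(start_date: datetime, now: datetime, interval: timedelta) -> int:
    """Count the complete intervals between start_date and now (the catch-up backlog)."""
    return max((now - start_date) // interval, 0)


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(first_trigger_time(start, timedelta(days=1)).isoformat())
# 2024-01-02T00:00:00+00:00

backlog = missed_intervals(
    datetime(2023, 1, 1, tzinfo=timezone.utc),
    datetime(2025, 1, 1, tzinfo=timezone.utc),
    timedelta(days=1),
)
print(backlog)  # 731
```

The run triggered at 2024-01-02 carries a logical date of 2024-01-01, because the logical date marks the start of the interval being processed.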
Dynamic DAG Generation
Dynamically generated DAGs (e.g., one DAG per database table) must still produce stable dag_id values. Variable dag_id values cause the scheduler to create new DAG entries on every parse:
# Generate multiple DAGs from a config
# (assumes DAG, PythonOperator, and datetime are imported as in the earlier examples,
# and that sync_table is a function defined elsewhere in this file)
tables = ['orders', 'customers', 'products']
for table in tables:
    with DAG(
        dag_id=f'sync_{table}', # Stable ID per table
        start_date=datetime(2024, 1, 1),
        schedule='@hourly',
        catchup=False,
    ) as dag:
        sync_task = PythonOperator(
            task_id=f'sync_{table}_data',
            python_callable=sync_table,
            op_kwargs={'table': table},
        )
    globals()[f'dag_{table}'] = dag # Must register in globals()
Task Dependency and Trigger Rules
By default, a task only runs if all upstream tasks succeeded. Use trigger_rule for different behavior:
from airflow.utils.trigger_rule import TriggerRule
cleanup = PythonOperator(
    task_id='cleanup',
    python_callable=cleanup_fn,
    trigger_rule=TriggerRule.ALL_DONE, # Runs even if upstream failed
)
Other useful rules: ONE_SUCCESS, ONE_FAILED, NONE_FAILED, NONE_SKIPPED.
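To make the differences between these rules concrete, the sketch below evaluates each one against a list of final upstream states. It is a simplified model written for illustration, not Airflow's implementation: the function `should_run` is hypothetical, and the real scheduler can fire ONE_SUCCESS and ONE_FAILED before all upstream tasks have finished.

```python
FAILED = {"failed", "upstream_failed"}
FINISHED = {"success", "skipped"} | FAILED


def should_run(rule: str, upstream_states: list[str]) -> bool:
    """Decide whether a task would run, given the states of its upstream tasks."""
    states = set(upstream_states)
    if rule == "all_success":
        return states <= {"success"}
    if rule == "all_done":
        return states <= FINISHED
    if rule == "one_success":
        return "success" in states
    if rule == "one_failed":
        return bool(states & FAILED)
    if rule == "none_failed":
        return states <= FINISHED and not (states & FAILED)
    if rule == "none_skipped":
        return states <= FINISHED and "skipped" not in states
    raise ValueError(f"unknown trigger rule: {rule}")


print(should_run("all_success", ["success", "failed"]))  # False
print(should_run("all_done", ["success", "failed"]))     # True
```

The rule strings match the values of the TriggerRule constants, so `TriggerRule.ALL_DONE` corresponds to `"all_done"` here.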
Checking Logs and Diagnosing Silently Failed Tasks
Tasks can fail silently (no exception, just wrong output) or with errors buried in logs. Navigate directly to task logs in the UI: click the task square in Graph View → “Log” tab. Or read the log file from the shell (default layout on Airflow 2.3+):
cat "$AIRFLOW_HOME/logs/dag_id=my_dag/run_id=manual__2025-01-01T00:00:00+00:00/task_id=my_task_id/attempt=1.log"
For Docker Compose deployments where the scheduler, webserver, and workers need proper service dependencies and health checks, see docker-compose depends_on not working.
DAG Parsing Taking Tens of Seconds Per File
The scheduler logs DagFileProcessor took X seconds to parse warnings when a single DAG file imports heavy libraries at module level. Anything you import at the top of a DAG runs on every parse loop. Move pandas, boto3, ORM model classes, or HTTP clients into the function body or @task body. If you must import something heavy at module level, gate it behind if TYPE_CHECKING: or split it into a non-DAG helper module. Raise min_file_process_interval to 60 or 120 seconds if you have hundreds of DAG files.
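A quick way to find the offending files is to scan each DAG file for heavy imports at module level. This is a minimal sketch using only the standard library; the module list and the function name `top_level_heavy_imports` are illustrative assumptions to adapt to your own codebase.

```python
import ast

# Illustrative list of libraries that are slow to import
HEAVY_MODULES = frozenset({"pandas", "boto3", "sqlalchemy", "requests"})


def top_level_heavy_imports(source: str, heavy=HEAVY_MODULES) -> list[str]:
    """Return the heavy modules imported at module level, in source order."""
    found = []
    # Only the module body is inspected, so imports inside functions are ignored
    for node in ast.parse(source).body:
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            names = [node.module]
        else:
            continue
        for name in names:
            root = name.split(".")[0]
            if root in heavy:
                found.append(root)
    return found


sample = """
import pandas as pd
from boto3.session import Session
import os

def task():
    import requests
"""
print(top_level_heavy_imports(sample))  # ['pandas', 'boto3']
```

The import of requests inside `task()` is not reported, which is the pattern the paragraph above recommends.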
Task Heartbeat Lost in Long-Running Tasks
A task that goes more than scheduler_zombie_task_threshold seconds (default 300) without a heartbeat is marked as a zombie and killed. This usually happens when a synchronous third-party call blocks the worker process. Convert the operator to a deferrable operator (Airflow 2.2+) so the wait happens on the triggerer, not the worker: the task defers itself, gives up its worker slot, and resumes in a designated method once the trigger fires. Built-in deferrable versions exist for TimeSensor, S3KeySensor, BigQueryInsertJobOperator, and most provider sensors, either as *Async variants or through a deferrable=True argument.
Webserver Returns 502 Behind a Load Balancer
ALB or nginx in front of the Airflow webserver returns 502 when the gunicorn worker takes longer than the LB idle timeout (60s on AWS ALB by default). Increase AIRFLOW__WEBSERVER__WEB_SERVER_WORKER_TIMEOUT to match, and raise the LB idle timeout to 120s. If the UI loads but the Graph View hangs, the slow path is the /get_task_logs endpoint pulling from S3 — switch the remote log backend to use the IAM role of the webserver pod, not a deprecated aws_default connection.
Platform-Specific Differences
Airflow behaves very differently depending on where and how you run it. The failure modes you hit on a local Docker Compose stack are not the same as the ones you hit on MWAA, Composer, or Astronomer, and the symptoms above can mean completely different things per platform.
Airflow 2.x vs Airflow 3 Beta
Airflow 3 introduces a separate API server, a redesigned task SDK, and asset-based scheduling. The schedule_interval argument is removed in favor of schedule. DAGs written against 3.0 will not load on 2.x without import shims, and providers must be the 3.x-compatible release. Pin your provider versions explicitly in requirements.txt — letting pip install apache-airflow-providers-amazon resolve freely is the fastest way to break a 2.x worker. On 2.9+, you can stay on 2.x and still get most of the deferrable-operator benefits without the 3.x migration cost.
Docker Compose vs Celery Executor vs Kubernetes Executor
docker-compose with LocalExecutor is fine for development but executes all tasks inside the scheduler container. CPU-heavy tasks will starve the scheduler loop, which then misses heartbeats and marks unrelated tasks as zombies. Move to CeleryExecutor with a separate worker service for any real workload.
CeleryExecutor requires a broker (Redis or RabbitMQ) and a result backend. The most common production failure is the broker dropping idle connections — set broker_pool_limit low and enable broker_connection_retry_on_startup = True in airflow.cfg. Worker startup loops usually trace back to the broker URL being unreachable from inside the worker container, not from your laptop.
KubernetesExecutor runs each task as a pod. Tasks stuck in “queued” usually mean the pod cannot be scheduled — check kubectl get events -n airflow for FailedScheduling. Pods that never report back are usually killed by node memory pressure; see Kubernetes OOMKilled for diagnosing and raising the pod-level memory request in your pod_template_file.
MWAA, Cloud Composer, and Astronomer
Managed services hide the scheduler and webserver, which changes what you can debug. On MWAA, you cannot airflow scheduler or airflow db migrate — schema upgrades happen on environment upgrade, and provider packages must be declared in the requirements.txt uploaded to S3, not installed via pip at runtime. On Cloud Composer, the gcloud composer environments storage dags import command is the only supported DAG sync; copying directly into the GCS bucket sometimes lags by a minute because of bucket eventing. On Astronomer, astro dev start runs Postgres + scheduler + webserver in a single docker-compose, and astro deploy builds an image and pushes it — provider changes require an image rebuild, not a UI restart. None of these platforms expose the metadata DB directly, so XCom backend overrides must use S3 or GCS, not a custom SQL table.
dbt-Airflow Integration
The most common pairing is dbt-airflow or the official cosmos library. Both parse manifest.json from dbt compile and generate one Airflow task per dbt model. If your DAG suddenly shows hundreds of new tasks or none at all, the manifest path is wrong or the manifest is stale. Always run dbt parse (cheap) in a setup task before the Cosmos DbtTaskGroup, and store manifest.json in object storage so workers on different nodes see the same file. Running dbt run from a BashOperator instead of Cosmos works but loses per-model retries and lineage in the Airflow UI — switch to Cosmos as soon as you have more than ~20 models. For broader dbt failures, see dbt not working.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.