Fix: Prefect Not Working — Flow Deployment, Worker Errors, and 2.x to 3.x Migration
Quick Answer
How to fix Prefect errors — flow deployment not running, worker not picking up runs, PrefectHTTPStatusError cannot connect to API, task retries not working, state transitions stuck in Pending, and flow_run_name template not resolving.
The Error
You deploy a flow and it stays in the “Pending” state forever:
$ prefect flow-run ls
My flow - Pending (created 5 minutes ago)
Or the Prefect client can’t connect to the API:
PrefectHTTPStatusError:
Failed to connect to server at http://127.0.0.1:4200/api
httpx.ConnectError: [Errno 111] Connection refused
Or you start a worker and it doesn’t pick up any runs:
$ prefect worker start --pool default
Worker started
# But queued flow runs never execute
Or you migrate from Prefect 2 to 3 and imports break:
ImportError: cannot import name 'Deployment' from 'prefect.deployments'
AttributeError: module 'prefect' has no attribute 'Agent'
Or task retries silently don’t happen despite being configured:
@task(retries=3)
def flaky_task():
    raise Exception("fail")
# First failure → flow aborts, never retries
Prefect is the dynamic, code-first orchestrator — unlike Airflow’s static DAGs or Dagster’s asset-centric model, Prefect treats flows as regular Python functions that can do anything. This flexibility creates its own failure modes around deployment configuration, worker pools, and state management. This guide covers each.
Why This Happens
Prefect separates the flow code (what runs) from the deployment (how it runs). Deployments live on the Prefect server/cloud, specifying which work pool handles them. Work pools are managed by workers (in Prefect 3) or agents (in Prefect 2) that poll for work. If any of these three pieces — flow, deployment, worker — is misconfigured or not running, flow runs stay pending forever.
Prefect 3 (released August 2024) introduced breaking changes: Agent and the Deployment.build_from_flow pattern are gone, replaced by .deploy() and Workers. Tutorials written for Prefect 2 break on Prefect 3 immediately.
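As a mental model, the flow/deployment/worker triad behaves like this toy sketch. This is plain Python, purely illustrative, and not Prefect code — the function names and return strings are invented for the example:

```python
# Toy model of the triad described above: a run only progresses
# when all three pieces exist and are live at the same time.
def expected_state(server_up, deployment_exists, worker_polling):
    if not server_up:
        return "client error: cannot reach API"
    if not deployment_exists:
        return "no run is ever scheduled"
    if not worker_polling:
        return "run stays Pending forever"
    return "run executes"

# The classic failure mode: everything deployed, but no worker started.
print(expected_state(True, True, False))
```

Each of the fixes below targets one leg of this triad.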
Fix 1: API Connection — Server Running and Configured
PrefectHTTPStatusError: Failed to connect to server at http://127.0.0.1:4200/api
Prefect needs an API backend (local server, cloud, or self-hosted server).
Start a local Prefect server:
# Start in foreground
prefect server start
# Opens at http://127.0.0.1:4200
# API at http://127.0.0.1:4200/api
Set the API URL so the client knows where to connect:
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
Verify:
prefect config view
# PREFECT_API_URL: http://127.0.0.1:4200/api
prefect version
# Server version: 3.1.0
Use Prefect Cloud instead of local server:
prefect cloud login # Opens browser for auth
# Or
prefect cloud login --key your-api-key --workspace your-workspace
prefect config view
# PREFECT_API_URL: https://api.prefect.cloud/api/accounts/.../workspaces/...
Switch profiles to toggle between environments:
prefect profile create dev
prefect profile use dev
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
prefect profile create prod
prefect profile use prod
prefect config set PREFECT_API_URL=https://api.prefect.cloud/api/...
Common Mistake: Starting prefect server start in one terminal, opening the UI, and running flows from another terminal without setting PREFECT_API_URL. The client defaults to an “ephemeral” SQLite DB — your flow runs show up in a separate storage that the UI doesn’t see. Always set the API URL explicitly.
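A small sanity check can catch this pitfall before you chase phantom runs. The helper below is hypothetical (not part of Prefect) and simply classifies whatever `PREFECT_API_URL` the environment carries:

```python
import os

# Hypothetical helper illustrating the ephemeral-API pitfall: if PREFECT_API_URL
# is unset, the client does not talk to the server whose UI you have open.
def diagnose_api_url(api_url):
    if not api_url:
        return "unset: client uses its own ephemeral backend, invisible to the UI"
    if not api_url.rstrip("/").endswith("/api"):
        return "suspicious: the URL should normally end in /api"
    return "configured: " + api_url

print(diagnose_api_url(os.environ.get("PREFECT_API_URL")))
```

Run it in the same shell you launch flows from; the profile commands above are the fix once you know which case you are in.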
Fix 2: Defining and Running Flows
from prefect import flow, task

@task
def extract():
    return [1, 2, 3, 4, 5]

@task
def transform(data):
    return [x * 2 for x in data]

@task
def load(data):
    print(f"Loaded: {data}")

@flow
def my_etl():
    data = extract()
    transformed = transform(data)
    load(transformed)

if __name__ == "__main__":
    my_etl()  # Runs locally, logs to the configured Prefect API
Flow parameters with validation:
from prefect import flow
from pydantic import BaseModel

class Config(BaseModel):
    source_table: str
    date: str

@flow
def process(config: Config):
    print(f"Processing {config.source_table} for {config.date}")

# Pydantic validates at call time
process(config=Config(source_table="orders", date="2025-04-09"))
Task-level retries, timeouts, and caching:
from prefect import task
from datetime import timedelta

@task(
    retries=3,
    retry_delay_seconds=10,
    timeout_seconds=300,
    cache_key_fn=lambda ctx, params: f"extract-{params['date']}",
    cache_expiration=timedelta(hours=1),
)
def extract(date: str):
    return fetch_data(date)  # fetch_data: your own data-access function
Subflows — flows calling other flows:
@flow
def subflow(x):
    return x * 2

@flow
def main_flow():
    result = subflow(5)  # Runs as a child flow with its own state
    print(result)
Fix 3: Deployments in Prefect 3
In Prefect 3, use .deploy() directly on the flow:
from prefect import flow

@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello, {name}")

if __name__ == "__main__":
    my_flow.deploy(
        name="hello-deployment",
        work_pool_name="default-agent-pool",
        schedule={"cron": "0 9 * * *", "timezone": "America/New_York"},
    )
From the CLI (prefect.yaml-based):
# Initialize a prefect.yaml file
prefect init
# Deploy
prefect deploy ./my_flow.py:my_flow --name hello --pool default-agent-pool
prefect.yaml configuration:
# prefect.yaml
name: my-project
prefect-version: 3.1.0
deployments:
  - name: hello-deployment
    entrypoint: my_flow.py:my_flow
    work_pool:
      name: default-agent-pool
    schedule:
      cron: "0 9 * * *"
      timezone: America/New_York
    parameters:
      name: "daily"
Prefect 2 migration — old code looks like this:
# OLD (Prefect 2)
from prefect.deployments import Deployment

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="hello-deployment",
    work_queue_name="default",
)
deployment.apply()

# NEW (Prefect 3)
my_flow.deploy(
    name="hello-deployment",
    work_pool_name="default-agent-pool",
)
Key API changes (2.x → 3.x):
| Prefect 2 | Prefect 3 |
|---|---|
| Deployment.build_from_flow() | flow.deploy() or prefect deploy |
| Agent | Worker |
| work_queue_name | work_pool_name |
| prefect agent start | prefect worker start |
| prefect.deployments.Deployment | (removed) |
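If you must straddle both major versions for a transition period, you can gate on the installed version rather than guessing which API exists. This is a sketch under that assumption — pin a single major version as soon as you can:

```python
# Sketch: map an installed version string (e.g. from
# importlib.metadata.version("prefect")) to the deployment API it supports.
def deploy_api_for(version):
    major = int(version.split(".")[0])
    return "flow.deploy()" if major >= 3 else "Deployment.build_from_flow()"

print(deploy_api_for("3.1.0"))
print(deploy_api_for("2.19.0"))
```

The same gate works for the Agent-vs-Worker split: any `prefect agent` invocation in scripts or CI must become `prefect worker start` on 3.x.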
Fix 4: Workers and Work Pools
A deployment needs a running worker to pick up and execute flow runs.
Create a work pool:
prefect work-pool create default-agent-pool --type process
Work pool types:
| Type | Runs flows on |
|---|---|
| process | Local processes |
| docker | Docker containers |
| kubernetes | Kubernetes pods |
| ecs | AWS ECS |
| cloud-run | GCP Cloud Run |
| aci | Azure Container Instances |
Start a worker for the pool:
prefect worker start --pool default-agent-pool
The worker polls for scheduled flow runs and executes them. Without a running worker, deployments stay pending.
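Conceptually, the worker is just a polling loop. The sketch below is purely illustrative and invented for this article — real workers use the Prefect client, send heartbeats, and respect per-pool concurrency slots:

```python
# Purely illustrative: what `prefect worker start` does conceptually --
# poll the pool for due runs, execute each, repeat.
def poll_and_execute(scheduled_runs):
    executed = []
    while scheduled_runs:
        run = scheduled_runs.pop(0)           # claim the oldest scheduled run
        executed.append(f"executed {run}")    # a real worker spawns a subprocess here
    return executed

print(poll_and_execute(["run-1", "run-2"]))
```

The key takeaway: if nothing is running this loop against your pool, scheduled runs accumulate and never leave Pending.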
Verify worker is active:
prefect work-pool inspect default-agent-pool
# Shows worker heartbeat, last polled
Pro Tip: For development, always check three things when a flow stays in “Pending”: (1) is the server running? (2) is there a work pool? (3) is a worker polling that pool? All three must be running simultaneously. In production, workers typically run as long-lived processes under systemd, Docker, or Kubernetes.
Run workers in background with systemd:
# /etc/systemd/system/prefect-worker.service
[Unit]
Description=Prefect Worker
After=network.target
[Service]
User=prefect
WorkingDirectory=/opt/prefect
Environment="PREFECT_API_URL=https://api.prefect.cloud/api/..."
Environment="PREFECT_API_KEY=pnu_xxx"
ExecStart=/opt/prefect/venv/bin/prefect worker start --pool default-pool
Restart=always
[Install]
WantedBy=multi-user.target
sudo systemctl enable prefect-worker
sudo systemctl start prefect-worker
Fix 5: Task Retries and Error Handling
from prefect import task

@task(
    retries=3,
    retry_delay_seconds=[10, 30, 60],  # Increasing delay before each retry
    retry_jitter_factor=0.5,
)
def flaky_api_call():
    import requests
    response = requests.get("https://api.example.com/data", timeout=5)
    response.raise_for_status()
    return response.json()
Retry on specific exceptions only:
from prefect.tasks import exponential_backoff

@task(
    retries=5,
    retry_delay_seconds=exponential_backoff(backoff_factor=2),
    retry_condition_fn=lambda task, task_run, state: isinstance(
        state.result(raise_on_failure=False), ConnectionError
    ),
)
def network_task():
    ...
Common Mistake: Setting retries=3 and assuming all exceptions will trigger retry. Prefect only retries on exceptions — not on TimeoutError raised by a task’s own timeout_seconds. And retries don’t fire if the flow itself is cancelled. Test retry behavior explicitly rather than assuming.
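To build intuition for what an exponential schedule costs you in wall-clock time, here is the arithmetic as plain Python. This is a sketch assuming the delay doubles per attempt (Prefect's `exponential_backoff` can also add jitter), not the library's internal code:

```python
# Numeric sketch of an exponential retry schedule: base delay doubles
# on each successive attempt.
def exponential_delays(base, retries):
    return [base * (2 ** attempt) for attempt in range(retries)]

delays = exponential_delays(2, 5)
print(delays)             # per-attempt delays in seconds
print(sum(delays))        # total time spent waiting if every attempt fails
```

Five retries at `backoff_factor=2` under this scheme wait about a minute in total, which matters when you also set a flow-level timeout.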
Task failure handling without aborting the flow:
from prefect import flow, task

@task
def sub_task(x):
    if x == 3:
        raise ValueError("bad value")
    return x * 2

@flow
def parent_flow():
    results = []
    for i in range(5):
        state = sub_task(i, return_state=True)  # Get the final State instead of raising
        if state.is_completed():
            results.append(state.result())
        else:
            print(f"Task {i} failed, continuing")
    return results
Fix 6: Scheduling and Parameterized Runs
Cron schedule:
from prefect import flow
from prefect.schedules import CronSchedule

@flow
def daily_job():
    ...

daily_job.deploy(
    name="daily",
    work_pool_name="default-agent-pool",
    schedule=CronSchedule(cron="0 9 * * *", timezone="America/New_York"),
)
Interval schedule:
from prefect.schedules import IntervalSchedule
from datetime import timedelta

my_flow.deploy(
    name="every-5-min",
    work_pool_name="default-agent-pool",
    schedule=IntervalSchedule(interval=timedelta(minutes=5)),
)
Multiple schedules on one deployment:
my_flow.deploy(
    name="multi-schedule",
    work_pool_name="default-agent-pool",
    schedules=[
        {"cron": "0 9 * * MON-FRI"},   # Weekdays at 9 AM
        {"cron": "0 12 * * SAT,SUN"},  # Weekends at noon
    ],
)
Parameterize runs:
@flow
def process_date(date: str):
    print(f"Processing {date}")

# Deploy with default parameters
process_date.deploy(
    name="process-date",
    work_pool_name="default-agent-pool",
    parameters={"date": "2025-04-09"},
)

# Trigger ad-hoc with different parameters
# prefect deployment run process-date/process-date --param date=2025-04-10
Dynamic flow run names:
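The `{date}` placeholder resolves from the flow-call parameters, much like `str.format`. The sketch below is a plain-Python analogy (Prefect's own resolution also supports other tokens); it shows why a placeholder that matches no parameter name leaves you with an unresolved or failing run name:

```python
# Plain-Python analogy for flow_run_name template resolution.
def resolve_run_name(template, parameters):
    return template.format(**parameters)

print(resolve_run_name("process-{date}", {"date": "2025-04-09"}))

# A placeholder that matches no parameter name fails to resolve:
try:
    resolve_run_name("process-{dt}", {"date": "2025-04-09"})
except KeyError:
    print("placeholder must exactly match a flow parameter name")
```

So if your run names come out wrong, first check that every `{...}` token in the template is spelled exactly like a flow parameter.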
@flow(flow_run_name="process-{date}")  # Template with parameter values
def process(date: str):
    ...

# Run name becomes "process-2025-04-09" etc.
Fix 7: Variables, Secrets, and Blocks
Variables (non-secret config):
prefect variable set my_config_key "some value"
from prefect.variables import Variable

@flow
def my_flow():
    config = Variable.get("my_config_key")
    ...
Secrets (stored encrypted in the Prefect backend):
from prefect.blocks.system import Secret

# Save once
Secret(value="sk-...").save(name="openai-api-key")

# Load in flow
@flow
def my_flow():
    api_key = Secret.load("openai-api-key").get()
    ...
Custom blocks for reusable configuration:
from prefect.blocks.core import Block
from pydantic import SecretStr

class DatabaseConfig(Block):
    host: str
    port: int = 5432
    username: str
    password: SecretStr

# Save
DatabaseConfig(
    host="db.example.com",
    username="prefect",
    password="secret",
).save(name="prod-db")

# Load
db = DatabaseConfig.load("prod-db")
Environment variables for runtime config:
export PREFECT_API_URL=...
export PREFECT_API_KEY=...
export OPENAI_API_KEY=sk-...
Fix 8: Concurrency and Resource Management
Global concurrency limits:
# Limit any "database" task to 5 concurrent runs system-wide
prefect concurrency-limit create database 5

@task(tags=["database"])
def query(sql):
    ...
Work pool concurrency:
prefect work-pool set-concurrency-limit default-agent-pool 10
Task concurrency within a flow:
from prefect import flow
from prefect.futures import wait

@flow
def parallel_flow(urls):
    futures = [fetch_url.submit(url) for url in urls]  # fetch_url: a @task defined elsewhere
    wait(futures)  # Wait for all to complete
    return [f.result() for f in futures]
Serial vs parallel task execution:
@flow
def sequential():
    a = task_a()
    b = task_b(a)  # Waits for a
    c = task_c(b)  # Waits for b
    return c

@flow
def parallel():
    a = task_a.submit()
    b = task_b.submit()
    c = task_c.submit()
    return [a.result(), b.result(), c.result()]  # All three in parallel
Still Not Working?
Prefect vs Airflow vs Dagster
- Prefect — Dynamic, Python-first API; best for workflows that don’t fit static DAGs (this guide).
- Airflow — Operator-based, largest ecosystem, static DAGs. See Airflow not working.
- Dagster — Asset-centric with strong dev UX. See Dagster not working.
Debugging Flow Runs
from prefect import flow, get_run_logger

@flow
def my_flow():
    logger = get_run_logger()
    logger.info("Starting flow")
    logger.error("Something bad")  # Appears in UI and server logs
log_prints=True automatically logs all print() calls:
@flow(log_prints=True)
def my_flow():
    print("This appears in the Prefect UI")  # Captured as INFO log
Integration with dbt
from prefect_dbt.cli.commands import DbtCoreOperation

@flow
def run_dbt():
    DbtCoreOperation(
        commands=["dbt run --select raw_orders"],
        project_dir="/path/to/dbt/project",
    ).run()
For dbt-specific errors, see dbt not working.
Storage for Flow Code
In production, workers pull flow code from remote storage (GitHub, S3, GCS) rather than shipping with the worker:
# Note: the Prefect 2 GitHub storage block (prefect.filesystems.GitHub) was
# removed in Prefect 3; reference the repository directly with from_source():
from prefect import flow

flow.from_source(
    source="https://github.com/org/prefect-flows",
    entrypoint="flows/my_flow.py:my_flow",
).deploy(
    name="github-deployment",
    work_pool_name="default-agent-pool",
)
Artifacts and Run Metadata
Prefect lets you attach structured artifacts to flow runs — visible in the UI:
from prefect import flow
from prefect.artifacts import (
    create_link_artifact,
    create_markdown_artifact,
    create_table_artifact,
)

@flow
def my_flow():
    # Markdown summary
    create_markdown_artifact(
        key="run-summary",
        markdown="## Results\n- Rows processed: 1,234\n- Errors: 0",
    )
    # Table
    create_table_artifact(
        key="top-10-users",
        table=[
            {"user": "alice", "score": 95},
            {"user": "bob", "score": 87},
        ],
    )
    # External link (reports, dashboards)
    create_link_artifact(
        key="dashboard",
        link="https://grafana.example.com/d/...",
        description="Live metrics dashboard",
    )
Event-Driven Triggers
Prefect 3 supports automation rules — trigger flows based on custom conditions:
from prefect.automations import Automation
from prefect.events.schemas.automations import EventTrigger

Automation(
    name="retry-on-failure",
    trigger=EventTrigger(
        expect={"prefect.flow-run.Failed"},
        match={"prefect.resource.id": "prefect.flow-run.*"},
    ),
    actions=[{"type": "run-deployment", "deployment_id": "..."}],
).create()
This replaces the Prefect 2 notifications system with a more general event model.
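The `expect`/`match` pair boils down to set membership plus glob-style matching on resource ids. Here is a toy version of that logic — the function name is invented for the example, and real event matching in Prefect has more machinery (thresholds, time windows, related resources):

```python
import fnmatch

# Toy version of the matching above: fire when the event name is expected
# and the resource id matches the glob-style pattern.
def automation_fires(event_name, resource_id, expect, pattern):
    return event_name in expect and fnmatch.fnmatch(resource_id, pattern)

print(automation_fires(
    "prefect.flow-run.Failed",
    "prefect.flow-run.abc123",
    {"prefect.flow-run.Failed"},
    "prefect.flow-run.*",
))
```

This mental model helps when an automation never fires: either the event name in `expect` is misspelled, or the `match` pattern excludes the resource emitting the event.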
Testing Flows
from prefect.testing.utilities import prefect_test_harness

def test_flow():
    with prefect_test_harness():
        result = my_flow(name="test")
        assert result == "expected"
For general pytest fixture patterns that affect Prefect testing, see pytest fixture not found.
Cancelling and Pausing Flows
# From the CLI
prefect flow-run cancel <run-id>

# Pause a deployment's schedule without deleting it
prefect deployment pause my-flow/my-deployment
prefect deployment resume my-flow/my-deployment
In the UI, every flow run has a “Cancel” button. Workers check for cancellation signals between tasks — long-running individual tasks may complete before cancellation takes effect.
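The "between tasks" caveat is easy to model. This sketch is invented for illustration (Prefect's real cancellation goes through states and infrastructure signals), but it captures why an in-flight task finishes after you hit Cancel:

```python
# Sketch: the runner only observes the cancel signal at task boundaries,
# so a task already running completes before cancellation takes effect.
def run_until_cancelled(tasks, cancel_before_index):
    completed = []
    for i, task_fn in enumerate(tasks):
        if i >= cancel_before_index:   # cancel signal seen between tasks
            break
        completed.append(task_fn())    # an in-flight task runs to completion
    return completed

print(run_until_cancelled([lambda: "a", lambda: "b", lambda: "c"], 2))
```

If a single task runs for hours, consider breaking it into smaller tasks so cancellation has boundaries to act on.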
Prefect Cloud Free Tier Limits
The free Prefect Cloud tier has limits: flow run counts per month, concurrent workers, automation rules. If you’re building a production system, check the current pricing page before relying on the free tier — some production workloads exceed free limits within days.
For self-hosted alternatives, run prefect server start on your own infrastructure with a PostgreSQL backend:
export PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://user:pass@host/prefect"
prefect server start