Skip to content

Fix: Prefect Not Working — Flow Deployment, Worker Errors, and 2.x to 3.x Migration

FixDevs ·

Quick Answer

How to fix Prefect errors — flow deployment not running, worker not picking up runs, PrefectHTTPStatusError cannot connect to API, task retries not working, state transitions stuck in Pending, and flow_run_name template not resolving.

The Error

You deploy a flow and it stays in the “Pending” state forever:

$ prefect flow-run ls
My flow - Pending (created 5 minutes ago)

Or the Prefect client can’t connect to the API:

PrefectHTTPStatusError:
Failed to connect to server at http://127.0.0.1:4200/api
httpx.ConnectError: [Errno 111] Connection refused

Or you start a worker and it doesn’t pick up any runs:

$ prefect worker start --pool default
Worker started
# But queued flow runs never execute

Or you migrate from Prefect 2 to 3 and imports break:

ImportError: cannot import name 'Deployment' from 'prefect.deployments'
AttributeError: module 'prefect' has no attribute 'Agent'

Or task retries silently don’t happen despite being configured:

@task(retries=3)
def flaky_task():
    raise Exception("fail")
# First failure → flow aborts, never retries

Prefect is the dynamic, code-first orchestrator — unlike Airflow’s static DAGs or Dagster’s asset-centric model, Prefect treats flows as regular Python functions that can do anything. This flexibility creates its own failure modes around deployment configuration, worker pools, and state management. This guide covers each.

Why This Happens

Prefect separates the flow code (what runs) from the deployment (how it runs). Deployments live on the Prefect server/cloud, specifying which work pool handles them. Work pools are managed by workers (in Prefect 3) or agents (in Prefect 2) that poll for work. If any of these three pieces — flow, deployment, worker — is misconfigured or not running, flow runs stay pending forever.

Prefect 3 (released August 2024) introduced breaking changes: Agent and the Deployment.build_from_flow pattern are gone, replaced by .deploy() and Workers. Tutorials written for Prefect 2 break on Prefect 3 immediately.

Fix 1: API Connection — Server Running and Configured

PrefectHTTPStatusError: Failed to connect to server at http://127.0.0.1:4200/api

Prefect needs an API backend (local server, cloud, or self-hosted server).

Start a local Prefect server:

# Start in foreground
prefect server start

# Opens at http://127.0.0.1:4200
# API at http://127.0.0.1:4200/api

Set the API URL so the client knows where to connect:

prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api

Verify:

prefect config view
# PREFECT_API_URL: http://127.0.0.1:4200/api

prefect version
# Server version: 3.1.0

Use Prefect Cloud instead of local server:

prefect cloud login   # Opens browser for auth
# Or
prefect cloud login --key your-api-key --workspace your-workspace
prefect config view
# PREFECT_API_URL: https://api.prefect.cloud/api/accounts/.../workspaces/...

Switch profiles to toggle between environments:

prefect profile create dev
prefect profile use dev
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api

prefect profile create prod
prefect profile use prod
prefect config set PREFECT_API_URL=https://api.prefect.cloud/api/...

Common Mistake: Starting prefect server start in one terminal, opening the UI, and running flows from another terminal without setting PREFECT_API_URL. The client defaults to an “ephemeral” SQLite DB — your flow runs show up in a separate storage that the UI doesn’t see. Always set the API URL explicitly.

Fix 2: Defining and Running Flows

from prefect import flow, task

@task
def extract():
    return [1, 2, 3, 4, 5]

@task
def transform(data):
    return [x * 2 for x in data]

@task
def load(data):
    print(f"Loaded: {data}")

@flow
def my_etl():
    data = extract()
    transformed = transform(data)
    load(transformed)

if __name__ == "__main__":
    my_etl()   # Runs locally, logs to the configured Prefect API

Flow parameters with validation:

from prefect import flow
from pydantic import BaseModel

class Config(BaseModel):
    source_table: str
    date: str

@flow
def process(config: Config):
    print(f"Processing {config.source_table} for {config.date}")

# Pydantic validates at call time
process(config=Config(source_table="orders", date="2025-04-09"))

Task-level concurrency and timeouts:

from prefect import task
from datetime import timedelta

@task(
    retries=3,
    retry_delay_seconds=10,
    timeout_seconds=300,
    cache_key_fn=lambda ctx, params: f"extract-{params['date']}",
    cache_expiration=timedelta(hours=1),
)
def extract(date: str):
    return fetch_data(date)

Subflows — flows calling other flows:

@flow
def subflow(x):
    return x * 2

@flow
def main_flow():
    result = subflow(5)   # Runs as a child flow with its own state
    print(result)

Fix 3: Deployments in Prefect 3

In Prefect 3, use .deploy() directly on the flow:

from prefect import flow

@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello, {name}")

if __name__ == "__main__":
    my_flow.deploy(
        name="hello-deployment",
        work_pool_name="default-agent-pool",
        schedule={"cron": "0 9 * * *", "timezone": "America/New_York"},
    )

From the CLI (prefect.yaml-based):

# Initialize a prefect.yaml file
prefect init

# Deploy
prefect deploy ./my_flow.py:my_flow --name hello --pool default-agent-pool

prefect.yaml configuration:

# prefect.yaml
name: my-project
prefect-version: 3.1.0

deployments:
  - name: hello-deployment
    entrypoint: my_flow.py:my_flow
    work_pool:
      name: default-agent-pool
    schedule:
      cron: "0 9 * * *"
      timezone: America/New_York
    parameters:
      name: "daily"

Prefect 2 migration — old code looks like this:

# OLD (Prefect 2)
from prefect.deployments import Deployment

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="hello-deployment",
    work_queue_name="default",
)
deployment.apply()

# NEW (Prefect 3)
my_flow.deploy(
    name="hello-deployment",
    work_pool_name="default-agent-pool",
)

Key API changes (2.x → 3.x):

Prefect 2Prefect 3
Deployment.build_from_flow()flow.deploy() or prefect deploy
AgentWorker
work_queue_namework_pool_name
prefect agent startprefect worker start
prefect.deployments.Deployment(removed)

Fix 4: Workers and Work Pools

A deployment needs a running worker to pick up and execute flow runs.

Create a work pool:

prefect work-pool create default-agent-pool --type process

Work pool types:

TypeRuns flows on
processLocal processes
dockerDocker containers
kubernetesKubernetes pods
ecsAWS ECS
cloud-runGCP Cloud Run
aciAzure Container Instances

Start a worker for the pool:

prefect worker start --pool default-agent-pool

The worker polls for scheduled flow runs and executes them. Without a running worker, deployments stay pending.

Verify worker is active:

prefect work-pool inspect default-agent-pool
# Shows worker heartbeat, last polled

Pro Tip: For development, always check three things when a flow stays in “Pending”: (1) is the server running?, (2) is there a work pool?, (3) is a worker polling that pool? All three must be running simultaneously. In production, workers typically run as long-lived processes under systemd, Docker, or Kubernetes.

Run workers in background with systemd:

# /etc/systemd/system/prefect-worker.service
[Unit]
Description=Prefect Worker
After=network.target

[Service]
User=prefect
WorkingDirectory=/opt/prefect
Environment="PREFECT_API_URL=https://api.prefect.cloud/api/..."
Environment="PREFECT_API_KEY=pnu_xxx"
ExecStart=/opt/prefect/venv/bin/prefect worker start --pool default-pool
Restart=always

[Install]
WantedBy=multi-user.target
sudo systemctl enable prefect-worker
sudo systemctl start prefect-worker

Fix 5: Task Retries and Error Handling

from prefect import task, flow
from datetime import timedelta

@task(
    retries=3,
    retry_delay_seconds=[10, 30, 60],   # Exponential backoff
    retry_jitter_factor=0.5,
)
def flaky_api_call():
    import requests
    response = requests.get("https://api.example.com/data", timeout=5)
    response.raise_for_status()
    return response.json()

Retry on specific exceptions only:

from prefect.tasks import exponential_backoff

@task(
    retries=5,
    retry_delay_seconds=exponential_backoff(backoff_factor=2),
    retry_condition_fn=lambda task, task_run, state: isinstance(
        state.result(raise_on_failure=False), ConnectionError
    ),
)
def network_task():
    ...

Common Mistake: Setting retries=3 and assuming all exceptions will trigger retry. Prefect only retries on exceptions — not on TimeoutError raised by a task’s own timeout_seconds. And retries don’t fire if the flow itself is cancelled. Test retry behavior explicitly rather than assuming.

Task failure handling without aborting the flow:

from prefect import flow, task
from prefect.states import Completed

@task
def sub_task(x):
    if x == 3:
        raise ValueError("bad value")
    return x * 2

@flow
def parent_flow():
    results = []
    for i in range(5):
        future = sub_task.submit(i, return_state=True)   # Get state, don't raise
        state = future.result()
        if state.is_completed():
            results.append(state.result())
        else:
            print(f"Task {i} failed, continuing")
    return results

Fix 6: Scheduling and Parameterized Runs

Cron schedule:

from prefect import flow
from prefect.schedules import CronSchedule

@flow
def daily_job():
    ...

daily_job.deploy(
    name="daily",
    work_pool_name="default-agent-pool",
    schedule=CronSchedule(cron="0 9 * * *", timezone="America/New_York"),
)

Interval schedule:

from prefect.schedules import IntervalSchedule
from datetime import timedelta

my_flow.deploy(
    name="every-5-min",
    work_pool_name="default-agent-pool",
    schedule=IntervalSchedule(interval=timedelta(minutes=5)),
)

Multiple schedules on one deployment:

my_flow.deploy(
    name="multi-schedule",
    work_pool_name="default-agent-pool",
    schedules=[
        {"cron": "0 9 * * MON-FRI"},   # Weekdays at 9 AM
        {"cron": "0 12 * * SAT,SUN"},   # Weekends at noon
    ],
)

Parameterize runs:

@flow
def process_date(date: str):
    print(f"Processing {date}")

# Deploy with default parameters
process_date.deploy(
    name="process-date",
    work_pool_name="default-agent-pool",
    parameters={"date": "2025-04-09"},
)

# Trigger ad-hoc with different parameters
# prefect deployment run process-date/process-date --param date=2025-04-10

Dynamic flow run names:

@flow(flow_run_name="process-{date}")   # Template with parameter values
def process(date: str):
    ...

# Run name becomes "process-2025-04-09" etc.

Fix 7: Variables, Secrets, and Blocks

Variables (non-secret config):

prefect variable set my_config_key "some value"
from prefect.variables import Variable

@flow
def my_flow():
    config = Variable.get("my_config_key")
    ...

Secrets (stored encrypted in the Prefect backend):

from prefect.blocks.system import Secret

# Save once
Secret(value="sk-...").save(name="openai-api-key")

# Load in flow
@flow
def my_flow():
    api_key = Secret.load("openai-api-key").get()
    ...

Custom blocks for reusable configuration:

from prefect.blocks.core import Block
from pydantic import SecretStr

class DatabaseConfig(Block):
    host: str
    port: int = 5432
    username: str
    password: SecretStr

# Save
DatabaseConfig(
    host="db.example.com",
    username="prefect",
    password="secret",
).save(name="prod-db")

# Load
db = DatabaseConfig.load("prod-db")

Environment variables for runtime config:

export PREFECT_API_URL=...
export PREFECT_API_KEY=...
export OPENAI_API_KEY=sk-...

Fix 8: Concurrency and Resource Management

Global concurrency limits:

# Limit any "database" task to 5 concurrent runs system-wide
prefect concurrency-limit create database 5
@task(tags=["database"])
def query(sql):
    ...

Work pool concurrency:

prefect work-pool set-concurrency-limit default-agent-pool 10

Task concurrency within a flow:

from prefect import flow
from prefect.futures import wait

@flow
def parallel_flow(urls):
    futures = [fetch_url.submit(url) for url in urls]
    wait(futures)   # Wait for all to complete
    return [f.result() for f in futures]

Serial vs parallel task execution:

@flow
def sequential():
    a = task_a()
    b = task_b(a)   # Waits for a
    c = task_c(b)   # Waits for b
    return c

@flow
def parallel():
    a = task_a.submit()
    b = task_b.submit()
    c = task_c.submit()
    return [a.result(), b.result(), c.result()]   # All three in parallel

Still Not Working?

Prefect vs Airflow vs Dagster

  • Prefect — Dynamic Python-first API, best for workflows that don’t fit static DAGs. See this article.
  • Airflow — Operator-based, largest ecosystem, static DAGs. See Airflow not working.
  • Dagster — Asset-centric with strong dev UX. See Dagster not working.

Debugging Flow Runs

from prefect import get_run_logger

@flow
def my_flow():
    logger = get_run_logger()
    logger.info("Starting flow")
    logger.error("Something bad")   # Appears in UI and server logs

log_prints=True automatically logs all print() calls:

@flow(log_prints=True)
def my_flow():
    print("This appears in the Prefect UI")   # Captured as INFO log

Integration with dbt

from prefect_dbt.cli.commands import DbtCoreOperation

@flow
def run_dbt():
    DbtCoreOperation(
        commands=["dbt run --select raw_orders"],
        project_dir="/path/to/dbt/project",
    ).run()

For dbt-specific errors, see dbt not working.

Storage for Flow Code

In production, workers pull flow code from remote storage (GitHub, S3, GCS) rather than shipping with the worker:

from prefect.filesystems import GitHub

github_block = GitHub(
    repository="https://github.com/org/prefect-flows",
    reference="main",
).save("github-flows", overwrite=True)

my_flow.from_source(
    source="https://github.com/org/prefect-flows",
    entrypoint="flows/my_flow.py:my_flow",
).deploy(
    name="github-deployment",
    work_pool_name="default-agent-pool",
)

Artifacts and Run Metadata

Prefect lets you attach structured artifacts to flow runs — visible in the UI:

from prefect import flow
from prefect.artifacts import create_markdown_artifact, create_table_artifact, create_link_artifact

@flow
def my_flow():
    # Markdown summary
    create_markdown_artifact(
        key="run-summary",
        markdown="## Results\n- Rows processed: 1,234\n- Errors: 0",
    )

    # Table
    create_table_artifact(
        key="top-10-users",
        table=[
            {"user": "alice", "score": 95},
            {"user": "bob", "score": 87},
        ],
    )

    # External link (reports, dashboards)
    create_link_artifact(
        key="dashboard",
        link="https://grafana.example.com/d/...",
        description="Live metrics dashboard",
    )

Event-Driven Triggers

Prefect 3 supports automation rules — trigger flows based on custom conditions:

from prefect.automations import Automation
from prefect.events.schemas.automations import EventTrigger, EventFilter

Automation(
    name="retry-on-failure",
    trigger=EventTrigger(
        expect={"prefect.flow-run.Failed"},
        match={"prefect.resource.id": "prefect.flow-run.*"},
    ),
    actions=[{"type": "run-deployment", "deployment_id": "..."}],
).create()

This replaces the Prefect 2 notifications system with a more general event model.

Testing Flows

from prefect.testing.utilities import prefect_test_harness

def test_flow():
    with prefect_test_harness():
        result = my_flow(name="test")
        assert result == "expected"

For general pytest fixture patterns that affect Prefect testing, see pytest fixture not found.

Cancelling and Pausing Flows

# From the CLI
prefect flow-run cancel <run-id>

# Pause a deployment's schedule without deleting it
prefect deployment pause my-flow/my-deployment
prefect deployment resume my-flow/my-deployment

In the UI, every flow run has a “Cancel” button. Workers check for cancellation signals between tasks — long-running individual tasks may complete before cancellation takes effect.

Prefect Cloud Free Tier Limits

The free Prefect Cloud tier has limits: flow run counts per month, concurrent workers, automation rules. If you’re building a production system, check the current pricing page before relying on the free tier — some production workloads exceed free limits within days.

For self-hosted alternatives, run prefect server start on your own infrastructure with a PostgreSQL backend:

export PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://user:pass@host/prefect"
prefect server start
F

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.

Was this article helpful?

Related Articles