Fix: Dagster Not Working — Asset Loading, Resource Errors, and Daemon Issues
Quick Answer
How to fix Dagster errors — asset not found in definitions, resource not defined, dagster daemon not running, sensor or schedule not firing, DagsterInvariantViolationError, and asset materialization failing.
The Error
You try to load assets and Dagster can’t find them:
dagster.DagsterInvariantViolationError:
No assets with key 'my_dataset' found in Definitions

Or a resource is referenced but not configured:
dagster.DagsterInvalidDefinitionError:
Resource 'database' is required by asset 'raw_orders' but no resource with that key was provided.

Or schedules and sensors don’t fire:
$ dagster schedule list
# daily_etl - RUNNING
# But nothing has triggered in 24 hours

Or partitioned asset backfills fail with cryptic errors:
Partition 2025-04-09 has no materialization.
Upstream partition required but not found.

Or the UI shows assets as failed but logs reveal no error:
Asset materialization: FAILURE
Last error: <no error details available>Dagster’s asset-centric model is powerful but opinionated — it expects you to think in terms of materialized data (assets) rather than operations (tasks). Resources inject dependencies, schedules and sensors drive execution, and the daemon process coordinates everything. Missing any piece produces errors that don’t look like bugs in your code. This guide covers each failure mode.
Why This Happens
Dagster has three conceptual layers: assets (data that gets materialized), ops (computation units, usually implicit in asset functions), and jobs (ops grouped into runnable graphs). Resources inject external dependencies (databases, APIs, S3 clients) into asset functions via type annotations.
The Dagster daemon is a separate process that runs schedules, sensors, and backfills. The webserver (dagster dev) shows the UI but doesn’t run schedules on its own. Users often start dagster dev expecting everything to work and find that their 9am schedule silently never triggers.
Fix 1: Defining and Registering Assets
# assets.py
from dagster import asset

@asset
def raw_orders():
    import pandas as pd
    return pd.read_csv("orders.csv")

@asset
def processed_orders(raw_orders):
    return raw_orders[raw_orders["amount"] > 0]

# definitions.py
from dagster import Definitions, load_assets_from_modules
from . import assets
defs = Definitions(
    assets=load_assets_from_modules([assets]),
)

Common error — asset not registered:
DagsterInvariantViolationError: No assets with key 'my_asset' found in Definitions

Causes:
- Asset not in any file imported by definitions.py
- Module missing from the load_assets_from_modules call
- @asset decorator missing — the function is just a regular function
Explicit asset list (when auto-discovery gets confusing):
from dagster import Definitions
from myproject.assets import raw_orders, processed_orders, summary
defs = Definitions(
    assets=[raw_orders, processed_orders, summary],
)

Asset groups for organization:
from dagster import asset

@asset(group_name="raw")
def raw_orders(): ...

@asset(group_name="processed")
def clean_orders(raw_orders): ...

@asset(group_name="analytics")
def monthly_revenue(clean_orders): ...

Asset key prefixes for hierarchical organization:
@asset(key_prefix=["database", "raw"])
def orders(): ...
# Asset key: database/raw/orders

Common Mistake: Defining an asset function and running the Dagster UI without updating definitions.py. Dagster only sees assets loaded through Definitions. If you add a new asset file, you must either list its assets explicitly or include the module via load_assets_from_modules — or the UI won’t show it.
Fix 2: Resources and Dependency Injection
Resources inject shared state (database connections, API clients) into asset functions via type annotations.
from dagster import asset, ConfigurableResource
from pydantic import Field

class DatabaseResource(ConfigurableResource):
    connection_string: str = Field(..., description="Postgres connection")

    def execute(self, sql: str):
        import psycopg
        with psycopg.connect(self.connection_string) as conn:
            return conn.execute(sql).fetchall()

@asset
def raw_orders(database: DatabaseResource):
    return database.execute("SELECT * FROM orders")

# definitions.py
from dagster import Definitions, EnvVar
from .assets import raw_orders
from .resources import DatabaseResource
defs = Definitions(
    assets=[raw_orders],
    resources={
        "database": DatabaseResource(
            connection_string=EnvVar("DATABASE_URL"),
        ),
    },
)

EnvVar for runtime configuration — values are resolved when the asset runs, not at import time:
from dagster import EnvVar
resources = {
    "api_key": EnvVar("OPENAI_API_KEY"),
    "database": DatabaseResource(
        connection_string=EnvVar("DATABASE_URL"),
    ),
}
DagsterInvalidDefinitionError: Resource 'database' is required by asset 'raw_orders'
but no resource with that key was provided.

The asset’s parameter name (database in database: DatabaseResource) is the resource key. If definitions.py doesn’t register a resource under that key, the asset can’t run.
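Under the hood this is ordinary dependency injection keyed on parameter name. A minimal stdlib sketch of the pattern — the `inject` helper and the sample function are illustrations, not Dagster APIs:

```python
import inspect

def inject(fn, resources: dict):
    """Call fn, supplying each parameter from the resources dict by name."""
    params = inspect.signature(fn).parameters
    missing = [p for p in params if p not in resources]
    if missing:
        # Mirrors DagsterInvalidDefinitionError: required resource not provided
        raise KeyError(f"Resource(s) {missing} required by {fn.__name__} but not provided")
    return fn(**{p: resources[p] for p in params})

def raw_orders(database):
    return f"SELECT * via {database}"

print(inject(raw_orders, {"database": "postgres-conn"}))
```

Rename the parameter to `db` without registering a resource under the key "db" and the call fails — the same mismatch that triggers the Dagster error above.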
Pro Tip: Use ConfigurableResource (Pydantic-based) for all new code. The older string-based config API is deprecated. ConfigurableResource gives you type safety, IDE autocomplete, and better error messages — plus the ability to use EnvVar for secrets.
Multiple environments — define separate Definitions for dev/prod:
# definitions.py
import os
from dagster import Definitions, EnvVar
from .resources import DatabaseResource

if os.getenv("DAGSTER_ENV") == "production":
    resources = {
        "database": DatabaseResource(connection_string=EnvVar("PROD_DATABASE_URL")),
    }
else:
    resources = {
        "database": DatabaseResource(connection_string="postgresql://localhost/dev"),
    }

defs = Definitions(assets=[...], resources=resources)

Fix 3: Running the Daemon for Schedules and Sensors
dagster dev  # Starts the webserver AND the daemon

dagster dev is for development — it runs both the webserver (port 3000) and the daemon (schedules, sensors). In production, you run them separately:
# Webserver (UI)
dagster-webserver -h 0.0.0.0 -p 3000
# Daemon (background worker for schedules, sensors, backfills)
dagster-daemon run

Verify the daemon is running:

dagster-daemon liveness-check
# Checks for a recent daemon heartbeat

You can also check the Deployment → Daemons tab in the UI for heartbeat status.

If the daemon isn’t running, schedules and sensors silently never fire. The UI shows them as “running” (meaning enabled), but nothing actually triggers.
Schedule definition:
from dagster import schedule, RunRequest
@schedule(
    job_name="daily_etl_job",
    cron_schedule="0 9 * * *",  # 9 AM every day
    execution_timezone="America/New_York",
)
def daily_etl():
    return RunRequest(run_key=None, run_config={})

Register it:
defs = Definitions(
    assets=[...],
    jobs=[daily_etl_job],
    schedules=[daily_etl],
)

Turn it on from the UI or CLI:
dagster schedule start daily_etl

Schedules are off by default — they must be explicitly started before they fire.
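The cron_schedule string is standard five-field cron (minute, hour, day-of-month, month, day-of-week). A minimal matcher that handles only `*` and single numbers — a sketch, not full cron — shows why `0 9 * * *` means 9:00 AM daily:

```python
from datetime import datetime

def cron_matches(expr: str, dt: datetime) -> bool:
    """Check a datetime against a five-field cron expression.
    Supports only '*' and plain numbers (no ranges, lists, or steps)."""
    minute, hour, dom, month, dow = expr.split()
    # Python's weekday(): Monday=0; cron: Sunday=0 — convert accordingly
    actual = [dt.minute, dt.hour, dt.day, dt.month, (dt.weekday() + 1) % 7]
    for field, value in zip([minute, hour, dom, month, dow], actual):
        if field != "*" and int(field) != value:
            return False
    return True

print(cron_matches("0 9 * * *", datetime(2025, 4, 9, 9, 0)))   # True
print(cron_matches("0 9 * * *", datetime(2025, 4, 9, 10, 0)))  # False
```

Real cron (and Dagster's scheduler) also supports ranges, lists, and steps; this sketch only illustrates the field layout.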
Sensors (event-driven triggers):
from dagster import sensor, RunRequest, SensorEvaluationContext
@sensor(job=my_job, minimum_interval_seconds=60)
def file_sensor(context: SensorEvaluationContext):
    import os
    new_files = [f for f in os.listdir("./inbox") if f.endswith(".csv")]
    for filename in new_files:
        yield RunRequest(
            run_key=filename,
            run_config={"ops": {"process_file": {"config": {"filename": filename}}}},
        )

The daemon evaluates each sensor at most once every minimum_interval_seconds. Lower values mean faster reaction but more load on the daemon.
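run_key deduplication means the same filename never launches two runs. The underlying pattern — compare the current listing against stored state and emit only the delta — can be sketched in plain Python (the helper name is illustrative, not a Dagster API):

```python
def new_since_cursor(listing: list[str], cursor: set[str]) -> tuple[list[str], set[str]]:
    """Return files not yet seen, plus the updated cursor state."""
    new_files = sorted(f for f in listing if f not in cursor)
    return new_files, cursor | set(new_files)

cursor: set[str] = set()
new, cursor = new_since_cursor(["a.csv", "b.csv"], cursor)
print(new)  # ['a.csv', 'b.csv']
new, cursor = new_since_cursor(["a.csv", "b.csv", "c.csv"], cursor)
print(new)  # ['c.csv'] — previously seen files are skipped
```

In a real sensor, Dagster persists this state for you: read context.cursor and call context.update_cursor(...) after processing, so restarts don't reprocess old files.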
Fix 4: Partitioned Assets
from dagster import asset, AssetExecutionContext, DailyPartitionsDefinition

daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily_partitions)
def daily_revenue(context: AssetExecutionContext):
    date = context.partition_key  # e.g., "2025-04-09"
    return compute_revenue_for_date(date)

Common error with partitioned assets:
Partition 2025-04-09 has no materialization.
Upstream partition required but not found.

Downstream partitioned assets expect their upstream partitions to be materialized first. If you try to compute monthly_summary for April before all April daily partitions exist, it fails.
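Before launching a downstream run, it helps to compute which upstream daily partitions are actually missing for the window. A stdlib sketch of that check (the helper name is illustrative — in Dagster the materialization record lives in the instance, not a plain set):

```python
from datetime import date, timedelta

def missing_daily_partitions(start: date, end: date, materialized: set[str]) -> list[str]:
    """List daily partition keys in [start, end] with no materialization."""
    missing = []
    day = start
    while day <= end:
        key = day.isoformat()  # Dagster daily partition keys are ISO dates
        if key not in materialized:
            missing.append(key)
        day += timedelta(days=1)
    return missing

done = {"2025-04-01", "2025-04-02", "2025-04-04"}
print(missing_daily_partitions(date(2025, 4, 1), date(2025, 4, 5), done))
# ['2025-04-03', '2025-04-05']
```

Any key this returns must be materialized (or backfilled) before the downstream partition can run.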
Materialize upstream first:
# From UI or CLI
dagster asset materialize --select daily_revenue --partition 2025-04-09
# Materialize a range
dagster asset materialize --select daily_revenue --partition-range 2025-04-01:2025-04-09

Multi-dimensional partitions (e.g., date × region):
from dagster import MultiPartitionsDefinition, StaticPartitionsDefinition
region_partitions = StaticPartitionsDefinition(["us", "eu", "asia"])
multi = MultiPartitionsDefinition({
    "date": daily_partitions,
    "region": region_partitions,
})

@asset(partitions_def=multi)
def regional_revenue(context):
    keys = context.partition_key.keys_by_dimension
    date = keys["date"]
    region = keys["region"]
    return compute(date, region)

Backfill — materialize a range of missed partitions:
dagster asset materialize --select my_asset --partition-range 2025-01-01:2025-03-31

In the UI, the “Backfill” button lets you select a partition range visually.
Fix 5: Asset Checks and Data Quality
Asset checks validate data after materialization — similar to dbt tests:
from dagster import asset, asset_check, AssetCheckResult
@asset
def customers():
    import pandas as pd
    return pd.read_csv("customers.csv")

@asset_check(asset=customers)
def not_null_email(customers):
    null_count = customers["email"].isna().sum()
    return AssetCheckResult(
        passed=null_count == 0,
        description=f"{null_count} nulls in email column",
        metadata={"null_count": null_count},
    )

@asset_check(asset=customers, blocking=True)  # Blocks downstream on failure
def unique_customer_id(customers):
    dup_count = customers["id"].duplicated().sum()
    return AssetCheckResult(passed=dup_count == 0, metadata={"duplicates": dup_count})

blocking=True asset checks prevent downstream assets from materializing when the check fails — important for enforcing data quality gates.
Register in Definitions:
defs = Definitions(
    assets=[customers, ...],
    asset_checks=[not_null_email, unique_customer_id],
)

For dbt-based data quality patterns with similar test semantics, see dbt not working.
Fix 6: IO Managers — Where Assets Are Stored
IO managers control how asset outputs are persisted. The default writes to a local pickle file — fine for dev, wrong for production.
from dagster_aws.s3 import S3PickleIOManager, S3Resource
defs = Definitions(
    assets=[...],
    resources={
        "io_manager": S3PickleIOManager(
            s3_resource=S3Resource(),
            s3_bucket="my-dagster-bucket",
        ),
    },
)

Per-asset IO manager:
from dagster import asset
@asset(io_manager_key="snowflake_io_manager")
def my_asset(): ...

@asset(io_manager_key="s3_io_manager")
def other_asset(): ...

defs = Definitions(
    assets=[my_asset, other_asset],
    resources={
        "snowflake_io_manager": SnowflakeIOManager(...),
        "s3_io_manager": S3PickleIOManager(...),
    },
)

Custom IO manager:
from dagster import IOManager, io_manager, OutputContext, InputContext
class ParquetIOManager(IOManager):
    def handle_output(self, context: OutputContext, obj):
        path = f"{'/'.join(context.asset_key.path)}.parquet"
        obj.to_parquet(path)

    def load_input(self, context: InputContext):
        import pandas as pd
        path = f"{'/'.join(context.asset_key.path)}.parquet"
        return pd.read_parquet(path)

@io_manager
def parquet_io_manager():
    return ParquetIOManager()

Common Mistake: Leaving the default FilesystemIOManager in production. This writes pickles to the Dagster server’s local disk. If the server runs in Kubernetes or is restarted, assets are lost. Production deployments should use S3/GCS/Snowflake IO managers.
Fix 7: Jobs and Op-Based Workflows
For workflows that don’t fit the asset model (one-off operations, side effects), use ops and jobs:
from dagster import op, job

@op
def extract() -> list:
    return [1, 2, 3, 4, 5]

@op
def transform(data: list) -> list:
    return [x * 2 for x in data]

@op
def load(data: list):
    print(f"Loaded {len(data)} records")

@job
def etl_job():
    load(transform(extract()))

Asset jobs (combine assets into a job for scheduling):
from dagster import define_asset_job, AssetSelection
daily_etl_job = define_asset_job(
    name="daily_etl",
    selection=AssetSelection.groups("raw", "processed"),
)

defs = Definitions(
    assets=[...],
    jobs=[daily_etl_job],
    schedules=[daily_etl_schedule],
)

Graph composition for reusable sub-pipelines:
from dagster import graph
@graph
def preprocess(raw_data):
    cleaned = clean(raw_data)
    validated = validate(cleaned)
    return validated

@job
def full_pipeline():
    data = extract()
    preprocessed = preprocess(data)
    train_model(preprocessed)

Fix 8: Debugging and Logs
Enable structured logs inside assets:
from dagster import asset

@asset
def my_asset(context):
    logger = context.log  # Built-in structured logger
    logger.info("Starting materialization")
    logger.debug("Detail info")
    try:
        result = do_work()
    except Exception as e:
        logger.error(f"Failed: {e}", exc_info=True)
        raise
    logger.info(f"Completed with {len(result)} rows")
    return result

Materialize metadata for UI display:
from dagster import MaterializeResult, MetadataValue
@asset
def my_asset():
    df = compute_data()
    return MaterializeResult(
        metadata={
            "num_rows": MetadataValue.int(len(df)),
            "preview": MetadataValue.md(df.head().to_markdown()),
            "report": MetadataValue.url("https://example.com/report"),
        },
    )

Run history in the UI shows asset materializations, run steps, logs, and timings. When debugging a failed run, always check the run detail page first.
Command-line run:
# Materialize specific assets
dagster asset materialize --select "my_asset"
# "+" extends the selection along dependencies:
# "my_asset+" — my_asset and all downstream
# "+my_asset" — my_asset and all upstream
# "+my_asset+" — my_asset with all upstream AND downstream

Still Not Working?
Dagster vs Airflow vs Prefect
- Dagster — Asset-centric, strong dev experience, software-defined assets. Best for data products where lineage matters.
- Airflow — Task-centric, mature ecosystem, huge community. See Airflow not working. Best when you need Airflow’s operator ecosystem.
- Prefect — Task-centric with modern Python API. Best for workflows that mix data and non-data tasks.
Integration with dbt
Dagster has first-class dbt integration:
pip install dagster-dbt

from pathlib import Path
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(manifest=Path("path/to/manifest.json"))
def my_dbt_assets(context, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

For dbt-specific patterns that Dagster orchestrates, see dbt not working.
Production Deployment
Dagster runs in Kubernetes via the dagster-k8s package, on ECS via dagster-ecs, or managed via Dagster Cloud. For database configuration errors that affect deployment, see PostgreSQL connection refused.
Resource Configuration at Runtime
# Access a resource inside an asset
context.resources.database

# Override resources in tests
from dagster import materialize

result = materialize(
    [my_asset],
    resources={
        "database": DatabaseResource(connection_string="sqlite:///test.db"),
    },
)

For testing patterns with pytest fixtures, see pytest fixture not found.
Freshness Checks and SLAs
Dagster can alert when assets become stale (not materialized within a time window):
from dagster import asset, FreshnessPolicy
@asset(
    freshness_policy=FreshnessPolicy(
        maximum_lag_minutes=60,  # Alert if not materialized in 60 minutes
        cron_schedule="0 * * * *",  # Expected at the top of each hour
    ),
)
def hourly_metrics():
    ...

The UI highlights stale assets in red and can route alerts to Slack or email via sensors.
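The staleness rule itself reduces to a time comparison: an asset is stale once the gap since its last materialization exceeds the allowed lag. A sketch of that check (the helper name is illustrative, not a Dagster API):

```python
from datetime import datetime, timedelta

def is_stale(last_materialized: datetime, now: datetime, maximum_lag_minutes: int) -> bool:
    """True when the asset has not been materialized within the allowed lag."""
    return now - last_materialized > timedelta(minutes=maximum_lag_minutes)

last = datetime(2025, 4, 9, 8, 0)
print(is_stale(last, datetime(2025, 4, 9, 8, 45), 60))  # False — within the hour
print(is_stale(last, datetime(2025, 4, 9, 9, 30), 60))  # True — 90 minutes since last run
```

Dagster evaluates the same kind of condition continuously and surfaces the result as the asset's freshness status.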
Code Locations and Separate Workspaces
For multi-team deployments, split Dagster into multiple code locations — each can have its own Python environment and dependencies:
# workspace.yaml
load_from:
  - python_module:
      module_name: team_a.definitions
      working_directory: /path/to/team_a
  - python_module:
      module_name: team_b.definitions
      working_directory: /path/to/team_b

Each code location runs in its own process, so dependency conflicts between teams don’t affect each other. The UI shows assets from all locations in one unified graph.
Asset Selection Syntax Reference
Dagster’s asset selection syntax is powerful but syntax-heavy:
| Syntax | Meaning |
|---|---|
| "my_asset" | Just this asset |
| "my_asset+" | This asset and all downstream |
| "+my_asset" | This asset and all upstream |
| "+my_asset+" | This asset plus both directions |
| "group:my_group" | All assets in the group |
| "tag:my_tag" | All assets with the tag |
| "key_prefix:raw/" | All assets under this prefix |
| "my_asset-other_asset" | Exclude other_asset from the selection |
Combine with AssetSelection in Python for complex filters across jobs and sensors.
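The `+` affixes are easy to mis-read; a small parser makes the three forms explicit (a sketch of the syntax in the table above, not Dagster's own parser):

```python
def parse_selection(sel: str) -> dict:
    """Split a selection like '+my_asset+' into the asset name plus traversal flags."""
    upstream = sel.startswith("+")
    downstream = sel.endswith("+")
    name = sel.strip("+")
    return {"asset": name, "include_upstream": upstream, "include_downstream": downstream}

print(parse_selection("my_asset+"))
# {'asset': 'my_asset', 'include_upstream': False, 'include_downstream': True}
print(parse_selection("+my_asset+"))
# {'asset': 'my_asset', 'include_upstream': True, 'include_downstream': True}
```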
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.