Fix: Structlog Not Working — Processor Chain, Context Variables, and Stdlib Integration
Quick Answer
How to fix Structlog errors — output is dict not JSON, context vars not propagating, stdlib logging not unified, async context loss, configure_once, KeyValueRenderer vs JSONRenderer, and async filtering.
The Error
You configure Structlog and the output is a human-readable console line, not JSON:

import structlog

log = structlog.get_logger()
log.info("user_created", user_id=123)
# 2025-04-09 10:00:00 [info ] user_created user_id=123
# Wanted: {"timestamp": "...", "level": "info", "event": "user_created", "user_id": 123}

Or bind_contextvars() values disappear in nested code:

structlog.contextvars.bind_contextvars(request_id="abc")
log.info("processing")  # OK: has request_id

async def nested():
    log.info("nested")  # Missing request_id in async context

Or stdlib logging and Structlog produce inconsistent output:

import logging

logging.getLogger("sqlalchemy.engine").info("SQL: SELECT ...")
# Plain text format, different from Structlog's JSON

Or configure() runs multiple times in tests, causing duplicate log lines:

# pytest fixture configures Structlog
# Test imports module that also configures Structlog
# Result: every log line printed 2-3 times

Or async tasks lose their bound context variables across await points:

async def request(req_id):
    structlog.contextvars.bind_contextvars(request_id=req_id)
    await some_task()
    log.info("done")  # request_id still present? Sometimes yes, sometimes no

Structlog is the structured logging library favored for production observability: JSON output, processor pipelines, context propagation. Unlike Loguru (which optimizes for simplicity), Structlog gives you composable processor chains for transforming log records. This power has costs: misconfigured processors silently change output format, context vars need careful setup, and the dual stdlib/Structlog model surprises newcomers. This guide covers each.
Why This Happens
Structlog’s processor chain is a list of functions, each taking the previous output and returning a transformed record. By default, the chain ends with ConsoleRenderer — colored, human-friendly output. To get JSON, you swap the renderer. Misordering or omitting processors silently changes behavior — Structlog won’t error on a “broken” chain; it’ll just produce unexpected output.
The standard library’s logging module is entirely separate from Structlog. Without explicit integration, third-party libraries (SQLAlchemy, Uvicorn, requests) log through stdlib in their own format. Unifying both takes deliberate setup.
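To make the chain model concrete, here is a minimal pure-Python sketch of how a list of processors folds over the event dict. The names run_chain, add_level, and render_json are hypothetical stand-ins written for illustration, not Structlog's internals; only the (logger, method_name, event_dict) processor signature mirrors Structlog's.

```python
import json


def add_level(logger, method_name, event_dict):
    # Mimics a level processor: record which log method was called.
    event_dict["level"] = method_name
    return event_dict


def render_json(logger, method_name, event_dict):
    # A renderer is simply the last processor: it turns the dict into a string.
    return json.dumps(event_dict)


def run_chain(processors, method_name, event, **fields):
    # Hypothetical helper: feed each processor the previous processor's output.
    event_dict = {"event": event, **fields}
    for processor in processors:
        event_dict = processor(None, method_name, event_dict)
    return event_dict


line = run_chain([add_level, render_json], "info", "user_created", user_id=123)
print(line)
```

Dropping render_json from the list makes run_chain return a dict instead of a string, which is the kind of format change an incomplete chain produces.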
Fix 1: Configuration and Renderer Selection
import logging
import sys

import structlog

# The stdlib LoggerFactory hands rendered lines to the logging module,
# so stdlib logging needs a handler and a level that lets INFO through.
logging.basicConfig(format="%(message)s", stream=sys.stdout, level=logging.INFO)

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

log = structlog.get_logger()
log.info("user_created", user_id=123)
# {"timestamp": "2025-04-09T10:00:00", "level": "info", "event": "user_created", "user_id": 123}

The processor chain runs in order: each function takes the previous output as a dict and returns the transformed dict. The final processor in the list is the renderer, which converts the dict to a string.
Common Mistake: Forgetting that the last processor must be a renderer. If you put JSONRenderer in the middle of the chain, subsequent processors receive a string instead of a dict and fail, typically with a TypeError when they try to set a key.

# WRONG: JSONRenderer not last
processors = [
    structlog.processors.JSONRenderer(),  # Wrong position
    structlog.processors.add_log_level,  # Receives a string, cannot add the level
]

# CORRECT
processors = [
    structlog.processors.add_log_level,
    structlog.processors.JSONRenderer(),  # Last position
]

Common renderers:
| Renderer | Use case |
|---|---|
| ConsoleRenderer | Dev: colored, human-readable |
| JSONRenderer | Production: log aggregators |
| KeyValueRenderer | logfmt-style key=value pairs (key=value key=value) |
| LogfmtRenderer | Strict logfmt for Heroku, Honeycomb |
Environment-based config:
import sys

import structlog

renderer = (
    structlog.processors.JSONRenderer()
    if not sys.stdout.isatty()
    else structlog.dev.ConsoleRenderer(colors=True)
)

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        renderer,
    ],
)

Dev terminals get color; CI and production get JSON.
Fix 2: Context Variables for Request-Scoped Data
import structlog
from structlog.contextvars import bind_contextvars, clear_contextvars

log = structlog.get_logger()

def handle_request(req_id, user_id):
    bind_contextvars(request_id=req_id, user_id=user_id)
    try:
        log.info("request_started")  # Has request_id, user_id
        do_work()
        log.info("request_complete")
    finally:
        clear_contextvars()

def do_work():
    log.info("processing")  # Also has request_id, user_id from parent context

The merge_contextvars processor is required. Without it, the bound context isn’t included in logs:

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # MUST be in the chain
        ...
    ],
)

Context vars work across async boundaries because Python’s contextvars module is async-aware:
import asyncio

import structlog
from structlog.contextvars import bind_contextvars

log = structlog.get_logger()

async def request(req_id):
    bind_contextvars(request_id=req_id)
    log.info("start")
    await asyncio.sleep(0.1)
    log.info("after_await")  # Still has request_id

async def main():
    await asyncio.gather(
        request("abc"),
        request("xyz"),
    )
    # Each task has its own context: abc and xyz don't leak across

asyncio.run(main())

Use the bound_contextvars() context manager for scoped binding:

from structlog.contextvars import bound_contextvars

def handler():
    with bound_contextvars(request_id="abc", trace_id="xyz"):
        log.info("inside")  # Has the bound vars
    log.info("outside")  # Doesn't

Pro Tip: Bind context vars at the request boundary (middleware, route handler, message consumer entrypoint) so that everything downstream gets the context for free. This is the single biggest improvement Structlog brings over plain logging: correlation IDs propagate automatically through deeply nested code without threading them through every function signature.
Fix 3: Unifying Stdlib Logging with Structlog
Third-party libraries use stdlib logging. Unify both formats so logs look consistent:
import logging

import structlog

# Shared processors run for both Structlog and stdlib logs
shared_processors = [
    structlog.contextvars.merge_contextvars,
    structlog.processors.add_log_level,
    structlog.processors.TimeStamper(fmt="iso"),
]

structlog.configure(
    processors=shared_processors + [
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

# Configure stdlib to also use Structlog's formatter
formatter = structlog.stdlib.ProcessorFormatter(
    foreign_pre_chain=shared_processors,
    processors=[
        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
        structlog.processors.JSONRenderer(),
    ],
)

handler = logging.StreamHandler()
handler.setFormatter(formatter)
root_logger = logging.getLogger()
root_logger.addHandler(handler)
root_logger.setLevel(logging.INFO)

# Now both produce identical JSON output
log = structlog.get_logger()
log.info("structlog message", user_id=123)
logging.getLogger("requests").info("HTTP request")
# Both lines: same JSON format with timestamp, level, event, fields

foreign_pre_chain runs only on records that did not originate from Structlog (plain stdlib records). It gives third-party loggers the same context vars, level, and timestamp fields before the shared renderer runs.

Set log levels per logger:

logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)

This reduces noise from chatty libraries. Apply it after your logging setup has run so that a later configuration step does not reset the levels.
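Per-logger levels follow the stdlib logger hierarchy, so setting a level on a parent name also quiets its children. A small stdlib-only sketch follows; the sqlalchemy.engine.Engine and sqlalchemy.pool names are used purely as example logger names.

```python
import logging

logging.getLogger().setLevel(logging.INFO)
logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING)

# Loggers with no level of their own inherit from the nearest configured ancestor.
child = logging.getLogger("sqlalchemy.engine.Engine")
sibling = logging.getLogger("sqlalchemy.pool")

print(child.isEnabledFor(logging.INFO))    # Inherits WARNING from sqlalchemy.engine
print(sibling.isEnabledFor(logging.INFO))  # Falls back to the root logger's level
```

If a library still logs at INFO after you raise its level, check whether it logs under a different top-level name than the one you configured.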
Fix 4: Configuring Once vs Multiple Times
structlog.configure(...)  # Overrides any prior config

configure() can be called repeatedly. Each call overwrites the settings you pass and leaves the others unchanged. In tests where modules import each other, you may want first-config-wins instead:

structlog.configure_once(...)  # Only configures if not already configured

A repeated configure_once() call is ignored and emits a RuntimeWarning. Use configure_once for libraries:

# mylib/__init__.py
import structlog

# Only set default config if the user hasn't configured Structlog
structlog.configure_once(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)

Library code shouldn’t override user configuration, so configure_once is the polite default for libraries.

Common Mistake: Calling structlog.configure() inside a function that runs multiple times (e.g., a test fixture, a Flask route handler). Each call changes the global config. Symptoms include intermittent format changes between requests and logs that look correct in tests but wrong in production. Always configure Structlog once at startup, not in request handlers.
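When setup code can be reached from several entry points, a guard keeps it from running twice. A common cause of duplicated lines is attaching another stdlib handler on every setup run, so the sketch below guards that step. setup_logging and the _configured flag are hypothetical names, and a real version would also hold the structlog.configure(...) call.

```python
import logging

_configured = False  # Hypothetical module-level flag


def setup_logging():
    """Attach the root handler once; later calls are no-ops."""
    global _configured
    if _configured:
        return False
    handler = logging.StreamHandler()
    logging.getLogger().addHandler(handler)
    # A real setup function would also call structlog.configure(...) here.
    _configured = True
    return True


before = len(logging.getLogger().handlers)
first = setup_logging()
second = setup_logging()
added = len(logging.getLogger().handlers) - before
print(first, second, added)
```

The second call returns early, so the root logger gains exactly one handler no matter how many fixtures or modules call the function.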
Fix 5: Async-Aware Logging
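Structlog’s log methods are synchronous by default. For async code, bound loggers also provide awaitable variants prefixed with a (ainfo, adebug, awarning, and so on; available since structlog 23.1.0). The older structlog.stdlib.AsyncBoundLogger wrapper is deprecated, and its methods were awaited as log.info(...), not ainfo:

import logging
import sys

import structlog

# Let stdlib logging emit DEBUG and above to stdout
logging.basicConfig(format="%(message)s", stream=sys.stdout, level=logging.DEBUG)

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.stdlib.BoundLogger,  # Regular wrapper; it has the a-prefixed methods
    logger_factory=structlog.stdlib.LoggerFactory(),
)

log = structlog.get_logger()

async def handler():
    await log.ainfo("async log")  # Note: ainfo, not info
    await log.adebug("debug log")
    await log.awarning("warn log")

Why async-aware? Sync log calls in async code work but block the event loop briefly. For high-throughput async services (FastAPI, aiohttp), the a-prefixed methods run the processor chain and the output in a thread pool so the event loop stays free.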
For FastAPI specifically:
import uuid

import structlog
from fastapi import FastAPI, Request
from structlog.contextvars import bind_contextvars, clear_contextvars

app = FastAPI()
log = structlog.get_logger()

@app.middleware("http")
async def correlation_id_middleware(request: Request, call_next):
    clear_contextvars()
    bind_contextvars(
        request_id=str(uuid.uuid4()),
        method=request.method,
        path=request.url.path,
    )
    response = await call_next(request)
    await log.ainfo("request_complete", status=response.status_code)
    return response

For FastAPI middleware patterns that interact with Structlog context, see FastAPI dependency injection error.
Fix 6: Filtering and Sampling
import os

import structlog

# Drop debug logs in production
def filter_by_level(logger, method_name, event_dict):
    if method_name == "debug" and os.getenv("ENV") == "production":
        raise structlog.DropEvent
    return event_dict

structlog.configure(
    processors=[
        filter_by_level,
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer(),
    ],
)

DropEvent signals “skip this log entirely”: Structlog stops processing and returns nothing.
Sample high-volume logs:
import random

import structlog

def sample_debug(logger, method_name, event_dict):
    if method_name == "debug" and random.random() > 0.01:
        raise structlog.DropEvent  # Drop 99% of debug logs
    return event_dict

Redact sensitive fields:

SENSITIVE = {"password", "api_key", "credit_card", "ssn"}

def redact_sensitive(logger, method_name, event_dict):
    for key in list(event_dict.keys()):
        if key.lower() in SENSITIVE:
            event_dict[key] = "[REDACTED]"
    return event_dict

structlog.configure(
    processors=[
        redact_sensitive,  # Run early in the chain
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer(),
    ],
)

log = structlog.get_logger()
log.info("login_attempt", username="alice", password="secret123")
# {"event": "login_attempt", "username": "alice", "password": "[REDACTED]"}

Fix 7: Exception Logging
import structlog

log = structlog.get_logger()

try:
    risky_operation()
except Exception:
    log.exception("operation_failed")  # Includes full traceback

The format_exc_info processor must be in the chain for tracebacks to render:

structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.format_exc_info,  # Required for exception tracebacks
        structlog.processors.JSONRenderer(),
    ],
)

Add exception type and message as fields:

import sys

def add_exception_info(logger, method_name, event_dict):
    exc_info = event_dict.get("exc_info")
    if exc_info is True:
        # log.exception() passes exc_info=True; resolve it to the active exception
        exc_info = sys.exc_info()
    elif isinstance(exc_info, BaseException):
        exc_info = (type(exc_info), exc_info, exc_info.__traceback__)
    if isinstance(exc_info, tuple) and exc_info[0] is not None:
        event_dict["exc_type"] = exc_info[0].__name__
        event_dict["exc_message"] = str(exc_info[1])
    return event_dict

structlog.configure(
    processors=[
        add_exception_info,
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
)

With manual exception:

try:
    1 / 0
except ZeroDivisionError as e:
    log.error("calculation_failed", error=str(e), error_type=type(e).__name__)
    # Or use log.exception("calculation_failed") to auto-include traceback

Fix 8: Testing with Structlog
Structlog provides testing.capture_logs for inspecting logs in tests:
import structlog
from structlog.testing import capture_logs

def fn():
    log = structlog.get_logger()
    log.info("event", user_id=123)

def test_fn():
    with capture_logs() as cap_logs:
        fn()
    assert cap_logs == [{"event": "event", "user_id": 123, "log_level": "info"}]

For pytest, put a LogCapture processor in the chain and expose it as a fixture; tests then assert on log_output.entries:

# conftest.py
import pytest
import structlog
from structlog.testing import LogCapture

@pytest.fixture(name="log_output")
def fixture_log_output():
    return LogCapture()

@pytest.fixture(autouse=True)
def fixture_configure_structlog(log_output):
    structlog.configure(processors=[log_output])
    yield
    structlog.reset_defaults()  # Restore defaults so tests don't leak config

For pytest fixture patterns that pair with structlog testing, see pytest fixture not found.
Still Not Working?
Structlog vs Loguru
- Structlog — Processor chain, more configurable, first-class context vars, production observability focus.
- Loguru — Simpler API, fewer concepts, drop-in replacement for stdlib logging. See Loguru not working.
Use Loguru for simplicity. Use Structlog when you need processor chains (custom transformations, redaction, sampling) or strict JSON output with context propagation.
Performance Considerations
Structlog is generally fast but the chain runs on every log call. Optimizations:
# 1. Cache logger on first use (avoids re-creation per call)
structlog.configure(cache_logger_on_first_use=True, ...)

# 2. Filter early in the chain (skip expensive processors for dropped logs)
processors = [
    filter_by_level,  # Cheap; drops first
    structlog.processors.add_log_level,
    expensive_enrichment,  # Only runs if not dropped
    structlog.processors.JSONRenderer(),
]

# 3. Skip expensive values when the level is disabled. Structlog does not call
#    callables passed as values, so guard the computation yourself
#    (this check assumes the stdlib integration from Fix 1).
if logging.getLogger().isEnabledFor(logging.DEBUG):
    log.debug("done", summary=compute_summary())

Integration with FastAPI/Django/Flask
For FastAPI, use middleware as shown above. For Django:
# settings.py
import structlog

LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "json": {
            "()": structlog.stdlib.ProcessorFormatter,
            "processor": structlog.processors.JSONRenderer(),
        },
    },
    "handlers": {
        "default": {"class": "logging.StreamHandler", "formatter": "json"},
    },
    "root": {"handlers": ["default"], "level": "INFO"},
}

For environment patterns in Django that affect logging configuration, see Django migration conflict.
Multiple Output Sinks
Sometimes you want different log destinations — JSON to a file for aggregation, colored output to stdout for humans. Configure stdlib logging with two handlers:
import logging
import sys

import structlog

shared_processors = [
    structlog.contextvars.merge_contextvars,
    structlog.processors.add_log_level,
    structlog.processors.TimeStamper(fmt="iso"),
]

# Console handler: colored
console_formatter = structlog.stdlib.ProcessorFormatter(
    foreign_pre_chain=shared_processors,
    processors=[
        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
        structlog.dev.ConsoleRenderer(colors=True),
    ],
)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(console_formatter)

# File handler: JSON
file_formatter = structlog.stdlib.ProcessorFormatter(
    foreign_pre_chain=shared_processors,
    processors=[
        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
        structlog.processors.JSONRenderer(),
    ],
)
file_handler = logging.FileHandler("app.log")
file_handler.setFormatter(file_formatter)

root = logging.getLogger()
root.addHandler(console_handler)
root.addHandler(file_handler)
root.setLevel(logging.INFO)

This pattern leverages stdlib’s multi-handler model: Structlog handles record formatting per handler, stdlib handles routing. Pair it with the structlog.configure(...) call from Fix 3 (ending in wrap_for_formatter) so Structlog’s own events reach these handlers.
Cloud Logging Integration
GCP, AWS, and DataDog all consume JSON logs with specific field names:
# Rename fields to GCP convention
def gcp_field_mapping(logger, method_name, event_dict):
    # add_log_level stores the level under the "level" key
    event_dict["severity"] = event_dict.pop("level", "INFO").upper()
    event_dict["message"] = event_dict.pop("event")
    return event_dict

structlog.configure(processors=[..., gcp_field_mapping, structlog.processors.JSONRenderer()])

For Loguru-based cloud logging patterns that compare to this Structlog approach, see Loguru not working.
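Since a processor is a plain function, a field mapping like this can be checked in isolation before it goes into the chain. The sketch below is a standalone variant that assumes the level was stored under "level" (the key add_log_level uses) and falls back to the method name; the input dict is made up for illustration.

```python
def gcp_field_mapping(logger, method_name, event_dict):
    # Fall back to the method name if no level processor ran earlier.
    event_dict["severity"] = event_dict.pop("level", method_name).upper()
    event_dict["message"] = event_dict.pop("event")
    return event_dict


mapped = gcp_field_mapping(
    None, "info", {"event": "user_created", "level": "info", "user_id": 123}
)
print(mapped)
```

Testing the mapping this way catches key-name mismatches (for example, popping a key that no earlier processor sets) without deploying to the cloud backend.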
Combining with Tenacity for Retry Logging
import logging

import structlog
from tenacity import retry, stop_after_attempt, before_sleep_log

log = structlog.get_logger()

# before_sleep_log takes a stdlib logger; with the Fix 3 setup its records
# pass through the same ProcessorFormatter as Structlog events.
@retry(
    stop=stop_after_attempt(5),
    before_sleep=before_sleep_log(logging.getLogger(), logging.INFO),
)
def fetch():
    ...

For Tenacity-specific retry patterns, see Tenacity not working.
Debugging Misconfigured Chains
If logs come out wrong (missing fields, wrong format), add a print processor in the middle to inspect the dict at that point:
def debug_processor(logger, method_name, event_dict):
    print(f"DEBUG processor state: {event_dict}")
    return event_dict

structlog.configure(processors=[
    structlog.processors.add_log_level,
    debug_processor,  # Inspect here
    structlog.contextvars.merge_contextvars,
    debug_processor,  # And here
    structlog.processors.JSONRenderer(),
])

This makes the chain’s transformations visible, which is invaluable when a custom processor isn’t producing the expected output. Remove debug processors before deploying.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.