Skip to content

Fix: Structlog Not Working — Processor Chain, Context Variables, and Stdlib Integration

FixDevs ·

Quick Answer

How to fix Structlog errors — output is dict not JSON, context vars not propagating, stdlib logging not unified, async context loss, configure_once, KeyValueRenderer vs JSONRenderer, and async filtering.

The Error

You configure Structlog and the output looks like a Python dict, not JSON:

import structlog
log = structlog.get_logger()
log.info("user_created", user_id=123)
# 2025-04-09 10:00:00 [info     ] user_created    user_id=123
# Wanted: {"timestamp": "...", "level": "info", "event": "user_created", "user_id": 123}

Or bind_contextvars() values disappear in nested code:

structlog.contextvars.bind_contextvars(request_id="abc")
log.info("processing")   # OK — has request_id

async def nested():
    log.info("nested")   # Missing request_id in async context

Or stdlib logging and Structlog produce inconsistent output:

import logging
logging.getLogger("sqlalchemy.engine").info("SQL: SELECT ...")
# Plain text format — different from Structlog's JSON

Or configure() runs multiple times in tests, causing duplicate log lines:

# pytest fixture configures Structlog
# Test imports module that also configures Structlog
# Result: every log line printed 2-3 times

Or async tasks lose their bound context variables across await points:

async def request(req_id):
    structlog.contextvars.bind_contextvars(request_id=req_id)
    await some_task()
    log.info("done")   # request_id still present? Sometimes yes, sometimes no

Structlog is the structured logging library favored for production observability — JSON output, processor pipelines, context propagation. Unlike Loguru (which optimizes for simplicity), Structlog gives you composable processor chains for transforming log records. This power has costs: misconfigured processors silently change output format, context vars need careful setup, and the dual stdlib/Structlog model surprises newcomers. This guide covers each.

Why This Happens

Structlog’s processor chain is a list of functions, each taking the previous output and returning a transformed record. By default, the chain ends with ConsoleRenderer — colored, human-friendly output. To get JSON, you swap the renderer. Misordering or omitting processors silently changes behavior — Structlog won’t error on a “broken” chain; it’ll just produce unexpected output.

The standard library’s logging module is entirely separate from Structlog. Without explicit integration, third-party libraries (SQLAlchemy, Uvicorn, requests) log through stdlib in their own format. Unifying both takes deliberate setup.

Fix 1: Configuration and Renderer Selection

import structlog
import logging

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

log = structlog.get_logger()
log.info("user_created", user_id=123)
# {"timestamp": "2025-04-09T10:00:00", "level": "info", "event": "user_created", "user_id": 123}

The processor chain runs in order — each function takes the previous output as a dict and returns the transformed dict. The final processor in the list is the renderer (converts dict to string).

Common Mistake: Forgetting that the last processor must be a renderer. If you put JSONRenderer in the middle of the chain, subsequent processors see a string instead of a dict and silently break.

# WRONG — JSONRenderer not last
processors = [
    structlog.processors.JSONRenderer(),       # Wrong position
    structlog.processors.add_log_level,         # Won't add level — chain broken
]

# CORRECT
processors = [
    structlog.processors.add_log_level,
    structlog.processors.JSONRenderer(),       # Last position
]

Common renderers:

RendererUse case
ConsoleRendererDev — colored, human-readable
JSONRendererProduction — log aggregators
KeyValueRendererlogfmt format (key=value key=value)
LogfmtRendererStrict logfmt for Heroku, Honeycomb

Environment-based config:

import os
import sys
import structlog

renderer = (
    structlog.processors.JSONRenderer()
    if not sys.stdout.isatty()
    else structlog.dev.ConsoleRenderer(colors=True)
)

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        renderer,
    ],
)

Dev terminals get color; CI and production get JSON.

Fix 2: Context Variables for Request-Scoped Data

import structlog
from structlog.contextvars import bind_contextvars, unbind_contextvars, clear_contextvars

log = structlog.get_logger()

def handle_request(req_id, user_id):
    bind_contextvars(request_id=req_id, user_id=user_id)
    try:
        log.info("request_started")   # Has request_id, user_id
        do_work()
        log.info("request_complete")
    finally:
        clear_contextvars()

def do_work():
    log.info("processing")   # Also has request_id, user_id from parent context

merge_contextvars processor is required — without it, the context isn’t included in logs:

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,   # MUST be in the chain
        ...
    ],
)

Context vars work across async boundaries — Python’s contextvars module is async-aware:

import asyncio
import structlog
from structlog.contextvars import bind_contextvars

log = structlog.get_logger()

async def request(req_id):
    bind_contextvars(request_id=req_id)
    log.info("start")
    await asyncio.sleep(0.1)
    log.info("after_await")   # Still has request_id

async def main():
    await asyncio.gather(
        request("abc"),
        request("xyz"),
    )

# Each task has its own context — abc and xyz don't leak across

bound_contextvars() context manager for scoped binding:

from structlog.contextvars import bound_contextvars

def handler():
    with bound_contextvars(request_id="abc", trace_id="xyz"):
        log.info("inside")   # Has the bound vars
    log.info("outside")   # Doesn't

Pro Tip: Bind context vars at the request boundary (middleware, route handler, message consumer entrypoint) — everything downstream gets the context for free. This is the single biggest improvement Structlog brings over plain logging: correlation IDs propagate automatically through deeply nested code without threading them through every function signature.

Fix 3: Unifying Stdlib Logging with Structlog

Third-party libraries use stdlib logging. Unify both formats so logs look consistent:

import logging
import structlog

# Shared processors run for both Structlog and stdlib logs
shared_processors = [
    structlog.contextvars.merge_contextvars,
    structlog.processors.add_log_level,
    structlog.processors.TimeStamper(fmt="iso"),
]

structlog.configure(
    processors=shared_processors + [
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

# Configure stdlib to also use Structlog's formatter
formatter = structlog.stdlib.ProcessorFormatter(
    foreign_pre_chain=shared_processors,
    processors=[
        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
        structlog.processors.JSONRenderer(),
    ],
)

handler = logging.StreamHandler()
handler.setFormatter(formatter)

root_logger = logging.getLogger()
root_logger.addHandler(handler)
root_logger.setLevel(logging.INFO)

# Now both produce identical JSON output
log = structlog.get_logger()
log.info("structlog message", user_id=123)

logging.getLogger("requests").info("HTTP request")
# Both lines: same JSON format with timestamp, level, message, fields

foreign_pre_chain processes stdlib log records before merging with Structlog’s chain. This ensures third-party loggers get the same context vars and formatting.

Set log levels per logger:

logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)

Reduces noise from chatty libraries. Set this after configuring Structlog so the levels stick.

Fix 4: Configuring Once vs Multiple Times

structlog.configure(...)   # Replaces any prior config

configure() is idempotent — calling it multiple times replaces the configuration entirely. But in tests where modules import each other, you may want first-config-wins:

structlog.configure_once(...)   # Only configures if not already configured

configure_once for libraries:

# mylib/__init__.py
import structlog

# Only set default config if the user hasn't configured Structlog
structlog.configure_once(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)

Library code shouldn’t override user configuration — configure_once is the polite default for libraries.

Common Mistake: Calling structlog.configure() inside a function that runs multiple times (e.g., a test fixture, a Flask route handler). Each call replaces the global config. Symptoms include intermittent format changes between requests and logs that look correct in tests but wrong in production. Always configure Structlog once at startup, not in request handlers.

Fix 5: Async-Aware Logging

Structlog’s BoundLogger is synchronous by default. For async code, use AsyncBoundLogger:

import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.stdlib.AsyncBoundLogger,   # Async variant
    logger_factory=structlog.stdlib.LoggerFactory(),
)

log = structlog.get_logger()

async def handler():
    await log.ainfo("async log")   # Note: ainfo, not info
    await log.adebug("debug log")
    await log.awarning("warn log")

Why async-aware? Sync log calls in async code work but block the event loop briefly. For high-throughput async services (FastAPI, aiohttp), use AsyncBoundLogger to delegate logging to a thread pool.

For FastAPI specifically:

from fastapi import FastAPI, Request
import structlog
from structlog.contextvars import bind_contextvars, clear_contextvars
import uuid

app = FastAPI()
log = structlog.get_logger()

@app.middleware("http")
async def correlation_id_middleware(request: Request, call_next):
    clear_contextvars()
    bind_contextvars(
        request_id=str(uuid.uuid4()),
        method=request.method,
        path=request.url.path,
    )
    response = await call_next(request)
    await log.ainfo("request_complete", status=response.status_code)
    return response

For FastAPI middleware patterns that interact with Structlog context, see FastAPI dependency injection error.

Fix 6: Filtering and Sampling

import structlog
import logging

# Drop debug logs in production
def filter_by_level(logger, method_name, event_dict):
    if method_name == "debug" and os.getenv("ENV") == "production":
        raise structlog.DropEvent
    return event_dict

structlog.configure(
    processors=[
        filter_by_level,
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer(),
    ],
)

DropEvent signals “skip this log entirely” — Structlog stops processing and returns nothing.

Sample high-volume logs:

import random

def sample_debug(logger, method_name, event_dict):
    if method_name == "debug" and random.random() > 0.01:
        raise structlog.DropEvent   # Drop 99% of debug logs
    return event_dict

Redact sensitive fields:

SENSITIVE = {"password", "api_key", "credit_card", "ssn"}

def redact_sensitive(logger, method_name, event_dict):
    for key in list(event_dict.keys()):
        if key.lower() in SENSITIVE:
            event_dict[key] = "[REDACTED]"
    return event_dict

structlog.configure(
    processors=[
        redact_sensitive,   # Run early in the chain
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer(),
    ],
)

log.info("login_attempt", username="alice", password="secret123")
# {"event": "login_attempt", "username": "alice", "password": "[REDACTED]"}

Fix 7: Exception Logging

import structlog
log = structlog.get_logger()

try:
    risky_operation()
except Exception:
    log.exception("operation_failed")   # Includes full traceback

format_exc_info processor must be in the chain for tracebacks to render:

structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.format_exc_info,    # Required for exception tracebacks
        structlog.processors.JSONRenderer(),
    ],
)

Add exception type and message as fields:

def add_exception_info(logger, method_name, event_dict):
    exc_info = event_dict.get("exc_info")
    if exc_info:
        exc_type, exc_value, _ = exc_info
        event_dict["exc_type"] = exc_type.__name__
        event_dict["exc_message"] = str(exc_value)
    return event_dict

structlog.configure(
    processors=[
        add_exception_info,
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
)

With manual exception:

try:
    1/0
except ZeroDivisionError as e:
    log.error("calculation_failed", error=str(e), error_type=type(e).__name__)
    # Or use log.exception("calculation_failed") to auto-include traceback

Fix 8: Testing with Structlog

Structlog provides testing.capture_logs for inspecting logs in tests:

import structlog

def fn():
    log = structlog.get_logger()
    log.info("event", user_id=123)

def test_fn():
    with structlog.testing.capture_logs() as cap_logs:
        fn()
    assert cap_logs == [{"event": "event", "user_id": 123, "log_level": "info"}]

For pytest, configure Structlog to use the test renderer:

# conftest.py
import structlog
import pytest

@pytest.fixture(autouse=True)
def configure_structlog():
    structlog.configure(
        processors=[
            structlog.processors.add_log_level,
            structlog.testing.LogCapture(),
        ],
        wrapper_class=structlog.testing.BoundLoggerMock,
    )

For pytest fixture patterns that pair with structlog testing, see pytest fixture not found.

Still Not Working?

Structlog vs Loguru

  • Structlog — Processor chain, more configurable, first-class context vars, production observability focus.
  • Loguru — Simpler API, fewer concepts, drop-in replacement for stdlib logging. See Loguru not working.

Use Loguru for simplicity. Use Structlog when you need processor chains (custom transformations, redaction, sampling) or strict JSON output with context propagation.

Performance Considerations

Structlog is generally fast but the chain runs on every log call. Optimizations:

# 1. Cache logger on first use (avoids re-creation per call)
structlog.configure(cache_logger_on_first_use=True, ...)

# 2. Filter early in the chain (skip expensive processors for dropped logs)
processors = [
    filter_by_level,             # Cheap; drops first
    structlog.processors.add_log_level,
    expensive_enrichment,         # Only runs if not dropped
    structlog.processors.JSONRenderer(),
]

# 3. Lazy evaluation of expensive values
log.info("done", expensive=lambda: compute_summary())
# Structlog evaluates the lambda only if the log is rendered (not dropped)

Integration with FastAPI/Django/Flask

For FastAPI, use middleware as shown above. For Django:

# settings.py
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "json": {
            "()": structlog.stdlib.ProcessorFormatter,
            "processor": structlog.processors.JSONRenderer(),
        },
    },
    "handlers": {
        "default": {"class": "logging.StreamHandler", "formatter": "json"},
    },
    "root": {"handlers": ["default"], "level": "INFO"},
}

For Django-specific logging patterns, see Django migration conflict for environment patterns that affect logging configuration.

Multiple Output Sinks

Sometimes you want different log destinations — JSON to a file for aggregation, colored output to stdout for humans. Configure stdlib logging with two handlers:

import logging
import structlog
import sys

shared_processors = [
    structlog.contextvars.merge_contextvars,
    structlog.processors.add_log_level,
    structlog.processors.TimeStamper(fmt="iso"),
]

# Console handler — colored
console_formatter = structlog.stdlib.ProcessorFormatter(
    foreign_pre_chain=shared_processors,
    processors=[
        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
        structlog.dev.ConsoleRenderer(colors=True),
    ],
)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(console_formatter)

# File handler — JSON
file_formatter = structlog.stdlib.ProcessorFormatter(
    foreign_pre_chain=shared_processors,
    processors=[
        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
        structlog.processors.JSONRenderer(),
    ],
)
file_handler = logging.FileHandler("app.log")
file_handler.setFormatter(file_formatter)

root = logging.getLogger()
root.addHandler(console_handler)
root.addHandler(file_handler)
root.setLevel(logging.INFO)

This pattern leverages stdlib’s multi-handler model — Structlog handles record formatting per handler, stdlib handles routing.

Cloud Logging Integration

GCP, AWS, and DataDog all consume JSON logs with specific field names:

# Rename fields to GCP convention
def gcp_field_mapping(logger, method_name, event_dict):
    event_dict["severity"] = event_dict.pop("log_level", "INFO").upper()
    event_dict["message"] = event_dict.pop("event")
    return event_dict

structlog.configure(processors=[..., gcp_field_mapping, JSONRenderer()])

For Loguru-based cloud logging patterns that compare to this Structlog approach, see Loguru not working.

Combining with Tenacity for Retry Logging

import structlog
from tenacity import retry, stop_after_attempt, before_sleep_log
import logging

log = structlog.get_logger()

@retry(
    stop=stop_after_attempt(5),
    before_sleep=before_sleep_log(logging.getLogger(), logging.INFO),
)
def fetch():
    ...

For Tenacity-specific retry patterns, see Tenacity not working.

Debugging Misconfigured Chains

If logs come out wrong (missing fields, wrong format), add a print processor in the middle to inspect the dict at that point:

def debug_processor(logger, method_name, event_dict):
    print(f"DEBUG processor state: {event_dict}")
    return event_dict

structlog.configure(processors=[
    structlog.processors.add_log_level,
    debug_processor,                # Inspect here
    structlog.contextvars.merge_contextvars,
    debug_processor,                # And here
    structlog.processors.JSONRenderer(),
])

This makes the chain’s transformations visible — invaluable when a custom processor isn’t producing the expected output. Remove debug processors before deploying.

F

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.

Was this article helpful?

Related Articles