Logging lets your server send debug, info, warning, and error messages to MCP clients while handling a request. This gives you visibility during tool execution and helps with debugging. Note: clients decide how (or whether) to display these logs.

Basic usage

from dedalus_mcp import get_context, tool

@tool(description="Process data")
async def process(data: str) -> str:
    ctx = get_context()

    await ctx.info("Processing", data={"bytes": len(data)})
    # ... your work ...
    await ctx.info("Processing complete")

    return "done"

Log levels

await ctx.debug("Detailed debugging info")
await ctx.info("General operational messages")
await ctx.warning("Warning conditions")
await ctx.error("Error conditions")
| Method | Level | Use case |
| --- | --- | --- |
| ctx.debug() | DEBUG | Detailed debugging information |
| ctx.info() | INFO | General operational messages |
| ctx.warning() | WARNING | Warning conditions |
| ctx.error() | ERROR | Error conditions |
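If your tool already uses Python's standard logging module, one option is to map stdlib levels onto these four methods. A minimal sketch, assuming you want to forward existing log records (the LEVEL_TO_METHOD table and method_for helper are illustrative, not part of Dedalus MCP):

```python
import logging

# Map Python stdlib logging levels to the context method names above.
LEVEL_TO_METHOD = {
    logging.DEBUG: "debug",
    logging.INFO: "info",
    logging.WARNING: "warning",
    logging.ERROR: "error",
}

def method_for(level: int) -> str:
    # Round down to the nearest known level (e.g. CRITICAL -> "error"),
    # falling back to "debug" for anything below DEBUG.
    known = [lvl for lvl in sorted(LEVEL_TO_METHOD) if lvl <= level]
    return LEVEL_TO_METHOD[known[-1]] if known else "debug"
```

Inside a tool you could then dispatch with getattr, for example: await getattr(ctx, method_for(record.levelno))(record.getMessage()).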

Example: Data pipeline

from dedalus_mcp import get_context, tool

@tool(description="Run data pipeline")
async def run_pipeline(source: str) -> dict:
    ctx = get_context()

    await ctx.info("Starting pipeline", data={"source": source})

    # Load
    await ctx.debug("Loading data...")
    data = load_data(source)  # your code
    await ctx.info("Loaded records", data={"count": len(data)})

    # Transform
    await ctx.debug("Transforming data...")
    try:
        transformed = transform(data)  # your code
    except ValueError as e:
        await ctx.warning("Transform warning", data={"error": str(e)})
        transformed = fallback_transform(data)  # your code

    # Save
    await ctx.debug("Saving results...")
    try:
        save(transformed)  # your code
        await ctx.info("Pipeline complete", data={"records": len(transformed)})
    except OSError as e:
        await ctx.error("Save failed", data={"error": str(e)})
        raise

    return {"records": len(transformed)}

Example: Batch processing

from dedalus_mcp import get_context, tool

@tool(description="Process items in batch")
async def batch_process(items: list[str]) -> dict:
    ctx = get_context()
    results = {"success": 0, "failed": 0}

    await ctx.info("Starting batch", data={"items": len(items)})

    for i, item in enumerate(items, start=1):
        await ctx.debug("Processing item", data={"index": i, "total": len(items), "item": item})
        try:
            process_item(item)  # your code
            results["success"] += 1
        except Exception as e:
            await ctx.warning("Item failed", data={"item": item, "error": str(e)})
            results["failed"] += 1

    if results["failed"]:
        await ctx.warning("Batch completed with failures", data=results)
    else:
        await ctx.info("Batch completed successfully", data=results)

    return results

Structured logging

Pass structured fields using data=:
await ctx.info(
    "Request processed",
    data={
        "duration_ms": 150,
        "items_processed": 42,
    },
)
Tip: Avoid using the key "msg" inside data, since Dedalus MCP uses "msg" internally for the main message text.
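Because "msg" is reserved, it can help to sanitize dictionaries that come from elsewhere (user input, upstream payloads) before logging them. A minimal sketch (sanitize_log_data is a hypothetical helper, not part of Dedalus MCP):

```python
def sanitize_log_data(data: dict) -> dict:
    """Rename a reserved "msg" key to "message" without mutating the input."""
    if "msg" not in data:
        return data
    clean = dict(data)  # copy so the caller's dict is untouched
    clean["message"] = clean.pop("msg")
    return clean
```

Usage: await ctx.info("Upstream event", data=sanitize_log_data(payload)).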