跳转到主要内容
进度上报功能允许你的服务器在某个 tool 运行期间向 Client 发送进度更新。Client 可以利用这些更新,在耗时操作过程中显示加载动画、进度条以及“仍在处理中”的界面。 注意:仅当 Client 为此请求提供了进度令牌时,ctx.report_progress(...) 才会发送通知。如果 Client 没有请求进度,这个调用不会执行任何操作。

基本用法

from dedalus_mcp import get_context, tool

@tool(description="Process files")
async def process_files(files: list[str]) -> str:
    ctx = get_context()

    for i, file in enumerate(files, start=1):
        await ctx.report_progress(i, total=len(files), message=f"Processing {file}")
        process_file(file)  # 你的代码;如果此操作阻塞,请将其卸载或改为异步

    return f"Processed {len(files)} files"

参数

await ctx.report_progress(
    progress=50,                 # 当前进度值
    total=100,                   # 可选总数(用于计算百分比)
    message="Halfway done...",   # 可选状态消息
)
参数类型描述
progressint | float当前进度值
totalint | float | None可选的总进度值;用于显示百分比 UI
messagestr | None可选的、便于阅读的状态文本

示例:下载文件

from dedalus_mcp import get_context, tool

@tool(description="Download files")
async def download_files(urls: list[str]) -> dict:
    ctx = get_context()
    downloaded: list[str] = []

    for i, url in enumerate(urls, start=1):
        await ctx.report_progress(i - 1, total=len(urls), message=f"Downloading {url}")
        await ctx.info("Downloading", data={"url": url})

        path = await download(url)  # 你的代码
        downloaded.append(path)

        await ctx.report_progress(i, total=len(urls))

    return {"files": downloaded}

示例:数据处理管道

from dedalus_mcp import get_context, tool

@tool(description="处理大型数据集")
async def process_dataset(dataset_id: str) -> dict:
    ctx = get_context()

    await ctx.report_progress(0, total=100, message="加载数据集")
    data = load_dataset(dataset_id)  # 你的代码
    await ctx.report_progress(30, total=100, message="转换数据")

    total_items = len(data)
    if total_items:
        for i, item in enumerate(data, start=1):
            # 将项进度映射到 30..70 范围
            progress = 30 + int((i / total_items) * 40)
            await ctx.report_progress(progress, total=100)
            transform(item)  # 你的代码

    await ctx.report_progress(70, total=100, message="保存结果")
    save_results(data)  # 你的代码
    await ctx.report_progress(100, total=100, message="完成")

    return {"processed": len(data)}

示例:不确定总量的进度

如果你一开始不知道总量,可以省略 total,并定期发送进度更新:
from dedalus_mcp import get_context, tool

@tool(description="Search until found")
async def search(query: str) -> str:
    ctx = get_context()
    pages_searched = 0

    while True:
        pages_searched += 1
        await ctx.report_progress(pages_searched, message=f"Searched {pages_searched} pages")

        result = search_page(query, pages_searched)  # 你的代码
        if result:
            return result

        if pages_searched > 100:
            return "Not found"

提示

  • 优先使用异步工作:当你的工具在执行 I/O(async def)时,进度更新最有用。如果你在做大量 CPU 密集型或阻塞型工作,考虑将其卸载出去,这样进度通知仍然可以正常发送。
  • 谨慎使用 message:像 “Downloading…”, “Transforming…”, “Saving…” 这样的短消息最方便 Client 展示。
  • 不要频繁推送更新:在每一个微小步骤上都发送进度会产生大量噪音。对于非常大的循环,你可以只在每处理 N 个项时报告一次。