处理程序
报告错误代码
复制
询问AI
from dedalus_mcp.client import ClientCapabilitiesConfig, open_connection
from dedalus_mcp.types import (
ElicitRequestParams,
ElicitResult,
ErrorData,
)
async def elicitation_handler(
context: object,
params: ElicitRequestParams,
) -> ElicitResult | ErrorData:
"""处理来自服务器的征询请求。"""
print(f"\nServer requests input: {params.message}")
# 收集 schema 中每个字段的值
content = {}
schema = params.requestedSchema
properties = schema.get("properties", {})
required = schema.get("required", [])
for field, field_schema in properties.items():
field_type = field_schema.get("type", "string")
is_required = field in required
value = input(f"{field} ({field_type}): ")
if not value and is_required:
return ElicitResult(action="decline")
# 类型强制转换
if field_type == "boolean":
content[field] = value.lower() in ("true", "yes", "1", "y")
elif field_type == "integer":
content[field] = int(value)
elif field_type == "number":
content[field] = float(value)
else:
content[field] = value
return ElicitResult(action="accept", content=content)
使用方法
报告错误代码
复制
询问AI
capabilities = ClientCapabilitiesConfig(elicitation=elicitation_handler)
async with open_connection(
url="http://127.0.0.1:8000/mcp",
transport="streamable-http",
capabilities=capabilities,
) as client:
# 服务器现在可以在 tool 执行期间请求用户输入
result = await client.call_tool("deploy", {"env": "production"})
操作
| 操作 | 描述 |
|---|---|
accept | 将收集到的内容提交到服务器 |
decline | 拒绝该请求,但不终止工作流 |
cancel | 完全终止整个操作 |
报告错误代码
复制
询问AI
# 接受并返回内容
return ElicitResult(action="accept", content={"confirmed": True})
# Decline (user refused)
return ElicitResult(action="decline")
# Cancel (abort operation)
return ElicitResult(action="cancel")
自动确认示例
报告错误代码
复制
询问AI
async def elicitation_handler(context, params):
"""自动接受并使用默认值进行测试。"""
content = {}
properties = params.requestedSchema.get("properties", {})
for field, schema in properties.items():
field_type = schema.get("type", "string")
if field_type == "boolean":
content[field] = True
elif field_type in ("integer", "number"):
content[field] = 0
else:
content[field] = "auto-filled"
return ElicitResult(action="accept", content=content)