import asyncio
import webbrowser
from collections.abc import Awaitable, Callable
from typing import TypeVar
from dotenv import load_dotenv
load_dotenv()
from dedalus_labs import AsyncDedalus, AuthenticationError
T = TypeVar("T")
async def with_oauth_retry(fn: Callable[[], Awaitable[T]]) -> T:
try:
return await fn()
except AuthenticationError as e:
body = e.body if isinstance(e.body, dict) else {}
url = body.get("connect_url") or body.get("detail", {}).get("connect_url")
if not url:
raise
webbrowser.open(url)
input("\n完成 OAuth 后按 Enter...")
return await fn()
async def main():
client = AsyncDedalus()
async def do_request():
return await client.chat.completions.create(
model="openai/gpt-4.1",
messages=[
{
"role": "user",
"content": "列出我最近的邮件并进行总结",
}
],
mcp_servers=["anny_personal/gmail-mcp"],
)
resp = await with_oauth_retry(do_request)
print(resp.choices[0].message.content)
if resp.mcp_tool_results:
for r in resp.mcp_tool_results:
print(f"{r.tool_name} ({r.duration_ms}ms): {r.result}")
asyncio.run(main())