跳转到主要内容
import asyncio
from dedalus_labs import AsyncDedalus, DedalusRunner
from dedalus_labs.utils.stream import stream_async
from dotenv import load_dotenv

load_dotenv()

def execute_python_code(code: str) -> str:
    """
    执行 Python 代码并返回结果。
    在受控命名空间中安全地执行代码。
    """
    try:
        namespace = {}
        exec(code, {"__builtins__": __builtins__}, namespace)

        if 'result' in namespace:
            return str(namespace['result'])

        results = {k: v for k, v in namespace.items() if not k.startswith('_')}
        return str(results) if results else "Code executed successfully"
    except Exception as e:
        return f"Error executing code: {str(e)}"

async def main():
    client = AsyncDedalus()
    runner = DedalusRunner(client)

    result = runner.run(
        input="""Research the current stock price of Tesla (TSLA) and Apple (AAPL).
        Then write and execute Python code to:
        1. Compare their current prices
        2. Calculate the percentage difference
        3. Determine which stock has grown more in the past year based on the data you find
        4. Provide investment insights based on your analysis

        Use web search to get the latest stock information.""",
        model="openai/gpt-5",
        tools=[execute_python_code],
        mcp_servers=["windsor/brave-search-mcp"],
        stream=True
    )

    await stream_async(result)

if __name__ == "__main__":
    asyncio.run(main())
这个数据分析师示例将实时网页搜索与代码执行能力结合起来:
  • Brave Search MCP (windsor/brave-search-mcp):从网络获取实时数据
  • execute_python_code 工具:允许智能体编写并运行用于分析的 Python 代码
智能体可以搜索当前信息、提取相关数据,然后动态编写代码对其进行分析并生成洞见。注意:在生产环境中,请考虑使用沙箱化的代码执行以确保安全性。