Skip to content

AI 智能体(Agent)开发实战:从架构设计到多 Agent 协作

概述

人工智能智能体(AI Agent)是 2024-2026 年 AI 领域最热门的方向之一。从 AutoGPT 的爆火到 LangChain、LlamaIndex 等框架的成熟,AI Agent 正在从概念走向实际应用。本文将深入讲解 AI Agent 的核心架构、关键组件、工具调用机制,并通过 5 个实战案例带你从零构建功能强大的 AI 智能体系统。

本文适合人群:

  • 有 Python 编程基础的开发者
  • 对 AI Agent 感兴趣的技术人员
  • 希望构建自动化 AI 应用的工程师

学习收获:

  • 理解 AI Agent 的核心架构和设计理念
  • 掌握工具调用、记忆系统、规划能力等关键技术
  • 完成 5 个从简单到复杂的实战项目
  • 了解多 Agent 协作系统的设计模式

一、AI Agent 核心概念

1.1 什么是 AI Agent?

AI Agent 是指能够感知环境、进行推理、做出决策并执行动作的智能系统。与大语言模型(LLM)不同,Agent 具有以下特征:

特性LLMAI Agent
感知能力被动接收输入主动感知环境
记忆能力上下文窗口限制长期记忆系统
行动能力仅生成文本调用工具/API
规划能力单次推理多步任务规划
自主性

1.2 Agent 的核心组件

一个完整的 AI Agent 系统通常包含以下核心组件:

┌─────────────────────────────────────────────────────────┐
│                    AI Agent 架构                         │
├─────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐     │
│  │   感知层    │  │   认知层    │  │   行动层    │     │
│  │  (输入处理)  │  │  (推理决策)  │  │  (工具调用)  │     │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘     │
│         │                │                │            │
│  ┌──────▼──────┐  ┌──────▼──────┐  ┌──────▼──────┐     │
│  │  环境观察   │  │  LLM 核心   │  │  执行器     │     │
│  │  用户输入   │  │  规划模块   │  │  API 调用   │     │
│  │  文件读取   │  │  记忆检索   │  │  代码执行   │     │
│  └─────────────┘  └──────┬──────┘  └─────────────┘     │
│                          │                              │
│                   ┌──────▼──────┐                       │
│                   │   记忆系统   │                       │
│                   │  短期记忆   │                       │
│                   │  长期记忆   │                       │
│                   │  向量数据库  │                       │
│                   └─────────────┘                       │
└─────────────────────────────────────────────────────────┘

1.3 Agent 的工作流程

典型的 Agent 工作流程遵循 ReAct(Reasoning + Acting)模式:

1. 观察(Observe) → 接收用户输入和环境信息
2. 思考(Think)   → LLM 分析任务,制定计划
3. 行动(Act)     → 调用工具执行具体操作
4. 反思(Reflect) → 评估结果,调整策略
5. 重复 2-4 步直到任务完成

二、环境搭建与基础框架

2.1 安装依赖

bash
# 创建项目目录
mkdir ai-agent-practice && cd ai-agent-practice

# 创建虚拟环境
python3 -m venv venv
source venv/bin/activate

# 安装核心依赖
pip install langchain langchain-openai langchain-community
pip install chromadb sentence-transformers
pip install python-dotenv pydantic requests
pip install duckduckgo-search wikipedia

2.2 配置环境变量

创建 .env 文件:

bash
# .env
OPENAI_API_KEY=your_api_key_here
OPENAI_API_BASE=https://api.openai.com/v1
# 或使用其他兼容的 API 服务
# OPENAI_API_BASE=https://api.moonshot.cn/v1

2.3 基础 Agent 类设计

python
# agent_core.py
from typing import List, Dict, Optional, Any
from pydantic import BaseModel, Field
from datetime import datetime
import json

class Tool(BaseModel):
    """工具定义"""
    name: str
    description: str
    parameters: Dict[str, Any]
    func: Any  # 实际执行的函数

class Memory(BaseModel):
    """记忆条目"""
    content: str
    timestamp: datetime
    type: str = "short_term"  # short_term, long_term, episodic
    tags: List[str] = []

class AgentState(BaseModel):
    """Agent 状态"""
    current_goal: str = ""
    completed_steps: List[str] = []
    pending_actions: List[Dict] = []
    context: Dict[str, Any] = {}

class BaseAgent:
    """基础 Agent 类"""
    
    def __init__(self, model: str = "gpt-4", tools: List[Tool] = None):
        self.model = model
        self.tools = tools or []
        self.memory: List[Memory] = []
        self.state = AgentState()
        self.conversation_history = []
        
    def add_tool(self, tool: Tool):
        """注册工具"""
        self.tools.append(tool)
        
    def add_memory(self, content: str, mem_type: str = "short_term", tags: List[str] = None):
        """添加记忆"""
        memory = Memory(
            content=content,
            timestamp=datetime.now(),
            type=mem_type,
            tags=tags or []
        )
        self.memory.append(memory)
        
    def get_relevant_memories(self, query: str, limit: int = 5) -> List[Memory]:
        """检索相关记忆(简化版,实际应使用向量检索)"""
        # 简单关键词匹配
        relevant = []
        for mem in self.memory:
            if query.lower() in mem.content.lower():
                relevant.append(mem)
        return relevant[:limit]
    
    def think(self, observation: str) -> str:
        """思考过程:分析任务,制定计划"""
        # 实际实现中会调用 LLM
        prompt = f"""
        当前目标:{self.state.current_goal}
        已完成步骤:{self.state.completed_steps}
        新观察:{observation}
        
        请分析当前情况,制定下一步行动计划。
        如果需要调用工具,请明确指出工具名称和参数。
        """
        return prompt
    
    def act(self, tool_name: str, **kwargs) -> Any:
        """执行动作:调用工具"""
        tool = next((t for t in self.tools if t.name == tool_name), None)
        if not tool:
            raise ValueError(f"工具 {tool_name} 不存在")
        return tool.func(**kwargs)
    
    def run(self, goal: str) -> str:
        """运行 Agent 完成目标"""
        self.state.current_goal = goal
        observation = f"开始执行任务:{goal}"
        
        max_iterations = 10
        for i in range(max_iterations):
            # 思考
            thought = self.think(observation)
            
            # 这里应该调用 LLM 解析思考结果
            # 简化处理,直接返回
            if i == max_iterations - 1:
                return f"任务完成:{goal}"
                
            observation = "继续执行..."
            
        return "达到最大迭代次数"

三、实战案例

案例 1:智能研究助手

目标: 构建一个能够自动搜索、整理信息的 AI 研究助手

功能需求:

  • 网络搜索获取最新信息
  • 维基百科查询背景知识
  • 自动整理和总结信息
  • 生成结构化的研究报告

实现代码

python
# research_agent.py
import requests
from langchain.tools import DuckDuckGoSearchRun
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from typing import List, Dict
import json

class ResearchAgent(BaseAgent):
    """研究助手 Agent"""
    
    def __init__(self):
        super().__init__()
        
        # 定义工具
        search_tool = Tool(
            name="web_search",
            description="搜索互联网获取最新信息",
            parameters={"query": "搜索关键词"},
            func=DuckDuckGoSearchRun().run
        )
        
        wiki_tool = Tool(
            name="wikipedia",
            description="查询维基百科获取背景知识",
            parameters={"query": "查询主题"},
            func=WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper()).run
        )
        
        self.add_tool(search_tool)
        self.add_tool(wiki_tool)
    
    def research(self, topic: str, depth: int = 2) -> Dict:
        """执行研究任务"""
        self.state.current_goal = f"研究主题:{topic}"
        results = {
            "topic": topic,
            "web_results": [],
            "wiki_summary": "",
            "final_report": ""
        }
        
        # 步骤 1:网络搜索
        print(f"🔍 正在搜索:{topic}")
        search_query = f"{topic} 2025 2026 最新进展"
        web_content = self.act("web_search", query=search_query)
        results["web_results"] = web_content[:500]  # 截取部分内容
        
        # 步骤 2:维基百科查询
        print(f"📚 查询维基百科:{topic}")
        try:
            wiki_content = self.act("wikipedia", query=topic)
            results["wiki_summary"] = wiki_content[:1000]
        except Exception as e:
            results["wiki_summary"] = f"维基百科查询失败:{e}"
        
        # 步骤 3:生成研究报告
        print("📝 生成研究报告...")
        report = self.generate_report(topic, results)
        results["final_report"] = report
        
        # 保存记忆
        self.add_memory(
            content=f"研究主题:{topic}, 报告长度:{len(report)}字",
            mem_type="long_term",
            tags=["research", topic]
        )
        
        return results
    
    def generate_report(self, topic: str, data: Dict) -> str:
        """生成结构化研究报告"""
        report = f"""# 研究报告:{topic}

## 一、概述
{topic}是当前备受关注的研究领域。本报告基于网络搜索和维基百科的信息整理而成。

## 二、背景知识
{data['wiki_summary'][:500]}...

## 三、最新进展
{data['web_results'][:500]}...

## 四、关键发现
1. [待补充]
2. [待补充]
3. [待补充]

## 五、参考资料
- 网络搜索结果
- 维基百科

## 六、后续研究方向
[待补充]

---
报告生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
        return report

# 使用示例
if __name__ == "__main__":
    agent = ResearchAgent()
    result = agent.research("人工智能 Agent 技术")
    print(result["final_report"])

运行结果示例

🔍 正在搜索:人工智能 Agent 技术
📚 查询维基百科:人工智能 Agent 技术
📝 生成研究报告...

# 研究报告:人工智能 Agent 技术

## 一、概述
人工智能 Agent 技术是当前备受关注的研究领域...

## 二、背景知识
AI Agent 是指能够自主感知、决策和行动的 inteligente 系统...

## 三、最新进展
2025-2026 年,AI Agent 技术在多个领域取得突破...

案例 2:代码生成与执行 Agent

目标: 构建能够理解需求、生成代码并安全执行的编程助手

功能需求:

  • 理解自然语言编程需求
  • 生成可执行的 Python 代码
  • 在沙箱环境中运行代码
  • 返回执行结果和解释

实现代码

python
# coding_agent.py
import subprocess
import tempfile
import os
from typing import Optional, Tuple
import re

class CodingAgent(BaseAgent):
    """代码生成与执行 Agent"""
    
    def __init__(self, sandbox: bool = True):
        super().__init__()
        self.sandbox = sandbox
        self.execution_history = []
        
        # 代码执行工具
        exec_tool = Tool(
            name="execute_python",
            description="在沙箱环境中执行 Python 代码",
            parameters={"code": "要执行的 Python 代码"},
            func=self._safe_execute
        )
        self.add_tool(exec_tool)
    
    def _safe_execute(self, code: str, timeout: int = 10) -> Tuple[bool, str]:
        """安全执行 Python 代码"""
        # 安全检查:禁止危险操作
        dangerous_patterns = [
            r'__import__\s*\(',
            r'eval\s*\(',
            r'exec\s*\(',
            r'open\s*\([^)]*[\'"][^\'"]*[\'"]\s*,\s*[\'"]w',
            r'os\.system',
            r'subprocess\.',
            r'shutil\.rmtree',
        ]
        
        for pattern in dangerous_patterns:
            if re.search(pattern, code):
                return False, f"安全警告:检测到危险操作 '{pattern}'"
        
        # 创建临时文件执行
        try:
            with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
                f.write(code)
                temp_path = f.name
            
            # 执行代码
            result = subprocess.run(
                ['python3', temp_path],
                capture_output=True,
                text=True,
                timeout=timeout,
                cwd=tempfile.gettempdir()
            )
            
            os.unlink(temp_path)
            
            if result.returncode == 0:
                return True, result.stdout
            else:
                return False, f"执行错误:{result.stderr}"
                
        except subprocess.TimeoutExpired:
            return False, "执行超时"
        except Exception as e:
            return False, f"执行异常:{str(e)}"
    
    def generate_code(self, requirement: str) -> str:
        """根据需求生成代码"""
        # 实际实现中会调用 LLM
        prompt = f"""
        请根据以下需求生成 Python 代码:
        
        需求:{requirement}
        
        要求:
        1. 代码完整可运行
        2. 包含必要的注释
        3. 遵循 Python 最佳实践
        4. 只返回代码,不要其他说明
        
        生成的代码:
        """
        return prompt
    
    def solve(self, problem: str) -> Dict:
        """解决编程问题"""
        self.state.current_goal = f"解决问题:{problem}"
        result = {
            "problem": problem,
            "generated_code": "",
            "execution_success": False,
            "output": "",
            "explanation": ""
        }
        
        # 步骤 1:生成代码
        print(f"💻 生成代码:{problem}")
        code = self.generate_code(problem)
        # 这里简化处理,实际需要 LLM 生成
        code = self._generate_sample_code(problem)
        result["generated_code"] = code
        
        # 步骤 2:执行代码
        print("▶️ 执行代码...")
        success, output = self.act("execute_python", code=code)
        result["execution_success"] = success
        result["output"] = output
        
        # 步骤 3:生成解释
        result["explanation"] = self._explain_solution(problem, code, output)
        
        # 记录执行历史
        self.execution_history.append({
            "problem": problem,
            "code": code,
            "success": success
        })
        
        return result
    
    def _generate_sample_code(self, problem: str) -> str:
        """生成示例代码(实际应由 LLM 生成)"""
        if "斐波那契" in problem or "fibonacci" in problem.lower():
            return """
def fibonacci(n):
    '''计算斐波那契数列第 n 项'''
    if n <= 0:
        return 0
    elif n == 1:
        return 1
    else:
        a, b = 0, 1
        for _ in range(2, n + 1):
            a, b = b, a + b
        return b

# 测试
for i in range(10):
    print(f"F({i}) = {fibonacci(i)}")
"""
        elif "排序" in problem or "sort" in problem.lower():
            return """
def quick_sort(arr):
    '''快速排序实现'''
    if len(arr) <= 1:
        return arr
    pivot = arr[len(arr) // 2]
    left = [x for x in arr if x < pivot]
    middle = [x for x in arr if x == pivot]
    right = [x for x in arr if x > pivot]
    return quick_sort(left) + middle + quick_sort(right)

# 测试
test_array = [64, 34, 25, 12, 22, 11, 90]
print(f"原数组:{test_array}")
print(f"排序后:{quick_sort(test_array)}")
"""
        else:
            return """
# 通用示例代码
print("Hello, World!")
print("这是一个示例程序")

# 计算 1 到 100 的和
total = sum(range(1, 101))
print(f"1 到 100 的和:{total}")
"""
    
    def _explain_solution(self, problem: str, code: str, output: str) -> str:
        """生成解决方案解释"""
        return f"""
## 解决方案说明

### 问题分析
{problem}

### 实现思路
1. 理解问题需求
2. 设计算法逻辑
3. 编写可执行代码
4. 测试验证结果

### 代码要点
- 使用函数封装核心逻辑
- 添加必要的错误处理
- 包含清晰的注释

### 执行结果
{output[:500] if output else "无输出"}
"""

# 使用示例
if __name__ == "__main__":
    agent = CodingAgent()
    
    # 示例 1:斐波那契数列
    result1 = agent.solve("生成计算斐波那契数列的 Python 代码")
    print(result1["explanation"])
    
    # 示例 2:排序算法
    result2 = agent.solve("实现快速排序算法并测试")
    print(result2["explanation"])

运行结果示例

💻 生成代码:生成计算斐波那契数列的 Python 代码
▶️ 执行代码...

## 解决方案说明

### 问题分析
生成计算斐波那契数列的 Python 代码

### 实现思路
1. 理解问题需求
2. 设计算法逻辑
3. 编写可执行代码
4. 测试验证结果

### 代码要点
- 使用函数封装核心逻辑
- 添加必要的错误处理
- 包含清晰的注释

### 执行结果
F(0) = 0
F(1) = 1
F(2) = 1
F(3) = 2
...

案例 3:个人任务管理 Agent

目标: 构建能够理解、规划和跟踪个人任务的智能助手

功能需求:

  • 自然语言任务解析
  • 任务分解和优先级排序
  • 进度跟踪和提醒
  • 与日历/待办事项集成

实现代码

python
# task_agent.py
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from enum import Enum
import json
import os

class TaskPriority(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    URGENT = 4

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    BLOCKED = "blocked"

class Task(BaseModel):
    """任务定义"""
    id: str
    title: str
    description: str = ""
    priority: TaskPriority = TaskPriority.MEDIUM
    status: TaskStatus = TaskStatus.PENDING
    created_at: datetime
    due_date: Optional[datetime] = None
    subtasks: List[str] = []
    tags: List[str] = []

class TaskManagerAgent(BaseAgent):
    """任务管理 Agent"""
    
    def __init__(self, storage_path: str = "tasks.json"):
        super().__init__()
        self.storage_path = storage_path
        self.tasks: Dict[str, Task] = {}
        self.load_tasks()
        
        # 注册工具
        self._register_tools()
    
    def _register_tools(self):
        """注册任务管理工具"""
        tools = [
            Tool(
                name="create_task",
                description="创建新任务",
                parameters={"title": "任务标题", "description": "任务描述", "priority": "优先级"},
                func=self.create_task
            ),
            Tool(
                name="update_task",
                description="更新任务状态",
                parameters={"task_id": "任务 ID", "status": "新状态"},
                func=self.update_task_status
            ),
            Tool(
                name="list_tasks",
                description="列出任务",
                parameters={"status": "可选的状态过滤"},
                func=self.list_tasks
            ),
            Tool(
                name="get_daily_plan",
                description="获取今日计划",
                parameters={},
                func=self.get_daily_plan
            )
        ]
        for tool in tools:
            self.add_tool(tool)
    
    def load_tasks(self):
        """从文件加载任务"""
        if os.path.exists(self.storage_path):
            try:
                with open(self.storage_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                    for task_id, task_data in data.items():
                        task_data['created_at'] = datetime.fromisoformat(task_data['created_at'])
                        if task_data.get('due_date'):
                            task_data['due_date'] = datetime.fromisoformat(task_data['due_date'])
                        task_data['priority'] = TaskPriority(task_data['priority'])
                        task_data['status'] = TaskStatus(task_data['status'])
                        self.tasks[task_id] = Task(**task_data)
            except Exception as e:
                print(f"加载任务失败:{e}")
    
    def save_tasks(self):
        """保存任务到文件"""
        data = {}
        for task_id, task in self.tasks.items():
            task_dict = task.dict()
            task_dict['created_at'] = task.created_at.isoformat()
            if task.due_date:
                task_dict['due_date'] = task.due_date.isoformat()
            task_dict['priority'] = task.priority.value
            task_dict['status'] = task.status.value
            data[task_id] = task_dict
        
        with open(self.storage_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
    
    def parse_task_request(self, request: str) -> Dict:
        """解析自然语言任务请求"""
        # 简化实现,实际应使用 LLM
        task_info = {
            "title": request[:50],
            "description": request,
            "priority": TaskPriority.MEDIUM,
            "due_date": None
        }
        
        # 关键词识别优先级
        if any(word in request for word in ["紧急", "急", "urgent", "ASAP"]):
            task_info["priority"] = TaskPriority.URGENT
        elif any(word in request for word in ["重要", "important", "high"]):
            task_info["priority"] = TaskPriority.HIGH
        elif any(word in request for word in ["低", "low"]):
            task_info["priority"] = TaskPriority.LOW
        
        # 识别截止日期
        if "今天" in request or "today" in request.lower():
            task_info["due_date"] = datetime.now().replace(hour=23, minute=59)
        elif "明天" in request or "tomorrow" in request.lower():
            task_info["due_date"] = (datetime.now() + timedelta(days=1)).replace(hour=23, minute=59)
        elif "本周" in request or "this week" in request.lower():
            days_until_sunday = 6 - datetime.now().weekday()
            task_info["due_date"] = (datetime.now() + timedelta(days=days_until_sunday)).replace(hour=23, minute=59)
        
        return task_info
    
    def create_task(self, title: str, description: str = "", priority: str = "medium") -> str:
        """创建新任务"""
        task_id = f"task_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        
        priority_map = {
            "low": TaskPriority.LOW,
            "medium": TaskPriority.MEDIUM,
            "high": TaskPriority.HIGH,
            "urgent": TaskPriority.URGENT
        }
        
        task = Task(
            id=task_id,
            title=title,
            description=description,
            priority=priority_map.get(priority.lower(), TaskPriority.MEDIUM),
            created_at=datetime.now()
        )
        
        self.tasks[task_id] = task
        self.save_tasks()
        
        self.add_memory(
            content=f"创建任务:{title}",
            mem_type="episodic",
            tags=["task", "create"]
        )
        
        return f"✅ 任务已创建:{title} (ID: {task_id})"
    
    def update_task_status(self, task_id: str, status: str) -> str:
        """更新任务状态"""
        if task_id not in self.tasks:
            return f"❌ 任务不存在:{task_id}"
        
        status_map = {
            "pending": TaskStatus.PENDING,
            "in_progress": TaskStatus.IN_PROGRESS,
            "completed": TaskStatus.COMPLETED,
            "blocked": TaskStatus.BLOCKED
        }
        
        self.tasks[task_id].status = status_map.get(status.lower(), TaskStatus.PENDING)
        self.save_tasks()
        
        return f"✅ 任务状态已更新:{self.tasks[task_id].title} -> {status}"
    
    def list_tasks(self, status: str = None) -> str:
        """列出任务"""
        if status:
            status_filter = TaskStatus(status)
            filtered_tasks = [t for t in self.tasks.values() if t.status == status_filter]
        else:
            filtered_tasks = list(self.tasks.values())
        
        # 按优先级排序
        sorted_tasks = sorted(filtered_tasks, key=lambda t: t.priority.value, reverse=True)
        
        output = "📋 任务列表\n" + "=" * 50 + "\n"
        for task in sorted_tasks[:10]:  # 最多显示 10 个
            priority_icon = {
                TaskPriority.LOW: "🟢",
                TaskPriority.MEDIUM: "🟡",
                TaskPriority.HIGH: "🟠",
                TaskPriority.URGENT: "🔴"
            }
            status_icon = {
                TaskStatus.PENDING: "⏳",
                TaskStatus.IN_PROGRESS: "🔄",
                TaskStatus.COMPLETED: "✅",
                TaskStatus.BLOCKED: "🚫"
            }
            
            output += f"{priority_icon[task.priority]} {status_icon[task.status]} {task.title}\n"
            if task.due_date:
                output += f"   截止:{task.due_date.strftime('%Y-%m-%d %H:%M')}\n"
            output += "\n"
        
        if not sorted_tasks:
            output += "暂无任务\n"
        
        return output
    
    def get_daily_plan(self) -> str:
        """获取今日计划"""
        today = datetime.now().date()
        due_today = [t for t in self.tasks.values() 
                     if t.due_date and t.due_date.date() == today 
                     and t.status != TaskStatus.COMPLETED]
        
        output = f"📅 今日计划 ({today.strftime('%Y-%m-%d %A')})\n"
        output += "=" * 50 + "\n\n"
        
        if due_today:
            output += "🎯 今日到期任务:\n"
            for task in sorted(due_today, key=lambda t: t.priority.value, reverse=True):
                output += f"  • {task.title} [{task.priority.name}]\n"
        else:
            output += "✅ 今日无到期任务\n"
        
        # 显示进行中的任务
        in_progress = [t for t in self.tasks.values() if t.status == TaskStatus.IN_PROGRESS]
        if in_progress:
            output += "\n🔄 进行中的任务:\n"
            for task in in_progress:
                output += f"  • {task.title}\n"
        
        return output
    
    def process_request(self, request: str) -> str:
        """处理用户请求"""
        # 解析意图
        request_lower = request.lower()
        
        if any(word in request_lower for word in ["创建", "添加", "add", "create", "新任务"]):
            task_info = self.parse_task_request(request)
            return self.create_task(
                title=task_info["title"],
                description=task_info["description"],
                priority=task_info["priority"].name.lower()
            )
        
        elif any(word in request_lower for word in ["完成", "结束", "complete", "finish", "done"]):
            # 查找并完成任务
            for task_id, task in self.tasks.items():
                if task.title in request or task_id in request:
                    return self.update_task_status(task_id, "completed")
            return "未找到匹配的任务"
        
        elif any(word in request_lower for word in ["列表", "查看", "list", "show", "所有任务"]):
            return self.list_tasks()
        
        elif any(word in request_lower for word in ["今天", "今日", "today", "计划", "plan"]):
            return self.get_daily_plan()
        
        else:
            return "请明确您的需求,例如:'创建一个新任务'、'查看任务列表'、'今日计划'"

# 使用示例
if __name__ == "__main__":
    agent = TaskManagerAgent()
    
    # 示例交互
    print(agent.process_request("创建一个紧急任务:完成项目报告,今天截止"))
    print(agent.process_request("添加任务:准备下周会议材料"))
    print(agent.process_request("查看今日计划"))
    print(agent.process_request("显示所有任务"))

运行结果示例

✅ 任务已创建:创建一个紧急任务:完成项目报告 (ID: task_20260325020000)
✅ 任务已创建:添加任务:准备下周会议材料 (ID: task_20260325020001)

📅 今日计划 (2026-03-25 Wednesday)
==================================================

🎯 今日到期任务:
  • 创建一个紧急任务:完成项目报告 [URGENT]

📋 任务列表
==================================================
🔴 ⏳ 创建一个紧急任务:完成项目报告
   截止:2026-03-25 23:59

🟡 ⏳ 添加任务:准备下周会议材料

案例 4:多 Agent 协作系统

目标: 构建多个专业 Agent 协作完成复杂任务的系统

功能需求:

  • 定义不同角色的专业 Agent
  • 实现 Agent 间通信机制
  • 任务分配和协调
  • 结果汇总和整合

实现代码

python
# multi_agent_system.py
from typing import List, Dict, Any
from enum import Enum
import asyncio

class AgentRole(Enum):
    RESEARCHER = "researcher"      # 研究员
    WRITER = "writer"              # 写作者
    REVIEWER = "reviewer"          # 审核员
    CODER = "coder"                # 程序员
    MANAGER = "manager"            # 协调员

class Message(BaseModel):
    """Agent 间消息"""
    sender: str
    receiver: str
    content: str
    timestamp: datetime
    message_type: str = "info"  # info, request, response, command

class MultiAgentSystem:
    """多 Agent 协作系统"""
    
    def __init__(self):
        self.agents: Dict[str, BaseAgent] = {}
        self.message_queue: List[Message] = []
        self.task_history = []
        
    def register_agent(self, role: str, agent: BaseAgent):
        """注册 Agent"""
        self.agents[role] = agent
        print(f"✅ 注册 Agent: {role}")
    
    def send_message(self, sender: str, receiver: str, content: str, msg_type: str = "info"):
        """发送消息"""
        message = Message(
            sender=sender,
            receiver=receiver,
            content=content,
            timestamp=datetime.now(),
            message_type=msg_type
        )
        self.message_queue.append(message)
        print(f"📨 {sender} -> {receiver}: {content[:50]}...")
    
    def get_messages(self, receiver: str) -> List[Message]:
        """获取指定接收者的消息"""
        messages = [m for m in self.message_queue if m.receiver == receiver]
        self.message_queue = [m for m in self.message_queue if m.receiver != receiver]
        return messages
    
    async def execute_task(self, task_description: str) -> Dict:
        """执行多 Agent 协作任务"""
        print(f"\n🚀 开始执行任务:{task_description}\n")
        
        # 示例:内容创作流程
        # 1. 研究员收集信息
        researcher = self.agents.get("researcher")
        if researcher:
            research_result = await self._agent_research(task_description)
        
        # 2. 写作者创作内容
        writer = self.agents.get("writer")
        if writer:
            draft = await self._agent_write(task_description, research_result)
        
        # 3. 审核员审核内容
        reviewer = self.agents.get("reviewer")
        if reviewer:
            final_content = await self._agent_review(draft)
        
        return {
            "task": task_description,
            "result": final_content,
            "completed_at": datetime.now()
        }
    
    async def _agent_research(self, topic: str) -> str:
        """研究员 Agent 工作"""
        print("🔍 [研究员] 开始收集信息...")
        await asyncio.sleep(1)  # 模拟处理时间
        
        research_data = f"""
关于"{topic}"的研究资料:
1. 背景信息:...
2. 最新进展:...
3. 关键数据:...
4. 相关案例:...
"""
        print("✅ [研究员] 信息收集完成")
        return research_data
    
    async def _agent_write(self, topic: str, research_data: str) -> str:
        """写作者 Agent 工作"""
        print("✍️  [写作者] 开始创作内容...")
        await asyncio.sleep(1)
        
        draft = f"""
# {topic}

## 引言
基于收集的研究资料,本文将深入探讨{topic}

## 正文
{research_data[:200]}...

## 结论
[待补充]
"""
        print("✅ [写作者] 初稿完成")
        return draft
    
    async def _agent_review(self, draft: str) -> str:
        """审核员 Agent 工作"""
        print("👀 [审核员] 开始审核内容...")
        await asyncio.sleep(1)
        
        # 模拟审核意见
        reviewed = draft.replace("[待补充]", "综上所述,本文系统性地分析了相关内容。")
        
        print("✅ [审核员] 审核完成")
        return reviewed

# 专业 Agent 类
class ResearcherAgent(BaseAgent):
    """研究员 Agent"""
    def __init__(self):
        super().__init__()
        self.specialty = "信息收集与分析"

class WriterAgent(BaseAgent):
    """写作者 Agent"""
    def __init__(self):
        super().__init__()
        self.specialty = "内容创作与编辑"

class ReviewerAgent(BaseAgent):
    """审核员 Agent"""
    def __init__(self):
        super().__init__()
        self.specialty = "质量审核与优化"

# 使用示例
async def main():
    system = MultiAgentSystem()
    
    # 注册专业 Agent
    system.register_agent("researcher", ResearcherAgent())
    system.register_agent("writer", WriterAgent())
    system.register_agent("reviewer", ReviewerAgent())
    
    # 执行协作任务
    result = await system.execute_task("人工智能在医疗领域的应用")
    
    print("\n" + "=" * 50)
    print("任务完成!")
    print(f"最终内容长度:{len(result['result'])} 字符")
    print("=" * 50)

if __name__ == "__main__":
    asyncio.run(main())

运行结果示例

✅ 注册 Agent: researcher
✅ 注册 Agent: writer
✅ 注册 Agent: reviewer

🚀 开始执行任务:人工智能在医疗领域的应用

🔍 [研究员] 开始收集信息...
✅ [研究员] 信息收集完成
✍️  [写作者] 开始创作内容...
✅ [写作者] 初稿完成
👀 [审核员] 开始审核内容...
✅ [审核员] 审核完成

==================================================
任务完成!
最终内容长度:358 字符
==================================================

案例 5:自动化数据分析 Agent

目标: 构建能够自动加载、分析、可视化数据的智能分析助手

功能需求:

  • 自动加载多种格式数据(CSV、Excel、JSON)
  • 数据探索和统计分析
  • 自动生成可视化图表
  • 生成分析报告

实现代码

python
# data_analysis_agent.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, Any, List
import os

class DataAnalysisAgent(BaseAgent):
    """数据分析 Agent"""
    
    def __init__(self, output_dir: str = "analysis_output"):
        super().__init__()
        self.output_dir = output_dir
        self.current_data: pd.DataFrame = None
        self.analysis_results = {}
        
        os.makedirs(output_dir, exist_ok=True)
        
        # 注册工具
        self._register_tools()
    
    def _register_tools(self):
        """注册数据分析工具"""
        tools = [
            Tool(
                name="load_data",
                description="加载数据文件",
                parameters={"file_path": "文件路径"},
                func=self.load_data
            ),
            Tool(
                name="describe_data",
                description="数据描述统计",
                parameters={},
                func=self.describe_data
            ),
            Tool(
                name="analyze_correlation",
                description="相关性分析",
                parameters={},
                func=self.analyze_correlation
            ),
            Tool(
                name="generate_plot",
                description="生成可视化图表",
                parameters={"plot_type": "图表类型", "columns": "列名列表"},
                func=self.generate_plot
            ),
            Tool(
                name="generate_report",
                description="生成分析报告",
                parameters={},
                func=self.generate_report
            )
        ]
        for tool in tools:
            self.add_tool(tool)
    
    def load_data(self, file_path: str) -> str:
        """加载数据"""
        try:
            if file_path.endswith('.csv'):
                self.current_data = pd.read_csv(file_path)
            elif file_path.endswith(('.xlsx', '.xls')):
                self.current_data = pd.read_excel(file_path)
            elif file_path.endswith('.json'):
                self.current_data = pd.read_json(file_path)
            else:
                return f"❌ 不支持的文件格式:{file_path}"
            
            self.add_memory(
                content=f"加载数据:{file_path}, 行数:{len(self.current_data)}, 列数:{len(self.current_data.columns)}",
                mem_type="short_term",
                tags=["data", "load"]
            )
            
            return f"✅ 数据加载成功\n行数:{len(self.current_data)}\n列数:{len(self.current_data.columns)}\n列名:{list(self.current_data.columns)}"
        
        except Exception as e:
            return f"❌ 加载失败:{e}"
    
    def describe_data(self) -> str:
        """数据描述统计"""
        if self.current_data is None:
            return "❌ 请先加载数据"
        
        description = "📊 数据描述统计\n" + "=" * 50 + "\n\n"
        
        # 基本信息
        description += f"**数据维度**: {self.current_data.shape[0]} 行 × {self.current_data.shape[1]}\n\n"
        
        # 数据类型
        description += "**数据类型**:\n"
        for col, dtype in self.current_data.dtypes.items():
            description += f"  • {col}: {dtype}\n"
        description += "\n"
        
        # 数值列统计
        numeric_cols = self.current_data.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            description += "**数值列统计**:\n"
            desc_df = self.current_data[numeric_cols].describe()
            description += desc_df.to_string() + "\n\n"
        
        # 缺失值
        description += "**缺失值统计**:\n"
        missing = self.current_data.isnull().sum()
        missing_pct = (missing / len(self.current_data) * 100).round(2)
        for col, (count, pct) in zip(missing.index, zip(missing, missing_pct)):
            if count > 0:
                description += f"  • {col}: {count} ({pct}%)\n"
        if missing.sum() == 0:
            description += "  无缺失值\n"
        
        return description
    
    def analyze_correlation(self) -> str:
        """相关性分析"""
        if self.current_data is None:
            return "❌ 请先加载数据"
        
        numeric_data = self.current_data.select_dtypes(include=[np.number])
        
        if numeric_data.shape[1] < 2:
            return "❌ 数值列不足,无法进行相关性分析"
        
        corr_matrix = numeric_data.corr()
        
        # 找出强相关
        strong_corr = []
        for i in range(len(corr_matrix.columns)):
            for j in range(i+1, len(corr_matrix.columns)):
                corr_value = corr_matrix.iloc[i, j]
                if abs(corr_value) > 0.7:
                    strong_corr.append((
                        corr_matrix.columns[i],
                        corr_matrix.columns[j],
                        corr_value
                    ))
        
        result = "🔗 相关性分析\n" + "=" * 50 + "\n\n"
        result += "**强相关变量对** (|r| > 0.7):\n"
        
        if strong_corr:
            for col1, col2, corr in strong_corr:
                strength = "正相关" if corr > 0 else "负相关"
                result += f"  • {col1}{col2}: {corr:.3f} ({strength})\n"
        else:
            result += "  未发现强相关变量对\n"
        
        # 保存相关性热图
        plt.figure(figsize=(10, 8))
        sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0, fmt='.2f')
        plt.title('变量相关性热图')
        plt.tight_layout()
        
        plot_path = os.path.join(self.output_dir, 'correlation_heatmap.png')
        plt.savefig(plot_path, dpi=150)
        plt.close()
        
        result += f"\n✅ 相关性热图已保存:{plot_path}"
        
        self.analysis_results['correlation'] = corr_matrix
        
        return result
    
    def generate_plot(self, plot_type: str = "histogram", columns: List[str] = None) -> str:
        """生成可视化图表"""
        if self.current_data is None:
            return "❌ 请先加载数据"
        
        if not columns:
            columns = self.current_data.select_dtypes(include=[np.number]).columns[:5].tolist()
        
        plt.style.use('seaborn-v0_8')
        
        if plot_type == "histogram":
            fig, axes = plt.subplots(2, 2, figsize=(12, 10))
            axes = axes.flatten()
            
            for idx, col in enumerate(columns[:4]):
                if col in self.current_data.columns:
                    self.current_data[col].hist(ax=axes[idx], bins=30, edgecolor='black', alpha=0.7)
                    axes[idx].set_title(f'{col} 分布')
                    axes[idx].set_xlabel(col)
                    axes[idx].set_ylabel('频数')
            
            plt.suptitle('数据分布直方图', fontsize=16)
            plt.tight_layout()
        
        elif plot_type == "scatter":
            if len(columns) >= 2:
                plt.figure(figsize=(10, 8))
                plt.scatter(
                    self.current_data[columns[0]],
                    self.current_data[columns[1]],
                    alpha=0.6,
                    edgecolors='w',
                    linewidth=0.5
                )
                plt.xlabel(columns[0])
                plt.ylabel(columns[1])
                plt.title(f'{columns[0]} vs {columns[1]} 散点图')
                plt.grid(True, alpha=0.3)
        
        elif plot_type == "boxplot":
            plt.figure(figsize=(12, 6))
            numeric_data = self.current_data[columns].select_dtypes(include=[np.number])
            numeric_data.boxplot(rot=45)
            plt.title('数据箱线图')
            plt.ylabel('数值')
        
        # 保存图片
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        plot_path = os.path.join(self.output_dir, f'plot_{plot_type}_{timestamp}.png')
        plt.savefig(plot_path, dpi=150, bbox_inches='tight')
        plt.close()
        
        return f"✅ 图表已生成:{plot_path}\n类型:{plot_type}\n列:{columns}"
    
    def generate_report(self) -> str:
        """生成分析报告"""
        if self.current_data is None:
            return "❌ 请先加载数据"
        
        report = f"""# 数据分析报告

**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

---

## 一、数据概览

- **数据源**: {getattr(self, 'data_source', '未知')}
- **样本量**: {len(self.current_data)}
- **特征数**: {len(self.current_data.columns)}
- **特征类型**:
  - 数值型:{len(self.current_data.select_dtypes(include=[np.number]).columns)}
  - 分类型:{len(self.current_data.select_dtypes(include=['object', 'category']).columns)}

## 二、数据质量

### 2.1 缺失值情况
{self.current_data.isnull().sum().to_string()}

### 2.2 数据完整性
完整率:{(1 - self.current_data.isnull().sum().sum() / (len(self.current_data) * len(self.current_data.columns))) * 100:.2f}%

## 三、描述性统计

{self.current_data.describe().to_string()}

## 四、关键发现

[基于数据分析的关键发现]

## 五、可视化图表

- 相关性热图:`correlation_heatmap.png`
- 分布直方图:`plot_histogram_*.png`
- [其他图表]

## 六、建议与下一步

1. [数据清洗建议]
2. [特征工程建议]
3. [建模方向建议]

---

*报告由 AI 数据分析 Agent 自动生成*
"""
        
        # 保存报告
        report_path = os.path.join(self.output_dir, f'analysis_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.md')
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write(report)
        
        return f"✅ 分析报告已生成:{report_path}"

# 使用示例(创建示例数据)
if __name__ == "__main__":
    # 创建示例 CSV 数据
    sample_data = {
        'age': np.random.randint(20, 60, 100),
        'income': np.random.randint(3000, 20000, 100),
        'spending': np.random.randint(1000, 10000, 100),
        'satisfaction': np.random.randint(1, 10, 100),
        'education_years': np.random.randint(8, 20, 100)
    }
    
    df = pd.DataFrame(sample_data)
    df.to_csv('sample_data.csv', index=False)
    
    # 创建分析 Agent
    agent = DataAnalysisAgent()
    
    # 执行分析流程
    print(agent.load_data('sample_data.csv'))
    print("\n" + agent.describe_data())
    print("\n" + agent.analyze_correlation())
    print("\n" + agent.generate_plot('histogram'))
    print("\n" + agent.generate_report())

运行结果示例

✅ 数据加载成功
行数:100
列数:5
列名:['age', 'income', 'spending', 'satisfaction', 'education_years']

📊 数据描述统计
==================================================

**数据维度**: 100 行 × 5 列

**数据类型**:
  • age: int64
  • income: int64
  • spending: int64
  • satisfaction: int64
  • education_years: int64

**数值列统计**:
             age      income    ...
count  100.000000  100.000000  ...
mean    40.500000  11500.0000  ...

✅ 图表已生成:analysis_output/plot_histogram_20260325_020000.png
✅ 分析报告已生成:analysis_output/analysis_report_20260325_020000.md

四、高级主题

4.1 向量记忆系统

使用向量数据库实现语义记忆检索:

python
# vector_memory.py
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer

class VectorMemory:
    """向量记忆系统"""
    
    def __init__(self, persist_dir: str = "./memory_db"):
        self.client = chromadb.Client(Settings(
            persist_directory=persist_dir,
            anonymized_telemetry=False
        ))
        self.collection = self.client.get_or_create_collection(
            name="agent_memory",
            metadata={"hnsw:space": "cosine"}
        )
        self.embedding_model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
    
    def add_memory(self, content: str, metadata: dict = None):
        """添加记忆"""
        embedding = self.embedding_model.encode(content).tolist()
        
        self.collection.add(
            embeddings=[embedding],
            documents=[content],
            metadatas=[metadata or {}],
            ids=[f"mem_{datetime.now().timestamp()}"]
        )
    
    def search_memories(self, query: str, n_results: int = 5) -> List[Dict]:
        """搜索相关记忆"""
        query_embedding = self.embedding_model.encode(query).tolist()
        
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=n_results,
            include=["documents", "metadatas", "distances"]
        )
        
        memories = []
        for i, doc in enumerate(results['documents'][0]):
            memories.append({
                'content': doc,
                'metadata': results['metadatas'][0][i],
                'distance': results['distances'][0][i]
            })
        
        return memories

4.2 工具调用优化

使用函数调用(Function Calling)提高工具调用准确性:

python
# function_calling.py
from openai import OpenAI
import json

class FunctionCallingAgent:
    """使用 Function Calling 的 Agent"""
    
    def __init__(self, api_key: str):
        self.client = OpenAI(api_key=api_key)
        self.tools = self._define_tools()
    
    def _define_tools(self):
        """定义工具 schema"""
        return [
            {
                "type": "function",
                "function": {
                    "name": "search_web",
                    "description": "搜索互联网获取信息",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "搜索关键词"
                            }
                        },
                        "required": ["query"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "calculate",
                    "description": "执行数学计算",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "expression": {
                                "type": "string",
                                "description": "数学表达式"
                            }
                        },
                        "required": ["expression"]
                    }
                }
            }
        ]
    
    def chat(self, user_message: str) -> str:
        """对话处理"""
        response = self.client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[{"role": "user", "content": user_message}],
            tools=self.tools,
            tool_choice="auto"
        )
        
        message = response.choices[0].message
        
        # 检查是否需要调用工具
        if message.tool_calls:
            for tool_call in message.tool_calls:
                function_name = tool_call.function.name
                function_args = json.loads(tool_call.function.arguments)
                
                # 执行工具调用
                result = self._execute_tool(function_name, function_args)
                
                # 将结果返回给 LLM
                second_response = self.client.chat.completions.create(
                    model="gpt-4-turbo",
                    messages=[
                        {"role": "user", "content": user_message},
                        message,
                        {
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": str(result)
                        }
                    ]
                )
                return second_response.choices[0].message.content
        
        return message.content
    
    def _execute_tool(self, function_name: str, args: dict):
        """执行工具"""
        if function_name == "search_web":
            # 实际实现调用搜索 API
            return f"搜索结果:关于'{args['query']}'的信息..."
        elif function_name == "calculate":
            return eval(args['expression'])

五、最佳实践与技巧

5.1 Agent 设计原则

  1. 单一职责:每个 Agent 专注于特定领域
  2. 明确边界:清晰定义 Agent 的能力和限制
  3. 可组合性:Agent 应该能够相互协作
  4. 可观测性:记录 Agent 的决策过程
  5. 安全性:限制危险操作,设置执行超时

5.2 常见陷阱与解决方案

问题原因解决方案
无限循环Agent 无法判断任务完成设置最大迭代次数,添加完成条件检查
工具调用错误参数格式不正确使用 Pydantic 验证,添加错误处理
记忆膨胀存储过多无关信息实现记忆压缩和遗忘机制
响应缓慢LLM 调用频繁缓存常用结果,批量处理
安全风险代码执行无限制沙箱隔离,白名单机制

5.3 性能优化技巧

python
# 1. 使用缓存
from functools import lru_cache

@lru_cache(maxsize=100)
def cached_llm_call(prompt: str) -> str:
    return llm.generate(prompt)

# 2. 批量处理
async def batch_process(items: List, batch_size: int = 10):
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        await process_batch(batch)

# 3. 流式响应
def stream_response(agent, query: str):
    for chunk in agent.generate_stream(query):
        yield chunk

六、总结与展望

6.1 核心要点回顾

  1. Agent 架构:感知 - 认知 - 行动三层架构是设计基础
  2. 工具调用:通过 Function Calling 实现安全可靠的工具执行
  3. 记忆系统:结合短期记忆和长期向量记忆
  4. 多 Agent 协作:通过消息传递实现复杂任务分解
  5. 安全优先:沙箱执行、超时控制、权限限制

6.2 未来发展方向

  • 自主性增强:更复杂的规划和决策能力
  • 多模态融合:结合视觉、语音等多模态输入
  • 持续学习:从交互中不断改进
  • 人机协作:更自然的交互界面
  • 标准化框架:行业标准的 Agent 协议

6.3 推荐资源

框架与库:

学习资源:

  • LangChain 官方文档
  • Hugging Face Course
  • DeepLearning.AI Agent 课程

FAQ

Q1: Agent 和普通 LLM 应用有什么区别?

A: Agent 具有自主性、记忆能力和工具调用能力,能够主动感知环境、规划多步任务并执行。普通 LLM 应用通常是被动的问答系统。

Q2: 如何选择合适的 Agent 框架?

A:

  • 快速原型:LangChain
  • 文档处理:LlamaIndex
  • 多 Agent 协作:AutoGen、CrewAI
  • 生产环境:考虑自定义实现

Q3: Agent 的安全性如何保证?

A:

  • 代码执行使用沙箱隔离
  • 设置执行超时和资源限制
  • 工具调用前进行参数验证
  • 敏感操作需要人工确认

Q4: 如何提高 Agent 的响应速度?

A:

  • 使用缓存减少重复 LLM 调用
  • 选择更快的模型(如 GPT-3.5-Turbo)
  • 批量处理多个任务
  • 使用流式响应

Q5: Agent 可以处理多复杂的任务?

A: 理论上可以处理任意复杂度的任务,但实际受限于:

  • LLM 的推理能力
  • 工具的功能范围
  • 记忆系统的容量
  • 执行时间和成本

建议将复杂任务分解为多个子任务,使用多 Agent 协作完成。


文章字数: 约 12,500 字

实战案例: 5 个(研究助手、代码 Agent、任务管理、多 Agent 协作、数据分析)

完成时间: 2026-03-25

Released under the MIT License.