AI 智能体(Agent)开发实战:从架构设计到多 Agent 协作
概述
人工智能智能体(AI Agent)是 2024-2026 年 AI 领域最热门的方向之一。从 AutoGPT 的爆火到 LangChain、LlamaIndex 等框架的成熟,AI Agent 正在从概念走向实际应用。本文将深入讲解 AI Agent 的核心架构、关键组件、工具调用机制,并通过 5 个实战案例带你从零构建功能强大的 AI 智能体系统。
本文适合人群:
- 有 Python 编程基础的开发者
- 对 AI Agent 感兴趣的技术人员
- 希望构建自动化 AI 应用的工程师
学习收获:
- 理解 AI Agent 的核心架构和设计理念
- 掌握工具调用、记忆系统、规划能力等关键技术
- 完成 5 个从简单到复杂的实战项目
- 了解多 Agent 协作系统的设计模式
一、AI Agent 核心概念
1.1 什么是 AI Agent?
AI Agent 是指能够感知环境、进行推理、做出决策并执行动作的智能系统。与大语言模型(LLM)不同,Agent 具有以下特征:
| 特性 | LLM | AI Agent |
|---|---|---|
| 感知能力 | 被动接收输入 | 主动感知环境 |
| 记忆能力 | 上下文窗口限制 | 长期记忆系统 |
| 行动能力 | 仅生成文本 | 调用工具/API |
| 规划能力 | 单次推理 | 多步任务规划 |
| 自主性 | 低 | 高 |
1.2 Agent 的核心组件
一个完整的 AI Agent 系统通常包含以下核心组件:
┌─────────────────────────────────────────────────────────┐
│ AI Agent 架构 │
├─────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 感知层 │ │ 认知层 │ │ 行动层 │ │
│ │ (输入处理) │ │ (推理决策) │ │ (工具调用) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │
│ │ 环境观察 │ │ LLM 核心 │ │ 执行器 │ │
│ │ 用户输入 │ │ 规划模块 │ │ API 调用 │ │
│ │ 文件读取 │ │ 记忆检索 │ │ 代码执行 │ │
│ └─────────────┘ └──────┬──────┘ └─────────────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ 记忆系统 │ │
│ │ 短期记忆 │ │
│ │ 长期记忆 │ │
│ │ 向量数据库 │ │
│ └─────────────┘ │
└─────────────────────────────────────────────────────────┘1.3 Agent 的工作流程
典型的 Agent 工作流程遵循 ReAct(Reasoning + Acting)模式:
1. 观察(Observe) → 接收用户输入和环境信息
2. 思考(Think) → LLM 分析任务,制定计划
3. 行动(Act) → 调用工具执行具体操作
4. 反思(Reflect) → 评估结果,调整策略
5. 重复 2-4 步直到任务完成二、环境搭建与基础框架
2.1 安装依赖
# 创建项目目录
mkdir ai-agent-practice && cd ai-agent-practice
# 创建虚拟环境
python3 -m venv venv
source venv/bin/activate
# 安装核心依赖
pip install langchain langchain-openai langchain-community
pip install chromadb sentence-transformers
pip install python-dotenv pydantic requests
pip install duckduckgo-search wikipedia2.2 配置环境变量
创建 .env 文件:
# .env
OPENAI_API_KEY=your_api_key_here
OPENAI_API_BASE=https://api.openai.com/v1
# 或使用其他兼容的 API 服务
# OPENAI_API_BASE=https://api.moonshot.cn/v12.3 基础 Agent 类设计
# agent_core.py
from typing import List, Dict, Optional, Any
from pydantic import BaseModel, Field
from datetime import datetime
import json
class Tool(BaseModel):
"""工具定义"""
name: str
description: str
parameters: Dict[str, Any]
func: Any # 实际执行的函数
class Memory(BaseModel):
"""记忆条目"""
content: str
timestamp: datetime
type: str = "short_term" # short_term, long_term, episodic
tags: List[str] = []
class AgentState(BaseModel):
"""Agent 状态"""
current_goal: str = ""
completed_steps: List[str] = []
pending_actions: List[Dict] = []
context: Dict[str, Any] = {}
class BaseAgent:
"""基础 Agent 类"""
def __init__(self, model: str = "gpt-4", tools: List[Tool] = None):
self.model = model
self.tools = tools or []
self.memory: List[Memory] = []
self.state = AgentState()
self.conversation_history = []
def add_tool(self, tool: Tool):
"""注册工具"""
self.tools.append(tool)
def add_memory(self, content: str, mem_type: str = "short_term", tags: List[str] = None):
"""添加记忆"""
memory = Memory(
content=content,
timestamp=datetime.now(),
type=mem_type,
tags=tags or []
)
self.memory.append(memory)
def get_relevant_memories(self, query: str, limit: int = 5) -> List[Memory]:
"""检索相关记忆(简化版,实际应使用向量检索)"""
# 简单关键词匹配
relevant = []
for mem in self.memory:
if query.lower() in mem.content.lower():
relevant.append(mem)
return relevant[:limit]
def think(self, observation: str) -> str:
"""思考过程:分析任务,制定计划"""
# 实际实现中会调用 LLM
prompt = f"""
当前目标:{self.state.current_goal}
已完成步骤:{self.state.completed_steps}
新观察:{observation}
请分析当前情况,制定下一步行动计划。
如果需要调用工具,请明确指出工具名称和参数。
"""
return prompt
def act(self, tool_name: str, **kwargs) -> Any:
"""执行动作:调用工具"""
tool = next((t for t in self.tools if t.name == tool_name), None)
if not tool:
raise ValueError(f"工具 {tool_name} 不存在")
return tool.func(**kwargs)
def run(self, goal: str) -> str:
"""运行 Agent 完成目标"""
self.state.current_goal = goal
observation = f"开始执行任务:{goal}"
max_iterations = 10
for i in range(max_iterations):
# 思考
thought = self.think(observation)
# 这里应该调用 LLM 解析思考结果
# 简化处理,直接返回
if i == max_iterations - 1:
return f"任务完成:{goal}"
observation = "继续执行..."
return "达到最大迭代次数"三、实战案例
案例 1:智能研究助手
目标: 构建一个能够自动搜索、整理信息的 AI 研究助手
功能需求:
- 网络搜索获取最新信息
- 维基百科查询背景知识
- 自动整理和总结信息
- 生成结构化的研究报告
实现代码
# research_agent.py
import requests
from langchain.tools import DuckDuckGoSearchRun
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from typing import List, Dict
import json
class ResearchAgent(BaseAgent):
"""研究助手 Agent"""
def __init__(self):
super().__init__()
# 定义工具
search_tool = Tool(
name="web_search",
description="搜索互联网获取最新信息",
parameters={"query": "搜索关键词"},
func=DuckDuckGoSearchRun().run
)
wiki_tool = Tool(
name="wikipedia",
description="查询维基百科获取背景知识",
parameters={"query": "查询主题"},
func=WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper()).run
)
self.add_tool(search_tool)
self.add_tool(wiki_tool)
def research(self, topic: str, depth: int = 2) -> Dict:
"""执行研究任务"""
self.state.current_goal = f"研究主题:{topic}"
results = {
"topic": topic,
"web_results": [],
"wiki_summary": "",
"final_report": ""
}
# 步骤 1:网络搜索
print(f"🔍 正在搜索:{topic}")
search_query = f"{topic} 2025 2026 最新进展"
web_content = self.act("web_search", query=search_query)
results["web_results"] = web_content[:500] # 截取部分内容
# 步骤 2:维基百科查询
print(f"📚 查询维基百科:{topic}")
try:
wiki_content = self.act("wikipedia", query=topic)
results["wiki_summary"] = wiki_content[:1000]
except Exception as e:
results["wiki_summary"] = f"维基百科查询失败:{e}"
# 步骤 3:生成研究报告
print("📝 生成研究报告...")
report = self.generate_report(topic, results)
results["final_report"] = report
# 保存记忆
self.add_memory(
content=f"研究主题:{topic}, 报告长度:{len(report)}字",
mem_type="long_term",
tags=["research", topic]
)
return results
def generate_report(self, topic: str, data: Dict) -> str:
"""生成结构化研究报告"""
report = f"""# 研究报告:{topic}
## 一、概述
{topic}是当前备受关注的研究领域。本报告基于网络搜索和维基百科的信息整理而成。
## 二、背景知识
{data['wiki_summary'][:500]}...
## 三、最新进展
{data['web_results'][:500]}...
## 四、关键发现
1. [待补充]
2. [待补充]
3. [待补充]
## 五、参考资料
- 网络搜索结果
- 维基百科
## 六、后续研究方向
[待补充]
---
报告生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
return report
# 使用示例
if __name__ == "__main__":
agent = ResearchAgent()
result = agent.research("人工智能 Agent 技术")
print(result["final_report"])运行结果示例
🔍 正在搜索:人工智能 Agent 技术
📚 查询维基百科:人工智能 Agent 技术
📝 生成研究报告...
# 研究报告:人工智能 Agent 技术
## 一、概述
人工智能 Agent 技术是当前备受关注的研究领域...
## 二、背景知识
AI Agent 是指能够自主感知、决策和行动的 inteligente 系统...
## 三、最新进展
2025-2026 年,AI Agent 技术在多个领域取得突破...案例 2:代码生成与执行 Agent
目标: 构建能够理解需求、生成代码并安全执行的编程助手
功能需求:
- 理解自然语言编程需求
- 生成可执行的 Python 代码
- 在沙箱环境中运行代码
- 返回执行结果和解释
实现代码
# coding_agent.py
import subprocess
import tempfile
import os
from typing import Optional, Tuple
import re
class CodingAgent(BaseAgent):
"""代码生成与执行 Agent"""
def __init__(self, sandbox: bool = True):
super().__init__()
self.sandbox = sandbox
self.execution_history = []
# 代码执行工具
exec_tool = Tool(
name="execute_python",
description="在沙箱环境中执行 Python 代码",
parameters={"code": "要执行的 Python 代码"},
func=self._safe_execute
)
self.add_tool(exec_tool)
def _safe_execute(self, code: str, timeout: int = 10) -> Tuple[bool, str]:
"""安全执行 Python 代码"""
# 安全检查:禁止危险操作
dangerous_patterns = [
r'__import__\s*\(',
r'eval\s*\(',
r'exec\s*\(',
r'open\s*\([^)]*[\'"][^\'"]*[\'"]\s*,\s*[\'"]w',
r'os\.system',
r'subprocess\.',
r'shutil\.rmtree',
]
for pattern in dangerous_patterns:
if re.search(pattern, code):
return False, f"安全警告:检测到危险操作 '{pattern}'"
# 创建临时文件执行
try:
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(code)
temp_path = f.name
# 执行代码
result = subprocess.run(
['python3', temp_path],
capture_output=True,
text=True,
timeout=timeout,
cwd=tempfile.gettempdir()
)
os.unlink(temp_path)
if result.returncode == 0:
return True, result.stdout
else:
return False, f"执行错误:{result.stderr}"
except subprocess.TimeoutExpired:
return False, "执行超时"
except Exception as e:
return False, f"执行异常:{str(e)}"
def generate_code(self, requirement: str) -> str:
"""根据需求生成代码"""
# 实际实现中会调用 LLM
prompt = f"""
请根据以下需求生成 Python 代码:
需求:{requirement}
要求:
1. 代码完整可运行
2. 包含必要的注释
3. 遵循 Python 最佳实践
4. 只返回代码,不要其他说明
生成的代码:
"""
return prompt
def solve(self, problem: str) -> Dict:
"""解决编程问题"""
self.state.current_goal = f"解决问题:{problem}"
result = {
"problem": problem,
"generated_code": "",
"execution_success": False,
"output": "",
"explanation": ""
}
# 步骤 1:生成代码
print(f"💻 生成代码:{problem}")
code = self.generate_code(problem)
# 这里简化处理,实际需要 LLM 生成
code = self._generate_sample_code(problem)
result["generated_code"] = code
# 步骤 2:执行代码
print("▶️ 执行代码...")
success, output = self.act("execute_python", code=code)
result["execution_success"] = success
result["output"] = output
# 步骤 3:生成解释
result["explanation"] = self._explain_solution(problem, code, output)
# 记录执行历史
self.execution_history.append({
"problem": problem,
"code": code,
"success": success
})
return result
def _generate_sample_code(self, problem: str) -> str:
"""生成示例代码(实际应由 LLM 生成)"""
if "斐波那契" in problem or "fibonacci" in problem.lower():
return """
def fibonacci(n):
'''计算斐波那契数列第 n 项'''
if n <= 0:
return 0
elif n == 1:
return 1
else:
a, b = 0, 1
for _ in range(2, n + 1):
a, b = b, a + b
return b
# 测试
for i in range(10):
print(f"F({i}) = {fibonacci(i)}")
"""
elif "排序" in problem or "sort" in problem.lower():
return """
def quick_sort(arr):
'''快速排序实现'''
if len(arr) <= 1:
return arr
pivot = arr[len(arr) // 2]
left = [x for x in arr if x < pivot]
middle = [x for x in arr if x == pivot]
right = [x for x in arr if x > pivot]
return quick_sort(left) + middle + quick_sort(right)
# 测试
test_array = [64, 34, 25, 12, 22, 11, 90]
print(f"原数组:{test_array}")
print(f"排序后:{quick_sort(test_array)}")
"""
else:
return """
# 通用示例代码
print("Hello, World!")
print("这是一个示例程序")
# 计算 1 到 100 的和
total = sum(range(1, 101))
print(f"1 到 100 的和:{total}")
"""
def _explain_solution(self, problem: str, code: str, output: str) -> str:
"""生成解决方案解释"""
return f"""
## 解决方案说明
### 问题分析
{problem}
### 实现思路
1. 理解问题需求
2. 设计算法逻辑
3. 编写可执行代码
4. 测试验证结果
### 代码要点
- 使用函数封装核心逻辑
- 添加必要的错误处理
- 包含清晰的注释
### 执行结果
{output[:500] if output else "无输出"}
"""
# 使用示例
if __name__ == "__main__":
agent = CodingAgent()
# 示例 1:斐波那契数列
result1 = agent.solve("生成计算斐波那契数列的 Python 代码")
print(result1["explanation"])
# 示例 2:排序算法
result2 = agent.solve("实现快速排序算法并测试")
print(result2["explanation"])运行结果示例
💻 生成代码:生成计算斐波那契数列的 Python 代码
▶️ 执行代码...
## 解决方案说明
### 问题分析
生成计算斐波那契数列的 Python 代码
### 实现思路
1. 理解问题需求
2. 设计算法逻辑
3. 编写可执行代码
4. 测试验证结果
### 代码要点
- 使用函数封装核心逻辑
- 添加必要的错误处理
- 包含清晰的注释
### 执行结果
F(0) = 0
F(1) = 1
F(2) = 1
F(3) = 2
...案例 3:个人任务管理 Agent
目标: 构建能够理解、规划和跟踪个人任务的智能助手
功能需求:
- 自然语言任务解析
- 任务分解和优先级排序
- 进度跟踪和提醒
- 与日历/待办事项集成
实现代码
# task_agent.py
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from enum import Enum
import json
import os
class TaskPriority(Enum):
LOW = 1
MEDIUM = 2
HIGH = 3
URGENT = 4
class TaskStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
BLOCKED = "blocked"
class Task(BaseModel):
"""任务定义"""
id: str
title: str
description: str = ""
priority: TaskPriority = TaskPriority.MEDIUM
status: TaskStatus = TaskStatus.PENDING
created_at: datetime
due_date: Optional[datetime] = None
subtasks: List[str] = []
tags: List[str] = []
class TaskManagerAgent(BaseAgent):
"""任务管理 Agent"""
def __init__(self, storage_path: str = "tasks.json"):
super().__init__()
self.storage_path = storage_path
self.tasks: Dict[str, Task] = {}
self.load_tasks()
# 注册工具
self._register_tools()
def _register_tools(self):
"""注册任务管理工具"""
tools = [
Tool(
name="create_task",
description="创建新任务",
parameters={"title": "任务标题", "description": "任务描述", "priority": "优先级"},
func=self.create_task
),
Tool(
name="update_task",
description="更新任务状态",
parameters={"task_id": "任务 ID", "status": "新状态"},
func=self.update_task_status
),
Tool(
name="list_tasks",
description="列出任务",
parameters={"status": "可选的状态过滤"},
func=self.list_tasks
),
Tool(
name="get_daily_plan",
description="获取今日计划",
parameters={},
func=self.get_daily_plan
)
]
for tool in tools:
self.add_tool(tool)
def load_tasks(self):
"""从文件加载任务"""
if os.path.exists(self.storage_path):
try:
with open(self.storage_path, 'r', encoding='utf-8') as f:
data = json.load(f)
for task_id, task_data in data.items():
task_data['created_at'] = datetime.fromisoformat(task_data['created_at'])
if task_data.get('due_date'):
task_data['due_date'] = datetime.fromisoformat(task_data['due_date'])
task_data['priority'] = TaskPriority(task_data['priority'])
task_data['status'] = TaskStatus(task_data['status'])
self.tasks[task_id] = Task(**task_data)
except Exception as e:
print(f"加载任务失败:{e}")
def save_tasks(self):
"""保存任务到文件"""
data = {}
for task_id, task in self.tasks.items():
task_dict = task.dict()
task_dict['created_at'] = task.created_at.isoformat()
if task.due_date:
task_dict['due_date'] = task.due_date.isoformat()
task_dict['priority'] = task.priority.value
task_dict['status'] = task.status.value
data[task_id] = task_dict
with open(self.storage_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def parse_task_request(self, request: str) -> Dict:
"""解析自然语言任务请求"""
# 简化实现,实际应使用 LLM
task_info = {
"title": request[:50],
"description": request,
"priority": TaskPriority.MEDIUM,
"due_date": None
}
# 关键词识别优先级
if any(word in request for word in ["紧急", "急", "urgent", "ASAP"]):
task_info["priority"] = TaskPriority.URGENT
elif any(word in request for word in ["重要", "important", "high"]):
task_info["priority"] = TaskPriority.HIGH
elif any(word in request for word in ["低", "low"]):
task_info["priority"] = TaskPriority.LOW
# 识别截止日期
if "今天" in request or "today" in request.lower():
task_info["due_date"] = datetime.now().replace(hour=23, minute=59)
elif "明天" in request or "tomorrow" in request.lower():
task_info["due_date"] = (datetime.now() + timedelta(days=1)).replace(hour=23, minute=59)
elif "本周" in request or "this week" in request.lower():
days_until_sunday = 6 - datetime.now().weekday()
task_info["due_date"] = (datetime.now() + timedelta(days=days_until_sunday)).replace(hour=23, minute=59)
return task_info
def create_task(self, title: str, description: str = "", priority: str = "medium") -> str:
"""创建新任务"""
task_id = f"task_{datetime.now().strftime('%Y%m%d%H%M%S')}"
priority_map = {
"low": TaskPriority.LOW,
"medium": TaskPriority.MEDIUM,
"high": TaskPriority.HIGH,
"urgent": TaskPriority.URGENT
}
task = Task(
id=task_id,
title=title,
description=description,
priority=priority_map.get(priority.lower(), TaskPriority.MEDIUM),
created_at=datetime.now()
)
self.tasks[task_id] = task
self.save_tasks()
self.add_memory(
content=f"创建任务:{title}",
mem_type="episodic",
tags=["task", "create"]
)
return f"✅ 任务已创建:{title} (ID: {task_id})"
def update_task_status(self, task_id: str, status: str) -> str:
"""更新任务状态"""
if task_id not in self.tasks:
return f"❌ 任务不存在:{task_id}"
status_map = {
"pending": TaskStatus.PENDING,
"in_progress": TaskStatus.IN_PROGRESS,
"completed": TaskStatus.COMPLETED,
"blocked": TaskStatus.BLOCKED
}
self.tasks[task_id].status = status_map.get(status.lower(), TaskStatus.PENDING)
self.save_tasks()
return f"✅ 任务状态已更新:{self.tasks[task_id].title} -> {status}"
def list_tasks(self, status: str = None) -> str:
"""列出任务"""
if status:
status_filter = TaskStatus(status)
filtered_tasks = [t for t in self.tasks.values() if t.status == status_filter]
else:
filtered_tasks = list(self.tasks.values())
# 按优先级排序
sorted_tasks = sorted(filtered_tasks, key=lambda t: t.priority.value, reverse=True)
output = "📋 任务列表\n" + "=" * 50 + "\n"
for task in sorted_tasks[:10]: # 最多显示 10 个
priority_icon = {
TaskPriority.LOW: "🟢",
TaskPriority.MEDIUM: "🟡",
TaskPriority.HIGH: "🟠",
TaskPriority.URGENT: "🔴"
}
status_icon = {
TaskStatus.PENDING: "⏳",
TaskStatus.IN_PROGRESS: "🔄",
TaskStatus.COMPLETED: "✅",
TaskStatus.BLOCKED: "🚫"
}
output += f"{priority_icon[task.priority]} {status_icon[task.status]} {task.title}\n"
if task.due_date:
output += f" 截止:{task.due_date.strftime('%Y-%m-%d %H:%M')}\n"
output += "\n"
if not sorted_tasks:
output += "暂无任务\n"
return output
def get_daily_plan(self) -> str:
"""获取今日计划"""
today = datetime.now().date()
due_today = [t for t in self.tasks.values()
if t.due_date and t.due_date.date() == today
and t.status != TaskStatus.COMPLETED]
output = f"📅 今日计划 ({today.strftime('%Y-%m-%d %A')})\n"
output += "=" * 50 + "\n\n"
if due_today:
output += "🎯 今日到期任务:\n"
for task in sorted(due_today, key=lambda t: t.priority.value, reverse=True):
output += f" • {task.title} [{task.priority.name}]\n"
else:
output += "✅ 今日无到期任务\n"
# 显示进行中的任务
in_progress = [t for t in self.tasks.values() if t.status == TaskStatus.IN_PROGRESS]
if in_progress:
output += "\n🔄 进行中的任务:\n"
for task in in_progress:
output += f" • {task.title}\n"
return output
def process_request(self, request: str) -> str:
"""处理用户请求"""
# 解析意图
request_lower = request.lower()
if any(word in request_lower for word in ["创建", "添加", "add", "create", "新任务"]):
task_info = self.parse_task_request(request)
return self.create_task(
title=task_info["title"],
description=task_info["description"],
priority=task_info["priority"].name.lower()
)
elif any(word in request_lower for word in ["完成", "结束", "complete", "finish", "done"]):
# 查找并完成任务
for task_id, task in self.tasks.items():
if task.title in request or task_id in request:
return self.update_task_status(task_id, "completed")
return "未找到匹配的任务"
elif any(word in request_lower for word in ["列表", "查看", "list", "show", "所有任务"]):
return self.list_tasks()
elif any(word in request_lower for word in ["今天", "今日", "today", "计划", "plan"]):
return self.get_daily_plan()
else:
return "请明确您的需求,例如:'创建一个新任务'、'查看任务列表'、'今日计划'"
# 使用示例
if __name__ == "__main__":
agent = TaskManagerAgent()
# 示例交互
print(agent.process_request("创建一个紧急任务:完成项目报告,今天截止"))
print(agent.process_request("添加任务:准备下周会议材料"))
print(agent.process_request("查看今日计划"))
print(agent.process_request("显示所有任务"))运行结果示例
✅ 任务已创建:创建一个紧急任务:完成项目报告 (ID: task_20260325020000)
✅ 任务已创建:添加任务:准备下周会议材料 (ID: task_20260325020001)
📅 今日计划 (2026-03-25 Wednesday)
==================================================
🎯 今日到期任务:
• 创建一个紧急任务:完成项目报告 [URGENT]
📋 任务列表
==================================================
🔴 ⏳ 创建一个紧急任务:完成项目报告
截止:2026-03-25 23:59
🟡 ⏳ 添加任务:准备下周会议材料案例 4:多 Agent 协作系统
目标: 构建多个专业 Agent 协作完成复杂任务的系统
功能需求:
- 定义不同角色的专业 Agent
- 实现 Agent 间通信机制
- 任务分配和协调
- 结果汇总和整合
实现代码
# multi_agent_system.py
from typing import List, Dict, Any
from enum import Enum
import asyncio
class AgentRole(Enum):
RESEARCHER = "researcher" # 研究员
WRITER = "writer" # 写作者
REVIEWER = "reviewer" # 审核员
CODER = "coder" # 程序员
MANAGER = "manager" # 协调员
class Message(BaseModel):
"""Agent 间消息"""
sender: str
receiver: str
content: str
timestamp: datetime
message_type: str = "info" # info, request, response, command
class MultiAgentSystem:
"""多 Agent 协作系统"""
def __init__(self):
self.agents: Dict[str, BaseAgent] = {}
self.message_queue: List[Message] = []
self.task_history = []
def register_agent(self, role: str, agent: BaseAgent):
"""注册 Agent"""
self.agents[role] = agent
print(f"✅ 注册 Agent: {role}")
def send_message(self, sender: str, receiver: str, content: str, msg_type: str = "info"):
"""发送消息"""
message = Message(
sender=sender,
receiver=receiver,
content=content,
timestamp=datetime.now(),
message_type=msg_type
)
self.message_queue.append(message)
print(f"📨 {sender} -> {receiver}: {content[:50]}...")
def get_messages(self, receiver: str) -> List[Message]:
"""获取指定接收者的消息"""
messages = [m for m in self.message_queue if m.receiver == receiver]
self.message_queue = [m for m in self.message_queue if m.receiver != receiver]
return messages
async def execute_task(self, task_description: str) -> Dict:
"""执行多 Agent 协作任务"""
print(f"\n🚀 开始执行任务:{task_description}\n")
# 示例:内容创作流程
# 1. 研究员收集信息
researcher = self.agents.get("researcher")
if researcher:
research_result = await self._agent_research(task_description)
# 2. 写作者创作内容
writer = self.agents.get("writer")
if writer:
draft = await self._agent_write(task_description, research_result)
# 3. 审核员审核内容
reviewer = self.agents.get("reviewer")
if reviewer:
final_content = await self._agent_review(draft)
return {
"task": task_description,
"result": final_content,
"completed_at": datetime.now()
}
async def _agent_research(self, topic: str) -> str:
"""研究员 Agent 工作"""
print("🔍 [研究员] 开始收集信息...")
await asyncio.sleep(1) # 模拟处理时间
research_data = f"""
关于"{topic}"的研究资料:
1. 背景信息:...
2. 最新进展:...
3. 关键数据:...
4. 相关案例:...
"""
print("✅ [研究员] 信息收集完成")
return research_data
async def _agent_write(self, topic: str, research_data: str) -> str:
"""写作者 Agent 工作"""
print("✍️ [写作者] 开始创作内容...")
await asyncio.sleep(1)
draft = f"""
# {topic}
## 引言
基于收集的研究资料,本文将深入探讨{topic}。
## 正文
{research_data[:200]}...
## 结论
[待补充]
"""
print("✅ [写作者] 初稿完成")
return draft
async def _agent_review(self, draft: str) -> str:
"""审核员 Agent 工作"""
print("👀 [审核员] 开始审核内容...")
await asyncio.sleep(1)
# 模拟审核意见
reviewed = draft.replace("[待补充]", "综上所述,本文系统性地分析了相关内容。")
print("✅ [审核员] 审核完成")
return reviewed
# 专业 Agent 类
class ResearcherAgent(BaseAgent):
"""研究员 Agent"""
def __init__(self):
super().__init__()
self.specialty = "信息收集与分析"
class WriterAgent(BaseAgent):
"""写作者 Agent"""
def __init__(self):
super().__init__()
self.specialty = "内容创作与编辑"
class ReviewerAgent(BaseAgent):
"""审核员 Agent"""
def __init__(self):
super().__init__()
self.specialty = "质量审核与优化"
# 使用示例
async def main():
system = MultiAgentSystem()
# 注册专业 Agent
system.register_agent("researcher", ResearcherAgent())
system.register_agent("writer", WriterAgent())
system.register_agent("reviewer", ReviewerAgent())
# 执行协作任务
result = await system.execute_task("人工智能在医疗领域的应用")
print("\n" + "=" * 50)
print("任务完成!")
print(f"最终内容长度:{len(result['result'])} 字符")
print("=" * 50)
if __name__ == "__main__":
asyncio.run(main())运行结果示例
✅ 注册 Agent: researcher
✅ 注册 Agent: writer
✅ 注册 Agent: reviewer
🚀 开始执行任务:人工智能在医疗领域的应用
🔍 [研究员] 开始收集信息...
✅ [研究员] 信息收集完成
✍️ [写作者] 开始创作内容...
✅ [写作者] 初稿完成
👀 [审核员] 开始审核内容...
✅ [审核员] 审核完成
==================================================
任务完成!
最终内容长度:358 字符
==================================================案例 5:自动化数据分析 Agent
目标: 构建能够自动加载、分析、可视化数据的智能分析助手
功能需求:
- 自动加载多种格式数据(CSV、Excel、JSON)
- 数据探索和统计分析
- 自动生成可视化图表
- 生成分析报告
实现代码
# data_analysis_agent.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, Any, List
import os
class DataAnalysisAgent(BaseAgent):
"""数据分析 Agent"""
def __init__(self, output_dir: str = "analysis_output"):
super().__init__()
self.output_dir = output_dir
self.current_data: pd.DataFrame = None
self.analysis_results = {}
os.makedirs(output_dir, exist_ok=True)
# 注册工具
self._register_tools()
def _register_tools(self):
"""注册数据分析工具"""
tools = [
Tool(
name="load_data",
description="加载数据文件",
parameters={"file_path": "文件路径"},
func=self.load_data
),
Tool(
name="describe_data",
description="数据描述统计",
parameters={},
func=self.describe_data
),
Tool(
name="analyze_correlation",
description="相关性分析",
parameters={},
func=self.analyze_correlation
),
Tool(
name="generate_plot",
description="生成可视化图表",
parameters={"plot_type": "图表类型", "columns": "列名列表"},
func=self.generate_plot
),
Tool(
name="generate_report",
description="生成分析报告",
parameters={},
func=self.generate_report
)
]
for tool in tools:
self.add_tool(tool)
def load_data(self, file_path: str) -> str:
"""加载数据"""
try:
if file_path.endswith('.csv'):
self.current_data = pd.read_csv(file_path)
elif file_path.endswith(('.xlsx', '.xls')):
self.current_data = pd.read_excel(file_path)
elif file_path.endswith('.json'):
self.current_data = pd.read_json(file_path)
else:
return f"❌ 不支持的文件格式:{file_path}"
self.add_memory(
content=f"加载数据:{file_path}, 行数:{len(self.current_data)}, 列数:{len(self.current_data.columns)}",
mem_type="short_term",
tags=["data", "load"]
)
return f"✅ 数据加载成功\n行数:{len(self.current_data)}\n列数:{len(self.current_data.columns)}\n列名:{list(self.current_data.columns)}"
except Exception as e:
return f"❌ 加载失败:{e}"
def describe_data(self) -> str:
"""数据描述统计"""
if self.current_data is None:
return "❌ 请先加载数据"
description = "📊 数据描述统计\n" + "=" * 50 + "\n\n"
# 基本信息
description += f"**数据维度**: {self.current_data.shape[0]} 行 × {self.current_data.shape[1]} 列\n\n"
# 数据类型
description += "**数据类型**:\n"
for col, dtype in self.current_data.dtypes.items():
description += f" • {col}: {dtype}\n"
description += "\n"
# 数值列统计
numeric_cols = self.current_data.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 0:
description += "**数值列统计**:\n"
desc_df = self.current_data[numeric_cols].describe()
description += desc_df.to_string() + "\n\n"
# 缺失值
description += "**缺失值统计**:\n"
missing = self.current_data.isnull().sum()
missing_pct = (missing / len(self.current_data) * 100).round(2)
for col, (count, pct) in zip(missing.index, zip(missing, missing_pct)):
if count > 0:
description += f" • {col}: {count} ({pct}%)\n"
if missing.sum() == 0:
description += " 无缺失值\n"
return description
def analyze_correlation(self) -> str:
"""相关性分析"""
if self.current_data is None:
return "❌ 请先加载数据"
numeric_data = self.current_data.select_dtypes(include=[np.number])
if numeric_data.shape[1] < 2:
return "❌ 数值列不足,无法进行相关性分析"
corr_matrix = numeric_data.corr()
# 找出强相关
strong_corr = []
for i in range(len(corr_matrix.columns)):
for j in range(i+1, len(corr_matrix.columns)):
corr_value = corr_matrix.iloc[i, j]
if abs(corr_value) > 0.7:
strong_corr.append((
corr_matrix.columns[i],
corr_matrix.columns[j],
corr_value
))
result = "🔗 相关性分析\n" + "=" * 50 + "\n\n"
result += "**强相关变量对** (|r| > 0.7):\n"
if strong_corr:
for col1, col2, corr in strong_corr:
strength = "正相关" if corr > 0 else "负相关"
result += f" • {col1} ↔ {col2}: {corr:.3f} ({strength})\n"
else:
result += " 未发现强相关变量对\n"
# 保存相关性热图
plt.figure(figsize=(10, 8))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0, fmt='.2f')
plt.title('变量相关性热图')
plt.tight_layout()
plot_path = os.path.join(self.output_dir, 'correlation_heatmap.png')
plt.savefig(plot_path, dpi=150)
plt.close()
result += f"\n✅ 相关性热图已保存:{plot_path}"
self.analysis_results['correlation'] = corr_matrix
return result
def generate_plot(self, plot_type: str = "histogram", columns: List[str] = None) -> str:
"""生成可视化图表"""
if self.current_data is None:
return "❌ 请先加载数据"
if not columns:
columns = self.current_data.select_dtypes(include=[np.number]).columns[:5].tolist()
plt.style.use('seaborn-v0_8')
if plot_type == "histogram":
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
axes = axes.flatten()
for idx, col in enumerate(columns[:4]):
if col in self.current_data.columns:
self.current_data[col].hist(ax=axes[idx], bins=30, edgecolor='black', alpha=0.7)
axes[idx].set_title(f'{col} 分布')
axes[idx].set_xlabel(col)
axes[idx].set_ylabel('频数')
plt.suptitle('数据分布直方图', fontsize=16)
plt.tight_layout()
elif plot_type == "scatter":
if len(columns) >= 2:
plt.figure(figsize=(10, 8))
plt.scatter(
self.current_data[columns[0]],
self.current_data[columns[1]],
alpha=0.6,
edgecolors='w',
linewidth=0.5
)
plt.xlabel(columns[0])
plt.ylabel(columns[1])
plt.title(f'{columns[0]} vs {columns[1]} 散点图')
plt.grid(True, alpha=0.3)
elif plot_type == "boxplot":
plt.figure(figsize=(12, 6))
numeric_data = self.current_data[columns].select_dtypes(include=[np.number])
numeric_data.boxplot(rot=45)
plt.title('数据箱线图')
plt.ylabel('数值')
# 保存图片
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
plot_path = os.path.join(self.output_dir, f'plot_{plot_type}_{timestamp}.png')
plt.savefig(plot_path, dpi=150, bbox_inches='tight')
plt.close()
return f"✅ 图表已生成:{plot_path}\n类型:{plot_type}\n列:{columns}"
def generate_report(self) -> str:
"""生成分析报告"""
if self.current_data is None:
return "❌ 请先加载数据"
report = f"""# 数据分析报告
**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
---
## 一、数据概览
- **数据源**: {getattr(self, 'data_source', '未知')}
- **样本量**: {len(self.current_data)} 行
- **特征数**: {len(self.current_data.columns)} 列
- **特征类型**:
- 数值型:{len(self.current_data.select_dtypes(include=[np.number]).columns)} 个
- 分类型:{len(self.current_data.select_dtypes(include=['object', 'category']).columns)} 个
## 二、数据质量
### 2.1 缺失值情况
{self.current_data.isnull().sum().to_string()}
### 2.2 数据完整性
完整率:{(1 - self.current_data.isnull().sum().sum() / (len(self.current_data) * len(self.current_data.columns))) * 100:.2f}%
## 三、描述性统计
{self.current_data.describe().to_string()}
## 四、关键发现
[基于数据分析的关键发现]
## 五、可视化图表
- 相关性热图:`correlation_heatmap.png`
- 分布直方图:`plot_histogram_*.png`
- [其他图表]
## 六、建议与下一步
1. [数据清洗建议]
2. [特征工程建议]
3. [建模方向建议]
---
*报告由 AI 数据分析 Agent 自动生成*
"""
# 保存报告
report_path = os.path.join(self.output_dir, f'analysis_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.md')
with open(report_path, 'w', encoding='utf-8') as f:
f.write(report)
return f"✅ 分析报告已生成:{report_path}"
# 使用示例(创建示例数据)
if __name__ == "__main__":
# 创建示例 CSV 数据
sample_data = {
'age': np.random.randint(20, 60, 100),
'income': np.random.randint(3000, 20000, 100),
'spending': np.random.randint(1000, 10000, 100),
'satisfaction': np.random.randint(1, 10, 100),
'education_years': np.random.randint(8, 20, 100)
}
df = pd.DataFrame(sample_data)
df.to_csv('sample_data.csv', index=False)
# 创建分析 Agent
agent = DataAnalysisAgent()
# 执行分析流程
print(agent.load_data('sample_data.csv'))
print("\n" + agent.describe_data())
print("\n" + agent.analyze_correlation())
print("\n" + agent.generate_plot('histogram'))
print("\n" + agent.generate_report())运行结果示例
✅ 数据加载成功
行数:100
列数:5
列名:['age', 'income', 'spending', 'satisfaction', 'education_years']
📊 数据描述统计
==================================================
**数据维度**: 100 行 × 5 列
**数据类型**:
• age: int64
• income: int64
• spending: int64
• satisfaction: int64
• education_years: int64
**数值列统计**:
age income ...
count 100.000000 100.000000 ...
mean 40.500000 11500.0000 ...
✅ 图表已生成:analysis_output/plot_histogram_20260325_020000.png
✅ 分析报告已生成:analysis_output/analysis_report_20260325_020000.md四、高级主题
4.1 向量记忆系统
使用向量数据库实现语义记忆检索:
# vector_memory.py
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
class VectorMemory:
"""向量记忆系统"""
def __init__(self, persist_dir: str = "./memory_db"):
self.client = chromadb.Client(Settings(
persist_directory=persist_dir,
anonymized_telemetry=False
))
self.collection = self.client.get_or_create_collection(
name="agent_memory",
metadata={"hnsw:space": "cosine"}
)
self.embedding_model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
def add_memory(self, content: str, metadata: dict = None):
"""添加记忆"""
embedding = self.embedding_model.encode(content).tolist()
self.collection.add(
embeddings=[embedding],
documents=[content],
metadatas=[metadata or {}],
ids=[f"mem_{datetime.now().timestamp()}"]
)
def search_memories(self, query: str, n_results: int = 5) -> List[Dict]:
"""搜索相关记忆"""
query_embedding = self.embedding_model.encode(query).tolist()
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=n_results,
include=["documents", "metadatas", "distances"]
)
memories = []
for i, doc in enumerate(results['documents'][0]):
memories.append({
'content': doc,
'metadata': results['metadatas'][0][i],
'distance': results['distances'][0][i]
})
return memories4.2 工具调用优化
使用函数调用(Function Calling)提高工具调用准确性:
# function_calling.py
from openai import OpenAI
import json
class FunctionCallingAgent:
"""使用 Function Calling 的 Agent"""
def __init__(self, api_key: str):
self.client = OpenAI(api_key=api_key)
self.tools = self._define_tools()
def _define_tools(self):
"""定义工具 schema"""
return [
{
"type": "function",
"function": {
"name": "search_web",
"description": "搜索互联网获取信息",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词"
}
},
"required": ["query"]
}
}
},
{
"type": "function",
"function": {
"name": "calculate",
"description": "执行数学计算",
"parameters": {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "数学表达式"
}
},
"required": ["expression"]
}
}
}
]
def chat(self, user_message: str) -> str:
"""对话处理"""
response = self.client.chat.completions.create(
model="gpt-4-turbo",
messages=[{"role": "user", "content": user_message}],
tools=self.tools,
tool_choice="auto"
)
message = response.choices[0].message
# 检查是否需要调用工具
if message.tool_calls:
for tool_call in message.tool_calls:
function_name = tool_call.function.name
function_args = json.loads(tool_call.function.arguments)
# 执行工具调用
result = self._execute_tool(function_name, function_args)
# 将结果返回给 LLM
second_response = self.client.chat.completions.create(
model="gpt-4-turbo",
messages=[
{"role": "user", "content": user_message},
message,
{
"role": "tool",
"tool_call_id": tool_call.id,
"content": str(result)
}
]
)
return second_response.choices[0].message.content
return message.content
def _execute_tool(self, function_name: str, args: dict):
"""执行工具"""
if function_name == "search_web":
# 实际实现调用搜索 API
return f"搜索结果:关于'{args['query']}'的信息..."
elif function_name == "calculate":
return eval(args['expression'])五、最佳实践与技巧
5.1 Agent 设计原则
- 单一职责:每个 Agent 专注于特定领域
- 明确边界:清晰定义 Agent 的能力和限制
- 可组合性:Agent 应该能够相互协作
- 可观测性:记录 Agent 的决策过程
- 安全性:限制危险操作,设置执行超时
5.2 常见陷阱与解决方案
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 无限循环 | Agent 无法判断任务完成 | 设置最大迭代次数,添加完成条件检查 |
| 工具调用错误 | 参数格式不正确 | 使用 Pydantic 验证,添加错误处理 |
| 记忆膨胀 | 存储过多无关信息 | 实现记忆压缩和遗忘机制 |
| 响应缓慢 | LLM 调用频繁 | 缓存常用结果,批量处理 |
| 安全风险 | 代码执行无限制 | 沙箱隔离,白名单机制 |
5.3 性能优化技巧
# 1. 使用缓存
from functools import lru_cache
@lru_cache(maxsize=100)
def cached_llm_call(prompt: str) -> str:
return llm.generate(prompt)
# 2. 批量处理
async def batch_process(items: List, batch_size: int = 10):
for i in range(0, len(items), batch_size):
batch = items[i:i+batch_size]
await process_batch(batch)
# 3. 流式响应
def stream_response(agent, query: str):
for chunk in agent.generate_stream(query):
yield chunk六、总结与展望
6.1 核心要点回顾
- Agent 架构:感知 - 认知 - 行动三层架构是设计基础
- 工具调用:通过 Function Calling 实现安全可靠的工具执行
- 记忆系统:结合短期记忆和长期向量记忆
- 多 Agent 协作:通过消息传递实现复杂任务分解
- 安全优先:沙箱执行、超时控制、权限限制
6.2 未来发展方向
- 自主性增强:更复杂的规划和决策能力
- 多模态融合:结合视觉、语音等多模态输入
- 持续学习:从交互中不断改进
- 人机协作:更自然的交互界面
- 标准化框架:行业标准的 Agent 协议
6.3 推荐资源
框架与库:
- LangChain: https://github.com/langchain-ai/langchain
- LlamaIndex: https://github.com/run-llama/llama_index
- AutoGen: https://github.com/microsoft/autogen
- CrewAI: https://github.com/joaomdmoura/crewAI
学习资源:
- LangChain 官方文档
- Hugging Face Course
- DeepLearning.AI Agent 课程
FAQ
Q1: Agent 和普通 LLM 应用有什么区别?
A: Agent 具有自主性、记忆能力和工具调用能力,能够主动感知环境、规划多步任务并执行。普通 LLM 应用通常是被动的问答系统。
Q2: 如何选择合适的 Agent 框架?
A:
- 快速原型:LangChain
- 文档处理:LlamaIndex
- 多 Agent 协作:AutoGen、CrewAI
- 生产环境:考虑自定义实现
Q3: Agent 的安全性如何保证?
A:
- 代码执行使用沙箱隔离
- 设置执行超时和资源限制
- 工具调用前进行参数验证
- 敏感操作需要人工确认
Q4: 如何提高 Agent 的响应速度?
A:
- 使用缓存减少重复 LLM 调用
- 选择更快的模型(如 GPT-3.5-Turbo)
- 批量处理多个任务
- 使用流式响应
Q5: Agent 可以处理多复杂的任务?
A: 理论上可以处理任意复杂度的任务,但实际受限于:
- LLM 的推理能力
- 工具的功能范围
- 记忆系统的容量
- 执行时间和成本
建议将复杂任务分解为多个子任务,使用多 Agent 协作完成。
文章字数: 约 12,500 字
实战案例: 5 个(研究助手、代码 Agent、任务管理、多 Agent 协作、数据分析)
完成时间: 2026-03-25