第02章 任务分解:把复杂目标拆成可执行步骤
第02章 任务分解:把复杂目标拆成可执行步骤
“所有宏大的成就,都始于把它拆成足够小的部分。复杂性不会凭空消失,它只会被分解成可管理的单元。” —— 软件工程原则
“帮我搭建一个自动化报告系统”——这句话对 Agent 来说,就像让一位刚入职的实习生负责"优化整个公司的运营效率"。目标是真实的,但没有被分解,它就是一句空话。
任务分解(Task Decomposition)是规划的第一步,也是最关键的一步。分解得好,后续执行水到渠成;分解得差,再好的执行器也会迷失方向。
2.1 分解的核心思想:HTN 层次任务网络
# ====================================================
# 层次任务网络(Hierarchical Task Network, HTN)
# ====================================================
# HTN是经典规划领域的核心模型:任务分为"复合任务"和"原子任务"
# 复合任务可以被分解为子任务,直到所有任务都是原子任务(可直接执行)
from dataclasses import dataclass, field
from typing import List, Optional, Callable, Any
from enum import Enum
class TaskType(Enum):
COMPOUND = "compound" # 复合任务:需要进一步分解
ATOMIC = "atomic" # 原子任务:可以直接执行
@dataclass
class Task:
"""HTN任务节点"""
id: str
title: str
description: str
task_type: TaskType
subtasks: List["Task"] = field(default_factory=list)
action: Optional[Callable] = None # 原子任务的执行函数
estimated_duration: float = 0.0 # 估计耗时(秒)
def is_executable(self) -> bool:
"""是否可以直接执行"""
return self.task_type == TaskType.ATOMIC and self.action is not None
def total_subtasks(self) -> int:
"""递归统计所有叶节点数"""
if self.task_type == TaskType.ATOMIC:
return 1
return sum(t.total_subtasks() for t in self.subtasks)
# ---- 构建一个真实的任务树 ----
def build_report_task_tree() -> Task:
"""构建'生成竞品分析报告'的任务树"""
# === 原子任务(叶节点)===
search_competitor_a = Task(
id="t1.1.1",
title="搜索竞品A官网信息",
description="访问竞品A官网,提取产品功能列表、定价和核心卖点",
task_type=TaskType.ATOMIC,
estimated_duration=30,
)
search_competitor_b = Task(
id="t1.1.2",
title="搜索竞品B官网信息",
description="访问竞品B官网,提取产品功能列表、定价和核心卖点",
task_type=TaskType.ATOMIC,
estimated_duration=30,
)
search_reviews = Task(
id="t1.2.1",
title="搜索用户评价",
description="在G2、ProductHunt等平台搜索竞品的用户评价",
task_type=TaskType.ATOMIC,
estimated_duration=45,
)
build_matrix = Task(
id="t2.1",
title="构建功能对比矩阵",
description="将收集到的信息整理成结构化的功能对比表格",
task_type=TaskType.ATOMIC,
estimated_duration=60,
)
write_intro = Task(
id="t3.1",
title="撰写报告引言",
description="写分析背景、研究范围和方法论",
task_type=TaskType.ATOMIC,
estimated_duration=20,
)
write_analysis = Task(
id="t3.2",
title="撰写核心分析",
description="基于对比矩阵,分析各竞品的优劣势",
task_type=TaskType.ATOMIC,
estimated_duration=90,
)
write_conclusion = Task(
id="t3.3",
title="撰写结论与建议",
description="给出战略性建议和行动计划",
task_type=TaskType.ATOMIC,
estimated_duration=30,
)
# === 复合任务(中间节点)===
collect_info = Task(
id="t1.1",
title="收集官网信息",
description="从各竞品官网收集产品信息",
task_type=TaskType.COMPOUND,
subtasks=[search_competitor_a, search_competitor_b],
)
collect_social = Task(
id="t1.2",
title="收集社区反馈",
description="从评论平台和社区收集用户反馈",
task_type=TaskType.COMPOUND,
subtasks=[search_reviews],
)
gather_data = Task(
id="t1",
title="数据收集阶段",
description="收集所有需要的竞品信息",
task_type=TaskType.COMPOUND,
subtasks=[collect_info, collect_social],
)
analyze_data = Task(
id="t2",
title="数据分析阶段",
description="分析和整理收集到的数据",
task_type=TaskType.COMPOUND,
subtasks=[build_matrix],
)
write_report = Task(
id="t3",
title="报告撰写阶段",
description="基于分析结果撰写完整报告",
task_type=TaskType.COMPOUND,
subtasks=[write_intro, write_analysis, write_conclusion],
)
# === 顶层任务 ===
root = Task(
id="t0",
title="生成竞品分析报告",
description="完整的竞品分析报告,涵盖功能、定价、用户反馈和战略建议",
task_type=TaskType.COMPOUND,
subtasks=[gather_data, analyze_data, write_report],
)
return root
# ---- 可视化任务树 ----
def print_task_tree(task: Task, indent: int = 0) -> None:
"""打印任务树结构"""
prefix = " " * indent
icon = "📋" if task.task_type == TaskType.COMPOUND else "✅"
print(f"{prefix}{icon} [{task.id}] {task.title}")
if task.subtasks:
for subtask in task.subtasks:
print_task_tree(subtask, indent + 1)
# 测试
tree = build_report_task_tree()
print_task_tree(tree)
print(f"\n总原子任务数: {tree.total_subtasks()}")
# 输出:
# 📋 [t0] 生成竞品分析报告
# 📋 [t1] 数据收集阶段
# 📋 [t1.1] 收集官网信息
# ✅ [t1.1.1] 搜索竞品A官网信息
# ✅ [t1.1.2] 搜索竞品B官网信息
# 📋 [t1.2] 收集社区反馈
# ✅ [t1.2.1] 搜索用户评价
# 📋 [t2] 数据分析阶段
# ✅ [t2.1] 构建功能对比矩阵
# 📋 [t3] 报告撰写阶段
# ✅ [t3.1] 撰写报告引言
# ✅ [t3.2] 撰写核心分析
# ✅ [t3.3] 撰写结论与建议
# 总原子任务数: 7
2.2 用 LLM 自动生成任务分解
# ====================================================
# 用GPT-4o自动分解任务
# ====================================================
import asyncio
import json
from openai import AsyncOpenAI
client = AsyncOpenAI()
DECOMPOSE_SYSTEM_PROMPT = """你是一个专业的任务规划专家。
当用户给你一个目标时,你的任务是将其分解为层次化的可执行子任务。
分解规则:
1. 每个叶节点任务(原子任务)必须可以被一个工具调用或一次LLM生成完成
2. 每个任务应该有明确的"完成标准"(什么情况下算做完了)
3. 最多3层嵌套,每层不超过5个子任务
4. 为每个任务估计耗时(秒)
5. 标注哪些子任务可以并行执行
以JSON格式返回,结构为:
{
"task_tree": {
"id": "t0",
"title": "...",
"type": "compound|atomic",
"completion_criteria": "...",
"estimated_duration": 0,
"can_parallelize_children": true|false,
"subtasks": [...]
}
}
"""
async def decompose_goal_with_llm(goal: str, context: str = "") -> dict:
"""
使用LLM将高层目标分解为任务树
Args:
goal: 高层目标描述
context: 上下文信息(可用工具、约束条件等)
Returns:
任务树字典
"""
user_message = f"目标:{goal}"
if context:
user_message += f"\n\n上下文:{context}"
response = await client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": DECOMPOSE_SYSTEM_PROMPT},
{"role": "user", "content": user_message}
],
response_format={"type": "json_object"},
temperature=0.3, # 低温度,保证结构一致性
)
result = json.loads(response.choices[0].message.content)
return result["task_tree"]
async def decompose_with_tools_context(goal: str, available_tools: list) -> dict:
"""
感知可用工具的任务分解(第8章会深入展开)
"""
tools_description = "\n".join([
f"- {tool['name']}: {tool['description']}"
for tool in available_tools
])
context = f"""
可用工具:
{tools_description}
请确保每个原子任务都能用上述工具之一完成。
如果某个子目标无法用现有工具完成,请标注为"需要人工干预"。
"""
return await decompose_goal_with_llm(goal, context)
# ---- 测试分解效果 ----
async def main():
goal = "帮我分析我们公司过去3个月的客户支持工单,找出最常见的问题类型,并给出改进建议"
available_tools = [
{"name": "read_file", "description": "读取本地文件内容"},
{"name": "web_search", "description": "搜索互联网信息"},
{"name": "code_execute", "description": "执行Python代码"},
{"name": "llm_generate", "description": "使用LLM生成文本内容"},
{"name": "data_analyze", "description": "对结构化数据进行统计分析"},
]
print("开始分解目标...")
task_tree = await decompose_with_tools_context(goal, available_tools)
print("\n任务树:")
print(json.dumps(task_tree, ensure_ascii=False, indent=2))
# 统计原子任务数
def count_atomic(node):
if node["type"] == "atomic":
return 1
return sum(count_atomic(child) for child in node.get("subtasks", []))
print(f"\n总原子任务数: {count_atomic(task_tree)}")
# 找出可并行的任务
def find_parallel_groups(node, depth=0):
if node.get("can_parallelize_children") and node.get("subtasks"):
children_titles = [s["title"] for s in node["subtasks"]]
print(f"并行机会(深度{depth}): {children_titles}")
for child in node.get("subtasks", []):
find_parallel_groups(child, depth + 1)
print("\n并行执行机会:")
find_parallel_groups(task_tree)
asyncio.run(main())
2.3 分解质量控制:避免常见陷阱
# ====================================================
# 任务分解的质量检查器
# ====================================================
from typing import List, Tuple
class DecompositionValidator:
"""任务分解质量检查器"""
def __init__(self, max_depth: int = 3, max_children: int = 5):
self.max_depth = max_depth
self.max_children = max_children
def validate(self, task_tree: dict) -> Tuple[bool, List[str]]:
"""
检查任务树是否符合质量要求
返回:(是否通过, 问题列表)
"""
issues = []
self._check_node(task_tree, depth=0, issues=issues)
return len(issues) == 0, issues
def _check_node(self, node: dict, depth: int, issues: List[str]) -> None:
"""递归检查每个节点"""
# 检查1:深度限制
if depth > self.max_depth:
issues.append(
f"任务 '{node.get('title')}' 嵌套过深(深度{depth}),"
f"建议不超过{self.max_depth}层"
)
# 检查2:子任务数量
subtasks = node.get("subtasks", [])
if len(subtasks) > self.max_children:
issues.append(
f"任务 '{node.get('title')}' 有{len(subtasks)}个子任务,"
f"建议不超过{self.max_children}个"
)
# 检查3:复合任务必须有子任务
if node.get("type") == "compound" and not subtasks:
issues.append(
f"复合任务 '{node.get('title')}' 没有子任务,"
f"请进一步分解或改为原子任务"
)
# 检查4:原子任务不能有子任务
if node.get("type") == "atomic" and subtasks:
issues.append(
f"原子任务 '{node.get('title')}' 不应该有子任务,"
f"请改为复合任务"
)
# 检查5:必须有完成标准
if not node.get("completion_criteria"):
issues.append(
f"任务 '{node.get('title')}' 缺少完成标准,"
f"请添加 completion_criteria 字段"
)
# 检查6:ID唯一性(收集所有ID)
# (简化版,实际需要收集全树ID后检查)
# 递归检查子任务
for child in subtasks:
self._check_node(child, depth + 1, issues)
async def auto_fix(
self,
task_tree: dict,
issues: List[str]
) -> dict:
"""尝试自动修复分解问题"""
fix_prompt = f"""
以下任务树存在质量问题,请修复:
任务树:
{json.dumps(task_tree, ensure_ascii=False, indent=2)}
问题列表:
{chr(10).join(f"- {issue}" for issue in issues)}
请返回修复后的任务树(相同的JSON格式)。
"""
response = await client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "你是任务规划专家,专门修复任务树的结构问题。"},
{"role": "user", "content": fix_prompt}
],
response_format={"type": "json_object"},
)
return json.loads(response.choices[0].message.content)
# ---- 迭代分解:反复优化直到满足质量标准 ----
async def iterative_decompose(goal: str, max_iterations: int = 3) -> dict:
"""
迭代分解:生成 → 验证 → 修复,直到满足质量标准
"""
validator = DecompositionValidator()
# 初始分解
task_tree = await decompose_goal_with_llm(goal)
for iteration in range(max_iterations):
is_valid, issues = validator.validate(task_tree)
if is_valid:
print(f"第{iteration + 1}次迭代后,任务分解通过质量检查 ✅")
return task_tree
print(f"第{iteration + 1}次迭代,发现 {len(issues)} 个问题:")
for issue in issues:
print(f" - {issue}")
# 尝试自动修复
task_tree = await validator.auto_fix(task_tree, issues)
# 即使未完全修复,也返回最优版本
print(f"达到最大迭代次数,返回当前最优版本")
return task_tree
# ---- 任务分解的三种粒度测试 ----
async def granularity_experiment():
"""实验:不同粒度的任务分解对比"""
goal = "发布一篇技术博客文章"
# 策略1:粗粒度(2-3步)
coarse_prompt = f"""
请将以下目标分解为2-3个高层步骤(每步可能需要数小时):
目标:{goal}
返回JSON任务树。
"""
# 策略2:中粒度(5-8步,推荐)
medium_prompt = f"""
请将以下目标分解为5-8个中等粒度的步骤(每步约15-30分钟):
目标:{goal}
每步应该是具体可操作的单元,可以由一个工具调用完成。
返回JSON任务树。
"""
# 策略3:细粒度(15+步)
fine_prompt = f"""
请将以下目标分解为非常细粒度的步骤(每步约2-5分钟):
目标:{goal}
返回JSON任务树。
"""
results = {}
for name, prompt in [("粗粒度", coarse_prompt), ("中粒度", medium_prompt), ("细粒度", fine_prompt)]:
response = await client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"},
)
tree = json.loads(response.choices[0].message.content)
def count_leaves(node):
if not node.get("subtasks"):
return 1
return sum(count_leaves(c) for c in node["subtasks"])
results[name] = {
"leaf_count": count_leaves(tree.get("task_tree", tree)),
"tree": tree
}
print(f"{name}: {results[name]['leaf_count']} 个原子任务")
# 推荐:中粒度在实践中效果最好
print("\n推荐使用中粒度分解(5-8个原子任务)")
return results
asyncio.run(granularity_experiment())
本章小结
-
HTN 是任务分解的经典模型:复合任务 → 原子任务的递归分解,每个叶节点都必须是可直接执行的具体动作,这是规划可落地的基础。
-
LLM 可以自动做分解:但需要提供正确的 System Prompt,包含分解规则、粒度要求和输出格式。同时需要工具上下文,才能生成工具感知的计划。
-
中粒度是最佳实践:5-8 个原子任务最合适,粒度太粗导致每步不可操作,粒度太细导致计划本身成为负担。
-
分解质量需要自动验证:通过 Validator 检查深度、子任务数、完成标准等,发现问题后通过迭代修复,而不是依赖一次生成就完美。
-
分解不是一次性的:在动态任务中,分解会随着新信息的出现而更新。第6章的"动态重规划"会深入讲解如何在执行中持续优化任务树。
# 核心行动:为你的项目写一个通用任务分解函数
import asyncio
from openai import AsyncOpenAI
async def decompose_for_your_project(goal: str) -> dict:
"""
基于本章的 iterative_decompose,
为你的具体项目定制任务分解器
"""
# 加入你的项目工具列表和约束条件
your_tools = [
# {"name": "your_tool", "description": "你的工具描述"},
]
task_tree = await decompose_with_tools_context(goal, your_tools)
validator = DecompositionValidator()
is_valid, issues = validator.validate(task_tree)
if not is_valid:
task_tree = await validator.auto_fix(task_tree, issues)
return task_tree
# 测试你的分解器
asyncio.run(decompose_for_your_project("你的业务目标"))
本章提示词模板
【模板1:层次任务分解提示词】
你是一个专业的任务规划师。请将以下目标分解为层次化的可执行任务树。
目标:{goal}
可用工具:{tools_list}
约束条件:{constraints}
分解要求:
- 最多3层嵌套
- 每个叶节点(原子任务)必须可以用一个工具调用完成
- 为每个任务写明"完成标准"(什么情况下算完成)
- 标注哪些兄弟任务可以并行执行
- 预估每个原子任务的耗时
以JSON格式返回任务树。
【模板2:分解优化提示词】
以下任务分解方案存在问题,请帮我优化。
原始目标:{goal}
当前分解:{current_plan}
优化要求:
1. 如果某个步骤太模糊(不知道具体怎么做),请进一步细化
2. 如果某个步骤太细碎(合并更合理),请合并
3. 找出任何被遗漏的关键步骤
4. 识别步骤之间的真实依赖关系(A必须在B之前 vs. A和B可以同时进行)
返回优化后的任务树和优化说明。