27.3 自主规划算法

17 分钟阅读

自主规划算法概述#

自主规划算法是Agentic AI系统的核心能力之一,它允许AI系统根据目标自主分解任务、制定执行计划、监控执行进度,并根据实际情况调整计划。

规划算法的基本概念#

1. 什么是自主规划#

自主规划是指AI系统在没有人类干预的情况下,根据给定的目标,自动生成和执行任务计划的过程。

自主规划的特点:

  • 目标驱动: 以目标为导向进行规划
  • 任务分解: 将复杂目标分解为可执行的子任务
  • 动态调整: 根据执行情况动态调整计划
  • 资源管理: 合理分配和利用资源

2. 规划算法分类#

算法类型特点适用场景
前向搜索从初始状态到目标状态目标明确,状态空间小
后向搜索从目标状态到初始状态目标状态明确,初始状态复杂
双向搜索同时从两端搜索状态空间大,两端明确
层次规划分层规划,逐步细化复杂任务,多层级目标
部分规划规划部分路径,边执行边规划动态环境,不确定性高

任务分解算法#

1. 层次任务分解#

示例:层次任务分解

用户请求: "实现一个层次任务分解算法"

Claude Code 生成的代码:

python
````python from typing import Dict, List, Any, Optional from dataclasses import dataclass, field from enum import Enum import logging logger = logging.getLogger(__name__) class TaskStatus(Enum): """任务状态""" PENDING = "pending" IN_PROGRESS = "in_progress" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" class TaskPriority(Enum): """任务优先级""" CRITICAL = 1 HIGH = 2 MEDIUM = 3 LOW = 4 @dataclass class Task: """任务""" id: str name: str description: str status: TaskStatus = TaskStatus.PENDING priority: TaskPriority = TaskPriority.MEDIUM dependencies: List[str] = field(default_factory=list) subtasks: List['Task'] = field(default_factory=list) estimated_duration: Optional[float] = None actual_duration: Optional[float] = None result: Optional[Any] = None error: Optional[str] = None metadata: Dict[str, Any] = field(default_factory=dict) class HierarchicalTaskPlanner: """层次任务规划器""" def __init__(self): self.tasks: Dict[str, Task] = {} self.task_graph: Dict[str, List[str]] = {} def add_task(self, task: Task): """添加任务""" self.tasks[task.id] = task self.task_graph[task.id] = task.dependencies logger.info(f"Task added: {task.id}") def decompose_task(self, task_id: str, max_depth: int = 3) -> List[Task]: """分解任务""" task = self.tasks.get(task_id) if not task: raise ValueError(f"Task not found: {task_id}") if max_depth <= 0: return [task] # 生成子任务 subtasks = self._generate_subtasks(task) # 递归分解子任务 all_tasks = [task] for subtask in subtasks: self.add_task(subtask) task.subtasks.append(subtask) all_tasks.extend(self.decompose_task(subtask.id, max_depth - 1)) # 更新任务图 self.task_graph[task.id] = [subtask.id for subtask in subtasks] return all_tasks def _generate_subtasks(self, task: Task) -> List[Task]: """生成子任务""" subtasks = [] # 根据任务类型生成子任务 task_type = task.metadata.get('type', 'default') if task_type == 'code_generation': subtasks = self._generate_code_generation_subtasks(task) elif task_type == 'code_review': subtasks = self._generate_code_review_subtasks(task) elif task_type == 'testing': subtasks = self._generate_testing_subtasks(task) elif task_type == 'deployment': subtasks = self._generate_deployment_subtasks(task) else: subtasks = self._generate_default_subtasks(task) return subtasks def _generate_code_generation_subtasks(self, task: Task) -> List[Task]: """生成代码生成子任务""" subtasks = [ Task( id=f"{task.id}_analyze_requirements", name="Analyze Requirements", description="Analyze the requirements for code generation", priority=TaskPriority.HIGH, metadata={'type': 'analysis'} ), Task( id=f"{task.id}_design_architecture", name="Design Architecture", description="Design the system architecture", priority=TaskPriority.HIGH, dependencies=[f"{task.id}_analyze_requirements"], metadata={'type': 'design'} ), Task( id=f"{task.id}_generate_code", name="Generate Code", description="Generate the code", priority=TaskPriority.HIGH, dependencies=[f"{task.id}_design_architecture"], metadata={'type': 'code_generation'} ), Task( id=f"{task.id}_review_code", name="Review Code", description="Review the generated code", priority=TaskPriority.MEDIUM, dependencies=[f"{task.id}_generate_code"], metadata={'type': 'code_review'} ) ] return subtasks def _generate_code_review_subtasks(self, task: Task) -> List[Task]: """生成代码审查子任务""" subtasks = [ Task( id=f"{task.id}_static_analysis", name="Static Analysis", description="Perform static code analysis", priority=TaskPriority.HIGH, metadata={'type': 'analysis'} ), Task( id=f"{task.id}_security_check", name="Security Check", description="Check for security vulnerabilities", priority=TaskPriority.HIGH, dependencies=[f"{task.id}_static_analysis"], metadata={'type': 'security'} ), Task( id=f"{task.id}_performance_check", name="Performance Check", description="Check for performance issues", priority=TaskPriority.MEDIUM, dependencies=[f"{task.id}_static_analysis"], metadata={'type': 'performance'} ), Task( id=f"{task.id}_generate_report", name="Generate Report", description="Generate review report", priority=TaskPriority.MEDIUM, dependencies=[ f"{task.id}_security_check", f"{task.id}_performance_check" ], metadata={'type': 'reporting'} ) ] return subtasks def _generate_testing_subtasks(self, task: Task) -> List[Task]: """生成测试子任务""" subtasks = [ Task( id=f"{task.id}_unit_tests", name="Unit Tests", description="Write and run unit tests", priority=TaskPriority.HIGH, metadata={'type': 'testing'} ), Task( id=f"{task.id}_integration_tests", name="Integration Tests", description="Write and run integration tests", priority=TaskPriority.HIGH, dependencies=[f"{task.id}_unit_tests"], metadata={'type': 'testing'} ), Task( id=f"{task.id}_e2e_tests",

name="E2E Tests", description="Write and run end-to-end tests", priority=TaskPriority.MEDIUM, dependencies=[f"{task.id}_integration_tests"], metadata={'type': 'testing'} ) ]

return subtasks

def _generate_deployment_subtasks(self, task: Task) -> List[Task]: """生成部署子任务""" subtasks = [ Task( id=f"{task.id}_build", name="Build", description="Build the application", priority=TaskPriority.HIGH, metadata={'type': 'build'} ), Task( id=f"{task.id}_test", name="Test", description="Run tests", priority=TaskPriority.HIGH, dependencies=[f"{task.id}_build"], metadata={'type': 'testing'} ), Task( id=f"{task.id}_deploy", name="Deploy", description="Deploy to production", priority=TaskPriority.HIGH, dependencies=[f"{task.id}_test"], metadata={'type': 'deployment'} ), Task( id=f"{task.id}_verify", name="Verify", description="Verify deployment", priority=TaskPriority.HIGH, dependencies=[f"{task.id}_deploy"], metadata={'type': 'verification'} ) ]

return subtasks

def _generate_default_subtasks(self, task: Task) -> List[Task]: """生成默认子任务""" subtasks = [ Task( id=f"{task.id}_prepare", name="Prepare", description="Prepare for task execution", priority=TaskPriority.HIGH, metadata={'type': 'preparation'} ), Task( id=f"{task.id}_execute", name="Execute", description="Execute the task", priority=TaskPriority.HIGH, dependencies=[f"{task.id}_prepare"], metadata={'type': 'execution'} ), Task( id=f"{task.id}_finalize", name="Finalize", description="Finalize the task", priority=TaskPriority.MEDIUM, dependencies=[f"{task.id}_execute"], metadata={'type': 'finalization'} ) ]

return subtasks

def get_execution_plan(self, task_id: str) -> List[Task]: """获取执行计划""" plan = [] visited = set()

def dfs(current_task_id): if current_task_id in visited: return visited.add(current_task_id)

先执行依赖任务

for dep_id in self.task_graph.get(current_task_id, []): dfs(dep_id)

添加当前任务

task = self.tasks.get(current_task_id) if task: plan.append(task)

dfs(task_id)

return plan

def visualize_plan(self, task_id: str) -> str: """可视化执行计划""" plan = self.get_execution_plan(task_id)

visualization = "Execution Plan:\n" for i, task in enumerate(plan, 1): status_icon = { TaskStatus.PENDING: "○", TaskStatus.IN_PROGRESS: "◐", TaskStatus.COMPLETED: "●", TaskStatus.FAILED: "✗", TaskStatus.CANCELLED: "⊘" }.get(task.status, "?")

visualization += f"{i}. {status_icon} {task.name}\n"

if task.subtasks: for j, subtask in enumerate(task.subtasks, 1): subtask_icon = { TaskStatus.PENDING: "○", TaskStatus.IN_PROGRESS: "◐", TaskStatus.COMPLETED: "●", TaskStatus.FAILED: "✗", TaskStatus.CANCELLED: "⊘" }.get(subtask.status, "?")

visualization += f" {j}. {subtask_icon} {subtask.name}\n"

return visualization

使用示例

planner = HierarchicalTaskPlanner()

添加主任务

main_task = Task( id="main", name="Develop User Authentication System", description="Develop a complete user authentication system", priority=TaskPriority.HIGH, metadata={'type': 'code_generation'} )

planner.add_task(main_task)

分解任务

planner.decompose_task("main", max_depth=2)

获取执行计划

plan = planner.get_execution_plan("main")

print("Execution Plan:") for task in plan: print(f" - {task.name}")

可视化计划

print("\n" + planner.visualize_plan("main"))

bash
> - 递归分解复杂任务
> - 自动生成子任务
> - 维护任务依赖关系
> - 生成执行计划

2. 动态任务调整#

示例:动态任务调整

用户请求: "实现动态任务调整算法" Claude Code 生成的代码:

python
`python from typing import Dict, List, Any, Optional from datetime import datetime, timedelta import logging logger = logging.getLogger(__name__) class DynamicTaskPlanner(HierarchicalTaskPlanner): """动态任务规划器""" def __init__(self): super().__init__() self.execution_history: Dict[str, List[Dict[str, Any]]] = {} self.performance_metrics: Dict[str, Dict[str, float]] = {} def monitor_execution(self, task_id: str, status: TaskStatus, result: Optional[Any] = None, error: Optional[str] = None): """监控任务执行""" task = self.tasks.get(task_id) if not task: return # 更新任务状态 task.status = status task.result = result task.error = error # 记录执行历史 if task_id not in self.execution_history: self.execution_history[task_id] = [] self.execution_history[task_id].append({ 'timestamp': datetime.utcnow(), 'status': status, 'result': result, 'error': error }) logger.info(f"Task {task_id} status updated to {status}") def analyze_performance(self, task_id: str) -> Dict[str, float]: """分析任务性能""" if task_id not in self.execution_history: return {} history = self.execution_history[task_id] # 计算性能指标 metrics = { 'total_executions': len(history), 'success_rate': 0.0, 'avg_duration': 0.0, 'failure_rate': 0.0 } successful = [h for h in history if h['status'] == TaskStatus.COMPLETED] failed = [h for h in history if h['status'] == TaskStatus.FAILED] if len(history) > 0: metrics['success_rate'] = len(successful) / len(history) metrics['failure_rate'] = len(failed) / len(history) self.performance_metrics[task_id] = metrics return metrics def adjust_plan(self, task_id: str) -> List[Task]: """调整执行计划""" task = self.tasks.get(task_id) if not task: return [] # 分析性能 metrics = self.analyze_performance(task_id) # 根据性能调整计划 if metrics.get('failure_rate', 0) > 0.5: # 失败率高,添加重试任务 return self._add_retry_tasks(task) elif metrics.get('avg_duration', 0) > 3600: # 执行时间长,优化任务 return self._optimize_tasks(task) else: # 正常情况,返回原计划 return self.get_execution_plan(task_id) def _add_retry_tasks(self, task: Task) -> List[Task]: """添加重试任务""" retry_task = Task( id=f"{task.id}_retry", name=f"Retry {task.name}", description=f"Retry failed task: {task.description}", priority=TaskPriority.HIGH, dependencies=[task.id], metadata={'type': 'retry', 'original_task_id': task.id} ) self.add_task(retry_task) # 更新任务图 self.task_graph[task.id].append(retry_task.id) return self.get_execution_plan(task.id) def _optimize_tasks(self, task: Task) -> List[Task]: """优化任务""" # 分解长时间运行的任务 if task.subtasks: # 已经有子任务,优化子任务 for subtask in task.subtasks: if subtask.estimated_duration and subtask.estimated_duration > 1800: self.decompose_task(subtask.id, max_depth=1) else: # 分解当前任务 self.decompose_task(task.id, max_depth=2) return self.get_execution_plan(task.id) def predict_completion_time(self, task_id: str) -> Optional[float]: """预测完成时间""" plan = self.get_execution_plan(task_id) total_time = 0.0 for task in plan: if task.estimated_duration: total_time += task.estimated_duration elif task.id in self.performance_metrics: # 使用历史平均时间 avg_duration = self.performance_metrics[task.id].get('avg_duration', 0) total_time += avg_duration return total_time if total_time > 0 else None # 使用示例 planner = DynamicTaskPlanner() # 添加任务 task = Task( id="task1", name="Process Large Dataset", description="Process a large dataset", priority=TaskPriority.HIGH, estimated_duration=7200, metadata={'type': 'data_processing'} ) planner.add_task(task) # 分解任务 planner.decompose_task("task1", max_depth=2) # 模拟执行 planner.monitor_execution("task1", TaskStatus.IN_PROGRESS) # 模拟失败 planner.monitor_execution("task1", TaskStatus.FAILED, error="Timeout") # 分析性能 metrics = planner.analyze_performance("task1") print(f"Performance metrics: {metrics}") # 调整计划 adjusted_plan = planner.adjust_plan("task1") print("\nAdjusted plan:") for task in adjusted_plan: print(f" - {task.name}") ```> **动态任务调整特点**: > - 实时监控任务执行 > - 分析任务性能 > - 根据性能调整计划 > - 预测完成时间 ``` ## 规划算法优化 ### 1. 并行任务调度 # 示例:并行任务调度 用户请求: "实现并行任务调度算法" Claude Code 生成的代码: ````python `python from typing import Dict, List, Set, Optional from collections import defaultdict, deque import logging logger = logging.getLogger(__name__) class ParallelTaskScheduler: """并行任务调度器""" def __init__(self, max_workers: int = 4): self.max_workers = max_workers self.tasks: Dict[str, Task] = {} self.dependencies: Dict[str, Set[str]] = defaultdict(set) self.dependents: Dict[str, Set[str]] = defaultdict(set) self.ready_tasks: Set[str] = set() self.running_tasks: Set[str] = set() self.completed_tasks: Set[str] = set() def add_task(self, task: Task): """添加任务""" self.tasks[task.id] = task # 添加依赖关系 for dep_id in task.dependencies: self.dependencies[task.id].add(dep_id) self.dependents[dep_id].add(task.id) # 如果没有依赖,标记为就绪 if not task.dependencies: self.ready_tasks.add(task.id) logger.info(f"Task added: {task.id}") def get_ready_tasks(self) -> List[Task]: """获取就绪任务""" available = self.ready_tasks - self.running_tasks return [self.tasks[task_id] for task_id in available] def start_task(self, task_id: str): """开始任务""" if task_id in self.ready_tasks and task_id not in self.running_tasks: self.running_tasks.add(task_id) self.ready_tasks.remove(task_id) logger.info(f"Task started: {task_id}") def complete_task(self, task_id: str, result: Optional[Any] = None, error: Optional[str] = None): """完成任务""" if task_id not in self.running_tasks: logger.warning(f"Task not running: {task_id}") return # 更新任务状态 task = self.tasks[task_id] task.status = TaskStatus.COMPLETED if not error else TaskStatus.FAILED task.result = result task.error = error # 更新集合 self.running_tasks.remove(task_id) self.completed_tasks.add(task_id) # 检查依赖此任务的任务 for dependent_id in self.dependents[task_id]: self.dependencies[dependent_id].remove(task_id) # 如果所有依赖都完成,标记为就绪 if not self.dependencies[dependent_id]: self.ready_tasks.add(dependent_id) logger.info(f"Task completed: {task_id}") def get_schedule(self) -> List[List[str]]: """获取调度计划""" schedule = [] remaining = set(self.tasks.keys()) while remaining: # 获取当前可以执行的任务 current_batch = [] for task_id in remaining: task = self.tasks[task_id] if all(dep in self.completed_tasks for dep in task.dependencies): current_batch.append(task_id) # 限制并发数 current_batch = current_batch[:self.max_workers] if not current_batch: logger.warning("Circular dependency detected") break schedule.append(current_batch) # 更新剩余任务 remaining -= set(current_batch) self.completed_tasks.update(current_batch) return schedule def visualize_schedule(self) -> str: """可视化调度""" schedule = self.get_schedule() visualization = "Task Schedule:\n" for i, batch in enumerate(schedule, 1): visualization += f"Batch {i} (parallel):\n" for task_id in batch: task = self.tasks[task_id] visualization += f" - {task.name}\n" return visualization # 使用示例 scheduler = ParallelTaskScheduler(max_workers=3) # 添加任务 tasks = [ Task(id="A", name="Task A", description="Task A", priority=TaskPriority.HIGH), Task(id="B", name="Task B", description="Task B", priority=TaskPriority.HIGH, dependencies=["A"]), Task(id="C", name="Task C", description="Task C", priority=TaskPriority.HIGH, dependencies=["A"]), Task(id="D", name="Task D", description="Task D", priority=TaskPriority.HIGH, dependencies=["B", "C"]), Task(id="E", name="Task E", description="Task E", priority=TaskPriority.HIGH, dependencies=["B"]), Task(id="F", name="Task F", description="Task F", priority=TaskPriority.HIGH, dependencies=["C"]), Task(id="G", name="Task G", description="Task G", priority=TaskPriority.HIGH, dependencies=["D", "E", "F"]) ] for task in tasks: scheduler.add_task(task) # 获取调度计划 schedule = scheduler.get_schedule() print("Schedule:") for i, batch in enumerate(schedule, 1): print(f"Batch {i}: {[scheduler.tasks[task_id].name for task_id in batch]}") # 可视化调度 print("\n" + scheduler.visualize_schedule()) ```> **并行任务调度特点**: > - 识别可并行执行的任务 > - 限制并发任务数量 > - 处理任务依赖关系 > - 生成调度计划 ``` ## 总结 自主规划算法包括: 1. **规划算法的基本概念**: 什么是自主规划、规划算法分类 2. **任务分解算法**: 层次任务分解、动态任务调整 3. **规划算法优化**: 并行任务调度 通过自主规划算法,Claude Code可以自主分解任务、制定执行计划,并根据实际情况动态调整。 在下一节中,我们将探讨记忆系统设计。 ```

标记本节教程为已读

记录您的学习进度,方便后续查看。