核心组件概述#
Claude Code 的核心组件是其功能实现的基础,包括意图识别、任务规划、工具调度等关键模块。深入理解这些组件对于掌握Claude Code的工作原理至关重要。
意图识别器#
1. 意图识别原理#
markdown# 示例:意图识别器 用户请求: "实现一个意图识别器" Claude Code 生成的代码: ```python from typing import Dict, List, Any, Optional from datetime import datetime import logging import re logger = logging.getLogger(__name__) class Intent: """意图""" def __init__(self, name: str, description: str, confidence: float = 0.0): self.name = name self.description = description self.confidence = confidence self.entities: Dict[str, Any] = {} self.metadata: Dict[str, Any] = {} class IntentRecognizer: """意图识别器""" def __init__(self): self.intents: Dict[str, Dict[str, Any]] = {} self.patterns: Dict[str, List[str]] = {} self.models: Dict[str, Any] = {} def register_intent(self, name: str, description: str, patterns: List[str] = None): """注册意图""" self.intents[name] = { 'name': name, 'description': description, 'patterns': patterns or [] } if patterns: self.patterns[name] = patterns logger.info(f"Intent registered: {name}") def recognize(self, text: str) -> Intent: """识别意图""" scores = {} # 基于模式匹配 for intent_name, patterns in self.patterns.items(): score = self._match_patterns(text, patterns) scores[intent_name] = score # 基于关键词匹配 for intent_name, intent_info in self.intents.items(): score = self._match_keywords(text, intent_info) scores[intent_name] = max(scores.get(intent_name, 0), score) # 基于模型预测 if self.models: model_scores = self._predict_with_model(text) for intent_name, score in model_scores.items(): scores[intent_name] = max(scores.get(intent_name, 0), score) # 选择最高分的意图 if not scores: return Intent("unknown", "Unknown intent", 0.0) best_intent_name = max(scores, key=scores.get) confidence = scores[best_intent_name] # 提取实体 entities = self._extract_entities(text, best_intent_name) intent = Intent( name=best_intent_name, description=self.intents[best_intent_name]['description'], confidence=confidence ) intent.entities = entities return intent def _match_patterns(self, text: str, patterns: List[str]) -> float: """匹配模式""" max_score = 0.0 for pattern in patterns: try: if re.search(pattern, text, re.IGNORECASE): max_score = max(max_score, 0.9) except re.error: continue return max_score def _match_keywords(self, text: str, intent_info: Dict[str, Any]) -> float: """匹配关键词""" keywords = intent_info.get('keywords', []) if not keywords: return 0.0 text_lower = text.lower() matched = sum(1 for keyword in keywords if keyword.lower() in text_lower) return matched / len(keywords) if keywords else 0.0 def _predict_with_model(self, text: str) -> Dict[str, float]: """使用模型预测""" # 这里可以使用机器学习模型进行预测 # 简化版本:返回空字典 return {} def _extract_entities(self, text: str, intent_name: str) -> Dict[str, Any]: """提取实体""" entities = {} # 根据意图类型提取实体 if intent_name == 'read_file': file_match = re.search(r'file\s*["\']?([^"\']+)["\']?', text, re.IGNORECASE) if file_match: entities['file_path'] = file_match.group(1) elif intent_name == 'write_file': file_match = re.search(r'file\s*["\']?([^"\']+)["\']?', text, re.IGNORECASE) content_match = re.search(r'content\s*["\']?([^"\']+)["\']?', text, re.IGNORECASE) if file_match: entities['file_path'] = file_match.group(1) if content_match: entities['content'] = content_match.group(1) elif intent_name == 'execute_code': code_match = re.search(r'code\s*["\']?([^"\']+)["\']?', text, re.IGNORECASE) language_match = re.search(r'language\s*["\']?([^"\']+)["\']?', text, re.IGNORECASE) if code_match: entities['code'] = code_match.group(1) if language_match: entities['language'] = language_match.group(1) return entities # 使用示例 recognizer = IntentRecognizer() # 注册意图 recognizer.register_intent( name='read_file', description='Read a file', patterns=[r'read\s+file', r'open\s+file', r'cat\s+file'], keywords=['read', 'file', 'open', 'cat'] ) recognizer.register_intent( name='write_file', description='Write to a file', patterns=[r'write\s+file', r'save\s+file', r'create\s+file'], keywords=['write', 'file', 'save', 'create'] ) recognizer.register_intent( name='execute_code', description='Execute code', patterns=[r'execute\s+code', r'run\s+code', r'eval\s+code'], keywords=['execute', 'code', 'run', 'eval'] ) # 识别意图 text = "Please read the file config.txt" intent = recognizer.recognize(text) print(f"Intent: {intent.name}")
print(f"Confidence: {intent.confidence}") print(f"Entities: {intent.entities}")
bash> **意图识别器特点**: > - 支持多种识别方法 > - 模式匹配 > - 关键词匹配 > - 实体提取 ## 任务规划器 ### 1. 任务规划原理 ```python # 示例:任务规划器 用户请求: "实现一个任务规划器" Claude Code 生成的代码: ```python from typing import Dict, List, Any, Optional from datetime import datetime, timedelta import logging logger = logging.getLogger(__name__) class Task: """任务""" def __init__(self, task_id: str, name: str, description: str, action: str): self.id = task_id self.name = name self.description = description self.action = action self.status = "pending" self.dependencies: List[str] = [] self.created_at = datetime.utcnow() self.started_at: Optional[datetime] = None self.completed_at: Optional[datetime] = None self.result: Optional[Any] = None self.error: Optional[str] = None class TaskPlanner: """任务规划器""" def __init__(self): self.tasks: Dict[str, Task] = {} self.task_graph: Dict[str, List[str]] = {} self.execution_plan: List[str] = [] def add_task(self, task: Task): """添加任务""" self.tasks[task.id] = task self.task_graph[task.id] = task.dependencies logger.info(f"Task added: {task.id}") def create_plan(self, goal: str, context: Dict[str, Any]) -> List[Task]: """创建执行计划""" # 根据目标和上下文生成任务 tasks = self._generate_tasks(goal, context) # 添加任务 for task in tasks: self.add_task(task) # 生成执行顺序 self.execution_plan = self._topological_sort() # 返回按顺序排列的任务 return [self.tasks[task_id] for task_id in self.execution_plan] def _generate_tasks(self, goal: str, context: Dict[str, Any]) -> List[Task]: """生成任务""" tasks = [] # 根据目标类型生成任务 if 'read' in goal.lower() and 'file' in goal.lower(): tasks = self._generate_file_reading_tasks(goal, context) elif 'write' in goal.lower() and 'file' in goal.lower(): tasks = self._generate_file_writing_tasks(goal, context) elif 'execute' in goal.lower() and 'code' in goal.lower(): tasks = self._generate_code_execution_tasks(goal, context) else: tasks = self._generate_default_tasks(goal, context) return tasks def _generate_file_reading_tasks(self, goal: str, context: Dict[str, Any]) -> List[Task]: """生成文件读取任务""" file_path = context.get('file_path', 'default.txt') tasks = [ Task( task_id="validate_file_path", name="Validate File Path", description=f"Validate file path: {file_path}", action="validate_file_path" ), Task( task_id="read_file", name="Read File", description=f"Read file: {file_path}", action="read_file", dependencies=["validate_file_path"] ), Task( task_id="parse_content", name="Parse Content", description="Parse file content", action="parse_content", dependencies=["read_file"] ) ] return tasks def _generate_file_writing_tasks(self, goal: str, context: Dict[str, Any]) -> List[Task]: """生成文件写入任务""" file_path = context.get('file_path', 'output.txt') content = context.get('content', '') tasks = [ Task( task_id="validate_file_path", name="Validate File Path", description=f"Validate file path: {file_path}", action="validate_file_path" ), Task( task_id="prepare_content", name="Prepare Content", description="Prepare content for writing", action="prepare_content", dependencies=["validate_file_path"] ), Task( task_id="write_file", name="Write File", description=f"Write content to file: {file_path}", action="write_file", dependencies=["prepare_content"] ), Task( task_id="verify_write", name="Verify Write", description="Verify file was written successfully", action="verify_write", dependencies=["write_file"] ) ] return tasks def _generate_code_execution_tasks(self, goal: str, context: Dict[str, Any]) -> List[Task]: """生成代码执行任务""" code = context.get('code', '') language = context.get('language', 'python') tasks = [ Task( task_id="validate_code", name="Validate Code", description="Validate code syntax", action="validate_code" ), Task( task_id="setup_environment", name="Setup Environment", description=f"Setup {language} execution environment", action="setup_environment", dependencies=["validate_code"] ), Task( task_id="execute_code", name="Execute Code", description="Execute the code", action="execute_code", dependencies=["setup_environment"] ), Task( task_id="capture_output", name="Capture Output", description="Capture execution output", action="capture_output", dependencies=["execute_code"] ) ] return tasks def _generate_default_tasks(self, goal: str, context: Dict[str, Any]) -> List[Task]: """生成默认任务""" tasks = [ Task( task_id="analyze_goal", name="Analyze Goal", description=f"Analyze goal: {goal}", action="analyze_goal" ), Task( task_id="execute_goal", name="Execute Goal", description=f"Execute goal: {goal}", action="execute_goal", dependencies=["analyze_goal"] ) ] return tasks def _topological_sort(self) -> List[str]: """拓扑排序""" visited = set() result = [] def visit(task_id: str): if task_id in visited: return visited.add(task_id) # 先访问依赖 for dep_id in self.task_graph.get(task_id, []): visit(dep_id) result.append(task_id) for task_id in self.tasks: visit(task_id) return result def update_task_status(self, task_id: str, status: str, result: Any = None, error: str = None): """更新任务状态""" task = self.tasks.get(task_id) if not task: logger.warning(f"Task not found: {task_id}") return task.status = status if status == "in_progress": task.started_at = datetime.utcnow() elif status == "completed": task.completed_at = datetime.utcnow() task.result = result elif status == "failed": task.completed_at = datetime.utcnow() task.error = error logger.info(f"Task {task_id} status updated to {status}") def get_plan_status(self) -> Dict[str, Any]: """获取计划状态""" total_tasks = len(self.tasks) completed_tasks = len([t for t in self.tasks.values() if t.status == "completed"]) failed_tasks = len([t for t in self.tasks.values() if t.status == "failed"]) in_progress_tasks = len([t for t in self.tasks.values() if t.status == "in_progress"]) pending_tasks = len([t for t in self.tasks.values() if t.status == "pending"]) return { 'total_tasks': total_tasks, 'completed_tasks': completed_tasks, 'failed_tasks': failed_tasks, 'in_progress_tasks': in_progress_tasks, 'pending_tasks': pending_tasks, 'progress': completed_tasks / total_tasks if total_tasks > 0 else 0 } # 使用示例 planner = TaskPlanner() # 创建计划 goal = "Read the file config.txt" context = {'file_path': 'config.txt'} plan = planner.create_plan(goal, context) print("Execution plan:") for task in plan: print(f" - {task.name}: {task.description}") # 更新任务状态 planner.update_task_status("validate_file_path", "completed") planner.update_task_status("read_file", "in_progress") # 获取计划状态 status = planner.get_plan_status() print(f"\nPlan status: {status}")
任务规划器特点:
- 根据目标生成任务
- 处理任务依赖
- 拓扑排序
- 状态追踪
工具调度器#
1. 工具调度原理#
python# 示例:工具调度器 用户请求: "实现一个工具调度器" Claude Code 生成的代码: ```python from typing import Dict, List, Any, Optional from datetime import datetime import logging import asyncio logger = logging.getLogger(__name__) class Tool: """工具""" def __init__(self, tool_id: str, name: str, description: str, function: callable): self.id = tool_id self.name = name self.description = description self.function = function self.status = "idle" self.last_used: Optional[datetime] = None self.usage_count = 0 class ToolScheduler: """工具调度器""" def __init__(self, max_concurrent: int = 3): self.tools: Dict[str, Tool] = {} self.task_queue: asyncio.Queue = asyncio.Queue() self.running_tasks: Dict[str, asyncio.Task] = {} self.max_concurrent = max_concurrent self.running = False def register_tool(self, tool: Tool): """注册工具""" self.tools[tool.id] = tool logger.info(f"Tool registered: {tool.id}") def schedule_task(self, tool_id: str, parameters: Dict[str, Any]) -> str: """调度任务""" task_id = f"task_{datetime.utcnow().timestamp()}" task = { 'task_id': task_id, 'tool_id': tool_id, 'parameters': parameters, 'status': 'pending', 'created_at': datetime.utcnow() } asyncio.create_task(self.task_queue.put(task)) logger.info(f"Task scheduled: {task_id}") return task_id async def start(self): """启动调度器""" self.running = True logger.info("Tool scheduler started") # 启动工作线程 for i in range(self.max_concurrent): asyncio.create_task(self._worker(f"worker-{i}")) async def stop(self): """停止调度器""" self.running = False # 等待所有任务完成 await asyncio.gather(*self.running_tasks.values(), return_exceptions=True) logger.info("Tool scheduler stopped") async def _worker(self, worker_name: str): """工作线程""" logger.info(f"{worker_name} started") while self.running: try: # 获取任务 task = await asyncio.wait_for( self.task_queue.get(), timeout=1.0 ) tool_id = task['tool_id'] tool = self.tools.get(tool_id) if not tool: logger.warning(f"Tool not found: {tool_id}") continue # 执行任务 await self._execute_task(worker_name, task, tool) except asyncio.TimeoutError: continue except Exception as e: logger.error(f"{worker_name} error: {e}") logger.info(f"{worker_name} stopped") async def _execute_task(self, worker_name: str, task: Dict[str, Any], tool: Tool): """执行任务""" task_id = task['task_id'] # 更新工具状态 tool.status = "busy" try: # 执行工具函数 result = await self._run_tool_function(tool, task['parameters']) # 更新任务状态 task['status'] = 'completed' task['result'] = result task['completed_at'] = datetime.utcnow() # 更新工具统计 tool.last_used = datetime.utcnow() tool.usage_count += 1 logger.info(f"{worker_name} completed task {task_id}") except Exception as e: # 更新任务状态 task['status'] = 'failed' task['error'] = str(e) task['completed_at'] = datetime.utcnow() logger.error(f"{worker_name} failed task {task_id}: {e}") finally: # 更新工具状态 tool.status = "idle" async def _run_tool_function(self, tool: Tool, parameters: Dict[str, Any]) -> Any: """运行工具函数""" # 检查函数是否是协程函数 if asyncio.iscoroutinefunction(tool.function): return await tool.function(**parameters) else: # 在线程池中运行同步函数 loop = asyncio.get_event_loop() return await loop.run_in_executor(None, tool.function, **parameters) def get_tool_status(self, tool_id: str) -> Optional[Dict[str, Any]]: """获取工具状态""" tool = self.tools.get(tool_id) if not tool: return None return { 'id': tool.id, 'name': tool.name, 'status': tool.status, 'last_used': tool.last_used.isoformat() if tool.last_used else None, 'usage_count': tool.usage_count } def get_all_tool_status(self) -> Dict[str, Dict[str, Any]]: """获取所有工具状态""" return { tool_id: self.get_tool_status(tool_id) for tool_id in self.tools } # 使用示例 async def main(): """主函数""" scheduler = ToolScheduler(max_concurrent=2) # 注册工具 def read_file_tool(file_path: str) -> str: """读取文件工具""" with open(file_path, 'r') as f: return f.read() def write_file_tool(file_path: str, content: str) -> str: """写入文件工具""" with open(file_path, 'w') as f: f.write(content) return f"Written to {file_path}" scheduler.register_tool(Tool("read_file", "Read File", "Read file content", read_file_tool)) scheduler.register_tool(Tool("write_file", "Write File", "Write content to file", write_file_tool)) # 启动调度器 await scheduler.start() # 调度任务 task1 = scheduler.schedule_task("read_file", {'file_path': 'test.txt'}) task2 = scheduler.schedule_task("write_file", {'file_path': 'output.txt', 'content': 'Hello'}) task3 = scheduler.schedule_task("read_file", {'file_path': 'test.txt'}) # 等待任务完成 await asyncio.sleep(2) # 获取工具状态 status = scheduler.get_all_tool_status() print(f"Tool status: {status}") # 停止调度器 await scheduler.stop() if __name__ == '__main__': asyncio.run(main())
工具调度器特点:
- 异步任务调度
- 并发控制
- 工具状态管理
- 使用统计
总结#
核心组件详解包括:
- 意图识别器: 意图识别原理、识别方法实现
- 任务规划器: 任务规划原理、执行计划生成
- 工具调度器: 工具调度原理、异步执行
通过理解这些核心组件,可以更好地掌握Claude Code的工作原理。
在下一节中,我们将探讨数据流与工作流程。