28.5 性能优化

21 分钟阅读

28.5.1 性能优化概述#

Claude Code 的性能优化涉及多个层面,包括:

  1. 响应速度:减少用户请求的响应时间
  2. 资源利用:优化 CPU、内存、磁盘等资源的使用
  3. 并发处理:提高并发请求的处理能力
  4. 缓存策略:有效利用缓存减少重复计算
  5. 算法优化:选择和实现高效的算法

28.5.2 缓存优化#

1. 多级缓存架构#

python
class MultiLevelCache: """多级缓存""" def __init__(self): self.l1_cache: Dict[str, Any] = {} # 内存缓存 self.l2_cache: Dict[str, Any] = {} # 本地缓存 self.l3_cache: Optional[CacheClient] = None # 分布式缓存 self.l1_size = 1000 self.l1_hits = 0 self.l1_misses = 0 self.l2_size = 10000 self.l2_hits = 0 self.l2_misses = 0 def set_l3_cache(self, cache_client: CacheClient): """设置三级缓存""" self.l3_cache = cache_client def get(self, key: str) -> Optional[Any]: """获取缓存值""" # L1 缓存 if key in self.l1_cache: self.l1_hits += 1 return self.l1_cache[key] self.l1_misses += 1 # L2 缓存 if key in self.l2_cache: self.l2_hits += 1 value = self.l2_cache[key] # 提升到 L1 self._promote_to_l1(key, value) return value self.l2_misses += 1 # L3 缓存 if self.l3_cache: value = self.l3_cache.get(key) if value is not None: # 提升到 L2 和 L1 self._promote_to_l2(key, value) self._promote_to_l1(key, value) return value return None def set(self, key: str, value: Any, ttl: int = 3600): """设置缓存值""" # L1 缓存 self._set_l1(key, value) # L2 缓存 self._set_l2(key, value, ttl) # L3 缓存 if self.l3_cache: self.l3_cache.set(key, value, ttl) def _set_l1(self, key: str, value: Any): """设置 L1 缓存""" if len(self.l1_cache) >= self.l1_size: # LRU 淘汰 self._evict_lru(self.l1_cache) self.l1_cache[key] = value def _set_l2(self, key: str, value: Any, ttl: int): """设置 L2 缓存""" if len(self.l2_cache) >= self.l2_size: # LRU 淘汰 self._evict_lru(self.l2_cache) self.l2_cache[key] = { 'value': value, 'expires': time.time() + ttl } def _promote_to_l1(self, key: str, value: Any): """提升到 L1 缓存""" self._set_l1(key, value) def _promote_to_l2(self, key: str, value: Any): """提升到 L2 缓存""" self._set_l2(key, value, 3600) def _evict_lru(self, cache: Dict[str, Any]): """淘汰 LRU 条目""" if not cache: return # 简单实现:删除第一个条目 oldest_key = next(iter(cache)) del cache[oldest_key] def get_stats(self) -> Dict[str, Any]: """获取缓存统计""" l1_hit_rate = self.l1_hits / (self.l1_hits + self.l1_misses) if (self.l1_hits + self.l1_misses) > 0 else 0 l2_hit_rate = self.l2_hits / (self.l2_hits + self.l2_misses) if (self.l2_hits + self.l2_misses) > 0 else 0 return { 'l1': { 'size': len(self.l1_cache), 'hits': self.l1_hits, 'misses': self.l1_misses, 'hit_rate': l1_hit_rate }, 'l2': { 'size': len(self.l2_cache), 'hits': self.l2_hits, 'misses': self.l2_misses, 'hit_rate': l2_hit_rate } }

2. 智能缓存策略#

python
class SmartCache: """智能缓存""" def __init__(self): self.cache: Dict[str, CacheEntry] = {} self.access_patterns: Dict[str, AccessPattern] = {} self.max_size = 10000 def get(self, key: str) -> Optional[Any]: """获取缓存值""" entry = self.cache.get(key) if entry is None: return None # 检查是否过期 if entry.is_expired(): del self.cache[key] return None # 更新访问模式 self._update_access_pattern(key) return entry.value def set(self, key: str, value: Any, ttl: int = None): """设置缓存值""" # 预测 TTL if ttl is None: ttl = self._predict_ttl(key) entry = CacheEntry( key=key, value=value, ttl=ttl, created_at=time.time() ) # 检查缓存大小 if len(self.cache) >= self.max_size: self._evict() self.cache[key] = entry def _update_access_pattern(self, key: str): """更新访问模式""" if key not in self.access_patterns: self.access_patterns[key] = AccessPattern(key) pattern = self.access_patterns[key] pattern.record_access() def _predict_ttl(self, key: str) -> int: """预测 TTL""" pattern = self.access_patterns.get(key) if pattern is None: return 3600 # 默认 1 小时 # 基于访问频率预测 frequency = pattern.get_frequency() if frequency > 10: # 高频访问 return 7200 # 2 小时 elif frequency > 5: # 中频访问 return 3600 # 1 小时 else: # 低频访问 return 1800 # 30 分钟 def _evict(self): """淘汰缓存条目""" # 基于访问模式进行淘汰 scores = {} for key, pattern in self.access_patterns.items(): score = pattern.get_score() scores[key] = score # 淘汰分数最低的条目 if scores: worst_key = min(scores, key=scores.get) if worst_key in self.cache: del self.cache[worst_key] if worst_key in self.access_patterns: del self.access_patterns[worst_key] class CacheEntry: """缓存条目""" def __init__(self, key: str, value: Any, ttl: int, created_at: float): self.key = key self.value = value self.ttl = ttl self.created_at = created_at def is_expired(self) -> bool: """检查是否过期""" if self.ttl is None: return False return time.time() > self.created_at + self.ttl class AccessPattern: """访问模式""" def __init__(self, key: str): self.key = key self.access_times: List[float] = [] self.max_history = 100 def record_access(self): """记录访问""" self.access_times.append(time.time()) # 限制历史记录大小 if len(self.access_times) > self.max_history: self.access_times = self.access_times[-self.max_history:] def get_frequency(self) -> float: """获取访问频率""" if len(self.access_times) < 2: return 0 time_span = self.access_times[-1] - self.access_times[0] if time_span == 0: return len(self.access_times) return len(self.access_times) / time_span def get_score(self) -> float: """获取分数(用于淘汰决策)""" frequency = self.get_frequency() recency = self._get_recency() # 综合频率和最近性 return frequency * 0.7 + recency * 0.3 def _get_recency(self) -> float: """获取最近性""" if not self.access_times: return 0 last_access = self.access_times[-1] time_since_last = time.time() - last_access # 越近访问,分数越高 return 1.0 / (1.0 + time_since_last)

28.5.3 并发优化#

1. 异步任务处理#

python
import asyncio from typing import List, Dict, Optional from datetime import datetime class AsyncTaskProcessor: """异步任务处理器""" def __init__(self, max_workers: int = 4): self.task_queue: asyncio.Queue = asyncio.Queue() self.max_workers = max_workers self.workers: List[asyncio.Task] = [] self.running = False async def start(self): """启动处理器""" self.running = True # 创建工作协程 for i in range(self.max_workers): worker = asyncio.create_task(self._worker(f"worker-{i}")) self.workers.append(worker) async def stop(self): """停止处理器""" self.running = False # 等待所有工作协程完成 await asyncio.gather(*self.workers, return_exceptions=True) async def submit_task(self, task: AsyncTask): """提交任务""" await self.task_queue.put(task) async def _worker(self, worker_name: str): """工作协程""" logger.info(f"{worker_name} started") while self.running: try: # 获取任务 task = await asyncio.wait_for( self.task_queue.get(), timeout=1.0 ) # 执行任务 await self._execute_task(worker_name, task) except asyncio.TimeoutError: continue except Exception as e: logger.error(f"{worker_name} error: {e}") logger.info(f"{worker_name} stopped") async def _execute_task(self, worker_name: str, task: AsyncTask): """执行任务""" task.status = "running" task.started_at = time.time() try: result = await task.execute() task.result = result task.status = "completed" except Exception as e: task.error = str(e) task.status = "failed" finally: task.completed_at = time.time() task.duration = task.completed_at - task.started_at

2. 任务池管理#

python
class TaskPool: """任务池""" def __init__(self, max_size: int = 100): self.max_size = max_size self.tasks: Dict[str, AsyncTask] = {} self.task_queue: asyncio.Queue = asyncio.Queue() self.completed_tasks: List[str] = [] async def add_task(self, task: AsyncTask) -> str: """添加任务""" task_id = task.id # 检查任务池大小 if len(self.tasks) >= self.max_size: # 等待有任务完成 await self._wait_for_completion() self.tasks[task_id] = task await self.task_queue.put(task) return task_id async def get_task_result(self, task_id: str, timeout: float = None) -> Any: """获取任务结果""" task = self.tasks.get(task_id) if not task: raise ValueError(f"Task not found: {task_id}") # 等待任务完成 if task.status != "completed": await self._wait_for_task(task, timeout) if task.status == "failed": raise Exception(task.error) return task.result async def _wait_for_completion(self): """等待任务完成""" while len(self.tasks) >= self.max_size: # 检查是否有完成的任务 completed = [tid for tid, task in self.tasks.items() if task.status in ["completed", "failed"]] if completed: # 清理完成的任务 for tid in completed: del self.tasks[tid] self.completed_tasks.append(tid) break await asyncio.sleep(0.1) async def _wait_for_task(self, task: AsyncTask, timeout: float): """等待任务完成""" start_time = time.time() while task.status not in ["completed", "failed"]: if timeout and (time.time() - start_time) > timeout: raise TimeoutError(f"Task timeout: {task.id}") await asyncio.sleep(0.1)

28.5.4 算法优化#

1. 字符串匹配优化#

python
from typing import Dict, List, Optional from dataclasses import dataclass @dataclass class MatchResult: """匹配结果""" pattern: str start: int end: int text: str class StringMatcher: """字符串匹配器""" def __init__(self): self.patterns: Dict[str, List[str]] = {} self.trie: Trie = Trie() def add_pattern(self, category: str, pattern: str): """添加模式""" if category not in self.patterns: self.patterns[category] = [] self.patterns[category].append(pattern) self.trie.insert(pattern) def match(self, text: str) -> List[MatchResult]: """匹配文本""" results = [] # 使用 Trie 进行快速匹配 matches = self.trie.search(text) for match in matches: result = MatchResult( pattern=match.pattern, start=match.start, end=match.end, text=text[match.start:match.end] ) results.append(result) return results class TrieNode: """Trie 节点""" def __init__(self): self.children: Dict[str, TrieNode] = {} self.is_end = False self.pattern: Optional[str] = None class Trie: """Trie 树""" def __init__(self): self.root = TrieNode() def insert(self, pattern: str): """插入模式""" node = self.root for char in pattern: if char not in node.children: node.children[char] = TrieNode() node = node.children[char] node.is_end = True node.pattern = pattern def search(self, text: str) -> List[Dict[str, Any]]: """搜索文本""" matches = [] for i in range(len(text)): node = self.root j = i while j < len(text) and text[j] in node.children: node = node.children[text[j]] j += 1 if node.is_end: matches.append({ 'pattern': node.pattern, 'start': i, 'end': j }) return matches

2. 向量搜索优化#

python
class VectorSearcher: """向量搜索器""" def __init__(self, dimension: int): self.dimension = dimension self.vectors: Dict[str, np.ndarray] = {} self.index: Optional[faiss.Index] = None def add_vector(self, vector_id: str, vector: np.ndarray): """添加向量""" if len(vector) != self.dimension: raise ValueError("Vector dimension mismatch") self.vectors[vector_id] = vector def build_index(self): """构建索引""" if not self.vectors: return # 创建 FAISS 索引 self.index = faiss.IndexFlatL2(self.dimension) # 添加向量 vectors = np.array(list(self.vectors.values())) self.index.add(vectors) def search(self, query: np.ndarray, k: int = 10) -> List[SearchResult]: """搜索相似向量""" if not self.index: self.build_index() # 搜索 distances, indices = self.index.search( query.reshape(1, -1), k ) # 构建结果 results = [] vector_ids = list(self.vectors.keys()) for i, idx in enumerate(indices[0]): if idx >= 0 and idx < len(vector_ids): result = SearchResult( vector_id=vector_ids[idx], distance=float(distances[0][i]), similarity=1.0 / (1.0 + float(distances[0][i])) ) results.append(result) return results

28.5.5 资源优化#

1. 内存管理#

python
from typing import Dict, Any import time class MemoryAllocation: """内存分配""" def __init__(self, id: str, size: int, allocated_at: float): self.id = id self.size = size self.allocated_at = allocated_at class MemoryManager: """内存管理器""" def __init__(self, max_memory: int = 1024 * 1024 * 1024): # 1GB self.max_memory = max_memory self.used_memory = 0 self.allocations: Dict[str, MemoryAllocation] = {} def allocate(self, allocation_id: str, size: int) -> bool: """分配内存""" if self.used_memory + size > self.max_memory: # 尝试释放一些内存 if not self._free_memory(size): return False allocation = MemoryAllocation( id=allocation_id, size=size, allocated_at=time.time() ) self.allocations[allocation_id] = allocation self.used_memory += size return True def deallocate(self, allocation_id: str): """释放内存""" allocation = self.allocations.get(allocation_id) if allocation: self.used_memory -= allocation.size del self.allocations[allocation_id] def _free_memory(self, required_size: int) -> bool: """释放内存""" # 按使用时间排序 sorted_allocations = sorted( self.allocations.values(), key=lambda x: x.allocated_at ) freed = 0 for allocation in sorted_allocations: self.deallocate(allocation.id) freed += allocation.size if freed >= required_size: return True return False def get_stats(self) -> Dict[str, Any]: """获取统计信息""" return { 'max_memory': self.max_memory, 'used_memory': self.used_memory, 'free_memory': self.max_memory - self.used_memory, 'utilization': self.used_memory / self.max_memory, 'num_allocations': len(self.allocations) }

2. 连接池管理#

python
class ConnectionPool: """连接池""" def __init__(self, factory: callable, max_size: int = 10): self.factory = factory self.max_size = max_size self.pool: List[Any] = [] self.in_use: Set[Any] = set() def acquire(self, timeout: float = 30.0) -> Any: """获取连接""" start_time = time.time() while True: # 检查是否有可用连接 if self.pool: connection = self.pool.pop() self.in_use.add(connection) return connection # 检查是否可以创建新连接 if len(self.in_use) < self.max_size: connection = self.factory() self.in_use.add(connection) return connection # 等待连接释放 if time.time() - start_time > timeout: raise TimeoutError("Failed to acquire connection") time.sleep(0.1) def release(self, connection: Any): """释放连接""" if connection in self.in_use: self.in_use.remove(connection) self.pool.append(connection) def close_all(self): """关闭所有连接""" for connection in self.pool: try: connection.close() except Exception as e: logger.error(f"Error closing connection: {e}") for connection in self.in_use: try: connection.close() except Exception as e: logger.error(f"Error closing connection: {e}") self.pool.clear() self.in_use.clear()

28.5.6 性能监控#

1. 性能指标收集#

python
import time import numpy as np class PerformanceMonitor: """性能监控器""" def __init__(self): self.metrics: Dict[str, List[float]] = {} self.counters: Dict[str, int] = {} self.timers: Dict[str, float] = {} def record_metric(self, name: str, value: float): """记录指标""" if name not in self.metrics: self.metrics[name] = [] self.metrics[name].append(value) def increment_counter(self, name: str, value: int = 1): """增加计数器""" self.counters[name] = self.counters.get(name, 0) + value def start_timer(self, name: str): """启动计时器""" self.timers[name] = time.time() def stop_timer(self, name: str) -> float: """停止计时器""" if name not in self.timers: raise ValueError(f"Timer not found: {name}") duration = time.time() - self.timers[name] del self.timers[name] self.record_metric(f"{name}_duration", duration) return duration def get_summary(self) -> Dict[str, Any]: """获取摘要""" summary = { 'metrics': {}, 'counters': self.counters.copy() } for name, values in self.metrics.items(): summary['metrics'][name] = { 'count': len(values), 'mean': sum(values) / len(values), 'min': min(values), 'max': max(values), 'p50': np.percentile(values, 50), 'p95': np.percentile(values, 95), 'p99': np.percentile(values, 99) } return summary

2. 性能分析器#

python
class PerformanceProfiler: """性能分析器""" def __init__(self): self.call_stack: List[ProfileEntry] = [] self.entries: List[ProfileEntry] = [] def enter(self, function_name: str): """进入函数""" entry = ProfileEntry( function_name=function_name, start_time=time.time(), parent=self.call_stack[-1] if self.call_stack else None ) self.call_stack.append(entry) self.entries.append(entry) def exit(self): """退出函数""" if not self.call_stack: return entry = self.call_stack.pop() entry.end_time = time.time() entry.duration = entry.end_time - entry.start_time def get_report(self) -> str: """获取报告""" # 按函数名分组 function_stats: Dict[str, Dict[str, Any]] = {} for entry in self.entries: if entry.function_name not in function_stats: function_stats[entry.function_name] = { 'count': 0, 'total_duration': 0.0, 'min_duration': float('inf'), 'max_duration': 0.0 } stats = function_stats[entry.function_name] stats['count'] += 1 stats['total_duration'] += entry.duration stats['min_duration'] = min(stats['min_duration'], entry.duration) stats['max_duration'] = max(stats['max_duration'], entry.duration) # 生成报告 report = [] report.append("Performance Profile Report") report.append("=" * 50) report.append("") for function_name, stats in sorted( function_stats.items(), key=lambda x: x[1]['total_duration'], reverse=True ): report.append(f"Function: {function_name}") report.append(f" Calls: {stats['count']}") report.append(f" Total Time: {stats['total_duration']:.4f}s") report.append(f" Avg Time: {stats['total_duration'] / stats['count']:.4f}s") report.append(f" Min Time: {stats['min_duration']:.4f}s") report.append(f" Max Time: {stats['max_duration']:.4f}s") report.append("") return "\n".join(report) class ProfileEntry: """性能分析条目""" def __init__(self, function_name: str, start_time: float, parent: Optional['ProfileEntry']): self.function_name = function_name self.start_time = start_time self.end_time = None self.duration = None self.parent = parent

28.5.7 性能优化最佳实践#

1. 优化原则#

  1. 测量优先:先测量再优化,避免过早优化
  2. 热点优化:专注于性能瓶颈和热点代码
  3. 渐进优化:逐步优化,每次优化后验证效果
  4. 权衡考虑:在性能、可读性、可维护性之间权衡
  5. 持续监控:建立持续的性能监控机制

2. 优化策略#

  1. 缓存优先:有效利用缓存减少重复计算
  2. 异步处理:使用异步处理提高并发能力
  3. 算法优化:选择合适的算法和数据结构
  4. 资源管理:合理管理内存、连接等资源
  5. 批量处理:批量处理减少开销

3. 监控建议#

  1. 关键指标:监控响应时间、吞吐量、资源使用率
  2. 告警机制:设置合理的告警阈值
  3. 趋势分析:分析性能趋势,提前发现问题
  4. 定期审查:定期审查性能指标和优化效果
  5. 文档记录:记录优化过程和经验教训

通过系统的性能优化,Claude Code 可以提供更快速、更高效的编程辅助体验。

标记本节教程为已读

记录您的学习进度,方便后续查看。