27.4 Memory System Design

17 min read

Memory System Overview

A memory system is a core component of an agentic AI system: it lets the AI store, retrieve, and make use of past information, enabling long-term learning and continuous improvement.

Fundamentals of Memory Systems

1. What Is a Memory System

A memory system is the mechanism an AI system uses to store, organize, and retrieve information, analogous to human memory.

Key properties of a memory system (a minimal interface sketch follows this list):

  • Persistent storage: keeps important information for the long term
  • Fast retrieval: finds relevant information efficiently
  • Intelligent organization: organizes and categorizes information automatically
  • Context awareness: retrieves information based on the current context
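As a minimal sketch of these four properties, a storage backend could be described by a small interface. The `MemoryBackend` name and method signatures below are illustrative assumptions, not part of the original text:

```python
from typing import Any, Dict, List, Optional, Protocol


class MemoryBackend(Protocol):
    """Illustrative interface covering the four properties above."""

    def store(self, content: str, metadata: Optional[Dict[str, Any]] = None) -> str:
        """Persist a piece of information and return its id (persistent storage)."""
        ...

    def retrieve(self, query: str, top_k: int = 5,
                 context: Optional[List[str]] = None) -> List[str]:
        """Return the most relevant items, optionally biased by the current
        context (fast, context-aware retrieval)."""
        ...

    def organize(self) -> None:
        """Re-index or consolidate stored items (intelligent organization)."""
        ...
```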

2. Memory Types

| Memory type | Characteristics | Retention | Capacity |
| --- | --- | --- | --- |
| Sensory memory | Briefly holds raw input | Milliseconds to seconds | Limited |
| Short-term memory | Holds the current context | Seconds to minutes | Limited |
| Long-term memory | Stores important knowledge and experience | Permanent | Unlimited |
| Working memory | Holds the state of the current task | For the duration of the task | Limited |

The sketch below turns these rows into explicit storage policies.
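One way to read the table is as configuration for a store: each memory type gets a retention period and a capacity. The concrete numbers in this sketch are illustrative assumptions; the table only gives qualitative bounds:

```python
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum
from typing import Optional


class MemoryType(Enum):
    SENSORY = "sensory"        # briefly holds raw input
    SHORT_TERM = "short_term"  # current context
    LONG_TERM = "long_term"    # important knowledge and experience
    WORKING = "working"        # state of the current task


@dataclass
class MemoryPolicy:
    ttl: Optional[timedelta]   # None means "kept permanently"
    capacity: Optional[int]    # None means "unbounded"


# Illustrative values only.
MEMORY_POLICIES = {
    MemoryType.SENSORY: MemoryPolicy(ttl=timedelta(seconds=1), capacity=32),
    MemoryType.SHORT_TERM: MemoryPolicy(ttl=timedelta(minutes=5), capacity=100),
    MemoryType.LONG_TERM: MemoryPolicy(ttl=None, capacity=None),
    MemoryType.WORKING: MemoryPolicy(ttl=None, capacity=10),  # cleared when the task ends
}
```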

Memory Storage Mechanisms

1. Vector Storage

Example: Vector Storage

User request: "Implement a vector-based memory storage system"

Code generated by Claude Code:

```python
from typing import Dict, List, Any, Optional
from datetime import datetime
import logging

import numpy as np

logger = logging.getLogger(__name__)


class Memory:
    """A single memory entry."""

    def __init__(self, id: str, content: str, embedding: np.ndarray,
                 metadata: Optional[Dict[str, Any]] = None):
        self.id = id
        self.content = content
        self.embedding = embedding
        self.metadata = metadata or {}
        self.created_at = datetime.utcnow()
        self.accessed_at = datetime.utcnow()
        self.access_count = 0

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the memory to a dictionary."""
        return {
            'id': self.id,
            'content': self.content,
            'embedding': self.embedding.tolist(),
            'metadata': self.metadata,
            'created_at': self.created_at.isoformat(),
            'accessed_at': self.accessed_at.isoformat(),
            'access_count': self.access_count
        }


class VectorMemoryStore:
    """Vector-based memory store."""

    def __init__(self, embedding_dim: int = 768):
        self.embedding_dim = embedding_dim
        self.memories: Dict[str, Memory] = {}
        self.index: Dict[str, List[str]] = {}

    def add_memory(self, content: str, embedding: np.ndarray,
                   metadata: Optional[Dict[str, Any]] = None) -> Memory:
        """Add a memory and index it."""
        memory_id = f"mem_{len(self.memories)}"  # simple sequential id
        memory = Memory(memory_id, content, embedding, metadata)
        self.memories[memory_id] = memory

        # Build index entries
        self._index_memory(memory)

        logger.info(f"Memory added: {memory_id}")
        return memory

    def _index_memory(self, memory: Memory):
        """Index a memory by type and by tags."""
        # Index by type
        memory_type = memory.metadata.get('type', 'default')
        if memory_type not in self.index:
            self.index[memory_type] = []
        self.index[memory_type].append(memory.id)

        # Index by tags
        tags = memory.metadata.get('tags', [])
        for tag in tags:
            if tag not in self.index:
                self.index[tag] = []
            self.index[tag].append(memory.id)

    def retrieve_similar(self, query_embedding: np.ndarray, top_k: int = 5,
                         threshold: float = 0.7) -> List[Memory]:
        """Retrieve the memories most similar to the query embedding."""
        if not self.memories:
            return []

        # Compute similarity against every stored memory
        similarities = []
        for memory_id, memory in self.memories.items():
            similarity = self._cosine_similarity(query_embedding, memory.embedding)
            if similarity >= threshold:
                similarities.append((memory_id, similarity))

        # Sort by similarity, highest first
        similarities.sort(key=lambda x: x[1], reverse=True)

        # Take the top_k memories and update their access statistics
        top_memories = []
        for memory_id, similarity in similarities[:top_k]:
            memory = self.memories[memory_id]
            memory.accessed_at = datetime.utcnow()
            memory.access_count += 1
            top_memories.append(memory)

        return top_memories

    def _cosine_similarity(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
        """Compute cosine similarity between two embeddings."""
        dot_product = np.dot(embedding1, embedding2)
        norm1 = np.linalg.norm(embedding1)
        norm2 = np.linalg.norm(embedding2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        return dot_product / (norm1 * norm2)

    def retrieve_by_type(self, memory_type: str, limit: int = 10) -> List[Memory]:
        """Retrieve memories by type."""
        memory_ids = self.index.get(memory_type, [])
        return [self.memories[mid] for mid in memory_ids[:limit]]

    def retrieve_by_tag(self, tag: str, limit: int = 10) -> List[Memory]:
        """Retrieve memories by tag."""
        memory_ids = self.index.get(tag, [])
        return [self.memories[mid] for mid in memory_ids[:limit]]

    def update_memory(self, memory_id: str, content: Optional[str] = None,
                      metadata: Optional[Dict[str, Any]] = None):
        """Update a memory's content and/or metadata."""
        memory = self.memories.get(memory_id)
        if not memory:
            raise ValueError(f"Memory not found: {memory_id}")

        if content:
            memory.content = content
        if metadata:
            memory.metadata.update(metadata)

        logger.info(f"Memory updated: {memory_id}")

    def delete_memory(self, memory_id: str):
        """Delete a memory and remove it from all indexes."""
        if memory_id in self.memories:
            # Remove index entries
            memory = self.memories[memory_id]
            memory_type = memory.metadata.get('type', 'default')
            if memory_type in self.index and memory_id in self.index[memory_type]:
                self.index[memory_type].remove(memory_id)

            tags = memory.metadata.get('tags', [])
            for tag in tags:
                if tag in self.index and memory_id in self.index[tag]:
                    self.index[tag].remove(memory_id)

            # Remove the memory itself
            del self.memories[memory_id]
            logger.info(f"Memory deleted: {memory_id}")

    def get_memory(self, memory_id: str) -> Optional[Memory]:
        """Get a memory by id and update its access statistics."""
        memory = self.memories.get(memory_id)
        if memory:
            memory.accessed_at = datetime.utcnow()
            memory.access_count += 1
        return memory

    def get_statistics(self) -> Dict[str, Any]:
        """Return store-level statistics."""
        total_memories = len(self.memories)
        total_accesses = sum(m.access_count for m in self.memories.values())

        type_counts = {}
        for memory in self.memories.values():
            memory_type = memory.metadata.get('type', 'default')
            type_counts[memory_type] = type_counts.get(memory_type, 0) + 1

        return {
            'total_memories': total_memories,
            'total_accesses': total_accesses,
            'type_counts': type_counts,
            'index_size': len(self.index)
        }


# Usage example
store = VectorMemoryStore()

# Add memories
memories = [
    ("The quick brown fox jumps over the lazy dog", np.random.rand(768),
     {'type': 'fact', 'tags': ['animal', 'action']}),
    ("Python is a high-level programming language", np.random.rand(768),
     {'type': 'knowledge', 'tags': ['programming', 'language']}),
    ("The capital of France is Paris", np.random.rand(768),
     {'type': 'fact', 'tags': ['geography', 'city']}),
    ("Machine learning is a subset of AI", np.random.rand(768),
     {'type': 'knowledge', 'tags': ['AI', 'technology']}),
]

for content, embedding, metadata in memories:
    store.add_memory(content, embedding, metadata)

# Retrieve similar memories
query_embedding = np.random.rand(768)
similar_memories = store.retrieve_similar(query_embedding, top_k=3)

print("Similar memories:")
for memory in similar_memories:
    print(f"  - {memory.content}")

# Retrieve by type
fact_memories = store.retrieve_by_type('fact')

print("\nFact memories:")
for memory in fact_memories:
    print(f"  - {memory.content}")

# Get statistics
stats = store.get_statistics()
print(f"\nStatistics: {stats}")
```

> **Vector storage highlights**:
> - Memories are represented as embedding vectors
> - Retrieval is similarity-based (a scalable index-backed variant is sketched below)
> - Multiple index views are supported (by type and by tag)
> - Access statistics are recorded
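The `retrieve_similar` method above scans every stored memory, which is fine for small stores but scales linearly with the number of memories. If the faiss-cpu package is available, that scan could be delegated to a vector index. The sketch below is an assumption about how such a swap might look, not part of the generated example:

```python
import faiss
import numpy as np


class FaissMemoryIndex:
    """Cosine-similarity search via FAISS (inner product over L2-normalized vectors)."""

    def __init__(self, embedding_dim: int = 768):
        self.index = faiss.IndexFlatIP(embedding_dim)
        self.ids: list[str] = []  # position in the FAISS index -> memory id

    def add(self, memory_id: str, embedding: np.ndarray) -> None:
        vec = embedding.astype(np.float32).reshape(1, -1)
        faiss.normalize_L2(vec)  # normalize so inner product equals cosine similarity
        self.index.add(vec)
        self.ids.append(memory_id)

    def search(self, query: np.ndarray, top_k: int = 5) -> list[tuple[str, float]]:
        vec = query.astype(np.float32).reshape(1, -1)
        faiss.normalize_L2(vec)
        scores, positions = self.index.search(vec, top_k)
        return [(self.ids[pos], float(score))
                for pos, score in zip(positions[0], scores[0]) if pos != -1]
```

A `VectorMemoryStore` variant could keep the `Memory` objects in its dictionary and delegate `retrieve_similar` to an index like this one.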

2. Hierarchical Storage

Example: Hierarchical Storage

User request: "Implement a hierarchical memory storage system"

Code generated by Claude Code:

```python
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from enum import Enum
import logging

import numpy as np

logger = logging.getLogger(__name__)

# Reuses the Memory class defined in the vector-store example above.


class MemoryLevel(Enum):
    """Memory levels."""
    WORKING = "working"
    SHORT_TERM = "short_term"
    LONG_TERM = "long_term"


class HierarchicalMemoryStore:
    """Hierarchical memory store."""

    def __init__(self):
        self.working_memory: Dict[str, Memory] = {}
        self.short_term_memory: Dict[str, Memory] = {}
        self.long_term_memory: Dict[str, Memory] = {}

        # Configuration
        self.working_memory_limit = 10
        self.short_term_memory_limit = 100
        self.short_term_memory_ttl = timedelta(hours=24)

    def add_memory(self, content: str, embedding: np.ndarray,
                   level: MemoryLevel = MemoryLevel.SHORT_TERM,
                   metadata: Optional[Dict[str, Any]] = None) -> Memory:
        """Add a memory at the given level."""
        memory_id = f"mem_{datetime.utcnow().timestamp()}"
        memory = Memory(memory_id, content, embedding, metadata)

        # Store according to level
        if level == MemoryLevel.WORKING:
            self._add_to_working_memory(memory)
        elif level == MemoryLevel.SHORT_TERM:
            self._add_to_short_term_memory(memory)
        elif level == MemoryLevel.LONG_TERM:
            self._add_to_long_term_memory(memory)

        logger.info(f"Memory added to {level.value}: {memory_id}")
        return memory

    def _add_to_working_memory(self, memory: Memory):
        """Add to working memory, evicting the oldest entry if full."""
        if len(self.working_memory) >= self.working_memory_limit:
            oldest_id = min(self.working_memory.keys(),
                            key=lambda k: self.working_memory[k].created_at)
            self._evict_from_working_memory(oldest_id)
        self.working_memory[memory.id] = memory

    def _add_to_short_term_memory(self, memory: Memory):
        """Add to short-term memory, evicting the oldest entry if full."""
        if len(self.short_term_memory) >= self.short_term_memory_limit:
            oldest_id = min(self.short_term_memory.keys(),
                            key=lambda k: self.short_term_memory[k].created_at)
            self._evict_from_short_term_memory(oldest_id)
        self.short_term_memory[memory.id] = memory

    def _add_to_long_term_memory(self, memory: Memory):
        """Add to long-term memory."""
        self.long_term_memory[memory.id] = memory

    def _evict_from_working_memory(self, memory_id: str):
        """Evict from working memory, demoting the entry to short-term memory."""
        memory = self.working_memory.pop(memory_id)
        self._add_to_short_term_memory(memory)
        logger.info(f"Memory evicted from working memory: {memory_id}")

    def _evict_from_short_term_memory(self, memory_id: str):
        """Evict from short-term memory, demoting the entry to long-term memory."""
        memory = self.short_term_memory.pop(memory_id)
        self._add_to_long_term_memory(memory)
        logger.info(f"Memory evicted from short-term memory: {memory_id}")

    def retrieve(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Memory]:
        """Retrieve memories across all levels."""
        # Merge all levels
        all_memories = {
            **self.working_memory,
            **self.short_term_memory,
            **self.long_term_memory
        }

        # Compute similarity against every memory
        similarities = []
        for memory_id, memory in all_memories.items():
            similarity = self._cosine_similarity(query_embedding, memory.embedding)
            similarities.append((memory, similarity))

        # Sort by similarity and take the top_k
        similarities.sort(key=lambda x: x[1], reverse=True)
        return [memory for memory, similarity in similarities[:top_k]]

    def _cosine_similarity(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
        """Compute cosine similarity between two embeddings."""
        dot_product = np.dot(embedding1, embedding2)
        norm1 = np.linalg.norm(embedding1)
        norm2 = np.linalg.norm(embedding2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        return dot_product / (norm1 * norm2)

    def promote_memory(self, memory_id: str, target_level: MemoryLevel):
        """Move a memory to a different level."""
        memory = None
        source_level = None

        # Locate the memory
        if memory_id in self.working_memory:
            memory = self.working_memory.pop(memory_id)
            source_level = MemoryLevel.WORKING
        elif memory_id in self.short_term_memory:
            memory = self.short_term_memory.pop(memory_id)
            source_level = MemoryLevel.SHORT_TERM
        elif memory_id in self.long_term_memory:
            memory = self.long_term_memory.pop(memory_id)
            source_level = MemoryLevel.LONG_TERM

        if not memory:
            raise ValueError(f"Memory not found: {memory_id}")

        # Move to the target level
        if target_level == MemoryLevel.WORKING:
            self._add_to_working_memory(memory)
        elif target_level == MemoryLevel.SHORT_TERM:
            self._add_to_short_term_memory(memory)
        elif target_level == MemoryLevel.LONG_TERM:
            self._add_to_long_term_memory(memory)

        logger.info(f"Memory promoted from {source_level.value} to {target_level.value}: {memory_id}")

    def cleanup_expired(self):
        """Move expired short-term memories into long-term memory."""
        current_time = datetime.utcnow()
        expired_ids = []

        # Find expired short-term memories
        for memory_id, memory in self.short_term_memory.items():
            if current_time - memory.created_at > self.short_term_memory_ttl:
                expired_ids.append(memory_id)

        # Move them to long-term memory
        for memory_id in expired_ids:
            memory = self.short_term_memory.pop(memory_id)
            self._add_to_long_term_memory(memory)
            logger.info(f"Expired memory moved to long-term: {memory_id}")

    def get_statistics(self) -> Dict[str, Any]:
        """Return per-level statistics."""
        return {
            'working_memory_size': len(self.working_memory),
            'short_term_memory_size': len(self.short_term_memory),
            'long_term_memory_size': len(self.long_term_memory),
            'total_memories': (len(self.working_memory)
                               + len(self.short_term_memory)
                               + len(self.long_term_memory))
        }


# Usage example
store = HierarchicalMemoryStore()

# Add memories at different levels
store.add_memory("Current task context", np.random.rand(768),
                 MemoryLevel.WORKING, {'type': 'context'})
store.add_memory("Recent conversation", np.random.rand(768),
                 MemoryLevel.SHORT_TERM, {'type': 'conversation'})
store.add_memory("Important knowledge", np.random.rand(768),
                 MemoryLevel.LONG_TERM, {'type': 'knowledge'})

# Get statistics
stats = store.get_statistics()
print(f"Statistics: {stats}")

# Retrieve memories
query_embedding = np.random.rand(768)
memories = store.retrieve(query_embedding, top_k=3)

print("\nRetrieved memories:")
for memory in memories:
    print(f"  - {memory.content}")

# Promote a memory to a higher level
if store.short_term_memory:
    memory_id = list(store.short_term_memory.keys())[0]
    store.promote_memory(memory_id, MemoryLevel.LONG_TERM)
```

> **Hierarchical storage highlights**:
> - Memories of different importance live in separate layers
> - Memories are evicted and promoted between layers automatically
> - Each layer has a capacity limit
> - Expired memories are cleaned up periodically
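Both examples so far pass `np.random.rand(768)` as stand-in embeddings. In practice the embeddings would come from an embedding model; one possible way to wire that into the `VectorMemoryStore` from the first example, assuming the sentence-transformers package and the all-MiniLM-L6-v2 model (both assumptions, not part of the generated code):

```python
from sentence_transformers import SentenceTransformer

# all-MiniLM-L6-v2 produces 384-dimensional embeddings,
# so the store's embedding_dim must match.
model = SentenceTransformer("all-MiniLM-L6-v2")
store = VectorMemoryStore(embedding_dim=384)

texts = [
    "Python is a high-level programming language",
    "The capital of France is Paris",
]
for text in texts:
    embedding = model.encode(text)  # returns a numpy array of shape (384,)
    store.add_memory(text, embedding, {'type': 'knowledge'})

# threshold=0.0 keeps all candidates so the ranking itself is visible
query = model.encode("Which language is high-level?")
for memory in store.retrieve_similar(query, top_k=2, threshold=0.0):
    print(memory.content)
```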
Memory Retrieval Optimization

1. Context-Aware Retrieval

Example: Context-Aware Retrieval

User request: "Implement context-aware memory retrieval"

Code generated by Claude Code:

```python
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import logging

import numpy as np

logger = logging.getLogger(__name__)

# Extends the VectorMemoryStore (and reuses the Memory class) from the
# vector-store example above.


class ContextAwareMemoryStore(VectorMemoryStore):
    """Context-aware memory store."""

    def __init__(self, embedding_dim: int = 768):
        super().__init__(embedding_dim)
        self.context_history: List[Dict[str, Any]] = []
        self.context_window_size = 10

    def add_context(self, context: str, embedding: np.ndarray,
                    metadata: Optional[Dict[str, Any]] = None):
        """Record a piece of context."""
        context_entry = {
            'content': context,
            'embedding': embedding,
            'metadata': metadata or {},
            'timestamp': datetime.utcnow()
        }
        self.context_history.append(context_entry)

        # Bound the context history
        if len(self.context_history) > self.context_window_size:
            self.context_history.pop(0)

        logger.info(f"Context added: {context[:50]}...")

    def retrieve_with_context(self, query_embedding: np.ndarray,
                              top_k: int = 5) -> List[Dict[str, Any]]:
        """Retrieve memories using both the query and the recent context."""
        # Current context window
        current_context = self._get_current_context()

        # Blend the context into the query
        context_enhanced_embedding = self._combine_context_and_query(
            current_context, query_embedding)

        # Retrieve candidate memories
        memories = self.retrieve_similar(context_enhanced_embedding, top_k)

        # Attach relevance scores
        results = []
        for memory in memories:
            results.append({
                'memory': memory,
                'context_relevance': self._calculate_context_relevance(memory, current_context),
                'query_relevance': self._cosine_similarity(query_embedding, memory.embedding)
            })

        # Rank by a weighted combination of query and context relevance
        results.sort(key=lambda x: 0.7 * x['query_relevance'] + 0.3 * x['context_relevance'],
                     reverse=True)
        return results

    def _get_current_context(self) -> List[Dict[str, Any]]:
        """Return the current context window."""
        return self.context_history[-self.context_window_size:]

    def _combine_context_and_query(self, context: List[Dict[str, Any]],
                                   query_embedding: np.ndarray) -> np.ndarray:
        """Combine the context embeddings with the query embedding."""
        if not context:
            return query_embedding

        # Average embedding of the context window
        context_embeddings = [c['embedding'] for c in context]
        avg_context_embedding = np.mean(context_embeddings, axis=0)

        # Weighted combination
        return 0.7 * query_embedding + 0.3 * avg_context_embedding

    def _calculate_context_relevance(self, memory: Memory,
                                     context: List[Dict[str, Any]]) -> float:
        """Average similarity between a memory and the context window."""
        if not context:
            return 0.0

        similarities = [
            self._cosine_similarity(memory.embedding, ctx['embedding'])
            for ctx in context
        ]
        return np.mean(similarities)

    def retrieve_temporal(self, time_range: timedelta, top_k: int = 10) -> List[Memory]:
        """Retrieve memories created within a time range."""
        current_time = datetime.utcnow()
        cutoff_time = current_time - time_range

        # Keep only memories created after the cutoff
        recent_memories = [
            memory for memory in self.memories.values()
            if memory.created_at >= cutoff_time
        ]

        # Newest first
        recent_memories.sort(key=lambda m: m.created_at, reverse=True)
        return recent_memories[:top_k]


# Usage example
store = ContextAwareMemoryStore()

# Add context
contexts = [
    ("User is asking about Python programming", np.random.rand(768)),
    ("User wants to learn about data structures", np.random.rand(768)),
    ("User is interested in algorithms", np.random.rand(768)),
]
for content, embedding in contexts:
    store.add_context(content, embedding)

# Add memories
memories = [
    ("Python lists are mutable sequences", np.random.rand(768),
     {'type': 'knowledge', 'topic': 'python'}),
    ("Dictionaries are key-value pairs", np.random.rand(768),
     {'type': 'knowledge', 'topic': 'python'}),
    ("Binary search has O(log n) complexity", np.random.rand(768),
     {'type': 'knowledge', 'topic': 'algorithms'}),
]
for content, embedding, metadata in memories:
    store.add_memory(content, embedding, metadata)

# Context-aware retrieval
query_embedding = np.random.rand(768)
results = store.retrieve_with_context(query_embedding, top_k=3)

print("Retrieved with context:")
for result in results:
    print(f"  - {result['memory'].content}")
    print(f"    Query relevance: {result['query_relevance']:.3f}")
    print(f"    Context relevance: {result['context_relevance']:.3f}")
```

> **Context-aware retrieval highlights**:
> - Maintains a history of recent context
> - Blends the context into the query embedding
> - Scores memories by context relevance as well as query relevance
> - Supports retrieval by time range

Summary

Memory system design covers:

1. **Fundamentals of memory systems**: what a memory system is, and the types of memory
2. **Memory storage mechanisms**: vector storage and hierarchical storage
3. **Memory retrieval optimization**: context-aware retrieval

With a memory system, Claude Code can store and retrieve important information, enabling long-term learning and continuous improvement.

In the next section, we will explore multi-agent collaboration.
