15.3 Skills 与主代理的交互

14 分钟阅读

交互机制概述#

Skills 与主代理的交互是 Claude Code 系统的核心。主代理负责协调和管理 Skills,而 Skills 则提供具体的功能实现。本节将深入探讨两者之间的交互机制。

交互模式#

1. 主动调用模式#

1.1 调用流程

主动调用流程#

步骤 1:任务识别#

主代理接收用户请求,分析任务类型和需求

步骤 2:Skill 选择#

根据任务需求,从可用 Skills 中选择最合适的 Skill

步骤 3:参数准备#

准备 Skill 需要的参数和上下文信息

步骤 4:Skill 调用#

调用选定的 Skill,传递参数和上下文

步骤 5:结果处理#

接收 Skill 的执行结果,进行必要的处理和整合

步骤 6:响应生成#

基于 Skill 的结果,生成最终的响应返回给用户

1.2 代码示例

bash
python

class MainAgent:
    def __init__(self):
        self.skills = load_skills()
        self.context_manager = ContextManager()

    def process_request(self, user_request):
        # 1. 任务识别
        task = self.analyze_task(user_request)

        # 2. Skill 选择
        skill = self.select_skill(task)

        # 3. 参数准备
        context = self.context_manager.collect_context(skill, task)
        parameters = self.prepare_parameters(task, context)

        # 4. Skill 调用
        result = skill.execute(parameters, context)

        # 5. 结果处理
        processed_result = self.process_result(result, context)

        # 6. 响应生成
        response = self.generate_response(processed_result)

        return response

### 2. 被动调用模式

#### 2.1 调用流程

## 被动调用流程
### 步骤 1:用户指定
用户明确指定要使用的 Skill
### 步骤 2:参数验证
验证用户提供的参数是否有效
### 步骤 3:上下文收集
收集 Skill 需要的上下文信息
### 步骤 4:Skill 执行
执行指定的 Skill
### 步骤 5:结果返回
直接返回 Skill 的执行结果

2.2 代码示例

bash
python

class MainAgent:
    def execute_skill(self, skill_name, user_parameters):
        # 1. 验证 Skill 存在
        if skill_name not in self.skills:
            raise SkillNotFoundError(skill_name)

        skill = self.skills[skill_name]

        # 2. 参数验证
        validated_params = skill.validate_parameters(user_parameters)

        # 3. 上下文收集
        context = self.context_manager.collect_context(skill, validated_params)

        # 4. Skill 执行
        result = skill.execute(validated_params, context)

        # 5. 结果返回
        return result

### 3. 嵌套调用模式

#### 3.1 调用流程

## 嵌套调用流程
### 示例场景:部署应用
### 调用层次
主代理
└─> 部署 Skill
├─> 测试 Skill
│   └─> 代码分析 Skill
│       └─> 文档检查 Skill
├─> 构建 Skill
│   └─> 依赖检查 Skill
└─> 验证 Skill
└─> 健康检查 Skill
### 执行流程

3.2 代码示例

bash
python

class DeploymentSkill(Skill):
    def execute(self, parameters, context):
        # 调用测试 Skill
        test_result = self.call_skill("test", context)

        if not test_result.success:
            return DeploymentResult(success=False, error="Tests failed")

        # 调用构建 Skill
        build_result = self.call_skill("build", context)

        if not build_result.success:
            return DeploymentResult(success=False, error="Build failed")

        # 执行部署
        deploy_result = self.deploy(build_result.artifact)

        # 调用验证 Skill
        verify_result = self.call_skill("verify", context)

        return DeploymentResult(
            success=verify_result.success,
            deploy_result=deploy_result,
            verify_result=verify_result
        )

## 通信机制

### 1. 消息传递

#### 1.1 消息格式

## 消息格式
### 请求消息
~~~`json
`json

{
"message_id": "msg_123456",
"timestamp": "2024-01-15T10:30:00Z",
"type": "skill_request",
"skill_name": "code-review",
"parameters": {
"file": "src/main.py",
"strict": true

}, "context": { "project": {...}, "code": {...}, "user": {...} } }

bash
~~~
json

{
  "message_id": "msg_123456",
  "timestamp": "2024-01-15T10:30:15Z",
  "type": "skill_response",
  "status": "success",
  "result": {
    "issues": [...],
    "summary": {...}
  },
  "metadata": {
    "execution_time": 15.2,
    "memory_used": "256MB"
  }
}

### 错误消息
~~~`json
````json

{
"message_id": "msg_123456",
"timestamp": "2024-01-15T10:30:10Z",
"type": "skill_error",
"error": {
"code": "FILE_NOT_FOUND",
"message": "File src/main.py not found",
"details": {...}
}
}

```> >
~~~
#### 1.2 消息队列

class MessageQueue:
def __init__(self):
self.queue = asyncio.Queue()
self.handlers = {}
async def send(self, message):
await self.queue.put(message)
async def receive(self):
return await self.queue.get()
def register_handler(self, message_type, handler):
self.handlers[message_type] = handler
async def process_messages(self):
while True:
message = await self.receive()
handler = self.handlers.get(message.type)
if handler:
await handler(message)

~~~
### 2. 事件驱动
#### 2.1 事件类型
~~~
markdown

## 事件类型

### Skill 事件
- skill_started: Skill 开始执行
- skill_progress: Skill 执行进度更新
- skill_completed: Skill 执行完成
- skill_failed: Skill 执行失败

### 上下文事件
- context_updated: 上下文更新
- context_invalidated: 上下文失效

### 工具事件
- tool_called: 工具被调用
- tool_completed: 工具执行完成
- tool_failed: 工具执行失败

#### 2.2 事件处理

class EventHandler:
def __init__(self):
self.listeners = defaultdict(list)
def on(self, event_type, callback):
self.listeners[event_type].append(callback)
async def emit(self, event_type, data):
for callback in self.listeners.get(event_type, []):
await callback(data)
async def handle_skill_started(self, event):
print(f"Skill {event.skill_name} started")
async def handle_skill_progress(self, event):
print(f"Progress: {event.progress}%")
async def handle_skill_completed(self, event):
print(f"Skill {event.skill_name} completed")

~~~
### 3. 流式通信
#### 3.1 流式输出
~~~
python

class StreamingSkill(Skill):
    async def execute_stream(self, parameters, context):
        # 步骤 1
        yield {"step": 1, "message": "Analyzing code..."}
        result1 = await self.analyze_code(parameters, context)

        # 步骤 2
        yield {"step": 2, "message": "Checking security..."}
        result2 = await self.check_security(result1, context)

        # 步骤 3
        yield {"step": 3, "message": "Generating report..."}
        result3 = await self.generate_report(result2, context)

        # 最终结果
        yield {"step": 4, "message": "Completed", "result": result3}

#### 3.2 流式消费

async def consume_stream(skill, parameters, context):
async for chunk in skill.execute_stream(parameters, context):
if "message" in chunk:
print(chunk["message"])
if "result" in chunk:
return chunk["result"]

~~~
## 状态管理
### 1. 执行状态
#### 1.1 状态类型
~~~
markdown

## 执行状态

### 状态定义
- PENDING: 等待执行
- RUNNING: 正在执行
- PAUSED: 已暂停
- COMPLETED: 已完成
- FAILED: 执行失败
- CANCELLED: 已取消

### 状态转换
PENDING → RUNNING → COMPLETED
PENDING → RUNNING → FAILED
RUNNING → PAUSED → RUNNING
RUNNING → CANCELLED

#### 1.2 状态管理

class ExecutionState:
def __init__(self):
self.state = "PENDING"
self.start_time = None
self.end_time = None
self.progress = 0
self.error = None
def start(self):
self.state = "RUNNING"
self.start_time = datetime.now()
def complete(self):
self.state = "COMPLETED"
self.end_time = datetime.now()
def fail(self, error):
self.state = "FAILED"
self.error = error
self.end_time = datetime.now()
def update_progress(self, progress):
self.progress = progress
def get_duration(self):
if self.start_time and self.end_time:
return (self.end_time - self.start_time).total_seconds()
return None

~~~
### 2. 上下文状态
#### 2.1 上下文快照
~~~
python

class ContextSnapshot:
    def __init__(self, context):
        self.timestamp = datetime.now()
        self.context = copy.deepcopy(context)
        self.version = self.generate_version()

    def generate_version(self):
        return hashlib.md5(
            json.dumps(self.context, sort_keys=True).encode()
        ).hexdigest()

    def compare(self, other_snapshot):
        return self.version == other_snapshot.version

#### 2.2 上下文恢复

class ContextManager:
def __init__(self):
self.snapshots = []
self.current_context = {}
def create_snapshot(self):
snapshot = ContextSnapshot(self.current_context)
self.snapshots.append(snapshot)
return snapshot
def restore_snapshot(self, snapshot):
self.current_context = copy.deepcopy(snapshot.context)
def rollback_to(self, version):
for snapshot in reversed(self.snapshots):
if snapshot.version == version:
self.restore_snapshot(snapshot)
return True
return False

~~~
### 3. 会话状态
#### 3.1 会话管理
~~~
python

class SessionManager:
    def __init__(self):
        self.sessions = {}
        self.current_session_id = None

    def create_session(self):
        session_id = generate_id()
        self.sessions[session_id] = {
            "id": session_id,
            "created_at": datetime.now(),
            "context": {},
            "history": [],
            "state": "ACTIVE"
        }
        self.current_session_id = session_id
        return session_id

    def get_session(self, session_id):
        return self.sessions.get(session_id)

    def update_session(self, session_id, updates):
        if session_id in self.sessions:
            self.sessions[session_id].update(updates)

    def close_session(self, session_id):
        if session_id in self.sessions:
            self.sessions[session_id]["state"] = "CLOSED"
            self.sessions[session_id]["closed_at"] = datetime.now()

## 错误处理

### 1. 错误传播

#### 1.1 错误类型

## 错误类型
### Skill 错误
- SkillNotFoundError: Skill 不存在
- SkillExecutionError: Skill 执行失败
- SkillTimeoutError: Skill 执行超时
### 参数错误
- ParameterValidationError: 参数验证失败
- MissingParameterError: 缺少必需参数
- InvalidParameterError: 参数值无效
### 上下文错误
- ContextNotFoundError: 上下文不存在
- ContextInvalidError: 上下文无效
- ContextTimeoutError: 上下文获取超时

~~~
#### 1.2 错误处理策略
~~~
python

class ErrorHandler:
    def __init__(self):
        self.retries = {}
        self.fallbacks = {}

    def handle_error(self, error, context):
        error_type = type(error).__name__

        # 检查是否应该重试
        if self.should_retry(error_type):
            return self.retry(error, context)

        # 检查是否有回退方案
        if self.has_fallback(error_type):
            return self.fallback(error, context)

        # 否则抛出错误
        raise error

    def should_retry(self, error_type):
        return error_type in self.retries

    def retry(self, error, context):
        retry_config = self.retries[type(error).__name__]
        max_attempts = retry_config.get("max_attempts", 3)
        delay = retry_config.get("delay", 1)

        attempt = context.get("attempt", 0) + 1
        if attempt < max_attempts:
            context["attempt"] = attempt
            time.sleep(delay * attempt)
            return "RETRY"

        return error

    def has_fallback(self, error_type):
        return error_type in self.fallbacks

    def fallback(self, error, context):
        fallback_func = self.fallbacks[type(error).__name__]
        return fallback_func(error, context)

### 2. 错误恢复

#### 2.1 恢复策略

## 恢复策略
### 自动恢复
- 重试机制
- 回退方案
- 降级处理
### 手动恢复
- 用户确认
- 参数修正
- 上下文调整
### 状态恢复
- 快照恢复
- 断点续传
- 事务回滚

~~~
#### 2.2 恢复实现
~~~
python

class RecoveryManager:
    def __init__(self):
        self.checkpoints = {}

    def create_checkpoint(self, execution_id, state):
        self.checkpoints[execution_id] = {
            "timestamp": datetime.now(),
            "state": copy.deepcopy(state)
        }

    def restore_checkpoint(self, execution_id):
        if execution_id in self.checkpoints:
            return copy.deepcopy(self.checkpoints[execution_id]["state"])
        return None

    def recover_from_error(self, error, execution_id):
        # 恢复到检查点
        state = self.restore_checkpoint(execution_id)
        if state:
            # 尝试恢复执行
            return self.resume_execution(state)

        # 如果没有检查点,尝试其他恢复策略
        return self.alternative_recovery(error)

## 性能优化

### 1. 并行执行

#### 1.1 并行策略

class ParallelExecutor:
def __init__(self, max_workers=4):
self.max_workers = max_workers
self.executor = ThreadPoolExecutor(max_workers=max_workers)
async def execute_parallel(self, tasks):
futures = []
for task in tasks:
future = self.executor.submit(task.execute)
futures.append(future)
results = []
for future in futures:
result = await asyncio.wrap_future(future)
results.append(result)
return results

~~~
#### 1.2 依赖管理
~~~
python

class DependencyManager:
    def __init__(self):
        self.dependencies = {}

    def add_dependency(self, task, depends_on):
        if task not in self.dependencies:
            self.dependencies[task] = []
        self.dependencies[task].extend(depends_on)

    def get_execution_order(self, tasks):
        order = []
        visited = set()

        def visit(task):
            if task in visited:
                return
            visited.add(task)

            for dep in self.dependencies.get(task, []):
                visit(dep)

            order.append(task)

        for task in tasks:
            visit(task)

        return order

### 2. 资源管理

#### 2.1 资源池

class ResourcePool:
def __init__(self, max_resources):
self.max_resources = max_resources
self.available = max_resources
self.lock = asyncio.Lock()
async def acquire(self):
async with self.lock:
while self.available <= 0:
await asyncio.sleep(0.1)
self.available -= 1
return True
async def release(self):
async with self.lock:
self.available += 1
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.release()

~~~
#### 2.2 资源监控
~~~
python

class ResourceMonitor:
    def __init__(self):
        self.metrics = defaultdict(list)

    def record_metric(self, name, value):
        self.metrics[name].append({
            "value": value,
            "timestamp": datetime.now()
        })

    def get_average(self, name):
        values = [m["value"] for m in self.metrics[name]]
        return sum(values) / len(values) if values else 0

    def get_peak(self, name):
        values = [m["value"] for m in self.metrics[name]]
        return max(values) if values else 0

总结#

Skills 与主代理的交互机制是一个复杂而精密的系统,涉及多种交互模式、通信机制、状态管理、错误处理和性能优化。理解这些机制有助于:

  1. 优化性能:通过并行执行和资源管理提高性能
  2. 增强可靠性:通过完善的错误处理和恢复机制提高可靠性
  3. 改善体验:通过流式通信和事件驱动改善用户体验
  4. 支持扩展:通过灵活的交互机制支持功能扩展

在下一节中,我们将探讨 Skills 的性能优化策略,了解如何进一步提高 Skills 的执行效率。

bash
~~~

标记本节教程为已读

记录您的学习进度,方便后续查看。