16.5 Skills 版本管理

24 分钟阅读

版本管理概述#

Skills 的版本管理是维护和演进 Skills 的重要机制。本节将详细介绍 Skills 的版本控制、版本发布和版本迁移等内容。

版本控制#

1. 版本号规范#

1.1 语义化版本

语义化版本(SemVer)#

版本格式#

主版本号.次版本号.修订号(MAJOR.MINOR.PATCH)

版本规则#

  • 主版本号(MAJOR):不兼容的 API 修改
  • 次版本号(MINOR):向下兼容的功能性新增
  • 修订号(PATCH):向下兼容的问题修正

示例#

  • 1.0.0 → 1.1.0:新增功能(向下兼容)
  • 1.1.0 → 1.1.1:修复 bug(向下兼容)
  • 1.1.1 → 2.0.0:不兼容的 API 变更

1.2 预发布版本

格式#

主版本号.次版本号.修订号-预发布标识.预发布版本号

预发布标识#

  • alpha:内部测试版
  • beta:公开测试版
  • rc:候选发布版

示例#

  • 1.0.0-alpha.1:第一个 alpha 版本
  • 1.0.0-beta.1:第一个 beta 版本
  • 1.0.0-rc.1:第一个候选版本

1.3 构建元数据

格式#

主版本号.次版本号.修订号+构建元数据

构建元数据#

  • 构建日期
  • 构建号
  • Git 提交哈希

示例#

  • 1.0.0+20240115
  • 1.0.0+build.123
  • 1.0.0+abc123def456

2. 版本比较#

2.1 版本比较算法

python
class VersionComparator: def compare(self, v1, v2): v1_parts = self.parse_version(v1) v2_parts = self.parse_version(v2) # 比较主版本号 if v1_parts["major"] != v2_parts["major"]: return -1 if v1_parts["major"] < v2_parts["major"] else 1 # 比较次版本号 if v1_parts["minor"] != v2_parts["minor"]: return -1 if v1_parts["minor"] < v2_parts["minor"] else 1 # 比较修订号 if v1_parts["patch"] != v2_parts["patch"]: return -1 if v1_parts["patch"] < v2_parts["patch"] else 1 # 比较预发布版本 if v1_parts["prerelease"] and v2_parts["prerelease"]: return self.compare_prerelease(v1_parts["prerelease"], v2_parts["prerelease"]) elif v1_parts["prerelease"]: return -1 elif v2_parts["prerelease"]: return 1 return 0 def parse_version(self, version): # 解析版本号 parts = version.split("-", 1) version_part = parts[0] prerelease_part = parts[1] if len(parts) > 1 else None version_numbers = version_part.split(".") major = int(version_numbers[0]) if len(version_numbers) > 0 else 0 minor = int(version_numbers[1]) if len(version_numbers) > 1 else 0 patch = int(version_numbers[2]) if len(version_numbers) > 2 else 0 # 解析构建元数据 build_metadata = None if "+" in version_part: version_part, build_metadata = version_part.split("+", 1) return { "major": major, "minor": minor, "patch": patch, "prerelease": prerelease_part, "build": build_metadata } def compare_prerelease(self, p1, p2): # 比较预发布版本 p1_parts = p1.split(".") p2_parts = p2.split(".") for i in range(max(len(p1_parts), len(p2_parts))): if i >= len(p1_parts): return -1 if i >= len(p2_parts): return 1 part1 = p1_parts[i] part2 = p2_parts[i] # 数字比较 if part1.isdigit() and part2.isdigit(): num1 = int(part1) num2 = int(part2) if num1 != num2: return -1 if num1 < num2 else 1 else: # 字符串比较 if part1 != part2: return -1 if part1 < part2 else 1 return 0

2.2 版本范围

python
class VersionRange: def __init__(self, range_spec): self.range_spec = range_spec self.comparator = VersionComparator() def satisfies(self, version): constraints = self.parse_range(self.range_spec) for constraint in constraints: if not self.check_constraint(version, constraint): return False return True def parse_range(self, range_spec): constraints = [] if not any(c in range_spec for c in ['^', '~', '>=', '<=', '>', '<', '||']): constraints.append({ "operator": "==", "version": range_spec }) return constraints parts = range_spec.split("||") for part in parts: part = part.strip() if part.startswith("^"): version = part[1:] constraints.append({ "operator": "^", "version": version }) elif part.startswith("~"): version = part[1:] constraints.append({ "operator": "~", "version": version }) elif part.startswith(">="): version = part[2:] constraints.append({ "operator": ">=", "version": version }) elif part.startswith("<="): version = part[2:] constraints.append({ "operator": "<=", "version": version }) elif part.startswith(">"): version = part[1:] constraints.append({ "operator": ">", "version": version }) elif part.startswith("<"): version = part[1:] constraints.append({ "operator": "<", "version": version }) return constraints def check_constraint(self, version, constraint): operator = constraint["operator"] constraint_version = constraint["version"] if operator == "==": return self.comparator.compare(version, constraint_version) == 0 elif operator == ">=": return self.comparator.compare(version, constraint_version) >= 0 elif operator == "<=": return self.comparator.compare(version, constraint_version) <= 0 elif operator == ">": return self.comparator.compare(version, constraint_version) > 0 elif operator == "<": return self.comparator.compare(version, constraint_version) < 0 elif operator == "^": return self.check_caret(version, constraint_version) elif operator == "~": return self.check_tilde(version, constraint_version) return False def check_caret(self, version, constraint): constraint_parts = self.comparator.parse_version(constraint) min_version = constraint max_version = f"{constraint_parts['major'] + 1}.0.0" return (self.comparator.compare(version, min_version) >= 0 and self.comparator.compare(version, max_version) < 0) def check_tilde(self, version, constraint): constraint_parts = self.comparator.parse_version(constraint) min_version = constraint max_version = f"{constraint_parts['major']}.{constraint_parts['minor'] + 1}.0" return (self.comparator.compare(version, min_version) >= 0 and self.comparator.compare(version, max_version) < 0)

3. 版本存储#

3.1 版本仓库

python
class VersionRepository: def __init__(self, storage_path): self.storage_path = storage_path self.comparator = VersionComparator() def save_version(self, skill_id, version, skill_data): version_path = self.get_version_path(skill_id, version) os.makedirs(os.path.dirname(version_path), exist_ok=True) with open(version_path, 'w') as f: json.dump(skill_data, f, indent=2) def load_version(self, skill_id, version): version_path = self.get_version_path(skill_id, version) if not os.path.exists(version_path): raise VersionNotFoundError(version) with open(version_path, 'r') as f: return json.load(f) def list_versions(self, skill_id): skill_path = self.get_skill_path(skill_id) if not os.path.exists(skill_path): return [] versions = [] for item in os.listdir(skill_path): if os.path.isdir(os.path.join(skill_path, item)): versions.append(item) versions.sort(key=lambda v: self.comparator.parse_version(v)) return versions def get_latest_version(self, skill_id): versions = self.list_versions(skill_id) if not versions: return None return versions[-1] def get_version_path(self, skill_id, version): return os.path.join(self.storage_path, skill_id, version, "skill.json") def get_skill_path(self, skill_id): return os.path.join(self.storage_path, skill_id)

3.2 版本索引

python
class VersionIndex: def __init__(self, index_path): self.index_path = index_path self.index = self.load_index() def load_index(self): if os.path.exists(self.index_path): with open(self.index_path, 'r') as f: return json.load(f) return {} def save_index(self): with open(self.index_path, 'w') as f: json.dump(self.index, f, indent=2) def add_version(self, skill_id, version, metadata): if skill_id not in self.index: self.index[skill_id] = {} self.index[skill_id][version] = metadata self.save_index() def get_version_metadata(self, skill_id, version): return self.index.get(skill_id, {}).get(version) def list_versions(self, skill_id): versions = self.index.get(skill_id, {}) return list(versions.keys()) def get_latest_version(self, skill_id): versions = self.list_versions(skill_id) if not versions: return None comparator = VersionComparator() return max(versions, key=lambda v: comparator.parse_version(v)) def search_versions(self, skill_id, version_range): range_checker = VersionRange(version_range) versions = self.list_versions(skill_id) return [v for v in versions if range_checker.satisfies(v)]

版本发布#

1. 发布流程#

1.1 发布准备

python
class ReleasePreparer: def __init__(self): self.version_repository = VersionRepository("./skills") self.version_index = VersionIndex("./index.json") def prepare_release(self, skill_id, version, release_notes): if not self.validate_version(version): raise InvalidVersionError(version) if self.version_exists(skill_id, version): raise VersionAlreadyExistsError(version) skill_data = self.load_skill_data(skill_id) metadata = { "version": version, "release_date": datetime.now().isoformat(), "release_notes": release_notes, "checksum": self.calculate_checksum(skill_data) } return metadata def validate_version(self, version): pattern = r'^\d+\.\d+\.\d+(-[a-zA-Z0-9]+(\.\d+)?)?(\+[a-zA-Z0-9]+)?$' return re.match(pattern, version) is not None def version_exists(self, skill_id, version): versions = self.version_repository.list_versions(skill_id) return version in versions def load_skill_data(self, skill_id): pass def calculate_checksum(self, skill_data): data_str = json.dumps(skill_data, sort_keys=True) return hashlib.sha256(data_str.encode()).hexdigest()

1.2 发布执行

python
class ReleasePublisher: def __init__(self): self.version_repository = VersionRepository("./skills") self.version_index = VersionIndex("./index.json") def publish_release(self, skill_id, version, skill_data, metadata): self.version_repository.save_version(skill_id, version, skill_data) self.version_index.add_version(skill_id, version, metadata) self.create_tag(skill_id, version) self.notify_subscribers(skill_id, version, metadata) def create_tag(self, skill_id, version): tag_name = f"{skill_id}/v{version}" tag_message = f"Release {skill_id} version {version}" subprocess.run([ "git", "tag", "-a", tag_name, "-m", tag_message ], check=True) def notify_subscribers(self, skill_id, version, metadata): subscribers = self.get_subscribers(skill_id) for subscriber in subscribers: self.send_notification(subscriber, skill_id, version, metadata) def get_subscribers(self, skill_id): pass def send_notification(self, subscriber, skill_id, version, metadata): pass

2. 变更日志#

2.1 变更记录

python
class ChangeLogManager: def __init__(self): self.change_logs = {} def add_change(self, skill_id, version, change_type, description): if skill_id not in self.change_logs: self.change_logs[skill_id] = {} if version not in self.change_logs[skill_id]: self.change_logs[skill_id][version] = { "version": version, "date": datetime.now().isoformat(), "changes": { "added": [], "changed": [], "deprecated": [], "removed": [], "fixed": [], "security": [] } } self.change_logs[skill_id][version]["changes"][change_type].append(description) def get_change_log(self, skill_id, version): return self.change_logs.get(skill_id, {}).get(version) def get_all_change_logs(self, skill_id): return self.change_logs.get(skill_id, {}) def generate_release_notes(self, skill_id, version): change_log = self.get_change_log(skill_id, version) if not change_log: return "No changes recorded." notes = [f"# Release {version}"] notes.append(f"Date: {change_log['date']}") notes.append("") changes = change_log["changes"] if changes["added"]: notes.append("## Added") for change in changes["added"]: notes.append(f"- {change}") notes.append("") if changes["changed"]: notes.append("## Changed") for change in changes["changed"]: notes.append(f"- {change}") notes.append("") if changes["deprecated"]: notes.append("## Deprecated") for change in changes["deprecated"]: notes.append(f"- {change}") notes.append("") if changes["removed"]: notes.append("## Removed") for change in changes["removed"]: notes.append(f"- {change}") notes.append("") if changes["fixed"]: notes.append("## Fixed") for change in changes["fixed"]: notes.append(f"- {change}") notes.append("") if changes["security"]: notes.append("## Security") for change in changes["security"]: notes.append(f"- {change}") notes.append("") return "\n".join(notes)

2.2 变更对比

python
class ChangeComparator: def __init__(self): self.version_repository = VersionRepository("./skills") def compare_versions(self, skill_id, from_version, to_version): from_data = self.version_repository.load_version(skill_id, from_version) to_data = self.version_repository.load_version(skill_id, to_version) changes = { "added": [], "removed": [], "modified": [] } changes["added"].extend(self.find_added_parameters(from_data, to_data)) changes["removed"].extend(self.find_removed_parameters(from_data, to_data)) changes["modified"].extend(self.find_modified_parameters(from_data, to_data)) changes["added"].extend(self.find_added_features(from_data, to_data)) changes["removed"].extend(self.find_removed_features(from_data, to_data)) changes["modified"].extend(self.find_modified_features(from_data, to_data)) return changes def find_added_parameters(self, from_data, to_data): from_params = set(from_data.get("parameters", {}).keys()) to_params = set(to_data.get("parameters", {}).keys()) return list(to_params - from_params) def find_removed_parameters(self, from_data, to_data): from_params = set(from_data.get("parameters", {}).keys()) to_params = set(to_data.get("parameters", {}).keys()) return list(from_params - to_params) def find_modified_parameters(self, from_data, to_data): from_params = from_data.get("parameters", {}) to_params = to_data.get("parameters", {}) modified = [] for param in from_params: if param in to_params and from_params[param] != to_params[param]: modified.append(param) return modified def find_added_features(self, from_data, to_data): from_features = set(from_data.get("features", [])) to_features = set(to_data.get("features", [])) return list(to_features - from_features) def find_removed_features(self, from_data, to_data): from_features = set(from_data.get("features", [])) to_features = set(to_data.get("features", [])) return list(from_features - to_features) def find_modified_features(self, from_data, to_data): pass

版本迁移#

版本迁移#

1. 迁移脚本#

1.1 迁移定义

python
class Migration: def __init__(self, from_version, to_version, migrate_func): self.from_version = from_version self.to_version = to_version self.migrate_func = migrate_func def can_migrate(self, from_version, to_version): comparator = VersionComparator() return (comparator.compare(from_version, self.from_version) >= 0 and comparator.compare(to_version, self.to_version) <= 0) def migrate(self, data): return self.migrate_func(data)

1.2 迁移注册

python
class MigrationRegistry: def __init__(self): self.migrations = [] def register_migration(self, migration): self.migrations.append(migration) def get_migrations(self, from_version, to_version): applicable_migrations = [] for migration in self.migrations: if migration.can_migrate(from_version, to_version): applicable_migrations.append(migration) comparator = VersionComparator() applicable_migrations.sort(key=lambda m: comparator.parse_version(m.from_version)) return applicable_migrations

2. 迁移执行#

2.1 数据迁移

python
class DataMigrator: def __init__(self, migration_registry): self.migration_registry = migration_registry def migrate_data(self, data, from_version, to_version): migrations = self.migration_registry.get_migrations(from_version, to_version) current_data = data current_version = from_version for migration in migrations: current_data = migration.migrate(current_data) current_version = migration.to_version return current_data def migrate_skill(self, skill_id, from_version, to_version): version_repository = VersionRepository("./skills") data = version_repository.load_version(skill_id, from_version) migrated_data = self.migrate_data(data, from_version, to_version) version_repository.save_version(skill_id, to_version, migrated_data) return migrated_data

2.2 配置迁移

python
class ConfigMigrator: def __init__(self, migration_registry): self.migration_registry = migration_registry def migrate_config(self, config, from_version, to_version): migrations = self.migration_registry.get_migrations(from_version, to_version) current_config = config current_version = from_version for migration in migrations: current_config = migration.migrate(current_config) current_version = migration.to_version return current_config def migrate_user_config(self, user_id, from_version, to_version): config = self.load_user_config(user_id) migrated_config = self.migrate_config(config, from_version, to_version) self.save_user_config(user_id, migrated_config) return migrated_config def load_user_config(self, user_id): pass def save_user_config(self, user_id, config): pass

版本回滚#

1. 回滚准备#

bash
python

class RollbackPreparer:
    def __init__(self):
        self.version_repository = VersionRepository("./skills")
        self.backup_repository = BackupRepository("./backups")

    def prepare_rollback(self, skill_id, from_version, to_version):
        # 验证版本
        if not self.version_exists(skill_id, to_version):
            raise VersionNotFoundError(to_version)

        # 创建备份
        backup_id = self.create_backup(skill_id, from_version)

        # 准备回滚
        rollback_plan = {
            "skill_id": skill_id,
            "from_version": from_version,
            "to_version": to_version,
            "backup_id": backup_id,
            "steps": self.generate_rollback_steps(skill_id, from_version, to_version)
        }

        return rollback_plan

    def version_exists(self, skill_id, version):
        versions = self.version_repository.list_versions(skill_id)
        return version in versions

    def create_backup(self, skill_id, version):
        # 创建备份
        backup_id = self.backup_repository.create_backup(skill_id, version)
        return backup_id

    def generate_rollback_steps(self, skill_id, from_version, to_version):
        # 生成回滚步骤
        steps = [
            {
                "step": 1,
                "action": "stop_skill",
                "description": "停止 Skill"
            },
            {
                "step": 2,
                "action": "restore_version",
                "description": f"恢复到版本 {to_version}"
            },
            {
                "step": 3,
                "action": "migrate_config",
                "description": "迁移配置"
            },
            {
                "step": 4,
                "action": "start_skill",
                "description": "启动 Skill"
            },
            {
                "step": 5,
                "action": "verify",
                "description": "验证回滚"
            }
        ]

        return steps

### 2. 回滚执行

```python
class RollbackExecutor:
    def __init__(self):
        self.version_repository = VersionRepository("./skills")
        self.backup_repository = BackupRepository("./backups")
        self.config_migrator = ConfigMigrator(MigrationRegistry())

    def execute_rollback(self, rollback_plan):
        try:
            for step in rollback_plan["steps"]:
                self.execute_step(rollback_plan, step)
            return True
        except Exception as e:
            self.restore_backup(rollback_plan["backup_id"])
            raise e

    def execute_step(self, rollback_plan, step):
        action = step["action"]
        if action == "stop_skill":
            self.stop_skill(rollback_plan["skill_id"])
        elif action == "restore_version":
            self.restore_version(
                rollback_plan["skill_id"],
                rollback_plan["to_version"]
            )
        elif action == "migrate_config":
            self.migrate_config(
                rollback_plan["from_version"],
                rollback_plan["to_version"]
            )
        elif action == "start_skill":
            self.start_skill(rollback_plan["skill_id"])
        elif action == "verify":
            self.verify_rollback(rollback_plan)

    def stop_skill(self, skill_id):
        pass

    def restore_version(self, skill_id, version):
        pass

    def migrate_config(self, from_version, to_version):
        pass

    def start_skill(self, skill_id):
        pass

    def verify_rollback(self, rollback_plan):
        pass

    def restore_backup(self, backup_id):
        self.backup_repository.restore_backup(backup_id)

标记本节教程为已读

记录您的学习进度,方便后续查看。