## 22.3.1 插件间通信机制
### 事件总线通信
// src/communication/event-bus.ts

/**
 * In-process publish/subscribe bus for plugins.
 *
 * Listeners are registered per event type. Every published event is also
 * appended to a bounded in-memory history that can be queried later.
 */
export class PluginEventBus {
  private listeners: Map<string, EventListener[]> = new Map();
  private history: PluginEvent[] = [];
  private maxHistorySize: number;

  /**
   * @param maxHistorySize Maximum number of events retained in the history
   *   (defaults to 100). The oldest event is dropped once the limit is exceeded.
   */
  constructor(maxHistorySize: number = 100) {
    this.maxHistorySize = maxHistorySize;
  }

  /**
   * Registers a listener for one event type.
   *
   * The same function may be registered more than once; it is then invoked
   * once per registration.
   */
  subscribe(eventType: string, listener: EventListener): void {
    const registered = this.listeners.get(eventType);
    if (registered) {
      registered.push(listener);
    } else {
      this.listeners.set(eventType, [listener]);
    }
  }

  /**
   * Removes the first registration of `listener` for `eventType`.
   * Does nothing when the listener is not registered.
   */
  unsubscribe(eventType: string, listener: EventListener): void {
    const registered = this.listeners.get(eventType);
    if (!registered) {
      return;
    }

    const index = registered.indexOf(listener);
    if (index > -1) {
      registered.splice(index, 1);
    }
  }

  /**
   * Records the event in the history and notifies every listener subscribed
   * to `event.type`.
   *
   * Listeners are started in registration order and awaited together. A
   * listener that throws or rejects is logged and does not affect the others,
   * so the returned promise does not reject because of a listener failure.
   */
  async publish(event: PluginEvent): Promise<void> {
    this.addToHistory(event);

    const registered = this.listeners.get(event.type);
    if (!registered || registered.length === 0) {
      return;
    }

    // Iterate over a snapshot: a listener may unsubscribe itself while being
    // notified, and splicing the live array would skip the next listener.
    const notifications = [...registered].map((listener) =>
      this.safeNotify(listener, event)
    );

    await Promise.all(notifications);
  }

  /**
   * Invokes one listener and logs (rather than propagates) any error it raises.
   */
  private async safeNotify(listener: EventListener, event: PluginEvent): Promise<void> {
    try {
      await listener(event);
    } catch (error) {
      console.error(`Error in event listener for ${event.type}:`, error);
    }
  }

  /**
   * Appends the event to the history, evicting the oldest entry when the
   * configured size limit is exceeded.
   */
  private addToHistory(event: PluginEvent): void {
    this.history.push(event);

    if (this.history.length > this.maxHistorySize) {
      this.history.shift();
    }
  }

  /**
   * Returns recorded events, oldest first.
   *
   * @param eventType When given, only events of exactly this type are returned.
   * @returns A new array; mutating it does not affect the stored history.
   */
  getHistory(eventType?: string): PluginEvent[] {
    if (eventType !== undefined) {
      return this.history.filter((event) => event.type === eventType);
    }

    return [...this.history];
  }

  /**
   * Discards all recorded events.
   */
  clearHistory(): void {
    this.history = [];
  }
}

/**
 * Callback invoked for each published event; may be synchronous or async.
 */
export type EventListener = (event: PluginEvent) => Promise<void> | void;

/**
 * An event exchanged between plugins through the bus.
 */
export interface PluginEvent {
  /** Event type; listeners subscribe by this exact string. */
  type: string;
  /** Identifier of the publisher (the example uses a plugin name). */
  source: string;
  /** Optional intended recipient; the bus itself does not filter on it. */
  target?: string;
  /** Event payload. */
  data: any;
  /** Timestamp supplied by the publisher. */
  timestamp: Date;
}

// Usage example (wrapped in an async function so it does not rely on
// top-level await support).
async function runEventBusExample(): Promise<void> {
  const eventBus = new PluginEventBus();

  // Plugin A subscribes
  eventBus.subscribe('user.created', async (event) => {
    console.log(`Plugin A received user.created:`, event.data);
  });

  // Plugin B subscribes
  eventBus.subscribe('user.created', async (event) => {
    console.log(`Plugin B received user.created:`, event.data);
  });

  // Plugin C publishes
  await eventBus.publish({
    type: 'user.created',
    source: 'plugin-c',
    data: { userId: 1, name: 'John' },
    timestamp: new Date()
  });

  // Inspect the history
  const history = eventBus.getHistory('user.created');
  console.log('History:', history);
}

runEventBusExample().catch((error: unknown) => console.error(error));
### 消息队列通信
// src/communication/message-queue.ts

/**
 * Named in-memory message queues shared between plugins.
 *
 * Each message sent to a queue is delivered to every consumer registered on
 * that queue. Within one queue, messages are handled one at a time in FIFO
 * order; different queues are drained independently of each other.
 */
export class PluginMessageQueue {
  private queues: Map<string, Message[]> = new Map();
  private consumers: Map<string, MessageConsumer[]> = new Map();
  // Names of queues that currently have a drain loop running. Tracked per
  // queue so that a busy queue never blocks delivery on another queue.
  private processing: Set<string> = new Set();

  /**
   * Appends a message to the queue (creating the queue if needed) and starts
   * delivery if the queue is not already being drained.
   */
  send(queueName: string, message: Message): void {
    const queue = this.queues.get(queueName);
    if (queue) {
      queue.push(message);
    } else {
      this.queues.set(queueName, [message]);
    }

    // Fire and forget: consumer errors are handled inside processQueue.
    void this.processQueue(queueName);
  }

  /**
   * Registers a consumer on the queue. Messages that were queued before any
   * consumer existed are delivered now.
   */
  receive(queueName: string, consumer: MessageConsumer): void {
    const registered = this.consumers.get(queueName);
    if (registered) {
      registered.push(consumer);
    } else {
      this.consumers.set(queueName, [consumer]);
    }

    void this.processQueue(queueName);
  }

  /**
   * Drains one queue: takes messages off the front one at a time and hands
   * each to all consumers of that queue, waiting for them before moving on.
   *
   * Returns immediately when this queue is already being drained; the running
   * loop picks up messages that arrive in the meantime.
   */
  private async processQueue(queueName: string): Promise<void> {
    if (this.processing.has(queueName)) {
      return;
    }

    this.processing.add(queueName);

    try {
      while (true) {
        // Re-read both maps on every pass: clearQueue() replaces the array and
        // consumers may be registered while a message is in flight.
        const queue = this.queues.get(queueName);
        const consumers = this.consumers.get(queueName);

        if (!queue || queue.length === 0 || !consumers || consumers.length === 0) {
          return;
        }

        const message = queue.shift();
        if (message === undefined) {
          return;
        }

        await Promise.all(
          consumers.map((consumer) => this.safeConsume(consumer, message))
        );
      }
    } finally {
      this.processing.delete(queueName);
    }
  }

  /**
   * Invokes one consumer and logs (rather than propagates) any error it raises.
   */
  private async safeConsume(consumer: MessageConsumer, message: Message): Promise<void> {
    try {
      await consumer(message);
    } catch (error) {
      console.error(`Error in message consumer:`, error);
    }
  }

  /**
   * Returns the number of messages still waiting in the queue
   * (0 for an unknown queue).
   */
  getQueueSize(queueName: string): number {
    const queue = this.queues.get(queueName);
    return queue ? queue.length : 0;
  }

  /**
   * Drops all messages waiting in the queue.
   */
  clearQueue(queueName: string): void {
    this.queues.set(queueName, []);
  }
}

/**
 * A message exchanged through a PluginMessageQueue.
 */
export interface Message {
  id: string;
  /** Identifier of the sender (the example uses a plugin name). */
  source: string;
  /** Optional intended recipient; the queue itself does not filter on it. */
  target?: string;
  data: any;
  timestamp: Date;
}

/**
 * Callback invoked for each delivered message; may be synchronous or async.
 */
export type MessageConsumer = (message: Message) => Promise<void> | void;

// Usage example
const messageQueue = new PluginMessageQueue();

// Plugin A registers a consumer
messageQueue.receive('user-queue', async (message) => {
  console.log(`Plugin A received message:`, message.data);
});

// Plugin B registers a consumer
messageQueue.receive('user-queue', async (message) => {
  console.log(`Plugin B received message:`, message.data);
});

// Plugin C sends a message
messageQueue.send('user-queue', {
  id: 'msg-1',
  source: 'plugin-c',
  data: { userId: 1, name: 'John' },
  timestamp: new Date()
});

// Inspect the queue size
const size = messageQueue.getQueueSize('user-queue');
console.log('Queue size:', size);

// ---------------------------------------------------------------------------
// RPC communication
// src/communication/rpc.ts
// ---------------------------------------------------------------------------

/**
 * In-process RPC registry: plugins register a named handler, and other
 * plugins invoke it by service name and method name.
 */
export class PluginRPCService {
  private services: Map<string, RPCHandler> = new Map();
  private clients: Map<string, RPCClient> = new Map();

  /**
   * Registers (or replaces) the handler for a service name.
   */
  registerService(serviceName: string, handler: RPCHandler): void {
    this.services.set(serviceName, handler);
  }

  /**
   * Removes the handler for a service name, if any.
   */
  unregisterService(serviceName: string): void {
    this.services.delete(serviceName);
  }

  /**
   * Invokes `method` on the named service.
   *
   * @returns Whatever the handler resolves to.
   * @throws Error (as a rejected promise) when no handler is registered under
   *   `serviceName`; handler errors propagate unchanged.
   */
  async call(serviceName: string, method: string, params: any): Promise<any> {
    const handler = this.services.get(serviceName);

    if (!handler) {
      throw new Error(`Service not found: ${serviceName}`);
    }

    return handler(method, params);
  }

  /**
   * Creates a client bound to `serviceName` and remembers it as the client
   * returned by getClient() for that name. The service does not need to be
   * registered yet; it is looked up on every call.
   */
  createClient(serviceName: string): RPCClient {
    const client = new RPCClient(this, serviceName);
    this.clients.set(serviceName, client);
    return client;
  }

  /**
   * Returns the most recently created client for the service name, if any.
   */
  getClient(serviceName: string): RPCClient | undefined {
    return this.clients.get(serviceName);
  }
}

/**
 * Handler implementing a service: dispatches on `method` and resolves to the
 * call result.
 */
export type RPCHandler = (method: string, params: any) => Promise<any>;

/**
 * Thin client that forwards calls to one named service.
 */
export class RPCClient {
  constructor(
    private rpcService: PluginRPCService,
    private serviceName: string
  ) {}

  /**
   * Calls `method` on the bound service.
   *
   * @param params Arguments for the method. Only a missing (`undefined`/`null`)
   *   value is replaced by `{}`; falsy values such as `0` or `''` are passed
   *   through unchanged.
   */
  async call(method: string, params?: any): Promise<any> {
    return this.rpcService.call(this.serviceName, method, params ?? {});
  }
}

// Usage example (wrapped in an async function so it does not rely on
// top-level await support).
async function runRpcExample(): Promise<void> {
  const rpcService = new PluginRPCService();

  // Plugin A registers a service
  rpcService.registerService('user-service', async (method, params) => {
    switch (method) {
      case 'getUser':
        return { id: params.id, name: 'John Doe' };
      case 'createUser':
        return { id: Date.now(), ...params };
      default:
        throw new Error(`Unknown method: ${method}`);
    }
  });

  // Plugin B registers a service
  rpcService.registerService('order-service', async (method, params) => {
    switch (method) {
      case 'getOrder':
        return { id: params.id, userId: 1, total: 100 };
      case 'createOrder':
        return { id: Date.now(), ...params };
      default:
        throw new Error(`Unknown method: ${method}`);
    }
  });

  // Plugin C creates clients and calls the services
  const userClient = rpcService.createClient('user-service');
  const orderClient = rpcService.createClient('order-service');

  const user = await userClient.call('getUser', { id: 1 });
  console.log('User:', user);

  const order = await orderClient.call('getOrder', { id: 1 });
  console.log('Order:', order);
}

runRpcExample().catch((error: unknown) => console.error(error));
## 22.3.2 插件依赖管理
### 依赖解析
// src/dependencies/dependency-resolver.ts

/**
 * Computes a load order for plugins from their declared dependencies
 * (depth-first topological sort).
 */
export class PluginDependencyResolver {
  private plugins: Map<string, PluginInfo> = new Map();

  /**
   * Registers (or replaces) a plugin, keyed by its name.
   */
  addPlugin(pluginInfo: PluginInfo): void {
    this.plugins.set(pluginInfo.name, pluginInfo);
  }

  /**
   * Removes a plugin by name; does nothing if it is not registered.
   */
  removePlugin(pluginName: string): void {
    this.plugins.delete(pluginName);
  }

  /**
   * Returns plugin names ordered so that every plugin appears after all of
   * its dependencies.
   *
   * A dependency name that was never registered is still included in the
   * result (it has no dependencies of its own); use checkDependencies() to
   * detect such names.
   *
   * @throws Error when the dependency graph contains a cycle; the message
   *   lists the plugins forming the cycle.
   */
  resolve(): string[] {
    const visited: Set<string> = new Set();
    const visiting: Set<string> = new Set();
    const order: string[] = [];

    for (const pluginName of this.plugins.keys()) {
      if (!visited.has(pluginName)) {
        this.visit(pluginName, visited, visiting, order);
      }
    }

    return order;
  }

  /**
   * Depth-first visit of one plugin: emits its dependencies first, then the
   * plugin itself.
   *
   * @param visited Names already emitted to `order`.
   * @param visiting Names on the current recursion path (cycle detection).
   * @param order Output list, appended to in load order.
   * @param path Current recursion path in visit order, used only to build a
   *   readable cycle description.
   */
  private visit(
    pluginName: string,
    visited: Set<string>,
    visiting: Set<string>,
    order: string[],
    path: string[] = []
  ): void {
    if (visiting.has(pluginName)) {
      const cycle = [...path.slice(path.indexOf(pluginName)), pluginName].join(' -> ');
      throw new Error(`Circular dependency detected: ${cycle}`);
    }

    if (visited.has(pluginName)) {
      return;
    }

    visiting.add(pluginName);
    path.push(pluginName);

    const plugin = this.plugins.get(pluginName);
    if (plugin) {
      for (const dep of plugin.dependencies || []) {
        this.visit(dep, visited, visiting, order, path);
      }
    }

    path.pop();
    visiting.delete(pluginName);
    visited.add(pluginName);
    order.push(pluginName);
  }

  /**
   * Reports every declared dependency that is not a registered plugin.
   *
   * @returns `errors` holds one message per missing dependency; `warnings`
   *   is currently always empty.
   */
  checkDependencies(): DependencyCheckResult {
    const errors: string[] = [];
    const warnings: string[] = [];

    for (const [name, plugin] of this.plugins.entries()) {
      for (const dep of plugin.dependencies || []) {
        if (!this.plugins.has(dep)) {
          errors.push(`Plugin ${name} depends on missing plugin: ${dep}`);
        }
      }
    }

    return { errors, warnings };
  }
}

/**
 * Plugin metadata used for dependency resolution.
 */
export interface PluginInfo {
  name: string;
  version: string;
  /** Names of plugins that must come earlier in the load order. */
  dependencies?: string[];
}

/**
 * Result of PluginDependencyResolver.checkDependencies().
 */
export interface DependencyCheckResult {
  errors: string[];
  warnings: string[];
}

// Usage example
const resolver = new PluginDependencyResolver();

// Add plugins
resolver.addPlugin({
  name: 'plugin-a',
  version: '1.0.0',
  dependencies: []
});

resolver.addPlugin({
  name: 'plugin-b',
  version: '1.0.0',
  dependencies: ['plugin-a']
});

resolver.addPlugin({
  name: 'plugin-c',
  version: '1.0.0',
  dependencies: ['plugin-a', 'plugin-b']
});

// Check dependencies
const checkResult = resolver.checkDependencies();
console.log('Dependency check:', checkResult);

// Resolve the load order
const order = resolver.resolve();
console.log('Load order:', order);
// ['plugin-a', 'plugin-b', 'plugin-c']

// ---------------------------------------------------------------------------
// Dependency injection
// src/dependencies/dependency-injection.ts
// ---------------------------------------------------------------------------

/**
 * Minimal dependency-injection container: services are registered as
 * factories with named dependencies and are built on demand.
 */
export class PluginDIContainer {
  private services: Map<string, ServiceDefinition> = new Map();
  private instances: Map<string, any> = new Map();
  // Names of services currently under construction, outermost first; used to
  // detect dependency cycles before they overflow the call stack.
  private resolving: string[] = [];

  /**
   * Registers (or replaces) a service definition. Replacing a definition also
   * drops any cached singleton built from the previous one.
   */
  registerService(name: string, definition: ServiceDefinition): void {
    this.services.set(name, definition);
    this.instances.delete(name);
  }

  /**
   * Returns an instance of the named service, building its dependencies
   * first and passing them to the factory in declaration order.
   *
   * Singleton services are built once and cached; other services are built
   * anew on every call.
   *
   * @throws Error when the service (or one of its dependencies) is not
   *   registered, or when the dependency graph contains a cycle.
   */
  resolve(name: string): any {
    if (this.instances.has(name)) {
      return this.instances.get(name);
    }

    const definition = this.services.get(name);
    if (!definition) {
      throw new Error(`Service not found: ${name}`);
    }

    const cycleStart = this.resolving.indexOf(name);
    if (cycleStart > -1) {
      const cycle = [...this.resolving.slice(cycleStart), name].join(' -> ');
      throw new Error(`Circular service dependency detected: ${cycle}`);
    }

    this.resolving.push(name);
    try {
      const dependencies = (definition.dependencies || []).map((dep) =>
        this.resolve(dep)
      );

      const instance = definition.factory(...dependencies);

      if (definition.singleton) {
        this.instances.set(name, instance);
      }

      return instance;
    } finally {
      // Always unwind, so a failed resolution does not poison later calls.
      this.resolving.pop();
    }
  }

  /**
   * Drops all cached singleton instances; definitions stay registered.
   */
  clear(): void {
    this.instances.clear();
  }
}

/**
 * Describes how to build a service.
 */
export interface ServiceDefinition {
  /** Receives the resolved dependencies, in the order listed in `dependencies`. */
  factory: (...args: any[]) => any;
  /** Names of services to resolve and pass to the factory. */
  dependencies?: string[];
  /** When true, the first built instance is cached and reused. */
  singleton?: boolean;
}

// Usage example
const container = new PluginDIContainer();

// Register services
container.registerService('logger', {
  factory: () => ({
    log: (message: string) => console.log(`[LOG] ${message}`)
  }),
  singleton: true
});

container.registerService('database', {
  factory: (logger: any) => ({
    query: (sql: string) => {
      logger.log(`Executing query: ${sql}`);
      return [];
    }
  }),
  dependencies: ['logger'],
  singleton: true
});

container.registerService('userService', {
  factory: (database: any, logger: any) => ({
    getUser: (id: number) => {
      logger.log(`Getting user ${id}`);
      return database.query(`SELECT * FROM users WHERE id = ${id}`);
    }
  }),
  dependencies: ['database', 'logger'],
  singleton: false
});

// Resolve a service
const userService = container.resolve('userService');
const user = userService.getUser(1);
console.log('User:', user);
## 22.3.3 插件生命周期协调
### 生命周期管理器
// src/lifecycle/lifecycle-manager.ts
// NOTE: PluginDependencyResolver is defined in
// src/dependencies/dependency-resolver.ts and must be in scope here.

/**
 * Drives a set of plugins through initialize → start → stop → cleanup,
 * honoring their declared dependencies.
 *
 * Initialization and start run in dependency order (dependencies first);
 * stop and cleanup run in the reverse order (dependents first).
 */
export class PluginLifecycleManager {
  private plugins: Map<string, ManagedPlugin> = new Map();
  private state: LifecycleState = LifecycleState.IDLE;

  /**
   * Registers (or replaces) a plugin, keyed by its name.
   */
  addPlugin(plugin: ManagedPlugin): void {
    this.plugins.set(plugin.name, plugin);
  }

  /**
   * Removes a plugin by name; does nothing if it is not registered.
   */
  removePlugin(pluginName: string): void {
    this.plugins.delete(pluginName);
  }

  /**
   * Initializes all plugins sequentially in dependency order.
   *
   * State becomes INITIALIZING, then INITIALIZED on success. If resolving the
   * order or a plugin hook throws, the error propagates and the state is left
   * at INITIALIZING.
   */
  async initializeAll(): Promise<void> {
    this.state = LifecycleState.INITIALIZING;
    await this.runPhase(this.resolveOrder(), (plugin) => plugin.initialize());
    this.state = LifecycleState.INITIALIZED;
  }

  /**
   * Starts all plugins sequentially in dependency order.
   *
   * State becomes STARTING, then RUNNING on success. On failure the error
   * propagates and the state is left at STARTING.
   */
  async startAll(): Promise<void> {
    this.state = LifecycleState.STARTING;
    await this.runPhase(this.resolveOrder(), (plugin) => plugin.start());
    this.state = LifecycleState.RUNNING;
  }

  /**
   * Stops all plugins sequentially in reverse dependency order.
   *
   * State becomes STOPPING, then STOPPED on success. On failure the error
   * propagates and the state is left at STOPPING.
   */
  async stopAll(): Promise<void> {
    this.state = LifecycleState.STOPPING;
    await this.runPhase(this.resolveOrder().reverse(), (plugin) => plugin.stop());
    this.state = LifecycleState.STOPPED;
  }

  /**
   * Cleans up all plugins sequentially in reverse dependency order and
   * returns the manager to IDLE on success. No transitional state is set
   * while cleanup is running.
   */
  async cleanupAll(): Promise<void> {
    await this.runPhase(this.resolveOrder().reverse(), (plugin) => plugin.cleanup());
    this.state = LifecycleState.IDLE;
  }

  /**
   * Returns the current lifecycle state.
   */
  getState(): LifecycleState {
    return this.state;
  }

  /**
   * Builds a fresh resolver from the currently registered plugins and returns
   * their names in dependency order (dependencies first).
   */
  private resolveOrder(): string[] {
    const resolver = new PluginDependencyResolver();

    for (const plugin of this.plugins.values()) {
      resolver.addPlugin({
        name: plugin.name,
        version: plugin.version,
        dependencies: plugin.dependencies
      });
    }

    return resolver.resolve();
  }

  /**
   * Runs one lifecycle hook on each named plugin, one after another, skipping
   * names that are not registered with this manager.
   */
  private async runPhase(
    order: string[],
    hook: (plugin: ManagedPlugin) => Promise<void>
  ): Promise<void> {
    for (const pluginName of order) {
      const plugin = this.plugins.get(pluginName);
      if (plugin) {
        await hook(plugin);
      }
    }
  }
}

/**
 * A plugin together with its lifecycle hooks.
 */
export interface ManagedPlugin {
  name: string;
  version: string;
  /** Names of plugins that must be initialized/started before this one. */
  dependencies?: string[];
  initialize: () => Promise<void>;
  start: () => Promise<void>;
  stop: () => Promise<void>;
  cleanup: () => Promise<void>;
}

/**
 * States reported by PluginLifecycleManager.getState().
 */
export enum LifecycleState {
  IDLE = 'IDLE',
  INITIALIZING = 'INITIALIZING',
  INITIALIZED = 'INITIALIZED',
  STARTING = 'STARTING',
  RUNNING = 'RUNNING',
  STOPPING = 'STOPPING',
  STOPPED = 'STOPPED'
}

// Usage example (wrapped in an async function so it does not rely on
// top-level await support).
async function runLifecycleManagerExample(): Promise<void> {
  const manager = new PluginLifecycleManager();

  // Add plugins
  manager.addPlugin({
    name: 'plugin-a',
    version: '1.0.0',
    dependencies: [],
    initialize: async () => console.log('Plugin A initialized'),
    start: async () => console.log('Plugin A started'),
    stop: async () => console.log('Plugin A stopped'),
    cleanup: async () => console.log('Plugin A cleaned up')
  });

  manager.addPlugin({
    name: 'plugin-b',
    version: '1.0.0',
    dependencies: ['plugin-a'],
    initialize: async () => console.log('Plugin B initialized'),
    start: async () => console.log('Plugin B started'),
    stop: async () => console.log('Plugin B stopped'),
    cleanup: async () => console.log('Plugin B cleaned up')
  });

  manager.addPlugin({
    name: 'plugin-c',
    version: '1.0.0',
    dependencies: ['plugin-a', 'plugin-b'],
    initialize: async () => console.log('Plugin C initialized'),
    start: async () => console.log('Plugin C started'),
    stop: async () => console.log('Plugin C stopped'),
    cleanup: async () => console.log('Plugin C cleaned up')
  });

  // Initialize
  await manager.initializeAll();

  // Start
  await manager.startAll();

  // Stop
  await manager.stopAll();

  // Clean up
  await manager.cleanupAll();
}

runLifecycleManagerExample().catch((error: unknown) => console.error(error));

// ---------------------------------------------------------------------------
// Lifecycle events
// src/lifecycle/lifecycle-events.ts
// ---------------------------------------------------------------------------

/**
 * Publish/subscribe hub for plugin lifecycle notifications.
 */
export class PluginLifecycleEvents {
  private listeners: Map<LifecycleEventType, LifecycleEventListener[]> = new Map();

  /**
   * Registers a listener for one lifecycle event type.
   */
  subscribe(eventType: LifecycleEventType, listener: LifecycleEventListener): void {
    const registered = this.listeners.get(eventType);
    if (registered) {
      registered.push(listener);
    } else {
      this.listeners.set(eventType, [listener]);
    }
  }

  /**
   * Removes the first registration of `listener` for `eventType`.
   * Does nothing when the listener is not registered.
   */
  unsubscribe(eventType: LifecycleEventType, listener: LifecycleEventListener): void {
    const registered = this.listeners.get(eventType);
    if (!registered) {
      return;
    }

    const index = registered.indexOf(listener);
    if (index > -1) {
      registered.splice(index, 1);
    }
  }

  /**
   * Notifies every listener subscribed to `event.type` and waits for all of
   * them. A listener that throws or rejects is logged and does not affect the
   * others, so the returned promise does not reject because of a listener
   * failure.
   */
  async emit(event: LifecycleEvent): Promise<void> {
    const registered = this.listeners.get(event.type);

    if (!registered || registered.length === 0) {
      return;
    }

    // Iterate over a snapshot: a listener may unsubscribe itself while being
    // notified, and splicing the live array would skip the next listener.
    const notifications = [...registered].map((listener) =>
      this.safeNotify(listener, event)
    );

    await Promise.all(notifications);
  }

  /**
   * Invokes one listener and logs (rather than propagates) any error it raises.
   */
  private async safeNotify(listener: LifecycleEventListener, event: LifecycleEvent): Promise<void> {
    try {
      await listener(event);
    } catch (error) {
      console.error(`Error in lifecycle event listener:`, error);
    }
  }
}

/**
 * Lifecycle event types: a before/after pair for each lifecycle phase.
 */
export enum LifecycleEventType {
  BEFORE_INITIALIZE = 'BEFORE_INITIALIZE',
  AFTER_INITIALIZE = 'AFTER_INITIALIZE',
  BEFORE_START = 'BEFORE_START',
  AFTER_START = 'AFTER_START',
  BEFORE_STOP = 'BEFORE_STOP',
  AFTER_STOP = 'AFTER_STOP',
  BEFORE_CLEANUP = 'BEFORE_CLEANUP',
  AFTER_CLEANUP = 'AFTER_CLEANUP'
}

/**
 * A lifecycle notification about one plugin.
 */
export interface LifecycleEvent {
  type: LifecycleEventType;
  pluginName: string;
  timestamp: Date;
}

/**
 * Callback invoked for each emitted lifecycle event; may be synchronous or async.
 */
export type LifecycleEventListener = (event: LifecycleEvent) => Promise<void> | void;

// Usage example (wrapped in an async function so it does not rely on
// top-level await support).
async function runLifecycleEventsExample(): Promise<void> {
  const events = new PluginLifecycleEvents();

  // Subscribe
  events.subscribe(LifecycleEventType.BEFORE_START, async (event) => {
    console.log(`Before starting plugin: ${event.pluginName}`);
  });

  events.subscribe(LifecycleEventType.AFTER_START, async (event) => {
    console.log(`After starting plugin: ${event.pluginName}`);
  });

  // Emit
  await events.emit({
    type: LifecycleEventType.BEFORE_START,
    pluginName: 'plugin-a',
    timestamp: new Date()
  });

  await events.emit({
    type: LifecycleEventType.AFTER_START,
    pluginName: 'plugin-a',
    timestamp: new Date()
  });
}

runLifecycleEventsExample().catch((error: unknown) => console.error(error));