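Message Queue
@ventostack/events provides a message queue adapter interface with a built-in in-memory implementation, and it can be extended to connect to Kafka, RabbitMQ, NATS, and similar brokers.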
In-memory message queue (development/testing)
import { createMemoryMQAdapter } from "@ventostack/events";

const mq = createMemoryMQAdapter();

// Publish a message
await mq.publish("orders", {
  id: "order_123",
  type: "ORDER_PLACED",
  payload: { orderId: "order_123", userId: "user_1" },
  timestamp: new Date().toISOString(),
});

// Subscribe to a topic
await mq.subscribe("orders", async (message) => {
  console.log("Received message:", message.payload);
  // Handle the message...
});

Custom adapter (production)
Use createMQAdapterFactory to register a custom implementation:

import { createMQAdapterFactory } from "@ventostack/events";
import type { MQAdapter, MQAdapterConfig, MQMessage } from "@ventostack/events";

// Implement a Kafka adapter.
// KafkaClient is not defined in this snippet; it stands for the Kafka client of your choice.
function createKafkaAdapter(config: MQAdapterConfig): MQAdapter {
  const kafka = new KafkaClient(config);

  return {
    async publish(topic: string, message: MQMessage): Promise<void> {
      // Serialize the message to JSON before sending it to the broker
      await kafka.send({
        topic,
        messages: [{ value: JSON.stringify(message) }],
      });
    },

    async subscribe(topic: string, handler: (msg: MQMessage) => Promise<void>): Promise<void> {
      const consumer = kafka.consumer();
      await consumer.subscribe({ topic });
      await consumer.run({
        eachMessage: async ({ message }) => {
          // Deserialize the raw value back into an MQMessage
          const msg = JSON.parse(message.value!.toString());
          await handler(msg);
        },
      });
    },

    // unsubscribe(topic) is also required by the MQAdapter interface; it is not shown in this example.

    async close(): Promise<void> {
      await kafka.disconnect();
    },
  };
}

// Register the adapter
const factory = createMQAdapterFactory();
factory.register("kafka", createKafkaAdapter);

// Create an adapter instance
const mq = factory.create({
  type: "kafka",
  brokers: ["localhost:9092"],
});

Messages passed to publish and delivered to subscribers have the following shape:

interface MQMessage {
  id?: string;
  type: string;
  payload: unknown;
  timestamp: string;
  headers?: Record<string, string>;
}

MQAdapter interface
interface MQAdapter {
  publish(topic: string, message: MQMessage): Promise<void>;
  subscribe(topic: string, handler: MQMessageHandler): Promise<void>;
  unsubscribe(topic: string): Promise<void>;
  close(): Promise<void>;
}

type MQMessageHandler = (message: MQMessage) => Promise<void>;
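
To make the contract of the four methods concrete, here is a minimal sketch of an object that satisfies the interface above using only an in-process Map. It is not the library's built-in memory adapter, whose internals are not shown on this page; createSketchAdapter is a hypothetical name, the types are redeclared locally so the block is self-contained, and the choices to deliver synchronously in subscription order and to reject calls after close() are assumptions of this sketch.

```typescript
// Types redeclared locally (mirroring the definitions above) so the sketch is self-contained.
interface MQMessage {
  id?: string;
  type: string;
  payload: unknown;
  timestamp: string;
  headers?: Record<string, string>;
}

type MQMessageHandler = (message: MQMessage) => Promise<void>;

interface MQAdapter {
  publish(topic: string, message: MQMessage): Promise<void>;
  subscribe(topic: string, handler: MQMessageHandler): Promise<void>;
  unsubscribe(topic: string): Promise<void>;
  close(): Promise<void>;
}

// Hypothetical helper for illustration; not part of @ventostack/events.
function createSketchAdapter(): MQAdapter {
  // topic -> handlers registered for that topic
  const handlers = new Map<string, MQMessageHandler[]>();
  let closed = false;

  return {
    async publish(topic, message) {
      if (closed) throw new Error("adapter is closed");
      // Iterate over a copy so the list cannot change underneath the loop.
      for (const handler of [...(handlers.get(topic) ?? [])]) {
        await handler(message);
      }
    },

    async subscribe(topic, handler) {
      if (closed) throw new Error("adapter is closed");
      const list = handlers.get(topic) ?? [];
      list.push(handler);
      handlers.set(topic, list);
    },

    async unsubscribe(topic) {
      // Drops every handler for the topic, matching the topic-only signature.
      handlers.delete(topic);
    },

    async close() {
      closed = true;
      handlers.clear();
    },
  };
}
```

In this sketch a message published before any subscriber exists is simply dropped, so tests built on something like it should subscribe first and publish afterwards. Whether the built-in memory adapter buffers such messages is not stated here.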