事件溯源
@ventostack/events 提供了内存事件溯源存储,支持乐观并发控制与快照机制。
事件溯源将应用状态建模为一系列事件,而非当前状态快照:
- 聚合根(Aggregate):业务实体(如 Order、Account)
- 事件(Event):状态变更记录(如 OrderPlaced、PaymentReceived)
- 投影(Projection):从事件流重建当前状态
import { createMemoryEventStore } from "@ventostack/events";
const store = createMemoryEventStore();
// 追加事件(expectedVersion 用于乐观并发控制)const stored = await store.append( "order_123", "order", [ { eventType: "ORDER_PLACED", payload: { userId: "user_1", items: [...], total: 199 }, }, ], 0, // 期望当前版本为 0);
// 读取事件流const events = await store.getEvents("order_123");
// 获取最新版本号const version = await store.getLatestVersion("order_123");interface OrderState { id: string; status: "pending" | "confirmed" | "cancelled"; total: number; items: OrderItem[];}
// 从事件重建状态function applyOrderEvent(state: OrderState | null, event: StoredEvent): OrderState | null { switch (event.eventType) { case "ORDER_PLACED": return { id: event.aggregateId, status: "pending", total: (event.payload as any).total, items: (event.payload as any).items, };
case "ORDER_CONFIRMED": return state ? { ...state, status: "confirmed" } : state;
case "ORDER_CANCELLED": return state ? { ...state, status: "cancelled" } : state;
default: return state; }}
// 从事件流重建聚合状态async function getOrder(orderId: string): Promise<OrderState | null> { const events = await store.getEvents(orderId); if (events.length === 0) return null; return events.reduce<OrderState | null>(applyOrderEvent, null);}乐观并发控制
Section titled “乐观并发控制”// 读取当前版本const version = await store.getLatestVersion(orderId);
// 追加时检查版本,防止并发冲突await store.append(orderId, "order", [newEvent], version);// 如果版本不匹配(被其他操作修改),会抛出 Error// 保存聚合根快照await store.saveSnapshot(orderId, currentState, currentVersion);
// 获取聚合根快照const snapshot = await store.getSnapshot(orderId);if (snapshot) { // 从快照版本之后的事件继续重建 const events = await store.getEvents(orderId, snapshot.version + 1); const state = events.reduce(applyOrderEvent, snapshot.state as OrderState);}EventStore 接口
Section titled “EventStore 接口”interface StoredEvent<T = unknown> { id: string; aggregateId: string; aggregateType: string; eventType: string; payload: T; version: number; timestamp: number; metadata?: Record<string, unknown>;}
interface EventStore { append( aggregateId: string, aggregateType: string, events: Array<{ eventType: string; payload: unknown; metadata?: Record<string, unknown> }>, expectedVersion: number, ): Promise<StoredEvent[]>; getEvents(aggregateId: string, fromVersion?: number): Promise<StoredEvent[]>; getLatestVersion(aggregateId: string): Promise<number>; saveSnapshot(aggregateId: string, state: unknown, version: number): Promise<void>; getSnapshot(aggregateId: string): Promise<{ state: unknown; version: number } | null>;}createMemoryEventStore基于内存 Map 实现,进程重启后数据丢失- 生产环境需要自行实现持久化存储适配器(基于数据库或事件存储系统)
- 乐观并发控制通过
expectedVersion参数实现,版本冲突时抛出Error