import { appendFile } from "node:fs/promises";
import EventStore from "../domain/EventStore";
import { ApplicationUpdateStarted } from "../domain/events/ApplicationUpdateStarted";
import DomainEvent from "../domain/DomainEvent";
import DomainProjection from "../domain/DomainProjection";

// Event store that persists events to a file, one JSON document per line,
// and forwards every event to the subscribed projections.
export default class FileEventStore implements EventStore {
  private handlers: DomainProjection[] = [];

  constructor(private readonly filePath: string) {}

  append(event: DomainEvent): void {
    // appendFile returns a promise. The write is fire-and-forget here, so a
    // catch handler is attached to avoid an unhandled rejection on failure.
    appendFile(this.filePath, this.serialize(event) + "\n").catch((error) => {
      console.error("Failed to append event to", this.filePath, error);
    });
    this.emit(event);
  }

  subscribe(projection: DomainProjection): void {
    this.handlers.push(projection);
  }

  // Reads the whole file and re-emits every stored event, in order.
  async replay(): Promise<void> {
    // TODO Improve this with streaming
    console.log("Replaying events from", this.filePath);
    const file = Bun.file(this.filePath);
    const content = await file.text();
    const lines = content.split("\n");
    for (const line of lines) {
      // Skip empty lines, such as the one after the trailing newline.
      if (!line) {
        continue;
      }
      console.log("Deserializing", line);
      const event = this.deserialize(line);
      this.emit(event);
    }
    console.log("Replaying done");
  }

  private emit(event: DomainEvent): void {
    for (const handler of this.handlers) {
      handler.handle(event);
    }
  }

  private serialize(event: DomainEvent): string {
    return JSON.stringify(event);
  }

  // Rebuilds a domain event instance from its serialized JSON line.
  private deserialize(line: string): DomainEvent {
    const event = JSON.parse(line);
    switch (event.type) {
      case "ApplicationUpdateStarted":
        return new ApplicationUpdateStarted(
          {
            id: event.payload.id,
            newVersion: event.payload.newVersion,
          },
          new Date(event.createdAt)
        );
      default:
        throw new Error(`Unknown event type: ${event.type}`);
    }
  }
}