feat: initial mini simple event sourcing for pending application updates
parent
76a8909dc1
commit
df5bfe4e1a
@ -1,3 +1,4 @@
|
|||||||
node_modules
|
node_modules
|
||||||
|
|
||||||
.env.local
|
.env.local
|
||||||
|
data/
|
||||||
|
@ -0,0 +1,9 @@
|
|||||||
|
import ApplicationUpdates from "./projections/ApplicationUpdates";
|
||||||
|
|
||||||
|
export default class AppProjections {
|
||||||
|
public readonly ApplicationUpdates = new ApplicationUpdates();
|
||||||
|
|
||||||
|
getAll(): DomainProjection[] {
|
||||||
|
return [this.ApplicationUpdates];
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,11 @@
|
|||||||
|
import AppProjections from "./AppProjections";
|
||||||
|
import { UpdateDefinition } from "./projections/ApplicationUpdates";
|
||||||
|
|
||||||
|
export default class AppQueries {
|
||||||
|
constructor(private readonly projections: AppProjections) {}
|
||||||
|
|
||||||
|
pendingApplicationUpdates(appName?: string): UpdateDefinition[] {
|
||||||
|
// TODO: Implement filtering by appName
|
||||||
|
return this.projections.ApplicationUpdates.getPendingUpdates();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,5 @@
|
|||||||
|
interface DomainEvent<T> {
|
||||||
|
readonly type: string;
|
||||||
|
readonly createdAt: Date;
|
||||||
|
readonly payload: T;
|
||||||
|
}
|
@ -0,0 +1,3 @@
|
|||||||
|
interface DomainProjection {
|
||||||
|
handle(event: DomainEvent<any>): void;
|
||||||
|
}
|
@ -0,0 +1,5 @@
|
|||||||
|
export default interface EventStore {
|
||||||
|
append(event: DomainEvent<any>): void;
|
||||||
|
subscribe(projection: DomainProjection): void;
|
||||||
|
replay(): Promise<void>;
|
||||||
|
}
|
@ -0,0 +1,14 @@
|
|||||||
|
type ApplicationUpdateStartedPayload = {
|
||||||
|
id: string;
|
||||||
|
newVersion: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
export class ApplicationUpdateStarted
|
||||||
|
implements DomainEvent<ApplicationUpdateStartedPayload>
|
||||||
|
{
|
||||||
|
readonly type = "ApplicationUpdateStarted" as const;
|
||||||
|
constructor(
|
||||||
|
public readonly payload: ApplicationUpdateStartedPayload,
|
||||||
|
public readonly createdAt: Date
|
||||||
|
) {}
|
||||||
|
}
|
@ -0,0 +1,18 @@
|
|||||||
|
export type UpdateDefinition = {
|
||||||
|
id: string;
|
||||||
|
newVersion: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
export default class ApplicationUpdates implements DomainProjection {
|
||||||
|
private readonly pendingUpdates: UpdateDefinition[] = [];
|
||||||
|
handle(event: DomainEvent<any>): void {
|
||||||
|
if (event.type === "ApplicationUpdateStarted") {
|
||||||
|
console.log("ApplicationUpdateStarted", event.payload);
|
||||||
|
this.pendingUpdates.push(event.payload);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
getPendingUpdates(): UpdateDefinition[] {
|
||||||
|
return this.pendingUpdates;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,60 @@
|
|||||||
|
import { appendFile } from "node:fs/promises";
|
||||||
|
import EventStore from "../domain/EventStore";
|
||||||
|
import { ApplicationUpdateStarted } from "../domain/events/ApplicationUpdateStarted";
|
||||||
|
|
||||||
|
export default class FileEventStore implements EventStore {
|
||||||
|
private handlers: DomainProjection[] = [];
|
||||||
|
constructor(private readonly filePath: string) {}
|
||||||
|
|
||||||
|
append(event: DomainEvent<any>): void {
|
||||||
|
appendFile(this.filePath, this.serialize(event) + "\n");
|
||||||
|
this.emit(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
subscribe(projection: DomainProjection) {
|
||||||
|
this.handlers.push(projection);
|
||||||
|
}
|
||||||
|
|
||||||
|
async replay() {
|
||||||
|
// TODO Improve this with streaming
|
||||||
|
console.log("Replaying events from", this.filePath);
|
||||||
|
const file = Bun.file(this.filePath);
|
||||||
|
const content = await file.text();
|
||||||
|
const lines = content.split("\n");
|
||||||
|
for (const line of lines) {
|
||||||
|
if (!line) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
console.log("Deserializing", line);
|
||||||
|
const event = this.deserialize(line);
|
||||||
|
this.emit(event);
|
||||||
|
}
|
||||||
|
console.log("Replaying done");
|
||||||
|
}
|
||||||
|
|
||||||
|
private emit(event: DomainEvent<any>) {
|
||||||
|
for (const handler of this.handlers) {
|
||||||
|
handler.handle(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private serialize(event: DomainEvent<any>) {
|
||||||
|
return JSON.stringify(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
private deserialize(line: string) {
|
||||||
|
const event = JSON.parse(line);
|
||||||
|
switch (event.type) {
|
||||||
|
case "ApplicationUpdateStarted":
|
||||||
|
return new ApplicationUpdateStarted(
|
||||||
|
{
|
||||||
|
id: event.payload.id,
|
||||||
|
newVersion: event.payload.newVersion,
|
||||||
|
},
|
||||||
|
new Date(event.createdAt)
|
||||||
|
);
|
||||||
|
default:
|
||||||
|
throw new Error("Unknown event type" + event.type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue