import {
BulkCrudOperationKeys,
InternalError,
OperationKeys,
} from "@decaf-ts/db-decorators";
import { Model, ModelConstructor } from "@decaf-ts/decorator-validation";
import { Observer } from "../interfaces";
import { Adapter } from "./Adapter";
import { UnsupportedError } from "./errors";
import { AdapterDispatch, ContextOf, EventIds } from "./types";
import { Constructor } from "@decaf-ts/decoration";
import {
ContextualArgs,
ContextualizedArgs,
ContextualLoggedClass,
MaybeContextualArg,
MethodOrOperation,
} from "../utils/ContextualLoggedClass";
import { PersistenceKeys } from "./constants";
/**
* @description Dispatches database operation events to observers
* @summary The Dispatch class implements the Observable interface and is responsible for intercepting
* database operations from an Adapter and notifying observers when changes occur. It uses proxies to
* wrap the adapter's CRUD methods and automatically trigger observer updates after operations complete.
* @template Y - The native database driver type
* @param {void} - No constructor parameters
* @class Dispatch
* @example
* ```typescript
* // Creating and using a Dispatch instance
* const dispatch = new Dispatch<PostgresDriver>();
*
* // Connect it to an adapter
* const adapter = new PostgresAdapter(connection);
* dispatch.observe(adapter);
*
* // Now any CRUD operations on the adapter will automatically
* // trigger observer notifications
* await adapter.create('users', 123, userModel);
* // Observers will be notified about the creation
*
* // When done, you can disconnect
* dispatch.unObserve(adapter);
* ```
*/
export class Dispatch<A extends Adapter<any, any, any, any>>
extends ContextualLoggedClass<ContextOf<A>>
implements AdapterDispatch<A>
{
/**
* @description The adapter being observed
* @summary Reference to the database adapter whose operations are being monitored
*/
protected adapter?: A;
/**
* @description List of model constructors
* @summary Array of model constructors that are registered with the adapter
*/
protected models!: ModelConstructor<any>[];
/**
* @description Creates a new Dispatch instance
* @summary Initializes a new Dispatch instance without any adapter
*/
constructor() {
super();
}
protected override logCtx<
ARGS extends any[] = any[],
METHOD extends MethodOrOperation = MethodOrOperation,
>(
args: MaybeContextualArg<ContextOf<A>, ARGS>,
operation: METHOD
): ContextualizedArgs<
ContextOf<A>,
ARGS,
METHOD extends string ? true : false
>;
protected override logCtx<
ARGS extends any[] = any[],
METHOD extends MethodOrOperation = MethodOrOperation,
>(
args: MaybeContextualArg<ContextOf<A>, ARGS>,
operation: METHOD,
allowCreate: false
): ContextualizedArgs<
ContextOf<A>,
ARGS,
METHOD extends string ? true : false
>;
protected override logCtx<
ARGS extends any[] = any[],
METHOD extends MethodOrOperation = MethodOrOperation,
>(
args: MaybeContextualArg<ContextOf<A>, ARGS>,
operation: METHOD,
allowCreate: true
): Promise<
ContextualizedArgs<ContextOf<A>, ARGS, METHOD extends string ? true : false>
>;
protected override logCtx<
ARGS extends any[] = any[],
METHOD extends MethodOrOperation = MethodOrOperation,
>(
args: MaybeContextualArg<ContextOf<A>, ARGS>,
operation: METHOD,
allowCreate: boolean = false
):
| Promise<
ContextualizedArgs<
ContextOf<A>,
ARGS,
METHOD extends string ? true : false
>
>
| ContextualizedArgs<
ContextOf<A>,
ARGS,
METHOD extends string ? true : false
> {
if (!this.adapter) throw new InternalError("Adapter not set yet");
return this.adapter["logCtx"](args, operation, allowCreate as any) as
| ContextualizedArgs<
ContextOf<A>,
ARGS,
METHOD extends string ? true : false
>
| Promise<
ContextualizedArgs<
ContextOf<A>,
ARGS,
METHOD extends string ? true : false
>
>;
}
/**
* @description Initializes the dispatch by proxying adapter methods
* @summary Sets up proxies on the adapter's CRUD methods to intercept operations and notify observers.
* This method is called automatically when an adapter is observed.
* @return {Promise<void>} A promise that resolves when initialization is complete
* @mermaid
* sequenceDiagram
* participant Dispatch
* participant Adapter
* participant Proxy
*
* Dispatch->>Dispatch: initialize()
* Dispatch->>Dispatch: Check if adapter exists
* alt No adapter
* Dispatch-->>Dispatch: Throw InternalError
* end
*
* loop For each CRUD method
* Dispatch->>Adapter: Check if method exists
* alt Method doesn't exist
* Dispatch-->>Dispatch: Throw InternalError
* end
*
* Dispatch->>Adapter: Get property descriptor
* loop While descriptor not found
* Dispatch->>Adapter: Check prototype chain
* end
*
* alt Descriptor not found or not writable
* Dispatch->>Dispatch: Log error and continue
* else Descriptor found and writable
* Dispatch->>Proxy: Create proxy for method
* Dispatch->>Adapter: Replace method with proxy
* end
* end
*/
protected async initialize(
...args: MaybeContextualArg<ContextOf<A>>
): Promise<void> {
if (!this.adapter) {
// Gracefully skip initialization when no adapter is observed yet.
// Some tests or setups may construct a Dispatch before calling observe().
// Instead of throwing, we no-op so that later observe() can proceed.
this.log
.for(this.initialize)
.verbose(`No adapter observed for dispatch; skipping initialization`);
return;
}
const { log } = (
await this.logCtx(args, PersistenceKeys.INITIALIZATION, true)
).for(this.initialize);
log.verbose(`Initializing ${this.adapter}'s event Dispatch`);
const adapter = this.adapter as Adapter<any, any, any, any>;
(
[
OperationKeys.CREATE,
OperationKeys.UPDATE,
OperationKeys.DELETE,
BulkCrudOperationKeys.CREATE_ALL,
BulkCrudOperationKeys.UPDATE_ALL,
BulkCrudOperationKeys.DELETE_ALL,
] as (keyof Adapter<any, any, any, any>)[]
).forEach((toWrap) => {
if (!adapter[toWrap])
throw new InternalError(
`Method ${toWrap} not found in ${adapter.alias} adapter to bind Observables Dispatch`
);
let descriptor = Object.getOwnPropertyDescriptor(adapter, toWrap);
let proto: any = adapter;
while (!descriptor && proto !== Object.prototype) {
proto = Object.getPrototypeOf(proto);
descriptor = Object.getOwnPropertyDescriptor(proto, toWrap);
}
if (!descriptor || !descriptor.writable) {
this.log.error(
`Could not find method ${toWrap} to bind Observables Dispatch`
);
return;
}
function bulkToSingle(method: string) {
switch (method) {
case BulkCrudOperationKeys.CREATE_ALL:
return OperationKeys.CREATE;
case BulkCrudOperationKeys.UPDATE_ALL:
return OperationKeys.UPDATE;
case BulkCrudOperationKeys.DELETE_ALL:
return OperationKeys.DELETE;
default:
return method;
}
}
// @ts-expect-error because there are read only properties
adapter[toWrap] = new Proxy(adapter[toWrap], {
apply: async (target: any, thisArg: A, argArray: any[]) => {
const { log, ctxArgs, ctx } = thisArg["logCtx"](
argArray.slice(3 - (4 - argArray.length), argArray.length),
target
);
const [tableName, ids, payload] = argArray;
const result = await target.apply(thisArg, [
tableName,
ids,
payload,
...ctxArgs,
]);
const resultArgs: [string, string, EventIds] = [
tableName,
bulkToSingle(toWrap),
ids,
];
if (ctx.get("observeFullResult")) {
resultArgs.push(
Array.isArray(result)
? result.map((r) => tableName(r))
: tableName(result)
);
}
this.updateObservers(...resultArgs, ...ctxArgs).catch((e: unknown) =>
log.error(
`Failed to dispatch observer refresh for ${toWrap} on ${tableName.name || tableName} for ${ids}: ${e}`
)
);
return result;
},
});
});
}
/**
* @description Closes the dispatch
* @summary Performs any necessary cleanup when the dispatch is no longer needed
* @return {Promise<void>} A promise that resolves when closing is complete
*/
async close(): Promise<void> {
// to nothing in this instance but may be required for closing connections
}
/**
* @description Starts observing an adapter
* @summary Connects this dispatch to an adapter to monitor its operations
* @param {Adapter<any, any, any, any>} observer - The adapter to observe
* @return {void}
*/
observe(observer: A): () => void {
if (!(observer instanceof Adapter))
throw new UnsupportedError("Only Adapters can be observed by dispatch");
this.adapter = observer;
this.models = Adapter.models(this.adapter.alias);
this.initialize().then(() =>
this.log.verbose(
`Dispatch initialized for ${this.adapter!.alias} adapter`
)
);
return () => this.unObserve(observer);
}
/**
* @description Stops observing an adapter
* @summary Disconnects this dispatch from an adapter
* @param {Observer} observer - The adapter to stop observing
* @return {void}
*/
unObserve(observer: Observer): void {
if (this.adapter !== observer)
throw new UnsupportedError(
"Only the adapter that was used to observe can be unobserved"
);
this.adapter = undefined;
}
/**
* @description Updates observers about a database event
* @summary Notifies observers about a change in the database
* @param {string} table - The name of the table where the change occurred
* @param {OperationKeys|BulkCrudOperationKeys|string} event - The type of operation that occurred
* @param {EventIds} id - The identifier(s) of the affected record(s)
* @return {Promise<void>} A promise that resolves when all observers have been notified
*/
async updateObservers(
model: Constructor<any> | string,
event: OperationKeys | BulkCrudOperationKeys | string,
id: EventIds,
...args: ContextualArgs<ContextOf<A>>
): Promise<void> {
if (!model)
throw new InternalError(`Model must be provided for observer update`);
const table =
model && typeof model === "string" ? model : Model.tableName(model);
const { log, ctxArgs, ctx } = this.logCtx(args, this.updateObservers);
if (!this.adapter) {
log.verbose(
`No adapter observed for dispatch; skipping observer update for ${table}:${event}`
);
return;
}
try {
log.debug(
`dispatching observer refresh for ${event}:${table}: ${id}${ctx.get("observeFullResult") ? " - including result" : ""}`
);
await this.adapter.refresh(model, event, id, ...ctxArgs);
} catch (e: unknown) {
throw new InternalError(`Failed to refresh dispatch: ${e}`);
}
}
override toString() {
return `${this.adapter ? this.adapter.toString() : "uninitialized"} event dispatch`;
}
}
if (Adapter) Adapter["_baseDispatch"] = Dispatch;
Source