import { TransactionLock } from "./interfaces/TransactionLock";
import { SynchronousLock } from "./locks/SynchronousLock";
import { DBKeys } from "@decaf-ts/db-decorators";
import "./overrides";
import { Metadata } from "@decaf-ts/decoration";
import { LoggedClass, getObjectName, Logging } from "@decaf-ts/logging";
import { TimeoutError } from "./errors";
/**
 * Callable accepted by `Transaction.run`. It is invoked with `this` typed as `C`
 * and may return either a plain value or a promise of one.
 */
type TransactionRunnable<R, C = unknown> = (this: C) => R | Promise<R>;
/**
 * Per-target cache of the names produced by `getObjectName`, consulted by
 * `Transaction.describeTarget`. A WeakMap is used so cached entries never keep
 * their key objects alive.
 */
const objectNameCache = new WeakMap<object, string>();
/**
* @description Core transaction management class
* @summary Manages transaction lifecycle, including creation, execution, and cleanup. Provides mechanisms for binding transactions to objects and methods, ensuring proper transaction context propagation.
* @param {string} source - The source/origin of the transaction (typically a class name)
* @param {string} [method] - The method name associated with the transaction
* @param {function(): any} [action] - The function to execute within the transaction
* @param {any[]} [metadata] - Additional metadata to associate with the transaction
* @class Transaction
* @example
* // Creating and submitting a transaction
* const transaction = new Transaction(
* 'UserService',
* 'createUser',
* async () => {
* // Transaction logic here
* await db.insert('users', { name: 'John' });
* }
* );
* Transaction.submit(transaction);
*
* // Using the transactional decorator
* class UserService {
* @transactional()
* async createUser(data) {
* // Method will be executed within a transaction
* return await db.insert('users', data);
* }
* }
* @mermaid
* sequenceDiagram
* participant C as Client Code
* participant T as Transaction
* participant L as TransactionLock
* participant O as Original Method
*
* C->>T: new Transaction(source, method, action)
* C->>T: Transaction.submit(transaction)
* T->>L: submit(transaction)
* L->>T: fire()
* T->>O: Execute action()
* O-->>T: Return result/error
* T->>L: release(error?)
* L-->>C: Return result/error
*/
export class Transaction<R> extends LoggedClass {
  /** When false, calls to the `log` member of the shared transaction logger are silently dropped. */
  static debug = false;
  /** Timeout in milliseconds applied to every fired transaction; values <= 0 disable it. */
  static globalTimeout = -1;

  /** Per-constructor cache of the transactional metadata resolved by `getTransactionalMetadata`. */
  private static readonly metadataCache = new WeakMap<
    // eslint-disable-next-line @typescript-eslint/no-unsafe-function-type
    Function,
    {
      methods: string[];
      propertyKeys: string[];
      propertyDesignTypes: Map<string, boolean>;
    }
  >();

  /** Lazily built set of bookkeeping property names (see `getReservedProps`). */
  private static reservedPropsCache?: Set<string>;

  /** Shared logger; its `log` member becomes a no-op unless `Transaction.debug` is enabled. */
  private static log = new Proxy(Logging.for(Transaction), {
    get(target, prop, receiver) {
      if (prop !== "log" || Transaction.debug)
        return Reflect.get(target, prop, receiver);
      // eslint-disable-next-line @typescript-eslint/no-unused-vars
      return (...args: any[]) => {
        // do nothing. ignore the log message the Transaction is not in debug mode
      };
    },
  });

  /** Instance logger; lazily initialised to the shared static transaction logger. */
  override get log() {
    if (!this["_log"]) {
      this["_log"] = Transaction.log;
    }
    return this["_log"];
  }

  /** Creation timestamp (`Date.now()`) used as identifier; NOTE(review): not guaranteed unique within the same millisecond. */
  readonly id: number;
  protected action?: () => Promise<R>;
  readonly method?: string;
  readonly source?: string;
  readonly logs: string[];
  private readonly metadata?: any[];
  /** Settles with the outcome of the first `fire()` call; exposed through `wait()`. */
  private readonly completion: Promise<R>;
  private resolveCompletion?: (value: R) => void;
  private rejectCompletion?: (reason?: unknown) => void;
  private initialFireDispatched = false;
  private released = false;

  private static lock: TransactionLock;
  /** Maps each proxy created by `bindToTransaction` to the transaction that created it. */
  private static readonly contexts = new WeakMap<object, Transaction<any>>();

  constructor(
    source: string,
    method?: string,
    action?: () => Promise<R>,
    metadata?: any[]
  ) {
    super();
    this.id = Date.now();
    this.action = action;
    this.method = method;
    this.logs = [[this.id, source, method].join(" | ")];
    this.source = source;
    this.metadata = metadata;
    this.completion = new Promise<R>((resolve, reject) => {
      this.resolveCompletion = resolve;
      this.rejectCompletion = reject;
    });
    // Mark the completion promise as handled: a failing transaction whose
    // `wait()` is never consumed must not surface as an unhandled rejection.
    // Callers of `wait()` still observe the rejection on the original promise.
    void this.completion.catch(() => undefined);
  }

  /**
   * @description Serialises a value for log output without ever throwing
   * @summary `JSON.stringify` throws on circular structures and BigInt values; a log statement must never abort a transaction because of that
   * @param {unknown} value - The value to serialise
   * @return {string} The JSON representation, or a placeholder when the value cannot be serialised
   */
  private static safeStringify(value: unknown): string {
    try {
      return JSON.stringify(value);
    } catch {
      return "[unserializable]";
    }
  }

  /**
   * @description Lists the bookkeeping property names written by `bindToTransaction`
   * @summary These names are excluded when looking for transactional properties. Built on first use rather than at class-definition time.
   * @return {Set<string>} The reserved property names
   */
  private static getReservedProps(): Set<string> {
    if (!this.reservedPropsCache) {
      this.reservedPropsCache = new Set<string>([
        "__transactionProxy",
        "__transactionTarget",
        typeof DBKeys.ORIGINAL === "string" ? DBKeys.ORIGINAL : "__originalObj",
      ]);
    }
    return this.reservedPropsCache;
  }

  /**
   * @description Queues a transaction for execution
   * @summary Wraps the method in a new transaction, submits it to the transaction lock and resolves with the method's result. The lock is released once the method settles, whether it succeeds or fails.
   * @param {any} issuer - Any class instance that will be used (bound to the transaction) as 'this' when calling the method
   * @param {Function} method - function containing the transaction logic, will be called with the bound issuer as 'this'
   * @param {any[]} args - Arguments to pass to the method
   * @return {Promise<R>} The promise returned by submitting the transaction to the lock
   */
  static async push<R>(
    issuer: any,
    method: (...argzz: any[]) => Promise<R>,
    ...args: any[]
  ): Promise<R> {
    const log = this.log.for(this.push);
    const issuerName = Transaction.describeTarget(issuer);
    const methodName = Transaction.describeTarget(method);
    const transaction: Transaction<R> = new Transaction<R>(
      issuerName,
      methodName,
      async () => {
        const l = log.for(transaction.id.toString());
        try {
          l.verbose(`Executing transaction method ${methodName}`);
          l.debug(`With arguments: ${Transaction.safeStringify(args)}`);
          const result = await Promise.resolve(
            method.call(transaction.bindToTransaction(issuer), ...args)
          );
          l.verbose(`Transaction method ${methodName} executed successfully`);
          l.debug(`Result: ${Transaction.safeStringify(result)}`);
          await transaction.release();
          l.debug("lock released");
          return result;
        } catch (e: unknown) {
          // release() is idempotent, so this is a no-op when the failure
          // happened after the lock had already been released above
          await transaction.release(e as Error);
          throw e;
        }
      }
    );
    log.debug(
      `Pushing transaction ${transaction.id} for method ${methodName} on issuer ${issuerName}`
    );
    return Transaction.submit(transaction);
  }

  /**
   * @description Runs a function inside a new transaction
   * @summary Accepts either `(runnable, metadata?)` or `(context, runnable, metadata?)`. When a context is given it is bound to the transaction and used as 'this' for the runnable; otherwise the transaction itself is used as 'this'. The lock is always released when the runnable settles.
   * @param {C | TransactionRunnable} contextOrRunnable - The context object, or the runnable when no context is supplied
   * @param {TransactionRunnable | any[]} [runnableOrMetadata] - The runnable (when a context is supplied) or the metadata
   * @param {any[]} [maybeMetadata] - The metadata when a context is supplied; empty arrays are treated as no metadata
   * @return {Promise<R>} The promise returned by submitting the transaction to the lock
   * @throws {Error} When no function is supplied as runnable
   */
  static async run<R, C = unknown>(
    runnable: TransactionRunnable<R, C>,
    metadata?: any[]
  ): Promise<R>;
  static async run<R, C = unknown>(
    context: C,
    runnable: TransactionRunnable<R, C>,
    metadata?: any[]
  ): Promise<R>;
  static async run<R, C = unknown>(
    contextOrRunnable: C | TransactionRunnable<R, C>,
    runnableOrMetadata?: TransactionRunnable<R, C> | any[],
    maybeMetadata?: any[]
  ): Promise<R> {
    // A non-function first argument means the caller used the (context, runnable, ...) form
    const contextProvided = typeof contextOrRunnable !== "function";
    const context = (contextProvided ? contextOrRunnable : undefined) as
      | C
      | undefined;
    const runnable = (
      contextProvided ? runnableOrMetadata : contextOrRunnable
    ) as TransactionRunnable<R, C>;
    if (typeof runnable !== "function") {
      throw new Error("Transaction.run requires an async function");
    }
    const rawMetadata = contextProvided ? maybeMetadata : runnableOrMetadata;
    const metadataValue =
      Array.isArray(rawMetadata) && rawMetadata.length
        ? rawMetadata
        : undefined;
    const sourceName = context
      ? Transaction.describeTarget(context as object)
      : Transaction.describeTarget(runnable as object);
    const methodName = Transaction.describeTarget(runnable as object);
    // eslint-disable-next-line prefer-const
    let transaction: Transaction<R>;
    const action = async () => {
      let failure: Error | undefined;
      try {
        const boundContext = context
          ? transaction.bindToTransaction(context)
          : undefined;
        return await runnable.call((boundContext ?? transaction) as C);
      } catch (error: unknown) {
        // Always tell the lock about a failure, even when a non-Error value
        // was thrown; the caller still receives the original thrown value.
        failure =
          error instanceof Error
            ? error
            : new Error(
                `Transaction runnable threw a non-Error value: ${Transaction.safeStringify(error)}`
              );
        throw error;
      } finally {
        await transaction.release(failure);
      }
    };
    transaction = new Transaction<R>(
      sourceName,
      methodName,
      action,
      metadataValue
    );
    return Transaction.submit(transaction);
  }

  /**
   * @description Configures the transaction lock implementation
   * @summary Sets the lock implementation to be used for transaction management, allowing customization of the transaction behavior
   * @param {TransactionLock} lock - The lock implementation to use for managing transactions
   * @return {void}
   */
  static setLock(lock: TransactionLock) {
    this.lock = lock;
  }

  /**
   * @description Retrieves the current transaction lock
   * @summary Gets the current transaction lock instance, creating a default SynchronousLock if none exists
   * @return {TransactionLock} The current transaction lock implementation
   */
  static getLock(): TransactionLock {
    if (!this.lock) this.lock = new SynchronousLock();
    return this.lock;
  }

  /**
   * @description Submits a transaction for processing
   * @summary Submits a transaction to the current transaction lock for processing and execution
   * @param {Transaction} transaction - The transaction to submit for processing
   * @return {Promise<R>} The promise returned by the lock's `submit`
   */
  static submit<R>(transaction: Transaction<R>): Promise<R> {
    return Transaction.getLock().submit(transaction);
  }

  /**
   * @description Releases the transaction lock
   * @summary Releases the current transaction lock, optionally with an error, allowing the next transaction to proceed
   * @param {Error} [err] - Optional error that occurred during transaction execution
   * @return {Promise<void>} A promise that resolves when the lock has been released
   */
  static async release(err?: Error) {
    return Transaction.getLock().release(err);
  }

  /**
   * @description Releases the transaction instance once
   * @summary Ensures the underlying lock is released at most a single time for the transaction
   * @param {Error} [err] - Optional error to propagate to the lock implementation
   * @return {Promise<void>} Resolves once the lock release call finishes or immediately when already released
   */
  async release(err?: Error) {
    if (this.released) return;
    // Flag first so re-entrant or concurrent calls cannot release twice
    this.released = true;
    await Transaction.release(err);
  }

  /**
   * @description Retrieves transaction metadata
   * @summary Returns a shallow copy of the metadata associated with this transaction, so the stored array itself cannot be modified by callers
   * @return {any[] | undefined} A copy of the transaction metadata or undefined if no metadata exists
   */
  getMetadata() {
    return this.metadata ? [...this.metadata] : undefined;
  }

  /**
   * @description Resolves (and caches) the transactional metadata of a constructor
   * @summary Collects the transactional method names, the non-reserved property keys, and for each property whether its design type is transactional. Results are cached per constructor.
   * @param {any} target - The constructor to inspect
   * @return The cached `{ methods, propertyKeys, propertyDesignTypes }` record
   */
  private static getTransactionalMetadata(target: any) {
    let cached = this.metadataCache.get(target);
    if (cached) return cached;
    const reservedProps = Transaction.getReservedProps();
    const methods = (Metadata.transactionals(target) as string[]) ?? [];
    const propertyKeys = (Metadata.properties(target) || []).filter(
      (prop) => !reservedProps.has(prop)
    );
    const propertyDesignTypes = new Map<string, boolean>();
    propertyKeys.forEach((prop) => {
      const type = Metadata.type(target, prop);
      propertyDesignTypes.set(prop, !!type && Metadata.isTransactional(type));
    });
    cached = { methods, propertyKeys, propertyDesignTypes };
    this.metadataCache.set(target, cached);
    return cached;
  }

  /**
   * @description Links a new transaction to the current one
   * @summary Appends the next transaction's logs to this one, redirects the next transaction's binding functions to this transaction, and adopts the next transaction's action as this transaction's action
   * @param {Transaction} nextTransaction - The new transaction to bind to the current one
   * @return {void}
   */
  bindTransaction(nextTransaction: Transaction<any>) {
    this.log
      .for(this.bindTransaction)
      .verbose(`Binding the ${nextTransaction.toString()} to ${this}`);
    this.logs.push(...nextTransaction.logs);
    // NOTE(review): `bindTransaction` is replaced with `bindToTransaction`
    // (not `bindTransaction`). Behaviour kept as-is; confirm this is intended.
    nextTransaction.bindTransaction = this.bindToTransaction.bind(this);
    nextTransaction.bindToTransaction = this.bindToTransaction.bind(this);
    this.action = nextTransaction.action;
  }

  /**
   * @description Binds an object to the current transaction context
   * @summary Wraps the object in a proxy whose transactional methods automatically receive the current transaction as their first argument, and whose transactional properties are themselves bound on access. Values that cannot be wrapped (null, undefined, primitives, objects without a constructor) and objects without transactional methods are returned unchanged.
   * @param {any} obj - The object to bind to the transaction
   * @return {any} The bound proxy, or the original value when there is nothing to bind
   */
  bindToTransaction(obj: any): any {
    // Unset transactional properties reach this method through the proxy's
    // `get` trap; there is nothing to wrap, so hand the value straight back.
    if (obj === null || obj === undefined) return obj;
    const log = this.log.for(this.bindToTransaction);
    log.verbose(
      `Binding object ${getObjectName(obj)} to transaction ${this.id}`
    );
    // Primitives cannot be proxied, and constructor-less objects cannot be
    // used as metadata cache keys.
    if (typeof obj !== "object" && typeof obj !== "function") return obj;
    if (typeof obj.constructor !== "function") return obj;

    const metadata = Transaction.getTransactionalMetadata(obj.constructor);
    const transactionalMethods = metadata.methods;
    if (!transactionalMethods.length) return obj;

    const reservedProps = Transaction.getReservedProps();
    const props = new Set<string>(metadata.propertyKeys);
    Object.getOwnPropertyNames(obj).forEach((prop) => {
      if (!reservedProps.has(prop)) props.add(prop);
    });
    // A property is transactional when its design type says so, or when its
    // current value is an instance of a transactional class.
    const transactionProps: string[] = Array.from(props).filter((p) => {
      if (metadata.propertyDesignTypes.get(p)) return true;
      const value = (obj as Record<string, unknown>)[p];
      return !!(
        value &&
        (typeof value === "object" || typeof value === "function") &&
        Metadata.isTransactional(value.constructor as any)
      );
    });
    log.debug(
      `found transaction methods: ${transactionalMethods.join(", ")} and properties: ${transactionProps.join(", ")}`
    );

    const methodNames = new Set<string>(transactionalMethods);
    const propNames = new Set<string>(transactionProps);
    const boundObj = new Proxy(obj, {
      get: (target, prop, receiver) => {
        if (methodNames.has(prop as string))
          return new Proxy(target[prop as keyof typeof target] as any, {
            // Prepend this transaction to the arguments of every call
            apply: (methodTarget, thisArg, argArray) =>
              Reflect.apply(methodTarget, thisArg, [this, ...argArray]),
          });
        if (propNames.has(prop as string))
          return this.bindToTransaction(target[prop as keyof typeof target]);
        return Reflect.get(target, prop, receiver);
      },
    });
    // The proxy defines no `set` trap, so the assignments below are applied
    // to the wrapped object itself.
    boundObj[DBKeys.ORIGINAL as keyof typeof boundObj] =
      obj[DBKeys.ORIGINAL] || obj;
    boundObj.toString = () =>
      getObjectName(boundObj[DBKeys.ORIGINAL as keyof typeof boundObj]) +
      " proxy for transaction " +
      this.id;
    (boundObj as any).__transactionProxy = true;
    (boundObj as any).__transactionTarget =
      (obj as any).__transactionTarget || obj;
    Transaction.contexts.set(boundObj, this);
    return boundObj;
  }

  /**
   * @description Applies the global timeout to the provided Promise, if configured
   * @summary When the timeout elapses first, the transaction is released with a TimeoutError and the returned promise rejects with it; the underlying execution is not cancelled
   * @param {Promise<R>} execution - Transaction execution promise
   * @return {Promise<R>} Promise that respects the configured global timeout
   * @private
   */
  private applyGlobalTimeout(execution: Promise<R>): Promise<R> {
    if (Transaction.globalTimeout <= 0) return execution;
    const timeoutMs = Transaction.globalTimeout;
    const log = this.log.for(this.applyGlobalTimeout);
    return new Promise<R>((resolve, reject) => {
      let settled = false;
      const timer = setTimeout(() => {
        if (settled) return;
        const error = new TimeoutError(
          `Transaction ${this.toString()} exceeded timeout of ${timeoutMs}ms`
        );
        log.warn(error.message);
        this.release(error).catch((releaseErr) =>
          log.error(releaseErr as Error)
        );
        reject(error);
      }, timeoutMs);
      const finish = () => {
        settled = true;
        clearTimeout(timer);
      };
      execution.then(
        (value) => {
          finish();
          resolve(value);
        },
        (err: unknown) => {
          finish();
          reject(err);
        }
      );
    });
  }

  /**
   * @description Executes the transaction action
   * @summary Fires the transaction by executing its associated action function (subject to the global timeout, when configured). The outcome of the first call also settles the promise returned by `wait()`.
   * @return {Promise<R>} The result of the transaction action
   * @throws {Error} Synchronously, when no action is defined
   */
  fire(): Promise<R> {
    if (!this.action) throw new Error(`Missing the method`);
    const executeAction = async () => {
      return this.action ? await this.action() : (undefined as R);
    };
    const baseExecution = executeAction();
    const execution =
      Transaction.globalTimeout > 0
        ? this.applyGlobalTimeout(baseExecution)
        : baseExecution;
    if (!this.initialFireDispatched) {
      this.initialFireDispatched = true;
      // Mirror the outcome onto the completion promise. Using the two-argument
      // form of `then` (and not rethrowing) keeps this side chain from turning
      // into an unhandled rejection; the caller still gets `execution` itself.
      void execution.then(
        (result) => {
          this.resolveCompletion?.(result);
        },
        (err: unknown) => {
          this.rejectCompletion?.(err);
        }
      );
    }
    return execution;
  }

  /**
   * @description Provides a string representation of the transaction
   * @summary Overrides the default toString method to provide a formatted string representation of the transaction, optionally including the transaction ID and log
   * @param {boolean} [withId=true] - Whether to include the transaction ID in the output
   * @param {boolean} [withLog=false] - Whether to include the transaction log in the output
   * @return {string} A string representation of the transaction
   */
  override toString(withId = true, withLog = false) {
    return `${withId ? `[${this.id}]` : ""}[Transaction][${this.source}.${this.method}${
      withLog ? `]\nTransaction Log:\n${this.logs.join("\n")}` : "]"
    }`;
  }

  /**
   * @description Finds the transaction a bound object belongs to
   * @summary Looks up the transaction registered for a proxy created by `bindToTransaction`
   * @param {any} context - The candidate bound object
   * @return {Transaction | undefined} The owning transaction, or undefined when the value is not flagged as a transaction proxy or has no registered transaction
   */
  static contextTransaction(context: any): Transaction<any> | undefined {
    if (!context || !(context as any).__transactionProxy) {
      return undefined;
    }
    return this.contexts.get(context);
  }

  /**
   * @description Waits for the transaction to complete
   * @return {Promise<R>} Promise settled with the outcome of the first `fire()` call
   */
  wait(): Promise<R> {
    return this.completion;
  }

  /**
   * @description Resolves a display name for a target
   * @summary Delegates to `getObjectName`, caching the result for objects and functions so repeated lookups are cheap
   * @param {any} target - The value to describe
   * @return {string} The name used for the target in logs and transaction labels
   */
  private static describeTarget(target: any): string {
    const cacheable =
      target !== null &&
      (typeof target === "object" || typeof target === "function");
    // Only objects and functions are valid WeakMap keys
    if (!cacheable) return getObjectName(target);
    const key = target as object;
    const known = objectNameCache.get(key);
    if (known) return known;
    const name = getObjectName(target);
    objectNameCache.set(key, name);
    return name;
  }
}
Source