import { ChildProcess, fork } from "node:child_process";
import { join } from "node:path";
import { LoggedClass } from "@decaf-ts/logging";
import { TestReporter } from "./TestReporter";
/**
* @description Store for logs indexed by identifier.
* @summary A record where keys are identifiers and values are arrays of log strings.
* @typedef {Record<number, string[]>} LogStore
* @memberOf module:utils
*/
type LogStore = Record<number, string[]>;
/**
* @description Structure of a parsed log entry.
* @summary Contains timestamp, child identifier, and action.
* @interface ParsedLog
* @property {number} timestamp - The timestamp of the log.
* @property {string} child - The child identifier.
* @property {string} action - The action performed.
* @memberOf module:utils
*/
export interface ParsedLog {
timestamp: number;
child: string;
action: string;
}
/**
* @description Result of a comparison between consumer and producer logs.
* @summary Contains arrays of parsed logs for both consumer and producer.
* @interface ComparerResult
* @property {ParsedLog[]} consumer - The parsed consumer logs.
* @property {ParsedLog[]} producer - The parsed producer logs.
* @memberOf module:utils
*/
export interface ComparerResult {
consumer: ParsedLog[];
producer: ParsedLog[];
}
/**
* @description Function type for comparing consumer and producer data.
* @summary Compares two LogStores and returns a Promise resolving to ComparerResult.
* @typedef {function(LogStore, LogStore): Promise<ComparerResult>} Comparer
* @memberOf module:utils
*/
type Comparer = (
consumerData: LogStore,
producerData: LogStore
) => Promise<ComparerResult>;
/**
* @description Function type for handling consumer actions.
* @summary A function that takes an identifier and optional arguments, returning a result.
* @typedef {function(number, ...unknown[]): unknown | Promise<unknown>} ConsumerHandler
* @memberOf module:utils
*/
type ConsumerHandler = (
identifier: number,
...args: unknown[]
) => unknown | Promise<unknown>;
/**
* @description Message structure sent to the producer child process.
* @summary Defines the properties of a message sent to control the producer.
* @interface ProducerMessage
* @property {number} identifier - The identifier of the producer.
* @property {string[]} [result] - Optional result logs.
* @property {unknown[]} [args] - Optional arguments.
* @property {string} action - The action to perform.
* @property {number} [timeout] - Optional timeout.
* @property {number} times - Number of times to repeat.
* @property {boolean} [random] - Whether to use random timeouts.
* @memberOf module:utils
*/
interface ProducerMessage {
identifier: number;
result?: string[];
args?: unknown[];
action: string;
timeout?: number;
times: number;
random?: boolean;
}
/**
* @description Parses a log string into a ParsedLog object.
* @summary Splits the log string by " - " and extracts timestamp, child, and action.
* @param {string} data - The log string to parse.
* @return {ParsedLog} The parsed log object.
* @function parseData
* @memberOf module:utils
*/
const parseData = (data: string): ParsedLog => {
const [timestamp, , child, action] = data.split(" - ");
return {
timestamp: parseInt(timestamp, 10),
child,
action,
};
};
/**
* @description Default comparer function for consumer and producer logs.
* @summary Sorts and compares consumer and producer logs to ensure they match.
* @param {LogStore} consumerData - The consumer logs.
* @param {LogStore} producerData - The producer logs.
* @return {Promise<ComparerResult>} The comparison result.
* @function defaultComparer
* @memberOf module:utils
*/
export const defaultComparer: Comparer = async (consumerData, producerData) => {
const sortedConsumerData = Object.keys(consumerData)
.reduce<ParsedLog[]>((accum, key) => {
const identifier = Number(key);
const entries = consumerData[identifier] ?? [];
accum.push(...entries.map((entry) => parseData(entry)));
return accum;
}, [])
.sort((a, b) => a.timestamp - b.timestamp);
const sortedProducerData = Object.keys(producerData)
.reduce<ParsedLog[]>((accum, key) => {
const identifier = Number(key);
const entries = producerData[identifier] ?? [];
accum.push(...entries.map((entry) => parseData(entry)));
return accum;
}, [])
.sort((a, b) => a.timestamp - b.timestamp);
if (sortedProducerData.length !== sortedConsumerData.length) {
throw new Error("Producer data and consumer data does not match in length");
}
let counter = -1;
const isMatching = sortedProducerData.every((producer, index) => {
counter = index;
const consumer = sortedConsumerData[index];
return (
producer.child === consumer.child && producer.action === consumer.action
);
});
if (!isMatching) {
const errorLines = [
`Producer data and consumer data do not sort the same way as of record ${counter}:`,
" | CONSUMER | PRODUCER |",
" | id | action | timestamp | id | action | timestamp |",
];
sortedProducerData.forEach((producer, index) => {
if (index < counter || index > counter + 15) {
return;
}
const consumer = sortedConsumerData[index];
errorLines.push(
` ${index < 10 ? `0${index}` : index}| ${consumer.child} | ${consumer.action} | ${consumer.timestamp} | ${producer.child} | ${producer.action} | ${producer.timestamp} |`
);
});
throw new Error(errorLines.join("\n"));
}
return {
consumer: sortedConsumerData,
producer: sortedProducerData,
};
};
/**
* @description Options for the reporting comparer.
* @summary Configuration options for the reportingComparer function.
* @interface ReportingComparerOptions
* @property {TestReporter} [reporter] - The test reporter instance.
* @property {string} [testCase] - The test case name.
* @property {string} [referencePrefix] - The prefix for report references.
* @memberOf module:utils
*/
export interface ReportingComparerOptions {
reporter?: TestReporter;
testCase?: string;
referencePrefix?: string;
}
/**
* @description Formats a timestamp into an ISO string.
* @summary Converts a numeric timestamp to an ISO 8601 string.
* @param {number} value - The timestamp to format.
* @return {string} The formatted date string.
* @function formatTimestamp
* @memberOf module:utils
*/
const formatTimestamp = (value: number): string =>
new Date(value).toISOString();
/**
* @description Comparer function that reports results using TestReporter.
* @summary Compares logs and generates a report with tables and messages.
* @param {LogStore} consumerData - The consumer logs.
* @param {LogStore} producerData - The producer logs.
* @param {ReportingComparerOptions} [options] - Options for reporting.
* @return {Promise<ComparerResult>} The comparison result.
* @function reportingComparer
* @memberOf module:utils
*/
export const reportingComparer = async (
consumerData: LogStore,
producerData: LogStore,
options?: ReportingComparerOptions
): Promise<ComparerResult> => {
const reporter =
options?.reporter ??
new TestReporter(options?.testCase ?? "consumer-producer");
const referencePrefix = options?.referencePrefix ?? "consumer-producer";
try {
const comparison = await defaultComparer(consumerData, producerData);
const rows = comparison.consumer.map((consumerEntry, index) => {
const producerEntry = comparison.producer[index];
return {
Index: `${index}`,
"Consumer Child": consumerEntry.child,
"Consumer Action": consumerEntry.action,
"Consumer Timestamp": formatTimestamp(consumerEntry.timestamp),
"Producer Child": producerEntry?.child ?? "N/A",
"Producer Action": producerEntry?.action ?? "N/A",
"Producer Timestamp": producerEntry
? formatTimestamp(producerEntry.timestamp)
: "N/A",
};
});
await Promise.allSettled([
reporter.reportMessage(
`${referencePrefix}-comparison`,
`Consumer and producer logs matched (${comparison.consumer.length} entries).`
),
reporter.reportTable(`${referencePrefix}-logs`, {
headers: [
"Index",
"Consumer Child",
"Consumer Action",
"Consumer Timestamp",
"Producer Child",
"Producer Action",
"Producer Timestamp",
],
rows,
}),
]);
return comparison;
} catch (error) {
const message =
error instanceof Error ? error.message : String(error ?? "Unknown error");
await Promise.allSettled([
reporter.reportMessage(`${referencePrefix}-mismatch`, message),
reporter.reportObject(`${referencePrefix}-consumer`, consumerData),
reporter.reportObject(`${referencePrefix}-producer`, producerData),
]);
throw error;
}
};
/**
* @class ConsumerRunner
* @description Runs a consumer process and manages producer child processes.
* @summary Orchestrates the execution of consumer and producer processes, collects logs, and compares results.
* @param {string} action - The action name.
* @param {ConsumerHandler} consumerHandler - The handler function for the consumer.
* @param {Comparer} [compareHandler] - Optional custom comparer function.
* @memberOf module:utils
*/
export class ConsumerRunner extends LoggedClass {
private readonly action: string;
private readonly handler: ConsumerHandler;
private readonly comparerHandle: Comparer;
private forkedCache: ChildProcess[] | undefined = [];
private consumerResults: LogStore = {};
private producerResults: LogStore = {};
private childExitPromises: Array<Promise<void>> = [];
private completionTriggered = false;
private producerCompletion = 0;
private consumerCompletion = 0;
private expectedIterations = 0;
private activeHandlers = 0;
constructor(
action: string,
consumerHandler: ConsumerHandler,
compareHandler?: Comparer
) {
super();
this.action = action;
this.handler = consumerHandler;
this.comparerHandle = compareHandler ?? defaultComparer;
this.reset();
}
private reset(): void {
this.forkedCache = [];
this.consumerResults = {};
this.producerResults = {};
this.completionTriggered = false;
this.childExitPromises = [];
this.activeHandlers = 0;
this.producerCompletion = 0;
this.consumerCompletion = 0;
this.expectedIterations = 0;
}
private waitForChildExit(): Promise<void> {
if (!this.childExitPromises?.length) {
return Promise.resolve();
}
const exits = [...this.childExitPromises];
this.childExitPromises = [];
return Promise.allSettled(exits).then(() => void 0);
}
private store(
identifier: number,
action: string,
timeout: number | undefined,
times: number,
count: number,
random?: boolean
): void {
const logParts: Array<string | number | boolean> = [
Date.now(),
"PRODUCER",
identifier,
action,
];
if (timeout) {
logParts.push(timeout);
}
if (times && count) {
logParts.push(`${count}/${times}`, random ?? false);
}
const log = logParts.join(" - ");
if (!this.producerResults[identifier]) {
this.producerResults[identifier] = [];
}
const logs = this.producerResults[identifier];
logs.push(log);
const totalTimes = times ?? this.expectedIterations;
if (totalTimes > 0 && logs.length === totalTimes) {
this.producerCompletion += 1;
}
}
private recordConsumer(identifier: number, times?: number): void {
const logParts: Array<string | number> = [
Date.now(),
"CONSUMER",
identifier,
this.action,
];
const log = logParts.join(" - ");
if (!this.consumerResults[identifier]) {
this.consumerResults[identifier] = [];
}
const logs = this.consumerResults[identifier];
logs.push(log);
const totalTimes = times ?? this.expectedIterations;
if (totalTimes > 0 && logs.length === totalTimes) {
this.consumerCompletion += 1;
}
}
private isProducerComplete(count: number): boolean {
return this.producerCompletion >= count;
}
private isConsumerComplete(count: number): boolean {
return this.consumerCompletion >= count;
}
private terminateChildren(forceKill = false): Promise<void> {
if (!this.forkedCache) {
return this.waitForChildExit();
}
const cached = this.forkedCache;
this.forkedCache = undefined;
cached.forEach((forked, index) => {
if (!forked.connected && !forceKill) {
return;
}
try {
forked.send({
identifier: index,
terminate: true,
});
} catch {
// IPC channel already closed; nothing else to do.
}
if (forceKill && !forked.killed) {
forked.kill();
}
});
return this.waitForChildExit();
}
/**
* @description Runs the consumer and producer processes.
* @summary Starts the producer child processes and the consumer handler, then waits for completion and compares results.
* @param {number} count - The number of producers.
* @param {number} [timeout] - The timeout for producers.
* @param {number} times - The number of times to repeat.
* @param {boolean} [random] - Whether to use random timeouts.
* @return {Promise<ComparerResult>} The comparison result.
* @mermaid
* sequenceDiagram
* participant Runner as ConsumerRunner
* participant Child as ProducerChild
* participant Handler as ConsumerHandler
* participant Comparer as Comparer
* Runner->>Runner: reset()
* loop For each count
* Runner->>Child: fork()
* Runner->>Runner: Store child process
* end
* Runner->>Child: send(start message)
* loop For each message from Child
* Child->>Runner: message(action)
* Runner->>Runner: store producer log
* Runner->>Handler: call handler
* Handler-->>Runner: return
* Runner->>Runner: record consumer log
* Runner->>Runner: finalizeIfComplete()
* end
* alt Complete
* Runner->>Comparer: compare logs
* Comparer-->>Runner: return result
* Runner-->>Caller: resolve(result)
* end
*/
async run(
count: number,
timeout: number | undefined,
times: number,
random: boolean | undefined
): Promise<ComparerResult> {
this.reset();
this.expectedIterations = times;
const childPath = join(__dirname, "ProducerChildProcess.cjs");
return new Promise<ComparerResult>((resolve, reject) => {
const snapshotState = () => {
const summarize = (records: LogStore) =>
Object.keys(records).reduce<Record<string, number>>((acc, key) => {
acc[key] = records[Number(key)]?.length ?? 0;
return acc;
}, {});
return {
producers: summarize(this.producerResults),
consumers: summarize(this.consumerResults),
activeHandlers: this.activeHandlers,
};
};
const handleError = (error: unknown) => {
if (this.completionTriggered) {
return;
}
this.completionTriggered = true;
Promise.resolve(this.terminateChildren(true)).finally(() =>
reject(error)
);
};
const finalizeIfComplete = () => {
if (this.completionTriggered) {
return;
}
if (
!this.isProducerComplete(count) ||
!this.isConsumerComplete(count) ||
this.activeHandlers > 0
) {
return;
}
this.completionTriggered = true;
if (process.env.DEBUG_CONSUMER_RUNNER === "1") {
console.debug("ConsumerRunner finalize state", snapshotState());
}
try {
const comparisonPromise = Promise.resolve(
this.comparerHandle(this.consumerResults, this.producerResults)
);
Promise.all([comparisonPromise, this.waitForChildExit()])
.then(async ([comparison]) => {
await new Promise((resolveDelay) => setImmediate(resolveDelay));
resolve(comparison);
})
.catch(reject);
} catch (error) {
reject(error);
}
};
for (let identifier = 1; identifier < count + 1; identifier += 1) {
const forked = fork(childPath);
this.forkedCache?.push(forked);
this.childExitPromises?.push(
new Promise<void>((resolveChild) => {
forked.once("exit", () => resolveChild());
})
);
forked.on("error", handleError);
forked.on("message", async (message: ProducerMessage) => {
if (this.completionTriggered) {
return;
}
const {
identifier: childId,
args,
action,
timeout: childTimeout,
times: childTimes,
random: childRandom,
} = message;
this.activeHandlers += 1;
let handlerFailed = false;
if (process.env.DEBUG_CONSUMER_RUNNER === "1") {
console.debug("ConsumerRunner message:start", {
childId,
producerCount: this.producerResults[childId]?.length ?? 0,
consumerCount: this.consumerResults[childId]?.length ?? 0,
activeHandlers: this.activeHandlers,
});
}
try {
this.store(
childId,
action,
childTimeout,
childTimes,
count,
childRandom
);
const handlerArgs = Array.isArray(args) ? args : [];
await Promise.resolve(this.handler(childId, ...handlerArgs));
this.recordConsumer(childId, childTimes ?? times);
if (process.env.DEBUG_CONSUMER_RUNNER === "1") {
console.debug("ConsumerRunner message:complete", {
childId,
producerCount: this.producerResults[childId]?.length ?? 0,
consumerCount: this.consumerResults[childId]?.length ?? 0,
activeHandlers: this.activeHandlers,
});
}
} catch (error) {
handlerFailed = true;
handleError(error);
} finally {
this.activeHandlers = Math.max(0, this.activeHandlers - 1);
if (!handlerFailed) {
finalizeIfComplete();
}
}
});
}
this.forkedCache?.forEach((forked, index) => {
forked.send({
identifier: index,
action: this.action,
timeout,
times,
random,
});
});
});
}
}
Source