Source

client/FabricClientDispatch.ts

import {
  Adapter,
  ContextualArgs,
  Dispatch,
  EventIds,
  UnsupportedError,
  Context,
  MaybeContextualArg,
  PersistenceKeys,
} from "@decaf-ts/core";
import { PeerConfig } from "../shared/types";
import { Client } from "@grpc/grpc-js";
import { FabricClientAdapter } from "./FabricClientAdapter";
import {
  BulkCrudOperationKeys,
  InternalError,
  OperationKeys,
} from "@decaf-ts/db-decorators";
import {
  ChaincodeEvent,
  CloseableAsyncIterable,
} from "@hyperledger/fabric-gateway";
import { parseEventName } from "../shared/events";
import { Model } from "@decaf-ts/decorator-validation";
import { Constructor } from "@decaf-ts/decoration";
import { FabricClientFlags } from "./types";

/**
 * @description Event dispatcher for Hyperledger Fabric chaincode events
 * @summary Listens for and processes events emitted by Fabric chaincode, dispatching them to registered observers
 * @template PeerConfig - Configuration type for connecting to a Fabric peer
 * @param client - gRPC client for connecting to the Fabric network
 * @class FabricClientDispatch
 * @example
 * ```typescript
 * // Create a new FabricDispatch instance
 * const client = await FabricAdapter.getClient(peerConfig);
 * const dispatch = new FabricDispatch(client);
 *
 * // Configure the dispatch with peer configuration
 * dispatch.configure(peerConfig);
 *
 * // Register an observer for a specific table and event
 * dispatch.observe('users', 'create', (id) => {
 *   console.log(`User created: ${id}`);
 * });
 *
 * // Start listening for events
 * await dispatch.start();
 * ```
 * @mermaid
 * sequenceDiagram
 *   participant Client
 *   participant FabricDispatch
 *   participant Gateway
 *   participant Network
 *   participant Chaincode
 *
 *   Client->>FabricDispatch: new FabricDispatch(client)
 *   Client->>FabricDispatch: configure(peerConfig)
 *   Client->>FabricDispatch: observe(table, event, callback)
 *   Client->>FabricDispatch: start()
 *   FabricDispatch->>FabricDispatch: initialize()
 *   FabricDispatch->>Gateway: getGateway(config, client)
 *   Gateway->>Network: getNetwork(channel)
 *   Network->>Network: getChaincodeEvents(chaincodeName)
 *   FabricDispatch->>FabricDispatch: handleEvents()
 *   loop For each event
 *     Chaincode-->>FabricDispatch: ChaincodeEvent
 *     FabricDispatch->>FabricDispatch: parseEventName(eventName)
 *     FabricDispatch->>FabricDispatch: parsePayload(payload)
 *     FabricDispatch->>FabricDispatch: updateObservers(table, event, id)
 *     FabricDispatch-->>Client: callback(id)
 *   end
 */
export class FabricClientDispatch extends Dispatch<FabricClientAdapter> {
  /**
   * @description Event listening stack for chaincode events
   */
  private listeningStack?: CloseableAsyncIterable<ChaincodeEvent>;

  /**
   * @description Text decoder for converting event payloads from bytes to strings
   */
  private decoder = new TextDecoder("utf8");

  /**
   * @description Creates a new FabricDispatch instance
   * @summary Initializes a dispatcher for Fabric chaincode events
   * @param {Client} client - gRPC client for connecting to the Fabric network
   */
  constructor(protected client: Client) {
    super();
  }

  /**
   * @description Closes the event listening connection
   * @summary Stops listening for chaincode events and releases resources
   * @return {Promise<void>} Promise that resolves when the connection is closed
   */
  override async close(
    ...ctxArgs: ContextualArgs<Context<FabricClientFlags>>
  ): Promise<void> {
    const { log, ctxArgs: loggedArgs } = (
      await this.logCtx(ctxArgs, PersistenceKeys.SHUTDOWN, true)
    ).for(this.close);
    try {
      await super.close(...loggedArgs);
    } catch (e: unknown) {
      log.error(`Failed to close Fabric proxies event listener`, e as Error);
    }
    if (this.listeningStack) {
      try {
        await this.listeningStack.close();
      } catch (e: unknown) {
        log.error(`Failed to close Fabric event listener`, e as Error);
      } finally {
        this.listeningStack = undefined;
      }
    }
  }

  /**
   * @description Parses event payload from binary format
   * @summary Converts a Uint8Array containing JSON to an object with an id property
   * @param {Uint8Array} jsonBytes - The binary payload from the chaincode event
   * @return {{ id: string }} The parsed payload containing the record ID
   */
  private parsePayload(jsonBytes: Uint8Array): { id: string; result?: any } {
    const json = this.decoder.decode(jsonBytes);
    return JSON.parse(json);
  }

  /**
   * @description Starts observing an adapter
   * @summary Connects this dispatch to an adapter to monitor its operations
   * @param {Adapter<any, any, any, any>} observer - The adapter to observe
   * @return {void}
   */
  override observe(observer: Adapter<any, any, any, any>): () => void {
    if (!(observer instanceof FabricClientAdapter))
      throw new UnsupportedError(
        "Only FabricClientAdapter can be observed by dispatch"
      );
    super.observe(observer as FabricClientAdapter);
    return () => this.unObserve(observer);
  }

  /**
   * @description Updates observers about a database event
   * @summary Notifies observers about a change in the database
   * @param {string} table - The name of the table where the change occurred
   * @param {OperationKeys|BulkCrudOperationKeys|string} event - The type of operation that occurred
   * @param {any} payload - The event payload
   * @return {Promise<void>} A promise that resolves when all observers have been notified
   */
  override async updateObservers(
    model: Constructor<any> | string,
    event: OperationKeys | BulkCrudOperationKeys | string,
    id: EventIds,
    ...args: ContextualArgs<Context<FabricClientFlags>>
  ): Promise<void> {
    const { log, ctxArgs } = Adapter.logCtx<Context<FabricClientFlags>>(
      this.updateObservers,
      event,
      false,
      ...args
    );
    if (!this.adapter) {
      log.verbose(
        `No adapter observed for dispatch; skipping observer update for ${typeof model === "string" ? model : Model.tableName(model)}:${event}`
      );
      return;
    }
    try {
      await this.adapter.refresh(model, event, id, ...ctxArgs);
    } catch (e: unknown) {
      throw new InternalError(`Failed to refresh dispatch: ${e}`);
    }
  }

  /**
   * @description Processes incoming chaincode events
   * @summary Listens for events from the chaincode and dispatches them to registered observers
   * @return {Promise<void>} Promise that resolves when event handling stops
   * @mermaid
   * sequenceDiagram
   *   participant FabricDispatch
   *   participant EventStack
   *   participant EventParser
   *   participant Observers
   *
   *   FabricDispatch->>FabricDispatch: handleEvents()
   *   FabricDispatch->>EventStack: for await (const evt of listeningStack)
   *   EventStack-->>FabricDispatch: ChaincodeEvent
   *   FabricDispatch->>EventParser: parseEventName(evt.eventName)
   *   EventParser-->>FabricDispatch: { table, event, owner }
   *   FabricDispatch->>FabricDispatch: Check if event is for this MSP
   *   FabricDispatch->>FabricDispatch: parsePayload(evt.payload)
   *   FabricDispatch->>Observers: updateObservers(table, event, payload.id)
   *   Observers-->>FabricDispatch: Callbacks executed
   */
  protected async handleEvents(
    ctxArg?: Context<FabricClientFlags>
  ): Promise<void> {
    if (!this.listeningStack)
      throw new InternalError(
        `Event stack not initialized. Ensure that "startListening" is called before attempting this operation.`
      );

    if (!this.adapter || !this.adapter.config)
      throw new InternalError(`No adapter found. should be impossible`);

    const ctx =
      ctxArg ||
      (await this.adapter.context(
        OperationKeys.READ,
        {
          correlationId: this.adapter.config.chaincodeName,
        },
        (this.models && this.models[0]) || (Model as unknown as Constructor)
      ));
    const log = this.log.for(this.handleEvents);

    log.info(
      `Listening for incoming events on chaincode "${this.adapter.config.chaincodeName}" on channel "${this.adapter.config.channel}"...`
    );

    try {
      for await (const evt of this.listeningStack) {
        const { table, event, owner } = parseEventName(evt.eventName);
        if (
          this.adapter.config?.mspEventOnly &&
          owner &&
          owner !== this.adapter.config?.mspId
        )
          continue;
        const payload: { id: string; result?: any } = this.parsePayload(
          evt.payload
        );
        try {
          const targetModel = table
            ? Model.get(table)
            : Model.get(this.models[0].name);
          const modelRef = targetModel ?? (table || this.models[0]?.name);
          const observerArgs = payload.result ? [payload.result, ctx] : [ctx];
          await this.updateObservers(
            modelRef as Constructor | string,
            event,
            payload.id as string,
            ...(observerArgs as ContextualArgs<any>)
          );
        } catch (e: unknown) {
          log.error(
            `Failed update observables for table ${table} event ${event} id: ${payload.id}: ${e}`
          );
        }
      }
    } catch (e: any) {
      log.error(
        `Failed to read event for chaincode "${this.adapter.config.chaincodeName}" on channel "${this.adapter.config.channel}": ${e}`
      );
      await this.close(ctx);
    }
  }

  /**
   * @description Initializes the event listener
   * @summary Sets up the connection to the Fabric network and starts listening for chaincode events
   * @return {Promise<void>} Promise that resolves when initialization is complete
   */
  protected override async initialize(): Promise<void> {
    if (!this.adapter)
      throw new InternalError(`No adapter or config observed for dispatch`);
    const context = await this.adapter.context(
      "dispatch",
      {
        correlationId: this.adapter.config.chaincodeName,
      },
      Model as any
    );
    const { ctx } = this.logCtx([context], this.initialize);
    const gateway = await FabricClientAdapter.getGateway(
      ctx,
      this.adapter.config as PeerConfig,
      this.client
    );
    const network = gateway.getNetwork(this.adapter.config.channel);
    if (!this.adapter)
      throw new InternalError(`No adapter observed for dispatch`);
    this.listeningStack = await network.getChaincodeEvents(
      this.adapter.config.chaincodeName
    );
    this.handleEvents(ctx);

    // fallback for fully segregated models (the adapter doesnt sent events if so)

    (
      [
        OperationKeys.CREATE,
        OperationKeys.UPDATE,
        OperationKeys.DELETE,
        BulkCrudOperationKeys.CREATE_ALL,
        BulkCrudOperationKeys.UPDATE_ALL,
        BulkCrudOperationKeys.DELETE_ALL,
      ] as (keyof Adapter<any, any, any, any>)[]
    ).forEach((toWrap) => {
      if (!this.adapter)
        throw new InternalError(
          `No adapter provided for the fallback of fully segregated models`
        );
      if (!this.adapter[toWrap])
        throw new InternalError(
          `Method ${toWrap} not found in ${this.adapter.alias} adapter to bind Observables Dispatch`
        );

      let descriptor = Object.getOwnPropertyDescriptor(this.adapter, toWrap);
      let proto: any = this.adapter;
      while (!descriptor && proto !== Object.prototype) {
        proto = Object.getPrototypeOf(proto);
        descriptor = Object.getOwnPropertyDescriptor(proto, toWrap);
      }

      if (!descriptor || !descriptor.writable) {
        this.log.error(
          `Could not find method ${toWrap} to bind Observables Dispatch`
        );
        return;
      }
      function bulkToSingle(method: string) {
        switch (method) {
          case BulkCrudOperationKeys.CREATE_ALL:
            return OperationKeys.CREATE;
          case BulkCrudOperationKeys.UPDATE_ALL:
            return OperationKeys.UPDATE;
          case BulkCrudOperationKeys.DELETE_ALL:
            return OperationKeys.DELETE;
          default:
            return method;
        }
      }

      // @ts-expect-error because there are read only properties
      this.adapter[toWrap] = new Proxy(this.adapter[toWrap], {
        apply: async (
          target: any,
          thisArg: FabricClientAdapter,
          argArray: any[]
        ) => {
          // Run the original method unchanged so transient data is preserved
          const result = await target.apply(thisArg, argArray);

          const clazz: Constructor<any> = argArray[0];
          // Fully-public models emit a chaincode event on the contract side;
          // skip the local fallback to avoid double-notification.
          if (!Model.isTransient(clazz)) return result;

          // Context is always the last element of argArray
          const { log, ctxArgs, ctx } = thisArg["logCtx"](
            argArray.slice(argArray.length - 1),
            target
          );
          const ids = argArray[1];
          const resultArgs: any[] = [clazz, bulkToSingle(toWrap), ids];

          if (ctx.getOrUndefined("observeFullResult")) {
            resultArgs.push(result);
          }
          this.updateObservers(
            ...(resultArgs as Parameters<typeof this.updateObservers>),
            ...ctxArgs
          ).catch((e: unknown) =>
            log.error(
              `Failed to dispatch observer refresh for ${toWrap} on ${clazz.name || clazz} for ${ids}: ${e}`
            )
          );
          return result;
        },
      });
    });
  }
}

if (FabricClientAdapter)
  FabricClientAdapter["_baseDispatch"] = FabricClientDispatch;