Source

client/FabricClientDispatch.ts

import {
  Adapter,
  ContextualArgs,
  Dispatch,
  EventIds,
  UnsupportedError,
} from "@decaf-ts/core";
import { PeerConfig } from "../shared/types";
import { Client } from "@grpc/grpc-js";
import {
  FabricClientAdapter,
  FabricClientContext,
} from "./FabricClientAdapter";
import {
  BulkCrudOperationKeys,
  InternalError,
  OperationKeys,
} from "@decaf-ts/db-decorators";
import {
  ChaincodeEvent,
  CloseableAsyncIterable,
} from "@hyperledger/fabric-gateway";
import { parseEventName } from "../shared/events";
import { Model } from "@decaf-ts/decorator-validation";
import { Constructor } from "@decaf-ts/decoration";

/**
 * @description Event dispatcher for Hyperledger Fabric chaincode events
 * @summary Listens for and processes events emitted by Fabric chaincode, dispatching them to registered observers
 * @see PeerConfig - connection settings (channel, chaincode name, MSP id) are read from the observed adapter's config
 * @param client - gRPC client for connecting to the Fabric network
 * @class FabricClientDispatch
 * @example
 * ```typescript
 * // Create a new FabricClientDispatch instance
 * // (`client` is a @grpc/grpc-js Client connected to the target peer)
 * const dispatch = new FabricClientDispatch(client);
 *
 * // Observe a FabricClientAdapter: this opens the chaincode event stream for the
 * // adapter's channel/chaincode and starts listening in the background
 * dispatch.observe(adapter);
 *
 * // Incoming chaincode events are forwarded to adapter.refresh(model, event, id)
 *
 * // Stop listening for events and release the event stream
 * await dispatch.close();
 * ```
 * @mermaid
 * sequenceDiagram
 *   participant Client
 *   participant FabricDispatch
 *   participant Gateway
 *   participant Network
 *   participant Chaincode
 *
 *   Client->>FabricDispatch: new FabricClientDispatch(client)
 *   Client->>FabricDispatch: observe(adapter)
 *   FabricDispatch->>FabricDispatch: initialize()
 *   FabricDispatch->>Gateway: getGateway(config, client)
 *   Gateway->>Network: getNetwork(channel)
 *   Network->>Network: getChaincodeEvents(chaincodeName)
 *   FabricDispatch->>FabricDispatch: handleEvents()
 *   loop For each event
 *     Chaincode-->>FabricDispatch: ChaincodeEvent
 *     FabricDispatch->>FabricDispatch: parseEventName(eventName)
 *     FabricDispatch->>FabricDispatch: parsePayload(payload)
 *     FabricDispatch->>FabricDispatch: updateObservers(model, event, id, ctx)
 *     FabricDispatch-->>Client: adapter.refresh(model, event, id)
 *   end
 */
export class FabricClientDispatch extends Dispatch<FabricClientAdapter> {
  /**
   * @description Event listening stack for chaincode events
   * @summary Set by {@link initialize}; cleared again by {@link close}
   */
  private listeningStack?: CloseableAsyncIterable<ChaincodeEvent>;

  /**
   * @description Text decoder for converting event payloads from bytes to strings
   */
  private decoder = new TextDecoder("utf8");

  /**
   * @description Creates a new FabricClientDispatch instance
   * @summary Initializes a dispatcher for Fabric chaincode events
   * @param {Client} client - gRPC client for connecting to the Fabric network
   */
  constructor(protected client: Client) {
    super();
  }

  /**
   * @description Closes the event listening connection
   * @summary Stops listening for chaincode events and releases the event stream.
   * Safe to call repeatedly: the stream reference is dropped before closing, so a
   * second call is a no-op.
   * @return {Promise<void>} Promise that resolves when the connection is closed
   */
  override async close(): Promise<void> {
    const stack = this.listeningStack;
    if (!stack) return;
    // Clear first so a stale, already-closed stream is never reused or closed twice
    this.listeningStack = undefined;
    stack.close();
  }

  /**
   * @description Parses event payload from binary format
   * @summary Decodes a UTF-8 Uint8Array containing JSON and returns the parsed object
   * @param {Uint8Array} jsonBytes - The binary payload from the chaincode event
   * @return {{ id: string }} The parsed payload containing the record ID
   * @throws {SyntaxError} When the bytes are not valid JSON (including an empty payload)
   * @throws {InternalError} When the JSON value is not an object
   */
  private parsePayload(jsonBytes: Uint8Array): { id: string } {
    const json = this.decoder.decode(jsonBytes);
    const parsed: unknown = JSON.parse(json);
    // `null` and primitives are valid JSON but would blow up on `payload.id` later
    if (typeof parsed !== "object" || parsed === null)
      throw new InternalError(
        `Invalid event payload: expected a JSON object but received ${json}`
      );
    return parsed as { id: string };
  }

  /**
   * @description Starts observing an adapter
   * @summary Binds this dispatch to the adapter, loads the adapter's models and
   * kicks off {@link initialize} in the background. Because this method returns
   * void, initialization failures are logged rather than thrown.
   * @param {FabricClientAdapter} observer - The adapter to observe
   * @return {void}
   * @throws {UnsupportedError} When the observer is not a FabricClientAdapter
   */
  override observe(observer: FabricClientAdapter): void {
    if (!(observer instanceof FabricClientAdapter))
      throw new UnsupportedError(
        "Only FabricClientAdapter can be observed by dispatch"
      );
    this.adapter = observer;
    this.models = Adapter.models(observer.alias);
    this.initialize()
      .then(() =>
        this.log.verbose(`Dispatch initialized for ${observer.alias} adapter`)
      )
      // Without this handler a gateway/connection failure would surface as an
      // unhandled promise rejection
      .catch((e: unknown) =>
        this.log
          .for(this.observe)
          .error(
            `Failed to initialize dispatch for ${observer.alias} adapter: ${e}`
          )
      );
  }

  /**
   * @description Updates observers about a database event
   * @summary Forwards the event to the observed adapter's `refresh`. When no
   * adapter is being observed the call is logged and skipped.
   * @param {Constructor<any>|string} model - The model constructor, or the table name, where the change occurred
   * @param {OperationKeys|BulkCrudOperationKeys|string} event - The type of operation that occurred
   * @param {EventIds} id - The identifier(s) of the affected record(s)
   * @param {ContextualArgs<FabricClientContext>} args - Additional arguments, including the context
   * @return {Promise<void>} A promise that resolves when the adapter has been refreshed
   * @throws {InternalError} When the adapter's `refresh` fails
   */
  override async updateObservers(
    model: Constructor<any> | string,
    event: OperationKeys | BulkCrudOperationKeys | string,
    id: EventIds,
    ...args: ContextualArgs<FabricClientContext>
  ): Promise<void> {
    const { log, ctxArgs } = this.logCtx(args, this.updateObservers);
    if (!this.adapter) {
      log.verbose(
        `No adapter observed for dispatch; skipping observer update for ${typeof model === "string" ? model : Model.tableName(model)}:${event}`
      );
      return;
    }
    try {
      await this.adapter.refresh(model, event, id, ...ctxArgs);
    } catch (e: unknown) {
      throw new InternalError(`Failed to refresh dispatch: ${e}`);
    }
  }

  /**
   * @description Processes incoming chaincode events
   * @summary Iterates the chaincode event stream and forwards each event to
   * {@link updateObservers}. Events owned by a different MSP are skipped. A
   * failure on a single event (unparseable payload, unresolvable model, failing
   * observer update) is logged and the loop continues; only a failure of the
   * stream itself stops listening and closes the connection.
   * @param {FabricClientContext} [ctxArg] - Context to use for observer updates; when omitted a READ context is created from the adapter
   * @return {Promise<void>} Promise that resolves when event handling stops
   * @throws {InternalError} When the event stream or the adapter/config is missing
   * @mermaid
   * sequenceDiagram
   *   participant FabricDispatch
   *   participant EventStack
   *   participant EventParser
   *   participant Observers
   *
   *   FabricDispatch->>FabricDispatch: handleEvents()
   *   FabricDispatch->>EventStack: for await (const evt of listeningStack)
   *   EventStack-->>FabricDispatch: ChaincodeEvent
   *   FabricDispatch->>EventParser: parseEventName(evt.eventName)
   *   EventParser-->>FabricDispatch: { table, event, owner }
   *   FabricDispatch->>FabricDispatch: Check if event is for this MSP
   *   FabricDispatch->>FabricDispatch: parsePayload(evt.payload)
   *   FabricDispatch->>Observers: updateObservers(model, event, payload.id, ctx)
   */
  protected async handleEvents(
    ctxArg?: FabricClientContext
  ): Promise<void> {
    // Capture the stream once so a concurrent close() cannot swap it mid-setup
    const stack = this.listeningStack;
    if (!stack)
      throw new InternalError(
        `Event stack not initialized. Ensure that "startListening" is called before attempting this operation.`
      );

    const adapter = this.adapter;
    if (!adapter || !adapter.config)
      throw new InternalError(`No adapter found. should be impossible`);

    const ctx =
      ctxArg ??
      (await adapter.context(
        OperationKeys.READ,
        {
          correlationId: adapter.config.chaincodeName,
        },
        (this.models && this.models[0]) || (Model as unknown as Constructor)
      ));
    const log = this.log.for(this.handleEvents);

    log.info(
      `Listening for incoming events on chaincode "${adapter.config.chaincodeName}" on channel "${adapter.config.channel}"...`
    );

    try {
      for await (const evt of stack) {
        const { table, event, owner } = parseEventName(evt.eventName);
        if (owner && owner !== adapter.config?.mspId) continue;

        // Parse separately: a single bad payload must not tear down the listener
        let payload: { id: string };
        try {
          payload = this.parsePayload(evt.payload);
        } catch (e: unknown) {
          log.error(
            `Failed to parse payload for table ${table} event ${event}: ${e}`
          );
          continue;
        }

        try {
          // Fall back to the first registered model when the event carries no table
          const modelName = table || this.models?.[0]?.name;
          if (!modelName)
            throw new InternalError(
              `No table in event name "${evt.eventName}" and no models registered for the adapter`
            );
          const modelRef = Model.get(modelName) ?? modelName;
          await this.updateObservers(
            modelRef as Constructor | string,
            event,
            payload.id as string,
            ctx
          );
        } catch (e: unknown) {
          log.error(
            `Failed update observables for table ${table} event ${event} id: ${payload.id}: ${e}`
          );
        }
      }
    } catch (e: unknown) {
      log.error(
        `Failed to read event for chaincode "${adapter.config.chaincodeName}" on channel "${adapter.config.channel}": ${e}`
      );
      await this.close();
    }
  }

  /**
   * @description Initializes the event listener
   * @summary Obtains a gateway for the adapter's peer configuration, opens the
   * chaincode event stream for the configured channel/chaincode and starts
   * {@link handleEvents} in the background (it is intentionally not awaited,
   * since it only returns when the stream ends).
   * @return {Promise<void>} Promise that resolves once the event stream is open
   * @throws {InternalError} When no adapter (or adapter config) is being observed
   */
  protected override async initialize(): Promise<void> {
    const adapter = this.adapter;
    if (!adapter || !adapter.config)
      throw new InternalError(`No adapter or config observed for dispatch`);
    const gateway = await FabricClientAdapter.getGateway(
      adapter.config as PeerConfig,
      this.client
    );
    const network = gateway.getNetwork(adapter.config.channel);
    this.listeningStack = await network.getChaincodeEvents(
      adapter.config.chaincodeName
    );
    // Fire-and-forget, but never let a setup failure become an unhandled rejection
    void this.handleEvents().catch((e: unknown) =>
      this.log
        .for(this.initialize)
        .error(
          `Event handling stopped for chaincode "${adapter.config.chaincodeName}": ${e}`
        )
    );
  }
}

// Register this class as the adapter's base dispatch implementation.
// Bracket access is used to assign `_baseDispatch` by key.
// NOTE(review): the truthiness guard presumably protects against a circular
// import leaving FabricClientAdapter undefined at load time — confirm.
if (FabricClientAdapter) {
  FabricClientAdapter["_baseDispatch"] = FabricClientDispatch;
}