All files / src/persistence Dispatch.ts

72% Statements 36/50
40% Branches 6/15
81.81% Functions 9/11
72% Lines 36/50

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 1279x             9x 9x 9x     9x               7x 1x 7x           1x   1x   1x                 6x         6x 6x 6x 9x 9x     6x             3x               3x       6x   3x 3x 3x   3x     3x             3x             1x   1x 1x 1x 1x 1x                               3x   3x 3x            
import {
  InternalError,
  OperationKeys,
  BulkCrudOperationKeys,
} from "@decaf-ts/db-decorators";
import { ModelConstructor } from "@decaf-ts/decorator-validation";
import { Observable, Observer } from "../interfaces";
import { Adapter } from "./Adapter";
import { UnsupportedError } from "./errors";
import { Logger, Logging } from "@decaf-ts/logging";
import { EventIds } from "./types";
 
export class Dispatch<Y> implements Observable {
  protected adapter?: Adapter<Y, any, any, any>;
  protected native?: Y;
  protected models!: ModelConstructor<any>[];
 
  private logger!: Logger;
 
  protected get log() {
    if (!this.logger)
      this.logger = Logging.for(this as any).for(this.adapter as any);
    return this.logger;
  }
 
  constructor() {}
 
  protected initialize(): void {
    Iif (!this.adapter)
      throw new InternalError(`No adapter observed for dispatch`);
    const adapter = this.adapter as Adapter<Y, any, any, any>;
    (
      [
        OperationKeys.CREATE,
        OperationKeys.UPDATE,
        OperationKeys.DELETE,
        BulkCrudOperationKeys.CREATE_ALL,
        BulkCrudOperationKeys.UPDATE_ALL,
        BulkCrudOperationKeys.DELETE_ALL,
      ] as (keyof Adapter<Y, any, any, any>)[]
    ).forEach((method) => {
      Iif (!adapter[method])
        throw new InternalError(
          `Method ${method} not found in ${adapter.alias} adapter to bind Observables Dispatch`
        );
 
      let descriptor = Object.getOwnPropertyDescriptor(adapter, method);
      let proto: any = adapter;
      while (!descriptor && proto !== Object.prototype) {
        proto = Object.getPrototypeOf(proto);
        descriptor = Object.getOwnPropertyDescriptor(proto, method);
      }
 
      Iif (!descriptor || !descriptor.writable) {
        this.log.error(
          `Could not find method ${method} to bind Observables Dispatch`
        );
        return;
      }
      function bulkToSingle(method: string) {
        switch (method) {
          case BulkCrudOperationKeys.CREATE_ALL:
            return OperationKeys.CREATE;
          case BulkCrudOperationKeys.UPDATE_ALL:
            return OperationKeys.UPDATE;
          case BulkCrudOperationKeys.DELETE_ALL:
            return OperationKeys.DELETE;
          default:
            return method;
        }
      }
      // @ts-expect-error because there are read only properties
      adapter[method] = new Proxy(adapter[method], {
        apply: async (target: any, thisArg, argArray: any[]) => {
          const [tableName, ids] = argArray;
          const result = await target.apply(thisArg, argArray);
          this.updateObservers(tableName, bulkToSingle(method), ids as EventIds)
            .then(() => {
              this.log.verbose(
                `Observer refresh dispatched by ${method} for ${tableName}`
              );
              this.log.debug(`pks: ${ids}`);
            })
            .catch((e: unknown) =>
              this.log.error(
                `Failed to dispatch observer refresh for ${method} on ${tableName}: ${e}`
              )
            );
          return result;
        },
      });
    });
  }
 
  observe(observer: Adapter<Y, any, any, any>): void {
    Iif (!(observer instanceof Adapter))
      throw new UnsupportedError("Only Adapters can be observed by dispatch");
    this.adapter = observer;
    this.native = observer.native;
    this.models = Adapter.models(this.adapter.alias);
    this.initialize();
    this.log.verbose(`Dispatch initialized for ${this.adapter.alias} adapter`);
  }
 
  unObserve(observer: Observer): void {
    Iif (this.adapter !== observer)
      throw new UnsupportedError(
        "Only the adapter that was used to observe can be unobserved"
      );
    this.adapter = undefined;
  }
 
  async updateObservers(
    table: string,
    event: OperationKeys | BulkCrudOperationKeys | string,
    id: EventIds
  ): Promise<void> {
    Iif (!this.adapter)
      throw new InternalError(`No adapter observed for dispatch`);
    try {
      await this.adapter.refresh(table, event, id);
    } catch (e: unknown) {
      throw new InternalError(`Failed to refresh dispatch: ${e}`);
    }
  }
}