All files / src/persistence ObserverHandler.ts

76.92% Statements 20/26
25% Branches 1/4
90% Functions 9/10
85.71% Lines 18/21

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63    9x             9x 2x           12x       12x 12x 12x       12x 12x 12x                   6x     6x 6x 3x 3x               6x   6x 6x              
import { Observable, Observer } from "../interfaces";
import { EventIds, ObserverFilter } from "./types";
import {
  BulkCrudOperationKeys,
  InternalError,
  OperationKeys,
} from "@decaf-ts/db-decorators";
import { Logger } from "@decaf-ts/logging";
 
/**
 * Keeps a list of registered observers (each with an optional filter) and
 * fans events out to them.
 *
 * Every observer can be registered at most once; identity is compared by
 * reference.
 */
export class ObserverHandler implements Observable {
  /** Registered observers, in registration order, with their optional filter. */
  protected readonly observers: {
    observer: Observer;
    filter?: ObserverFilter;
  }[] = [];

  /**
   * @returns the number of currently registered observers.
   */
  count(): number {
    return this.observers.length;
  }

  /**
   * Registers an observer.
   *
   * @param observer - the observer to register.
   * @param filter - optional predicate; when given, the observer is only
   *   refreshed for events for which the predicate returns a truthy value.
   * @throws InternalError when the same observer instance is already registered.
   */
  observe(observer: Observer, filter?: ObserverFilter): void {
    const alreadyRegistered = this.observers.some(
      (entry) => entry.observer === observer
    );
    if (alreadyRegistered)
      throw new InternalError("Observer already registered");
    this.observers.push({ observer: observer, filter: filter });
  }

  /**
   * Removes a previously registered observer.
   *
   * @param observer - the observer instance to remove.
   * @throws InternalError when the observer is not registered.
   */
  unObserve(observer: Observer): void {
    const index = this.observers.findIndex(
      (entry) => entry.observer === observer
    );
    if (index === -1) throw new InternalError("Failed to find Observer");
    this.observers.splice(index, 1);
  }

  /**
   * Calls `refresh` on every registered observer whose filter accepts the
   * event (observers without a filter always receive it).
   *
   * Failures are logged through `log.error` and never propagated: a filter
   * that throws excludes its observer from this event, and an observer whose
   * `refresh` throws or rejects does not prevent the others from being
   * refreshed.
   *
   * @param log - logger used to report filter and refresh failures.
   * @param table - forwarded to the filters and to `refresh`.
   * @param event - forwarded to the filters and to `refresh`.
   * @param id - forwarded to the filters and to `refresh`.
   * @param args - extra arguments forwarded to `refresh` only.
   */
  async updateObservers(
    log: Logger,
    table: string,
    event: OperationKeys | BulkCrudOperationKeys | string,
    id: EventIds,
    ...args: any[]
  ): Promise<void> {
    // Snapshot of the observers actually being notified. The settled results
    // below are index-aligned with THIS list, not with `this.observers`.
    const targets = this.observers
      .filter((entry) => {
        const { filter } = entry;
        if (!filter) return true;
        try {
          return filter(table, event, id);
        } catch (e: unknown) {
          log.error(
            `Failed to filter observer ${entry.observer.toString()}: ${e}`
          );
          return false;
        }
      })
      .map((entry) => entry.observer);

    const results = await Promise.allSettled(
      // The async wrapper turns a synchronous throw from `refresh` into a
      // rejection, so one misbehaving observer cannot abort the whole fan-out.
      targets.map(async (observer) =>
        observer.refresh(table, event, id, ...args)
      )
    );

    results.forEach((result, i) => {
      if (result.status === "rejected")
        log.error(
          `Failed to update observable ${targets[i].toString()}: ${result.reason}`
        );
    });
  }
}