All files / src delta-stream.ts

60.78% Statements 31/51
50% Branches 5/10
64.28% Functions 9/14
60.78% Lines 31/51

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 1045x 5x 5x 5x   5x   5x 5x 5x 5x     5x 6x 6x 6x 6x 6x 6x   6x     3x                         3x 3x   3x 3x 3x                                                                         3x 3x         6x 6x             6x 6x         3x 3x      
import Debug from 'debug';
import EventEmitter from 'node:events';
import objectHash from 'object-hash';
import {Delta} from './delta';
import {RhizomeNode} from './node';
// Logger created with the `debug` package under the 'rz:deltas' namespace.
const debug = Debug('rz:deltas');
 
/**
 * Outcome of evaluating a delta against the stream's policy.
 *
 * The numeric values are spelled out; they match the values the members
 * would be auto-assigned from their declaration order.
 */
enum Decision {
  /** The delta is taken into the stream. */
  Accept = 0,
  /** The delta is turned away. */
  Reject = 1,
  /** The delta is set aside to be evaluated again later. */
  Defer = 2,
}
 
/**
 * Queue-and-dispatch pipeline for deltas.
 *
 * Deltas enter through {@link DeltaStream.receiveDelta}, which de-duplicates
 * them by content hash and queues them in `deltasProposed`. Ingesting a delta
 * runs it through {@link DeltaStream.applyPolicy} and files it under
 * `deltasAccepted`, `deltasRejected` or `deltasDeferred`; accepted deltas are
 * also emitted as a 'delta' event on `deltaStream`.
 */
export class DeltaStream {
  /** Emits a 'delta' event for every accepted delta. */
  deltaStream = new EventEmitter();
  /** Received deltas that have not been ingested yet (FIFO). */
  deltasProposed: Delta[] = [];
  /** Deltas for which the policy answered `Decision.Accept`. */
  deltasAccepted: Delta[] = [];
  /** Deltas for which the policy answered `Decision.Reject`. */
  deltasRejected: Delta[] = [];
  /** Deltas for which the policy answered `Decision.Defer` (FIFO). */
  deltasDeferred: Delta[] = [];
  /** Content hashes of every delta seen by `receiveDelta`, used for de-duplication. */
  hashesReceived = new Set<string>();

  constructor(readonly rhizomeNode: RhizomeNode) {}

  /**
   * Decides what to do with a delta.
   *
   * Currently every delta is accepted. A missing delta is rejected so that
   * the method always returns a real `Decision` value.
   *
   * @param delta - The delta to evaluate.
   * @returns The decision for this delta.
   */
  applyPolicy(delta: Delta): Decision {
    return delta ? Decision.Accept : Decision.Reject;
  }

  /**
   * Queues a delta for ingestion unless an identical one (by `objectHash`)
   * has already been received.
   *
   * @param delta - The incoming delta.
   */
  receiveDelta(delta: Delta): void {
    // Deduplication: if we already received this delta, disregard it
    const hash = objectHash(delta);
    if (!this.hashesReceived.has(hash)) {
      this.hashesReceived.add(hash);
      this.deltasProposed.push(delta);
    }
  }

  /**
   * Applies the policy to a delta and files it in the matching list.
   * Accepted deltas are additionally emitted as a 'delta' event.
   *
   * @param delta - The delta to ingest.
   */
  ingestDelta(delta: Delta): void {
    const decision = this.applyPolicy(delta);
    switch (decision) {
      case Decision.Accept:
        this.deltasAccepted.push(delta);
        this.deltaStream.emit('delta', delta);
        break;
      case Decision.Reject:
        this.deltasRejected.push(delta);
        break;
      case Decision.Defer:
        this.deltasDeferred.push(delta);
        break;
    }
  }

  /**
   * Ingests the oldest proposed delta, if any.
   *
   * @returns `true` if a delta was ingested, `false` if the queue was empty.
   */
  ingestNext(): boolean {
    const delta = this.deltasProposed.shift();
    if (!delta) {
      return false;
    }
    this.ingestDelta(delta);
    return true;
  }

  /** Ingests proposed deltas until the queue is empty. */
  ingestAll(): void {
    while (this.ingestNext());
  }

  /**
   * Re-ingests the oldest deferred delta, if any. The delta goes through the
   * policy again and may therefore end up deferred once more.
   *
   * @returns `true` if a delta was re-ingested, `false` if none was deferred.
   */
  ingestNextDeferred(): boolean {
    const delta = this.deltasDeferred.shift();
    if (!delta) {
      return false;
    }
    this.ingestDelta(delta);
    return true;
  }

  /**
   * Re-ingests every delta that is deferred at the time of the call.
   *
   * The pass is bounded by the initial queue length: a delta that the policy
   * defers again is pushed back onto `deltasDeferred`, and looping until the
   * queue is empty would never terminate in that case.
   */
  ingestAllDeferred(): void {
    const pending = this.deltasDeferred.length;
    for (let i = 0; i < pending; i++) {
      if (!this.ingestNextDeferred()) {
        break;
      }
    }
  }

  /**
   * Registers a callback invoked with each accepted delta.
   *
   * @param fn - Called with the delta carried by every 'delta' event.
   */
  subscribeDeltas(fn: (delta: Delta) => void): void {
    this.deltaStream.on('delta', (delta) => {
      fn(delta);
    });
  }

  /**
   * Logs the delta and hands its serialized form to `rhizomeNode.pubSub.publish`
   * with "deltas" as the first argument (presumably the topic name).
   *
   * @param delta - The delta to publish.
   */
  async publishDelta(delta: Delta): Promise<void> {
    debug(`[${this.rhizomeNode.config.peerId}]`, `Publishing delta: ${JSON.stringify(delta)}`);
    await this.rhizomeNode.pubSub.publish(
      "deltas",
      this.serializeDelta(delta)
    );
  }

  /**
   * Converts a delta to its network image and encodes that as JSON.
   *
   * @param delta - The delta to serialize.
   * @returns The JSON text of `delta.toNetworkImage()`.
   */
  serializeDelta(delta: Delta): string {
    const deltaNetworkImage = delta.toNetworkImage();
    return JSON.stringify(deltaNetworkImage);
  }

  /**
   * Parses JSON text and rebuilds a delta via `Delta.fromNetworkImage`.
   *
   * @param input - JSON text, as produced by `serializeDelta`.
   * @returns The reconstructed delta.
   * @throws SyntaxError if `input` is not valid JSON.
   */
  deserializeDelta(input: string): Delta {
    // TODO: Input validation
    const parsed = JSON.parse(input);
    return Delta.fromNetworkImage(parsed);
  }
}