Press n or j to go to the next uncovered block, b, p or k for the previous block.
import Debug from 'debug';
import EventEmitter from 'node:events';
import objectHash from 'object-hash';
import {Delta} from './delta';
import {RhizomeNode} from './node';

const debug = Debug('rz:deltas');

/** Outcome of evaluating the ingestion policy for a single delta. */
enum Decision {
  Accept,
  Reject,
  Defer
}

/**
 * Queues incoming deltas, deduplicates them, applies an ingestion policy and
 * emits accepted deltas to subscribers. Also (de)serializes deltas and
 * publishes them through the owning node's pub/sub.
 */
export class DeltaStream {
  /** Emits a `'delta'` event for every accepted delta. */
  deltaStream = new EventEmitter();
  /** Deltas that were received (and deduplicated) but not yet ingested. */
  deltasProposed: Delta[] = [];
  /** Deltas the policy accepted. */
  deltasAccepted: Delta[] = [];
  /** Deltas the policy rejected. */
  deltasRejected: Delta[] = [];
  /** Deltas the policy deferred; re-evaluated by the `ingest*Deferred` methods. */
  deltasDeferred: Delta[] = [];
  /** Object hashes of every delta seen by `receiveDelta`, used for deduplication. */
  hashesReceived = new Set<string>();

  constructor(readonly rhizomeNode: RhizomeNode) {}

  /**
   * Decides what to do with a delta.
   *
   * Currently a placeholder: every (truthy) delta is accepted.
   * NOTE(review): for a falsy `delta` this expression evaluates to `false`,
   * which is not a `Decision` member and matches no branch in `ingestDelta`.
   */
  applyPolicy(delta: Delta): Decision {
    return !!delta && Decision.Accept;
  }

  /**
   * Queues a delta for ingestion unless an identical one (by object hash)
   * has already been received.
   */
  receiveDelta(delta: Delta) {
    // Deduplication: if we already received this delta, disregard it
    const hash = objectHash(delta);
    if (!this.hashesReceived.has(hash)) {
      this.hashesReceived.add(hash);
      this.deltasProposed.push(delta);
    }
  }

  /**
   * Applies the policy to a delta and files it in the matching list.
   * Accepted deltas are additionally emitted as a `'delta'` event.
   */
  ingestDelta(delta: Delta) {
    const decision = this.applyPolicy(delta);
    switch (decision) {
      case Decision.Accept:
        this.deltasAccepted.push(delta);
        this.deltaStream.emit('delta', delta);
        break;
      case Decision.Reject:
        this.deltasRejected.push(delta);
        break;
      case Decision.Defer:
        this.deltasDeferred.push(delta);
        break;
    }
  }

  /**
   * Ingests the oldest proposed delta.
   *
   * @returns `false` when there was nothing to ingest, `true` otherwise.
   */
  ingestNext(): boolean {
    const delta = this.deltasProposed.shift();
    if (!delta) {
      return false;
    }
    this.ingestDelta(delta);
    return true;
  }

  /** Ingests proposed deltas until the proposal queue is empty. */
  ingestAll() {
    while (this.ingestNext());
  }

  /**
   * Re-evaluates the oldest deferred delta.
   *
   * @returns `false` when there was nothing deferred, `true` otherwise.
   */
  ingestNextDeferred(): boolean {
    const delta = this.deltasDeferred.shift();
    if (!delta) {
      return false;
    }
    this.ingestDelta(delta);
    return true;
  }

  /**
   * Re-evaluates every delta that is deferred at the time of the call,
   * each at most once.
   *
   * The pass is bounded by the queue length at entry: a delta that the policy
   * defers again is pushed back onto `deltasDeferred` by `ingestDelta`, so
   * looping until the queue is empty would never terminate in that case.
   */
  ingestAllDeferred() {
    const pending = this.deltasDeferred.length;
    for (let i = 0; i < pending && this.ingestNextDeferred(); i++);
  }

  /** Registers `fn` to be called with every accepted delta. */
  subscribeDeltas(fn: (delta: Delta) => void) {
    this.deltaStream.on('delta', (delta) => {
      fn(delta);
    });
  }

  /**
   * Serializes a delta and publishes it via the node's pub/sub under the
   * name "deltas" (presumably the topic/channel; confirm against `pubSub`).
   */
  async publishDelta(delta: Delta) {
    debug(`[${this.rhizomeNode.config.peerId}]`, `Publishing delta: ${JSON.stringify(delta)}`);
    await this.rhizomeNode.pubSub.publish(
      "deltas",
      this.serializeDelta(delta)
    );
  }

  /** Converts a delta to its network image and encodes it as a JSON string. */
  serializeDelta(delta: Delta): string {
    const deltaNetworkImage = delta.toNetworkImage();
    return JSON.stringify(deltaNetworkImage);
  }

  /**
   * Parses a JSON string and rebuilds a delta from the resulting network image.
   * `JSON.parse` throws on malformed JSON; the parsed value is not validated.
   */
  deserializeDelta(input: string): Delta {
    // TODO: Input validation
    const parsed = JSON.parse(input);
    return Delta.fromNetworkImage(parsed);
  }
}