Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 | 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 6x 6x 6x 6x 6x 6x 6x 3x 3x 3x 3x 3x 3x 3x 3x 6x 6x 6x 6x 3x 3x | import Debug from 'debug';
import EventEmitter from 'node:events';
import objectHash from 'object-hash';
import {Delta} from './delta';
import {RhizomeNode} from './node';
const debug = Debug('rz:deltas');
enum Decision {
Accept,
Reject,
Defer
};
export class DeltaStream {
deltaStream = new EventEmitter();
deltasProposed: Delta[] = [];
deltasAccepted: Delta[] = [];
deltasRejected: Delta[] = [];
deltasDeferred: Delta[] = [];
hashesReceived = new Set<string>();
constructor(readonly rhizomeNode: RhizomeNode) {}
applyPolicy(delta: Delta): Decision {
return !!delta && Decision.Accept;
}
receiveDelta(delta: Delta) {
// Deduplication: if we already received this delta, disregard it
const hash = objectHash(delta);
Iif (!this.hashesReceived.has(hash)) {
this.hashesReceived.add(hash);
this.deltasProposed.push(delta);
}
}
ingestDelta(delta: Delta) {
const decision = this.applyPolicy(delta);
switch (decision) {
case Decision.Accept:
this.deltasAccepted.push(delta);
this.deltaStream.emit('delta', delta);
break;
case Decision.Reject:
this.deltasRejected.push(delta);
break;
case Decision.Defer:
this.deltasDeferred.push(delta);
break;
}
}
ingestNext(): boolean {
const delta = this.deltasProposed.shift();
Iif (!delta) {
return false;
}
this.ingestDelta(delta);
return true;
}
ingestAll() {
while (this.ingestNext());
}
ingestNextDeferred(): boolean {
const delta = this.deltasDeferred.shift();
Iif (!delta) {
return false;
}
this.ingestDelta(delta);
return true;
}
ingestAllDeferred() {
while (this.ingestNextDeferred());
}
subscribeDeltas(fn: (delta: Delta) => void) {
this.deltaStream.on('delta', (delta) => {
fn(delta);
});
}
async publishDelta(delta: Delta) {
debug(`[${this.rhizomeNode.config.peerId}]`, `Publishing delta: ${JSON.stringify(delta)}`);
await this.rhizomeNode.pubSub.publish(
"deltas",
this.serializeDelta(delta)
);
}
serializeDelta(delta: Delta): string {
const deltaNetworkImage = delta.toNetworkImage();
return JSON.stringify(deltaNetworkImage);
}
deserializeDelta(input: string): Delta {
// TODO: Input validation
const parsed = JSON.parse(input);
return Delta.fromNetworkImage(parsed);
}
}
|