Press n or j to go to the next uncovered block, b, p or k for the previous block.
import Debug from 'debug';
import {Publisher, Subscriber} from 'zeromq';
import {RhizomeNode} from './node';
import {PeerAddress} from './peers';

const debug = Debug('rz:pub-sub');

/**
 * Callback invoked once per message received on a {@link Subscription}.
 *
 * @param sender - Address parsed from the message's sender frame.
 * @param msg - Message payload decoded to a string.
 */
export type SubscribedMessageHandler = (sender: PeerAddress, msg: string) => void;

// TODO: Allow subscribing to multiple topics on one socket
/**
 * A single-topic subscription to one remote publisher.
 *
 * Each instance owns its own ZeroMQ `Subscriber` socket. The socket is created
 * eagerly but is only connected once {@link Subscription.start} is called.
 */
export class Subscription {
  sock = new Subscriber();
  topic: string;
  publishAddr: PeerAddress;
  publishAddrStr: string;
  cb: SubscribedMessageHandler;

  /**
   * @param pubSub - Owning {@link PubSub}; used to reach the node's peer id for log output.
   * @param publishAddr - Address of the remote publisher to connect to.
   * @param topic - Topic to subscribe to.
   * @param cb - Handler called for every received message.
   */
  constructor(
    readonly pubSub: PubSub,
    publishAddr: PeerAddress,
    topic: string,
    cb: SubscribedMessageHandler,
  ) {
    this.cb = cb;
    this.topic = topic;
    this.publishAddr = publishAddr;
    this.publishAddrStr = `tcp://${this.publishAddr.toAddrString()}`;
  }

  /** Prefix identifying this node in debug output. */
  private get logPrefix(): string {
    return `[${this.pubSub.rhizomeNode.config.peerId}]`;
  }

  /**
   * Connects to the publisher, subscribes to the topic, and dispatches every
   * received message to the handler.
   *
   * The returned promise stays pending while the socket is being iterated and
   * resolves once the socket's async iterator finishes (presumably after the
   * socket is closed, e.g. by `PubSub.stop()` -- confirm against zeromq docs).
   *
   * A malformed message or a throwing handler is logged and skipped, so a
   * single bad message cannot terminate the subscription.
   */
  async start(): Promise<void> {
    this.sock.connect(this.publishAddrStr);
    this.sock.subscribe(this.topic);
    debug(this.logPrefix, `Subscribing to ${this.topic} topic on ZeroMQ ${this.publishAddrStr}`);

    // Wait for ZeroMQ messages.
    // This will block indefinitely.
    for await (const frames of this.sock) {
      // Frame layout mirrors PubSub.publish(): [topic, sender address, payload].
      const [, sender, msg] = frames;
      if (sender === undefined || msg === undefined) {
        debug(this.logPrefix, `Ignoring malformed ZeroMQ message with ${frames.length} frame(s) on topic ${this.topic}`);
        continue;
      }

      try {
        const senderAddr = PeerAddress.fromString(sender.toString());
        const msgStr = msg.toString();
        debug(this.logPrefix, `ZeroMQ subscribtion received msg: ${msgStr}`);
        this.cb(senderAddr, msgStr);
      } catch (e: unknown) {
        // Keep the receive loop alive; report the failure instead of dying.
        const reason = e instanceof Error ? e.message : String(e);
        debug(this.logPrefix, `Error handling message on topic ${this.topic}: ${reason}`);
      }
    }

    debug(this.logPrefix, `Done waiting for subscription socket for topic ${this.topic}`);
  }
}

/**
 * Publish/subscribe facade for a node: owns one publishing socket and keeps
 * track of every {@link Subscription} it has created.
 */
export class PubSub {
  rhizomeNode: RhizomeNode;
  publishSock?: Publisher;
  publishAddrStr: string;
  subscriptions: Subscription[] = [];

  /**
   * @param rhizomeNode - Node whose config supplies the publish bind address and port.
   */
  constructor(rhizomeNode: RhizomeNode) {
    this.rhizomeNode = rhizomeNode;

    const {publishBindAddr, publishBindPort} = this.rhizomeNode.config;
    this.publishAddrStr = `tcp://${publishBindAddr}:${publishBindPort}`;
  }

  /** Creates the publishing socket and binds it to the configured address. */
  async startZmq(): Promise<void> {
    this.publishSock = new Publisher();

    await this.publishSock.bind(this.publishAddrStr);
    debug(`[${this.rhizomeNode.config.peerId}]`, `ZeroMQ publishing socket bound to ${this.publishAddrStr}`);
  }

  /**
   * Publishes a message as three frames: topic, this node's request address,
   * and the payload.
   *
   * If the publishing socket has not been started (or was stopped), the
   * message is dropped and the drop is logged.
   *
   * @param topic - Topic to publish under.
   * @param msg - Message payload.
   */
  async publish(topic: string, msg: string): Promise<void> {
    if (!this.publishSock) {
      debug(`[${this.rhizomeNode.config.peerId}]`, `Publishing socket not started; dropping msg for topic ${topic}`);
      return;
    }

    await this.publishSock.send([
      topic,
      this.rhizomeNode.myRequestAddr.toAddrString(),
      msg
    ]);
    debug(`[${this.rhizomeNode.config.peerId}]`, `Published to ZeroMQ, msg: ${msg}`);
  }

  /**
   * Creates and registers a subscription. The subscription is not started;
   * the caller must invoke `start()` on the returned object.
   *
   * @param publishAddr - Address of the remote publisher.
   * @param topic - Topic to subscribe to.
   * @param cb - Handler called for every received message.
   * @returns The newly registered subscription.
   */
  subscribe(
    publishAddr: PeerAddress,
    topic: string,
    cb: SubscribedMessageHandler
  ): Subscription {
    const subscription = new Subscription(this, publishAddr, topic, cb);
    this.subscriptions.push(subscription);
    return subscription;
  }

  /**
   * Unbinds and closes the publishing socket, then closes every registered
   * subscription socket and clears the registry.
   *
   * Sockets are closed even if unbinding fails; the unbind error still
   * propagates to the caller afterwards.
   */
  async stop(): Promise<void> {
    const publishSock = this.publishSock;

    try {
      if (publishSock) {
        try {
          await publishSock.unbind(this.publishAddrStr);
        } finally {
          publishSock.close();
          // Free the memory by taking the old object out of scope.
          this.publishSock = undefined;
        }
      }
    } finally {
      // splice(0) empties the registry in place so closed subscriptions are
      // not retained and a repeated stop() does not close them again.
      for (const subscription of this.subscriptions.splice(0)) {
        subscription.sock.close();
      }
    }
  }
}