All files / src pub-sub.ts

100% Statements 43/43
100% Branches 2/2
100% Functions 7/7
100% Lines 40/40

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 1015x 5x   5x 5x         5x 2x             2x         2x 2x 2x 2x       2x 2x 2x       8x 3x 3x 3x 3x     2x       5x       6x     6x   6x 6x       3x   3x 3x       6x 6x         6x                 2x 2x 2x       3x 3x 3x   3x     3x 2x        
import Debug from 'debug';
import {Publisher, Subscriber} from 'zeromq';
import {RhizomeNode} from './node';
import {PeerAddress} from './peers';
// Logger for this module, created by the `debug` package under the `rz:pub-sub` namespace.
const debug = Debug('rz:pub-sub');
 
/**
 * Callback invoked by a Subscription for each message it receives.
 *
 * `sender` is the address parsed from the message's sender frame and `msg` is
 * the payload frame converted to a string.
 */
export type SubscribedMessageHandler = (sender: PeerAddress, msg: string) => void;
 
// TODO: Allow subscribing to multiple topics on one socket
/**
 * A single-topic ZeroMQ subscription to one peer's publish socket.
 *
 * The subscriber socket is created together with the object, but it is only
 * connected and subscribed once `start()` is called.
 */
export class Subscription {
  sock = new Subscriber();
  topic: string;
  publishAddr: PeerAddress;
  publishAddrStr: string;
  cb: SubscribedMessageHandler;

  /**
   * @param pubSub - Owning PubSub; read here only to obtain the local peer id for log output.
   * @param publishAddr - Address of the remote publish socket to connect to.
   * @param topic - Topic handed to the socket's `subscribe` call.
   * @param cb - Handler invoked with the sender address and payload of every well-formed message.
   */
  constructor(
    readonly pubSub: PubSub,
    publishAddr: PeerAddress,
    topic: string,
    cb: SubscribedMessageHandler,
  ) {
    this.cb = cb;
    this.topic = topic;
    this.publishAddr = publishAddr;
    this.publishAddrStr = `tcp://${this.publishAddr.toAddrString()}`;
  }

  /**
   * Connects to the publisher, subscribes to the topic and dispatches every
   * received message to the handler.
   *
   * The returned promise stays pending for as long as the receive loop runs.
   * NOTE(review): the loop presumably ends when the socket is closed (e.g. by
   * `PubSub.stop()`) — confirm against the zeromq documentation.
   *
   * Errors thrown by the handler or by `PeerAddress.fromString` are not caught
   * here; they end the loop and reject the returned promise.
   */
  async start(): Promise<void> {
    this.sock.connect(this.publishAddrStr);
    this.sock.subscribe(this.topic);
    debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `Subscribing to ${this.topic} topic on ZeroMQ ${this.publishAddrStr}`);

    // Messages are expected to carry three frames: [topic, sender address, payload].
    for await (const [, sender, msg] of this.sock) {
      // A message with fewer frames would otherwise throw a TypeError on
      // `.toString()` and permanently end this subscription; skip it instead.
      if (sender === undefined || msg === undefined) {
        debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `Ignoring malformed ZeroMQ message on topic ${this.topic}: expected 3 frames`);
        continue;
      }

      const senderAddr = PeerAddress.fromString(sender.toString());
      const msgStr = msg.toString();
      debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `ZeroMQ subscription received msg: ${msgStr}`);
      this.cb(senderAddr, msgStr);
    }

    debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `Done waiting for subscription socket for topic ${this.topic}`);
  }
}
 
/**
 * Owns this node's ZeroMQ publish socket and keeps track of the subscriptions
 * created through `subscribe()`.
 */
export class PubSub {
  rhizomeNode: RhizomeNode;
  publishSock?: Publisher;
  publishAddrStr: string;
  subscriptions: Subscription[] = [];

  /**
   * @param rhizomeNode - Node whose config supplies `publishBindAddr`,
   *   `publishBindPort` and `peerId`, and whose `myRequestAddr` is sent as the
   *   sender frame of published messages.
   */
  constructor(rhizomeNode: RhizomeNode) {
    this.rhizomeNode = rhizomeNode;

    const {publishBindAddr, publishBindPort} = this.rhizomeNode.config;
    this.publishAddrStr = `tcp://${publishBindAddr}:${publishBindPort}`;
  }

  /**
   * Creates the publish socket and binds it to `publishAddrStr`.
   *
   * The socket is stored in `publishSock` only after the bind succeeded, so
   * `publishSock` never refers to an unbound socket. If binding fails the
   * socket is closed and the error is rethrown.
   */
  async startZmq(): Promise<void> {
    const sock = new Publisher();

    try {
      await sock.bind(this.publishAddrStr);
    } catch (err: unknown) {
      // Do not leak the socket, and do not leave a socket behind that stop()
      // would later fail to unbind.
      sock.close();
      throw err;
    }

    this.publishSock = sock;
    debug(`[${this.rhizomeNode.config.peerId}]`, `ZeroMQ publishing socket bound to ${this.publishAddrStr}`);
  }

  /**
   * Publishes `msg` under `topic` as a three-frame message:
   * [topic, this node's request address, msg].
   *
   * Does nothing when the publish socket has not been started.
   */
  async publish(topic: string, msg: string): Promise<void> {
    if (this.publishSock) {
      await this.publishSock.send([
        topic,
        this.rhizomeNode.myRequestAddr.toAddrString(),
        msg
      ]);
      debug(`[${this.rhizomeNode.config.peerId}]`, `Published to ZeroMQ, msg: ${msg}`);
    }
  }

  /**
   * Creates a Subscription for `topic` on the given publish address and
   * records it so that `stop()` can close its socket.
   *
   * The subscription is not started here; the caller has to call its
   * `start()` method.
   *
   * @returns The newly created subscription.
   */
  subscribe(
    publishAddr: PeerAddress,
    topic: string,
    cb: SubscribedMessageHandler
  ): Subscription {
    const subscription = new Subscription(this, publishAddr, topic, cb);
    this.subscriptions.push(subscription);
    return subscription;
  }

  /**
   * Unbinds and closes the publish socket (if any) and closes the socket of
   * every recorded subscription.
   *
   * Cleanup is attempted even when unbinding fails: the publish socket and the
   * subscription sockets are still closed, and the unbind error is then
   * propagated to the caller.
   */
  async stop(): Promise<void> {
    const publishSock = this.publishSock;
    // Drop the reference up front so the field never points at a closed socket.
    this.publishSock = undefined;

    try {
      if (publishSock) {
        try {
          await publishSock.unbind(this.publishAddrStr);
        } finally {
          publishSock.close();
        }
      }
    } finally {
      for (const subscription of this.subscriptions) {
        subscription.sock.close();
      }
      // Closed subscriptions cannot be reused; forget them so they are not
      // retained or closed again by a later stop().
      this.subscriptions.length = 0;
    }
  }
}