All files / src request-reply.ts

94.54% Statements 52/55
62.5% Branches 5/8
100% Functions 12/12
96.07% Lines 49/51

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 1195x 5x 5x     5x               5x       2x 2x 2x 2x 2x       2x 2x     2x         2x 2x       2x   2x 2x       5x 2x     2x     2x         2x 2x 2x 2x       2x     5x     6x       6x 6x 6x         3x   3x 3x   7x 2x 2x 2x             6x 2x 2x 2x           2x       3x 3x 3x 3x 3x        
import Debug from 'debug';
import {EventEmitter} from 'node:events';
import {Message, Reply, Request} from 'zeromq';
import {RhizomeNode} from './node';
import {PeerAddress, RequestMethods} from './peers';
const debug = Debug('rz:request-reply');
 
export type PeerRequest = {
  method: RequestMethods;
};
 
export type RequestHandler = (req: PeerRequest, res: ResponseSocket) => void;
 
export class RequestSocket {
  sock?: Request;
  addrStr: string;
 
  constructor(readonly requestReply: RequestReply, addr: PeerAddress) {
    this.addrStr = `tcp://${addr.addr}:${addr.port}`;
    this.sock = new Request();
    this.sock.connect(this.addrStr);
    debug(`[${this.requestReply.rhizomeNode.config.peerId}]`, `Request socket connecting to ${this.addrStr}`);
  }
 
  async request(method: RequestMethods): Promise<Message> {
    Iif (!this.sock) throw new Error('Request socket is undefined');
    const req: PeerRequest = {
      method
    };
    await this.sock.send(JSON.stringify(req));
    // Wait for a response.
    // TODO: Timeout
    // TODO: Retry
    // this.sock.receiveTimeout = ...
    const [res] = await this.sock.receive();
    return res;
  }
 
  close() {
    this.sock?.close();
    // Make sure it goes out of scope
    this.sock = undefined;
    debug(`[${this.requestReply.rhizomeNode.config.peerId}]`, 'Request socket closed');
  }
}
 
export class ResponseSocket {
  constructor(readonly sock: Reply) {}
 
  async send(msg: object | string) {
    Iif (typeof msg === 'object') {
      msg = JSON.stringify(msg);
    }
    await this.sock.send(msg);
  }
}
 
function peerRequestFromMsg(msg: Message): PeerRequest | null {
  let req: PeerRequest | null = null;
  try {
    const obj = JSON.parse(msg.toString());
    req = {...obj};
  } catch (e) {
    console.error('error receiving command', e);
  }
  return req;
}
 
export class RequestReply {
  rhizomeNode: RhizomeNode;
  replySock?: Reply;
  requestStream = new EventEmitter();
  requestBindAddrStr: string;
 
  constructor(rhizomeNode: RhizomeNode) {
    this.rhizomeNode = rhizomeNode;
    const {requestBindAddr, requestBindPort} = this.rhizomeNode.config;
    this.requestBindAddrStr = `tcp://${requestBindAddr}:${requestBindPort}`;
  }
 
  // Listen for incoming requests
  async start() {
    this.replySock = new Reply();
 
    await this.replySock.bind(this.requestBindAddrStr);
    debug(`[${this.rhizomeNode.config.peerId}]`, `Reply socket bound to ${this.requestBindAddrStr}`);
 
    for await (const [msg] of this.replySock) {
      debug(`[${this.rhizomeNode.config.peerId}]`, `Received message`, {msg: msg.toString()});
      const req = peerRequestFromMsg(msg);
      this.requestStream.emit('request', req);
    }
  }
 
  // Add a top level handler for incoming requests.
  // Each handler will get a copy of every message.
  registerRequestHandler(handler: RequestHandler) {
    this.requestStream.on('request', (req) => {
      if (this.replySock) {
        const res = new ResponseSocket(this.replySock);
        handler(req, res);
      }
    });
  }
 
  createRequestSocket(addr: PeerAddress) {
    return new RequestSocket(this, addr);
  }
 
  async stop() {
    if (this.replySock) {
      await this.replySock.unbind(this.requestBindAddrStr);
      this.replySock.close();
      this.replySock = undefined;
      debug(`[${this.rhizomeNode.config.peerId}]`, 'Reply socket closed');
    }
  }
}