Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 | 5x 5x 5x 5x 5x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 5x 2x 2x 2x 2x 2x 2x 2x 2x 5x 6x 6x 6x 6x 3x 3x 3x 7x 2x 2x 2x 6x 2x 2x 2x 2x 3x 3x 3x 3x 3x | import Debug from 'debug'; import {EventEmitter} from 'node:events'; import {Message, Reply, Request} from 'zeromq'; import {RhizomeNode} from './node'; import {PeerAddress, RequestMethods} from './peers'; const debug = Debug('rz:request-reply'); export type PeerRequest = { method: RequestMethods; }; export type RequestHandler = (req: PeerRequest, res: ResponseSocket) => void; export class RequestSocket { sock?: Request; addrStr: string; constructor(readonly requestReply: RequestReply, addr: PeerAddress) { this.addrStr = `tcp://${addr.addr}:${addr.port}`; this.sock = new Request(); this.sock.connect(this.addrStr); debug(`[${this.requestReply.rhizomeNode.config.peerId}]`, `Request socket connecting to ${this.addrStr}`); } async request(method: RequestMethods): Promise<Message> { Iif (!this.sock) throw new Error('Request socket is undefined'); const req: PeerRequest = { method }; await this.sock.send(JSON.stringify(req)); // Wait for a response. // TODO: Timeout // TODO: Retry // this.sock.receiveTimeout = ... const [res] = await this.sock.receive(); return res; } close() { this.sock?.close(); // Make sure it goes out of scope this.sock = undefined; debug(`[${this.requestReply.rhizomeNode.config.peerId}]`, 'Request socket closed'); } } export class ResponseSocket { constructor(readonly sock: Reply) {} async send(msg: object | string) { Iif (typeof msg === 'object') { msg = JSON.stringify(msg); } await this.sock.send(msg); } } function peerRequestFromMsg(msg: Message): PeerRequest | null { let req: PeerRequest | null = null; try { const obj = JSON.parse(msg.toString()); req = {...obj}; } catch (e) { console.error('error receiving command', e); } return req; } export class RequestReply { rhizomeNode: RhizomeNode; replySock?: Reply; requestStream = new EventEmitter(); requestBindAddrStr: string; constructor(rhizomeNode: RhizomeNode) { this.rhizomeNode = rhizomeNode; const {requestBindAddr, requestBindPort} = this.rhizomeNode.config; this.requestBindAddrStr = `tcp://${requestBindAddr}:${requestBindPort}`; } // Listen for incoming requests async start() { this.replySock = new Reply(); await this.replySock.bind(this.requestBindAddrStr); debug(`[${this.rhizomeNode.config.peerId}]`, `Reply socket bound to ${this.requestBindAddrStr}`); for await (const [msg] of this.replySock) { debug(`[${this.rhizomeNode.config.peerId}]`, `Received message`, {msg: msg.toString()}); const req = peerRequestFromMsg(msg); this.requestStream.emit('request', req); } } // Add a top level handler for incoming requests. // Each handler will get a copy of every message. registerRequestHandler(handler: RequestHandler) { this.requestStream.on('request', (req) => { if (this.replySock) { const res = new ResponseSocket(this.replySock); handler(req, res); } }); } createRequestSocket(addr: PeerAddress) { return new RequestSocket(this, addr); } async stop() { if (this.replySock) { await this.replySock.unbind(this.requestBindAddrStr); this.replySock.close(); this.replySock = undefined; debug(`[${this.rhizomeNode.config.peerId}]`, 'Reply socket closed'); } } } |