Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 | 5x 5x 5x 5x 5x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 5x 2x 2x 2x 2x 2x 2x 2x 2x 5x 6x 6x 6x 6x 3x 3x 3x 7x 2x 2x 2x 6x 2x 2x 2x 2x 3x 3x 3x 3x 3x | import Debug from 'debug';
import {EventEmitter} from 'node:events';
import {Message, Reply, Request} from 'zeromq';
import {RhizomeNode} from './node';
import {PeerAddress, RequestMethods} from './peers';
// Debug logger for this module, created under the 'rz:request-reply' namespace.
const debug = Debug('rz:request-reply');
/**
 * Request payload exchanged between peers.
 * RequestSocket.request() serializes one of these as JSON before sending it.
 */
export type PeerRequest = {
// The method being requested.
method: RequestMethods;
};
/**
 * Callback signature accepted by RequestReply.registerRequestHandler().
 * `req` is the decoded request; `res` wraps the reply socket used to answer it.
 */
export type RequestHandler = (req: PeerRequest, res: ResponseSocket) => void;
/**
 * Client side of a request/reply exchange: owns one zeromq `Request` socket
 * connected to a single peer address.
 */
export class RequestSocket {
  /** Underlying socket; reset to `undefined` by close(). */
  sock?: Request;
  /** Endpoint this socket connects to, in `tcp://addr:port` form. */
  addrStr: string;

  /**
   * Creates the socket and connects it to the given peer.
   * @param requestReply Owning RequestReply; read here for the local peerId used in log lines.
   * @param addr Address and port of the peer to connect to.
   */
  constructor(readonly requestReply: RequestReply, addr: PeerAddress) {
    this.addrStr = `tcp://${addr.addr}:${addr.port}`;
    this.sock = new Request();
    this.sock.connect(this.addrStr);
    debug(`[${this.requestReply.rhizomeNode.config.peerId}]`, `Request socket connecting to ${this.addrStr}`);
  }

  /**
   * Sends `method` as a JSON-encoded PeerRequest and waits for the reply.
   * @param method The method to request from the peer.
   * @returns The first frame of the reply.
   * @throws Error if the socket has already been closed.
   */
  async request(method: RequestMethods): Promise<Message> {
    // Hold on to the socket locally so a close() during the exchange cannot
    // leave us calling receive() on `undefined`.
    const sock = this.sock;
    if (!sock) throw new Error('Request socket is undefined');
    const req: PeerRequest = {
      method
    };
    await sock.send(JSON.stringify(req));
    // Wait for a response.
    // TODO: Timeout
    // TODO: Retry
    // this.sock.receiveTimeout = ...
    const [res] = await sock.receive();
    return res;
  }

  /** Closes the socket (if still open) and drops the reference to it. */
  close() {
    this.sock?.close();
    // Make sure it goes out of scope
    this.sock = undefined;
    debug(`[${this.requestReply.rhizomeNode.config.peerId}]`, 'Request socket closed');
  }
}
/**
 * Thin wrapper around a Reply socket that is handed to request handlers so
 * they can answer the request they were given.
 */
export class ResponseSocket {
  /** @param sock The reply socket that send() writes to. */
  constructor(readonly sock: Reply) {}

  /**
   * Sends a reply on the wrapped socket.
   * @param msg Reply payload; objects are JSON-encoded first, strings are sent unchanged.
   */
  async send(msg: object | string): Promise<void> {
    const payload = typeof msg === 'object' ? JSON.stringify(msg) : msg;
    await this.sock.send(payload);
  }
}
/**
 * Decodes an incoming message into a PeerRequest.
 *
 * The message is expected to be JSON text encoding an object. Anything else
 * (malformed JSON, or valid JSON that is not an object, such as `null`, a
 * string, a number or an array) is logged via console.error and reported as
 * `null`.
 *
 * Only the overall shape is checked; the `method` field itself is not validated here.
 *
 * @param msg Raw message as received from the socket.
 * @returns A shallow copy of the decoded object, or `null` if it could not be decoded.
 */
function peerRequestFromMsg(msg: Message): PeerRequest | null {
  try {
    const obj: unknown = JSON.parse(msg.toString());
    // Spreading a non-object would silently produce `{}` (null, numbers) or
    // an index-keyed object (strings, arrays) that only looks like a request.
    if (typeof obj !== 'object' || obj === null || Array.isArray(obj)) {
      throw new TypeError('request payload is not a JSON object');
    }
    return {...obj} as PeerRequest;
  } catch (e) {
    console.error('error receiving command', e);
    return null;
  }
}
/**
 * Request/reply endpoint of a node: binds a Reply socket for incoming
 * requests, fans them out to registered handlers, and creates Request
 * sockets for talking to other peers.
 */
export class RequestReply {
  /** Node this endpoint belongs to; its config supplies the bind address and peerId. */
  rhizomeNode: RhizomeNode;
  /** Bound reply socket; only set between start() and stop(). */
  replySock?: Reply;
  /** Emits a 'request' event for every message received by start(). */
  requestStream = new EventEmitter();
  /** Endpoint the reply socket binds to, in `tcp://addr:port` form. */
  requestBindAddrStr: string;

  /**
   * @param rhizomeNode Owning node; `requestBindAddr` and `requestBindPort`
   *   are read from its config to build the bind endpoint.
   */
  constructor(rhizomeNode: RhizomeNode) {
    this.rhizomeNode = rhizomeNode;
    const {requestBindAddr, requestBindPort} = this.rhizomeNode.config;
    this.requestBindAddrStr = `tcp://${requestBindAddr}:${requestBindPort}`;
  }

  /**
   * Listen for incoming requests.
   *
   * Binds the reply socket, then decodes each received message and emits it
   * on `requestStream`. The returned promise stays pending for as long as the
   * receive loop is running.
   */
  async start(): Promise<void> {
    this.replySock = new Reply();
    await this.replySock.bind(this.requestBindAddrStr);
    debug(`[${this.rhizomeNode.config.peerId}]`, `Reply socket bound to ${this.requestBindAddrStr}`);
    for await (const [msg] of this.replySock) {
      debug(`[${this.rhizomeNode.config.peerId}]`, `Received message`, {msg: msg.toString()});
      const req = peerRequestFromMsg(msg);
      this.requestStream.emit('request', req);
    }
  }

  /**
   * Add a top level handler for incoming requests.
   *
   * Every registered handler is invoked for every incoming request, and all
   * of them receive the same request object. A handler is only invoked while
   * the reply socket exists. Note that `req` is passed through exactly as
   * decoded, so it is `null` when the message could not be decoded.
   *
   * @param handler Callback receiving the request and a ResponseSocket to answer on.
   */
  registerRequestHandler(handler: RequestHandler) {
    this.requestStream.on('request', (req) => {
      if (this.replySock) {
        const res = new ResponseSocket(this.replySock);
        handler(req, res);
      }
    });
  }

  /**
   * Creates a Request socket connected to the given peer.
   * @param addr Address and port of the peer.
   */
  createRequestSocket(addr: PeerAddress) {
    return new RequestSocket(this, addr);
  }

  /**
   * Unbinds and closes the reply socket. Does nothing if it is not set.
   *
   * If unbinding fails the error is still propagated to the caller, but the
   * socket is closed and released first.
   */
  async stop(): Promise<void> {
    const sock = this.replySock;
    if (!sock) return;
    try {
      await sock.unbind(this.requestBindAddrStr);
    } finally {
      // Close even if unbind rejects, so a failed unbind does not leave the
      // socket open and still referenced.
      sock.close();
      this.replySock = undefined;
      debug(`[${this.rhizomeNode.config.peerId}]`, 'Reply socket closed');
    }
  }
}
|