Update some code

This commit is contained in:
Ignacio Gonzalez Chemes 2024-01-16 19:35:34 -03:00
parent 2ffff5e7d6
commit 73dd0dbdf7
1 changed files with 170 additions and 155 deletions

View File

@ -11,7 +11,7 @@ import Promises from '../lib/Promises.js'
let uid = 0
export default class Core extends EventEmitter {
constructor () {
constructor() {
super()
this.encoding = 'utf8'
this.byteorder = 'le'
@ -30,102 +30,102 @@ export default class Core extends EventEmitter {
}
// Runs a single attempt with a timeout and cleans up afterward
async runOnceSafe () {
if (this.options.debug) {
this.logger.debugEnabled = true
async runOnceSafe() {
const { debug, attemptTimeout } = this.options;
const prefix = 'Q#' + (uid++);
this.logger.prefix = prefix;
if (debug) {
this.logger.debugEnabled = true;
}
this.logger.prefix = 'Q#' + (uid++)
this.logger.debug('Starting')
this.logger.debug('Protocol: ' + this.constructor.name)
this.logger.debug('Options:', this.options)
this.logger.debug('Starting');
this.logger.debug('Protocol: ' + this.constructor.name);
this.logger.debug('Options:', this.options);
let abortCall = null
this.abortedPromise = new Promise((resolve, reject) => {
abortCall = () => reject(new Error('Query is finished -- cancelling outstanding promises'))
}).catch(() => {
// Make sure that if this promise isn't attached to, it doesn't throw a unhandled promise rejection
})
let abortCall = null;
this.abortedPromise = new Promise((_, reject) => {
abortCall = () => reject(new Error('Query is finished -- cancelling outstanding promises'));
});
let timeout
try {
const promise = this.runOnce()
timeout = Promises.createTimeout(this.options.attemptTimeout, 'Attempt')
const result = await Promise.race([promise, timeout])
this.logger.debug('Query was successful')
timeout = Promises.createTimeout(attemptTimeout, 'Attempt');
const result = await Promise.race([promise, timeout]);
this.logger.debug('Query was successful');
return result
} catch (e) {
this.logger.debug('Query failed with error', e)
throw e
} finally {
timeout && timeout.cancel()
timeout?.cancel();
try {
abortCall()
abortCall?.();
} catch (e) {
this.logger.debug('Error during abort cleanup: ' + e.stack)
this.logger.debug('Error during abort cleanup:', e.stack);
}
}
}
async runOnce () {
const options = this.options
if (('host' in options) && !('address' in options)) {
const resolved = await this.dnsResolver.resolve(options.host, options.ipFamily, this.srvRecord)
options.address = resolved.address
if (resolved.port) options.port = resolved.port
async runOnce() {
const { options, dnsResolver, shortestRTT } = this;
if ('host' in options && !('address' in options)) {
const resolved = await dnsResolver.resolve(options.host, options.ipFamily, this.srvRecord);
options.address = resolved.address;
options.port = resolved.port || options.port;
}
const state = new Results()
const state = new Results();
await this.run(state);
await this.run(state)
state.name = (state.name || '').trim();
state.connect = `${state.gameHost || options.host || options.address}:${state.gamePort || options.port}`;
state.ping = shortestRTT;
// because lots of servers prefix with spaces to try to appear first
state.name = (state.name || '').trim()
if (!('connect' in state)) {
state.connect = '' +
(state.gameHost || this.options.host || this.options.address) +
':' +
(state.gamePort || this.options.port)
}
state.ping = this.shortestRTT
delete state.gameHost
delete state.gamePort
delete state.gameHost;
delete state.gamePort;
this.logger.debug(log => {
log('Size of players array: ' + state.players.length)
log('Size of bots array: ' + state.bots.length)
})
log('Size of players array:', state.players.length);
log('Size of bots array:', state.bots.length);
});
return state
return state;
}
async run (/** Results */ state) {}
async run(/** Results */ state) { }
/** Param can be a time in ms, or a promise (which will be timed) */
registerRtt (param) {
if (param.then) {
const start = Date.now()
param.then(() => {
const end = Date.now()
const rtt = end - start
this.registerRtt(rtt)
}).catch(() => {})
} else {
this.logger.debug('Registered RTT: ' + param + 'ms')
if (this.shortestRTT === 0 || param < this.shortestRTT) {
this.shortestRTT = param
async registerRtt(param) {
const start = Date.now();
try {
if (param.then) {
await param;
const end = Date.now();
const rtt = end - start;
this.registerRtt(rtt);
} else {
this.logger.debug(`Registered RTT: ${param}ms`);
if (this.shortestRTT === 0 || param < this.shortestRTT) {
this.shortestRTT = param;
}
}
} catch (error) {
this.logger.debug('Error in promise:', error);
}
}
// utils
/** @returns {Reader} */
reader (buffer) {
reader(buffer) {
return new Reader(this, buffer)
}
translate (obj, trans) {
translate(obj, trans) {
for (const from of Object.keys(trans)) {
const to = trans[from]
if (from in obj) {
@ -135,7 +135,7 @@ export default class Core extends EventEmitter {
}
}
trueTest (str) {
trueTest(str) {
if (typeof str === 'boolean') return str
if (typeof str === 'number') return str !== 0
if (typeof str === 'string') {
@ -146,7 +146,7 @@ export default class Core extends EventEmitter {
return false
}
assertValidPort (port) {
assertValidPort(port) {
if (!port) {
throw new Error('Could not determine port to query. Did you provide a port?')
}
@ -161,55 +161,64 @@ export default class Core extends EventEmitter {
* @param {number=} port
* @returns {Promise<T>}
*/
async withTcp (fn, port) {
async withTcp(fn, port) {
this.usedTcp = true
const address = this.options.address
if (!port) port = this.options.port
const { options, logger } = this;
const address = options.address;
port = port ?? options.port;
this.assertValidPort(port)
let socket, connectionTimeout
try {
socket = net.connect(port, address)
socket.setNoDelay(true)
socket = net.connect(port, address);
socket.setNoDelay(true);
// Prevent unhandled 'error' events from dumping straight to console
socket.on('error', () => {})
socket.on('error', () => { })
this.logger.debug(log => {
this.logger.debug(address + ':' + port + ' TCP Connecting')
const writeHook = socket.write
logger.debug(log => {
log(`${address}:${port} TCP Connecting`);
const writeHook = socket.write;
socket.write = (...args) => {
log(address + ':' + port + ' TCP-->')
log(debugDump(args[0]))
writeHook.apply(socket, args)
}
socket.on('error', e => log('TCP Error:', e))
socket.on('close', () => log('TCP Closed'))
socket.on('data', (data) => {
log(address + ':' + port + ' <--TCP')
log(data)
})
socket.on('ready', () => log(address + ':' + port + ' TCP Connected'))
})
log(`${address}:${port} TCP-->`);
log(debugDump(args[0]));
writeHook.apply(socket, args);
};
socket.on('error', e => log('TCP Error:', e));
socket.on('close', () => log('TCP Closed'));
socket.on('data', data => {
log(`${address}:${port} <--TCP`);
log(data);
});
socket.on('ready', () => log(`${address}:${port} TCP Connected`));
});
const connectionPromise = new Promise((resolve, reject) => {
socket.on('ready', resolve)
socket.on('close', () => reject(new Error('TCP Connection Refused')))
})
socket.on('ready', resolve);
socket.on('close', () => reject(new Error('TCP Connection Refused')));
});
this.registerRtt(connectionPromise)
connectionTimeout = Promises.createTimeout(this.options.socketTimeout, 'TCP Opening')
await Promise.race([
connectionPromise,
connectionTimeout,
this.abortedPromise
])
return await fn(socket)
await Promise.race([connectionPromise, connectionTimeout, this.abortedPromise]);
const result = await fn(socket);
await this.closeTcpConnection(socket);
return result;
} finally {
socket && socket.destroy()
connectionTimeout && connectionTimeout.cancel()
connectionTimeout?.cancel();
}
}
async closeTcpConnection(socket) {
socket.end();
await Promises.createTimeout(1000);
socket.destroy();
}
/**
* @template T
* @param {NodeJS.Socket} socket
@ -217,7 +226,7 @@ export default class Core extends EventEmitter {
* @param {function(Buffer):T} ondata
* @returns Promise<T>
*/
async tcpSend (socket, buffer, ondata) {
async tcpSend(socket, buffer, ondata) {
let timeout
try {
const promise = new Promise((resolve, reject) => {
@ -247,103 +256,109 @@ export default class Core extends EventEmitter {
* @returns Promise<T>
* @template T
*/
async udpSend (buffer, onPacket, onTimeout) {
const address = this.options.address
const port = this.options.port
async udpSend(buffer, onPacket, onTimeout) {
const { address, port, debug, socketTimeout } = this.options;
this.assertValidPort(port)
if (typeof buffer === 'string') buffer = Buffer.from(buffer, 'binary')
const socket = this.udpSocket
await socket.send(buffer, address, port, this.options.debug)
const socket = this.udpSocket;
await socket.send(buffer, address, port, debug);
if (!onPacket && !onTimeout) {
return null
return null;
}
let socketCallback
let timeout
let socketCallback;
let timeout;
try {
const promise = new Promise((resolve, reject) => {
const start = Date.now()
let end = null
const start = Date.now();
socketCallback = (fromAddress, fromPort, buffer) => {
try {
if (fromAddress !== address) return
if (fromPort !== port) return
if (end === null) {
end = Date.now()
const rtt = end - start
this.registerRtt(rtt)
}
const result = onPacket(buffer)
if (fromAddress !== address || fromPort !== port) return;
const end = Date.now();
const rtt = end - start;
this.registerRtt(rtt);
const result = onPacket(buffer);
if (result !== undefined) {
this.logger.debug('UDP send finished by callback')
resolve(result)
this.logger.debug('UDP send finished by callback');
resolve(result);
}
} catch (e) {
reject(e)
reject(e);
}
};
socket.addCallback(socketCallback, debug);
});
timeout = Promises.createTimeout(socketTimeout, 'UDP');
const wrappedTimeout = Promise.resolve(timeout).catch((e) => {
this.logger.debug('UDP timeout detected');
if (onTimeout) {
try {
const result = onTimeout();
if (result !== undefined) {
this.logger.debug('UDP timeout resolved by callback');
return result;
}
} catch (e) {
throw e;
}
}
socket.addCallback(socketCallback, this.options.debug)
})
timeout = Promises.createTimeout(this.options.socketTimeout, 'UDP')
const wrappedTimeout = new Promise((resolve, reject) => {
timeout.catch((e) => {
this.logger.debug('UDP timeout detected')
if (onTimeout) {
try {
const result = onTimeout()
if (result !== undefined) {
this.logger.debug('UDP timeout resolved by callback')
resolve(result)
return
}
} catch (e) {
reject(e)
}
}
reject(e)
})
})
return await Promise.race([promise, wrappedTimeout, this.abortedPromise])
throw e;
});
return await Promise.race([promise, wrappedTimeout, this.abortedPromise]);
} finally {
timeout && timeout.cancel()
socketCallback && socket.removeCallback(socketCallback)
timeout && timeout.cancel();
socketCallback && socket.removeCallback(socketCallback);
}
}
async tcpPing () {
async tcpPing() {
// This will give a much more accurate RTT than using the rtt of an http request.
if (!this.usedTcp) {
await this.withTcp(() => {})
await this.withTcp(() => { })
}
}
async request (params) {
async request(params) {
await this.tcpPing()
let requestPromise
let requestPromise;
try {
requestPromise = got({
...params,
timeout: {
request: this.options.socketTimeout
}
})
request: this.options.socketTimeout,
},
});
this.logger.debug(log => {
log(() => params.url + ' HTTP-->')
log(() => params.url + ' HTTP-->');
requestPromise
.then((response) => log(params.url + ' <--HTTP ' + response.statusCode))
.catch(() => {})
})
.then(response => log(params.url + ' <--HTTP ' + response.statusCode))
.catch(() => { });
});
const wrappedPromise = requestPromise.then(response => {
if (response.statusCode !== 200) throw new Error('Bad status code: ' + response.statusCode)
return response.body
})
return await Promise.race([wrappedPromise, this.abortedPromise])
if (response.statusCode !== 200) {
throw new Error('Bad status code: ' + response.statusCode);
}
return response.body;
});
return await Promise.race([wrappedPromise, this.abortedPromise]);
} finally {
requestPromise && requestPromise.cancel()
requestPromise?.cancel();
}
}
}