diff --git a/protocols/core.js b/protocols/core.js index e717dee..063ce93 100644 --- a/protocols/core.js +++ b/protocols/core.js @@ -31,7 +31,9 @@ export default class Core extends EventEmitter { // Runs a single attempt with a timeout and cleans up afterward async runOnceSafe () { - if (this.options.debug) { + const { debug, attemptTimeout } = this.options + + if (debug) { this.logger.debugEnabled = true } this.logger.prefix = 'Q#' + (uid++) @@ -41,7 +43,7 @@ export default class Core extends EventEmitter { this.logger.debug('Options:', this.options) let abortCall = null - this.abortedPromise = new Promise((resolve, reject) => { + this.abortedPromise = new Promise((_resolve, reject) => { abortCall = () => reject(new Error('Query is finished -- cancelling outstanding promises')) }).catch(() => { // Make sure that if this promise isn't attached to, it doesn't throw a unhandled promise rejection @@ -50,7 +52,7 @@ export default class Core extends EventEmitter { let timeout try { const promise = this.runOnce() - timeout = Promises.createTimeout(this.options.attemptTimeout, 'Attempt') + timeout = Promises.createTimeout(attemptTimeout, 'Attempt') const result = await Promise.race([promise, timeout]) this.logger.debug('Query was successful') return result @@ -58,9 +60,9 @@ export default class Core extends EventEmitter { this.logger.debug('Query failed with error', e) throw e } finally { - timeout && timeout.cancel() + timeout?.cancel() try { - abortCall() + abortCall?.() } catch (e) { this.logger.debug('Error during abort cleanup: ' + e.stack) } @@ -68,34 +70,29 @@ export default class Core extends EventEmitter { } async runOnce () { - const options = this.options + const { options, dnsResolver, shortestRTT } = this + if (('host' in options) && !('address' in options)) { - const resolved = await this.dnsResolver.resolve(options.host, options.ipFamily, this.srvRecord) + const resolved = await dnsResolver.resolve(options.host, options.ipFamily, this.srvRecord) options.address = resolved.address - if (resolved.port) options.port = resolved.port + options.port ||= resolved.port } const state = new Results() - await this.run(state) - state.queryPort = options.port + state.queryPort = options.port // because lots of servers prefix with spaces to try to appear first state.name = (state.name || '').trim() + state.connect = `${state.gameHost || options.host || options.address}:${state.gamePort || options.port}` + state.ping = shortestRTT - if (!('connect' in state)) { - state.connect = '' + - (state.gameHost || this.options.host || this.options.address) + - ':' + - (state.gamePort || this.options.port) - } - state.ping = this.shortestRTT delete state.gameHost delete state.gamePort this.logger.debug(log => { - log('Size of players array: ' + state.players.length) - log('Size of bots array: ' + state.bots.length) + log('Size of players array:', state.players.length) + log('Size of bots array:', state.bots.length) }) return state @@ -104,19 +101,21 @@ export default class Core extends EventEmitter { async run (/** Results */ state) {} /** Param can be a time in ms, or a promise (which will be timed) */ - registerRtt (param) { - if (param.then) { - const start = Date.now() - param.then(() => { - const end = Date.now() - const rtt = end - start - this.registerRtt(rtt) - }).catch(() => {}) - } else { - this.logger.debug('Registered RTT: ' + param + 'ms') - if (this.shortestRTT === 0 || param < this.shortestRTT) { - this.shortestRTT = param + async registerRtt (param) { + const start = Date.now() + + try { + if (param.then) { + await param + await this.registerRtt(Date.now() - start) + } else { + this.logger.debug('Registered RTT: ' + param + 'ms') + if (this.shortestRTT === 0 || param < this.shortestRTT) { + this.shortestRTT = param + } } + } catch (error) { + this.logger.debug('Error in promise:', error) } } @@ -164,8 +163,10 @@ export default class Core extends EventEmitter { */ async withTcp (fn, port) { this.usedTcp = true + const { options, logger } = this const address = this.options.address - if (!port) port = this.options.port + port ??= options.port + this.assertValidPort(port) let socket, connectionTimeout @@ -176,28 +177,29 @@ export default class Core extends EventEmitter { // Prevent unhandled 'error' events from dumping straight to console socket.on('error', () => {}) - this.logger.debug(log => { - this.logger.debug(address + ':' + port + ' TCP Connecting') + logger.debug(log => { + logger.debug(address + ':' + port + ' TCP Connecting') const writeHook = socket.write socket.write = (...args) => { log(address + ':' + port + ' TCP-->') log(debugDump(args[0])) writeHook.apply(socket, args) } + socket.on('error', e => log('TCP Error:', e)) socket.on('close', () => log('TCP Closed')) socket.on('data', (data) => { - log(address + ':' + port + ' <--TCP') + log(`${address}:${port} <--TCP`) log(data) }) - socket.on('ready', () => log(address + ':' + port + ' TCP Connected')) + socket.on('ready', () => log(`${address}:${port} TCP Connected`)) }) const connectionPromise = new Promise((resolve, reject) => { socket.on('ready', resolve) socket.on('close', () => reject(new Error('TCP Connection Refused'))) }) - this.registerRtt(connectionPromise) + await this.registerRtt(connectionPromise) connectionTimeout = Promises.createTimeout(this.options.socketTimeout, 'TCP Opening') await Promise.race([ connectionPromise, @@ -206,8 +208,8 @@ export default class Core extends EventEmitter { ]) return await fn(socket) } finally { - socket && socket.destroy() - connectionTimeout && connectionTimeout.cancel() + socket?.destroy() + connectionTimeout?.cancel() } } @@ -221,7 +223,7 @@ export default class Core extends EventEmitter { async tcpSend (socket, buffer, ondata) { let timeout try { - const promise = new Promise((resolve, reject) => { + const promise = new Promise((resolve, _reject) => { let received = Buffer.from([]) const onData = (data) => { received = Buffer.concat([received, data]) @@ -249,8 +251,7 @@ export default class Core extends EventEmitter { * @template T */ async udpSend (buffer, onPacket, onTimeout) { - const address = this.options.address - const port = this.options.port + const { address, port, debug, socketTimeout } = this.options this.assertValidPort(port) if (typeof buffer === 'string') buffer = Buffer.from(buffer, 'binary') @@ -264,19 +265,14 @@ export default class Core extends EventEmitter { let socketCallback let timeout + try { const promise = new Promise((resolve, reject) => { const start = Date.now() - let end = null socketCallback = (fromAddress, fromPort, buffer) => { try { - if (fromAddress !== address) return - if (fromPort !== port) return - if (end === null) { - end = Date.now() - const rtt = end - start - this.registerRtt(rtt) - } + if (fromAddress !== address || fromPort !== port) return + this.registerRtt(Date.now() - start) const result = onPacket(buffer) if (result !== undefined) { this.logger.debug('UDP send finished by callback') @@ -286,30 +282,24 @@ export default class Core extends EventEmitter { reject(e) } } - socket.addCallback(socketCallback, this.options.debug) + socket.addCallback(socketCallback, debug) }) - timeout = Promises.createTimeout(this.options.socketTimeout, 'UDP') - const wrappedTimeout = new Promise((resolve, reject) => { - timeout.catch((e) => { - this.logger.debug('UDP timeout detected') - if (onTimeout) { - try { - const result = onTimeout() - if (result !== undefined) { - this.logger.debug('UDP timeout resolved by callback') - resolve(result) - return - } - } catch (e) { - reject(e) - } + timeout = Promises.createTimeout(socketTimeout, 'UDP') + const wrappedTimeout = Promise.resolve(timeout).catch((e) => { + this.logger.debug('UDP timeout detected') + if (onTimeout) { + const result = onTimeout() + if (result !== undefined) { + this.logger.debug('UDP timeout resolved by callback') + return result } - reject(e) - }) + } + throw e }) + return await Promise.race([promise, wrappedTimeout, this.abortedPromise]) } finally { - timeout && timeout.cancel() + timeout?.cancel() socketCallback && socket.removeCallback(socketCallback) } } @@ -344,7 +334,7 @@ export default class Core extends EventEmitter { }) return await Promise.race([wrappedPromise, this.abortedPromise]) } finally { - requestPromise && requestPromise.cancel() + requestPromise?.cancel() } } }