perf: revise core class and make it cleaner (#548)
* feat: rework core protocol a bit * feat: transform some strings to literals * feat: check if a param is a promise with instanceof * fix: shortest RTT not registering properly * feat: use another optional chaining * fix: grammatic error in comment * docs: add CHANGELOG line * fix: move a start time indicator closer to the source
This commit is contained in:
parent
6bfc3426e7
commit
6b85c6bc0b
|
@ -9,6 +9,7 @@
|
||||||
* Assetto Corsa - Fixed how `state.numplayers` is set (By @podrivo #538)
|
* Assetto Corsa - Fixed how `state.numplayers` is set (By @podrivo #538)
|
||||||
* TeamSpeak 2 - Fixed how `state.name` is set (By @podrivo #544)
|
* TeamSpeak 2 - Fixed how `state.name` is set (By @podrivo #544)
|
||||||
* Grand Theft Auto: San Andreas OpenMP - Fixed `state.players` returning an empty array (By @Focus04 #547)
|
* Grand Theft Auto: San Andreas OpenMP - Fixed `state.players` returning an empty array (By @Focus04 #547)
|
||||||
|
* Perf: Re-write of the `core` class.
|
||||||
|
|
||||||
## 5.0.0-beta.2
|
## 5.0.0-beta.2
|
||||||
* Fixed support for projects using `require`.
|
* Fixed support for projects using `require`.
|
||||||
|
|
|
@ -31,7 +31,9 @@ export default class Core extends EventEmitter {
|
||||||
|
|
||||||
// Runs a single attempt with a timeout and cleans up afterward
|
// Runs a single attempt with a timeout and cleans up afterward
|
||||||
async runOnceSafe () {
|
async runOnceSafe () {
|
||||||
if (this.options.debug) {
|
const { debug, attemptTimeout } = this.options
|
||||||
|
|
||||||
|
if (debug) {
|
||||||
this.logger.debugEnabled = true
|
this.logger.debugEnabled = true
|
||||||
}
|
}
|
||||||
this.logger.prefix = 'Q#' + (uid++)
|
this.logger.prefix = 'Q#' + (uid++)
|
||||||
|
@ -41,16 +43,16 @@ export default class Core extends EventEmitter {
|
||||||
this.logger.debug('Options:', this.options)
|
this.logger.debug('Options:', this.options)
|
||||||
|
|
||||||
let abortCall = null
|
let abortCall = null
|
||||||
this.abortedPromise = new Promise((resolve, reject) => {
|
this.abortedPromise = new Promise((_resolve, reject) => {
|
||||||
abortCall = () => reject(new Error('Query is finished -- cancelling outstanding promises'))
|
abortCall = () => reject(new Error('Query is finished -- cancelling outstanding promises'))
|
||||||
}).catch(() => {
|
}).catch(() => {
|
||||||
// Make sure that if this promise isn't attached to, it doesn't throw a unhandled promise rejection
|
// Make sure that if this promise isn't attached to, it doesn't throw an unhandled promise rejection
|
||||||
})
|
})
|
||||||
|
|
||||||
let timeout
|
let timeout
|
||||||
try {
|
try {
|
||||||
const promise = this.runOnce()
|
const promise = this.runOnce()
|
||||||
timeout = Promises.createTimeout(this.options.attemptTimeout, 'Attempt')
|
timeout = Promises.createTimeout(attemptTimeout, 'Attempt')
|
||||||
const result = await Promise.race([promise, timeout])
|
const result = await Promise.race([promise, timeout])
|
||||||
this.logger.debug('Query was successful')
|
this.logger.debug('Query was successful')
|
||||||
return result
|
return result
|
||||||
|
@ -58,9 +60,9 @@ export default class Core extends EventEmitter {
|
||||||
this.logger.debug('Query failed with error', e)
|
this.logger.debug('Query failed with error', e)
|
||||||
throw e
|
throw e
|
||||||
} finally {
|
} finally {
|
||||||
timeout && timeout.cancel()
|
timeout?.cancel()
|
||||||
try {
|
try {
|
||||||
abortCall()
|
abortCall?.()
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
this.logger.debug('Error during abort cleanup: ' + e.stack)
|
this.logger.debug('Error during abort cleanup: ' + e.stack)
|
||||||
}
|
}
|
||||||
|
@ -68,34 +70,29 @@ export default class Core extends EventEmitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
async runOnce () {
|
async runOnce () {
|
||||||
const options = this.options
|
const { options, dnsResolver } = this
|
||||||
|
|
||||||
if (('host' in options) && !('address' in options)) {
|
if (('host' in options) && !('address' in options)) {
|
||||||
const resolved = await this.dnsResolver.resolve(options.host, options.ipFamily, this.srvRecord)
|
const resolved = await dnsResolver.resolve(options.host, options.ipFamily, this.srvRecord)
|
||||||
options.address = resolved.address
|
options.address = resolved.address
|
||||||
if (resolved.port) options.port = resolved.port
|
options.port ||= resolved.port
|
||||||
}
|
}
|
||||||
|
|
||||||
const state = new Results()
|
const state = new Results()
|
||||||
|
|
||||||
await this.run(state)
|
await this.run(state)
|
||||||
state.queryPort = options.port
|
|
||||||
|
|
||||||
|
state.queryPort = options.port
|
||||||
// because lots of servers prefix with spaces to try to appear first
|
// because lots of servers prefix with spaces to try to appear first
|
||||||
state.name = (state.name || '').trim()
|
state.name = (state.name || '').trim()
|
||||||
|
state.connect = `${state.gameHost || options.host || options.address}:${state.gamePort || options.port}`
|
||||||
if (!('connect' in state)) {
|
|
||||||
state.connect = '' +
|
|
||||||
(state.gameHost || this.options.host || this.options.address) +
|
|
||||||
':' +
|
|
||||||
(state.gamePort || this.options.port)
|
|
||||||
}
|
|
||||||
state.ping = this.shortestRTT
|
state.ping = this.shortestRTT
|
||||||
|
|
||||||
delete state.gameHost
|
delete state.gameHost
|
||||||
delete state.gamePort
|
delete state.gamePort
|
||||||
|
|
||||||
this.logger.debug(log => {
|
this.logger.debug(log => {
|
||||||
log('Size of players array: ' + state.players.length)
|
log('Size of players array:', state.players.length)
|
||||||
log('Size of bots array: ' + state.bots.length)
|
log('Size of bots array:', state.bots.length)
|
||||||
})
|
})
|
||||||
|
|
||||||
return state
|
return state
|
||||||
|
@ -104,19 +101,20 @@ export default class Core extends EventEmitter {
|
||||||
async run (/** Results */ state) {}
|
async run (/** Results */ state) {}
|
||||||
|
|
||||||
/** Param can be a time in ms, or a promise (which will be timed) */
|
/** Param can be a time in ms, or a promise (which will be timed) */
|
||||||
registerRtt (param) {
|
async registerRtt (param) {
|
||||||
if (param.then) {
|
try {
|
||||||
const start = Date.now()
|
if (param instanceof Promise) {
|
||||||
param.then(() => {
|
const start = Date.now()
|
||||||
const end = Date.now()
|
await param
|
||||||
const rtt = end - start
|
await this.registerRtt(Date.now() - start)
|
||||||
this.registerRtt(rtt)
|
} else {
|
||||||
}).catch(() => {})
|
this.logger.debug(`Registered RTT: ${param}ms`)
|
||||||
} else {
|
if (this.shortestRTT === 0 || param < this.shortestRTT) {
|
||||||
this.logger.debug('Registered RTT: ' + param + 'ms')
|
this.shortestRTT = param
|
||||||
if (this.shortestRTT === 0 || param < this.shortestRTT) {
|
}
|
||||||
this.shortestRTT = param
|
|
||||||
}
|
}
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.debug(`Error in promise: ${error}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,8 +162,10 @@ export default class Core extends EventEmitter {
|
||||||
*/
|
*/
|
||||||
async withTcp (fn, port) {
|
async withTcp (fn, port) {
|
||||||
this.usedTcp = true
|
this.usedTcp = true
|
||||||
|
const { options, logger } = this
|
||||||
const address = this.options.address
|
const address = this.options.address
|
||||||
if (!port) port = this.options.port
|
port ??= options.port
|
||||||
|
|
||||||
this.assertValidPort(port)
|
this.assertValidPort(port)
|
||||||
|
|
||||||
let socket, connectionTimeout
|
let socket, connectionTimeout
|
||||||
|
@ -176,28 +176,29 @@ export default class Core extends EventEmitter {
|
||||||
// Prevent unhandled 'error' events from dumping straight to console
|
// Prevent unhandled 'error' events from dumping straight to console
|
||||||
socket.on('error', () => {})
|
socket.on('error', () => {})
|
||||||
|
|
||||||
this.logger.debug(log => {
|
logger.debug(log => {
|
||||||
this.logger.debug(address + ':' + port + ' TCP Connecting')
|
logger.debug(address + ':' + port + ' TCP Connecting')
|
||||||
const writeHook = socket.write
|
const writeHook = socket.write
|
||||||
socket.write = (...args) => {
|
socket.write = (...args) => {
|
||||||
log(address + ':' + port + ' TCP-->')
|
log(address + ':' + port + ' TCP-->')
|
||||||
log(debugDump(args[0]))
|
log(debugDump(args[0]))
|
||||||
writeHook.apply(socket, args)
|
writeHook.apply(socket, args)
|
||||||
}
|
}
|
||||||
|
|
||||||
socket.on('error', e => log('TCP Error:', e))
|
socket.on('error', e => log('TCP Error:', e))
|
||||||
socket.on('close', () => log('TCP Closed'))
|
socket.on('close', () => log('TCP Closed'))
|
||||||
socket.on('data', (data) => {
|
socket.on('data', (data) => {
|
||||||
log(address + ':' + port + ' <--TCP')
|
log(`${address}:${port} <--TCP`)
|
||||||
log(data)
|
log(data)
|
||||||
})
|
})
|
||||||
socket.on('ready', () => log(address + ':' + port + ' TCP Connected'))
|
socket.on('ready', () => log(`${address}:${port} TCP Connected`))
|
||||||
})
|
})
|
||||||
|
|
||||||
const connectionPromise = new Promise((resolve, reject) => {
|
const connectionPromise = new Promise((resolve, reject) => {
|
||||||
socket.on('ready', resolve)
|
socket.on('ready', resolve)
|
||||||
socket.on('close', () => reject(new Error('TCP Connection Refused')))
|
socket.on('close', () => reject(new Error('TCP Connection Refused')))
|
||||||
})
|
})
|
||||||
this.registerRtt(connectionPromise)
|
await this.registerRtt(connectionPromise)
|
||||||
connectionTimeout = Promises.createTimeout(this.options.socketTimeout, 'TCP Opening')
|
connectionTimeout = Promises.createTimeout(this.options.socketTimeout, 'TCP Opening')
|
||||||
await Promise.race([
|
await Promise.race([
|
||||||
connectionPromise,
|
connectionPromise,
|
||||||
|
@ -206,8 +207,8 @@ export default class Core extends EventEmitter {
|
||||||
])
|
])
|
||||||
return await fn(socket)
|
return await fn(socket)
|
||||||
} finally {
|
} finally {
|
||||||
socket && socket.destroy()
|
socket?.destroy()
|
||||||
connectionTimeout && connectionTimeout.cancel()
|
connectionTimeout?.cancel()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -221,7 +222,7 @@ export default class Core extends EventEmitter {
|
||||||
async tcpSend (socket, buffer, ondata) {
|
async tcpSend (socket, buffer, ondata) {
|
||||||
let timeout
|
let timeout
|
||||||
try {
|
try {
|
||||||
const promise = new Promise((resolve, reject) => {
|
const promise = new Promise((resolve, _reject) => {
|
||||||
let received = Buffer.from([])
|
let received = Buffer.from([])
|
||||||
const onData = (data) => {
|
const onData = (data) => {
|
||||||
received = Buffer.concat([received, data])
|
received = Buffer.concat([received, data])
|
||||||
|
@ -237,7 +238,7 @@ export default class Core extends EventEmitter {
|
||||||
timeout = Promises.createTimeout(this.options.socketTimeout, 'TCP')
|
timeout = Promises.createTimeout(this.options.socketTimeout, 'TCP')
|
||||||
return await Promise.race([promise, timeout, this.abortedPromise])
|
return await Promise.race([promise, timeout, this.abortedPromise])
|
||||||
} finally {
|
} finally {
|
||||||
timeout && timeout.cancel()
|
timeout?.cancel()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -249,8 +250,7 @@ export default class Core extends EventEmitter {
|
||||||
* @template T
|
* @template T
|
||||||
*/
|
*/
|
||||||
async udpSend (buffer, onPacket, onTimeout) {
|
async udpSend (buffer, onPacket, onTimeout) {
|
||||||
const address = this.options.address
|
const { address, port, debug, socketTimeout } = this.options
|
||||||
const port = this.options.port
|
|
||||||
this.assertValidPort(port)
|
this.assertValidPort(port)
|
||||||
|
|
||||||
if (typeof buffer === 'string') buffer = Buffer.from(buffer, 'binary')
|
if (typeof buffer === 'string') buffer = Buffer.from(buffer, 'binary')
|
||||||
|
@ -264,19 +264,14 @@ export default class Core extends EventEmitter {
|
||||||
|
|
||||||
let socketCallback
|
let socketCallback
|
||||||
let timeout
|
let timeout
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const promise = new Promise((resolve, reject) => {
|
const promise = new Promise((resolve, reject) => {
|
||||||
const start = Date.now()
|
const start = Date.now()
|
||||||
let end = null
|
|
||||||
socketCallback = (fromAddress, fromPort, buffer) => {
|
socketCallback = (fromAddress, fromPort, buffer) => {
|
||||||
try {
|
try {
|
||||||
if (fromAddress !== address) return
|
if (fromAddress !== address || fromPort !== port) return
|
||||||
if (fromPort !== port) return
|
this.registerRtt(Date.now() - start)
|
||||||
if (end === null) {
|
|
||||||
end = Date.now()
|
|
||||||
const rtt = end - start
|
|
||||||
this.registerRtt(rtt)
|
|
||||||
}
|
|
||||||
const result = onPacket(buffer)
|
const result = onPacket(buffer)
|
||||||
if (result !== undefined) {
|
if (result !== undefined) {
|
||||||
this.logger.debug('UDP send finished by callback')
|
this.logger.debug('UDP send finished by callback')
|
||||||
|
@ -286,30 +281,24 @@ export default class Core extends EventEmitter {
|
||||||
reject(e)
|
reject(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
socket.addCallback(socketCallback, this.options.debug)
|
socket.addCallback(socketCallback, debug)
|
||||||
})
|
})
|
||||||
timeout = Promises.createTimeout(this.options.socketTimeout, 'UDP')
|
timeout = Promises.createTimeout(socketTimeout, 'UDP')
|
||||||
const wrappedTimeout = new Promise((resolve, reject) => {
|
const wrappedTimeout = Promise.resolve(timeout).catch((e) => {
|
||||||
timeout.catch((e) => {
|
this.logger.debug('UDP timeout detected')
|
||||||
this.logger.debug('UDP timeout detected')
|
if (onTimeout) {
|
||||||
if (onTimeout) {
|
const result = onTimeout()
|
||||||
try {
|
if (result !== undefined) {
|
||||||
const result = onTimeout()
|
this.logger.debug('UDP timeout resolved by callback')
|
||||||
if (result !== undefined) {
|
return result
|
||||||
this.logger.debug('UDP timeout resolved by callback')
|
|
||||||
resolve(result)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
reject(e)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
reject(e)
|
}
|
||||||
})
|
throw e
|
||||||
})
|
})
|
||||||
|
|
||||||
return await Promise.race([promise, wrappedTimeout, this.abortedPromise])
|
return await Promise.race([promise, wrappedTimeout, this.abortedPromise])
|
||||||
} finally {
|
} finally {
|
||||||
timeout && timeout.cancel()
|
timeout?.cancel()
|
||||||
socketCallback && socket.removeCallback(socketCallback)
|
socketCallback && socket.removeCallback(socketCallback)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -344,7 +333,7 @@ export default class Core extends EventEmitter {
|
||||||
})
|
})
|
||||||
return await Promise.race([wrappedPromise, this.abortedPromise])
|
return await Promise.race([wrappedPromise, this.abortedPromise])
|
||||||
} finally {
|
} finally {
|
||||||
requestPromise && requestPromise.cancel()
|
requestPromise?.cancel()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue