From 77b2cc1c7fa4123104de578e230f3afe7337d576 Mon Sep 17 00:00:00 2001 From: mmorrison Date: Mon, 7 Jan 2019 00:52:29 -0600 Subject: [PATCH] Initial es6 async conversion work --- bin/gamedig.js | 6 +- lib/index.js | 44 +--- lib/typeresolver.js | 7 +- protocols/americasarmy.js | 4 +- protocols/armagetron.js | 6 +- protocols/ase.js | 70 +++-- protocols/battlefield.js | 246 +++++++++--------- protocols/core.js | 460 +++++++++++++++++--------------- protocols/samp.js | 143 +++++----- protocols/valve.js | 535 +++++++++++++++++++------------------- 10 files changed, 773 insertions(+), 748 deletions(-) diff --git a/bin/gamedig.js b/bin/gamedig.js index 919246d..a874ee9 100644 --- a/bin/gamedig.js +++ b/bin/gamedig.js @@ -5,7 +5,7 @@ const argv = require('minimist')(process.argv.slice(2)), const debug = argv.debug; delete argv.debug; -const outputFormat = argv.output; +const pretty = !!argv.pretty; delete argv.output; const options = {}; @@ -25,7 +25,7 @@ Gamedig.isCommandLine = true; Gamedig.query(options) .then((state) => { - if(outputFormat === 'pretty') { + if(pretty) { console.log(JSON.stringify(state,null,' ')); } else { console.log(JSON.stringify(state)); @@ -42,7 +42,7 @@ Gamedig.query(options) if (error instanceof Error) { error = error.message; } - if (outputFormat === 'pretty') { + if (pretty) { console.log(JSON.stringify({error: error}, null, ' ')); } else { console.log(JSON.stringify({error: error})); diff --git a/lib/index.js b/lib/index.js index c0466f0..cda6184 100644 --- a/lib/index.js +++ b/lib/index.js @@ -2,7 +2,7 @@ const dgram = require('dgram'), TypeResolver = require('./typeresolver'), HexUtil = require('./HexUtil'); -const activeQueries = []; +const activeQueries = new Set(); const udpSocket = dgram.createSocket('udp4'); udpSocket.unref(); @@ -13,12 +13,9 @@ udpSocket.on('message', (buffer, rinfo) => { console.log(HexUtil.debugDump(buffer)); } for(const query of activeQueries) { - if( - query.options.address !== rinfo.address - && query.options.altaddress !== rinfo.address - ) continue; + if(query.options.address !== rinfo.address) continue; if(query.options.port_query !== rinfo.port) continue; - query._udpResponse(buffer); + query._udpIncoming(buffer); break; } }); @@ -29,27 +26,14 @@ udpSocket.on('error', (e) => { class Gamedig { static query(options,callback) { - const promise = new Promise((resolve,reject) => { + const promise = (async () => { for (const key of Object.keys(options)) { if (['port_query', 'port'].includes(key)) { options[key] = parseInt(options[key]); } } - options.callback = (state) => { - if (state.error) reject(state.error); - else resolve(state); - }; - - let query; - try { - query = TypeResolver.lookup(options.type); - } catch(e) { - process.nextTick(() => { - options.callback({error:e}); - }); - return; - } + let query = TypeResolver.lookup(options.type); query.debug = Gamedig.debug; query.udpSocket = udpSocket; query.type = options.type; @@ -73,17 +57,13 @@ class Gamedig { query.options[key] = options[key]; } - activeQueries.push(query); - - query.on('finished',() => { - const i = activeQueries.indexOf(query); - if(i >= 0) activeQueries.splice(i, 1); - }); - - process.nextTick(() => { - query.start(); - }); - }); + activeQueries.add(query); + try { + return await query.runAll(); + } finally { + activeQueries.delete(query); + } + })(); if (callback && callback instanceof Function) { if(callback.length === 2) { diff --git a/lib/typeresolver.js b/lib/typeresolver.js index 78f27f0..dd33536 100644 --- a/lib/typeresolver.js +++ b/lib/typeresolver.js @@ -1,5 +1,6 @@ const Path = require('path'), - fs = require('fs'); + fs = require('fs'), + Core = require('../protocols/core'); const protocolDir = Path.normalize(__dirname+'/../protocols'); const gamesFile = Path.normalize(__dirname+'/../games.txt'); @@ -55,6 +56,10 @@ function createProtocolInstance(type) { } class TypeResolver { + /** + * @param {string} type + * @returns Core + */ static lookup(type) { if(!type) throw Error('No game specified'); diff --git a/protocols/americasarmy.js b/protocols/americasarmy.js index cf92afc..34b6bc2 100644 --- a/protocols/americasarmy.js +++ b/protocols/americasarmy.js @@ -1,8 +1,8 @@ const Gamespy2 = require('./gamespy2'); class AmericasArmy extends Gamespy2 { - finalizeState(state) { - super.finalizeState(state); + async run(state) { + await super.run(state); state.name = this.stripColor(state.name); state.map = this.stripColor(state.map); for(const key of Object.keys(state.raw)) { diff --git a/protocols/armagetron.js b/protocols/armagetron.js index e156023..1b3d628 100644 --- a/protocols/armagetron.js +++ b/protocols/armagetron.js @@ -7,10 +7,10 @@ class Armagetron extends Core { this.byteorder = 'be'; } - run(state) { + async run(state) { const b = Buffer.from([0,0x35,0,0,0,0,0,0x11]); - this.udpSend(b,(buffer) => { + await this.udpSend(b,(buffer) => { const reader = this.reader(buffer); reader.skip(6); @@ -37,7 +37,7 @@ class Armagetron extends Core { state.raw.uri = this.readString(reader); state.raw.globalids = this.readString(reader); this.finish(state); - return true; + return null; }); } diff --git a/protocols/ase.js b/protocols/ase.js index 038a7c7..ae0557f 100644 --- a/protocols/ase.js +++ b/protocols/ase.js @@ -1,44 +1,42 @@ const Core = require('./core'); class Ase extends Core { - run(state) { - this.udpSend('s',(buffer) => { + async run(state) { + const buffer = await this.udpSend('s',(buffer) => { const reader = this.reader(buffer); - - const header = reader.string({length:4}); - if(header !== 'EYE1') return; - - state.raw.gamename = this.readString(reader); - state.raw.port = parseInt(this.readString(reader)); - state.name = this.readString(reader); - state.raw.gametype = this.readString(reader); - state.map = this.readString(reader); - state.raw.version = this.readString(reader); - state.password = this.readString(reader) === '1'; - state.raw.numplayers = parseInt(this.readString(reader)); - state.maxplayers = parseInt(this.readString(reader)); - - while(!reader.done()) { - const key = this.readString(reader); - if(!key) break; - const value = this.readString(reader); - state.raw[key] = value; - } - - while(!reader.done()) { - const flags = reader.uint(1); - const player = {}; - if(flags & 1) player.name = this.readString(reader); - if(flags & 2) player.team = this.readString(reader); - if(flags & 4) player.skin = this.readString(reader); - if(flags & 8) player.score = parseInt(this.readString(reader)); - if(flags & 16) player.ping = parseInt(this.readString(reader)); - if(flags & 32) player.time = parseInt(this.readString(reader)); - state.players.push(player); - } - - this.finish(state); + const header = reader.string({length: 4}); + if (header === 'EYE1') return buffer; }); + + const reader = this.reader(buffer); + state.raw.gamename = this.readString(reader); + state.raw.port = parseInt(this.readString(reader)); + state.name = this.readString(reader); + state.raw.gametype = this.readString(reader); + state.map = this.readString(reader); + state.raw.version = this.readString(reader); + state.password = this.readString(reader) === '1'; + state.raw.numplayers = parseInt(this.readString(reader)); + state.maxplayers = parseInt(this.readString(reader)); + + while(!reader.done()) { + const key = this.readString(reader); + if(!key) break; + const value = this.readString(reader); + state.raw[key] = value; + } + + while(!reader.done()) { + const flags = reader.uint(1); + const player = {}; + if(flags & 1) player.name = this.readString(reader); + if(flags & 2) player.team = this.readString(reader); + if(flags & 4) player.skin = this.readString(reader); + if(flags & 8) player.score = parseInt(this.readString(reader)); + if(flags & 16) player.ping = parseInt(this.readString(reader)); + if(flags & 32) player.time = parseInt(this.readString(reader)); + state.players.push(player); + } } readString(reader) { diff --git a/protocols/battlefield.js b/protocols/battlefield.js index 9f3463f..45968c5 100644 --- a/protocols/battlefield.js +++ b/protocols/battlefield.js @@ -1,5 +1,4 @@ -const async = require('async'), - Core = require('./core'); +const Core = require('./core'); class Battlefield extends Core { constructor() { @@ -8,114 +7,128 @@ class Battlefield extends Core { this.isBadCompany2 = false; } - run(state) { - async.series([ - (c) => { - this.query(['serverInfo'], (data) => { - if(this.debug) console.log(data); - if(data.shift() !== 'OK') return this.fatal('Missing OK'); + async run(state) { + await this.withTcp(async socket => { + { + const data = await this.query(socket, ['serverInfo']); + state.raw.name = data.shift(); + state.raw.numplayers = parseInt(data.shift()); + state.maxplayers = parseInt(data.shift()); + state.raw.gametype = data.shift(); + state.map = data.shift(); + state.raw.roundsplayed = parseInt(data.shift()); + state.raw.roundstotal = parseInt(data.shift()); - state.raw.name = data.shift(); - state.raw.numplayers = parseInt(data.shift()); - state.maxplayers = parseInt(data.shift()); - state.raw.gametype = data.shift(); - state.map = data.shift(); - state.raw.roundsplayed = parseInt(data.shift()); - state.raw.roundstotal = parseInt(data.shift()); + const teamCount = data.shift(); + state.raw.teams = []; + for (let i = 0; i < teamCount; i++) { + const tickets = parseFloat(data.shift()); + state.raw.teams.push({ + tickets: tickets + }); + } - const teamCount = data.shift(); - state.raw.teams = []; - for(let i = 0; i < teamCount; i++) { - const tickets = parseFloat(data.shift()); - state.raw.teams.push({ - tickets:tickets - }); - } - - state.raw.targetscore = parseInt(data.shift()); + state.raw.targetscore = parseInt(data.shift()); + data.shift(); + state.raw.ranked = (data.shift() === 'true'); + state.raw.punkbuster = (data.shift() === 'true'); + state.password = (data.shift() === 'true'); + state.raw.uptime = parseInt(data.shift()); + state.raw.roundtime = parseInt(data.shift()); + if (this.isBadCompany2) { data.shift(); - state.raw.ranked = (data.shift() === 'true'); - state.raw.punkbuster = (data.shift() === 'true'); - state.password = (data.shift() === 'true'); - state.raw.uptime = parseInt(data.shift()); - state.raw.roundtime = parseInt(data.shift()); - if(this.isBadCompany2) { - data.shift(); - data.shift(); - } - state.raw.ip = data.shift(); - state.raw.punkbusterversion = data.shift(); - state.raw.joinqueue = (data.shift() === 'true'); - state.raw.region = data.shift(); - if(!this.isBadCompany2) { - state.raw.pingsite = data.shift(); - state.raw.country = data.shift(); - state.raw.quickmatch = (data.shift() === 'true'); - } - - c(); - }); - }, - (c) => { - this.query(['version'], (data) => { - if(this.debug) console.log(data); - if(data[0] !== 'OK') return this.fatal('Missing OK'); - - state.raw.version = data[2]; - - c(); - }); - }, - (c) => { - this.query(['listPlayers','all'], (data) => { - if(this.debug) console.log(data); - if(data.shift() !== 'OK') return this.fatal('Missing OK'); - - const fieldCount = parseInt(data.shift()); - const fields = []; - for(let i = 0; i < fieldCount; i++) { - fields.push(data.shift()); - } - const numplayers = data.shift(); - for(let i = 0; i < numplayers; i++) { - const player = {}; - for (let key of fields) { - let value = data.shift(); - - if(key === 'teamId') key = 'team'; - else if(key === 'squadId') key = 'squad'; - - if( - key === 'kills' - || key === 'deaths' - || key === 'score' - || key === 'rank' - || key === 'team' - || key === 'squad' - || key === 'ping' - || key === 'type' - ) { - value = parseInt(value); - } - - player[key] = value; - } - state.players.push(player); - } - - this.finish(state); - }); + data.shift(); + } + state.raw.ip = data.shift(); + state.raw.punkbusterversion = data.shift(); + state.raw.joinqueue = (data.shift() === 'true'); + state.raw.region = data.shift(); + if (!this.isBadCompany2) { + state.raw.pingsite = data.shift(); + state.raw.country = data.shift(); + state.raw.quickmatch = (data.shift() === 'true'); + } + } + + { + const data = await this.query(socket, ['version']); + data.shift(); + state.raw.version = data.shift(); + } + + { + const data = await this.query(socket, ['listPlayers', 'all']); + const fieldCount = parseInt(data.shift()); + const fields = []; + for (let i = 0; i < fieldCount; i++) { + fields.push(data.shift()); + } + const numplayers = data.shift(); + for (let i = 0; i < numplayers; i++) { + const player = {}; + for (let key of fields) { + let value = data.shift(); + + if (key === 'teamId') key = 'team'; + else if (key === 'squadId') key = 'squad'; + + if ( + key === 'kills' + || key === 'deaths' + || key === 'score' + || key === 'rank' + || key === 'team' + || key === 'squad' + || key === 'ping' + || key === 'type' + ) { + value = parseInt(value); + } + + player[key] = value; + } + state.players.push(player); + } } - ]); - } - query(params,c) { - this.tcpSend(buildPacket(params), (data) => { - const decoded = this.decodePacket(data); - if(!decoded) return false; - c(decoded); - return true; }); } + + async query(socket, params) { + const outPacket = this.buildPacket(params); + return await this.tcpSend(socket, outPacket, (data) => { + const decoded = this.decodePacket(data); + if(decoded) { + if(this.debug) console.log(decoded); + if(decoded.shift() !== 'OK') throw new Error('Missing OK'); + return decoded; + } + }); + } + + buildPacket(params) { + const paramBuffers = []; + for (const param of params) { + paramBuffers.push(Buffer.from(param,'utf8')); + } + + let totalLength = 12; + for (const paramBuffer of paramBuffers) { + totalLength += paramBuffer.length+1+4; + } + + const b = Buffer.alloc(totalLength); + b.writeUInt32LE(0,0); + b.writeUInt32LE(totalLength,4); + b.writeUInt32LE(params.length,8); + let offset = 12; + for (const paramBuffer of paramBuffers) { + b.writeUInt32LE(paramBuffer.length, offset); offset += 4; + paramBuffer.copy(b, offset); offset += paramBuffer.length; + b.writeUInt8(0, offset); offset += 1; + } + + return b; + } decodePacket(buffer) { if(buffer.length < 8) return false; const reader = this.reader(buffer); @@ -134,29 +147,4 @@ class Battlefield extends Core { } } -function buildPacket(params) { - const paramBuffers = []; - for (const param of params) { - paramBuffers.push(Buffer.from(param,'utf8')); - } - - let totalLength = 12; - for (const paramBuffer of paramBuffers) { - totalLength += paramBuffer.length+1+4; - } - - const b = Buffer.alloc(totalLength); - b.writeUInt32LE(0,0); - b.writeUInt32LE(totalLength,4); - b.writeUInt32LE(params.length,8); - let offset = 12; - for (const paramBuffer of paramBuffers) { - b.writeUInt32LE(paramBuffer.length, offset); offset += 4; - paramBuffer.copy(b, offset); offset += paramBuffer.length; - b.writeUInt8(0, offset); offset += 1; - } - - return b; -} - module.exports = Battlefield; \ No newline at end of file diff --git a/protocols/core.js b/protocols/core.js index 1fadb09..97598fc 100644 --- a/protocols/core.js +++ b/protocols/core.js @@ -1,9 +1,11 @@ const EventEmitter = require('events').EventEmitter, dns = require('dns'), net = require('net'), - async = require('async'), Reader = require('../lib/reader'), - HexUtil = require('../lib/HexUtil'); + HexUtil = require('../lib/HexUtil'), + util = require('util'), + dnsLookupAsync = util.promisify(dns.lookup), + dnsResolveAsync = util.promisify(dns.resolve); class Core extends EventEmitter { constructor() { @@ -13,23 +15,15 @@ class Core extends EventEmitter { attemptTimeout: 10000, maxAttempts: 1 }; - this.attempt = 1; - this.finished = false; this.encoding = 'utf8'; this.byteorder = 'le'; this.delimiter = '\0'; this.srvRecord = null; - this.attemptTimeoutTimer = null; - } - fatal(err,noretry) { - if(!noretry && this.attempt < this.options.maxAttempts) { - this.attempt++; - this.start(); - return; - } - - this.done({error: err.toString()}); + this.attemptAbortables = new Set(); + this.udpCallback = null; + this.udpLocked = false; + this.lastAbortableId = 0; } initState() { @@ -46,128 +40,138 @@ class Core extends EventEmitter { }; } - finalizeState(state) {} + // Run all attempts + async runAll() { + let result = null; + let lastError = null; + for (let attempt = 1; attempt <= this.options.maxAttempts; attempt++) { + try { + result = await this.runOnceSafe(); + result.query.attempts = attempt; + break; + } catch (e) { + lastError = e; + } + } - finish(state) { - this.finalizeState(state); - this.done(state); + if (result === null) { + throw lastError; + } + return result; } - done(state) { - if(this.finished) return; + // Runs a single attempt with a timeout and cleans up afterward + async runOnceSafe() { + try { + const result = await this.timedPromise(this.runOnce(), this.options.attemptTimeout, "Attempt"); + if (this.attemptAbortables.size) { + let out = []; + for (const abortable of this.attemptAbortables) { + out.push(abortable.id + " " + abortable.stack); + } + throw new Error('Query succeeded, but abortables were not empty (async leak?):\n' + out.join('\n---\n')); + } + return result; + } finally { + // Clean up any lingering long-running functions + for (const abortable of this.attemptAbortables) { + try { + abortable.abort(); + } catch(e) {} + } + this.attemptAbortables.clear(); + } + } - if(this.options.notes) + timedPromise(promise, timeoutMs, timeoutMsg) { + return new Promise((resolve, reject) => { + const cancelTimeout = this.setTimeout( + () => reject(new Error(timeoutMsg + " - Timed out after " + timeoutMs + "ms")), + timeoutMs + ); + promise.finally(cancelTimeout).then(resolve,reject); + }); + } + + async runOnce() { + const startMillis = Date.now(); + const options = this.options; + if (('host' in options) && !('address' in options)) { + options.address = await this.parseDns(options.host); + } + if(!('port_query' in options) && 'port' in options) { + const offset = options.port_query_offset || 0; + options.port_query = options.port + offset; + } + + const state = this.initState(); + await this.run(state); + + if (this.options.notes) state.notes = this.options.notes; state.query = {}; - if('host' in this.options) state.query.host = this.options.host; - if('address' in this.options) state.query.address = this.options.address; - if('port' in this.options) state.query.port = this.options.port; - if('port_query' in this.options) state.query.port_query = this.options.port_query; + if ('host' in this.options) state.query.host = this.options.host; + if ('address' in this.options) state.query.address = this.options.address; + if ('port' in this.options) state.query.port = this.options.port; + if ('port_query' in this.options) state.query.port_query = this.options.port_query; state.query.type = this.type; - if('pretty' in this) state.query.pretty = this.pretty; - state.query.duration = Date.now() - this.startMillis; - state.query.attempts = this.attempt; + if ('pretty' in this) state.query.pretty = this.pretty; + state.query.duration = Date.now() - startMillis; - this.reset(); - this.finished = true; - this.emit('finished',state); - if(this.options.callback) this.options.callback(state); + return state; } - reset() { - clearTimeout(this.attemptTimeoutTimer); - if(this.timers) { - for (const timer of this.timers) { - clearTimeout(timer); - } - } - this.timers = []; + async run(state) {} - if(this.tcpSocket) { - this.tcpSocket.destroy(); - delete this.tcpSocket; - } - - this.udpTimeoutTimer = false; - this.udpCallback = false; - } - - start() { - const options = this.options; - this.reset(); - - this.startMillis = Date.now(); - - this.attemptTimeoutTimer = setTimeout(() => { - this.fatal('timeout'); - },this.options.attemptTimeout); - - async.series([ - (c) => { - // resolve host names - if(!('host' in options)) return c(); - if(options.host.match(/\d+\.\d+\.\d+\.\d+/)) { - options.address = options.host; - c(); - } else { - this.parseDns(options.host,c); - } - }, - (c) => { - // calculate query port if needed - if(!('port_query' in options) && 'port' in options) { - const offset = options.port_query_offset || 0; - options.port_query = options.port + offset; - } - c(); - }, - (c) => { - // run - this.run(this.initState()); - } - - ]); - } - - run() {} - - parseDns(host,c) { - const resolveStandard = (host,c) => { + /** + * @param {string} host + * @returns {Promise} + */ + async parseDns(host) { + const isIp = (host) => { + return !!host.match(/\d+\.\d+\.\d+\.\d+/); + }; + const resolveStandard = async (host) => { + if(isIp(host)) return host; if(this.debug) console.log("Standard DNS Lookup: " + host); - dns.lookup(host, (err,address,family) => { - if(err) return this.fatal(err); - if(this.debug) console.log(address); - this.options.address = address; - c(); - }); + const {address,family} = await dnsLookupAsync(host); + if(this.debug) console.log(address); + return address; }; - - const resolveSrv = (srv,host,c) => { + const resolveSrv = async (srv,host) => { + if(isIp(host)) return host; if(this.debug) console.log("SRV DNS Lookup: " + srv+'.'+host); - dns.resolve(srv+'.'+host, 'SRV', (err,addresses) => { - if(this.debug) console.log(err, addresses); - if(err) return resolveStandard(host,c); - if(addresses.length >= 1) { - const line = addresses[0]; - this.options.port = line.port; - const srvhost = line.name; - - if(srvhost.match(/\d+\.\d+\.\d+\.\d+/)) { - this.options.address = srvhost; - c(); - } else { - // resolve yet again - resolveStandard(srvhost,c); - } - return; + let records; + try { + records = await dnsResolveAsync(srv + '.' + host, 'SRV'); + if(this.debug) console.log(records); + if(records.length >= 1) { + const record = records[0]; + this.options.port = record.port; + const srvhost = record.name; + return await resolveStandard(srvhost); } - return resolveStandard(host,c); - }); + } catch(e) { + if (this.debug) console.log(e.toString()); + } + return await resolveStandard(host); }; - if(this.srvRecord) resolveSrv(this.srvRecord,host,c); - else resolveStandard(host,c); + if(this.srvRecord) return await resolveSrv(this.srvRecord, host); + else return await resolveStandard(host); + } + + addAbortable(fn) { + const id = ++this.lastAbortableId; + const stack = new Error().stack; + const entry = { id: id, abort: fn, stack: stack }; + if (this.debug) console.log("Adding abortable: " + id); + this.attemptAbortables.add(entry); + return () => { + if (this.debug) console.log("Removing abortable: " + id); + this.attemptAbortables.delete(entry); + } } // utils @@ -184,125 +188,169 @@ class Core extends EventEmitter { } } } - setTimeout(c,t) { - if(this.finished) return 0; - const id = setTimeout(c,t); - this.timers.push(id); - return id; - } trueTest(str) { if(typeof str === 'boolean') return str; if(typeof str === 'number') return str !== 0; if(typeof str === 'string') { if(str.toLowerCase() === 'true') return true; - if(str === 'yes') return true; + if(str.toLowerCase() === 'yes') return true; if(str === '1') return true; } return false; } - _tcpConnect(c) { - if(this.tcpSocket) return c(this.tcpSocket); - - let connected = false; - let received = Buffer.from([]); + /** + * @param {function(Socket):Promise} fn + * @returns {Promise} + */ + async withTcp(fn) { const address = this.options.address; const port = this.options.port_query; - const socket = this.tcpSocket = net.connect(port,address,() => { - if(this.debug) console.log(address+':'+port+" TCPCONNECTED"); - connected = true; - c(socket); - }); + const socket = net.connect(port,address); socket.setNoDelay(true); - if(this.debug) console.log(address+':'+port+" TCPCONNECT"); + const cancelAbortable = this.addAbortable(() => socket.destroy()); - const writeHook = socket.write; - socket.write = (...args) => { - if(this.debug) { + if(this.debug) { + console.log(address+':'+port+" TCP Connecting"); + const writeHook = socket.write; + socket.write = (...args) => { console.log(address+':'+port+" TCP-->"); console.log(HexUtil.debugDump(args[0])); - } - writeHook.apply(socket,args); - }; - - socket.on('error', () => {}); - socket.on('close', () => { - if(!this.tcpCallback) return; - if(connected) return this.fatal('Socket closed while waiting on TCP'); - else return this.fatal('TCP Connection Refused'); - }); - socket.on('data', (data) => { - if(!this.tcpCallback) return; - if(this.debug) { - console.log(address+':'+port+" <--TCP"); - console.log(HexUtil.debugDump(data)); - } - received = Buffer.concat([received,data]); - if(this.tcpCallback(received)) { - clearTimeout(this.tcpTimeoutTimer); - this.tcpCallback = false; - received = Buffer.from([]); - } - }); - } - tcpSend(buffer,ondata) { - process.nextTick(() => { - if(this.tcpCallback) return this.fatal('Attempted to send TCP packet while still waiting on a managed response'); - this._tcpConnect((socket) => { - socket.write(buffer); + writeHook.apply(socket,args); + }; + socket.on('error', e => console.log('TCP Error: ' + e)); + socket.on('close', () => console.log('TCP Closed')); + socket.on('data', (data) => { + if(this.debug) { + console.log(address+':'+port+" <--TCP"); + console.log(HexUtil.debugDump(data)); + } }); - if(!ondata) return; + socket.on('ready', () => console.log(address+':'+port+" TCP Connected")); + } - this.tcpTimeoutTimer = this.setTimeout(() => { - this.tcpCallback = false; - this.fatal('TCP Watchdog Timeout'); - },this.options.socketTimeout); - this.tcpCallback = ondata; - }); + try { + await this.timedPromise( + new Promise((resolve,reject) => { + socket.on('ready', resolve); + socket.on('close', () => reject(new Error('TCP Connection Refused'))); + }), + this.options.socketTimeout, + 'TCP Opening' + ); + return await fn(socket); + } finally { + cancelAbortable(); + socket.destroy(); + } } - udpSend(buffer,onpacket,ontimeout) { - process.nextTick(() => { - if(this.udpCallback) return this.fatal('Attempted to send UDP packet while still waiting on a managed response'); - this._udpSendNow(buffer); - if(!onpacket) return; - - this.udpTimeoutTimer = this.setTimeout(() => { - this.udpCallback = false; - let timeout = false; - if(!ontimeout || ontimeout() !== true) timeout = true; - if(timeout) this.fatal('UDP Watchdog Timeout'); - },this.options.socketTimeout); - this.udpCallback = onpacket; - }); + setTimeout(callback, time) { + let cancelAbortable; + const onTimeout = () => { + cancelAbortable(); + callback(); + }; + const timeout = setTimeout(onTimeout, time); + cancelAbortable = this.addAbortable(() => clearTimeout(timeout)); + return () => { + cancelAbortable(); + clearTimeout(timeout); + } } - _udpSendNow(buffer) { - if(!('port_query' in this.options)) return this.fatal('Attempted to send without setting a port'); - if(!('address' in this.options)) return this.fatal('Attempted to send without setting an address'); + /** + * @param {Socket} socket + * @param {Buffer} buffer + * @param {function(Buffer):boolean} ondata + * @returns {Promise} + */ + async tcpSend(socket,buffer,ondata) { + return await this.timedPromise( + new Promise(async (resolve,reject) => { + let received = Buffer.from([]); + const onData = (data) => { + received = Buffer.concat([received, data]); + const result = ondata(received); + if (result !== undefined) { + socket.off('data', onData); + resolve(result); + } + }; + socket.on('data', onData); + socket.write(buffer); + }), + this.options.socketTimeout, + 'TCP' + ); + } + + async withUdpLock(fn) { + if (this.udpLocked) { + throw new Error('Attempted to lock UDP when already locked'); + } + this.udpLocked = true; + try { + return await fn(); + } finally { + this.udpLocked = false; + this.udpCallback = null; + } + } + + /** + * @param {Buffer|string} buffer + * @param {function(Buffer):T} onPacket + * @param {(function():T)=} onTimeout + * @returns Promise + * @template T + */ + async udpSend(buffer,onPacket,onTimeout) { + if(!('port_query' in this.options)) throw new Error('Attempted to send without setting a port'); + if(!('address' in this.options)) throw new Error('Attempted to send without setting an address'); if(typeof buffer === 'string') buffer = Buffer.from(buffer,'binary'); - if(this.debug) { console.log(this.options.address+':'+this.options.port_query+" UDP-->"); console.log(HexUtil.debugDump(buffer)); } - this.udpSocket.send(buffer,0,buffer.length,this.options.port_query,this.options.address); + + return await this.withUdpLock(async() => { + this.udpSocket.send(buffer,0,buffer.length,this.options.port_query,this.options.address); + + return await new Promise((resolve,reject) => { + const cancelTimeout = this.setTimeout(() => { + if (this.debug) console.log("UDP timeout detected"); + let success = false; + if (onTimeout) { + const result = onTimeout(); + if (result !== undefined) { + if (this.debug) console.log("UDP timeout resolved by callback"); + resolve(result); + success = true; + } + } + if (!success) { + reject(new Error('UDP Watchdog Timeout')); + } + },this.options.socketTimeout); + + this.udpCallback = (buffer) => { + const result = onPacket(buffer); + if(result !== undefined) { + if (this.debug) console.log("UDP send finished by callback"); + cancelTimeout(); + resolve(result); + } + }; + }); + }); } - _udpResponse(buffer) { - if(this.udpCallback) { - const result = this.udpCallback(buffer); - if(result === true) { - // we're done with this udp session - clearTimeout(this.udpTimeoutTimer); - this.udpCallback = false; - } - } else { - this.udpResponse(buffer); - } + + _udpIncoming(buffer) { + this.udpCallback && this.udpCallback(buffer); } - udpResponse() {} } module.exports = Core; diff --git a/protocols/samp.js b/protocols/samp.js index 43934a4..57cff96 100644 --- a/protocols/samp.js +++ b/protocols/samp.js @@ -1,5 +1,4 @@ -const async = require('async'), - Core = require('./core'); +const Core = require('./core'); class Samp extends Core { constructor() { @@ -7,87 +6,83 @@ class Samp extends Core { this.encoding = 'win1252'; } - run(state) { - async.series([ - (c) => { - this.sendPacket('i',(reader) => { - state.password = !!reader.uint(1); - state.raw.numplayers = reader.uint(2); - state.maxplayers = reader.uint(2); - state.name = this.readString(reader,4); - state.raw.gamemode = this.readString(reader,4); - this.map = this.readString(reader,4); - c(); - }); - }, - (c) => { - this.sendPacket('r',(reader) => { - const ruleCount = reader.uint(2); - state.raw.rules = {}; - for(let i = 0; i < ruleCount; i++) { - const key = this.readString(reader,1); - const value = this.readString(reader,1); - state.raw.rules[key] = value; - } - if('mapname' in state.raw.rules) - state.map = state.raw.rules.mapname; - c(); - }); - }, - (c) => { - this.sendPacket('d',(reader) => { - const playerCount = reader.uint(2); - for(let i = 0; i < playerCount; i++) { - const player = {}; - player.id = reader.uint(1); - player.name = this.readString(reader,1); - player.score = reader.int(4); - player.ping = reader.uint(4); - state.players.push(player); - } - c(); - },() => { - for(let i = 0; i < state.raw.numplayers; i++) { - state.players.push({}); - } - c(); - }); - }, - (c) => { - this.finish(state); + async run(state) { + // read info + { + const reader = await this.sendPacket('i'); + state.password = !!reader.uint(1); + state.raw.numplayers = reader.uint(2); + state.maxplayers = reader.uint(2); + state.name = this.readString(reader,4); + state.raw.gamemode = this.readString(reader,4); + this.map = this.readString(reader,4); + } + + // read rules + { + const reader = await this.sendPacket('r'); + const ruleCount = reader.uint(2); + state.raw.rules = {}; + for(let i = 0; i < ruleCount; i++) { + const key = this.readString(reader,1); + const value = this.readString(reader,1); + state.raw.rules[key] = value; } - ]); + if('mapname' in state.raw.rules) + state.map = state.raw.rules.mapname; + } + + // read players + { + const reader = await this.sendPacket('d', true); + if (reader !== null) { + const playerCount = reader.uint(2); + for(let i = 0; i < playerCount; i++) { + const player = {}; + player.id = reader.uint(1); + player.name = this.readString(reader,1); + player.score = reader.int(4); + player.ping = reader.uint(4); + state.players.push(player); + } + } else { + for(let i = 0; i < state.raw.numplayers; i++) { + state.players.push({}); + } + } + } } readString(reader,lenBytes) { const length = reader.uint(lenBytes); if(!length) return ''; - const string = reader.string({length:length}); - return string; + return reader.string({length:length}); } - sendPacket(type,onresponse,ontimeout) { - const outbuffer = Buffer.alloc(11); - outbuffer.writeUInt32BE(0x53414D50,0); + async sendPacket(type,allowTimeout) { + const outBuffer = Buffer.alloc(11); + outBuffer.writeUInt32BE(0x53414D50,0); const ipSplit = this.options.address.split('.'); - outbuffer.writeUInt8(parseInt(ipSplit[0]),4); - outbuffer.writeUInt8(parseInt(ipSplit[1]),5); - outbuffer.writeUInt8(parseInt(ipSplit[2]),6); - outbuffer.writeUInt8(parseInt(ipSplit[3]),7); - outbuffer.writeUInt16LE(this.options.port,8); - outbuffer.writeUInt8(type.charCodeAt(0),10); + outBuffer.writeUInt8(parseInt(ipSplit[0]),4); + outBuffer.writeUInt8(parseInt(ipSplit[1]),5); + outBuffer.writeUInt8(parseInt(ipSplit[2]),6); + outBuffer.writeUInt8(parseInt(ipSplit[3]),7); + outBuffer.writeUInt16LE(this.options.port,8); + outBuffer.writeUInt8(type.charCodeAt(0),10); - this.udpSend(outbuffer,(buffer) => { - const reader = this.reader(buffer); - for(let i = 0; i < outbuffer.length; i++) { - if(outbuffer.readUInt8(i) !== reader.uint(1)) return; + return await this.udpSend( + outBuffer, + (buffer) => { + const reader = this.reader(buffer); + for(let i = 0; i < outBuffer.length; i++) { + if(outBuffer.readUInt8(i) !== reader.uint(1)) return; + } + return reader; + }, + () => { + if(allowTimeout) { + return null; + } } - onresponse(reader); - return true; - },() => { - if(ontimeout) { - ontimeout(); - return true; - } - }); + ); } } diff --git a/protocols/valve.js b/protocols/valve.js index a925a57..9a77511 100644 --- a/protocols/valve.js +++ b/protocols/valve.js @@ -1,5 +1,4 @@ -const async = require('async'), - Bzip2 = require('compressjs').Bzip2, +const Bzip2 = require('compressjs').Bzip2, Core = require('./core'); class Valve extends Core { @@ -28,173 +27,169 @@ class Valve extends Core { this._challenge = ''; } - run(state) { - async.series([ - (c) => { this.queryInfo(state,c); }, - (c) => { this.queryChallenge(state,c); }, - (c) => { this.queryPlayers(state,c); }, - (c) => { this.queryRules(state,c); }, - (c) => { this.cleanup(state,c); }, - (c) => { this.finish(state); } - ]); + async run(state) { + await this.queryInfo(state); + await this.queryChallenge(); + await this.queryPlayers(state); + await this.queryRules(state); + await this.cleanup(state); } - queryInfo(state,c) { - this.sendPacket( - 0x54,false,'Source Engine Query\0', + async queryInfo(state) { + const b = await this.sendPacket( + 0x54, + false, + 'Source Engine Query\0', this.goldsrcInfo ? 0x6D : 0x49, - (b) => { - const reader = this.reader(b); - - if(this.goldsrcInfo) state.raw.address = reader.string(); - else state.raw.protocol = reader.uint(1); - - state.name = reader.string(); - state.map = reader.string(); - state.raw.folder = reader.string(); - state.raw.game = reader.string(); - state.raw.steamappid = reader.uint(2); - state.raw.numplayers = reader.uint(1); - state.maxplayers = reader.uint(1); - - if(this.goldsrcInfo) state.raw.protocol = reader.uint(1); - else state.raw.numbots = reader.uint(1); - - state.raw.listentype = reader.uint(1); - state.raw.environment = reader.uint(1); - if(!this.goldsrcInfo) { - state.raw.listentype = String.fromCharCode(state.raw.listentype); - state.raw.environment = String.fromCharCode(state.raw.environment); - } - - state.password = !!reader.uint(1); - if(this.goldsrcInfo) { - state.raw.ismod = reader.uint(1); - if(state.raw.ismod) { - state.raw.modlink = reader.string(); - state.raw.moddownload = reader.string(); - reader.skip(1); - state.raw.modversion = reader.uint(4); - state.raw.modsize = reader.uint(4); - state.raw.modtype = reader.uint(1); - state.raw.moddll = reader.uint(1); - } - } - state.raw.secure = reader.uint(1); - - if(this.goldsrcInfo) { - state.raw.numbots = reader.uint(1); - } else { - if(state.raw.folder === 'ship') { - state.raw.shipmode = reader.uint(1); - state.raw.shipwitnesses = reader.uint(1); - state.raw.shipduration = reader.uint(1); - } - state.raw.version = reader.string(); - const extraFlag = reader.uint(1); - if(extraFlag & 0x80) state.raw.port = reader.uint(2); - if(extraFlag & 0x10) state.raw.steamid = reader.uint(8); - if(extraFlag & 0x40) { - state.raw.sourcetvport = reader.uint(2); - state.raw.sourcetvname = reader.string(); - } - if(extraFlag & 0x20) state.raw.tags = reader.string(); - if(extraFlag & 0x01) state.raw.gameid = reader.uint(8); - } - - // from https://developer.valvesoftware.com/wiki/Server_queries - if( - state.raw.protocol === 7 && ( - state.raw.steamappid === 215 - || state.raw.steamappid === 17550 - || state.raw.steamappid === 17700 - || state.raw.steamappid === 240 - ) - ) { - this._skipSizeInSplitHeader = true; - } - if(this.debug) { - console.log("STEAM APPID: "+state.raw.steamappid); - console.log("PROTOCOL: "+state.raw.protocol); - } - if(state.raw.protocol === 48) { - if(this.debug) console.log("GOLDSRC DETECTED - USING MODIFIED SPLIT FORMAT"); - this.goldsrcSplits = true; - } - - c(); - } + false ); - } - queryChallenge(state,c) { - if(this.legacyChallenge) { - this.sendPacket(0x57,false,null,0x41,(b) => { - // sendPacket will catch the response packet and - // save the challenge for us - c(); - }); + const reader = this.reader(b); + + if(this.goldsrcInfo) state.raw.address = reader.string(); + else state.raw.protocol = reader.uint(1); + + state.name = reader.string(); + state.map = reader.string(); + state.raw.folder = reader.string(); + state.raw.game = reader.string(); + state.raw.steamappid = reader.uint(2); + state.raw.numplayers = reader.uint(1); + state.maxplayers = reader.uint(1); + + if(this.goldsrcInfo) state.raw.protocol = reader.uint(1); + else state.raw.numbots = reader.uint(1); + + state.raw.listentype = reader.uint(1); + state.raw.environment = reader.uint(1); + if(!this.goldsrcInfo) { + state.raw.listentype = String.fromCharCode(state.raw.listentype); + state.raw.environment = String.fromCharCode(state.raw.environment); + } + + state.password = !!reader.uint(1); + if(this.goldsrcInfo) { + state.raw.ismod = reader.uint(1); + if(state.raw.ismod) { + state.raw.modlink = reader.string(); + state.raw.moddownload = reader.string(); + reader.skip(1); + state.raw.modversion = reader.uint(4); + state.raw.modsize = reader.uint(4); + state.raw.modtype = reader.uint(1); + state.raw.moddll = reader.uint(1); + } + } + state.raw.secure = reader.uint(1); + + if(this.goldsrcInfo) { + state.raw.numbots = reader.uint(1); } else { - c(); + if(state.raw.folder === 'ship') { + state.raw.shipmode = reader.uint(1); + state.raw.shipwitnesses = reader.uint(1); + state.raw.shipduration = reader.uint(1); + } + state.raw.version = reader.string(); + const extraFlag = reader.uint(1); + if(extraFlag & 0x80) state.raw.port = reader.uint(2); + if(extraFlag & 0x10) state.raw.steamid = reader.uint(8); + if(extraFlag & 0x40) { + state.raw.sourcetvport = reader.uint(2); + state.raw.sourcetvname = reader.string(); + } + if(extraFlag & 0x20) state.raw.tags = reader.string(); + if(extraFlag & 0x01) state.raw.gameid = reader.uint(8); + } + + // from https://developer.valvesoftware.com/wiki/Server_queries + if( + state.raw.protocol === 7 && ( + state.raw.steamappid === 215 + || state.raw.steamappid === 17550 + || state.raw.steamappid === 17700 + || state.raw.steamappid === 240 + ) + ) { + this._skipSizeInSplitHeader = true; + } + if(this.debug) { + console.log("STEAM APPID: "+state.raw.steamappid); + console.log("PROTOCOL: "+state.raw.protocol); + } + if(state.raw.protocol === 48) { + if(this.debug) console.log("GOLDSRC DETECTED - USING MODIFIED SPLIT FORMAT"); + this.goldsrcSplits = true; } } - queryPlayers(state,c) { + async queryChallenge() { + if(this.legacyChallenge) { + // sendPacket will catch the response packet and + // save the challenge for us + await this.sendPacket( + 0x57, + false, + null, + 0x41, + false + ); + } + } + + async queryPlayers(state) { state.raw.players = []; - this.sendPacket(0x55,true,null,0x44,(b) => { - const reader = this.reader(b); - const num = reader.uint(1); - for(let i = 0; i < num; i++) { - reader.skip(1); - const name = reader.string(); - const score = reader.int(4); - const time = reader.float(); - if(this.debug) console.log("Found player: "+name+" "+score+" "+time); + // CSGO doesn't even respond sometimes if host_players_show is not 2 + // Ignore timeouts in only this case + const allowTimeout = state.raw.steamappid === 730; - // connecting players don't count as players. - if(!name) continue; + const b = await this.sendPacket( + 0x55, + true, + null, + 0x44, + allowTimeout + ); + if (b === null) return; // timed out - // CSGO sometimes adds a bot named 'Max Players' if host_players_show is not 2 - if (state.raw.steamappid === 730 && name === 'Max Players') continue; + const reader = this.reader(b); + const num = reader.uint(1); + for(let i = 0; i < num; i++) { + reader.skip(1); + const name = reader.string(); + const score = reader.int(4); + const time = reader.float(); - state.raw.players.push({ - name:name, score:score, time:time - }); - } + if(this.debug) console.log("Found player: "+name+" "+score+" "+time); - c(); - }, () => { - // CSGO doesn't even respond sometimes if host_players_show is not 2 - // Ignore timeouts in only this case - if (state.raw.steamappid === 730) { - c(); - return true; - } - }); + // connecting players don't count as players. + if(!name) continue; + + // CSGO sometimes adds a bot named 'Max Players' if host_players_show is not 2 + if (state.raw.steamappid === 730 && name === 'Max Players') continue; + + state.raw.players.push({ + name:name, score:score, time:time + }); + } } - queryRules(state,c) { + async queryRules(state) { state.raw.rules = {}; - this.sendPacket(0x56,true,null,0x45,(b) => { - const reader = this.reader(b); - const num = reader.uint(2); - for(let i = 0; i < num; i++) { - const key = reader.string(); - const value = reader.string(); - state.raw.rules[key] = value; - } - c(); - }, () => { - // no rules were returned after timeout -- - // the server probably has them disabled - // ignore the timeout - c(); - return true; - }); + const b = await this.sendPacket(0x56,true,null,0x45,true); + if (b === null) return; // timed out - the server probably just has rules disabled + + const reader = this.reader(b); + const num = reader.uint(2); + for(let i = 0; i < num; i++) { + const key = reader.string(); + const value = reader.string(); + state.raw.rules[key] = value; + } } - cleanup(state,c) { + async cleanup(state) { // Battalion 1944 puts its info into rules fields for some reason if ('bat_name_s' in state.raw.rules) { state.name = state.raw.rules.bat_name_s; @@ -234,142 +229,158 @@ class Valve extends Core { if (sortedPlayers.length) state.players.push(sortedPlayers.pop()); else state.players.push({}); } - - c(); } /** + * Sends a request packet and returns only the response type expected * @param {number} type * @param {boolean} sendChallenge * @param {?string|Buffer} payload * @param {number} expect - * @param {function(Buffer)} callback - * @param {(function():boolean)=} ontimeout + * @param {boolean=} allowTimeout + * @returns Buffer|null **/ - sendPacket( + async sendPacket( type, sendChallenge, payload, expect, - callback, - ontimeout + allowTimeout ) { + for (let keyRetry = 0; keyRetry < 3; keyRetry++) { + let retryQuery = false; + const response = await this.sendPacketRaw( + type, sendChallenge, payload, + (payload) => { + const reader = this.reader(payload); + const type = reader.uint(1); + if (type === 0x41) { + const key = reader.uint(4); + if (this._challenge !== key) { + if (this.debug) console.log('Received new challenge key: ' + key); + this._challenge = key; + retryQuery = true; + if (keyRetry === 0 && sendChallenge) { + if (this.debug) console.log('Restarting query'); + return null; + } + } + } + if (this.debug) console.log("Received " + type.toString(16) + " expected " + expect.toString(16)); + if (type === expect) { + return reader.rest(); + } + }, + () => { + if (allowTimeout) return null; + } + ); + if (!retryQuery) return response; + } + throw new Error('Received too many challenge key responses'); + } + + /** + * Sends a request packet and assembles partial responses + * @param {number} type + * @param {boolean} sendChallenge + * @param {?string|Buffer} payload + * @param {function(Buffer)} onResponse + * @param {function()} onTimeout + **/ + async sendPacketRaw( + type, + sendChallenge, + payload, + onResponse, + onTimeout + ) { + if (typeof payload === 'string') payload = Buffer.from(payload, 'binary'); + const challengeLength = sendChallenge ? 4 : 0; + const payloadLength = payload ? payload.length : 0; + + const b = Buffer.alloc(5 + challengeLength + payloadLength); + b.writeInt32LE(-1, 0); + b.writeUInt8(type, 4); + + if (sendChallenge) { + let challenge = this._challenge; + if (!challenge) challenge = 0xffffffff; + if (this.byteorder === 'le') b.writeUInt32LE(challenge, 5); + else b.writeUInt32BE(challenge, 5); + } + if (payloadLength) payload.copy(b, 5 + challengeLength); + const packetStorage = {}; + return await this.udpSend( + b, + (buffer) => { + const reader = this.reader(buffer); + const header = reader.int(4); + if(header === -1) { + // full package + if(this.debug) console.log("Received full packet"); + return onResponse(reader.rest()); + } + if(header === -2) { + // partial package + const uid = reader.uint(4); + if(!(uid in packetStorage)) packetStorage[uid] = {}; + const packets = packetStorage[uid]; - const receivedFull = (reader) => { - const type = reader.uint(1); + let bzip = false; + if(!this.goldsrcSplits && uid & 0x80000000) bzip = true; - if(type === 0x41) { - const key = reader.uint(4); - - if(this.debug) console.log('Received challenge key: ' + key); - - if(this._challenge !== key) { - this._challenge = key; - if(sendChallenge) { - if (this.debug) console.log('Restarting query'); - send(); - return true; + let packetNum,payload,numPackets; + if(this.goldsrcSplits) { + packetNum = reader.uint(1); + numPackets = packetNum & 0x0f; + packetNum = (packetNum & 0xf0) >> 4; + payload = reader.rest(); + } else { + numPackets = reader.uint(1); + packetNum = reader.uint(1); + if(!this._skipSizeInSplitHeader) reader.skip(2); + if(packetNum === 0 && bzip) reader.skip(8); + payload = reader.rest(); } - } - return; - } + packets[packetNum] = payload; - if(this.debug) console.log("Received "+type.toString(16)+" expected "+expect.toString(16)); - if(type !== expect) return; - callback(reader.rest()); - return true; - }; - - const receivedOne = (buffer) => { - const reader = this.reader(buffer); - - const header = reader.int(4); - if(header === -1) { - // full package - if(this.debug) console.log("Received full packet"); - return receivedFull(reader); - } - if(header === -2) { - // partial package - const uid = reader.uint(4); - if(!(uid in packetStorage)) packetStorage[uid] = {}; - const packets = packetStorage[uid]; - - let bzip = false; - if(!this.goldsrcSplits && uid & 0x80000000) bzip = true; - - let packetNum,payload,numPackets; - if(this.goldsrcSplits) { - packetNum = reader.uint(1); - numPackets = packetNum & 0x0f; - packetNum = (packetNum & 0xf0) >> 4; - payload = reader.rest(); - } else { - numPackets = reader.uint(1); - packetNum = reader.uint(1); - if(!this._skipSizeInSplitHeader) reader.skip(2); - if(packetNum === 0 && bzip) reader.skip(8); - payload = reader.rest(); - } - - packets[packetNum] = payload; - - if(this.debug) { - console.log("Received partial packet uid:"+uid+" num:"+packetNum); - console.log("Received "+Object.keys(packets).length+'/'+numPackets+" packets for this UID"); - } - - if(Object.keys(packets).length !== numPackets) return; - - // assemble the parts - const list = []; - for(let i = 0; i < numPackets; i++) { - if(!(i in packets)) { - this.fatal('Missing packet #'+i); - return true; + if(this.debug) { + console.log("Received partial packet uid:"+uid+" num:"+packetNum); + console.log("Received "+Object.keys(packets).length+'/'+numPackets+" packets for this UID"); } - list.push(packets[i]); - } - let assembled = Buffer.concat(list); - if(bzip) { - if(this.debug) console.log("BZIP DETECTED - Extracing packet..."); - try { - assembled = Buffer.from(Bzip2.decompressFile(assembled)); - } catch(e) { - this.fatal('Invalid bzip packet'); - return true; + if(Object.keys(packets).length !== numPackets) return; + + // assemble the parts + const list = []; + for(let i = 0; i < numPackets; i++) { + if(!(i in packets)) { + this.fatal('Missing packet #'+i); + return true; + } + list.push(packets[i]); } + + let assembled = Buffer.concat(list); + if(bzip) { + if(this.debug) console.log("BZIP DETECTED - Extracing packet..."); + try { + assembled = Buffer.from(Bzip2.decompressFile(assembled)); + } catch(e) { + this.fatal('Invalid bzip packet'); + return true; + } + } + const assembledReader = this.reader(assembled); + assembledReader.skip(4); // header + return onResponse(assembledReader.rest()); } - const assembledReader = this.reader(assembled); - assembledReader.skip(4); // header - return receivedFull(assembledReader); - } - }; - - const send = (c) => { - if(typeof payload === 'string') payload = Buffer.from(payload,'binary'); - const challengeLength = sendChallenge ? 4 : 0; - const payloadLength = payload ? payload.length : 0; - - const b = Buffer.alloc(5 + challengeLength + payloadLength); - b.writeInt32LE(-1, 0); - b.writeUInt8(type, 4); - - if(sendChallenge) { - let challenge = this._challenge; - if(!challenge) challenge = 0xffffffff; - if(this.byteorder === 'le') b.writeUInt32LE(challenge, 5); - else b.writeUInt32BE(challenge, 5); - } - if(payloadLength) payload.copy(b, 5+challengeLength); - - this.udpSend(b,receivedOne,ontimeout); - }; - - send(); + }, + onTimeout + ); } }