Add TCP support to protocol core

This commit is contained in:
Michael Morrison 2014-02-01 09:45:49 -06:00
parent b31ffab1f5
commit d5b4310cef

View file

@ -1,5 +1,6 @@
var EventEmitter = require('events').EventEmitter,
dns = require('dns'),
net = require('net'),
async = require('async'),
Class = require('../../lib/Class'),
Reader = require('../../lib/reader');
@ -81,6 +82,11 @@ module.exports = Class.extend(EventEmitter,{
}
this.timers = [];
if(this.tcpSocket) {
this.tcpSocket.destroy();
delete this.tcpSocket;
}
this.udpTimeoutTimer = false;
this.udpCallback = false;
},
@ -106,11 +112,37 @@ module.exports = Class.extend(EventEmitter,{
},
parseDns: function(host,c) {
var self = this;
dns.lookup(host, function(err,address,family) {
if(err) return self.error(err);
self.options.address = address;
c();
});
function resolveStandard(host,c) {
dns.lookup(host, function(err,address,family) {
if(err) return self.error(err);
self.options.address = address;
c();
});
}
function resolveSrv(srv,host,c) {
dns.resolve(srv+'.'+host, 'SRV', function(err,addresses) {
if(err) return resolveStandard(host,c);
if(addresses.length >= 1) {
var line = addresses[0];
self.options.port = line.port;
var srvhost = line.name;
if(srvhost.match(/\d+\.\d+\.\d+\.\d+/)) {
self.options.address = srvhost;
c();
} else {
// resolve yet again
resolveStandard(srvhost,c);
}
return;
}
return resolveStandard(host,c);
});
}
if(this.srvRecord) resolveSrv(this.srvRecord,host,c);
else resolveStandard(host,c);
},
// utils
@ -133,21 +165,58 @@ module.exports = Class.extend(EventEmitter,{
return id;
},
_tcpConnect: function(c) {
var self = this;
if(this.tcpSocket) return c(this.tcpSocket);
var socket = this.tcpSocket = net.connect(
this.options.port,
this.options.address,
function() {
c(socket);
}
);
socket.setTimeout(10000);
socket.setNoDelay(true);
var received = new Buffer(0);
socket.on('data', function(data) {
if(!self.tcpCallback) return;
received = Buffer.concat([received,data]);
if(self.tcpCallback(received)) {
self.tcpCallback = false;
received = new Buffer(0);
}
});
},
tcpSend: function(buffer,ondata) {
if(this.tcpCallback) return this.fatal('Attempted to send TCP packet while still waiting on a managed response');
this.tcpCallback = ondata;
this._tcpConnect(function(socket) {
socket.write(buffer);
});
},
udpSend: function(buffer,onpacket,ontimeout) {
var self = this;
process.nextTick(function() {
if(self.udpCallback) return self.fatal('Attempted to send UDP packet while still waiting on a managed response');
self._udpSendNow(buffer);
if(!onpacket) return;
self.udpTimeoutTimer = self.setTimeout(function() {
self.udpCallback = false;
var timeout = false;
if(!ontimeout || ontimeout() !== true) timeout = true;
if(timeout) self.error('timeout');
},1000);
self.udpCallback = onpacket;
});
if(self.udpCallback) return self.fatal('Attempted to send UDP packet while still waiting on a managed response');
self._udpSendNow(buffer);
if(!onpacket) return;
self.udpTimeoutTimer = self.setTimeout(function() {
self.udpCallback = false;
var timeout = false;
if(!ontimeout || ontimeout() !== true) timeout = true;
if(timeout) self.error('timeout');
},1000);
self.udpCallback = onpacket;
},
_udpSendNow: function(buffer) {
if(!('port' in this.options)) return this.fatal('Attempted to send without setting a port');
@ -155,7 +224,7 @@ module.exports = Class.extend(EventEmitter,{
if(typeof buffer == 'string') buffer = new Buffer(buffer,'binary');
if(this.debug) console.log("Sent",buffer,this.options.address,this.options.port);
if(this.debug) console.log(this.options.address+':'+this.options.port+" --> "+buffer.toString('hex'));
this.udpSocket.send(buffer,0,buffer.length,this.options.port,this.options.address);
},
_udpResponse: function(buffer) {