Prioritizing requests in node
Overview
I was implementing a mapping server that served both map tiles and UTFGrids (json data allowing us to create hovers for the points). While the clients would fire a number of requests for both tiles and grids, I wanted to ensure that the tiles were always given priority - to ensure they would get the visual part as fast as possible.
Another thing I wanted to achieve was ensure that connections that were dropped by the client were not needlessly processed. This can happen a lot with slippy maps, when users just pan and zoom constantly.
Implementation
The idea was then to create a proxy that would only send as many requests as the map server could process at once. The remaining query would be kept in a queue where they could be prioritized. This approach also gave us the opportunity to drop requests from the queue when the client dropped the request - as this was much more complicated to do at the level of the map server itself.
The implementation was simple: a node http server takes incoming requests, prioritizes them and puts them in a queue. We then use node-http-proxy to send them on. A bit of error handling (in particular to detect dropped connections), and user defined request types and priorities - and here we go.
The class is used in this way:
var proxy = new QueueingProxy(
4000, // Incomming port
'http://127.0.0.1:5000', // Target server
4, // Number of requests that get set on at one time (per client)
['tile', 'grid', 'other'], // The different types of requests, in priority order
function (req) { // A function to return the type of a given request
if (req.url.match(/\d+\/\d+\/\d+\.png/)) {
return 'tile';
} else if (req.url.match(/\d+\/\d+\/\d+\.grid\.json/)) {
return 'grid';
} else {
return 'other';
}
}
);
The code
The code is rather short (185 lines long), and it is not something I wanted to maintain beyond my immediate needs - so I am sharing it here. It may or may not work out of the box for your needs - this is shared for inspiration, not as a ready made tool!
var http = require('http');
var httpProxy = require('http-proxy');
var _queue_request_id_sec = 0;
/**
* The QueueingProxy provides an HTTP proxy which can limit the number of requests sent to the proxy at one time,
* and prioritize incoming requests per type.
*
* @param port: Port to listen on;
* @param target: URL to proxy to;
* @param requests_per_client: Number of requests that can be pushed at one time for a single client;
* @param request_types: Array defining the request types, starting with the highest priority requests;
* @param get_request_type: Function that returns the request type of a request
* @constructor
*/
function QueueingProxy(port, target, requests_per_client, request_types, get_request_type){
var self = this;
// General setup
this._target = target;
this._requests_per_client = requests_per_client;
this._request_types = request_types;
this._get_request_type = get_request_type;
this._requests = {};
this._processing = {};
// Start the proxy
this._proxy = httpProxy.createProxyServer({});
this._proxy.on('proxyRes', function(res, req){
self._request_finished(req);
});
this._proxy.on('error', function(err, req, res){
self._deal_with_error(req, err.code.toString() + ": ");
});
// And start listening
this._http = http.createServer(function(req, res){
self._new_request(req, res);
}).listen(port);
}
QueueingProxy.prototype._new_request = function(req, res){
var self = this;
var id = this._get_client_id(req);
var request_id = this._get_request_id(req);
var type = this._get_request_type(req);
console.log("INCOMING: " + request_id); // + " from " + id);
/* Queue the request */
if (!(id in this._requests)){
this._requests[id] = {};
for (var i in this._request_types){
this._requests[id][this._request_types[i]] = [];
}
this._processing[id] = 0;
}
this._requests[id][type].push({
'req': req,
'res': res
});
/* Ensure request is removed from list on premature termination */
req.on('close', function(){
self._deal_with_error(req, 'DROPPED: ');
});
/* Kickstart the consumer */
this._process_next_requests(id);
};
/**
* For a given http request return a string identifying the client.
*
* This will cache the client id on the request object.
*
* @param req: HTTP request object
* @return: String - The client id
*/
QueueingProxy.prototype._get_client_id = function(req){
if (typeof req._queue_client_id !== 'undefined'){
return req._queue_client_id;
}
var ip = req.headers['x-forwarded-for'] || req.connection.remoteAddress;
var id = ip + "-" + req['headers']['user-agent'] + "-" + req['headers']['cookie'];
req._queue_client_id = id;
return id;
};
/**
* Return the request id for the given request.
*
* The request id is cached on the object, and generated sequentially
*
* @param req: HTTP request object
* @return: String - the request id
*/
QueueingProxy.prototype._get_request_id = function(req){
if (typeof req._queue_request_id !== 'undefined'){
return req._queue_request_id;
}
req._queue_request_id = "r" + _queue_request_id_sec.toString();
_queue_request_id_sec += 1;
return req._queue_request_id;
};
/**
* Process next requests for the given client id
*
* @param id: Client id
* @param type: If defined, the type of request to process. If not defined, process the first available request
* with the highest priority type
*/
QueueingProxy.prototype._process_next_requests = function(id, type){
if (this._processing[id] >= this._requests_per_client){
return;
}
if (typeof type == 'undefined'){
for (var i in this._request_types){
var r_type = this._request_types[i];
if (this._requests[id][r_type].length > 0){
return this._process_next_requests(id, r_type);
}
}
return;
}
var request = this._requests[id][type].pop();
this._processing[id] += 1;
console.log("FORWARDING " + request['req']._queue_request_id + " (" + type + ")"); // + " from " + id + " sent to target server.");
this._proxy.web(request['req'], request['res'], {target: this._target});
this._process_next_requests(id);
};
/**
* Handle finished requests
* @param req: HTTP request object
*/
QueueingProxy.prototype._request_finished = function(req){
var id = this._get_client_id(req);
console.log("FINISHED: " + req._queue_request_id); // + " from " + id);
if (this._request_continuation(req)) {
this._processing[id] -= 1;
this._process_next_requests(id);
}
};
/**
* Handle error for the given request
*/
QueueingProxy.prototype._deal_with_error = function(req, msg_prefix){
var id = this._get_client_id(req);
var request_id = this._get_request_id(req);
console.log(msg_prefix + request_id); // + " for client " + id);
var pos = -1;
var type = '';
for (var t in this._requests[id]){
for (var i in this._requests[id][t]){
if (this._get_request_id(this._requests[id][t][i].req) == request_id){
type = t;
pos = i;
break;
}
}
if (pos >= 0){
break;
}
}
if (pos >= 0){
console.log(request_id + ' still in the queue - removing.');
this._requests[id][type].splice(pos, 1);
}
if (this._request_continuation(req)) {
this._processing[id] -= 1;
this._process_next_requests(id);
}
};
/**
* Retruns true if this request's continuation has not yet been processed. Calling this marks the requests continuation
* has having been processed.
*/
QueueingProxy.prototype._request_continuation = function(req){
if (!req._queue_request_continuation) {
req._queue_request_continuation = true;
return true;
}
return false;
};
module.exports = QueueingProxy;