Prioritizing requests in node

Overview

I was implementing a mapping server that served both map tiles and UTFGrids (json data allowing us to create hovers for the points). While the clients would fire a number of requests for both tiles and grids, I wanted to ensure that the tiles were always given priority - to ensure they would get the visual part as fast as possible.

Another thing I wanted to achieve was to ensure that connections that were dropped by the client were not needlessly processed. This can happen a lot with slippy maps, when users pan and zoom constantly.

Implementation

The idea was then to create a proxy that would only send as many requests as the map server could process at once. The remaining requests would be kept in a queue where they could be prioritized. This approach also gave us the opportunity to drop requests from the queue when the client dropped the request - as this was much more complicated to do at the level of the map server itself.

The implementation was simple: a node http server takes incoming requests, prioritizes them and puts them in a queue. We then use node-http-proxy to send them on. A bit of error handling (in particular to detect dropped connections), and user defined request types and priorities - and here we go.

The class is used in this way:

var proxy = new QueueingProxy(
  4000,                      // Incoming port
  'http://127.0.0.1:5000',   // Target server
  4,                         // Number of requests that get sent on at one time (per client)
  ['tile', 'grid', 'other'], // The different types of requests, in priority order
  function (req) {           // A function to return the type of a given request
    if (req.url.match(/\d+\/\d+\/\d+\.png/)) {
      return 'tile';
    } else if (req.url.match(/\d+\/\d+\/\d+\.grid\.json/)) {
      return 'grid';
    } else {
      return 'other';
    }
  }
);

The code

The code is rather short (185 lines long), and it is not something I wanted to maintain beyond my immediate needs - so I am sharing it here. It may or may not work out of the box for your needs - this is shared for inspiration, not as a ready made tool!

var http = require('http');
var httpProxy = require('http-proxy');

// Module-level counter used to build sequential request ids ("r0", "r1", ...); shared by all proxies in this module.
var _queue_request_id_sec = 0;

/**
 * The QueueingProxy provides an HTTP proxy which can limit the number of requests sent to the target at one time,
 * and prioritize incoming requests per type.
 *
 * Constructing an instance immediately creates the proxy and starts an HTTP server listening on `port`.
 *
 * @param port: Port to listen on;
 * @param target: URL to proxy to;
 * @param requests_per_client: Number of requests that can be pushed at one time for a single client;
 * @param request_types: Array defining the request types, starting with the highest priority requests;
 * @param get_request_type: Function that returns the request type of a request
 * @constructor
 */
function QueueingProxy(port, target, requests_per_client, request_types, get_request_type){
  var self = this;
  // General setup
  this._target = target;
  this._requests_per_client = requests_per_client;
  this._request_types = request_types;
  this._get_request_type = get_request_type;
  // Per-client queues: client id -> request type -> array of {req, res}
  this._requests = {};
  // Per-client number of requests currently handed to the proxy
  this._processing = {};
  // Start the proxy
  this._proxy = httpProxy.createProxyServer({});
  // NOTE(review): 'proxyRes' presumably fires when the target's response arrives, i.e. possibly before the
  // body has been fully relayed to the client - confirm this is the intended moment to free the slot.
  this._proxy.on('proxyRes', function(proxy_res, req){
    self._request_finished(req);
  });
  this._proxy.on('error', function(err, req, res){
    // Not every error object carries a `code`; calling toString() on undefined would throw
    // from inside the error handler.
    var has_code = Boolean(err) && err.code !== undefined && err.code !== null;
    var code = has_code ? String(err.code) : 'UNKNOWN';
    self._deal_with_error(req, code + ": ");
    // The error listener is responsible for answering the client (see the http-proxy README);
    // without this the client connection is left hanging until it times out.
    if (res && typeof res.end === 'function'){
      if (typeof res.writeHead === 'function' && !res.headersSent){
        res.writeHead(502, {'Content-Type': 'text/plain'});
      }
      res.end();
    }
  });
  // And start listening
  this._http = http.createServer(function(req, res){
    self._new_request(req, res);
  }).listen(port);
}

/**
 * Queue an incoming request for its client and kick the consumer for that client.
 *
 * The first request of a client creates one queue per configured request type. A request whose type is not
 * one of the configured types is queued with the lowest priority type instead of crashing the server.
 *
 * @param req: HTTP request object
 * @param res: HTTP response object
 */
QueueingProxy.prototype._new_request = function(req, res){
  var self = this;
  var has_own = Object.prototype.hasOwnProperty;
  var id = this._get_client_id(req);
  var request_id = this._get_request_id(req);
  var type = this._get_request_type(req);
  console.log("INCOMING: " + request_id); // + " from " + id);
  /* First request from this client: create its queues and its in-flight counter */
  if (!has_own.call(this._requests, id)){
    this._requests[id] = {};
    for (var i = 0; i < this._request_types.length; i++){
      this._requests[id][this._request_types[i]] = [];
    }
    this._processing[id] = 0;
  }
  /* Only configured types have a queue (and are ever consumed); fall back to the lowest priority one */
  if (!has_own.call(this._requests[id], type)){
    var fallback = this._request_types[this._request_types.length - 1];
    console.log("UNKNOWN TYPE for " + request_id + ": " + type + " - using " + fallback);
    type = fallback;
    if (!has_own.call(this._requests[id], type)){
      /* No request types configured at all: nothing could ever forward this request */
      res.statusCode = 500;
      res.end();
      return;
    }
  }

  this._requests[id][type].push({
    'req': req,
    'res': res
  });
  /*
   * Ensure request is removed from list on premature termination.
   * The response's 'close' event is used rather than the request's: since Node 16 the request's 'close'
   * is also emitted on normal completion. A response that closes without having been ended means the
   * client went away.
   */
  res.on('close', function(){
    if (!(res.writableEnded || res.finished)){
      self._deal_with_error(req, 'DROPPED: ');
    }
  });
  /* Kickstart the consumer */
  this._process_next_requests(id);
};

/**
 * Compute (once) and return the string that identifies the client behind a request.
 *
 * The id is built from the `x-forwarded-for` header (or, when absent, the connection's remote address),
 * the `user-agent` header and the `cookie` header. The result is stored on the request object as
 * `_queue_client_id` and reused on later calls.
 *
 * @param req: HTTP request object
 * @return: String - The client id
 */
QueueingProxy.prototype._get_client_id = function(req){
  if (typeof req._queue_client_id === 'undefined'){
    var headers = req.headers;
    var origin = headers['x-forwarded-for'];
    if (!origin){
      origin = req.connection.remoteAddress;
    }
    req._queue_client_id = origin + "-" + headers['user-agent'] + "-" + headers['cookie'];
  }
  return req._queue_client_id;
};

/**
 * Return the id of a request, assigning one on first use.
 *
 * Ids are "r" followed by the current value of the module-level counter, which is then incremented.
 * The id is stored on the request object as `_queue_request_id` and reused on later calls.
 *
 * @param req: HTTP request object
 * @return: String - the request id
 */
QueueingProxy.prototype._get_request_id = function(req){
  if (typeof req._queue_request_id === 'undefined'){
    req._queue_request_id = "r" + _queue_request_id_sec.toString();
    _queue_request_id_sec += 1;
  }
  return req._queue_request_id;
};

/**
 * Forward queued requests of a client to the target until the client's limit of concurrent requests is
 * reached or nothing is left to forward.
 *
 * @param id: Client id
 * @param type: If defined, the type of the first request to forward. If not defined (and for every request
 *              after the first one), the request is taken from the non-empty queue with the highest priority.
 *
 * NOTE(review): requests are taken with pop(), i.e. the most recently queued request of a type is
 * forwarded first - confirm this ordering is intended.
 */
QueueingProxy.prototype._process_next_requests = function(id, type){
  var next_type = type;
  for (;;){
    // Stop as soon as the client has used up all of its slots
    if (this._processing[id] >= this._requests_per_client){
      return;
    }
    if (typeof next_type === 'undefined'){
      // Pick the highest priority type that still has something queued
      var types = this._request_types;
      for (var k = 0; k < types.length && typeof next_type === 'undefined'; k++){
        if (this._requests[id][types[k]].length > 0){
          next_type = types[k];
        }
      }
      if (typeof next_type === 'undefined'){
        return;
      }
    }
    var entry = this._requests[id][next_type].pop();
    this._processing[id] += 1;
    console.log("FORWARDING " + entry.req._queue_request_id + " (" + next_type + ")");
    this._proxy.web(entry.req, entry.res, {target: this._target});
    // Subsequent requests are chosen by priority again
    next_type = undefined;
  }
};

/**
 * Called when a forwarded request has finished: frees the client's slot and forwards the next queued
 * requests. The slot is only freed the first time the request's continuation is handled.
 *
 * @param req: HTTP request object
 */
QueueingProxy.prototype._request_finished = function(req){
  var client = this._get_client_id(req);
  console.log("FINISHED: " + req._queue_request_id);
  if (!this._request_continuation(req)){
    // Already accounted for (e.g. by the error handler)
    return;
  }
  this._processing[client] -= 1;
  this._process_next_requests(client);
};

/**
 * Handle an error (proxy error or dropped connection) for the given request.
 *
 * If the request is still waiting in one of its client's queues it is removed from the queue; such a
 * request was never forwarded, so the client's in-flight counter is left untouched. Otherwise the request
 * is considered in flight: its slot is freed (once) and the next queued requests are forwarded.
 *
 * @param req: HTTP request object
 * @param msg_prefix: String logged in front of the request id
 */
QueueingProxy.prototype._deal_with_error = function(req, msg_prefix){
  var id = this._get_client_id(req);
  var request_id = this._get_request_id(req);
  console.log(msg_prefix + request_id); // + " for client " + id);
  // Tolerate a client that has no queues
  var queues = this._requests[id] || {};
  var was_queued = false;
  for (var type in queues){
    if (!Object.prototype.hasOwnProperty.call(queues, type)){
      continue;
    }
    var queue = queues[type];
    for (var pos = 0; pos < queue.length; pos++){
      if (this._get_request_id(queue[pos].req) === request_id){
        console.log(request_id + ' still in the queue - removing.');
        queue.splice(pos, 1);
        was_queued = true;
        break;
      }
    }
    if (was_queued){
      break;
    }
  }
  // Always mark the continuation as handled so that later events for this request are ignored,
  // but only give a slot back when the request actually occupied one.
  var first_time = this._request_continuation(req);
  if (first_time && !was_queued){
    this._processing[id] -= 1;
    this._process_next_requests(id);
  }
};

/**
 * Returns true if this request's continuation has not yet been processed, false otherwise.
 *
 * The first call for a request flags it (via `_queue_request_continuation` on the request object) as
 * processed, so every later call for the same request returns false.
 *
 * @param req: HTTP request object
 * @return: Boolean - whether this is the first time the continuation is handled
 */
QueueingProxy.prototype._request_continuation = function(req){
  var already_handled = Boolean(req._queue_request_continuation);
  if (!already_handled){
    req._queue_request_continuation = true;
  }
  return !already_handled;
};

// The constructor is the module's only export.
module.exports = QueueingProxy;