diff --git a/js/server/modules/org/arangodb/foxx/queues.js b/js/server/modules/org/arangodb/foxx/queues.js index 15552fa55a..1b7741f976 100644 --- a/js/server/modules/org/arangodb/foxx/queues.js +++ b/js/server/modules/org/arangodb/foxx/queues.js @@ -7,7 +7,7 @@ /// /// DISCLAIMER /// -/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// Copyright 2004-2015 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. @@ -31,142 +31,135 @@ var _ = require('underscore'); var flatten = require('internal').flatten; var arangodb = require('org/arangodb'); var console = require('console'); +var qb = require('aqb'); var db = arangodb.db; var queueCache = {}; var jobCache = {}; +var jobTypeCache = {}; -function failImmutable(name) { - return function () { - throw new Error(name + ' is not mutable'); - }; +function resetQueueControl() { + try { + global.KEY_SET("queue-control", "skip", 0); + } catch (e) {} } - -var queues = { - _jobTypes: {}, - _clearCache: function () { - try { - global.KEY_SET("queue-control", "skip", 0); - } - catch (err) { - // ignore error if key does not exist - } - }, - get: function (key) { - var dbName = db._name(); - if (!queueCache[dbName]) { - queueCache[dbName] = {}; - } - if (!queueCache[dbName][key]) { - if (!db._queues.exists(key)) { - throw new Error('Queue does not exist: ' + key); - } - queueCache[dbName][key] = new Queue(key); - } - return queueCache[dbName][key]; - }, - create: function (key, maxWorkers) { - try { - db._queues.save({_key: key, maxWorkers: maxWorkers || 1}); - } catch (err) { - if (!err instanceof arangodb.ArangoError || - err.errorNum !== arangodb.ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) { - throw err; - } - if (maxWorkers) { - db._queues.update(key, {maxWorkers: maxWorkers}); - } - } - this._clearCache(); - var dbName = db._name(); - if (!queueCache[dbName]) { - queueCache[dbName] = {}; +function getQueue(key) { + var dbName = db._name(); + if (!queueCache[dbName]) { + queueCache[dbName] = {}; + } + if (!queueCache[dbName][key]) { + if (!db._queues.exists(key)) { + throw new Error('Queue does not exist: ' + key); } queueCache[dbName][key] = new Queue(key); - return queueCache[dbName][key]; - }, - delete: function (key) { - var result = false; - db._executeTransaction({ - collections: { - read: ['_queues'], - write: ['_queues'] - }, - action: function () { - if (db._queues.exists(key)) { - db._queues.remove(key); - result = true; - } - } - }); - if (result) { - this._clearCache(); - } - return result; - }, - registerJobType: function (type, opts) { - if (typeof opts === 'function') { - opts = {execute: opts}; - } - if (typeof opts.execute !== 'function') { - throw new Error('Must provide a function to execute!'); - } - if (opts.schema && typeof opts.schema.validate !== 'function') { - throw new Error('Schema must be a joi schema!'); - } - var cfg = _.extend({maxFailures: 0}, opts); - - // _jobTypes are database-specific - var dbName = db._name(); - if (!queues._jobTypes[dbName]) { - queues._jobTypes[dbName] = {}; - } - queues._jobTypes[dbName][type] = cfg; } -}; + return queueCache[dbName][key]; +} + +function createQueue(key, maxWorkers) { + try { + db._queues.save({_key: key, maxWorkers: maxWorkers || 1}); + } catch (err) { + if (!err instanceof arangodb.ArangoError || + err.errorNum !== arangodb.ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) { + throw err; + } + if (maxWorkers) { + db._queues.update(key, {maxWorkers: maxWorkers}); + } + } + resetQueueControl(); + var dbName = db._name(); + if (!queueCache[dbName]) { + queueCache[dbName] = {}; + } + queueCache[dbName][key] = new Queue(key); + return queueCache[dbName][key]; +} + +function deleteQueue(key) { + var result = false; + db._executeTransaction({ + collections: { + read: ['_queues'], + write: ['_queues'] + }, + action: function () { + if (db._queues.exists(key)) { + db._queues.remove(key); + result = true; + } + } + }); + if (result) { + resetQueueControl(); + } + return result; +} + +function registerJobType(type, opts) { + if (typeof opts === 'function') { + opts = {execute: opts}; + } + if (typeof opts.execute !== 'function') { + throw new Error('Must provide a function to execute!'); + } + if (opts.schema && typeof opts.schema.validate !== 'function') { + throw new Error('Schema must be a joi schema!'); + } + var cfg = _.extend({maxFailures: 0}, opts); + + // _jobTypes are database-specific + var dbName = db._name(); + if (!jobTypeCache[dbName]) { + jobTypeCache[dbName] = {}; + } + jobTypeCache[dbName][type] = cfg; +} function getJobs(queue, status, type) { - var vars = {}, - aql = 'FOR job IN _jobs'; + var query = qb.for('job').in('_jobs'); + var vars = {}; + if (queue !== undefined) { - aql += ' FILTER job.queue == @queue'; + query = query.filter(qb.ref('@queue').eq('job.queue')); vars.queue = queue; } + if (status !== undefined) { - aql += ' FILTER job.status == @status'; + query = query.filter(qb.ref('@status').eq('job.status')); vars.status = status; } + if (type !== undefined) { - aql += ' FILTER job.type == @type'; + query = query.filter(qb.ref('@type').eq('job.type')); vars.type = type; } - aql += ' SORT job.delayUntil ASC RETURN job._id'; + return db._createStatement({ - query: aql, + query: query.sort('job.delayUntil', 'ASC').return('job._id'), bindVars: vars }).execute().toArray(); } function Job(id) { - var self = this; - Object.defineProperty(self, 'id', { - get: function () { - return id; - }, + Object.defineProperty(this, 'id', { + value: id, + writable: false, configurable: false, enumerable: true }); _.each(['data', 'status', 'type', 'failures'], function (key) { - Object.defineProperty(self, key, { + Object.defineProperty(this, key, { get: function () { var value = db._jobs.document(this.id)[key]; return (value && typeof value === 'object') ? Object.freeze(value) : value; }, - set: failImmutable(key), configurable: false, enumerable: true }); - }); + }, this); } _.extend(Job.prototype, { @@ -180,12 +173,11 @@ _.extend(Job.prototype, { action: function () { var job = db._jobs.document(self.id); if (job.status !== 'completed') { + job.failures.push(flatten(new Error('Job aborted.'))); db._jobs.update(job, { status: 'failed', modified: Date.now(), - failures: job.failures.concat([ - flatten(new Error('Job aborted.')) - ]) + failures: job.failures }); } } @@ -195,7 +187,7 @@ _.extend(Job.prototype, { db._jobs.update(this.id, { status: 'pending' }); - queues._clearCache(); + resetQueueControl(); } }); @@ -204,7 +196,6 @@ function Queue(name) { get: function () { return name; }, - set: failImmutable('name'), configurable: false, enumerable: true }); @@ -221,10 +212,10 @@ _.extend(Queue.prototype, { } // _jobTypes are database-specific var dbName = db._name(); - if (! queues._jobTypes.hasOwnProperty(dbName)) { - queues._jobTypes[dbName] = { }; + if (!jobTypeCache[dbName]) { + jobTypeCache[dbName] = {}; } - type = queues._jobTypes[dbName][name]; + type = jobTypeCache[dbName][name]; if (type !== undefined) { if (type.schema) { @@ -242,7 +233,7 @@ _.extend(Queue.prototype, { } else { throw new Error('Unknown job type: ' + name); } - queues._clearCache(); + resetQueueControl(); now = Date.now(); return db._jobs.save({ status: 'pending', @@ -260,7 +251,7 @@ _.extend(Queue.prototype, { })._id; }, get: function (id) { - if (id === undefined || id === null) { + if (typeof id !== 'string') { throw new Error('Invalid job id'); } if (!id.match(/^_jobs\//)) { @@ -285,7 +276,7 @@ _.extend(Queue.prototype, { action: function () { try { db._jobs.remove(id); - queues._clearCache(); + resetQueueControl(); return true; } catch (err) { return false; @@ -310,9 +301,16 @@ _.extend(Queue.prototype, { } }); -queues.create('default'); +createQueue('default'); -module.exports = queues; +module.exports = { + _jobTypes: jobTypeCache, + _clearCache: resetQueueControl, + get: getQueue, + create: createQueue, + delete: deleteQueue, + registerJobType: registerJobType +}; // ----------------------------------------------------------------------------- // --SECTION-- END-OF-FILE