1
0
Fork 0

More style.

This commit is contained in:
Alan Plum 2015-05-27 11:30:39 +02:00
parent 0a2b4947ae
commit d40d87a459
1 changed files with 111 additions and 113 deletions

View File

@ -7,7 +7,7 @@
///
/// DISCLAIMER
///
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
/// Copyright 2004-2015 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
@ -31,28 +31,19 @@ var _ = require('underscore');
var flatten = require('internal').flatten;
var arangodb = require('org/arangodb');
var console = require('console');
var qb = require('aqb');
var db = arangodb.db;
var queueCache = {};
var jobCache = {};
var jobTypeCache = {};
function failImmutable(name) {
return function () {
throw new Error(name + ' is not mutable');
};
}
var queues = {
_jobTypes: {},
_clearCache: function () {
function resetQueueControl() {
try {
global.KEY_SET("queue-control", "skip", 0);
} catch (e) {}
}
catch (err) {
// ignore error if key does not exist
}
},
get: function (key) {
function getQueue(key) {
var dbName = db._name();
if (!queueCache[dbName]) {
queueCache[dbName] = {};
@ -64,8 +55,9 @@ var queues = {
queueCache[dbName][key] = new Queue(key);
}
return queueCache[dbName][key];
},
create: function (key, maxWorkers) {
}
function createQueue(key, maxWorkers) {
try {
db._queues.save({_key: key, maxWorkers: maxWorkers || 1});
} catch (err) {
@ -77,15 +69,16 @@ var queues = {
db._queues.update(key, {maxWorkers: maxWorkers});
}
}
this._clearCache();
resetQueueControl();
var dbName = db._name();
if (!queueCache[dbName]) {
queueCache[dbName] = {};
}
queueCache[dbName][key] = new Queue(key);
return queueCache[dbName][key];
},
delete: function (key) {
}
function deleteQueue(key) {
var result = false;
db._executeTransaction({
collections: {
@ -100,11 +93,12 @@ var queues = {
}
});
if (result) {
this._clearCache();
resetQueueControl();
}
return result;
},
registerJobType: function (type, opts) {
}
function registerJobType(type, opts) {
if (typeof opts === 'function') {
opts = {execute: opts};
}
@ -118,55 +112,54 @@ var queues = {
// _jobTypes are database-specific
var dbName = db._name();
if (!queues._jobTypes[dbName]) {
queues._jobTypes[dbName] = {};
if (!jobTypeCache[dbName]) {
jobTypeCache[dbName] = {};
}
queues._jobTypes[dbName][type] = cfg;
jobTypeCache[dbName][type] = cfg;
}
};
function getJobs(queue, status, type) {
var vars = {},
aql = 'FOR job IN _jobs';
var query = qb.for('job').in('_jobs');
var vars = {};
if (queue !== undefined) {
aql += ' FILTER job.queue == @queue';
query = query.filter(qb.ref('@queue').eq('job.queue'));
vars.queue = queue;
}
if (status !== undefined) {
aql += ' FILTER job.status == @status';
query = query.filter(qb.ref('@status').eq('job.status'));
vars.status = status;
}
if (type !== undefined) {
aql += ' FILTER job.type == @type';
query = query.filter(qb.ref('@type').eq('job.type'));
vars.type = type;
}
aql += ' SORT job.delayUntil ASC RETURN job._id';
return db._createStatement({
query: aql,
query: query.sort('job.delayUntil', 'ASC').return('job._id'),
bindVars: vars
}).execute().toArray();
}
function Job(id) {
var self = this;
Object.defineProperty(self, 'id', {
get: function () {
return id;
},
Object.defineProperty(this, 'id', {
value: id,
writable: false,
configurable: false,
enumerable: true
});
_.each(['data', 'status', 'type', 'failures'], function (key) {
Object.defineProperty(self, key, {
Object.defineProperty(this, key, {
get: function () {
var value = db._jobs.document(this.id)[key];
return (value && typeof value === 'object') ? Object.freeze(value) : value;
},
set: failImmutable(key),
configurable: false,
enumerable: true
});
});
}, this);
}
_.extend(Job.prototype, {
@ -180,12 +173,11 @@ _.extend(Job.prototype, {
action: function () {
var job = db._jobs.document(self.id);
if (job.status !== 'completed') {
job.failures.push(flatten(new Error('Job aborted.')));
db._jobs.update(job, {
status: 'failed',
modified: Date.now(),
failures: job.failures.concat([
flatten(new Error('Job aborted.'))
])
failures: job.failures
});
}
}
@ -195,7 +187,7 @@ _.extend(Job.prototype, {
db._jobs.update(this.id, {
status: 'pending'
});
queues._clearCache();
resetQueueControl();
}
});
@ -204,7 +196,6 @@ function Queue(name) {
get: function () {
return name;
},
set: failImmutable('name'),
configurable: false,
enumerable: true
});
@ -221,10 +212,10 @@ _.extend(Queue.prototype, {
}
// _jobTypes are database-specific
var dbName = db._name();
if (! queues._jobTypes.hasOwnProperty(dbName)) {
queues._jobTypes[dbName] = { };
if (!jobTypeCache[dbName]) {
jobTypeCache[dbName] = {};
}
type = queues._jobTypes[dbName][name];
type = jobTypeCache[dbName][name];
if (type !== undefined) {
if (type.schema) {
@ -242,7 +233,7 @@ _.extend(Queue.prototype, {
} else {
throw new Error('Unknown job type: ' + name);
}
queues._clearCache();
resetQueueControl();
now = Date.now();
return db._jobs.save({
status: 'pending',
@ -260,7 +251,7 @@ _.extend(Queue.prototype, {
})._id;
},
get: function (id) {
if (id === undefined || id === null) {
if (typeof id !== 'string') {
throw new Error('Invalid job id');
}
if (!id.match(/^_jobs\//)) {
@ -285,7 +276,7 @@ _.extend(Queue.prototype, {
action: function () {
try {
db._jobs.remove(id);
queues._clearCache();
resetQueueControl();
return true;
} catch (err) {
return false;
@ -310,9 +301,16 @@ _.extend(Queue.prototype, {
}
});
queues.create('default');
createQueue('default');
module.exports = queues;
module.exports = {
_jobTypes: jobTypeCache,
_clearCache: resetQueueControl,
get: getQueue,
create: createQueue,
delete: deleteQueue,
registerJobType: registerJobType
};
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE