mirror of https://gitee.com/bigwinds/arangodb
329 lines
8.0 KiB
JavaScript
329 lines
8.0 KiB
JavaScript
'use strict';
|
|
|
|
// //////////////////////////////////////////////////////////////////////////////
|
|
// / DISCLAIMER
|
|
// /
|
|
// / Copyright 2014 triAGENS GmbH, Cologne, Germany
|
|
// / Copyright 2015 ArangoDB GmbH, Cologne, Germany
|
|
// /
|
|
// / Licensed under the Apache License, Version 2.0 (the "License")
|
|
// / you may not use this file except in compliance with the License.
|
|
// / You may obtain a copy of the License at
|
|
// /
|
|
// / http://www.apache.org/licenses/LICENSE-2.0
|
|
// /
|
|
// / Unless required by applicable law or agreed to in writing, software
|
|
// / distributed under the License is distributed on an "AS IS" BASIS,
|
|
// / WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// / See the License for the specific language governing permissions and
|
|
// / limitations under the License.
|
|
// /
|
|
// / Copyright holder is ArangoDB GmbH, Cologne, Germany
|
|
// /
|
|
// / @author Alan Plum
|
|
// //////////////////////////////////////////////////////////////////////////////
|
|
|
|
const isCluster = require('@arangodb/cluster').isCluster();
|
|
const isAgent = global.ArangoAgent.enabled();
|
|
|
|
var _ = require('lodash');
|
|
var internal = require('internal');
|
|
var flatten = require('internal').flatten;
|
|
var arangodb = require('@arangodb');
|
|
var joi = require('joi');
|
|
var qb = require('aqb');
|
|
var db = arangodb.db;
|
|
|
|
var queueCache = {};
|
|
var jobCache = {};
|
|
|
|
function validate (data, schema) {
|
|
if (!schema) {
|
|
schema = joi.forbidden();
|
|
}
|
|
var raw = data;
|
|
var isTuple = Boolean(
|
|
schema._meta && schema._meta.some((meta) => meta.isTuple)
|
|
);
|
|
if (isTuple) {
|
|
raw = Array.isArray(raw) ? raw : [raw];
|
|
data = Object.assign({}, raw);
|
|
}
|
|
var result = schema.validate(data);
|
|
if (result.error) {
|
|
throw result.error;
|
|
}
|
|
if (isTuple) {
|
|
return raw.map((x, i) => result.value[i]);
|
|
}
|
|
return result.value;
|
|
}
|
|
|
|
function updateQueueDelay () {
|
|
try {
|
|
var delayUntil = db._query(global.aqlQuery`
|
|
LET queues = (FOR queue IN _queues RETURN queue._key)
|
|
FOR job IN _jobs
|
|
FILTER ('pending' == job.status)
|
|
FILTER POSITION(queues, job.queue, false)
|
|
FILTER (null != job.delayUntil)
|
|
SORT job.delayUntil ASC
|
|
LIMIT 1
|
|
RETURN job.delayUntil`).next();
|
|
if (typeof delayUntil !== 'number') {
|
|
delayUntil = -1;
|
|
}
|
|
global.KEYSPACE_CREATE('queue-control', 1, true);
|
|
global.KEY_SET('queue-control', 'delayUntil', delayUntil);
|
|
} catch (e) {}
|
|
}
|
|
|
|
function getQueue (key) {
|
|
var databaseName = db._name();
|
|
var cache = queueCache[databaseName];
|
|
if (!cache) {
|
|
cache = queueCache[databaseName] = {};
|
|
}
|
|
if (!cache[key]) {
|
|
if (!db._queues.exists(key)) {
|
|
throw new Error('Queue does not exist: ' + key);
|
|
}
|
|
cache[key] = new Queue(key);
|
|
}
|
|
return cache[key];
|
|
}
|
|
|
|
function createQueue (key, maxWorkers) {
|
|
internal.createQueue(key, maxWorkers || 1);
|
|
|
|
var databaseName = db._name();
|
|
var cache = queueCache[databaseName];
|
|
if (!cache) {
|
|
cache = queueCache[databaseName] = {};
|
|
}
|
|
cache[key] = new Queue(key);
|
|
return cache[key];
|
|
}
|
|
|
|
function deleteQueue (key) {
|
|
var result = false;
|
|
try {
|
|
internal.deleteQueue(key);
|
|
result = true;
|
|
} catch (e) {
|
|
console.warn('Deleting queue \'' + key + '\' failed: ' + e.message);
|
|
}
|
|
return result;
|
|
}
|
|
|
|
function getJobs (queue, status, type) {
|
|
var query = qb.for('job').in('_jobs');
|
|
var vars = {};
|
|
|
|
if (queue !== undefined) {
|
|
query = query.filter(qb.ref('@queue').eq('job.queue'));
|
|
vars.queue = queue;
|
|
}
|
|
|
|
if (status !== undefined) {
|
|
query = query.filter(qb.ref('@status').eq('job.status'));
|
|
vars.status = status;
|
|
}
|
|
|
|
if (type !== undefined) {
|
|
query = query.filter(
|
|
qb.ref('@type.name').eq('job.type.name')
|
|
.and(qb.ref('@type.mount').eq('job.type.mount'))
|
|
);
|
|
vars.type = {
|
|
name: type.name,
|
|
mount: type.mount
|
|
};
|
|
}
|
|
|
|
return db._createStatement({
|
|
query: query.sort('job.delayUntil', 'ASC').return('job._id'),
|
|
bindVars: vars
|
|
}).execute().toArray();
|
|
}
|
|
|
|
function Job (id) {
|
|
Object.defineProperty(this, 'id', {
|
|
value: id,
|
|
writable: false,
|
|
configurable: false,
|
|
enumerable: true
|
|
});
|
|
_.each(['data', 'status', 'type', 'failures', 'runs', 'runFailures'], (key) => {
|
|
Object.defineProperty(this, key, {
|
|
get () {
|
|
var value = db._jobs.document(this.id)[key];
|
|
return (value && typeof value === 'object') ? Object.freeze(value) : value;
|
|
},
|
|
configurable: false,
|
|
enumerable: true
|
|
});
|
|
});
|
|
}
|
|
|
|
Object.assign(Job.prototype, {
|
|
abort () {
|
|
var self = this;
|
|
db._executeTransaction({
|
|
collections: {
|
|
exclusive: ['_jobs']
|
|
},
|
|
action () {
|
|
var job = db._jobs.document(self.id);
|
|
if (job.status !== 'completed') {
|
|
job.failures.push(flatten(new Error('Job aborted.')));
|
|
db._jobs.update(job, {
|
|
status: 'failed',
|
|
modified: Date.now(),
|
|
failures: job.failures
|
|
});
|
|
}
|
|
}
|
|
});
|
|
},
|
|
reset () {
|
|
db._jobs.update(this.id, {
|
|
status: 'pending'
|
|
});
|
|
updateQueueDelayClusterAware();
|
|
}
|
|
});
|
|
|
|
function Queue (name) {
|
|
Object.defineProperty(this, 'name', {
|
|
get: () => name,
|
|
configurable: false,
|
|
enumerable: true
|
|
});
|
|
}
|
|
|
|
function asNumber (num) {
|
|
if (!num) {
|
|
return 0;
|
|
}
|
|
if (num === Infinity) {
|
|
return -1;
|
|
}
|
|
return num ? Number(num) : 0;
|
|
}
|
|
|
|
function updateQueueDelayClusterAware () {
|
|
if (isCluster && !isAgent) {
|
|
global.ArangoAgency.set('Current/FoxxmasterQueueupdate', true);
|
|
}
|
|
updateQueueDelay();
|
|
}
|
|
|
|
Object.assign(Queue.prototype, {
|
|
push (type, data, opts) {
|
|
if (!type) {
|
|
throw new Error('Must pass a job type!');
|
|
}
|
|
|
|
type = Object.assign({}, type);
|
|
|
|
if (type.schema) {
|
|
data = validate(data, type.schema);
|
|
delete type.schema;
|
|
}
|
|
|
|
if (type.preprocess) {
|
|
data = type.preprocess(data);
|
|
}
|
|
|
|
var now = Date.now();
|
|
var job = {
|
|
status: 'pending',
|
|
queue: this.name,
|
|
type: type,
|
|
failures: [],
|
|
runs: 0,
|
|
data: data,
|
|
created: now,
|
|
modified: now
|
|
};
|
|
|
|
if (!opts) {
|
|
opts = {};
|
|
}
|
|
|
|
job.type.preprocess = typeof job.type.preprocess === 'function' ? job.type.preprocess.toString() : job.type.preprocess;
|
|
job.delayUntil = asNumber(opts.delayUntil) || now;
|
|
job.delayUntil += asNumber(opts.delay);
|
|
|
|
job.maxFailures = asNumber(opts.maxFailures);
|
|
|
|
job.repeatDelay = asNumber(opts.repeatDelay);
|
|
job.repeatTimes = asNumber(opts.repeatTimes);
|
|
job.repeatUntil = asNumber(opts.repeatUntil) || -1;
|
|
|
|
job.backOff = typeof opts.backOff === 'function' ? opts.backOff.toString() : opts.backOff;
|
|
job.success = typeof opts.success === 'function' ? opts.success.toString() : opts.success;
|
|
job.failure = typeof opts.failure === 'function' ? opts.failure.toString() : opts.failure;
|
|
|
|
job = db._jobs.save(job);
|
|
|
|
updateQueueDelayClusterAware();
|
|
return job._id;
|
|
},
|
|
get (id) {
|
|
if (typeof id !== 'string') {
|
|
throw new Error('Invalid job id');
|
|
}
|
|
if (!id.match(/^_jobs\//)) {
|
|
id = '_jobs/' + id;
|
|
}
|
|
// jobs are database-specific
|
|
var databaseName = db._name();
|
|
if (!jobCache[databaseName]) {
|
|
jobCache[databaseName] = {};
|
|
}
|
|
if (!jobCache[databaseName][id]) {
|
|
jobCache[databaseName][id] = new Job(id);
|
|
}
|
|
return jobCache[databaseName][id];
|
|
},
|
|
delete (id) {
|
|
return db._executeTransaction({
|
|
collections: {
|
|
write: ['_jobs']
|
|
},
|
|
action () {
|
|
try {
|
|
db._jobs.remove(id);
|
|
return true;
|
|
} catch (err) {
|
|
return false;
|
|
}
|
|
}
|
|
});
|
|
},
|
|
pending (type) {
|
|
return getJobs(this.name, 'pending', type);
|
|
},
|
|
complete (type) {
|
|
return getJobs(this.name, 'complete', type);
|
|
},
|
|
failed (type) {
|
|
return getJobs(this.name, 'failed', type);
|
|
},
|
|
progress (type) {
|
|
return getJobs(this.name, 'progress', type);
|
|
},
|
|
all (type) {
|
|
return getJobs(this.name, undefined, type);
|
|
}
|
|
});
|
|
|
|
module.exports = {
|
|
_updateQueueDelay: updateQueueDelay,
|
|
get: getQueue,
|
|
create: createQueue,
|
|
delete: deleteQueue
|
|
};
|