1
0
Fork 0

Bug fix 3.3/foxx queues (#3930)

* fix issue #3858

* added tests and updated documentation for Foxx queues
This commit is contained in:
Jan 2017-12-07 10:21:01 +01:00 committed by Frank Celler
parent 21e275d1a1
commit 68a2f331e9
8 changed files with 253 additions and 45 deletions

View File

@ -255,7 +255,35 @@ Returns the job id.
Function to pre-process a job's (validated) data before serializing it in the queue.
* **repeatTimes**: `Function` (Default: `0`)
* **data**: `any`
Job data of the job; must be serializable to JSON.
* **opts**: `object` (optional)
Object with any of the following properties:
* **success**: `Function` (optional)
Function to be called after the job has been completed successfully.
* **failure**: `Function` (optional)
Function to be called after the job has failed too many times.
* **delayUntil**: `number | Date` (Default: `Date.now()`)
Timestamp in milliseconds (or `Date` instance) until which the execution of the job should be delayed.
* **backOff**: `Function | number` (Default: `1000`)
See *script.backOff*.
* **maxFailures**: `number | Infinity` (Default: `0`):
See *script.maxFailures*.
* **repeatTimes**: `number | Function` (Default: `0`)
If set to a positive number, the job will be repeated this many times (not counting recovery when using *maxFailures*). If set to a negative number or `Infinity`, the job will be repeated indefinitely. If set to `0` the job will not be repeated.
@ -267,46 +295,6 @@ Returns the job id.
If the job is set to automatically repeat, this can be set to a non-negative value to set the number of milliseconds for which the job will be delayed before it is started again.
* **data**: `any`
Job data of the job; must be serializable to JSON.
* **opts**: `object` (optional)
Object with any of the following properties:
* **success**: `Function` (optional)
Function to be called after the job has been completed successfully.
* **failure**: `Function` (optional)
Function to be called after the job has failed too many times.
* **delayUntil**: `number | Date` (Default: `Date.now()`)
Timestamp in milliseconds (or `Date` instance) until which the execution of the job should be delayed.
* **backOff**: `Function | number` (Default: `1000`)
See *script.backOff*.
* **maxFailures**: `number | Infinity` (Default: `0`):
See *script.maxFailures*.
* **repeatTimes**: `Function` (Default: `0`)
See *script.repeatTimes*.
* **repeatUntil**: `number | Date` (optional)
See *script.repeatUntil*.
* **repeatDelay**: `number` (Default: `0`)
See *script.repeatDelay*.
Note that if you pass a function for the *backOff* calculation, *success* callback or *failure* callback options the function will be serialized to the database as a string and therefore must not rely on any external scope or external variables.
When the job is set to automatically repeat, the *failure* callback will only be executed when a run of the job has failed more than *maxFailures* times. Note that if the job fails and *maxFailures* is set, it will be rescheduled according to the *backOff* until it has either failed too many times or completed successfully before being scheduled according to the *repeatDelay* again. Recovery attempts by *maxFailures* do not count towards *repeatTimes*.

View File

@ -34,8 +34,6 @@ var console = require('console');
var actions = require('@arangodb/actions');
var arangodb = require('@arangodb');
// var queue = Foxx.queues.create("internal-demo-queue")
// //////////////////////////////////////////////////////////////////////////////
// / @brief was docuBlock JSF_get_admin_time
// //////////////////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,4 @@
'use strict';
const router = require('@arangodb/foxx/router')();
module.context.use(router);

View File

@ -0,0 +1 @@
require("internal").sleep(15);

View File

@ -0,0 +1,6 @@
{
"main": "index.js",
"scripts": {
"job": "job.js"
}
}

View File

@ -111,7 +111,7 @@ function deleteQueue (key) {
internal.deleteQueue(key);
result = true;
} catch (e) {
internal.print('Deleting queue failed: ' + e.message);
console.warn('Deleting queue \'' + key + '\' failed: ' + e.message);
}
return result;
}
@ -252,6 +252,7 @@ Object.assign(Queue.prototype, {
opts = {};
}
job.type.preprocess = typeof job.type.preprocess === 'function' ? job.type.preprocess.toString() : job.type.preprocess;
job.delayUntil = asNumber(opts.delayUntil) || now;
job.delayUntil += asNumber(opts.delay);

View File

@ -180,7 +180,7 @@ exports.run = function () {
try {
db._useDatabase(name);
db._jobs.toArray().filter(function(job) {
return job.stats === 'progress';
return job.status === 'progress';
}).forEach(function(job) {
db._jobs.update(job._id, { status: 'pending' });
});

View File

@ -0,0 +1,210 @@
/*global assertEqual, assertNotEqual, assertFalse, fail */
////////////////////////////////////////////////////////////////////////////////
/// @brief test foxx queues
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2012 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2015, ArangoDB GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
"use strict";
var jsunity = require("jsunity");
var internal = require("internal");
var db = require("internal").db;
var queues = require('@arangodb/foxx/queues');
var FoxxManager = require('org/arangodb/foxx/manager');
var fs = require('fs');
var basePath = fs.makeAbsolute(fs.join(internal.startupPath, 'common', 'test-data', 'apps'));
function foxxQueuesSuite () {
var cn = "UnitTestsFoxx";
var collection = null;
var qn = "FoxxCircus";
return {
setUp : function () {
db._drop(cn);
collection = db._create(cn);
queues.delete(qn);
},
tearDown : function () {
db._drop(cn);
collection = null;
queues.delete(qn);
},
testCreateEmptyQueue : function () {
var queue = queues.create(qn);
assertEqual([], queue.all());
},
testCreateQueueAndFetch : function () {
var queue = queues.create(qn);
assertEqual(qn, queues.get(qn).name);
},
testCreateDelayedJob : function () {
var delay = { delayUntil: Date.now() + 1000 * 86400 };
var queue = queues.create(qn);
var id = queue.push({
name: 'testi',
mount: '/test'
}, {}, delay);
assertEqual(1, queue.all().length);
var job = db._jobs.document(id);
assertEqual(id, job._id);
assertEqual("pending", job.status);
assertEqual(qn, job.queue);
assertEqual("testi", job.type.name);
assertEqual("/test", job.type.mount);
assertEqual(0, job.runs);
assertEqual([], job.failures);
assertEqual({}, job.data);
assertEqual(0, job.maxFailures);
assertEqual(0, job.repeatTimes);
assertEqual(0, job.repeatDelay);
assertEqual(-1, job.repeatUntil);
},
testCreateAndExecuteJobThatWillFail : function () {
var queue = queues.create(qn);
var id = queue.push({
name: 'testi',
mount: '/test-this-does-not-exist-please-ignore'
}, {});
var tries = 0;
while (++tries < 60) {
var result = db._jobs.document(id);
if (result.status === 'pending' || result.status === 'progress') {
internal.wait(0.5, false);
} else {
break;
}
}
assertEqual('failed', db._jobs.document(id).status);
},
testPreprocessJobData : function () {
var queue = queues.create(qn);
var id = queue.push({
name: 'peng',
mount: '/_admin/aardvark',
preprocess: function(data) { data.hund = 'hans'; delete data.meow; return data; }
}, { a: 1, b: 2, meow: true });
var job = db._jobs.document(id);
assertEqual(1, job.data.a);
assertEqual(2, job.data.b);
assertEqual('hans', job.data.hund);
assertFalse(job.data.hasOwnProperty('meow'));
},
testExecuteJobThatWillTakeLong : function () {
try {
FoxxManager.uninstall('/unittest/queues-test', {force: true});
} catch (err) {}
FoxxManager.install(fs.join(basePath, 'queues-test'), '/unittest/queues-test');
try {
var queue = queues.create(qn);
var id = queue.push({
name: 'job',
mount: '/unittest/queues-test'
}, {});
var tries = 0;
while (++tries < 120) {
var result = db._jobs.document(id);
if (result.status === 'pending') {
internal.wait(0.5, false);
} else {
break;
}
}
// should still be still in progress
assertEqual('progress', db._jobs.document(id).status);
} finally {
FoxxManager.uninstall('/unittest/queues-test', {force: true});
}
},
testExecuteJob : function () {
assertEqual(0, db.UnitTestsFoxx.count());
var queue = queues.create(qn);
var id = queue.push({
name: 'peng',
mount: '/_admin/aardvark'
}, {}, { failure: function() {
require("internal").db.UnitTestsFoxx.insert({ failed: true });
}});
var tries = 0;
while (++tries < 60) {
var result = db._jobs.document(id);
if (result.status === 'pending' || result.status === 'progress') {
internal.wait(0.5, false);
} else {
break;
}
}
// should have failed
assertEqual('failed', db._jobs.document(id).status);
assertNotEqual(0, db.UnitTestsFoxx.count());
},
testExecuteRepeatedJob : function () {
assertEqual(0, db.UnitTestsFoxx.count());
var queue = queues.create(qn);
var id = queue.push({
name: 'peng',
mount: '/_admin/aardvark',
repeatDelay: 1, // milliseconds
maxFailures: 3
}, {}, { failure: function() {
require("internal").db.UnitTestsFoxx.insert({ failed: true });
}});
var tries = 0;
while (++tries < 120) {
var result = db._jobs.document(id);
if (result.status === 'pending' || result.status === 'progress') {
internal.wait(0.5, false);
} else {
break;
}
}
// should have failed
assertEqual('failed', db._jobs.document(id).status);
assertNotEqual(3, db.UnitTestsFoxx.count());
}
};
}
jsunity.run(foxxQueuesSuite);
return jsunity.done();