1
0
Fork 0

More fixing of the queues.

This commit is contained in:
Alan Plum 2015-06-10 14:17:23 +02:00
parent ddf39db6c8
commit a7a1ca2e00
8 changed files with 139 additions and 153 deletions

View File

@ -526,22 +526,30 @@ v2.6.0 (XXXX-XX-XX)
up and checking the queues more frequently, which may increase CPU usage of the server.
When not using Foxx queues, this value can be raised to save some CPU time.
* added startup option `--server.foxx-queues-system-only`
* added startup option `--server.foxx-queues-warmup-exports`
This startup option controls whether the Foxx queue manager will check queue and job entries
in the `_system` database only. Restricting the Foxx queue manager to the `_system` database
will lead to the queue manager having to check only the queues collection of a single database,
whereas making it check the queues of all databases might result in more work to be done and
more CPU time to be used by the queue manager.
This startup option makes sure the exports of all installed Foxx apps are pre-loaded when
the server is restarted. This prevents a known issue with function-based Foxx queue job types.
It is recommended that you upgrade any existing function-based job types to script-based ones
to avoid the issue entirely.
The default value is `true`, so that the queue manager will only check the queues in the
`_system` database.
The default value is `true`. Disabling this feature can reduce memory usage and startup time
but may cause issues with function-based job types.
* added startup option `--server.foxx-queues`
This startup option controls whether the Foxx queue manager will check queue and job entries.
Disabling this option can reduce server load but will prevent jobs added to Foxx queues from
being processed at all.
The default value is `true`, enabling the Foxx queues feature.
* make Foxx queues really database-specific.
Foxx queues were and are stored in a database-specific collection `_queues`. However, a global
cache variable for the queues led to the queue names being treated database-independently, which
was wrong.
Since 2.6, Foxx queues names are truly database-specific, so the same queue name can be used in
two different databases for two different queues. Until then, it is advisable to think of queues
as already being database-specific, and using the database name as a queue name prefix to be

View File

@ -4,17 +4,15 @@ If you have never built a Foxx App, then you can make your own [first Foxx App](
Foxx allows defining job queues that let you perform slow or expensive actions asynchronously. These queues can be used to send e-mails, call external APIs or perform other actions that you do not want to perform directly or want to retry on failure.
This feature can be disabled by setting the startup option `--server.foxx-queues` to `false`. This will disable the queue manager and prevent any jobs from being processed, which may improve CPU load if you do not plan to use Foxx queues at all.
Please note that Foxx job queues are database-specific. Queues and jobs are always relative to the database in which they are created or accessed.
By default only jobs from queues in the `_system` database will be processed automatically by the Foxx queue manager. To automatically process the queues of other databases than `_system`, you have to start ArangoDB with the option `--server.foxx-queues-system-only false`.
Restricting the Foxx queue manager to the `_system` database will lead to the queue manager only having to check the queues of a single database. Checking the queues of *all* databases might result in more work and more CPU time used by the queue manager.
By default the Foxx queue manager will be checking the queues once every second. This value can be changed by setting the startup option `--server.foxx-manager-poll-interval`. Lowering this value will result in the queue manager waking up and checking the queues more frequently, which may increase CPU usage of the server.
By default the Foxx queue manager will be checking the queues once every second. This value can be changed by setting the startup option `--server.foxx-queues-poll-interval`. Lowering this value will result in the queue manager waking up and checking the queues more frequently, which may increase CPU usage of the server.
For the low-level functionality see [the chapter on the **Task Management** module](../../ModuleTasks/).
As of ArangoDB 2.6, Foxx queue job types are defined as regular Foxx scripts. To learn more about Foxx scripts, see [the chapter on Foxx scripts](./Scripts.html).
As of ArangoDB 2.6, Foxx queue job types are defined as regular Foxx scripts. To learn more about Foxx scripts, see [the chapter on Foxx scripts](./Scripts.html). Function-based job types are still supported in 2.6 but will be removed in 2.7.
For an example of re-usable job types see the various mailer apps available in the Foxx app store.

View File

@ -315,7 +315,7 @@ http://jsteemann.github.io/blog/2015/04/14/updating-documents-with-arangoimp/
up and checking the queues more frequently, which may increase CPU usage of the server.
When not using Foxx queues, this value can be raised to save some CPU time.
* Added configuration option `--server.foxx-queues-system-only`
* Added configuration option `--server.foxx-queues`
This startup option controls whether the Foxx queue manager will check queue and job entries
in the `_system` database only. Restricting the Foxx queue manager to the `_system` database
@ -323,3 +323,10 @@ http://jsteemann.github.io/blog/2015/04/14/updating-documents-with-arangoimp/
whereas making it check the queues of all databases might result in more work to be done and
more CPU time to be used by the queue manager.
* added startup option `--server.foxx-queues-warmup-exports`
This startup option makes sure the exports of all installed Foxx apps are pre-loaded when
the server is restarted. This prevents a known issue with function-based Foxx queue job types.
It is recommended that you upgrade any existing function-based job types to script-based ones
to avoid the issue entirely.

View File

@ -339,9 +339,8 @@ ArangoServer::ArangoServer (int argc, char** argv)
_disableReplicationApplier(false),
_disableQueryTracking(false),
_foxxQueues(true),
_foxxQueuesSystemOnly(true),
_foxxQueuesWarmupExports(true),
_foxxQueuesPollInterval(1.0),
_foxxQueuesPollSleep(5.0),
_server(nullptr),
_queryRegistry(nullptr),
_pairForAql(nullptr),
@ -621,8 +620,6 @@ void ArangoServer::buildApplicationServer () {
("server.foxx-queues", &_foxxQueues, "enable Foxx queues")
("server.foxx-queues-warmup-exports", &_foxxQueuesWarmupExports, "enable pre-loading of Foxx exports for Foxx queues")
("server.foxx-queues-poll-interval", &_foxxQueuesPollInterval, "Foxx queue manager poll interval (in seconds)")
("server.foxx-queues-poll-sleep", &_foxxQueuesPollSleep, "Foxx queue manager sleep duration when queue is empty (in seconds)")
("server.foxx-queues-system-only", &_foxxQueuesSystemOnly, "run Foxx queues in _system database only")
("server.session-timeout", &VocbaseContext::ServerSessionTtl, "timeout of web interface server sessions (in seconds)")
;

View File

@ -548,22 +548,6 @@ namespace triagens {
bool _foxxQueuesWarmupExports;
////////////////////////////////////////////////////////////////////////////////
/// @brief restrict the Foxx queues to run in the _system database only
/// @startDocuBlock foxxQueuesSystemOnly
/// `--server.foxx-queues-system-only flag`
///
/// If *true*, the Foxx queues will be handled and jobs in them executed only
/// for queues and jobs in the _system database. Queues and jobs of all other
/// databases will be ignored then.
///
/// The default is *true*. It should only be changed if Foxx queues are not used
/// with other databases than *_system*.
/// @endDocuBlock
////////////////////////////////////////////////////////////////////////////////
bool _foxxQueuesSystemOnly;
////////////////////////////////////////////////////////////////////////////////
/// @brief poll interval for Foxx queues
/// @startDocuBlock foxxQueuesPollInterval
@ -582,21 +566,6 @@ namespace triagens {
double _foxxQueuesPollInterval;
////////////////////////////////////////////////////////////////////////////////
/// @brief poll sleep duration for Foxx queues
/// @startDocuBlock foxxQueuesPollSleepDuration
/// `--server.foxx-queues-poll-sleep-duration value`
///
/// The number of seconds the Foxx queues manager will sleep when the queue
/// is empty. Lower values will mean more immediate job execution.
///
/// The default is *5*. If Foxx queues are not used much, then this value
/// may be increased to make the queues thread wake up less.
/// @endDocuBlock
////////////////////////////////////////////////////////////////////////////////
double _foxxQueuesPollSleep;
////////////////////////////////////////////////////////////////////////////////
/// @brief unit tests
///

View File

@ -63,16 +63,29 @@ function validate(data, schema) {
return result.value;
}
function resetQueueDelay() {
function updateQueueDelay() {
try {
global.KEY_SET("queue-control", "delayUntil", db._query(
'LET queues = (FOR queue IN _queues RETURN queue.name)'
+ ' FOR job IN _jobs'
+ ' FILTER job.status == "pending"'
+ ' FILTER POSITION(queues, job.queue, false) '
+ ' SORT job.delayUntil ASC'
+ ' RETURN job.delayUntil'
)[0]);
db._executeTransaction({
collections: {
read: ['_queues', '_jobs']
},
action: function () {
var delayUntil = db._query(
qb.let('queues', qb.for('queue').in('_queues').return('queue._key'))
.for('job').in('_jobs')
.filter(qb('pending').eq('job.status'))
.filter(qb.POSITION('queues', 'job.queue', false))
.filter(qb(null).neq('job.delayUntil'))
.sort('job.delayUntil', 'ASC')
.return('job.delayUntil')
).next();
if (typeof delayUntil !== 'number') {
delayUntil = -1;
}
global.KEYSPACE_CREATE('queue-control', 1, true);
global.KEY_SET('queue-control', 'delayUntil', delayUntil);
}
});
} catch (e) {}
}
@ -231,7 +244,7 @@ _.extend(Job.prototype, {
db._jobs.update(this.id, {
status: 'pending'
});
resetQueueDelay();
updateQueueDelay();
}
});
@ -303,7 +316,7 @@ _.extend(Queue.prototype, {
onSuccess: opts.success ? opts.success.toString() : null,
onFailure: opts.failure ? opts.failure.toString() : null
});
resetQueueDelay();
updateQueueDelay();
return job._id;
},
get: function (id) {
@ -360,7 +373,7 @@ createQueue('default');
module.exports = {
_jobTypes: jobTypeCache,
_clearCache: resetQueueDelay,
_updateQueueDelay: updateQueueDelay,
get: getQueue,
create: createQueue,
delete: deleteQueue,

View File

@ -33,92 +33,97 @@ var db = require('org/arangodb').db;
var qb = require('aqb');
var runInDatabase = function () {
var sleep = true;
db._queues.all().toArray()
.forEach(function (queue) {
var numBusy = db._jobs.byExample({
queue: queue._key,
status: 'progress'
}).count();
if (numBusy >= queue.maxWorkers) {
return;
}
var jobs = db._createStatement({
query: (
qb.for('job').in('_jobs')
.filter(
qb('pending').eq('job.status')
.and(qb.ref('@queue').eq('job.queue'))
.and(qb.ref('@now').gte('job.delayUntil'))
)
.sort('job.delayUntil', 'ASC')
.limit('@max')
.return('job')
),
bindVars: {
queue: queue._key,
now: Date.now(),
max: queue.maxWorkers - numBusy
}
}).execute().toArray();
var busy = false;
db._executeTransaction({
collections: {
read: ['_queues', '_jobs'],
write: ['_jobs']
},
action: function () {
db._queues.all().toArray()
.forEach(function (queue) {
var numBusy = db._jobs.byExample({
queue: queue._key,
status: 'progress'
}).count();
sleep = sleep && !jobs.length;
jobs.forEach(function (doc) {
db._jobs.update(doc, {status: 'progress'});
tasks.register({
command: function (cfg) {
var db = require('org/arangodb').db;
var initialDatabase = db._name();
db._useDatabase(cfg.db);
try {
require('org/arangodb/foxx/queues/worker').work(cfg.job);
} catch(e) {}
db._useDatabase(initialDatabase);
},
offset: 0,
isSystem: true,
params: {
job: _.extend({}, doc, {status: 'progress'}),
db: db._name()
if (numBusy >= queue.maxWorkers) {
busy = true;
return;
}
var jobs = db._createStatement({
query: (
qb.for('job').in('_jobs')
.filter(
qb('pending').eq('job.status')
.and(qb.ref('@queue').eq('job.queue'))
.and(qb.ref('@now').gte('job.delayUntil'))
)
.sort('job.delayUntil', 'ASC')
.limit('@max')
.return('job')
),
bindVars: {
queue: queue._key,
now: Date.now(),
max: queue.maxWorkers - numBusy
}
}).execute().toArray();
if (jobs.length > 0) {
busy = true;
}
jobs.forEach(function (job) {
db._jobs.update(job, {status: 'progress'});
tasks.register({
command: function (cfg) {
var db = require('org/arangodb').db;
var initialDatabase = db._name();
db._useDatabase(cfg.db);
try {
require('org/arangodb/foxx/queues/worker').work(cfg.job);
} catch(e) {}
db._useDatabase(initialDatabase);
},
offset: 1,
isSystem: true,
params: {
job: _.extend({}, job, {status: 'progress'}),
db: db._name()
}
});
});
});
});
}
});
return sleep;
if (!busy) {
require('org/arangodb/foxx/queues')._updateQueueDelay();
}
};
exports.manage = function (systemOnly, sleepDuration) {
exports.manage = function () {
var initialDatabase = db._name();
var databases = systemOnly ? ['_system'] : db._listDatabases();
var databases = db._listDatabases();
databases.forEach(function (database) {
try {
db._useDatabase(database);
global.KEYSPACE_CREATE('queue-control', 2, true);
global.KEYSPACE_CREATE('queue-control', 1, true);
var delayUntil = global.KEY_GET('queue-control', 'delayUntil') || 0;
if (delayUntil > Date.now()) {
if (delayUntil > Date.now() || delayUntil === -1) {
return;
}
var queues = db._collection('_queues');
var jobs = db._collection('_jobs');
if (queues && jobs) {
db._executeTransaction({
collections: {
read: ['_queues', '_jobs'],
write: ['_jobs']
},
action: function () {
var sleep = runInDatabase();
if (sleep) {
global.KEY_SET('queue-control', 'delayUntil', Date.now() + sleepDuration);
}
}
});
if (!queues || !jobs || !queues.count() || !jobs.count()) {
global.KEY_SET('queue-control', 'delayUntil', -1);
} else {
global.KEY_SET('queue-control', 'delayUntil', Date.now() + sleepDuration);
runInDatabase();
}
} catch (e) {}
});
@ -137,6 +142,7 @@ exports.run = function () {
}
// warmup exports
// TODO remove this feature along with function-based job types in 2.7
if (options['server.foxx-queues-warmup-exports'] !== false) {
require('org/arangodb/foxx/manager')._warmupAllExports();
}
@ -147,26 +153,21 @@ exports.run = function () {
period = options['server.foxx-queues-poll-interval'];
}
// sleep duration when idling
var sleepDuration = 5;
if (options.hasOwnProperty('server.foxx-queues-poll-sleep-duration')) {
sleepDuration = options['server.foxx-queues-poll-sleep-duration'];
}
// only use system database
var systemOnly = false;
if (options.hasOwnProperty('server.foxx-queues-system-only')) {
systemOnly = options['server.foxx-queues-system-only'];
}
db._jobs.updateByExample({status: 'progress'}, {status: 'pending'});
var initialDatabase = db._name();
db._listDatabases().forEach(function (name) {
try {
db._useDatabase(name);
db._jobs.updateByExample({status: 'progress'}, {status: 'pending'});
require('org/arangodb/foxx/queues')._updateQueueDelay();
} catch(e) {}
});
db._useDatabase(initialDatabase);
return tasks.register({
command: function (params) {
require('org/arangodb/foxx/queues/manager').manage(params.systemOnly, params.sleepDuration);
require('org/arangodb/foxx/queues/manager').manage();
},
period: period,
isSystem: true,
params: {systemOnly: systemOnly, sleepDuration: sleepDuration * 1000}
isSystem: true
});
};

View File

@ -108,7 +108,7 @@ exports.work = function (job) {
fm.runScript(cfg.name, cfg.mount, [].concat(job.data, job._id));
}
} catch (e) {
console.error('Job %s failed:\n%s', job._key, e.stack || String(e));
console.errorLines('Job %s failed:\n%s', job._key, e.stack || String(e));
job.failures.push(flatten(e));
success = false;
}
@ -139,14 +139,7 @@ exports.work = function (job) {
failures: job.failures,
status: 'pending'
});
global.KEY_SET("queue-control", "delayUntil", db._query(
'LET queues = (FOR queue IN _queues RETURN queue.name)'
+ ' FOR job IN _jobs'
+ ' FILTER job.status == "pending"'
+ ' FILTER POSITION(queues, job.queue, false) '
+ ' SORT job.delayUntil ASC'
+ ' RETURN job.delayUntil'
)[0]);
queues._updateQueueDelay();
}
});
}