1
0
Fork 0

Merge branch 'aql2' of ssh://github.com/triAGENS/ArangoDB into aql2

This commit is contained in:
Max Neunhoeffer 2014-09-08 15:05:46 +02:00
commit 5ee7e47879
6 changed files with 47 additions and 23 deletions

View File

@ -66,7 +66,7 @@ V8PeriodicTask::V8PeriodicTask (string const& id,
_parameters(parameters),
_created(TRI_microtime()) {
TRI_ASSERT(vocbase != 0);
TRI_ASSERT(vocbase != nullptr);
// increase reference counter for the database used
TRI_UseVocBase(_vocbase);
@ -80,7 +80,7 @@ V8PeriodicTask::~V8PeriodicTask () {
// decrease reference counter for the database used
TRI_ReleaseVocBase(_vocbase);
if (_parameters != 0) {
if (_parameters != nullptr) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, _parameters);
}
}

View File

@ -117,7 +117,10 @@ bool V8TimerTask::handleTimeout () {
"(function (params) { " + _command + " } )(params);",
_parameters);
_dispatcher->addJob(job);
if (_dispatcher->addJob(job) != TRI_ERROR_NO_ERROR) {
// just in case the dispatcher cannot accept the job (e.g. when shutting down)
delete job;
}
// note: this will destroy the task (i.e. ourselves!!)
_scheduler->destroyTask(this);

View File

@ -110,7 +110,7 @@ static DispatcherThread* CreateV8DispatcherThread (DispatcherQueue* queue, void*
static v8::Handle<v8::Value> JS_RegisterTask (v8::Arguments const& argv) {
v8::HandleScope scope;
if (GlobalScheduler == 0 || GlobalDispatcher == 0) {
if (GlobalScheduler == nullptr || GlobalDispatcher == nullptr) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "no scheduler found");
}
@ -335,7 +335,7 @@ static v8::Handle<v8::Value> JS_GetTask (v8::Arguments const& argv) {
static v8::Handle<v8::Value> JS_CreateNamedQueue (v8::Arguments const& argv) {
v8::HandleScope scope;
if (GlobalDispatcher == 0) {
if (GlobalDispatcher == nullptr) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "no dispatcher found");
}

View File

@ -85,9 +85,9 @@ namespace {
ApplicationDispatcher::ApplicationDispatcher ()
: ApplicationFeature("dispatcher"),
_applicationScheduler(0),
_dispatcher(0),
_dispatcherReporterTask(0),
_applicationScheduler(nullptr),
_dispatcher(nullptr),
_dispatcherReporterTask(nullptr),
_reportInterval(0.0) {
}
@ -96,7 +96,7 @@ ApplicationDispatcher::ApplicationDispatcher ()
////////////////////////////////////////////////////////////////////////////////
ApplicationDispatcher::~ApplicationDispatcher () {
if (_dispatcher != 0) {
if (_dispatcher != nullptr) {
delete _dispatcher;
}
}
@ -127,13 +127,13 @@ Dispatcher* ApplicationDispatcher::dispatcher () const {
void ApplicationDispatcher::buildStandardQueue (size_t nrThreads,
size_t maxSize) {
if (_dispatcher == 0) {
if (_dispatcher == nullptr) {
LOG_FATAL_AND_EXIT("no dispatcher is known, cannot create dispatcher queue");
}
LOG_TRACE("setting up a standard queue with %d threads", (int) nrThreads);
TRI_ASSERT(_dispatcher != 0);
TRI_ASSERT(_dispatcher != nullptr);
_dispatcher->addStandardQueue(nrThreads, maxSize);
}
@ -200,7 +200,7 @@ bool ApplicationDispatcher::open () {
return true;
}
if (_dispatcher != 0) {
if (_dispatcher != nullptr) {
return _dispatcher->open();
}
@ -216,7 +216,7 @@ void ApplicationDispatcher::close () {
return;
}
if (_dispatcher != 0) {
if (_dispatcher != nullptr) {
_dispatcher->beginShutdown();
}
}
@ -230,11 +230,11 @@ void ApplicationDispatcher::stop () {
return;
}
if (_dispatcherReporterTask != 0) {
_dispatcherReporterTask = 0;
if (_dispatcherReporterTask != nullptr) {
_dispatcherReporterTask = nullptr;
}
if (_dispatcher != 0) {
if (_dispatcher != nullptr) {
static size_t const MAX_TRIES = 50; // 10 seconds (50 * 200 ms)
for (size_t count = 0; count < MAX_TRIES && _dispatcher->isRunning(); ++count) {
@ -245,7 +245,7 @@ void ApplicationDispatcher::stop () {
_dispatcher->shutdown();
delete _dispatcher;
_dispatcher = 0;
_dispatcher = nullptr;
}
}
@ -258,7 +258,7 @@ void ApplicationDispatcher::stop () {
////////////////////////////////////////////////////////////////////////////////
void ApplicationDispatcher::buildDispatcher (Scheduler* scheduler) {
if (_dispatcher != 0) {
if (_dispatcher != nullptr) {
LOG_FATAL_AND_EXIT("a dispatcher has already been created");
}
@ -270,7 +270,7 @@ void ApplicationDispatcher::buildDispatcher (Scheduler* scheduler) {
////////////////////////////////////////////////////////////////////////////////
void ApplicationDispatcher::buildDispatcherReporter () {
if (_dispatcher == 0) {
if (_dispatcher == nullptr) {
LOG_FATAL_AND_EXIT("no dispatcher is known, cannot create dispatcher reporter");
}

View File

@ -175,10 +175,10 @@ int Dispatcher::addJob (Job* job) {
}
// try to find a suitable queue
const string& name = job->queue();
string const& name = job->queue();
DispatcherQueue* queue = lookupQueue(name);
if (queue == 0) {
if (queue == nullptr) {
LOG_WARNING("unknown queue '%s'", name.c_str());
return TRI_ERROR_QUEUE_UNKNOWN;
}
@ -201,10 +201,10 @@ int Dispatcher::addJob (Job* job) {
////////////////////////////////////////////////////////////////////////////////
bool Dispatcher::cancelJob (uint64_t jobId) {
MUTEX_LOCKER(_accessDispatcher);
bool done = false;
MUTEX_LOCKER(_accessDispatcher);
for (map<string, DispatcherQueue*>::iterator i = _queues.begin(); i != _queues.end() && ! done; ++i) {
DispatcherQueue* q = i->second;

View File

@ -224,6 +224,27 @@ void DispatcherQueue::beginShutdown () {
size_t const MAX_TRIES = 10;
_stopping = 1;
// kill all jobs in the queue that were not yet executed
{
CONDITION_LOCKER(guard, _accessQueue);
for (auto it = _readyJobs.begin(); it != _readyJobs.end(); ++it) {
Job* job = *it;
bool canceled = job->cancel(false);
if (canceled) {
try {
job->setDispatcherThread(0);
job->cleanup();
}
catch (...) {
}
}
}
_readyJobs.clear();
}
for (size_t count = 0; count < MAX_TRIES; ++count) {
{