diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index 8586e4fb2d..f137176955 100644 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -255,7 +255,6 @@ void Worker::receivedMessages(VPackSlice const& data) { // Trigger the processing of vertices if (_config.asynchronousMode() && _state == WorkerState::IDLE) { - MUTEX_LOCKER(guard, _commandMutex); _continueAsync(); } } else if (_config.asynchronousMode() && @@ -548,7 +547,6 @@ void Worker::_finishedProcessing() { } } if (proceed) { - MUTEX_LOCKER(guard, _commandMutex); _continueAsync(); } } else { // no answer expected @@ -561,12 +559,16 @@ void Worker::_finishedProcessing() { /// in async mode checks if there are messages to process template void Worker::_continueAsync() { - uint64_t cnt = _writeCache->containedMessageCount(); - if (_state == WorkerState::IDLE && cnt > 0) { + MUTEX_LOCKER(guard, _commandMutex); + if (_state != WorkerState::IDLE || _writeCache->containedMessageCount() == 0) { + return; + } // avoid calling this method accidentially - _state = WorkerState::COMPUTING; - - if (cnt < _messageBatchSize) { + _state = WorkerState::COMPUTING; + + ThreadPool *pool = PregelFeature::instance()->threadPool(); + pool->enqueue([this]{ + if (_writeCache->containedMessageCount() < _messageBatchSize) { usleep(50000); } { // swap these pointers atomically @@ -576,12 +578,13 @@ void Worker::_continueAsync() { _requestedNextGSS = true; } } + MUTEX_LOCKER(guard, _commandMutex); // overwrite conductor values with local values _conductorAggregators->resetValues(); _conductorAggregators->aggregateValues(*_workerAggregators.get()); _workerAggregators->resetValues(); _startProcessing(); - } + }); } template