diff --git a/arangod/Pregel/Algos/SSSP.cpp b/arangod/Pregel/Algos/SSSP.cpp index 9582f9b4af..65e3c5324a 100644 --- a/arangod/Pregel/Algos/SSSP.cpp +++ b/arangod/Pregel/Algos/SSSP.cpp @@ -47,17 +47,20 @@ struct SSSPComputation : public VertexComputation { int64_t val = *edge->data() + tmp; sendMessage(edge, val); } + } else { + voteHalt(); } - voteHalt(); } }; uint32_t SSSPAlgorithm::messageBatchSize(WorkerConfig const& config, MessageStats const& stats) const { - if (config.localSuperstep() <= 2) { - return 1; + if (config.localSuperstep() <= 1) { + return 5; } else { - return Algorithm::messageBatchSize(config, stats); + double msgsPerSec = stats.sendCount / stats.superstepRuntimeSecs; + msgsPerSec /= config.parallelism(); // per thread + return msgsPerSec > 250.0 ? (uint32_t)msgsPerSec : 250; } } diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index 27aa7c3266..7c35ac9ee2 100644 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -561,7 +561,11 @@ void Worker::_finishedProcessing() { /// in async mode checks if there are messages to process template void Worker::_continueAsync() { - if (_state == WorkerState::IDLE && _writeCache->containedMessageCount() > 0) { + uint64_t cnt = _writeCache->containedMessageCount(); + if (_state == WorkerState::IDLE && cnt > 0) { + if (cnt < 10) { + usleep(10000); + } { // swap these pointers atomically MY_WRITE_LOCKER(guard, _cacheRWLock); std::swap(_readCache, _writeCache);