1
0
Fork 0
This commit is contained in:
Simon Grätzer 2017-03-02 12:47:50 +01:00
parent 27098e9e4f
commit c74fd5b2d8
2 changed files with 12 additions and 5 deletions

View File

@ -47,17 +47,20 @@ struct SSSPComputation : public VertexComputation<int64_t, int64_t, int64_t> {
int64_t val = *edge->data() + tmp; int64_t val = *edge->data() + tmp;
sendMessage(edge, val); sendMessage(edge, val);
} }
} else {
voteHalt();
} }
voteHalt();
} }
}; };
uint32_t SSSPAlgorithm::messageBatchSize(WorkerConfig const& config, uint32_t SSSPAlgorithm::messageBatchSize(WorkerConfig const& config,
MessageStats const& stats) const { MessageStats const& stats) const {
if (config.localSuperstep() <= 2) { if (config.localSuperstep() <= 1) {
return 1; return 5;
} else { } else {
return Algorithm::messageBatchSize(config, stats); double msgsPerSec = stats.sendCount / stats.superstepRuntimeSecs;
msgsPerSec /= config.parallelism(); // per thread
return msgsPerSec > 250.0 ? (uint32_t)msgsPerSec : 250;
} }
} }

View File

@ -561,7 +561,11 @@ void Worker<V, E, M>::_finishedProcessing() {
/// in async mode checks if there are messages to process /// in async mode checks if there are messages to process
template <typename V, typename E, typename M> template <typename V, typename E, typename M>
void Worker<V, E, M>::_continueAsync() { void Worker<V, E, M>::_continueAsync() {
if (_state == WorkerState::IDLE && _writeCache->containedMessageCount() > 0) { uint64_t cnt = _writeCache->containedMessageCount();
if (_state == WorkerState::IDLE && cnt > 0) {
if (cnt < 10) {
usleep(10000);
}
{ // swap these pointers atomically { // swap these pointers atomically
MY_WRITE_LOCKER(guard, _cacheRWLock); MY_WRITE_LOCKER(guard, _cacheRWLock);
std::swap(_readCache, _writeCache); std::swap(_readCache, _writeCache);