1
0
Fork 0

Some more async stuff

This commit is contained in:
Simon Grätzer 2017-03-02 15:03:35 +01:00
parent fefed8b1d6
commit b2ceda6250
1 changed files with 11 additions and 8 deletions

View File

@ -255,7 +255,6 @@ void Worker<V, E, M>::receivedMessages(VPackSlice const& data) {
// Trigger the processing of vertices
if (_config.asynchronousMode() && _state == WorkerState::IDLE) {
MUTEX_LOCKER(guard, _commandMutex);
_continueAsync();
}
} else if (_config.asynchronousMode() &&
@ -548,7 +547,6 @@ void Worker<V, E, M>::_finishedProcessing() {
}
}
if (proceed) {
MUTEX_LOCKER(guard, _commandMutex);
_continueAsync();
}
} else { // no answer expected
@ -561,12 +559,16 @@ void Worker<V, E, M>::_finishedProcessing() {
/// in async mode checks if there are messages to process
template <typename V, typename E, typename M>
void Worker<V, E, M>::_continueAsync() {
uint64_t cnt = _writeCache->containedMessageCount();
if (_state == WorkerState::IDLE && cnt > 0) {
MUTEX_LOCKER(guard, _commandMutex);
if (_state != WorkerState::IDLE || _writeCache->containedMessageCount() == 0) {
return;
}
// avoid calling this method accidentially
_state = WorkerState::COMPUTING;
if (cnt < _messageBatchSize) {
_state = WorkerState::COMPUTING;
ThreadPool *pool = PregelFeature::instance()->threadPool();
pool->enqueue([this]{
if (_writeCache->containedMessageCount() < _messageBatchSize) {
usleep(50000);
}
{ // swap these pointers atomically
@ -576,12 +578,13 @@ void Worker<V, E, M>::_continueAsync() {
_requestedNextGSS = true;
}
}
MUTEX_LOCKER(guard, _commandMutex);
// overwrite conductor values with local values
_conductorAggregators->resetValues();
_conductorAggregators->aggregateValues(*_workerAggregators.get());
_workerAggregators->resetValues();
_startProcessing();
}
});
}
template <typename V, typename E, typename M>