diff --git a/arangod/Pregel/IncomingCache.cpp b/arangod/Pregel/IncomingCache.cpp index 6210b4834e..d3db940e63 100644 --- a/arangod/Pregel/IncomingCache.cpp +++ b/arangod/Pregel/IncomingCache.cpp @@ -121,17 +121,25 @@ void ArrayInCache::_set(prgl_shard_t shard, std::string const& key, } template -void ArrayInCache::mergeCache(InCache const* otherCache) { +void ArrayInCache::mergeCache(WorkerConfig const& config, InCache const* otherCache) { ArrayInCache* other = (ArrayInCache*)otherCache; this->_containedMessageCount += other->_containedMessageCount; - - for (auto const& pair : other->_shardMap) { - MUTEX_LOCKER(guard, this->_bucketLocker[pair.first]); - HMap& vertexMap(_shardMap[pair.first]); - for (auto& vertexMessage : pair.second) { - std::vector& a = vertexMap[vertexMessage.first]; - std::vector const& b = vertexMessage.second; - a.insert(a.end(), b.begin(), b.end()); + + // randomize access to buckets + std::set const& shardIDs = config.localPregelShardIDs(); + std::vector randomized(shardIDs.begin(), shardIDs.end()); + std::random_shuffle(randomized.begin(), randomized.end()); + for (prgl_shard_t shardId : randomized) { + auto const& it = other->_shardMap.find(shardId); + if (it != other->_shardMap.end()) { + MUTEX_LOCKER(guard, this->_bucketLocker[shardId]); + HMap& myVertexMap = _shardMap[shardId]; + + for (auto& vertexMessage : it->second) { + std::vector& a = myVertexMap[vertexMessage.first]; + std::vector const& b = vertexMessage.second; + a.insert(a.end(), b.begin(), b.end()); + } } } } @@ -215,20 +223,28 @@ void CombiningInCache::_set(prgl_shard_t shard, std::string const& key, } template -void CombiningInCache::mergeCache(InCache const* otherCache) { +void CombiningInCache::mergeCache(WorkerConfig const& config, + InCache const* otherCache) { CombiningInCache* other = (CombiningInCache*)otherCache; this->_containedMessageCount += other->_containedMessageCount; - - for (auto const& pair : other->_shardMap) { - MUTEX_LOCKER(guard, 
this->_bucketLocker[pair.first]); - - HMap& vertexMap = _shardMap[pair.first]; - for (auto& vertexMessage : pair.second) { - auto vmsg = vertexMap.find(vertexMessage.first); - if (vmsg != vertexMap.end()) { // got a message for the same vertex - _combiner->combine(vmsg->second, vertexMessage.second); - } else { - vertexMap.insert(vertexMessage); + + // randomize access to buckets + std::set const& shardIDs = config.localPregelShardIDs(); + std::vector randomized(shardIDs.begin(), shardIDs.end()); + std::random_shuffle(randomized.begin(), randomized.end()); + for (prgl_shard_t shardId : randomized) { + auto const& it = other->_shardMap.find(shardId); + if (it != other->_shardMap.end()) { + MUTEX_LOCKER(guard, this->_bucketLocker[shardId]); + HMap& myVertexMap = _shardMap[shardId]; + + for (auto& vertexMessage : it->second) { + auto vmsg = myVertexMap.find(vertexMessage.first); + if (vmsg != myVertexMap.end()) { // got a message for the same vertex + _combiner->combine(vmsg->second, vertexMessage.second); + } else { + myVertexMap.insert(vertexMessage); + } } } } diff --git a/arangod/Pregel/IncomingCache.h b/arangod/Pregel/IncomingCache.h index cdd012680c..4234fc39db 100644 --- a/arangod/Pregel/IncomingCache.h +++ b/arangod/Pregel/IncomingCache.h @@ -73,7 +73,7 @@ class InCache { void storeMessage(prgl_shard_t shard, std::string const& vertexId, M const& data); - virtual void mergeCache(InCache const* otherCache) = 0; + virtual void mergeCache(WorkerConfig const& config, InCache const* otherCache) = 0; /// @brief get messages for vertex id. 
(Don't use keys from _from or _to /// directly, they contain the collection name) virtual MessageIterator getMessages(prgl_shard_t shard, @@ -103,7 +103,7 @@ class ArrayInCache : public InCache { public: ArrayInCache(WorkerConfig const* config, MessageFormat const* format); - void mergeCache(InCache const* otherCache) override; + void mergeCache(WorkerConfig const& config, InCache const* otherCache) override; MessageIterator getMessages(prgl_shard_t shard, std::string const& key) override; void clear() override; @@ -131,7 +131,7 @@ class CombiningInCache : public InCache { MessageCombiner const* combiner() const { return _combiner; } - void mergeCache(InCache const* otherCache) override; + void mergeCache(WorkerConfig const& config, InCache const* otherCache) override; MessageIterator getMessages(prgl_shard_t shard, std::string const& key) override; void clear() override; diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index 6f75c7eee6..a01567cb5f 100644 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -390,7 +390,7 @@ bool Worker::_processVertices( } // merge thread local messages, _writeCache does locking - _writeCache->mergeCache(inCache.get()); + _writeCache->mergeCache(_config, inCache.get()); // TODO ask how to implement message sending without waiting for a response MessageStats stats; @@ -452,8 +452,9 @@ void Worker::_finishedProcessing() { size_t total = _graphStore->localVertexCount(); if (total > currentAVCount) { if (_config.asynchronousMode()) { + // just process these vertices in the next superstep ReadLocker rguard(&_cacheRWLock); - _writeCache->mergeCache(_readCache); // compute in next superstep + _writeCache->mergeCache(_config, _readCache); // compute in next superstep _messageStats.sendCount += _readCache->containedMessageCount(); } else { // TODO call _startProcessing ???