From f084bd7b12d25e557e980fa26e03538f8df265c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Gra=CC=88tzer?= Date: Sat, 17 Dec 2016 14:09:41 +0100 Subject: [PATCH] All necessities supported --- arangod/Pregel/OutgoingCache.cpp | 56 ++++++++++++++++++++++++-------- arangod/Pregel/OutgoingCache.h | 20 +++++++++--- arangod/Pregel/Worker.cpp | 48 ++++++++++++++------------- arangod/Pregel/Worker.h | 10 ++++-- 4 files changed, 91 insertions(+), 43 deletions(-) diff --git a/arangod/Pregel/OutgoingCache.cpp b/arangod/Pregel/OutgoingCache.cpp index 278717bb85..269eea8f42 100644 --- a/arangod/Pregel/OutgoingCache.cpp +++ b/arangod/Pregel/OutgoingCache.cpp @@ -40,12 +40,12 @@ template OutCache::OutCache(WorkerConfig* state, InCache* cache) : _state(state), _format(cache->format()), _localCache(cache) { _baseUrl = Utils::baseUrl(_state->database()); - _gss = _state->globalSuperstep(); } template -void OutCache::sendNextGSS(bool np) { - _gss = _state->globalSuperstep() + (np ? 1 : 0); +OutCache::OutCache(WorkerConfig* state, InCache* cache, InCache* nextGSS) + : _state(state), _format(cache->format()), _localCache(cache), _localCacheNextGSS(nextGSS) { + _baseUrl = Utils::baseUrl(_state->database()); } // ================= ArrayOutCache ================== @@ -65,9 +65,13 @@ template void ArrayOutCache::appendMessage(prgl_shard_t shard, std::string const& key, M const& data) { if (this->_state->isLocalVertexShard(shard)) { - this->_localCache->setDirect(shard, key, data); - // LOG(INFO) << "Worker: Got messages for myself " << key << " <- " << data; - this->_sendMessages++; + if (this->_sendToNextGSS) { + this->_localCacheNextGSS->setDirect(shard, key, data); + this->_sendCountNextGSS++; + } else { + this->_localCache->setDirect(shard, key, data); + this->_sendCount++; + } } else { _shardMap[shard][key].push_back(data); if (this->_containedMessages++ > this->_batchSize) { @@ -79,6 +83,10 @@ void ArrayOutCache::appendMessage(prgl_shard_t shard, std::string const& key, template 
void ArrayOutCache::flushMessages() { LOG(INFO) << "Beginning to send messages to other machines"; + uint64_t gss = this->_state->globalSuperstep(); + if (this->_sendToNextGSS) { + gss += 1; + } std::vector requests; for (auto const& it : _shardMap) { @@ -98,7 +106,11 @@ void ArrayOutCache::flushMessages() { package.add(VPackValue(vertexMessagePair.first)); for (M const& val : vertexMessagePair.second) { this->_format->addValue(package, val); - this->_sendMessages++; + if (this->_sendToNextGSS) { + this->_sendCountNextGSS++; + } else { + this->_sendCount++; + } } package.close(); } @@ -106,7 +118,7 @@ void ArrayOutCache::flushMessages() { package.add(Utils::senderKey, VPackValue(ServerState::instance()->getId())); package.add(Utils::executionNumberKey, VPackValue(this->_state->executionNumber())); - package.add(Utils::globalSuperstepKey, VPackValue(this->_gss)); + package.add(Utils::globalSuperstepKey, VPackValue(gss)); package.close(); // add a request ShardID const& shardId = this->_state->globalShardIDs()[shard]; @@ -137,6 +149,12 @@ CombiningOutCache::CombiningOutCache(WorkerConfig* state, CombiningInCache* cache) : OutCache(state, cache), _combiner(cache->combiner()) {} +template +CombiningOutCache::CombiningOutCache(WorkerConfig* state, + CombiningInCache* cache, + InCache *nextPhase) +: OutCache(state, cache, nextPhase), _combiner(cache->combiner()) {} + template CombiningOutCache::~CombiningOutCache() { clear(); @@ -153,9 +171,13 @@ void CombiningOutCache::appendMessage(prgl_shard_t shard, std::string const& key, M const& data) { if (this->_state->isLocalVertexShard(shard)) { - this->_localCache->setDirect(shard, key, data); - // LOG(INFO) << "Worker: Got messages for myself " << key << " <- " << data; - this->_sendMessages++; + if (this->_sendToNextGSS) { + this->_localCacheNextGSS->setDirect(shard, key, data); + this->_sendCountNextGSS++; + } else { + this->_localCache->setDirect(shard, key, data); + this->_sendCount++; + } } else { std::unordered_map& 
vertexMap = _shardMap[shard]; auto it = vertexMap.find(key); @@ -174,6 +196,10 @@ void CombiningOutCache::appendMessage(prgl_shard_t shard, template void CombiningOutCache::flushMessages() { LOG(INFO) << "Beginning to send messages to other machines"; + uint64_t gss = this->_state->globalSuperstep(); + if (this->_sendToNextGSS) { + gss += 1; + } std::vector requests; for (auto const& it : _shardMap) { @@ -194,13 +220,17 @@ void CombiningOutCache::flushMessages() { package.add(VPackValue(shard)); package.add(VPackValue(vertexMessagePair.first)); this->_format->addValue(package, vertexMessagePair.second); - this->_sendMessages++; + if (this->_sendToNextGSS) { + this->_sendCountNextGSS++; + } else { + this->_sendCount++; + } } package.close(); package.add(Utils::senderKey, VPackValue(ServerState::instance()->getId())); package.add(Utils::executionNumberKey, VPackValue(this->_state->executionNumber())); - package.add(Utils::globalSuperstepKey, VPackValue(this->_gss)); + package.add(Utils::globalSuperstepKey, VPackValue(gss)); package.close(); // add a request ShardID const& shardId = this->_state->globalShardIDs()[shard]; diff --git a/arangod/Pregel/OutgoingCache.h b/arangod/Pregel/OutgoingCache.h index 5634bea72e..f033c40903 100644 --- a/arangod/Pregel/OutgoingCache.h +++ b/arangod/Pregel/OutgoingCache.h @@ -52,23 +52,27 @@ class OutCache { WorkerConfig const* _state; MessageFormat const* _format; InCache* _localCache; + InCache* _localCacheNextGSS = nullptr; std::string _baseUrl; uint32_t _batchSize = 1000; - uint64_t _gss; + bool _sendToNextGSS = false; /// @brief current number of vertices stored size_t _containedMessages = 0; - size_t _sendMessages = 0; + size_t _sendCount = 0; + size_t _sendCountNextGSS = 0; bool shouldFlushCache(); public: OutCache(WorkerConfig* state, InCache* cache); + OutCache(WorkerConfig* state, InCache* cache, InCache* nextGSSCache); virtual ~OutCache(){}; - size_t sendMessageCount() const { return _sendMessages; } + size_t sendCount() 
const { return _sendCount; } + size_t sendCountNextGSS() const { return _sendCountNextGSS; } uint32_t batchSize() const { return _batchSize; } void setBatchSize(uint32_t bs) { _batchSize = bs; } - void sendNextGSS(bool np); + void sendToNextGSS(bool np) { _sendToNextGSS = np; } virtual void clear() = 0; virtual void appendMessage(prgl_shard_t shard, std::string const& key, @@ -86,6 +90,8 @@ class ArrayOutCache : public OutCache { public: ArrayOutCache(WorkerConfig* state, InCache* cache) : OutCache(state, cache) {} + ArrayOutCache(WorkerConfig* state, InCache* cache, InCache* nextGSSCache) + : OutCache(state, cache, nextGSSCache) {} ~ArrayOutCache(); void clear() override; @@ -103,7 +109,11 @@ class CombiningOutCache : public OutCache { _shardMap; public: - CombiningOutCache(WorkerConfig* state, CombiningInCache* cache); + CombiningOutCache(WorkerConfig* state, + CombiningInCache* cache); + CombiningOutCache(WorkerConfig* state, + CombiningInCache* cache, + InCache *nextPhase); ~CombiningOutCache(); void clear() override; diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index d90a4d66fd..41f46998a1 100644 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -55,6 +55,7 @@ Worker::Worker(TRI_vocbase_t* vocbase, Algorithm* algo, _conductorAggregators.reset(new AggregatorHandler(algo)); _workerAggregators.reset(new AggregatorHandler(algo)); _graphStore.reset(new GraphStore(vocbase, _algorithm->inputFormat())); + _nextGSSSendMessageCount = 0; if (_messageCombiner) { _readCache = new CombiningInCache(_messageFormat.get(), _messageCombiner.get()); _writeCache = @@ -93,19 +94,16 @@ Worker::Worker(TRI_vocbase_t* vocbase, Algorithm* algo, }); } +/*template +GSSContext::~GSSContext() {}*/ + template Worker::~Worker() { LOG(INFO) << "Called ~Worker()"; _state = WorkerState::DONE; - if (_readCache) { - delete _readCache; - } - if (_writeCache) { - delete _writeCache; - } - if (_nextPhase) { - delete _nextPhase; - } + delete _readCache; + delete 
_writeCache; + delete _writeCacheNextGSS; } template @@ -146,14 +144,14 @@ void Worker::prepareGlobalStep(VPackSlice data) { if (_config.asynchronousMode()) { TRI_ASSERT(_readCache->receivedMessageCount() == 0); TRI_ASSERT(_writeCache->receivedMessageCount() == 0); - std::swap(_readCache, _nextPhase); + std::swap(_readCache, _writeCacheNextGSS); _writeCache->clear(); + _requestedNextGSS = false;// only relevant for async } else { TRI_ASSERT(_writeCache->receivedMessageCount() == 0); std::swap(_readCache, _writeCache); _writeCache->clear(); } - _requestedNextGSS = false;// only relevant for async // execute context if (_workerContext != nullptr) { @@ -164,6 +162,9 @@ template void Worker::receivedMessages(VPackSlice data) { // LOG(INFO) << "Worker received some messages: " << data.toJson(); + if (_state != WorkerState::COMPUTING) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "Cannot receive messages while computing"); + } VPackSlice gssSlice = data.get(Utils::globalSuperstepKey); VPackSlice messageSlice = data.get(Utils::messagesKey); @@ -183,7 +184,7 @@ void Worker::receivedMessages(VPackSlice data) { } } } else if (_config.asynchronousMode() && gss == _config._globalSuperstep+1) { - _nextPhase->parseMessages(messageSlice); + _writeCacheNextGSS->parseMessages(messageSlice); } else { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Superstep out of sync"); @@ -265,21 +266,24 @@ void Worker::_processVertices( // thread local caches std::unique_ptr> inCache; - std::unique_ptr> outCache, nextOutCache; + std::unique_ptr> outCache; if (_messageCombiner) { inCache.reset( new CombiningInCache(_messageFormat.get(), _messageCombiner.get())); - outCache.reset( - new CombiningOutCache(&_config, (CombiningInCache*)inCache.get())); if (_config.asynchronousMode()) { - nextOutCache.reset(new CombiningOutCache(&_config, - (CombiningInCache*)inCache.get())); + outCache.reset(new CombiningOutCache(&_config, + 
(CombiningInCache*)inCache.get(), + _writeCacheNextGSS)); + } else { + outCache.reset(new CombiningOutCache(&_config, + (CombiningInCache*)inCache.get())); } } else { inCache.reset(new ArrayInCache(_messageFormat.get())); - outCache.reset(new ArrayOutCache(&_config, inCache.get())); if (_config.asynchronousMode()) { - nextOutCache.reset(new ArrayOutCache(&_config, inCache.get())); + outCache.reset(new ArrayOutCache(&_config, inCache.get(), _writeCacheNextGSS)); + } else { + outCache.reset(new ArrayOutCache(&_config, inCache.get())); } } @@ -292,7 +296,7 @@ void Worker::_processVertices( vertexComputation->_workerAggregators = &workerAggregator; vertexComputation->_cache = outCache.get(); if (_config.asynchronousMode()) { - outCache->sendNextGSS(_requestedNextGSS); + outCache->sendToNextGSS(_requestedNextGSS); } size_t activeCount = 0; @@ -321,8 +325,8 @@ void Worker::_processVertices( // ==================== send messages to other shards ==================== outCache->flushMessages(); if (!_requestedNextGSS && vertexComputation->_nextPhase) { - MUTEX_LOCKER(guard2, _commandMutex); _requestedNextGSS = true; + _nextGSSSendMessageCount += outCache->sendCountNextGSS(); } // merge thread local messages, _writeCache does locking @@ -333,7 +337,7 @@ void Worker::_processVertices( WorkerStats stats; stats.activeCount = activeCount; - stats.sendCount = outCache->sendMessageCount(); + stats.sendCount = outCache->sendCount(); stats.superstepRuntimeSecs = TRI_microtime() - start; _finishedProcessing(vertexComputation->_workerAggregators, stats); } diff --git a/arangod/Pregel/Worker.h b/arangod/Pregel/Worker.h index 2de983f0e7..dbf9271a99 100644 --- a/arangod/Pregel/Worker.h +++ b/arangod/Pregel/Worker.h @@ -23,6 +23,7 @@ #ifndef ARANGODB_PREGEL_WORKER_H #define ARANGODB_PREGEL_WORKER_H 1 +#include #include "Basics/Common.h" #include "Basics/Mutex.h" #include "Pregel/AggregatorHandler.h" @@ -47,7 +48,7 @@ class IWorker { virtual void startRecovery(VPackSlice data) = 0; virtual 
void compensateStep(VPackSlice data) = 0; }; - + template class GraphStore; @@ -86,19 +87,22 @@ class Worker : public IWorker { // only valid while recovering to determine the offset // where new vertices were inserted size_t _preRecoveryTotal; + /// During async mode this should keep track of the sent messages std::unique_ptr _conductorAggregators; std::unique_ptr _workerAggregators; std::unique_ptr> _graphStore; std::unique_ptr> _messageFormat; std::unique_ptr> _messageCombiner; + // from previous or current superstep InCache *_readCache = nullptr; // for the current or next superstep InCache *_writeCache = nullptr; // intended for the next superstep phase - InCache *_nextPhase = nullptr; - bool _requestedNextGSS = true; + InCache *_writeCacheNextGSS = nullptr; + std::atomic _nextGSSSendMessageCount; + std::atomic _requestedNextGSS; WorkerStats _superstepStats; size_t _runningThreads;