From d9d192e4806e3b7fd582dba5b3a3808f16fb25f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Gra=CC=88tzer?= Date: Mon, 5 Dec 2016 13:53:49 +0100 Subject: [PATCH] fixed some stuff --- arangod/Pregel/Algorithm.h | 2 +- arangod/Pregel/Algos/PageRank.cpp | 12 +++++ arangod/Pregel/Algos/PageRank.h | 1 + arangod/Pregel/Utils.cpp | 1 + arangod/Pregel/Utils.h | 1 + arangod/Pregel/VertexComputation.h | 2 +- arangod/Pregel/Worker.cpp | 81 ++++++++++++++++++++---------- arangod/Pregel/Worker.h | 9 +++- 8 files changed, 79 insertions(+), 30 deletions(-) diff --git a/arangod/Pregel/Algorithm.h b/arangod/Pregel/Algorithm.h index e63533df2d..a74f960be9 100644 --- a/arangod/Pregel/Algorithm.h +++ b/arangod/Pregel/Algorithm.h @@ -84,7 +84,7 @@ struct Algorithm : IAlgorithm { virtual MessageFormat* messageFormat() const = 0; virtual MessageCombiner* messageCombiner() const = 0; virtual VertexComputation* createComputation(uint64_t gss) const = 0; - virtual VertexCompensation* createCompensation() { + virtual VertexCompensation* createCompensation(uint64_t gss) const { return nullptr; } protected: diff --git a/arangod/Pregel/Algos/PageRank.cpp b/arangod/Pregel/Algos/PageRank.cpp index 6cbcb42454..e847f187ee 100644 --- a/arangod/Pregel/Algos/PageRank.cpp +++ b/arangod/Pregel/Algos/PageRank.cpp @@ -110,6 +110,18 @@ PageRankAlgorithm::createComputation(uint64_t gss) const { return new PageRankComputation(_threshold); } +struct PageRankCompensation : public VertexCompensation { + PageRankCompensation() {} + void compensate(bool inLostPartition) override { + + } +}; + +VertexCompensation* +PageRankAlgorithm::createCompensation(uint64_t gss) const { + return new PageRankCompensation(); +} + Aggregator* PageRankAlgorithm::aggregator(std::string const& name) const { if (name == "convergence") { return new FloatMaxAggregator(0); diff --git a/arangod/Pregel/Algos/PageRank.h b/arangod/Pregel/Algos/PageRank.h index 4cf80104ee..d12ae7d1ed 100644 --- a/arangod/Pregel/Algos/PageRank.h +++ b/arangod/Pregel/Algos/PageRank.h @@ -42,6 +42,7 @@ struct PageRankAlgorithm : public SimpleAlgorithm { MessageCombiner* messageCombiner() const override; VertexComputation* createComputation(uint64_t gss) const override; + VertexCompensation* createCompensation(uint64_t gss) const override; Aggregator* aggregator(std::string const& name) const override; }; } diff --git a/arangod/Pregel/Utils.cpp b/arangod/Pregel/Utils.cpp index 078fa3b4fd..b8989c0b87 100644 --- a/arangod/Pregel/Utils.cpp +++ b/arangod/Pregel/Utils.cpp @@ -43,6 +43,7 @@ std::string const Utils::finishedGSSPath = "finishedGSS"; std::string const Utils::messagesPath = "messages"; std::string const Utils::finalizeExecutionPath = "finalizeExecution"; std::string const Utils::startRecoveryPath = "startRecovery"; +std::string const Utils::continueRecoveryPath = "continueRecovery"; std::string const Utils::finishedRecoveryPath = "finishedRecovery"; std::string const Utils::executionNumberKey = "exn"; diff --git a/arangod/Pregel/Utils.h b/arangod/Pregel/Utils.h index 6ccb19128b..bba3989af0 100644 --- a/arangod/Pregel/Utils.h +++ b/arangod/Pregel/Utils.h @@ -49,6 +49,7 @@ class Utils { static std::string const messagesPath; static std::string const finalizeExecutionPath; static std::string const startRecoveryPath; + static std::string const continueRecoveryPath; static std::string const finishedRecoveryPath; static std::string const executionNumberKey; diff --git a/arangod/Pregel/VertexComputation.h b/arangod/Pregel/VertexComputation.h index bb3c7cc09e..69abb6c938 100644 --- a/arangod/Pregel/VertexComputation.h +++ b/arangod/Pregel/VertexComputation.h @@ -95,7 +95,7 @@ public: }; template -class VertexCompensate : public VertexContext { +class VertexCompensation : public VertexContext { friend class Worker; public: diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index 032103ba5b..51d7a2ca61 100644 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -332,41 +332,68 @@ template void Worker::startRecovery(VPackSlice data) { VPackSlice method = data.get(Utils::recoveryMethodKey); if (method.compareString(Utils::compensate) == 0) { + + _preRecoveryTotal = _graphStore->vertexCount(); WorkerState nextState(_state.database(), data); _graphStore->loadShards(nextState); _state = nextState; - - // TODO start compensate method - /* - ThreadPool *pool = PregelFeature::instance()->threadPool(); - pool->enqueue([this, start, end] { - if (!_running) { - LOG(INFO) << "Execution aborted prematurely."; - return; - } - - size_t total = _graphStore->vertexCount(); - size_t start = 0, end = delta; - auto vertexIterator = _graphStore->vertexIterator(start, end); - _executeGlobalStep(vertexIterator); - }); - - uint64_t gss = _state.globalSuperstep(); - std::unique_ptr> - vertexComputation(_algorithm->createComputation(gss)); - vertexComputation->_gss = gss; - vertexComputation->_context = _workerContext.get(); - vertexComputation->_graphStore = _graphStore.get(); - vertexComputation->_outgoing = &outCache; - vertexComputation->_conductorAggregators = _conductorAggregators.get(); - vertexComputation->_workerAggregators = &workerAggregator; - */ + compensateStep(data); } else if (method.compareString(Utils::rollback) == 0) { } } +template +void Worker::compensateStep(VPackSlice data) { + _conductorAggregators->resetValues(); + VPackSlice aggValues = data.get(Utils::aggregatorValuesKey); + if (aggValues.isObject()) { + _conductorAggregators->aggregateValues(aggValues); + } + _workerAggregators->resetValues(); + + ThreadPool *pool = PregelFeature::instance()->threadPool(); + pool->enqueue([this] { + if (!_running) { + LOG(INFO) << "Compensation aborted prematurely."; + return; + } + + size_t total = _graphStore->vertexCount(); + auto vertexIterator = _graphStore->vertexIterator(0, total); + + // TODO look if we can avoid instantiating this + std::unique_ptr> + vCompensate(_algorithm->createCompensation(_state.globalSuperstep())); + _initializeVertexContext(vCompensate.get()); + vCompensate->_workerAggregators = _workerAggregators.get(); + + size_t i = 0; + for (VertexEntry *vertexEntry : vertexIterator) { + vCompensate->_vertexEntry = vertexEntry; + vCompensate->compensate(i < _preRecoveryTotal); + i++; + if (!_running) { + LOG(INFO) << "Execution aborted prematurely."; + break; + } + } + VPackBuilder package; + package.openObject(); + package.add(Utils::senderKey, VPackValue(ServerState::instance()->getId())); + package.add(Utils::executionNumberKey, VPackValue(_state.executionNumber())); + package.add(Utils::globalSuperstepKey, VPackValue(_state.globalSuperstep())); + if (_workerAggregators->size() > 0) {// add aggregators + package.add(Utils::aggregatorValuesKey, VPackValue(VPackValueType::Object)); + _workerAggregators->serializeValues(package); + package.close(); + } + package.close(); + _callConductor(Utils::finishedRecoveryPath, package.slice()); + }); +} + template void Worker::_callConductor(std::string path, VPackSlice message) { @@ -383,6 +410,8 @@ void Worker::_callConductor(std::string path, VPackSlice message) { true); } + + // template types to create template class arangodb::pregel::Worker; template class arangodb::pregel::Worker; diff --git a/arangod/Pregel/Worker.h b/arangod/Pregel/Worker.h index 34338cc45a..74928debe2 100644 --- a/arangod/Pregel/Worker.h +++ b/arangod/Pregel/Worker.h @@ -37,7 +37,6 @@ class RestPregelHandler; namespace pregel { class IWorker { - public: virtual ~IWorker(){}; virtual void prepareGlobalStep(VPackSlice data) = 0; @@ -45,6 +44,7 @@ class IWorker { virtual void receivedMessages(VPackSlice data) = 0; virtual void finalizeExecution(VPackSlice data) = 0; virtual void startRecovery(VPackSlice data) = 0; + virtual void compensateStep(VPackSlice data) = 0; }; template @@ -73,6 +73,11 @@ class Worker : public IWorker { std::unique_ptr> _algorithm; std::unique_ptr _workerContext; Mutex _conductorMutex;// locks callbak methods + mutable Mutex _threadMutex;// locks _workerThreadDone + + // only valid while recovering to determine the offset + // where new vertices were inserted + size_t _preRecoveryTotal; std::unique_ptr> _graphStore; std::unique_ptr> _readCache, _writeCache; @@ -83,7 +88,6 @@ class Worker : public IWorker { WorkerStats _superstepStats; size_t _runningThreads; - mutable Mutex _threadMutex; void _swapIncomingCaches() { _readCache.swap(_writeCache); @@ -106,6 +110,7 @@ class Worker : public IWorker { void receivedMessages(VPackSlice data) override; void finalizeExecution(VPackSlice data) override; void startRecovery(VPackSlice data) override; + void compensateStep(VPackSlice data) override; }; } }