From cc6df4f70c6e2213c5d5e82d2fa066ff528eca7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Gra=CC=88tzer?= Date: Fri, 25 Nov 2016 17:03:08 +0100 Subject: [PATCH] Master Context --- arangod/Pregel/Conductor.cpp | 74 +++++++++++++++++++++++++++++++- arangod/Pregel/Conductor.h | 8 +++- arangod/Pregel/MasterContext.h | 69 +++++++++++++++++++++++++++++ arangod/Pregel/PregelFeature.cpp | 47 ++++++++++++++------ arangod/Pregel/PregelFeature.h | 6 ++- arangod/Pregel/Recovery.cpp | 71 ++++++++++++++++++++++-------- arangod/Pregel/Recovery.h | 10 +++-- arangod/Pregel/Utils.cpp | 2 +- arangod/Pregel/Utils.h | 1 + arangod/Pregel/WorkerContext.h | 1 - 10 files changed, 247 insertions(+), 42 deletions(-) create mode 100644 arangod/Pregel/MasterContext.h diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index ede81395d4..effdd93f46 100644 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -39,6 +39,8 @@ #include "Pregel/Algorithm.h" #include "Pregel/Algos/PageRank.h" #include "Pregel/Algos/SSSP.h" +#include "Pregel/PregelFeature.h" +#include "Pregel/Recovery.h" #include @@ -63,7 +65,6 @@ Conductor::Conductor( : _vocbaseGuard(vocbase), _executionNumber(executionNumber), _algorithm(algorithm), - _recoveryManager(this), _vertexCollections(vertexCollections), _edgeCollections(edgeCollections) { bool isCoordinator = ServerState::instance()->isCoordinator(); @@ -214,7 +215,7 @@ void Conductor::start(VPackSlice userConfig) { if (nrDone == requests.size()) { if (_startGlobalStep()) { - _recoveryManager.monitorDBServers(_dbServers); + PregelFeature::instance()->recoveryManager()->monitorDBServers(_dbServers); } } else { LOG(ERR) << "Not all DBServers started the execution"; @@ -317,6 +318,75 @@ void Conductor::finishedGlobalStep(VPackSlice& data) { void Conductor::cancel() {_state = ExecutionState::CANCELED; } +void Conductor::checkForWorkerOutage() { + RecoveryManager *manager = PregelFeature::instance()->recoveryManager(); + if (manager) { + if (manager->allServersAvailable(_dbServers) == false) { + // we lost a DBServer, we need to reconfigure all remainging servers + // so they load the data for the lost machine + + std::map>> vertexMap, edgeMap; + + // resolve plan id's and shards on the servers + for (auto &collection : _vertexCollections) { + resolveShards(collection.get(), vertexMap); + } + for (auto &collection : _edgeCollections) { + resolveShards(collection.get(), edgeMap); + } + _dbServers.clear(); + for (auto const& pair : vertexMap) { + _dbServers.push_back(pair.first); + } + + std::string const baseUrl = Utils::baseUrl(_vocbaseGuard.vocbase()->name()); + std::string coordinatorId = ServerState::instance()->getId(); + LOG(INFO) << "My id: " << coordinatorId; + std::vector requests; + for (auto const& it : vertexMap) { + ServerID const& server = it.first; + std::map> const& vertexShardMap = it.second; + std::map> const& edgeShardMap = edgeMap[it.first]; + + VPackBuilder b; + b.openObject(); + b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); + b.add(Utils::vertexShardsKey, VPackValue(VPackValueType::Object)); + for (auto const& pair : vertexShardMap) { + b.add(pair.first, VPackValue(VPackValueType::Array)); + for (ShardID const& shard : pair.second) { + b.add(VPackValue(shard)); + } + b.close(); + } + b.close(); + b.add(Utils::edgeShardsKey, VPackValue(VPackValueType::Object)); + for (auto const& pair : edgeShardMap) { + b.add(pair.first, VPackValue(VPackValueType::Array)); + for (ShardID const& shard : pair.second) { + b.add(VPackValue(shard)); + } + b.close(); + } + b.close(); + b.close(); + + + auto body = std::make_shared(b.toJson()); + requests.emplace_back("server:" + server, rest::RequestType::POST, + baseUrl + Utils::reconfigurePath, body); + } + + ClusterComm* cc = ClusterComm::instance(); + size_t nrDone = 0; + cc->performRequests(requests, 5.0 * 60.0, nrDone, LogTopic("Pregel Conductor")); + LOG(INFO) << "Send messages to " << nrDone << " shards of " + << _vertexCollections[0]->name(); + + } + } +} + int Conductor::_sendToAllDBServers(std::string path, VPackSlice const& config) { ClusterComm* cc = ClusterComm::instance(); _responseCount = 0; diff --git a/arangod/Pregel/Conductor.h b/arangod/Pregel/Conductor.h index dcbc057b3b..a93397cc85 100644 --- a/arangod/Pregel/Conductor.h +++ b/arangod/Pregel/Conductor.h @@ -30,7 +30,6 @@ #include "Cluster/ClusterInfo.h" #include "Pregel/AggregatorUsage.h" #include "Pregel/Statistics.h" -#include "Pregel/Recovery.h" #include "VocBase/vocbase.h" namespace arangodb { @@ -41,13 +40,17 @@ enum ExecutionState { DEFAULT, RUNNING, DONE, CANCELED }; class Conductor { friend class arangodb::RestPregelHandler; + enum OperationMode { + NORMAL, + RECOVERY + }; ExecutionState _state = ExecutionState::DEFAULT; + OperationMode _operationMode = OperationMode::NORMAL; const VocbaseGuard _vocbaseGuard; const uint64_t _executionNumber; const std::string _algorithm; Mutex _finishedGSSMutex; // prevents concurrent calls to finishedGlobalStep - RecoveryManager _recoveryManager; std::vector> _vertexCollections; std::vector> _edgeCollections; @@ -77,6 +80,7 @@ class Conductor { void start(VPackSlice params); void cancel(); + void checkForWorkerOutage(); ExecutionState getState() const { return _state; } WorkerStats workerStats() const {return _workerStats;} diff --git a/arangod/Pregel/MasterContext.h b/arangod/Pregel/MasterContext.h new file mode 100644 index 0000000000..6c79601500 --- /dev/null +++ b/arangod/Pregel/MasterContext.h @@ -0,0 +1,69 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGODB_PREGEL_MASTER_CONTEXT_H +#define ARANGODB_PREGEL_MASTER_CONTEXT_H 1 + +#include +#include +#include "AggregatorUsage.h" +#include "Basics/Common.h" +#include "Utils.h" + +namespace arangodb { +namespace pregel { + +class MasterContext { + + uint64_t _vertexCount, _edgeCount; + const AggregatorUsage* _aggregators; + + protected: + template + inline void aggregate(std::string const& name, const T* valuePtr) { + _aggregators->aggregate(name, valuePtr); + } + + template + inline const T* getAggregatedValue(std::string const& name) { + return _aggregators->getAggregatedValue(name); + } + + virtual void preApplication(){}; + virtual void preGlobalSuperstep(uint64_t gss){}; + virtual void postGlobalSuperstep(uint64_t gss){}; + virtual void postApplication(){}; + + virtual void compensate() { + + } + + public: + MasterContext(){}; + + inline uint64_t vertexCount() const { return _vertexCount; } + + inline uint64_t edgeCount() const { return _edgeCount; } +}; +} +} +#endif diff --git a/arangod/Pregel/PregelFeature.cpp b/arangod/Pregel/PregelFeature.cpp index 5eef8573d6..ab23a3da86 100644 --- a/arangod/Pregel/PregelFeature.cpp +++ b/arangod/Pregel/PregelFeature.cpp @@ -22,9 +22,13 @@ #include "PregelFeature.h" #include "Cluster/ClusterInfo.h" -#include "Conductor.h" -#include "Worker.h" +#include "Pregel/Conductor.h" +#include "Pregel/Worker.h" +#include "Pregel/Recovery.h" #include "Basics/MutexLocker.h" +#include "ApplicationFeatures/ApplicationServer.h" +#include "Cluster/ClusterFeature.h" + using namespace arangodb::pregel; @@ -36,18 +40,36 @@ uint64_t PregelFeature::createExecutionNumber() { PregelFeature::PregelFeature(application_features::ApplicationServer* server) : ApplicationFeature(server, "Pregel") { - setOptional(true); + setOptional(false); requiresElevatedPrivileges(false); startsAfter("Database"); startsAfter("Logger"); startsAfter("Endpoint"); + startsAfter("Cluster"); Instance = this; } -PregelFeature::~PregelFeature() { cleanupAll(); } +PregelFeature::~PregelFeature() { + if (_recoveryManager) { + delete _recoveryManager; + } + cleanupAll(); +} PregelFeature* PregelFeature::instance() { return Instance; } +void PregelFeature::start() { + ClusterFeature* cluster = + application_features::ApplicationServer::getFeature( + "Cluster"); + if (cluster != nullptr) { + AgencyCallbackRegistry *registry = cluster->agencyCallbackRegistry(); + if (registry != nullptr) { + _recoveryManager = new RecoveryManager(registry); + } + } +} + void PregelFeature::beginShutdown() { cleanupAll(); } void PregelFeature::addExecution(Conductor* const exec, @@ -59,12 +81,8 @@ void PregelFeature::addExecution(Conductor* const exec, Conductor* PregelFeature::conductor(uint64_t executionNumber) { MUTEX_LOCKER(guard, _mutex); - auto it = _conductors.find(executionNumber); - if (it != _conductors.end()) - return it->second; - else - return nullptr; + return it != _conductors.end() ? it->second : nullptr; } void PregelFeature::addWorker(IWorker* const worker, @@ -76,10 +94,7 @@ void PregelFeature::addWorker(IWorker* const worker, IWorker* PregelFeature::worker(uint64_t executionNumber) { MUTEX_LOCKER(guard, _mutex); auto it = _workers.find(executionNumber); - if (it != _workers.end()) - return it->second; - else - return nullptr; + return it != _workers.end() ? it->second : nullptr; } void PregelFeature::cleanup(uint64_t executionNumber) { @@ -107,3 +122,9 @@ void PregelFeature::cleanupAll() { } _workers.clear(); } + +void PregelFeature::notifyConductors() { + for (auto it : _conductors) { + it.second->checkForWorkerOutage(); + } +} diff --git a/arangod/Pregel/PregelFeature.h b/arangod/Pregel/PregelFeature.h index 718291a4d9..c18afea640 100644 --- a/arangod/Pregel/PregelFeature.h +++ b/arangod/Pregel/PregelFeature.h @@ -33,6 +33,7 @@ namespace pregel { class Conductor; class IWorker; +class RecoveryManager; class PregelFeature final : public application_features::ApplicationFeature { public: @@ -41,23 +42,26 @@ class PregelFeature final : public application_features::ApplicationFeature { static PregelFeature* instance(); + void start() override final; void beginShutdown() override final; uint64_t createExecutionNumber(); void addExecution(Conductor* const exec, uint64_t executionNumber); Conductor* conductor(uint64_t executionNumber); - void notifyConductorOutage(); + void notifyConductors(); void addWorker(IWorker* const worker, uint64_t executionNumber); IWorker* worker(uint64_t executionNumber); void cleanup(uint64_t executionNumber); void cleanupAll(); + RecoveryManager* recoveryManager() {return _recoveryManager;} private: std::unordered_map _conductors; std::unordered_map _workers; Mutex _mutex; + RecoveryManager* _recoveryManager; }; } } diff --git a/arangod/Pregel/Recovery.cpp b/arangod/Pregel/Recovery.cpp index 9971264f31..2c74c9271a 100644 --- a/arangod/Pregel/Recovery.cpp +++ b/arangod/Pregel/Recovery.cpp @@ -22,32 +22,33 @@ #include "Recovery.h" -#include "Basics/Exceptions.h" -#include "Cluster/ClusterFeature.h" +#include "Basics/MutexLocker.h" #include "Cluster/ClusterInfo.h" #include "Pregel/Utils.h" #include "Pregel/Conductor.h" #include "Pregel/WorkerState.h" -#include "ApplicationFeatures/ApplicationServer.h" +#include "Pregel/PregelFeature.h" +#include "Agency/Supervision.h" using namespace arangodb; using namespace arangodb::pregel; -RecoveryManager::RecoveryManager(Conductor *c) : _conductor(c) { - ClusterFeature* cluster = - application_features::ApplicationServer::getFeature( - "Cluster"); - TRI_ASSERT(cluster != nullptr); - _agencyCallbackRegistry = cluster->agencyCallbackRegistry(); +RecoveryManager::RecoveryManager(AgencyCallbackRegistry *registry) : _agencyCallbackRegistry(registry) { } RecoveryManager::~RecoveryManager() { + stopMonitoring(); +} + +void RecoveryManager::stopMonitoring() { for (auto call : _agencyCallbacks) { _agencyCallbackRegistry->unregisterCallback(call); } + _agencyCallbacks.clear(); } void RecoveryManager::monitorDBServers(std::vector const& dbServers) { + MUTEX_LOCKER(guard, _lock); std::function dbServerChanged = [](VPackSlice const& result) { @@ -80,25 +81,59 @@ void RecoveryManager::monitorDBServers(std::vector const& dbServers) { } *dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg); }*/ + + PregelFeature::instance()->notifyConductors(); + LOG(INFO) << result.toString(); return true; }; //std::string const& dbName = _conductor->_vocbaseGuard.vocbase()->name(); - for (auto server : dbServers) { - _statusMap[server] = ""; - std::string path = "Supervision/Health/" + server + "/Status"; - auto call = std::make_shared(_agency, path, - dbServerChanged, true, false); - _agencyCallbacks.push_back(call); - _agencyCallbackRegistry->registerCallback(call); - } + std::string path = "Plan/Collections/_system/6500032"; + auto call = std::make_shared(_agency, path, + dbServerChanged, true, false); + _agencyCallbacks.push_back(call); + _agencyCallbackRegistry->registerCallback(call); + + /*for (auto server : dbServers) { + auto const& it = _statusMap.find(server); + if (it == _statusMap.end()) { + std::string path = "Supervision/Health/" + server + "/Status/"; + auto call = std::make_shared(_agency, path, + dbServerChanged, true, false); + _agencyCallbacks.push_back(call); + _agencyCallbackRegistry->registerCallback(call); + } + }*/ } bool RecoveryManager::allServersAvailable(std::vector const& dbServers) { + MUTEX_LOCKER(guard, _lock); + if (TRI_microtime() - _lastHealthCheck > 5.0) { + //Supervis + AgencyCommResult result = _agency.getValues("Supervision/Health/"); + if (result.successful() && result.slice().isObject()) { + for (auto server : VPackObjectIterator(result.slice())) { + std::string status = server.value.get("Status").copyString(); + _statusMap[server.key.copyString()] = status; + } + _lastHealthCheck = TRI_microtime();// I don't like this + } + } - return false; + std::string failed(consensus::Supervision::HEALTH_STATUS_FAILED); + + for (auto const& server : dbServers) { + auto const& it = _statusMap.find(server); + if (it != _statusMap.end()) { + if (it->second == failed) { + return false; + } + } + } + + return true; } diff --git a/arangod/Pregel/Recovery.h b/arangod/Pregel/Recovery.h index 5162533276..f1e60b4193 100644 --- a/arangod/Pregel/Recovery.h +++ b/arangod/Pregel/Recovery.h @@ -23,6 +23,7 @@ #ifndef ARANGODB_PREGEL_RECOVERY_H #define ARANGODB_PREGEL_RECOVERY_H 1 +#include "Basics/Mutex.h" #include "Cluster/ClusterInfo.h" #include "Cluster/AgencyComm.h" #include "Cluster/AgencyCallbackRegistry.h" @@ -32,7 +33,6 @@ namespace arangodb { namespace pregel { -class Conductor; template class GraphStore; @@ -40,16 +40,18 @@ class RecoveryManager { AgencyComm _agency; AgencyCallbackRegistry *_agencyCallbackRegistry;//weak - Conductor *_conductor; - + Mutex _lock; + double _lastHealthCheck = 0; + std::map _statusMap; std::vector> _agencyCallbacks; public: - RecoveryManager(Conductor *c); + RecoveryManager(AgencyCallbackRegistry *registry); ~RecoveryManager(); void monitorDBServers(std::vector const& dbServers); + void stopMonitoring(); bool allServersAvailable(std::vector const& dbServers); }; diff --git a/arangod/Pregel/Utils.cpp b/arangod/Pregel/Utils.cpp index 0ee319cf61..b9a6cd40f1 100644 --- a/arangod/Pregel/Utils.cpp +++ b/arangod/Pregel/Utils.cpp @@ -34,7 +34,6 @@ using namespace arangodb; using namespace arangodb::pregel; -std::string const Utils::edgeShardingKey = "_vertex"; std::string const Utils::apiPrefix = "/_api/pregel/"; std::string const Utils::startExecutionPath = "startExecution"; std::string const Utils::prepareGSSPath = "prepareGSS"; @@ -42,6 +41,7 @@ std::string const Utils::startGSSPath = "startGSS"; std::string const Utils::finishedGSSPath = "finishedGSS"; std::string const Utils::messagesPath = "messages"; std::string const Utils::finalizeExecutionPath = "finalizeExecution"; +std::string const Utils::reconfigurePath = "reconfigure"; std::string const Utils::executionNumberKey = "exn"; std::string const Utils::collectionPlanIdMapKey = "collectionPlanIdMap"; diff --git a/arangod/Pregel/Utils.h b/arangod/Pregel/Utils.h index 6d08ea6be4..32995f390e 100644 --- a/arangod/Pregel/Utils.h +++ b/arangod/Pregel/Utils.h @@ -46,6 +46,7 @@ class Utils { static std::string const finishedGSSPath; static std::string const messagesPath; static std::string const finalizeExecutionPath; + static std::string const reconfigurePath; static std::string const executionNumberKey; static std::string const algorithmKey; diff --git a/arangod/Pregel/WorkerContext.h b/arangod/Pregel/WorkerContext.h index 277e65d0a3..0a6336c09d 100644 --- a/arangod/Pregel/WorkerContext.h +++ b/arangod/Pregel/WorkerContext.h @@ -25,7 +25,6 @@ #include #include -#include #include "AggregatorUsage.h" #include "Basics/Common.h" #include "Utils.h"