1
0
Fork 0

Master Context

This commit is contained in:
Simon Grätzer 2016-11-25 17:03:08 +01:00
parent 9bcf8cd407
commit cc6df4f70c
10 changed files with 247 additions and 42 deletions

View File

@ -39,6 +39,8 @@
#include "Pregel/Algorithm.h"
#include "Pregel/Algos/PageRank.h"
#include "Pregel/Algos/SSSP.h"
#include "Pregel/PregelFeature.h"
#include "Pregel/Recovery.h"
#include <algorithm>
@ -63,7 +65,6 @@ Conductor::Conductor(
: _vocbaseGuard(vocbase),
_executionNumber(executionNumber),
_algorithm(algorithm),
_recoveryManager(this),
_vertexCollections(vertexCollections),
_edgeCollections(edgeCollections) {
bool isCoordinator = ServerState::instance()->isCoordinator();
@ -214,7 +215,7 @@ void Conductor::start(VPackSlice userConfig) {
if (nrDone == requests.size()) {
if (_startGlobalStep()) {
_recoveryManager.monitorDBServers(_dbServers);
PregelFeature::instance()->recoveryManager()->monitorDBServers(_dbServers);
}
} else {
LOG(ERR) << "Not all DBServers started the execution";
@ -317,6 +318,75 @@ void Conductor::finishedGlobalStep(VPackSlice& data) {
/// Flags this execution as canceled; only the state member is touched here.
void Conductor::cancel() {
  _state = ExecutionState::CANCELED;
}
void Conductor::checkForWorkerOutage() {
RecoveryManager *manager = PregelFeature::instance()->recoveryManager();
if (manager) {
if (manager->allServersAvailable(_dbServers) == false) {
// we lost a DBServer, we need to reconfigure all remainging servers
// so they load the data for the lost machine
std::map<ServerID, std::map<CollectionID, std::vector<ShardID>>> vertexMap, edgeMap;
// resolve plan id's and shards on the servers
for (auto &collection : _vertexCollections) {
resolveShards(collection.get(), vertexMap);
}
for (auto &collection : _edgeCollections) {
resolveShards(collection.get(), edgeMap);
}
_dbServers.clear();
for (auto const& pair : vertexMap) {
_dbServers.push_back(pair.first);
}
std::string const baseUrl = Utils::baseUrl(_vocbaseGuard.vocbase()->name());
std::string coordinatorId = ServerState::instance()->getId();
LOG(INFO) << "My id: " << coordinatorId;
std::vector<ClusterCommRequest> requests;
for (auto const& it : vertexMap) {
ServerID const& server = it.first;
std::map<CollectionID, std::vector<ShardID>> const& vertexShardMap = it.second;
std::map<CollectionID, std::vector<ShardID>> const& edgeShardMap = edgeMap[it.first];
VPackBuilder b;
b.openObject();
b.add(Utils::executionNumberKey, VPackValue(_executionNumber));
b.add(Utils::vertexShardsKey, VPackValue(VPackValueType::Object));
for (auto const& pair : vertexShardMap) {
b.add(pair.first, VPackValue(VPackValueType::Array));
for (ShardID const& shard : pair.second) {
b.add(VPackValue(shard));
}
b.close();
}
b.close();
b.add(Utils::edgeShardsKey, VPackValue(VPackValueType::Object));
for (auto const& pair : edgeShardMap) {
b.add(pair.first, VPackValue(VPackValueType::Array));
for (ShardID const& shard : pair.second) {
b.add(VPackValue(shard));
}
b.close();
}
b.close();
b.close();
auto body = std::make_shared<std::string const>(b.toJson());
requests.emplace_back("server:" + server, rest::RequestType::POST,
baseUrl + Utils::reconfigurePath, body);
}
ClusterComm* cc = ClusterComm::instance();
size_t nrDone = 0;
cc->performRequests(requests, 5.0 * 60.0, nrDone, LogTopic("Pregel Conductor"));
LOG(INFO) << "Send messages to " << nrDone << " shards of "
<< _vertexCollections[0]->name();
}
}
}
int Conductor::_sendToAllDBServers(std::string path, VPackSlice const& config) {
ClusterComm* cc = ClusterComm::instance();
_responseCount = 0;

View File

@ -30,7 +30,6 @@
#include "Cluster/ClusterInfo.h"
#include "Pregel/AggregatorUsage.h"
#include "Pregel/Statistics.h"
#include "Pregel/Recovery.h"
#include "VocBase/vocbase.h"
namespace arangodb {
@ -41,13 +40,17 @@ enum ExecutionState { DEFAULT, RUNNING, DONE, CANCELED };
class Conductor {
friend class arangodb::RestPregelHandler;
enum OperationMode {
NORMAL,
RECOVERY
};
ExecutionState _state = ExecutionState::DEFAULT;
OperationMode _operationMode = OperationMode::NORMAL;
const VocbaseGuard _vocbaseGuard;
const uint64_t _executionNumber;
const std::string _algorithm;
Mutex _finishedGSSMutex; // prevents concurrent calls to finishedGlobalStep
RecoveryManager _recoveryManager;
std::vector<std::shared_ptr<LogicalCollection>> _vertexCollections;
std::vector<std::shared_ptr<LogicalCollection>> _edgeCollections;
@ -77,6 +80,7 @@ class Conductor {
void start(VPackSlice params);
void cancel();
void checkForWorkerOutage();
ExecutionState getState() const { return _state; }
WorkerStats workerStats() const {return _workerStats;}

View File

@ -0,0 +1,69 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_PREGEL_MASTER_CONTEXT_H
#define ARANGODB_PREGEL_MASTER_CONTEXT_H 1
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
#include "AggregatorUsage.h"
#include "Basics/Common.h"
#include "Utils.h"
namespace arangodb {
namespace pregel {
/// Base class for algorithm-specific master-side logic. It exposes the graph's
/// vertex / edge counts and helper accessors for the aggregators, plus a set
/// of overridable hooks whose default implementations do nothing.
///
/// NOTE(review): nothing in this header assigns `_vertexCount`, `_edgeCount`
/// or `_aggregators` (they are private and no friend is declared); they are
/// value-initialised below so that reads are well-defined until a setter or
/// friend is added.
class MasterContext {
  uint64_t _vertexCount = 0;
  uint64_t _edgeCount = 0;
  const AggregatorUsage* _aggregators = nullptr;

 protected:
  /// Forwards `valuePtr` to the aggregator registered under `name`.
  /// Does nothing while no aggregator set is attached.
  /// NOTE(review): `_aggregators` points to const, so this only compiles if
  /// AggregatorUsage::aggregate is callable on a const object - confirm.
  template <typename T>
  inline void aggregate(std::string const& name, const T* valuePtr) {
    if (_aggregators != nullptr) {
      _aggregators->aggregate(name, valuePtr);
    }
  }

  /// Returns the value currently stored for the aggregator `name`, or nullptr
  /// while no aggregator set is attached.
  template <typename T>
  inline const T* getAggregatedValue(std::string const& name) {
    if (_aggregators == nullptr) {
      return nullptr;
    }
    return _aggregators->getAggregatedValue(name);
  }

  // Overridable hooks; every default is a no-op. The call sites are not part
  // of this header - presumably the conductor invokes them around the
  // application run and each global superstep (TODO confirm).
  virtual void preApplication() {}
  virtual void preGlobalSuperstep(uint64_t gss) {}
  virtual void postGlobalSuperstep(uint64_t gss) {}
  virtual void postApplication() {}
  virtual void compensate() {}

 public:
  MasterContext() {}
  // polymorphic base: make deletion through a base pointer well-defined
  virtual ~MasterContext() = default;

  inline uint64_t vertexCount() const { return _vertexCount; }
  inline uint64_t edgeCount() const { return _edgeCount; }
};
}
}
#endif

View File

@ -22,9 +22,13 @@
#include "PregelFeature.h"
#include "Cluster/ClusterInfo.h"
#include "Conductor.h"
#include "Worker.h"
#include "Pregel/Conductor.h"
#include "Pregel/Worker.h"
#include "Pregel/Recovery.h"
#include "Basics/MutexLocker.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Cluster/ClusterFeature.h"
using namespace arangodb::pregel;
@ -36,18 +40,36 @@ uint64_t PregelFeature::createExecutionNumber() {
/// Registers the "Pregel" application feature as non-optional, declares the
/// features it has to start after and publishes this object through the
/// static `Instance` pointer returned by PregelFeature::instance().
PregelFeature::PregelFeature(application_features::ApplicationServer* server)
    : ApplicationFeature(server, "Pregel"),
      // start() only assigns this when an agency callback registry exists,
      // while the destructor always inspects it - so it must start out null
      _recoveryManager(nullptr) {
  setOptional(false);
  requiresElevatedPrivileges(false);
  startsAfter("Database");
  startsAfter("Logger");
  startsAfter("Endpoint");
  startsAfter("Cluster");
  Instance = this;
}
PregelFeature::~PregelFeature() { cleanupAll(); }
/// Destroys the recovery manager first, then releases everything
/// cleanupAll() takes care of.
PregelFeature::~PregelFeature() {
  // deleting a null pointer is a no-op, so no explicit check is required
  delete _recoveryManager;
  cleanupAll();
}
/// Returns the pointer stored in the static `Instance` variable.
PregelFeature* PregelFeature::instance() {
  return Instance;
}
void PregelFeature::start() {
ClusterFeature* cluster =
application_features::ApplicationServer::getFeature<ClusterFeature>(
"Cluster");
if (cluster != nullptr) {
AgencyCallbackRegistry *registry = cluster->agencyCallbackRegistry();
if (registry != nullptr) {
_recoveryManager = new RecoveryManager(registry);
}
}
}
/// Shutdown hook: delegates to cleanupAll().
void PregelFeature::beginShutdown() {
  cleanupAll();
}
void PregelFeature::addExecution(Conductor* const exec,
@ -59,12 +81,8 @@ void PregelFeature::addExecution(Conductor* const exec,
/// Looks up the conductor registered for `executionNumber` while holding
/// `_mutex`.
/// @return the registered conductor, or nullptr if there is none
Conductor* PregelFeature::conductor(uint64_t executionNumber) {
  MUTEX_LOCKER(guard, _mutex);
  auto it = _conductors.find(executionNumber);
  return it != _conductors.end() ? it->second : nullptr;
}
void PregelFeature::addWorker(IWorker* const worker,
@ -76,10 +94,7 @@ void PregelFeature::addWorker(IWorker* const worker,
/// Looks up the worker registered for `executionNumber` while holding
/// `_mutex`.
/// @return the registered worker, or nullptr if there is none
IWorker* PregelFeature::worker(uint64_t executionNumber) {
  MUTEX_LOCKER(guard, _mutex);
  auto it = _workers.find(executionNumber);
  return it != _workers.end() ? it->second : nullptr;
}
void PregelFeature::cleanup(uint64_t executionNumber) {
@ -107,3 +122,9 @@ void PregelFeature::cleanupAll() {
}
_workers.clear();
}
void PregelFeature::notifyConductors() {
for (auto it : _conductors) {
it.second->checkForWorkerOutage();
}
}

View File

@ -33,6 +33,7 @@ namespace pregel {
class Conductor;
class IWorker;
class RecoveryManager;
class PregelFeature final : public application_features::ApplicationFeature {
public:
@ -41,23 +42,26 @@ class PregelFeature final : public application_features::ApplicationFeature {
static PregelFeature* instance();
void start() override final;
void beginShutdown() override final;
uint64_t createExecutionNumber();
void addExecution(Conductor* const exec, uint64_t executionNumber);
Conductor* conductor(uint64_t executionNumber);
void notifyConductorOutage();
void notifyConductors();
void addWorker(IWorker* const worker, uint64_t executionNumber);
IWorker* worker(uint64_t executionNumber);
void cleanup(uint64_t executionNumber);
void cleanupAll();
RecoveryManager* recoveryManager() {return _recoveryManager;}
private:
std::unordered_map<uint64_t, Conductor*> _conductors;
std::unordered_map<uint64_t, IWorker*> _workers;
Mutex _mutex;
RecoveryManager* _recoveryManager;
};
}
}

View File

@ -22,32 +22,33 @@
#include "Recovery.h"
#include "Basics/Exceptions.h"
#include "Cluster/ClusterFeature.h"
#include "Basics/MutexLocker.h"
#include "Cluster/ClusterInfo.h"
#include "Pregel/Utils.h"
#include "Pregel/Conductor.h"
#include "Pregel/WorkerState.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Pregel/PregelFeature.h"
#include "Agency/Supervision.h"
using namespace arangodb;
using namespace arangodb::pregel;
RecoveryManager::RecoveryManager(Conductor *c) : _conductor(c) {
ClusterFeature* cluster =
application_features::ApplicationServer::getFeature<ClusterFeature>(
"Cluster");
TRI_ASSERT(cluster != nullptr);
_agencyCallbackRegistry = cluster->agencyCallbackRegistry();
/// Stores the given agency callback registry; nothing is registered yet.
RecoveryManager::RecoveryManager(AgencyCallbackRegistry* registry)
    : _agencyCallbackRegistry(registry) {}
/// Unregisters all agency callbacks this manager still holds.
RecoveryManager::~RecoveryManager() { stopMonitoring(); }
/// Unregisters every callback stored in `_agencyCallbacks` from the agency
/// callback registry and then empties the list.
void RecoveryManager::stopMonitoring() {
  for (auto const& callback : _agencyCallbacks) {
    _agencyCallbackRegistry->unregisterCallback(callback);
  }
  _agencyCallbacks.clear();
}
void RecoveryManager::monitorDBServers(std::vector<ServerID> const& dbServers) {
MUTEX_LOCKER(guard, _lock);
std::function<bool(VPackSlice const& result)> dbServerChanged =
[](VPackSlice const& result) {
@ -80,25 +81,59 @@ void RecoveryManager::monitorDBServers(std::vector<ServerID> const& dbServers) {
}
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
}*/
PregelFeature::instance()->notifyConductors();
LOG(INFO) << result.toString();
return true;
};
//std::string const& dbName = _conductor->_vocbaseGuard.vocbase()->name();
for (auto server : dbServers) {
_statusMap[server] = "";
std::string path = "Supervision/Health/" + server + "/Status";
auto call = std::make_shared<AgencyCallback>(_agency, path,
dbServerChanged, true, false);
_agencyCallbacks.push_back(call);
_agencyCallbackRegistry->registerCallback(call);
}
std::string path = "Plan/Collections/_system/6500032";
auto call = std::make_shared<AgencyCallback>(_agency, path,
dbServerChanged, true, false);
_agencyCallbacks.push_back(call);
_agencyCallbackRegistry->registerCallback(call);
/*for (auto server : dbServers) {
auto const& it = _statusMap.find(server);
if (it == _statusMap.end()) {
std::string path = "Supervision/Health/" + server + "/Status/";
auto call = std::make_shared<AgencyCallback>(_agency, path,
dbServerChanged, true, false);
_agencyCallbacks.push_back(call);
_agencyCallbackRegistry->registerCallback(call);
}
}*/
}
bool RecoveryManager::allServersAvailable(std::vector<ServerID> const& dbServers) {
MUTEX_LOCKER(guard, _lock);
if (TRI_microtime() - _lastHealthCheck > 5.0) {
//Supervis
AgencyCommResult result = _agency.getValues("Supervision/Health/");
if (result.successful() && result.slice().isObject()) {
for (auto server : VPackObjectIterator(result.slice())) {
std::string status = server.value.get("Status").copyString();
_statusMap[server.key.copyString()] = status;
}
_lastHealthCheck = TRI_microtime();// I don't like this
}
}
return false;
std::string failed(consensus::Supervision::HEALTH_STATUS_FAILED);
for (auto const& server : dbServers) {
auto const& it = _statusMap.find(server);
if (it != _statusMap.end()) {
if (it->second == failed) {
return false;
}
}
}
return true;
}

View File

@ -23,6 +23,7 @@
#ifndef ARANGODB_PREGEL_RECOVERY_H
#define ARANGODB_PREGEL_RECOVERY_H 1
#include "Basics/Mutex.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/AgencyComm.h"
#include "Cluster/AgencyCallbackRegistry.h"
@ -32,7 +33,6 @@
namespace arangodb {
namespace pregel {
class Conductor;
template<typename V, typename E>
class GraphStore;
@ -40,16 +40,18 @@ class RecoveryManager {
AgencyComm _agency;
AgencyCallbackRegistry *_agencyCallbackRegistry;//weak
Conductor *_conductor;
Mutex _lock;
double _lastHealthCheck = 0;
std::map<ServerID, std::string> _statusMap;
std::vector<std::shared_ptr<AgencyCallback>> _agencyCallbacks;
public:
RecoveryManager(Conductor *c);
RecoveryManager(AgencyCallbackRegistry *registry);
~RecoveryManager();
void monitorDBServers(std::vector<ServerID> const& dbServers);
void stopMonitoring();
bool allServersAvailable(std::vector<ServerID> const& dbServers);
};

View File

@ -34,7 +34,6 @@
using namespace arangodb;
using namespace arangodb::pregel;
std::string const Utils::edgeShardingKey = "_vertex";
std::string const Utils::apiPrefix = "/_api/pregel/";
std::string const Utils::startExecutionPath = "startExecution";
std::string const Utils::prepareGSSPath = "prepareGSS";
@ -42,6 +41,7 @@ std::string const Utils::startGSSPath = "startGSS";
std::string const Utils::finishedGSSPath = "finishedGSS";
std::string const Utils::messagesPath = "messages";
std::string const Utils::finalizeExecutionPath = "finalizeExecution";
std::string const Utils::reconfigurePath = "reconfigure";
std::string const Utils::executionNumberKey = "exn";
std::string const Utils::collectionPlanIdMapKey = "collectionPlanIdMap";

View File

@ -46,6 +46,7 @@ class Utils {
static std::string const finishedGSSPath;
static std::string const messagesPath;
static std::string const finalizeExecutionPath;
static std::string const reconfigurePath;
static std::string const executionNumberKey;
static std::string const algorithmKey;

View File

@ -25,7 +25,6 @@
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
#include <functional>
#include "AggregatorUsage.h"
#include "Basics/Common.h"
#include "Utils.h"