1
0
Fork 0

New Features

This commit is contained in:
Simon Grätzer 2016-11-25 12:27:24 +01:00
parent 66951f4f8e
commit 273c906415
6 changed files with 69 additions and 51 deletions

View File

@ -63,7 +63,7 @@ Conductor::Conductor(
: _vocbaseGuard(vocbase),
_executionNumber(executionNumber),
_algorithm(algorithm),
_state(ExecutionState::DEFAULT),
_recoveryManager(this),
_vertexCollections(vertexCollections),
_edgeCollections(edgeCollections) {
bool isCoordinator = ServerState::instance()->isCoordinator();
@ -147,7 +147,6 @@ void Conductor::start(VPackSlice userConfig) {
_startTimeSecs = TRI_microtime();
_globalSuperstep = 0;
_state = ExecutionState::RUNNING;
_dbServerCount = _dbServers.size();
_responseCount = 0;
_doneCount = 0;
if (vertexMap.size() != edgeMap.size()) {
@ -214,7 +213,9 @@ void Conductor::start(VPackSlice userConfig) {
printResults(requests);
if (nrDone == requests.size()) {
startGlobalStep();
if (_startGlobalStep()) {
_recoveryManager.monitorDBServers(_dbServers);
}
} else {
LOG(ERR) << "Not all DBServers started the execution";
}
@ -222,7 +223,7 @@ void Conductor::start(VPackSlice userConfig) {
// only called by the conductor, is protected by the
// mutex locked in finishedGlobalStep
void Conductor::startGlobalStep() {
bool Conductor::_startGlobalStep() {
VPackBuilder b;
b.openObject();
b.add(Utils::executionNumberKey, VPackValue(_executionNumber));
@ -240,17 +241,19 @@ void Conductor::startGlobalStep() {
std::string baseUrl = Utils::baseUrl(_vocbaseGuard.vocbase()->name());
// first allow all workers to run worker level operations
int r = sendToAllDBServers(baseUrl + Utils::prepareGSSPath, b.slice());
int r = _sendToAllDBServers(baseUrl + Utils::prepareGSSPath, b.slice());
if (r == TRI_ERROR_NO_ERROR) {
// start vertex level operations, does not get a response
sendToAllDBServers(baseUrl + Utils::startGSSPath, b.slice());// call me maybe
_sendToAllDBServers(baseUrl + Utils::startGSSPath, b.slice());// call me maybe
LOG(INFO) << "Conductor started new gss " << _globalSuperstep;
return true;
} else {
LOG(INFO) << "Seems there is at least one worker out of order";
// TODO, in case a worker needs more than 5 minutes to do calculations
// this will be triggered as well
// TODO handle cluster failures
return false;
}
}
@ -275,24 +278,22 @@ void Conductor::finishedGlobalStep(VPackSlice& data) {
_doneCount++;
}
if (_responseCount == _dbServerCount) {
if (_responseCount == _dbServers.size()) {
LOG(INFO) << "Finished gss " << _globalSuperstep;
_globalSuperstep++;
if (_state != ExecutionState::RUNNING
|| _doneCount == _dbServerCount
|| _doneCount == _dbServers.size()
|| _globalSuperstep == 100) {
LOG(INFO) << "Done. We did " << _globalSuperstep << " rounds";
LOG(INFO) << "Send: " << _workerStats.sendCount
<< " Received: " << _workerStats.receivedCount;
LOG(INFO) << "Worker Runtime: " << _workerStats.superstepRuntimeSecs << "s";
LOG(INFO) << "Total Runtim: " << TRI_microtime() - _startTimeSecs << "s";
_endTimeSecs = TRI_microtime();
bool storeResults = _state == ExecutionState::RUNNING;
if (_state == ExecutionState::CANCELED) {
LOG(WARN) << "Execution was canceled, results will be discarded.";
} else {
_state = ExecutionState::DONE;
}
VPackBuilder b;
b.openObject();
b.add(Utils::executionNumberKey, VPackValue(_executionNumber));
@ -300,24 +301,27 @@ void Conductor::finishedGlobalStep(VPackSlice& data) {
b.add(Utils::storeResultsKey, VPackValue(storeResults));
b.close();
std::string baseUrl = Utils::baseUrl(_vocbaseGuard.vocbase()->name());
sendToAllDBServers(baseUrl + Utils::finalizeExecutionPath, b.slice());
_sendToAllDBServers(baseUrl + Utils::finalizeExecutionPath, b.slice());
LOG(INFO) << "Done. We did " << _globalSuperstep << " rounds";
LOG(INFO) << "Send: " << _workerStats.sendCount
<< " Received: " << _workerStats.receivedCount;
LOG(INFO) << "Worker Runtime: " << _workerStats.superstepRuntimeSecs << "s";
LOG(INFO) << "Total Runtime: " << totalRuntimeSecs() << "s";
_state = ExecutionState::DONE;
} else { // trigger next superstep
startGlobalStep();
_startGlobalStep();
}
}
}
void Conductor::cancel() { _state = ExecutionState::CANCELED; }
void Conductor::cancel() {_state = ExecutionState::CANCELED; }
int Conductor::sendToAllDBServers(std::string path, VPackSlice const& config) {
int Conductor::_sendToAllDBServers(std::string path, VPackSlice const& config) {
ClusterComm* cc = ClusterComm::instance();
_dbServerCount = _dbServers.size();
_responseCount = 0;
_doneCount = 0;
if (_dbServerCount == 0) {
if (_dbServers.size() == 0) {
LOG(WARN) << "No servers registered";
return TRI_ERROR_FAILED;
}

View File

@ -30,6 +30,7 @@
#include "Cluster/ClusterInfo.h"
#include "Pregel/AggregatorUsage.h"
#include "Pregel/Statistics.h"
#include "Pregel/Recovery.h"
#include "VocBase/vocbase.h"
namespace arangodb {
@ -41,10 +42,13 @@ enum ExecutionState { DEFAULT, RUNNING, DONE, CANCELED };
class Conductor {
friend class arangodb::RestPregelHandler;
VocbaseGuard _vocbaseGuard;
ExecutionState _state = ExecutionState::DEFAULT;
const VocbaseGuard _vocbaseGuard;
const uint64_t _executionNumber;
const std::string _algorithm;
ExecutionState _state;
Mutex _finishedGSSMutex; // prevents concurrent calls to finishedGlobalStep
RecoveryManager _recoveryManager;
std::vector<std::shared_ptr<LogicalCollection>> _vertexCollections;
std::vector<std::shared_ptr<LogicalCollection>> _edgeCollections;
std::vector<ServerID> _dbServers;
@ -53,16 +57,13 @@ class Conductor {
std::unique_ptr<IAggregatorCreator> _agregatorCreator;
std::unique_ptr<AggregatorUsage> _aggregatorUsage;
double _startTimeSecs = 0;
double _startTimeSecs = 0, _endTimeSecs = 0;
uint64_t _globalSuperstep = 0;
int32_t _dbServerCount = 0;
int32_t _responseCount = 0;
int32_t _doneCount = 0;
uint32_t _responseCount = 0, _doneCount = 0;
WorkerStats _workerStats;
Mutex _finishedGSSMutex; // prevents concurrent calls to finishedGlobalStep
void startGlobalStep();
int sendToAllDBServers(std::string url, VPackSlice const& body);
bool _startGlobalStep();
int _sendToAllDBServers(std::string url, VPackSlice const& body);
// === REST callbacks ===
void finishedGlobalStep(VPackSlice& data);
@ -80,6 +81,9 @@ class Conductor {
ExecutionState getState() const { return _state; }
WorkerStats workerStats() const {return _workerStats;}
uint64_t globalSuperstep() const {return _globalSuperstep;}
double totalRuntimeSecs() {
return _endTimeSecs == 0 ? TRI_microtime() - _startTimeSecs : _endTimeSecs - _startTimeSecs;
}
};
}
}

View File

@ -46,6 +46,7 @@ class PregelFeature final : public application_features::ApplicationFeature {
uint64_t createExecutionNumber();
void addExecution(Conductor* const exec, uint64_t executionNumber);
Conductor* conductor(uint64_t executionNumber);
void notifyConductorOutage();
void addWorker(IWorker* const worker, uint64_t executionNumber);
IWorker* worker(uint64_t executionNumber);

View File

@ -26,6 +26,7 @@
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterInfo.h"
#include "Pregel/Utils.h"
#include "Pregel/Conductor.h"
#include "Pregel/WorkerState.h"
#include "ApplicationFeatures/ApplicationServer.h"
@ -40,12 +41,17 @@ RecoveryManager::RecoveryManager(Conductor *c) : _conductor(c) {
_agencyCallbackRegistry = cluster->agencyCallbackRegistry();
}
void RecoveryManager::monitorCollections(std::vector<std::shared_ptr<LogicalCollection>> const& collections) {
RecoveryManager::~RecoveryManager() {
for (auto call : _agencyCallbacks) {
_agencyCallbackRegistry->unregisterCallback(call);
}
}
/*
void RecoveryManager::monitorDBServers(std::vector<ServerID> const& dbServers) {
std::function<bool(VPackSlice const& result)> dbServerChanged =
[=](VPackSlice const& result) {
if (result.isObject() && result.length() == (size_t)numberOfShards) {
[](VPackSlice const& result) {
/*if (result.isObject() && result.length() == (size_t)numberOfShards) {
std::string tmpMsg = "";
bool tmpHaveError = false;
@ -73,21 +79,20 @@ void RecoveryManager::monitorCollections(std::vector<std::shared_ptr<LogicalColl
return true;
}
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
}
}*/
LOG(INFO) << result.toString();
return true;
};
//std::string const& dbName = _conductor->_vocbaseGuard.vocbase()->name();
// ATTENTION: The following callback calls the above closure in a
// different thread. Nevertheless, the closure accesses some of our
// local variables. Therefore we have to protect all accesses to them
// by a mutex. We use the mutex of the condition variable in the
// AgencyCallback for this.
auto agencyCallback = std::make_shared<AgencyCallback>(
ac, "Current/Collections/" + databaseName + "/" + collectionID,
dbServerChanged, true, false);
_agencyCallbackRegistry->registerCallback(agencyCallback);
TRI_DEFER(_agencyCallbackRegistry->unregisterCallback(agencyCallback));*/
for (auto server : dbServers) {
_statusMap[server] = "";
std::string path = "Supervision/Health/" + server + "/Status";
auto call = std::make_shared<AgencyCallback>(_agency, path,
dbServerChanged, true, false);
_agencyCallbacks.push_back(call);
_agencyCallbackRegistry->registerCallback(call);
}
}

View File

@ -39,14 +39,17 @@ class GraphStore;
class RecoveryManager {
AgencyComm _agency;
AgencyCallbackRegistry *_agencyCallbackRegistry;
Conductor *_conductor;// weak
AgencyCallbackRegistry *_agencyCallbackRegistry;//weak
Conductor *_conductor;
std::map<ServerID, std::string> _statusMap;
std::vector<std::shared_ptr<AgencyCallback>> _agencyCallbacks;
public:
RecoveryManager(Conductor *c);
~RecoveryManager() {}
~RecoveryManager();
void monitorCollections(std::vector<std::shared_ptr<LogicalCollection>> const& collections);
void monitorDBServers(std::vector<ServerID> const& dbServers);
bool allServersAvailable(std::vector<ServerID> const& dbServers);
};

View File

@ -1964,6 +1964,7 @@ static void JS_PregelStatus(v8::FunctionCallbackInfo<v8::Value> const& args) {
result.add("running", VPackValue(c->getState() == pregel::ExecutionState::RUNNING,
VPackValueType::Bool));
result.add("gss", VPackValue(c->globalSuperstep()));
result.add("totalRuntime", VPackValue(c->totalRuntimeSecs()));
c->workerStats().serializeValues(result);
result.close();
TRI_V8_RETURN(TRI_VPackToV8(isolate, result.slice()));