diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index 53149d4f08..6ec07f6e6a 100644 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -168,7 +168,7 @@ void Conductor::start() { auto body = std::make_shared(b.toJson()); requests.emplace_back("server:" + it.first, rest::RequestType::POST, - baseUrl + Utils::nextGSSPath, body); + baseUrl + Utils::startExecutionPath, body); } size_t nrDone = 0; cc->performRequests(requests, 120.0, nrDone, LogTopic("Pregel Conductor")); @@ -176,6 +176,34 @@ void Conductor::start() { << _vertexCollections[0]->name(); // look at results printResults(requests); + + if (nrDone == requests.size()) { + startGlobalStep(); + } else { + LOG(ERR) << "Not all DBServers started the execution"; + } +} + +void Conductor::startGlobalStep() { + + VPackBuilder b; + b.openObject(); + b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); + b.add(Utils::globalSuperstepKey, VPackValue(_globalSuperstep)); + b.add(Utils::aggregatorValuesKey, VPackValue(VPackValueType::Object)); + for (std::unique_ptr& aggregator : _aggregators) { + aggregator->serializeValue(b); + } + b.close(); + b.close(); + + std::string baseUrl = Utils::baseUrl(_vocbaseGuard.vocbase()->name()); + sendToAllDBServers(baseUrl + Utils::startGSSPath, b.slice()); + + for (std::unique_ptr& aggregator : _aggregators) { + aggregator->reset(); + } + LOG(INFO) << "Conductor started new gss " << _globalSuperstep; } void Conductor::finishedGlobalStep(VPackSlice& data) { @@ -207,8 +235,8 @@ void Conductor::finishedGlobalStep(VPackSlice& data) { LOG(INFO) << "Finished gss " << _globalSuperstep; _globalSuperstep++; - std::string baseUrl = Utils::baseUrl(_vocbaseGuard.vocbase()->name()); if (_doneCount == _dbServerCount || _globalSuperstep == 101) { + std::string baseUrl = Utils::baseUrl(_vocbaseGuard.vocbase()->name()); LOG(INFO) << "Done. We did " << _globalSuperstep - 1 << " rounds"; VPackBuilder b; b.openObject(); @@ -217,24 +245,8 @@ void Conductor::finishedGlobalStep(VPackSlice& data) { b.close(); sendToAllDBServers(baseUrl + Utils::finalizeExecutionPath, b.slice()); _state = ExecutionState::FINISHED; - } else { // trigger next superstep - VPackBuilder b; - b.openObject(); - b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); - b.add(Utils::globalSuperstepKey, VPackValue(_globalSuperstep)); - b.add(Utils::aggregatorValuesKey, VPackValue(VPackValueType::Object)); - for (std::unique_ptr& aggregator : _aggregators) { - aggregator->serializeValue(b); - } - b.close(); - b.close(); - sendToAllDBServers(baseUrl + Utils::nextGSSPath, b.slice()); - - for (std::unique_ptr& aggregator : _aggregators) { - aggregator->reset(); - } - LOG(INFO) << "Conductor started new gss " << _globalSuperstep; + startGlobalStep(); } } } @@ -248,7 +260,7 @@ int Conductor::sendToAllDBServers(std::string path, VPackSlice const& config) { _doneCount = 0; if (_dbServerCount == 0) { - LOG(WARN) << "No shards registered for " << _vertexCollections[0]->name(); + LOG(WARN) << "No servers registered"; return TRI_ERROR_FAILED; } @@ -261,8 +273,7 @@ int Conductor::sendToAllDBServers(std::string path, VPackSlice const& config) { size_t nrDone = 0; cc->performRequests(requests, 120.0, nrDone, LogTopic("Pregel Conductor")); - LOG(INFO) << "Send messages to " << nrDone << " shards of " - << _vertexCollections[0]->name(); + LOG(INFO) << "Send messages to " << nrDone << " servers"; printResults(requests); return TRI_ERROR_NO_ERROR; diff --git a/arangod/Pregel/Conductor.h b/arangod/Pregel/Conductor.h index 70d35980ae..db9d00101c 100644 --- a/arangod/Pregel/Conductor.h +++ b/arangod/Pregel/Conductor.h @@ -53,10 +53,10 @@ class Conductor { private: Mutex _finishedGSSMutex; // prevents concurrent calls to finishedGlobalStep + VocbaseGuard _vocbaseGuard; const unsigned int _executionNumber; std::string _algorithm; - ExecutionState _state = ExecutionState::RUNNING; std::vector> _aggregators; @@ -69,7 +69,7 @@ class Conductor { int32_t _responseCount = 0; int32_t _doneCount = 0; - // convenience method + void startGlobalStep(); int sendToAllDBServers(std::string url, VPackSlice const& body); }; } diff --git a/arangod/Pregel/OutgoingCache.cpp b/arangod/Pregel/OutgoingCache.cpp index 116280f74c..fd1dec525e 100644 --- a/arangod/Pregel/OutgoingCache.cpp +++ b/arangod/Pregel/OutgoingCache.cpp @@ -134,7 +134,7 @@ void OutgoingCache::sendMessageTo(std::string const& toValue, template void OutgoingCache::sendMessages() { - LOG(INFO) << "Beggining to send messages to other machines"; + LOG(INFO) << "Beginning to send messages to other machines"; std::vector requests; for (auto const& it : _map) { diff --git a/arangod/Pregel/PregelFeature.cpp b/arangod/Pregel/PregelFeature.cpp index 3f16e7ca72..cc7bae92b1 100644 --- a/arangod/Pregel/PregelFeature.cpp +++ b/arangod/Pregel/PregelFeature.cpp @@ -23,13 +23,13 @@ #include "PregelFeature.h" #include "Conductor.h" #include "Worker.h" +#include "Cluster/ClusterInfo.h" using namespace arangodb::pregel; static PregelFeature* Instance; -static unsigned int _exeI = 0; -unsigned int PregelFeature::createExecutionNumber() { return ++_exeI; } +unsigned int PregelFeature::createExecutionNumber() { return ClusterInfo::instance()->uniqid(); } PregelFeature::PregelFeature(application_features::ApplicationServer* server) : ApplicationFeature(server, "Pregel") { diff --git a/arangod/Pregel/Utils.cpp b/arangod/Pregel/Utils.cpp index d8d6dc3ece..c21c19f143 100644 --- a/arangod/Pregel/Utils.cpp +++ b/arangod/Pregel/Utils.cpp @@ -33,7 +33,8 @@ using namespace arangodb::pregel; std::string const Utils::apiPrefix = "/_api/pregel/"; -std::string const Utils::nextGSSPath = "nextGSS"; +std::string const Utils::startExecutionPath = "startExecution"; +std::string const Utils::startGSSPath = "startGSS"; std::string const Utils::finishedGSSPath = "finishedGSS"; std::string const Utils::messagesPath = "messages"; std::string const Utils::finalizeExecutionPath = "finalizeExecution"; diff --git a/arangod/Pregel/Utils.h b/arangod/Pregel/Utils.h index ea7ce2493c..d1e0badba4 100644 --- a/arangod/Pregel/Utils.h +++ b/arangod/Pregel/Utils.h @@ -38,7 +38,8 @@ class Utils { // constants static std::string const apiPrefix; - static std::string const nextGSSPath; + static std::string const startExecutionPath; + static std::string const startGSSPath; static std::string const finishedGSSPath; static std::string const messagesPath; static std::string const finalizeExecutionPath; diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index 418f27f6de..7d34fa1e88 100644 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -100,7 +100,7 @@ Worker::~Worker() { /// @brief Setup next superstep template -void Worker::nextGlobalStep(VPackSlice data) { +void Worker::startGlobalStep(VPackSlice data) { LOG(INFO) << "Called next global step: " << data.toJson(); // TODO do some work? @@ -121,10 +121,12 @@ void Worker::nextGlobalStep(VPackSlice data) { // parse aggregated values VPackSlice aggregatedValues = data.get(Utils::aggregatorValuesKey); - for (auto const& pair : _aggregators) { - VPackSlice val = aggregatedValues.get(pair.second->name()); - if (!val.isNone()) { - pair.second->setAggregatedValue(val); + if (aggregatedValues.isObject()) { + for (auto const& pair : _aggregators) { + VPackSlice val = aggregatedValues.get(pair.second->name()); + if (!val.isNone()) { + pair.second->setAggregatedValue(val); + } } } diff --git a/arangod/Pregel/Worker.h b/arangod/Pregel/Worker.h index d7b07179d8..995fd07cac 100644 --- a/arangod/Pregel/Worker.h +++ b/arangod/Pregel/Worker.h @@ -34,7 +34,7 @@ namespace pregel { class IWorker { public: virtual ~IWorker(){}; - virtual void nextGlobalStep(VPackSlice data) = 0; // called by coordinator + virtual void startGlobalStep(VPackSlice data) = 0; // called by coordinator virtual void receivedMessages(VPackSlice data) = 0; virtual void finalizeExecution(VPackSlice data) = 0; @@ -51,7 +51,7 @@ class Worker : public IWorker { std::shared_ptr> context); ~Worker(); - void nextGlobalStep(VPackSlice data) override; // called by coordinator + void startGlobalStep(VPackSlice data) override; // called by coordinator void receivedMessages(VPackSlice data) override; void finalizeExecution(VPackSlice data) override; diff --git a/arangod/RestHandler/RestPregelHandler.cpp b/arangod/RestHandler/RestPregelHandler.cpp index c20d5994e4..2e94cc7461 100644 --- a/arangod/RestHandler/RestPregelHandler.cpp +++ b/arangod/RestHandler/RestPregelHandler.cpp @@ -68,6 +68,27 @@ RestStatus RestPregelHandler::execute() { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND); return RestStatus::DONE; + } else if (suffix[0] == Utils::startExecutionPath) { + IWorker *w = PregelFeature::instance()->worker(executionNumber); + if (w) { + LOG(ERR) << "Worker with this execution number already exists."; + generateError(rest::ResponseCode::BAD, + TRI_ERROR_HTTP_FORBIDDEN); + return RestStatus::DONE; + } + LOG(INFO) << "creating worker"; + w = IWorker::createWorker(_vocbase, body); + PregelFeature::instance()->addWorker(w, executionNumber); + } else if (suffix[0] == Utils::startGSSPath) { + IWorker *w = PregelFeature::instance()->worker(executionNumber); + if (w) { + w->startGlobalStep(body); + } else { + LOG(ERR) << "Invalid execution number, worker does not exist."; + generateError(rest::ResponseCode::NOT_FOUND, + TRI_ERROR_HTTP_NOT_FOUND); + return RestStatus::DONE; + } } else if (suffix[0] == Utils::finishedGSSPath) { Conductor *exe = PregelFeature::instance()->conductor(executionNumber); if (exe) { @@ -75,14 +96,6 @@ RestStatus RestPregelHandler::execute() { } else { LOG(ERR) << "Conductor not found: " << executionNumber; } - } else if (suffix[0] == Utils::nextGSSPath) { - IWorker *w = PregelFeature::instance()->worker(executionNumber); - if (!w) {// can happen if gss == 0 - LOG(INFO) << "creating worker"; - w = IWorker::createWorker(_vocbase, body); - PregelFeature::instance()->addWorker(w, executionNumber); - } - w->nextGlobalStep(body); } else if (suffix[0] == Utils::messagesPath) { LOG(INFO) << "messages"; IWorker *exe = PregelFeature::instance()->worker(executionNumber);