diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 8da42223d6..9ece379999 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -371,7 +371,7 @@ add_executable(${BIN_ARANGOD} Wal/Slots.cpp Wal/SynchronizerThread.cpp Pregel/Conductor.cpp - Pregel/JobMapping.cpp + Pregel/PregelFeature.cpp Pregel/InMessageCache.cpp Pregel/OutMessageCache.cpp Pregel/Vertex.cpp diff --git a/arangod/Pregel/JobMapping.cpp b/arangod/Pregel/PregelFeature.cpp similarity index 58% rename from arangod/Pregel/JobMapping.cpp rename to arangod/Pregel/PregelFeature.cpp index 4eea8f2e85..5553e52b52 100755 --- a/arangod/Pregel/JobMapping.cpp +++ b/arangod/Pregel/PregelFeature.cpp @@ -20,39 +20,63 @@ /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// -#include "JobMapping.h" +#include "PregelFeature.h" #include "Conductor.h" #include "Worker.h" using namespace arangodb::pregel; +static PregelFeature *Instance; + static unsigned int _exeI = 0; -unsigned int JobMapping::createExecutionNumber() { - return ++_exeI; +unsigned int PregelFeature::createExecutionNumber() { + return ++_exeI; } -void JobMapping::addExecution(Conductor* const exec, unsigned int executionNumber) { +PregelFeature::PregelFeature(application_features::ApplicationServer* server) +: ApplicationFeature(server, "Pregel") { + setOptional(true); + requiresElevatedPrivileges(false); + startsAfter("Database"); + startsAfter("Logger"); + startsAfter("Dispatcher"); + Instance = this; +} + +PregelFeature::~PregelFeature() { + cleanupAll(); +} + +PregelFeature* PregelFeature::instance() { + return Instance; +} + +void PregelFeature::beginShutdown() { + cleanupAll(); +} + +void PregelFeature::addExecution(Conductor* const exec, unsigned int executionNumber) { //_executions. _conductors[executionNumber] = exec; } -Conductor* JobMapping::conductor(int executionNumber) { +Conductor* PregelFeature::conductor(int executionNumber) { auto it = _conductors.find(executionNumber); if (it != _conductors.end()) return it->second; else return nullptr; } -void JobMapping::addWorker(Worker* const worker, unsigned int executionNumber) { +void PregelFeature::addWorker(Worker* const worker, unsigned int executionNumber) { _workers[executionNumber] = worker; } -Worker* JobMapping::worker(unsigned int executionNumber) { +Worker* PregelFeature::worker(unsigned int executionNumber) { auto it = _workers.find(executionNumber); if (it != _workers.end()) return it->second; else return nullptr; } -void JobMapping::cleanup(unsigned int executionNumber) { +void PregelFeature::cleanup(unsigned int executionNumber) { auto cit = _conductors.find(executionNumber); if (cit != _conductors.end()) { delete(cit->second); @@ -64,3 +88,14 @@ void JobMapping::cleanup(unsigned int executionNumber) { _workers.erase(executionNumber); } } + +void PregelFeature::cleanupAll() { + for (auto it : _conductors) { + delete(it.second); + } + _conductors.clear(); + for (auto it : _workers) { + delete(it.second); + } + _workers.clear(); +} diff --git a/arangod/Pregel/JobMapping.h b/arangod/Pregel/PregelFeature.h similarity index 77% rename from arangod/Pregel/JobMapping.h rename to arangod/Pregel/PregelFeature.h index 1f59c7caf0..632f283539 100755 --- a/arangod/Pregel/JobMapping.h +++ b/arangod/Pregel/PregelFeature.h @@ -20,10 +20,13 @@ /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// -#ifndef ARANGODB_PREGEL_JOBMAPPING_H -#define ARANGODB_PREGEL_JOBMAPPING_H 1 +#ifndef ARANGODB_PREGEL_FEATURE_H +#define ARANGODB_PREGEL_FEATURE_H 1 #include +#include "Basics/Common.h" +#include "ApplicationFeatures/ApplicationFeature.h" + namespace arangodb { namespace pregel { @@ -31,14 +34,15 @@ namespace pregel { class Conductor; class Worker; - class JobMapping { + class PregelFeature final + : public application_features::ApplicationFeature { public: + explicit PregelFeature(application_features::ApplicationServer* server); + ~PregelFeature(); + + static PregelFeature* instance(); - - static JobMapping* instance() { - static JobMapping *Instance = new JobMapping(); - return Instance; - }; + void beginShutdown() override final; unsigned int createExecutionNumber(); void addExecution(Conductor* const exec, unsigned int executionNumber); @@ -48,13 +52,11 @@ namespace pregel { Worker* worker(unsigned int executionNumber); void cleanup(unsigned int executionNumber); + void cleanupAll(); private: std::unordered_map _conductors; std::unordered_map _workers; - - JobMapping() {}; - JobMapping(const JobMapping &c) {}; }; } } diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index 3bf14ce054..cbbd8b2f0a 100755 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -199,9 +199,9 @@ void Worker::nextGlobalStep(VPackSlice data) { std::unique_ptr job(new WorkerJob(this, _ctx)); int res = DispatcherFeature::DISPATCHER->addJob(job, true); -if (res != TRI_ERROR_NO_ERROR) { - LOG(ERR) << "Could not start worker job"; -} + if (res != TRI_ERROR_NO_ERROR) { + LOG(ERR) << "Could not start worker job"; + } LOG(INFO) << "Worker started new gss: " << gss; } diff --git a/arangod/RestHandler/RestPregelHandler.cpp b/arangod/RestHandler/RestPregelHandler.cpp index ffd63396b1..f099419370 100644 --- a/arangod/RestHandler/RestPregelHandler.cpp +++ b/arangod/RestHandler/RestPregelHandler.cpp @@ -28,7 +28,7 @@ #include #include -#include "Pregel/JobMapping.h" +#include "Pregel/PregelFeature.h" #include "Pregel/Conductor.h" #include "Pregel/Worker.h" #include "Pregel/Utils.h" @@ -69,31 +69,29 @@ RestHandler::status RestPregelHandler::execute() { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND); return status::DONE; - } else if (suffix[0] == "finishedGSS") { - LOG(INFO) << "finishedGSS"; - Conductor *exe = JobMapping::instance()->conductor(executionNumber); + } else if (suffix[0] == Utils::finishedGSSPath) { + Conductor *exe = PregelFeature::instance()->conductor(executionNumber); if (exe) { exe->finishedGlobalStep(body); } else { LOG(ERR) << "Conductor not found: " << executionNumber; } - } else if (suffix[0] == "nextGSS") { - LOG(INFO) << "nextGSS"; - Worker *w = JobMapping::instance()->worker(executionNumber); + } else if (suffix[0] == Utils::nextGSSPath) { + Worker *w = PregelFeature::instance()->worker(executionNumber); if (!w) {// can happen if gss == 0 LOG(INFO) << "creating worker"; w = new Worker(executionNumber, _vocbase, body); - JobMapping::instance()->addWorker(w, executionNumber); + PregelFeature::instance()->addWorker(w, executionNumber); } w->nextGlobalStep(body); } else if (suffix[0] == "messages") { LOG(INFO) << "messages"; - Worker *exe = JobMapping::instance()->worker(executionNumber); + Worker *exe = PregelFeature::instance()->worker(executionNumber); if (exe) { exe->receivedMessages(body); } } else if (suffix[0] == "writeResults") { - Worker *exe = JobMapping::instance()->worker(executionNumber); + Worker *exe = PregelFeature::instance()->worker(executionNumber); if (exe) { exe->writeResults(); } diff --git a/arangod/RestServer/arangod.cpp b/arangod/RestServer/arangod.cpp index f3ff850b09..e0a6948e81 100644 --- a/arangod/RestServer/arangod.cpp +++ b/arangod/RestServer/arangod.cpp @@ -67,6 +67,7 @@ #include "RestServer/UnitTestsFeature.h" #include "RestServer/UpgradeFeature.h" #include "RestServer/WorkMonitorFeature.h" +#include "Pregel/PregelFeature.h" #include "Scheduler/SchedulerFeature.h" #include "Ssl/SslFeature.h" #include "Ssl/SslServerFeature.h" @@ -146,6 +147,7 @@ static int runServer(int argc, char** argv) { server.addFeature(new LoggerFeature(&server, true)); server.addFeature(new NonceFeature(&server)); server.addFeature(new PageSizeFeature(&server)); + server.addFeature(new pregel::PregelFeature(&server)); server.addFeature(new PrivilegeFeature(&server)); server.addFeature(new QueryRegistryFeature(&server)); server.addFeature(new TraverserEngineRegistryFeature(&server)); diff --git a/arangod/V8Server/v8-collection.cpp b/arangod/V8Server/v8-collection.cpp index 085becf29b..20e42d8595 100644 --- a/arangod/V8Server/v8-collection.cpp +++ b/arangod/V8Server/v8-collection.cpp @@ -55,7 +55,7 @@ #include "VocBase/modes.h" #include "Wal/LogfileManager.h" #include "Pregel/Conductor.h" -#include "Pregel/JobMapping.h" +#include "Pregel/PregelFeature.h" #include #include @@ -1869,9 +1869,9 @@ static void JS_Pregel(v8::FunctionCallbackInfo const& args) { TRI_V8_THROW_EXCEPTION_USAGE("Collections do not exist"); } - result = pregel::JobMapping::instance()->createExecutionNumber(); + result = pregel::PregelFeature::instance()->createExecutionNumber(); pregel::Conductor* e = new pregel::Conductor(result, vocbase, vertexColl, edgeColl, "todo"); - pregel::JobMapping::instance()->addExecution(e, result); + pregel::PregelFeature::instance()->addExecution(e, result); LOG(INFO) << "Starting..."; e->start();