From 81bd732fe10571ec1fe94ca0718bd6761c1cedef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Gr=C3=A4tzer?= Date: Fri, 15 Dec 2017 15:52:24 +0100 Subject: [PATCH] Pregel: fixing nullptr access (#4055) * fixing issue with nullptr access * Fix cluster test --- arangod/Pregel/AlgoRegistry.cpp | 6 ++--- arangod/Pregel/AlgoRegistry.h | 6 ++--- arangod/Pregel/Conductor.cpp | 37 +++++++++++++++++------------- arangod/Pregel/PregelFeature.cpp | 10 ++++---- arangod/Pregel/PregelFeature.h | 4 ++-- arangod/V8Server/v8-collection.cpp | 6 ++--- 6 files changed, 38 insertions(+), 31 deletions(-) diff --git a/arangod/Pregel/AlgoRegistry.cpp b/arangod/Pregel/AlgoRegistry.cpp index 1e40029bfb..f1d7855c5c 100644 --- a/arangod/Pregel/AlgoRegistry.cpp +++ b/arangod/Pregel/AlgoRegistry.cpp @@ -75,12 +75,12 @@ IAlgorithm* AlgoRegistry::createAlgorithm(std::string const& algorithm, } template -IWorker* AlgoRegistry::createWorker(TRI_vocbase_t* vocbase, +std::unique_ptr AlgoRegistry::createWorker(TRI_vocbase_t* vocbase, Algorithm* algo, VPackSlice body) { - return new Worker(vocbase, algo, body); + return std::make_unique>(vocbase, algo, body); } -IWorker* AlgoRegistry::createWorker(TRI_vocbase_t* vocbase, VPackSlice body) { +std::unique_ptr AlgoRegistry::createWorker(TRI_vocbase_t* vocbase, VPackSlice body) { VPackSlice algoSlice = body.get(Utils::algorithmKey); if (!algoSlice.isString()) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, diff --git a/arangod/Pregel/AlgoRegistry.h b/arangod/Pregel/AlgoRegistry.h index da6a501ea9..0d2fffd14a 100644 --- a/arangod/Pregel/AlgoRegistry.h +++ b/arangod/Pregel/AlgoRegistry.h @@ -33,12 +33,12 @@ namespace pregel { struct AlgoRegistry { static IAlgorithm* createAlgorithm(std::string const& algorithm, VPackSlice userParams); - static IWorker* createWorker(TRI_vocbase_t* vocbase, VPackSlice body); + static std::unique_ptr createWorker(TRI_vocbase_t* vocbase, VPackSlice body); private: template - static IWorker* createWorker(TRI_vocbase_t* vocbase, Algorithm* algo, - VPackSlice body); + static std::unique_ptr createWorker(TRI_vocbase_t* vocbase, Algorithm* algo, + VPackSlice body); }; } } diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index 65b830138b..eb56199d52 100644 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -605,27 +605,32 @@ int Conductor::_initializeWorkers(std::string const& suffix, b.close(); b.close(); - // only on single server + // hack for singke serveronly on single server if (ServerState::instance()->getRole() == ServerState::ROLE_SINGLE) { - std::shared_ptr w = - PregelFeature::instance()->worker(_executionNumber); - if (!w) { - TRI_vocbase_t* vocbase = _vocbaseGuard.database(); - std::unique_ptr w(AlgoRegistry::createWorker(vocbase, b.slice())); - TRI_ASSERT(w != nullptr); - PregelFeature::instance()->addWorker(w.get(), _executionNumber); - w.release()->setupWorker(); - } else { + TRI_ASSERT(vertexMap.size() == 1); + PregelFeature* feature = PregelFeature::instance(); + + std::shared_ptr worker = feature->worker(_executionNumber); + if (worker) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, - "a worker with this execution number already exists."); + "a worker with this execution number already exists."); } + + TRI_vocbase_t* vocbase = _vocbaseGuard.database(); + auto created = AlgoRegistry::createWorker(vocbase, b.slice()); + TRI_ASSERT(created.get() != nullptr); + PregelFeature::instance()->addWorker(std::move(created), _executionNumber); + worker = PregelFeature::instance()->worker(_executionNumber); + TRI_ASSERT (worker); + worker->setupWorker(); return TRI_ERROR_NO_ERROR; + + } else { + auto body = std::make_shared(b.toJson()); + requests.emplace_back("server:" + server, rest::RequestType::POST, path, + body); + LOG_TOPIC(DEBUG, Logger::PREGEL) << "Initializing Server " << server; } - - auto body = std::make_shared(b.toJson()); - requests.emplace_back("server:" + server, rest::RequestType::POST, path, - body); - LOG_TOPIC(DEBUG, Logger::PREGEL) << "Initializing Server " << server; } std::shared_ptr cc = ClusterComm::instance(); diff --git a/arangod/Pregel/PregelFeature.cpp b/arangod/Pregel/PregelFeature.cpp index c3d34c18f1..5317b8e1c2 100644 --- a/arangod/Pregel/PregelFeature.cpp +++ b/arangod/Pregel/PregelFeature.cpp @@ -109,10 +109,11 @@ void PregelFeature::beginShutdown() { Instance = nullptr; } -void PregelFeature::addConductor(Conductor* const c, uint64_t executionNumber) { +void PregelFeature::addConductor(std::unique_ptr&& c, uint64_t executionNumber) { MUTEX_LOCKER(guard, _mutex); //_executions. - _conductors.emplace(executionNumber, std::shared_ptr(c)); + _conductors.emplace(executionNumber, std::shared_ptr(c.get())); + c.release(); } std::shared_ptr PregelFeature::conductor(uint64_t executionNumber) { @@ -121,9 +122,10 @@ std::shared_ptr PregelFeature::conductor(uint64_t executionNumber) { return it != _conductors.end() ? it->second : nullptr; } -void PregelFeature::addWorker(IWorker* const w, uint64_t executionNumber) { +void PregelFeature::addWorker(std::unique_ptr&& w, uint64_t executionNumber) { MUTEX_LOCKER(guard, _mutex); - _workers.emplace(executionNumber, std::shared_ptr(w)); + _workers.emplace(executionNumber, std::shared_ptr(w.get())); + w.release(); } std::shared_ptr PregelFeature::worker(uint64_t executionNumber) { diff --git a/arangod/Pregel/PregelFeature.h b/arangod/Pregel/PregelFeature.h index db525363da..3fde291dfa 100644 --- a/arangod/Pregel/PregelFeature.h +++ b/arangod/Pregel/PregelFeature.h @@ -48,10 +48,10 @@ class PregelFeature final : public application_features::ApplicationFeature { void beginShutdown() override final; uint64_t createExecutionNumber(); - void addConductor(Conductor* const exec, uint64_t executionNumber); + void addConductor(std::unique_ptr&&, uint64_t executionNumber); std::shared_ptr conductor(uint64_t executionNumber); - void addWorker(IWorker* const worker, uint64_t executionNumber); + void addWorker(std::unique_ptr&&, uint64_t executionNumber); std::shared_ptr worker(uint64_t executionNumber); void cleanupConductor(uint64_t executionNumber); diff --git a/arangod/V8Server/v8-collection.cpp b/arangod/V8Server/v8-collection.cpp index 009dddd0e7..8906338b27 100644 --- a/arangod/V8Server/v8-collection.cpp +++ b/arangod/V8Server/v8-collection.cpp @@ -2017,9 +2017,9 @@ static void JS_PregelStart(v8::FunctionCallbackInfo const& args) { uint64_t en = pregel::PregelFeature::instance()->createExecutionNumber(); auto c = std::make_unique(en, vocbase, paramVertices, edgeColls, algorithm, paramBuilder.slice()); - pregel::PregelFeature::instance()->addConductor(c.get(), en); - c->start(); - c.release(); + pregel::PregelFeature::instance()->addConductor(std::move(c), en); + TRI_ASSERT(pregel::PregelFeature::instance()->conductor(en)); + pregel::PregelFeature::instance()->conductor(en)->start(); TRI_V8_RETURN(v8::Number::New(isolate, static_cast(en))); TRI_V8_TRY_CATCH_END