1
0
Fork 0

Pregel: fixing nullptr access (#4055)

* fixing issue with nullptr access

* Fix cluster test
This commit is contained in:
Simon Grätzer 2017-12-15 15:52:24 +01:00 committed by Jan
parent b1463922f6
commit 81bd732fe1
6 changed files with 38 additions and 31 deletions

View File

@ -75,12 +75,12 @@ IAlgorithm* AlgoRegistry::createAlgorithm(std::string const& algorithm,
} }
template <typename V, typename E, typename M> template <typename V, typename E, typename M>
IWorker* AlgoRegistry::createWorker(TRI_vocbase_t* vocbase, std::unique_ptr<IWorker> AlgoRegistry::createWorker(TRI_vocbase_t* vocbase,
Algorithm<V, E, M>* algo, VPackSlice body) { Algorithm<V, E, M>* algo, VPackSlice body) {
return new Worker<V, E, M>(vocbase, algo, body); return std::make_unique<Worker<V, E, M>>(vocbase, algo, body);
} }
IWorker* AlgoRegistry::createWorker(TRI_vocbase_t* vocbase, VPackSlice body) { std::unique_ptr<IWorker> AlgoRegistry::createWorker(TRI_vocbase_t* vocbase, VPackSlice body) {
VPackSlice algoSlice = body.get(Utils::algorithmKey); VPackSlice algoSlice = body.get(Utils::algorithmKey);
if (!algoSlice.isString()) { if (!algoSlice.isString()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,

View File

@ -33,12 +33,12 @@ namespace pregel {
struct AlgoRegistry { struct AlgoRegistry {
static IAlgorithm* createAlgorithm(std::string const& algorithm, static IAlgorithm* createAlgorithm(std::string const& algorithm,
VPackSlice userParams); VPackSlice userParams);
static IWorker* createWorker(TRI_vocbase_t* vocbase, VPackSlice body); static std::unique_ptr<IWorker> createWorker(TRI_vocbase_t* vocbase, VPackSlice body);
private: private:
template <typename V, typename E, typename M> template <typename V, typename E, typename M>
static IWorker* createWorker(TRI_vocbase_t* vocbase, Algorithm<V, E, M>* algo, static std::unique_ptr<IWorker> createWorker(TRI_vocbase_t* vocbase, Algorithm<V, E, M>* algo,
VPackSlice body); VPackSlice body);
}; };
} }
} }

View File

@ -605,27 +605,32 @@ int Conductor::_initializeWorkers(std::string const& suffix,
b.close(); b.close();
b.close(); b.close();
// only on single server // hack for singke serveronly on single server
if (ServerState::instance()->getRole() == ServerState::ROLE_SINGLE) { if (ServerState::instance()->getRole() == ServerState::ROLE_SINGLE) {
std::shared_ptr<IWorker> w = TRI_ASSERT(vertexMap.size() == 1);
PregelFeature::instance()->worker(_executionNumber); PregelFeature* feature = PregelFeature::instance();
if (!w) {
TRI_vocbase_t* vocbase = _vocbaseGuard.database(); std::shared_ptr<IWorker> worker = feature->worker(_executionNumber);
std::unique_ptr<IWorker> w(AlgoRegistry::createWorker(vocbase, b.slice())); if (worker) {
TRI_ASSERT(w != nullptr);
PregelFeature::instance()->addWorker(w.get(), _executionNumber);
w.release()->setupWorker();
} else {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"a worker with this execution number already exists."); "a worker with this execution number already exists.");
} }
TRI_vocbase_t* vocbase = _vocbaseGuard.database();
auto created = AlgoRegistry::createWorker(vocbase, b.slice());
TRI_ASSERT(created.get() != nullptr);
PregelFeature::instance()->addWorker(std::move(created), _executionNumber);
worker = PregelFeature::instance()->worker(_executionNumber);
TRI_ASSERT (worker);
worker->setupWorker();
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} else {
auto body = std::make_shared<std::string const>(b.toJson());
requests.emplace_back("server:" + server, rest::RequestType::POST, path,
body);
LOG_TOPIC(DEBUG, Logger::PREGEL) << "Initializing Server " << server;
} }
auto body = std::make_shared<std::string const>(b.toJson());
requests.emplace_back("server:" + server, rest::RequestType::POST, path,
body);
LOG_TOPIC(DEBUG, Logger::PREGEL) << "Initializing Server " << server;
} }
std::shared_ptr<ClusterComm> cc = ClusterComm::instance(); std::shared_ptr<ClusterComm> cc = ClusterComm::instance();

View File

@ -109,10 +109,11 @@ void PregelFeature::beginShutdown() {
Instance = nullptr; Instance = nullptr;
} }
void PregelFeature::addConductor(Conductor* const c, uint64_t executionNumber) { void PregelFeature::addConductor(std::unique_ptr<Conductor>&& c, uint64_t executionNumber) {
MUTEX_LOCKER(guard, _mutex); MUTEX_LOCKER(guard, _mutex);
//_executions. //_executions.
_conductors.emplace(executionNumber, std::shared_ptr<Conductor>(c)); _conductors.emplace(executionNumber, std::shared_ptr<Conductor>(c.get()));
c.release();
} }
std::shared_ptr<Conductor> PregelFeature::conductor(uint64_t executionNumber) { std::shared_ptr<Conductor> PregelFeature::conductor(uint64_t executionNumber) {
@ -121,9 +122,10 @@ std::shared_ptr<Conductor> PregelFeature::conductor(uint64_t executionNumber) {
return it != _conductors.end() ? it->second : nullptr; return it != _conductors.end() ? it->second : nullptr;
} }
void PregelFeature::addWorker(IWorker* const w, uint64_t executionNumber) { void PregelFeature::addWorker(std::unique_ptr<IWorker>&& w, uint64_t executionNumber) {
MUTEX_LOCKER(guard, _mutex); MUTEX_LOCKER(guard, _mutex);
_workers.emplace(executionNumber, std::shared_ptr<IWorker>(w)); _workers.emplace(executionNumber, std::shared_ptr<IWorker>(w.get()));
w.release();
} }
std::shared_ptr<IWorker> PregelFeature::worker(uint64_t executionNumber) { std::shared_ptr<IWorker> PregelFeature::worker(uint64_t executionNumber) {

View File

@ -48,10 +48,10 @@ class PregelFeature final : public application_features::ApplicationFeature {
void beginShutdown() override final; void beginShutdown() override final;
uint64_t createExecutionNumber(); uint64_t createExecutionNumber();
void addConductor(Conductor* const exec, uint64_t executionNumber); void addConductor(std::unique_ptr<Conductor>&&, uint64_t executionNumber);
std::shared_ptr<Conductor> conductor(uint64_t executionNumber); std::shared_ptr<Conductor> conductor(uint64_t executionNumber);
void addWorker(IWorker* const worker, uint64_t executionNumber); void addWorker(std::unique_ptr<IWorker>&&, uint64_t executionNumber);
std::shared_ptr<IWorker> worker(uint64_t executionNumber); std::shared_ptr<IWorker> worker(uint64_t executionNumber);
void cleanupConductor(uint64_t executionNumber); void cleanupConductor(uint64_t executionNumber);

View File

@ -2017,9 +2017,9 @@ static void JS_PregelStart(v8::FunctionCallbackInfo<v8::Value> const& args) {
uint64_t en = pregel::PregelFeature::instance()->createExecutionNumber(); uint64_t en = pregel::PregelFeature::instance()->createExecutionNumber();
auto c = std::make_unique<pregel::Conductor>(en, vocbase, paramVertices, edgeColls, auto c = std::make_unique<pregel::Conductor>(en, vocbase, paramVertices, edgeColls,
algorithm, paramBuilder.slice()); algorithm, paramBuilder.slice());
pregel::PregelFeature::instance()->addConductor(c.get(), en); pregel::PregelFeature::instance()->addConductor(std::move(c), en);
c->start(); TRI_ASSERT(pregel::PregelFeature::instance()->conductor(en));
c.release(); pregel::PregelFeature::instance()->conductor(en)->start();
TRI_V8_RETURN(v8::Number::New(isolate, static_cast<double>(en))); TRI_V8_RETURN(v8::Number::New(isolate, static_cast<double>(en)));
TRI_V8_TRY_CATCH_END TRI_V8_TRY_CATCH_END