1
0
Fork 0

prevent PregelFeature from shutting down while its workers are running (#8627)

This commit is contained in:
Jan 2019-03-29 18:22:37 +01:00 committed by GitHub
parent 7343541d7c
commit 3631d55146
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 135 additions and 67 deletions

View File

@ -7145,7 +7145,7 @@ AqlValue Functions::PregelResult(arangodb::aql::Query* query, transaction::Metho
}
uint64_t execNr = arg1.toInt64(trx);
pregel::PregelFeature* feature = pregel::PregelFeature::instance();
std::shared_ptr<pregel::PregelFeature> feature = pregel::PregelFeature::instance();
if (!feature) {
::registerWarning(query, AFN, TRI_ERROR_FAILED);
return AqlValue(arangodb::velocypack::Slice::emptyArraySlice());

View File

@ -959,7 +959,7 @@ void HeartbeatThread::runCoordinator() {
// calling pregel code
ClusterInfo::instance()->setFailedServers(failedServers);
pregel::PregelFeature* prgl = pregel::PregelFeature::instance();
std::shared_ptr<pregel::PregelFeature> prgl = pregel::PregelFeature::instance();
if (prgl != nullptr && failedServers.size() > 0) {
pregel::RecoveryManager* mngr = prgl->recoveryManager();
if (mngr != nullptr) {

View File

@ -74,13 +74,13 @@ IAlgorithm* AlgoRegistry::createAlgorithm(std::string const& algorithm, VPackSli
}
template <typename V, typename E, typename M>
/*static*/ std::unique_ptr<IWorker> AlgoRegistry::createWorker(TRI_vocbase_t& vocbase,
/*static*/ std::shared_ptr<IWorker> AlgoRegistry::createWorker(TRI_vocbase_t& vocbase,
Algorithm<V, E, M>* algo,
VPackSlice body) {
return std::make_unique<Worker<V, E, M>>(vocbase, algo, body);
return std::make_shared<Worker<V, E, M>>(vocbase, algo, body);
}
/*static*/ std::unique_ptr<IWorker> AlgoRegistry::createWorker(TRI_vocbase_t& vocbase,
/*static*/ std::shared_ptr<IWorker> AlgoRegistry::createWorker(TRI_vocbase_t& vocbase,
VPackSlice body) {
VPackSlice algoSlice = body.get(Utils::algorithmKey);

View File

@ -34,11 +34,11 @@ namespace pregel {
struct AlgoRegistry {
static IAlgorithm* createAlgorithm(std::string const& algorithm, VPackSlice userParams);
static std::unique_ptr<IWorker> createWorker(TRI_vocbase_t& vocbase, VPackSlice body);
static std::shared_ptr<IWorker> createWorker(TRI_vocbase_t& vocbase, VPackSlice body);
private:
template <typename V, typename E, typename M>
static std::unique_ptr<IWorker> createWorker(TRI_vocbase_t& vocbase,
static std::shared_ptr<IWorker> createWorker(TRI_vocbase_t& vocbase,
Algorithm<V, E, M>* algo, VPackSlice body);
};

View File

@ -584,10 +584,13 @@ int Conductor::_initializeWorkers(std::string const& suffix, VPackSlice addition
b.close();
b.close();
// hack for singke serveronly on single server
// hack for single server
if (ServerState::instance()->getRole() == ServerState::ROLE_SINGLE) {
TRI_ASSERT(vertexMap.size() == 1);
PregelFeature* feature = PregelFeature::instance();
std::shared_ptr<PregelFeature> feature = PregelFeature::instance();
if (!feature) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}
std::shared_ptr<IWorker> worker = feature->worker(_executionNumber);
if (worker) {
@ -599,8 +602,8 @@ int Conductor::_initializeWorkers(std::string const& suffix, VPackSlice addition
auto created = AlgoRegistry::createWorker(_vocbaseGuard.database(), b.slice());
TRI_ASSERT(created.get() != nullptr);
PregelFeature::instance()->addWorker(std::move(created), _executionNumber);
worker = PregelFeature::instance()->worker(_executionNumber);
feature->addWorker(std::move(created), _executionNumber);
worker = feature->worker(_executionNumber);
TRI_ASSERT(worker);
worker->setupWorker();
@ -630,8 +633,13 @@ int Conductor::_finalizeWorkers() {
if (_masterContext) {
_masterContext->postApplication();
}
std::shared_ptr<PregelFeature> feature = PregelFeature::instance();
if (!feature) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}
// stop monitoring shards
RecoveryManager* mngr = PregelFeature::instance()->recoveryManager();
RecoveryManager* mngr = feature->recoveryManager();
if (mngr) {
mngr->stopMonitoring(this);
}

View File

@ -61,21 +61,36 @@ bool authorized(std::pair<std::string, std::shared_ptr<arangodb::pregel::Conduct
bool authorized(std::pair<std::string, std::shared_ptr<arangodb::pregel::IWorker>> const& worker) {
return ::authorized(worker.first);
}
/// @brief custom deleter for the PregelFeature.
/// it does nothing, i.e. doesn't delete it. This is because the ApplicationServer
/// is managing the PregelFeature, but we need a shared_ptr to it here as well. The
/// shared_ptr we are using here just tracks the refcount, but doesn't manage the
/// memory
struct NonDeleter {
void operator()(arangodb::pregel::PregelFeature*) {}
};
std::shared_ptr<arangodb::pregel::PregelFeature> instance;
} // namespace
using namespace arangodb;
using namespace arangodb::pregel;
static PregelFeature* Instance = nullptr;
std::pair<Result, uint64_t> PregelFeature::startExecution(
TRI_vocbase_t& vocbase, std::string algorithm,
std::vector<std::string> const& vertexCollections,
std::vector<std::string> const& edgeCollections, VPackSlice const& params) {
if (nullptr == Instance) {
// make sure no one removes the PregelFeature while in use
std::shared_ptr<PregelFeature> instance = ::instance;
if (!instance) {
return std::make_pair(Result{TRI_ERROR_INTERNAL,
"pregel system not ready"}, 0);
}
ServerState* ss = ServerState::instance();
// check the access rights to collections
@ -179,12 +194,12 @@ std::pair<Result, uint64_t> PregelFeature::startExecution(
}
}
uint64_t en = Instance->createExecutionNumber();
auto c = std::make_unique<pregel::Conductor>(en, vocbase, vertexCollections,
uint64_t en = instance->createExecutionNumber();
auto c = std::make_shared<pregel::Conductor>(en, vocbase, vertexCollections,
edgeColls, algorithm, params);
Instance->addConductor(std::move(c), en);
TRI_ASSERT(Instance->conductor(en));
Instance->conductor(en)->start();
instance->addConductor(std::move(c), en);
TRI_ASSERT(instance->conductor(en));
instance->conductor(en)->start();
return std::make_pair(Result{}, en);
}
@ -206,7 +221,7 @@ PregelFeature::~PregelFeature() {
cleanupAll();
}
PregelFeature* PregelFeature::instance() { return Instance; }
std::shared_ptr<PregelFeature> PregelFeature::instance() { return ::instance; }
size_t PregelFeature::availableParallelism() {
const size_t procNum = TRI_numberProcessors();
@ -214,7 +229,10 @@ size_t PregelFeature::availableParallelism() {
}
void PregelFeature::start() {
Instance = this;
// don't delete the pointer to the feature on shutdown, as the ApplicationServer
// owns it
::instance.reset(this, ::NonDeleter());
if (ServerState::instance()->isAgent()) {
return;
}
@ -228,16 +246,18 @@ void PregelFeature::beginShutdown() {
cleanupAll();
}
void PregelFeature::stop() {
Instance = nullptr;
void PregelFeature::stop() {}
void PregelFeature::unprepare() {
::instance.reset();
}
void PregelFeature::addConductor(std::unique_ptr<Conductor>&& c, uint64_t executionNumber) {
MUTEX_LOCKER(guard, _mutex);
void PregelFeature::addConductor(std::shared_ptr<Conductor>&& c, uint64_t executionNumber) {
std::string user = ExecContext::CURRENT ? ExecContext::CURRENT->user() : "";
MUTEX_LOCKER(guard, _mutex);
_conductors.emplace(executionNumber,
std::make_pair(user, std::shared_ptr<Conductor>(c.get())));
c.release();
std::make_pair(std::move(user), std::move(c)));
}
std::shared_ptr<Conductor> PregelFeature::conductor(uint64_t executionNumber) {
@ -246,12 +266,12 @@ std::shared_ptr<Conductor> PregelFeature::conductor(uint64_t executionNumber) {
return (it != _conductors.end() && ::authorized(it->second)) ? it->second.second : nullptr;
}
void PregelFeature::addWorker(std::unique_ptr<IWorker>&& w, uint64_t executionNumber) {
MUTEX_LOCKER(guard, _mutex);
void PregelFeature::addWorker(std::shared_ptr<IWorker>&& w, uint64_t executionNumber) {
std::string user = ExecContext::CURRENT ? ExecContext::CURRENT->user() : "";
MUTEX_LOCKER(guard, _mutex);
_workers.emplace(executionNumber,
std::make_pair(user, std::shared_ptr<IWorker>(w.get())));
w.release();
std::make_pair(std::move(user), std::move(w)));
}
std::shared_ptr<IWorker> PregelFeature::worker(uint64_t executionNumber) {
@ -262,23 +282,23 @@ std::shared_ptr<IWorker> PregelFeature::worker(uint64_t executionNumber) {
void PregelFeature::cleanupConductor(uint64_t executionNumber) {
MUTEX_LOCKER(guard, _mutex);
auto cit = _conductors.find(executionNumber);
if (cit != _conductors.end()) {
_conductors.erase(executionNumber);
}
}
void PregelFeature::cleanupWorker(uint64_t executionNumber) {
// make sure no one removes the PregelFeature while in use
std::shared_ptr<PregelFeature> instance = ::instance;
if (!instance) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}
// unmapping etc might need a few seconds
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
rest::Scheduler* scheduler = SchedulerFeature::SCHEDULER;
scheduler->queue(RequestPriority::LOW, [this, executionNumber](bool) {
scheduler->queue(RequestPriority::LOW, [this, executionNumber, instance](bool) {
MUTEX_LOCKER(guard, _mutex);
auto wit = _workers.find(executionNumber);
if (wit != _workers.end()) {
_workers.erase(executionNumber);
}
});
}
@ -298,15 +318,21 @@ void PregelFeature::handleConductorRequest(std::string const& path, VPackSlice c
return; // shutdown ongoing
}
// make sure no one removes the PregelFeature while in use
std::shared_ptr<PregelFeature> instance = ::instance;
if (!instance) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}
VPackSlice sExecutionNum = body.get(Utils::executionNumberKey);
if (!sExecutionNum.isInteger()) {
LOG_TOPIC(ERR, Logger::PREGEL) << "Invalid execution number";
}
uint64_t exeNum = sExecutionNum.getUInt();
std::shared_ptr<Conductor> co = Instance->conductor(exeNum);
std::shared_ptr<Conductor> co = instance->conductor(exeNum);
if (!co) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_INTERNAL, "Conductor not found, invalid execution number");
TRI_ERROR_CURSOR_NOT_FOUND, "Conductor not found, invalid execution number");
}
if (path == Utils::finishedStartupPath) {
@ -326,6 +352,12 @@ void PregelFeature::handleConductorRequest(std::string const& path, VPackSlice c
return; // shutdown ongoing
}
// make sure no one removes the PregelFeature while in use
std::shared_ptr<PregelFeature> instance = ::instance;
if (!instance) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}
VPackSlice sExecutionNum = body.get(Utils::executionNumberKey);
if (!sExecutionNum.isInteger()) {
@ -334,7 +366,7 @@ void PregelFeature::handleConductorRequest(std::string const& path, VPackSlice c
}
uint64_t exeNum = sExecutionNum.getUInt();
std::shared_ptr<IWorker> w = Instance->worker(exeNum);
std::shared_ptr<IWorker> w = instance->worker(exeNum);
// create a new worker instance if necessary
if (path == Utils::startExecutionPath) {
@ -344,24 +376,24 @@ void PregelFeature::handleConductorRequest(std::string const& path, VPackSlice c
"Worker with this execution number already exists.");
}
Instance->addWorker(AlgoRegistry::createWorker(vocbase, body), exeNum);
Instance->worker(exeNum)->setupWorker(); // will call conductor
instance->addWorker(AlgoRegistry::createWorker(vocbase, body), exeNum);
instance->worker(exeNum)->setupWorker(); // will call conductor
return;
} else if (path == Utils::startRecoveryPath) {
if (!w) {
Instance->addWorker(AlgoRegistry::createWorker(vocbase, body), exeNum);
instance->addWorker(AlgoRegistry::createWorker(vocbase, body), exeNum);
}
Instance->worker(exeNum)->startRecovery(body);
instance->worker(exeNum)->startRecovery(body);
return;
} else if (!w) {
// any other call should have a working worker instance
LOG_TOPIC(WARN, Logger::PREGEL)
<< "Handling " << path << "worker " << exeNum << " does not exist";
<< "Handling " << path << ", worker " << exeNum << " does not exist";
THROW_ARANGO_EXCEPTION_FORMAT(
TRI_ERROR_INTERNAL,
TRI_ERROR_CURSOR_NOT_FOUND,
"Handling request %s, but worker %lld does not exist.", path.c_str(), exeNum);
}
@ -374,10 +406,8 @@ void PregelFeature::handleConductorRequest(std::string const& path, VPackSlice c
} else if (path == Utils::cancelGSSPath) {
w->cancelGlobalStep(body);
} else if (path == Utils::finalizeExecutionPath) {
w->finalizeExecution(body, [exeNum](bool) {
if (Instance != nullptr) {
Instance->cleanupWorker(exeNum);
}
w->finalizeExecution(body, [exeNum, instance](bool) {
instance->cleanupWorker(exeNum);
});
} else if (path == Utils::continueRecoveryPath) {
w->compensateStep(body);

View File

@ -42,7 +42,7 @@ class PregelFeature final : public application_features::ApplicationFeature {
explicit PregelFeature(application_features::ApplicationServer& server);
~PregelFeature();
static PregelFeature* instance();
static std::shared_ptr<PregelFeature> instance();
static size_t availableParallelism();
static std::pair<Result, uint64_t> startExecution(
@ -53,12 +53,13 @@ class PregelFeature final : public application_features::ApplicationFeature {
void start() override final;
void beginShutdown() override final;
void stop() override final;
void unprepare() override final;
uint64_t createExecutionNumber();
void addConductor(std::unique_ptr<Conductor>&&, uint64_t executionNumber);
void addConductor(std::shared_ptr<Conductor>&&, uint64_t executionNumber);
std::shared_ptr<Conductor> conductor(uint64_t executionNumber);
void addWorker(std::unique_ptr<IWorker>&&, uint64_t executionNumber);
void addWorker(std::shared_ptr<IWorker>&&, uint64_t executionNumber);
std::shared_ptr<IWorker> worker(uint64_t executionNumber);
void cleanupConductor(uint64_t executionNumber);

View File

@ -294,8 +294,31 @@ void ReplicationApplier::doStart(std::function<void()>&& cb,
// reset error
_state._lastError.reset();
{
// steal thread
std::unique_ptr<Thread> t = std::move(_thread);
TRI_ASSERT(_thread == nullptr);
// now _thread is empty
// and release the write lock so when "thread" goes
// out of scope, it actually can call the thread
// deleter without holding the write lock (which would
// deadlock)
writeLocker.unlock();
}
if (application_features::ApplicationServer::isStopping()) {
// dont build a new applier if we shutting down
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}
// reacquire the lock
writeLocker.lockEventual();
// build a new instance in _thread
cb();
TRI_ASSERT(_thread != nullptr);
if (!_thread->start()) {
_thread.reset();
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
@ -666,7 +689,7 @@ void ReplicationApplier::doStop(Result const& r, bool joinThread) {
// steal thread
std::unique_ptr<Thread> t = std::move(_thread);
TRI_ASSERT(_thread.get() == nullptr);
TRI_ASSERT(_thread == nullptr);
// now _thread is empty
// and release the write lock so when "thread" goes
// out of scope, it actually can call the thread

View File

@ -175,10 +175,16 @@ void RestControlPregelHandler::getExecutionStatus() {
}
uint64_t executionNumber = arangodb::basics::StringUtils::uint64(suffixes[0]);
auto c = pregel::PregelFeature::instance()->conductor(executionNumber);
std::shared_ptr<pregel::PregelFeature> pf = pregel::PregelFeature::instance();
if (!pf) {
generateError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_INTERNAL,
"pregel feature not available");
return;
}
auto c = pf->conductor(executionNumber);
if (nullptr == c) {
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_INTERNAL,
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND,
"Execution number is invalid");
return;
}
@ -195,7 +201,7 @@ void RestControlPregelHandler::cancelExecution() {
return;
}
auto pf = pregel::PregelFeature::instance();
std::shared_ptr<pregel::PregelFeature> pf = pregel::PregelFeature::instance();
if (nullptr == pf) {
generateError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_INTERNAL,
"pregel feature not available");
@ -206,7 +212,7 @@ void RestControlPregelHandler::cancelExecution() {
auto c = pf->conductor(executionNumber);
if (nullptr == c) {
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_INTERNAL,
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND,
"Execution number is invalid");
return;
}

View File

@ -1631,7 +1631,7 @@ static void JS_PregelStatus(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_THROW_EXCEPTION_USAGE("_pregelStatus(<executionNum>]");
}
pregel::PregelFeature* feature = pregel::PregelFeature::instance();
std::shared_ptr<pregel::PregelFeature> feature = pregel::PregelFeature::instance();
if (feature == nullptr) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_FAILED, "pregel is not enabled");
}
@ -1659,7 +1659,7 @@ static void JS_PregelCancel(v8::FunctionCallbackInfo<v8::Value> const& args) {
}
pregel::PregelFeature* feature = pregel::PregelFeature::instance();
std::shared_ptr<pregel::PregelFeature> feature = pregel::PregelFeature::instance();
if (feature == nullptr) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_FAILED, "pregel is not enabled");
}
@ -1687,7 +1687,7 @@ static void JS_PregelAQLResult(v8::FunctionCallbackInfo<v8::Value> const& args)
TRI_V8_THROW_EXCEPTION_USAGE("_pregelAqlResult(<executionNum>)");
}
pregel::PregelFeature* feature = pregel::PregelFeature::instance();
std::shared_ptr<pregel::PregelFeature> feature = pregel::PregelFeature::instance();
if (feature == nullptr) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_FAILED, "pregel is not enabled");
}