1
0
Fork 0

Bug fix 3.4/cluster comm threads start stop (#6939)

* Start ClusterComm threads in `ClusterFeature::start`. Stop ClusterComm threads in `ClusterFeature::stop`.

* Do not free objects in `Scheduler::shutdown`. Let the `unique_ptr`s do their job. Stop ClusterComm threads in `ClusterFeature::stop`, but free the instance in `ClusterFeature::unprepare`.

* `io_context` may contain lambdas that hold `shared_ptr`s to `Tasks` that require a functional `VocBase` in their destructor.

* Clean up.
This commit is contained in:
Lars Maier 2018-10-19 13:12:51 +02:00 committed by Max Neunhöffer
parent def04788c5
commit d7863b4583
4 changed files with 29 additions and 18 deletions

View File

@ -257,11 +257,7 @@ ClusterComm::ClusterComm(bool ignored)
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor: stop and free all background threads, then drop any
/// queued requests/responses.
///
/// stopBackgroundThreads() already calls beginShutdown() on every thread,
/// deletes it, and clears _backgroundThreads — so no per-thread shutdown or
/// delete may be repeated here: doing both (as the old code did) would
/// double-delete every ClusterCommThread.
ClusterComm::~ClusterComm() {
  stopBackgroundThreads();
  cleanupAllQueues();
}
@ -317,6 +313,10 @@ void ClusterComm::initialize() {
////////////////////////////////////////////////////////////////////////////////
/// @brief drop the global ClusterComm instance, if one exists.
/// Operations that are already running hold their own copy of the
/// shared_ptr, so they can finish safely; no new operations will be
/// started after this returns.
void ClusterComm::cleanup() {
  if (_theInstance) {
    _theInstance.reset();
  }
}
@ -341,6 +341,16 @@ void ClusterComm::startBackgroundThread() {
} // for
}
void ClusterComm::stopBackgroundThreads() {
for (ClusterCommThread * thread: _backgroundThreads) {
thread->beginShutdown();
delete thread;
}
_backgroundThreads.clear();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief choose next communicator via round robin
////////////////////////////////////////////////////////////////////////////////
@ -1277,7 +1287,7 @@ void ClusterCommThread::run() {
LOG_TOPIC(DEBUG, Logger::CLUSTER) << "stopped ClusterComm thread";
}
/// @brief logs a connection error (backend unavailable)
void ClusterComm::logConnectionError(bool useErrorLogLevel, ClusterCommResult const* result, double timeout, int /*line*/) {
std::string msg = "cannot create connection to server";
@ -1285,7 +1295,7 @@ void ClusterComm::logConnectionError(bool useErrorLogLevel, ClusterCommResult co
msg += ": '" + result->serverID + '\'';
}
msg += " at endpoint " + result->endpoint + "', timeout: " + std::to_string(timeout);
if (useErrorLogLevel) {
LOG_TOPIC(ERR, Logger::CLUSTER) << msg;
} else {

View File

@ -497,6 +497,7 @@ class ClusterComm {
//////////////////////////////////////////////////////////////////////////////
void startBackgroundThread();
void stopBackgroundThreads();
//////////////////////////////////////////////////////////////////////////////
/// @brief submit an HTTP request to a shard asynchronously.

View File

@ -272,11 +272,6 @@ void ClusterFeature::prepare() {
}
}
if (startClusterComm) {
// initialize ClusterComm library, must call initialize only once
ClusterComm::initialize();
}
// return if cluster is disabled
if (!_enableCluster) {
reportRole(ServerState::instance()->getRole());
@ -361,6 +356,11 @@ void ClusterFeature::prepare() {
}
void ClusterFeature::start() {
if (ServerState::instance()->isAgent() || _enableCluster) {
ClusterComm::initialize();
}
// return if cluster is disabled
if (!_enableCluster) {
startHeartbeatThread(nullptr, 5000, 5, std::string());
@ -440,6 +440,8 @@ void ClusterFeature::stop() {
}
}
}
ClusterComm::instance()->stopBackgroundThreads();
}
@ -448,7 +450,7 @@ void ClusterFeature::unprepare() {
ClusterComm::cleanup();
return;
}
if (_heartbeatThread != nullptr) {
_heartbeatThread->beginShutdown();
}
@ -479,7 +481,6 @@ void ClusterFeature::unprepare() {
// Try only once to unregister because maybe the agencycomm
// is shutting down as well...
// Remove from role list
ServerState::RoleEnum role = ServerState::instance()->getRole();
std::string alk = ServerState::roleToAgencyListKey(role);
@ -501,7 +502,6 @@ void ClusterFeature::unprepare() {
}
AgencyCommManager::MANAGER->stop();
ClusterComm::cleanup();
ClusterInfo::cleanup();
}

View File

@ -191,9 +191,6 @@ Scheduler::~Scheduler() {
_managerGuard.reset();
_managerContext.reset();
_serviceGuard.reset();
_ioContext.reset();
FifoJob* job = nullptr;
for (int i = 0; i < NUMBER_FIFOS; ++i) {
@ -524,6 +521,9 @@ void Scheduler::shutdown() {
std::this_thread::sleep_for(std::chrono::microseconds(20000));
}
// The ioContext has to be cleaned up here, because there could be a lambda
// in its queue whose finalization requires some object (for example a
// vocbase) that would already be destroyed by the time the destructor runs
_managerContext.reset();
_ioContext.reset();
}