diff --git a/arangod/Agency/Supervision.cpp b/arangod/Agency/Supervision.cpp index 4ce0239f91..4f48397659 100644 --- a/arangod/Agency/Supervision.cpp +++ b/arangod/Agency/Supervision.cpp @@ -65,6 +65,7 @@ Supervision::~Supervision() { } static std::string const syncPrefix = "/Sync/ServerStates/"; +static std::string const supervisionPrefix = "/Supervision"; static std::string const healthPrefix = "/Supervision/Health/"; static std::string const planDBServersPrefix = "/Plan/DBServers"; static std::string const planCoordinatorsPrefix = "/Plan/Coordinators"; @@ -485,6 +486,9 @@ void Supervision::run() { // First wait until somebody has initialized the ArangoDB data, before // that running the supervision does not make sense and will indeed // lead to horrible errors: + + std::string const supervisionNode = _agencyPrefix + supervisionPrefix; + while (!this->isStopping()) { { CONDITION_LOCKER(guard, _cv); @@ -492,9 +496,9 @@ void Supervision::run() { } MUTEX_LOCKER(locker, _lock); - if (_agent->readDB().has(_agencyPrefix)) { + if (_agent->readDB().has(supervisionNode)) { try { - _snapshot = _agent->readDB().get(_agencyPrefix); + _snapshot = _agent->readDB().get(supervisionNode); if (_snapshot.children().size() > 0) { break; } diff --git a/arangod/Scheduler/SocketTask.cpp b/arangod/Scheduler/SocketTask.cpp index f374b81c74..80a2c1234b 100644 --- a/arangod/Scheduler/SocketTask.cpp +++ b/arangod/Scheduler/SocketTask.cpp @@ -126,6 +126,13 @@ void SocketTask::addWriteBuffer(WriteBuffer& buffer) { return; } + if (application_features::ApplicationServer::isStopping()) { + LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "aborting because shutdown is in progress"; + closeStream(); + buffer.release(); + return; + } + { auto self = shared_from_this(); @@ -162,6 +169,10 @@ void SocketTask::writeWriteBuffer() { err.clear(); while (true) { + if (application_features::ApplicationServer::isStopping()) { + break; + } + RequestStatistics::SET_WRITE_START(_writeBuffer._statistics); written = _peer->write(_writeBuffer._buffer, err); @@ -186,6 +197,12 @@ void SocketTask::writeWriteBuffer() { written = 0; } + if (application_features::ApplicationServer::isStopping()) { + LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "aborting because shutdown is in progress"; + closeStreamNoLock(); + return; + } + // write could have blocked which is the only acceptable error if (err && err != ::boost::asio::error::would_block) { LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "write on stream failed with: " @@ -210,6 +227,10 @@ void SocketTask::writeWriteBuffer() { LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "write on stream failed with: " << ec.message(); closeStreamNoLock(); + } else if (application_features::ApplicationServer::isStopping()) { + LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "aborting because shutdown is in progress"; + closeStreamNoLock(); + return; } else { if (completedWriteBuffer()) { _loop._scheduler->post([self, this]() { @@ -240,7 +261,7 @@ bool SocketTask::completedWriteBuffer() { return true; } -// caller must hold the _writeLock +// caller must not hold the _writeLock void SocketTask::closeStream() { MUTEX_LOCKER(locker, _writeLock); closeStreamNoLock(); @@ -408,6 +429,12 @@ void SocketTask::asyncReadSome() { size_t n = 0; while (++n <= MAX_DIRECT_TRIES) { + if (application_features::ApplicationServer::isStopping()) { + LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "aborting because shutdown is in progress"; + closeStream(); + return; + } + if (!reserveMemory()) { LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "failed to reserve memory"; return; @@ -466,10 +493,14 @@ void SocketTask::asyncReadSome() { LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "read on stream failed with: " << ec.message(); closeStream(); + } else if (application_features::ApplicationServer::isStopping()) { + LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "aborting because shutdown is in progress"; + closeStream(); + return; } else { _readBuffer.increaseLength(transferred); - if (processAll()) { + if (processAll() && ! application_features::ApplicationServer::isStopping()) { _loop._scheduler->post([self, this]() { asyncReadSome(); }); } diff --git a/lib/ApplicationFeatures/ApplicationServer.cpp b/lib/ApplicationFeatures/ApplicationServer.cpp index b639902b01..f6d4dadae6 100644 --- a/lib/ApplicationFeatures/ApplicationServer.cpp +++ b/lib/ApplicationFeatures/ApplicationServer.cpp @@ -239,6 +239,8 @@ void ApplicationServer::run(int argc, char* argv[]) { void ApplicationServer::beginShutdown() { LOG_TOPIC(TRACE, Logger::STARTUP) << "ApplicationServer::beginShutdown"; + _stopping = true; + // fowards the begin shutdown signal to all features for (auto it = _orderedFeatures.rbegin(); it != _orderedFeatures.rend(); ++it) { @@ -254,7 +256,6 @@ void ApplicationServer::beginShutdown() { } } - _stopping = true; // TODO: use condition variable for signaling shutdown // to run method } diff --git a/lib/ApplicationFeatures/ApplicationServer.h b/lib/ApplicationFeatures/ApplicationServer.h index 68f60b4310..343831689c 100644 --- a/lib/ApplicationFeatures/ApplicationServer.h +++ b/lib/ApplicationFeatures/ApplicationServer.h @@ -127,9 +127,11 @@ class ApplicationServer { }; static ApplicationServer* server; + static bool isStopping() { return server != nullptr && server->_stopping.load(); } + static bool isPrepared() { return server != nullptr && (server->_state == ServerState::IN_START || server->_state == ServerState::IN_WAIT || diff --git a/scripts/startLocalCluster.ps1 b/scripts/startLocalCluster.ps1 index 33b2d3598e..dc1c4f3ae6 100644 --- a/scripts/startLocalCluster.ps1 +++ b/scripts/startLocalCluster.ps1 @@ -28,6 +28,7 @@ New-Item -Path cluster -Force -ItemType Directory | Out-Null $agentArguments = @( "--agency.activate=true", + "--agency.supervision=true", "--agency.size=$AgentCount", "--javascript.v8-contexts=1", "--server.statistics=false",