diff --git a/arangod/Agency/Supervision.cpp b/arangod/Agency/Supervision.cpp
index 4d0c40b591..7543beb786 100644
--- a/arangod/Agency/Supervision.cpp
+++ b/arangod/Agency/Supervision.cpp
@@ -120,19 +120,19 @@ struct HealthRecord {
     obj.add("Host", VPackValue(hostId));
     obj.add("SyncStatus", VPackValue(syncStatus));
     obj.add("Status", VPackValue(status));
-    if (syncTime.empty()) {
+    if (syncTime.empty()) {
       obj.add("Timestamp",
-              VPackValue(timepointToString(std::chrono::system_clock::now())));
+              VPackValue(timepointToString(std::chrono::system_clock::now())));
     } else {
       obj.add("SyncTime", VPackValue(syncTime));
       obj.add("LastAcked", VPackValue(lastAcked));
     }
   }
-
+
   bool statusDiff(HealthRecord const& other) {
     return (status != other.status || syncStatus != other.syncStatus);
   }
-
+
   friend std::ostream& operator<<(std::ostream& o, HealthRecord const& hr) {
     VPackBuilder builder;
     { VPackObjectBuilder b(&builder);
@@ -140,7 +140,7 @@ struct HealthRecord {
     o << builder.toJson();
     return o;
   }
-
+
 };
@@ -193,7 +193,7 @@ void Supervision::upgradeOne(Builder& builder) {
           builder.add("oldEmpty", VPackValue(true));
         }
       }
-    }
+    }
   }
 }
@@ -221,7 +221,7 @@ void Supervision::upgradeHealthRecords(Builder& builder) {
   // "/arango/Supervision/health" is in old format
   Builder b;
   size_t n = 0;
-
+
   if (_snapshot.has(healthPrefix)) {
     HealthRecord hr;
     { VPackObjectBuilder oo(&b);
@@ -234,14 +234,14 @@ void Supervision::upgradeHealthRecords(Builder& builder) {
           b.add(VPackValue(recPair.first));
           { VPackObjectBuilder ooo(&b);
             hr.toVelocyPack(b);
-
+
           }
         }
       }
     }
   }
 }
-
+
 if (n>0) {
   { VPackArrayBuilder trx(&builder);
     { VPackObjectBuilder o(&builder);
@@ -249,7 +249,7 @@ void Supervision::upgradeHealthRecords(Builder& builder) {
     }
   }
 }
-
+
}

 // Upgrade agency, guarded by wakeUp
@@ -280,10 +280,10 @@ void handleOnStatusDBServer(
   Agent* agent, Node const& snapshot, HealthRecord& persisted,
   HealthRecord& transisted, std::string const& serverID,
   uint64_t const& jobId, std::shared_ptr<VPackBuilder>& envelope) {
-
+
   std::string failedServerPath = failedServersPrefix + "/" + serverID;
-
-  // New condition GOOD:
+
+  // New condition GOOD:
   if (transisted.status == Supervision::HEALTH_STATUS_GOOD) {
     if (snapshot.has(failedServerPath)) {
       Builder del;
@@ -307,14 +307,14 @@ void handleOnStatusDBServer(
         "supervision", serverID).create(envelope);
     }
   }
-
+
 }

 void handleOnStatusCoordinator(
   Agent* agent, Node const& snapshot, HealthRecord& persisted,
   HealthRecord& transisted, std::string const& serverID) {
-
+
   if (transisted.status == Supervision::HEALTH_STATUS_FAILED) {
     // if the current foxxmaster server failed => reset the value to ""
     if (snapshot.has(foxxmaster) && snapshot(foxxmaster).getString() == serverID) {
@@ -324,7 +324,7 @@ void handleOnStatusCoordinator(
         create.add(foxxmaster, VPackValue("")); }}
       singleWriteTransaction(agent, create);
     }
-
+
   }
 }
@@ -333,7 +333,7 @@ void handleOnStatusSingle(
   HealthRecord& transisted, std::string const& serverID) {
   // if the current leader server failed => reset the value to ""
   if (transisted.status == Supervision::HEALTH_STATUS_FAILED) {
-
+
     if (snapshot.has(asyncReplLeader) && snapshot(asyncReplLeader).getString() == serverID) {
       VPackBuilder create;
       { VPackArrayBuilder tx(&create);
@@ -341,10 +341,10 @@ void handleOnStatusSingle(
         create.add(asyncReplLeader, VPackValue("")); }}
       singleWriteTransaction(agent, create);
     }
-
+
   }
 }
-
+
 void handleOnStatus(
   Agent* agent, Node const& snapshot, HealthRecord& persisted,
   HealthRecord& transisted, std::string const& serverID,
@@ -362,7 +362,7 @@ void handleOnStatus(
     LOG_TOPIC(ERR, Logger::SUPERVISION) <<
      "Unknown server type. No supervision action taken. " << serverID;
   }
-
+
 }

 // Check all DB servers, guarded above doChecks
@@ -397,7 +397,7 @@ std::vector<check_t> Supervision::check(std::string const& type) {
     for (auto const& srv : todelete) {
       { VPackObjectBuilder server(del.get());
         del->add(VPackValue(_agencyPrefix + healthPrefix + srv));
-        { VPackObjectBuilder oper(del.get());
+        { VPackObjectBuilder oper(del.get());
           del->add("op", VPackValue("delete")); }}}}}
     _agent->write(del);
   }
@@ -405,122 +405,131 @@ std::vector<check_t> Supervision::check(std::string const& type) {
   // Do actual monitoring
   for (auto const& machine : machinesPlanned) {
     std::string lastHeartbeatStatus, lastHeartbeatAcked, lastHeartbeatTime,
-        lastStatus, serverID(machine.first),
-        shortName(_snapshot(targetShortID + serverID + "/ShortName").getString());
-
-    // Endpoint
-    std::string endpoint;
-    std::string epPath = serverID + "/endpoint";
-    if (serversRegistered.has(epPath)) {
-      endpoint = serversRegistered(epPath).getString();
-    }
-    std::string hostId;
-    std::string hoPath = serverID + "/host";
-    if (serversRegistered.has(hoPath)) {
-      hostId = serversRegistered(hoPath).getString();
-    }
+        lastStatus, serverID(machine.first), shortName;

-    // Health records from persistence, from transience and a new one
-    HealthRecord transist(shortName, endpoint, hostId);
-    HealthRecord persist(shortName, endpoint, hostId);
-
-    // Get last health entries from transient and persistent key value stores
-    if (_transient.has(healthPrefix + serverID)) {
-      transist = _transient(healthPrefix + serverID);
-    }
-    if (_snapshot.has(healthPrefix + serverID)) {
-      persist = _snapshot(healthPrefix + serverID);
-    }
-
-    // New health record (start with old add current information from sync)
-    // Sync.time is copied to Health.syncTime
-    // Sync.status is copied to Health.syncStatus
-    std::string syncTime = _transient.has(syncPrefix + serverID) ?
-      _transient(syncPrefix + serverID + "/time").getString() :
-      timepointToString(std::chrono::system_clock::time_point());
-    std::string syncStatus = _transient.has(syncPrefix + serverID) ?
-      _transient(syncPrefix + serverID + "/status").getString() : "UNKNOWN";
+    // short name arrives asynchronous to machine registering, make sure
+    // it has arrived before trying to use it
+    if (LEAF == _snapshot(targetShortID + serverID + "/ShortName").type()) {

-    // Last change registered in sync (transient != sync)
-    // Either now or value in transient
-    auto lastAckedTime = (syncTime != transist.syncTime) ?
-      std::chrono::system_clock::now() : stringToTimepoint(transist.lastAcked);
-    transist.lastAcked = timepointToString(lastAckedTime);
-    transist.syncTime = syncTime;
-    transist.syncStatus = syncStatus;
-
-    // Calculate elapsed since lastAcked
-    auto elapsed = std::chrono::duration<double>(
-      std::chrono::system_clock::now() - lastAckedTime);
-
-    if (elapsed.count() <= _okThreshold) {
-      transist.status = Supervision::HEALTH_STATUS_GOOD;
-    } else if (elapsed.count() <= _gracePeriod) {
-      transist.status = Supervision::HEALTH_STATUS_BAD;
-    } else {
-      transist.status = Supervision::HEALTH_STATUS_FAILED;
-    }
+      shortName = _snapshot(targetShortID + serverID + "/ShortName").getString();

-    // Status changed?
-    bool changed = transist.statusDiff(persist);
+      // Endpoint
+      std::string endpoint;
+      std::string epPath = serverID + "/endpoint";
+      if (serversRegistered.has(epPath)) {
+        endpoint = serversRegistered(epPath).getString();
+      }
+      std::string hostId;
+      std::string hoPath = serverID + "/host";
+      if (serversRegistered.has(hoPath)) {
+        hostId = serversRegistered(hoPath).getString();
+      }

-    // Take necessary actions if any
-    std::shared_ptr<VPackBuilder> envelope;
-    if (changed) {
-      handleOnStatus(_agent, _snapshot, persist, transist, serverID, _jobId,
-                     envelope);
-    }
-
-    persist = transist; // Now copy Status, SyncStatus from transient to persisted
-
-    // Transient report
-    std::shared_ptr<Builder> tReport = std::make_shared<Builder>();
-    { VPackArrayBuilder transaction(tReport.get());        // Transist Transaction
+      // Health records from persistence, from transience and a new one
+      HealthRecord transist(shortName, endpoint, hostId);
+      HealthRecord persist(shortName, endpoint, hostId);
+
+      // Get last health entries from transient and persistent key value stores
+      if (_transient.has(healthPrefix + serverID)) {
+        transist = _transient(healthPrefix + serverID);
+      }
+      if (_snapshot.has(healthPrefix + serverID)) {
+        persist = _snapshot(healthPrefix + serverID);
+      }
+
+      // New health record (start with old add current information from sync)
+      // Sync.time is copied to Health.syncTime
+      // Sync.status is copied to Health.syncStatus
+      std::string syncTime = _transient.has(syncPrefix + serverID) ?
+        _transient(syncPrefix + serverID + "/time").getString() :
+        timepointToString(std::chrono::system_clock::time_point());
+      std::string syncStatus = _transient.has(syncPrefix + serverID) ?
+        _transient(syncPrefix + serverID + "/status").getString() : "UNKNOWN";
+
+      // Last change registered in sync (transient != sync)
+      // Either now or value in transient
+      auto lastAckedTime = (syncTime != transist.syncTime) ?
+        std::chrono::system_clock::now() : stringToTimepoint(transist.lastAcked);
+      transist.lastAcked = timepointToString(lastAckedTime);
+      transist.syncTime = syncTime;
+      transist.syncStatus = syncStatus;
+
+      // Calculate elapsed since lastAcked
+      auto elapsed = std::chrono::duration<double>(
+        std::chrono::system_clock::now() - lastAckedTime);
+
+      if (elapsed.count() <= _okThreshold) {
+        transist.status = Supervision::HEALTH_STATUS_GOOD;
+      } else if (elapsed.count() <= _gracePeriod) {
+        transist.status = Supervision::HEALTH_STATUS_BAD;
+      } else {
+        transist.status = Supervision::HEALTH_STATUS_FAILED;
+      }
+
+      // Status changed?
+      bool changed = transist.statusDiff(persist);
+
+      // Take necessary actions if any
       std::shared_ptr<VPackBuilder> envelope;
-      { VPackObjectBuilder operation(tReport.get());       // Operation
-        tReport->add(VPackValue(healthPrefix + serverID)); // Supervision/Health
-        { VPackObjectBuilder oo(tReport.get());
-          transist.toVelocyPack(*tReport); }}}             // Transaction
-
-    // Persistent report
-    std::shared_ptr<Builder> pReport = nullptr;
-    if (changed) {
-      pReport = std::make_shared<Builder>();
-      { VPackArrayBuilder transaction(pReport.get());        // Persist Transaction
-        { VPackObjectBuilder operation(pReport.get());       // Operation
-          pReport->add(VPackValue(healthPrefix + serverID)); // Supervision/Health
-          { VPackObjectBuilder oo(pReport.get());
-            persist.toVelocyPack(*pReport); }
-          if (envelope != nullptr) { // Failed server
-            TRI_ASSERT(
-              envelope->slice().isArray() && envelope->slice()[0].isObject());
-            for (VPackObjectIterator::ObjectPair i : VPackObjectIterator(envelope->slice()[0])) {
-              pReport->add(i.key.copyString(), i.value);
-            }
-          }}                                                 // Operation
-        if (envelope != nullptr) { // Preconditions(Job)
-          TRI_ASSERT(
-            envelope->slice().isArray() && envelope->slice()[1].isObject());
-          pReport->add(envelope->slice()[1]);
-        }}                                                   // Transaction
-    }
-
-    if (!this->isStopping()) {
-
-      // Replicate special event and only then transient store
       if (changed) {
-        write_ret_t res = singleWriteTransaction(_agent, *pReport);
-        if (res.accepted && res.indices.front() != 0) {
-          ++_jobId; // Job was booked
+        handleOnStatus(_agent, _snapshot, persist, transist, serverID, _jobId,
+                       envelope);
+      }
+
+      persist = transist; // Now copy Status, SyncStatus from transient to persisted
+
+      // Transient report
+      std::shared_ptr<Builder> tReport = std::make_shared<Builder>();
+      { VPackArrayBuilder transaction(tReport.get());        // Transist Transaction
+        std::shared_ptr<VPackBuilder> envelope;
+        { VPackObjectBuilder operation(tReport.get());       // Operation
+          tReport->add(VPackValue(healthPrefix + serverID)); // Supervision/Health
+          { VPackObjectBuilder oo(tReport.get());
+            transist.toVelocyPack(*tReport); }}}             // Transaction
+
+      // Persistent report
+      std::shared_ptr<Builder> pReport = nullptr;
+      if (changed) {
+        pReport = std::make_shared<Builder>();
+        { VPackArrayBuilder transaction(pReport.get());        // Persist Transaction
+          { VPackObjectBuilder operation(pReport.get());       // Operation
+            pReport->add(VPackValue(healthPrefix + serverID)); // Supervision/Health
+            { VPackObjectBuilder oo(pReport.get());
+              persist.toVelocyPack(*pReport); }
+            if (envelope != nullptr) { // Failed server
+              TRI_ASSERT(
+                envelope->slice().isArray() && envelope->slice()[0].isObject());
+              for (VPackObjectIterator::ObjectPair i : VPackObjectIterator(envelope->slice()[0])) {
+                pReport->add(i.key.copyString(), i.value);
+              }
+            }}                                                 // Operation
+          if (envelope != nullptr) { // Preconditions(Job)
+            TRI_ASSERT(
+              envelope->slice().isArray() && envelope->slice()[1].isObject());
+            pReport->add(envelope->slice()[1]);
+          }}                                                   // Transaction
+      }
+
+      if (!this->isStopping()) {
+
+        // Replicate special event and only then transient store
+        if (changed) {
+          write_ret_t res = singleWriteTransaction(_agent, *pReport);
+          if (res.accepted && res.indices.front() != 0) {
+            ++_jobId; // Job was booked
+            transient(_agent, *tReport);
+          }
+        } else { // Nothing special just transient store
           transient(_agent, *tReport);
         }
-      } else { // Nothing special just transient store
-        transient(_agent, *tReport);
       }
-    }
-
-  }
-
+    } else {
+      LOG_TOPIC(INFO, Logger::SUPERVISION) <<
+        "Short name for " << serverID << " not yet available. Skipping health check.";
+    } // else
+
+  } // for
+
   return ret;
 }
@@ -528,22 +537,22 @@ std::vector<check_t> Supervision::check(std::string const& type) {

 // Update local agency snapshot, guarded by callers
 bool Supervision::updateSnapshot() {
   _lock.assertLockedByCurrentThread();
-
+
   if (_agent == nullptr || this->isStopping()) {
     return false;
   }
-
+
   _agent->executeLocked([&]() {
     if (_agent->readDB().has(_agencyPrefix)) {
       _snapshot = _agent->readDB().get(_agencyPrefix);
     }
     if (_agent->transient().has(_agencyPrefix)) {
       _transient = _agent->transient().get(_agencyPrefix);
-    }
+    }
   });
-
+
   return true;
-
+
 }

 // All checks, guarded by main thread
@@ -570,8 +579,8 @@ void Supervision::run() {
       CONDITION_LOCKER(guard, _cv);
       _cv.wait(static_cast<uint64_t>(1000000 * _frequency));
     }
-
-    bool done = false;
+
+    bool done = false;
     MUTEX_LOCKER(locker, _lock);
     _agent->executeLocked([&]() {
       if (_agent->readDB().has(supervisionNode)) {
@@ -586,7 +595,7 @@ void Supervision::run() {
        }
      }
    });
-
+
    if (done) {
      break;
    }
@@ -604,7 +613,7 @@ void Supervision::run() {

    {
      MUTEX_LOCKER(locker, _lock);
-
+
      // Get bunch of job IDs from agency for future jobs
      if (_agent->leading() && (_jobId == 0 || _jobId == _jobIdMax)) {
        getUniqueIds();  // cannot fail but only hang
@@ -621,9 +630,17 @@ void Supervision::run() {

      // Do nothing unless leader for over 10 seconds
      auto secondsSinceLeader = std::chrono::duration<double>(
        std::chrono::steady_clock::now() - _agent->leaderSince()).count();
-
+      // 10 seconds should be plenty of time for all servers to send
+      // heartbeat status to new leader (heartbeat is once per second)
      if (secondsSinceLeader > 10.0) {
-        doChecks();
+        try {
+          doChecks();
+        } catch (std::exception const& e) {
+          LOG_TOPIC(ERR, Logger::SUPERVISION) << e.what() << " " << __FILE__ << " " << __LINE__;
+        } catch (...) {
+          LOG_TOPIC(ERR, Logger::SUPERVISION) <<
+            "Supervision::doChecks() generated an uncaught exception.";
+        }
      }
    }
@@ -710,11 +727,11 @@ void Supervision::handleShutdown() {
  }
 }

-// Guarded by caller
+// Guarded by caller
 bool Supervision::handleJobs() {
  _lock.assertLockedByCurrentThread();
  // Do supervision
-
+
  shrinkCluster();
  enforceReplication();
  workJobs();
@@ -735,7 +752,7 @@ void Supervision::workJobs() {
    JobContext(
      PENDING, (*pendEnt.second)("jobId").getString(), _snapshot, _agent).run();
  }
-
+
 }
@@ -747,7 +764,7 @@ void Supervision::enforceReplication() {
    auto const& db = *(db_.second);
    for (const auto& col_ : db.children()) { // Planned collections
      auto const& col = *(col_.second);
-
+
      size_t replicationFactor;
      if (col.has("replicationFactor") && col("replicationFactor").isUInt()) {
        replicationFactor = col("replicationFactor").getUInt();
@@ -762,13 +779,13 @@ void Supervision::enforceReplication() {
        auto available = Job::availableServers(_snapshot);
        replicationFactor = available.size();
      }
-
+
      bool clone = col.has("distributeShardsLike");

      if (!clone) {
        for (auto const& shard_ : col("shards").children()) { // Pl shards
          auto const& shard = *(shard_.second);
-
+
          size_t actualReplicationFactor = shard.slice().length();
          if (actualReplicationFactor != replicationFactor) {
            // Check that there is not yet an addFollower or removeFollower
@@ -810,7 +827,7 @@ void Supervision::enforceReplication() {
        }
      }
    }
-
+
 }

 void Supervision::fixPrototypeChain(Builder& migrate) {
@@ -827,7 +844,7 @@ void Supervision::fixPrototypeChain(Builder& migrate) {
      }
      return (s.empty()) ? col : resolve(db, s);
    };
-
+
  for (auto const& database : _snapshot(planColPrefix).children()) {
    for (auto const& collection : database.second->children()) {
      if (collection.second->has("distributeShardsLike")) {
@@ -865,13 +882,13 @@ void Supervision::shrinkCluster() {
  if (!todo.empty() || !pending.empty()) { // This is low priority
    return;
  }
-
+
  // Get servers from plan
  auto availServers = Job::availableServers(_snapshot);

  size_t targetNumDBServers;
  std::string const NDBServers ("/Target/NumberOfDBServers");
-
+
  if (_snapshot.has(NDBServers) && _snapshot(NDBServers).isUInt()) {
    targetNumDBServers = _snapshot(NDBServers).getUInt();
  } else {
@@ -932,7 +949,7 @@ void Supervision::shrinkCluster() {
      availServers.size() > targetNumDBServers) {
    // Sort servers by name
    std::sort(availServers.begin(), availServers.end());
-
+
    // Schedule last server for cleanout
    CleanOutServer(_snapshot, _agent, std::to_string(_jobId++),
                   "supervision", availServers.back()).run();
@@ -978,7 +995,7 @@ void Supervision::getUniqueIds() {
    { VPackArrayBuilder a(builder.get());
      builder->add(VPackValue(path)); }
  } // [[{path:{"op":"increment","step":n}}],[path]]
-
+
  auto ret = _agent->transact(builder);
  if (ret.accepted) {
    try {
@@ -989,10 +1006,10 @@ void Supervision::getUniqueIds() {
    } catch (std::exception const& e) {
      LOG_TOPIC(ERR, Logger::SUPERVISION)
        << "Failed to acquire job IDs from agency: "
-        << e.what() << __FILE__ << __LINE__;
+        << e.what() << " " << __FILE__ << " " << __LINE__;
    }
  }
-
+
 }
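
For readers following the health-state logic touched by this patch: a server is classified by the time elapsed since its last acknowledged heartbeat. Below is a minimal standalone sketch of that mapping (illustrative only, not part of the patch; `classifyHealth`, `okThreshold` and `gracePeriod` are stand-ins for the Supervision members `_okThreshold` and `_gracePeriod`, and the plain strings stand in for the `Supervision::HEALTH_STATUS_*` constants):

// Illustrative sketch -- mirrors the elapsed-time classification in
// Supervision::check(): acked within okThreshold seconds => GOOD,
// within gracePeriod seconds => BAD, anything older => FAILED.
#include <chrono>
#include <iostream>
#include <string>

std::string classifyHealth(std::chrono::system_clock::time_point lastAcked,
                           double okThreshold, double gracePeriod) {
  auto elapsed = std::chrono::duration<double>(
      std::chrono::system_clock::now() - lastAcked);
  if (elapsed.count() <= okThreshold) {
    return "GOOD";
  } else if (elapsed.count() <= gracePeriod) {
    return "BAD";
  }
  return "FAILED";
}

int main() {
  using namespace std::chrono;
  auto now = system_clock::now();
  // Hypothetical thresholds: 5s ok-threshold, 15s grace period.
  std::cout << classifyHealth(now - seconds(2), 5.0, 15.0) << "\n";   // GOOD
  std::cout << classifyHealth(now - seconds(8), 5.0, 15.0) << "\n";   // BAD
  std::cout << classifyHealth(now - seconds(30), 5.0, 15.0) << "\n";  // FAILED
}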
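The envelope handling in the persistent report above relies on the agency write-transaction shape of `[operations, preconditions]` (hence `envelope->slice()[0]` for operations and `envelope->slice()[1]` for preconditions). A minimal sketch of that layout, using the same VPackArrayBuilder/VPackObjectBuilder RAII scopes as the patch; the `Supervision/Health/SRV1` key and the precondition value are hypothetical placeholders:

// Sketch of the [operations, preconditions] transaction layout assumed by
// the envelope merging above; illustrative only.
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
#include <iostream>

int main() {
  VPackBuilder trx;
  { VPackArrayBuilder transaction(&trx);       // [ ...
    { VPackObjectBuilder operations(&trx);     //   {operations}
      trx.add(VPackValue("Supervision/Health/SRV1"));  // hypothetical key
      { VPackObjectBuilder record(&trx);
        trx.add("Status", VPackValue("FAILED")); }}
    { VPackObjectBuilder preconditions(&trx);  //   {preconditions}
      trx.add(VPackValue("Supervision/Health/SRV1/Status"));
      { VPackObjectBuilder prior(&trx);
        trx.add("old", VPackValue("BAD")); }}}  // ... ]
  // Prints:
  // [{"Supervision/Health/SRV1":{"Status":"FAILED"}},
  //  {"Supervision/Health/SRV1/Status":{"old":"BAD"}}]
  std::cout << trx.toJson() << std::endl;
}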