mirror of https://gitee.com/bigwinds/arangodb
Delete all shard move jobs when server is healthy again
This commit is contained in:
parent
a854cbf674
commit
63a173f002
|
@ -230,23 +230,55 @@ JOB_STATUS FailedServer::status() {
|
|||
}
|
||||
|
||||
if (status == PENDING) {
|
||||
auto const& serverHealth = _snapshot(healthPrefix + _server + "/Status").getString();
|
||||
|
||||
// mop: ohhh...server is healthy again!
|
||||
bool serverHealthy = serverHealth == Supervision::HEALTH_STATUS_GOOD;
|
||||
|
||||
std::shared_ptr<Builder> deleteTodos;
|
||||
|
||||
Node::Children const todos = _snapshot(toDoPrefix).children();
|
||||
Node::Children const pends = _snapshot(pendingPrefix).children();
|
||||
size_t found = 0;
|
||||
bool hasOpenChildTasks = false;
|
||||
|
||||
for (auto const& subJob : todos) {
|
||||
if (!subJob.first.compare(0, _jobId.size() + 1, _jobId + "-")) {
|
||||
found++;
|
||||
if (serverHealthy) {
|
||||
if (!deleteTodos) {
|
||||
deleteTodos.reset(new Builder());
|
||||
deleteTodos->openArray();
|
||||
deleteTodos->openObject();
|
||||
}
|
||||
deleteTodos->add(_agencyPrefix + toDoPrefix + subJob.first, VPackValue(VPackValueType::Object));
|
||||
deleteTodos->add("op", VPackValue("delete"));
|
||||
deleteTodos->close();
|
||||
} else {
|
||||
hasOpenChildTasks = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (auto const& subJob : pends) {
|
||||
if (!subJob.first.compare(0, _jobId.size() + 1, _jobId + "-")) {
|
||||
found++;
|
||||
hasOpenChildTasks = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!found) {
|
||||
if (deleteTodos) {
|
||||
LOG_TOPIC(INFO, Logger::AGENCY) << "Server " << _server << " is healthy again. Will try to delete any jobs which have not yet started!";
|
||||
deleteTodos->close();
|
||||
deleteTodos->close();
|
||||
// Transact to agency
|
||||
write_ret_t res = transact(_agent, *deleteTodos);
|
||||
|
||||
if (!res.accepted || res.indices.size() != 1 || !res.indices[0]) {
|
||||
LOG_TOPIC(WARN, Logger::AGENCY)
|
||||
<< "Server was healthy. Tried deleting subjobs but failed :(";
|
||||
return status;
|
||||
}
|
||||
}
|
||||
|
||||
if (!hasOpenChildTasks) {
|
||||
if (finish("DBServers/" + _server)) {
|
||||
return FINISHED;
|
||||
}
|
||||
|
|
|
@ -54,6 +54,7 @@ static std::string const blockedShardsPrefix = "/Supervision/Shards/";
|
|||
static std::string const serverStatePrefix = "/Sync/ServerStates/";
|
||||
static std::string const planVersion = "/Plan/Version";
|
||||
static std::string const plannedServers = "/Plan/DBServers";
|
||||
static std::string const healthPrefix = "/Supervision/Health/";
|
||||
|
||||
inline arangodb::consensus::write_ret_t transact(Agent* _agent,
|
||||
Builder const& transaction,
|
||||
|
|
|
@ -111,11 +111,12 @@ class Supervision : public arangodb::Thread {
|
|||
/// @brief Upgrade agency
|
||||
void upgradeAgency();
|
||||
|
||||
private:
|
||||
static constexpr const char* HEALTH_STATUS_GOOD = "GOOD";
|
||||
static constexpr const char* HEALTH_STATUS_BAD = "BAD";
|
||||
static constexpr const char* HEALTH_STATUS_FAILED = "FAILED";
|
||||
|
||||
private:
|
||||
|
||||
/// @brief Update agency prefix from agency itself
|
||||
bool updateAgencyPrefix(size_t nTries = 10, int intervalSec = 1);
|
||||
|
||||
|
|
Loading…
Reference in New Issue