mirror of https://gitee.com/bigwinds/arangodb
Fixing agency prefix in Agency/Job.cpp (#5039)
* Fixing some test issues and fixing the agency prefix in Agency/Job.cpp * Making logic consistent in failed-leader / failed-follower jobs * Reverting condition back to == GOOD
This commit is contained in:
parent
42d0f7b435
commit
68442dae5a
|
@ -1,7 +1,7 @@
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// DISCLAIMER
|
/// DISCLAIMER
|
||||||
///
|
///
|
||||||
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
|
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
|
||||||
///
|
///
|
||||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
/// you may not use this file except in compliance with the License.
|
/// you may not use this file except in compliance with the License.
|
||||||
|
@ -82,9 +82,7 @@ bool AddFollower::create(std::shared_ptr<VPackBuilder> envelope) {
|
||||||
_jb->openObject();
|
_jb->openObject();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string path = toDoPrefix + _jobId;
|
_jb->add(VPackValue(toDoPrefix + _jobId));
|
||||||
|
|
||||||
_jb->add(VPackValue(path));
|
|
||||||
{ VPackObjectBuilder guard(_jb.get());
|
{ VPackObjectBuilder guard(_jb.get());
|
||||||
_jb->add("creator", VPackValue(_creator));
|
_jb->add("creator", VPackValue(_creator));
|
||||||
_jb->add("type", VPackValue("addFollower"));
|
_jb->add("type", VPackValue("addFollower"));
|
||||||
|
@ -166,7 +164,7 @@ bool AddFollower::start() {
|
||||||
// Remove those that are not in state "GOOD":
|
// Remove those that are not in state "GOOD":
|
||||||
auto it = available.begin();
|
auto it = available.begin();
|
||||||
while (it != available.end()) {
|
while (it != available.end()) {
|
||||||
if (checkServerGood(_snapshot, *it) != "GOOD") {
|
if (checkServerHealth(_snapshot, *it) != "GOOD") {
|
||||||
it = available.erase(it);
|
it = available.erase(it);
|
||||||
} else {
|
} else {
|
||||||
++it;
|
++it;
|
||||||
|
@ -258,7 +256,7 @@ bool AddFollower::start() {
|
||||||
addPreconditionUnchanged(trx, planPath, planned);
|
addPreconditionUnchanged(trx, planPath, planned);
|
||||||
addPreconditionShardNotBlocked(trx, _shard);
|
addPreconditionShardNotBlocked(trx, _shard);
|
||||||
for (auto const& srv : chosen) {
|
for (auto const& srv : chosen) {
|
||||||
addPreconditionServerGood(trx, srv);
|
addPreconditionServerHealth(trx, srv, "GOOD");
|
||||||
}
|
}
|
||||||
} // precondition done
|
} // precondition done
|
||||||
} // array for transaction done
|
} // array for transaction done
|
||||||
|
@ -269,7 +267,7 @@ bool AddFollower::start() {
|
||||||
if (res.accepted && res.indices.size() == 1 && res.indices[0]) {
|
if (res.accepted && res.indices.size() == 1 && res.indices[0]) {
|
||||||
_status = FINISHED;
|
_status = FINISHED;
|
||||||
LOG_TOPIC(INFO, Logger::SUPERVISION)
|
LOG_TOPIC(INFO, Logger::SUPERVISION)
|
||||||
<< "Pending: Addfollower(s) to shard " << _shard << " in collection "
|
<< "Finished: Addfollower(s) to shard " << _shard << " in collection "
|
||||||
<< _collection;
|
<< _collection;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -307,4 +305,3 @@ arangodb::Result AddFollower::abort() {
|
||||||
return result;
|
return result;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -247,8 +247,7 @@ void AgencyFeature::start() {
|
||||||
if (!feature->agencyPrefix().empty()) {
|
if (!feature->agencyPrefix().empty()) {
|
||||||
arangodb::consensus::Supervision::setAgencyPrefix(
|
arangodb::consensus::Supervision::setAgencyPrefix(
|
||||||
std::string("/") + feature->agencyPrefix());
|
std::string("/") + feature->agencyPrefix());
|
||||||
arangodb::consensus::Job::agencyPrefix
|
arangodb::consensus::Job::agencyPrefix = feature->agencyPrefix();;
|
||||||
= std::string("/") + feature->agencyPrefix();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Port this to new options handling
|
// TODO: Port this to new options handling
|
||||||
|
|
|
@ -42,8 +42,8 @@ struct TRI_vocbase_t;
|
||||||
namespace arangodb {
|
namespace arangodb {
|
||||||
namespace consensus {
|
namespace consensus {
|
||||||
|
|
||||||
class Agent : public arangodb::Thread,
|
class Agent final : public arangodb::Thread,
|
||||||
public AgentInterface {
|
public AgentInterface {
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/// @brief Construct with program options
|
/// @brief Construct with program options
|
||||||
|
|
|
@ -206,7 +206,7 @@ bool CleanOutServer::start() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that the server is in state "GOOD":
|
// Check that the server is in state "GOOD":
|
||||||
std::string health = checkServerGood(_snapshot, _server);
|
std::string health = checkServerHealth(_snapshot, _server);
|
||||||
if (health != "GOOD") {
|
if (health != "GOOD") {
|
||||||
LOG_TOPIC(DEBUG, Logger::SUPERVISION) << "server " << _server
|
LOG_TOPIC(DEBUG, Logger::SUPERVISION) << "server " << _server
|
||||||
<< " is currently " << health << ", not starting CleanOutServer job "
|
<< " is currently " << health << ", not starting CleanOutServer job "
|
||||||
|
@ -317,7 +317,7 @@ bool CleanOutServer::start() {
|
||||||
// Preconditions
|
// Preconditions
|
||||||
{ VPackObjectBuilder objectForPrecondition(pending.get());
|
{ VPackObjectBuilder objectForPrecondition(pending.get());
|
||||||
addPreconditionServerNotBlocked(*pending, _server);
|
addPreconditionServerNotBlocked(*pending, _server);
|
||||||
addPreconditionServerGood(*pending, _server);
|
addPreconditionServerHealth(*pending, _server, "GOOD");
|
||||||
addPreconditionUnchanged(*pending, failedServersPrefix, failedServers);
|
addPreconditionUnchanged(*pending, failedServersPrefix, failedServers);
|
||||||
addPreconditionUnchanged(*pending, cleanedPrefix, cleanedServers);
|
addPreconditionUnchanged(*pending, cleanedPrefix, cleanedServers);
|
||||||
}
|
}
|
||||||
|
|
|
@ -211,7 +211,7 @@ bool FailedFollower::start() {
|
||||||
// shard not blocked
|
// shard not blocked
|
||||||
addPreconditionShardNotBlocked(job, _shard);
|
addPreconditionShardNotBlocked(job, _shard);
|
||||||
// toServer in good condition
|
// toServer in good condition
|
||||||
addPreconditionServerGood(job, _to);
|
addPreconditionServerHealth(job, _to, "GOOD");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -245,8 +245,8 @@ bool FailedFollower::start() {
|
||||||
|
|
||||||
auto slice = result.get(
|
auto slice = result.get(
|
||||||
std::vector<std::string>(
|
std::vector<std::string>(
|
||||||
{agencyPrefix, "Supervision", "Health", _from, "Status"}));
|
{ agencyPrefix, "Supervision", "Health", _from, "Status"}));
|
||||||
if (!slice.isString() || slice.copyString() != "FAILED") {
|
if (slice.isString() && slice.copyString() != "FAILED") {
|
||||||
finish("", _shard, false, "Server " + _from + " no longer failing.");
|
finish("", _shard, false, "Server " + _from + " no longer failing.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -264,11 +264,9 @@ bool FailedLeader::start() {
|
||||||
// Preconditions -------------------------------------------------------
|
// Preconditions -------------------------------------------------------
|
||||||
{ VPackObjectBuilder preconditions(&pending);
|
{ VPackObjectBuilder preconditions(&pending);
|
||||||
// Failed condition persists
|
// Failed condition persists
|
||||||
pending.add(VPackValue(healthPrefix + _from + "/Status"));
|
addPreconditionServerHealth(pending, _from, "FAILED");
|
||||||
{ VPackObjectBuilder stillExists(&pending);
|
|
||||||
pending.add("old", VPackValue("FAILED")); }
|
|
||||||
// Destination server still in good condition
|
// Destination server still in good condition
|
||||||
addPreconditionServerGood(pending, _to);
|
addPreconditionServerHealth(pending, _to, "GOOD");
|
||||||
// Server list in plan still as before
|
// Server list in plan still as before
|
||||||
addPreconditionUnchanged(pending, planPath, planned);
|
addPreconditionUnchanged(pending, planPath, planned);
|
||||||
// Destination server should not be blocked by another job
|
// Destination server should not be blocked by another job
|
||||||
|
|
|
@ -116,23 +116,16 @@ bool FailedServer::start() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Delete todo
|
// Delete todo
|
||||||
pending.add(VPackValue(toDoPrefix + _jobId));
|
addRemoveJobFromSomewhere(pending, "ToDo", _jobId);
|
||||||
{ VPackObjectBuilder del(&pending);
|
|
||||||
pending.add("op", VPackValue("delete")); }
|
|
||||||
|
|
||||||
addBlockServer(pending, _server, _jobId);
|
addBlockServer(pending, _server, _jobId);
|
||||||
} // <------------ Operations
|
} // <------------ Operations
|
||||||
|
|
||||||
// Preconditions ----------->
|
// Preconditions ----------->
|
||||||
{ VPackObjectBuilder prec(&pending);
|
{ VPackObjectBuilder prec(&pending);
|
||||||
// Check that toServer not blocked
|
// Check that toServer not blocked
|
||||||
pending.add(VPackValue(blockedServersPrefix + _server));
|
addPreconditionServerNotBlocked(pending, _server);
|
||||||
{ VPackObjectBuilder block(&pending);
|
|
||||||
pending.add("oldEmpty", VPackValue(true)); }
|
|
||||||
// Status should still be FAILED
|
// Status should still be FAILED
|
||||||
pending.add(VPackValue(healthPrefix + _server + "/Status"));
|
addPreconditionServerHealth(pending, _server, "FAILED");
|
||||||
{ VPackObjectBuilder old(&pending);
|
|
||||||
pending.add("old", VPackValue("FAILED")); }
|
|
||||||
} // <--------- Preconditions
|
} // <--------- Preconditions
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,9 +230,7 @@ bool FailedServer::create(std::shared_ptr<VPackBuilder> envelope) {
|
||||||
//Preconditions
|
//Preconditions
|
||||||
{ VPackObjectBuilder health(_jb.get());
|
{ VPackObjectBuilder health(_jb.get());
|
||||||
// Status should still be BAD
|
// Status should still be BAD
|
||||||
_jb->add(VPackValue(healthPrefix + _server + "/Status"));
|
addPreconditionServerHealth(*_jb, _server, "BAD");
|
||||||
{ VPackObjectBuilder old(_jb.get());
|
|
||||||
_jb->add("old", VPackValue("BAD")); }
|
|
||||||
// Target/FailedServers does not already include _server
|
// Target/FailedServers does not already include _server
|
||||||
_jb->add(VPackValue(failedServersPrefix + "/" + _server));
|
_jb->add(VPackValue(failedServersPrefix + "/" + _server));
|
||||||
{ VPackObjectBuilder old(_jb.get());
|
{ VPackObjectBuilder old(_jb.get());
|
||||||
|
|
|
@ -67,7 +67,7 @@ Job::Job(JOB_STATUS status, Node const& snapshot, AgentInterface* agent,
|
||||||
Job::~Job() {}
|
Job::~Job() {}
|
||||||
|
|
||||||
// this will be initialized in the AgencyFeature
|
// this will be initialized in the AgencyFeature
|
||||||
std::string Job::agencyPrefix = "/arango";
|
std::string Job::agencyPrefix = "arango";
|
||||||
|
|
||||||
bool Job::finish(
|
bool Job::finish(
|
||||||
std::string const& server, std::string const& shard,
|
std::string const& server, std::string const& shard,
|
||||||
|
@ -206,7 +206,7 @@ std::string Job::randomIdleGoodAvailableServer(Node const& snap,
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// @brief Get servers from plan, which are not failed or cleaned out
|
||||||
std::vector<std::string> Job::availableServers(Node const& snapshot) {
|
std::vector<std::string> Job::availableServers(Node const& snapshot) {
|
||||||
|
|
||||||
std::vector<std::string> ret;
|
std::vector<std::string> ret;
|
||||||
|
@ -217,7 +217,7 @@ std::vector<std::string> Job::availableServers(Node const& snapshot) {
|
||||||
ret.push_back(srv.first);
|
ret.push_back(srv.first);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove cleaned servers from ist
|
// Remove cleaned servers from list
|
||||||
try {
|
try {
|
||||||
for (auto const& srv :
|
for (auto const& srv :
|
||||||
VPackArrayIterator(snapshot(cleanedPrefix).slice())) {
|
VPackArrayIterator(snapshot(cleanedPrefix).slice())) {
|
||||||
|
@ -500,11 +500,12 @@ void Job::addPreconditionServerNotBlocked(Builder& pre, std::string const& serve
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Job::addPreconditionServerGood(Builder& pre, std::string const& server) {
|
void Job::addPreconditionServerHealth(Builder& pre, std::string const& server,
|
||||||
pre.add(VPackValue(healthPrefix + server + "/Status"));
|
std::string const& health) {
|
||||||
{ VPackObjectBuilder serverGood(&pre);
|
pre.add(VPackValue(healthPrefix + server + "/Status"));
|
||||||
pre.add("old", VPackValue("GOOD"));
|
{ VPackObjectBuilder serverGood(&pre);
|
||||||
}
|
pre.add("old", VPackValue(health));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Job::addPreconditionShardNotBlocked(Builder& pre, std::string const& shard) {
|
void Job::addPreconditionShardNotBlocked(Builder& pre, std::string const& shard) {
|
||||||
|
@ -544,13 +545,10 @@ void Job::addReleaseShard(Builder& trx, std::string const& shard) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string Job::checkServerGood(Node const& snapshot,
|
std::string Job::checkServerHealth(Node const& snapshot,
|
||||||
std::string const& server) {
|
std::string const& server) {
|
||||||
if (!snapshot.has(healthPrefix + server + "/Status")) {
|
if (!snapshot.has(healthPrefix + server + "/Status")) {
|
||||||
return "UNCLEAR";
|
return "UNCLEAR";
|
||||||
}
|
}
|
||||||
if (snapshot(healthPrefix + server + "/Status").getString() != "GOOD") {
|
return snapshot(healthPrefix + server + "/Status").getString();
|
||||||
return "UNHEALTHY";
|
|
||||||
}
|
|
||||||
return "GOOD";
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,16 +58,6 @@ extern std::string const planVersion;
|
||||||
extern std::string const plannedServers;
|
extern std::string const plannedServers;
|
||||||
extern std::string const healthPrefix;
|
extern std::string const healthPrefix;
|
||||||
|
|
||||||
struct JobResult {
|
|
||||||
JobResult() {}
|
|
||||||
};
|
|
||||||
|
|
||||||
struct JobCallback {
|
|
||||||
JobCallback() {}
|
|
||||||
virtual ~JobCallback(){};
|
|
||||||
virtual bool operator()(JobResult*) = 0;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct Job {
|
struct Job {
|
||||||
|
|
||||||
struct shard_t {
|
struct shard_t {
|
||||||
|
@ -135,6 +125,7 @@ struct Job {
|
||||||
static std::string randomIdleGoodAvailableServer(
|
static std::string randomIdleGoodAvailableServer(
|
||||||
Node const& snap, VPackSlice const& exclude);
|
Node const& snap, VPackSlice const& exclude);
|
||||||
|
|
||||||
|
/// @brief Get servers from plan, which are not failed or cleaned out
|
||||||
static std::vector<std::string> availableServers(
|
static std::vector<std::string> availableServers(
|
||||||
const arangodb::consensus::Node&);
|
const arangodb::consensus::Node&);
|
||||||
|
|
||||||
|
@ -151,6 +142,7 @@ struct Job {
|
||||||
AgentInterface* _agent;
|
AgentInterface* _agent;
|
||||||
std::string _jobId;
|
std::string _jobId;
|
||||||
std::string _creator;
|
std::string _creator;
|
||||||
|
|
||||||
static std::string agencyPrefix; // will be initialized in AgencyFeature
|
static std::string agencyPrefix; // will be initialized in AgencyFeature
|
||||||
|
|
||||||
std::shared_ptr<Builder> _jb;
|
std::shared_ptr<Builder> _jb;
|
||||||
|
@ -177,12 +169,11 @@ struct Job {
|
||||||
static void addReleaseServer(Builder& trx, std::string const& server);
|
static void addReleaseServer(Builder& trx, std::string const& server);
|
||||||
static void addReleaseShard(Builder& trx, std::string const& shard);
|
static void addReleaseShard(Builder& trx, std::string const& shard);
|
||||||
static void addPreconditionServerNotBlocked(Builder& pre, std::string const& server);
|
static void addPreconditionServerNotBlocked(Builder& pre, std::string const& server);
|
||||||
static void addPreconditionServerGood(Builder& pre, std::string const& server);
|
static void addPreconditionServerHealth(Builder& pre, std::string const& server, std::string const& health);
|
||||||
static void addPreconditionShardNotBlocked(Builder& pre, std::string const& shard);
|
static void addPreconditionShardNotBlocked(Builder& pre, std::string const& shard);
|
||||||
static void addPreconditionUnchanged(Builder& pre,
|
static void addPreconditionUnchanged(Builder& pre,
|
||||||
std::string const& key, Slice value);
|
std::string const& key, Slice value);
|
||||||
static std::string checkServerGood(Node const& snapshot,
|
static std::string checkServerHealth(Node const& snapshot, std::string const& server);
|
||||||
std::string const& server);
|
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -199,13 +190,13 @@ inline arangodb::consensus::write_ret_t singleWriteTransaction(
|
||||||
VPackArrayBuilder onePair(envelope.get());
|
VPackArrayBuilder onePair(envelope.get());
|
||||||
{ VPackObjectBuilder mutationPart(envelope.get());
|
{ VPackObjectBuilder mutationPart(envelope.get());
|
||||||
for (auto const& pair : VPackObjectIterator(trx[0])) {
|
for (auto const& pair : VPackObjectIterator(trx[0])) {
|
||||||
envelope->add(Job::agencyPrefix + pair.key.copyString(), pair.value);
|
envelope->add("/" + Job::agencyPrefix + pair.key.copyString(), pair.value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (trx.length() > 1) {
|
if (trx.length() > 1) {
|
||||||
VPackObjectBuilder preconditionPart(envelope.get());
|
VPackObjectBuilder preconditionPart(envelope.get());
|
||||||
for (auto const& pair : VPackObjectIterator(trx[1])) {
|
for (auto const& pair : VPackObjectIterator(trx[1])) {
|
||||||
envelope->add(Job::agencyPrefix + pair.key.copyString(), pair.value);
|
envelope->add("/" + Job::agencyPrefix + pair.key.copyString(), pair.value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -239,19 +230,19 @@ inline arangodb::consensus::trans_ret_t generalTransaction(
|
||||||
VPackArrayBuilder onePair(envelope.get());
|
VPackArrayBuilder onePair(envelope.get());
|
||||||
{ VPackObjectBuilder mutationPart(envelope.get());
|
{ VPackObjectBuilder mutationPart(envelope.get());
|
||||||
for (auto const& pair : VPackObjectIterator(singleTrans[0])) {
|
for (auto const& pair : VPackObjectIterator(singleTrans[0])) {
|
||||||
envelope->add(Job::agencyPrefix + pair.key.copyString(), pair.value);
|
envelope->add("/" + Job::agencyPrefix + pair.key.copyString(), pair.value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (singleTrans.length() > 1) {
|
if (singleTrans.length() > 1) {
|
||||||
VPackObjectBuilder preconditionPart(envelope.get());
|
VPackObjectBuilder preconditionPart(envelope.get());
|
||||||
for (auto const& pair : VPackObjectIterator(singleTrans[1])) {
|
for (auto const& pair : VPackObjectIterator(singleTrans[1])) {
|
||||||
envelope->add(Job::agencyPrefix + pair.key.copyString(), pair.value);
|
envelope->add("/" + Job::agencyPrefix + pair.key.copyString(), pair.value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (singleTrans[0].isString()) {
|
} else if (singleTrans[0].isString()) {
|
||||||
VPackArrayBuilder reads(envelope.get());
|
VPackArrayBuilder reads(envelope.get());
|
||||||
for (auto const& path : VPackArrayIterator(singleTrans)) {
|
for (auto const& path : VPackArrayIterator(singleTrans)) {
|
||||||
envelope->add(VPackValue(Job::agencyPrefix + path.copyString()));
|
envelope->add(VPackValue("/" + Job::agencyPrefix + path.copyString()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -282,13 +273,13 @@ inline arangodb::consensus::trans_ret_t transient(AgentInterface* _agent,
|
||||||
VPackArrayBuilder onePair(envelope.get());
|
VPackArrayBuilder onePair(envelope.get());
|
||||||
{ VPackObjectBuilder mutationPart(envelope.get());
|
{ VPackObjectBuilder mutationPart(envelope.get());
|
||||||
for (auto const& pair : VPackObjectIterator(trx[0])) {
|
for (auto const& pair : VPackObjectIterator(trx[0])) {
|
||||||
envelope->add(Job::agencyPrefix + pair.key.copyString(), pair.value);
|
envelope->add("/" + Job::agencyPrefix + pair.key.copyString(), pair.value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (trx.length() > 1) {
|
if (trx.length() > 1) {
|
||||||
VPackObjectBuilder preconditionPart(envelope.get());
|
VPackObjectBuilder preconditionPart(envelope.get());
|
||||||
for (auto const& pair : VPackObjectIterator(trx[1])) {
|
for (auto const& pair : VPackObjectIterator(trx[1])) {
|
||||||
envelope->add(Job::agencyPrefix + pair.key.copyString(), pair.value);
|
envelope->add("/" + Job::agencyPrefix + pair.key.copyString(), pair.value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,36 +41,22 @@ JobContext::JobContext (JOB_STATUS status, std::string id, Node const& snapshot,
|
||||||
auto const& type = job("type").getString();
|
auto const& type = job("type").getString();
|
||||||
|
|
||||||
if (type == "failedLeader") {
|
if (type == "failedLeader") {
|
||||||
_job =
|
_job = std::make_unique<FailedLeader>(snapshot, agent, status, id);
|
||||||
std::unique_ptr<FailedLeader>(
|
|
||||||
new FailedLeader(snapshot, agent, status, id));
|
|
||||||
} else if (type == "failedFollower") {
|
} else if (type == "failedFollower") {
|
||||||
_job =
|
_job = std::make_unique<FailedFollower>(snapshot, agent, status, id);
|
||||||
std::unique_ptr<FailedFollower>(
|
|
||||||
new FailedFollower(snapshot, agent, status, id));
|
|
||||||
} else if (type == "failedServer") {
|
} else if (type == "failedServer") {
|
||||||
_job =
|
_job = std::make_unique<FailedServer>(snapshot, agent, status, id);
|
||||||
std::unique_ptr<FailedServer>(
|
|
||||||
new FailedServer(snapshot, agent, status, id));
|
|
||||||
} else if (type == "cleanOutServer") {
|
} else if (type == "cleanOutServer") {
|
||||||
_job =
|
_job = std::make_unique<CleanOutServer>(snapshot, agent, status, id);
|
||||||
std::unique_ptr<CleanOutServer>(
|
|
||||||
new CleanOutServer(snapshot, agent, status, id));
|
|
||||||
} else if (type == "moveShard") {
|
} else if (type == "moveShard") {
|
||||||
_job =
|
_job = std::make_unique<MoveShard>(snapshot, agent, status, id);
|
||||||
std::unique_ptr<MoveShard>(
|
|
||||||
new MoveShard(snapshot, agent, status, id));
|
|
||||||
} else if (type == "addFollower") {
|
} else if (type == "addFollower") {
|
||||||
_job =
|
_job = std::make_unique<AddFollower>(snapshot, agent, status, id);
|
||||||
std::unique_ptr<AddFollower>(
|
|
||||||
new AddFollower(snapshot, agent, status, id));
|
|
||||||
} else if (type == "removeFollower") {
|
} else if (type == "removeFollower") {
|
||||||
_job =
|
_job = std::make_unique<RemoveFollower>(snapshot, agent, status, id);
|
||||||
std::unique_ptr<RemoveFollower>(
|
|
||||||
new RemoveFollower(snapshot, agent, status, id));
|
|
||||||
} else {
|
} else {
|
||||||
LOG_TOPIC(ERR, Logger::AGENCY) <<
|
LOG_TOPIC(ERR, Logger::AGENCY) <<
|
||||||
"Failed to run supervision job " << type << " with id " << id;
|
"Failed to run supervision job " << type << " with id " << id;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -187,7 +187,7 @@ bool MoveShard::start() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that the toServer is in state "GOOD":
|
// Check that the toServer is in state "GOOD":
|
||||||
std::string health = checkServerGood(_snapshot, _to);
|
std::string health = checkServerHealth(_snapshot, _to);
|
||||||
if (health != "GOOD") {
|
if (health != "GOOD") {
|
||||||
LOG_TOPIC(DEBUG, Logger::SUPERVISION) << "server " << _to
|
LOG_TOPIC(DEBUG, Logger::SUPERVISION) << "server " << _to
|
||||||
<< " is currently " << health << ", not starting MoveShard job "
|
<< " is currently " << health << ", not starting MoveShard job "
|
||||||
|
@ -348,7 +348,7 @@ bool MoveShard::start() {
|
||||||
addPreconditionUnchanged(pending, planPath, planned);
|
addPreconditionUnchanged(pending, planPath, planned);
|
||||||
addPreconditionShardNotBlocked(pending, _shard);
|
addPreconditionShardNotBlocked(pending, _shard);
|
||||||
addPreconditionServerNotBlocked(pending, _to);
|
addPreconditionServerNotBlocked(pending, _to);
|
||||||
addPreconditionServerGood(pending, _to);
|
addPreconditionServerHealth(pending, _to, "GOOD");
|
||||||
addPreconditionUnchanged(pending, failedServersPrefix, failedServers);
|
addPreconditionUnchanged(pending, failedServersPrefix, failedServers);
|
||||||
addPreconditionUnchanged(pending, cleanedPrefix, cleanedServers);
|
addPreconditionUnchanged(pending, cleanedPrefix, cleanedServers);
|
||||||
} // precondition done
|
} // precondition done
|
||||||
|
|
|
@ -169,7 +169,7 @@ bool RemoveFollower::start() {
|
||||||
bool leaderBad = false;
|
bool leaderBad = false;
|
||||||
for (auto const& srv : VPackArrayIterator(planned)) {
|
for (auto const& srv : VPackArrayIterator(planned)) {
|
||||||
std::string serverName = srv.copyString();
|
std::string serverName = srv.copyString();
|
||||||
if (checkServerGood(_snapshot, serverName) == "GOOD") {
|
if (checkServerHealth(_snapshot, serverName) == "GOOD") {
|
||||||
overview.emplace(serverName, 0);
|
overview.emplace(serverName, 0);
|
||||||
} else {
|
} else {
|
||||||
overview.emplace(serverName, -1);
|
overview.emplace(serverName, -1);
|
||||||
|
@ -357,7 +357,7 @@ bool RemoveFollower::start() {
|
||||||
addPreconditionUnchanged(trx, planPath, planned);
|
addPreconditionUnchanged(trx, planPath, planned);
|
||||||
addPreconditionShardNotBlocked(trx, _shard);
|
addPreconditionShardNotBlocked(trx, _shard);
|
||||||
for (auto const& srv : kept) {
|
for (auto const& srv : kept) {
|
||||||
addPreconditionServerGood(trx, srv);
|
addPreconditionServerHealth(trx, srv, "GOOD");
|
||||||
}
|
}
|
||||||
} // precondition done
|
} // precondition done
|
||||||
} // array for transaction done
|
} // array for transaction done
|
||||||
|
|
|
@ -28,8 +28,6 @@
|
||||||
#include "fakeit.hpp"
|
#include "fakeit.hpp"
|
||||||
|
|
||||||
#include "Agency/AddFollower.h"
|
#include "Agency/AddFollower.h"
|
||||||
#include "Agency/FailedLeader.h"
|
|
||||||
#include "Agency/MoveShard.h"
|
|
||||||
#include "Agency/AgentInterface.h"
|
#include "Agency/AgentInterface.h"
|
||||||
#include "Agency/Node.h"
|
#include "Agency/Node.h"
|
||||||
#include "lib/Basics/StringUtils.h"
|
#include "lib/Basics/StringUtils.h"
|
||||||
|
@ -151,7 +149,7 @@ TEST_CASE("AddFollower", "[agency][supervision]") {
|
||||||
When(Method(mockAgent, waitFor)).AlwaysReturn(AgentInterface::raft_commit_t::OK);
|
When(Method(mockAgent, waitFor)).AlwaysReturn(AgentInterface::raft_commit_t::OK);
|
||||||
auto& agent = mockAgent.get();
|
auto& agent = mockAgent.get();
|
||||||
auto addFollower = AddFollower(
|
auto addFollower = AddFollower(
|
||||||
baseStructure, &agent, jobId, "unittest", DATABASE, COLLECTION, SHARD);
|
baseStructure(PREFIX), &agent, jobId, "unittest", DATABASE, COLLECTION, SHARD);
|
||||||
|
|
||||||
addFollower.create();
|
addFollower.create();
|
||||||
|
|
||||||
|
@ -180,7 +178,6 @@ TEST_CASE("AddFollower", "[agency][supervision]") {
|
||||||
if (path == "/arango/Target/ToDo") {
|
if (path == "/arango/Target/ToDo") {
|
||||||
builder->add(jobId, createBuilder(todo).slice());
|
builder->add(jobId, createBuilder(todo).slice());
|
||||||
}
|
}
|
||||||
builder->close();
|
|
||||||
} else {
|
} else {
|
||||||
builder->add(s);
|
builder->add(s);
|
||||||
}
|
}
|
||||||
|
|
|
@ -230,7 +230,7 @@ SECTION("if we want to start and the collection went missing from plan (our trut
|
||||||
When(Method(mockAgent, waitFor)).AlwaysReturn(AgentInterface::raft_commit_t::OK);
|
When(Method(mockAgent, waitFor)).AlwaysReturn(AgentInterface::raft_commit_t::OK);
|
||||||
AgentInterface &agent = mockAgent.get();
|
AgentInterface &agent = mockAgent.get();
|
||||||
auto failedFollower = FailedFollower(
|
auto failedFollower = FailedFollower(
|
||||||
agency("arango"),
|
agency(PREFIX),
|
||||||
&agent,
|
&agent,
|
||||||
JOB_STATUS::TODO,
|
JOB_STATUS::TODO,
|
||||||
jobId
|
jobId
|
||||||
|
@ -290,7 +290,7 @@ SECTION("if we are supposed to fail a distributeShardsLike job we immediately fa
|
||||||
When(Method(mockAgent, waitFor)).AlwaysReturn(AgentInterface::raft_commit_t::OK);
|
When(Method(mockAgent, waitFor)).AlwaysReturn(AgentInterface::raft_commit_t::OK);
|
||||||
AgentInterface &agent = mockAgent.get();
|
AgentInterface &agent = mockAgent.get();
|
||||||
auto failedFollower = FailedFollower(
|
auto failedFollower = FailedFollower(
|
||||||
agency("arango"),
|
agency(PREFIX),
|
||||||
&agent,
|
&agent,
|
||||||
JOB_STATUS::TODO,
|
JOB_STATUS::TODO,
|
||||||
jobId
|
jobId
|
||||||
|
@ -352,12 +352,14 @@ SECTION("if the follower is healthy again we fail the job") {
|
||||||
When(Method(mockAgent, waitFor)).AlwaysReturn();
|
When(Method(mockAgent, waitFor)).AlwaysReturn();
|
||||||
AgentInterface &agent = mockAgent.get();
|
AgentInterface &agent = mockAgent.get();
|
||||||
auto failedFollower = FailedFollower(
|
auto failedFollower = FailedFollower(
|
||||||
agency("arango"),
|
agency(PREFIX),
|
||||||
&agent,
|
&agent,
|
||||||
JOB_STATUS::TODO,
|
JOB_STATUS::TODO,
|
||||||
jobId
|
jobId
|
||||||
);
|
);
|
||||||
failedFollower.start();
|
REQUIRE_FALSE(failedFollower.start());
|
||||||
|
Verify(Method(mockAgent, transact));
|
||||||
|
Verify(Method(mockAgent, write));
|
||||||
}
|
}
|
||||||
|
|
||||||
SECTION("if there is no healthy free server when trying to start just wait") {
|
SECTION("if there is no healthy free server when trying to start just wait") {
|
||||||
|
@ -400,12 +402,12 @@ SECTION("if there is no healthy free server when trying to start just wait") {
|
||||||
Mock<AgentInterface> mockAgent;
|
Mock<AgentInterface> mockAgent;
|
||||||
AgentInterface &agent = mockAgent.get();
|
AgentInterface &agent = mockAgent.get();
|
||||||
auto failedFollower = FailedFollower(
|
auto failedFollower = FailedFollower(
|
||||||
agency("arango"),
|
agency(PREFIX),
|
||||||
&agent,
|
&agent,
|
||||||
JOB_STATUS::TODO,
|
JOB_STATUS::TODO,
|
||||||
jobId
|
jobId
|
||||||
);
|
);
|
||||||
failedFollower.start();
|
REQUIRE_FALSE(failedFollower.start());
|
||||||
}
|
}
|
||||||
|
|
||||||
SECTION("abort any moveShard job blocking the shard and start") {
|
SECTION("abort any moveShard job blocking the shard and start") {
|
||||||
|
@ -427,7 +429,7 @@ SECTION("abort any moveShard job blocking the shard and start") {
|
||||||
When(Method(moveShardMockAgent, waitFor)).Return();
|
When(Method(moveShardMockAgent, waitFor)).Return();
|
||||||
AgentInterface &moveShardAgent = moveShardMockAgent.get();
|
AgentInterface &moveShardAgent = moveShardMockAgent.get();
|
||||||
auto moveShard = MoveShard(
|
auto moveShard = MoveShard(
|
||||||
baseStructure("arango"), &moveShardAgent, "2", "strunz", DATABASE,
|
baseStructure(PREFIX), &moveShardAgent, "2", "strunz", DATABASE,
|
||||||
COLLECTION, SHARD, SHARD_LEADER, FREE_SERVER, true);
|
COLLECTION, SHARD, SHARD_LEADER, FREE_SERVER, true);
|
||||||
moveShard.create();
|
moveShard.create();
|
||||||
|
|
||||||
|
@ -489,12 +491,12 @@ SECTION("abort any moveShard job blocking the shard and start") {
|
||||||
When(Method(mockAgent, waitFor)).AlwaysReturn(AgentInterface::raft_commit_t::OK);
|
When(Method(mockAgent, waitFor)).AlwaysReturn(AgentInterface::raft_commit_t::OK);
|
||||||
AgentInterface &agent = mockAgent.get();
|
AgentInterface &agent = mockAgent.get();
|
||||||
auto failedFollower = FailedFollower(
|
auto failedFollower = FailedFollower(
|
||||||
agency("arango"),
|
agency(PREFIX),
|
||||||
&agent,
|
&agent,
|
||||||
JOB_STATUS::TODO,
|
JOB_STATUS::TODO,
|
||||||
jobId
|
jobId
|
||||||
);
|
);
|
||||||
failedFollower.start();
|
REQUIRE(failedFollower.start());
|
||||||
Verify(Method(mockAgent, transact));
|
Verify(Method(mockAgent, transact));
|
||||||
Verify(Method(mockAgent, write));
|
Verify(Method(mockAgent, write));
|
||||||
}
|
}
|
||||||
|
@ -567,7 +569,7 @@ SECTION("a successfully started job should finish immediately and set everything
|
||||||
When(Method(mockAgent, waitFor)).AlwaysReturn(AgentInterface::raft_commit_t::OK);
|
When(Method(mockAgent, waitFor)).AlwaysReturn(AgentInterface::raft_commit_t::OK);
|
||||||
AgentInterface &agent = mockAgent.get();
|
AgentInterface &agent = mockAgent.get();
|
||||||
auto failedFollower = FailedFollower(
|
auto failedFollower = FailedFollower(
|
||||||
agency("arango"),
|
agency(PREFIX),
|
||||||
&agent,
|
&agent,
|
||||||
JOB_STATUS::TODO,
|
JOB_STATUS::TODO,
|
||||||
jobId
|
jobId
|
||||||
|
@ -669,7 +671,7 @@ SECTION("the job should handle distributeShardsLike") {
|
||||||
When(Method(mockAgent, waitFor)).AlwaysReturn(AgentInterface::raft_commit_t::OK);
|
When(Method(mockAgent, waitFor)).AlwaysReturn(AgentInterface::raft_commit_t::OK);
|
||||||
AgentInterface &agent = mockAgent.get();
|
AgentInterface &agent = mockAgent.get();
|
||||||
auto failedFollower = FailedFollower(
|
auto failedFollower = FailedFollower(
|
||||||
agency("arango"),
|
agency(PREFIX),
|
||||||
&agent,
|
&agent,
|
||||||
JOB_STATUS::TODO,
|
JOB_STATUS::TODO,
|
||||||
jobId
|
jobId
|
||||||
|
@ -736,7 +738,7 @@ SECTION("the job should timeout after a while") {
|
||||||
When(Method(mockAgent, waitFor)).AlwaysReturn(AgentInterface::raft_commit_t::OK);
|
When(Method(mockAgent, waitFor)).AlwaysReturn(AgentInterface::raft_commit_t::OK);
|
||||||
AgentInterface &agent = mockAgent.get();
|
AgentInterface &agent = mockAgent.get();
|
||||||
auto failedFollower = FailedFollower(
|
auto failedFollower = FailedFollower(
|
||||||
agency("arango"),
|
agency(PREFIX),
|
||||||
&agent,
|
&agent,
|
||||||
JOB_STATUS::TODO,
|
JOB_STATUS::TODO,
|
||||||
jobId
|
jobId
|
||||||
|
@ -791,7 +793,7 @@ SECTION("the job should be abortable when it is in todo") {
|
||||||
When(Method(mockAgent, waitFor)).AlwaysReturn(AgentInterface::raft_commit_t::OK);
|
When(Method(mockAgent, waitFor)).AlwaysReturn(AgentInterface::raft_commit_t::OK);
|
||||||
AgentInterface &agent = mockAgent.get();
|
AgentInterface &agent = mockAgent.get();
|
||||||
auto failedFollower = FailedFollower(
|
auto failedFollower = FailedFollower(
|
||||||
agency("arango"),
|
agency(PREFIX),
|
||||||
&agent,
|
&agent,
|
||||||
JOB_STATUS::TODO,
|
JOB_STATUS::TODO,
|
||||||
jobId
|
jobId
|
||||||
|
|
|
@ -27,10 +27,9 @@
|
||||||
#include "catch.hpp"
|
#include "catch.hpp"
|
||||||
#include "fakeit.hpp"
|
#include "fakeit.hpp"
|
||||||
|
|
||||||
#include "Agency/AddFollower.h"
|
|
||||||
#include "Agency/FailedLeader.h"
|
#include "Agency/FailedLeader.h"
|
||||||
#include "Agency/MoveShard.h"
|
|
||||||
#include "Agency/AgentInterface.h"
|
#include "Agency/AgentInterface.h"
|
||||||
|
#include "Agency/MoveShard.h"
|
||||||
#include "Agency/Node.h"
|
#include "Agency/Node.h"
|
||||||
#include "lib/Basics/StringUtils.h"
|
#include "lib/Basics/StringUtils.h"
|
||||||
#include "lib/Random/RandomGenerator.h"
|
#include "lib/Random/RandomGenerator.h"
|
||||||
|
@ -338,7 +337,9 @@ SECTION("if the leader is healthy again we fail the job") {
|
||||||
JOB_STATUS::TODO,
|
JOB_STATUS::TODO,
|
||||||
jobId
|
jobId
|
||||||
);
|
);
|
||||||
failedLeader.start();
|
REQUIRE_FALSE(failedLeader.start());
|
||||||
|
Verify(Method(mockAgent, transact));
|
||||||
|
Verify(Method(mockAgent, write)).Exactly(Once);
|
||||||
}
|
}
|
||||||
|
|
||||||
SECTION("the job must not be started if there is no server that is in sync for every shard") {
|
SECTION("the job must not be started if there is no server that is in sync for every shard") {
|
||||||
|
@ -382,7 +383,7 @@ SECTION("the job must not be started if there is no server that is in sync for e
|
||||||
JOB_STATUS::TODO,
|
JOB_STATUS::TODO,
|
||||||
jobId
|
jobId
|
||||||
);
|
);
|
||||||
failedLeader.start();
|
REQUIRE_FALSE(failedLeader.start());
|
||||||
}
|
}
|
||||||
|
|
||||||
SECTION("the job must not be started if there if one of the linked shards (distributeShardsLike) is not in sync") {
|
SECTION("the job must not be started if there if one of the linked shards (distributeShardsLike) is not in sync") {
|
||||||
|
|
|
@ -65,23 +65,6 @@ const char *agency =
|
||||||
#include "FailedServerTest.json"
|
#include "FailedServerTest.json"
|
||||||
;
|
;
|
||||||
|
|
||||||
VPackBuilder createJob() {
|
|
||||||
VPackBuilder builder;
|
|
||||||
{
|
|
||||||
VPackObjectBuilder a(&builder);
|
|
||||||
builder.add("creator", VPackValue("1"));
|
|
||||||
builder.add("type", VPackValue("failedServer"));
|
|
||||||
builder.add("database", VPackValue("database"));
|
|
||||||
builder.add("collection", VPackValue("collection"));
|
|
||||||
builder.add("shard", VPackValue("shard"));
|
|
||||||
builder.add("fromServer", VPackValue("follower1"));
|
|
||||||
builder.add("jobId", VPackValue("1"));
|
|
||||||
builder.add("timeCreated",
|
|
||||||
VPackValue(timepointToString(std::chrono::system_clock::now())));
|
|
||||||
}
|
|
||||||
return builder;
|
|
||||||
}
|
|
||||||
|
|
||||||
Node createNodeFromBuilder(VPackBuilder const& builder) {
|
Node createNodeFromBuilder(VPackBuilder const& builder) {
|
||||||
|
|
||||||
VPackBuilder opBuilder;
|
VPackBuilder opBuilder;
|
||||||
|
@ -361,8 +344,7 @@ TEST_CASE("FailedServer", "[agency][supervision]") {
|
||||||
|
|
||||||
} // SECTION
|
} // SECTION
|
||||||
|
|
||||||
|
SECTION("The state is still 'FAILED' and 'Target/FailedServers' is PART 2") {
|
||||||
SECTION("The state is still 'FAILED' and 'Target/FailedServers' is PART 2") {
|
|
||||||
|
|
||||||
TestStructureType createTestStructure = [&](
|
TestStructureType createTestStructure = [&](
|
||||||
Slice const& s, std::string const& path) {
|
Slice const& s, std::string const& path) {
|
||||||
|
|
Loading…
Reference in New Issue