mirror of https://gitee.com/bigwinds/arangodb
cleanoutServer Bug Fix (#6537)
* Fixing bug: cleanoutServer will no longer add old leader as follower. * Fixed rollback.
This commit is contained in:
parent
6db05b8b12
commit
5929cafaf9
|
@ -389,6 +389,8 @@ bool CleanOutServer::scheduleMoveShards(std::shared_ptr<Builder>& trx) {
|
|||
serversCopy.end());
|
||||
}
|
||||
|
||||
bool isLeader = (found == 0);
|
||||
|
||||
// Among those a random destination:
|
||||
std::string toServer;
|
||||
if (serversCopy.empty()) {
|
||||
|
@ -403,7 +405,7 @@ bool CleanOutServer::scheduleMoveShards(std::shared_ptr<Builder>& trx) {
|
|||
// Schedule move into trx:
|
||||
MoveShard(_snapshot, _agent, _jobId + "-" + std::to_string(sub++),
|
||||
_jobId, database.first, collptr.first,
|
||||
shard.first, _server, toServer, found == 0)
|
||||
shard.first, _server, toServer, isLeader, false)
|
||||
.create(trx);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,22 @@
|
|||
using namespace arangodb;
|
||||
using namespace arangodb::consensus;
|
||||
|
||||
MoveShard::MoveShard(Node const& snapshot, AgentInterface* agent,
|
||||
std::string const& jobId, std::string const& creator,
|
||||
std::string const& database,
|
||||
std::string const& collection, std::string const& shard,
|
||||
std::string const& from, std::string const& to,
|
||||
bool isLeader, bool remainsFollower)
|
||||
: Job(NOTFOUND, snapshot, agent, jobId, creator),
|
||||
_database(database),
|
||||
_collection(collection),
|
||||
_shard(shard),
|
||||
_from(id(from)),
|
||||
_to(id(to)),
|
||||
_isLeader(isLeader), // will be initialized properly when information known
|
||||
_remainsFollower(remainsFollower)
|
||||
{ }
|
||||
|
||||
MoveShard::MoveShard(Node const& snapshot, AgentInterface* agent,
|
||||
std::string const& jobId, std::string const& creator,
|
||||
std::string const& database,
|
||||
|
@ -42,7 +58,8 @@ MoveShard::MoveShard(Node const& snapshot, AgentInterface* agent,
|
|||
_shard(shard),
|
||||
_from(id(from)),
|
||||
_to(id(to)),
|
||||
_isLeader(isLeader) // will be initialized properly when information known
|
||||
_isLeader(isLeader), // will be initialized properly when information known
|
||||
_remainsFollower(isLeader)
|
||||
{ }
|
||||
|
||||
MoveShard::MoveShard(Node const& snapshot, AgentInterface* agent,
|
||||
|
@ -57,6 +74,7 @@ MoveShard::MoveShard(Node const& snapshot, AgentInterface* agent,
|
|||
auto tmp_to = _snapshot.hasAsString(path + "toServer");
|
||||
auto tmp_shard = _snapshot.hasAsString(path + "shard");
|
||||
auto tmp_isLeader = _snapshot.hasAsSlice(path + "isLeader");
|
||||
auto tmp_remainsFollower = _snapshot.hasAsSlice(path + "remainsFollower");
|
||||
auto tmp_creator = _snapshot.hasAsString(path + "creator");
|
||||
|
||||
if (tmp_database.second && tmp_collection.second && tmp_from.second && tmp_to.second
|
||||
|
@ -67,6 +85,7 @@ MoveShard::MoveShard(Node const& snapshot, AgentInterface* agent,
|
|||
_to = tmp_to.first;
|
||||
_shard = tmp_shard.first;
|
||||
_isLeader = tmp_isLeader.first.isTrue();
|
||||
_remainsFollower = tmp_remainsFollower.second ? tmp_remainsFollower.first.isTrue() : _isLeader;
|
||||
_creator = tmp_creator.first;
|
||||
} else {
|
||||
std::stringstream err;
|
||||
|
@ -130,6 +149,7 @@ bool MoveShard::create(std::shared_ptr<VPackBuilder> envelope) {
|
|||
_jb->add("fromServer", VPackValue(_from));
|
||||
_jb->add("toServer", VPackValue(_to));
|
||||
_jb->add("isLeader", VPackValue(_isLeader));
|
||||
_jb->add("remainsFollower", VPackValue(_remainsFollower));
|
||||
_jb->add("jobId", VPackValue(_jobId));
|
||||
_jb->add("timeCreated", VPackValue(now));
|
||||
}
|
||||
|
@ -276,6 +296,11 @@ bool MoveShard::start() {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (!_isLeader && _remainsFollower) {
|
||||
finish("", "", false, "remainsFollower is invalid without isLeader");
|
||||
return false;
|
||||
}
|
||||
|
||||
// Compute group to move shards together:
|
||||
std::vector<Job::shard_t> shardsLikeMe
|
||||
= clones(_snapshot, _database, _collection, _shard);
|
||||
|
@ -354,7 +379,7 @@ bool MoveShard::start() {
|
|||
addPreconditionUnchanged(pending, planPath, planned);
|
||||
addPreconditionShardNotBlocked(pending, _shard);
|
||||
addPreconditionServerNotBlocked(pending, _to);
|
||||
addPreconditionServerHealth(pending, _to, "GOOD");
|
||||
addPreconditionServerHealth(pending, _to, "GOOD");
|
||||
addPreconditionUnchanged(pending, failedServersPrefix, failedServers);
|
||||
addPreconditionUnchanged(pending, cleanedPrefix, cleanedServers);
|
||||
} // precondition done
|
||||
|
@ -507,6 +532,7 @@ JOB_STATUS MoveShard::pendingLeader() {
|
|||
trx.add(srv);
|
||||
}
|
||||
}
|
||||
// add the old leader as follower in case of a rollback
|
||||
trx.add(VPackValue(_from));
|
||||
}
|
||||
// Precondition: Plan still as it was
|
||||
|
@ -545,7 +571,19 @@ JOB_STATUS MoveShard::pendingLeader() {
|
|||
{ VPackObjectBuilder trxObject(&trx);
|
||||
VPackObjectBuilder preObject(&pre);
|
||||
doForAllShards(_snapshot, _database, shardsLikeMe,
|
||||
[&pre](Slice plan, Slice current, std::string& planPath) {
|
||||
[&trx, &pre, this](Slice plan, Slice current, std::string& planPath) {
|
||||
if (!_remainsFollower) {
|
||||
// Remove _from from the list of follower
|
||||
trx.add(VPackValue(planPath));
|
||||
{ VPackArrayBuilder guard(&trx);
|
||||
for (auto const& srv : VPackArrayIterator(plan)) {
|
||||
if (!srv.isEqualString(_from)) {
|
||||
trx.add(srv);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Precondition: Plan still as it was
|
||||
pre.add(VPackValue(planPath));
|
||||
{ VPackObjectBuilder guard(&pre);
|
||||
|
|
|
@ -31,7 +31,17 @@ namespace arangodb {
|
|||
namespace consensus {
|
||||
|
||||
struct MoveShard : public Job {
|
||||
|
||||
|
||||
MoveShard(Node const& snapshot, AgentInterface* agent, std::string const& jobId,
|
||||
std::string const& creator,
|
||||
std::string const& database,
|
||||
std::string const& collection,
|
||||
std::string const& shard,
|
||||
std::string const& from,
|
||||
std::string const& to,
|
||||
bool isLeader,
|
||||
bool remainsFollower);
|
||||
|
||||
MoveShard(Node const& snapshot, AgentInterface* agent, std::string const& jobId,
|
||||
std::string const& creator,
|
||||
std::string const& database,
|
||||
|
@ -61,6 +71,7 @@ struct MoveShard : public Job {
|
|||
std::string _from;
|
||||
std::string _to;
|
||||
bool _isLeader;
|
||||
bool _remainsFollower;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -66,6 +66,7 @@ VPackBuilder createMoveShardJob() {
|
|||
builder.add("fromServer", VPackValue("test"));
|
||||
builder.add("toServer", VPackValue("test2"));
|
||||
builder.add("isLeader", VPackValue(true));
|
||||
builder.add("remainsFollower", VPackValue(false));
|
||||
builder.add("collection", VPackValue("test"));
|
||||
builder.add("shard", VPackValue("s99"));
|
||||
builder.add("creator", VPackValue("unittest"));
|
||||
|
@ -77,7 +78,7 @@ VPackBuilder createMoveShardJob() {
|
|||
|
||||
void checkFailed(JOB_STATUS status, query_t const& q) {
|
||||
INFO("WRITE: " << q->toJson());
|
||||
|
||||
|
||||
REQUIRE(std::string(q->slice().typeName()) == "array" );
|
||||
REQUIRE(q->slice().length() == 1);
|
||||
REQUIRE(std::string(q->slice()[0].typeName()) == "array");
|
||||
|
@ -100,7 +101,7 @@ Node createNodeFromBuilder(VPackBuilder const& builder) {
|
|||
VPackBuilder opBuilder;
|
||||
{ VPackObjectBuilder a(&opBuilder);
|
||||
opBuilder.add("new", builder.slice()); }
|
||||
|
||||
|
||||
Node node("");
|
||||
node.handle<SET>(opBuilder.slice());
|
||||
return node;
|
||||
|
@ -112,11 +113,11 @@ Builder createBuilder(char const* c) {
|
|||
options.checkAttributeUniqueness = true;
|
||||
VPackParser parser(&options);
|
||||
parser.parse(c);
|
||||
|
||||
|
||||
VPackBuilder builder;
|
||||
builder.add(parser.steal()->slice());
|
||||
return builder;
|
||||
|
||||
|
||||
}
|
||||
|
||||
Node createNode(char const* c) {
|
||||
|
@ -156,7 +157,7 @@ VPackBuilder createJob(std::string const& server) {
|
|||
TEST_CASE("CleanOutServer", "[agency][supervision]") {
|
||||
RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE);
|
||||
auto baseStructure = createRootNode();
|
||||
|
||||
|
||||
write_ret_t fakeWriteResult {true, "", std::vector<bool> {true}, std::vector<index_t> {1}};
|
||||
auto transBuilder = std::make_shared<Builder>();
|
||||
{ VPackArrayBuilder a(transBuilder.get());
|
||||
|
@ -338,7 +339,7 @@ SECTION("cleanout server should fail if the server is already cleaned") {
|
|||
}
|
||||
return builder;
|
||||
};
|
||||
|
||||
|
||||
Mock<AgentInterface> mockAgent;
|
||||
When(Method(mockAgent, write)).Do([&](query_t const& q, bool d) -> write_ret_t {
|
||||
checkFailed(JOB_STATUS::TODO, q);
|
||||
|
@ -385,7 +386,7 @@ SECTION("cleanout server should fail if the server is failed") {
|
|||
}
|
||||
return builder;
|
||||
};
|
||||
|
||||
|
||||
Mock<AgentInterface> mockAgent;
|
||||
When(Method(mockAgent, write)).Do([&](query_t const& q, bool d) -> write_ret_t {
|
||||
checkFailed(JOB_STATUS::TODO, q);
|
||||
|
@ -434,7 +435,7 @@ SECTION("cleanout server should fail if the replicationFactor is too big for any
|
|||
}
|
||||
return builder;
|
||||
};
|
||||
|
||||
|
||||
Mock<AgentInterface> mockAgent;
|
||||
When(Method(mockAgent, write)).Do([&](query_t const& q, bool d) -> write_ret_t {
|
||||
checkFailed(JOB_STATUS::TODO, q);
|
||||
|
@ -484,7 +485,7 @@ SECTION("cleanout server should fail if the replicationFactor is too big for any
|
|||
}
|
||||
return builder;
|
||||
};
|
||||
|
||||
|
||||
Mock<AgentInterface> mockAgent;
|
||||
When(Method(mockAgent, write)).Do([&](query_t const& q, bool d) -> write_ret_t {
|
||||
checkFailed(JOB_STATUS::TODO, q);
|
||||
|
@ -529,11 +530,11 @@ SECTION("a cleanout server job should move into pending when everything is ok")
|
|||
}
|
||||
return builder;
|
||||
};
|
||||
|
||||
|
||||
Mock<AgentInterface> mockAgent;
|
||||
When(Method(mockAgent, write)).Do([&](query_t const& q, bool d) -> write_ret_t {
|
||||
INFO("WRITE: " << q->toJson());
|
||||
|
||||
|
||||
REQUIRE(std::string(q->slice().typeName()) == "array" );
|
||||
REQUIRE(q->slice().length() == 1);
|
||||
REQUIRE(std::string(q->slice()[0].typeName()) == "array");
|
||||
|
@ -609,14 +610,14 @@ SECTION("a cleanout server job should abort after a long timeout") {
|
|||
}
|
||||
return builder;
|
||||
};
|
||||
|
||||
|
||||
int qCount = 0;
|
||||
Mock<AgentInterface> mockAgent;
|
||||
When(Method(mockAgent, write)).AlwaysDo([&](query_t const& q, bool d) -> write_ret_t {
|
||||
if (qCount++ == 0) {
|
||||
// first the moveShard job should be aborted
|
||||
INFO("WRITE FIRST: " << q->toJson());
|
||||
|
||||
|
||||
REQUIRE(std::string(q->slice().typeName()) == "array" );
|
||||
REQUIRE(q->slice().length() == 1);
|
||||
REQUIRE(std::string(q->slice()[0].typeName()) == "array");
|
||||
|
@ -720,7 +721,7 @@ SECTION("once all subjobs were successful then the job should be finished") {
|
|||
Mock<AgentInterface> mockAgent;
|
||||
When(Method(mockAgent, write)).Do([&](query_t const& q, bool d) -> write_ret_t {
|
||||
INFO("WRITE: " << q->toJson());
|
||||
|
||||
|
||||
REQUIRE(std::string(q->slice().typeName()) == "array" );
|
||||
REQUIRE(q->slice().length() == 1);
|
||||
REQUIRE(std::string(q->slice()[0].typeName()) == "array");
|
||||
|
@ -827,7 +828,7 @@ SECTION("when the cleanout server job is aborted all subjobs should be aborted t
|
|||
if (qCount++ == 0) {
|
||||
// first the moveShard job should be aborted
|
||||
INFO("WRITE FIRST: " << q->toJson());
|
||||
|
||||
|
||||
REQUIRE(std::string(q->slice().typeName()) == "array" );
|
||||
REQUIRE(q->slice().length() == 1);
|
||||
REQUIRE(std::string(q->slice()[0].typeName()) == "array");
|
||||
|
|
|
@ -87,7 +87,7 @@ Node createNodeFromBuilder(VPackBuilder const& builder) {
|
|||
VPackBuilder opBuilder;
|
||||
{ VPackObjectBuilder a(&opBuilder);
|
||||
opBuilder.add("new", builder.slice()); }
|
||||
|
||||
|
||||
Node node("");
|
||||
node.handle<SET>(opBuilder.slice());
|
||||
return node;
|
||||
|
@ -100,11 +100,11 @@ Builder createBuilder(char const* c) {
|
|||
options.checkAttributeUniqueness = true;
|
||||
VPackParser parser(&options);
|
||||
parser.parse(c);
|
||||
|
||||
|
||||
VPackBuilder builder;
|
||||
builder.add(parser.steal()->slice());
|
||||
return builder;
|
||||
|
||||
|
||||
}
|
||||
|
||||
Node createNode(char const* c) {
|
||||
|
@ -121,13 +121,13 @@ TEST_CASE("FailedFollower", "[agency][supervision]") {
|
|||
auto transBuilder = std::make_shared<Builder>();
|
||||
{ VPackArrayBuilder a(transBuilder.get());
|
||||
transBuilder->add(VPackValue((uint64_t)1)); }
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
auto baseStructure = createRootNode();
|
||||
write_ret_t fakeWriteResult {true, "", std::vector<bool> {true}, std::vector<index_t> {1}};
|
||||
trans_ret_t fakeTransResult {true, "", 1, 0, transBuilder};
|
||||
|
||||
|
||||
SECTION("creating a job should create a job in todo") {
|
||||
Mock<AgentInterface> mockAgent;
|
||||
|
||||
|
@ -396,7 +396,7 @@ SECTION("if there is no healthy free server when trying to start just wait") {
|
|||
REQUIRE(builder);
|
||||
INFO("Agency: " << builder->toJson());
|
||||
Node agency = createNodeFromBuilder(*builder);
|
||||
|
||||
|
||||
// nothing should happen
|
||||
Mock<AgentInterface> mockAgent;
|
||||
AgentInterface &agent = mockAgent.get();
|
||||
|
@ -429,7 +429,7 @@ SECTION("abort any moveShard job blocking the shard and start") {
|
|||
AgentInterface &moveShardAgent = moveShardMockAgent.get();
|
||||
auto moveShard = MoveShard(
|
||||
baseStructure(PREFIX), &moveShardAgent, "2", "strunz", DATABASE,
|
||||
COLLECTION, SHARD, SHARD_LEADER, FREE_SERVER, true);
|
||||
COLLECTION, SHARD, SHARD_LEADER, FREE_SERVER, true, true);
|
||||
moveShard.create();
|
||||
|
||||
std::string jobId = "1";
|
||||
|
|
|
@ -67,7 +67,7 @@ Node createNodeFromBuilder(Builder const& builder) {
|
|||
Builder opBuilder;
|
||||
{ VPackObjectBuilder a(&opBuilder);
|
||||
opBuilder.add("new", builder.slice()); }
|
||||
|
||||
|
||||
Node node("");
|
||||
node.handle<SET>(opBuilder.slice());
|
||||
return node;
|
||||
|
@ -80,11 +80,11 @@ Builder createBuilder(char const* c) {
|
|||
options.checkAttributeUniqueness = true;
|
||||
VPackParser parser(&options);
|
||||
parser.parse(c);
|
||||
|
||||
|
||||
Builder builder;
|
||||
builder.add(parser.steal()->slice());
|
||||
return builder;
|
||||
|
||||
|
||||
}
|
||||
|
||||
Node createNode(char const* c) {
|
||||
|
@ -111,14 +111,14 @@ TEST_CASE("FailedLeader", "[agency][supervision]") {
|
|||
|
||||
Builder builder;
|
||||
baseStructure.toBuilder(builder);
|
||||
|
||||
|
||||
write_ret_t fakeWriteResult {true, "", std::vector<bool> {true}, std::vector<index_t> {1}};
|
||||
auto transBuilder = std::make_shared<Builder>();
|
||||
{ VPackArrayBuilder a(transBuilder.get());
|
||||
transBuilder->add(VPackValue((uint64_t)1)); }
|
||||
|
||||
trans_ret_t fakeTransResult {true, "", 1, 0, transBuilder};
|
||||
|
||||
|
||||
SECTION("creating a job should create a job in todo") {
|
||||
Mock<AgentInterface> mockAgent;
|
||||
|
||||
|
@ -372,7 +372,7 @@ SECTION("the job must not be started if there is no server that is in sync for e
|
|||
REQUIRE(builder);
|
||||
INFO(builder->toJson());
|
||||
Node agency = createNodeFromBuilder(*builder);
|
||||
|
||||
|
||||
// nothing should happen
|
||||
Mock<AgentInterface> mockAgent;
|
||||
AgentInterface &agent = mockAgent.get();
|
||||
|
@ -466,7 +466,7 @@ SECTION("abort any moveShard job blocking the shard and start") {
|
|||
AgentInterface &moveShardAgent = moveShardMockAgent.get();
|
||||
auto moveShard = MoveShard(
|
||||
baseStructure("arango"), &moveShardAgent, "2", "strunz", DATABASE,
|
||||
COLLECTION, SHARD, SHARD_LEADER, FREE_SERVER, true);
|
||||
COLLECTION, SHARD, SHARD_LEADER, FREE_SERVER, true, true);
|
||||
moveShard.create();
|
||||
|
||||
std::string jobId = "1";
|
||||
|
|
|
@ -96,7 +96,7 @@ Node createRootNode() {
|
|||
options.checkAttributeUniqueness = true;
|
||||
VPackParser parser(&options);
|
||||
parser.parse(agency);
|
||||
|
||||
|
||||
VPackBuilder builder;
|
||||
{ VPackObjectBuilder a(&builder);
|
||||
builder.add("new", parser.steal()->slice()); }
|
||||
|
|
Loading…
Reference in New Issue