1
0
Fork 0

moveShard with distributeShardsLike

This commit is contained in:
Kaveh Vahedipour 2016-10-04 11:05:02 +02:00
parent 21beb09d08
commit c681cbb5c8
2 changed files with 278 additions and 142 deletions

View File

@ -241,6 +241,27 @@ bool CleanOutServer::start() {
return false;
}
template<class Iter>
struct iter_pair_range : std::pair<Iter,Iter> {
iter_pair_range(std::pair<Iter,Iter> const& x)
: std::pair<Iter,Iter>(x)
{}
Iter begin() const {return this->first;}
Iter end() const {return this->second;}
};
template<class Iter>
inline iter_pair_range<Iter> as_range(std::pair<Iter,Iter> const& x)
{ return iter_pair_range<Iter>(x); }
//template<class T, class S>
/*std::ostream& operator<< (std::ostream& os, std::multimap<std::string,std::string> const& mm) {
for (const auto& i : mm) {
os << i.first << ": " << i.second;
}
return os;
}*/
bool CleanOutServer::scheduleMoveShards() {
std::vector<std::string> availServers;
@ -271,9 +292,32 @@ bool CleanOutServer::scheduleMoveShards() {
size_t sub = 0;
for (auto const& database : databases) {
// Find shardsLike dependencies
std::vector<std::string> originals;
std::multimap<std::string, std::string> clones;
for (auto const& collptr : database.second->children()) {
Node const& collection = *(collptr.second);
auto const& collection = *(collptr.second);
try {
clones.emplace(collection("distributeShardsLike").slice().copyString(),
collptr.first);
} catch (...) {
originals.push_back(collptr.first);
}
}
LOG(WARN) << originals;
for (const auto& i : clones) {
LOG(WARN) << i.first << ": " << i.second;
}
for (const auto& original : originals) {
auto const& collection = (*(database.second))(original);
for (auto const& shard : collection("shards").children()) {
bool found = false;
VPackArrayIterator dbsit(shard.second->slice());
@ -291,16 +335,17 @@ bool CleanOutServer::scheduleMoveShards() {
// Only destinations, which are not already holding this shard
std::vector<std::string> myServers = availServers;
for (auto const& dbserver : dbsit) {
myServers.erase(std::remove(myServers.begin(), myServers.end(),
dbserver.copyString()),
myServers.erase(
std::remove(
myServers.begin(), myServers.end(), dbserver.copyString()),
myServers.end());
}
// Among those a random destination
std::string toServer;
if (myServers.empty()) {
LOG_TOPIC(ERR, Logger::AGENCY) << "No servers remain as target for "
<< "MoveShard";
LOG_TOPIC(ERR, Logger::AGENCY)
<< "No servers remain as target for MoveShard";
return false;
}
@ -313,8 +358,9 @@ bool CleanOutServer::scheduleMoveShards() {
// Schedule move
MoveShard(_snapshot, _agent, _jobId + "-" + std::to_string(sub++),
_jobId, _agencyPrefix, database.first, collptr.first,
shard.first, _server, toServer);
_jobId, _agencyPrefix, database.first, original, shard.first,
_server, toServer);
}
}
}
@ -325,8 +371,8 @@ bool CleanOutServer::scheduleMoveShards() {
bool CleanOutServer::checkFeasibility() {
// Server exists
if (_snapshot.exists("/Plan/DBServers/" + _server).size() != 3) {
LOG_TOPIC(ERR, Logger::AGENCY) << "No db server with id " << _server
<< " in plan.";
LOG_TOPIC(ERR, Logger::AGENCY)
<< "No db server with id " << _server << " in plan.";
return false;
}
@ -334,8 +380,8 @@ bool CleanOutServer::checkFeasibility() {
for (auto const& srv :
VPackArrayIterator(_snapshot("/Target/CleanedServers").slice())) {
if (srv.copyString() == _server) {
LOG_TOPIC(ERR, Logger::AGENCY) << _server
<< " has been cleaned out already!";
LOG_TOPIC(ERR, Logger::AGENCY)
<< _server << " has been cleaned out already!";
return false;
}
}
@ -344,15 +390,14 @@ bool CleanOutServer::checkFeasibility() {
for (auto const& srv :
VPackObjectIterator(_snapshot("/Target/FailedServers").slice())) {
if (srv.key.copyString() == _server) {
LOG_TOPIC(ERR, Logger::AGENCY) << _server
<< " has failed!";
LOG_TOPIC(ERR, Logger::AGENCY) << _server << " has failed!";
return false;
}
}
if (_snapshot.exists(serverStatePrefix + _server + "/cleaning").size() == 4) {
LOG_TOPIC(ERR, Logger::AGENCY) << _server
<< " has been cleaned out already!";
LOG_TOPIC(ERR, Logger::AGENCY)
<< _server << " has been cleaned out already!";
return false;
}
@ -368,16 +413,17 @@ bool CleanOutServer::checkFeasibility() {
if (_snapshot.exists("/Target/CleanedServers").size() == 2) {
for (auto const& srv :
VPackArrayIterator(_snapshot("/Target/CleanedServers").slice())) {
availServers.erase(std::remove(availServers.begin(), availServers.end(),
srv.copyString()),
availServers.erase(
std::remove(
availServers.begin(), availServers.end(), srv.copyString()),
availServers.end());
}
}
// Minimum 1 DB server must remain
if (availServers.size() == 1) {
LOG_TOPIC(ERR, Logger::AGENCY) << "DB server " << _server
<< " is the last standing db server.";
LOG_TOPIC(ERR, Logger::AGENCY)
<< "DB server " << _server << " is the last standing db server.";
return false;
}

View File

@ -61,6 +61,60 @@ bool MoveShard::create() {
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Todo: Move shard " + _shard + " from " + _from + " to " << _to;
// Are we distributeShardsLiking other shard?
// Invoke moveShard there
auto collection = _snapshot(planColPrefix + _database + "/" + _collection);
auto myshards = _snapshot(
planColPrefix + _database + "/" + _collection + "/shards").children();
auto mpos = std::distance(myshards.begin(),
myshards.find(_shard));
std::string distributeShardsLike, othershard;
try {
distributeShardsLike = collection("distributeShardsLike").getString();
auto othershards = _snapshot(planColPrefix + _database + "/"
+ distributeShardsLike + "/shards").children();
auto opos = othershards.begin();
std::advance(opos, mpos);
othershard = opos->first;
} catch(...) {}
if (!distributeShardsLike.empty()) {
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Distributed like collection " << distributeShardsLike
<< " shard " << othershard;
MoveShard(_snapshot, _agent, _jobId, _creator, _agencyPrefix, _database,
distributeShardsLike, othershard, _from, _to);
return false;
}
// Are we ditributeShardsLiked by others?
// Invoke moveShard here with others
auto collections = _snapshot(planColPrefix + _database).children();
std::vector<std::string> colsLikeMe;
std::vector<std::string> shardsLikeMe;
colsLikeMe.push_back(_collection);
shardsLikeMe.push_back(_shard);
for (auto const& collptr : collections) {
auto const& node = *(collptr.second);
try {
if (node("distributeShardsLike").getString() == _collection) {
auto opos = node("shards").children().begin();
if (!node("shards").children().empty()) {
std::advance(opos, mpos);
colsLikeMe.push_back(collptr.first);
shardsLikeMe.push_back(opos->first);
}
}
} catch (std::exception const& e) {
LOG(WARN) << e.what();
}
}
std::string path, now(timepointToString(std::chrono::system_clock::now()));
// DBservers
@ -89,15 +143,27 @@ bool MoveShard::create() {
_jb->add("creator", VPackValue(_creator));
_jb->add("type", VPackValue("moveShard"));
_jb->add("database", VPackValue(_database));
_jb->add("collection", VPackValue(_collection));
_jb->add("shard", VPackValue(_shard));
_jb->add(VPackValue("collections"));
{
VPackArrayBuilder b(_jb.get());
for (auto const& c : colsLikeMe) {
_jb->add(VPackValue(c));
}
}
_jb->add(VPackValue("shards"));
{
VPackArrayBuilder b(_jb.get());
for (auto const& s : shardsLikeMe) {
_jb->add(VPackValue(s));
}
}
_jb->add("fromServer", VPackValue(_from));
_jb->add("toServer", VPackValue(_to));
_jb->add("isLeader", VPackValue(current[0].copyString() == _from));
_jb->add("jobId", VPackValue(_jobId));
_jb->add("timeCreated", VPackValue(now));
_jb->close();
_jb->close();
_jb->close();
@ -155,10 +221,13 @@ bool MoveShard::start() {
return false;
}
} else {
todo.add(_jb->slice()[0].valueAt(0));
try {
todo.add(_jb->slice()[0].get(_agencyPrefix + toDoPrefix + _jobId));
} catch (std::exception const& e) {
LOG_TOPIC(WARN, Logger::AGENCY) << e.what() << __FILE__ << __LINE__;
}
}
todo.close();
// Enter pending, remove todo, block toserver
pending.openArray();
@ -186,7 +255,16 @@ bool MoveShard::start() {
pending.close();
// --- Plan changes
pending.add(_agencyPrefix + planPath, VPackValue(VPackValueType::Array));
size_t j = 0;
for (auto const& c : VPackArrayIterator(todo.slice()[0].get("collections"))) {
planPath = planColPrefix + _database + "/" + c.copyString() + "/shards/"
+ todo.slice()[0].get("shards")[j++].copyString();
planned = _snapshot(planPath).slice();
pending.add(VPackValue(_agencyPrefix + planPath));
{
VPackArrayBuilder b(&pending);
if (planned[0].copyString() == _from) { // Leader
pending.add(planned[0]);
pending.add(VPackValue(_to));
@ -199,7 +277,8 @@ bool MoveShard::start() {
}
pending.add(VPackValue(_to));
}
pending.close();
}
}
// --- Increment Plan/Version
pending.add(_agencyPrefix + planVersion, VPackValue(VPackValueType::Object));
@ -244,10 +323,10 @@ JOB_STATUS MoveShard::status() {
try {
_database = _snapshot(pos[status] + _jobId + "/database").getString();
_collection = _snapshot(pos[status] + _jobId + "/collection").getString();
_collection = _snapshot(pos[status] + _jobId + "/collections").slice()[0].copyString();
_from = _snapshot(pos[status] + _jobId + "/fromServer").getString();
_to = _snapshot(pos[status] + _jobId + "/toServer").getString();
_shard = _snapshot(pos[status] + _jobId + "/shard").getString();
_shard = _snapshot(pos[status] + _jobId + "/shards").slice()[0].copyString();
} catch (std::exception const& e) {
std::stringstream err;
err << "Failed to find job " << _jobId << " in agency: " << e.what();
@ -258,10 +337,21 @@ JOB_STATUS MoveShard::status() {
}
if (status == PENDING) {
Slice collections = _snapshot(pos[status] + _jobId + "/collections").slice();
Slice shards = _snapshot(pos[status] + _jobId + "/shards").slice();
size_t i = 0;
size_t done = 0;
for (auto const& collslice : VPackArrayIterator(collections)) {
std::string shard = shards[i++].copyString();
std::string collection = collslice.copyString();
std::string planPath =
planColPrefix + _database + "/" + _collection + "/shards/" + _shard;
std::string curPath = curColPrefix + _database + "/" + _collection + "/" +
_shard + "/servers";
planColPrefix + _database + "/" + collection + "/shards/" + shard;
std::string curPath = curColPrefix + _database + "/" + collection + "/" +
shard + "/servers";
Slice current = _snapshot(curPath).slice();
Slice plan = _snapshot(planPath).slice();
@ -275,7 +365,6 @@ JOB_STATUS MoveShard::status() {
currv.push_back(srv.copyString());
}
std::sort(currv.begin(), currv.end());
if (currv == planv) {
if (current[0].copyString() ==
std::string("_") + _from) { // Retired leader
@ -298,8 +387,6 @@ JOB_STATUS MoveShard::status() {
remove.close();
transact(_agent, remove);
return PENDING;
} else {
bool foundFrom = false, foundTo = false;
for (auto const& srv : VPackArrayIterator(current)) {
@ -359,16 +446,19 @@ JOB_STATUS MoveShard::status() {
transact(_agent, remove);
}
return PENDING;
} else if (foundTo && !foundFrom) {
done++;
}
}
}
}
if (done == collections.length()) {
if (finish("Shards/" + _shard)) {
return FINISHED;
}
}
}
}
}
return status;
}