1
0
Fork 0

moveShard jobs running

This commit is contained in:
Kaveh Vahedipour 2016-12-12 10:29:41 +01:00
parent ad4ba248a5
commit ec18efc80e
2 changed files with 73 additions and 18 deletions

View File

@ -121,6 +121,16 @@ JOB_STATUS CleanOutServer::status() {
bool CleanOutServer::create() { // Only through shrink cluster
// Lookup server
if (_server.find("DBServer") == 0) {
try {
_server = uuidLookup(_snapshot, _server);
} catch (...) {
LOG_TOPIC(ERR, Logger::AGENCY) <<
"MoveShard: To server " << _server << " does not exist";
}
}
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Todo: Clean out server " + _server + " for shrinkage";

View File

@ -39,6 +39,7 @@ MoveShard::MoveShard(Node const& snapshot, Agent* agent,
_shard(shard),
_from(from),
_to(to) {
try {
JOB_STATUS js = status();
@ -58,6 +59,27 @@ MoveShard::MoveShard(Node const& snapshot, Agent* agent,
MoveShard::~MoveShard() {}
bool MoveShard::create() {
// Lookup from server
if (_from.find("DBServer") == 0) {
try {
_from = uuidLookup(_snapshot, _from);
} catch (...) {
LOG_TOPIC(ERR, Logger::AGENCY) <<
"MoveShard: From server " << _from << " does not exist";
}
}
// Lookup to Server
if (_to.find("DBServer") == 0) {
try {
_to = uuidLookup(_snapshot, _to);
} catch (...) {
LOG_TOPIC(ERR, Logger::AGENCY) <<
"MoveShard: To server " << _to << " does not exist";
}
}
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Todo: Move shard " + _shard + " from " + _from + " to " << _to;
@ -75,24 +97,6 @@ bool MoveShard::create() {
_jb->openArray();
_jb->openObject();
// Lookup from server
if (_from.find("DBServer") == 0) {
try {
_from = uuidLookup(_snapshot, _from);
} catch (...) {
LOG_TOPIC(ERR, Logger::AGENCY) <<
"MoveShard: From server " << _from << " does not exist";
}
}
if (_to.find("DBServer") == 0) {
try {
_to = uuidLookup(_snapshot, _to);
} catch (...) {
LOG_TOPIC(ERR, Logger::AGENCY) <<
"MoveShard: To server " << _to << " does not exist";
}
}
if (_from == _to) {
path = _agencyPrefix + failedPrefix + _jobId;
_jb->add("timeFinished", VPackValue(now));
@ -138,6 +142,26 @@ bool MoveShard::create() {
}
bool MoveShard::start() {
// Lookup from server
if (_from.find("DBServer") == 0) {
try {
_from = uuidLookup(_snapshot, _from);
} catch (...) {
LOG_TOPIC(ERR, Logger::AGENCY) <<
"MoveShard: From server " << _from << " does not exist";
}
}
// Lookup to Server
if (_to.find("DBServer") == 0) {
try {
_to = uuidLookup(_snapshot, _to);
} catch (...) {
LOG_TOPIC(ERR, Logger::AGENCY) <<
"MoveShard: To server " << _to << " does not exist";
}
}
// Are we distributeShardsLiking other shard?
// Invoke moveShard there
@ -355,6 +379,27 @@ JOB_STATUS MoveShard::status() {
_to = _snapshot(pos[status] + _jobId + "/toServer").getString();
_shard =
_snapshot(pos[status] + _jobId + "/shards").slice()[0].copyString();
// Lookup from server
if (_from.find("DBServer") == 0) {
try {
_from = uuidLookup(_snapshot, _from);
} catch (...) {
LOG_TOPIC(ERR, Logger::AGENCY) <<
"MoveShard: From server " << _from << " does not exist";
}
}
// Lookup to Server
if (_to.find("DBServer") == 0) {
try {
_to = uuidLookup(_snapshot, _to);
} catch (...) {
LOG_TOPIC(ERR, Logger::AGENCY) <<
"MoveShard: To server " << _to << " does not exist";
}
}
} catch (std::exception const& e) {
std::string err =
std::string("Failed to find job ") + _jobId + " in agency: " + e.what();