1
0
Fork 0

fixed resilience

This commit is contained in:
Kaveh Vahedipour 2016-12-09 16:35:32 +01:00
parent eddecc0a4c
commit 2b9c018817
9 changed files with 97 additions and 25 deletions

View File

@ -100,9 +100,9 @@ bool AddFollower::create() {
TRI_ASSERT(current[0].isString());
#endif
size_t sub = 0;
auto const& myClones = clones(_snapshot, _database, _collection, _shard);
if (!myClones.empty()) {
size_t sub = 0;
for (auto const& clone : myClones) {
AddFollower(_snapshot, _agent, _jobId + "-" + std::to_string(sub++),
_jobId, _agencyPrefix, _database, clone.collection,

View File

@ -65,13 +65,9 @@ bool FailedFollower::create() {
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Todo: failed Follower for " + _shard + " from " + _from + " to " + _to;
std::string path = _agencyPrefix + toDoPrefix + _jobId;
std::string planPath =
planColPrefix + _database + "/" + _collection + "/shards";
size_t sub = 0;
auto const& myClones = clones(_snapshot, _database, _collection, _shard);
if (!myClones.empty()) {
size_t sub = 0;
for (auto const& clone : myClones) {
FailedFollower(_snapshot, _agent, _jobId + "-" + std::to_string(sub++),
_jobId, _agencyPrefix, _database, clone.collection,
@ -84,6 +80,8 @@ bool FailedFollower::create() {
_jb->openObject();
// Todo entry
std::string path = _agencyPrefix + toDoPrefix + _jobId;
_jb->add(path, VPackValue(VPackValueType::Object));
_jb->add("creator", VPackValue(_creator));
_jb->add("type", VPackValue("failedFollower"));

View File

@ -446,7 +446,13 @@ bool Inception::estimateRAFTInterval() {
double precision = 1.0e-2;
mn = precision *
std::ceil((1./precision)*(.25 + precision*(maxmean+3*maxstdev)));
std::ceil((1. / precision)*(.3 + precision * (maxmean + 3.*maxstdev)));
if (config.waitForSync()) {
mn *= 4.;
}
if (mn > 2.0) {
mn = 2.0;
}
mx = 5. * mn;
LOG_TOPIC(INFO, Logger::AGENCY)

View File

@ -212,3 +212,14 @@ std::vector<Job::shard_t> Job::clones(
return ret;
}
std::string Job::uuidLookup (Node const& snapshot, std::string const& shortID) {
for (auto const& uuid : snapshot(mapUniqueToShortID).children()) {
if ((*uuid.second)("ShortName").getString() == shortID) {
return uuid.first;
}
}
return std::string();
}

View File

@ -40,7 +40,7 @@ namespace consensus {
enum JOB_STATUS { TODO, PENDING, FINISHED, FAILED, NOTFOUND };
const std::vector<std::string> pos({"/Target/ToDo/", "/Target/Pending/",
"/Target/Finished/", "/Target/Failed/"});
static std::string const mapUniqueToShortID = "/Target/MapUniqueToShortID/";
static std::string const pendingPrefix = "/Target/Pending/";
static std::string const failedPrefix = "/Target/Failed/";
static std::string const finishedPrefix = "/Target/Finished/";
@ -123,6 +123,8 @@ struct Job {
Node const& snap, std::string const& db, std::string const& col,
std::string const& shrd);
static std::string uuidLookup(Node const& snap, std::string const& shortID);
Node const _snapshot;
Agent* _agent;
std::string _jobId;

View File

@ -75,6 +75,24 @@ bool MoveShard::create() {
_jb->openArray();
_jb->openObject();
// Lookup from server
if (_from.find("DBServer") == 0) {
try {
_from = uuidLookup(_snapshot, _from);
} catch (...) {
LOG_TOPIC(ERR, Logger::AGENCY) <<
"MoveShard: From server " << _from << " does not exist";
}
}
if (_to.find("DBServer") == 0) {
try {
_to = uuidLookup(_snapshot, _to);
} catch (...) {
LOG_TOPIC(ERR, Logger::AGENCY) <<
"MoveShard: To server " << _to << " does not exist";
}
}
if (_from == _to) {
path = _agencyPrefix + failedPrefix + _jobId;
_jb->add("timeFinished", VPackValue(now));

View File

@ -1990,6 +1990,7 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
////////////////////////////////////////////////////////////////////////////////
static std::string const prefixServers = "Current/ServersRegistered";
static std::string const mapUniqueToShortId = "Target/MapUniqueToShortID";
void ClusterInfo::loadServers() {
++_serversProt.wantedVersion; // Indicate that after *NOW* somebody has to
@ -2002,23 +2003,43 @@ void ClusterInfo::loadServers() {
return;
}
// Now contact the agency:
AgencyCommResult result = _agency.getValues(prefixServers);
AgencyCommResult result = _agency.sendTransactionWithFailover(
AgencyReadTransaction({AgencyCommManager::path(prefixServers),
AgencyCommManager::path(mapUniqueToShortId)}));
if (result.successful()) {
velocypack::Slice serversRegistered =
result.slice()[0].get(std::vector<std::string>(
{AgencyCommManager::path(), "Current", "ServersRegistered"}));
result.slice()[0].get(
std::vector<std::string>(
{AgencyCommManager::path(), "Current", "ServersRegistered"}));
if (serversRegistered.isObject()) {
velocypack::Slice serversAliases =
result.slice()[0].get(
std::vector<std::string>(
{AgencyCommManager::path(), "Target", "MapUniqueToShortID"}));
if (serversRegistered.isObject()) {
decltype(_servers) newServers;
decltype(_serverAliases) newAliases;
size_t i = 0;
for (auto const& res : VPackObjectIterator(serversRegistered)) {
velocypack::Slice slice = res.value;
if (slice.isObject() && slice.hasKey("endpoint")) {
std::string server =
arangodb::basics::VelocyPackHelper::getStringValue(
slice, "endpoint", "");
velocypack::Slice aslice;
try {
aslice = serversAliases.valueAt(i++);
std::string alias =
arangodb::basics::VelocyPackHelper::getStringValue(
slice, "endpoint", "");
aslice, "ShortName", "");
newAliases.emplace(std::make_pair(alias, res.key.copyString()));
} catch (...) {}
newServers.emplace(std::make_pair(res.key.copyString(), server));
}
}
@ -2027,6 +2048,7 @@ void ClusterInfo::loadServers() {
{
WRITE_LOCKER(writeLocker, _serversProt.lock);
_servers.swap(newServers);
_serverAliases.swap(newAliases);
_serversProt.doneVersion = storedVersion;
_serversProt.isValid = true; // will never be reset to false
}
@ -2056,17 +2078,29 @@ std::string ClusterInfo::getServerEndpoint(ServerID const& serverID) {
tries++;
}
std::string serverID_ = serverID;
while (true) {
{
READ_LOCKER(readLocker, _serversProt.lock);
// _serversAliases is a map-type <Alias, ServerID>
auto ita = _serverAliases.find(serverID_);
if (ita != _serverAliases.end()) {
serverID_ = (*ita).second;
}
// _servers is a map-type <ServerId, std::string>
auto it = _servers.find(serverID);
auto it = _servers.find(serverID_);
if (it != _servers.end()) {
return (*it).second;
}
}
if (++tries >= 2) {
break;
}

View File

@ -561,6 +561,8 @@ class ClusterInfo {
// The servers, first all, we only need Current here:
std::unordered_map<ServerID, std::string>
_servers; // from Current/ServersRegistered
std::unordered_map<ServerID, std::string>
_serverAliases; // from Current/ServersRegistered
ProtectionData _serversProt;
// The DBServers, also from Current:

View File

@ -95,10 +95,11 @@ function MovingShardsSuite () {
////////////////////////////////////////////////////////////////////////////////
function getCleanedOutServers() {
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator001");
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator0001");
var request = require("@arangodb/request");
var endpointToURL = require("@arangodb/cluster").endpointToURL;
var url = endpointToURL(coordEndpoint);
var res = request({ method: "GET",
url: url + "/_admin/cluster/numberOfServers"});
var body = res.body;
@ -178,7 +179,7 @@ function MovingShardsSuite () {
////////////////////////////////////////////////////////////////////////////////
function cleanOutServer(id) {
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator001");
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator0001");
var request = require("@arangodb/request");
var endpointToURL = require("@arangodb/cluster").endpointToURL;
var url = endpointToURL(coordEndpoint);
@ -193,7 +194,7 @@ function MovingShardsSuite () {
////////////////////////////////////////////////////////////////////////////////
function shrinkCluster(toNum) {
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator001");
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator0001");
var request = require("@arangodb/request");
var endpointToURL = require("@arangodb/cluster").endpointToURL;
var url = endpointToURL(coordEndpoint);
@ -208,7 +209,7 @@ function MovingShardsSuite () {
////////////////////////////////////////////////////////////////////////////////
function resetCleanedOutServers() {
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator001");
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator0001");
var request = require("@arangodb/request");
var endpointToURL = require("@arangodb/cluster").endpointToURL;
var url = endpointToURL(coordEndpoint);
@ -232,7 +233,7 @@ function MovingShardsSuite () {
////////////////////////////////////////////////////////////////////////////////
function moveShard(database, collection, shard, fromServer, toServer) {
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator001");
var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator0001");
var request = require("@arangodb/request");
var endpointToURL = require("@arangodb/cluster").endpointToURL;
var url = endpointToURL(coordEndpoint);
@ -278,7 +279,7 @@ function MovingShardsSuite () {
function findServerNotOnList(list) {
var count = 1;
var str = "" + count;
var pad = "000";
var pad = "0000";
var ans = pad.substring(0, pad.length - str.length) + str;
var name = "DBServer" + ans;
@ -353,13 +354,13 @@ function MovingShardsSuite () {
testShrinkNoReplication : function() {
assertTrue(waitForSynchronousReplication("_system"));
shrinkCluster(4);
assertTrue(testServerEmpty("DBServer005", true));
assertTrue(testServerEmpty("DBServer0005", true));
assertTrue(waitForSupervision());
shrinkCluster(3);
assertTrue(testServerEmpty("DBServer004", true));
assertTrue(testServerEmpty("DBServer0004", true));
assertTrue(waitForSupervision());
shrinkCluster(2);
assertTrue(testServerEmpty("DBServer003", true));
assertTrue(testServerEmpty("DBServer0003", true));
assertTrue(waitForSupervision());
},