1
0
Fork 0

Bug fix/distribute shards like (#4415)

This commit is contained in:
Michael Hackstein 2018-01-29 13:07:06 +01:00
parent 399d2e67f7
commit 79a80558e1
7 changed files with 422 additions and 252 deletions

View File

@ -1,6 +1,14 @@
v3.3.4 (XXXX-XX-XX) v3.3.4 (XXXX-XX-XX)
------------------- -------------------
* fix internal issue 1770: collection creation using distributeShardsLike yielded
errors and did not distribute shards correctly in the following cases:
1. If numberOfShards * replicationFactor % nrDBServers != 0
(shards * replication is not divisible by DBServers).
2. If there was failover / move shard case on the leading collection
and creating the follower collection afterwards.
* fix timeout issues in replication client expiration * fix timeout issues in replication client expiration
* added missing edge filter to neighbors-only traversals * added missing edge filter to neighbors-only traversals
@ -32,7 +40,6 @@ v3.3.4 (XXXX-XX-XX)
* fixed issue #4395: If your foxx app includes an `APP` folder it got accidently removed by selfhealing * fixed issue #4395: If your foxx app includes an `APP` folder it got accidently removed by selfhealing
this is not the case anymore. this is not the case anymore.
v3.3.3 (2018-01-16) v3.3.3 (2018-01-16)
------------------- -------------------

View File

@ -143,15 +143,18 @@ to the [naming conventions](../NamingConventions/README.md).
servers holding copies take over, usually without an error being servers holding copies take over, usually without an error being
reported. reported.
- *distributeShardsLike* distribute the shards of this collection
cloning the shard distribution of another.
When using the *Enterprise* version of ArangoDB the replicationFactor When using the *Enterprise* version of ArangoDB the replicationFactor
may be set to "satellite" making the collection locally joinable may be set to "satellite" making the collection locally joinable
on every database server. This reduces the number of network hops on every database server. This reduces the number of network hops
dramatically when using joins in AQL at the costs of reduced write dramatically when using joins in AQL at the costs of reduced write
performance on these collections. performance on these collections.
- *distributeShardsLike* distribute the shards of this collection
cloning the shard distribution of another. If this value is set,
it will copy *replicationFactor* and *numberOfShards* from the
other collection; the attributes in this collection will be
ignored and can be omitted.
`db._create(collection-name, properties, type)` `db._create(collection-name, properties, type)`

View File

@ -1142,14 +1142,6 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
std::string const name = std::string const name =
arangodb::basics::VelocyPackHelper::getStringValue(json, "name", ""); arangodb::basics::VelocyPackHelper::getStringValue(json, "name", "");
std::shared_ptr<ShardMap> otherCidShardMap = nullptr;
if (json.hasKey("distributeShardsLike")) {
auto const otherCidString = json.get("distributeShardsLike").copyString();
if (!otherCidString.empty()) {
otherCidShardMap = getCollection(databaseName, otherCidString)->shardIds();
}
}
{ {
// check if a collection with the same name is already planned // check if a collection with the same name is already planned
loadPlan(); loadPlan();
@ -1259,23 +1251,24 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
_agencyCallbackRegistry->registerCallback(agencyCallback); _agencyCallbackRegistry->registerCallback(agencyCallback);
TRI_DEFER(_agencyCallbackRegistry->unregisterCallback(agencyCallback)); TRI_DEFER(_agencyCallbackRegistry->unregisterCallback(agencyCallback));
VPackBuilder builder;
builder.add(json);
std::vector<AgencyOperation> opers ( std::vector<AgencyOperation> opers (
{ AgencyOperation("Plan/Collections/" + databaseName + "/" + collectionID, { AgencyOperation("Plan/Collections/" + databaseName + "/" + collectionID,
AgencyValueOperationType::SET, builder.slice()), AgencyValueOperationType::SET, json),
AgencyOperation("Plan/Version", AgencySimpleOperationType::INCREMENT_OP)}); AgencyOperation("Plan/Version", AgencySimpleOperationType::INCREMENT_OP)});
std::vector<AgencyPrecondition> precs; std::vector<AgencyPrecondition> precs;
// Any of the shards locked? std::shared_ptr<ShardMap> otherCidShardMap = nullptr;
if (otherCidShardMap != nullptr) { if (json.hasKey("distributeShardsLike")) {
for (auto const& shard : *otherCidShardMap) { auto const otherCidString = json.get("distributeShardsLike").copyString();
precs.emplace_back( if (!otherCidString.empty()) {
AgencyPrecondition("Supervision/Shards/" + shard.first, otherCidShardMap = getCollection(databaseName, otherCidString)->shardIds();
AgencyPrecondition::Type::EMPTY, true)); // Any of the shards locked?
for (auto const& shard : *otherCidShardMap) {
precs.emplace_back(
AgencyPrecondition("Supervision/Shards/" + shard.first,
AgencyPrecondition::Type::EMPTY, true));
}
} }
} }
@ -2915,3 +2908,7 @@ std::unordered_map<ServerID, std::string> ClusterInfo::getServerAliases() {
} }
return ret; return ret;
} }
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------

View File

@ -465,6 +465,138 @@ static void collectResultsFromAllShards(
} }
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief compute a shard distribution for a new collection.
///        dbServers must be a list of DBserver ids to distribute across.
///        If this list is empty, the complete current list of DBservers is
///        fetched from ClusterInfo and shuffled with random_shuffle to mix it
///        up; note that in this case dbServers is modified in place and acts
///        as an out-parameter for the caller.
///        Returns a map of shardId -> list of server ids, where the first
///        entry of each list is the shard leader. If no DBservers are
///        available an empty map is returned (caller must handle this).
////////////////////////////////////////////////////////////////////////////////
static std::shared_ptr<std::unordered_map<std::string, std::vector<std::string>>> DistributeShardsEvenly(
    ClusterInfo* ci,
    uint64_t numberOfShards,
    uint64_t replicationFactor,
    std::vector<std::string>& dbServers,
    bool warnAboutReplicationFactor) {
  auto shards = std::make_shared<std::unordered_map<std::string, std::vector<std::string>>>();
  ci->loadCurrentDBServers();
  if (dbServers.size() == 0) {
    dbServers = ci->getCurrentDBServers();
    if (dbServers.empty()) {
      // no DBservers available at all: return the empty map
      return shards;
    }
    random_shuffle(dbServers.begin(), dbServers.end());
  }
  // mop: distribute satellite collections on all servers
  // (replicationFactor == 0 is the internal encoding for "satellite")
  if (replicationFactor == 0) {
    replicationFactor = dbServers.size();
  }
  // fetch a unique id for each shard to create
  uint64_t const id = ci->uniqid(numberOfShards);
  // leaders and followers rotate through dbServers independently so that
  // both roles are spread evenly, even when
  // numberOfShards * replicationFactor is not divisible by dbServers.size()
  size_t leaderIndex = 0;
  size_t followerIndex = 0;
  for (uint64_t i = 0; i < numberOfShards; ++i) {
    // determine responsible server(s)
    std::vector<std::string> serverIds;
    for (uint64_t j = 0; j < replicationFactor; ++j) {
      if (j >= dbServers.size()) {
        // cannot place more replicas than there are servers; shard keeps
        // fewer copies than requested
        if (warnAboutReplicationFactor) {
          LOG_TOPIC(WARN, Logger::CLUSTER)
              << "createCollectionCoordinator: replicationFactor is "
              << "too large for the number of DBservers";
        }
        break;
      }
      std::string candidate;
      // mop: leader (first entry of serverIds is always the leader)
      if (serverIds.size() == 0) {
        candidate = dbServers[leaderIndex++];
        if (leaderIndex >= dbServers.size()) {
          leaderIndex = 0;
        }
      } else {
        // follower: advance until we hit a server that is not the leader
        do {
          candidate = dbServers[followerIndex++];
          if (followerIndex >= dbServers.size()) {
            followerIndex = 0;
          }
        } while (candidate == serverIds[0]); // mop: ignore leader
      }
      serverIds.push_back(candidate);
    }
    // determine shard id
    std::string shardId = "s" + StringUtils::itoa(id + i);
    shards->emplace(shardId, serverIds);
  }
  return shards;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief Clone shard distribution from other collection.
///        Looks up the prototype collection identified by cid, copies its
///        replicationFactor and numberOfShards onto col, and returns a map of
///        freshly generated shardId -> server list, where the i-th new shard
///        gets exactly the server list of the prototype's i-th shard.
///        Throws CLUSTER_CHAIN_OF_DISTRIBUTESHARDSLIKE if the prototype
///        itself uses distributeShardsLike, and TRI_ERROR_INTERNAL if the
///        prototype's plan and its shard map are momentarily inconsistent.
////////////////////////////////////////////////////////////////////////////////
static std::shared_ptr<std::unordered_map<std::string, std::vector<std::string>>>
CloneShardDistribution(ClusterInfo* ci, LogicalCollection* col,
                       TRI_voc_cid_t cid) {
  auto result = std::make_shared<std::unordered_map<std::string, std::vector<std::string>>>();
  TRI_ASSERT(cid != 0);
  std::string cidString = arangodb::basics::StringUtils::itoa(cid);
  std::shared_ptr<LogicalCollection> other =
      ci->getCollection(col->dbName(), cidString);
  // The function guarantees that no nullptr is returned
  TRI_ASSERT(other != nullptr);
  // refuse chains: the prototype must not itself follow another collection
  if (!other->distributeShardsLike().empty()) {
    std::string const errorMessage = "Cannot distribute shards like '" + other->name() + "' it is already distributed like '" + other->distributeShardsLike() + "'.";
    THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_CHAIN_OF_DISTRIBUTESHARDSLIKE, errorMessage);
  }
  // We need to replace the distribute with the cid.
  col->distributeShardsLike(cidString);
  // smart edge collections manage their own (virtual) shards — nothing to
  // clone here, return the empty map
  if (col->isSmart() && col->type() == TRI_COL_TYPE_EDGE) {
    return result;
  }
  // force replicationFactor/numberOfShards to match the prototype; any
  // values the user supplied on col are overridden
  if (col->replicationFactor() != other->replicationFactor()) {
    col->replicationFactor(other->replicationFactor());
  }
  if (col->numberOfShards() != other->numberOfShards()) {
    col->numberOfShards(other->numberOfShards());
  }
  auto shards = other->shardIds();
  auto shardList = ci->getShardList(cidString);
  auto numberOfShards = static_cast<uint64_t>(col->numberOfShards());
  // fetch a unique id for each shard to create
  uint64_t const id = ci->uniqid(numberOfShards);
  for (uint64_t i = 0; i < numberOfShards; ++i) {
    // determine responsible server(s): copy the server list of the
    // prototype's i-th shard (preserves leader + follower placement)
    std::string shardId = "s" + StringUtils::itoa(id + i);
    auto it = shards->find(shardList->at(i));
    if (it == shards->end()) {
      // shard listed in the plan but missing from the shard map — transient
      // inconsistency, e.g. during failover/move-shard self-healing
      TRI_ASSERT(false);
      THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "Inconsistency in shard distribution detected. Is in the process of self-healing. Please retry the operation again after some seconds.");
    }
    result->emplace(shardId, it->second);
  }
  return result;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief creates a copy of all HTTP headers to forward /// @brief creates a copy of all HTTP headers to forward
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -2437,80 +2569,6 @@ int flushWalOnAllDBServers(bool waitForSync, bool waitForCollector, double maxWa
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief compute a shard distribution for a new collection, the list
///        dbServers must be a list of DBserver ids to distribute across.
///        If this list is empty, the complete current list of DBservers is
///        fetched from ClusterInfo and shuffled with random_shuffle to mix it
///        up; note that dbServers is then modified in place (out-parameter).
///        Returns a map of shardId -> list of server ids; the first entry of
///        each list is the shard leader. Returns an empty map when no
///        DBservers are available.
////////////////////////////////////////////////////////////////////////////////
std::unordered_map<std::string, std::vector<std::string>> distributeShards(
    uint64_t numberOfShards,
    uint64_t replicationFactor,
    std::vector<std::string>& dbServers,
    bool warnAboutReplicationFactor) {
  std::unordered_map<std::string, std::vector<std::string>> shards;
  ClusterInfo* ci = ClusterInfo::instance();
  ci->loadCurrentDBServers();
  if (dbServers.size() == 0) {
    dbServers = ci->getCurrentDBServers();
    if (dbServers.empty()) {
      // no DBservers available at all: return the empty map
      return shards;
    }
    random_shuffle(dbServers.begin(), dbServers.end());
  }
  // mop: distribute satellite collections on all servers
  // (replicationFactor == 0 is the internal encoding for "satellite")
  if (replicationFactor == 0) {
    replicationFactor = dbServers.size();
  }
  // fetch a unique id for each shard to create
  uint64_t const id = ci->uniqid(numberOfShards);
  // leaders and followers rotate through dbServers independently so both
  // roles are spread evenly across the servers
  size_t leaderIndex = 0;
  size_t followerIndex = 0;
  for (uint64_t i = 0; i < numberOfShards; ++i) {
    // determine responsible server(s)
    std::vector<std::string> serverIds;
    for (uint64_t j = 0; j < replicationFactor; ++j) {
      if (j >= dbServers.size()) {
        // cannot place more replicas than there are servers
        if (warnAboutReplicationFactor) {
          LOG_TOPIC(WARN, Logger::CLUSTER)
              << "createCollectionCoordinator: replicationFactor is "
              << "too large for the number of DBservers";
        }
        break;
      }
      std::string candidate;
      // mop: leader (first entry of serverIds is always the leader)
      if (serverIds.size() == 0) {
        candidate = dbServers[leaderIndex++];
        if (leaderIndex >= dbServers.size()) {
          leaderIndex = 0;
        }
      } else {
        // follower: advance until we hit a server that is not the leader
        do {
          candidate = dbServers[followerIndex++];
          if (followerIndex >= dbServers.size()) {
            followerIndex = 0;
          }
        } while (candidate == serverIds[0]); // mop: ignore leader
      }
      serverIds.push_back(candidate);
    }
    // determine shard id
    std::string shardId = "s" + StringUtils::itoa(id + i);
    shards.emplace(shardId, serverIds);
  }
  return shards;
}
#ifndef USE_ENTERPRISE #ifndef USE_ENTERPRISE
std::unique_ptr<LogicalCollection> ClusterMethods::createCollectionOnCoordinator( std::unique_ptr<LogicalCollection> ClusterMethods::createCollectionOnCoordinator(
TRI_col_type_e collectionType, TRI_vocbase_t* vocbase, VPackSlice parameters, TRI_col_type_e collectionType, TRI_vocbase_t* vocbase, VPackSlice parameters,
@ -2533,18 +2591,15 @@ std::unique_ptr<LogicalCollection> ClusterMethods::createCollectionOnCoordinator
std::unique_ptr<LogicalCollection> ClusterMethods::persistCollectionInAgency( std::unique_ptr<LogicalCollection> ClusterMethods::persistCollectionInAgency(
LogicalCollection* col, bool ignoreDistributeShardsLikeErrors, LogicalCollection* col, bool ignoreDistributeShardsLikeErrors,
bool waitForSyncReplication, bool enforceReplicationFactor, bool waitForSyncReplication, bool enforceReplicationFactor,
VPackSlice parameters) { VPackSlice) {
std::string distributeShardsLike = col->distributeShardsLike(); std::string distributeShardsLike = col->distributeShardsLike();
std::vector<std::string> avoid = col->avoidServers(); std::vector<std::string> avoid = col->avoidServers();
size_t replicationFactor = col->replicationFactor();
size_t numberOfShards = col->numberOfShards();
std::string const replicationFactorStr("replicationFactor");
std::string const numberOfShardsStr("numberOfShards");
ClusterInfo* ci = ClusterInfo::instance(); ClusterInfo* ci = ClusterInfo::instance();
std::vector<std::string> dbServers; std::vector<std::string> dbServers = ci->getCurrentDBServers();
std::shared_ptr<std::unordered_map<std::string, std::vector<std::string>>> shards = nullptr;
if (!distributeShardsLike.empty()) { if (!distributeShardsLike.empty()) {
CollectionNameResolver resolver(col->vocbase()); CollectionNameResolver resolver(col->vocbase());
@ -2552,132 +2607,48 @@ std::unique_ptr<LogicalCollection> ClusterMethods::persistCollectionInAgency(
resolver.getCollectionIdCluster(distributeShardsLike); resolver.getCollectionIdCluster(distributeShardsLike);
if (otherCid != 0) { if (otherCid != 0) {
shards = CloneShardDistribution(ci, col, otherCid);
bool chainOfDistributeShardsLike = false;
bool numberOfShardsConflict = false;
bool replicationFactorConflict = false;
std::string otherCidString
= arangodb::basics::StringUtils::itoa(otherCid);
VPackBuilder builder;
{ VPackObjectBuilder a(&builder);
col->toVelocyPack(builder,false); }
try {
std::shared_ptr<LogicalCollection> other =
ci->getCollection(col->dbName(), otherCidString);
size_t otherReplFactor = size_t(other->replicationFactor());
if (!col->isSmart()) {
if (parameters.hasKey(replicationFactorStr)) {
replicationFactor = parameters.get(replicationFactorStr).getNumber<size_t>();
if (otherReplFactor != replicationFactor) {
replicationFactor = otherReplFactor;
col->replicationFactor(static_cast<int>(otherReplFactor));
//replicationFactorConflict = true;
}
} else {
replicationFactor = otherReplFactor;
col->replicationFactor(static_cast<int>(otherReplFactor));
}
size_t otherNumOfShards = size_t(other->numberOfShards());
if (parameters.hasKey(numberOfShardsStr)) {
numberOfShards = parameters.get(numberOfShardsStr).getNumber<size_t>();
if (otherNumOfShards != numberOfShards) {
numberOfShards = otherNumOfShards;
col->replicationFactor(static_cast<int>(otherNumOfShards));
//numberOfShardsConflict = true;
}
} else {
numberOfShards = otherNumOfShards;
col->replicationFactor(static_cast<int>(otherNumOfShards));
}
}
if (!other->distributeShardsLike().empty()) {
chainOfDistributeShardsLike = true;
}
auto shards = other->shardIds();
auto shardList = ci->getShardList(otherCidString);
for (auto const& s : *shardList) {
auto it = shards->find(s);
if (it != shards->end()) {
for (auto const& s : it->second) {
dbServers.push_back(s);
}
}
}
} catch (...) {}
if (replicationFactorConflict) {
THROW_ARANGO_EXCEPTION(
TRI_ERROR_CLUSTER_DISTRIBUTE_SHARDS_LIKE_REPLICATION_FACTOR);
}
if (numberOfShardsConflict) {
THROW_ARANGO_EXCEPTION(
TRI_ERROR_CLUSTER_DISTRIBUTE_SHARDS_LIKE_NUMBER_OF_SHARDS);
}
if (chainOfDistributeShardsLike) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_CHAIN_OF_DISTRIBUTESHARDSLIKE);
}
col->distributeShardsLike(otherCidString);
} else { } else {
dbServers = ci->getCurrentDBServers(); THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE,
if (ignoreDistributeShardsLikeErrors) { "Could not find collection " + distributeShardsLike + " to distribute shards like it.");
col->distributeShardsLike(std::string());
} else {
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE);
}
} }
} else if (!avoid.empty()) { } else {
dbServers = ci->getCurrentDBServers(); // system collections should never enforce replicationfactor
if (dbServers.size() - avoid.size() >= replicationFactor) { // to allow them to come up with 1 dbserver
if (col->isSystem()) {
enforceReplicationFactor = false;
}
size_t replicationFactor = col->replicationFactor();
size_t numberOfShards = col->numberOfShards();
// the default behaviour however is to bail out and inform the user
// that the requested replicationFactor is not possible right now
if (enforceReplicationFactor && dbServers.size() < replicationFactor) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS);
}
if (!avoid.empty()) {
// We need to remove all servers that are in the avoid list
if (dbServers.size() - avoid.size() < replicationFactor) {
// Not enough DBServers left
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS);
}
dbServers.erase( dbServers.erase(
std::remove_if( std::remove_if(
dbServers.begin(), dbServers.end(), [&](const std::string&x) { dbServers.begin(), dbServers.end(), [&](const std::string& x) {
return std::find(avoid.begin(), avoid.end(), x) != avoid.end(); return std::find(avoid.begin(), avoid.end(), x) != avoid.end();
}), dbServers.end()); }), dbServers.end());
} }
std::random_shuffle(dbServers.begin(), dbServers.end()); std::random_shuffle(dbServers.begin(), dbServers.end());
} else { shards = DistributeShardsEvenly(ci, numberOfShards, replicationFactor, dbServers, !col->isSystem());
dbServers = ci->getCurrentDBServers();
} }
// system collections should never enforce replicationfactor
// to allow them to come up with 1 dbserver
if (enforceReplicationFactor && col->isSystem()) {
enforceReplicationFactor = false;
}
// the default behaviour however is to bail out and inform the user
// that the requested replicationFactor is not possible right now
if (enforceReplicationFactor && dbServers.size() < replicationFactor) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS);
}
// If the list dbServers is still empty, it will be filled in
// distributeShards below.
// Now create the shards:
bool warnAboutReplicationFactor = (!col->isSystem());
auto shards = std::make_shared<
std::unordered_map<std::string, std::vector<std::string>>>(
arangodb::distributeShards(numberOfShards, replicationFactor, dbServers, warnAboutReplicationFactor));
if (shards->empty() && !col->isSmart()) { if (shards->empty() && !col->isSmart()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"no database servers found in cluster"); "no database servers found in cluster");
} }
col->setShardMap(shards); col->setShardMap(shards);
std::unordered_set<std::string> const ignoreKeys{ std::unordered_set<std::string> const ignoreKeys{
@ -2690,7 +2661,8 @@ std::unique_ptr<LogicalCollection> ClusterMethods::persistCollectionInAgency(
std::string errorMsg; std::string errorMsg;
int myerrno = ci->createCollectionCoordinator( int myerrno = ci->createCollectionCoordinator(
col->dbName(), col->cid_as_string(), col->dbName(), col->cid_as_string(),
numberOfShards, replicationFactor, waitForSyncReplication, velocy.slice(), errorMsg, 240.0); col->numberOfShards(), col->replicationFactor(),
waitForSyncReplication, velocy.slice(), errorMsg, 240.0);
if (myerrno != TRI_ERROR_NO_ERROR) { if (myerrno != TRI_ERROR_NO_ERROR) {
if (errorMsg.empty()) { if (errorMsg.empty()) {

View File

@ -248,20 +248,6 @@ int flushWalOnAllDBServers(bool waitForSync, bool waitForCollector, double maxWa
int rotateActiveJournalOnAllDBServers(std::string const& dbname, int rotateActiveJournalOnAllDBServers(std::string const& dbname,
std::string const& collname); std::string const& collname);
////////////////////////////////////////////////////////////////////////////////
/// @brief compute a shard distribution for a new collection, the list
/// dbServers must be a list of DBserver ids to distribute across.
/// If this list is empty, the complete current list of DBservers is
/// fetched from ClusterInfo. If shuffle is true, a few random shuffles
/// are performed before the list is taken. Thus modifies the list.
////////////////////////////////////////////////////////////////////////////////
std::unordered_map<std::string, std::vector<std::string>> distributeShards(
uint64_t numberOfShards,
uint64_t replicationFactor,
std::vector<std::string>& dbServers,
bool warnAboutReplicationFactor);
class ClusterMethods { class ClusterMethods {
public: public:
// wrapper Class for static functions. // wrapper Class for static functions.

View File

@ -29,7 +29,10 @@ const internal = require('internal');
const download = require('internal').download; const download = require('internal').download;
const colName = "UnitTestDistributionTest"; const colName = "UnitTestDistributionTest";
const _ = require("lodash"); const _ = require("lodash");
const wait = require("internal").wait;
const request = require('@arangodb/request'); const request = require('@arangodb/request');
const endpointToURL = require("@arangodb/cluster").endpointToURL;
const coordinatorName = "Coordinator0001";
let coordinator = instanceInfo.arangods.filter(arangod => { let coordinator = instanceInfo.arangods.filter(arangod => {
return arangod.role === 'coordinator'; return arangod.role === 'coordinator';
@ -87,7 +90,6 @@ describe('Shard distribution', function () {
serverCount += 1; serverCount += 1;
} }
} }
console.log("Found health records:", serverCount, health.Health, count);
if (serverCount >= dbServerCount) { if (serverCount >= dbServerCount) {
break; break;
} }
@ -155,4 +157,177 @@ describe('Shard distribution', function () {
}); });
// Integration tests for collection creation with distributeShardsLike:
// the follower collection must replicate the exact shard placement
// (leaders and followers) of the prototype collection.
describe("using distributeShardsLike", function () {
  const followCollection = 'UnitTestDistributionFollower';
  const numberOfShards = 12;

  const cleanUp = function () {
    internal.db._drop(followCollection);
  };

  // Extract the numeric part of a shard id of the form "s<number>".
  const shardNumber = function (shard) {
    // Each shard starts with 's'
    expect(shard[0]).to.equal('s');
    // And is followed by a numeric value
    const nr = parseInt(shard.slice(1));
    expect(nr).to.be.above(0);
    return nr
  };

  // Comparator sorting shard ids by their numeric part (creation order),
  // not lexicographically (where e.g. "s10" would sort before "s2").
  const sortShardsNumericly = function (l, r) {
    return shardNumber(l) - shardNumber(r);
  };

  // Fetch the current shard distribution from the coordinator and assert
  // that, pairing shards by numeric order, leader and follower sets of the
  // prototype and follower collection are identical.
  const compareDistributions = function() {
    const all = request.get(coordinator.url + '/_admin/cluster/shardDistribution');
    const dist = JSON.parse(all.body).results;
    const orig = dist[colName].Current;
    const fol = dist[followCollection].Current;
    const origShards = Object.keys(orig).sort(sortShardsNumericly);
    const folShards = Object.keys(fol).sort(sortShardsNumericly);
    // Now we have all shard names sorted by their numeric shard id.
    // It needs to be guaranteed that leader + follower of each shard in this ordering is identical.
    expect(origShards).to.have.length.of(folShards.length);
    for (let i = 0; i < origShards.length; ++i) {
      const oneOrigShard = orig[origShards[i]];
      const oneFolShard = fol[folShards[i]];
      // Leader has to be identical
      expect(oneOrigShard.leader).to.equal(oneFolShard.leader);
      // Follower Order does not matter, but needs to be the same servers
      expect(oneOrigShard.followers.sort()).to.deep.equal(oneFolShard.followers.sort());
    }
  };

  describe("without replication", function () {
    const replicationFactor = 1;

    beforeEach(function () {
      cleanUp();
      internal.db._create(colName, {replicationFactor, numberOfShards});
    });

    afterEach(cleanUp);

    it("should create all shards on identical servers", function () {
      internal.db._create(followCollection, {replicationFactor, numberOfShards, distributeShardsLike: colName});
      compareDistributions();
    });
  });

  describe("with replication", function () {
    const replicationFactor = 3;
    // Note here: We have to make sure that numberOfShards * replicationFactor is not divisible by the number of DBServers

    ////////////////////////////////////////////////////////////////////////////////
    /// @brief order the cluster to clean out a server:
    ////////////////////////////////////////////////////////////////////////////////
    const cleanOutServer = function (id) {
      var coordEndpoint =
          global.ArangoClusterInfo.getServerEndpoint(coordinatorName);
      var url = endpointToURL(coordEndpoint);
      var body = {"server": id};
      try {
        return request({ method: "POST",
                         url: url + "/_admin/cluster/cleanOutServer",
                         body: JSON.stringify(body) });
      } catch (err) {
        console.error(
          "Exception for POST /_admin/cluster/cleanOutServer:", err.stack);
        return false;
      }
    };

    // Query the coordinator for the list of already cleaned-out servers.
    // Returns the parsed response body, or {} on error.
    const getCleanedOutServers = function () {
      const coordEndpoint =
          global.ArangoClusterInfo.getServerEndpoint(coordinatorName);
      const url = endpointToURL(coordEndpoint);
      try {
        const envelope =
            { method: "GET", url: url + "/_admin/cluster/numberOfServers" };
        let res = request(envelope);
        var body = res.body;
        if (typeof body === "string") {
          body = JSON.parse(body);
        }
        return body;
      } catch (err) {
        console.error(
          "Exception for POST /_admin/cluster/cleanOutServer:", err.stack);
        return {};
      }
    };

    // Poll (up to 600s) until the given server shows up as cleaned out.
    const waitForCleanout = function (id) {
      let count = 600;
      while (--count > 0) {
        let obj = getCleanedOutServers();
        if (obj.cleanedServers.indexOf(id) >= 0) {
          console.info(
            "Success: Server " + id + " cleaned out after " + (600-count) + " seconds");
          return true;
        }
        wait(1.0);
      }
      console.error(
        "Failed: Server " + id + " not cleaned out after 600 seconds");
      return false;
    };

    // Poll (up to ~90s) until every shard of the collection reports the
    // full replicationFactor number of in-sync servers in Current.
    const waitForSynchronousReplication = function (collection) {
      global.ArangoClusterInfo.flush();
      var cinfo = global.ArangoClusterInfo.getCollectionInfo(
        "_system", collection);
      var shards = Object.keys(cinfo.shards);
      var replFactor = cinfo.shards[shards[0]].length;
      var count = 0;
      while (++count <= 180) {
        var ccinfo = shards.map(
          s => global.ArangoClusterInfo.getCollectionInfoCurrent(
            "_system", collection, s)
        );
        let replicas = ccinfo.map(s => s.servers);
        if (_.every(replicas, x => x.length === replFactor)) {
          return true;
        }
        wait(0.5);
        global.ArangoClusterInfo.flush();
      }
      console.error(`Collection "${collection}" failed to get all followers in sync after 60 sec`);
      return false;
    };

    beforeEach(function () {
      cleanUp();
      internal.db._create(colName, {replicationFactor, numberOfShards});
      expect(waitForSynchronousReplication(colName)).to.equal(true);
    });

    afterEach(cleanUp);

    it("should create all shards and followers on identical servers", function () {
      internal.db._create(followCollection, {replicationFactor, numberOfShards, distributeShardsLike: colName});
      expect(waitForSynchronousReplication(followCollection)).to.equal(true);
      compareDistributions();
    });

    it("should be resilient to a failover in the original collection", function () {
      var server = global.ArangoClusterInfo.getDBServers()[1].serverId;
      // Clean out the server that is scheduled second.
      expect(cleanOutServer(server)).to.not.equal(false);
      expect(waitForCleanout(server)).to.equal(true);
      expect(waitForSynchronousReplication(colName)).to.equal(true);
      // Now we have moved around some shards.
      internal.db._create(followCollection, {replicationFactor, numberOfShards, distributeShardsLike: colName});
      expect(waitForSynchronousReplication(followCollection)).to.equal(true);
      compareDistributions();
    });
  });
});
}); });

View File

@ -63,9 +63,10 @@ function checkReplicationFactor(name, fac) {
internal.sleep(0.5); internal.sleep(0.5);
} }
let current = ArangoAgency.get('Current/Collections/_system'); let current = ArangoAgency.get('Current/Collections/_system');
let val = current.arango.Current.Collections['_system'][collectionId]; let val = current.arango.Current.Collections['_system'][collectionId];
throw "replicationFactor is not reflected properly in " + expect(true).to.equal(false, "Expected replicationFactor of " + fac + " in collection "
"/Current/Collections/_system/" + collectionId + ": "+ JSON.stringify(val); + name + " is not reflected properly in " +
"/Current/Collections/_system/" + collectionId + ": "+ JSON.stringify(val));
}; };
describe('Update collection properties', function() { describe('Update collection properties', function() {
@ -87,13 +88,13 @@ describe('Update collection properties', function() {
checkReplicationFactor(cn1, 1); checkReplicationFactor(cn1, 1);
const coll = db._collection(cn1); const coll = db._collection(cn1);
let props = coll.properties({replicationFactor: 2}); let props = coll.properties({replicationFactor: 2});
expect(props.replicationFactor).to.equal(2); expect(props.replicationFactor).to.equal(2);
checkReplicationFactor(cn1, 2); checkReplicationFactor(cn1, 2);
}); });
it('decrease replication factor ', function() { it('decrease replication factor ', function() {
db._create(cn1, {replicationFactor: 2, numberOfShards: 2}, {waitForSyncReplication: true}); db._create(cn1, {replicationFactor: 2, numberOfShards: 2}, {waitForSyncReplication: true});
@ -115,7 +116,7 @@ describe('Update collection properties', function() {
try { try {
const coll = db._collection(cn1); const coll = db._collection(cn1);
coll.properties({replicationFactor: -1}); coll.properties({replicationFactor: -1});
expect(false.replicationFactor).to.equal(true, expect(false.replicationFactor).to.equal(true,
"Was able to update replicationFactor of follower"); "Was able to update replicationFactor of follower");
} catch(e) { } catch(e) {
expect(e.errorNum).to.equal(errors.ERROR_BAD_PARAMETER.code); expect(e.errorNum).to.equal(errors.ERROR_BAD_PARAMETER.code);
@ -124,7 +125,7 @@ describe('Update collection properties', function() {
try { try {
const coll = db._collection(cn1); const coll = db._collection(cn1);
coll.properties({replicationFactor: 100}); coll.properties({replicationFactor: 100});
expect(false.replicationFactor).to.equal(true, expect(false.replicationFactor).to.equal(true,
"Was able to update replicationFactor of follower"); "Was able to update replicationFactor of follower");
} catch(e) { } catch(e) {
expect(e.errorNum).to.equal(errors.ERROR_BAD_PARAMETER.code); expect(e.errorNum).to.equal(errors.ERROR_BAD_PARAMETER.code);
@ -133,7 +134,7 @@ describe('Update collection properties', function() {
try { try {
const coll = db._collection(cn1); const coll = db._collection(cn1);
coll.properties({replicationFactor: "satellite"}); coll.properties({replicationFactor: "satellite"});
expect(false.replicationFactor).to.equal(true, expect(false.replicationFactor).to.equal(true,
"Was able to update replicationFactor of follower"); "Was able to update replicationFactor of follower");
} catch(e) { } catch(e) {
expect(e.errorNum).to.equal(errors.ERROR_FORBIDDEN.code); expect(e.errorNum).to.equal(errors.ERROR_FORBIDDEN.code);
@ -153,9 +154,9 @@ describe('Update collection properties with distributeShardsLike, ', function()
db._useDatabase("_system"); db._useDatabase("_system");
try { try {
db._drop(cn2); db._drop(cn2);
} catch (e) {} } catch (e) {}
try { try {
db._drop(cn1); db._drop(cn1);
} catch (e) {} } catch (e) {}
@ -166,22 +167,22 @@ describe('Update collection properties with distributeShardsLike, ', function()
db._create(cn2, {distributeShardsLike: cn1}, {waitForSyncReplication: true}); db._create(cn2, {distributeShardsLike: cn1}, {waitForSyncReplication: true});
checkReplicationFactor(cn1, 1); checkReplicationFactor(cn1, 1);
checkReplicationFactor(cn2, 1); checkReplicationFactor(cn2, 1);
const leader = db._collection(cn1); const leader = db._collection(cn1);
let props = leader.properties({replicationFactor: 2}); let props = leader.properties({replicationFactor: 2});
expect(props.replicationFactor).to.equal(2); expect(props.replicationFactor).to.equal(2);
checkReplicationFactor(cn1, 2); checkReplicationFactor(cn1, 2);
checkReplicationFactor(cn2, 2); checkReplicationFactor(cn2, 2);
}); });
it('decrease replication factor', function() { it('decrease replication factor', function() {
db._create(cn1, {replicationFactor: 2, numberOfShards: 2}, {waitForSyncReplication: true}); db._create(cn1, {replicationFactor: 2, numberOfShards: 2}, {waitForSyncReplication: true});
db._create(cn2, {distributeShardsLike: cn1}, {waitForSyncReplication: true}); db._create(cn2, {distributeShardsLike: cn1}, {waitForSyncReplication: true});
checkReplicationFactor(cn1, 2); checkReplicationFactor(cn1, 2);
checkReplicationFactor(cn2, 2); checkReplicationFactor(cn2, 2);
const leader = db._collection(cn1); const leader = db._collection(cn1);
@ -197,12 +198,12 @@ describe('Update collection properties with distributeShardsLike, ', function()
db._create(cn2, {distributeShardsLike: cn1}, {waitForSyncReplication: true}); db._create(cn2, {distributeShardsLike: cn1}, {waitForSyncReplication: true});
checkReplicationFactor(cn1, 2); checkReplicationFactor(cn1, 2);
checkReplicationFactor(cn2, 2); checkReplicationFactor(cn2, 2);
try { try {
const follower = db._collection(cn2); const follower = db._collection(cn2);
follower.properties({replicationFactor: 1}); follower.properties({replicationFactor: 1});
expect(false.replicationFactor).to.equal(true, expect(false.replicationFactor).to.equal(true,
"Was able to update replicationFactor of follower"); "Was able to update replicationFactor of follower");
} catch(e) { } catch(e) {
expect(e.errorNum).to.equal(errors.ERROR_FORBIDDEN.code); expect(e.errorNum).to.equal(errors.ERROR_FORBIDDEN.code);
@ -219,10 +220,16 @@ describe('Replication factor constraints', function() {
db._useDatabase("_system"); db._useDatabase("_system");
try { try {
db._drop(cn1); // must be dropped first because cn1 is prototype for this collection
// and can only be dropped if all dependent collections are dropped first.
db._drop(cn2);
} catch (e) {}
try {
db._drop(cn1);
} catch (e) {} } catch (e) {}
}); });
it('should not allow to create a collection with more replicas than dbservers available', function() { it('should not allow to create a collection with more replicas than dbservers available', function() {
try { try {
db._create(cn1, {replicationFactor: 5}); db._create(cn1, {replicationFactor: 5});
@ -235,4 +242,27 @@ describe('Replication factor constraints', function() {
it('should allow to create a collection with more replicas than dbservers when explicitly requested', function() { it('should allow to create a collection with more replicas than dbservers when explicitly requested', function() {
db._create(cn1, {replicationFactor: 5}, {enforceReplicationFactor: false}); db._create(cn1, {replicationFactor: 5}, {enforceReplicationFactor: false});
}); });
});
it('check replication factor of system collections', function() {
["_appbundles", "_apps", "_aqlfunctions", "_frontend", "_graphs",
"_iresearch_analyzers", "_jobs", "_modules", "_queues", "_routing",
"_statistics" , "_statistics15" , "_statisticsRaw" ,"_users"
].forEach(name => {
if(name === "_graphs"){
expect(db[name].properties()['replicationFactor']).to.equal(2);
} else if(db[name]){
expect(db[name].properties()['replicationFactor']).to.equal(2);
expect(db[name].properties()['distributeShardsLike']).to.equal("_graphs");
}
});
});
it('distributeShardsLike should ignore additional parameters', function() {
db._create(cn1, {replicationFactor: 2, numberOfShards: 2}, {waitForSyncReplication: true});
db._create(cn2, {distributeShardsLike: cn1, replicationFactor: 5, numberOfShards: 99}, {waitForSyncReplication: true});
expect(db[cn1].properties()['replicationFactor']).to.equal(db[cn2].properties()['replicationFactor']);
expect(db[cn1].properties()['numberOfShards']).to.equal(db[cn2].properties()['numberOfShards']);
expect(db[cn2].properties()['distributeShardsLike']).to.equal(cn1);
});
});