diff --git a/CHANGELOG b/CHANGELOG index 0ca09b6148..1e4c7e22ac 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,14 @@ v3.3.4 (XXXX-XX-XX) ------------------- + +* fix internal issue 1770: collection creation using distributeShardsLike yields + errors and did not distribute shards correctly in the following cases: + 1. If numberOfShards * replicationFactor % nrDBServers != 0 + (shards * replication is not divisible by DBServers). + 2. If there was failover / move shard case on the leading collection + and creating the follower collection afterwards. + * fix timeout issues in replication client expiration * added missing edge filter to neighbors-only traversals @@ -32,7 +40,6 @@ v3.3.4 (XXXX-XX-XX) * fixed issue #4395: If your foxx app includes an `APP` folder it got accidently removed by selfhealing this is not the case anymore. - v3.3.3 (2018-01-16) ------------------- diff --git a/Documentation/Books/Manual/DataModeling/Collections/DatabaseMethods.md b/Documentation/Books/Manual/DataModeling/Collections/DatabaseMethods.md index 0a2653303d..33b69c8c40 100644 --- a/Documentation/Books/Manual/DataModeling/Collections/DatabaseMethods.md +++ b/Documentation/Books/Manual/DataModeling/Collections/DatabaseMethods.md @@ -143,15 +143,18 @@ to the [naming conventions](../NamingConventions/README.md). servers holding copies take over, usually without an error being reported. -- *distributeShardsLike* distribute the shards of this collection - cloning the shard distribution of another. - When using the *Enterprise* version of ArangoDB the replicationFactor may be set to "satellite" making the collection locally joinable on every database server. This reduces the number of network hops dramatically when using joins in AQL at the costs of reduced write performance on these collections. +- *distributeShardsLike* distribute the shards of this collection + cloning the shard distribution of another. If this value is set + it will copy *replicationFactor* and *numberOfShards* from the + other collection, the attributes in this collection will be + ignored and can be ommited. + `db._create(collection-name, properties, type)` diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index 65a63f2603..7754f8f082 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -1142,14 +1142,6 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, std::string const name = arangodb::basics::VelocyPackHelper::getStringValue(json, "name", ""); - std::shared_ptr otherCidShardMap = nullptr; - if (json.hasKey("distributeShardsLike")) { - auto const otherCidString = json.get("distributeShardsLike").copyString(); - if (!otherCidString.empty()) { - otherCidShardMap = getCollection(databaseName, otherCidString)->shardIds(); - } - } - { // check if a collection with the same name is already planned loadPlan(); @@ -1259,23 +1251,24 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, _agencyCallbackRegistry->registerCallback(agencyCallback); TRI_DEFER(_agencyCallbackRegistry->unregisterCallback(agencyCallback)); - VPackBuilder builder; - builder.add(json); - - std::vector opers ( { AgencyOperation("Plan/Collections/" + databaseName + "/" + collectionID, - AgencyValueOperationType::SET, builder.slice()), + AgencyValueOperationType::SET, json), AgencyOperation("Plan/Version", AgencySimpleOperationType::INCREMENT_OP)}); std::vector precs; - // Any of the shards locked? 
- if (otherCidShardMap != nullptr) { - for (auto const& shard : *otherCidShardMap) { - precs.emplace_back( - AgencyPrecondition("Supervision/Shards/" + shard.first, - AgencyPrecondition::Type::EMPTY, true)); + std::shared_ptr otherCidShardMap = nullptr; + if (json.hasKey("distributeShardsLike")) { + auto const otherCidString = json.get("distributeShardsLike").copyString(); + if (!otherCidString.empty()) { + otherCidShardMap = getCollection(databaseName, otherCidString)->shardIds(); + // Any of the shards locked? + for (auto const& shard : *otherCidShardMap) { + precs.emplace_back( + AgencyPrecondition("Supervision/Shards/" + shard.first, + AgencyPrecondition::Type::EMPTY, true)); + } } } @@ -2915,3 +2908,7 @@ std::unordered_map ClusterInfo::getServerAliases() { } return ret; } + +// ----------------------------------------------------------------------------- +// --SECTION-- END-OF-FILE +// ----------------------------------------------------------------------------- diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp index d3499917d9..063fc44892 100644 --- a/arangod/Cluster/ClusterMethods.cpp +++ b/arangod/Cluster/ClusterMethods.cpp @@ -465,6 +465,138 @@ static void collectResultsFromAllShards( } } +//////////////////////////////////////////////////////////////////////////////// +/// @brief compute a shard distribution for a new collection, the list +/// dbServers must be a list of DBserver ids to distribute across. +/// If this list is empty, the complete current list of DBservers is +/// fetched from ClusterInfo and with random_shuffle to mix it up. +//////////////////////////////////////////////////////////////////////////////// + +static std::shared_ptr>> DistributeShardsEvenly( + ClusterInfo* ci, + uint64_t numberOfShards, + uint64_t replicationFactor, + std::vector& dbServers, + bool warnAboutReplicationFactor) { + + auto shards = std::make_shared>>(); + + ci->loadCurrentDBServers(); + if (dbServers.size() == 0) { + dbServers = ci->getCurrentDBServers(); + if (dbServers.empty()) { + return shards; + } + random_shuffle(dbServers.begin(), dbServers.end()); + } + + // mop: distribute satellite collections on all servers + if (replicationFactor == 0) { + replicationFactor = dbServers.size(); + } + + // fetch a unique id for each shard to create + uint64_t const id = ci->uniqid(numberOfShards); + + size_t leaderIndex = 0; + size_t followerIndex = 0; + for (uint64_t i = 0; i < numberOfShards; ++i) { + // determine responsible server(s) + std::vector serverIds; + for (uint64_t j = 0; j < replicationFactor; ++j) { + if (j >= dbServers.size()) { + if (warnAboutReplicationFactor) { + LOG_TOPIC(WARN, Logger::CLUSTER) + << "createCollectionCoordinator: replicationFactor is " + << "too large for the number of DBservers"; + } + break; + } + std::string candidate; + // mop: leader + if (serverIds.size() == 0) { + candidate = dbServers[leaderIndex++]; + if (leaderIndex >= dbServers.size()) { + leaderIndex = 0; + } + } else { + do { + candidate = dbServers[followerIndex++]; + if (followerIndex >= dbServers.size()) { + followerIndex = 0; + } + } while (candidate == serverIds[0]); // mop: ignore leader + } + serverIds.push_back(candidate); + } + + // determine shard id + std::string shardId = "s" + StringUtils::itoa(id + i); + + shards->emplace(shardId, serverIds); + } + + return shards; +} + + + +//////////////////////////////////////////////////////////////////////////////// +/// @brief Clone shard distribution from other collection 
+//////////////////////////////////////////////////////////////////////////////// + +static std::shared_ptr>> +CloneShardDistribution(ClusterInfo* ci, LogicalCollection* col, + TRI_voc_cid_t cid) { + auto result = std::make_shared>>(); + TRI_ASSERT(cid != 0); + std::string cidString = arangodb::basics::StringUtils::itoa(cid); + std::shared_ptr other = + ci->getCollection(col->dbName(), cidString); + // The function guarantees that no nullptr is returned + TRI_ASSERT(other != nullptr); + + if (!other->distributeShardsLike().empty()) { + std::string const errorMessage = "Cannot distribute shards like '" + other->name() + "' it is already distributed like '" + other->distributeShardsLike() + "'."; + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_CHAIN_OF_DISTRIBUTESHARDSLIKE, errorMessage); + } + + // We need to replace the distribute with the cid. + col->distributeShardsLike(cidString); + + if (col->isSmart() && col->type() == TRI_COL_TYPE_EDGE) { + return result; + } + + if (col->replicationFactor() != other->replicationFactor()) { + col->replicationFactor(other->replicationFactor()); + } + + if (col->numberOfShards() != other->numberOfShards()) { + col->numberOfShards(other->numberOfShards()); + } + + auto shards = other->shardIds(); + auto shardList = ci->getShardList(cidString); + + auto numberOfShards = static_cast(col->numberOfShards()); + // fetch a unique id for each shard to create + uint64_t const id = ci->uniqid(numberOfShards); + for (uint64_t i = 0; i < numberOfShards; ++i) { + // determine responsible server(s) + std::string shardId = "s" + StringUtils::itoa(id + i); + auto it = shards->find(shardList->at(i)); + if (it == shards->end()) { + TRI_ASSERT(false); + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "Inconsistency in shard distribution detected. Is in the process of self-healing. Please retry the operation again after some seconds."); + } + result->emplace(shardId, it->second); + } + return result; +} + + + //////////////////////////////////////////////////////////////////////////////// /// @brief creates a copy of all HTTP headers to forward //////////////////////////////////////////////////////////////////////////////// @@ -2437,80 +2569,6 @@ int flushWalOnAllDBServers(bool waitForSync, bool waitForCollector, double maxWa return TRI_ERROR_NO_ERROR; } -//////////////////////////////////////////////////////////////////////////////// -/// @brief compute a shard distribution for a new collection, the list -/// dbServers must be a list of DBserver ids to distribute across. -/// If this list is empty, the complete current list of DBservers is -/// fetched from ClusterInfo and with random_shuffle to mix it up. 
-//////////////////////////////////////////////////////////////////////////////// - -std::unordered_map> distributeShards( - uint64_t numberOfShards, - uint64_t replicationFactor, - std::vector& dbServers, - bool warnAboutReplicationFactor) { - - std::unordered_map> shards; - - ClusterInfo* ci = ClusterInfo::instance(); - ci->loadCurrentDBServers(); - if (dbServers.size() == 0) { - dbServers = ci->getCurrentDBServers(); - if (dbServers.empty()) { - return shards; - } - random_shuffle(dbServers.begin(), dbServers.end()); - } - - // mop: distribute satellite collections on all servers - if (replicationFactor == 0) { - replicationFactor = dbServers.size(); - } - - // fetch a unique id for each shard to create - uint64_t const id = ci->uniqid(numberOfShards); - - size_t leaderIndex = 0; - size_t followerIndex = 0; - for (uint64_t i = 0; i < numberOfShards; ++i) { - // determine responsible server(s) - std::vector serverIds; - for (uint64_t j = 0; j < replicationFactor; ++j) { - if (j >= dbServers.size()) { - if (warnAboutReplicationFactor) { - LOG_TOPIC(WARN, Logger::CLUSTER) - << "createCollectionCoordinator: replicationFactor is " - << "too large for the number of DBservers"; - } - break; - } - std::string candidate; - // mop: leader - if (serverIds.size() == 0) { - candidate = dbServers[leaderIndex++]; - if (leaderIndex >= dbServers.size()) { - leaderIndex = 0; - } - } else { - do { - candidate = dbServers[followerIndex++]; - if (followerIndex >= dbServers.size()) { - followerIndex = 0; - } - } while (candidate == serverIds[0]); // mop: ignore leader - } - serverIds.push_back(candidate); - } - - // determine shard id - std::string shardId = "s" + StringUtils::itoa(id + i); - - shards.emplace(shardId, serverIds); - } - - return shards; -} - #ifndef USE_ENTERPRISE std::unique_ptr ClusterMethods::createCollectionOnCoordinator( TRI_col_type_e collectionType, TRI_vocbase_t* vocbase, VPackSlice parameters, @@ -2533,18 +2591,15 @@ std::unique_ptr ClusterMethods::createCollectionOnCoordinator std::unique_ptr ClusterMethods::persistCollectionInAgency( LogicalCollection* col, bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication, bool enforceReplicationFactor, - VPackSlice parameters) { + VPackSlice) { std::string distributeShardsLike = col->distributeShardsLike(); std::vector avoid = col->avoidServers(); - size_t replicationFactor = col->replicationFactor(); - size_t numberOfShards = col->numberOfShards(); - std::string const replicationFactorStr("replicationFactor"); - std::string const numberOfShardsStr("numberOfShards"); ClusterInfo* ci = ClusterInfo::instance(); - std::vector dbServers; - + std::vector dbServers = ci->getCurrentDBServers(); + std::shared_ptr>> shards = nullptr; + if (!distributeShardsLike.empty()) { CollectionNameResolver resolver(col->vocbase()); @@ -2552,132 +2607,48 @@ std::unique_ptr ClusterMethods::persistCollectionInAgency( resolver.getCollectionIdCluster(distributeShardsLike); if (otherCid != 0) { - - bool chainOfDistributeShardsLike = false; - bool numberOfShardsConflict = false; - bool replicationFactorConflict = false; - std::string otherCidString - = arangodb::basics::StringUtils::itoa(otherCid); - - VPackBuilder builder; - { VPackObjectBuilder a(&builder); - col->toVelocyPack(builder,false); } - - try { - - std::shared_ptr other = - ci->getCollection(col->dbName(), otherCidString); - - size_t otherReplFactor = size_t(other->replicationFactor()); - - if (!col->isSmart()) { - if (parameters.hasKey(replicationFactorStr)) { - replicationFactor = 
parameters.get(replicationFactorStr).getNumber(); - if (otherReplFactor != replicationFactor) { - replicationFactor = otherReplFactor; - col->replicationFactor(static_cast(otherReplFactor)); - //replicationFactorConflict = true; - } - } else { - replicationFactor = otherReplFactor; - col->replicationFactor(static_cast(otherReplFactor)); - } - - size_t otherNumOfShards = size_t(other->numberOfShards()); - if (parameters.hasKey(numberOfShardsStr)) { - numberOfShards = parameters.get(numberOfShardsStr).getNumber(); - if (otherNumOfShards != numberOfShards) { - numberOfShards = otherNumOfShards; - col->replicationFactor(static_cast(otherNumOfShards)); - //numberOfShardsConflict = true; - } - } else { - numberOfShards = otherNumOfShards; - col->replicationFactor(static_cast(otherNumOfShards)); - } - - } - if (!other->distributeShardsLike().empty()) { - chainOfDistributeShardsLike = true; - } - - auto shards = other->shardIds(); - auto shardList = ci->getShardList(otherCidString); - - for (auto const& s : *shardList) { - auto it = shards->find(s); - if (it != shards->end()) { - for (auto const& s : it->second) { - dbServers.push_back(s); - } - } - } - - - - } catch (...) {} - - if (replicationFactorConflict) { - THROW_ARANGO_EXCEPTION( - TRI_ERROR_CLUSTER_DISTRIBUTE_SHARDS_LIKE_REPLICATION_FACTOR); - } - - if (numberOfShardsConflict) { - THROW_ARANGO_EXCEPTION( - TRI_ERROR_CLUSTER_DISTRIBUTE_SHARDS_LIKE_NUMBER_OF_SHARDS); - } - - if (chainOfDistributeShardsLike) { - THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_CHAIN_OF_DISTRIBUTESHARDSLIKE); - } - - col->distributeShardsLike(otherCidString); + shards = CloneShardDistribution(ci, col, otherCid); } else { - dbServers = ci->getCurrentDBServers(); - if (ignoreDistributeShardsLikeErrors) { - col->distributeShardsLike(std::string()); - } else { - THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE); - } + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE, + "Could not find collection " + distributeShardsLike + " to distribute shards like it."); } - } else if (!avoid.empty()) { - dbServers = ci->getCurrentDBServers(); - if (dbServers.size() - avoid.size() >= replicationFactor) { + } else { + // system collections should never enforce replicationfactor + // to allow them to come up with 1 dbserver + if (col->isSystem()) { + enforceReplicationFactor = false; + } + + size_t replicationFactor = col->replicationFactor(); + size_t numberOfShards = col->numberOfShards(); + + // the default behaviour however is to bail out and inform the user + // that the requested replicationFactor is not possible right now + if (enforceReplicationFactor && dbServers.size() < replicationFactor) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS); + } + + if (!avoid.empty()) { + // We need to remove all servers that are in the avoid list + if (dbServers.size() - avoid.size() < replicationFactor) { + // Not enough DBServers left + THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS); + } dbServers.erase( std::remove_if( - dbServers.begin(), dbServers.end(), [&](const std::string&x) { + dbServers.begin(), dbServers.end(), [&](const std::string& x) { return std::find(avoid.begin(), avoid.end(), x) != avoid.end(); }), dbServers.end()); } std::random_shuffle(dbServers.begin(), dbServers.end()); - } else { - dbServers = ci->getCurrentDBServers(); + shards = DistributeShardsEvenly(ci, numberOfShards, replicationFactor, dbServers, !col->isSystem()); } - // system collections should never enforce replicationfactor 
- // to allow them to come up with 1 dbserver - if (enforceReplicationFactor && col->isSystem()) { - enforceReplicationFactor = false; - } - - // the default behaviour however is to bail out and inform the user - // that the requested replicationFactor is not possible right now - if (enforceReplicationFactor && dbServers.size() < replicationFactor) { - THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS); - } - - // If the list dbServers is still empty, it will be filled in - // distributeShards below. - - // Now create the shards: - bool warnAboutReplicationFactor = (!col->isSystem()); - auto shards = std::make_shared< - std::unordered_map>>( - arangodb::distributeShards(numberOfShards, replicationFactor, dbServers, warnAboutReplicationFactor)); if (shards->empty() && !col->isSmart()) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "no database servers found in cluster"); } + col->setShardMap(shards); std::unordered_set const ignoreKeys{ @@ -2690,7 +2661,8 @@ std::unique_ptr ClusterMethods::persistCollectionInAgency( std::string errorMsg; int myerrno = ci->createCollectionCoordinator( col->dbName(), col->cid_as_string(), - numberOfShards, replicationFactor, waitForSyncReplication, velocy.slice(), errorMsg, 240.0); + col->numberOfShards(), col->replicationFactor(), + waitForSyncReplication, velocy.slice(), errorMsg, 240.0); if (myerrno != TRI_ERROR_NO_ERROR) { if (errorMsg.empty()) { diff --git a/arangod/Cluster/ClusterMethods.h b/arangod/Cluster/ClusterMethods.h index a20a25a377..cc6785637d 100644 --- a/arangod/Cluster/ClusterMethods.h +++ b/arangod/Cluster/ClusterMethods.h @@ -248,20 +248,6 @@ int flushWalOnAllDBServers(bool waitForSync, bool waitForCollector, double maxWa int rotateActiveJournalOnAllDBServers(std::string const& dbname, std::string const& collname); -//////////////////////////////////////////////////////////////////////////////// -/// @brief compute a shard distribution for a new collection, the list -/// dbServers must be a list of DBserver ids to distribute across. -/// If this list is empty, the complete current list of DBservers is -/// fetched from ClusterInfo. If shuffle is true, a few random shuffles -/// are performed before the list is taken. Thus modifies the list. -//////////////////////////////////////////////////////////////////////////////// - -std::unordered_map> distributeShards( - uint64_t numberOfShards, - uint64_t replicationFactor, - std::vector& dbServers, - bool warnAboutReplicationFactor); - class ClusterMethods { public: // wrapper Class for static functions. 
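For readers skimming the C++ hunks above: the refactor replaces the old inline parameter reconciliation in `persistCollectionInAgency` with two helpers, `DistributeShardsEvenly` and `CloneShardDistribution`. The JavaScript sketch below is illustrative only, not ArangoDB source: the names `distributeEvenly`, `cloneDistribution`, `firstShardId` and `prototypeShardOrder` are invented for the example, satellite handling (replicationFactor 0) is omitted, and `dbServers` is assumed to be pre-shuffled. It mirrors the round-robin leader/follower assignment and the position-by-position copy of the prototype's server lists.

```js
// Illustrative sketch only -- mirrors the logic of the two new C++ helpers.

// Round-robin placement as in DistributeShardsEvenly: one cursor for leaders,
// an independent cursor for followers, and a follower is never placed on its
// own shard's leader. firstShardId stands in for the id block from ci->uniqid().
function distributeEvenly(numberOfShards, replicationFactor, dbServers, firstShardId) {
  const shards = {};
  let leaderIndex = 0;
  let followerIndex = 0;
  for (let i = 0; i < numberOfShards; ++i) {
    const servers = [];
    for (let j = 0; j < replicationFactor && j < dbServers.length; ++j) {
      let candidate;
      if (servers.length === 0) {
        // leader: plain round robin over all servers
        candidate = dbServers[leaderIndex];
        leaderIndex = (leaderIndex + 1) % dbServers.length;
      } else {
        // follower: independent round robin, skipping this shard's leader
        do {
          candidate = dbServers[followerIndex];
          followerIndex = (followerIndex + 1) % dbServers.length;
        } while (candidate === servers[0]);
      }
      servers.push(candidate);
    }
    shards['s' + (firstShardId + i)] = servers;
  }
  return shards;
}

// Cloning as in CloneShardDistribution: the i-th new shard reuses the server
// list of the i-th prototype shard, so leaders and followers land on exactly
// the same DBServers even after a failover or moveShard on the prototype.
function cloneDistribution(prototypeShardLists, prototypeShardOrder, firstShardId) {
  const shards = {};
  prototypeShardOrder.forEach((protoShard, i) => {
    shards['s' + (firstShardId + i)] = prototypeShardLists[protoShard].slice();
  });
  return shards;
}
```

For example, 12 shards with replicationFactor 3 over 5 DBServers give 36 shard copies, and 36 % 5 != 0, which is one of the cases listed in the CHANGELOG entry where the old distributeShardsLike path misplaced shards. The clone path addresses the failover / move-shard case: the follower collection copies the prototype's current server lists verbatim, and the agency preconditions in ClusterInfo.cpp (per the "Any of the shards locked?" comment) guard against a supervision job rearranging the prototype's shards while the new collection is being planned.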
diff --git a/js/server/tests/resilience/shard-distribution-spec.js b/js/server/tests/resilience/shard-distribution-spec.js index 36f08581d3..eeb8ea0ee2 100644 --- a/js/server/tests/resilience/shard-distribution-spec.js +++ b/js/server/tests/resilience/shard-distribution-spec.js @@ -29,7 +29,10 @@ const internal = require('internal'); const download = require('internal').download; const colName = "UnitTestDistributionTest"; const _ = require("lodash"); +const wait = require("internal").wait; const request = require('@arangodb/request'); +const endpointToURL = require("@arangodb/cluster").endpointToURL; +const coordinatorName = "Coordinator0001"; let coordinator = instanceInfo.arangods.filter(arangod => { return arangod.role === 'coordinator'; @@ -87,7 +90,6 @@ describe('Shard distribution', function () { serverCount += 1; } } - console.log("Found health records:", serverCount, health.Health, count); if (serverCount >= dbServerCount) { break; } @@ -155,4 +157,177 @@ describe('Shard distribution', function () { }); + describe("using distributeShardsLike", function () { + const followCollection = 'UnitTestDistributionFollower'; + const numberOfShards = 12; + + const cleanUp = function () { + internal.db._drop(followCollection); + }; + + const shardNumber = function (shard) { + // Each shard starts with 's' + expect(shard[0]).to.equal('s'); + // And is followed by a numeric value + const nr = parseInt(shard.slice(1)); + expect(nr).to.be.above(0); + return nr + }; + + const sortShardsNumericly = function (l, r) { + return shardNumber(l) - shardNumber(r); + }; + + const compareDistributions = function() { + const all = request.get(coordinator.url + '/_admin/cluster/shardDistribution'); + const dist = JSON.parse(all.body).results; + const orig = dist[colName].Current; + const fol = dist[followCollection].Current; + const origShards = Object.keys(orig).sort(sortShardsNumericly); + const folShards = Object.keys(fol).sort(sortShardsNumericly); + // Now we have all shard names sorted in alphabetical ordering. + // It needs to be guaranteed that leader + follower of each shard in this ordering is identical. 
+ expect(origShards).to.have.length.of(folShards.length); + for (let i = 0; i < origShards.length; ++i) { + const oneOrigShard = orig[origShards[i]]; + const oneFolShard = fol[folShards[i]]; + // Leader has to be identical + expect(oneOrigShard.leader).to.equal(oneFolShard.leader); + // Follower Order does not matter, but needs to be the same servers + expect(oneOrigShard.followers.sort()).to.deep.equal(oneFolShard.followers.sort()); + } + }; + + describe("without replication", function () { + const replicationFactor = 1; + + + beforeEach(function () { + cleanUp(); + internal.db._create(colName, {replicationFactor, numberOfShards}); + }); + + afterEach(cleanUp); + + it("should create all shards on identical servers", function () { + internal.db._create(followCollection, {replicationFactor, numberOfShards, distributeShardsLike: colName}); + compareDistributions(); + }); + }); + + describe("with replication", function () { + const replicationFactor = 3; + // Note here: We have to make sure that numberOfShards * replicationFactor is not disible by the number of DBServers + + //////////////////////////////////////////////////////////////////////////////// + /// @brief order the cluster to clean out a server: + //////////////////////////////////////////////////////////////////////////////// + + const cleanOutServer = function (id) { + var coordEndpoint = + global.ArangoClusterInfo.getServerEndpoint(coordinatorName); + var url = endpointToURL(coordEndpoint); + var body = {"server": id}; + try { + return request({ method: "POST", + url: url + "/_admin/cluster/cleanOutServer", + body: JSON.stringify(body) }); + } catch (err) { + console.error( + "Exception for POST /_admin/cluster/cleanOutServer:", err.stack); + return false; + } + }; + + const getCleanedOutServers = function () { + const coordEndpoint = + global.ArangoClusterInfo.getServerEndpoint(coordinatorName); + const url = endpointToURL(coordEndpoint); + + try { + const envelope = + { method: "GET", url: url + "/_admin/cluster/numberOfServers" }; + let res = request(envelope); + var body = res.body; + if (typeof body === "string") { + body = JSON.parse(body); + } + return body; + } catch (err) { + console.error( + "Exception for POST /_admin/cluster/cleanOutServer:", err.stack); + return {}; + } + }; + + const waitForCleanout = function (id) { + let count = 600; + while (--count > 0) { + let obj = getCleanedOutServers(); + if (obj.cleanedServers.indexOf(id) >= 0) { + console.info( + "Success: Server " + id + " cleaned out after " + (600-count) + " seconds"); + return true; + } + wait(1.0); + } + console.error( + "Failed: Server " + id + " not cleaned out after 600 seconds"); + return false; + }; + + const waitForSynchronousReplication = function (collection) { + global.ArangoClusterInfo.flush(); + var cinfo = global.ArangoClusterInfo.getCollectionInfo( + "_system", collection); + var shards = Object.keys(cinfo.shards); + var replFactor = cinfo.shards[shards[0]].length; + var count = 0; + while (++count <= 180) { + var ccinfo = shards.map( + s => global.ArangoClusterInfo.getCollectionInfoCurrent( + "_system", collection, s) + ); + let replicas = ccinfo.map(s => s.servers); + if (_.every(replicas, x => x.length === replFactor)) { + return true; + } + wait(0.5); + global.ArangoClusterInfo.flush(); + } + console.error(`Collection "${collection}" failed to get all followers in sync after 60 sec`); + return false; + }; + + + beforeEach(function () { + cleanUp(); + internal.db._create(colName, {replicationFactor, numberOfShards}); + 
expect(waitForSynchronousReplication(colName)).to.equal(true); + }); + + afterEach(cleanUp); + + it("should create all shards and followers on identical servers", function () { + internal.db._create(followCollection, {replicationFactor, numberOfShards, distributeShardsLike: colName}); + expect(waitForSynchronousReplication(followCollection)).to.equal(true); + compareDistributions(); + }); + + it("should be resilient to a failover in the original collection", function () { + var server = global.ArangoClusterInfo.getDBServers()[1].serverId; + // Clean out the server that is scheduled second. + expect(cleanOutServer(server)).to.not.equal(false); + expect(waitForCleanout(server)).to.equal(true); + expect(waitForSynchronousReplication(colName)).to.equal(true); + // Now we have moved around some shards. + internal.db._create(followCollection, {replicationFactor, numberOfShards, distributeShardsLike: colName}); + expect(waitForSynchronousReplication(followCollection)).to.equal(true); + compareDistributions(); + }); + }); + + + }); + }); diff --git a/js/server/tests/shell/shell-collection-properties-cluster-spec.js b/js/server/tests/shell/shell-collection-properties-cluster-spec.js index 7b7dc11ae6..bcfa7cbfe5 100644 --- a/js/server/tests/shell/shell-collection-properties-cluster-spec.js +++ b/js/server/tests/shell/shell-collection-properties-cluster-spec.js @@ -63,9 +63,10 @@ function checkReplicationFactor(name, fac) { internal.sleep(0.5); } let current = ArangoAgency.get('Current/Collections/_system'); - let val = current.arango.Current.Collections['_system'][collectionId]; - throw "replicationFactor is not reflected properly in " + - "/Current/Collections/_system/" + collectionId + ": "+ JSON.stringify(val); + let val = current.arango.Current.Collections['_system'][collectionId]; + expect(true).to.equal(false, "Expected replicationFactor of " + fac + " in collection " + + name + " is not reflected properly in " + + "/Current/Collections/_system/" + collectionId + ": "+ JSON.stringify(val)); }; describe('Update collection properties', function() { @@ -87,13 +88,13 @@ describe('Update collection properties', function() { checkReplicationFactor(cn1, 1); const coll = db._collection(cn1); - + let props = coll.properties({replicationFactor: 2}); expect(props.replicationFactor).to.equal(2); checkReplicationFactor(cn1, 2); }); - + it('decrease replication factor ', function() { db._create(cn1, {replicationFactor: 2, numberOfShards: 2}, {waitForSyncReplication: true}); @@ -115,7 +116,7 @@ describe('Update collection properties', function() { try { const coll = db._collection(cn1); coll.properties({replicationFactor: -1}); - expect(false.replicationFactor).to.equal(true, + expect(false.replicationFactor).to.equal(true, "Was able to update replicationFactor of follower"); } catch(e) { expect(e.errorNum).to.equal(errors.ERROR_BAD_PARAMETER.code); @@ -124,7 +125,7 @@ describe('Update collection properties', function() { try { const coll = db._collection(cn1); coll.properties({replicationFactor: 100}); - expect(false.replicationFactor).to.equal(true, + expect(false.replicationFactor).to.equal(true, "Was able to update replicationFactor of follower"); } catch(e) { expect(e.errorNum).to.equal(errors.ERROR_BAD_PARAMETER.code); @@ -133,7 +134,7 @@ describe('Update collection properties', function() { try { const coll = db._collection(cn1); coll.properties({replicationFactor: "satellite"}); - expect(false.replicationFactor).to.equal(true, + expect(false.replicationFactor).to.equal(true, "Was able to update 
replicationFactor of follower"); } catch(e) { expect(e.errorNum).to.equal(errors.ERROR_FORBIDDEN.code); @@ -153,9 +154,9 @@ describe('Update collection properties with distributeShardsLike, ', function() db._useDatabase("_system"); try { - db._drop(cn2); + db._drop(cn2); } catch (e) {} - + try { db._drop(cn1); } catch (e) {} @@ -166,22 +167,22 @@ describe('Update collection properties with distributeShardsLike, ', function() db._create(cn2, {distributeShardsLike: cn1}, {waitForSyncReplication: true}); checkReplicationFactor(cn1, 1); - checkReplicationFactor(cn2, 1); + checkReplicationFactor(cn2, 1); const leader = db._collection(cn1); let props = leader.properties({replicationFactor: 2}); expect(props.replicationFactor).to.equal(2); checkReplicationFactor(cn1, 2); - checkReplicationFactor(cn2, 2); + checkReplicationFactor(cn2, 2); }); - + it('decrease replication factor', function() { db._create(cn1, {replicationFactor: 2, numberOfShards: 2}, {waitForSyncReplication: true}); db._create(cn2, {distributeShardsLike: cn1}, {waitForSyncReplication: true}); checkReplicationFactor(cn1, 2); - checkReplicationFactor(cn2, 2); + checkReplicationFactor(cn2, 2); const leader = db._collection(cn1); @@ -197,12 +198,12 @@ describe('Update collection properties with distributeShardsLike, ', function() db._create(cn2, {distributeShardsLike: cn1}, {waitForSyncReplication: true}); checkReplicationFactor(cn1, 2); - checkReplicationFactor(cn2, 2); - + checkReplicationFactor(cn2, 2); + try { const follower = db._collection(cn2); follower.properties({replicationFactor: 1}); - expect(false.replicationFactor).to.equal(true, + expect(false.replicationFactor).to.equal(true, "Was able to update replicationFactor of follower"); } catch(e) { expect(e.errorNum).to.equal(errors.ERROR_FORBIDDEN.code); @@ -219,10 +220,16 @@ describe('Replication factor constraints', function() { db._useDatabase("_system"); try { - db._drop(cn1); + // must be dropped first because cn1 is prototype for this collection + // and can only be dropped if all dependent collections are dropped first. 
+ db._drop(cn2); + } catch (e) {} + + try { + db._drop(cn1); } catch (e) {} }); - + it('should not allow to create a collection with more replicas than dbservers available', function() { try { db._create(cn1, {replicationFactor: 5}); @@ -235,4 +242,27 @@ describe('Replication factor constraints', function() { it('should allow to create a collection with more replicas than dbservers when explicitly requested', function() { db._create(cn1, {replicationFactor: 5}, {enforceReplicationFactor: false}); }); -}); \ No newline at end of file + + it('check replication factor of system collections', function() { + ["_appbundles", "_apps", "_aqlfunctions", "_frontend", "_graphs", + "_iresearch_analyzers", "_jobs", "_modules", "_queues", "_routing", + "_statistics" , "_statistics15" , "_statisticsRaw" ,"_users" + ].forEach(name => { + if(name === "_graphs"){ + expect(db[name].properties()['replicationFactor']).to.equal(2); + } else if(db[name]){ + expect(db[name].properties()['replicationFactor']).to.equal(2); + expect(db[name].properties()['distributeShardsLike']).to.equal("_graphs"); + } + + }); + }); + + it('distributeShardsLike should ignore additional parameters', function() { + db._create(cn1, {replicationFactor: 2, numberOfShards: 2}, {waitForSyncReplication: true}); + db._create(cn2, {distributeShardsLike: cn1, replicationFactor: 5, numberOfShards: 99}, {waitForSyncReplication: true}); + expect(db[cn1].properties()['replicationFactor']).to.equal(db[cn2].properties()['replicationFactor']); + expect(db[cn1].properties()['numberOfShards']).to.equal(db[cn2].properties()['numberOfShards']); + expect(db[cn2].properties()['distributeShardsLike']).to.equal(cn1); + }); +});
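To connect the new regression tests back to the CHANGELOG entry and the documentation change, here is a hedged arangosh-style sketch of the user-visible behaviour. The collection names and the 5-DBServer cluster size are assumptions made for the example; the calls themselves (`db._create` with `distributeShardsLike`, `properties()`, `db._drop`) are the ones exercised by the tests above.

```js
// Assumed setup: a cluster with 5 DBServers; collection names are examples.
const db = require('internal').db;

// 12 shards * replicationFactor 3 = 36 shard copies, and 36 % 5 !== 0 --
// the divisibility case from the CHANGELOG that used to distribute badly.
db._create('prototype', { numberOfShards: 12, replicationFactor: 3 });

// distributeShardsLike copies numberOfShards and replicationFactor from the
// prototype; the values passed here are ignored, as the docs now state.
db._create('follower', {
  distributeShardsLike: 'prototype',
  numberOfShards: 99,       // ignored
  replicationFactor: 5      // ignored
});

const props = db.follower.properties();
// props.numberOfShards === 12
// props.replicationFactor === 3
// props.distributeShardsLike === 'prototype'

// Cleanup order matters: the follower must be dropped before the prototype,
// since a prototype cannot be dropped while dependent collections still
// distribute their shards like it.
db._drop('follower');
db._drop('prototype');
```

The second new resilience test goes one step further: it cleans out a DBServer first, so the prototype's shards have already been moved, and only then creates the follower collection, expecting it to land on the prototype's current servers. That is the failover / move-shard case from the CHANGELOG entry.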