diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp
index e3ddc9cfae..1a675e6e6a 100644
--- a/arangod/Cluster/ClusterInfo.cpp
+++ b/arangod/Cluster/ClusterInfo.cpp
@@ -224,6 +224,20 @@ void ClusterInfo::cleanup() {
   theInstance->_currentCollections.clear();
 }
 
+/// @brief produces an agency dump and logs it
+void ClusterInfo::logAgencyDump() const {
+#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
+  AgencyComm ac;
+  AgencyCommResult ag = ac.getValues("/");
+
+  if (ag.successful()) {
+    LOG_TOPIC("fe8ce", INFO, Logger::CLUSTER) << "Agency dump:\n" << ag.slice().toJson();
+  } else {
+    LOG_TOPIC("e7e30", WARN, Logger::CLUSTER) << "Could not get agency dump!";
+  }
+#endif
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 /// @brief increase the uniqid value. if it exceeds the upper bound, fetch a
 /// new upper bound value from the agency
@@ -1568,16 +1582,7 @@ Result ClusterInfo::dropDatabaseCoordinator(  // drop database
     }
 
     if (TRI_microtime() > endTime) {
-      AgencyCommResult ag = ac.getValues("/");
-
-      if (ag.successful()) {
-        LOG_TOPIC("fe8ce", ERR, Logger::CLUSTER) << "Agency dump:\n"
-                                                 << ag.slice().toJson();
-      } else {
-        LOG_TOPIC("e7e30", ERR, Logger::CLUSTER)
-            << "Could not get agency dump!";
-      }
-
+      logAgencyDump();
       return Result(TRI_ERROR_CLUSTER_TIMEOUT);
     }
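The new logAgencyDump() helper replaces the many nearly identical dump-and-log blocks across this file; note that the log levels drop from ERR to INFO/WARN, and the #ifdef means release builds skip the dump entirely. A minimal standalone sketch of that pattern, using hypothetical stand-ins (fetchClusterState, DIAGNOSTIC_BUILD) rather than the real AgencyComm API:

```cpp
#include <iostream>
#include <string>
#include <utility>

// Hypothetical stand-in for the expensive diagnostic fetch (not AgencyComm).
std::pair<bool, std::string> fetchClusterState() {
  return {true, "{ \"Plan\": {}, \"Current\": {} }"};
}

// Centralizing the dump keeps each error path to a single call, and the
// build flag compiles the body away entirely in release builds, avoiding
// both the extra round-trip and the log noise.
void logClusterDump() {
#ifdef DIAGNOSTIC_BUILD  // stands in for ARANGODB_ENABLE_MAINTAINER_MODE
  auto [ok, json] = fetchClusterState();
  if (ok) {
    std::cout << "cluster dump:\n" << json << '\n';
  } else {
    std::cerr << "could not get cluster dump!\n";
  }
#endif
}
```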
@@ -1604,30 +1609,28 @@ Result ClusterInfo::createCollectionCoordinator(  // create collection
   std::vector<ClusterCollectionCreationInfo> infos{
       ClusterCollectionCreationInfo{collectionID, numberOfShards, replicationFactor,
                                     minReplicationFactor, waitForReplication, json}};
-  return createCollectionsCoordinator(databaseName, infos, timeout);
-}
-
-Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName,
-                                                 std::vector<ClusterCollectionCreationInfo>& infos,
-                                                 double timeout) {
-  using arangodb::velocypack::Slice;
-
-  AgencyComm ac;
   double const realTimeout = getTimeout(timeout);
   double const endTime = TRI_microtime() + realTimeout;
-  double const interval = getPollInterval();
-  // We need to make sure our plan is up to date.
-  LOG_TOPIC("4315c", DEBUG, Logger::CLUSTER)
-      << "createCollectionCoordinator, loading Plan from agency...";
-  loadPlan();
-  // No matter how long this will take, we will not ourselfes trigger a plan relaoding.
-  for (auto& info : infos) {
+  return createCollectionsCoordinator(databaseName, infos, endTime);
+}
+
+/// @brief this method does an atomic check of the preconditions for the collections
+/// to be created, using the currently loaded plan. it populates the plan version
+/// used for the checks
+Result ClusterInfo::checkCollectionPreconditions(std::string const& databaseName,
+                                                 std::vector<ClusterCollectionCreationInfo> const& infos,
+                                                 uint64_t& planVersion) {
+  READ_LOCKER(readLocker, _planProt.lock);
+
+  planVersion = _planVersion;
+
+  for (auto const& info : infos) {
     // Check if name exists.
     if (info.name.empty() || !info.json.isObject() || !info.json.get("shards").isObject()) {
       return TRI_ERROR_BAD_PARAMETER;  // must not be empty
     }
-    READ_LOCKER(readLocker, _planProt.lock);
-    // Validate that his collection does not exist
+
+    // Validate that the collection does not exist in the current plan
     {
       AllCollections::const_iterator it = _plannedCollections.find(databaseName);
       if (it != _plannedCollections.end()) {
@@ -1636,10 +1639,20 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
         if (it2 != (*it).second.end()) {
           // collection already exists!
           events::CreateCollection(databaseName, info.name, TRI_ERROR_ARANGO_DUPLICATE_NAME);
-          return Result(TRI_ERROR_ARANGO_DUPLICATE_NAME);
+          return TRI_ERROR_ARANGO_DUPLICATE_NAME;
+        }
+      } else {
+        // no collection in plan for this particular database... this may be true for
+        // the first collection created in a db
+        // now check if there is a planned database at least
+        if (_plannedDatabases.find(databaseName) == _plannedDatabases.end()) {
+          // no need to create a collection in a database that is not there (anymore)
+          events::CreateCollection(databaseName, info.name, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
+          return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
         }
       }
     }
+
     // Validate that there is no view with this name either
     {
       // check against planned views as well
@@ -1650,26 +1663,23 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
         if (it2 != (*it).second.end()) {
           // view already exists!
           events::CreateCollection(databaseName, info.name, TRI_ERROR_ARANGO_DUPLICATE_NAME);
-          return Result(TRI_ERROR_ARANGO_DUPLICATE_NAME);
+          return TRI_ERROR_ARANGO_DUPLICATE_NAME;
         }
       }
     }
-
-    LOG_TOPIC("66541", DEBUG, Logger::CLUSTER)
-        << "createCollectionCoordinator, checking things...";
-
-    // mop: why do these ask the agency instead of checking cluster info?
-    if (!ac.exists("Plan/Databases/" + databaseName)) {
-      events::CreateCollection(databaseName, info.name, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
-      return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
-    }
-
-    if (ac.exists("Plan/Collections/" + databaseName + "/" + info.collectionID)) {
-      events::CreateCollection(databaseName, info.name, TRI_ERROR_CLUSTER_COLLECTION_ID_EXISTS);
-      return TRI_ERROR_CLUSTER_COLLECTION_ID_EXISTS;
-    }
   }
+
+  return {};
+}
+
+Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName,
+                                                 std::vector<ClusterCollectionCreationInfo>& infos,
+                                                 double endTime) {
+  using arangodb::velocypack::Slice;
+
+  double const interval = getPollInterval();
+
   // The following three are used for synchronization between the callback
   // closure and the main thread executing this function. Note that it can
   // happen that the callback is called only after we return from this
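checkCollectionPreconditions() moves the READ_LOCKER acquisition out of the per-collection loop, so all collections are validated against one consistent plan snapshot, and it hands back the _planVersion that snapshot had. A rough sketch of that check-and-capture idiom using plain standard-library types (PlanCache and its members are hypothetical, not ArangoDB types):

```cpp
#include <cstdint>
#include <shared_mutex>
#include <string>
#include <unordered_set>

// Validate against a cached plan under a single read lock and remember the
// version the checks were made against; the caller later turns that version
// into a compare-and-swap precondition on the shared plan.
struct PlanCache {
  mutable std::shared_mutex lock;
  uint64_t version = 0;
  std::unordered_set<std::string> collections;

  bool checkCanCreate(std::string const& name, uint64_t& versionUsed) const {
    std::shared_lock<std::shared_mutex> guard(lock);  // consistent snapshot
    versionUsed = version;  // captured under the same lock as the checks
    return collections.find(name) == collections.end();
  }
};
```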
+ LOG_TOPIC("f4b14", DEBUG, Logger::CLUSTER) + << "createCollectionCoordinator, loading Plan from agency..."; + + // load the plan, so we are up-to-date + loadPlan(); + uint64_t planVersion = 0; // will be populated by following function call + Result res = checkCollectionPreconditions(databaseName, infos, planVersion); + if (res.fail()) { + return res; + } + + + // now try to update the plan in the agency, using the current plan version as our + // precondition + { + // create a builder with just the version number for comparison + VPackBuilder versionBuilder; + versionBuilder.add(VPackValue(planVersion)); + + // add a precondition that checks the plan version has not yet changed + precs.emplace_back(AgencyPrecondition("Plan/Version", AgencyPrecondition::Type::VALUE, versionBuilder.slice())); - // We run a loop here to send the agency transaction, since there might - // be a precondition failed, in which case we want to retry for some time: - while (true) { - if (TRI_microtime() > endTime) { - for (auto const& info : infos) { - if (info.state != ClusterCollectionCreationInfo::DONE) { - LOG_TOPIC("a2184", ERR, Logger::CLUSTER) - << "Timeout in _create collection" - << ": database: " << databaseName << ", collId:" << info.collectionID - << "\njson: " << info.json.toString() - << "\ncould not send transaction to agency."; - events::CreateCollection(databaseName, info.name, - TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN); - } - } - return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN; - } AgencyWriteTransaction transaction(opers, precs); { // we hold this mutex from now on until we have updated our cache @@ -1892,68 +1913,17 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName // Only if not precondition failed if (!res.successful()) { if (res.httpCode() == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) { - auto result = res.slice(); - AgencyCommResult ag = ac.getValues("/"); - - if (result.isArray() && result.length() > 0) { - if (result[0].isObject()) { - auto tres = result[0]; - if (!tres.hasKey(std::vector( - {AgencyCommManager::path(), "Supervision"}))) { - for (auto const& info : infos) { - events::CreateCollection(databaseName, info.name, - TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN); - } - return Result(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN); - } - - std::string errorMsg; - - for (auto const& s : - velocypack::ObjectIterator(tres.get(std::vector( - {AgencyCommManager::path(), "Supervision", "Shards"})))) { - errorMsg += std::string("Shard ") + s.key.copyString(); - errorMsg += - " of prototype collection is blocked by supervision job "; - errorMsg += s.value.copyString(); - } - for (auto const& info : infos) { - events::CreateCollection(databaseName, info.name, - TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN); - } - return Result( // result - TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN, // code - errorMsg // message - ); - } - } - - LOG_TOPIC("f6ecf", ERR, Logger::CLUSTER) - << "Precondition failed for this agency transaction: " - << transaction.toJson() << ", return code: " << res.httpCode(); - - if (ag.successful()) { - LOG_TOPIC("de7f8", ERR, Logger::CLUSTER) << "Agency dump:\n" - << ag.slice().toJson(); - } else { - LOG_TOPIC("4b178", ERR, Logger::CLUSTER) - << "Could not get agency dump!"; - } - - // Agency is currently unhappy, try again in a few seconds: - std::this_thread::sleep_for(std::chrono::seconds(5)); - - continue; - } - std::string errorMsg = ""; - errorMsg += 
std::string("file: ") + __FILE__ + " line: " + std::to_string(__LINE__); - errorMsg += " HTTP code: " + std::to_string(res.httpCode()); + // use this special error code to signal that we got a precondition failure + // in this case the caller can try again with an updated version of the plan change + return {TRI_ERROR_REQUEST_CANCELED, "operation aborted due to precondition failure"}; + } + + std::string errorMsg = "HTTP code: " + std::to_string(res.httpCode()); errorMsg += " error message: " + res.errorMessage(); errorMsg += " error details: " + res.errorDetails(); errorMsg += " body: " + res.body(); for (auto const& info : infos) { - events::CreateCollection(databaseName, info.name, - TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN); + events::CreateCollection(databaseName, info.name, TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN); } return {TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN, std::move(errorMsg)}; } @@ -1961,7 +1931,6 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName // Update our cache: loadPlan(); } - break; // Leave loop, since we are done } LOG_TOPIC("98bca", DEBUG, Logger::CLUSTER) @@ -1978,16 +1947,7 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName } // Get a full agency dump for debugging - { - AgencyCommResult ag = ac.getValues(""); - if (ag.successful()) { - LOG_TOPIC("ab229", ERR, Logger::CLUSTER) << "Agency dump:\n" - << ag.slice().toJson(); - } else { - LOG_TOPIC("2c83c", ERR, Logger::CLUSTER) - << "Could not get agency dump!"; - } - } + logAgencyDump(); if (tmpRes <= TRI_ERROR_NO_ERROR) { tmpRes = TRI_ERROR_CLUSTER_TIMEOUT; @@ -2029,11 +1989,9 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName return res.errorCode(); } if (tmpRes > TRI_ERROR_NO_ERROR) { - { - // We do not need to lock all condition variables - // we are save by cacheMutex - cbGuard.fire(); - } + // We do not need to lock all condition variables + // we are safe by using cacheMutex + cbGuard.fire(); // report error for (auto const& info : infos) { @@ -2176,17 +2134,31 @@ Result ClusterInfo::dropCollectionCoordinator( // drop collection "/shards"); if (res.successful()) { - velocypack::Slice shards = res.slice()[0].get(std::vector( - {AgencyCommManager::path(), "Plan", "Collections", dbName, collectionID, - "shards"})); - if (shards.isObject()) { - numberOfShards = shards.length(); + velocypack::Slice databaseSlice = res.slice()[0].get(std::vector( + {AgencyCommManager::path(), "Plan", "Collections", dbName })); + + if (!databaseSlice.isObject()) { + // database dropped in the meantime + events::DropCollection(dbName, collectionID, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); + return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND; + } + + velocypack::Slice collectionSlice = databaseSlice.get(collectionID); + if (!collectionSlice.isObject()) { + // collection dropped in the meantime + events::DropCollection(dbName, collectionID, TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); + return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; + } + + velocypack::Slice shardsSlice = collectionSlice.get("shards"); + if (shardsSlice.isObject()) { + numberOfShards = shardsSlice.length(); } else { LOG_TOPIC("d340d", ERR, Logger::CLUSTER) << "Missing shards information on dropping " << dbName << "/" << collectionID; events::DropCollection(dbName, collectionID, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); - return Result(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); + return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); } } @@ -2206,15 
@@ -2176,17 +2134,31 @@ Result ClusterInfo::dropCollectionCoordinator(  // drop collection
                               "/shards");
 
   if (res.successful()) {
-    velocypack::Slice shards = res.slice()[0].get(std::vector<std::string>(
-        {AgencyCommManager::path(), "Plan", "Collections", dbName, collectionID,
-         "shards"}));
-    if (shards.isObject()) {
-      numberOfShards = shards.length();
+    velocypack::Slice databaseSlice = res.slice()[0].get(std::vector<std::string>(
+        {AgencyCommManager::path(), "Plan", "Collections", dbName}));
+
+    if (!databaseSlice.isObject()) {
+      // database dropped in the meantime
+      events::DropCollection(dbName, collectionID, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
+      return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
+    }
+
+    velocypack::Slice collectionSlice = databaseSlice.get(collectionID);
+    if (!collectionSlice.isObject()) {
+      // collection dropped in the meantime
+      events::DropCollection(dbName, collectionID, TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
+      return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND;
+    }
+
+    velocypack::Slice shardsSlice = collectionSlice.get("shards");
+    if (shardsSlice.isObject()) {
+      numberOfShards = shardsSlice.length();
     } else {
       LOG_TOPIC("d340d", ERR, Logger::CLUSTER)
           << "Missing shards information on dropping " << dbName << "/" << collectionID;
       events::DropCollection(dbName, collectionID, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
-      return Result(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
+      return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
     }
   }
 
@@ -2206,15 +2178,10 @@ Result ClusterInfo::dropCollectionCoordinator(  // drop collection
           << ", return code: " << res.httpCode();
     }
 
-    AgencyCommResult ag = ac.getValues("");
-
-    if (ag.successful()) {
-      LOG_TOPIC("53e01", ERR, Logger::CLUSTER) << "Agency dump:\n"
-                                               << ag.slice().toJson();
-    } else {
-      LOG_TOPIC("f1bfb", ERR, Logger::CLUSTER) << "Could not get agency dump!";
-    }
+    logAgencyDump();
 
+    // TODO: this should rather be TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, as the
+    // precondition is that the database still exists
     events::DropCollection(dbName, collectionID, TRI_ERROR_CLUSTER_COULD_NOT_DROP_COLLECTION);
     return Result(TRI_ERROR_CLUSTER_COULD_NOT_DROP_COLLECTION);
   }
@@ -2248,15 +2215,8 @@ Result ClusterInfo::dropCollectionCoordinator(  // drop collection
           << "Timeout in _drop collection (" << realTimeout << ")"
           << ": database: " << dbName << ", collId:" << collectionID
          << "\ntransaction sent to agency: " << trans.toJson();
-      AgencyCommResult ag = ac.getValues("");
-
-      if (ag.successful()) {
-        LOG_TOPIC("803c8", ERR, Logger::CLUSTER) << "Agency dump:\n"
-                                                 << ag.slice().toJson();
-      } else {
-        LOG_TOPIC("37297", ERR, Logger::CLUSTER)
-            << "Could not get agency dump!";
-      }
+      logAgencyDump();
 
       events::DropCollection(dbName, collectionID, TRI_ERROR_CLUSTER_TIMEOUT);
       return Result(TRI_ERROR_CLUSTER_TIMEOUT);
@@ -2415,15 +2375,8 @@ Result ClusterInfo::createViewCoordinator(  // create view
   if (!res.successful()) {
     if (res.httpCode() == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
       // Dump agency plan:
-      auto const ag = ac.getValues("/");
-
-      if (ag.successful()) {
-        LOG_TOPIC("d3aac", ERR, Logger::CLUSTER) << "Agency dump:\n"
-                                                 << ag.slice().toJson();
-      } else {
-        LOG_TOPIC("69f86", ERR, Logger::CLUSTER)
-            << "Could not get agency dump!";
-      }
+
+      logAgencyDump();
 
       events::CreateView(databaseName, name, TRI_ERROR_CLUSTER_COULD_NOT_CREATE_VIEW_IN_PLAN);
       return Result(  // result
@@ -2481,15 +2434,7 @@ Result ClusterInfo::dropViewCoordinator(  // drop view
                        " already exist failed. Cannot create view.");
 
       // Dump agency plan:
-      auto const ag = ac.getValues("/");
-
-      if (ag.successful()) {
-        LOG_TOPIC("8a7e8", ERR, Logger::CLUSTER) << "Agency dump:\n"
-                                                 << ag.slice().toJson();
-      } else {
-        LOG_TOPIC("a7261", ERR, Logger::CLUSTER)
-            << "Could not get agency dump!";
-      }
+      logAgencyDump();
     } else {
       result = Result(  // result
           TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_PLAN,  // FIXME COULD_NOT_REMOVE_VIEW_IN_PLAN
@@ -2524,14 +2469,7 @@ Result ClusterInfo::setViewPropertiesCoordinator(std::string const& databaseName
       {AgencyCommManager::path(), "Plan", "Views", databaseName, viewID});
 
   if (!view.isObject()) {
-    auto const ag = ac.getValues("");
-
-    if (ag.successful()) {
-      LOG_TOPIC("eabbe", ERR, Logger::CLUSTER) << "Agency dump:\n"
-                                               << ag.slice().toJson();
-    } else {
-      LOG_TOPIC("5f212", ERR, Logger::CLUSTER) << "Could not get agency dump!";
-    }
+    logAgencyDump();
 
     return {TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND};
   }
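The dropCollectionCoordinator change above replaces a single deep path lookup with three staged lookups, so a missing database, a missing collection, and missing shard information each produce a distinct error code. A simplified model of the staged lookup, using nested maps in place of velocypack slices (all types here are illustrative):

```cpp
#include <cstddef>
#include <map>
#include <string>

// Illustrative stand-ins for the velocypack lookups; none of these types
// exist in ArangoDB under these names.
enum class DropErr { Ok, DatabaseNotFound, CollectionNotFound };

using Shards = std::map<std::string, int>;          // shard -> servers (elided)
using Collections = std::map<std::string, Shards>;  // collection -> shards
using Plan = std::map<std::string, Collections>;    // database -> collections

// Resolving one level at a time lets each missing level map to its own,
// more precise error code instead of a catch-all "database not found".
DropErr countShards(Plan const& plan, std::string const& db,
                    std::string const& coll, std::size_t& numberOfShards) {
  auto dbIt = plan.find(db);
  if (dbIt == plan.end()) {
    return DropErr::DatabaseNotFound;    // database dropped in the meantime
  }
  auto collIt = dbIt->second.find(coll);
  if (collIt == dbIt->second.end()) {
    return DropErr::CollectionNotFound;  // collection dropped in the meantime
  }
  numberOfShards = collIt->second.size();
  return DropErr::Ok;
}
```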
diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h
index 29a56c4258..6e05990d82 100644
--- a/arangod/Cluster/ClusterInfo.h
+++ b/arangod/Cluster/ClusterInfo.h
@@ -298,6 +298,9 @@ class ClusterInfo final {
   static void cleanup();
 
  public:
+  /// @brief produces an agency dump and logs it
+  void logAgencyDump() const;
+
   //////////////////////////////////////////////////////////////////////////////
   /// @brief get a number of cluster-wide unique IDs, returns the first
   /// one and guarantees that <number> are reserved for the caller.
@@ -420,18 +423,22 @@ class ClusterInfo final {
                                      bool waitForReplication,
                                      arangodb::velocypack::Slice const& json,
                                      double timeout  // request timeout
   );
+
+  /// @brief this method does an atomic check of the preconditions for the collections
+  /// to be created, using the currently loaded plan. it populates the plan version
+  /// used for the checks
+  Result checkCollectionPreconditions(std::string const& databaseName,
+                                      std::vector<ClusterCollectionCreationInfo> const& infos,
+                                      uint64_t& planVersion);
 
-  //////////////////////////////////////////////////////////////////////////////
   /// @brief create multiple collections in coordinator
   ///        If any one of these collections fails, all creations will be
   ///        rolled back.
-  //////////////////////////////////////////////////////////////////////////////
-
+  /// Note that in contrast to most other methods here, this method does not
+  /// get a timeout parameter, but an endTime parameter!
   Result createCollectionsCoordinator(std::string const& databaseName,
-                                      std::vector<ClusterCollectionCreationInfo>&,
-                                      double timeout);
+                                      std::vector<ClusterCollectionCreationInfo>&, double endTime);
 
-  //////////////////////////////////////////////////////////////////////////////
   /// @brief drop collection in coordinator
   //////////////////////////////////////////////////////////////////////////////
   Result dropCollectionCoordinator(  // drop collection
@@ -656,21 +663,21 @@ class ClusterInfo final {
    * @return  List of DB servers serving the shard
    */
   arangodb::Result getShardServers(ShardID const& shardId, std::vector<ServerID>&);
-
- private:
-  void loadClusterId();
-
+
   //////////////////////////////////////////////////////////////////////////////
   /// @brief get an operation timeout
   //////////////////////////////////////////////////////////////////////////////
-  double getTimeout(double timeout) const {
+  static double getTimeout(double timeout) {
    if (timeout == 0.0) {
      return 24.0 * 3600.0;
    }
    return timeout;
  }
 
+ private:
+  void loadClusterId();
+
   //////////////////////////////////////////////////////////////////////////////
   /// @brief get the poll interval
   //////////////////////////////////////////////////////////////////////////////
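Making getTimeout() static and public lets ClusterMethods convert the 240-second budget into an absolute deadline once and pass that endTime down, as the updated createCollectionsCoordinator comment warns. A small sketch of the timeout-to-deadline convention, written with std::chrono rather than the TRI_microtime() doubles the codebase uses:

```cpp
#include <chrono>

// Convert a relative timeout into an absolute deadline exactly once, at the
// outermost call site; every nested retry then checks the same deadline and
// cannot silently extend the total time budget.
using Clock = std::chrono::steady_clock;

Clock::time_point toDeadline(double timeoutSeconds) {
  return Clock::now() + std::chrono::duration_cast<Clock::duration>(
                            std::chrono::duration<double>(timeoutSeconds));
}

bool expired(Clock::time_point deadline) {
  return Clock::now() > deadline;
}
```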
diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp
index a4c9a59181..d8e8e522bc 100644
--- a/arangod/Cluster/ClusterMethods.cpp
+++ b/arangod/Cluster/ClusterMethods.cpp
@@ -2881,113 +2881,156 @@ std::vector<std::shared_ptr<LogicalCollection>> ClusterMethods::persistCollectionsInAgency(
     std::vector<std::shared_ptr<LogicalCollection>>& collections,
     bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication,
     bool enforceReplicationFactor) {
+  TRI_ASSERT(!collections.empty());
   if (collections.empty()) {
     THROW_ARANGO_EXCEPTION_MESSAGE(
         TRI_ERROR_INTERNAL,
         "Trying to create an empty list of collections on coordinator.");
   }
-  // We have at least one, take this collections DB name
-  auto& dbName = collections[0]->vocbase().name();
+
+  double const realTimeout = ClusterInfo::getTimeout(240.0);
+  double const endTime = TRI_microtime() + realTimeout;
+
+  // We have at least one, take this collection's DB name
+  // (if there are multiple collections to create, the assumption is that
+  // all collections have the same database name - ArangoDB does not
+  // support cross-database operations and they cannot be triggered by
+  // users)
+  auto const dbName = collections[0]->vocbase().name();
   ClusterInfo* ci = ClusterInfo::instance();
-  ci->loadCurrentDBServers();
-  std::vector<std::string> dbServers = ci->getCurrentDBServers();
+
   std::vector<ClusterCollectionCreationInfo> infos;
-  std::vector<std::shared_ptr<VPackBuffer<uint8_t>>> vpackData;
-  infos.reserve(collections.size());
-  vpackData.reserve(collections.size());
-  for (auto& col : collections) {
-    // We can only serve on Database at a time with this call.
-    // We have the vocbase context around this calls anyways, so this is save.
-    TRI_ASSERT(col->vocbase().name() == dbName);
-    std::string distributeShardsLike = col->distributeShardsLike();
-    std::vector<std::string> avoid = col->avoidServers();
-    std::shared_ptr<std::unordered_map<ShardID, std::vector<ServerID>>> shards = nullptr;
-    if (!distributeShardsLike.empty()) {
-      CollectionNameResolver resolver(col->vocbase());
-      TRI_voc_cid_t otherCid = resolver.getCollectionIdCluster(distributeShardsLike);
+  while (true) {
+    infos.clear();
+
+    ci->loadCurrentDBServers();
+    std::vector<std::string> dbServers = ci->getCurrentDBServers();
+    infos.reserve(collections.size());
+
+    std::vector<std::shared_ptr<VPackBuffer<uint8_t>>> vpackData;
+    vpackData.reserve(collections.size());
+    for (auto& col : collections) {
+      // We can only serve one database at a time with this call.
+      // We have the vocbase context around these calls anyway, so this is safe.
+      TRI_ASSERT(col->vocbase().name() == dbName);
+      std::string distributeShardsLike = col->distributeShardsLike();
+      std::vector<std::string> avoid = col->avoidServers();
+      std::shared_ptr<std::unordered_map<ShardID, std::vector<ServerID>>> shards = nullptr;
LOG_TOPIC("03682", DEBUG, Logger::CLUSTER) + // => (dbServers.size() < minReplicationFactor) is granted + LOG_TOPIC("9ce2e", DEBUG, Logger::CLUSTER) << "Do not have enough DBServers for requested replicationFactor," - << " (after considering avoid list)," - << " nrDBServers: " << dbServers.size() << " replicationFactor: " << replicationFactor - << " avoid list size: " << avoid.size(); - // Not enough DBServers left - THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS); + << " nrDBServers: " << dbServers.size() + << " replicationFactor: " << replicationFactor; + if (enforceReplicationFactor) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS); + } } - dbServers.erase(std::remove_if(dbServers.begin(), dbServers.end(), - [&](const std::string& x) { - return std::find(avoid.begin(), avoid.end(), - x) != avoid.end(); - }), - dbServers.end()); + + if (!avoid.empty()) { + // We need to remove all servers that are in the avoid list + if (dbServers.size() - avoid.size() < replicationFactor) { + LOG_TOPIC("03682", DEBUG, Logger::CLUSTER) + << "Do not have enough DBServers for requested replicationFactor," + << " (after considering avoid list)," + << " nrDBServers: " << dbServers.size() << " replicationFactor: " << replicationFactor + << " avoid list size: " << avoid.size(); + // Not enough DBServers left + THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS); + } + dbServers.erase(std::remove_if(dbServers.begin(), dbServers.end(), + [&](const std::string& x) { + return std::find(avoid.begin(), avoid.end(), + x) != avoid.end(); + }), + dbServers.end()); + } + std::random_shuffle(dbServers.begin(), dbServers.end()); + shards = DistributeShardsEvenly(ci, numberOfShards, replicationFactor, + dbServers, !col->system()); } - std::random_shuffle(dbServers.begin(), dbServers.end()); - shards = DistributeShardsEvenly(ci, numberOfShards, replicationFactor, - dbServers, !col->system()); + + if (shards->empty() && !col->isSmart()) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, + "no database servers found in cluster"); + } + + col->setShardMap(shards); + + std::unordered_set const ignoreKeys{ + "allowUserKeys", "cid", "globallyUniqueId", "count", + "planId", "version", "objectId"}; + col->setStatus(TRI_VOC_COL_STATUS_LOADED); + VPackBuilder velocy = + col->toVelocyPackIgnore(ignoreKeys, LogicalDataSource::makeFlags()); + + infos.emplace_back( + ClusterCollectionCreationInfo{std::to_string(col->id()), + col->numberOfShards(), col->replicationFactor(), + col->minReplicationFactor(), + waitForSyncReplication, velocy.slice()}); + vpackData.emplace_back(velocy.steal()); } - if (shards->empty() && !col->isSmart()) { - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, - "no database servers found in cluster"); + // pass in the *endTime* here, not a timeout! + Result res = ci->createCollectionsCoordinator(dbName, infos, endTime); + + if (res.ok()) { + // success! 
+
+    if (res.is(TRI_ERROR_REQUEST_CANCELED)) {
+      // special error code indicating that storing the updated plan in the agency
+      // didn't succeed, and that we should try again
+
+      // sleep for a while
+      std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+      if (TRI_microtime() > endTime) {
+        // timeout expired
+        THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_TIMEOUT);
+      }
+
+      if (arangodb::application_features::ApplicationServer::isStopping()) {
+        THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
+      }
+
+      // try in next iteration with an adjusted plan change attempt
+      continue;
-    col->setShardMap(shards);
-
-    std::unordered_set<std::string> const ignoreKeys{
-        "allowUserKeys", "cid", "globallyUniqueId", "count",
-        "planId", "version", "objectId"};
-    col->setStatus(TRI_VOC_COL_STATUS_LOADED);
-    VPackBuilder velocy =
-        col->toVelocyPackIgnore(ignoreKeys, LogicalDataSource::makeFlags());
-
-    infos.emplace_back(ClusterCollectionCreationInfo{
-        std::to_string(col->id()), col->numberOfShards(), col->replicationFactor(),
-        col->minReplicationFactor(), waitForSyncReplication, velocy.slice()});
-    vpackData.emplace_back(velocy.steal());
-  }
-
-  Result res = ci->createCollectionsCoordinator(dbName, infos, 240.0);
-  if (res.fail()) {
-    THROW_ARANGO_EXCEPTION(res);
+    } else {
+      // any other error
+      THROW_ARANGO_EXCEPTION(res);
+    }
   }
 
   ci->loadPlan();
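The retry loop in persistCollectionsInAgency deliberately rebuilds everything per attempt: it reloads the DBserver list, recomputes the shard distribution, and clears infos before each try, so a retry after a lost plan-version race never reuses state derived from the stale plan. Schematically (BuildInfos and TryCreate are hypothetical callables standing in for the shard-distribution plus createCollectionsCoordinator sequence above):

```cpp
// Illustrative only; the real loop is bounded by an endTime deadline and a
// shutdown check rather than an attempt counter.
template <typename BuildInfos, typename TryCreate>
bool createWithRetries(BuildInfos buildInfos, TryCreate tryCreate, int maxAttempts) {
  for (int attempt = 0; attempt < maxAttempts; ++attempt) {
    // recompute all per-attempt state so retries see fresh cluster data
    auto infos = buildInfos();
    if (tryCreate(infos)) {
      return true;  // plan write succeeded
    }
  }
  return false;  // attempts exhausted; caller reports a timeout
}
```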