diff --git a/CHANGELOG b/CHANGELOG
index cca921af0f..ea6c60d9db 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,15 +1,18 @@
 v3.4.8 (XXXX-XX-XX)
 -------------------
 
+* Fixed some races in cluster collection creation, which allowed collections with the
+  same name to be created in parallel under some rare conditions.
+
 * arangoimport would not stop, much less report, communications errors. Add CSV reporting
-  of line numbers that are impacted during such errors
+  of the line numbers that are affected by such errors.
 
 * Fixed a bug which could lead to some unnecessary HTTP requests during an AQL query
   in a cluster. Only occurs with views in the query.
 
 * Prevent rare cases of duplicate DDL actions being executed by Maintenance.
 
-* coordinator code was reporting rocksdb error codes, but not the associated detail message.
+* Coordinator code was reporting RocksDB error codes, but not the associated detail message.
   Corrected.
 
 * Fixed some error reporting and logging in Maintenance.
diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp
index 119d2637e4..93a01d082d 100644
--- a/arangod/Cluster/ClusterInfo.cpp
+++ b/arangod/Cluster/ClusterInfo.cpp
@@ -277,6 +277,20 @@ void ClusterInfo::cleanup() {
   theInstance->_currentCollections.clear();
 }
 
+/// @brief produces an agency dump and logs it
+void ClusterInfo::logAgencyDump() const {
+#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
+  AgencyComm ac;
+  AgencyCommResult ag = ac.getValues("/");
+
+  if (ag.successful()) {
+    LOG_TOPIC(INFO, Logger::CLUSTER) << "Agency dump:\n" << ag.slice().toJson();
+  } else {
+    LOG_TOPIC(WARN, Logger::CLUSTER) << "Could not get agency dump!";
+  }
+#endif
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 /// @brief increase the uniqid value. if it exceeds the upper bound, fetch a
 /// new upper bound value from the agency
@@ -1529,14 +1543,7 @@ int ClusterInfo::dropDatabaseCoordinator(std::string const& name,
     }
 
     if (TRI_microtime() > endTime) {
-      AgencyCommResult ag = ac.getValues("/");
-      if (ag.successful()) {
-        LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
-                                        << ag.slice().toJson();
-      } else {
-        LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
-      }
-
+      logAgencyDump();
       return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
     }
 
@@ -1561,7 +1568,9 @@ int ClusterInfo::createCollectionCoordinator(
   std::vector<ClusterCollectionCreationInfo> infos{
       ClusterCollectionCreationInfo{collectionID, numberOfShards, replicationFactor,
                                     waitForReplication, json}};
-  Result res = createCollectionsCoordinator(databaseName, infos, timeout);
+  double const realTimeout = getTimeout(timeout);
+  double const endTime = TRI_microtime() + realTimeout;
+  Result res = createCollectionsCoordinator(databaseName, infos, endTime);
   if (res.fail()) {
     errorMsg = res.errorMessage();
     return res.errorNumber();
@@ -1569,28 +1578,23 @@ int ClusterInfo::createCollectionCoordinator(
   return TRI_ERROR_NO_ERROR;
 }
 
-Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName,
-                                                 std::vector<ClusterCollectionCreationInfo>& infos,
-                                                 double timeout) {
-  using arangodb::velocypack::Slice;
+/// @brief this method does an atomic check of the preconditions for the collections
+/// to be created, using the currently loaded plan. it populates the plan version
+/// used for the checks
+Result ClusterInfo::checkCollectionPreconditions(std::string const& databaseName,
+                                                 std::vector<ClusterCollectionCreationInfo> const& infos,
+                                                 uint64_t& planVersion) {
+  READ_LOCKER(readLocker, _planProt.lock);
+
+  planVersion = _planVersion;
 
-  AgencyComm ac;
-
-  double const realTimeout = getTimeout(timeout);
-  double const endTime = TRI_microtime() + realTimeout;
-  double const interval = getPollInterval();
-  // We need to make sure our plan is up to date.
-  LOG_TOPIC(DEBUG, Logger::CLUSTER)
-      << "createCollectionCoordinator, loading Plan from agency...";
-  loadPlan();
-  // No matter how long this will take, we will not ourselfes trigger a plan relaoding.
-  for (auto& info : infos) {
+  for (auto const& info : infos) {
     // Check if name exists.
     if (info.name.empty() || !info.json.isObject() ||
        !info.json.get("shards").isObject()) {
      return TRI_ERROR_BAD_PARAMETER;  // must not be empty
    }
-    READ_LOCKER(readLocker, _planProt.lock);
-    // Validate that his collection does not exist
+
+    // Validate that the collection does not exist in the current plan
    {
      AllCollections::const_iterator it = _plannedCollections.find(databaseName);
      if (it != _plannedCollections.end()) {
@@ -1601,8 +1605,18 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
          events::CreateCollection(info.name, TRI_ERROR_ARANGO_DUPLICATE_NAME);
          return TRI_ERROR_ARANGO_DUPLICATE_NAME;
        }
+      } else {
+        // no collection in plan for this particular database... this may be true for
+        // the first collection created in a db
+        // now check if there is a planned database at least
+        if (_plannedDatabases.find(databaseName) == _plannedDatabases.end()) {
+          // no need to create a collection in a database that is not there (anymore)
+          events::CreateCollection(info.name, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
+          return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
+        }
      }
    }
+
    // Validate that there is no view with this name either
    {
      // check against planned views as well
@@ -1618,21 +1632,23 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
      }
    }
 
-    LOG_TOPIC(DEBUG, Logger::CLUSTER)
-        << "createCollectionCoordinator, checking things...";
-
-    // mop: why do these ask the agency instead of checking cluster info?
-    if (!ac.exists("Plan/Databases/" + databaseName)) {
-      events::CreateCollection(info.name, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
-      return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
-    }
-
-    if (ac.exists("Plan/Collections/" + databaseName + "/" + info.collectionID)) {
-      events::CreateCollection(info.name, TRI_ERROR_CLUSTER_COLLECTION_ID_EXISTS);
-      return TRI_ERROR_CLUSTER_COLLECTION_ID_EXISTS;
-    }
  }
+  return {};
+}
+
+/// @brief create multiple collections in coordinator
+/// If any one of these collections fails, all creations will be
+/// rolled back.
+/// Note that in contrast to most other methods here, this method does not
+/// get a timeout parameter, but an endTime parameter!!!
+Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName,
+                                                 std::vector<ClusterCollectionCreationInfo>& infos,
+                                                 double endTime) {
+  using arangodb::velocypack::Slice;
+
+  double const interval = getPollInterval();
+
  // The following three are used for synchronization between the callback
  // closure and the main thread executing this function. Note that it can
  // happen that the callback is called only after we return from this
@@ -1644,8 +1660,9 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
  auto cacheMutexOwner = std::make_shared<std::atomic<std::thread::id>>();
  auto isCleaned = std::make_shared<bool>(false);
 
+  AgencyComm ac;
  std::vector<std::shared_ptr<AgencyCallback>> agencyCallbacks;
-
+
  auto cbGuard = scopeGuard([&] {
    // We have a subtle race here, that we try to cover against:
    // We register a callback in the agency.
@@ -1664,17 +1681,17 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
      _agencyCallbackRegistry->unregisterCallback(cb);
    }
  });
-  std::vector<AgencyOperation> opers({IncreaseVersion()});
+  std::vector<AgencyOperation> opers({IncreaseVersion()});
  std::vector<AgencyPrecondition> precs;
  std::unordered_set<std::string> conditions;
-
+
  // current thread owning 'cacheMutex' write lock (workaround for non-recursive Mutex)
  for (auto& info : infos) {
    TRI_ASSERT(!info.name.empty());
    if (info.state == ClusterCollectionCreationInfo::State::DONE) {
-      // This is possible in Enterprise / Smart Collection situation
+      // This is possible in Enterprise / Smart collection situation
      (*nrDone)++;
    }
    // The AgencyCallback will copy the closure will take responsibilty of it.
@@ -1825,23 +1842,35 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
                                          AgencyPrecondition::Type::EMPTY, true));
      }
    }
+
+    // additionally ensure that no such collectionID exists yet in Plan/Collections
+    precs.emplace_back(AgencyPrecondition("Plan/Collections/" + databaseName + "/" + info.collectionID,
+                                          AgencyPrecondition::Type::EMPTY, true));
  }
 
-  // We run a loop here to send the agency transaction, since there might
-  // be a precondition failed, in which case we want to retry for some time:
-  while (true) {
-    if (TRI_microtime() > endTime) {
-      for (auto info : infos) {
-        if (info.state != ClusterCollectionCreationInfo::DONE) {
-          LOG_TOPIC(ERR, Logger::CLUSTER)
-              << "Timeout in _create collection"
-              << ": database: " << databaseName << ", collId:" << info.collectionID
-              << "\njson: " << info.json.toString()
-              << "\ncould not send transaction to agency.";
-        }
-      }
-      return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN;
-    }
+  // We need to make sure our plan is up to date.
+  LOG_TOPIC(DEBUG, Logger::CLUSTER)
+      << "createCollectionCoordinator, loading Plan from agency...";
+
+  // load the plan, so we are up-to-date
+  loadPlan();
+  uint64_t planVersion = 0;  // will be populated by following function call
+  Result res = checkCollectionPreconditions(databaseName, infos, planVersion);
+  if (res.fail()) {
+    return res;
+  }
+
+
+  // now try to update the plan in the agency, using the current plan version as our
+  // precondition
+  {
+    // create a builder with just the version number for comparison
+    VPackBuilder versionBuilder;
+    versionBuilder.add(VPackValue(planVersion));
+
+    // add a precondition that checks that the plan version has not yet changed
+    precs.emplace_back(AgencyPrecondition("Plan/Version", AgencyPrecondition::Type::VALUE, versionBuilder.slice()));
    AgencyWriteTransaction transaction(opers, precs);
    {  // we hold this mutex from now on until we have updated our cache
@@ -1853,61 +1882,28 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
      // Only if not precondition failed
      if (!res.successful()) {
        if (res.httpCode() == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
-          auto result = res.slice();
-          AgencyCommResult ag = ac.getValues("/");
-
-          if (result.isArray() && result.length() > 0) {
-            if (result[0].isObject()) {
-              auto tres = result[0];
-              std::string errorMsg = "";
-              if (tres.hasKey(std::vector<std::string>(
-                      {AgencyCommManager::path(), "Supervision"}))) {
-                for (const auto& s : VPackObjectIterator(tres.get(
-                         std::vector<std::string>({AgencyCommManager::path(),
-                                                    "Supervision", "Shards"})))) {
-                  errorMsg += std::string("Shard ") + s.key.copyString();
-                  errorMsg +=
-                      " of prototype collection is blocked by supervision job ";
-                  errorMsg += s.value.copyString();
-                }
-              }
-              return {TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN,
-                      std::move(errorMsg)};
-            }
-          }
-
-          LOG_TOPIC(ERR, Logger::CLUSTER)
-              << "Precondition failed for this agency transaction: "
-              << transaction.toJson() << ", return code: " << res.httpCode();
-          if (ag.successful()) {
-            LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
-                                            << ag.slice().toJson();
-          } else {
-            LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
-          }
-          // Agency is currently unhappy, try again in a few seconds:
-          std::this_thread::sleep_for(std::chrono::seconds(5));
-          continue;
-        } else {
-          std::string errorMsg = "";
-          errorMsg += std::string("file: ") + __FILE__ + " line: " + std::to_string(__LINE__);
-          errorMsg += " HTTP code: " + std::to_string(res.httpCode());
-          errorMsg += " error message: " + res.errorMessage();
-          errorMsg += " error details: " + res.errorDetails();
-          errorMsg += " body: " + res.body();
-          for (auto const& info : infos) {
-            events::CreateCollection(info.name, TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN);
-          }
-          return {TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN, std::move(errorMsg)};
+          // use this special error code to signal that we got a precondition failure
+          // in this case the caller can try again with an updated version of the plan
+          return {TRI_ERROR_REQUEST_CANCELED, "operation aborted due to precondition failure"};
+        }
+
+        std::string errorMsg = "HTTP code: " + std::to_string(res.httpCode());
+        errorMsg += " error message: " + res.errorMessage();
+        errorMsg += " error details: " + res.errorDetails();
+        errorMsg += " body: " + res.body();
+        for (auto const& info : infos) {
+          events::CreateCollection(info.name, TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN);
        }
+        return {TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN, std::move(errorMsg)};
      }
 
      // Update our cache:
      loadPlan();
    }
-    break;  // Leave loop, since we are done
  }
+
+  // if we got here, the plan was updated successfully
  LOG_TOPIC(DEBUG, Logger::CLUSTER)
      << "createCollectionCoordinator, Plan changed, waiting for success...";
@@ -1922,15 +1918,7 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
    }
 
    // Get a full agency dump for debugging
-    {
-      AgencyCommResult ag = ac.getValues("");
-      if (ag.successful()) {
-        LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
-                                        << ag.slice().toJson();
-      } else {
-        LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
-      }
-    }
+    logAgencyDump();
 
    if (tmpRes <= TRI_ERROR_NO_ERROR) {
      tmpRes = TRI_ERROR_CLUSTER_TIMEOUT;
@@ -1956,7 +1944,6 @@ Result ClusterInfo::createCollectionsCoordinator(std::string const& databaseName
    precs.emplace_back(CreateCollectionOrderPrecondition(databaseName, info.collectionID,
                                                         info.isBuildingSlice()));
  }
-  // TODO: Should we use preconditions?
 
  AgencyWriteTransaction transaction(opers, precs);
 
@@ -2114,15 +2101,27 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& dbName,
                                   "/shards");
 
  if (res.successful()) {
-    velocypack::Slice shards = res.slice()[0].get(std::vector<std::string>(
-        {AgencyCommManager::path(), "Plan", "Collections", dbName, collectionID,
-         "shards"}));
-    if (shards.isObject()) {
-      numberOfShards = shards.length();
+    velocypack::Slice databaseSlice = res.slice()[0].get(std::vector<std::string>(
+        {AgencyCommManager::path(), "Plan", "Collections", dbName }));
+
+    if (!databaseSlice.isObject()) {
+      // database dropped in the meantime
+      return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
+    }
+
+    velocypack::Slice collectionSlice = databaseSlice.get(collectionID);
+    if (!collectionSlice.isObject()) {
+      // collection dropped in the meantime
+      return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND;
+    }
+
+    velocypack::Slice shardsSlice = collectionSlice.get("shards");
+    if (shardsSlice.isObject()) {
+      numberOfShards = shardsSlice.length();
    } else {
      LOG_TOPIC(ERR, Logger::CLUSTER)
          << "Missing shards information on dropping " << dbName << "/" << collectionID;
-      return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
+      return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND;
    }
  }
 
@@ -2141,13 +2140,10 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& dbName,
          << "Precondition failed for this agency transaction: " << trans.toJson()
          << ", return code: " << res.httpCode();
    }
-    AgencyCommResult ag = ac.getValues("");
-    if (ag.successful()) {
-      LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
-                                      << ag.slice().toJson();
-    } else {
-      LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
-    }
+
+    logAgencyDump();
+    // TODO: this should rather be TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, as the
+    // precondition is that the database still exists
    return TRI_ERROR_CLUSTER_COULD_NOT_DROP_COLLECTION;
  }
 
@@ -2181,13 +2177,8 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& dbName,
        << "Timeout in _drop collection (" << realTimeout << ")"
        << ": database: " << dbName << ", collId:" << collectionID
        << "\ntransaction sent to agency: " << trans.toJson();
-    AgencyCommResult ag = ac.getValues("");
-    if (ag.successful()) {
-      LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
-                                      << ag.slice().toJson();
-    } else {
-      LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
-    }
+
+    logAgencyDump();
    events::DropCollection(collectionID, TRI_ERROR_CLUSTER_TIMEOUT);
    return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
  }
@@ -2337,14 +2328,7 @@ int ClusterInfo::createViewCoordinator(std::string const& databaseName,
               viewID + " does not yet exist failed. Cannot create view.";
 
    // Dump agency plan:
-    auto const ag = ac.getValues("/");
-
-    if (ag.successful()) {
-      LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
-                                      << ag.slice().toJson();
-    } else {
-      LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
-    }
+    logAgencyDump();
 
    return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_VIEW_IN_PLAN;
  } else {
@@ -2395,14 +2379,7 @@ int ClusterInfo::dropViewCoordinator(std::string const& databaseName,
      errorMsg += " already exist failed. Cannot create view.";
 
      // Dump agency plan:
-      auto const ag = ac.getValues("/");
-
-      if (ag.successful()) {
-        LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
-                                        << ag.slice().toJson();
-      } else {
-        LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
-      }
+      logAgencyDump();
    } else {
      errorMsg += std::string("file: ") + __FILE__ + " line: " + std::to_string(__LINE__);
      errorMsg += " HTTP code: " + std::to_string(res.httpCode());
@@ -2437,14 +2414,7 @@ Result ClusterInfo::setViewPropertiesCoordinator(std::string const& databaseName
      {AgencyCommManager::path(), "Plan", "Views", databaseName, viewID});
 
  if (!view.isObject()) {
-    auto const ag = ac.getValues("");
-
-    if (ag.successful()) {
-      LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
-                                      << ag.slice().toJson();
-    } else {
-      LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
-    }
+    logAgencyDump();
 
    return {TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND};
  }
diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h
index 259b6501b7..a8438385c9 100644
--- a/arangod/Cluster/ClusterInfo.h
+++ b/arangod/Cluster/ClusterInfo.h
@@ -270,6 +270,9 @@ class ClusterInfo {
  static void cleanup();
 
 public:
+  /// @brief produces an agency dump and logs it
+  void logAgencyDump() const;
+
  //////////////////////////////////////////////////////////////////////////////
  /// @brief get a number of cluster-wide unique IDs, returns the first
  /// one and guarantees that are reserved for the caller.
@@ -388,19 +391,22 @@ class ClusterInfo {
                                  arangodb::velocypack::Slice const& json,
                                  std::string& errorMsg, double timeout);
 
-  //////////////////////////////////////////////////////////////////////////////
+  /// @brief this method does an atomic check of the preconditions for the collections
+  /// to be created, using the currently loaded plan. it populates the plan version
+  /// used for the checks
+  Result checkCollectionPreconditions(std::string const& databaseName,
+                                      std::vector<ClusterCollectionCreationInfo> const& infos,
+                                      uint64_t& planVersion);
+
  /// @brief create multiple collections in coordinator
  /// If any one of these collections fails, all creations will be
  /// rolled back.
-  //////////////////////////////////////////////////////////////////////////////
-
+  /// Note that in contrast to most other methods here, this method does not
+  /// get a timeout parameter, but an endTime parameter!!!
  Result createCollectionsCoordinator(std::string const& databaseName,
-                                      std::vector<ClusterCollectionCreationInfo>&, double timeout);
+                                      std::vector<ClusterCollectionCreationInfo>&, double endTime);
 
-  //////////////////////////////////////////////////////////////////////////////
  /// @brief drop collection in coordinator
-  //////////////////////////////////////////////////////////////////////////////
-
  int dropCollectionCoordinator(std::string const& databaseName,
                                std::string const& collectionID,
                                std::string& errorMsg, double timeout);
@@ -612,21 +618,21 @@ class ClusterInfo {
   * @return List of DB servers serving the shard
   */
  arangodb::Result getShardServers(ShardID const& shardId, std::vector<std::string>&);
-
- private:
-  void loadClusterId();
-
+
  //////////////////////////////////////////////////////////////////////////////
  /// @brief get an operation timeout
  //////////////////////////////////////////////////////////////////////////////
-  double getTimeout(double timeout) const {
+  static double getTimeout(double timeout) {
    if (timeout == 0.0) {
      return 24.0 * 3600.0;
    }
    return timeout;
  }
 
+ private:
+  void loadClusterId();
+
  //////////////////////////////////////////////////////////////////////////////
  /// @brief get the poll interval
  //////////////////////////////////////////////////////////////////////////////
diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp
index 43692ac984..6c3b43fa2c 100644
--- a/arangod/Cluster/ClusterMethods.cpp
+++ b/arangod/Cluster/ClusterMethods.cpp
@@ -22,6 +22,7 @@
 ////////////////////////////////////////////////////////////////////////////////
 
 #include "ClusterMethods.h"
+#include "ApplicationFeatures/ApplicationServer.h"
 #include "Basics/NumberUtils.h"
 #include "Basics/StaticStrings.h"
 #include "Basics/StringRef.h"
@@ -2567,107 +2568,147 @@ std::vector<std::shared_ptr<LogicalCollection>> ClusterMethods::persistCollectio
    std::vector<std::shared_ptr<LogicalCollection>>& collections,
    bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication,
    bool enforceReplicationFactor) {
+  TRI_ASSERT(!collections.empty());
  if (collections.empty()) {
    THROW_ARANGO_EXCEPTION_MESSAGE(
        TRI_ERROR_INTERNAL,
        "Trying to create an empty list of collections on coordinator.");
  }
+
+  double const realTimeout = ClusterInfo::getTimeout(240.0);
+  double const endTime = TRI_microtime() + realTimeout;
+
  // We have at least one, take this collections DB name
-  auto& dbName = collections[0]->vocbase().name();
+  auto const dbName = collections[0]->vocbase().name();
  ClusterInfo* ci = ClusterInfo::instance();
-  ci->loadCurrentDBServers();
-  std::vector<std::string> dbServers = ci->getCurrentDBServers();
+  std::vector<ClusterCollectionCreationInfo> infos;
-  std::vector<std::shared_ptr<VPackBuffer<uint8_t>>> vpackData;
-  infos.reserve(collections.size());
-  vpackData.reserve(collections.size());
-  for (auto& col : collections) {
-    // We can only serve on Database at a time with this call.
-    // We have the vocbase context around this calls anyways, so this is save.
-    TRI_ASSERT(col->vocbase().name() == dbName);
-    std::string distributeShardsLike = col->distributeShardsLike();
-    std::vector<std::string> avoid = col->avoidServers();
-    std::shared_ptr<std::unordered_map<std::string, std::vector<std::string>>> shards = nullptr;
-    if (!distributeShardsLike.empty()) {
-      CollectionNameResolver resolver(col->vocbase());
-      TRI_voc_cid_t otherCid = resolver.getCollectionIdCluster(distributeShardsLike);
+  while (true) {
+    infos.clear();
 
-      if (otherCid != 0) {
-        shards = CloneShardDistribution(ci, col.get(), otherCid);
+    ci->loadCurrentDBServers();
+    std::vector<std::string> dbServers = ci->getCurrentDBServers();
+    infos.reserve(collections.size());
+
+    std::vector<std::shared_ptr<VPackBuffer<uint8_t>>> vpackData;
+    vpackData.reserve(collections.size());
+    for (auto& col : collections) {
+      // We can only serve one database at a time with this call.
+      // We have the vocbase context around these calls anyway, so this is safe.
+      TRI_ASSERT(col->vocbase().name() == dbName);
+      std::string distributeShardsLike = col->distributeShardsLike();
+      std::vector<std::string> avoid = col->avoidServers();
+      std::shared_ptr<std::unordered_map<std::string, std::vector<std::string>>> shards = nullptr;
+
+      if (!distributeShardsLike.empty()) {
+        CollectionNameResolver resolver(col->vocbase());
+        TRI_voc_cid_t otherCid = resolver.getCollectionIdCluster(distributeShardsLike);
+
+        if (otherCid != 0) {
+          shards = CloneShardDistribution(ci, col.get(), otherCid);
+        } else {
+          THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE,
+                                         "Could not find collection " + distributeShardsLike +
+                                             " to distribute shards like it.");
+        }
      } else {
-        THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE,
-                                       "Could not find collection " + distributeShardsLike +
-                                           " to distribute shards like it.");
-      }
-    } else {
-      // system collections should never enforce replicationfactor
-      // to allow them to come up with 1 dbserver
-      if (col->system()) {
-        enforceReplicationFactor = false;
-      }
+        // system collections should never enforce replicationfactor
+        // to allow them to come up with 1 dbserver
+        if (col->system()) {
+          enforceReplicationFactor = false;
+        }
 
-      size_t replicationFactor = col->replicationFactor();
-      size_t numberOfShards = col->numberOfShards();
+        size_t replicationFactor = col->replicationFactor();
+        size_t numberOfShards = col->numberOfShards();
 
-      // the default behaviour however is to bail out and inform the user
-      // that the requested replicationFactor is not possible right now
-      if (dbServers.size() < replicationFactor) {
-        LOG_TOPIC(DEBUG, Logger::CLUSTER)
+        // the default behaviour however is to bail out and inform the user
+        // that the requested replicationFactor is not possible right now
+        if (dbServers.size() < replicationFactor) {
+          LOG_TOPIC(DEBUG, Logger::CLUSTER)
            << "Do not have enough DBServers for requested replicationFactor,"
            << " nrDBServers: " << dbServers.size()
            << " replicationFactor: " << replicationFactor;
-        if (enforceReplicationFactor) {
-          THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS);
+          if (enforceReplicationFactor) {
+            THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS);
+          }
        }
-      }
 
-      if (!avoid.empty()) {
-        // We need to remove all servers that are in the avoid list
-        if (dbServers.size() - avoid.size() < replicationFactor) {
-          LOG_TOPIC(DEBUG, Logger::CLUSTER)
+        if (!avoid.empty()) {
+          // We need to remove all servers that are in the avoid list
+          if (dbServers.size() - avoid.size() < replicationFactor) {
+            LOG_TOPIC(DEBUG, Logger::CLUSTER)
            << "Do not have enough DBServers for requested replicationFactor,"
            << " (after considering avoid list),"
            << " nrDBServers: " << dbServers.size()
            << " replicationFactor: " << replicationFactor
            << " avoid list size: " << avoid.size();
-          // Not enough DBServers left
-          THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS);
+            // Not enough DBServers left
+            THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS);
+          }
+          dbServers.erase(std::remove_if(dbServers.begin(), dbServers.end(),
+                                         [&](const std::string& x) {
+                                           return std::find(avoid.begin(), avoid.end(),
+                                                            x) != avoid.end();
+                                         }),
+                          dbServers.end());
        }
-        dbServers.erase(std::remove_if(dbServers.begin(), dbServers.end(),
-                                       [&](const std::string& x) {
-                                         return std::find(avoid.begin(), avoid.end(),
-                                                          x) != avoid.end();
-                                       }),
-                        dbServers.end());
+        std::random_shuffle(dbServers.begin(), dbServers.end());
+        shards = DistributeShardsEvenly(ci, numberOfShards, replicationFactor,
+                                        dbServers, !col->system());
      }
-      std::random_shuffle(dbServers.begin(), dbServers.end());
-      shards = DistributeShardsEvenly(ci, numberOfShards, replicationFactor,
-                                      dbServers, !col->system());
-    }
 
-    if (shards->empty() && !col->isSmart()) {
-      THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
-                                     "no database servers found in cluster");
-    }
+      if (shards->empty() && !col->isSmart()) {
+        THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
+                                       "no database servers found in cluster");
+      }
 
-    col->setShardMap(shards);
+      col->setShardMap(shards);
 
-    std::unordered_set<std::string> const ignoreKeys{
+      std::unordered_set<std::string> const ignoreKeys{
        "allowUserKeys", "cid", "globallyUniqueId", "count",
-        "planId", "version", "objectId"};
-    col->setStatus(TRI_VOC_COL_STATUS_LOADED);
-    VPackBuilder velocy = col->toVelocyPackIgnore(ignoreKeys, false, false);
+        "planId", "version", "objectId"};
+      col->setStatus(TRI_VOC_COL_STATUS_LOADED);
+      VPackBuilder velocy = col->toVelocyPackIgnore(ignoreKeys, false, false);
 
-    infos.emplace_back(
-        ClusterCollectionCreationInfo{std::to_string(col->id()),
-                                      col->numberOfShards(), col->replicationFactor(),
-                                      waitForSyncReplication, velocy.slice()});
-    vpackData.emplace_back(velocy.steal());
-  }
-  Result res = ci->createCollectionsCoordinator(dbName, infos, 240.0);
-  if (res.fail()) {
-    THROW_ARANGO_EXCEPTION(res);
+      infos.emplace_back(
+          ClusterCollectionCreationInfo{std::to_string(col->id()),
+                                        col->numberOfShards(), col->replicationFactor(),
+                                        waitForSyncReplication, velocy.slice()});
+      vpackData.emplace_back(velocy.steal());
+    }
+
+    // pass in the *endTime* here, not a timeout!
+    Result res = ci->createCollectionsCoordinator(dbName, infos, endTime);
+
+    if (res.ok()) {
+      // success! exit the loop and go on
+      break;
+    }
+
+    if (res.is(TRI_ERROR_REQUEST_CANCELED)) {
+      // special error code indicating that storing the updated plan in the agency
+      // didn't succeed, and that we should try again
+
+      // sleep for a while
+      std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+      if (TRI_microtime() > endTime) {
+        // timeout expired
+        THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_TIMEOUT);
+      }
+
+      if (arangodb::application_features::ApplicationServer::isStopping()) {
+        THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
+      }
+
+      // try again in the next iteration, with a freshly loaded plan
+      continue;
+
+    } else {
+      // any other error
+      THROW_ARANGO_EXCEPTION(res);
+    }
  }
 
  // This is no longer necessary, since we load the Plan in