diff --git a/arangod/Cluster/AgencyCallback.cpp b/arangod/Cluster/AgencyCallback.cpp index 2064416852..57e16299fd 100644 --- a/arangod/Cluster/AgencyCallback.cpp +++ b/arangod/Cluster/AgencyCallback.cpp @@ -35,8 +35,6 @@ using namespace arangodb; -#include - AgencyCallback::AgencyCallback(AgencyComm& agency, std::string const& key, std::function const& cb, @@ -133,7 +131,7 @@ void AgencyCallback::waitWithFailover(double timeout) { } } -void AgencyCallback::waitForExecution(double maxTimeout) { +void AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) { auto compareBuilder = std::make_shared(); if (_lastData) { compareBuilder = _lastData; diff --git a/arangod/Cluster/AgencyCallback.h b/arangod/Cluster/AgencyCallback.h index 1784d61bc7..e2e5f8b5bd 100644 --- a/arangod/Cluster/AgencyCallback.h +++ b/arangod/Cluster/AgencyCallback.h @@ -53,7 +53,7 @@ public: std::string const key; void refetchAndUpdate(); - void waitForExecution(double); + void executeByCallbackOrTimeout(double); private: diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index a765cd1077..500aff2e40 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -1002,11 +1002,65 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name, AgencyComm ac; AgencyCommResult res; + + Mutex dbServersMutex; double const realTimeout = getTimeout(timeout); double const endTime = TRI_microtime() + realTimeout; double const interval = getPollInterval(); + std::vector DBServers = getCurrentDBServers(); + + int dbServerResult = -1; + std::function dbServerChanged = + [&](VPackSlice const& result) { + size_t numDbServers; + { + MUTEX_LOCKER(guard, dbServersMutex); + numDbServers = DBServers.size(); + } + if (result.isObject() && result.length() == numDbServers) { + VPackObjectIterator dbs(result); + + std::map::iterator it; + std::string tmpMsg = ""; + bool tmpHaveError = false; + + for (auto const& dbserver : dbs) { + VPackSlice slice = dbserver.value; + if (arangodb::basics::VelocyPackHelper::getBooleanValue( + slice, "error", false)) { + tmpHaveError = true; + tmpMsg += " DBServer:" + dbserver.key.copyString() + ":"; + tmpMsg += arangodb::basics::VelocyPackHelper::getStringValue( + slice, "errorMessage", ""); + if (slice.hasKey("errorNum")) { + VPackSlice errorNum = slice.get("errorNum"); + if (errorNum.isNumber()) { + tmpMsg += " (errorNum="; + tmpMsg += basics::StringUtils::itoa( + errorNum.getNumericValue()); + tmpMsg += ")"; + } + } + } + } + if (tmpHaveError) { + errorMsg = "Error in creation of database:" + tmpMsg; + std::cout << errorMsg << std::endl; + dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE; + return true; + } + loadCurrentDatabases(); // update our cache + dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, errorMsg); + } + return true; + }; + + auto agencyCallback = std::make_shared( + ac, "Current/Databases/" + name, dbServerChanged, true, false); + _agencyCallbackRegistry->registerCallback(agencyCallback); + { AgencyCommLocker locker("Plan", "WRITE"); @@ -1028,70 +1082,13 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name, // Now update our own cache of planned databases: loadPlannedDatabases(); - - // Now wait for it to appear and be complete: - res.clear(); - res = ac.getValues2("Current/Version", false); - if (!res.successful()) { - return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION, - errorMsg); - } - // HERE - std::vector DBServers = getCurrentDBServers(); + int count = 0; // this counts, when we have to reload the DBServers - - std::string where = "Current/Databases/" + name; while (TRI_microtime() <= endTime) { - - res.clear(); - - res = ac.getValues2(where, true); - VPackSlice adbservers = - res._vpack->slice()[0] - .get(AgencyComm::prefixStripped()).get("Current") - .get("Databases").get(name); - - if (res.successful() && !adbservers.isNone()) { - - VPackObjectIterator dbs(adbservers); - - if (dbs.size() == DBServers.size()) { - std::map::iterator it; - std::string tmpMsg = ""; - bool tmpHaveError = false; - - for (auto const& dbserver : dbs) { - VPackSlice slice = dbserver.value; - if (arangodb::basics::VelocyPackHelper::getBooleanValue( - slice, "error", false)) { - tmpHaveError = true; - tmpMsg += " DBServer:" + dbserver.key.copyString() + ":"; - tmpMsg += arangodb::basics::VelocyPackHelper::getStringValue( - slice, "errorMessage", ""); - if (slice.hasKey("errorNum")) { - VPackSlice errorNum = slice.get("errorNum"); - if (errorNum.isNumber()) { - tmpMsg += " (errorNum="; - tmpMsg += basics::StringUtils::itoa( - errorNum.getNumericValue()); - tmpMsg += ")"; - } - } - } - } - if (tmpHaveError) { - errorMsg = "Error in creation of database:" + tmpMsg; - std::cout << errorMsg << std::endl; - return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE; - } - loadCurrentDatabases(); // update our cache - return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg); - } + agencyCallback->executeByCallbackOrTimeout(getReloadServerListTimeout() / interval); + if (dbServerResult >= 0) { + break; } - - res.clear(); - _agencyCallbackRegistry->awaitNextChange( - "Current/Version", getReloadServerListTimeout() / interval); if (++count >= static_cast(getReloadServerListTimeout() / interval)) { // We update the list of DBServers every minute in case one of them @@ -1099,10 +1096,16 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name, // if a new DBServer was added. However, in this case we report // success a bit too early, which is not too bad. loadCurrentDBServers(); - DBServers = getCurrentDBServers(); + { + MUTEX_LOCKER(guard, dbServersMutex); + DBServers = getCurrentDBServers(); + } count = 0; } - + } + _agencyCallbackRegistry->unregisterCallback(agencyCallback); + if (dbServerResult >= 0) { + return dbServerResult; } return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); } @@ -1275,9 +1278,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, return true; } dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, errorMsg); - return true; } - return true; }; @@ -1318,7 +1319,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, loadPlannedCollections(); while (TRI_microtime() <= endTime) { - agencyCallback->waitForExecution(interval); + agencyCallback->executeByCallbackOrTimeout(interval); if (dbServerResult >= 0) { break; @@ -1410,7 +1411,7 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName, loadPlannedCollections(); while (TRI_microtime() <= endTime) { - agencyCallback->waitForExecution(interval); + agencyCallback->executeByCallbackOrTimeout(interval); if (dbServerResult >= 0) { break; }