From b37d8fff48755fdf1f47f9115c40d0ae0e6d0d1b Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Fri, 6 May 2016 23:06:10 +0200 Subject: [PATCH] Better protection against multi-threading. --- arangod/Cluster/AgencyCallback.cpp | 4 ++-- arangod/Cluster/ClusterInfo.cpp | 31 ++++++++++++++++++++++++------ 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/arangod/Cluster/AgencyCallback.cpp b/arangod/Cluster/AgencyCallback.cpp index 569365033e..8d6be9e175 100644 --- a/arangod/Cluster/AgencyCallback.cpp +++ b/arangod/Cluster/AgencyCallback.cpp @@ -92,8 +92,8 @@ bool AgencyCallback::executeEmpty() { result = _cb(VPackSlice::noneSlice()); } + CONDITION_LOCKER(locker, _cv); if (_useCv) { - CONDITION_LOCKER(locker, _cv); _cv.signal(); } return result; @@ -107,8 +107,8 @@ bool AgencyCallback::execute(std::shared_ptr newData) { result = _cb(newData->slice()); } + CONDITION_LOCKER(locker, _cv); if (_useCv) { - CONDITION_LOCKER(locker, _cv); _cv.signal(); } return result; diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index 6ebd40604c..398e2d7ed3 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -901,6 +901,7 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name, std::vector DBServers = getCurrentDBServers(); int dbServerResult = -1; + std::function dbServerChanged = [&](VPackSlice const& result) { size_t numDbServers; @@ -908,7 +909,11 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name, MUTEX_LOCKER(guard, dbServersMutex); numDbServers = DBServers.size(); } - if (result.isObject() && result.length() == numDbServers) { + if (result.isObject() && result.length() >= numDbServers) { + // We use >= here since the number of DBservers could have increased + // during the creation of the database and we might not yet have + // the latest list. Thus there could be more reports than we know + // servers. VPackObjectIterator dbs(result); std::string tmpMsg = ""; @@ -934,16 +939,24 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name, } } if (tmpHaveError) { + MUTEX_LOCKER(guard, dbServersMutex); errorMsg = "Error in creation of database:" + tmpMsg; dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE; return true; } loadCurrent(); // update our cache - dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, errorMsg); + { + MUTEX_LOCKER(guard, dbServersMutex); + dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, errorMsg); + } } return true; }; + // ATTENTION: The following callback calls the above closure in a + // different thread. Nevertheless, the closure accesses some of our + // local variables. Therefore we have to protect all accesses to them + // by the above mutex. auto agencyCallback = std::make_shared( ac, "Current/Databases/" + name, dbServerChanged, true, false); _agencyCallbackRegistry->registerCallback(agencyCallback); @@ -974,8 +987,11 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name, int count = 0; // this counts, when we have to reload the DBServers while (TRI_microtime() <= endTime) { agencyCallback->executeByCallbackOrTimeout(getReloadServerListTimeout() / interval); - if (dbServerResult >= 0) { - break; + { + MUTEX_LOCKER(guard, dbServersMutex); + if (dbServerResult >= 0) { + break; + } } if (++count >= static_cast(getReloadServerListTimeout() / interval)) { @@ -991,8 +1007,11 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name, count = 0; } } - if (dbServerResult >= 0) { - return dbServerResult; + { + MUTEX_LOCKER(guard, dbServersMutex); + if (dbServerResult >= 0) { + return dbServerResult; + } } return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); }