1
0
Fork 0

Better protection against multi-threading.

This commit is contained in:
Max Neunhoeffer 2016-05-06 23:06:10 +02:00
parent b5c87fba33
commit b37d8fff48
2 changed files with 27 additions and 8 deletions

View File

@ -92,8 +92,8 @@ bool AgencyCallback::executeEmpty() {
result = _cb(VPackSlice::noneSlice()); result = _cb(VPackSlice::noneSlice());
} }
if (_useCv) {
CONDITION_LOCKER(locker, _cv); CONDITION_LOCKER(locker, _cv);
if (_useCv) {
_cv.signal(); _cv.signal();
} }
return result; return result;
@ -107,8 +107,8 @@ bool AgencyCallback::execute(std::shared_ptr<VPackBuilder> newData) {
result = _cb(newData->slice()); result = _cb(newData->slice());
} }
if (_useCv) {
CONDITION_LOCKER(locker, _cv); CONDITION_LOCKER(locker, _cv);
if (_useCv) {
_cv.signal(); _cv.signal();
} }
return result; return result;

View File

@ -901,6 +901,7 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
std::vector<ServerID> DBServers = getCurrentDBServers(); std::vector<ServerID> DBServers = getCurrentDBServers();
int dbServerResult = -1; int dbServerResult = -1;
std::function<bool(VPackSlice const& result)> dbServerChanged = std::function<bool(VPackSlice const& result)> dbServerChanged =
[&](VPackSlice const& result) { [&](VPackSlice const& result) {
size_t numDbServers; size_t numDbServers;
@ -908,7 +909,11 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
MUTEX_LOCKER(guard, dbServersMutex); MUTEX_LOCKER(guard, dbServersMutex);
numDbServers = DBServers.size(); numDbServers = DBServers.size();
} }
if (result.isObject() && result.length() == numDbServers) { if (result.isObject() && result.length() >= numDbServers) {
// We use >= here since the number of DBservers could have increased
// during the creation of the database and we might not yet have
// the latest list. Thus there could be more reports than we know
// servers.
VPackObjectIterator dbs(result); VPackObjectIterator dbs(result);
std::string tmpMsg = ""; std::string tmpMsg = "";
@ -934,16 +939,24 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
} }
} }
if (tmpHaveError) { if (tmpHaveError) {
MUTEX_LOCKER(guard, dbServersMutex);
errorMsg = "Error in creation of database:" + tmpMsg; errorMsg = "Error in creation of database:" + tmpMsg;
dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE; dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE;
return true; return true;
} }
loadCurrent(); // update our cache loadCurrent(); // update our cache
{
MUTEX_LOCKER(guard, dbServersMutex);
dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, errorMsg); dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
} }
}
return true; return true;
}; };
// ATTENTION: The following callback calls the above closure in a
// different thread. Nevertheless, the closure accesses some of our
// local variables. Therefore we have to protect all accesses to them
// by the above mutex.
auto agencyCallback = std::make_shared<AgencyCallback>( auto agencyCallback = std::make_shared<AgencyCallback>(
ac, "Current/Databases/" + name, dbServerChanged, true, false); ac, "Current/Databases/" + name, dbServerChanged, true, false);
_agencyCallbackRegistry->registerCallback(agencyCallback); _agencyCallbackRegistry->registerCallback(agencyCallback);
@ -974,9 +987,12 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
int count = 0; // this counts, when we have to reload the DBServers int count = 0; // this counts, when we have to reload the DBServers
while (TRI_microtime() <= endTime) { while (TRI_microtime() <= endTime) {
agencyCallback->executeByCallbackOrTimeout(getReloadServerListTimeout() / interval); agencyCallback->executeByCallbackOrTimeout(getReloadServerListTimeout() / interval);
{
MUTEX_LOCKER(guard, dbServersMutex);
if (dbServerResult >= 0) { if (dbServerResult >= 0) {
break; break;
} }
}
if (++count >= static_cast<int>(getReloadServerListTimeout() / interval)) { if (++count >= static_cast<int>(getReloadServerListTimeout() / interval)) {
// We update the list of DBServers every minute in case one of them // We update the list of DBServers every minute in case one of them
@ -991,9 +1007,12 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
count = 0; count = 0;
} }
} }
{
MUTEX_LOCKER(guard, dbServersMutex);
if (dbServerResult >= 0) { if (dbServerResult >= 0) {
return dbServerResult; return dbServerResult;
} }
}
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
} }