diff --git a/arangod/Cluster/AgencyCallback.cpp b/arangod/Cluster/AgencyCallback.cpp index c91cc82437..00e54b9c38 100644 --- a/arangod/Cluster/AgencyCallback.cpp +++ b/arangod/Cluster/AgencyCallback.cpp @@ -124,18 +124,11 @@ bool AgencyCallback::execute(std::shared_ptr newData) { void AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) { // One needs to acquire the mutex of the condition variable // before entering this function! - auto compareBuilder = std::make_shared(); - if (_lastData) { - compareBuilder = _lastData; - } - if (!_cv.wait(static_cast(maxTimeout * 1000000.0))) { - if (!_lastData || !_lastData->slice().equals(compareBuilder->slice())) { - LOG_TOPIC(DEBUG, Logger::CLUSTER) - << "Waiting done and nothing happended. Refetching to be sure"; - // mop: watches have not triggered during our sleep...recheck to be sure - refetchAndUpdate(false); - } + LOG_TOPIC(DEBUG, Logger::CLUSTER) + << "Waiting done and nothing happended. Refetching to be sure"; + // mop: watches have not triggered during our sleep...recheck to be sure + refetchAndUpdate(false); } } diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index a2e9f00143..888eab50c0 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -839,7 +839,6 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name, *dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE; return true; } - loadCurrent(); // update our cache *dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg); } return true; @@ -897,6 +896,7 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name, } if (*dbServerResult >= 0) { + loadCurrent(); // update our cache return *dbServerResult; } @@ -1062,7 +1062,6 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, } } } - loadCurrent(); if (tmpHaveError) { *errMsg = "Error in creation of collection:" + tmpMsg; *dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION; @@ -1120,8 +1119,9 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, while (true) { errorMsg = *errMsg; - + if (*dbServerResult >= 0) { + loadCurrent(); return *dbServerResult; } @@ -1151,37 +1151,21 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName, double const realTimeout = getTimeout(timeout); double const endTime = TRI_microtime() + realTimeout; double const interval = getPollInterval(); - + auto dbServerResult = std::make_shared(-1); auto errMsg = std::make_shared(); std::function dbServerChanged = [=](VPackSlice const& result) { - AgencyComm ac; if (result.isObject() && result.length() == 0) { - // ...remove the entire directory for the collection - AgencyCommResult res; - res = ac.removeValues( - "Current/Collections/" + databaseName + "/" + collectionID, true); - if (res.successful()) { - *dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg); - return true; - } - *dbServerResult = setErrormsg( - TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_CURRENT, - *errMsg); - return true; - - loadCurrent(); *dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg); - return true; } return true; }; - + // monitor the entry for the collection std::string const where = "Current/Collections/" + databaseName + "/" + collectionID; - + // ATTENTION: The following callback calls the above closure in a // different thread. Nevertheless, the closure accesses some of our // local variables. Therefore we have to protect all accesses to them @@ -1203,25 +1187,29 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName, AgencyWriteTransaction trans( {delPlanCollection, incrementVersion},precondition); res = ac.sendTransactionWithFailover(trans); - + // Update our own cache: loadPlan(); { CONDITION_LOCKER(locker, agencyCallback->_cv); - + while (true) { errorMsg = *errMsg; if (*dbServerResult >= 0) { + // ...remove the entire directory for the collection + ac.removeValues( + "Current/Collections/" + databaseName + "/" + collectionID, true); + loadCurrent() return *dbServerResult; } - + if (TRI_microtime() > endTime) { return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); } - + agencyCallback->executeByCallbackOrTimeout(interval); } } @@ -1618,15 +1606,13 @@ int ClusterInfo::ensureIndexCoordinator( } resBuilder->add("isNewlyCreated", VPackValue(true)); } - loadCurrent(); - *dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg); return true; } } return true; }; - + // ATTENTION: The following callback calls the above closure in a // different thread. Nevertheless, the closure accesses some of our @@ -1644,19 +1630,19 @@ int ClusterInfo::ensureIndexCoordinator( "Plan/Version", AgencySimpleOperationType::INCREMENT_OP); AgencyPrecondition oldValue(key, AgencyPrecondition::VALUE, collection); AgencyWriteTransaction trx ({newValue, incrementVersion}, oldValue); - + AgencyCommResult result = ac.sendTransactionWithFailover(trx, 0.0); - + if (!result.successful()) { resultBuilder = *resBuilder; return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN, errorMsg); } - + loadPlan(); - + TRI_ASSERT(*numberOfShards > 0); - + { CONDITION_LOCKER(locker, agencyCallback->_cv); @@ -1664,15 +1650,16 @@ int ClusterInfo::ensureIndexCoordinator( errorMsg = *errMsg; resultBuilder = *resBuilder; - + if (*dbServerResult >= 0) { + loadCurrent(); return *dbServerResult; } - + if (TRI_microtime() > endTime) { return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); } - + agencyCallback->executeByCallbackOrTimeout(interval); } } @@ -1741,7 +1728,7 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName, } VPackObjectIterator shards(current); - + if (shards.size() == (size_t)localNumberOfShards) { bool found = false; for (auto const& shard : shards) { @@ -1767,13 +1754,12 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName, } if (!found) { - loadCurrent(); *dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg); } } return true; }; - + // ATTENTION: The following callback calls the above closure in a // different thread. Nevertheless, the closure accesses some of our // local variables. Therefore we have to protect all accesses to them @@ -1789,30 +1775,30 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName, // and the write lock we acquire below something has changed. Therefore // we first get the previous value and then do a compare and swap operation. - + VPackBuilder tmp; VPackSlice indexes; { std::shared_ptr c = getCollection(databaseName, collectionID); - + READ_LOCKER(readLocker, _planProt.lock); - + if (c == nullptr) { return setErrormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg); } c->getIndexesVPack(tmp, false); indexes = tmp.slice(); - + if (!indexes.isArray()) { // no indexes present, so we can't delete our index return setErrormsg(TRI_ERROR_ARANGO_INDEX_NOT_FOUND, errorMsg); } - + MUTEX_LOCKER(guard, *numberOfShardsMutex); *numberOfShards = c->numberOfShards(); } - + bool found = false; VPackBuilder newIndexes; { @@ -1822,14 +1808,14 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName, for (auto const& indexSlice: VPackArrayIterator(indexes)) { VPackSlice id = indexSlice.get("id"); VPackSlice type = indexSlice.get("type"); - + if (!id.isString() || !type.isString()) { continue; } if (idString == id.copyString()) { // found our index, ignore it when copying found = true; - + std::string const typeString = type.copyString(); if (typeString == "primary" || typeString == "edge") { return setErrormsg(TRI_ERROR_FORBIDDEN, errorMsg); @@ -1842,7 +1828,7 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName, if (!found) { return setErrormsg(TRI_ERROR_ARANGO_INDEX_NOT_FOUND, errorMsg); } - + VPackBuilder newCollectionBuilder; { VPackObjectBuilder newCollectionObjectBuilder(&newCollectionBuilder); @@ -1862,15 +1848,15 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName, AgencyPrecondition prec(key, AgencyPrecondition::VALUE, previous); AgencyWriteTransaction trx ({newVal, incrementVersion}, prec); AgencyCommResult result = ac.sendTransactionWithFailover(trx, 0.0); - + if (!result.successful()) { return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN, errorMsg); } - + // load our own cache: loadPlan(); - + { MUTEX_LOCKER(guard, *numberOfShardsMutex); TRI_ASSERT(*numberOfShards > 0); @@ -1882,8 +1868,9 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName, while (true) { errorMsg = *errMsg; - + if (*dbServerResult >= 0) { + loadCurrent(); return *dbServerResult; } @@ -1919,7 +1906,7 @@ void ClusterInfo::loadServers() { AgencyCommResult result = _agency.getValues(prefixServers); if (result.successful()) { - + velocypack::Slice serversRegistered = result.slice()[0].get(std::vector( {AgencyComm::prefix(), "Current", "ServersRegistered"}));