diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp
index 8c400ce35f..d7be927091 100644
--- a/arangod/Cluster/ClusterInfo.cpp
+++ b/arangod/Cluster/ClusterInfo.cpp
@@ -264,12 +264,6 @@ ClusterInfo* ClusterInfo::instance () {
 
 ClusterInfo::ClusterInfo ()
   : _agency(),
-    _serversValid(false),
-    _DBServersValid(false),
-    _coordinatorsValid(false),
-    _plannedDatabases(),
-    _currentDatabases(),
-    _collectionsValid(false),
     _uniqid() {
 
   _uniqid._currentValue = _uniqid._upperValue = 0ULL;
@@ -282,8 +276,8 @@ ClusterInfo::ClusterInfo ()
 ////////////////////////////////////////////////////////////////////////////////
 
 ClusterInfo::~ClusterInfo () {
-  clearPlannedDatabases();
-  clearCurrentDatabases();
+  clearPlannedDatabases(_plannedDatabases);
+  clearCurrentDatabases(_currentDatabases);
 }
 
 // -----------------------------------------------------------------------------
@@ -329,21 +323,13 @@ uint64_t ClusterInfo::uniqid (uint64_t count) {
 ////////////////////////////////////////////////////////////////////////////////
 
 void ClusterInfo::flush () {
-  WRITE_LOCKER(_lock);
-
-  _collectionsValid = false;
-  _collectionsCurrentValid = false;
-  _serversValid = false;
-  _DBServersValid = false;
-  _coordinatorsValid = false;
-
-  _collections.clear();
-  _collectionsCurrent.clear();
-  _servers.clear();
-  _shardIds.clear();
-
-  clearPlannedDatabases();
-  clearCurrentDatabases();
+  loadServers();
+  loadCurrentDBServers();
+  loadCurrentCoordinators();
+  loadPlannedDatabases();
+  loadCurrentDatabases();
+  loadPlannedCollections();
+  loadCurrentCollections();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -354,26 +340,38 @@ bool ClusterInfo::doesDatabaseExist (DatabaseID const& databaseID,
                                      bool reload) {
   int tries = 0;
 
-  if (reload) {
+  if (reload ||
+      ! _plannedDatabasesProt.isValid ||
+      ! _currentDatabasesProt.isValid ||
+      ! _DBServersProt.isValid) {
     loadPlannedDatabases();
     loadCurrentDatabases();
     loadCurrentDBServers();
-    ++tries;
+    ++tries;   // no need to reload if the database is not found
   }
 
+  // From now on we know that all data has been valid once, so no need
+  // to check the isValid flags again under the lock.
+
   while (true) {
     {
-      READ_LOCKER(_lock);
-      const size_t expectedSize = _DBServers.size();
+      size_t expectedSize;
+      {
+        READ_LOCKER(_DBServersProt.lock);
+        expectedSize = _DBServers.size();
+      }
 
-      // look up database by name
+      // look up database by name:
 
+      READ_LOCKER(_plannedDatabasesProt.lock);
+      // _plannedDatabases is a map-type<DatabaseID, TRI_json_t*>
       auto it = _plannedDatabases.find(databaseID);
 
       if (it != _plannedDatabases.end()) {
         // found the database in Plan
-        // _currentDatabases is a map-type<DatabaseID, a map-type<ServerID, TRI_json_t*>>
+        READ_LOCKER(_currentDatabasesProt.lock);
+        // _currentDatabases is
+        //   a map-type<DatabaseID, a map-type<ServerID, TRI_json_t*>>
         auto it2 = _currentDatabases.find(databaseID);
 
         if (it2 != _currentDatabases.end()) {
@@ -403,44 +401,57 @@ bool ClusterInfo::doesDatabaseExist (DatabaseID const& databaseID,
 
 vector<DatabaseID> ClusterInfo::listDatabases (bool reload) {
   vector<DatabaseID> result;
 
-  if (reload) {
+  if (reload ||
+      ! _plannedDatabasesProt.isValid ||
+      ! _currentDatabasesProt.isValid ||
+      ! _DBServersProt.isValid) {
     loadPlannedDatabases();
     loadCurrentDatabases();
     loadCurrentDBServers();
   }
 
-  READ_LOCKER(_lock);
-  const size_t expectedSize = _DBServers.size();
+  // From now on we know that all data has been valid once, so no need
+  // to check the isValid flags again under the lock.
 
-  // _plannedDatabases is a map-type<DatabaseID, TRI_json_t*>
-  auto it = _plannedDatabases.begin();
+  size_t expectedSize;
+  {
+    READ_LOCKER(_DBServersProt.lock);
+    expectedSize = _DBServers.size();
+  }
 
-  while (it != _plannedDatabases.end()) {
-    // _currentDatabases is:
-    //   a map-type<DatabaseID, a map-type<ServerID, TRI_json_t*>>
-    auto it2 = _currentDatabases.find((*it).first);
+  {
+    READ_LOCKER(_plannedDatabasesProt.lock);
+    READ_LOCKER(_currentDatabasesProt.lock);
+    // _plannedDatabases is a map-type<DatabaseID, TRI_json_t*>
+    auto it = _plannedDatabases.begin();
 
-    if (it2 != _currentDatabases.end()) {
-      if ((*it2).second.size() >= expectedSize) {
-        result.push_back((*it).first);
+    while (it != _plannedDatabases.end()) {
+      // _currentDatabases is:
+      //   a map-type<DatabaseID, a map-type<ServerID, TRI_json_t*>>
+      auto it2 = _currentDatabases.find((*it).first);
+
+      if (it2 != _currentDatabases.end()) {
+        if ((*it2).second.size() >= expectedSize) {
+          result.push_back((*it).first);
+        }
       }
-    }
 
-    ++it;
+      ++it;
+    }
   }
 
   return result;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-/// @brief flushes the list of planned databases
+/// @brief actually clears a list of planned databases
 ////////////////////////////////////////////////////////////////////////////////
 
-void ClusterInfo::clearPlannedDatabases () {
-  // _plannedDatabases is a map-type<DatabaseID, TRI_json_t*>
-  auto it = _plannedDatabases.begin();
+void ClusterInfo::clearPlannedDatabases (
+        std::unordered_map<DatabaseID, TRI_json_t*>& databases) {
 
-  while (it != _plannedDatabases.end()) {
+  auto it = databases.begin();
+  while (it != databases.end()) {
     TRI_json_t* json = (*it).second;
 
     if (json != nullptr) {
@@ -448,20 +459,79 @@ void ClusterInfo::clearPlannedDatabases () {
     }
     ++it;
   }
-
-  _plannedDatabases.clear();
+  databases.clear();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-/// @brief flushes the list of current databases
+/// @brief (re-)load the information about planned databases
+/// Usually one does not have to call this directly.
+////////////////////////////////////////////////////////////////////////////////
+//
+static const std::string prefixPlannedDatabases = "Plan/Databases";
+
+void ClusterInfo::loadPlannedDatabases () {
+
+  uint64_t storedVersion = _plannedDatabasesProt.version;
+  MUTEX_LOCKER(_plannedDatabasesProt.mutex);
+  if (_plannedDatabasesProt.version > storedVersion) {
+    // Somebody else did, what we intended to do, so just return
+    return;
+  }
+
+  // Now contact the agency:
+  AgencyCommResult result;
+  {
+    AgencyCommLocker locker("Plan", "READ");
+
+    if (locker.successful()) {
+      result = _agency.getValues(prefixPlannedDatabases, true);
+    }
+  }
+
+  if (result.successful()) {
+    result.parse(prefixPlannedDatabases + "/", false);
+
+    decltype(_plannedDatabases) newDatabases;
+
+    // result._values is a std::map<std::string, AgencyCommResultEntry>
+    auto it = result._values.begin();
+
+    while (it != result._values.end()) {
+      string const& name = (*it).first;
+      TRI_json_t* options = (*it).second._json;
+
+      // steal the json
+      (*it).second._json = nullptr;
+      newDatabases.insert(std::make_pair(name, options));
+
+      ++it;
+    }
+
+    // Now set the new value:
+    {
+      WRITE_LOCKER(_plannedDatabasesProt.lock);
+      _plannedDatabases.swap(newDatabases);
+      _plannedDatabasesProt.version++;       // such that others notice our change
+      _plannedDatabasesProt.isValid = true;  // will never be reset to false
+    }
+    clearPlannedDatabases(newDatabases);     // delete the old stuff
+    return;
+  }
+
+  LOG_TRACE("Error while loading %s", prefixPlannedDatabases.c_str());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief deletes a list of current databases
 ////////////////////////////////////////////////////////////////////////////////
 
-void ClusterInfo::clearCurrentDatabases () {
-  // _currentDatabases is
-  //   a map-type<DatabaseID, a map-type<ServerID, TRI_json_t*>>
-  auto it = _currentDatabases.begin();
+void ClusterInfo::clearCurrentDatabases (
+        std::unordered_map<DatabaseID, std::unordered_map<ServerID, TRI_json_t*>>&
+        databases) {
 
-  while (it != _currentDatabases.end()) {
+  auto it = databases.begin();
+  while (it != databases.end()) {
     auto it2 = (*it).second.begin();
 
     while (it2 != (*it).second.end()) {
@@ -476,51 +546,7 @@
     ++it;
   }
 
-  _currentDatabases.clear();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-/// @brief (re-)load the information about planned databases
-/// Usually one does not have to call this directly.
-////////////////////////////////////////////////////////////////////////////////
-//
-static const std::string prefixPlannedDatabases = "Plan/Databases";
-
-void ClusterInfo::loadPlannedDatabases () {
-
-  AgencyCommResult result;
-
-  {
-    AgencyCommLocker locker("Plan", "READ");
-
-    if (locker.successful()) {
-      result = _agency.getValues(prefixPlannedDatabases, true);
-    }
-  }
-
-  if (result.successful()) {
-    result.parse(prefixPlannedDatabases + "/", false);
-
-    WRITE_LOCKER(_lock);
-    clearPlannedDatabases();
-
-    // result._values is a std::map<std::string, AgencyCommResultEntry>
-    auto it = result._values.begin();
-
-    while (it != result._values.end()) {
-      string const& name = (*it).first;
-      TRI_json_t* options = (*it).second._json;
-
-      // steal the json
-      (*it).second._json = nullptr;
-      _plannedDatabases.insert(std::make_pair(name, options));
-
-      ++it;
-    }
-
-    return;
-  }
-
-  LOG_TRACE("Error while loading %s", prefixPlannedDatabases.c_str());
+  databases.clear();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -529,10 +555,18 @@ void ClusterInfo::loadPlannedDatabases () {
 ////////////////////////////////////////////////////////////////////////////////
 
 static const std::string prefixCurrentDatabases = "Current/Databases";
+
 void ClusterInfo::loadCurrentDatabases () {
-  AgencyCommResult result;
 
+  uint64_t storedVersion = _currentDatabasesProt.version;
+  MUTEX_LOCKER(_currentDatabasesProt.mutex);
+  if (_currentDatabasesProt.version > storedVersion) {
+    // Somebody else did, what we intended to do, so just return
+    return;
+  }
+
+  // Now contact the agency:
+  AgencyCommResult result;
   {
     AgencyCommLocker locker("Plan", "READ");
 
@@ -544,8 +578,7 @@ void ClusterInfo::loadCurrentDatabases () {
   if (result.successful()) {
     result.parse(prefixCurrentDatabases + "/", false);
 
-    WRITE_LOCKER(_lock);
-    clearCurrentDatabases();
+    decltype(_currentDatabases) newDatabases;
 
     std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
 
@@ -563,12 +596,12 @@ void ClusterInfo::loadCurrentDatabases () {
 
       // _currentDatabases is
       //   a map-type<DatabaseID, a map-type<ServerID, TRI_json_t*>>
-      auto it2 = _currentDatabases.find(database);
+      auto it2 = newDatabases.find(database);
 
-      if (it2 == _currentDatabases.end()) {
+      if (it2 == newDatabases.end()) {
         // insert an empty list for this database
         decltype(it2->second) empty;
-        it2 = _currentDatabases.insert(std::make_pair(database, empty)).first;
+        it2 = newDatabases.insert(std::make_pair(database, empty)).first;
       }
 
       if (parts.size() == 2) {
@@ -582,6 +615,14 @@ void ClusterInfo::loadCurrentDatabases () {
       ++it;
     }
 
+    // Now set the new value:
+    {
+      WRITE_LOCKER(_currentDatabasesProt.lock);
+      _currentDatabases.swap(newDatabases);
+      _currentDatabasesProt.version++;       // such that others notice our change
+      _currentDatabasesProt.isValid = true;  // will never be reset to false
+    }
+    clearCurrentDatabases(newDatabases);     // delete the old stuff
     return;
   }
 
@@ -594,10 +635,18 @@ void ClusterInfo::loadCurrentDatabases () {
 ////////////////////////////////////////////////////////////////////////////////
 
 static const std::string prefixPlannedCollections = "Plan/Collections";
+
 void ClusterInfo::loadPlannedCollections (bool acquireLock) {
-  AgencyCommResult result;
 
+  uint64_t storedVersion = _plannedCollectionsProt.version;
+  MUTEX_LOCKER(_plannedCollectionsProt.mutex);
+  if (_plannedCollectionsProt.version > storedVersion) {
+    // Somebody else did, what we intended to do, so just return
+    return;
+  }
+
+  // Now contact the agency:
+  AgencyCommResult result;
   {
    if (acquireLock) {
      AgencyCommLocker locker("Plan", "READ");
@@ -614,9 +663,9 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) {
 
   if (result.successful()) {
     result.parse(prefixPlannedCollections + "/", false);
 
-    WRITE_LOCKER(_lock);
-    _collections.clear();
-    _shards.clear();
+    decltype(_plannedCollections) newCollections;
+    decltype(_shards) newShards;
+    decltype(_shardKeys) newShardKeys;
 
     std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
 
@@ -636,13 +685,13 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) {
       const std::string collection = parts[1];
 
       // check whether we have created an entry for the database already
-      AllCollections::iterator it2 = _collections.find(database);
+      AllCollections::iterator it2 = newCollections.find(database);
 
-      if (it2 == _collections.end()) {
+      if (it2 == newCollections.end()) {
         // not yet, so create an entry for the database
         DatabaseCollections empty;
-        _collections.emplace(std::make_pair(database, empty));
-        it2 = _collections.find(database);
+        newCollections.emplace(std::make_pair(database, empty));
+        it2 = newCollections.find(database);
       }
 
       TRI_json_t* json = (*it).second._json;
@@ -652,15 +701,15 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) {
       shared_ptr<CollectionInfo> collectionData (new CollectionInfo(json));
       vector<string>* shardKeys = new vector<string>;
       *shardKeys = collectionData->shardKeys();
-      _shardKeys.insert(
-          make_pair(collection, shared_ptr<vector<string> > (shardKeys)));
+      newShardKeys.insert(
+          make_pair(collection, shared_ptr<vector<string> > (shardKeys)));
       map<ShardID, ServerID> shardIDs = collectionData->shardIds();
       vector<string>* shards = new vector<string>;
       map<ShardID, ServerID>::iterator it3;
       for (it3 = shardIDs.begin(); it3 != shardIDs.end(); ++it3) {
         shards->push_back(it3->first);
       }
-      _shards.emplace(
+      newShards.emplace(
           std::make_pair(collection, shared_ptr<vector<string> >(shards)));
 
       // insert the collection into the existing map, insert it under its
@@ -672,12 +721,20 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) {
                                             collectionData));
     }
 
-    _collectionsValid = true;
+
+    // Now set the new value:
+    {
+      WRITE_LOCKER(_plannedCollectionsProt.lock);
+      _plannedCollections.swap(newCollections);
+      _shards.swap(newShards);
+      _shardKeys.swap(newShardKeys);
+      _plannedCollectionsProt.version++;       // such that others notice our change
+      _plannedCollectionsProt.isValid = true;  // will never be reset to false
+    }
     return;
   }
 
   LOG_TRACE("Error while loading %s", prefixPlannedCollections.c_str());
-  _collectionsValid = false;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -690,18 +747,18 @@ shared_ptr<CollectionInfo> ClusterInfo::getCollection
                                                        CollectionID const& collectionID) {
   int tries = 0;
 
-  if (! _collectionsValid) {
+  if (! _plannedCollectionsProt.isValid) {
     loadPlannedCollections(true);
     ++tries;
   }
 
   while (true) {   // left by break
     {
-      READ_LOCKER(_lock);
+      READ_LOCKER(_plannedCollectionsProt.lock);
       // look up database by id
-      AllCollections::const_iterator it = _collections.find(databaseID);
+      AllCollections::const_iterator it = _plannedCollections.find(databaseID);
 
-      if (it != _collections.end()) {
+      if (it != _plannedCollections.end()) {
         // look up collection by id (or by name)
         DatabaseCollections::const_iterator it2 = (*it).second.find(collectionID);
@@ -773,11 +830,11 @@ const std::vector<shared_ptr<CollectionInfo> > ClusterInfo::getCollections
   // always reload
   loadPlannedCollections(true);
 
-  READ_LOCKER(_lock);
+  READ_LOCKER(_plannedCollectionsProt.lock);
   // look up database by id
-  AllCollections::const_iterator it = _collections.find(databaseID);
+  AllCollections::const_iterator it = _plannedCollections.find(databaseID);
 
-  if (it == _collections.end()) {
+  if (it == _plannedCollections.end()) {
     return result;
   }
 
@@ -807,8 +864,15 @@ const std::vector<shared_ptr<CollectionInfo> > ClusterInfo::getCollections
 static const std::string prefixCurrentCollections = "Current/Collections";
 
 void ClusterInfo::loadCurrentCollections (bool acquireLock) {
-  AgencyCommResult result;
 
+  uint64_t storedVersion = _currentCollectionsProt.version;
+  MUTEX_LOCKER(_currentCollectionsProt.mutex);
+  if (_currentCollectionsProt.version > storedVersion) {
+    // Somebody else did, what we intended to do, so just return
+    return;
+  }
+
+  // Now contact the agency:
+  AgencyCommResult result;
   {
     if (acquireLock) {
       AgencyCommLocker locker("Current", "READ");
@@ -825,9 +889,8 @@ void ClusterInfo::loadCurrentCollections (bool acquireLock) {
   if (result.successful()) {
     result.parse(prefixCurrentCollections + "/", false);
 
-    WRITE_LOCKER(_lock);
-    _collectionsCurrent.clear();
-    _shardIds.clear();
+    decltype(_currentCollections) newCollections;
+    decltype(_shardIds) newShardIds;
 
     std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
 
@@ -849,13 +912,13 @@ void ClusterInfo::loadCurrentCollections (bool acquireLock) {
       const std::string shardID = parts[2];
 
       // check whether we have created an entry for the database already
-      AllCollectionsCurrent::iterator it2 = _collectionsCurrent.find(database);
+      AllCollectionsCurrent::iterator it2 = newCollections.find(database);
 
-      if (it2 == _collectionsCurrent.end()) {
+      if (it2 == newCollections.end()) {
         // not yet, so create an entry for the database
         DatabaseCollectionsCurrent empty;
-        _collectionsCurrent.insert(std::make_pair(database, empty));
-        it2 = _collectionsCurrent.find(database);
+        newCollections.insert(std::make_pair(database, empty));
+        it2 = newCollections.find(database);
       }
 
       TRI_json_t* json = (*it).second._json;
@@ -886,15 +949,22 @@ void ClusterInfo::loadCurrentCollections (bool acquireLock) {
         std::string DBserver = triagens::basics::JsonHelper::getStringValue
                           (json, "DBServer", "");
         if (DBserver != "") {
-          _shardIds.insert(make_pair(shardID, DBserver));
+          newShardIds.insert(make_pair(shardID, DBserver));
         }
       }
 
-    _collectionsCurrentValid = true;
+
+    // Now set the new value:
+    {
+      WRITE_LOCKER(_currentCollectionsProt.lock);
+      _currentCollections.swap(newCollections);
+      _shardIds.swap(newShardIds);
+      _currentCollectionsProt.version++;       // such that others notice our change
+      _currentCollectionsProt.isValid = true;  // will never be reset to false
+    }
    return;
   }
 
   LOG_TRACE("Error while loading %s", prefixCurrentCollections.c_str());
-  _collectionsCurrentValid = false;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -908,18 +978,18 @@ shared_ptr<CollectionInfoCurrent> ClusterInfo::getCollectionCurrent
                                                   CollectionID const& collectionID) {
   int tries = 0;
 
-  if (! _collectionsCurrentValid) {
+  if (! _currentCollectionsProt.isValid) {
     loadCurrentCollections(true);
     ++tries;
   }
 
   while (true) {
     {
-      READ_LOCKER(_lock);
+      READ_LOCKER(_currentCollectionsProt.lock);
       // look up database by id
-      AllCollectionsCurrent::const_iterator it = _collectionsCurrent.find(databaseID);
+      AllCollectionsCurrent::const_iterator it = _currentCollections.find(databaseID);
 
-      if (it != _collectionsCurrent.end()) {
+      if (it != _currentCollections.end()) {
         // look up collection by id
         DatabaseCollectionsCurrent::const_iterator it2 = (*it).second.find(collectionID);
@@ -976,6 +1046,9 @@ int ClusterInfo::createDatabaseCoordinator (string const& name,
     }
   }
 
+  // Now update our own cache of planned databases:
+  loadPlannedDatabases();
+
   // Now wait for it to appear and be complete:
   res.clear();
   res = ac.getValues("Current/Version", false);
@@ -1022,6 +1095,7 @@ int ClusterInfo::createDatabaseCoordinator (string const& name,
         errorMsg = "Error in creation of database:" + tmpMsg;
         return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE;
       }
+      loadCurrentDatabases();  // update our cache
       return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
     }
   }
@@ -1087,7 +1161,9 @@ int ClusterInfo::dropDatabaseCoordinator (string const& name, string& errorMsg,
     }
   }
 
-  _collectionsValid = false;
+  // Load our own caches:
+  loadPlannedDatabases();
+  loadPlannedCollections(true);
 
   // Now wait for it to appear and be complete:
   res.clear();
@@ -1153,9 +1229,9 @@ int ClusterInfo::createCollectionCoordinator (string const& databaseName,
 
     // check if a collection with the same name is already planned
     loadPlannedCollections(false);
-    READ_LOCKER(_lock);
-    AllCollections::const_iterator it = _collections.find(databaseName);
-    if (it != _collections.end()) {
+    READ_LOCKER(_plannedCollectionsProt.lock);
+    AllCollections::const_iterator it = _plannedCollections.find(databaseName);
+    if (it != _plannedCollections.end()) {
       const std::string name = JsonHelper::getStringValue(json, "name", "");
 
       DatabaseCollections::const_iterator it2 = (*it).second.find(name);
@@ -1184,6 +1260,9 @@ int ClusterInfo::createCollectionCoordinator (string const& databaseName,
     }
   }
 
+  // Update our cache:
+  loadPlannedCollections();
+
   // Now wait for it to appear and be complete:
   AgencyCommResult res = ac.getValues("Current/Version", false);
   if (!res.successful()) {
@@ -1226,6 +1305,7 @@ int ClusterInfo::createCollectionCoordinator (string const& databaseName,
         errorMsg = "Error in creation of collection:" + tmpMsg;
         return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
       }
+      loadPlannedCollections();
       return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
     }
   }
@@ -1276,7 +1356,8 @@ int ClusterInfo::dropCollectionCoordinator (string const& databaseName,
     }
   }
 
-  flush();
+  // Update our own cache:
+  loadPlannedCollections(true);
 
   // Now wait for it to appear and be complete:
   res.clear();
@@ -1307,6 +1388,7 @@ int ClusterInfo::dropCollectionCoordinator (string const& databaseName,
         return setErrormsg(
           TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_CURRENT, errorMsg);
       }
+      loadCurrentCollections();
       return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
     }
   }
@@ -1499,10 +1581,15 @@ int ClusterInfo::ensureIndexCoordinator (string const& databaseName,
   {
     loadPlannedCollections(false);
 
-    READ_LOCKER(_lock);
-
     shared_ptr<CollectionInfo> c = getCollection(databaseName, collectionID);
 
+    // Note that nobody is removing this collection in the plan, since
+    // we hold the write lock in the agency, therefore it does not matter
+    // that getCollection fetches the read lock and releases it before
+    // we get it again.
+    //
+    READ_LOCKER(_plannedCollectionsProt.lock);
+
     if (c->empty()) {
       return setErrormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg);
     }
@@ -1604,8 +1691,8 @@ int ClusterInfo::ensureIndexCoordinator (string const& databaseName,
     }
   }
 
-  // wipe cache
-  flush();
+  // reload our own cache:
+  loadPlannedCollections(true);
 
   TRI_ASSERT(numberOfShards > 0);
 
@@ -1672,6 +1759,8 @@ int ClusterInfo::ensureIndexCoordinator (string const& databaseName,
         resultJson = newIndex;
         TRI_Insert3ObjectJson(TRI_UNKNOWN_MEM_ZONE, resultJson, "isNewlyCreated",
                               TRI_CreateBooleanJson(TRI_UNKNOWN_MEM_ZONE, true));
+        loadCurrentCollections();
+
         return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
       }
     }
@@ -1717,10 +1806,10 @@ int ClusterInfo::dropIndexCoordinator (string const& databaseName,
   {
     loadPlannedCollections(false);
 
-    READ_LOCKER(_lock);
-
     shared_ptr<CollectionInfo> c = getCollection(databaseName, collectionID);
 
+    READ_LOCKER(_plannedCollectionsProt.lock);
+
     if (c->empty()) {
       return setErrormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg);
     }
@@ -1804,8 +1893,8 @@ int ClusterInfo::dropIndexCoordinator (string const& databaseName,
     }
   }
 
-  // wipe cache
-  flush();
+  // load our own cache:
+  loadPlannedCollections();
 
   TRI_ASSERT(numberOfShards > 0);
 
@@ -1851,6 +1940,7 @@ int ClusterInfo::dropIndexCoordinator (string const& databaseName,
       }
 
       if (! found) {
+        loadCurrentCollections();
         return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
       }
     }
@@ -1870,10 +1960,18 @@ int ClusterInfo::dropIndexCoordinator (string const& databaseName,
 ////////////////////////////////////////////////////////////////////////////////
 
 static const std::string prefixServers = "Current/ServersRegistered";
+
 void ClusterInfo::loadServers () {
-  AgencyCommResult result;
 
+  uint64_t storedVersion = _serversProt.version;
+  MUTEX_LOCKER(_serversProt.mutex);
+  if (_serversProt.version > storedVersion) {
+    // Somebody else did, what we intended to do, so just return
+    return;
+  }
+
+  // Now contact the agency:
+  AgencyCommResult result;
   {
     AgencyCommLocker locker("Current", "READ");
 
@@ -1885,8 +1983,7 @@ void ClusterInfo::loadServers () {
   if (result.successful()) {
     result.parse(prefixServers + "/", false);
 
-    WRITE_LOCKER(_lock);
-    _servers.clear();
+    decltype(_servers) newServers;
 
     std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
 
@@ -1897,21 +1994,22 @@ void ClusterInfo::loadServers () {
 
       if (nullptr != sub) {
         std::string server = triagens::basics::JsonHelper::getStringValue(sub, "");
 
-        _servers.emplace(std::make_pair((*it).first, server));
+        newServers.emplace(std::make_pair((*it).first, server));
       }
 
       ++it;
     }
 
-    _serversValid = true;
-
+    // Now set the new value:
+    {
+      WRITE_LOCKER(_serversProt.lock);
+      _servers.swap(newServers);
+      _serversProt.version++;       // such that others notice our change
+      _serversProt.isValid = true;  // will never be reset to false
+    }
     return;
   }
 
   LOG_TRACE("Error while loading %s", prefixServers.c_str());
-
-  _serversValid = false;
-
-  return;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1923,14 +2021,14 @@ void ClusterInfo::loadServers () {
 
 std::string ClusterInfo::getServerEndpoint (ServerID const& serverID) {
   int tries = 0;
 
-  if (! _serversValid) {
+  if (! _serversProt.isValid) {
     loadServers();
     tries++;
   }
 
   while (true) {
     {
-      READ_LOCKER(_lock);
+      READ_LOCKER(_serversProt.lock);
       // _servers is a map-type<ServerID, std::string>
       auto it = _servers.find(serverID);
 
@@ -1959,14 +2057,14 @@ std::string ClusterInfo::getServerEndpoint (ServerID const& serverID) {
 
 std::string ClusterInfo::getServerName (std::string const& endpoint) {
   int tries = 0;
 
-  if (! _serversValid) {
+  if (! _serversProt.isValid) {
     loadServers();
     tries++;
   }
 
   while (true) {
     {
-      READ_LOCKER(_lock);
+      READ_LOCKER(_serversProt.lock);
       for (auto const& it : _servers) {
         if (it.second == endpoint) {
           return it.first;
         }
       }
     }
@@ -1994,8 +2092,15 @@
 static const std::string prefixCurrentCoordinators = "Current/Coordinators";
 
 void ClusterInfo::loadCurrentCoordinators () {
-  AgencyCommResult result;
 
+  uint64_t storedVersion = _coordinatorsProt.version;
+  MUTEX_LOCKER(_coordinatorsProt.mutex);
+  if (_coordinatorsProt.version > storedVersion) {
+    // Somebody else did, what we intended to do, so just return
+    return;
+  }
+
+  // Now contact the agency:
+  AgencyCommResult result;
   {
     AgencyCommLocker locker("Current", "READ");
 
@@ -2007,23 +2112,25 @@ void ClusterInfo::loadCurrentCoordinators () {
   if (result.successful()) {
     result.parse(prefixCurrentCoordinators + "/", false);
 
-    WRITE_LOCKER(_lock);
-    _coordinators.clear();
+    decltype(_coordinators) newCoordinators;
 
     std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
 
     for (; it != result._values.end(); ++it) {
-      _coordinators.emplace(std::make_pair((*it).first, triagens::basics::JsonHelper::getStringValue((*it).second._json, "")));
+      newCoordinators.emplace(std::make_pair((*it).first, triagens::basics::JsonHelper::getStringValue((*it).second._json, "")));
     }
 
-    _coordinatorsValid = true;
+    // Now set the new value:
+    {
+      WRITE_LOCKER(_coordinatorsProt.lock);
+      _coordinators.swap(newCoordinators);
+      _coordinatorsProt.version++;       // such that others notice our change
+      _coordinatorsProt.isValid = true;  // will never be reset to false
+    }
     return;
   }
 
   LOG_TRACE("Error while loading %s", prefixCurrentCoordinators.c_str());
-
-  _coordinatorsValid = false;
-  return;
 }
 
@@ -2036,8 +2143,15 @@
 static const std::string prefixCurrentDBServers = "Current/DBServers";
 
 void ClusterInfo::loadCurrentDBServers () {
-  AgencyCommResult result;
 
+  uint64_t storedVersion = _DBServersProt.version;
+  MUTEX_LOCKER(_DBServersProt.mutex);
+  if (_DBServersProt.version > storedVersion) {
+    // Somebody else did, what we intended to do, so just return
+    return;
+  }
+
+  // Now contact the agency:
+  AgencyCommResult result;
   {
     AgencyCommLocker locker("Current", "READ");
 
@@ -2049,23 +2163,25 @@ void ClusterInfo::loadCurrentDBServers () {
   if (result.successful()) {
     result.parse(prefixCurrentDBServers + "/", false);
 
-    WRITE_LOCKER(_lock);
-    _DBServers.clear();
+    decltype(_DBServers) newDBServers;
 
     std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
 
     for (; it != result._values.end(); ++it) {
-      _DBServers.emplace(std::make_pair((*it).first, triagens::basics::JsonHelper::getStringValue((*it).second._json, "")));
+      newDBServers.emplace(std::make_pair((*it).first, triagens::basics::JsonHelper::getStringValue((*it).second._json, "")));
    }
 
-    _DBServersValid = true;
+    // Now set the new value:
+    {
+      WRITE_LOCKER(_DBServersProt.lock);
+      _DBServers.swap(newDBServers);
+      _DBServersProt.version++;       // such that others notice our change
+      _DBServersProt.isValid = true;  // will never be reset to false
+    }
    return;
  }
 
   LOG_TRACE("Error while loading %s", prefixCurrentDBServers.c_str());
-
-  _DBServersValid = false;
-  return;
 }
 
@@ -2076,22 +2192,24 @@ void ClusterInfo::loadCurrentDBServers () {
 
 std::vector<ServerID> ClusterInfo::getCurrentDBServers () {
   std::vector<ServerID> result;
   int tries = 0;
+
+  if (! _DBServersProt.isValid) {
+    loadCurrentDBServers();
+    tries++;
+  }
 
   while (true) {
     {
       // return a consistent state of servers
-      READ_LOCKER(_lock);
+      READ_LOCKER(_DBServersProt.lock);
 
-      if (_DBServersValid) {
-        result.reserve(_DBServers.size());
+      result.reserve(_DBServers.size());
 
-        for (auto& it : _DBServers) {
-          result.emplace_back(it.first);
-        }
-
-        return result;
+      for (auto& it : _DBServers) {
+        result.emplace_back(it.first);
       }
+
+      return result;
     }
 
     if (++tries >= 2) {
@@ -2150,14 +2268,14 @@ std::string ClusterInfo::getTargetServerEndpoint (ServerID const& serverID) {
 
 ServerID ClusterInfo::getResponsibleServer (ShardID const& shardID) {
   int tries = 0;
 
-  if (! _collectionsCurrentValid) {
+  if (! _currentCollectionsProt.isValid) {
     loadCurrentCollections(true);
     tries++;
   }
 
   while (true) {
     {
-      READ_LOCKER(_lock);
+      READ_LOCKER(_currentCollectionsProt.lock);
       // _shardIds is a map-type<ShardID, ServerID>
       auto it = _shardIds.find(shardID);
 
@@ -2203,7 +2321,7 @@ int ClusterInfo::getResponsibleShard (CollectionID const& collectionID,
   // Note that currently we take the number of shards and the shardKeys
   // from Plan, since they are immutable. Later we will have to switch
   // this to Current, when we allow to add and remove shards.
-  if (! _collectionsValid) {
+  if (! _plannedCollectionsProt.isValid) {
     loadPlannedCollections();
   }
 
@@ -2216,7 +2334,7 @@ int ClusterInfo::getResponsibleShard (CollectionID const& collectionID,
   while (true) {
     {
       // Get the sharding keys and the number of shards:
-      READ_LOCKER(_lock);
+      READ_LOCKER(_plannedCollectionsProt.lock);
       // _shards is a map-type<CollectionID, shared_ptr<vector<string>>>
       auto it = _shards.find(collectionID);
 
@@ -2243,7 +2361,7 @@ int ClusterInfo::getResponsibleShard (CollectionID const& collectionID,
     if (++tries >= 2) {
       break;
     }
-    loadPlannedCollections();
+    loadPlannedCollections(true);
   }
 
   if (! found) {
@@ -2272,22 +2390,24 @@ int ClusterInfo::getResponsibleShard (CollectionID const& collectionID,
 
 std::vector<ServerID> ClusterInfo::getCurrentCoordinators () {
   std::vector<ServerID> result;
   int tries = 0;
+
+  if (! _coordinatorsProt.isValid) {
+    loadCurrentCoordinators();
+    tries++;
+  }
 
   while (true) {
     {
       // return a consistent state of servers
-      READ_LOCKER(_lock);
+      READ_LOCKER(_coordinatorsProt.lock);
 
-      if (_coordinatorsValid) {
-        result.reserve(_coordinators.size());
+      result.reserve(_coordinators.size());
 
-        for (auto& it : _coordinators) {
-          result.emplace_back(it.first);
-        }
-
-        return result;
+      for (auto& it : _coordinators) {
+        result.emplace_back(it.first);
       }
+
+      return result;
     }
 
     if (++tries >= 2) {
diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h
index 2b4b48bd04..138d3b967d 100644
--- a/arangod/Cluster/ClusterInfo.h
+++ b/arangod/Cluster/ClusterInfo.h
@@ -1010,16 +1010,20 @@ namespace triagens {
 
   private:
 
 ////////////////////////////////////////////////////////////////////////////////
-/// @brief flushes the list of planned databases
+/// @brief actually clears a list of planned databases
 ////////////////////////////////////////////////////////////////////////////////
 
-    void clearPlannedDatabases ();
+    void clearPlannedDatabases (
+           std::unordered_map<DatabaseID, TRI_json_t*>& databases);
 
 ////////////////////////////////////////////////////////////////////////////////
-/// @brief flushes the list of current databases
+/// @brief actually clears a list of current databases
 ////////////////////////////////////////////////////////////////////////////////
 
-    void clearCurrentDatabases ();
+    void clearCurrentDatabases (
+           std::unordered_map<DatabaseID, std::unordered_map<ServerID, TRI_json_t*>>&
+           databases);
 
 ////////////////////////////////////////////////////////////////////////////////
 /// @brief get an operation timeout
@@ -1055,30 +1059,58 @@ namespace triagens {
 
   private:
 
     AgencyComm                         _agency;
-    triagens::basics::ReadWriteLock    _lock;
 
     // Cached data from the agency, we reload whenever necessary:
+    // We group the data, each group has an atomic "valid-flag"
+    // which is used for lazy loading in the beginning. It starts
+    // as false, is set to true at each reload and is never reset
+    // to false in the lifetime of the server. The variable is
+    // atomic to be able to check it without acquiring
+    // the read lock (see below). Flush is just an explicit reload
+    // for all data and is only used in tests.
+    // Furthermore, each group has a mutex that protects against
+    // simultaneously contacting the agency for an update.
+    // In addition, each group has an atomic version number, this is used
+    // to prevent a stampede if multiple threads notice concurrently
+    // that an update from the agency is necessary. Finally, there is
+    // a read/write lock which protects the actual data structure.
+    // We encapsulate this protection in the struct ProtectionData:
+
+    struct ProtectionData {
+      std::atomic<bool> isValid;
+      triagens::basics::Mutex mutex;
+      std::atomic<uint64_t> version;
+      triagens::basics::ReadWriteLock lock;
+
+      ProtectionData () : isValid(false), version(0) {
+      }
+    };
+
     // The servers, first all, we only need Current here:
     std::unordered_map<ServerID, std::string>
                                        _servers;            // from Current/ServersRegistered
-    bool
-                                       _serversValid;
+    ProtectionData                     _serversProt;
+
     // The DBServers, also from Current:
     std::unordered_map<ServerID, ServerID>
                                        _DBServers;          // from Current/DBServers
-    bool                               _DBServersValid;
+    ProtectionData                     _DBServersProt;
+
     // The Coordinators, also from Current:
     std::unordered_map<ServerID, ServerID>
                                        _coordinators;       // from Current/Coordinators
-    bool                               _coordinatorsValid;
+    ProtectionData                     _coordinatorsProt;
 
     // First the databases, there is Plan and Current information:
     std::unordered_map<DatabaseID, TRI_json_t*>
                                        _plannedDatabases;   // from Plan/Databases
+    ProtectionData                     _plannedDatabasesProt;
+
     std::unordered_map<DatabaseID, std::unordered_map<ServerID, TRI_json_t*>>
                                        _currentDatabases;   // from Current/Databases
+    ProtectionData                     _currentDatabasesProt;
 
     // Finally, we need information about collections, again we have
     // data from Plan and from Current.
@@ -1090,8 +1122,8 @@ namespace triagens {
 
     // The Plan state:
     AllCollections
-                                       _collections;        // from Plan/Collections/
-    bool                               _collectionsValid;
+                                       _plannedCollections; // from Plan/Collections/
+    ProtectionData                     _plannedCollectionsProt;
 
     std::unordered_map<CollectionID, shared_ptr<std::vector<std::string>>>
                                        _shards;             // from Plan/Collections/
@@ -1102,9 +1134,8 @@ namespace triagens {
 
     // The Current state:
     AllCollectionsCurrent
-                                       _collectionsCurrent; // from Current/Collections/
-    bool
-                                       _collectionsCurrentValid;
+                                       _currentCollections; // from Current/Collections/
+    ProtectionData                     _currentCollectionsProt;
 
     std::unordered_map<ShardID, ServerID>
                                        _shardIds;           // from Current/Collections/
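
Note for reviewers: every load...() function in this patch follows the same ProtectionData discipline described in the header comment above. The standalone sketch below (illustrative only, not part of the patch) shows the intended interplay of the version check, the reload mutex and the read/write lock; it deliberately uses std::mutex and std::shared_mutex instead of triagens::basics::Mutex/ReadWriteLock, and fetchFromAgency() is a made-up placeholder for the AgencyComm round trip.

    #include <atomic>
    #include <cstdint>
    #include <mutex>
    #include <shared_mutex>
    #include <string>
    #include <unordered_map>

    struct ProtectionData {
      std::atomic<bool> isValid{false};      // set once, never reset to false
      std::mutex mutex;                      // serialises reloads from the agency
      std::atomic<uint64_t> version{0};      // bumped on every successful reload
      std::shared_mutex lock;                // guards the cached container below
    };

    static ProtectionData prot;
    static std::unordered_map<std::string, std::string> cache;

    // Placeholder for the expensive agency read; not a real ArangoDB API.
    static std::unordered_map<std::string, std::string> fetchFromAgency () {
      return {{"key", "value"}};
    }

    void loadCache () {
      uint64_t storedVersion = prot.version;   // remember the version we have seen
      std::lock_guard<std::mutex> guard(prot.mutex);
      if (prot.version > storedVersion) {
        // Somebody else reloaded while we waited for the mutex, nothing to do.
        return;
      }

      auto newCache = fetchFromAgency();       // build new data outside the R/W lock

      {
        std::unique_lock<std::shared_mutex> writeLocker(prot.lock);
        cache.swap(newCache);                  // readers are only blocked for the swap
        prot.version++;                        // so that others notice our change
        prot.isValid = true;                   // lazy loading: becomes valid once
      }
      // the old data in newCache is destroyed here, after the lock is released
    }

Readers mirror the accessors in the patch: they check prot.isValid without taking any lock, then take only a shared lock on prot.lock (the READ_LOCKER in the diff) while reading the cached map, which is why the stampede check and the swap-under-write-lock above are sufficient.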