1
0
Fork 0

Try to sort out ClusterInfo protection for good.

This commit is contained in:
Max Neunhoeffer 2015-08-03 15:07:59 +02:00
parent e8b4e510d8
commit d0cfab6248
2 changed files with 373 additions and 222 deletions

View File

@ -264,12 +264,6 @@ ClusterInfo* ClusterInfo::instance () {
ClusterInfo::ClusterInfo ()
: _agency(),
_serversValid(false),
_DBServersValid(false),
_coordinatorsValid(false),
_plannedDatabases(),
_currentDatabases(),
_collectionsValid(false),
_uniqid() {
_uniqid._currentValue = _uniqid._upperValue = 0ULL;
@ -282,8 +276,8 @@ ClusterInfo::ClusterInfo ()
////////////////////////////////////////////////////////////////////////////////
ClusterInfo::~ClusterInfo () {
clearPlannedDatabases();
clearCurrentDatabases();
clearPlannedDatabases(_plannedDatabases);
clearCurrentDatabases(_currentDatabases);
}
// -----------------------------------------------------------------------------
@ -329,21 +323,13 @@ uint64_t ClusterInfo::uniqid (uint64_t count) {
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::flush () {
WRITE_LOCKER(_lock);
_collectionsValid = false;
_collectionsCurrentValid = false;
_serversValid = false;
_DBServersValid = false;
_coordinatorsValid = false;
_collections.clear();
_collectionsCurrent.clear();
_servers.clear();
_shardIds.clear();
clearPlannedDatabases();
clearCurrentDatabases();
loadServers();
loadCurrentDBServers();
loadCurrentCoordinators();
loadPlannedDatabases();
loadCurrentDatabases();
loadPlannedCollections();
loadCurrentCollections();
}
////////////////////////////////////////////////////////////////////////////////
@ -354,26 +340,38 @@ bool ClusterInfo::doesDatabaseExist (DatabaseID const& databaseID,
bool reload) {
int tries = 0;
if (reload) {
if (reload ||
! _plannedDatabasesProt.isValid ||
! _currentDatabasesProt.isValid ||
! _DBServersProt.isValid) {
loadPlannedDatabases();
loadCurrentDatabases();
loadCurrentDBServers();
++tries;
++tries; // no need to reload if the database is not found
}
// From now on we know that all data has been valid once, so no need
// to check the isValid flags again under the lock.
while (true) {
{
READ_LOCKER(_lock);
const size_t expectedSize = _DBServers.size();
size_t expectedSize;
{
READ_LOCKER(_DBServersProt.lock);
expectedSize = _DBServers.size();
}
// look up database by name
// look up database by name:
READ_LOCKER(_plannedDatabasesProt.lock);
// _plannedDatabases is a map-type<DatabaseID, TRI_json_t*>
auto it = _plannedDatabases.find(databaseID);
if (it != _plannedDatabases.end()) {
// found the database in Plan
// _currentDatabases is a map-type<ServerID, TRI_json_t*>
READ_LOCKER(_currentDatabasesProt.lock);
// _currentDatabases is
// a map-type<DatabaseID, a map-type<ServerID, TRI_json_t*>>
auto it2 = _currentDatabases.find(databaseID);
if (it2 != _currentDatabases.end()) {
@ -403,15 +401,27 @@ bool ClusterInfo::doesDatabaseExist (DatabaseID const& databaseID,
vector<DatabaseID> ClusterInfo::listDatabases (bool reload) {
vector<DatabaseID> result;
if (reload) {
if (reload ||
! _plannedDatabasesProt.isValid ||
! _currentDatabasesProt.isValid ||
! _DBServersProt.isValid) {
loadPlannedDatabases();
loadCurrentDatabases();
loadCurrentDBServers();
}
READ_LOCKER(_lock);
const size_t expectedSize = _DBServers.size();
// From now on we know that all data has been valid once, so no need
// to check the isValid flags again under the lock.
size_t expectedSize;
{
READ_LOCKER(_DBServersProt.lock);
expectedSize = _DBServers.size();
}
{
READ_LOCKER(_plannedDatabasesProt.lock);
READ_LOCKER(_currentDatabasesProt.lock);
// _plannedDatabases is a map-type<DatabaseID, TRI_json_t*>
auto it = _plannedDatabases.begin();
@ -428,19 +438,20 @@ vector<DatabaseID> ClusterInfo::listDatabases (bool reload) {
++it;
}
}
return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief flushes the list of planned databases
/// @brief actually clears a list of planned databases
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::clearPlannedDatabases () {
// _plannedDatabases is a map-type<DatabaseID, TRI_json_t*>
auto it = _plannedDatabases.begin();
void ClusterInfo::clearPlannedDatabases (
std::unordered_map<DatabaseID, TRI_json_t*>& databases) {
while (it != _plannedDatabases.end()) {
auto it = databases.begin();
while (it != databases.end()) {
TRI_json_t* json = (*it).second;
if (json != nullptr) {
@ -448,20 +459,79 @@ void ClusterInfo::clearPlannedDatabases () {
}
++it;
}
_plannedDatabases.clear();
databases.clear();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief flushes the list of current databases
/// @brief (re-)load the information about planned databases
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////
//
static const std::string prefixPlannedDatabases = "Plan/Databases";
void ClusterInfo::loadPlannedDatabases () {
uint64_t storedVersion = _plannedDatabasesProt.version;
MUTEX_LOCKER(_plannedDatabasesProt.mutex);
if (_plannedDatabasesProt.version > storedVersion) {
// Somebody else did, what we intended to do, so just return
return;
}
// Now contact the agency:
AgencyCommResult result;
{
AgencyCommLocker locker("Plan", "READ");
if (locker.successful()) {
result = _agency.getValues(prefixPlannedDatabases, true);
}
}
if (result.successful()) {
result.parse(prefixPlannedDatabases + "/", false);
decltype(_plannedDatabases) newDatabases;
// result._values is a std::map<std::string, AgencyCommResultEntry>
auto it = result._values.begin();
while (it != result._values.end()) {
string const& name = (*it).first;
TRI_json_t* options = (*it).second._json;
// steal the json
(*it).second._json = nullptr;
newDatabases.insert(std::make_pair(name, options));
++it;
}
// Now set the new value:
{
WRITE_LOCKER(_plannedDatabasesProt.lock);
_plannedDatabases.swap(newDatabases);
_plannedDatabasesProt.version++; // such that others notice our change
_plannedDatabasesProt.isValid = true; // will never be reset to false
}
clearPlannedDatabases(newDatabases); // delete the old stuff
return;
}
LOG_TRACE("Error while loading %s", prefixPlannedDatabases.c_str());
}
////////////////////////////////////////////////////////////////////////////////
/// @brief deletes a list of current databases
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::clearCurrentDatabases () {
// _currentDatabases is
// a map-type<DatabaseID, a map-type<ServerID, TRI_json_t*>>
auto it = _currentDatabases.begin();
void ClusterInfo::clearCurrentDatabases (
std::unordered_map<DatabaseID,
std::unordered_map<ServerID, TRI_json_t*>>&
databases) {
while (it != _currentDatabases.end()) {
auto it = databases.begin();
while (it != databases.end()) {
auto it2 = (*it).second.begin();
while (it2 != (*it).second.end()) {
@ -476,51 +546,7 @@ void ClusterInfo::clearCurrentDatabases () {
++it;
}
_currentDatabases.clear();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about planned databases
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////
//
static const std::string prefixPlannedDatabases = "Plan/Databases";
void ClusterInfo::loadPlannedDatabases () {
AgencyCommResult result;
{
AgencyCommLocker locker("Plan", "READ");
if (locker.successful()) {
result = _agency.getValues(prefixPlannedDatabases, true);
}
}
if (result.successful()) {
result.parse(prefixPlannedDatabases + "/", false);
WRITE_LOCKER(_lock);
clearPlannedDatabases();
// result._values is a std::map<std::string, AgencyCommResultEntry>
auto it = result._values.begin();
while (it != result._values.end()) {
string const& name = (*it).first;
TRI_json_t* options = (*it).second._json;
// steal the json
(*it).second._json = nullptr;
_plannedDatabases.insert(std::make_pair(name, options));
++it;
}
return;
}
LOG_TRACE("Error while loading %s", prefixPlannedDatabases.c_str());
databases.clear();
}
////////////////////////////////////////////////////////////////////////////////
@ -529,10 +555,18 @@ void ClusterInfo::loadPlannedDatabases () {
////////////////////////////////////////////////////////////////////////////////
static const std::string prefixCurrentDatabases = "Current/Databases";
void ClusterInfo::loadCurrentDatabases () {
AgencyCommResult result;
uint64_t storedVersion = _currentDatabasesProt.version;
MUTEX_LOCKER(_currentDatabasesProt.mutex);
if (_currentDatabasesProt.version > storedVersion) {
// Somebody else did, what we intended to do, so just return
return;
}
// Now contact the agency:
AgencyCommResult result;
{
AgencyCommLocker locker("Plan", "READ");
@ -544,8 +578,7 @@ void ClusterInfo::loadCurrentDatabases () {
if (result.successful()) {
result.parse(prefixCurrentDatabases + "/", false);
WRITE_LOCKER(_lock);
clearCurrentDatabases();
decltype(_currentDatabases) newDatabases;
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
@ -563,12 +596,12 @@ void ClusterInfo::loadCurrentDatabases () {
// _currentDatabases is
// a map-type<DatabaseID, a map-type<ServerID, TRI_json_t*>>
auto it2 = _currentDatabases.find(database);
auto it2 = newDatabases.find(database);
if (it2 == _currentDatabases.end()) {
if (it2 == newDatabases.end()) {
// insert an empty list for this database
decltype(it2->second) empty;
it2 = _currentDatabases.insert(std::make_pair(database, empty)).first;
it2 = newDatabases.insert(std::make_pair(database, empty)).first;
}
if (parts.size() == 2) {
@ -582,6 +615,14 @@ void ClusterInfo::loadCurrentDatabases () {
++it;
}
// Now set the new value:
{
WRITE_LOCKER(_currentDatabasesProt.lock);
_currentDatabases.swap(newDatabases);
_currentDatabasesProt.version++; // such that others notice our change
_currentDatabasesProt.isValid = true; // will never be reset to false
}
clearCurrentDatabases(newDatabases); // delete the old stuff
return;
}
@ -594,10 +635,18 @@ void ClusterInfo::loadCurrentDatabases () {
////////////////////////////////////////////////////////////////////////////////
static const std::string prefixPlannedCollections = "Plan/Collections";
void ClusterInfo::loadPlannedCollections (bool acquireLock) {
AgencyCommResult result;
uint64_t storedVersion = _plannedCollectionsProt.version;
MUTEX_LOCKER(_plannedCollectionsProt.mutex);
if (_plannedCollectionsProt.version > storedVersion) {
// Somebody else did, what we intended to do, so just return
return;
}
// Now contact the agency:
AgencyCommResult result;
{
if (acquireLock) {
AgencyCommLocker locker("Plan", "READ");
@ -614,9 +663,9 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) {
if (result.successful()) {
result.parse(prefixPlannedCollections + "/", false);
WRITE_LOCKER(_lock);
_collections.clear();
_shards.clear();
decltype(_plannedCollections) newCollections;
decltype(_shards) newShards;
decltype(_shardKeys) newShardKeys;
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
@ -636,13 +685,13 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) {
const std::string collection = parts[1];
// check whether we have created an entry for the database already
AllCollections::iterator it2 = _collections.find(database);
AllCollections::iterator it2 = newCollections.find(database);
if (it2 == _collections.end()) {
if (it2 == newCollections.end()) {
// not yet, so create an entry for the database
DatabaseCollections empty;
_collections.emplace(std::make_pair(database, empty));
it2 = _collections.find(database);
newCollections.emplace(std::make_pair(database, empty));
it2 = newCollections.find(database);
}
TRI_json_t* json = (*it).second._json;
@ -652,7 +701,7 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) {
shared_ptr<CollectionInfo> collectionData (new CollectionInfo(json));
vector<string>* shardKeys = new vector<string>;
*shardKeys = collectionData->shardKeys();
_shardKeys.insert(
newShardKeys.insert(
make_pair(collection, shared_ptr<vector<string> > (shardKeys)));
map<ShardID, ServerID> shardIDs = collectionData->shardIds();
vector<string>* shards = new vector<string>;
@ -660,7 +709,7 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) {
for (it3 = shardIDs.begin(); it3 != shardIDs.end(); ++it3) {
shards->push_back(it3->first);
}
_shards.emplace(
newShards.emplace(
std::make_pair(collection, shared_ptr<vector<string> >(shards)));
// insert the collection into the existing map, insert it under its
@ -672,12 +721,20 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) {
collectionData));
}
_collectionsValid = true;
// Now set the new value:
{
WRITE_LOCKER(_plannedCollectionsProt.lock);
_plannedCollections.swap(newCollections);
_shards.swap(newShards);
_shardKeys.swap(newShardKeys);
_plannedCollectionsProt.version++; // such that others notice our change
_plannedCollectionsProt.isValid = true; // will never be reset to false
}
return;
}
LOG_TRACE("Error while loading %s", prefixPlannedCollections.c_str());
_collectionsValid = false;
}
////////////////////////////////////////////////////////////////////////////////
@ -690,18 +747,18 @@ shared_ptr<CollectionInfo> ClusterInfo::getCollection
CollectionID const& collectionID) {
int tries = 0;
if (! _collectionsValid) {
if (! _plannedCollectionsProt.isValid) {
loadPlannedCollections(true);
++tries;
}
while (true) { // left by break
{
READ_LOCKER(_lock);
READ_LOCKER(_plannedCollectionsProt.lock);
// look up database by id
AllCollections::const_iterator it = _collections.find(databaseID);
AllCollections::const_iterator it = _plannedCollections.find(databaseID);
if (it != _collections.end()) {
if (it != _plannedCollections.end()) {
// look up collection by id (or by name)
DatabaseCollections::const_iterator it2 = (*it).second.find(collectionID);
@ -773,11 +830,11 @@ const std::vector<shared_ptr<CollectionInfo> > ClusterInfo::getCollections
// always reload
loadPlannedCollections(true);
READ_LOCKER(_lock);
READ_LOCKER(_plannedCollectionsProt.lock);
// look up database by id
AllCollections::const_iterator it = _collections.find(databaseID);
AllCollections::const_iterator it = _plannedCollections.find(databaseID);
if (it == _collections.end()) {
if (it == _plannedCollections.end()) {
return result;
}
@ -807,8 +864,15 @@ const std::vector<shared_ptr<CollectionInfo> > ClusterInfo::getCollections
static const std::string prefixCurrentCollections = "Current/Collections";
void ClusterInfo::loadCurrentCollections (bool acquireLock) {
AgencyCommResult result;
uint64_t storedVersion = _currentCollectionsProt.version;
MUTEX_LOCKER(_currentCollectionsProt.mutex);
if (_currentCollectionsProt.version > storedVersion) {
// Somebody else did, what we intended to do, so just return
return;
}
// Now contact the agency:
AgencyCommResult result;
{
if (acquireLock) {
AgencyCommLocker locker("Current", "READ");
@ -825,9 +889,8 @@ void ClusterInfo::loadCurrentCollections (bool acquireLock) {
if (result.successful()) {
result.parse(prefixCurrentCollections + "/", false);
WRITE_LOCKER(_lock);
_collectionsCurrent.clear();
_shardIds.clear();
decltype(_currentCollections) newCollections;
decltype(_shardIds) newShardIds;
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
@ -849,13 +912,13 @@ void ClusterInfo::loadCurrentCollections (bool acquireLock) {
const std::string shardID = parts[2];
// check whether we have created an entry for the database already
AllCollectionsCurrent::iterator it2 = _collectionsCurrent.find(database);
AllCollectionsCurrent::iterator it2 = newCollections.find(database);
if (it2 == _collectionsCurrent.end()) {
if (it2 == newCollections.end()) {
// not yet, so create an entry for the database
DatabaseCollectionsCurrent empty;
_collectionsCurrent.insert(std::make_pair(database, empty));
it2 = _collectionsCurrent.find(database);
newCollections.insert(std::make_pair(database, empty));
it2 = newCollections.find(database);
}
TRI_json_t* json = (*it).second._json;
@ -886,15 +949,22 @@ void ClusterInfo::loadCurrentCollections (bool acquireLock) {
std::string DBserver = triagens::basics::JsonHelper::getStringValue
(json, "DBServer", "");
if (DBserver != "") {
_shardIds.insert(make_pair(shardID, DBserver));
newShardIds.insert(make_pair(shardID, DBserver));
}
}
_collectionsCurrentValid = true;
// Now set the new value:
{
WRITE_LOCKER(_currentCollectionsProt.lock);
_currentCollections.swap(newCollections);
_shardIds.swap(newShardIds);
_currentCollectionsProt.version++; // such that others notice our change
_currentCollectionsProt.isValid = true; // will never be reset to false
}
return;
}
LOG_TRACE("Error while loading %s", prefixCurrentCollections.c_str());
_collectionsCurrentValid = false;
}
////////////////////////////////////////////////////////////////////////////////
@ -908,18 +978,18 @@ shared_ptr<CollectionInfoCurrent> ClusterInfo::getCollectionCurrent
CollectionID const& collectionID) {
int tries = 0;
if (! _collectionsCurrentValid) {
if (! _currentCollectionsProt.isValid) {
loadCurrentCollections(true);
++tries;
}
while (true) {
{
READ_LOCKER(_lock);
READ_LOCKER(_currentCollectionsProt.lock);
// look up database by id
AllCollectionsCurrent::const_iterator it = _collectionsCurrent.find(databaseID);
AllCollectionsCurrent::const_iterator it = _currentCollections.find(databaseID);
if (it != _collectionsCurrent.end()) {
if (it != _currentCollections.end()) {
// look up collection by id
DatabaseCollectionsCurrent::const_iterator it2 = (*it).second.find(collectionID);
@ -976,6 +1046,9 @@ int ClusterInfo::createDatabaseCoordinator (string const& name,
}
}
// Now update our own cache of planned databases:
loadPlannedDatabases();
// Now wait for it to appear and be complete:
res.clear();
res = ac.getValues("Current/Version", false);
@ -1022,6 +1095,7 @@ int ClusterInfo::createDatabaseCoordinator (string const& name,
errorMsg = "Error in creation of database:" + tmpMsg;
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE;
}
loadCurrentDatabases(); // update our cache
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
}
}
@ -1087,7 +1161,9 @@ int ClusterInfo::dropDatabaseCoordinator (string const& name, string& errorMsg,
}
}
_collectionsValid = false;
// Load our own caches:
loadPlannedDatabases();
loadPlannedCollections(true);
// Now wait for it to appear and be complete:
res.clear();
@ -1153,9 +1229,9 @@ int ClusterInfo::createCollectionCoordinator (string const& databaseName,
// check if a collection with the same name is already planned
loadPlannedCollections(false);
READ_LOCKER(_lock);
AllCollections::const_iterator it = _collections.find(databaseName);
if (it != _collections.end()) {
READ_LOCKER(_plannedCollectionsProt.lock);
AllCollections::const_iterator it = _plannedCollections.find(databaseName);
if (it != _plannedCollections.end()) {
const std::string name = JsonHelper::getStringValue(json, "name", "");
DatabaseCollections::const_iterator it2 = (*it).second.find(name);
@ -1184,6 +1260,9 @@ int ClusterInfo::createCollectionCoordinator (string const& databaseName,
}
}
// Update our cache:
loadPlannedCollections();
// Now wait for it to appear and be complete:
AgencyCommResult res = ac.getValues("Current/Version", false);
if (!res.successful()) {
@ -1226,6 +1305,7 @@ int ClusterInfo::createCollectionCoordinator (string const& databaseName,
errorMsg = "Error in creation of collection:" + tmpMsg;
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
}
loadPlannedCollections();
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
}
}
@ -1276,7 +1356,8 @@ int ClusterInfo::dropCollectionCoordinator (string const& databaseName,
}
}
flush();
// Update our own cache:
loadPlannedCollections(true);
// Now wait for it to appear and be complete:
res.clear();
@ -1307,6 +1388,7 @@ int ClusterInfo::dropCollectionCoordinator (string const& databaseName,
return setErrormsg(
TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_CURRENT, errorMsg);
}
loadCurrentCollections();
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
}
}
@ -1499,10 +1581,15 @@ int ClusterInfo::ensureIndexCoordinator (string const& databaseName,
{
loadPlannedCollections(false);
READ_LOCKER(_lock);
shared_ptr<CollectionInfo> c = getCollection(databaseName, collectionID);
// Note that nobody is removing this collection in the plan, since
// we hold the write lock in the agency, therefore it does not matter
// that getCollection fetches the read lock and releases it before
// we get it again.
//
READ_LOCKER(_plannedCollectionsProt.lock);
if (c->empty()) {
return setErrormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg);
}
@ -1604,8 +1691,8 @@ int ClusterInfo::ensureIndexCoordinator (string const& databaseName,
}
}
// wipe cache
flush();
// reload our own cache:
loadPlannedCollections(true);
TRI_ASSERT(numberOfShards > 0);
@ -1672,6 +1759,8 @@ int ClusterInfo::ensureIndexCoordinator (string const& databaseName,
resultJson = newIndex;
TRI_Insert3ObjectJson(TRI_UNKNOWN_MEM_ZONE, resultJson, "isNewlyCreated", TRI_CreateBooleanJson(TRI_UNKNOWN_MEM_ZONE, true));
loadCurrentCollections();
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
}
}
@ -1717,10 +1806,10 @@ int ClusterInfo::dropIndexCoordinator (string const& databaseName,
{
loadPlannedCollections(false);
READ_LOCKER(_lock);
shared_ptr<CollectionInfo> c = getCollection(databaseName, collectionID);
READ_LOCKER(_plannedCollectionsProt.lock);
if (c->empty()) {
return setErrormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg);
}
@ -1804,8 +1893,8 @@ int ClusterInfo::dropIndexCoordinator (string const& databaseName,
}
}
// wipe cache
flush();
// load our own cache:
loadPlannedCollections();
TRI_ASSERT(numberOfShards > 0);
@ -1851,6 +1940,7 @@ int ClusterInfo::dropIndexCoordinator (string const& databaseName,
}
if (! found) {
loadCurrentCollections();
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
}
}
@ -1870,10 +1960,18 @@ int ClusterInfo::dropIndexCoordinator (string const& databaseName,
////////////////////////////////////////////////////////////////////////////////
static const std::string prefixServers = "Current/ServersRegistered";
void ClusterInfo::loadServers () {
AgencyCommResult result;
uint64_t storedVersion = _serversProt.version;
MUTEX_LOCKER(_serversProt.mutex);
if (_serversProt.version > storedVersion) {
// Somebody else did, what we intended to do, so just return
return;
}
// Now contact the agency:
AgencyCommResult result;
{
AgencyCommLocker locker("Current", "READ");
@ -1885,8 +1983,7 @@ void ClusterInfo::loadServers () {
if (result.successful()) {
result.parse(prefixServers + "/", false);
WRITE_LOCKER(_lock);
_servers.clear();
decltype(_servers) newServers;
std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
@ -1897,21 +1994,22 @@ void ClusterInfo::loadServers () {
if (nullptr != sub) {
std::string server = triagens::basics::JsonHelper::getStringValue(sub, "");
_servers.emplace(std::make_pair((*it).first, server));
newServers.emplace(std::make_pair((*it).first, server));
}
++it;
}
_serversValid = true;
// Now set the new value:
{
WRITE_LOCKER(_serversProt.lock);
_servers.swap(newServers);
_serversProt.version++; // such that others notice our change
_serversProt.isValid = true; // will never be reset to false
}
return;
}
LOG_TRACE("Error while loading %s", prefixServers.c_str());
_serversValid = false;
return;
}
////////////////////////////////////////////////////////////////////////////////
@ -1923,14 +2021,14 @@ void ClusterInfo::loadServers () {
std::string ClusterInfo::getServerEndpoint (ServerID const& serverID) {
int tries = 0;
if (! _serversValid) {
if (! _serversProt.isValid) {
loadServers();
tries++;
}
while (true) {
{
READ_LOCKER(_lock);
READ_LOCKER(_serversProt.lock);
// _servers is a map-type <ServerId, string>
auto it = _servers.find(serverID);
@ -1959,14 +2057,14 @@ std::string ClusterInfo::getServerEndpoint (ServerID const& serverID) {
std::string ClusterInfo::getServerName (std::string const& endpoint) {
int tries = 0;
if (! _serversValid) {
if (! _serversProt.isValid) {
loadServers();
tries++;
}
while (true) {
{
READ_LOCKER(_lock);
READ_LOCKER(_serversProt.lock);
for (auto const& it : _servers) {
if (it.second == endpoint) {
return it.first;
@ -1994,8 +2092,15 @@ static const std::string prefixCurrentCoordinators = "Current/Coordinators";
void ClusterInfo::loadCurrentCoordinators () {
AgencyCommResult result;
uint64_t storedVersion = _coordinatorsProt.version;
MUTEX_LOCKER(_coordinatorsProt.mutex);
if (_coordinatorsProt.version > storedVersion) {
// Somebody else did, what we intended to do, so just return
return;
}
// Now contact the agency:
AgencyCommResult result;
{
AgencyCommLocker locker("Current", "READ");
@ -2007,23 +2112,25 @@ void ClusterInfo::loadCurrentCoordinators () {
if (result.successful()) {
result.parse(prefixCurrentCoordinators + "/", false);
WRITE_LOCKER(_lock);
_coordinators.clear();
decltype(_coordinators) newCoordinators;
std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
for (; it != result._values.end(); ++it) {
_coordinators.emplace(std::make_pair((*it).first, triagens::basics::JsonHelper::getStringValue((*it).second._json, "")));
newCoordinators.emplace(std::make_pair((*it).first, triagens::basics::JsonHelper::getStringValue((*it).second._json, "")));
}
_coordinatorsValid = true;
// Now set the new value:
{
WRITE_LOCKER(_coordinatorsProt.lock);
_coordinators.swap(newCoordinators);
_coordinatorsProt.version++; // such that others notice our change
_coordinatorsProt.isValid = true; // will never be reset to false
}
return;
}
LOG_TRACE("Error while loading %s", prefixCurrentCoordinators.c_str());
_coordinatorsValid = false;
return;
}
@ -2036,8 +2143,15 @@ static const std::string prefixCurrentDBServers = "Current/DBServers";
void ClusterInfo::loadCurrentDBServers () {
AgencyCommResult result;
uint64_t storedVersion = _DBServersProt.version;
MUTEX_LOCKER(_DBServersProt.mutex);
if (_DBServersProt.version > storedVersion) {
// Somebody else did, what we intended to do, so just return
return;
}
// Now contact the agency:
AgencyCommResult result;
{
AgencyCommLocker locker("Current", "READ");
@ -2049,23 +2163,25 @@ void ClusterInfo::loadCurrentDBServers () {
if (result.successful()) {
result.parse(prefixCurrentDBServers + "/", false);
WRITE_LOCKER(_lock);
_DBServers.clear();
decltype(_DBServers) newDBServers;
std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
for (; it != result._values.end(); ++it) {
_DBServers.emplace(std::make_pair((*it).first, triagens::basics::JsonHelper::getStringValue((*it).second._json, "")));
newDBServers.emplace(std::make_pair((*it).first, triagens::basics::JsonHelper::getStringValue((*it).second._json, "")));
}
_DBServersValid = true;
// Now set the new value:
{
WRITE_LOCKER(_DBServersProt.lock);
_DBServers.swap(newDBServers); // swap member with local, matching the other load* functions; self-swap left _DBServers stale
_DBServersProt.version++; // such that others notice our change
_DBServersProt.isValid = true; // will never be reset to false
}
return;
}
LOG_TRACE("Error while loading %s", prefixCurrentDBServers.c_str());
_DBServersValid = false;
return;
}
@ -2076,14 +2192,17 @@ void ClusterInfo::loadCurrentDBServers () {
std::vector<ServerID> ClusterInfo::getCurrentDBServers () {
std::vector<ServerID> result;
int tries = 0;
if (! _DBServersProt.isValid) {
loadCurrentDBServers();
tries++;
}
while (true) {
{
// return a consistent state of servers
READ_LOCKER(_lock);
READ_LOCKER(_DBServersProt.lock);
if (_DBServersValid) {
result.reserve(_DBServers.size());
for (auto& it : _DBServers) {
@ -2092,7 +2211,6 @@ std::vector<ServerID> ClusterInfo::getCurrentDBServers () {
return result;
}
}
if (++tries >= 2) {
break;
@ -2150,14 +2268,14 @@ std::string ClusterInfo::getTargetServerEndpoint (ServerID const& serverID) {
ServerID ClusterInfo::getResponsibleServer (ShardID const& shardID) {
int tries = 0;
if (! _collectionsCurrentValid) {
if (! _currentCollectionsProt.isValid) {
loadCurrentCollections(true);
tries++;
}
while (true) {
{
READ_LOCKER(_lock);
READ_LOCKER(_currentCollectionsProt.lock);
// _shardIds is a map-type <ShardId, ServerId>
auto it = _shardIds.find(shardID);
@ -2203,7 +2321,7 @@ int ClusterInfo::getResponsibleShard (CollectionID const& collectionID,
// Note that currently we take the number of shards and the shardKeys
// from Plan, since they are immutable. Later we will have to switch
// this to Current, when we allow to add and remove shards.
if (! _collectionsValid) {
if (! _plannedCollectionsProt.isValid) {
loadPlannedCollections();
}
@ -2216,7 +2334,7 @@ int ClusterInfo::getResponsibleShard (CollectionID const& collectionID,
while (true) {
{
// Get the sharding keys and the number of shards:
READ_LOCKER(_lock);
READ_LOCKER(_plannedCollectionsProt.lock);
// _shards is a map-type <CollectionId, shared_ptr<vector<string>>>
auto it = _shards.find(collectionID);
@ -2243,7 +2361,7 @@ int ClusterInfo::getResponsibleShard (CollectionID const& collectionID,
if (++tries >= 2) {
break;
}
loadPlannedCollections();
loadPlannedCollections(true);
}
if (! found) {
@ -2272,14 +2390,17 @@ int ClusterInfo::getResponsibleShard (CollectionID const& collectionID,
std::vector<ServerID> ClusterInfo::getCurrentCoordinators () {
std::vector<ServerID> result;
int tries = 0;
if (! _coordinatorsProt.isValid) {
loadCurrentCoordinators();
tries++;
}
while (true) {
{
// return a consistent state of servers
READ_LOCKER(_lock);
READ_LOCKER(_coordinatorsProt.lock);
if (_coordinatorsValid) {
result.reserve(_coordinators.size());
for (auto& it : _coordinators) {
@ -2288,7 +2409,6 @@ std::vector<ServerID> ClusterInfo::getCurrentCoordinators () {
return result;
}
}
if (++tries >= 2) {
break;

View File

@ -1010,16 +1010,20 @@ namespace triagens {
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief flushes the list of planned databases
/// @brief actually clears a list of planned databases
////////////////////////////////////////////////////////////////////////////////
void clearPlannedDatabases ();
void clearPlannedDatabases (
std::unordered_map<DatabaseID, TRI_json_t*>& databases);
////////////////////////////////////////////////////////////////////////////////
/// @brief flushes the list of current databases
/// @brief actually clears a list of current databases
////////////////////////////////////////////////////////////////////////////////
void clearCurrentDatabases ();
void clearCurrentDatabases (
std::unordered_map<DatabaseID,
std::unordered_map<ServerID, TRI_json_t*>>&
databases);
////////////////////////////////////////////////////////////////////////////////
/// @brief get an operation timeout
@ -1055,30 +1059,58 @@ namespace triagens {
private:
AgencyComm _agency;
triagens::basics::ReadWriteLock _lock;
// Cached data from the agency, we reload whenever necessary:
// We group the data, each group has an atomic "valid-flag"
// which is used for lazy loading in the beginning. It starts
// as false, is set to true at each reload and is never reset
// to false in the lifetime of the server. The variable is
// atomic to be able to check it without acquiring
// the read lock (see below). Flush is just an explicit reload
// for all data and is only used in tests.
// Furthermore, each group has a mutex that protects against
// simultaneously contacting the agency for an update.
// In addition, each group has an atomic version number, this is used
// to prevent a stampede if multiple threads notice concurrently
// that an update from the agency is necessary. Finally, there is
// a read/write lock which protects the actual data structure.
// We encapsulate this protection in the struct ProtectionData:
struct ProtectionData {
std::atomic<bool> isValid;
triagens::basics::Mutex mutex;
std::atomic<uint64_t> version;
triagens::basics::ReadWriteLock lock;
ProtectionData () : isValid(false), version(0) {
}
};
// The servers, first all, we only need Current here:
std::unordered_map<ServerID, std::string>
_servers; // from Current/ServersRegistered
bool
_serversValid;
ProtectionData _serversProt;
// The DBServers, also from Current:
std::unordered_map<ServerID, ServerID>
_DBServers; // from Current/DBServers
bool _DBServersValid;
ProtectionData _DBServersProt;
// The Coordinators, also from Current:
std::unordered_map<ServerID, ServerID>
_coordinators; // from Current/Coordinators
bool _coordinatorsValid;
ProtectionData _coordinatorsProt;
// First the databases, there is Plan and Current information:
std::unordered_map<DatabaseID, struct TRI_json_t*>
_plannedDatabases; // from Plan/Databases
ProtectionData _plannedDatabasesProt;
std::unordered_map<DatabaseID,
std::unordered_map<ServerID, struct TRI_json_t*>>
_currentDatabases; // from Current/Databases
ProtectionData _currentDatabasesProt;
// Finally, we need information about collections, again we have
// data from Plan and from Current.
@ -1090,8 +1122,8 @@ namespace triagens {
// The Plan state:
AllCollections
_collections; // from Plan/Collections/
bool _collectionsValid;
_plannedCollections; // from Plan/Collections/
ProtectionData _plannedCollectionsProt;
std::unordered_map<CollectionID,
std::shared_ptr<std::vector<std::string>>>
_shards; // from Plan/Collections/
@ -1102,9 +1134,8 @@ namespace triagens {
// The Current state:
AllCollectionsCurrent
_collectionsCurrent; // from Current/Collections/
bool
_collectionsCurrentValid;
_currentCollections; // from Current/Collections/
ProtectionData _currentCollectionsProt;
std::unordered_map<ShardID, ServerID>
_shardIds; // from Current/Collections/