mirror of https://gitee.com/bigwinds/arangodb
Cleanup stuff in ClusterInfo, invalidate caches when needed.
This commit is contained in:
parent
3c6b4d1197
commit
416c493534
|
@ -498,35 +498,36 @@ void ClusterInfo::loadPlannedDatabases() {
|
|||
if (result.successful()) {
|
||||
|
||||
velocypack::Slice databases =
|
||||
result._vpack->slice()[0]
|
||||
.get(AgencyComm::prefix().substr(1,AgencyComm::prefix().size()-2))
|
||||
.get("Plan").get("Databases");
|
||||
result._vpack->slice()[0].get(std::vector<std::string>(
|
||||
{AgencyComm::prefixStripped(), "Plan", "Databases"}));
|
||||
|
||||
decltype(_plannedDatabases) newDatabases;
|
||||
if (!databases.isNone()) {
|
||||
decltype(_plannedDatabases) newDatabases;
|
||||
|
||||
for (auto const& database : VPackObjectIterator(databases)) {
|
||||
for (auto const& database : VPackObjectIterator(databases)) {
|
||||
|
||||
std::string const& name = database.key.copyString();
|
||||
std::string const& name = database.key.copyString();
|
||||
|
||||
// TODO: _plannedDatabases need to be moved to velocypack
|
||||
// Than this can be merged to swap
|
||||
// TODO: _plannedDatabases need to be moved to velocypack
|
||||
// Then this can be merged to swap
|
||||
|
||||
TRI_json_t* options =
|
||||
arangodb::basics::VelocyPackHelper::velocyPackToJson(database.value);
|
||||
TRI_json_t* options =
|
||||
arangodb::basics::VelocyPackHelper::velocyPackToJson(database.value);
|
||||
|
||||
newDatabases.insert(std::make_pair(name, options));
|
||||
newDatabases.insert(std::make_pair(name, options));
|
||||
|
||||
}
|
||||
|
||||
// Now set the new value:
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _plannedDatabasesProt.lock);
|
||||
_plannedDatabases.swap(newDatabases);
|
||||
_plannedDatabasesProt.version++; // such that others notice our change
|
||||
_plannedDatabasesProt.isValid = true; // will never be reset to false
|
||||
}
|
||||
clearPlannedDatabases(newDatabases); // delete the old stuff
|
||||
return;
|
||||
}
|
||||
|
||||
// Now set the new value:
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _plannedDatabasesProt.lock);
|
||||
_plannedDatabases.swap(newDatabases);
|
||||
_plannedDatabasesProt.version++; // such that others notice our change
|
||||
_plannedDatabasesProt.isValid = true; // will never be reset to false
|
||||
}
|
||||
clearPlannedDatabases(newDatabases); // delete the old stuff
|
||||
return;
|
||||
}
|
||||
|
||||
LOG(DEBUG) << "Error while loading " << prefixPlannedDatabases
|
||||
|
@ -589,46 +590,46 @@ void ClusterInfo::loadCurrentDatabases() {
|
|||
if (result.successful()) {
|
||||
|
||||
velocypack::Slice databases =
|
||||
result._vpack->slice()[0]
|
||||
.get(AgencyComm::prefix().substr(1,AgencyComm::prefix().size()-2))
|
||||
.get("Current").get("Databases");
|
||||
result._vpack->slice()[0].get(std::vector<std::string>(
|
||||
{AgencyComm::prefixStripped(), "Current", "Databases"}));
|
||||
|
||||
if (!databases.isNone()) {
|
||||
decltype(_currentDatabases) newDatabases;
|
||||
|
||||
decltype(_currentDatabases) newDatabases;
|
||||
for (auto const& dbase : VPackObjectIterator(databases)) {
|
||||
|
||||
for (auto const& dbase : VPackObjectIterator(databases)) {
|
||||
std::string const database = dbase.key.copyString();
|
||||
|
||||
std::string const database = dbase.key.copyString();
|
||||
// _currentDatabases is
|
||||
// a map-type<DatabaseID, a map-type<ServerID, TRI_json_t*>>
|
||||
auto it2 = newDatabases.find(database);
|
||||
|
||||
// _currentDatabases is
|
||||
// a map-type<DatabaseID, a map-type<ServerID, TRI_json_t*>>
|
||||
auto it2 = newDatabases.find(database);
|
||||
if (it2 == newDatabases.end()) {
|
||||
// insert an empty list for this database
|
||||
decltype(it2->second) empty;
|
||||
it2 = newDatabases.insert(std::make_pair(database, empty)).first;
|
||||
}
|
||||
|
||||
// TODO: _plannedDatabases need to be moved to velocypack
|
||||
// Then this can be merged to swap
|
||||
for (auto const& server : VPackObjectIterator(dbase.value)) {
|
||||
TRI_json_t* json = arangodb::basics::VelocyPackHelper::velocyPackToJson(
|
||||
server.value);
|
||||
(*it2).second.insert(std::make_pair(server.key.copyString(), json));
|
||||
}
|
||||
|
||||
if (it2 == newDatabases.end()) {
|
||||
// insert an empty list for this database
|
||||
decltype(it2->second) empty;
|
||||
it2 = newDatabases.insert(std::make_pair(database, empty)).first;
|
||||
}
|
||||
|
||||
// TODO: _plannedDatabases need to be moved to velocypack
|
||||
// Than this can be merged to swap
|
||||
for (auto const& server : VPackObjectIterator(dbase.value)) {
|
||||
TRI_json_t* json = arangodb::basics::VelocyPackHelper::velocyPackToJson(
|
||||
server.value);
|
||||
(*it2).second.insert(std::make_pair(server.key.copyString(), json));
|
||||
// Now set the new value:
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _currentDatabasesProt.lock);
|
||||
_currentDatabases.swap(newDatabases);
|
||||
_currentDatabasesProt.version++; // such that others notice our change
|
||||
_currentDatabasesProt.isValid = true; // will never be reset to false
|
||||
}
|
||||
|
||||
clearCurrentDatabases(newDatabases); // delete the old stuff
|
||||
return;
|
||||
}
|
||||
|
||||
// Now set the new value:
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _currentDatabasesProt.lock);
|
||||
_currentDatabases.swap(newDatabases);
|
||||
_currentDatabasesProt.version++; // such that others notice our change
|
||||
_currentDatabasesProt.isValid = true; // will never be reset to false
|
||||
}
|
||||
clearCurrentDatabases(newDatabases); // delete the old stuff
|
||||
return;
|
||||
}
|
||||
|
||||
LOG(DEBUG) << "Error while loading " << prefixCurrentDatabases
|
||||
|
@ -669,73 +670,74 @@ void ClusterInfo::loadPlannedCollections() {
|
|||
if (result.successful()) {
|
||||
|
||||
velocypack::Slice databases =
|
||||
result._vpack->slice()[0]
|
||||
.get(AgencyComm::prefix().substr(1,AgencyComm::prefix().size()-2))
|
||||
.get("Plan").get("Collections");
|
||||
result._vpack->slice()[0].get(std::vector<std::string>(
|
||||
{AgencyComm::prefixStripped(), "Plan", "Collections"}));
|
||||
|
||||
decltype(_plannedCollections) newCollections;
|
||||
decltype(_shards) newShards;
|
||||
decltype(_shardKeys) newShardKeys;
|
||||
if (!databases.isNone()) {
|
||||
decltype(_plannedCollections) newCollections;
|
||||
decltype(_shards) newShards;
|
||||
decltype(_shardKeys) newShardKeys;
|
||||
|
||||
for (auto const& db : VPackObjectIterator(databases)) {
|
||||
std::string const database = db.key.copyString();
|
||||
for (auto const& db : VPackObjectIterator(databases)) {
|
||||
std::string const database = db.key.copyString();
|
||||
|
||||
for (auto const& col : VPackObjectIterator(db.value)) {
|
||||
std::string const collection = col.key.copyString();
|
||||
for (auto const& col : VPackObjectIterator(db.value)) {
|
||||
std::string const collection = col.key.copyString();
|
||||
|
||||
// check whether we have created an entry for the database already
|
||||
AllCollections::iterator it2 = newCollections.find(database);
|
||||
|
||||
if (it2 == newCollections.end()) {
|
||||
// not yet, so create an entry for the database
|
||||
DatabaseCollections empty;
|
||||
newCollections.emplace(std::make_pair(database, empty));
|
||||
it2 = newCollections.find(database);
|
||||
// check whether we have created an entry for the database already
|
||||
AllCollections::iterator it2 = newCollections.find(database);
|
||||
|
||||
if (it2 == newCollections.end()) {
|
||||
// not yet, so create an entry for the database
|
||||
DatabaseCollections empty;
|
||||
newCollections.emplace(std::make_pair(database, empty));
|
||||
it2 = newCollections.find(database);
|
||||
}
|
||||
|
||||
// TODO: The Collection info has to store VPack instead of JSON
|
||||
TRI_json_t* json = arangodb::basics::VelocyPackHelper::velocyPackToJson(
|
||||
col.value);
|
||||
|
||||
auto collectionData = std::make_shared<CollectionInfo>(json);
|
||||
auto shardKeys = std::make_shared<std::vector<std::string>>(
|
||||
collectionData->shardKeys());
|
||||
newShardKeys.insert(make_pair(collection, shardKeys));
|
||||
auto shardIDs = collectionData->shardIds();
|
||||
auto shards = std::make_shared<std::vector<std::string>>();
|
||||
for (auto const& p : *shardIDs) {
|
||||
shards->push_back(p.first);
|
||||
}
|
||||
// Sort by the number in the shard ID ("s0000001" for example):
|
||||
std::sort(shards->begin(), shards->end(),
|
||||
[](std::string const& a, std::string const& b) -> bool {
|
||||
return std::strtol(a.c_str() + 1, nullptr, 10) <
|
||||
std::strtol(b.c_str() + 1, nullptr, 10);
|
||||
});
|
||||
newShards.emplace(std::make_pair(collection, shards));
|
||||
|
||||
// insert the collection into the existing map, insert it under its
|
||||
// ID as well as under its name, so that a lookup can be done with
|
||||
// either of the two.
|
||||
|
||||
(*it2).second.emplace(std::make_pair(collection, collectionData));
|
||||
(*it2).second.emplace(
|
||||
std::make_pair(collectionData->name(), collectionData));
|
||||
|
||||
}
|
||||
|
||||
// TODO: The Collection info has to store VPack instead of JSON
|
||||
TRI_json_t* json = arangodb::basics::VelocyPackHelper::velocyPackToJson(
|
||||
col.value);
|
||||
|
||||
auto collectionData = std::make_shared<CollectionInfo>(json);
|
||||
auto shardKeys = std::make_shared<std::vector<std::string>>(
|
||||
collectionData->shardKeys());
|
||||
newShardKeys.insert(make_pair(collection, shardKeys));
|
||||
auto shardIDs = collectionData->shardIds();
|
||||
auto shards = std::make_shared<std::vector<std::string>>();
|
||||
for (auto const& p : *shardIDs) {
|
||||
shards->push_back(p.first);
|
||||
}
|
||||
// Sort by the number in the shard ID ("s0000001" for example):
|
||||
std::sort(shards->begin(), shards->end(),
|
||||
[](std::string const& a, std::string const& b) -> bool {
|
||||
return std::strtol(a.c_str() + 1, nullptr, 10) <
|
||||
std::strtol(b.c_str() + 1, nullptr, 10);
|
||||
});
|
||||
newShards.emplace(std::make_pair(collection, shards));
|
||||
|
||||
// insert the collection into the existing map, insert it under its
|
||||
// ID as well as under its name, so that a lookup can be done with
|
||||
// either of the two.
|
||||
|
||||
(*it2).second.emplace(std::make_pair(collection, collectionData));
|
||||
(*it2).second.emplace(
|
||||
std::make_pair(collectionData->name(), collectionData));
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// Now set the new value:
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _plannedCollectionsProt.lock);
|
||||
// Now set the new value:
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _plannedCollectionsProt.lock);
|
||||
|
||||
_plannedCollections.swap(newCollections);
|
||||
_shards.swap(newShards);
|
||||
_shardKeys.swap(newShardKeys);
|
||||
_plannedCollectionsProt.version++; // such that others notice our change
|
||||
_plannedCollectionsProt.isValid = true; // will never be reset to false
|
||||
_plannedCollections.swap(newCollections);
|
||||
_shards.swap(newShards);
|
||||
_shardKeys.swap(newShardKeys);
|
||||
_plannedCollectionsProt.version++; // such that others notice our change
|
||||
_plannedCollectionsProt.isValid = true; // will never be reset to false
|
||||
}
|
||||
return;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
LOG(ERR) << "Error while loading " << prefixPlannedCollections
|
||||
|
@ -870,72 +872,73 @@ void ClusterInfo::loadCurrentCollections() {
|
|||
if (result.successful()) {
|
||||
|
||||
velocypack::Slice databases =
|
||||
result._vpack->slice()[0]
|
||||
.get(AgencyComm::prefix().substr(1,AgencyComm::prefix().size()-2))
|
||||
.get("Current").get("Collections");
|
||||
result._vpack->slice()[0].get(std::vector<std::string>(
|
||||
{AgencyComm::prefixStripped(), "Current", "Collections"}));
|
||||
|
||||
decltype(_currentCollections) newCollections;
|
||||
decltype(_shardIds) newShardIds;
|
||||
if (!databases.isNone()) {
|
||||
decltype(_currentCollections) newCollections;
|
||||
decltype(_shardIds) newShardIds;
|
||||
|
||||
for (auto const& db : VPackObjectIterator(databases)) {
|
||||
std::string const database = db.key.copyString();
|
||||
|
||||
for (auto const& col : VPackObjectIterator(db.value)) {
|
||||
std::string const collection = col.key.copyString();
|
||||
for (auto const& db : VPackObjectIterator(databases)) {
|
||||
std::string const database = db.key.copyString();
|
||||
|
||||
for (auto const& shrd : VPackObjectIterator(col.value)) {
|
||||
std::string const shardID = shrd.key.copyString();
|
||||
for (auto const& col : VPackObjectIterator(db.value)) {
|
||||
std::string const collection = col.key.copyString();
|
||||
|
||||
// check whether we have created an entry for the database already
|
||||
AllCollectionsCurrent::iterator it2 = newCollections.find(database);
|
||||
|
||||
if (it2 == newCollections.end()) {
|
||||
// not yet, so create an entry for the database
|
||||
DatabaseCollectionsCurrent empty;
|
||||
newCollections.insert(std::make_pair(database, empty));
|
||||
it2 = newCollections.find(database);
|
||||
}
|
||||
|
||||
// TODO: The Collection info has to store VPack instead of JSON
|
||||
TRI_json_t* json =
|
||||
arangodb::basics::VelocyPackHelper::velocyPackToJson (shrd.value);
|
||||
|
||||
// check whether we already have a CollectionInfoCurrent:
|
||||
DatabaseCollectionsCurrent::iterator it3 = it2->second.find(collection);
|
||||
if (it3 == it2->second.end()) {
|
||||
auto collectionDataCurrent =
|
||||
std::make_shared<CollectionInfoCurrent>(shardID, json);
|
||||
it2->second.insert(make_pair(collection, collectionDataCurrent));
|
||||
it3 = it2->second.find(collection);
|
||||
} else {
|
||||
it3->second->add(shardID, json);
|
||||
}
|
||||
|
||||
// Note that we have only inserted the CollectionInfoCurrent under
|
||||
// the collection ID and not under the name! It is not possible
|
||||
// to query the current collection info by name. This is because
|
||||
// the correct place to hold the current name is in the plan.
|
||||
// Thus: Look there and get the collection ID from there. Then
|
||||
// ask about the current collection info.
|
||||
|
||||
// Now take note of this shard and its responsible server:
|
||||
auto servers = std::make_shared<std::vector<ServerID>>(
|
||||
it3->second->servers(shardID));
|
||||
newShardIds.insert(make_pair(shardID, servers));
|
||||
for (auto const& shrd : VPackObjectIterator(col.value)) {
|
||||
std::string const shardID = shrd.key.copyString();
|
||||
|
||||
// check whether we have created an entry for the database already
|
||||
AllCollectionsCurrent::iterator it2 = newCollections.find(database);
|
||||
|
||||
if (it2 == newCollections.end()) {
|
||||
// not yet, so create an entry for the database
|
||||
DatabaseCollectionsCurrent empty;
|
||||
newCollections.insert(std::make_pair(database, empty));
|
||||
it2 = newCollections.find(database);
|
||||
}
|
||||
|
||||
// TODO: The Collection info has to store VPack instead of JSON
|
||||
TRI_json_t* json =
|
||||
arangodb::basics::VelocyPackHelper::velocyPackToJson (shrd.value);
|
||||
|
||||
// check whether we already have a CollectionInfoCurrent:
|
||||
DatabaseCollectionsCurrent::iterator it3 = it2->second.find(collection);
|
||||
if (it3 == it2->second.end()) {
|
||||
auto collectionDataCurrent =
|
||||
std::make_shared<CollectionInfoCurrent>(shardID, json);
|
||||
it2->second.insert(make_pair(collection, collectionDataCurrent));
|
||||
it3 = it2->second.find(collection);
|
||||
} else {
|
||||
it3->second->add(shardID, json);
|
||||
}
|
||||
|
||||
// Note that we have only inserted the CollectionInfoCurrent under
|
||||
// the collection ID and not under the name! It is not possible
|
||||
// to query the current collection info by name. This is because
|
||||
// the correct place to hold the current name is in the plan.
|
||||
// Thus: Look there and get the collection ID from there. Then
|
||||
// ask about the current collection info.
|
||||
|
||||
// Now take note of this shard and its responsible server:
|
||||
auto servers = std::make_shared<std::vector<ServerID>>(
|
||||
it3->second->servers(shardID));
|
||||
newShardIds.insert(make_pair(shardID, servers));
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now set the new value:
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _currentCollectionsProt.lock);
|
||||
_currentCollections.swap(newCollections);
|
||||
_shardIds.swap(newShardIds);
|
||||
_currentCollectionsProt.version++; // such that others notice our change
|
||||
_currentCollectionsProt.isValid = true;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Now set the new value:
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _currentCollectionsProt.lock);
|
||||
_currentCollections.swap(newCollections);
|
||||
_shardIds.swap(newShardIds);
|
||||
_currentCollectionsProt.version++; // such that others notice our change
|
||||
_currentCollectionsProt.isValid = true;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
LOG(DEBUG) << "Error while loading " << prefixCurrentCollections
|
||||
|
@ -2018,40 +2021,42 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
|
|||
if (res.successful()) {
|
||||
|
||||
velocypack::Slice current =
|
||||
res._vpack->slice()[0]
|
||||
.get(AgencyComm::prefix().substr(1,AgencyComm::prefix().size()-2))
|
||||
.get("Current").get("Collections").get(databaseName).get(collectionID);
|
||||
res._vpack->slice()[0].get(std::vector<std::string>(
|
||||
{AgencyComm::prefixStripped(), "Current", "Collections",
|
||||
databaseName, collectionID}));
|
||||
|
||||
VPackObjectIterator shards(current);
|
||||
|
||||
if (shards.size() == (size_t)numberOfShards) {
|
||||
if (!current.isNone()) {
|
||||
VPackObjectIterator shards(current);
|
||||
|
||||
bool found = false;
|
||||
for (auto const& shard : shards) {
|
||||
if (shards.size() == (size_t)numberOfShards) {
|
||||
|
||||
VPackSlice const indexes = shard.value.get("indexes");
|
||||
|
||||
if (indexes.isArray()) {
|
||||
for (auto const& v : VPackArrayIterator(indexes)) {
|
||||
if (v.isObject()) {
|
||||
VPackSlice const k = v.get("id");
|
||||
if (k.isString() && idString == k.copyString()) {
|
||||
// still found the index in some shard
|
||||
found = true;
|
||||
bool found = false;
|
||||
for (auto const& shard : shards) {
|
||||
|
||||
VPackSlice const indexes = shard.value.get("indexes");
|
||||
|
||||
if (indexes.isArray()) {
|
||||
for (auto const& v : VPackArrayIterator(indexes)) {
|
||||
if (v.isObject()) {
|
||||
VPackSlice const k = v.get("id");
|
||||
if (k.isString() && idString == k.copyString()) {
|
||||
// still found the index in some shard
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (found) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (found) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!found) {
|
||||
loadCurrentCollections();
|
||||
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
|
||||
|
||||
if (!found) {
|
||||
loadCurrentCollections();
|
||||
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2092,29 +2097,30 @@ void ClusterInfo::loadServers() {
|
|||
if (result.successful()) {
|
||||
|
||||
velocypack::Slice serversRegistered =
|
||||
result._vpack->slice()[0]
|
||||
.get(AgencyComm::prefix().substr(1,AgencyComm::prefix().size()-2)).
|
||||
get("Current").get("ServersRegistered");
|
||||
result._vpack->slice()[0].get(std::vector<std::string>(
|
||||
{AgencyComm::prefixStripped(), "Current", "ServersRegistered"}));
|
||||
|
||||
decltype(_servers) newServers;
|
||||
if (!serversRegistered.isNone()) {
|
||||
decltype(_servers) newServers;
|
||||
|
||||
for (auto const& res : VPackObjectIterator(serversRegistered)) {
|
||||
velocypack::Slice slice = res.value;
|
||||
if (slice.isObject() && slice.hasKey("endpoint")) {
|
||||
std::string server = arangodb::basics::VelocyPackHelper::getStringValue(
|
||||
slice, "endpoint", "");
|
||||
newServers.emplace(std::make_pair(res.key.copyString(), server));
|
||||
for (auto const& res : VPackObjectIterator(serversRegistered)) {
|
||||
velocypack::Slice slice = res.value;
|
||||
if (slice.isObject() && slice.hasKey("endpoint")) {
|
||||
std::string server = arangodb::basics::VelocyPackHelper::getStringValue(
|
||||
slice, "endpoint", "");
|
||||
newServers.emplace(std::make_pair(res.key.copyString(), server));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now set the new value:
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _serversProt.lock);
|
||||
_servers.swap(newServers);
|
||||
_serversProt.version++; // such that others notice our change
|
||||
_serversProt.isValid = true; // will never be reset to false
|
||||
// Now set the new value:
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _serversProt.lock);
|
||||
_servers.swap(newServers);
|
||||
_serversProt.version++; // such that others notice our change
|
||||
_serversProt.isValid = true; // will never be reset to false
|
||||
}
|
||||
return;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
LOG(DEBUG) << "Error while loading " << prefixServers
|
||||
|
@ -2223,25 +2229,26 @@ void ClusterInfo::loadCurrentCoordinators() {
|
|||
if (result.successful()) {
|
||||
|
||||
velocypack::Slice currentCoordinators =
|
||||
result._vpack->slice()[0]
|
||||
.get(AgencyComm::prefix().substr(1,AgencyComm::prefix().size()-2)).
|
||||
get("Current").get("Coordinators");
|
||||
result._vpack->slice()[0].get(std::vector<std::string>(
|
||||
{AgencyComm::prefixStripped(), "Current", "Coordinators"}));
|
||||
|
||||
decltype(_coordinators) newCoordinators;
|
||||
|
||||
for (auto const& coordinator : VPackObjectIterator(currentCoordinators)) {
|
||||
newCoordinators.emplace(
|
||||
std::make_pair(coordinator.key.copyString(), coordinator.value.copyString()));
|
||||
if (!currentCoordinators.isNone()) {
|
||||
decltype(_coordinators) newCoordinators;
|
||||
|
||||
for (auto const& coordinator : VPackObjectIterator(currentCoordinators)) {
|
||||
newCoordinators.emplace(
|
||||
std::make_pair(coordinator.key.copyString(), coordinator.value.copyString()));
|
||||
}
|
||||
|
||||
// Now set the new value:
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _coordinatorsProt.lock);
|
||||
_coordinators.swap(newCoordinators);
|
||||
_coordinatorsProt.version++; // such that others notice our change
|
||||
_coordinatorsProt.isValid = true; // will never be reset to false
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Now set the new value:
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _coordinatorsProt.lock);
|
||||
_coordinators.swap(newCoordinators);
|
||||
_coordinatorsProt.version++; // such that others notice our change
|
||||
_coordinatorsProt.isValid = true; // will never be reset to false
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
LOG(DEBUG) << "Error while loading " << prefixCurrentCoordinators
|
||||
|
@ -2279,26 +2286,27 @@ void ClusterInfo::loadCurrentDBServers() {
|
|||
if (result.successful()) {
|
||||
|
||||
velocypack::Slice currentDBServers =
|
||||
result._vpack->slice()[0]
|
||||
.get(AgencyComm::prefix().substr(1,AgencyComm::prefix().size()-2)).
|
||||
get("Current").get("DBServers");
|
||||
result._vpack->slice()[0].get(std::vector<std::string>(
|
||||
{AgencyComm::prefixStripped(), "Current", "DBServers"}));
|
||||
|
||||
decltype(_DBServers) newDBServers;
|
||||
if (!currentDBServers.isNone()) {
|
||||
decltype(_DBServers) newDBServers;
|
||||
|
||||
//for (; it != result._values.end(); ++it) {
|
||||
for (auto const& dbserver : VPackObjectIterator(currentDBServers)) {
|
||||
newDBServers.emplace(
|
||||
std::make_pair(dbserver.key.copyString(), dbserver.value.copyString()));
|
||||
//for (; it != result._values.end(); ++it) {
|
||||
for (auto const& dbserver : VPackObjectIterator(currentDBServers)) {
|
||||
newDBServers.emplace(
|
||||
std::make_pair(dbserver.key.copyString(), dbserver.value.copyString()));
|
||||
}
|
||||
|
||||
// Now set the new value:
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _DBServersProt.lock);
|
||||
_DBServers.swap(newDBServers);
|
||||
_DBServersProt.version++; // such that others notice our change
|
||||
_DBServersProt.isValid = true; // will never be reset to false
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Now set the new value:
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _DBServersProt.lock);
|
||||
_DBServers.swap(newDBServers);
|
||||
_DBServersProt.version++; // such that others notice our change
|
||||
_DBServersProt.isValid = true; // will never be reset to false
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
LOG(DEBUG) << "Error while loading " << prefixCurrentDBServers
|
||||
|
@ -2470,7 +2478,6 @@ std::shared_ptr<std::vector<ShardID>> ClusterInfo::getShardList(
|
|||
/// `_key` is the one and only sharding attribute.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
|
||||
int ClusterInfo::getResponsibleShard(CollectionID const& collectionID,
|
||||
VPackSlice slice, bool docComplete,
|
||||
ShardID& shardID,
|
||||
|
@ -2654,12 +2661,46 @@ std::vector<ServerID> ClusterInfo::getCurrentCoordinators() {
|
|||
return result;
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief invalidate plan
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void ClusterInfo::invalidatePlan() {
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _plannedDatabasesProt.lock);
|
||||
_plannedDatabasesProt.isValid = false;
|
||||
}
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _plannedCollectionsProt.lock);
|
||||
_plannedCollectionsProt.isValid = false;
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief invalidate current
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void ClusterInfo::invalidateCurrent() {
|
||||
WRITE_LOCKER(writeLocker, _currentCollectionsProt.lock);
|
||||
_currentCollectionsProt.isValid = false;
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _serversProt.lock);
|
||||
_serversProt.isValid = false;
|
||||
}
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _DBServersProt.lock);
|
||||
_DBServersProt.isValid = false;
|
||||
}
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _coordinatorsProt.lock);
|
||||
_coordinatorsProt.isValid = false;
|
||||
}
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _currentDatabasesProt.lock);
|
||||
_currentDatabasesProt.isValid = false;
|
||||
}
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _currentCollectionsProt.lock);
|
||||
_currentCollectionsProt.isValid = false;
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -2679,7 +2720,9 @@ std::shared_ptr<std::vector<ServerID> const> FollowerInfo::get() {
|
|||
/// there).
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
VPackBuilder newShardEntry(VPackSlice oldValue, ServerID const& sid, bool add) {
|
||||
static VPackBuilder newShardEntry(VPackSlice oldValue,
|
||||
ServerID const& sid,
|
||||
bool add) {
|
||||
VPackBuilder newValue;
|
||||
VPackSlice servers;
|
||||
{
|
||||
|
|
|
@ -820,6 +820,12 @@ class ClusterInfo {
|
|||
|
||||
std::vector<ServerID> getCurrentCoordinators();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief invalidate planned
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void invalidatePlan();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief invalidate current
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -208,16 +208,23 @@ void HeartbeatThread::runDBServer() {
|
|||
if (currentVersion == 0) {
|
||||
LOG(ERR) << "Current/Version in agency is 0.";
|
||||
} else {
|
||||
if (currentVersion > _desiredVersions.current) {
|
||||
_desiredVersions.current = currentVersion;
|
||||
LOG(INFO) << "Found greater Current/Version in agency.";
|
||||
syncDBServerStatusQuo();
|
||||
{
|
||||
MUTEX_LOCKER(mutexLocker, _statusLock);
|
||||
if (currentVersion > _desiredVersions.current) {
|
||||
_desiredVersions.current = currentVersion;
|
||||
LOG(DEBUG) << "Found greater Current/Version in agency.";
|
||||
}
|
||||
}
|
||||
syncDBServerStatusQuo();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (isStopping()) {
|
||||
break;
|
||||
}
|
||||
|
||||
double remain = interval - (TRI_microtime() - start);
|
||||
// mop: execute at least once
|
||||
do {
|
||||
|
@ -634,6 +641,8 @@ bool HeartbeatThread::handlePlanChangeCoordinator(uint64_t currentPlanVersion) {
|
|||
|
||||
bool HeartbeatThread::syncDBServerStatusQuo() {
|
||||
bool shouldUpdate = false;
|
||||
bool becauseOfPlan = false;
|
||||
bool becauseOfCurrent = false;
|
||||
{
|
||||
MUTEX_LOCKER(mutexLocker, _statusLock);
|
||||
// mop: only dispatch one at a time
|
||||
|
@ -645,16 +654,28 @@ bool HeartbeatThread::syncDBServerStatusQuo() {
|
|||
LOG(DEBUG) << "Plan version " << _currentVersions.plan
|
||||
<< " is lower than desired version " << _desiredVersions.plan;
|
||||
_isDispatchingChange = true;
|
||||
} else if (_desiredVersions.current > _currentVersions.current) {
|
||||
becauseOfPlan = true;
|
||||
}
|
||||
if (_desiredVersions.current > _currentVersions.current) {
|
||||
LOG(DEBUG) << "Current version " << _currentVersions.current
|
||||
<< " is lower than desired version "
|
||||
<< _desiredVersions.current;
|
||||
_isDispatchingChange = true;
|
||||
becauseOfCurrent = true;
|
||||
}
|
||||
shouldUpdate = _isDispatchingChange;
|
||||
}
|
||||
|
||||
if (shouldUpdate) {
|
||||
// First invalidate the caches in ClusterInfo:
|
||||
auto ci = ClusterInfo::instance();
|
||||
if (becauseOfPlan) {
|
||||
ci->invalidatePlan();
|
||||
}
|
||||
if (becauseOfCurrent) {
|
||||
ci->invalidateCurrent();
|
||||
}
|
||||
|
||||
LOG(TRACE) << "Dispatching Sync";
|
||||
// schedule a job for the change
|
||||
std::unique_ptr<arangodb::rest::Job> job(new ServerJob(this));
|
||||
|
|
Loading…
Reference in New Issue