mirror of https://gitee.com/bigwinds/arangodb
issue 521.1: do not mark loadPlan as valid unless all instances were created successfully (#8235)
* issue 521.1: do not mark loadPlan as valid unless all instances were created successfully * add extra validation
This commit is contained in:
parent
2a9b1b127a
commit
7b7f5d82ab
|
@ -425,8 +425,10 @@ void ClusterInfo::loadPlan() {
|
|||
DatabaseFeature* databaseFeature =
|
||||
application_features::ApplicationServer::getFeature<DatabaseFeature>(
|
||||
"Database");
|
||||
|
||||
++_planProt.wantedVersion; // Indicate that after *NOW* somebody has to
|
||||
// reread from the agency!
|
||||
|
||||
MUTEX_LOCKER(mutexLocker, _planProt.mutex); // only one may work at a time
|
||||
|
||||
// For ArangoSearch views we need to get access to immediately created views
|
||||
|
@ -449,11 +451,13 @@ void ClusterInfo::loadPlan() {
|
|||
_newPlannedViews.clear();
|
||||
});
|
||||
|
||||
bool planValid = true; // has the loadPlan compleated without skipping valid objects
|
||||
uint64_t storedVersion = _planProt.wantedVersion; // this is the version
|
||||
// we will set in the end
|
||||
|
||||
LOG_TOPIC(TRACE, Logger::CLUSTER) << "loadPlan: wantedVersion=" << storedVersion
|
||||
<< ", doneVersion=" << _planProt.doneVersion;
|
||||
|
||||
if (_planProt.doneVersion == storedVersion) {
|
||||
// Somebody else did, what we intended to do, so just return
|
||||
return;
|
||||
|
@ -462,38 +466,72 @@ void ClusterInfo::loadPlan() {
|
|||
// Now contact the agency:
|
||||
AgencyCommResult result = _agency.getValues(prefixPlan);
|
||||
|
||||
if (result.successful()) {
|
||||
VPackSlice slice = result.slice()[0].get(
|
||||
std::vector<std::string>({AgencyCommManager::path(), "Plan"}));
|
||||
auto planBuilder = std::make_shared<VPackBuilder>();
|
||||
if (!result.successful()) {
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "Error while loading " << prefixPlan
|
||||
<< " httpCode: " << result.httpCode() << " errorCode: " << result.errorCode()
|
||||
<< " errorMessage: " << result.errorMessage() << " body: " << result.body();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
auto resultSlice = result.slice();
|
||||
|
||||
if (!resultSlice.isArray() || resultSlice.length() != 1) {
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "Error while loading " << prefixPlan << " response structure is not an array of size 1"
|
||||
<< " httpCode: " << result.httpCode() << " errorCode: " << result.errorCode()
|
||||
<< " errorMessage: " << result.errorMessage() << " body: " << result.body();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
auto slice = resultSlice[0].get( // get slice
|
||||
std::vector<std::string>({AgencyCommManager::path(), "Plan"}) // args
|
||||
);
|
||||
auto planBuilder = std::make_shared<velocypack::Builder>();
|
||||
|
||||
planBuilder->add(slice);
|
||||
|
||||
VPackSlice planSlice = planBuilder->slice();
|
||||
auto planSlice = planBuilder->slice();
|
||||
|
||||
if (!planSlice.isObject()) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER)
|
||||
<< "\"Plan\" is not an object in agency";
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (planSlice.isObject()) {
|
||||
uint64_t newPlanVersion = 0;
|
||||
VPackSlice planVersionSlice = planSlice.get("Version");
|
||||
auto planVersionSlice = planSlice.get("Version");
|
||||
|
||||
if (planVersionSlice.isNumber()) {
|
||||
try {
|
||||
newPlanVersion = planVersionSlice.getNumber<uint64_t>();
|
||||
} catch (...) {
|
||||
}
|
||||
}
|
||||
LOG_TOPIC(TRACE, Logger::CLUSTER) << "loadPlan: newPlanVersion=" << newPlanVersion;
|
||||
|
||||
LOG_TOPIC(TRACE, Logger::CLUSTER)
|
||||
<< "loadPlan: newPlanVersion=" << newPlanVersion;
|
||||
|
||||
if (newPlanVersion == 0) {
|
||||
LOG_TOPIC(WARN, Logger::CLUSTER) << "Attention: /arango/Plan/Version "
|
||||
"in the agency is not set or not "
|
||||
"a positive number.";
|
||||
LOG_TOPIC(WARN, Logger::CLUSTER)
|
||||
<< "Attention: /arango/Plan/Version in the agency is not set or not a positive number.";
|
||||
}
|
||||
|
||||
{
|
||||
READ_LOCKER(guard, _planProt.lock);
|
||||
|
||||
if (_planProt.isValid && newPlanVersion <= _planVersion) {
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "We already know this or a later version, do not update. "
|
||||
<< "newPlanVersion=" << newPlanVersion << " _planVersion=" << _planVersion;
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
decltype(_plannedDatabases) newDatabases;
|
||||
decltype(_plannedCollections) newCollections; // map<string /*database id*/
|
||||
// ,map<string /*collection id*/
|
||||
|
@ -508,23 +546,26 @@ void ClusterInfo::loadPlan() {
|
|||
bool swapCollections = false;
|
||||
bool swapViews = false;
|
||||
|
||||
VPackSlice databasesSlice;
|
||||
databasesSlice = planSlice.get("Databases");
|
||||
if (databasesSlice.isObject()) {
|
||||
auto planDatabasesSlice = planSlice.get("Databases");
|
||||
|
||||
if (planDatabasesSlice.isObject()) {
|
||||
swapDatabases = true; // mark for swap even if no databases present to ensure dangling datasources are removed
|
||||
|
||||
std::string name;
|
||||
for (auto const& database : VPackObjectIterator(databasesSlice)) {
|
||||
|
||||
for (auto const& database: velocypack::ObjectIterator(planDatabasesSlice)) {
|
||||
try {
|
||||
name = database.key.copyString();
|
||||
} catch (arangodb::velocypack::Exception const& e) {
|
||||
LOG_TOPIC(ERR, Logger::AGENCY)
|
||||
<< "Failed to get database name from json, error '" << e.what()
|
||||
<< "'. VelocyPack: " << database.key.toJson();
|
||||
|
||||
throw;
|
||||
}
|
||||
|
||||
newDatabases.insert(std::make_pair(name, database.value));
|
||||
newDatabases.emplace(name, database.value);
|
||||
}
|
||||
swapDatabases = true;
|
||||
}
|
||||
|
||||
// Ensure views are being created BEFORE collections to allow
|
||||
|
@ -545,66 +586,68 @@ void ClusterInfo::loadPlan() {
|
|||
// }}
|
||||
|
||||
// Now the same for views:
|
||||
databasesSlice = planSlice.get("Views"); // format above
|
||||
if (databasesSlice.isObject()) {
|
||||
swapViews = true; // mark for swap even if no databases present to
|
||||
// ensure dangling datasources are removed
|
||||
auto planViewsSlice = planSlice.get("Views"); // format above
|
||||
|
||||
for (auto const& databasePairSlice : VPackObjectIterator(databasesSlice)) {
|
||||
VPackSlice const& viewsSlice = databasePairSlice.value;
|
||||
if (planViewsSlice.isObject()) {
|
||||
swapViews = true; // mark for swap even if no databases present to ensure dangling datasources are removed
|
||||
|
||||
for (auto const& databasePairSlice: velocypack::ObjectIterator(planViewsSlice)) {
|
||||
auto const& viewsSlice = databasePairSlice.value;
|
||||
|
||||
if (!viewsSlice.isObject()) {
|
||||
LOG_TOPIC(INFO, Logger::AGENCY)
|
||||
<< "Views in the plan is not a valid json object."
|
||||
" Views will be ignored for now and the invalid information"
|
||||
" will be repaired. VelocyPack: "
|
||||
<< " Views will be ignored for now and the invalid information"
|
||||
<< " will be repaired. VelocyPack: "
|
||||
<< viewsSlice.toJson();
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
std::string const databaseName = databasePairSlice.key.copyString();
|
||||
TRI_vocbase_t* vocbase = databaseFeature->lookupDatabase(databaseName);
|
||||
auto const databaseName = databasePairSlice.key.copyString();
|
||||
auto* vocbase = databaseFeature->lookupDatabase(databaseName);
|
||||
|
||||
if (vocbase == nullptr) {
|
||||
if (!vocbase) {
|
||||
// No database with this name found.
|
||||
// We have an invalid state here.
|
||||
LOG_TOPIC(WARN, Logger::AGENCY)
|
||||
<< "No database '" << databaseName
|
||||
<< "' found,"
|
||||
" corresponding view will be ignored for now and the "
|
||||
"invalid information"
|
||||
" will be repaired. VelocyPack: "
|
||||
<< "No database '" << databaseName << "' found,"
|
||||
<< " corresponding view will be ignored for now and the "
|
||||
<< "invalid information will be repaired. VelocyPack: "
|
||||
<< viewsSlice.toJson();
|
||||
planValid &= !viewsSlice.length(); // cannot find vocbase for defined views (allow empty views for missing vocbase)
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
for (auto const& viewPairSlice : VPackObjectIterator(viewsSlice)) {
|
||||
VPackSlice const& viewSlice = viewPairSlice.value;
|
||||
for (auto const& viewPairSlice: velocypack::ObjectIterator(viewsSlice)) {
|
||||
auto const& viewSlice = viewPairSlice.value;
|
||||
|
||||
if (!viewSlice.isObject()) {
|
||||
LOG_TOPIC(INFO, Logger::AGENCY)
|
||||
<< "View entry is not a valid json object."
|
||||
" The view will be ignored for now and the invalid "
|
||||
"information"
|
||||
" will be repaired. VelocyPack: "
|
||||
<< " The view will be ignored for now and the invalid "
|
||||
<< "information will be repaired. VelocyPack: "
|
||||
<< viewSlice.toJson();
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
std::string const viewId = viewPairSlice.key.copyString();
|
||||
auto const viewId = viewPairSlice.key.copyString();
|
||||
|
||||
try {
|
||||
LogicalView::ptr view;
|
||||
auto res = LogicalView::instantiate(view, *vocbase, viewPairSlice.value,
|
||||
newPlanVersion);
|
||||
auto res = LogicalView::instantiate( // instantiate
|
||||
view, *vocbase, viewPairSlice.value, newPlanVersion // args
|
||||
);
|
||||
|
||||
if (!res.ok() || !view) {
|
||||
LOG_TOPIC(ERR, Logger::AGENCY)
|
||||
<< "Failed to create view '" << viewId
|
||||
<< "'. The view will be ignored for now and the invalid "
|
||||
"information "
|
||||
"will be repaired. VelocyPack: "
|
||||
<< "information will be repaired. VelocyPack: "
|
||||
<< viewSlice.toJson();
|
||||
planValid = false; // view creation failure
|
||||
|
||||
continue;
|
||||
}
|
||||
|
@ -623,14 +666,13 @@ void ClusterInfo::loadPlan() {
|
|||
// cluster should not fail.
|
||||
LOG_TOPIC(ERR, Logger::AGENCY)
|
||||
<< "Failed to load information for view '" << viewId
|
||||
<< "': " << ex.what()
|
||||
<< ". invalid information in Plan. The "
|
||||
"view will be ignored for now and the invalid "
|
||||
"information "
|
||||
"will be repaired. VelocyPack: "
|
||||
<< "': " << ex.what() << ". invalid information in Plan. The "
|
||||
<< "view will be ignored for now and the invalid "
|
||||
<< "information will be repaired. VelocyPack: "
|
||||
<< viewSlice.toJson();
|
||||
|
||||
TRI_ASSERT(false);
|
||||
continue;
|
||||
} catch (...) {
|
||||
// The Plan contains invalid view information.
|
||||
// This should not happen in healthy situations.
|
||||
|
@ -639,11 +681,12 @@ void ClusterInfo::loadPlan() {
|
|||
LOG_TOPIC(ERR, Logger::AGENCY)
|
||||
<< "Failed to load information for view '" << viewId
|
||||
<< ". invalid information in Plan. The view will "
|
||||
"be ignored for now and the invalid information will "
|
||||
"be repaired. VelocyPack: "
|
||||
<< "be ignored for now and the invalid information will "
|
||||
<< "be repaired. VelocyPack: "
|
||||
<< viewSlice.toJson();
|
||||
|
||||
TRI_ASSERT(false);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -700,81 +743,82 @@ void ClusterInfo::loadPlan() {
|
|||
// },...
|
||||
// }}
|
||||
|
||||
databasesSlice = planSlice.get("Collections"); // format above
|
||||
if (databasesSlice.isObject()) {
|
||||
swapCollections = true; // mark for swap even if no databases present
|
||||
// to ensure dangling datasources are removed
|
||||
auto planCollectionsSlice = planSlice.get("Collections"); // format above
|
||||
|
||||
if (planCollectionsSlice.isObject()) {
|
||||
swapCollections = true; // mark for swap even if no databases present to ensure dangling datasources are removed
|
||||
|
||||
bool const isCoordinator = ServerState::instance()->isCoordinator();
|
||||
|
||||
for (auto const& databasePairSlice : VPackObjectIterator(databasesSlice)) {
|
||||
VPackSlice const& collectionsSlice = databasePairSlice.value;
|
||||
for (auto const& databasePairSlice: velocypack::ObjectIterator(planCollectionsSlice)) {
|
||||
auto const& collectionsSlice = databasePairSlice.value;
|
||||
|
||||
if (!collectionsSlice.isObject()) {
|
||||
LOG_TOPIC(INFO, Logger::AGENCY)
|
||||
<< "Collections in the plan is not a valid json object."
|
||||
" Collections will be ignored for now and the invalid "
|
||||
"information"
|
||||
" will be repaired. VelocyPack: "
|
||||
<< " Collections will be ignored for now and the invalid "
|
||||
<< "information will be repaired. VelocyPack: "
|
||||
<< collectionsSlice.toJson();
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
DatabaseCollections databaseCollections;
|
||||
std::string const databaseName = databasePairSlice.key.copyString();
|
||||
TRI_vocbase_t* vocbase = databaseFeature->lookupDatabase(databaseName);
|
||||
auto const databaseName = databasePairSlice.key.copyString();
|
||||
auto* vocbase = databaseFeature->lookupDatabase(databaseName);
|
||||
|
||||
if (vocbase == nullptr) {
|
||||
if (!vocbase) {
|
||||
// No database with this name found.
|
||||
// We have an invalid state here.
|
||||
LOG_TOPIC(WARN, Logger::AGENCY)
|
||||
<< "No database '" << databaseName
|
||||
<< "' found,"
|
||||
" corresponding collection will be ignored for now and the "
|
||||
"invalid information"
|
||||
" will be repaired. VelocyPack: "
|
||||
<< "No database '" << databaseName << "' found,"
|
||||
<< " corresponding collection will be ignored for now and the "
|
||||
<< "invalid information will be repaired. VelocyPack: "
|
||||
<< collectionsSlice.toJson();
|
||||
planValid &= !collectionsSlice.length(); // cannot find vocbase for defined collections (allow empty collections for missing vocbase)
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
for (auto const& collectionPairSlice : VPackObjectIterator(collectionsSlice)) {
|
||||
VPackSlice const& collectionSlice = collectionPairSlice.value;
|
||||
for (auto const& collectionPairSlice: velocypack::ObjectIterator(collectionsSlice)) {
|
||||
auto const& collectionSlice = collectionPairSlice.value;
|
||||
|
||||
if (!collectionSlice.isObject()) {
|
||||
LOG_TOPIC(WARN, Logger::AGENCY)
|
||||
<< "Collection entry is not a valid json object."
|
||||
" The collection will be ignored for now and the invalid "
|
||||
"information"
|
||||
" will be repaired. VelocyPack: "
|
||||
<< " The collection will be ignored for now and the invalid "
|
||||
<< "information will be repaired. VelocyPack: "
|
||||
<< collectionSlice.toJson();
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
std::string const collectionId = collectionPairSlice.key.copyString();
|
||||
auto const collectionId = collectionPairSlice.key.copyString();
|
||||
|
||||
try {
|
||||
std::shared_ptr<LogicalCollection> newCollection;
|
||||
|
||||
#if defined(USE_ENTERPRISE)
|
||||
VPackSlice isSmart = collectionSlice.get(StaticStrings::IsSmart);
|
||||
auto isSmart = collectionSlice.get(StaticStrings::IsSmart);
|
||||
|
||||
if (isSmart.isTrue()) {
|
||||
auto type = collectionSlice.get(arangodb::StaticStrings::DataSourceType);
|
||||
auto type = collectionSlice.get(StaticStrings::DataSourceType);
|
||||
|
||||
if (type.isInteger() && type.getUInt() == TRI_COL_TYPE_EDGE) {
|
||||
newCollection =
|
||||
std::make_shared<VirtualSmartEdgeCollection>(*vocbase, collectionSlice,
|
||||
newPlanVersion);
|
||||
newCollection = std::make_shared<VirtualSmartEdgeCollection>( // create collection
|
||||
*vocbase, collectionSlice, newPlanVersion // args
|
||||
);
|
||||
} else {
|
||||
newCollection =
|
||||
std::make_shared<SmartVertexCollection>(*vocbase, collectionSlice,
|
||||
newPlanVersion);
|
||||
newCollection = std::make_shared<SmartVertexCollection>( // create collection
|
||||
*vocbase, collectionSlice, newPlanVersion // args
|
||||
);
|
||||
}
|
||||
} else
|
||||
#endif
|
||||
{
|
||||
newCollection =
|
||||
std::make_shared<LogicalCollection>(*vocbase, collectionSlice,
|
||||
true, newPlanVersion);
|
||||
newCollection = std::make_shared<LogicalCollection>( // create collection
|
||||
*vocbase, collectionSlice, true, newPlanVersion // args
|
||||
);
|
||||
}
|
||||
|
||||
auto& collectionName = newCollection->name();
|
||||
|
@ -782,17 +826,22 @@ void ClusterInfo::loadPlan() {
|
|||
if (isCoordinator) {
|
||||
// copying over index estimates from the old version of the
|
||||
// collection into the new one
|
||||
LOG_TOPIC(TRACE, Logger::CLUSTER) << "copying index estimates";
|
||||
LOG_TOPIC(TRACE, Logger::CLUSTER)
|
||||
<< "copying index estimates";
|
||||
|
||||
// it is effectively safe to access _plannedCollections in
|
||||
// read-only mode here, as the only places that modify
|
||||
// _plannedCollections are the shutdown and this function
|
||||
// itself, which is protected by a mutex
|
||||
auto it = _plannedCollections.find(databaseName);
|
||||
|
||||
if (it != _plannedCollections.end()) {
|
||||
auto it2 = (*it).second.find(collectionId);
|
||||
|
||||
if (it2 != (*it).second.end()) {
|
||||
try {
|
||||
auto estimates = (*it2).second->clusterIndexEstimates(false);
|
||||
|
||||
if (!estimates.empty()) {
|
||||
// already have an estimate... now copy it over
|
||||
newCollection->clusterIndexEstimates(std::move(estimates));
|
||||
|
@ -803,28 +852,35 @@ void ClusterInfo::loadPlan() {
|
|||
}
|
||||
}
|
||||
}
|
||||
// register with name as well as with id:
|
||||
databaseCollections.emplace(std::make_pair(collectionName, newCollection));
|
||||
databaseCollections.emplace(std::make_pair(collectionId, newCollection));
|
||||
|
||||
auto shardKeys =
|
||||
std::make_shared<std::vector<std::string>>(newCollection->shardKeys());
|
||||
newShardKeys.insert(make_pair(collectionId, shardKeys));
|
||||
// register with name as well as with id:
|
||||
databaseCollections.emplace(collectionName, newCollection);
|
||||
databaseCollections.emplace(collectionId, newCollection);
|
||||
|
||||
auto shardKeys = std::make_shared<std::vector<std::string>>( // shard keys
|
||||
newCollection->shardKeys() // args
|
||||
);
|
||||
|
||||
newShardKeys.emplace(collectionId, shardKeys);
|
||||
|
||||
auto shardIDs = newCollection->shardIds();
|
||||
auto shards = std::make_shared<std::vector<std::string>>();
|
||||
|
||||
for (auto const& p: *shardIDs) {
|
||||
shards->push_back(p.first);
|
||||
newShardServers.emplace(p.first, p.second);
|
||||
}
|
||||
|
||||
// Sort by the number in the shard ID ("s0000001" for example):
|
||||
std::sort(shards->begin(), shards->end(),
|
||||
std::sort( // sort
|
||||
shards->begin(), // begin
|
||||
shards->end(), // end
|
||||
[](std::string const& a, std::string const& b) -> bool {
|
||||
return std::strtol(a.c_str() + 1, nullptr, 10) <
|
||||
std::strtol(b.c_str() + 1, nullptr, 10);
|
||||
});
|
||||
newShards.emplace(std::make_pair(collectionId, shards));
|
||||
|
||||
} // comparator
|
||||
);
|
||||
newShards.emplace(collectionId, shards);
|
||||
} catch (std::exception const& ex) {
|
||||
// The plan contains invalid collection information.
|
||||
// This should not happen in healthy situations.
|
||||
|
@ -834,9 +890,8 @@ void ClusterInfo::loadPlan() {
|
|||
<< "Failed to load information for collection '"
|
||||
<< collectionId << "': " << ex.what()
|
||||
<< ". invalid information in plan. The "
|
||||
"collection will be ignored for now and the invalid "
|
||||
"information"
|
||||
"will be repaired. VelocyPack: "
|
||||
<< "collection will be ignored for now and the invalid "
|
||||
<< "information will be repaired. VelocyPack: "
|
||||
<< collectionSlice.toJson();
|
||||
|
||||
TRI_ASSERT(false);
|
||||
|
@ -849,8 +904,8 @@ void ClusterInfo::loadPlan() {
|
|||
LOG_TOPIC(ERR, Logger::AGENCY)
|
||||
<< "Failed to load information for collection '" << collectionId
|
||||
<< ". invalid information in plan. The collection will "
|
||||
"be ignored for now and the invalid information will "
|
||||
"be repaired. VelocyPack: "
|
||||
<< "be ignored for now and the invalid information will "
|
||||
<< "be repaired. VelocyPack: "
|
||||
<< collectionSlice.toJson();
|
||||
|
||||
TRI_ASSERT(false);
|
||||
|
@ -863,32 +918,31 @@ void ClusterInfo::loadPlan() {
|
|||
}
|
||||
|
||||
WRITE_LOCKER(writeLocker, _planProt.lock);
|
||||
|
||||
_plan = planBuilder;
|
||||
_planVersion = newPlanVersion;
|
||||
|
||||
if (swapDatabases) {
|
||||
_plannedDatabases.swap(newDatabases);
|
||||
}
|
||||
|
||||
if (swapCollections) {
|
||||
_plannedCollections.swap(newCollections);
|
||||
_shards.swap(newShards);
|
||||
_shardKeys.swap(newShardKeys);
|
||||
_shardServers.swap(newShardServers);
|
||||
}
|
||||
|
||||
if (swapViews) {
|
||||
_plannedViews.swap(_newPlannedViews);
|
||||
}
|
||||
|
||||
// mark plan as fully loaded only if all incoming objects were fully loaded
|
||||
// must still swap structures to allow creation of new vocbases and removal of stale datasources
|
||||
if (planValid) {
|
||||
_planProt.doneVersion = storedVersion;
|
||||
_planProt.isValid = true;
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "\"Plan\" is not an object in agency";
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "Error while loading " << prefixPlan
|
||||
<< " httpCode: " << result.httpCode() << " errorCode: " << result.errorCode()
|
||||
<< " errorMessage: " << result.errorMessage() << " body: " << result.body();
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -901,8 +955,10 @@ static std::string const prefixCurrent = "Current";
|
|||
void ClusterInfo::loadCurrent() {
|
||||
++_currentProt.wantedVersion; // Indicate that after *NOW* somebody has to
|
||||
// reread from the agency!
|
||||
|
||||
MUTEX_LOCKER(mutexLocker, _currentProt.mutex); // only one may work at a time
|
||||
uint64_t storedVersion = _currentProt.wantedVersion; // this is the version
|
||||
|
||||
// we will set at the end
|
||||
if (_currentProt.doneVersion == storedVersion) {
|
||||
// Somebody else did, what we intended to do, so just return
|
||||
|
@ -912,36 +968,65 @@ void ClusterInfo::loadCurrent() {
|
|||
// Now contact the agency:
|
||||
AgencyCommResult result = _agency.getValues(prefixCurrent);
|
||||
|
||||
if (result.successful()) {
|
||||
velocypack::Slice slice = result.slice()[0].get(
|
||||
std::vector<std::string>({AgencyCommManager::path(), "Current"}));
|
||||
if (!result.successful()) {
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "Error while loading " << prefixCurrent
|
||||
<< " httpCode: " << result.httpCode() << " errorCode: " << result.errorCode()
|
||||
<< " errorMessage: " << result.errorMessage() << " body: " << result.body();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
auto resultSlice = result.slice();
|
||||
|
||||
if (!resultSlice.isArray() || resultSlice.length() != 1) {
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "Error while loading " << prefixCurrent << " response structure is not an array of size 1"
|
||||
<< " httpCode: " << result.httpCode() << " errorCode: " << result.errorCode()
|
||||
<< " errorMessage: " << result.errorMessage() << " body: " << result.body();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
auto slice = resultSlice[0].get( // get slice
|
||||
std::vector<std::string>({AgencyCommManager::path(), "Current"}) // args
|
||||
);
|
||||
auto currentBuilder = std::make_shared<velocypack::Builder>();
|
||||
|
||||
auto currentBuilder = std::make_shared<VPackBuilder>();
|
||||
currentBuilder->add(slice);
|
||||
|
||||
VPackSlice currentSlice = currentBuilder->slice();
|
||||
auto currentSlice = currentBuilder->slice();
|
||||
|
||||
if (!currentSlice.isObject()) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER)
|
||||
<< "Current is not an object!";
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (currentSlice.isObject()) {
|
||||
uint64_t newCurrentVersion = 0;
|
||||
VPackSlice currentVersionSlice = currentSlice.get("Version");
|
||||
auto currentVersionSlice = currentSlice.get("Version");
|
||||
|
||||
if (currentVersionSlice.isNumber()) {
|
||||
try {
|
||||
newCurrentVersion = currentVersionSlice.getNumber<uint64_t>();
|
||||
} catch (...) {
|
||||
}
|
||||
}
|
||||
|
||||
if (newCurrentVersion == 0) {
|
||||
LOG_TOPIC(WARN, Logger::CLUSTER)
|
||||
<< "Attention: /arango/Current/Version in the agency is not set or "
|
||||
"not a positive number.";
|
||||
<< "Attention: /arango/Current/Version in the agency is not set or not a positive number.";
|
||||
}
|
||||
|
||||
{
|
||||
READ_LOCKER(guard, _currentProt.lock);
|
||||
|
||||
if (_currentProt.isValid && newCurrentVersion <= _currentVersion) {
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "We already know this or a later version, do not update. "
|
||||
<< "newCurrentVersion=" << newCurrentVersion
|
||||
<< " _currentVersion=" << _currentVersion;
|
||||
<< "newCurrentVersion=" << newCurrentVersion << " _currentVersion=" << _currentVersion;
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -953,40 +1038,46 @@ void ClusterInfo::loadCurrent() {
|
|||
bool swapDatabases = false;
|
||||
bool swapCollections = false;
|
||||
|
||||
VPackSlice databasesSlice = currentSlice.get("Databases");
|
||||
if (databasesSlice.isObject()) {
|
||||
for (auto const& databaseSlicePair : VPackObjectIterator(databasesSlice)) {
|
||||
std::string const database = databaseSlicePair.key.copyString();
|
||||
auto currentDatabasesSlice = currentSlice.get("Databases");
|
||||
|
||||
if (currentDatabasesSlice.isObject()) {
|
||||
swapDatabases = true;
|
||||
|
||||
for (auto const& databaseSlicePair: velocypack::ObjectIterator(currentDatabasesSlice)) {
|
||||
auto const database = databaseSlicePair.key.copyString();
|
||||
|
||||
if (!databaseSlicePair.value.isObject()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
std::unordered_map<ServerID, VPackSlice> serverList;
|
||||
for (auto const& serverSlicePair :
|
||||
VPackObjectIterator(databaseSlicePair.value)) {
|
||||
serverList.insert(std::make_pair(serverSlicePair.key.copyString(),
|
||||
serverSlicePair.value));
|
||||
std::unordered_map<ServerID, velocypack::Slice> serverList;
|
||||
|
||||
for (auto const& serverSlicePair: velocypack::ObjectIterator(databaseSlicePair.value)) {
|
||||
serverList.emplace(serverSlicePair.key.copyString(), serverSlicePair.value);
|
||||
}
|
||||
|
||||
newDatabases.insert(std::make_pair(database, serverList));
|
||||
newDatabases.emplace(database, serverList);
|
||||
}
|
||||
swapDatabases = true;
|
||||
}
|
||||
|
||||
databasesSlice = currentSlice.get("Collections");
|
||||
if (databasesSlice.isObject()) {
|
||||
for (auto const& databaseSlice : VPackObjectIterator(databasesSlice)) {
|
||||
std::string const databaseName = databaseSlice.key.copyString();
|
||||
auto currentCollectionsSlice = currentSlice.get("Collections");
|
||||
|
||||
if (currentCollectionsSlice.isObject()) {
|
||||
swapCollections = true;
|
||||
|
||||
for (auto const& databaseSlice: velocypack::ObjectIterator(currentCollectionsSlice)) {
|
||||
auto const databaseName = databaseSlice.key.copyString();
|
||||
DatabaseCollectionsCurrent databaseCollections;
|
||||
for (auto const& collectionSlice : VPackObjectIterator(databaseSlice.value)) {
|
||||
std::string const collectionName = collectionSlice.key.copyString();
|
||||
|
||||
for (auto const& collectionSlice: velocypack::ObjectIterator(databaseSlice.value)) {
|
||||
auto const collectionName = collectionSlice.key.copyString();
|
||||
|
||||
auto collectionDataCurrent =
|
||||
std::make_shared<CollectionInfoCurrent>(newCurrentVersion);
|
||||
for (auto const& shardSlice : VPackObjectIterator(collectionSlice.value)) {
|
||||
std::string const shardID = shardSlice.key.copyString();
|
||||
|
||||
for (auto const& shardSlice: velocypack::ObjectIterator(collectionSlice.value)) {
|
||||
auto const shardID = shardSlice.key.copyString();
|
||||
|
||||
collectionDataCurrent->add(shardID, shardSlice.value);
|
||||
|
||||
// Note that we have only inserted the CollectionInfoCurrent under
|
||||
|
@ -998,42 +1089,38 @@ void ClusterInfo::loadCurrent() {
|
|||
|
||||
// Now take note of this shard and its responsible server:
|
||||
auto servers = std::make_shared<std::vector<ServerID>>(
|
||||
collectionDataCurrent->servers(shardID));
|
||||
newShardIds.insert(make_pair(shardID, servers));
|
||||
collectionDataCurrent->servers(shardID) // args
|
||||
);
|
||||
|
||||
newShardIds.emplace(shardID, servers);
|
||||
}
|
||||
databaseCollections.insert(std::make_pair(collectionName, collectionDataCurrent));
|
||||
|
||||
databaseCollections.emplace(collectionName, collectionDataCurrent);
|
||||
}
|
||||
newCollections.emplace(std::make_pair(databaseName, databaseCollections));
|
||||
|
||||
newCollections.emplace(databaseName, databaseCollections);
|
||||
}
|
||||
swapCollections = true;
|
||||
}
|
||||
|
||||
// Now set the new value:
|
||||
WRITE_LOCKER(writeLocker, _currentProt.lock);
|
||||
|
||||
_current = currentBuilder;
|
||||
_currentVersion = newCurrentVersion;
|
||||
|
||||
if (swapDatabases) {
|
||||
_currentDatabases.swap(newDatabases);
|
||||
}
|
||||
|
||||
if (swapCollections) {
|
||||
LOG_TOPIC(TRACE, Logger::CLUSTER)
|
||||
<< "Have loaded new collections current cache!";
|
||||
_currentCollections.swap(newCollections);
|
||||
_shardIds.swap(newShardIds);
|
||||
}
|
||||
|
||||
_currentProt.doneVersion = storedVersion;
|
||||
_currentProt.isValid = true;
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Current is not an object!";
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "Error while loading " << prefixCurrent
|
||||
<< " httpCode: " << result.httpCode() << " errorCode: " << result.errorCode()
|
||||
<< " errorMessage: " << result.errorMessage() << " body: " << result.body();
|
||||
}
|
||||
|
||||
/// @brief ask about a collection
|
||||
|
@ -2621,7 +2708,6 @@ Result ClusterInfo::ensureIndexCoordinatorInner( // create index
|
|||
_agencyCallbackRegistry->registerCallback(agencyCallback);
|
||||
auto cbGuard = scopeGuard(
|
||||
[&] { _agencyCallbackRegistry->unregisterCallback(agencyCallback); });
|
||||
|
||||
AgencyOperation newValue(planIndexesKey, AgencyValueOperationType::PUSH,
|
||||
newIndexBuilder.slice());
|
||||
AgencyOperation incrementVersion("Plan/Version", AgencySimpleOperationType::INCREMENT_OP);
|
||||
|
|
Loading…
Reference in New Issue