1
0
Fork 0

Parse Views hierarchy in loadPlan.

This commit is contained in:
Max Neunhoeffer 2018-03-08 15:07:46 +01:00
parent a8a307b532
commit 28be92ec52
No known key found for this signature in database
GPG Key ID: 89A912AD5E343E1E
4 changed files with 129 additions and 25 deletions

View File

@ -410,9 +410,11 @@ void ClusterInfo::loadPlan() {
decltype(_shards) newShards;
decltype(_shardServers) newShardServers;
decltype(_shardKeys) newShardKeys;
decltype(_plannedViews) newViews;
bool swapDatabases = false;
bool swapCollections = false;
bool swapViews = false;
VPackSlice databasesSlice;
databasesSlice = planSlice.get("Databases");
@ -425,10 +427,10 @@ void ClusterInfo::loadPlan() {
swapDatabases = true;
}
// mop: immediate children of collections are DATABASES, followed by their
// collections
// Immediate children of "Collections" are database names, then ids
// of collections, then one JSON object with the description:
//{
// "Plan":{"Collections": {
// "_system": {
// "3010001": {
// "deleted": false,
@ -474,7 +476,7 @@ void ClusterInfo::loadPlan() {
// "waitForSync": false
// },...
// },...
//}
// }}
databasesSlice = planSlice.get("Collections"); //format above
if (databasesSlice.isObject()) {
@ -612,6 +614,104 @@ void ClusterInfo::loadPlan() {
}
}
// Immediate children of "Views" are database names, then ids
// of views, then one JSON object with the description:
// "Plan":{"Views": {
// "_system": {
// "654321": {
// "id": "654321",
// "name": "v",
// "collections": [
// <list of cluster-wide collection IDs of linked collections>
// ]
// },...
// },...
// }}
// Now the same for views:
databasesSlice = planSlice.get("Views"); // format above
if (databasesSlice.isObject()) {
bool isCoordinator = ServerState::instance()->isCoordinator();
for (auto const& databasePairSlice :
VPackObjectIterator(databasesSlice)) {
VPackSlice const& viewsSlice = databasePairSlice.value;
if (!viewsSlice.isObject()) {
continue;
}
DatabaseViews databaseViews;
std::string const databaseName = databasePairSlice.key.copyString();
TRI_vocbase_t* vocbase = nullptr;
if (isCoordinator) {
vocbase = databaseFeature->lookupDatabaseCoordinator(databaseName);
} else {
vocbase = databaseFeature->lookupDatabase(databaseName);
}
if (vocbase == nullptr) {
// No database with this name found.
// We have an invalid state here.
continue;
}
for (auto const& viewPairSlice :
VPackObjectIterator(viewsSlice)) {
VPackSlice const& viewSlice = viewPairSlice.value;
if (!viewSlice.isObject()) {
continue;
}
std::string const viewId =
viewPairSlice.key.copyString();
try {
std::shared_ptr<LogicalView> newView;
newView = std::make_shared<LogicalView>(vocbase, viewSlice);
newView->setPlanVersion(newPlanVersion);
std::string const viewName = newView->name();
// mop: register with name as well as with id
databaseViews.emplace(
std::make_pair(viewName, newView));
databaseCollections.emplace(
std::make_pair(viewId, newView));
} catch (std::exception const& ex) {
// The Plan contains invalid view information.
// This should not happen in healthy situations.
// If it happens in unhealthy situations the
// cluster should not fail.
LOG_TOPIC(ERR, Logger::AGENCY)
<< "Failed to load information for view '" << viewId
<< "': " << ex.what() << ". invalid information in Plan. The"
"view will be ignored for now and the invalid information"
"will be repaired. VelocyPack: "
<< viewSlice.toJson();
TRI_ASSERT(false);
continue;
} catch (...) {
// The Plan contains invalid view information.
// This should not happen in healthy situations.
// If it happens in unhealthy situations the
// cluster should not fail.
LOG_TOPIC(ERR, Logger::AGENCY)
<< "Failed to load information for view '" << viewId
<< ". invalid information in Plan. The view will "
"be ignored for now and the invalid information will "
"be repaired. VelocyPack: "
<< viewSlice.toJson();
TRI_ASSERT(false);
continue;
}
}
newViews.emplace(
std::make_pair(databaseName, databaseViews));
swapViews = true;
}
}
WRITE_LOCKER(writeLocker, _planProt.lock);
_plan = planBuilder;
_planVersion = newPlanVersion;
@ -624,6 +724,9 @@ void ClusterInfo::loadPlan() {
_shardKeys.swap(newShardKeys);
_shardServers.swap(newShardServers);
}
if (swapViews) {
_plannedViews.swap(newViews);
}
_planProt.doneVersion = storedVersion;
_planProt.isValid = true; // will never be reset to false
} else {

View File

@ -189,8 +189,7 @@ LogicalCollection::LogicalCollection(LogicalCollection const& other)
_keyGenerator(KeyGenerator::factory(VPackSlice(keyOptions()))),
_globallyUniqueId(other._globallyUniqueId),
_physical(other.getPhysical()->clone(this)),
_clusterEstimateTTL(0),
_planVersion(other._planVersion) {
_clusterEstimateTTL(0) {
TRI_ASSERT(_physical != nullptr);
if (ServerState::instance()->isDBServer() ||
@ -239,8 +238,7 @@ LogicalCollection::LogicalCollection(TRI_vocbase_t* vocbase,
_globallyUniqueId(Helper::getStringValue(info, "globallyUniqueId", "")),
_physical(
EngineSelectorFeature::ENGINE->createPhysicalCollection(this, info)),
_clusterEstimateTTL(0),
_planVersion(0) {
_clusterEstimateTTL(0) {
TRI_ASSERT(info.isObject());
if (!IsAllowedName(info)) {

View File

@ -353,14 +353,6 @@ class LogicalCollection: public LogicalDataSource {
// with the checksum provided in the reference checksum
Result compareChecksums(velocypack::Slice checksumSlice, std::string const& referenceChecksum) const;
// Set and get _planVersion, this is only used if the object is used in
// ClusterInfo to represent a cluster wide collection in the agency.
void setPlanVersion(uint64_t v) {
_planVersion = v;
}
uint64_t getPlanVersion() const {
return _planVersion;
}
private:
void prepareIndexes(velocypack::Slice indexesSlice);
@ -443,14 +435,8 @@ class LogicalCollection: public LogicalDataSource {
std::unordered_map<std::string, double> _clusterEstimates;
double _clusterEstimateTTL; //only valid if above vector is not empty
basics::ReadWriteLock _clusterEstimatesLock;
uint64_t _planVersion; // Only set if setPlanVersion was called. This only
// happens in ClusterInfo when this object is used
// to represent a cluster wide collection. This is
// then the version in the agency Plan that underpins
// the information in this object. Otherwise 0.
};
} // namespace arangodb
#endif
#endif

View File

@ -73,7 +73,8 @@ class LogicalDataSource {
_vocbase(vocbase),
_id(id),
_planId(planId),
_deleted(deleted) {
_deleted(deleted),
_planVersion(0) {
}
LogicalDataSource(LogicalDataSource const& other)
: _name(other._name),
@ -81,7 +82,8 @@ class LogicalDataSource {
_vocbase(other._vocbase),
_id(other._id),
_planId(other._planId),
_deleted(other._deleted) {
_deleted(other._deleted),
_planVersion(other._planVersion) {
}
virtual ~LogicalDataSource() {}
@ -95,6 +97,15 @@ class LogicalDataSource {
inline Type const& type() const noexcept { return _type; }
inline TRI_vocbase_t* vocbase() const { return _vocbase; }
// Set and get _planVersion, this is only used if the object is used in
// ClusterInfo to represent a cluster wide collection in the agency.
void setPlanVersion(uint64_t v) {
_planVersion = v;
}
uint64_t getPlanVersion() const {
return _planVersion;
}
protected:
inline void deleted(bool deleted) noexcept { _deleted = deleted; }
inline void name(std::string&& name) noexcept { _name = std::move(name); }
@ -107,6 +118,12 @@ class LogicalDataSource {
TRI_voc_cid_t const _id; // local data-source id (current database node)
TRI_voc_cid_t const _planId; // global data-source id (cluster-wide)
bool _deleted; // data-source marked as deleted
uint64_t _planVersion; // Only set if setPlanVersion was called. This only
// happens in ClusterInfo when this object is used
// to represent a cluster wide collection. This is
// then the version in the agency Plan that underpins
// the information in this object. Otherwise 0.
};
} // arangodb