1
0
Fork 0

Persistent tree beginnings.

This commit is contained in:
Dan Larkin-York 2019-11-26 16:25:39 -05:00
parent 9dc41f9c15
commit 45ee2431b3
12 changed files with 34 additions and 10 deletions

View File

@ -1382,7 +1382,7 @@ Result DatabaseInitialSyncer::fetchCollectionSyncByRevisions(arangodb::LogicalCo
// diff with local tree
std::pair<std::size_t, std::size_t> fullRange = treeMaster->range();
std::unique_ptr<containers::RevisionTree> treeLocal =
physical->revisionTree(*trx, fullRange.first, fullRange.second);
physical->revisionTree(*trx, fullRange.second);
std::vector<std::pair<std::size_t, std::size_t>> ranges = treeMaster->diff(*treeLocal);
if (ranges.empty()) {
// no differences, done!

View File

@ -33,6 +33,7 @@
#include "Basics/VelocyPackHelper.h"
#include "Basics/WriteLocker.h"
#include "Basics/StaticStrings.h"
#include "Basics/hashes.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterHelpers.h"
#include "Cluster/ClusterMethods.h"
@ -2901,11 +2902,11 @@ void RestReplicationHandler::handleCommandRevisionTree() {
*static_cast<RevisionReplicationIterator*>(ctx.iter.get());
std::size_t constexpr maxDepth = 6;
std::size_t const rangeMin = it.hasMore() ? it.revision() : 0;
std::size_t const rangeMin = static_cast<std::size_t>(ctx.collection->minRevision());
containers::RevisionTree tree(maxDepth, rangeMin);
while (it.hasMore()) {
tree.insert(it.revision(), it.document().hashString());
tree.insert(it.revision(), TRI_FnvHashPod(it.revision())/*it.document().hashString()*/);
it.next();
}

View File

@ -27,6 +27,7 @@
#include "Basics/StaticStrings.h"
#include "Basics/StringUtils.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/hashes.h"
#include "Cache/CacheManagerFeature.h"
#include "Cache/Common.h"
#include "Cache/Manager.h"
@ -1190,19 +1191,20 @@ Result RocksDBCollection::remove(transaction::Methods& trx, velocypack::Slice sl
}
std::unique_ptr<containers::RevisionTree> RocksDBCollection::revisionTree(
transaction::Methods& trx, std::size_t rangeMin, std::size_t rangeMax) {
transaction::Methods& trx, std::size_t rangeMax) {
std::unique_ptr<ReplicationIterator> iter =
getReplicationIterator(ReplicationIterator::Ordering::Revision, trx);
RevisionReplicationIterator& it =
*static_cast<RevisionReplicationIterator*>(iter.get());
TRI_voc_rid_t rangeMin = _logicalCollection.minRevision();
std::size_t constexpr maxDepth = 6;
std::unique_ptr<containers::RevisionTree> tree =
std::make_unique<containers::RevisionTree>(maxDepth, rangeMin, rangeMax);
std::make_unique<containers::RevisionTree>(maxDepth, static_cast<std::size_t>(rangeMin), rangeMax);
it.seek(static_cast<TRI_voc_rid_t>(rangeMin));
it.seek(rangeMin);
while (it.hasMore() && it.revision() <= static_cast<TRI_voc_rid_t>(rangeMax)) {
tree->insert(it.revision(), it.document().hashString());
tree->insert(it.revision(), TRI_FnvHashPod(it.revision())/*it.document().hashString()*/);
it.next();
}

View File

@ -138,7 +138,6 @@ class RocksDBCollection final : public RocksDBMetaCollection {
inline bool cacheEnabled() const { return _cacheEnabled; }
std::unique_ptr<containers::RevisionTree> revisionTree(transaction::Methods& trx,
std::size_t rangeMin,
std::size_t rangeMax) override;
void adjustNumberDocuments(transaction::Methods&, int64_t) override;

View File

@ -529,7 +529,7 @@ Result PhysicalCollection::newObjectForReplace(transaction::Methods*,
}
std::unique_ptr<containers::RevisionTree> PhysicalCollection::revisionTree(
transaction::Methods& trx, std::size_t rangeMin, std::size_t rangeMax) {
transaction::Methods& trx, std::size_t rangeMax) {
return nullptr;
}

View File

@ -223,7 +223,7 @@ class PhysicalCollection {
bool isRestore, TRI_voc_rid_t& revisionId) const;
virtual std::unique_ptr<containers::RevisionTree> revisionTree(
transaction::Methods& trx, std::size_t rangeMin, std::size_t rangeMax);
transaction::Methods& trx, std::size_t rangeMax);
protected:
PhysicalCollection(LogicalCollection& collection, arangodb::velocypack::Slice const& info);

View File

@ -152,6 +152,7 @@ LogicalCollection::LogicalCollection(TRI_vocbase_t& vocbase, VPackSlice const& i
#endif
_usesRevisionsAsDocumentIds(
Helper::getBooleanValue(info, StaticStrings::UsesRevisionsAsDocumentIds, false)),
_minRevision(Helper::getNumericValue<TRI_voc_rid_t>(info, StaticStrings::MinRevision, 0)),
_waitForSync(Helper::getBooleanValue(info, StaticStrings::WaitForSyncString, false)),
_allowUserKeys(Helper::getBooleanValue(info, "allowUserKeys", true)),
#ifdef USE_ENTERPRISE
@ -439,6 +440,10 @@ bool LogicalCollection::usesRevisionsAsDocumentIds() const {
return _usesRevisionsAsDocumentIds;
}
TRI_voc_rid_t LogicalCollection::minRevision() const {
return _minRevision;
}
std::unique_ptr<FollowerInfo> const& LogicalCollection::followers() const {
return _followers;
}
@ -685,6 +690,7 @@ arangodb::Result LogicalCollection::appendVelocyPack(arangodb::velocypack::Build
result.add(StaticStrings::IsSmartChild, VPackValue(isSmartChild()));
result.add(StaticStrings::UsesRevisionsAsDocumentIds,
VPackValue(usesRevisionsAsDocumentIds()));
result.add(StaticStrings::MinRevision, VPackValue(minRevision()));
if (hasSmartJoinAttribute()) {
result.add(StaticStrings::SmartJoinAttribute, VPackValue(_smartJoinAttribute));

View File

@ -146,6 +146,7 @@ class LogicalCollection : public LogicalDataSource {
bool isSmartChild() const { return false; }
#endif
bool usesRevisionsAsDocumentIds() const;
TRI_voc_rid_t minRevision() const;
/// @brief is this a cluster-wide Plan (ClusterInfo) collection
bool isAStub() const { return _isAStub; }
/// @brief is this a cluster-wide Plan (ClusterInfo) collection
@ -379,6 +380,8 @@ class LogicalCollection : public LogicalDataSource {
bool const _usesRevisionsAsDocumentIds;
TRI_voc_rid_t const _minRevision;
// SECTION: Properties
bool _waitForSync;

View File

@ -294,6 +294,8 @@ Result Collections::create(TRI_vocbase_t& vocbase,
if (addUseRevs) {
helper.add(arangodb::StaticStrings::UsesRevisionsAsDocumentIds,
arangodb::velocypack::Value(useRevs));
helper.add(arangodb::StaticStrings::MinRevision,
arangodb::velocypack::Value(TRI_HybridLogicalClock()));
}
if (ServerState::instance()->isCoordinator()) {

View File

@ -244,6 +244,7 @@ std::string const StaticStrings::Sharding("sharding");
std::string const StaticStrings::Satellite("satellite");
std::string const StaticStrings::UsesRevisionsAsDocumentIds(
"usesRevisionsAsDocumentIds");
std::string const StaticStrings::MinRevision("minRevision");
// graph attribute names
std::string const StaticStrings::GraphCollection("_graphs");

View File

@ -225,6 +225,7 @@ class StaticStrings {
static std::string const Sharding;
static std::string const Satellite;
static std::string const UsesRevisionsAsDocumentIds;
static std::string const MinRevision;
// graph attribute names
static std::string const GraphCollection;

View File

@ -37,6 +37,9 @@
#include <utility>
#include <vector>
#include <chrono>
#include <thread>
#include "MerkleTree.h"
#include "Basics/debugging.h"
@ -336,6 +339,12 @@ std::size_t MerkleTree<BranchingBits, LockStripes>::index(std::size_t key,
// not thread-safe, lock buffer from outside
TRI_ASSERT(depth <= meta().maxDepth);
TRI_ASSERT(key >= meta().rangeMin);
if (key >= meta().rangeMax) {
while (true) {
LOG_DEVEL << "WAITING FOR DEBUG";
std::this_thread::sleep_for(std::chrono::seconds(10));
}
}
TRI_ASSERT(key < meta().rangeMax);
// special fast case