diff --git a/arangod/Agency/AgencyComm.cpp b/arangod/Agency/AgencyComm.cpp
index 3ca8d6d764..d1b578c696 100644
--- a/arangod/Agency/AgencyComm.cpp
+++ b/arangod/Agency/AgencyComm.cpp
@@ -166,6 +166,8 @@ void AgencyOperation::toVelocyPack(VPackBuilder& builder) const {
   if (_opType.value == AgencyValueOperationType::OBSERVE ||
       _opType.value == AgencyValueOperationType::UNOBSERVE) {
     builder.add("url", _value);
+  } else if (_opType.value == AgencyValueOperationType::ERASE) {
+    builder.add("val", _value);
   } else {
     builder.add("new", _value);
   }
diff --git a/arangod/Agency/AgencyComm.h b/arangod/Agency/AgencyComm.h
index bc27d7e90f..ca756b9b8f 100644
--- a/arangod/Agency/AgencyComm.h
+++ b/arangod/Agency/AgencyComm.h
@@ -84,7 +84,7 @@ enum class AgencyReadOperationType { READ };
 // --SECTION-- AgencyValueOperationType
 // -----------------------------------------------------------------------------

-enum class AgencyValueOperationType { SET, OBSERVE, UNOBSERVE, PUSH, PREPEND };
+enum class AgencyValueOperationType { ERASE, SET, OBSERVE, UNOBSERVE, PUSH, PREPEND };

 // -----------------------------------------------------------------------------
 // --SECTION-- AgencySimpleOperationType
@@ -130,6 +130,8 @@ class AgencyOperationType {
         return "push";
       case AgencyValueOperationType::PREPEND:
         return "prepend";
+      case AgencyValueOperationType::ERASE:
+        return "erase";
       default:
         return "unknown_operation_type";
     }
   }
diff --git a/arangod/Agency/Node.cpp b/arangod/Agency/Node.cpp
index 03eabbd8ce..2fa4c3d031 100644
--- a/arangod/Agency/Node.cpp
+++ b/arangod/Agency/Node.cpp
@@ -464,24 +464,55 @@ bool Node::handle(VPackSlice const& slice) {
   return true;
 }

-/// Remove element from any place in array by value array
-template <>
-bool Node::handle(VPackSlice const& slice) {
-  if (!slice.hasKey("val")) {
+/// Remove element from any place in array by value or position
+template <> bool Node::handle(VPackSlice const& slice) {
+  bool haveVal = slice.hasKey("val");
+  bool havePos = slice.hasKey("pos");
+
+  if (!haveVal && !havePos) {
     LOG_TOPIC(WARN, Logger::AGENCY)
-        << "Operator erase without value to be erased: " << slice.toJson();
+        << "Operator erase without value or position to be erased is illegal: "
+        << slice.toJson();
     return false;
+  } else if (haveVal && havePos) {
+    LOG_TOPIC(WARN, Logger::AGENCY)
+        << "Operator erase with value and position to be erased is illegal: "
+        << slice.toJson();
+    return false;
+  } else if (havePos &&
+             (!slice.get("pos").isUInt() && !slice.get("pos").isSmallInt())) {
+    LOG_TOPIC(WARN, Logger::AGENCY)
+        << "Operator erase with non-positive integer position is illegal: "
+        << slice.toJson();
   }
+
   Builder tmp;
   {
     VPackArrayBuilder t(&tmp);
+
     if (this->slice().isArray()) {
-      for (auto const& old : VPackArrayIterator(this->slice())) {
-        if (old != slice.get("val")) {
-          tmp.add(old);
+      if (haveVal) {
+        for (auto const& old : VPackArrayIterator(this->slice())) {
+          if (old != slice.get("val")) {
+            tmp.add(old);
+          }
+        }
+      } else {
+        size_t pos = slice.get("pos").getNumber();
+        if (pos >= this->slice().length()) {
+          return false;
+        }
+        size_t n = 0;
+        for (const auto& old : VPackArrayIterator(this->slice())) {
+          if (n != pos) {
+            tmp.add(old);
+          }
+          ++n;
         }
       }
     }
+  }
+
   *this = tmp.slice();
   return true;
 }
diff --git a/arangod/Aql/AstNode.cpp b/arangod/Aql/AstNode.cpp
index c22f8dfa9b..fb5527f97d 100644
--- a/arangod/Aql/AstNode.cpp
+++ b/arangod/Aql/AstNode.cpp
@@ -237,98 +237,102 @@ int arangodb::aql::CompareAstNodes(AstNode const* lhs, AstNode const* rhs,
     if (diff < 0) {
       return -1;
-    } else if 
(diff > 0) { - return 1; + } + return 1; + } + + + switch (lType) { + case VPackValueType::Null: { + return 0; } - TRI_ASSERT(false); - return 0; - } - if (lType == VPackValueType::Null) { - return 0; - } + case VPackValueType::Bool: { + int diff = static_cast(lhs->getIntValue() - rhs->getIntValue()); - if (lType == VPackValueType::Bool) { - int diff = static_cast(lhs->getIntValue() - rhs->getIntValue()); - - if (diff != 0) { - if (diff < 0) { - return -1; + if (diff != 0) { + if (diff < 0) { + return -1; + } + return 1; } - return 1; - } - return 0; - } - - if (lType == VPackValueType::Int || lType == VPackValueType::Double) { - // TODO - double d = lhs->getDoubleValue() - rhs->getDoubleValue(); - if (d < 0.0) { - return -1; - } else if (d > 0.0) { - return 1; - } - return 0; - } - - if (lType == VPackValueType::String) { - if (compareUtf8) { - return TRI_compare_utf8(lhs->getStringValue(), lhs->getStringLength(), - rhs->getStringValue(), rhs->getStringLength()); - } - - size_t const minLength = - (std::min)(lhs->getStringLength(), rhs->getStringLength()); - - int res = memcmp(lhs->getStringValue(), rhs->getStringValue(), minLength); - - if (res != 0) { - return res; + return 0; } - int diff = static_cast(lhs->getStringLength()) - - static_cast(rhs->getStringLength()); - - if (diff != 0) { - return diff < 0 ? -1 : 1; + case VPackValueType::Int: + case VPackValueType::Double: { + // TODO + double d = lhs->getDoubleValue() - rhs->getDoubleValue(); + if (d != 0.0) { + if (d < 0.0) { + return -1; + } + return 1; + } + return 0; } - return 0; - } - if (lType == VPackValueType::Array) { - size_t const numLhs = lhs->numMembers(); - size_t const numRhs = rhs->numMembers(); - size_t const n = ((numLhs > numRhs) ? numRhs : numLhs); + case VPackValueType::String: { + if (compareUtf8) { + return TRI_compare_utf8(lhs->getStringValue(), lhs->getStringLength(), + rhs->getStringValue(), rhs->getStringLength()); + } + + size_t const minLength = + (std::min)(lhs->getStringLength(), rhs->getStringLength()); - for (size_t i = 0; i < n; ++i) { - int res = arangodb::aql::CompareAstNodes(lhs->getMember(i), - rhs->getMember(i), compareUtf8); + int res = memcmp(lhs->getStringValue(), rhs->getStringValue(), minLength); if (res != 0) { return res; } + + int diff = static_cast(lhs->getStringLength()) - + static_cast(rhs->getStringLength()); + + if (diff != 0) { + return diff < 0 ? -1 : 1; + } + return 0; } - if (numLhs < numRhs) { - return -1; - } else if (numLhs > numRhs) { - return 1; + + case VPackValueType::Array: { + size_t const numLhs = lhs->numMembers(); + size_t const numRhs = rhs->numMembers(); + size_t const n = ((numLhs > numRhs) ? 
numRhs : numLhs); + + for (size_t i = 0; i < n; ++i) { + int res = arangodb::aql::CompareAstNodes(lhs->getMember(i), + rhs->getMember(i), compareUtf8); + + if (res != 0) { + return res; + } + } + if (numLhs < numRhs) { + return -1; + } else if (numLhs > numRhs) { + return 1; + } + return 0; + } + + case VPackValueType::Object: { + // this is a rather exceptional case, so we can + // afford the inefficiency to convert the node to VPack + // for comparison (this saves us from writing our own compare function + // for array AstNodes) + auto l = lhs->toVelocyPackValue(); + auto r = rhs->toVelocyPackValue(); + + return basics::VelocyPackHelper::compare(l->slice(), r->slice(), compareUtf8); + } + + default: { + // all things equal + return 0; } - return 0; } - - if (lType == VPackValueType::Object) { - // this is a rather exceptional case, so we can - // afford the inefficiency to convert the node to VPack - // for comparison (this saves us from writing our own compare function - // for array AstNodes) - auto l = lhs->toVelocyPackValue(); - auto r = rhs->toVelocyPackValue(); - - return basics::VelocyPackHelper::compare(l->slice(), r->slice(), compareUtf8); - } - - // all things equal - return 0; } /// @brief returns whether or not the string is empty diff --git a/arangod/Cache/Cache.cpp b/arangod/Cache/Cache.cpp index dadc204562..7e832b4f7a 100644 --- a/arangod/Cache/Cache.cpp +++ b/arangod/Cache/Cache.cpp @@ -279,8 +279,7 @@ std::shared_ptr Cache::table() { return _table; } void Cache::beginShutdown() { _state.lock(); - if (!_state.isSet(State::Flag::shutdown) && - !_state.isSet(State::Flag::shuttingDown)) { + if (!_state.isSet(State::Flag::shutdown, State::Flag::shuttingDown)) { _state.toggleFlag(State::Flag::shuttingDown); } _state.unlock(); diff --git a/arangod/Cache/CacheManagerFeature.cpp b/arangod/Cache/CacheManagerFeature.cpp index 08a8047d83..ab9b29e18c 100644 --- a/arangod/Cache/CacheManagerFeature.cpp +++ b/arangod/Cache/CacheManagerFeature.cpp @@ -59,7 +59,7 @@ CacheManagerFeature::CacheManagerFeature( _cacheSize((TRI_PhysicalMemory >= (static_cast(4) << 30)) ? 
((TRI_PhysicalMemory - (static_cast(2) << 30)) * 0.3) : (256 << 20)), - _rebalancingInterval(2 * 1000 * 1000) { + _rebalancingInterval(static_cast(2 * 1000 * 1000)) { setOptional(true); requiresElevatedPrivileges(false); startsAfter("Scheduler"); diff --git a/arangod/Cache/CachedValue.cpp b/arangod/Cache/CachedValue.cpp index 97a5a7691e..13b907b1c7 100644 --- a/arangod/Cache/CachedValue.cpp +++ b/arangod/Cache/CachedValue.cpp @@ -57,9 +57,13 @@ bool CachedValue::sameKey(void const* k, uint32_t kSize) const { return (0 == memcmp(key(), k, keySize)); } -void CachedValue::lease() { refCount++; } +void CachedValue::lease() { ++refCount; } -void CachedValue::release() { refCount--; } +void CachedValue::release() { + if (--refCount == UINT32_MAX) { + TRI_ASSERT(false); + } +} bool CachedValue::isFreeable() { return (refCount.load() == 0); } diff --git a/arangod/Cache/Finding.cpp b/arangod/Cache/Finding.cpp index 9bededca96..38aa083a47 100644 --- a/arangod/Cache/Finding.cpp +++ b/arangod/Cache/Finding.cpp @@ -26,6 +26,8 @@ using namespace arangodb::cache; +Finding::Finding() : _value(nullptr) {} + Finding::Finding(CachedValue* v) : _value(v) { if (_value != nullptr) { _value->lease(); @@ -83,6 +85,16 @@ Finding::~Finding() { void Finding::release() { if (_value != nullptr) { _value->release(); + // reset value so we do not unintentionally release multiple times + _value = nullptr; + } +} + +void Finding::set(CachedValue* v) { + TRI_ASSERT(_value == nullptr); + _value = v; + if (v != nullptr) { + _value->lease(); } } diff --git a/arangod/Cache/Finding.h b/arangod/Cache/Finding.h index 163d8b7f85..6665ae8411 100644 --- a/arangod/Cache/Finding.h +++ b/arangod/Cache/Finding.h @@ -41,7 +41,8 @@ namespace cache { //////////////////////////////////////////////////////////////////////////////// class Finding { public: - Finding(CachedValue* v); + Finding(); + explicit Finding(CachedValue* v); Finding(Finding const& other); Finding(Finding&& other); Finding& operator=(Finding const& other); @@ -53,6 +54,12 @@ class Finding { ////////////////////////////////////////////////////////////////////////////// void reset(CachedValue* v); + ////////////////////////////////////////////////////////////////////////////// + /// @brief Sets the underlying CachedValue pointer. Assumes that the Finding + /// is currently empty + ////////////////////////////////////////////////////////////////////////////// + void set(CachedValue* v); + ////////////////////////////////////////////////////////////////////////////// /// @brief Specifies whether the value was found. If not, value is nullptr. ////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cache/FrequencyBuffer.h b/arangod/Cache/FrequencyBuffer.h index e9fe66cf7e..fd13cabaf4 100644 --- a/arangod/Cache/FrequencyBuffer.h +++ b/arangod/Cache/FrequencyBuffer.h @@ -64,7 +64,7 @@ class FrequencyBuffer { ////////////////////////////////////////////////////////////////////////////// /// @brief Initialize with the given capacity. ////////////////////////////////////////////////////////////////////////////// - FrequencyBuffer(uint64_t capacity) + explicit FrequencyBuffer(uint64_t capacity) : _current(0), _capacity(0), _mask(0), @@ -91,7 +91,7 @@ class FrequencyBuffer { ////////////////////////////////////////////////////////////////////////////// /// @brief Reports the memory usage in bytes. 
////////////////////////////////////////////////////////////////////////////// - uint64_t memoryUsage() { + uint64_t memoryUsage() const { return ((_capacity * sizeof(T)) + sizeof(FrequencyBuffer) + sizeof(std::vector)); } diff --git a/arangod/Cache/Manager.cpp b/arangod/Cache/Manager.cpp index b17ba6bf72..6d2ca2467d 100644 --- a/arangod/Cache/Manager.cpp +++ b/arangod/Cache/Manager.cpp @@ -447,14 +447,14 @@ void Manager::reportAccess(std::shared_ptr cache) { void Manager::reportHitStat(Stat stat) { switch (stat) { case Stat::findHit: { - _findHits++; + ++_findHits; if (_enableWindowedStats && _findStats.get() != nullptr) { _findStats->insertRecord(static_cast(Stat::findHit)); } break; } case Stat::findMiss: { - _findMisses++; + ++_findMisses; if (_enableWindowedStats && _findStats.get() != nullptr) { _findStats->insertRecord(static_cast(Stat::findMiss)); } @@ -466,14 +466,12 @@ void Manager::reportHitStat(Stat stat) { bool Manager::isOperational() const { TRI_ASSERT(_state.isLocked()); - return (!_state.isSet(State::Flag::shutdown) && - !_state.isSet(State::Flag::shuttingDown)); + return !_state.isSet(State::Flag::shutdown, State::Flag::shuttingDown); } bool Manager::globalProcessRunning() const { TRI_ASSERT(_state.isLocked()); - return (_state.isSet(State::Flag::rebalancing) || - _state.isSet(State::Flag::resizing)); + return _state.isSet(State::Flag::rebalancing, State::Flag::resizing); } boost::asio::io_service* Manager::ioService() { return _ioService; } diff --git a/arangod/Cache/State.cpp b/arangod/Cache/State.cpp index 69c177e7f3..88f0a6017c 100644 --- a/arangod/Cache/State.cpp +++ b/arangod/Cache/State.cpp @@ -74,6 +74,11 @@ bool State::isSet(State::Flag flag) const { return ((_state.load() & static_cast(flag)) > 0); } +bool State::isSet(State::Flag flag1, State::Flag flag2) const { + TRI_ASSERT(isLocked()); + return ((_state.load() & (static_cast(flag1) | static_cast(flag2))) > 0); +} + void State::toggleFlag(State::Flag flag) { TRI_ASSERT(isLocked()); _state ^= static_cast(flag); diff --git a/arangod/Cache/State.h b/arangod/Cache/State.h index fc47eb4146..69e3550e17 100644 --- a/arangod/Cache/State.h +++ b/arangod/Cache/State.h @@ -107,6 +107,7 @@ struct State { /// @brief Checks whether the given flag is set. Requires state to be locked. ////////////////////////////////////////////////////////////////////////////// bool isSet(State::Flag flag) const; + bool isSet(State::Flag flag1, State::Flag flag2) const; ////////////////////////////////////////////////////////////////////////////// /// @brief Toggles the given flag. Requires state to be locked. diff --git a/arangod/Cache/TransactionalCache.cpp b/arangod/Cache/TransactionalCache.cpp index 545b66aa8c..4f0b373a3a 100644 --- a/arangod/Cache/TransactionalCache.cpp +++ b/arangod/Cache/TransactionalCache.cpp @@ -39,13 +39,11 @@ #include #include -#include - using namespace arangodb::cache; Finding TransactionalCache::find(void const* key, uint32_t keySize) { TRI_ASSERT(key != nullptr); - Finding result(nullptr); + Finding result; uint32_t hash = hashKey(key, keySize); bool ok; @@ -54,7 +52,7 @@ Finding TransactionalCache::find(void const* key, uint32_t keySize) { std::tie(ok, bucket, source) = getBucket(hash, Cache::triesFast); if (ok) { - result.reset(bucket->find(hash, key, keySize)); + result.set(bucket->find(hash, key, keySize)); recordStat(result.found() ? 
Stat::findHit : Stat::findMiss); bucket->unlock(); endOperation(); diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index f2165bf522..c374bb2517 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -1584,28 +1584,98 @@ int ClusterInfo::ensureIndexCoordinator( VPackSlice const& slice, bool create, bool (*compare)(VPackSlice const&, VPackSlice const&), VPackBuilder& resultBuilder, std::string& errorMsg, double timeout) { - AgencyComm ac; - - double const realTimeout = getTimeout(timeout); - double const endTime = TRI_microtime() + realTimeout; - double const interval = getPollInterval(); - - std::string where = - "Current/Collections/" + databaseName + "/" + collectionID; - + // check index id uint64_t iid = 0; - VPackSlice const idxSlice = slice.get("id"); - if (idxSlice.isString()) { + VPackSlice const idSlice = slice.get("id"); + if (idSlice.isString()) { // use predefined index id - iid = arangodb::basics::StringUtils::uint64(idxSlice.copyString()); + iid = arangodb::basics::StringUtils::uint64(idSlice.copyString()); } if (iid == 0) { // no id set, create a new one! iid = uniqid(); } + std::string const idString = arangodb::basics::StringUtils::itoa(iid); + + int errorCode = ensureIndexCoordinatorWithoutRollback( + databaseName, collectionID, idString, slice, create, compare, resultBuilder, errorMsg, timeout); + + if (errorCode == TRI_ERROR_NO_ERROR) { + return errorCode; + } + + std::shared_ptr planValue; + std::shared_ptr oldPlanIndexes; + std::shared_ptr c; + + size_t tries = 0; + do { + loadPlan(); + // find index in plan + planValue = nullptr; + oldPlanIndexes.reset(new VPackBuilder()); + + c = getCollection(databaseName, collectionID); + c->getIndexesVPack(*(oldPlanIndexes.get()), false); + VPackSlice const planIndexes = oldPlanIndexes->slice(); + + if (planIndexes.isArray()) { + for (auto const& index : VPackArrayIterator(planIndexes)) { + auto idPlanSlice = index.get("id"); + if (idPlanSlice.isString() && idPlanSlice.copyString() == idString) { + planValue.reset(new VPackBuilder()); + planValue->add(index); + break; + } + } + } + + if (!planValue) { + // hmm :S both empty :S did somebody else clean up? :S + // should not happen? + return errorCode; + } + std::string const planIndexesKey = "Plan/Collections/" + databaseName + "/" + collectionID +"/indexes"; + std::vector operations; + std::vector preconditions; + if (planValue) { + AgencyOperation planEraser(planIndexesKey, AgencyValueOperationType::ERASE, planValue->slice()); + TRI_ASSERT(oldPlanIndexes); + AgencyPrecondition planPrecondition(planIndexesKey, AgencyPrecondition::Type::VALUE, oldPlanIndexes->slice()); + operations.push_back(planEraser); + operations.push_back(AgencyOperation("Plan/Version", AgencySimpleOperationType::INCREMENT_OP)); + preconditions.push_back(planPrecondition); + } + + AgencyWriteTransaction trx(operations, preconditions); + AgencyCommResult eraseResult = _agency.sendTransactionWithFailover(trx, 0.0); + + if (eraseResult.successful()) { + loadPlan(); + return errorCode; + } + std::chrono::duration waitTime(10); + std::this_thread::sleep_for(waitTime); + } while (++tries < 5); + + LOG_TOPIC(ERR, Logger::CLUSTER) << "Couldn't roll back index creation of " << idString << ". 
Database: " << databaseName << ", Collection " << collectionID; + return errorCode; +} + +int ClusterInfo::ensureIndexCoordinatorWithoutRollback( + std::string const& databaseName, std::string const& collectionID, + std::string const& idString, VPackSlice const& slice, bool create, + bool (*compare)(VPackSlice const&, VPackSlice const&), + VPackBuilder& resultBuilder, std::string& errorMsg, double timeout) { + AgencyComm ac; + + double const realTimeout = getTimeout(timeout); + double const endTime = TRI_microtime() + realTimeout; + double const interval = getPollInterval(); + TRI_ASSERT(resultBuilder.isEmpty()); std::string const key = @@ -1639,8 +1709,6 @@ int ClusterInfo::ensureIndexCoordinator( std::shared_ptr c = getCollection(databaseName, collectionID); - READ_LOCKER(readLocker, _planProt.lock); - if (c == nullptr) { return setErrormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg); } @@ -1705,8 +1773,6 @@ int ClusterInfo::ensureIndexCoordinator( return setErrormsg(TRI_ERROR_CLUSTER_AGENCY_STRUCTURE_INVALID, errorMsg); } - std::string const idString = arangodb::basics::StringUtils::itoa(iid); - try { VPackObjectBuilder b(newBuilder.get()); // Create a new collection VPack with the new Index @@ -1833,6 +1899,9 @@ int ClusterInfo::ensureIndexCoordinator( // local variables. Therefore we have to protect all accesses to them // by a mutex. We use the mutex of the condition variable in the // AgencyCallback for this. + std::string where = + "Current/Collections/" + databaseName + "/" + collectionID; + auto agencyCallback = std::make_shared(ac, where, dbServerChanged, true, false); _agencyCallbackRegistry->registerCallback(agencyCallback); diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h index 315cd16b37..e807066741 100644 --- a/arangod/Cluster/ClusterInfo.h +++ b/arangod/Cluster/ClusterInfo.h @@ -538,6 +538,17 @@ class ClusterInfo { double getReloadServerListTimeout() const { return 60.0; } + ////////////////////////////////////////////////////////////////////////////// + /// @brief ensure an index in coordinator. 
+ ////////////////////////////////////////////////////////////////////////////// + + int ensureIndexCoordinatorWithoutRollback( + std::string const& databaseName, std::string const& collectionID, + std::string const& idSlice, arangodb::velocypack::Slice const& slice, bool create, + bool (*compare)(arangodb::velocypack::Slice const&, + arangodb::velocypack::Slice const&), + arangodb::velocypack::Builder& resultBuilder, std::string& errorMsg, double timeout); + ////////////////////////////////////////////////////////////////////////////// /// @brief object for agency communication ////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RestServer/arangod.cpp b/arangod/RestServer/arangod.cpp index 24c002fe4c..059fe206c3 100644 --- a/arangod/RestServer/arangod.cpp +++ b/arangod/RestServer/arangod.cpp @@ -116,8 +116,8 @@ static int runServer(int argc, char** argv, ArangoGlobalContext &context) { "Action", "Affinity", "Agency", "Authentication", "Cluster", "Daemon", - "Dispatcher", "FoxxQueues", - "GeneralServer", "LoggerBufferFeature", + "FoxxQueues", "GeneralServer", + "Greetings", "LoggerBufferFeature", "Server", "SslServer", "Statistics", "Supervisor"}; diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index 22aa65690f..12ad31a069 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -201,8 +201,8 @@ void RocksDBCollection::open(bool ignoreErrors) { RocksDBEngine* engine = static_cast(EngineSelectorFeature::ENGINE); auto counterValue = engine->counterManager()->loadCounter(this->objectId()); - LOG_TOPIC(ERR, Logger::DEVEL) - << " number of documents: " << counterValue.added(); + LOG_TOPIC(ERR, Logger::DEVEL) << " number of documents: " + << counterValue.added(); _numberDocuments = counterValue.added() - counterValue.removed(); _revisionId = counterValue.revisionId(); @@ -433,10 +433,9 @@ std::shared_ptr RocksDBCollection::createIndex( idx->toVelocyPack(indexInfo, false, true); int res = static_cast(engine)->writeCreateCollectionMarker( _logicalCollection->vocbase()->id(), _logicalCollection->cid(), - builder.slice(), - RocksDBLogValue::IndexCreate(_logicalCollection->vocbase()->id(), - _logicalCollection->cid(), - indexInfo.slice())); + builder.slice(), RocksDBLogValue::IndexCreate( + _logicalCollection->vocbase()->id(), + _logicalCollection->cid(), indexInfo.slice())); if (res != TRI_ERROR_NO_ERROR) { // We could not persist the index creation. Better abort // Remove the Index in the local list again. @@ -516,10 +515,9 @@ int RocksDBCollection::restoreIndex(transaction::Methods* trx, TRI_ASSERT(engine != nullptr); int res = engine->writeCreateCollectionMarker( _logicalCollection->vocbase()->id(), _logicalCollection->cid(), - builder.slice(), - RocksDBLogValue::IndexCreate(_logicalCollection->vocbase()->id(), - _logicalCollection->cid(), - indexInfo.slice())); + builder.slice(), RocksDBLogValue::IndexCreate( + _logicalCollection->vocbase()->id(), + _logicalCollection->cid(), indexInfo.slice())); if (res != TRI_ERROR_NO_ERROR) { // We could not persist the index creation. Better abort // Remove the Index in the local list again. 
@@ -553,6 +551,7 @@ bool RocksDBCollection::dropIndex(TRI_idx_iid_t iid) { WRITE_LOCKER(guard, _indexesLock); for (auto index : _indexes) { RocksDBIndex* cindex = static_cast(index.get()); + TRI_ASSERT(cindex != nullptr); if (iid == cindex->id()) { int rv = cindex->drop(); @@ -564,7 +563,7 @@ bool RocksDBCollection::dropIndex(TRI_idx_iid_t iid) { _indexes.erase(_indexes.begin() + i); events::DropIndex("", std::to_string(iid), TRI_ERROR_NO_ERROR); // toVelocyPackIgnore will take a read lock and we don't need the - // lock anymore, we will always return + // lock anymore, we will always return guard.unlock(); VPackBuilder builder = _logicalCollection->toVelocyPackIgnore( @@ -664,6 +663,7 @@ void RocksDBCollection::truncate(transaction::Methods* trx, RocksDBIndex* rindex = static_cast(index.get()); rindex->truncate(trx); } + _needToPersistIndexEstimates = true; } /* @@ -1203,7 +1203,7 @@ void RocksDBCollection::figuresSpecific( /// @brief creates the initial indexes for the collection void RocksDBCollection::createInitialIndexes() { - { // addIndex holds an internal write lock + { // addIndex holds an internal write lock READ_LOCKER(guard, _indexesLock); if (!_indexes.empty()) { return; @@ -1246,7 +1246,7 @@ void RocksDBCollection::addIndex(std::shared_ptr idx) { void RocksDBCollection::addIndexCoordinator( std::shared_ptr idx) { WRITE_LOCKER(guard, _indexesLock); - + auto const id = idx->id(); for (auto const& it : _indexes) { if (it->id() == id) { @@ -1334,8 +1334,8 @@ arangodb::Result RocksDBCollection::fillIndexes( // occured, this needs to happen since we are non transactional if (!r.ok()) { iter->reset(); - rocksdb::WriteBatchWithIndex removeBatch(db->DefaultColumnFamily()->GetComparator(), - 32 * 1024 * 1024); + rocksdb::WriteBatchWithIndex removeBatch( + db->DefaultColumnFamily()->GetComparator(), 32 * 1024 * 1024); res = TRI_ERROR_NO_ERROR; auto removeCb = [&](DocumentIdentifierToken token) { @@ -1358,6 +1358,9 @@ arangodb::Result RocksDBCollection::fillIndexes( // Simon: Don't think so db->Write(writeOpts, removeBatch.GetWriteBatch()); } + if (numDocsWritten > 0) { + _needToPersistIndexEstimates = true; + } return r; } @@ -1418,12 +1421,12 @@ RocksDBOperationResult RocksDBCollection::insertDocument( READ_LOCKER(guard, _indexesLock); for (std::shared_ptr const& idx : _indexes) { innerRes.reset(idx->insert(trx, revisionId, doc, false)); - + // in case of no-memory, return immediately if (innerRes.is(TRI_ERROR_OUT_OF_MEMORY)) { return innerRes; } - + if (innerRes.fail()) { // "prefer" unique constraint violated over other errors if (innerRes.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) || @@ -1444,6 +1447,7 @@ RocksDBOperationResult RocksDBCollection::insertDocument( if (waitForSync) { trx->state()->waitForSync(true); } + _needToPersistIndexEstimates = true; } return res; @@ -1500,6 +1504,7 @@ RocksDBOperationResult RocksDBCollection::removeDocument( if (waitForSync) { trx->state()->waitForSync(true); } + _needToPersistIndexEstimates = true; } return res; @@ -1744,6 +1749,7 @@ uint64_t RocksDBCollection::recalculateCounts() { // need correction. 
The value is not changed and does not need to be synced globalRocksEngine()->counterManager()->sync(true); } + trx.commit(); return _numberDocuments; } @@ -1788,6 +1794,73 @@ void RocksDBCollection::estimateSize(velocypack::Builder& builder) { builder.close(); } +arangodb::Result RocksDBCollection::serializeIndexEstimates( + rocksdb::Transaction* rtrx) const { + if (!_needToPersistIndexEstimates) { + return {TRI_ERROR_NO_ERROR}; + } + _needToPersistIndexEstimates = false; + std::string output; + rocksdb::TransactionDB* tdb = rocksutils::globalRocksDB(); + for (auto index : getIndexes()) { + output.clear(); + RocksDBIndex* cindex = static_cast(index.get()); + TRI_ASSERT(cindex != nullptr); + rocksutils::uint64ToPersistent(output, static_cast(tdb->GetLatestSequenceNumber())); + cindex->serializeEstimate(output); + if (output.size() > sizeof(uint64_t)) { + RocksDBKey key = + RocksDBKey::IndexEstimateValue(cindex->objectId()); + rocksdb::Slice value(output); + rocksdb::Status s = rtrx->Put(key.string(), value); + + if (!s.ok()) { + LOG_TOPIC(WARN, Logger::ENGINES) << "writing index estimates failed"; + rtrx->Rollback(); + return rocksutils::convertStatus(s); + } + } + } + return {TRI_ERROR_NO_ERROR}; +} + +void RocksDBCollection::deserializeIndexEstimates(RocksDBCounterManager* mgr) { + std::vector> toRecalculate; + for (auto const& it : getIndexes()) { + auto idx = static_cast(it.get()); + if (!idx->deserializeEstimate(mgr)) { + toRecalculate.push_back(it); + } + } + if (!toRecalculate.empty()) { + recalculateIndexEstimates(toRecalculate); + } +} + +void RocksDBCollection::recalculateIndexEstimates() { + auto idxs = getIndexes(); + recalculateIndexEstimates(idxs); +} + +void RocksDBCollection::recalculateIndexEstimates(std::vector>& indexes) { + // start transaction to get a collection lock + arangodb::SingleCollectionTransaction trx( + arangodb::transaction::StandaloneContext::Create( + _logicalCollection->vocbase()), + _logicalCollection->cid(), AccessMode::Type::EXCLUSIVE); + auto res = trx.begin(); + if (res.fail()) { + THROW_ARANGO_EXCEPTION(res); + } + + for (auto const& it : indexes) { + auto idx = static_cast(it.get()); + idx->recalculateEstimates(); + } + _needToPersistIndexEstimates = true; + trx.commit(); +} + void RocksDBCollection::createCache() const { if (!_useCache || _cachePresent) { // we leave this if we do not need the cache @@ -1826,7 +1899,7 @@ void RocksDBCollection::blackListKey(char const* data, std::size_t len) const { bool blacklisted = false; uint64_t attempts = 0; while (!blacklisted) { - blacklisted = _cache->blacklist(data,(uint32_t)len); + blacklisted = _cache->blacklist(data, static_cast(len)); if (attempts++ % 10 == 0) { if (_cache->isShutdown()) { disableCache(); diff --git a/arangod/RocksDBEngine/RocksDBCollection.h b/arangod/RocksDBEngine/RocksDBCollection.h index 8298921fff..c9e6d4750b 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.h +++ b/arangod/RocksDBEngine/RocksDBCollection.h @@ -32,6 +32,10 @@ #include "VocBase/KeyGenerator.h" #include "VocBase/ManagedDocumentResult.h" +namespace rocksdb { +class Transaction; +} + namespace arangodb { namespace cache { class Cache; @@ -184,6 +188,11 @@ class RocksDBCollection final : public PhysicalCollection { bool hasGeoIndex() { return _hasGeoIndex; } + Result serializeIndexEstimates(rocksdb::Transaction*) const; + void deserializeIndexEstimates(arangodb::RocksDBCounterManager* mgr); + + void recalculateIndexEstimates(); + private: /// @brief return engine-specific figures void figuresSpecific( @@ -220,15 
+229,21 @@ class RocksDBCollection final : public PhysicalCollection { arangodb::Result lookupRevisionVPack(TRI_voc_rid_t, transaction::Methods*, arangodb::ManagedDocumentResult&) const; + void recalculateIndexEstimates(std::vector>& indexes); + void createCache() const; + void disableCache() const; + inline bool useCache() const { return (_useCache && _cachePresent); } + void blackListKey(char const* data, std::size_t len) const; private: uint64_t const _objectId; // rocksdb-specific object id for collection std::atomic _numberDocuments; std::atomic _revisionId; + mutable std::atomic _needToPersistIndexEstimates; /// upgrade write locks to exclusive locks if this flag is set bool _hasGeoIndex; diff --git a/arangod/RocksDBEngine/RocksDBCommon.cpp b/arangod/RocksDBEngine/RocksDBCommon.cpp index 9ebad1dff1..4a1bca0838 100644 --- a/arangod/RocksDBEngine/RocksDBCommon.cpp +++ b/arangod/RocksDBEngine/RocksDBCommon.cpp @@ -132,6 +132,34 @@ void uint64ToPersistent(std::string& p, uint64_t value) { } while (++len < sizeof(uint64_t)); } +uint16_t uint16FromPersistent(char const* p) { + uint16_t value = 0; + uint16_t x = 0; + uint8_t const* ptr = reinterpret_cast(p); + uint8_t const* end = ptr + sizeof(uint16_t); + do { + value += static_cast(*ptr++) << x; + x += 8; + } while (ptr < end); + return value; +} + +void uint16ToPersistent(char* p, uint16_t value) { + char* end = p + sizeof(uint16_t); + do { + *p++ = static_cast(value & 0xffU); + value >>= 8; + } while (p < end); +} + +void uint16ToPersistent(std::string& p, uint16_t value) { + size_t len = 0; + do { + p.push_back(static_cast(value & 0xffU)); + value >>= 8; + } while (++len < sizeof(uint16_t)); +} + bool hasObjectIds(VPackSlice const& inputSlice) { bool rv = false; if (inputSlice.isObject()) { diff --git a/arangod/RocksDBEngine/RocksDBCommon.h b/arangod/RocksDBEngine/RocksDBCommon.h index 9c25c00a5e..8cd2749986 100644 --- a/arangod/RocksDBEngine/RocksDBCommon.h +++ b/arangod/RocksDBEngine/RocksDBCommon.h @@ -86,6 +86,10 @@ uint64_t uint64FromPersistent(char const* p); void uint64ToPersistent(char* p, uint64_t value); void uint64ToPersistent(std::string& out, uint64_t value); +uint16_t uint16FromPersistent(char const* p); +void uint16ToPersistent(char* p, uint16_t value); +void uint16ToPersistent(std::string& out, uint16_t value); + std::pair>> stripObjectIds( VPackSlice const& inputSlice, bool checkBeforeCopy = true); diff --git a/arangod/RocksDBEngine/RocksDBCounterManager.cpp b/arangod/RocksDBEngine/RocksDBCounterManager.cpp index da2a196e8d..9369a25e40 100644 --- a/arangod/RocksDBEngine/RocksDBCounterManager.cpp +++ b/arangod/RocksDBEngine/RocksDBCounterManager.cpp @@ -23,15 +23,22 @@ #include "RocksDBCounterManager.h" +#include "ApplicationFeatures/ApplicationServer.h" #include "Basics/ReadLocker.h" #include "Basics/StringUtils.h" #include "Basics/VelocyPackHelper.h" #include "Basics/WriteLocker.h" #include "Logger/Logger.h" +#include "RestServer/DatabaseFeature.h" +#include "RocksDBEngine/RocksDBCollection.h" #include "RocksDBEngine/RocksDBCommon.h" +#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h" +#include "RocksDBEngine/RocksDBEdgeIndex.h" #include "RocksDBEngine/RocksDBKey.h" #include "RocksDBEngine/RocksDBKeyBounds.h" +#include "RocksDBEngine/RocksDBVPackIndex.h" #include "RocksDBEngine/RocksDBValue.h" +#include "StorageEngine/EngineSelectorFeature.h" #include "VocBase/ticks.h" #include @@ -44,6 +51,7 @@ #include using namespace arangodb; +using namespace arangodb::application_features; 
RocksDBCounterManager::CMValue::CMValue(VPackSlice const& slice) : _sequenceNum(0), _count(0), _revisionId(0) { @@ -74,6 +82,7 @@ void RocksDBCounterManager::CMValue::serialize(VPackBuilder& b) const { RocksDBCounterManager::RocksDBCounterManager(rocksdb::DB* db) : _syncing(false), _db(db) { readSettings(); + readIndexEstimates(); readCounterValues(); if (!_counters.empty()) { @@ -135,7 +144,8 @@ void RocksDBCounterManager::updateCounter(uint64_t objectId, } } -arangodb::Result RocksDBCounterManager::setAbsoluteCounter(uint64_t objectId, uint64_t value) { +arangodb::Result RocksDBCounterManager::setAbsoluteCounter(uint64_t objectId, + uint64_t value) { arangodb::Result res; WRITE_LOCKER(guard, _rwLock); auto it = _counters.find(objectId); @@ -219,7 +229,7 @@ Result RocksDBCounterManager::sync(bool force) { return rocksutils::convertStatus(s); } } - + // now write global settings b.clear(); b.openObject(); @@ -241,6 +251,46 @@ Result RocksDBCounterManager::sync(bool force) { return rocksutils::convertStatus(s); } + // Now persist the index estimates: + { + for (auto const& pair : copy) { + auto dbColPair = rocksutils::mapObjectToCollection(pair.first); + if (dbColPair.second == 0 && dbColPair.first == 0) { + // collection with this objectID not known.Skip. + continue; + } + auto dbfeature = + ApplicationServer::getFeature("Database"); + TRI_ASSERT(dbfeature != nullptr); + auto vocbase = dbfeature->useDatabase(dbColPair.first); + if (vocbase == nullptr) { + // Bad state, we have references to a database that is not known + // anymore. + // However let's just skip in production. Not allowed to crash. + // If we cannot find this infos during recovery we can either recompute + // or start fresh. + continue; + } + auto collection = vocbase->lookupCollection(dbColPair.second); + if (collection == nullptr) { + // Bad state, we have references to a collection that is not known + // anymore. + // However let's just skip in production. Not allowed to crash. + // If we cannot find this infos during recovery we can either recompute + // or start fresh. 
+ continue; + } + std::string estimateSerialisation; + auto rocksCollection = + static_cast(collection->getPhysical()); + TRI_ASSERT(rocksCollection != nullptr); + Result res = rocksCollection->serializeIndexEstimates(rtrx.get()); + if (!res.ok()) { + return res; + } + } + } + // we have to commit all counters in one batch s = rtrx->Commit(); if (s.ok()) { @@ -271,7 +321,7 @@ void RocksDBCounterManager::readSettings() { basics::VelocyPackHelper::stringUInt64(slice.get("tick")); LOG_TOPIC(TRACE, Logger::ENGINES) << "using last tick: " << lastTick; TRI_UpdateTickServer(lastTick); - + if (slice.hasKey("hlc")) { uint64_t lastHlc = basics::VelocyPackHelper::stringUInt64(slice.get("hlc")); @@ -286,6 +336,55 @@ void RocksDBCounterManager::readSettings() { } } +void RocksDBCounterManager::readIndexEstimates() { + WRITE_LOCKER(guard, _rwLock); + RocksDBKeyBounds bounds = RocksDBKeyBounds::IndexEstimateValues(); + + rocksdb::Comparator const* cmp = _db->GetOptions().comparator; + rocksdb::ReadOptions readOptions; + std::unique_ptr iter(_db->NewIterator(readOptions)); + iter->Seek(bounds.start()); + + for (; iter->Valid() && cmp->Compare(iter->key(), bounds.end()) < 0; + iter->Next()) { + uint64_t objectId = RocksDBKey::counterObjectId(iter->key()); + uint64_t lastSeqNumber = + rocksutils::uint64FromPersistent(iter->value().data()); + + StringRef estimateSerialisation(iter->value().data() + sizeof(uint64_t), + iter->value().size() - sizeof(uint64_t)); + // If this hits we have two estimates for the same index + TRI_ASSERT(_estimators.find(objectId) == _estimators.end()); + _estimators.emplace( + objectId, + std::make_pair(lastSeqNumber, + std::make_unique>( + estimateSerialisation))); + } +} + +std::unique_ptr> +RocksDBCounterManager::stealIndexEstimator(uint64_t objectId) { + std::unique_ptr> res(nullptr); + auto it = _estimators.find(objectId); + if (it != _estimators.end()) { + // We swap out the stored estimate in order to move it to the caller + res.swap(it->second.second); + // Drop the now empty estimator + _estimators.erase(objectId); + } + return res; +} + +void RocksDBCounterManager::clearIndexEstimators() { + // We call this to remove all index estimators that have been stored but are + // no longer read + // by recovery. + + // TODO REMOVE RocksDB Keys of all not stolen values? 
+ _estimators.clear(); +} + /// Parse counter values from rocksdb void RocksDBCounterManager::readCounterValues() { WRITE_LOCKER(guard, _rwLock); @@ -312,14 +411,26 @@ struct WBReader final : public rocksdb::WriteBatch::Handler { // must be set by the counter manager std::unordered_map seqStart; std::unordered_map deltas; + std::unordered_map< + uint64_t, + std::pair>>>* + _estimators; rocksdb::SequenceNumber currentSeqNum; uint64_t _maxTick = 0; uint64_t _maxHLC = 0; - WBReader() : currentSeqNum(0) {} + WBReader(std::unordered_map< + uint64_t, + std::pair>>>* + estimators) + : _estimators(estimators), currentSeqNum(0) {} + ~WBReader() { // update ticks after parsing wal - LOG_TOPIC(TRACE, Logger::ENGINES) << "max tick found in WAL: " << _maxTick << ", last HLC value: " << _maxHLC; + LOG_TOPIC(TRACE, Logger::ENGINES) << "max tick found in WAL: " << _maxTick + << ", last HLC value: " << _maxHLC; TRI_UpdateTickServer(_maxTick); TRI_HybridLogicalClock(_maxHLC); @@ -367,15 +478,16 @@ struct WBReader final : public rocksdb::WriteBatch::Handler { storeMaxHLC(RocksDBKey::revisionId(key)); } else if (type == RocksDBEntryType::PrimaryIndexValue) { // document key - StringRef ref = RocksDBKey::primaryKey(key); + StringRef ref = RocksDBKey::primaryKey(key); TRI_ASSERT(!ref.empty()); // check if the key is numeric - if (ref[0] >= '1' && ref[0] <= '9') { + if (ref[0] >= '1' && ref[0] <= '9') { // numeric start byte. looks good try { // extract uint64_t value from key. this will throw if the key // is non-numeric - uint64_t tick = basics::StringUtils::uint64_check(ref.data(), ref.size()); + uint64_t tick = + basics::StringUtils::uint64_check(ref.data(), ref.size()); // if no previous _maxTick set or the numeric value found is // "near" our previous _maxTick, then we update it if (tick > _maxTick && (_maxTick == 0 || tick - _maxTick < 2048)) { @@ -415,6 +527,32 @@ struct WBReader final : public rocksdb::WriteBatch::Handler { it->second._added++; it->second._revisionId = revisionId; } + } else { + // We have to adjust the estimate with an insert + switch (RocksDBKey::type(key)) { + case RocksDBEntryType::IndexValue: { + uint64_t objectId = RocksDBKey::counterObjectId(key); + auto it = _estimators->find(objectId); + if (it != _estimators->end() && it->second.first < currentSeqNum) { + // We track estimates for this index + uint64_t hash = RocksDBVPackIndex::HashForKey(key); + it->second.second->insert(hash); + } + break; + } + case RocksDBEntryType::EdgeIndexValue: { + uint64_t objectId = RocksDBKey::counterObjectId(key); + auto it = _estimators->find(objectId); + if (it != _estimators->end() && it->second.first < currentSeqNum) { + // We track estimates for this index + uint64_t hash = RocksDBEdgeIndex::HashForKey(key); + it->second.second->insert(hash); + } + break; + } + default: + break; + } } } @@ -429,6 +567,32 @@ struct WBReader final : public rocksdb::WriteBatch::Handler { it->second._removed++; it->second._revisionId = revisionId; } + } else { + // We have to adjust the estimate with an remove + switch (RocksDBKey::type(key)) { + case RocksDBEntryType::IndexValue: { + uint64_t objectId = RocksDBKey::counterObjectId(key); + auto it = _estimators->find(objectId); + if (it != _estimators->end() && it->second.first < currentSeqNum) { + // We track estimates for this index + uint64_t hash = RocksDBVPackIndex::HashForKey(key); + it->second.second->remove(hash); + } + break; + } + case RocksDBEntryType::EdgeIndexValue: { + uint64_t objectId = RocksDBKey::counterObjectId(key); + auto it = 
_estimators->find(objectId); + if (it != _estimators->end() && it->second.first < currentSeqNum) { + // We track estimates for this index + uint64_t hash = RocksDBEdgeIndex::HashForKey(key); + it->second.second->remove(hash); + } + break; + } + default: + break; + } } } @@ -442,8 +606,8 @@ bool RocksDBCounterManager::parseRocksWAL() { rocksdb::SequenceNumber start = UINT64_MAX; // Tell the WriteBatch reader the transaction markers to look for - auto handler = std::make_unique(); - + auto handler = std::make_unique(&_estimators); + for (auto const& pair : _counters) { handler->seqStart.emplace(pair.first, pair.second._sequenceNum); start = std::min(start, pair.second._sequenceNum); diff --git a/arangod/RocksDBEngine/RocksDBCounterManager.h b/arangod/RocksDBEngine/RocksDBCounterManager.h index 654e3cdf54..6897fc28c8 100644 --- a/arangod/RocksDBEngine/RocksDBCounterManager.h +++ b/arangod/RocksDBEngine/RocksDBCounterManager.h @@ -28,6 +28,7 @@ #include "Basics/Common.h" #include "Basics/ReadWriteLock.h" #include "Basics/Result.h" +#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h" #include "RocksDBEngine/RocksDBTypes.h" #include "VocBase/voc-types.h" @@ -78,8 +79,9 @@ class RocksDBCounterManager { /// the sequence number used void updateCounter(uint64_t objectId, CounterAdjustment const&); - //does not modify seq or revisionid - arangodb::Result setAbsoluteCounter(uint64_t objectId, uint64_t absouluteCount); + // does not modify seq or revisionid + arangodb::Result setAbsoluteCounter(uint64_t objectId, + uint64_t absouluteCount); /// Thread-Safe remove a counter void removeCounter(uint64_t objectId); @@ -87,7 +89,22 @@ class RocksDBCounterManager { /// Thread-Safe force sync arangodb::Result sync(bool force); - void readSettings(); + // Steal the index estimator that the recovery has built up to inject it into + // an index. + // NOTE: If this returns nullptr the recovery was not ably to find any + // estimator + // for this index. + std::unique_ptr> + stealIndexEstimator(uint64_t indexObjectId); + + // Free up all index estimators that were not read by any index. + // This is to save some memory. + // NOTE: After calling this the stored estimate of all not yet + // read index estimators will be dropped and no attempt + // to reread it will be done. + // So call it after ALL indexes for all databases + // have been created in memory. + void clearIndexEstimators(); protected: struct CMValue { @@ -105,6 +122,9 @@ class RocksDBCounterManager { }; void readCounterValues(); + void readSettings(); + void readIndexEstimates(); + bool parseRocksWAL(); ////////////////////////////////////////////////////////////////////////////// @@ -112,6 +132,17 @@ class RocksDBCounterManager { ////////////////////////////////////////////////////////////////////////////// std::unordered_map _counters; + ////////////////////////////////////////////////////////////////////////////// + /// @brief Index Estimator contianer. + /// Note the elements in this container will be moved into the + /// index classes and are only temporarily stored here during recovery. 
+ ////////////////////////////////////////////////////////////////////////////// + std::unordered_map< + uint64_t, + std::pair>>> + _estimators; + ////////////////////////////////////////////////////////////////////////////// /// @brief synced sequence numbers ////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RocksDBEngine/RocksDBCuckooIndexEstimator.h b/arangod/RocksDBEngine/RocksDBCuckooIndexEstimator.h new file mode 100644 index 0000000000..070859baa1 --- /dev/null +++ b/arangod/RocksDBEngine/RocksDBCuckooIndexEstimator.h @@ -0,0 +1,617 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2017 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Daniel Larkin +/// @author Michael Hackstein +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_ROCKSDB_ROCKSDB_INDEX_ESTIMATOR_H +#define ARANGOD_ROCKSDB_ROCKSDB_INDEX_ESTIMATOR_H 1 + +#include "Basics/Common.h" +#include "Basics/ReadLocker.h" +#include "Basics/StringRef.h" +#include "Basics/WriteLocker.h" +#include "Basics/fasthash.h" + +#include "Logger/Logger.h" +#include "RocksDBEngine/RocksDBCommon.h" + +// In the following template: +// Key is the key type, it must be copyable and movable, furthermore, Key +// must be default constructible (without arguments) as empty and +// must have an empty() method to indicate that the instance is +// empty. +// If using fasthash64 on all bytes of the object is not +// a suitable hash function, one has to instanciate the template +// with two hash function types as 3rd and 4th argument. If +// std::equal_to is not implemented or does not behave correctly, +// one has to supply a comparison class as well. +// This class is not thread-safe! + +namespace arangodb { + +// C++ wrapper for the hash function: +template +class HashWithSeed { + public: + uint64_t operator()(T const& t) const { + // Some implementation like Fnv or xxhash looking at bytes in type T, + // taking the seed into account. + auto p = reinterpret_cast(&t); + return fasthash64(p, sizeof(T), Seed); + } +}; + +template , + class Fingerprint = HashWithSeed, + class HashShort = HashWithSeed, + class CompKey = std::equal_to> +class RocksDBCuckooIndexEstimator { + // Note that the following has to be a power of two and at least 4! + static constexpr uint32_t SlotsPerBucket = 4; + + private: + // Helper class to abstract away where which data is stored. 
+ struct Slot { + private: + uint16_t* _data; + + public: + Slot(uint16_t* data) : _data(data) {} + + ~Slot() { + // Not responsible for anything + } + + bool operator==(const Slot& other) { return _data == other._data; } + + uint16_t* fingerprint() { return _data; } + + uint16_t* counter() { return _data + 1; } + + void reset() { + *fingerprint() = 0; + *counter() = 0; + } + + bool isEqual(uint16_t fp) { return ((*fingerprint()) == fp); } + + bool isEmpty() { return (*fingerprint()) == 0; } + }; + + enum SerializeFormat : char { + // To describe this format we use | as a seperator for readability, but it + // is NOT a printed character in the serialized string + // NOCOMPRESSION: type|length|size|nrUsed|nrCuckood|nrTotal|niceSize|logSize|base + NOCOMPRESSION = '0' + + }; + + public: + RocksDBCuckooIndexEstimator(uint64_t size) + : _randState(0x2636283625154737ULL), + _slotSize(2 * sizeof(uint16_t)), // Sort out offsets and alignments + _nrUsed(0), + _nrCuckood(0), + _nrTotal(0), + _maxRounds(16) { + // Inflate size so that we have some padding to avoid failure + size *= 2.0; + size = (size >= 1024) ? size : 1024; // want 256 buckets minimum + + // First find the smallest power of two that is not smaller than size: + size /= SlotsPerBucket; + _size = size; + initializeDefault(); + } + + RocksDBCuckooIndexEstimator(arangodb::StringRef const serialized) + : _randState(0x2636283625154737ULL), + _slotSize(2 * sizeof(uint16_t)), // Sort out offsets and alignments + _nrUsed(0), + _nrCuckood(0), + _nrTotal(0), + _maxRounds(16) { + switch (serialized.front()) { + case SerializeFormat::NOCOMPRESSION: { + deserializeUncompressed(serialized); + break; + } + default: { + LOG_TOPIC(ERR, arangodb::Logger::ENGINES) << "Unable to restore the " + "index estimates. Invalid " + "format persisted."; + initializeDefault(); + } + } + } + + ~RocksDBCuckooIndexEstimator() { delete[] _allocBase; } + + RocksDBCuckooIndexEstimator(RocksDBCuckooIndexEstimator const&) = delete; + RocksDBCuckooIndexEstimator(RocksDBCuckooIndexEstimator&&) = delete; + RocksDBCuckooIndexEstimator& operator=(RocksDBCuckooIndexEstimator const&) = + delete; + RocksDBCuckooIndexEstimator& operator=(RocksDBCuckooIndexEstimator&&) = + delete; + + void serialize(std::string& serialized) const { + // This format is always hard coded and the serialisation has to support + // older formats + // for backwards compatibility + // We always have to start with the type and then the length + serialized += SerializeFormat::NOCOMPRESSION; + + uint64_t serialLength = + (sizeof(SerializeFormat) + sizeof(uint64_t) + sizeof(_size) + sizeof(_nrUsed) + + sizeof(_nrCuckood) + sizeof(_nrTotal) + sizeof(_niceSize) + + sizeof(_logSize) + (_size * _slotSize * SlotsPerBucket)); + + serialized.reserve(sizeof(uint64_t) + serialLength); + // We always prepend the length, so parsing is easier + rocksutils::uint64ToPersistent(serialized, serialLength); + + { + // Sorry we need a consistent state, so we have to read-lock from here on... 
+ READ_LOCKER(locker, _bucketLock); + // Add all member variables + rocksutils::uint64ToPersistent(serialized, _size); + rocksutils::uint64ToPersistent(serialized, _nrUsed); + rocksutils::uint64ToPersistent(serialized, _nrCuckood); + rocksutils::uint64ToPersistent(serialized, _nrTotal); + rocksutils::uint64ToPersistent(serialized, _niceSize); + rocksutils::uint64ToPersistent(serialized, _logSize); + + // Add the data blob + // Size is as follows: nrOfBuckets * SlotsPerBucket * SlotSize + TRI_ASSERT((_size * _slotSize * SlotsPerBucket) <= _allocSize); + + for (uint64_t i = 0; i < (_size * _slotSize * SlotsPerBucket) / sizeof(uint16_t); + ++i) { + rocksutils::uint16ToPersistent(serialized, *(reinterpret_cast(_base + i * 2))); + } + } + } + + void clear() { + WRITE_LOCKER(locker, _bucketLock); + // Reset Stats + _nrTotal = 0; + _nrCuckood = 0; + _nrUsed = 0; + + // Reset filter content + // Now initialize all slots in all buckets with zero data: + for (uint32_t b = 0; b < _size; ++b) { + for (size_t i = 0; i < SlotsPerBucket; ++i) { + Slot f = findSlot(b, i); + f.reset(); + } + } + } + + double computeEstimate() { + READ_LOCKER(locker, _bucketLock); + if (_nrTotal == 0) { + // If we do not have any documents we have a rather constant estimate. + return 1; + } + // _nrUsed; These are known to be distinct values + // _nrCuckood; These are eventually distinct documents with unknown state + return (double)(_nrUsed + ((double)_nrCuckood * 3 * _nrUsed / _nrTotal)) / + _nrTotal; + } + + bool lookup(Key const& k) const { + // look up a key, return either false if no pair with key k is + // found or true. + uint64_t hash1 = _hasherKey(k); + uint64_t pos1 = hashToPos(hash1); + uint16_t fingerprint = keyToFingerprint(k); + // We compute the second hash already here to allow the result to + // survive a mispredicted branch in the first loop. Is this sensible? + uint64_t hash2 = _hasherPosFingerprint(pos1, fingerprint); + uint64_t pos2 = hashToPos(hash2); + bool found = false; + { + READ_LOCKER(guard, _bucketLock); + findSlotNoCuckoo(pos1, pos2, fingerprint, found); + } + return found; + } + + bool insert(Key& k) { + // insert the key k + // + // The inserted key will have its fingerprint input entered in the table. If + // there is a collision and a fingerprint needs to be cuckooed, a certain + // number of attempts will be made. After that, a given fingerprint may + // simply be expunged. If something is expunged, the function will return + // false, otherwise true. + + uint64_t hash1 = _hasherKey(k); + uint64_t pos1 = hashToPos(hash1); + uint16_t fingerprint = keyToFingerprint(k); + // We compute the second hash already here to let it survive a + // mispredicted + // branch in the first loop: + uint64_t hash2 = _hasherPosFingerprint(pos1, fingerprint); + uint64_t pos2 = hashToPos(hash2); + + { + WRITE_LOCKER(guard, _bucketLock); + Slot slot = findSlotCuckoo(pos1, pos2, fingerprint); + if (slot.isEmpty()) { + // Free slot insert ourself. + *slot.fingerprint() = fingerprint; + *slot.counter() = 1; // We are the first element + _nrUsed++; + } else { + TRI_ASSERT(slot.isEqual(fingerprint)); + // TODO replace with constant uint16_t max + if (*slot.counter() < 65536) { + // just to avoid overflow... + (*slot.counter())++; + } + } + } + _nrTotal++; + return true; + } + + bool remove(Key const& k) { + // remove one element with key k, if one is in the table. Return true if + // a key was removed and false otherwise. + // look up a key, return either false if no pair with key k is + // found or true. 
+ uint64_t hash1 = _hasherKey(k); + uint64_t pos1 = hashToPos(hash1); + uint16_t fingerprint = keyToFingerprint(k); + // We compute the second hash already here to allow the result to + // survive a mispredicted branch in the first loop. Is this sensible? + uint64_t hash2 = _hasherPosFingerprint(pos1, fingerprint); + uint64_t pos2 = hashToPos(hash2); + + bool found = false; + _nrTotal--; + { + WRITE_LOCKER(guard, _bucketLock); + Slot slot = findSlotNoCuckoo(pos1, pos2, fingerprint, found); + if (found) { + if (*slot.counter() <= 1) { + // We remove the last one of those, free slot + slot.reset(); + _nrUsed--; + } else { + // Just decrease the counter + (*slot.counter())--; + } + return true; + } + } + // If we get here we assume that the element was once inserted, but removed + // by cuckoo + // Reduce nrCuckood; + if (_nrCuckood > 0) { + --_nrCuckood; + } + return false; + } + + uint64_t capacity() const { return _size * SlotsPerBucket; } + + uint64_t nrUsed() const { return _nrUsed; } + + uint64_t nrCuckood() const { return _nrCuckood; } + + uint64_t memoryUsage() const { + return sizeof(RocksDBCuckooIndexEstimator) + _allocSize; + } + + private: // methods + Slot findSlotNoCuckoo(uint64_t pos1, uint64_t pos2, uint16_t fp, + bool& found) const { + found = false; + Slot s = findSlotNoCuckoo(pos1, fp, found); + if (found) { + return s; + } + // Not found by first hash. use second hash. + return findSlotNoCuckoo(pos2, fp, found); + } + + // Find a slot for this fingerprint + // This function guarantees the following: + // If this fingerprint is stored already, the slot will be + // pointing to this fingerprint. + // If this fingerprint is NOT storead already the returned slot + // will be empty and can be filled with the fingerprint + // In order to create an empty slot this function tries to + // cuckoo neighboring elements, if that does not succeed + // it deletes a random element occupying a position. + Slot findSlotCuckoo(uint64_t pos1, uint64_t pos2, uint16_t fp) { + Slot firstEmpty(nullptr); + bool foundEmpty = false; + + for (uint64_t i = 0; i < SlotsPerBucket; ++i) { + Slot slot = findSlot(pos1, i); + if (slot.isEqual(fp)) { + // Found we are done, short-circuit. + return slot; + } + if (!foundEmpty && slot.isEmpty()) { + foundEmpty = true; + firstEmpty = slot; + } + } + + for (uint64_t i = 0; i < SlotsPerBucket; ++i) { + Slot slot = findSlot(pos2, i); + if (slot.isEqual(fp)) { + // Found we are done, short-circuit. + return slot; + } + if (!foundEmpty && slot.isEmpty()) { + foundEmpty = true; + firstEmpty = slot; + } + } + + // Value not yet inserted. + + if (foundEmpty) { + // But we found an empty slot + return firstEmpty; + } + + // We also did not find an empty slot, now the cuckoo goes... + + uint16_t counter = + 0; // We initially write a 0 in here, because the caller will + // Increase the counter by one + uint8_t r = pseudoRandomChoice(); + if ((r & 1) != 0) { + std::swap(pos1, pos2); + } + + // Now expunge a random element from any of these slots: + // and place our own into it. + // We have to keep the reference to the cuckood slot here. 
+ r = pseudoRandomChoice(); + uint64_t i = r & (SlotsPerBucket - 1); + firstEmpty = findSlot(pos1, i); + uint16_t fDummy = *firstEmpty.fingerprint(); + uint16_t cDummy = *firstEmpty.counter(); + *firstEmpty.fingerprint() = fp; + *firstEmpty.counter() = counter; + fp = fDummy; + counter = cDummy; + + uint64_t hash2 = _hasherPosFingerprint(pos1, fp); + pos2 = hashToPos(hash2); + + // Now let the cuckoo fly and find a place for the poor one we just took + // out. + for (uint64_t i = 0; i < SlotsPerBucket; ++i) { + Slot slot = findSlot(pos2, i); + if (slot.isEmpty()) { + // Yeah we found an empty place already + *slot.fingerprint() = fp; + *slot.counter() = counter; + ++_nrUsed; + return firstEmpty; + } + } + + // Bad luck, let us try to move to a different slot. + for (unsigned attempt = 1; attempt < _maxRounds; attempt++) { + std::swap(pos1, pos2); + // Now expunge a random element from any of these slots: + r = pseudoRandomChoice(); + uint64_t i = r & (SlotsPerBucket - 1); + // We expunge the element at position pos1 and slot i: + Slot slot = findSlot(pos1, i); + if (slot == firstEmpty) { + // We have to keep this one in place. + // Take a different one + i = (i + 1) % SlotsPerBucket; + slot = findSlot(pos1, i); + } + fDummy = *slot.fingerprint(); + cDummy = *slot.counter(); + *slot.fingerprint() = fp; + *slot.counter() = counter; + fp = fDummy; + counter = cDummy; + + hash2 = _hasherPosFingerprint(pos1, fp); + pos2 = hashToPos(hash2); + + for (uint64_t i = 0; i < SlotsPerBucket; ++i) { + Slot slot = findSlot(pos2, i); + if (slot.isEmpty()) { + // Finally an empty place + *slot.fingerprint() = fp; + *slot.counter() = counter; + ++_nrUsed; + return firstEmpty; + } + } + } + // If we get here we had to remove one of the elements. + // Let's increas the cuckoo counter + _nrCuckood++; + return firstEmpty; + } + + // Do not use the output if found == false + Slot findSlotNoCuckoo(uint64_t pos, uint16_t fp, bool& found) const { + found = false; + for (uint64_t i = 0; i < SlotsPerBucket; ++i) { + Slot slot = findSlot(pos, i); + if (fp == *slot.fingerprint()) { + found = true; + return slot; + } + } + return Slot{nullptr}; + } + + Slot findSlot(uint64_t pos, uint64_t slot) const { + char* address = _base + _slotSize * (pos * SlotsPerBucket + slot); + auto ret = reinterpret_cast(address); + return Slot(ret); + } + + uint64_t hashToPos(uint64_t hash) const { + uint64_t relevantBits = (hash >> _sizeShift) & _sizeMask; + return ((relevantBits < _size) ? relevantBits : (relevantBits - _size)); + } + + uint16_t keyToFingerprint(Key const& k) const { + uint64_t hash = _fingerprint(k); + uint16_t fingerprint = (uint16_t)( + (hash ^ (hash >> 16) ^ (hash >> 32) ^ (hash >> 48)) & 0xFFFF); + return (fingerprint ? 
fingerprint : 1); + } + + uint64_t _hasherPosFingerprint(uint64_t pos, uint16_t fingerprint) const { + return ((pos << _sizeShift) ^ _hasherShort(fingerprint)); + } + + uint8_t pseudoRandomChoice() { + _randState = _randState * 997 + 17; // ignore overflows + return static_cast((_randState >> 37) & 0xff); + } + + void deserializeUncompressed(arangodb::StringRef const& serialized) { + // Assert that we have at least the member variables + TRI_ASSERT(serialized.size() >= (sizeof(SerializeFormat) + sizeof(uint64_t) + sizeof(_size) + sizeof(_nrUsed) + + sizeof(_nrCuckood) + sizeof(_nrTotal) + sizeof(_niceSize) + + sizeof(_logSize) )); + char const* current = serialized.data(); + TRI_ASSERT(*current == SerializeFormat::NOCOMPRESSION); + current++; // Skip format char + + uint64_t length = rocksutils::uint64FromPersistent(current); + current += sizeof(uint64_t); + // Validate that the serialized format is exactly as long as we expect it to be + TRI_ASSERT(serialized.size() == length); + + _size = rocksutils::uint64FromPersistent(current); + current += sizeof(_size); + + _nrUsed = rocksutils::uint64FromPersistent(current); + current += sizeof(_nrUsed); + + _nrCuckood = rocksutils::uint64FromPersistent(current); + current += sizeof(_nrCuckood); + + _nrTotal = rocksutils::uint64FromPersistent(current); + current += sizeof(_nrTotal); + + _niceSize = rocksutils::uint64FromPersistent(current); + current += sizeof(_niceSize); + + _logSize = rocksutils::uint64FromPersistent(current); + current += sizeof(_logSize); + + deriveSizesAndAlloc(); + + + // Validate that we have enough data in the serialized format. + TRI_ASSERT(serialized.size() == + (sizeof(SerializeFormat) + sizeof( uint64_t) + sizeof(_size) + sizeof(_nrUsed) + + sizeof(_nrCuckood) + sizeof(_nrTotal) + sizeof(_niceSize) + + sizeof(_logSize) + (_size * _slotSize * SlotsPerBucket))); + + // Insert the raw data + // Size is as follows: nrOfBuckets * SlotsPerBucket * SlotSize + TRI_ASSERT((_size * _slotSize * SlotsPerBucket) <= _allocSize); + + for (uint64_t i = 0; i < (_size * _slotSize * SlotsPerBucket) / sizeof(uint16_t); + ++i) { + *(reinterpret_cast(_base + i * 2)) = rocksutils::uint16FromPersistent(current + (i * sizeof(uint16_t))); + } + } + + void initializeDefault() { + _niceSize = 256; + _logSize = 8; + while (_niceSize < _size) { + _niceSize <<= 1; + _logSize += 1; + } + + deriveSizesAndAlloc(); + + // Now initialize all slots in all buckets with zero data: + for (uint32_t b = 0; b < _size; ++b) { + for (size_t i = 0; i < SlotsPerBucket; ++i) { + Slot f = findSlot(b, i); + f.reset(); + } + } + } + + void deriveSizesAndAlloc() { + _sizeMask = _niceSize - 1; + _sizeShift = (64 - _logSize) / 2; + _allocSize = _size * _slotSize * SlotsPerBucket + + 64; // give 64 bytes padding to enable 64-byte alignment + _allocBase = new char[_allocSize]; + + _base = reinterpret_cast( + (reinterpret_cast(_allocBase) + 63) & + ~((uintptr_t)0x3fu)); // to actually implement the 64-byte alignment, + // shift base pointer within allocated space to + // 64-byte boundary + } + + private: // member variables + uint64_t _randState; // pseudo random state for expunging + + size_t _slotSize; // total size of a slot + + uint64_t _logSize; // logarithm (base 2) of number of buckets + uint64_t _size; // actual number of buckets + uint64_t _niceSize; // smallest power of 2 at least number of buckets, == + // 2^_logSize + uint64_t _sizeMask; // used to mask out some bits from the hash + uint32_t _sizeShift; // used to shift the bits down to get a position + uint64_t 
_allocSize; // number of allocated bytes, + // == _size * SlotsPerBucket * _slotSize + 64 + char* _base; // pointer to allocated space, 64-byte aligned + char* _allocBase; // base of original allocation + uint64_t _nrUsed; // number of pairs stored in the table + uint64_t _nrCuckood; // number of elements that have been removed by cuckoo + uint64_t _nrTotal; // number of elements included in total + unsigned _maxRounds; // maximum number of cuckoo rounds on insertion + + HashKey _hasherKey; // Instance to compute the first hash function + Fingerprint _fingerprint; // Instance to compute a fingerprint of a key + HashShort _hasherShort; // Instance to compute the second hash function + + arangodb::basics::ReadWriteLock mutable _bucketLock; +}; + +} // namespace arangodb + +#endif diff --git a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp index fee57acc30..52f882e601 100644 --- a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp @@ -39,6 +39,7 @@ #include "RocksDBEngine/RocksDBCollection.h" #include "RocksDBEngine/RocksDBCommon.h" +#include "RocksDBEngine/RocksDBCounterManager.h" #include "RocksDBEngine/RocksDBKey.h" #include "RocksDBEngine/RocksDBKeyBounds.h" #include "RocksDBEngine/RocksDBToken.h" @@ -116,15 +117,17 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) { // acquire RocksDB collection RocksDBToken token; - auto iterateChachedValues = [this,&cb,&limit,&token](){ - //LOG_TOPIC(ERR, Logger::FIXME) << "value found in cache "; + auto iterateCachedValues = [this,&cb,&limit,&token](){ while(_arrayIterator.valid()){ - StringRef edgeKey(_arrayIterator.value()); - if(lookupDocumentAndUseCb(edgeKey, cb, limit, token, true)){ - _arrayIterator++; - return true; // more documents - function will be re-entered + VPackSlice edgeKey(_arrayIterator.value()); + + cb(RocksDBToken(edgeKey.getUInt())); + --limit; + ++_arrayIterator; + if (limit == 0) { + _doUpdateArrayIterator=false; //limit hit continue with next batch + return true; } - _arrayIterator++; } //reset cache iterator before handling next from/to @@ -137,7 +140,6 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) { TRI_ASSERT(limit > 0); StringRef fromTo = getFromToFromIterator(_keysIterator); bool foundInCache = false; - //LOG_TOPIC(ERR, Logger::FIXME) << "fromTo" << fromTo; if (_useCache && _doUpdateArrayIterator){ // try to find cached value @@ -162,7 +164,7 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) { _arrayIterator = VPackArrayIterator(_arraySlice); // iterate until batch size limit is hit - bool continueWithNextBatch = iterateChachedValues(); + bool continueWithNextBatch = iterateCachedValues(); if(continueWithNextBatch){ return true; // exit and continue with next batch } @@ -171,7 +173,7 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) { // resuming old iterator foundInCache = true; // do not look up key again! 
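The two small mapping helpers of the estimator above, hashToPos() and keyToFingerprint(), carry most of the correctness burden: positions must land in [0, _size) and fingerprints must never be 0, because 0 is what marks an empty slot. A standalone sketch of both mappings, assuming (as in initializeDefault/deriveSizesAndAlloc) that niceSize is the smallest power of two at least size, sizeMask = niceSize - 1 and sizeShift = (64 - logSize) / 2; the concrete numbers below are illustrative:

#include <cassert>
#include <cstdint>

// Fold a 64-bit hash into a bucket position in [0, size).
static uint64_t hashToPos(uint64_t hash, uint64_t size, uint64_t niceSize,
                          uint64_t logSize) {
  uint64_t sizeMask = niceSize - 1;
  uint64_t sizeShift = (64 - logSize) / 2;
  uint64_t relevantBits = (hash >> sizeShift) & sizeMask;
  // relevantBits < niceSize < 2 * size, so one conditional subtraction suffices
  return (relevantBits < size) ? relevantBits : relevantBits - size;
}

// XOR-fold a 64-bit hash into a non-zero 16-bit fingerprint (0 marks "empty").
static uint16_t keyToFingerprint(uint64_t hash) {
  uint16_t fp = static_cast<uint16_t>(
      (hash ^ (hash >> 16) ^ (hash >> 32) ^ (hash >> 48)) & 0xFFFF);
  return fp ? fp : 1;
}

int main() {
  // size = 300 buckets -> niceSize = 512, logSize = 9
  for (uint64_t h = 1; h < 2000000; h += 97) {
    assert(hashToPos(h * 2654435761ULL, 300, 512, 9) < 300);
    assert(keyToFingerprint(h) != 0);
  }
  assert(keyToFingerprint(0x0001000100010001ULL) == 1);  // fold hits 0, remapped to 1
}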
_doUpdateArrayIterator = true; - bool continueWithNextBatch = iterateChachedValues(); + bool continueWithNextBatch = iterateCachedValues(); if(continueWithNextBatch){ return true; // exit and continue with next batch } @@ -197,17 +199,18 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) { while (_iterator->Valid() && (_index->_cmp->Compare(_iterator->key(), _bounds.end()) < 0)) { StringRef edgeKey = RocksDBKey::primaryKey(_iterator->key()); + + // lookup real document + bool continueWithNextBatch = lookupDocumentAndUseCb(edgeKey, cb, limit, token, false); // build cache value for from/to if(_useCache){ if (_cacheValueSize <= cacheValueSizeLimit){ - _cacheValueBuilder.add(VPackValue(std::string(edgeKey.data(),edgeKey.size()))); + _cacheValueBuilder.add(VPackValue(token.revisionId())); ++_cacheValueSize; } } - // lookup real document - bool continueWithNextBatch = lookupDocumentAndUseCb(edgeKey, cb, limit, token, false); _iterator->Next(); //check batch size limit if(continueWithNextBatch){ @@ -272,19 +275,31 @@ void RocksDBEdgeIndexIterator::reset() { // ============================= Index ==================================== +uint64_t RocksDBEdgeIndex::HashForKey(const rocksdb::Slice& key) { + std::hash hasher; + // NOTE: This function needs to use the same hashing on the + // indexed VPack as the initial inserter does + StringRef tmp = RocksDBKey::vertexId(key); + return static_cast(hasher(tmp)); +} + RocksDBEdgeIndex::RocksDBEdgeIndex(TRI_idx_iid_t iid, arangodb::LogicalCollection* collection, VPackSlice const& info, std::string const& attr) - : RocksDBIndex(iid - ,collection - ,std::vector>({{AttributeName(attr, false)}}) - ,false // unique - ,false // sparse - ,basics::VelocyPackHelper::stringUInt64(info, "objectId") - ,!ServerState::instance()->isCoordinator() // useCache - ) - , _directionAttr(attr) { + : RocksDBIndex(iid, collection, std::vector>( + {{AttributeName(attr, false)}}), + false, false, + basics::VelocyPackHelper::stringUInt64(info, "objectId"), + !ServerState::instance()->isCoordinator() /*useCache*/ + ), + _directionAttr(attr), + _estimator(nullptr) { + if (!ServerState::instance()->isCoordinator()) { + // We activate the estimator only on DBServers + _estimator = std::make_unique>(RocksDBIndex::ESTIMATOR_SIZE); + TRI_ASSERT(_estimator != nullptr); + } TRI_ASSERT(iid != 0); TRI_ASSERT(_objectId != 0); // if we never hit the assertions we need to remove the @@ -306,25 +321,11 @@ double RocksDBEdgeIndex::selectivityEstimate( return 0.1; } - if (attribute != nullptr) { - // the index attribute is given here - // now check if we can restrict the selectivity estimation to the correct - // part of the index - if (attribute->compare(_directionAttr) == 0) { - // _from - return 0.2; //_edgesFrom->selectivity(); - } else { - return 0; - } - // other attribute. 
now return the average selectivity + if (attribute != nullptr && attribute->compare(_directionAttr)) { + return 0; } - - // return average selectivity of the two index parts - // double estimate = (_edgesFrom->selectivity() + _edgesTo->selectivity()) * - // 0.5; - // TRI_ASSERT(estimate >= 0.0 && - // estimate <= 1.00001); // floating-point tolerance - return 0.1; + TRI_ASSERT(_estimator != nullptr); + return _estimator->computeEstimate(); } /// @brief return the memory usage for the index @@ -354,7 +355,7 @@ int RocksDBEdgeIndex::insert(transaction::Methods* trx, VPackSlice primaryKey = doc.get(StaticStrings::KeyString); VPackSlice fromTo = doc.get(_directionAttr); TRI_ASSERT(primaryKey.isString() && fromTo.isString()); - auto fromToRef=StringRef(fromTo); + auto fromToRef = StringRef(fromTo); RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromToRef, StringRef(primaryKey)); //blacklist key in cache @@ -367,6 +368,9 @@ int RocksDBEdgeIndex::insert(transaction::Methods* trx, rocksdb::Status status = rtrx->Put(rocksdb::Slice(key.string()), rocksdb::Slice()); if (status.ok()) { + std::hash hasher; + uint64_t hash = static_cast(hasher(fromToRef)); + _estimator->insert(hash); return TRI_ERROR_NO_ERROR; } else { return rocksutils::convertStatus(status).errorNumber(); @@ -383,7 +387,7 @@ int RocksDBEdgeIndex::remove(transaction::Methods* trx, bool isRollback) { VPackSlice primaryKey = doc.get(StaticStrings::KeyString); VPackSlice fromTo = doc.get(_directionAttr); - auto fromToRef=StringRef(fromTo); + auto fromToRef = StringRef(fromTo); TRI_ASSERT(primaryKey.isString() && fromTo.isString()); RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromToRef, StringRef(primaryKey)); @@ -396,6 +400,9 @@ int RocksDBEdgeIndex::remove(transaction::Methods* trx, rocksdb::Transaction* rtrx = state->rocksTransaction(); rocksdb::Status status = rtrx->Delete(rocksdb::Slice(key.string())); if (status.ok()) { + std::hash hasher; + uint64_t hash = static_cast(hasher(fromToRef)); + _estimator->remove(hash); return TRI_ERROR_NO_ERROR; } else { return rocksutils::convertStatus(status).errorNumber(); @@ -618,10 +625,48 @@ int RocksDBEdgeIndex::cleanup() { return TRI_ERROR_NO_ERROR; } +void RocksDBEdgeIndex::serializeEstimate(std::string& output) const { + TRI_ASSERT(_estimator != nullptr); + _estimator->serialize(output); +} + +bool RocksDBEdgeIndex::deserializeEstimate(RocksDBCounterManager* mgr) { + TRI_ASSERT(!ServerState::instance()->isCoordinator()); + // We simply drop the current estimator and steal the one from recovery + // We are than save for resizing issues in our _estimator format + // and will use the old size. + + TRI_ASSERT(mgr != nullptr); + auto tmp = mgr->stealIndexEstimator(_objectId); + if (tmp == nullptr) { + // We expected to receive a stored index estimate, however we got none. + // We use the freshly created estimator but have to recompute it. 
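HashForKey() above and the hashing inside insert()/remove() have to agree: at write time the estimator is fed a std::hash of the _from/_to value (fromToRef), and the recalculation path below re-derives the very same value from the persisted key via RocksDBKey::vertexId() before hashing it again. A simplified illustration of that invariant, with std::string and std::hash<std::string> standing in for StringRef and the real hasher; the helper names are mine, not ArangoDB API:

#include <cassert>
#include <cstdint>
#include <functional>
#include <string>

// Hash fed to the estimator when a document is inserted or removed.
static uint64_t hashAtWriteTime(std::string const& fromTo) {
  return static_cast<uint64_t>(std::hash<std::string>{}(fromTo));
}

// Hash used when rebuilding the estimate from persisted keys: the vertex id is
// extracted back out of the key and must be hashed with the same function.
static uint64_t hashFromPersistedKey(std::string const& vertexIdFromKey) {
  return static_cast<uint64_t>(std::hash<std::string>{}(vertexIdFromKey));
}

int main() {
  std::string fromTo = "vertices/abc";  // value of _from or _to
  // If vertexId() did not hand back exactly this string, the estimator counts
  // would diverge between the write path and recalculation.
  assert(hashAtWriteTime(fromTo) == hashFromPersistedKey(fromTo));
}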
+ return false; + } + _estimator.swap(tmp); + TRI_ASSERT(_estimator != nullptr); + return true; +} + + +void RocksDBEdgeIndex::recalculateEstimates() { + TRI_ASSERT(_estimator != nullptr); + _estimator->clear(); + + auto bounds = RocksDBKeyBounds::EdgeIndex(_objectId); + rocksutils::iterateBounds(bounds, [&](rocksdb::Iterator* it) { + uint64_t hash = RocksDBEdgeIndex::HashForKey(it->key()); + _estimator->insert(hash); + }); +} + Result RocksDBEdgeIndex::postprocessRemove(transaction::Methods* trx, rocksdb::Slice const& key, rocksdb::Slice const& value) { //blacklist keys during truncate blackListKey(key.data(), key.size()); + + uint64_t hash = RocksDBEdgeIndex::HashForKey(key); + _estimator->remove(hash); return {TRI_ERROR_NO_ERROR}; } diff --git a/arangod/RocksDBEngine/RocksDBEdgeIndex.h b/arangod/RocksDBEngine/RocksDBEdgeIndex.h index fa9865b124..fbae9f8b05 100644 --- a/arangod/RocksDBEngine/RocksDBEdgeIndex.h +++ b/arangod/RocksDBEngine/RocksDBEdgeIndex.h @@ -27,6 +27,7 @@ #include "Basics/Common.h" #include "Indexes/Index.h" #include "Indexes/IndexIterator.h" +#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h" #include "RocksDBEngine/RocksDBIndex.h" #include "RocksDBEngine/RocksDBKey.h" #include "RocksDBEngine/RocksDBKeyBounds.h" @@ -85,6 +86,8 @@ class RocksDBEdgeIndex final : public RocksDBIndex { friend class RocksDBEdgeIndexIterator; public: + static uint64_t HashForKey(const rocksdb::Slice& key); + RocksDBEdgeIndex() = delete; RocksDBEdgeIndex(TRI_idx_iid_t, arangodb::LogicalCollection*, @@ -156,6 +159,13 @@ class RocksDBEdgeIndex final : public RocksDBIndex { arangodb::velocypack::Builder&) const override; int cleanup() override; + void serializeEstimate(std::string& output) const override; + + bool deserializeEstimate(arangodb::RocksDBCounterManager* mgr) override; + + void recalculateEstimates() override; + + private: /// @brief create the iterator IndexIterator* createEqIterator(transaction::Methods*, ManagedDocumentResult*, @@ -171,6 +181,12 @@ class RocksDBEdgeIndex final : public RocksDBIndex { arangodb::aql::AstNode const* valNode) const; std::string _directionAttr; + + /// @brief A fixed size library to estimate the selectivity of the index. + /// On insertion of a document we have to insert it into the estimator, + /// On removal we have to remove it in the estimator as well. 
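The deserializeEstimate()/recalculateEstimates() pair above implements a steal-or-rebuild recovery strategy: if the counter manager still holds an estimator persisted under this index's objectId it is adopted wholesale, otherwise the caller is told (via false) to rebuild the fresh estimator from a full scan of the index bounds. A schematic of that control flow with placeholder types; Manager, Estimator and the rebuild comment are illustrative, not the real classes:

#include <cstdint>
#include <memory>
#include <unordered_map>

struct Estimator { void clear() {} void insert(uint64_t) {} };

struct Manager {  // stand-in for RocksDBCounterManager
  std::unordered_map<uint64_t, std::unique_ptr<Estimator>> recovered;
  std::unique_ptr<Estimator> stealIndexEstimator(uint64_t objectId) {
    auto it = recovered.find(objectId);
    if (it == recovered.end()) { return nullptr; }
    auto est = std::move(it->second);
    recovered.erase(it);
    return est;
  }
};

static bool deserializeEstimate(Manager& mgr, uint64_t objectId,
                                std::unique_ptr<Estimator>& estimator) {
  auto tmp = mgr.stealIndexEstimator(objectId);
  if (tmp == nullptr) {
    return false;        // nothing persisted: keep fresh estimator, rebuild later
  }
  estimator.swap(tmp);   // adopt the recovered estimator, drop the fresh one
  return true;
}

static void recalculate(std::unique_ptr<Estimator>& estimator) {
  estimator->clear();
  // ...re-insert one hash per index entry from a scan over the index bounds...
}

int main() {
  Manager mgr;
  mgr.recovered[42] = std::make_unique<Estimator>();
  auto est = std::make_unique<Estimator>();
  bool adopted = deserializeEstimate(mgr, 42, est);
  if (!adopted) { recalculate(est); }
  return adopted ? 0 : 1;
}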
+ std::unique_ptr> _estimator; + }; } // namespace arangodb diff --git a/arangod/RocksDBEngine/RocksDBEngine.cpp b/arangod/RocksDBEngine/RocksDBEngine.cpp index b26fbf1b26..c6241a3380 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.cpp +++ b/arangod/RocksDBEngine/RocksDBEngine.cpp @@ -290,6 +290,21 @@ void RocksDBEngine::stop() { return; } replicationManager()->dropAll(); + + if (_backgroundThread) { + // stop the press + _backgroundThread->beginShutdown(); + + if (_counterManager) { + _counterManager->sync(true); + } + + // wait until background thread stops + while (_backgroundThread->isRunning()) { + usleep(10000); + } + _backgroundThread.reset(); + } } void RocksDBEngine::unprepare() { @@ -298,15 +313,6 @@ void RocksDBEngine::unprepare() { } if (_db) { - if (_backgroundThread && _backgroundThread->isRunning()) { - // stop the press - _backgroundThread->beginShutdown(); - _backgroundThread.reset(); - } - if (_counterManager) { - _counterManager->sync(true); - } - // now prune all obsolete WAL files determinePrunableWalFiles(0); pruneWalFiles(); @@ -1206,7 +1212,7 @@ TRI_vocbase_t* RocksDBEngine::openExistingDatabase(TRI_voc_tick_t id, VPackSlice slice = builder.slice(); TRI_ASSERT(slice.isArray()); - + for (auto const& it : VPackArrayIterator(slice)) { // we found a collection that is still active TRI_ASSERT(!it.get("id").isNone() || !it.get("cid").isNone()); @@ -1222,6 +1228,7 @@ TRI_vocbase_t* RocksDBEngine::openExistingDatabase(TRI_voc_tick_t id, static_cast(collection->getPhysical()); TRI_ASSERT(physical != nullptr); + physical->deserializeIndexEstimates(counterManager()); LOG_TOPIC(DEBUG, arangodb::Logger::FIXME) << "added document collection '" << collection->name() << "'"; } diff --git a/arangod/RocksDBEngine/RocksDBHashIndex.h b/arangod/RocksDBEngine/RocksDBHashIndex.h index 4c8e89f267..7682c2010e 100644 --- a/arangod/RocksDBEngine/RocksDBHashIndex.h +++ b/arangod/RocksDBEngine/RocksDBHashIndex.h @@ -24,6 +24,7 @@ #ifndef ARANGOD_ROCKSDB_ROCKSDB_HASH_INDEX_H #define ARANGOD_ROCKSDB_ROCKSDB_HASH_INDEX_H 1 +#include "Basics/VelocyPackHelper.h" #include "RocksDBEngine/RocksDBVPackIndex.h" namespace arangodb { @@ -44,6 +45,7 @@ class RocksDBHashIndex final : public RocksDBVPackIndex { bool matchesDefinition(VPackSlice const& info) const override; bool isSorted() const override { return true; } + }; } diff --git a/arangod/RocksDBEngine/RocksDBIndex.cpp b/arangod/RocksDBEngine/RocksDBIndex.cpp index ec42f2619b..0c32559cb3 100644 --- a/arangod/RocksDBEngine/RocksDBIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBIndex.cpp @@ -38,6 +38,12 @@ using namespace arangodb; using namespace arangodb::rocksutils; +// This is the number of distinct elements the index estimator can reliably store +// This correlates directly with the memmory of the estimator: +// memmory == ESTIMATOR_SIZE * 6 bytes + +uint64_t const arangodb::RocksDBIndex::ESTIMATOR_SIZE = 4096; + RocksDBIndex::RocksDBIndex( TRI_idx_iid_t id, LogicalCollection* collection, std::vector> const& attributes, @@ -162,6 +168,22 @@ int RocksDBIndex::drop() { return TRI_ERROR_NO_ERROR; } +void RocksDBIndex::serializeEstimate(std::string&) const { + // All indexes that do not have an estimator do not serialize anything. +} + +bool RocksDBIndex::deserializeEstimate(RocksDBCounterManager*) { + // All indexes that do not have an estimator do not deserialize anything. + // So the estimate is always recreatable. + // We do not advance anything here. + return true; +} + +void RocksDBIndex::recalculateEstimates() { + // Nothing to do. 
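With ESTIMATOR_SIZE fixed at 4096 and the comment's figure of 6 bytes per reliably storable element, every estimator-backed index pays roughly 4096 * 6 = 24576 bytes, about 24 KiB, on top of the object overhead reported by memoryUsage(). A one-liner making that arithmetic explicit; the 6-byte figure is taken from the comment above, not re-derived from the slot layout:

#include <cstdint>
#include <iostream>

int main() {
  constexpr uint64_t kEstimatorSize = 4096;  // distinct elements reliably storable
  constexpr uint64_t kBytesPerElement = 6;   // figure quoted in the comment above
  std::cout << kEstimatorSize * kBytesPerElement << " bytes (~"
            << (kEstimatorSize * kBytesPerElement) / 1024
            << " KiB) per estimator\n";
}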
+ return; +} + void RocksDBIndex::truncate(transaction::Methods* trx) { RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); rocksdb::Transaction* rtrx = state->rocksTransaction(); diff --git a/arangod/RocksDBEngine/RocksDBIndex.h b/arangod/RocksDBEngine/RocksDBIndex.h index d626c449b9..dbebfcf13b 100644 --- a/arangod/RocksDBEngine/RocksDBIndex.h +++ b/arangod/RocksDBEngine/RocksDBIndex.h @@ -41,8 +41,16 @@ class Cache; } class LogicalCollection; class RocksDBComparator; +class RocksDBCounterManager; class RocksDBIndex : public Index { + + protected: + // This is the number of distinct elements the index estimator can reliably store + // This correlates directly with the memmory of the estimator: + // memmory == ESTIMATOR_SIZE * 6 bytes + static uint64_t const ESTIMATOR_SIZE; + protected: RocksDBIndex(TRI_idx_iid_t, LogicalCollection*, std::vector> const& @@ -89,6 +97,12 @@ class RocksDBIndex : public Index { void createCache(); void disableCache(); + virtual void serializeEstimate(std::string& output) const; + + virtual bool deserializeEstimate(RocksDBCounterManager* mgr); + + virtual void recalculateEstimates(); + protected: // Will be called during truncate to allow the index to update selectivity // estimates, blacklist keys, etc. diff --git a/arangod/RocksDBEngine/RocksDBKey.cpp b/arangod/RocksDBEngine/RocksDBKey.cpp index 5e7fd93c6c..2296cadd50 100644 --- a/arangod/RocksDBEngine/RocksDBKey.cpp +++ b/arangod/RocksDBEngine/RocksDBKey.cpp @@ -111,6 +111,9 @@ RocksDBKey RocksDBKey::ReplicationApplierConfig(TRI_voc_tick_t databaseId) { return RocksDBKey(RocksDBEntryType::ReplicationApplierConfig, databaseId); } +RocksDBKey RocksDBKey::IndexEstimateValue(uint64_t collectionObjectId) { + return RocksDBKey(RocksDBEntryType::IndexEstimateValue, collectionObjectId); +} // ========================= Member methods =========================== RocksDBEntryType RocksDBKey::type(RocksDBKey const& key) { @@ -168,12 +171,11 @@ arangodb::StringRef RocksDBKey::primaryKey(RocksDBKey const& key) { arangodb::StringRef RocksDBKey::primaryKey(rocksdb::Slice const& slice) { return primaryKey(slice.data(), slice.size()); } - -std::string RocksDBKey::vertexId(RocksDBKey const& key) { +StringRef RocksDBKey::vertexId(RocksDBKey const& key) { return vertexId(key._buffer.data(), key._buffer.size()); } -std::string RocksDBKey::vertexId(rocksdb::Slice const& slice) { +StringRef RocksDBKey::vertexId(rocksdb::Slice const& slice) { return vertexId(slice.data(), slice.size()); } @@ -214,6 +216,7 @@ RocksDBKey::RocksDBKey(RocksDBEntryType type, uint64_t first) switch (_type) { case RocksDBEntryType::Database: case RocksDBEntryType::CounterValue: + case RocksDBEntryType::IndexEstimateValue: case RocksDBEntryType::ReplicationApplierConfig: { size_t length = sizeof(char) + sizeof(uint64_t); _buffer.reserve(length); @@ -456,7 +459,7 @@ arangodb::StringRef RocksDBKey::primaryKey(char const* data, size_t size) { } } -std::string RocksDBKey::vertexId(char const* data, size_t size) { +StringRef RocksDBKey::vertexId(char const* data, size_t size) { TRI_ASSERT(data != nullptr); TRI_ASSERT(size >= sizeof(char)); RocksDBEntryType type = static_cast(data[0]); @@ -466,7 +469,7 @@ std::string RocksDBKey::vertexId(char const* data, size_t size) { size_t keySize = static_cast(data[size - 1]); size_t idSize = size - (sizeof(char) + sizeof(uint64_t) + sizeof(char) + keySize + sizeof(uint8_t)); - return std::string(data + sizeof(char) + sizeof(uint64_t), idSize); + return StringRef(data + sizeof(char) + 
sizeof(uint64_t), idSize); } default: diff --git a/arangod/RocksDBEngine/RocksDBKey.h b/arangod/RocksDBEngine/RocksDBKey.h index 29feaaa63a..dbe9fa8b68 100644 --- a/arangod/RocksDBEngine/RocksDBKey.h +++ b/arangod/RocksDBEngine/RocksDBKey.h @@ -135,6 +135,12 @@ class RocksDBKey { ////////////////////////////////////////////////////////////////////////////// static RocksDBKey ReplicationApplierConfig(TRI_voc_tick_t databaseId); + ////////////////////////////////////////////////////////////////////////////// + /// @brief Create a fully-specified key for index estimate values of + /// a collection + ////////////////////////////////////////////////////////////////////////////// + static RocksDBKey IndexEstimateValue(uint64_t objectId); + public: ////////////////////////////////////////////////////////////////////////////// /// @brief Extracts the type from a key @@ -211,8 +217,8 @@ class RocksDBKey { /// /// May be called only on EdgeIndexValue keys. Other types will throw. ////////////////////////////////////////////////////////////////////////////// - static std::string vertexId(RocksDBKey const&); - static std::string vertexId(rocksdb::Slice const&); + static StringRef vertexId(RocksDBKey const&); + static StringRef vertexId(rocksdb::Slice const&); ////////////////////////////////////////////////////////////////////////////// /// @brief Extracts the indexed VelocyPack values from a key @@ -264,7 +270,7 @@ class RocksDBKey { static TRI_voc_cid_t viewId(char const* data, size_t size); static TRI_voc_rid_t revisionId(char const* data, size_t size); static StringRef primaryKey(char const* data, size_t size); - static std::string vertexId(char const* data, size_t size); + static StringRef vertexId(char const* data, size_t size); static VPackSlice indexedVPack(char const* data, size_t size); private: diff --git a/arangod/RocksDBEngine/RocksDBKeyBounds.cpp b/arangod/RocksDBEngine/RocksDBKeyBounds.cpp index b15d53500f..b739c61ff8 100644 --- a/arangod/RocksDBEngine/RocksDBKeyBounds.cpp +++ b/arangod/RocksDBEngine/RocksDBKeyBounds.cpp @@ -119,6 +119,10 @@ RocksDBKeyBounds RocksDBKeyBounds::CounterValues() { return RocksDBKeyBounds(RocksDBEntryType::CounterValue); } +RocksDBKeyBounds RocksDBKeyBounds::IndexEstimateValues() { + return RocksDBKeyBounds(RocksDBEntryType::IndexEstimateValue); +} + RocksDBKeyBounds RocksDBKeyBounds::FulltextIndexPrefix( uint64_t indexId, arangodb::StringRef const& word) { // I did not want to pass a bool to the constructor for this @@ -203,7 +207,8 @@ RocksDBKeyBounds::RocksDBKeyBounds(RocksDBEntryType type) break; } - case RocksDBEntryType::CounterValue: { + case RocksDBEntryType::CounterValue: + case RocksDBEntryType::IndexEstimateValue: { size_t length = sizeof(char) + sizeof(uint64_t); _startBuffer.reserve(length); _startBuffer.push_back(static_cast(_type)); @@ -214,7 +219,6 @@ RocksDBKeyBounds::RocksDBKeyBounds(RocksDBEntryType type) uint64ToPersistent(_endBuffer, UINT64_MAX); break; } - default: THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER); } diff --git a/arangod/RocksDBEngine/RocksDBKeyBounds.h b/arangod/RocksDBEngine/RocksDBKeyBounds.h index 0f21913af1..0b8a530421 100644 --- a/arangod/RocksDBEngine/RocksDBKeyBounds.h +++ b/arangod/RocksDBEngine/RocksDBKeyBounds.h @@ -123,6 +123,11 @@ class RocksDBKeyBounds { ////////////////////////////////////////////////////////////////////////////// static RocksDBKeyBounds CounterValues(); + ////////////////////////////////////////////////////////////////////////////// + /// @brief Bounds for all index estimate values + 
////////////////////////////////////////////////////////////////////////////// + static RocksDBKeyBounds IndexEstimateValues(); + ////////////////////////////////////////////////////////////////////////////// /// @brief Bounds for all entries of a fulltext index, matching prefixes ////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RocksDBEngine/RocksDBTypes.cpp b/arangod/RocksDBEngine/RocksDBTypes.cpp index f835e55a0b..a4e82f1c13 100644 --- a/arangod/RocksDBEngine/RocksDBTypes.cpp +++ b/arangod/RocksDBEngine/RocksDBTypes.cpp @@ -103,6 +103,12 @@ static rocksdb::Slice ReplicationApplierConfig( reinterpret_cast::type*>( &replicationApplierConfig), 1); + +static RocksDBEntryType indexEstimateValue = RocksDBEntryType::IndexEstimateValue; +static rocksdb::Slice IndexEstimateValue( + reinterpret_cast::type*>( + &indexEstimateValue), + 1); } char const* arangodb::rocksDBEntryTypeName(arangodb::RocksDBEntryType type) { @@ -120,6 +126,7 @@ char const* arangodb::rocksDBEntryTypeName(arangodb::RocksDBEntryType type) { case arangodb::RocksDBEntryType::ReplicationApplierConfig: return "ReplicationApplierConfig"; case arangodb::RocksDBEntryType::FulltextIndexValue: return "FulltextIndexValue"; case arangodb::RocksDBEntryType::GeoIndexValue: return "GeoIndexValue"; + case arangodb::RocksDBEntryType::IndexEstimateValue: return "IndexEstimateValue"; } return "Invalid"; } @@ -175,6 +182,8 @@ rocksdb::Slice const& arangodb::rocksDBSlice(RocksDBEntryType const& type) { return SettingsValue; case RocksDBEntryType::ReplicationApplierConfig: return ReplicationApplierConfig; + case RocksDBEntryType::IndexEstimateValue: + return IndexEstimateValue; } return Document; // avoids warning - errorslice instead ?! diff --git a/arangod/RocksDBEngine/RocksDBTypes.h b/arangod/RocksDBEngine/RocksDBTypes.h index 996ef7d5f2..6ae3f05eaa 100644 --- a/arangod/RocksDBEngine/RocksDBTypes.h +++ b/arangod/RocksDBEngine/RocksDBTypes.h @@ -48,7 +48,8 @@ enum class RocksDBEntryType : char { SettingsValue = '9', ReplicationApplierConfig = ':', FulltextIndexValue = ';', - GeoIndexValue = '<' + GeoIndexValue = '<', + IndexEstimateValue = '=' }; char const* rocksDBEntryTypeName(RocksDBEntryType); diff --git a/arangod/RocksDBEngine/RocksDBVPackIndex.cpp b/arangod/RocksDBEngine/RocksDBVPackIndex.cpp index 81bd657547..4d672d1ad9 100644 --- a/arangod/RocksDBEngine/RocksDBVPackIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBVPackIndex.cpp @@ -34,6 +34,7 @@ #include "RocksDBEngine/RocksDBCollection.h" #include "RocksDBEngine/RocksDBCommon.h" #include "RocksDBEngine/RocksDBComparator.h" +#include "RocksDBEngine/RocksDBCounterManager.h" #include "RocksDBEngine/RocksDBPrimaryIndex.h" #include "RocksDBEngine/RocksDBToken.h" #include "RocksDBEngine/RocksDBTransactionState.h" @@ -157,13 +158,27 @@ bool RocksDBVPackIndexIterator::next(TokenCallback const& cb, size_t limit) { return true; } +uint64_t RocksDBVPackIndex::HashForKey(const rocksdb::Slice& key) { + // NOTE: This function needs to use the same hashing on the + // indexed VPack as the initial inserter does + VPackSlice tmp = RocksDBKey::indexedVPack(key); + return tmp.normalizedHash(); +} + /// @brief create the index RocksDBVPackIndex::RocksDBVPackIndex(TRI_idx_iid_t iid, arangodb::LogicalCollection* collection, arangodb::velocypack::Slice const& info) : RocksDBIndex(iid, collection, info), _useExpansion(false), - _allowPartialIndex(true) { + _allowPartialIndex(true), + _estimator(nullptr) { + if (!_unique && 
!ServerState::instance()->isCoordinator()) { + // We activate the estimator for all non unique-indexes. + // And only on DBServers + _estimator = std::make_unique>(RocksDBIndex::ESTIMATOR_SIZE); + TRI_ASSERT(_estimator != nullptr); + } TRI_ASSERT(!_fields.empty()); TRI_ASSERT(iid != 0); @@ -181,6 +196,13 @@ RocksDBVPackIndex::RocksDBVPackIndex(TRI_idx_iid_t iid, /// @brief destroy the index RocksDBVPackIndex::~RocksDBVPackIndex() {} +double RocksDBVPackIndex::selectivityEstimate(arangodb::StringRef const*) const { + if (_unique) { + return 1.0; // only valid if unique + } + return _estimator->computeEstimate(); +} + size_t RocksDBVPackIndex::memory() const { rocksdb::TransactionDB* db = rocksutils::globalRocksDB(); RocksDBKeyBounds bounds = _unique ? RocksDBKeyBounds::UniqueIndex(_objectId) @@ -236,7 +258,8 @@ bool RocksDBVPackIndex::implicitlyUnique() const { int RocksDBVPackIndex::fillElement(VPackBuilder& leased, TRI_voc_rid_t revisionId, VPackSlice const& doc, - std::vector& elements) { + std::vector& elements, + std::vector& hashes) { if (doc.isNone()) { LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "encountered invalid marker with slice of type None"; @@ -292,12 +315,13 @@ int RocksDBVPackIndex::fillElement(VPackBuilder& leased, // - Value: empty elements.push_back( RocksDBKey::IndexValue(_objectId, key, leased.slice())); + hashes.push_back(leased.slice().normalizedHash()); } } else { // other path for handling array elements, too std::vector sliceStack; - buildIndexValues(leased, doc, 0, elements, sliceStack); + buildIndexValues(leased, doc, 0, elements, sliceStack, hashes); } return TRI_ERROR_NO_ERROR; @@ -306,7 +330,8 @@ int RocksDBVPackIndex::fillElement(VPackBuilder& leased, void RocksDBVPackIndex::addIndexValue(VPackBuilder& leased, VPackSlice const& document, std::vector& elements, - std::vector& sliceStack) { + std::vector& sliceStack, + std::vector& hashes) { leased.clear(); leased.openArray(); for (VPackSlice const& s : sliceStack) { @@ -326,6 +351,7 @@ void RocksDBVPackIndex::addIndexValue(VPackBuilder& leased, // + primary key // - Value: empty elements.push_back(RocksDBKey::IndexValue(_objectId, key, leased.slice())); + hashes.push_back(leased.slice().normalizedHash()); } } @@ -334,12 +360,13 @@ void RocksDBVPackIndex::buildIndexValues(VPackBuilder& leased, VPackSlice const document, size_t level, std::vector& elements, - std::vector& sliceStack) { + std::vector& sliceStack, + std::vector& hashes) { // Invariant: level == sliceStack.size() // Stop the recursion: if (level == _paths.size()) { - addIndexValue(leased, document, elements, sliceStack); + addIndexValue(leased, document, elements, sliceStack, hashes); return; } @@ -353,7 +380,7 @@ void RocksDBVPackIndex::buildIndexValues(VPackBuilder& leased, } else { sliceStack.emplace_back(slice); } - buildIndexValues(leased, document, level + 1, elements, sliceStack); + buildIndexValues(leased, document, level + 1, elements, sliceStack, hashes); sliceStack.pop_back(); return; } @@ -374,7 +401,7 @@ void RocksDBVPackIndex::buildIndexValues(VPackBuilder& leased, for (size_t i = level; i < _paths.size(); i++) { sliceStack.emplace_back(illegalSlice); } - addIndexValue(leased, document, elements, sliceStack); + addIndexValue(leased, document, elements, sliceStack, hashes); for (size_t i = level; i < _paths.size(); i++) { sliceStack.pop_back(); } @@ -409,7 +436,7 @@ void RocksDBVPackIndex::buildIndexValues(VPackBuilder& leased, if (it == seen.end()) { seen.insert(something); sliceStack.emplace_back(something); - 
buildIndexValues(leased, document, level + 1, elements, sliceStack); + buildIndexValues(leased, document, level + 1, elements, sliceStack, hashes); sliceStack.pop_back(); } }; @@ -472,10 +499,11 @@ int RocksDBVPackIndex::insert(transaction::Methods* trx, TRI_voc_rid_t revisionId, VPackSlice const& doc, bool isRollback) { std::vector elements; + std::vector hashes; int res; try { transaction::BuilderLeaser leased(trx); - res = fillElement(*(leased.get()), revisionId, doc, elements); + res = fillElement(*(leased.get()), revisionId, doc, elements, hashes); } catch (...) { res = TRI_ERROR_OUT_OF_MEMORY; } @@ -530,6 +558,12 @@ int RocksDBVPackIndex::insert(transaction::Methods* trx, } } + for (auto& it : hashes) { + // The estimator is only useful if we are in a non-unique indexes + TRI_ASSERT(!_unique); + _estimator->insert(it); + } + return res; } @@ -537,10 +571,11 @@ int RocksDBVPackIndex::insertRaw(rocksdb::WriteBatchWithIndex* writeBatch, TRI_voc_rid_t revisionId, VPackSlice const& doc) { std::vector elements; + std::vector hashes; int res; try { VPackBuilder leased; - res = fillElement(leased, revisionId, doc, elements); + res = fillElement(leased, revisionId, doc, elements, hashes); } catch (...) { return TRI_ERROR_OUT_OF_MEMORY; } @@ -569,6 +604,14 @@ int RocksDBVPackIndex::insertRaw(rocksdb::WriteBatchWithIndex* writeBatch, writeBatch->Put(key.string(), value.string()); } } + + for (auto& it : hashes) { + // The estimator is only useful if we are in a non-unique indexes + TRI_ASSERT(!_unique); + _estimator->insert(it); + } + + return res; } @@ -577,11 +620,12 @@ int RocksDBVPackIndex::remove(transaction::Methods* trx, TRI_voc_rid_t revisionId, VPackSlice const& doc, bool isRollback) { std::vector elements; + std::vector hashes; int res; try { transaction::BuilderLeaser leased(trx); - res = fillElement(*(leased.get()), revisionId, doc, elements); + res = fillElement(*(leased.get()), revisionId, doc, elements, hashes); } catch (...) { res = TRI_ERROR_OUT_OF_MEMORY; } @@ -602,6 +646,12 @@ int RocksDBVPackIndex::remove(transaction::Methods* trx, } } + for (auto& it : hashes) { + // The estimator is only useful if we are in a non-unique indexes + TRI_ASSERT(!_unique); + _estimator->remove(it); + } + return res; } @@ -609,11 +659,12 @@ int RocksDBVPackIndex::removeRaw(rocksdb::WriteBatchWithIndex* writeBatch, TRI_voc_rid_t revisionId, VPackSlice const& doc) { std::vector elements; + std::vector hashes; int res; try { VPackBuilder leased; - res = fillElement(leased, revisionId, doc, elements); + res = fillElement(leased, revisionId, doc, elements, hashes); } catch (...) 
{ res = TRI_ERROR_OUT_OF_MEMORY; } @@ -627,6 +678,12 @@ int RocksDBVPackIndex::removeRaw(rocksdb::WriteBatchWithIndex* writeBatch, writeBatch->Delete(elements[i].string()); } + for (auto& it : hashes) { + // The estimator is only useful if we are in a non-unique indexes + TRI_ASSERT(!_unique); + _estimator->remove(it); + } + return TRI_ERROR_NO_ERROR; } @@ -1413,11 +1470,56 @@ int RocksDBVPackIndex::cleanup() { return TRI_ERROR_NO_ERROR; } + +void RocksDBVPackIndex::serializeEstimate(std::string& output) const { + TRI_ASSERT(!ServerState::instance()->isCoordinator()); + if (!_unique) { + TRI_ASSERT(_estimator != nullptr); + _estimator->serialize(output); + } +} + +bool RocksDBVPackIndex::deserializeEstimate(RocksDBCounterManager* mgr) { + TRI_ASSERT(!ServerState::instance()->isCoordinator()); + if (_unique) { + return true; + } + // We simply drop the current estimator and steal the one from recovery + // We are than save for resizing issues in our _estimator format + // and will use the old size. + + TRI_ASSERT(mgr != nullptr); + auto tmp = mgr->stealIndexEstimator(_objectId); + if (tmp == nullptr) { + // We expected to receive a stored index estimate, however we got none. + // We use the freshly created estimator but have to recompute it. + return false; + } + _estimator.swap(tmp); + TRI_ASSERT(_estimator != nullptr); + return true; +} + +void RocksDBVPackIndex::recalculateEstimates() { + if (unique()) { + return; + } + TRI_ASSERT(_estimator != nullptr); + _estimator->clear(); + + auto bounds = RocksDBKeyBounds::IndexEntries(_objectId); + rocksutils::iterateBounds(bounds, [&](rocksdb::Iterator* it) { + uint64_t hash = RocksDBVPackIndex::HashForKey(it->key()); + _estimator->insert(hash); + }); +} + Result RocksDBVPackIndex::postprocessRemove(transaction::Methods* trx, rocksdb::Slice const& key, rocksdb::Slice const& value) { if (!unique()) { - // TODO: update selectivity estimate + uint64_t hash = RocksDBVPackIndex::HashForKey(key); + _estimator->remove(hash); } return {TRI_ERROR_NO_ERROR}; } diff --git a/arangod/RocksDBEngine/RocksDBVPackIndex.h b/arangod/RocksDBEngine/RocksDBVPackIndex.h index 224b52579a..49f560cf22 100644 --- a/arangod/RocksDBEngine/RocksDBVPackIndex.h +++ b/arangod/RocksDBEngine/RocksDBVPackIndex.h @@ -29,6 +29,7 @@ #include "Aql/AstNode.h" #include "Basics/Common.h" #include "Indexes/IndexIterator.h" +#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h" #include "RocksDBEngine/RocksDBIndex.h" #include "RocksDBEngine/RocksDBKey.h" #include "RocksDBEngine/RocksDBKeyBounds.h" @@ -101,6 +102,8 @@ class RocksDBVPackIndex : public RocksDBIndex { friend class RocksDBVPackIndexIterator; public: + static uint64_t HashForKey(const rocksdb::Slice& key); + RocksDBVPackIndex() = delete; RocksDBVPackIndex(TRI_idx_iid_t, LogicalCollection*, @@ -110,13 +113,7 @@ class RocksDBVPackIndex : public RocksDBIndex { bool hasSelectivityEstimate() const override { return true; } - double selectivityEstimate( - arangodb::StringRef const* = nullptr) const override { - if (_unique) { - return 1.0; // only valid if unique - } - return 0.2; // TODO: fix this hard-coded estimate - } + double selectivityEstimate(arangodb::StringRef const* = nullptr) const override; size_t memory() const override; @@ -179,6 +176,12 @@ class RocksDBVPackIndex : public RocksDBIndex { int cleanup() override; + void serializeEstimate(std::string& output) const override; + + bool deserializeEstimate(arangodb::RocksDBCounterManager* mgr) override; + + void recalculateEstimates() override; + protected: Result 
postprocessRemove(transaction::Methods* trx, rocksdb::Slice const& key, rocksdb::Slice const& value) override; @@ -209,21 +212,26 @@ protected: /// @brief helper function to insert a document into any index type int fillElement(velocypack::Builder& leased, TRI_voc_rid_t revisionId, - VPackSlice const& doc, std::vector& elements); + VPackSlice const& doc, std::vector& elements, + std::vector& hashes); /// @brief helper function to build the key and value for rocksdb from the /// vector of slices + /// @param hashes list of VPackSlice hashes for the estimator. void addIndexValue(velocypack::Builder& leased, VPackSlice const& document, std::vector& elements, - std::vector& sliceStack); + std::vector& sliceStack, + std::vector& hashes); /// @brief helper function to create a set of value combinations to insert /// into the rocksdb index. /// @param elements vector of resulting index entries /// @param sliceStack working list of values to insert into the index + /// @param hashes list of VPackSlice hashes for the estimator. void buildIndexValues(velocypack::Builder& leased, VPackSlice const document, size_t level, std::vector& elements, - std::vector& sliceStack); + std::vector& sliceStack, + std::vector& hashes); private: std::unique_ptr _allocator; @@ -239,6 +247,12 @@ protected: /// @brief whether or not partial indexing is allowed bool _allowPartialIndex; + + /// @brief A fixed size library to estimate the selectivity of the index. + /// On insertion of a document we have to insert it into the estimator, + /// On removal we have to remove it in the estimator as well. + std::unique_ptr> _estimator; + }; } // namespace arangodb diff --git a/arangod/StorageEngine/EngineSelectorFeature.cpp b/arangod/StorageEngine/EngineSelectorFeature.cpp index 5261918da6..0b0bd476cf 100644 --- a/arangod/StorageEngine/EngineSelectorFeature.cpp +++ b/arangod/StorageEngine/EngineSelectorFeature.cpp @@ -70,9 +70,14 @@ void EngineSelectorFeature::prepare() { // file if engine in file does not match command-line option if (basics::FileUtils::isRegularFile(_engineFilePath)) { - std::string content = basics::FileUtils::slurp(_engineFilePath); - if (content != _engine) { - LOG_TOPIC(FATAL, Logger::STARTUP) << "content of 'ENGINE' file '" << _engineFilePath << "' and command-line/configuration option value do not match: '" << content << "' != '" << _engine << "'. please validate the command-line/configuration option value of '--server.storage-engine' or use a different database directory if the change is intentional"; + try { + std::string content = basics::FileUtils::slurp(_engineFilePath); + if (content != _engine) { + LOG_TOPIC(FATAL, Logger::STARTUP) << "content of 'ENGINE' file '" << _engineFilePath << "' and command-line/configuration option value do not match: '" << content << "' != '" << _engine << "'. please validate the command-line/configuration option value of '--server.storage-engine' or use a different database directory if the change is intentional"; + FATAL_ERROR_EXIT(); + } + } catch (std::exception const& ex) { + LOG_TOPIC(FATAL, Logger::STARTUP) << "unable to read content of 'ENGINE' file '" << _engineFilePath << "': " << ex.what() << ". 
please make sure the file/directory is readable for the arangod process and user"; FATAL_ERROR_EXIT(); } } @@ -105,7 +110,7 @@ void EngineSelectorFeature::start() { try { basics::FileUtils::spit(_engineFilePath, _engine); } catch (std::exception const& ex) { - LOG_TOPIC(FATAL, Logger::STARTUP) << "unable to write 'ENGINE' file '" << _engineFilePath << "': " << ex.what(); + LOG_TOPIC(FATAL, Logger::STARTUP) << "unable to write 'ENGINE' file '" << _engineFilePath << "': " << ex.what() << ". please make sure the file/directory is writable for the arangod process and user"; FATAL_ERROR_EXIT(); } } diff --git a/js/client/tests/agency/agency-test.js b/js/client/tests/agency/agency-test.js index 322c00103e..8eea2f03aa 100644 --- a/js/client/tests/agency/agency-test.js +++ b/js/client/tests/agency/agency-test.js @@ -605,7 +605,9 @@ function agencyTestSuite () { //////////////////////////////////////////////////////////////////////////////// testOpErase : function () { + writeAndCheck([[{"/version":{"op":"delete"}}]]); + writeAndCheck([[{"/a":[0,1,2,3,4,5,6,7,8,9]}]]); // none before assertEqual(readAndCheck([["/a"]]), [{a:[0,1,2,3,4,5,6,7,8,9]}]); writeAndCheck([[{"a":{"op":"erase","val":3}}]]); @@ -629,6 +631,28 @@ function agencyTestSuite () { writeAndCheck([[{"a":{"op":"erase","val":6}}], [{"a":{"op":"erase","val":8}}]]); assertEqual(readAndCheck([["/a"]]), [{a:[]}]); + + writeAndCheck([[{"/a":[0,1,2,3,4,5,6,7,8,9]}]]); // none before + assertEqual(readAndCheck([["/a"]]), [{a:[0,1,2,3,4,5,6,7,8,9]}]); + writeAndCheck([[{"a":{"op":"erase","pos":3}}]]); + assertEqual(readAndCheck([["/a"]]), [{a:[0,1,2,4,5,6,7,8,9]}]); + writeAndCheck([[{"a":{"op":"erase","pos":0}}]]); + assertEqual(readAndCheck([["/a"]]), [{a:[1,2,4,5,6,7,8,9]}]); + writeAndCheck([[{"a":{"op":"erase","pos":0}}]]); + assertEqual(readAndCheck([["/a"]]), [{a:[2,4,5,6,7,8,9]}]); + writeAndCheck([[{"a":{"op":"erase","pos":2}}]]); + assertEqual(readAndCheck([["/a"]]), [{a:[2,4,6,7,8,9]}]); + writeAndCheck([[{"a":{"op":"erase","pos":4}}]]); + assertEqual(readAndCheck([["/a"]]), [{a:[2,4,6,7,9]}]); + writeAndCheck([[{"a":{"op":"erase","pos":2}}]]); + assertEqual(readAndCheck([["/a"]]), [{a:[2,4,7,9]}]); + writeAndCheck([[{"a":{"op":"erase","pos":2}}]]); + assertEqual(readAndCheck([["/a"]]), [{a:[2,4,9]}]); + writeAndCheck([[{"a":{"op":"erase","pos":0}}]]); + assertEqual(readAndCheck([["/a"]]), [{a:[4,9]}]); + writeAndCheck([[{"a":{"op":"erase","pos":1}}], + [{"a":{"op":"erase","pos":0}}]]); + assertEqual(readAndCheck([["/a"]]), [{a:[]}]); }, //////////////////////////////////////////////////////////////////////////////// diff --git a/js/common/tests/shell/shell-edge-index-noncluster-mmfiles.js b/js/common/tests/shell/shell-edge-index-noncluster.js similarity index 78% rename from js/common/tests/shell/shell-edge-index-noncluster-mmfiles.js rename to js/common/tests/shell/shell-edge-index-noncluster.js index ec89f7bbfc..9bfb7de8ff 100644 --- a/js/common/tests/shell/shell-edge-index-noncluster-mmfiles.js +++ b/js/common/tests/shell/shell-edge-index-noncluster.js @@ -33,8 +33,22 @@ var arangodb = require("@arangodb"); var db = arangodb.db; var internal = require("internal"); var wait = internal.wait; +var print = internal.print; var ArangoCollection = arangodb.ArangoCollection; +var mmfilesEngine = false; +if (db._engine().name === "mmfiles") { + mmfilesEngine = true; +} + +var printed=false; +function printNotImplemented(message){ + if(!printed){ + printed = true; + print("test for " + message + " not implemented"); + } +} + 
//////////////////////////////////////////////////////////////////////////////// /// @brief test suite: buckets //////////////////////////////////////////////////////////////////////////////// @@ -60,9 +74,20 @@ function EdgeIndexBucketsSuite () { db._drop(en1); db._drop(en2); db._drop(en3); - edge1 = db._createEdgeCollection(en1, { indexBuckets: 1 }); - edge2 = db._createEdgeCollection(en2, { indexBuckets: 16 }); - edge3 = db._createEdgeCollection(en3, { indexBuckets: 128 }); + + var options = {}; + if (mmfilesEngine){ + options = { indexBuckets: 1 }; + } + edge1 = db._createEdgeCollection(en1, options); + if (mmfilesEngine){ + options = { indexBuckets: 16 }; + } + edge2 = db._createEdgeCollection(en2, options); + if (mmfilesEngine){ + options = { indexBuckets: 128 }; + } + edge3 = db._createEdgeCollection(en3, options); db._drop(vn); vertex = db._create(vn); @@ -93,7 +118,7 @@ function EdgeIndexBucketsSuite () { edge1.save(vn + "/v" + i, vn + "/v" + j, { }); edge2.save(vn + "/v" + i, vn + "/v" + j, { }); edge3.save(vn + "/v" + i, vn + "/v" + j, { }); - } + } } // unload collections @@ -183,7 +208,7 @@ function EdgeIndexBucketsSuite () { ref = edge1.inEdges(from).length; assertEqual(ref, edge2.inEdges(from).length); assertEqual(ref, edge3.inEdges(from).length); - + ref = edge1.outEdges(to).length; assertEqual(ref, edge2.outEdges(to).length); assertEqual(ref, edge3.outEdges(to).length); @@ -237,6 +262,38 @@ function EdgeIndexSuite () { wait(0.0); }, +//////////////////////////////////////////////////////////////////////////////// +/// @brief test batch size limit +//////////////////////////////////////////////////////////////////////////////// + + testIndexBatchsizeLimit : function () { + [20, 900, 1000, 1100, 2000].forEach( function(n){ + var toKeys = []; + for (var i = 0; i < n; ++i) { + var to = "b" + n + "/"+i; + edge.insert({_from : "a/" + n, _to : to}); + toKeys.push(to); + } + + assertEqual(n,edge.byExample({ _from : "a/" + n }).toArray().length, "compare 1"); + assertEqual(n,edge.byExample({ _from : "a/" + n }).toArray().length, "compare 2"); + var rv = edge.byExample({ _from : "a/" + n }).toArray(); + assertEqual(n,rv.length, "compare 3"); + + //assert equal values + if(n <= 1001){ + var keys = rv.map(function(x){ return x._to; }); + keys.sort(); + toKeys.sort(); + keys.forEach(function(x,i){ + assertEqual(x,toKeys[i], "compare exact values"); + }); + } + + }); + }, + + //////////////////////////////////////////////////////////////////////////////// /// @brief test index presence //////////////////////////////////////////////////////////////////////////////// @@ -255,9 +312,14 @@ function EdgeIndexSuite () { //////////////////////////////////////////////////////////////////////////////// testIndexSelectivityEmpty : function () { + printed = false; var edgeIndex = edge.getIndexes()[1]; assertTrue(edgeIndex.hasOwnProperty("selectivityEstimate")); - assertEqual(1, edgeIndex.selectivityEstimate); + if(mmfilesEngine){ + assertEqual(1, edgeIndex.selectivityEstimate); + } else { + printNotImplemented("selectivityEstimate"); + } }, //////////////////////////////////////////////////////////////////////////////// @@ -265,10 +327,15 @@ function EdgeIndexSuite () { //////////////////////////////////////////////////////////////////////////////// testIndexSelectivityOneDoc : function () { + printed = false; edge.save(v1, v2, { }); var edgeIndex = edge.getIndexes()[1]; assertTrue(edgeIndex.hasOwnProperty("selectivityEstimate")); - assertEqual(1, edgeIndex.selectivityEstimate); + 
if(mmfilesEngine){ + assertEqual(1, edgeIndex.selectivityEstimate); + } else { + printNotImplemented("selectivityEstimate"); + } }, //////////////////////////////////////////////////////////////////////////////// @@ -276,6 +343,7 @@ function EdgeIndexSuite () { //////////////////////////////////////////////////////////////////////////////// testIndexSelectivityDuplicateDocs : function () { + printed = false; var i, c, edgeIndex, expectedSelectivity; for (i = 0; i < 1000; ++i) { @@ -283,7 +351,11 @@ function EdgeIndexSuite () { edgeIndex = edge.getIndexes()[1]; expectedSelectivity = 1 / (i + 1); // allow for some floating-point deviations - assertTrue(Math.abs(expectedSelectivity - edgeIndex.selectivityEstimate) <= 0.001); + if(mmfilesEngine){ + assertTrue(Math.abs(expectedSelectivity - edgeIndex.selectivityEstimate) <= 0.001); + } else { + printNotImplemented("selectivityEstimate"); + } } var n = edge.count(); @@ -298,7 +370,11 @@ function EdgeIndexSuite () { c = 1000 - (i + 1); expectedSelectivity = (c === 0 ? 1 : 1 / c); // allow for some floating-point deviations - assertTrue(Math.abs(expectedSelectivity - edgeIndex.selectivityEstimate) <= 0.001); + if(mmfilesEngine){ + assertTrue(Math.abs(expectedSelectivity - edgeIndex.selectivityEstimate) <= 0.001); + } else { + printNotImplemented("selectivityEstimate"); + } } }, @@ -319,11 +395,16 @@ function EdgeIndexSuite () { //////////////////////////////////////////////////////////////////////////////// testIndexSelectivityUniqueDocsFrom : function () { + printed = false; for (var i = 0; i < 1000; ++i) { edge.save(vn + "/from" + i, vn + "/1", { }); var edgeIndex = edge.getIndexes()[1]; - var expectedSelectivity = (1 + (1 / (i + 1))) * 0.5; - assertTrue(Math.abs(expectedSelectivity - edgeIndex.selectivityEstimate) <= 0.001); + var expectedSelectivity = (1 + (1 / (i + 1))) * 0.5; + if(mmfilesEngine){ + assertTrue(Math.abs(expectedSelectivity - edgeIndex.selectivityEstimate) <= 0.001); + } else { + printNotImplemented("selectivityEstimate"); + } } }, @@ -332,16 +413,22 @@ function EdgeIndexSuite () { //////////////////////////////////////////////////////////////////////////////// testIndexSelectivityRepeatingDocs : function () { + printed = false; for (var i = 0; i < 1000; ++i) { if (i > 0) { var edgeIndex = edge.getIndexes()[1]; - var expectedSelectivity = (1 + (Math.min(i, 20) / i)) * 0.5; - assertTrue(Math.abs(expectedSelectivity - edgeIndex.selectivityEstimate) <= 0.001); + var expectedSelectivity = (1 + (Math.min(i, 20) / i)) * 0.5; + if(mmfilesEngine){ + assertTrue(Math.abs(expectedSelectivity - edgeIndex.selectivityEstimate) <= 0.001); + } else { + printNotImplemented("selectivityEstimate"); + } } edge.save(vn + "/from" + (i % 20), vn + "/to" + i, { }); } } + }; } @@ -350,8 +437,7 @@ function EdgeIndexSuite () { /// @brief executes the test suite //////////////////////////////////////////////////////////////////////////////// -jsunity.run(EdgeIndexBucketsSuite); jsunity.run(EdgeIndexSuite); +jsunity.run(EdgeIndexBucketsSuite); return jsunity.done(); - diff --git a/js/server/tests/recovery/indexes-hash.js b/js/server/tests/recovery/indexes-hash.js index bc357f0776..d0fea081db 100644 --- a/js/server/tests/recovery/indexes-hash.js +++ b/js/server/tests/recovery/indexes-hash.js @@ -28,32 +28,39 @@ // / @author Copyright 2012, triAGENS GmbH, Cologne, Germany // ////////////////////////////////////////////////////////////////////////////// -var db = require('@arangodb').db; -var internal = require('internal'); -var jsunity = require('jsunity'); 
+const db = require('@arangodb').db; +const internal = require('internal'); +const jsunity = require('jsunity'); + +const colName1 = 'UnitTestsRecovery1'; +const colName2 = 'UnitTestsRecovery2'; +const colName3 = 'UnitTestsRecovery3'; +const est1 = 1; // The index is de-facto unique so estimate 1 +const est2 = 1; // This index is unique. Estimate 1 +const est3 = 4 / 1000; // This index has 4 different values and stores 1000 documents function runSetup () { 'use strict'; internal.debugClearFailAt(); - db._drop('UnitTestsRecovery1'); - var c = db._create('UnitTestsRecovery1'), i; + db._drop(colName1); + var c = db._create(colName1), i; c.ensureHashIndex('value'); for (i = 0; i < 1000; ++i) { c.save({ value: i }); } - db._drop('UnitTestsRecovery2'); - c = db._create('UnitTestsRecovery2'); + db._drop(colName2); + c = db._create(colName2); c.ensureUniqueConstraint('a.value'); for (i = 0; i < 1000; ++i) { c.save({ a: { value: i } }); } - db._drop('UnitTestsRecovery3'); - c = db._create('UnitTestsRecovery3'); + db._drop(colName3); + c = db._create(colName3); c.ensureHashIndex('a', 'b'); for (i = 0; i < 500; ++i) { @@ -84,39 +91,82 @@ function recoverySuite () { // / @brief test whether we can restore the trx data // ////////////////////////////////////////////////////////////////////////////// - testIndexesHash: function () { - var c = db._collection('UnitTestsRecovery1'), idx, i; - idx = c.getIndexes()[1]; + testSingleAttributeHashIndexInfo: function() { + let c = db._collection(colName1); + let idx = c.getIndexes()[1]; assertFalse(idx.unique); assertFalse(idx.sparse); assertEqual([ 'value' ], idx.fields); - for (i = 0; i < 1000; ++i) { + }, + + testSingleAttributeHashIndexByExample: function() { + let c = db._collection(colName1); + for (let i = 0; i < 1000; ++i) { assertEqual(1, c.byExample({ value: i }).toArray().length); } - assertEqual(1, db._query("FOR doc IN UnitTestsRecovery1 FILTER doc.value == 0 RETURN doc").toArray().length); + }, - c = db._collection('UnitTestsRecovery2'); - idx = c.getIndexes()[1]; + testSingleAttributeHashIndexAql: function() { + assertEqual(1, db._query(`FOR doc IN ${colName1} FILTER doc.value == 0 RETURN doc`).toArray().length); + }, + + testSingleAttributeHashIndexEstimate: function () { + let c = db._collection(colName1); + let idx = c.getIndexes()[1]; + assertEqual(est1, idx.selectivityEstimate); + }, + + testNestedAttributeHashIndexInfo: function() { + let c = db._collection(colName2); + let idx = c.getIndexes()[1]; assertTrue(idx.unique); assertFalse(idx.sparse); assertEqual([ 'a.value' ], idx.fields); - for (i = 0; i < 1000; ++i) { + }, + + testNestedAttributeHashIndexByExample: function() { + let c = db._collection(colName2); + for (let i = 0; i < 1000; ++i) { assertEqual(1, c.byExample({ 'a.value': i }).toArray().length); } - assertEqual(1, db._query("FOR doc IN UnitTestsRecovery2 FILTER doc.a.value == 0 RETURN doc").toArray().length); + }, - c = db._collection('UnitTestsRecovery3'); - idx = c.getIndexes()[1]; + testNestedAttributeHashIndexAql: function() { + assertEqual(1, db._query(`FOR doc IN ${colName2} FILTER doc.a.value == 0 RETURN doc`).toArray().length); + }, + + testNestedAttributeHashIndexEstimate: function () { + let c = db._collection(colName2); + let idx = c.getIndexes()[1]; + assertEqual(est2, idx.selectivityEstimate); + }, + + testManyAttributesHashIndexInfo: function() { + let c = db._collection(colName3); + let idx = c.getIndexes()[1]; assertFalse(idx.unique); assertFalse(idx.sparse); assertEqual([ 'a', 'b' ], idx.fields); + }, + + 
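For reference, the three expected estimates follow directly from the data loaded in runSetup(): the first two collections index 1000 distinct values across 1000 documents (1000/1000 = 1), while UnitTestsRecovery3 stores 1000 documents whose ('a', 'b') combinations take only four distinct values, giving 4/1000 = 0.004.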
testManyAttributesHashIndexByExample: function() { + let c = db._collection(colName3); assertEqual(250, c.byExample({ a: 1, b: 1 }).toArray().length); assertEqual(250, c.byExample({ a: 1, b: 2 }).toArray().length); assertEqual(250, c.byExample({ a: 2, b: 1 }).toArray().length); assertEqual(250, c.byExample({ a: 2, b: 2 }).toArray().length); - assertEqual(250, db._query("FOR doc IN UnitTestsRecovery3 FILTER doc.a == 1 && doc.b == 1 RETURN doc").toArray().length); - } + }, + testManyAttributesHashIndexAql: function() { + assertEqual(250, db._query(`FOR doc IN ${colName3} FILTER doc.a == 1 && doc.b == 1 RETURN doc`).toArray().length); + }, + + testManyAttributesHashIndexEstimate: function () { + let c = db._collection(colName3); + let idx = c.getIndexes()[1]; + assertEqual(est3, idx.selectivityEstimate); + }, + }; } diff --git a/js/server/tests/shell/shell-index-cluster-spec.js b/js/server/tests/shell/shell-index-cluster-spec.js new file mode 100644 index 0000000000..c91136feff --- /dev/null +++ b/js/server/tests/shell/shell-index-cluster-spec.js @@ -0,0 +1,80 @@ +/*global describe, it, ArangoAgency, afterEach */ + +//////////////////////////////////////////////////////////////////////////////// +/// @brief cluster collection creation tests +/// +/// @file +/// +/// DISCLAIMER +/// +/// Copyright 2010-2012 triagens GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. 
+///
+/// Copyright holder is triAGENS GmbH, Cologne, Germany
+///
+/// @author Andreas Streichardt
+/// @author Copyright 2017, ArangoDB GmbH, Cologne, Germany
+////////////////////////////////////////////////////////////////////////////////
+
+'use strict';
+
+const expect = require('chai').expect;
+
+var internal = require("internal");
+var db = require("org/arangodb").db;
+
+describe('Cluster collection creation options', function() {
+  afterEach(function() {
+    db._drop('testi');
+  });
+  it('should always throw an error when creating a faulty index', function() {
+    db._create("testi", {numberOfShards: 1});
+    db.testi.save({"test": 1});
+    db.testi.save({"test": 1});
+
+    expect(function() {
+      db.testi.ensureIndex({ type: "hash", fields: [ "test" ], unique: true });
+    }).to.throw();
+    // before fixing it the second call would return the faulty index
+    expect(function() {
+      db.testi.ensureIndex({ type: "hash", fields: [ "test" ], unique: true });
+    }).to.throw();
+  });
+  it('should cleanup current after creating a faulty index', function() {
+    db._create("testi", {numberOfShards: 1});
+    let current = ArangoAgency.get('Current/Collections/_system');
+    let plan = ArangoAgency.get('Plan/Collections/_system');
+    let collectionId = Object.values(plan.arango.Plan.Collections['_system']).reduce((result, collectionDef) => {
+      if (result) {
+        return result;
+      }
+
+      if (collectionDef.name === 'testi') {
+        return collectionDef.id;
+      }
+    }, undefined);
+    db.testi.save({"test": 1});
+    db.testi.save({"test": 1});
+
+    expect(function() {
+      db.testi.ensureIndex({ type: "hash", fields: [ "test" ], unique: true });
+    }).to.throw();
+    // wait for the cleanup job to remove the failed index from Current
+    internal.wait(1.0);
+    current = ArangoAgency.get('Current/Collections/_system/' + collectionId);
+    Object.values(current.arango.Current.Collections['_system'][collectionId]).forEach(entry => {
+      expect(entry.indexes).to.have.lengthOf(1);
+    });
+  });
+});
\ No newline at end of file
diff --git a/js/server/tests/shell/shell-skiplist-correctness.js b/js/server/tests/shell/shell-skiplist-correctness.js
index 21a2e331ea..3b730fbc5b 100644
--- a/js/server/tests/shell/shell-skiplist-correctness.js
+++ b/js/server/tests/shell/shell-skiplist-correctness.js
@@ -326,9 +326,7 @@ function SkipListCorrSuite() {
     }
 
     // should not have created an index
-    // TODO: FIXME 04052017: re-activate this check!! waiting for mop to add rollback code
-    // in case an index cannot be created on a shard
-    // assertEqual(coll.getIndexes().length, 1);
+    assertEqual(coll.getIndexes().length, 1);
   }
 };
 }
diff --git a/scripts/cluster-run-common.sh b/scripts/cluster-run-common.sh
index 003dd2a78e..6e91f4fc14 100644
--- a/scripts/cluster-run-common.sh
+++ b/scripts/cluster-run-common.sh
@@ -17,6 +17,7 @@ function help() {
   echo "  -o/--xterm-options  XTerm options (default: --geometry=80x43)"
   echo "  -b/--offset-ports   Offset ports (default: 0, i.e. A:4001, C:8530, D:8629)"
   echo "  -r/--rocksdb-engine Use Rocksdb engine (default: false )"
+  echo "  -q/--src-dir        ArangoDB source dir (default: .)"
   echo ""
   echo "EXAMPLES:"
   echo "  $0"
@@ -44,6 +45,7 @@ SECONDARIES=0
 BUILD="build"
 JWT_SECRET=""
 PORT_OFFSET=0
+SRC_DIR="."
while [[ ${1} ]]; do case "${1}" in @@ -87,6 +89,10 @@ while [[ ${1} ]]; do JWT_SECRET=${2} shift ;; + -q|--src-dir) + SRC_DIR=${2} + shift + ;; -x|--xterm) XTERM=${2} shift diff --git a/scripts/startLocalCluster.sh b/scripts/startLocalCluster.sh index e3d7bb7653..7df5510282 100755 --- a/scripts/startLocalCluster.sh +++ b/scripts/startLocalCluster.sh @@ -19,7 +19,8 @@ printf " # db servers: %s," "$NRDBSERVERS" printf " # coordinators: %s," "$NRCOORDINATORS" printf " transport: %s\n" "$TRANSPORT" -if [ ! -d arangod ] || [ ! -d arangosh ] || [ ! -d UnitTests ] ; then +echo $SRC_DIR +if [ ! -d ${SRC_DIR}/arangod ] || [ ! -d ${SRC_DIR}/arangosh ] || [ ! -d ${SRC_DIR}/UnitTests ] ; then echo Must be started in the main ArangoDB source directory. exit 1 fi @@ -29,11 +30,6 @@ if [[ $(( $NRAGENTS % 2 )) == 0 ]]; then exit 1 fi -if [ ! -d arangod ] || [ ! -d arangosh ] || [ ! -d UnitTests ] ; then - echo "Must be started in the main ArangoDB source directory! Bailing out." - exit 1 -fi - if [ ! -z "$INTERACTIVE_MODE" ] ; then if [ "$INTERACTIVE_MODE" == "C" ] ; then COORDINATORCONSOLE=1 @@ -102,9 +98,9 @@ for aid in `seq 0 $(( $NRAGENTS - 1 ))`; do --agency.supervision-grace-period 15 \ --agency.wait-for-sync false \ --database.directory cluster/data$port \ - --javascript.app-path ./js/apps \ - --javascript.startup-directory ./js \ - --javascript.module-directory ./enterprise/js \ + --javascript.app-path ${SRC_DIR}/js/apps \ + --javascript.startup-directory ${SRC_DIR}/js \ + --javascript.module-directory ${SRC_DIR}/enterprise/js \ --javascript.v8-contexts 1 \ --server.endpoint $TRANSPORT://$ANYWHERE:$port \ --server.statistics false \ @@ -143,8 +139,8 @@ start() { --log.level $LOG_LEVEL \ --server.statistics true \ --server.threads 5 \ - --javascript.startup-directory ./js \ - --javascript.module-directory ./enterprise/js \ + --javascript.startup-directory ${SRC_DIR}/js \ + --javascript.module-directory ${SRC_DIR}/enterprise/js \ --javascript.app-path cluster/apps$PORT \ --log.force-direct true \ --log.level cluster=$LOG_LEVEL_CLUSTER \ @@ -176,9 +172,9 @@ startTerminal() { --log.level $LOG_LEVEL \ --server.statistics true \ --server.threads 5 \ - --javascript.startup-directory ./js \ - --javascript.module-directory ./enterprise/js \ - --javascript.app-path ./js/apps \ + --javascript.startup-directory ${SRC_DIR}/js \ + --javascript.module-directory ${SRC_DIR}/enterprise/js \ + --javascript.app-path ${SRC_DIR}/js/apps \ $STORAGE_ENGINE \ $DEFAULT_REPLICATION \ $AUTHENTICATION \ @@ -207,9 +203,9 @@ startDebugger() { --log.level $LOG_LEVEL \ --server.statistics false \ --server.threads 5 \ - --javascript.startup-directory ./js \ - --javascript.module-directory ./enterprise/js \ - --javascript.app-path ./js/apps \ + --javascript.startup-directory ${SRC_DIR}/js \ + --javascript.module-directory ${SRC_DIR}/enterprise/js \ + --javascript.app-path ${SRC_DIR}/js/apps \ $STORAGE_ENGINE \ $DEFAULT_REPLICATION \ $SSLKEYFILE \ @@ -238,9 +234,9 @@ startRR() { --log.level $LOG_LEVEL \ --server.statistics true \ --server.threads 5 \ - --javascript.startup-directory ./js \ - --javascript.module-directory ./enterprise/js \ - --javascript.app-path ./js/apps \ + --javascript.startup-directory ${SRC_DIR}/js \ + --javascript.module-directory ${SRC_DIR}/enterprise/js \ + --javascript.app-path ${SRC_DIR}/js/apps \ $STORAGE_ENGINE \ $DEFAULT_REPLICATION \ $AUTHENTICATION \ @@ -331,13 +327,13 @@ if [ "$SECONDARIES" == "1" ] ; then --cluster.my-id $CLUSTER_ID \ --log.file cluster/$PORT.log \ --server.statistics true \ - 
--javascript.startup-directory ./js \
-      --javascript.module-directory ./enterprise/js \
+      --javascript.startup-directory ${SRC_DIR}/js \
+      --javascript.module-directory ${SRC_DIR}/enterprise/js \
       $STORAGE_ENGINE \
       $DEFAULT_REPLICATION \
       $AUTHENTICATION \
       $SSLKEYFILE \
-      --javascript.app-path ./js/apps \
+      --javascript.app-path ${SRC_DIR}/js/apps \
       > cluster/$PORT.stdout 2>&1 &
     let index=$index+1
diff --git a/scripts/tmux_test_starter b/scripts/tmux_test_starter
new file mode 100755
index 0000000000..08482fb232
--- /dev/null
+++ b/scripts/tmux_test_starter
@@ -0,0 +1,159 @@
+#!/bin/bash
+# Author Jan Christoph Uhde
+
+main(){
+    source_file=~/.tmux_starter_suites # may not contain spaces
+    if [[ -f $source_file ]]; then
+        . $source_file
+    fi
+
+    local suite=${1:-all}
+    local tasks="suite_$suite"
+    # check that the requested suite is defined as a function
+    [[ "$(type -t "$tasks")" == "function" ]] || die "suite $suite not defined"
+    local session_name="$($tasks 'name')"
+    local panes=$($tasks 'num')
+    kill_old_session "$session_name"
+    tmux new-session -d -s "$session_name" || die "unable to spawn session"
+    local rows=$(( (panes+1) / 2 ))
+    local cols=$((((panes>1)) * 2))
+    spawn_panes "$session_name" $rows $cols
+    tmux select-pane -t $session_name.0
+    execute_tasks "$(pwd)" $tasks
+    tmux -2 attach-session -t $session_name
+}
+
+# task definitions
+
+suite_all(){
+    local count="$1"
+    local args_default=""
+    local args="$2"
+
+    local tests=""
+    case $1 in
+        num)
+            echo "7"
+            return
+            ;;
+        name)
+            echo "test_all"
+            return
+            ;;
+        0)
+            echo "./scripts/quickieTest.sh $args && exit 0"
+            return
+            ;;
+        1)
+            tests="shell_server shell_client"
+            ;;
+        2)
+            tests="shell_server_aql"
+            ;;
+        3)
+            tests="http_server server_http"
+            ;;
+        4)
+            tests="dump importing export arangobench upgrade"
+            ;;
+        5)
+            tests="replication_sync replication_static replication_ongoing http_replication shell_replication"
+            ;;
+        6)
+            tests="agency cluster_sync"
+            ;;
+        *)
+    esac
+
+    echo "./scripts/unittest $tests $args_default $args && exit 0"
+}
+
+suite_all_rocksdb(){
+    local count="$1"
+    local args_default=""
+    local args="$2"
+
+    local tests=""
+    case $1 in
+        name)
+            echo "test_all_rocksdb"
+            return
+            ;;
+        *)
+            suite_all "$1" "--storageEngine rocksdb"
+            ;;
+    esac
+}
+
+# tmux
+kill_old_session(){
+    local session_name="$1"
+    if tmux has-session -t $session_name 2> /dev/null; then
+        echo "Session $session_name exists. Kill it? [y/N]"
+        read kill_sess
+        echo "killsess '$kill_sess'"
+        if [[ ($kill_sess == "y") || ($kill_sess == "Y") ]]; then
+            tmux kill-session -t $session_name
+        else
+            die "Session not created because it already exists. Exiting."
+ fi + fi +} + +spawn_panes(){ + local session_name="$1" + local rows=$2 + local cols=$3 + + # Create rows + local row=$rows + while (( row > 1 )); do + frac=$(echo "scale=2;1/$row" | bc) + percent=$(echo "($frac * 100)/1" | bc) + tmux select-pane -t $session_name.0 || die "could not select pane 0 of session $session_name" + tmux split-window -v -p $percent + ((row--)) + done + + # Create columns + if ((cols > 1)); then + local count=$((rows - 1)) + while ((count >= 0)); do + tmux select-pane -t $session_name.$count || die "could not select pane $session_name.$count" + tmux split-window -h -p 50 + (( count-- )) + done + fi +} + +execute_tasks(){ + cd $1 || die + local tasks="$2" + local args="$3" + local count=0 + while (( count < $($tasks 'num') )); do + local exec_cmd="$($tasks $count "$args")" + echo "running: ${exec_cmd[@]}" + tmux send-keys -t $session_name.$count "${exec_cmd[@]}" Enter + (( count++ )) + done +} + +# helper +die(){ + echo "FATAL: ${1-this should not happen}" + exit 1 +} + + +cores(){ + # Determine automatically on Mac or Linux + if [[ $(uname) == 'Darwin' ]]; then + echo "$(sysctl hw.ncpu | awk '{print $2}')" + else + echo "$(nproc)" + fi +} + +main "$@" +exit 0 diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 19ea30b286..50f7093c64 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -53,6 +53,7 @@ add_executable( Cache/TransactionsWithBackingStore.cpp Geo/georeg.cpp Pregel/typedbuffer.cpp + RocksDBEngine/IndexEstimatorTest.cpp main.cpp ) diff --git a/tests/RocksDBEngine/IndexEstimatorTest.cpp b/tests/RocksDBEngine/IndexEstimatorTest.cpp new file mode 100644 index 0000000000..a02ae17126 --- /dev/null +++ b/tests/RocksDBEngine/IndexEstimatorTest.cpp @@ -0,0 +1,135 @@ +//////////////////////////////////////////////////////////////////////////////// +/// @brief test suite for CuckooFilter based index selectivity estimator +/// +/// @file +/// +/// DISCLAIMER +/// +/// Copyright 2004-2012 triagens GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. 
+///
+/// Copyright holder is triAGENS GmbH, Cologne, Germany
+///
+/// @author Michael Hackstein
+/// @author Copyright 2017, ArangoDB GmbH, Cologne, Germany
+////////////////////////////////////////////////////////////////////////////////
+
+
+#include "Basics/Common.h"
+#include "Basics/StringRef.h"
+#include "catch.hpp"
+
+#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h"
+
+using namespace arangodb;
+
+// -----------------------------------------------------------------------------
+// --SECTION--                                                        test suite
+// -----------------------------------------------------------------------------
+
+
+// @brief setup
+
+TEST_CASE("IndexEstimator", "[indexestimator]") {
+
+// @brief Test insert unique correctness
+
+SECTION("test_unique_values") {
+  std::vector<uint64_t> toInsert(100);
+  uint64_t i = 0;
+  RocksDBCuckooIndexEstimator<uint64_t> est(2048);
+  std::generate(toInsert.begin(), toInsert.end(), [&i] {return i++;});
+  for (auto it : toInsert) {
+    est.insert(it);
+  }
+  CHECK(est.nrUsed() == 100);
+  CHECK(est.computeEstimate() == 1);
+
+  for (size_t k = 0; k < 10; ++k) {
+    est.remove(toInsert[k]);
+  }
+  CHECK(est.nrUsed() == 90);
+  CHECK(est.computeEstimate() == 1);
+}
+
+SECTION("test_multiple_values") {
+  std::vector<uint64_t> toInsert(100);
+  uint64_t i = 0;
+  RocksDBCuckooIndexEstimator<uint64_t> est(2048);
+  std::generate(toInsert.begin(), toInsert.end(), [&i] {return (i++) % 10;});
+  for (auto it : toInsert) {
+    est.insert(it);
+  }
+  CHECK(est.nrUsed() == 10);
+  CHECK(est.nrCuckood() == 0);
+  CHECK(est.computeEstimate() == (double) 10 / 100);
+
+  for (size_t k = 0; k < 10; ++k) {
+    est.remove(toInsert[k]);
+  }
+  CHECK(est.nrCuckood() == 0);
+  CHECK(est.computeEstimate() == (double) 10 / 90);
+}
+
+SECTION("test_serialize_deserialize") {
+  std::vector<uint64_t> toInsert(10000);
+  uint64_t i = 0;
+  std::string serialization;
+  RocksDBCuckooIndexEstimator<uint64_t> est(2048);
+  std::generate(toInsert.begin(), toInsert.end(), [&i] {return i++;});
+  for (auto it : toInsert) {
+    est.insert(it);
+  }
+
+  est.serialize(serialization);
+
+  // Test that the serialization first reports the correct length
+  uint64_t length = serialization.size();
+
+  // We read starting from the second char. The first char is reserved for the type
+  uint64_t persLength = rocksutils::uint64FromPersistent(serialization.data() + 1);
+  CHECK(persLength == length);
+
+  // We first have a uint64_t representing the length.
+  // This has to be extracted BEFORE initialisation.
+  StringRef ref(serialization.data(), persLength);
+
+  RocksDBCuckooIndexEstimator<uint64_t> copy(ref);
+
+  // After serialisation => deserialisation
+  // both estimates have to be identical
+  CHECK(est.nrUsed() == copy.nrUsed());
+  CHECK(est.nrCuckood() == copy.nrCuckood());
+  CHECK(est.computeEstimate() == copy.computeEstimate());
+
+  // Now let us remove the same elements in both
+  bool coin = false;
+  for (auto it : toInsert) {
+    if (coin) {
+      est.remove(it);
+      copy.remove(it);
+    }
+    coin = !coin;
+  }
+
+  // We cannot reliably check inserts because the cuckoo has a random factor
+  // Still all values have to be identical
+
+  CHECK(est.nrUsed() == copy.nrUsed());
+  CHECK(est.nrCuckood() == copy.nrCuckood());
+  CHECK(est.computeEstimate() == copy.computeEstimate());
+}
+
+// @brief generate tests
+}
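Note on the figures asserted above: test_unique_values and test_multiple_values check that the reported selectivity equals the number of distinct indexed values divided by the number of stored values (1 for a de-facto unique index, 10/100 after inserting ten distinct values a hundred times), which also matches the est1/est2/est3 constants in the recovery test. The stand-alone sketch below illustrates that ratio with an exact hash map; the class name NaiveSelectivityEstimator is invented here for illustration only and is not the cuckoo-filter implementation exercised by these tests, which tracks the same figures approximately within a fixed-size structure.

// Illustrative sketch only, not part of the change above.
#include <cstdint>
#include <iostream>
#include <unordered_map>

class NaiveSelectivityEstimator {
 public:
  void insert(uint64_t value) {
    ++_counts[value];
    ++_total;
  }

  void remove(uint64_t value) {
    auto it = _counts.find(value);
    if (it == _counts.end()) {
      return;  // value was never inserted
    }
    if (--it->second == 0) {
      _counts.erase(it);
    }
    --_total;
  }

  // number of distinct values currently stored
  uint64_t nrUsed() const { return _counts.size(); }

  // selectivity = distinct values / stored values, 1.0 for an empty index
  double computeEstimate() const {
    if (_total == 0) {
      return 1.0;
    }
    return static_cast<double>(_counts.size()) / static_cast<double>(_total);
  }

 private:
  std::unordered_map<uint64_t, uint64_t> _counts;
  uint64_t _total = 0;
};

int main() {
  NaiveSelectivityEstimator est;
  // 100 insertions over 10 distinct values, as in the "test_multiple_values"
  // section above: expected estimate is 10/100 = 0.1.
  for (uint64_t i = 0; i < 100; ++i) {
    est.insert(i % 10);
  }
  std::cout << est.nrUsed() << " " << est.computeEstimate() << "\n";  // prints: 10 0.1
  return 0;
}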