1
0
Fork 0

Implemented index estimates for RocksDB. Now the following indexes have

a proper estimate:
* HashIndex
* SkiplistIndex / Persistent
* EdgeIndex
This commit is contained in:
Michael Hackstein 2017-05-15 15:37:26 +02:00
parent a829fed1bc
commit d7d650e27d
24 changed files with 1492 additions and 126 deletions

View File

@ -205,8 +205,8 @@ void RocksDBCollection::open(bool ignoreErrors) {
RocksDBEngine* engine =
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
auto counterValue = engine->counterManager()->loadCounter(this->objectId());
LOG_TOPIC(ERR, Logger::DEVEL)
<< " number of documents: " << counterValue.added();
LOG_TOPIC(ERR, Logger::DEVEL) << " number of documents: "
<< counterValue.added();
_numberDocuments = counterValue.added() - counterValue.removed();
_revisionId = counterValue.revisionId();
@ -437,10 +437,9 @@ std::shared_ptr<Index> RocksDBCollection::createIndex(
idx->toVelocyPack(indexInfo, false, true);
int res = static_cast<RocksDBEngine*>(engine)->writeCreateCollectionMarker(
_logicalCollection->vocbase()->id(), _logicalCollection->cid(),
builder.slice(),
RocksDBLogValue::IndexCreate(_logicalCollection->vocbase()->id(),
_logicalCollection->cid(),
indexInfo.slice()));
builder.slice(), RocksDBLogValue::IndexCreate(
_logicalCollection->vocbase()->id(),
_logicalCollection->cid(), indexInfo.slice()));
if (res != TRI_ERROR_NO_ERROR) {
// We could not persist the index creation. Better abort
// Remove the Index in the local list again.
@ -520,10 +519,9 @@ int RocksDBCollection::restoreIndex(transaction::Methods* trx,
TRI_ASSERT(engine != nullptr);
int res = engine->writeCreateCollectionMarker(
_logicalCollection->vocbase()->id(), _logicalCollection->cid(),
builder.slice(),
RocksDBLogValue::IndexCreate(_logicalCollection->vocbase()->id(),
_logicalCollection->cid(),
indexInfo.slice()));
builder.slice(), RocksDBLogValue::IndexCreate(
_logicalCollection->vocbase()->id(),
_logicalCollection->cid(), indexInfo.slice()));
if (res != TRI_ERROR_NO_ERROR) {
// We could not persist the index creation. Better abort
// Remove the Index in the local list again.
@ -557,6 +555,7 @@ bool RocksDBCollection::dropIndex(TRI_idx_iid_t iid) {
WRITE_LOCKER(guard, _indexesLock);
for (auto index : _indexes) {
RocksDBIndex* cindex = static_cast<RocksDBIndex*>(index.get());
TRI_ASSERT(cindex != nullptr);
if (iid == cindex->id()) {
int rv = cindex->drop();
@ -568,7 +567,7 @@ bool RocksDBCollection::dropIndex(TRI_idx_iid_t iid) {
_indexes.erase(_indexes.begin() + i);
events::DropIndex("", std::to_string(iid), TRI_ERROR_NO_ERROR);
// toVelocyPackIgnore will take a read lock and we don't need the
// lock anymore, we will always return
// lock anymore, we will always return
guard.unlock();
VPackBuilder builder = _logicalCollection->toVelocyPackIgnore(
@ -674,6 +673,7 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
RocksDBIndex* rindex = static_cast<RocksDBIndex*>(index.get());
rindex->truncate(trx);
}
_needToPersistIndexEstimates = true;
}
/*
@ -1213,7 +1213,7 @@ void RocksDBCollection::figuresSpecific(
/// @brief creates the initial indexes for the collection
void RocksDBCollection::createInitialIndexes() {
{ // addIndex holds an internal write lock
{ // addIndex holds an internal write lock
READ_LOCKER(guard, _indexesLock);
if (!_indexes.empty()) {
return;
@ -1256,7 +1256,7 @@ void RocksDBCollection::addIndex(std::shared_ptr<arangodb::Index> idx) {
void RocksDBCollection::addIndexCoordinator(
std::shared_ptr<arangodb::Index> idx) {
WRITE_LOCKER(guard, _indexesLock);
auto const id = idx->id();
for (auto const& it : _indexes) {
if (it->id() == id) {
@ -1344,8 +1344,8 @@ arangodb::Result RocksDBCollection::fillIndexes(
// occured, this needs to happen since we are non transactional
if (!r.ok()) {
iter->reset();
rocksdb::WriteBatchWithIndex removeBatch(db->DefaultColumnFamily()->GetComparator(),
32 * 1024 * 1024);
rocksdb::WriteBatchWithIndex removeBatch(
db->DefaultColumnFamily()->GetComparator(), 32 * 1024 * 1024);
res = TRI_ERROR_NO_ERROR;
auto removeCb = [&](DocumentIdentifierToken token) {
@ -1368,6 +1368,9 @@ arangodb::Result RocksDBCollection::fillIndexes(
// Simon: Don't think so
db->Write(writeOpts, removeBatch.GetWriteBatch());
}
if (numDocsWritten > 0) {
_needToPersistIndexEstimates = true;
}
return r;
}
@ -1428,12 +1431,12 @@ RocksDBOperationResult RocksDBCollection::insertDocument(
READ_LOCKER(guard, _indexesLock);
for (std::shared_ptr<Index> const& idx : _indexes) {
innerRes.reset(idx->insert(trx, revisionId, doc, false));
// in case of no-memory, return immediately
if (innerRes.is(TRI_ERROR_OUT_OF_MEMORY)) {
return innerRes;
}
if (innerRes.fail()) {
// "prefer" unique constraint violated over other errors
if (innerRes.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) ||
@ -1454,6 +1457,7 @@ RocksDBOperationResult RocksDBCollection::insertDocument(
if (waitForSync) {
trx->state()->waitForSync(true);
}
_needToPersistIndexEstimates = true;
}
return res;
@ -1510,6 +1514,7 @@ RocksDBOperationResult RocksDBCollection::removeDocument(
if (waitForSync) {
trx->state()->waitForSync(true);
}
_needToPersistIndexEstimates = true;
}
return res;
@ -1754,6 +1759,7 @@ uint64_t RocksDBCollection::recalculateCounts() {
// need correction. The value is not changed and does not need to be synced
globalRocksEngine()->counterManager()->sync(true);
}
trx.commit();
return _numberDocuments;
}
@ -1798,6 +1804,73 @@ void RocksDBCollection::estimateSize(velocypack::Builder& builder) {
builder.close();
}
arangodb::Result RocksDBCollection::serializeIndexEstimates(
rocksdb::Transaction* rtrx) const {
if (!_needToPersistIndexEstimates) {
return {TRI_ERROR_NO_ERROR};
}
_needToPersistIndexEstimates = false;
std::string output;
rocksdb::TransactionDB* tdb = rocksutils::globalRocksDB();
for (auto index : getIndexes()) {
output.clear();
RocksDBIndex* cindex = static_cast<RocksDBIndex*>(index.get());
TRI_ASSERT(cindex != nullptr);
rocksutils::uint64ToPersistent(output, static_cast<uint64_t>(tdb->GetLatestSequenceNumber()));
cindex->serializeEstimate(output);
if (output.size() > sizeof(uint64_t)) {
RocksDBKey key =
RocksDBKey::IndexEstimateValue(cindex->objectId());
rocksdb::Slice value(output);
rocksdb::Status s = rtrx->Put(key.string(), value);
if (!s.ok()) {
LOG_TOPIC(WARN, Logger::ENGINES) << "writing index estimates failed";
rtrx->Rollback();
return rocksutils::convertStatus(s);
}
}
}
return {TRI_ERROR_NO_ERROR};
}
void RocksDBCollection::deserializeIndexEstimates(RocksDBCounterManager* mgr) {
std::vector<std::shared_ptr<Index>> toRecalculate;
for (auto const& it : getIndexes()) {
auto idx = static_cast<RocksDBIndex*>(it.get());
if (!idx->deserializeEstimate(mgr)) {
toRecalculate.push_back(it);
}
}
if (!toRecalculate.empty()) {
recalculateIndexEstimates(toRecalculate);
}
}
void RocksDBCollection::recalculateIndexEstimates() {
auto idxs = getIndexes();
recalculateIndexEstimates(idxs);
}
void RocksDBCollection::recalculateIndexEstimates(std::vector<std::shared_ptr<Index>>& indexes) {
// start transaction to get a collection lock
arangodb::SingleCollectionTransaction trx(
arangodb::transaction::StandaloneContext::Create(
_logicalCollection->vocbase()),
_logicalCollection->cid(), AccessMode::Type::EXCLUSIVE);
auto res = trx.begin();
if (res.fail()) {
THROW_ARANGO_EXCEPTION(res);
}
for (auto const& it : indexes) {
auto idx = static_cast<RocksDBIndex*>(it.get());
idx->recalculateEstimates();
}
_needToPersistIndexEstimates = true;
trx.commit();
}
void RocksDBCollection::createCache() const {
if (!_useCache || _cachePresent) {
// we leave this if we do not need the cache

View File

@ -32,6 +32,10 @@
#include "VocBase/KeyGenerator.h"
#include "VocBase/ManagedDocumentResult.h"
namespace rocksdb {
class Transaction;
}
namespace arangodb {
namespace cache {
class Cache;
@ -184,6 +188,11 @@ class RocksDBCollection final : public PhysicalCollection {
bool hasGeoIndex() { return _hasGeoIndex; }
Result serializeIndexEstimates(rocksdb::Transaction*) const;
void deserializeIndexEstimates(arangodb::RocksDBCounterManager* mgr);
void recalculateIndexEstimates();
private:
/// @brief return engine-specific figures
void figuresSpecific(
@ -220,15 +229,21 @@ class RocksDBCollection final : public PhysicalCollection {
arangodb::Result lookupRevisionVPack(TRI_voc_rid_t, transaction::Methods*,
arangodb::ManagedDocumentResult&) const;
void recalculateIndexEstimates(std::vector<std::shared_ptr<Index>>& indexes);
void createCache() const;
void disableCache() const;
inline bool useCache() const { return (_useCache && _cachePresent); }
void blackListKey(char const* data, std::size_t len) const;
private:
uint64_t const _objectId; // rocksdb-specific object id for collection
std::atomic<uint64_t> _numberDocuments;
std::atomic<TRI_voc_rid_t> _revisionId;
mutable std::atomic<bool> _needToPersistIndexEstimates;
/// upgrade write locks to exclusive locks if this flag is set
bool _hasGeoIndex;

View File

@ -132,6 +132,34 @@ void uint64ToPersistent(std::string& p, uint64_t value) {
} while (++len < sizeof(uint64_t));
}
uint16_t uint16FromPersistent(char const* p) {
uint16_t value = 0;
uint16_t x = 0;
uint8_t const* ptr = reinterpret_cast<uint8_t const*>(p);
uint8_t const* end = ptr + sizeof(uint16_t);
do {
value += static_cast<uint16_t>(*ptr++) << x;
x += 8;
} while (ptr < end);
return value;
}
void uint16ToPersistent(char* p, uint16_t value) {
char* end = p + sizeof(uint16_t);
do {
*p++ = static_cast<uint8_t>(value & 0xffU);
value >>= 8;
} while (p < end);
}
void uint16ToPersistent(std::string& p, uint16_t value) {
size_t len = 0;
do {
p.push_back(static_cast<char>(value & 0xffU));
value >>= 8;
} while (++len < sizeof(uint16_t));
}
bool hasObjectIds(VPackSlice const& inputSlice) {
bool rv = false;
if (inputSlice.isObject()) {

View File

@ -89,6 +89,10 @@ uint64_t uint64FromPersistent(char const* p);
void uint64ToPersistent(char* p, uint64_t value);
void uint64ToPersistent(std::string& out, uint64_t value);
uint16_t uint16FromPersistent(char const* p);
void uint16ToPersistent(char* p, uint16_t value);
void uint16ToPersistent(std::string& out, uint16_t value);
std::pair<VPackSlice, std::unique_ptr<VPackBuffer<uint8_t>>> stripObjectIds(
VPackSlice const& inputSlice, bool checkBeforeCopy = true);

View File

@ -23,15 +23,22 @@
#include "RocksDBCounterManager.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/ReadLocker.h"
#include "Basics/StringUtils.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/WriteLocker.h"
#include "Logger/Logger.h"
#include "RestServer/DatabaseFeature.h"
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h"
#include "RocksDBEngine/RocksDBEdgeIndex.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBKeyBounds.h"
#include "RocksDBEngine/RocksDBVPackIndex.h"
#include "RocksDBEngine/RocksDBValue.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "VocBase/ticks.h"
#include <rocksdb/utilities/transaction_db.h>
@ -44,6 +51,7 @@
#include <velocypack/velocypack-aliases.h>
using namespace arangodb;
using namespace arangodb::application_features;
RocksDBCounterManager::CMValue::CMValue(VPackSlice const& slice)
: _sequenceNum(0), _count(0), _revisionId(0) {
@ -74,6 +82,7 @@ void RocksDBCounterManager::CMValue::serialize(VPackBuilder& b) const {
RocksDBCounterManager::RocksDBCounterManager(rocksdb::DB* db)
: _syncing(false), _db(db) {
readSettings();
readIndexEstimates();
readCounterValues();
if (!_counters.empty()) {
@ -135,7 +144,8 @@ void RocksDBCounterManager::updateCounter(uint64_t objectId,
}
}
arangodb::Result RocksDBCounterManager::setAbsoluteCounter(uint64_t objectId, uint64_t value) {
arangodb::Result RocksDBCounterManager::setAbsoluteCounter(uint64_t objectId,
uint64_t value) {
arangodb::Result res;
WRITE_LOCKER(guard, _rwLock);
auto it = _counters.find(objectId);
@ -219,7 +229,7 @@ Result RocksDBCounterManager::sync(bool force) {
return rocksutils::convertStatus(s);
}
}
// now write global settings
b.clear();
b.openObject();
@ -241,6 +251,46 @@ Result RocksDBCounterManager::sync(bool force) {
return rocksutils::convertStatus(s);
}
// Now persist the index estimates:
{
for (auto const& pair : copy) {
auto dbColPair = rocksutils::mapObjectToCollection(pair.first);
if (dbColPair.second == 0 && dbColPair.first == 0) {
// collection with this objectID not known.Skip.
continue;
}
auto dbfeature =
ApplicationServer::getFeature<DatabaseFeature>("Database");
TRI_ASSERT(dbfeature != nullptr);
auto vocbase = dbfeature->useDatabase(dbColPair.first);
if (vocbase == nullptr) {
// Bad state, we have references to a database that is not known
// anymore.
// However let's just skip in production. Not allowed to crash.
// If we cannot find this infos during recovery we can either recompute
// or start fresh.
continue;
}
auto collection = vocbase->lookupCollection(dbColPair.second);
if (collection == nullptr) {
// Bad state, we have references to a collection that is not known
// anymore.
// However let's just skip in production. Not allowed to crash.
// If we cannot find this infos during recovery we can either recompute
// or start fresh.
continue;
}
std::string estimateSerialisation;
auto rocksCollection =
static_cast<RocksDBCollection*>(collection->getPhysical());
TRI_ASSERT(rocksCollection != nullptr);
Result res = rocksCollection->serializeIndexEstimates(rtrx.get());
if (!res.ok()) {
return res;
}
}
}
// we have to commit all counters in one batch
s = rtrx->Commit();
if (s.ok()) {
@ -271,7 +321,7 @@ void RocksDBCounterManager::readSettings() {
basics::VelocyPackHelper::stringUInt64(slice.get("tick"));
LOG_TOPIC(TRACE, Logger::ENGINES) << "using last tick: " << lastTick;
TRI_UpdateTickServer(lastTick);
if (slice.hasKey("hlc")) {
uint64_t lastHlc =
basics::VelocyPackHelper::stringUInt64(slice.get("hlc"));
@ -286,6 +336,55 @@ void RocksDBCounterManager::readSettings() {
}
}
void RocksDBCounterManager::readIndexEstimates() {
WRITE_LOCKER(guard, _rwLock);
RocksDBKeyBounds bounds = RocksDBKeyBounds::IndexEstimateValues();
rocksdb::Comparator const* cmp = _db->GetOptions().comparator;
rocksdb::ReadOptions readOptions;
std::unique_ptr<rocksdb::Iterator> iter(_db->NewIterator(readOptions));
iter->Seek(bounds.start());
for (; iter->Valid() && cmp->Compare(iter->key(), bounds.end()) < 0;
iter->Next()) {
uint64_t objectId = RocksDBKey::counterObjectId(iter->key());
uint64_t lastSeqNumber =
rocksutils::uint64FromPersistent(iter->value().data());
StringRef estimateSerialisation(iter->value().data() + sizeof(uint64_t),
iter->value().size() - sizeof(uint64_t));
// If this hits we have two estimates for the same index
TRI_ASSERT(_estimators.find(objectId) == _estimators.end());
_estimators.emplace(
objectId,
std::make_pair(lastSeqNumber,
std::make_unique<RocksDBCuckooIndexEstimator<uint64_t>>(
estimateSerialisation)));
}
}
std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>>
RocksDBCounterManager::stealIndexEstimator(uint64_t objectId) {
std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>> res(nullptr);
auto it = _estimators.find(objectId);
if (it != _estimators.end()) {
// We swap out the stored estimate in order to move it to the caller
res.swap(it->second.second);
// Drop the now empty estimator
_estimators.erase(objectId);
}
return res;
}
void RocksDBCounterManager::clearIndexEstimators() {
// We call this to remove all index estimators that have been stored but are
// no longer read
// by recovery.
// TODO REMOVE RocksDB Keys of all not stolen values?
_estimators.clear();
}
/// Parse counter values from rocksdb
void RocksDBCounterManager::readCounterValues() {
WRITE_LOCKER(guard, _rwLock);
@ -312,14 +411,26 @@ struct WBReader final : public rocksdb::WriteBatch::Handler {
// must be set by the counter manager
std::unordered_map<uint64_t, rocksdb::SequenceNumber> seqStart;
std::unordered_map<uint64_t, RocksDBCounterManager::CounterAdjustment> deltas;
std::unordered_map<
uint64_t,
std::pair<uint64_t,
std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>>>>*
_estimators;
rocksdb::SequenceNumber currentSeqNum;
uint64_t _maxTick = 0;
uint64_t _maxHLC = 0;
WBReader() : currentSeqNum(0) {}
WBReader(std::unordered_map<
uint64_t,
std::pair<uint64_t,
std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>>>>*
estimators)
: _estimators(estimators), currentSeqNum(0) {}
~WBReader() {
// update ticks after parsing wal
LOG_TOPIC(TRACE, Logger::ENGINES) << "max tick found in WAL: " << _maxTick << ", last HLC value: " << _maxHLC;
LOG_TOPIC(TRACE, Logger::ENGINES) << "max tick found in WAL: " << _maxTick
<< ", last HLC value: " << _maxHLC;
TRI_UpdateTickServer(_maxTick);
TRI_HybridLogicalClock(_maxHLC);
@ -367,15 +478,16 @@ struct WBReader final : public rocksdb::WriteBatch::Handler {
storeMaxHLC(RocksDBKey::revisionId(key));
} else if (type == RocksDBEntryType::PrimaryIndexValue) {
// document key
StringRef ref = RocksDBKey::primaryKey(key);
StringRef ref = RocksDBKey::primaryKey(key);
TRI_ASSERT(!ref.empty());
// check if the key is numeric
if (ref[0] >= '1' && ref[0] <= '9') {
if (ref[0] >= '1' && ref[0] <= '9') {
// numeric start byte. looks good
try {
// extract uint64_t value from key. this will throw if the key
// is non-numeric
uint64_t tick = basics::StringUtils::uint64_check(ref.data(), ref.size());
uint64_t tick =
basics::StringUtils::uint64_check(ref.data(), ref.size());
// if no previous _maxTick set or the numeric value found is
// "near" our previous _maxTick, then we update it
if (tick > _maxTick && (_maxTick == 0 || tick - _maxTick < 2048)) {
@ -415,6 +527,32 @@ struct WBReader final : public rocksdb::WriteBatch::Handler {
it->second._added++;
it->second._revisionId = revisionId;
}
} else {
// We have to adjust the estimate with an insert
switch (RocksDBKey::type(key)) {
case RocksDBEntryType::IndexValue: {
uint64_t objectId = RocksDBKey::counterObjectId(key);
auto it = _estimators->find(objectId);
if (it != _estimators->end() && it->second.first < currentSeqNum) {
// We track estimates for this index
uint64_t hash = RocksDBVPackIndex::HashForKey(key);
it->second.second->insert(hash);
}
break;
}
case RocksDBEntryType::EdgeIndexValue: {
uint64_t objectId = RocksDBKey::counterObjectId(key);
auto it = _estimators->find(objectId);
if (it != _estimators->end() && it->second.first < currentSeqNum) {
// We track estimates for this index
uint64_t hash = RocksDBEdgeIndex::HashForKey(key);
it->second.second->insert(hash);
}
break;
}
default:
break;
}
}
}
@ -429,6 +567,32 @@ struct WBReader final : public rocksdb::WriteBatch::Handler {
it->second._removed++;
it->second._revisionId = revisionId;
}
} else {
// We have to adjust the estimate with an remove
switch (RocksDBKey::type(key)) {
case RocksDBEntryType::IndexValue: {
uint64_t objectId = RocksDBKey::counterObjectId(key);
auto it = _estimators->find(objectId);
if (it != _estimators->end() && it->second.first < currentSeqNum) {
// We track estimates for this index
uint64_t hash = RocksDBVPackIndex::HashForKey(key);
it->second.second->remove(hash);
}
break;
}
case RocksDBEntryType::EdgeIndexValue: {
uint64_t objectId = RocksDBKey::counterObjectId(key);
auto it = _estimators->find(objectId);
if (it != _estimators->end() && it->second.first < currentSeqNum) {
// We track estimates for this index
uint64_t hash = RocksDBEdgeIndex::HashForKey(key);
it->second.second->remove(hash);
}
break;
}
default:
break;
}
}
}
@ -442,8 +606,8 @@ bool RocksDBCounterManager::parseRocksWAL() {
rocksdb::SequenceNumber start = UINT64_MAX;
// Tell the WriteBatch reader the transaction markers to look for
auto handler = std::make_unique<WBReader>();
auto handler = std::make_unique<WBReader>(&_estimators);
for (auto const& pair : _counters) {
handler->seqStart.emplace(pair.first, pair.second._sequenceNum);
start = std::min(start, pair.second._sequenceNum);

View File

@ -28,6 +28,7 @@
#include "Basics/Common.h"
#include "Basics/ReadWriteLock.h"
#include "Basics/Result.h"
#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h"
#include "RocksDBEngine/RocksDBTypes.h"
#include "VocBase/voc-types.h"
@ -78,8 +79,9 @@ class RocksDBCounterManager {
/// the sequence number used
void updateCounter(uint64_t objectId, CounterAdjustment const&);
//does not modify seq or revisionid
arangodb::Result setAbsoluteCounter(uint64_t objectId, uint64_t absouluteCount);
// does not modify seq or revisionid
arangodb::Result setAbsoluteCounter(uint64_t objectId,
uint64_t absouluteCount);
/// Thread-Safe remove a counter
void removeCounter(uint64_t objectId);
@ -87,7 +89,22 @@ class RocksDBCounterManager {
/// Thread-Safe force sync
arangodb::Result sync(bool force);
void readSettings();
// Steal the index estimator that the recovery has built up to inject it into
// an index.
// NOTE: If this returns nullptr the recovery was not ably to find any
// estimator
// for this index.
std::unique_ptr<arangodb::RocksDBCuckooIndexEstimator<uint64_t>>
stealIndexEstimator(uint64_t indexObjectId);
// Free up all index estimators that were not read by any index.
// This is to save some memory.
// NOTE: After calling this the stored estimate of all not yet
// read index estimators will be dropped and no attempt
// to reread it will be done.
// So call it after ALL indexes for all databases
// have been created in memory.
void clearIndexEstimators();
protected:
struct CMValue {
@ -105,6 +122,9 @@ class RocksDBCounterManager {
};
void readCounterValues();
void readSettings();
void readIndexEstimates();
bool parseRocksWAL();
//////////////////////////////////////////////////////////////////////////////
@ -112,6 +132,17 @@ class RocksDBCounterManager {
//////////////////////////////////////////////////////////////////////////////
std::unordered_map<uint64_t, CMValue> _counters;
//////////////////////////////////////////////////////////////////////////////
/// @brief Index Estimator contianer.
/// Note the elements in this container will be moved into the
/// index classes and are only temporarily stored here during recovery.
//////////////////////////////////////////////////////////////////////////////
std::unordered_map<
uint64_t,
std::pair<uint64_t,
std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>>>>
_estimators;
//////////////////////////////////////////////////////////////////////////////
/// @brief synced sequence numbers
//////////////////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,617 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2017 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Daniel Larkin
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_ROCKSDB_ROCKSDB_INDEX_ESTIMATOR_H
#define ARANGOD_ROCKSDB_ROCKSDB_INDEX_ESTIMATOR_H 1
#include "Basics/Common.h"
#include "Basics/ReadLocker.h"
#include "Basics/StringRef.h"
#include "Basics/WriteLocker.h"
#include "Basics/fasthash.h"
#include "Logger/Logger.h"
#include "RocksDBEngine/RocksDBCommon.h"
// In the following template:
// Key is the key type, it must be copyable and movable, furthermore, Key
// must be default constructible (without arguments) as empty and
// must have an empty() method to indicate that the instance is
// empty.
// If using fasthash64 on all bytes of the object is not
// a suitable hash function, one has to instanciate the template
// with two hash function types as 3rd and 4th argument. If
// std::equal_to<Key> is not implemented or does not behave correctly,
// one has to supply a comparison class as well.
// This class is not thread-safe!
namespace arangodb {
// C++ wrapper for the hash function:
template <class T, uint64_t Seed>
class HashWithSeed {
public:
uint64_t operator()(T const& t) const {
// Some implementation like Fnv or xxhash looking at bytes in type T,
// taking the seed into account.
auto p = reinterpret_cast<void const*>(&t);
return fasthash64(p, sizeof(T), Seed);
}
};
template <class Key, class HashKey = HashWithSeed<Key, 0xdeadbeefdeadbeefULL>,
class Fingerprint = HashWithSeed<Key, 0xabcdefabcdef1234ULL>,
class HashShort = HashWithSeed<uint16_t, 0xfedcbafedcba4321ULL>,
class CompKey = std::equal_to<Key>>
class RocksDBCuckooIndexEstimator {
// Note that the following has to be a power of two and at least 4!
static constexpr uint32_t SlotsPerBucket = 4;
private:
// Helper class to abstract away where which data is stored.
struct Slot {
private:
uint16_t* _data;
public:
Slot(uint16_t* data) : _data(data) {}
~Slot() {
// Not responsible for anything
}
bool operator==(const Slot& other) { return _data == other._data; }
uint16_t* fingerprint() { return _data; }
uint16_t* counter() { return _data + 1; }
void reset() {
*fingerprint() = 0;
*counter() = 0;
}
bool isEqual(uint16_t fp) { return ((*fingerprint()) == fp); }
bool isEmpty() { return (*fingerprint()) == 0; }
};
enum SerializeFormat : char {
// To describe this format we use | as a seperator for readability, but it
// is NOT a printed character in the serialized string
// NOCOMPRESSION: type|length|size|nrUsed|nrCuckood|nrTotal|niceSize|logSize|base
NOCOMPRESSION = '0'
};
public:
RocksDBCuckooIndexEstimator(uint64_t size)
: _randState(0x2636283625154737ULL),
_slotSize(2 * sizeof(uint16_t)), // Sort out offsets and alignments
_nrUsed(0),
_nrCuckood(0),
_nrTotal(0),
_maxRounds(16) {
// Inflate size so that we have some padding to avoid failure
size *= 2.0;
size = (size >= 1024) ? size : 1024; // want 256 buckets minimum
// First find the smallest power of two that is not smaller than size:
size /= SlotsPerBucket;
_size = size;
initializeDefault();
}
RocksDBCuckooIndexEstimator(arangodb::StringRef const serialized)
: _randState(0x2636283625154737ULL),
_slotSize(2 * sizeof(uint16_t)), // Sort out offsets and alignments
_nrUsed(0),
_nrCuckood(0),
_nrTotal(0),
_maxRounds(16) {
switch (serialized.front()) {
case SerializeFormat::NOCOMPRESSION: {
deserializeUncompressed(serialized);
break;
}
default: {
LOG_TOPIC(ERR, arangodb::Logger::ENGINES) << "Unable to restore the "
"index estimates. Invalid "
"format persisted.";
initializeDefault();
}
}
}
~RocksDBCuckooIndexEstimator() { delete[] _allocBase; }
RocksDBCuckooIndexEstimator(RocksDBCuckooIndexEstimator const&) = delete;
RocksDBCuckooIndexEstimator(RocksDBCuckooIndexEstimator&&) = delete;
RocksDBCuckooIndexEstimator& operator=(RocksDBCuckooIndexEstimator const&) =
delete;
RocksDBCuckooIndexEstimator& operator=(RocksDBCuckooIndexEstimator&&) =
delete;
void serialize(std::string& serialized) const {
// This format is always hard coded and the serialisation has to support
// older formats
// for backwards compatibility
// We always have to start with the type and then the length
serialized += SerializeFormat::NOCOMPRESSION;
uint64_t serialLength =
(sizeof(SerializeFormat) + sizeof(uint64_t) + sizeof(_size) + sizeof(_nrUsed) +
sizeof(_nrCuckood) + sizeof(_nrTotal) + sizeof(_niceSize) +
sizeof(_logSize) + (_size * _slotSize * SlotsPerBucket));
serialized.reserve(sizeof(uint64_t) + serialLength);
// We always prepend the length, so parsing is easier
rocksutils::uint64ToPersistent(serialized, serialLength);
{
// Sorry we need a consistent state, so we have to read-lock from here on...
READ_LOCKER(locker, _bucketLock);
// Add all member variables
rocksutils::uint64ToPersistent(serialized, _size);
rocksutils::uint64ToPersistent(serialized, _nrUsed);
rocksutils::uint64ToPersistent(serialized, _nrCuckood);
rocksutils::uint64ToPersistent(serialized, _nrTotal);
rocksutils::uint64ToPersistent(serialized, _niceSize);
rocksutils::uint64ToPersistent(serialized, _logSize);
// Add the data blob
// Size is as follows: nrOfBuckets * SlotsPerBucket * SlotSize
TRI_ASSERT((_size * _slotSize * SlotsPerBucket) <= _allocSize);
for (uint64_t i = 0; i < (_size * _slotSize * SlotsPerBucket) / sizeof(uint16_t);
++i) {
rocksutils::uint16ToPersistent(serialized, *(reinterpret_cast<uint16_t*>(_base + i * 2)));
}
}
}
void clear() {
WRITE_LOCKER(locker, _bucketLock);
// Reset Stats
_nrTotal = 0;
_nrCuckood = 0;
_nrUsed = 0;
// Reset filter content
// Now initialize all slots in all buckets with zero data:
for (uint32_t b = 0; b < _size; ++b) {
for (size_t i = 0; i < SlotsPerBucket; ++i) {
Slot f = findSlot(b, i);
f.reset();
}
}
}
double computeEstimate() {
READ_LOCKER(locker, _bucketLock);
if (_nrTotal == 0) {
// If we do not have any documents we have a rather constant estimate.
return 1;
}
// _nrUsed; These are known to be distinct values
// _nrCuckood; These are eventually distinct documents with unknown state
return (double)(_nrUsed + ((double)_nrCuckood * 3 * _nrUsed / _nrTotal)) /
_nrTotal;
}
bool lookup(Key const& k) const {
// look up a key, return either false if no pair with key k is
// found or true.
uint64_t hash1 = _hasherKey(k);
uint64_t pos1 = hashToPos(hash1);
uint16_t fingerprint = keyToFingerprint(k);
// We compute the second hash already here to allow the result to
// survive a mispredicted branch in the first loop. Is this sensible?
uint64_t hash2 = _hasherPosFingerprint(pos1, fingerprint);
uint64_t pos2 = hashToPos(hash2);
bool found = false;
{
READ_LOCKER(guard, _bucketLock);
findSlotNoCuckoo(pos1, pos2, fingerprint, found);
}
return found;
}
bool insert(Key& k) {
// insert the key k
//
// The inserted key will have its fingerprint input entered in the table. If
// there is a collision and a fingerprint needs to be cuckooed, a certain
// number of attempts will be made. After that, a given fingerprint may
// simply be expunged. If something is expunged, the function will return
// false, otherwise true.
uint64_t hash1 = _hasherKey(k);
uint64_t pos1 = hashToPos(hash1);
uint16_t fingerprint = keyToFingerprint(k);
// We compute the second hash already here to let it survive a
// mispredicted
// branch in the first loop:
uint64_t hash2 = _hasherPosFingerprint(pos1, fingerprint);
uint64_t pos2 = hashToPos(hash2);
{
WRITE_LOCKER(guard, _bucketLock);
Slot slot = findSlotCuckoo(pos1, pos2, fingerprint);
if (slot.isEmpty()) {
// Free slot insert ourself.
*slot.fingerprint() = fingerprint;
*slot.counter() = 1; // We are the first element
_nrUsed++;
} else {
TRI_ASSERT(slot.isEqual(fingerprint));
// TODO replace with constant uint16_t max
if (*slot.counter() < 65536) {
// just to avoid overflow...
(*slot.counter())++;
}
}
}
_nrTotal++;
return true;
}
bool remove(Key const& k) {
// remove one element with key k, if one is in the table. Return true if
// a key was removed and false otherwise.
// look up a key, return either false if no pair with key k is
// found or true.
uint64_t hash1 = _hasherKey(k);
uint64_t pos1 = hashToPos(hash1);
uint16_t fingerprint = keyToFingerprint(k);
// We compute the second hash already here to allow the result to
// survive a mispredicted branch in the first loop. Is this sensible?
uint64_t hash2 = _hasherPosFingerprint(pos1, fingerprint);
uint64_t pos2 = hashToPos(hash2);
bool found = false;
_nrTotal--;
{
WRITE_LOCKER(guard, _bucketLock);
Slot slot = findSlotNoCuckoo(pos1, pos2, fingerprint, found);
if (found) {
if (*slot.counter() <= 1) {
// We remove the last one of those, free slot
slot.reset();
_nrUsed--;
} else {
// Just decrease the counter
(*slot.counter())--;
}
return true;
}
}
// If we get here we assume that the element was once inserted, but removed
// by cuckoo
// Reduce nrCuckood;
if (_nrCuckood > 0) {
--_nrCuckood;
}
return false;
}
uint64_t capacity() const { return _size * SlotsPerBucket; }
uint64_t nrUsed() const { return _nrUsed; }
uint64_t nrCuckood() const { return _nrCuckood; }
uint64_t memoryUsage() const {
return sizeof(RocksDBCuckooIndexEstimator) + _allocSize;
}
private: // methods
Slot findSlotNoCuckoo(uint64_t pos1, uint64_t pos2, uint16_t fp,
bool& found) const {
found = false;
Slot s = findSlotNoCuckoo(pos1, fp, found);
if (found) {
return s;
}
// Not found by first hash. use second hash.
return findSlotNoCuckoo(pos2, fp, found);
}
// Find a slot for this fingerprint
// This function guarantees the following:
// If this fingerprint is stored already, the slot will be
// pointing to this fingerprint.
// If this fingerprint is NOT storead already the returned slot
// will be empty and can be filled with the fingerprint
// In order to create an empty slot this function tries to
// cuckoo neighboring elements, if that does not succeed
// it deletes a random element occupying a position.
Slot findSlotCuckoo(uint64_t pos1, uint64_t pos2, uint16_t fp) {
Slot firstEmpty(nullptr);
bool foundEmpty = false;
for (uint64_t i = 0; i < SlotsPerBucket; ++i) {
Slot slot = findSlot(pos1, i);
if (slot.isEqual(fp)) {
// Found we are done, short-circuit.
return slot;
}
if (!foundEmpty && slot.isEmpty()) {
foundEmpty = true;
firstEmpty = slot;
}
}
for (uint64_t i = 0; i < SlotsPerBucket; ++i) {
Slot slot = findSlot(pos2, i);
if (slot.isEqual(fp)) {
// Found we are done, short-circuit.
return slot;
}
if (!foundEmpty && slot.isEmpty()) {
foundEmpty = true;
firstEmpty = slot;
}
}
// Value not yet inserted.
if (foundEmpty) {
// But we found an empty slot
return firstEmpty;
}
// We also did not find an empty slot, now the cuckoo goes...
uint16_t counter =
0; // We initially write a 0 in here, because the caller will
// Increase the counter by one
uint8_t r = pseudoRandomChoice();
if ((r & 1) != 0) {
std::swap(pos1, pos2);
}
// Now expunge a random element from any of these slots:
// and place our own into it.
// We have to keep the reference to the cuckood slot here.
r = pseudoRandomChoice();
uint64_t i = r & (SlotsPerBucket - 1);
firstEmpty = findSlot(pos1, i);
uint16_t fDummy = *firstEmpty.fingerprint();
uint16_t cDummy = *firstEmpty.counter();
*firstEmpty.fingerprint() = fp;
*firstEmpty.counter() = counter;
fp = fDummy;
counter = cDummy;
uint64_t hash2 = _hasherPosFingerprint(pos1, fp);
pos2 = hashToPos(hash2);
// Now let the cuckoo fly and find a place for the poor one we just took
// out.
for (uint64_t i = 0; i < SlotsPerBucket; ++i) {
Slot slot = findSlot(pos2, i);
if (slot.isEmpty()) {
// Yeah we found an empty place already
*slot.fingerprint() = fp;
*slot.counter() = counter;
++_nrUsed;
return firstEmpty;
}
}
// Bad luck, let us try to move to a different slot.
for (unsigned attempt = 1; attempt < _maxRounds; attempt++) {
std::swap(pos1, pos2);
// Now expunge a random element from any of these slots:
r = pseudoRandomChoice();
uint64_t i = r & (SlotsPerBucket - 1);
// We expunge the element at position pos1 and slot i:
Slot slot = findSlot(pos1, i);
if (slot == firstEmpty) {
// We have to keep this one in place.
// Take a different one
i = (i + 1) % SlotsPerBucket;
slot = findSlot(pos1, i);
}
fDummy = *slot.fingerprint();
cDummy = *slot.counter();
*slot.fingerprint() = fp;
*slot.counter() = counter;
fp = fDummy;
counter = cDummy;
hash2 = _hasherPosFingerprint(pos1, fp);
pos2 = hashToPos(hash2);
for (uint64_t i = 0; i < SlotsPerBucket; ++i) {
Slot slot = findSlot(pos2, i);
if (slot.isEmpty()) {
// Finally an empty place
*slot.fingerprint() = fp;
*slot.counter() = counter;
++_nrUsed;
return firstEmpty;
}
}
}
// If we get here we had to remove one of the elements.
// Let's increas the cuckoo counter
_nrCuckood++;
return firstEmpty;
}
// Do not use the output if found == false
Slot findSlotNoCuckoo(uint64_t pos, uint16_t fp, bool& found) const {
found = false;
for (uint64_t i = 0; i < SlotsPerBucket; ++i) {
Slot slot = findSlot(pos, i);
if (fp == *slot.fingerprint()) {
found = true;
return slot;
}
}
return Slot{nullptr};
}
Slot findSlot(uint64_t pos, uint64_t slot) const {
char* address = _base + _slotSize * (pos * SlotsPerBucket + slot);
auto ret = reinterpret_cast<uint16_t*>(address);
return Slot(ret);
}
uint64_t hashToPos(uint64_t hash) const {
uint64_t relevantBits = (hash >> _sizeShift) & _sizeMask;
return ((relevantBits < _size) ? relevantBits : (relevantBits - _size));
}
uint16_t keyToFingerprint(Key const& k) const {
uint64_t hash = _fingerprint(k);
uint16_t fingerprint = (uint16_t)(
(hash ^ (hash >> 16) ^ (hash >> 32) ^ (hash >> 48)) & 0xFFFF);
return (fingerprint ? fingerprint : 1);
}
uint64_t _hasherPosFingerprint(uint64_t pos, uint16_t fingerprint) const {
return ((pos << _sizeShift) ^ _hasherShort(fingerprint));
}
uint8_t pseudoRandomChoice() {
_randState = _randState * 997 + 17; // ignore overflows
return static_cast<uint8_t>((_randState >> 37) & 0xff);
}
void deserializeUncompressed(arangodb::StringRef const& serialized) {
// Assert that we have at least the member variables
TRI_ASSERT(serialized.size() >= (sizeof(SerializeFormat) + sizeof(uint64_t) + sizeof(_size) + sizeof(_nrUsed) +
sizeof(_nrCuckood) + sizeof(_nrTotal) + sizeof(_niceSize) +
sizeof(_logSize) ));
char const* current = serialized.data();
TRI_ASSERT(*current == SerializeFormat::NOCOMPRESSION);
current++; // Skip format char
uint64_t length = rocksutils::uint64FromPersistent(current);
current += sizeof(uint64_t);
// Validate that the serialized format is exactly as long as we expect it to be
TRI_ASSERT(serialized.size() == length);
_size = rocksutils::uint64FromPersistent(current);
current += sizeof(_size);
_nrUsed = rocksutils::uint64FromPersistent(current);
current += sizeof(_nrUsed);
_nrCuckood = rocksutils::uint64FromPersistent(current);
current += sizeof(_nrCuckood);
_nrTotal = rocksutils::uint64FromPersistent(current);
current += sizeof(_nrTotal);
_niceSize = rocksutils::uint64FromPersistent(current);
current += sizeof(_niceSize);
_logSize = rocksutils::uint64FromPersistent(current);
current += sizeof(_logSize);
deriveSizesAndAlloc();
// Validate that we have enough data in the serialized format.
TRI_ASSERT(serialized.size() ==
(sizeof(SerializeFormat) + sizeof( uint64_t) + sizeof(_size) + sizeof(_nrUsed) +
sizeof(_nrCuckood) + sizeof(_nrTotal) + sizeof(_niceSize) +
sizeof(_logSize) + (_size * _slotSize * SlotsPerBucket)));
// Insert the raw data
// Size is as follows: nrOfBuckets * SlotsPerBucket * SlotSize
TRI_ASSERT((_size * _slotSize * SlotsPerBucket) <= _allocSize);
for (uint64_t i = 0; i < (_size * _slotSize * SlotsPerBucket) / sizeof(uint16_t);
++i) {
*(reinterpret_cast<uint16_t*>(_base + i * 2)) = rocksutils::uint16FromPersistent(current + (i * sizeof(uint16_t)));
}
}
void initializeDefault() {
_niceSize = 256;
_logSize = 8;
while (_niceSize < _size) {
_niceSize <<= 1;
_logSize += 1;
}
deriveSizesAndAlloc();
// Now initialize all slots in all buckets with zero data:
for (uint32_t b = 0; b < _size; ++b) {
for (size_t i = 0; i < SlotsPerBucket; ++i) {
Slot f = findSlot(b, i);
f.reset();
}
}
}
void deriveSizesAndAlloc() {
_sizeMask = _niceSize - 1;
_sizeShift = (64 - _logSize) / 2;
_allocSize = _size * _slotSize * SlotsPerBucket +
64; // give 64 bytes padding to enable 64-byte alignment
_allocBase = new char[_allocSize];
_base = reinterpret_cast<char*>(
(reinterpret_cast<uintptr_t>(_allocBase) + 63) &
~((uintptr_t)0x3fu)); // to actually implement the 64-byte alignment,
// shift base pointer within allocated space to
// 64-byte boundary
}
private: // member variables
uint64_t _randState; // pseudo random state for expunging
size_t _slotSize; // total size of a slot
uint64_t _logSize; // logarithm (base 2) of number of buckets
uint64_t _size; // actual number of buckets
uint64_t _niceSize; // smallest power of 2 at least number of buckets, ==
// 2^_logSize
uint64_t _sizeMask; // used to mask out some bits from the hash
uint32_t _sizeShift; // used to shift the bits down to get a position
uint64_t _allocSize; // number of allocated bytes,
// == _size * SlotsPerBucket * _slotSize + 64
char* _base; // pointer to allocated space, 64-byte aligned
char* _allocBase; // base of original allocation
uint64_t _nrUsed; // number of pairs stored in the table
uint64_t _nrCuckood; // number of elements that have been removed by cuckoo
uint64_t _nrTotal; // number of elements included in total
unsigned _maxRounds; // maximum number of cuckoo rounds on insertion
HashKey _hasherKey; // Instance to compute the first hash function
Fingerprint _fingerprint; // Instance to compute a fingerprint of a key
HashShort _hasherShort; // Instance to compute the second hash function
arangodb::basics::ReadWriteLock mutable _bucketLock;
};
} // namespace arangodb
#endif

View File

@ -39,6 +39,7 @@
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBCounterManager.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBKeyBounds.h"
#include "RocksDBEngine/RocksDBToken.h"
@ -274,19 +275,31 @@ void RocksDBEdgeIndexIterator::reset() {
// ============================= Index ====================================
uint64_t RocksDBEdgeIndex::HashForKey(const rocksdb::Slice& key) {
std::hash<StringRef> hasher;
// NOTE: This function needs to use the same hashing on the
// indexed VPack as the initial inserter does
StringRef tmp = RocksDBKey::vertexId(key);
return static_cast<uint64_t>(hasher(tmp));
}
RocksDBEdgeIndex::RocksDBEdgeIndex(TRI_idx_iid_t iid,
arangodb::LogicalCollection* collection,
VPackSlice const& info,
std::string const& attr)
: RocksDBIndex(iid
,collection
,std::vector<std::vector<AttributeName>>({{AttributeName(attr, false)}})
,false // unique
,false // sparse
,basics::VelocyPackHelper::stringUInt64(info, "objectId")
,!ServerState::instance()->isCoordinator() // useCache
)
, _directionAttr(attr) {
: RocksDBIndex(iid, collection, std::vector<std::vector<AttributeName>>(
{{AttributeName(attr, false)}}),
false, false,
basics::VelocyPackHelper::stringUInt64(info, "objectId"),
!ServerState::instance()->isCoordinator() /*useCache*/
),
_directionAttr(attr),
_estimator(nullptr) {
if (!ServerState::instance()->isCoordinator()) {
// We activate the estimator only on DBServers
_estimator = std::make_unique<RocksDBCuckooIndexEstimator<uint64_t>>(RocksDBIndex::ESTIMATOR_SIZE);
TRI_ASSERT(_estimator != nullptr);
}
TRI_ASSERT(iid != 0);
TRI_ASSERT(_objectId != 0);
// if we never hit the assertions we need to remove the
@ -308,25 +321,11 @@ double RocksDBEdgeIndex::selectivityEstimate(
return 0.1;
}
if (attribute != nullptr) {
// the index attribute is given here
// now check if we can restrict the selectivity estimation to the correct
// part of the index
if (attribute->compare(_directionAttr) == 0) {
// _from
return 0.2; //_edgesFrom->selectivity();
} else {
return 0;
}
// other attribute. now return the average selectivity
if (attribute != nullptr && attribute->compare(_directionAttr)) {
return 0;
}
// return average selectivity of the two index parts
// double estimate = (_edgesFrom->selectivity() + _edgesTo->selectivity()) *
// 0.5;
// TRI_ASSERT(estimate >= 0.0 &&
// estimate <= 1.00001); // floating-point tolerance
return 0.1;
TRI_ASSERT(_estimator != nullptr);
return _estimator->computeEstimate();
}
/// @brief return the memory usage for the index
@ -356,7 +355,7 @@ int RocksDBEdgeIndex::insert(transaction::Methods* trx,
VPackSlice primaryKey = doc.get(StaticStrings::KeyString);
VPackSlice fromTo = doc.get(_directionAttr);
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
auto fromToRef=StringRef(fromTo);
auto fromToRef = StringRef(fromTo);
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromToRef,
StringRef(primaryKey));
//blacklist key in cache
@ -369,6 +368,9 @@ int RocksDBEdgeIndex::insert(transaction::Methods* trx,
rocksdb::Status status =
rtrx->Put(rocksdb::Slice(key.string()), rocksdb::Slice());
if (status.ok()) {
std::hash<StringRef> hasher;
uint64_t hash = static_cast<uint64_t>(hasher(fromToRef));
_estimator->insert(hash);
return TRI_ERROR_NO_ERROR;
} else {
return rocksutils::convertStatus(status).errorNumber();
@ -385,7 +387,7 @@ int RocksDBEdgeIndex::remove(transaction::Methods* trx,
bool isRollback) {
VPackSlice primaryKey = doc.get(StaticStrings::KeyString);
VPackSlice fromTo = doc.get(_directionAttr);
auto fromToRef=StringRef(fromTo);
auto fromToRef = StringRef(fromTo);
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromToRef,
StringRef(primaryKey));
@ -398,6 +400,9 @@ int RocksDBEdgeIndex::remove(transaction::Methods* trx,
rocksdb::Transaction* rtrx = state->rocksTransaction();
rocksdb::Status status = rtrx->Delete(rocksdb::Slice(key.string()));
if (status.ok()) {
std::hash<StringRef> hasher;
uint64_t hash = static_cast<uint64_t>(hasher(fromToRef));
_estimator->remove(hash);
return TRI_ERROR_NO_ERROR;
} else {
return rocksutils::convertStatus(status).errorNumber();
@ -620,10 +625,48 @@ int RocksDBEdgeIndex::cleanup() {
return TRI_ERROR_NO_ERROR;
}
void RocksDBEdgeIndex::serializeEstimate(std::string& output) const {
TRI_ASSERT(_estimator != nullptr);
_estimator->serialize(output);
}
bool RocksDBEdgeIndex::deserializeEstimate(RocksDBCounterManager* mgr) {
TRI_ASSERT(!ServerState::instance()->isCoordinator());
// We simply drop the current estimator and steal the one from recovery
// We are than save for resizing issues in our _estimator format
// and will use the old size.
TRI_ASSERT(mgr != nullptr);
auto tmp = mgr->stealIndexEstimator(_objectId);
if (tmp == nullptr) {
// We expected to receive a stored index estimate, however we got none.
// We use the freshly created estimator but have to recompute it.
return false;
}
_estimator.swap(tmp);
TRI_ASSERT(_estimator != nullptr);
return true;
}
void RocksDBEdgeIndex::recalculateEstimates() {
TRI_ASSERT(_estimator != nullptr);
_estimator->clear();
auto bounds = RocksDBKeyBounds::EdgeIndex(_objectId);
rocksutils::iterateBounds(bounds, [&](rocksdb::Iterator* it) {
uint64_t hash = RocksDBEdgeIndex::HashForKey(it->key());
_estimator->insert(hash);
});
}
Result RocksDBEdgeIndex::postprocessRemove(transaction::Methods* trx,
rocksdb::Slice const& key,
rocksdb::Slice const& value) {
//blacklist keys during truncate
blackListKey(key.data(), key.size());
uint64_t hash = RocksDBEdgeIndex::HashForKey(key);
_estimator->remove(hash);
return {TRI_ERROR_NO_ERROR};
}

View File

@ -27,6 +27,7 @@
#include "Basics/Common.h"
#include "Indexes/Index.h"
#include "Indexes/IndexIterator.h"
#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h"
#include "RocksDBEngine/RocksDBIndex.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBKeyBounds.h"
@ -85,6 +86,8 @@ class RocksDBEdgeIndex final : public RocksDBIndex {
friend class RocksDBEdgeIndexIterator;
public:
static uint64_t HashForKey(const rocksdb::Slice& key);
RocksDBEdgeIndex() = delete;
RocksDBEdgeIndex(TRI_idx_iid_t, arangodb::LogicalCollection*,
@ -156,6 +159,13 @@ class RocksDBEdgeIndex final : public RocksDBIndex {
arangodb::velocypack::Builder&) const override;
int cleanup() override;
void serializeEstimate(std::string& output) const override;
bool deserializeEstimate(arangodb::RocksDBCounterManager* mgr) override;
void recalculateEstimates() override;
private:
/// @brief create the iterator
IndexIterator* createEqIterator(transaction::Methods*, ManagedDocumentResult*,
@ -171,6 +181,12 @@ class RocksDBEdgeIndex final : public RocksDBIndex {
arangodb::aql::AstNode const* valNode) const;
std::string _directionAttr;
/// @brief A fixed size library to estimate the selectivity of the index.
/// On insertion of a document we have to insert it into the estimator,
/// On removal we have to remove it in the estimator as well.
std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>> _estimator;
};
} // namespace arangodb

View File

@ -290,6 +290,21 @@ void RocksDBEngine::stop() {
return;
}
replicationManager()->dropAll();
if (_backgroundThread) {
// stop the press
_backgroundThread->beginShutdown();
if (_counterManager) {
_counterManager->sync(true);
}
// wait until background thread stops
while (_backgroundThread->isRunning()) {
usleep(10000);
}
_backgroundThread.reset();
}
}
void RocksDBEngine::unprepare() {
@ -298,15 +313,6 @@ void RocksDBEngine::unprepare() {
}
if (_db) {
if (_backgroundThread && _backgroundThread->isRunning()) {
// stop the press
_backgroundThread->beginShutdown();
_backgroundThread.reset();
}
if (_counterManager) {
_counterManager->sync(true);
}
// now prune all obsolete WAL files
determinePrunableWalFiles(0);
pruneWalFiles();
@ -1206,7 +1212,7 @@ TRI_vocbase_t* RocksDBEngine::openExistingDatabase(TRI_voc_tick_t id,
VPackSlice slice = builder.slice();
TRI_ASSERT(slice.isArray());
for (auto const& it : VPackArrayIterator(slice)) {
// we found a collection that is still active
TRI_ASSERT(!it.get("id").isNone() || !it.get("cid").isNone());
@ -1222,6 +1228,7 @@ TRI_vocbase_t* RocksDBEngine::openExistingDatabase(TRI_voc_tick_t id,
static_cast<RocksDBCollection*>(collection->getPhysical());
TRI_ASSERT(physical != nullptr);
physical->deserializeIndexEstimates(counterManager());
LOG_TOPIC(DEBUG, arangodb::Logger::FIXME)
<< "added document collection '" << collection->name() << "'";
}

View File

@ -24,6 +24,7 @@
#ifndef ARANGOD_ROCKSDB_ROCKSDB_HASH_INDEX_H
#define ARANGOD_ROCKSDB_ROCKSDB_HASH_INDEX_H 1
#include "Basics/VelocyPackHelper.h"
#include "RocksDBEngine/RocksDBVPackIndex.h"
namespace arangodb {
@ -44,6 +45,7 @@ class RocksDBHashIndex final : public RocksDBVPackIndex {
bool matchesDefinition(VPackSlice const& info) const override;
bool isSorted() const override { return true; }
};
}

View File

@ -38,6 +38,12 @@
using namespace arangodb;
using namespace arangodb::rocksutils;
// This is the number of distinct elements the index estimator can reliably store
// This correlates directly with the memmory of the estimator:
// memmory == ESTIMATOR_SIZE * 6 bytes
uint64_t const arangodb::RocksDBIndex::ESTIMATOR_SIZE = 4096;
RocksDBIndex::RocksDBIndex(
TRI_idx_iid_t id, LogicalCollection* collection,
std::vector<std::vector<arangodb::basics::AttributeName>> const& attributes,
@ -162,6 +168,22 @@ int RocksDBIndex::drop() {
return TRI_ERROR_NO_ERROR;
}
void RocksDBIndex::serializeEstimate(std::string&) const {
// All indexes that do not have an estimator do not serialize anything.
}
bool RocksDBIndex::deserializeEstimate(RocksDBCounterManager*) {
// All indexes that do not have an estimator do not deserialize anything.
// So the estimate is always recreatable.
// We do not advance anything here.
return true;
}
void RocksDBIndex::recalculateEstimates() {
// Nothing to do.
return;
}
void RocksDBIndex::truncate(transaction::Methods* trx) {
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
rocksdb::Transaction* rtrx = state->rocksTransaction();

View File

@ -41,8 +41,16 @@ class Cache;
}
class LogicalCollection;
class RocksDBComparator;
class RocksDBCounterManager;
class RocksDBIndex : public Index {
protected:
// This is the number of distinct elements the index estimator can reliably store
// This correlates directly with the memmory of the estimator:
// memmory == ESTIMATOR_SIZE * 6 bytes
static uint64_t const ESTIMATOR_SIZE;
protected:
RocksDBIndex(TRI_idx_iid_t, LogicalCollection*,
std::vector<std::vector<arangodb::basics::AttributeName>> const&
@ -89,6 +97,12 @@ class RocksDBIndex : public Index {
void createCache();
void disableCache();
virtual void serializeEstimate(std::string& output) const;
virtual bool deserializeEstimate(RocksDBCounterManager* mgr);
virtual void recalculateEstimates();
protected:
// Will be called during truncate to allow the index to update selectivity
// estimates, blacklist keys, etc.

View File

@ -111,6 +111,9 @@ RocksDBKey RocksDBKey::ReplicationApplierConfig(TRI_voc_tick_t databaseId) {
return RocksDBKey(RocksDBEntryType::ReplicationApplierConfig, databaseId);
}
RocksDBKey RocksDBKey::IndexEstimateValue(uint64_t collectionObjectId) {
return RocksDBKey(RocksDBEntryType::IndexEstimateValue, collectionObjectId);
}
// ========================= Member methods ===========================
RocksDBEntryType RocksDBKey::type(RocksDBKey const& key) {
@ -168,12 +171,11 @@ arangodb::StringRef RocksDBKey::primaryKey(RocksDBKey const& key) {
arangodb::StringRef RocksDBKey::primaryKey(rocksdb::Slice const& slice) {
return primaryKey(slice.data(), slice.size());
}
std::string RocksDBKey::vertexId(RocksDBKey const& key) {
StringRef RocksDBKey::vertexId(RocksDBKey const& key) {
return vertexId(key._buffer.data(), key._buffer.size());
}
std::string RocksDBKey::vertexId(rocksdb::Slice const& slice) {
StringRef RocksDBKey::vertexId(rocksdb::Slice const& slice) {
return vertexId(slice.data(), slice.size());
}
@ -214,6 +216,7 @@ RocksDBKey::RocksDBKey(RocksDBEntryType type, uint64_t first)
switch (_type) {
case RocksDBEntryType::Database:
case RocksDBEntryType::CounterValue:
case RocksDBEntryType::IndexEstimateValue:
case RocksDBEntryType::ReplicationApplierConfig: {
size_t length = sizeof(char) + sizeof(uint64_t);
_buffer.reserve(length);
@ -456,7 +459,7 @@ arangodb::StringRef RocksDBKey::primaryKey(char const* data, size_t size) {
}
}
std::string RocksDBKey::vertexId(char const* data, size_t size) {
StringRef RocksDBKey::vertexId(char const* data, size_t size) {
TRI_ASSERT(data != nullptr);
TRI_ASSERT(size >= sizeof(char));
RocksDBEntryType type = static_cast<RocksDBEntryType>(data[0]);
@ -466,7 +469,7 @@ std::string RocksDBKey::vertexId(char const* data, size_t size) {
size_t keySize = static_cast<size_t>(data[size - 1]);
size_t idSize = size - (sizeof(char) + sizeof(uint64_t) + sizeof(char) +
keySize + sizeof(uint8_t));
return std::string(data + sizeof(char) + sizeof(uint64_t), idSize);
return StringRef(data + sizeof(char) + sizeof(uint64_t), idSize);
}
default:

View File

@ -135,6 +135,12 @@ class RocksDBKey {
//////////////////////////////////////////////////////////////////////////////
static RocksDBKey ReplicationApplierConfig(TRI_voc_tick_t databaseId);
//////////////////////////////////////////////////////////////////////////////
/// @brief Create a fully-specified key for index estimate values of
/// a collection
//////////////////////////////////////////////////////////////////////////////
static RocksDBKey IndexEstimateValue(uint64_t objectId);
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief Extracts the type from a key
@ -211,8 +217,8 @@ class RocksDBKey {
///
/// May be called only on EdgeIndexValue keys. Other types will throw.
//////////////////////////////////////////////////////////////////////////////
static std::string vertexId(RocksDBKey const&);
static std::string vertexId(rocksdb::Slice const&);
static StringRef vertexId(RocksDBKey const&);
static StringRef vertexId(rocksdb::Slice const&);
//////////////////////////////////////////////////////////////////////////////
/// @brief Extracts the indexed VelocyPack values from a key
@ -264,7 +270,7 @@ class RocksDBKey {
static TRI_voc_cid_t viewId(char const* data, size_t size);
static TRI_voc_rid_t revisionId(char const* data, size_t size);
static StringRef primaryKey(char const* data, size_t size);
static std::string vertexId(char const* data, size_t size);
static StringRef vertexId(char const* data, size_t size);
static VPackSlice indexedVPack(char const* data, size_t size);
private:

View File

@ -119,6 +119,10 @@ RocksDBKeyBounds RocksDBKeyBounds::CounterValues() {
return RocksDBKeyBounds(RocksDBEntryType::CounterValue);
}
RocksDBKeyBounds RocksDBKeyBounds::IndexEstimateValues() {
return RocksDBKeyBounds(RocksDBEntryType::IndexEstimateValue);
}
RocksDBKeyBounds RocksDBKeyBounds::FulltextIndexPrefix(
uint64_t indexId, arangodb::StringRef const& word) {
// I did not want to pass a bool to the constructor for this
@ -203,7 +207,8 @@ RocksDBKeyBounds::RocksDBKeyBounds(RocksDBEntryType type)
break;
}
case RocksDBEntryType::CounterValue: {
case RocksDBEntryType::CounterValue:
case RocksDBEntryType::IndexEstimateValue: {
size_t length = sizeof(char) + sizeof(uint64_t);
_startBuffer.reserve(length);
_startBuffer.push_back(static_cast<char>(_type));
@ -214,7 +219,6 @@ RocksDBKeyBounds::RocksDBKeyBounds(RocksDBEntryType type)
uint64ToPersistent(_endBuffer, UINT64_MAX);
break;
}
default:
THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);
}

View File

@ -123,6 +123,11 @@ class RocksDBKeyBounds {
//////////////////////////////////////////////////////////////////////////////
static RocksDBKeyBounds CounterValues();
//////////////////////////////////////////////////////////////////////////////
/// @brief Bounds for all index estimate values
//////////////////////////////////////////////////////////////////////////////
static RocksDBKeyBounds IndexEstimateValues();
//////////////////////////////////////////////////////////////////////////////
/// @brief Bounds for all entries of a fulltext index, matching prefixes
//////////////////////////////////////////////////////////////////////////////

View File

@ -103,6 +103,12 @@ static rocksdb::Slice ReplicationApplierConfig(
reinterpret_cast<std::underlying_type<RocksDBEntryType>::type*>(
&replicationApplierConfig),
1);
static RocksDBEntryType indexEstimateValue = RocksDBEntryType::IndexEstimateValue;
static rocksdb::Slice IndexEstimateValue(
reinterpret_cast<std::underlying_type<RocksDBEntryType>::type*>(
&indexEstimateValue),
1);
}
char const* arangodb::rocksDBEntryTypeName(arangodb::RocksDBEntryType type) {
@ -120,6 +126,7 @@ char const* arangodb::rocksDBEntryTypeName(arangodb::RocksDBEntryType type) {
case arangodb::RocksDBEntryType::ReplicationApplierConfig: return "ReplicationApplierConfig";
case arangodb::RocksDBEntryType::FulltextIndexValue: return "FulltextIndexValue";
case arangodb::RocksDBEntryType::GeoIndexValue: return "GeoIndexValue";
case arangodb::RocksDBEntryType::IndexEstimateValue: return "IndexEstimateValue";
}
return "Invalid";
}
@ -175,6 +182,8 @@ rocksdb::Slice const& arangodb::rocksDBSlice(RocksDBEntryType const& type) {
return SettingsValue;
case RocksDBEntryType::ReplicationApplierConfig:
return ReplicationApplierConfig;
case RocksDBEntryType::IndexEstimateValue:
return IndexEstimateValue;
}
return Document; // avoids warning - errorslice instead ?!

View File

@ -48,7 +48,8 @@ enum class RocksDBEntryType : char {
SettingsValue = '9',
ReplicationApplierConfig = ':',
FulltextIndexValue = ';',
GeoIndexValue = '<'
GeoIndexValue = '<',
IndexEstimateValue = '='
};
char const* rocksDBEntryTypeName(RocksDBEntryType);

View File

@ -34,6 +34,7 @@
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBComparator.h"
#include "RocksDBEngine/RocksDBCounterManager.h"
#include "RocksDBEngine/RocksDBPrimaryIndex.h"
#include "RocksDBEngine/RocksDBToken.h"
#include "RocksDBEngine/RocksDBTransactionState.h"
@ -157,13 +158,27 @@ bool RocksDBVPackIndexIterator::next(TokenCallback const& cb, size_t limit) {
return true;
}
uint64_t RocksDBVPackIndex::HashForKey(const rocksdb::Slice& key) {
// NOTE: This function needs to use the same hashing on the
// indexed VPack as the initial inserter does
VPackSlice tmp = RocksDBKey::indexedVPack(key);
return tmp.normalizedHash();
}
/// @brief create the index
RocksDBVPackIndex::RocksDBVPackIndex(TRI_idx_iid_t iid,
arangodb::LogicalCollection* collection,
arangodb::velocypack::Slice const& info)
: RocksDBIndex(iid, collection, info),
_useExpansion(false),
_allowPartialIndex(true) {
_allowPartialIndex(true),
_estimator(nullptr) {
if (!_unique && !ServerState::instance()->isCoordinator()) {
// We activate the estimator for all non unique-indexes.
// And only on DBServers
_estimator = std::make_unique<RocksDBCuckooIndexEstimator<uint64_t>>(RocksDBIndex::ESTIMATOR_SIZE);
TRI_ASSERT(_estimator != nullptr);
}
TRI_ASSERT(!_fields.empty());
TRI_ASSERT(iid != 0);
@ -181,6 +196,13 @@ RocksDBVPackIndex::RocksDBVPackIndex(TRI_idx_iid_t iid,
/// @brief destroy the index
RocksDBVPackIndex::~RocksDBVPackIndex() {}
double RocksDBVPackIndex::selectivityEstimate(arangodb::StringRef const*) const {
if (_unique) {
return 1.0; // only valid if unique
}
return _estimator->computeEstimate();
}
size_t RocksDBVPackIndex::memory() const {
rocksdb::TransactionDB* db = rocksutils::globalRocksDB();
RocksDBKeyBounds bounds = _unique ? RocksDBKeyBounds::UniqueIndex(_objectId)
@ -236,7 +258,8 @@ bool RocksDBVPackIndex::implicitlyUnique() const {
int RocksDBVPackIndex::fillElement(VPackBuilder& leased,
TRI_voc_rid_t revisionId,
VPackSlice const& doc,
std::vector<RocksDBKey>& elements) {
std::vector<RocksDBKey>& elements,
std::vector<uint64_t>& hashes) {
if (doc.isNone()) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME)
<< "encountered invalid marker with slice of type None";
@ -292,12 +315,13 @@ int RocksDBVPackIndex::fillElement(VPackBuilder& leased,
// - Value: empty
elements.push_back(
RocksDBKey::IndexValue(_objectId, key, leased.slice()));
hashes.push_back(leased.slice().normalizedHash());
}
} else {
// other path for handling array elements, too
std::vector<VPackSlice> sliceStack;
buildIndexValues(leased, doc, 0, elements, sliceStack);
buildIndexValues(leased, doc, 0, elements, sliceStack, hashes);
}
return TRI_ERROR_NO_ERROR;
@ -306,7 +330,8 @@ int RocksDBVPackIndex::fillElement(VPackBuilder& leased,
void RocksDBVPackIndex::addIndexValue(VPackBuilder& leased,
VPackSlice const& document,
std::vector<RocksDBKey>& elements,
std::vector<VPackSlice>& sliceStack) {
std::vector<VPackSlice>& sliceStack,
std::vector<uint64_t>& hashes) {
leased.clear();
leased.openArray();
for (VPackSlice const& s : sliceStack) {
@ -326,6 +351,7 @@ void RocksDBVPackIndex::addIndexValue(VPackBuilder& leased,
// + primary key
// - Value: empty
elements.push_back(RocksDBKey::IndexValue(_objectId, key, leased.slice()));
hashes.push_back(leased.slice().normalizedHash());
}
}
@ -334,12 +360,13 @@ void RocksDBVPackIndex::buildIndexValues(VPackBuilder& leased,
VPackSlice const document,
size_t level,
std::vector<RocksDBKey>& elements,
std::vector<VPackSlice>& sliceStack) {
std::vector<VPackSlice>& sliceStack,
std::vector<uint64_t>& hashes) {
// Invariant: level == sliceStack.size()
// Stop the recursion:
if (level == _paths.size()) {
addIndexValue(leased, document, elements, sliceStack);
addIndexValue(leased, document, elements, sliceStack, hashes);
return;
}
@ -353,7 +380,7 @@ void RocksDBVPackIndex::buildIndexValues(VPackBuilder& leased,
} else {
sliceStack.emplace_back(slice);
}
buildIndexValues(leased, document, level + 1, elements, sliceStack);
buildIndexValues(leased, document, level + 1, elements, sliceStack, hashes);
sliceStack.pop_back();
return;
}
@ -374,7 +401,7 @@ void RocksDBVPackIndex::buildIndexValues(VPackBuilder& leased,
for (size_t i = level; i < _paths.size(); i++) {
sliceStack.emplace_back(illegalSlice);
}
addIndexValue(leased, document, elements, sliceStack);
addIndexValue(leased, document, elements, sliceStack, hashes);
for (size_t i = level; i < _paths.size(); i++) {
sliceStack.pop_back();
}
@ -409,7 +436,7 @@ void RocksDBVPackIndex::buildIndexValues(VPackBuilder& leased,
if (it == seen.end()) {
seen.insert(something);
sliceStack.emplace_back(something);
buildIndexValues(leased, document, level + 1, elements, sliceStack);
buildIndexValues(leased, document, level + 1, elements, sliceStack, hashes);
sliceStack.pop_back();
}
};
@ -472,10 +499,11 @@ int RocksDBVPackIndex::insert(transaction::Methods* trx,
TRI_voc_rid_t revisionId, VPackSlice const& doc,
bool isRollback) {
std::vector<RocksDBKey> elements;
std::vector<uint64_t> hashes;
int res;
try {
transaction::BuilderLeaser leased(trx);
res = fillElement(*(leased.get()), revisionId, doc, elements);
res = fillElement(*(leased.get()), revisionId, doc, elements, hashes);
} catch (...) {
res = TRI_ERROR_OUT_OF_MEMORY;
}
@ -530,6 +558,12 @@ int RocksDBVPackIndex::insert(transaction::Methods* trx,
}
}
for (auto& it : hashes) {
// The estimator is only useful if we are in a non-unique indexes
TRI_ASSERT(!_unique);
_estimator->insert(it);
}
return res;
}
@ -537,10 +571,11 @@ int RocksDBVPackIndex::insertRaw(rocksdb::WriteBatchWithIndex* writeBatch,
TRI_voc_rid_t revisionId,
VPackSlice const& doc) {
std::vector<RocksDBKey> elements;
std::vector<uint64_t> hashes;
int res;
try {
VPackBuilder leased;
res = fillElement(leased, revisionId, doc, elements);
res = fillElement(leased, revisionId, doc, elements, hashes);
} catch (...) {
return TRI_ERROR_OUT_OF_MEMORY;
}
@ -569,6 +604,14 @@ int RocksDBVPackIndex::insertRaw(rocksdb::WriteBatchWithIndex* writeBatch,
writeBatch->Put(key.string(), value.string());
}
}
for (auto& it : hashes) {
// The estimator is only useful if we are in a non-unique indexes
TRI_ASSERT(!_unique);
_estimator->insert(it);
}
return res;
}
@ -577,11 +620,12 @@ int RocksDBVPackIndex::remove(transaction::Methods* trx,
TRI_voc_rid_t revisionId, VPackSlice const& doc,
bool isRollback) {
std::vector<RocksDBKey> elements;
std::vector<uint64_t> hashes;
int res;
try {
transaction::BuilderLeaser leased(trx);
res = fillElement(*(leased.get()), revisionId, doc, elements);
res = fillElement(*(leased.get()), revisionId, doc, elements, hashes);
} catch (...) {
res = TRI_ERROR_OUT_OF_MEMORY;
}
@ -602,6 +646,12 @@ int RocksDBVPackIndex::remove(transaction::Methods* trx,
}
}
for (auto& it : hashes) {
// The estimator is only useful if we are in a non-unique indexes
TRI_ASSERT(!_unique);
_estimator->remove(it);
}
return res;
}
@ -609,11 +659,12 @@ int RocksDBVPackIndex::removeRaw(rocksdb::WriteBatchWithIndex* writeBatch,
TRI_voc_rid_t revisionId,
VPackSlice const& doc) {
std::vector<RocksDBKey> elements;
std::vector<uint64_t> hashes;
int res;
try {
VPackBuilder leased;
res = fillElement(leased, revisionId, doc, elements);
res = fillElement(leased, revisionId, doc, elements, hashes);
} catch (...) {
res = TRI_ERROR_OUT_OF_MEMORY;
}
@ -627,6 +678,12 @@ int RocksDBVPackIndex::removeRaw(rocksdb::WriteBatchWithIndex* writeBatch,
writeBatch->Delete(elements[i].string());
}
for (auto& it : hashes) {
// The estimator is only useful if we are in a non-unique indexes
TRI_ASSERT(!_unique);
_estimator->remove(it);
}
return TRI_ERROR_NO_ERROR;
}
@ -1413,11 +1470,56 @@ int RocksDBVPackIndex::cleanup() {
return TRI_ERROR_NO_ERROR;
}
void RocksDBVPackIndex::serializeEstimate(std::string& output) const {
TRI_ASSERT(!ServerState::instance()->isCoordinator());
if (!_unique) {
TRI_ASSERT(_estimator != nullptr);
_estimator->serialize(output);
}
}
bool RocksDBVPackIndex::deserializeEstimate(RocksDBCounterManager* mgr) {
TRI_ASSERT(!ServerState::instance()->isCoordinator());
if (_unique) {
return true;
}
// We simply drop the current estimator and steal the one from recovery
// We are than save for resizing issues in our _estimator format
// and will use the old size.
TRI_ASSERT(mgr != nullptr);
auto tmp = mgr->stealIndexEstimator(_objectId);
if (tmp == nullptr) {
// We expected to receive a stored index estimate, however we got none.
// We use the freshly created estimator but have to recompute it.
return false;
}
_estimator.swap(tmp);
TRI_ASSERT(_estimator != nullptr);
return true;
}
void RocksDBVPackIndex::recalculateEstimates() {
if (unique()) {
return;
}
TRI_ASSERT(_estimator != nullptr);
_estimator->clear();
auto bounds = RocksDBKeyBounds::IndexEntries(_objectId);
rocksutils::iterateBounds(bounds, [&](rocksdb::Iterator* it) {
uint64_t hash = RocksDBVPackIndex::HashForKey(it->key());
_estimator->insert(hash);
});
}
Result RocksDBVPackIndex::postprocessRemove(transaction::Methods* trx,
rocksdb::Slice const& key,
rocksdb::Slice const& value) {
if (!unique()) {
// TODO: update selectivity estimate
uint64_t hash = RocksDBVPackIndex::HashForKey(key);
_estimator->remove(hash);
}
return {TRI_ERROR_NO_ERROR};
}

View File

@ -29,6 +29,7 @@
#include "Aql/AstNode.h"
#include "Basics/Common.h"
#include "Indexes/IndexIterator.h"
#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h"
#include "RocksDBEngine/RocksDBIndex.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBKeyBounds.h"
@ -101,6 +102,8 @@ class RocksDBVPackIndex : public RocksDBIndex {
friend class RocksDBVPackIndexIterator;
public:
static uint64_t HashForKey(const rocksdb::Slice& key);
RocksDBVPackIndex() = delete;
RocksDBVPackIndex(TRI_idx_iid_t, LogicalCollection*,
@ -110,13 +113,7 @@ class RocksDBVPackIndex : public RocksDBIndex {
bool hasSelectivityEstimate() const override { return true; }
double selectivityEstimate(
arangodb::StringRef const* = nullptr) const override {
if (_unique) {
return 1.0; // only valid if unique
}
return 0.2; // TODO: fix this hard-coded estimate
}
double selectivityEstimate(arangodb::StringRef const* = nullptr) const override;
size_t memory() const override;
@ -179,6 +176,12 @@ class RocksDBVPackIndex : public RocksDBIndex {
int cleanup() override;
void serializeEstimate(std::string& output) const override;
bool deserializeEstimate(arangodb::RocksDBCounterManager* mgr) override;
void recalculateEstimates() override;
protected:
Result postprocessRemove(transaction::Methods* trx, rocksdb::Slice const& key,
rocksdb::Slice const& value) override;
@ -209,21 +212,26 @@ protected:
/// @brief helper function to insert a document into any index type
int fillElement(velocypack::Builder& leased, TRI_voc_rid_t revisionId,
VPackSlice const& doc, std::vector<RocksDBKey>& elements);
VPackSlice const& doc, std::vector<RocksDBKey>& elements,
std::vector<uint64_t>& hashes);
/// @brief helper function to build the key and value for rocksdb from the
/// vector of slices
/// @param hashes list of VPackSlice hashes for the estimator.
void addIndexValue(velocypack::Builder& leased, VPackSlice const& document,
std::vector<RocksDBKey>& elements,
std::vector<VPackSlice>& sliceStack);
std::vector<VPackSlice>& sliceStack,
std::vector<uint64_t>& hashes);
/// @brief helper function to create a set of value combinations to insert
/// into the rocksdb index.
/// @param elements vector of resulting index entries
/// @param sliceStack working list of values to insert into the index
/// @param hashes list of VPackSlice hashes for the estimator.
void buildIndexValues(velocypack::Builder& leased, VPackSlice const document,
size_t level, std::vector<RocksDBKey>& elements,
std::vector<VPackSlice>& sliceStack);
std::vector<VPackSlice>& sliceStack,
std::vector<uint64_t>& hashes);
private:
std::unique_ptr<FixedSizeAllocator> _allocator;
@ -239,6 +247,12 @@ protected:
/// @brief whether or not partial indexing is allowed
bool _allowPartialIndex;
/// @brief A fixed size library to estimate the selectivity of the index.
/// On insertion of a document we have to insert it into the estimator,
/// On removal we have to remove it in the estimator as well.
std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>> _estimator;
};
} // namespace arangodb

View File

@ -28,32 +28,39 @@
// / @author Copyright 2012, triAGENS GmbH, Cologne, Germany
// //////////////////////////////////////////////////////////////////////////////
var db = require('@arangodb').db;
var internal = require('internal');
var jsunity = require('jsunity');
const db = require('@arangodb').db;
const internal = require('internal');
const jsunity = require('jsunity');
const colName1 = 'UnitTestsRecovery1';
const colName2 = 'UnitTestsRecovery2';
const colName3 = 'UnitTestsRecovery3';
const est1 = 1; // The index is de-facto unique so estimate 1
const est2 = 1; // This index is unique. Estimate 1
const est3 = 4 / 1000; // This index has 4 different values and stores 1000 documents
function runSetup () {
'use strict';
internal.debugClearFailAt();
db._drop('UnitTestsRecovery1');
var c = db._create('UnitTestsRecovery1'), i;
db._drop(colName1);
var c = db._create(colName1), i;
c.ensureHashIndex('value');
for (i = 0; i < 1000; ++i) {
c.save({ value: i });
}
db._drop('UnitTestsRecovery2');
c = db._create('UnitTestsRecovery2');
db._drop(colName2);
c = db._create(colName2);
c.ensureUniqueConstraint('a.value');
for (i = 0; i < 1000; ++i) {
c.save({ a: { value: i } });
}
db._drop('UnitTestsRecovery3');
c = db._create('UnitTestsRecovery3');
db._drop(colName3);
c = db._create(colName3);
c.ensureHashIndex('a', 'b');
for (i = 0; i < 500; ++i) {
@ -84,39 +91,82 @@ function recoverySuite () {
// / @brief test whether we can restore the trx data
// //////////////////////////////////////////////////////////////////////////////
testIndexesHash: function () {
var c = db._collection('UnitTestsRecovery1'), idx, i;
idx = c.getIndexes()[1];
testSingleAttributeHashIndexInfo: function() {
let c = db._collection(colName1);
let idx = c.getIndexes()[1];
assertFalse(idx.unique);
assertFalse(idx.sparse);
assertEqual([ 'value' ], idx.fields);
for (i = 0; i < 1000; ++i) {
},
testSingleAttributeHashIndexByExample: function() {
let c = db._collection(colName1);
for (let i = 0; i < 1000; ++i) {
assertEqual(1, c.byExample({ value: i }).toArray().length);
}
assertEqual(1, db._query("FOR doc IN UnitTestsRecovery1 FILTER doc.value == 0 RETURN doc").toArray().length);
},
c = db._collection('UnitTestsRecovery2');
idx = c.getIndexes()[1];
testSingleAttributeHashIndexAql: function() {
assertEqual(1, db._query(`FOR doc IN ${colName1} FILTER doc.value == 0 RETURN doc`).toArray().length);
},
testSingleAttributeHashIndexEstimate: function () {
let c = db._collection(colName1);
let idx = c.getIndexes()[1];
assertEqual(est1, idx.selectivityEstimate);
},
testNestedAttributeHashIndexInfo: function() {
let c = db._collection(colName2);
let idx = c.getIndexes()[1];
assertTrue(idx.unique);
assertFalse(idx.sparse);
assertEqual([ 'a.value' ], idx.fields);
for (i = 0; i < 1000; ++i) {
},
testNestedAttributeHashIndexByExample: function() {
let c = db._collection(colName2);
for (let i = 0; i < 1000; ++i) {
assertEqual(1, c.byExample({ 'a.value': i }).toArray().length);
}
assertEqual(1, db._query("FOR doc IN UnitTestsRecovery2 FILTER doc.a.value == 0 RETURN doc").toArray().length);
},
c = db._collection('UnitTestsRecovery3');
idx = c.getIndexes()[1];
testNestedAttributeHashIndexAql: function() {
assertEqual(1, db._query(`FOR doc IN ${colName2} FILTER doc.a.value == 0 RETURN doc`).toArray().length);
},
testNestedAttributeHashIndexEstimate: function () {
let c = db._collection(colName2);
let idx = c.getIndexes()[1];
assertEqual(est2, idx.selectivityEstimate);
},
testManyAttributesHashIndexInfo: function() {
let c = db._collection(colName3);
let idx = c.getIndexes()[1];
assertFalse(idx.unique);
assertFalse(idx.sparse);
assertEqual([ 'a', 'b' ], idx.fields);
},
testManyAttributesHashIndexByExample: function() {
let c = db._collection(colName3);
assertEqual(250, c.byExample({ a: 1, b: 1 }).toArray().length);
assertEqual(250, c.byExample({ a: 1, b: 2 }).toArray().length);
assertEqual(250, c.byExample({ a: 2, b: 1 }).toArray().length);
assertEqual(250, c.byExample({ a: 2, b: 2 }).toArray().length);
assertEqual(250, db._query("FOR doc IN UnitTestsRecovery3 FILTER doc.a == 1 && doc.b == 1 RETURN doc").toArray().length);
}
},
testManyAttributesHashIndexAql: function() {
assertEqual(250, db._query(`FOR doc IN ${colName3} FILTER doc.a == 1 && doc.b == 1 RETURN doc`).toArray().length);
},
testManyAttributesHashIndexEstimate: function () {
let c = db._collection(colName3);
let idx = c.getIndexes()[1];
assertEqual(est3, idx.selectivityEstimate);
},
};
}

View File

@ -53,6 +53,7 @@ add_executable(
Cache/TransactionsWithBackingStore.cpp
Geo/georeg.cpp
Pregel/typedbuffer.cpp
RocksDBEngine/IndexEstimatorTest.cpp
main.cpp
)

View File

@ -0,0 +1,135 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite for CuckooFilter based index selectivity estimator
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2012 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Michael Hackstein
/// @author Copyright 2017, ArangoDB GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "Basics/Common.h"
#include "Basics/StringRef.h"
#include "catch.hpp"
#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h"
using namespace arangodb;
// -----------------------------------------------------------------------------
// --SECTION-- test suite
// -----------------------------------------------------------------------------
// @brief setup
TEST_CASE("IndexEstimator", "[indexestimator]") {
// @brief Test insert unique correctness
SECTION("test_unique_values") {
std::vector<uint64_t> toInsert(100);
uint64_t i = 0;
RocksDBCuckooIndexEstimator<uint64_t> est(2048);
std::generate(toInsert.begin(), toInsert.end(), [&i] {return i++;});
for (auto it : toInsert) {
est.insert(it);
}
CHECK(est.nrUsed() == 100);
CHECK(est.computeEstimate() == 1);
for (size_t k = 0; k < 10; ++k) {
est.remove(toInsert[k]);
}
CHECK(est.nrUsed() == 90);
CHECK(est.computeEstimate() == 1);
}
SECTION("test_multiple_values") {
std::vector<uint64_t> toInsert(100);
uint64_t i = 0;
RocksDBCuckooIndexEstimator<uint64_t> est(2048);
std::generate(toInsert.begin(), toInsert.end(), [&i] {return (i++) % 10;});
for (auto it : toInsert) {
est.insert(it);
}
CHECK(est.nrUsed() == 10);
CHECK(est.nrCuckood() == 0);
CHECK(est.computeEstimate() == (double) 10 / 100);
for (size_t k = 0; k < 10; ++k) {
est.remove(toInsert[k]);
}
CHECK(est.nrCuckood() == 0);
CHECK(est.computeEstimate() == (double) 10 / 90);
}
SECTION("test_serialize_deserialize") {
std::vector<uint64_t> toInsert(10000);
uint64_t i = 0;
std::string serialization;
RocksDBCuckooIndexEstimator<uint64_t> est(2048);
std::generate(toInsert.begin(), toInsert.end(), [&i] {return i++;});
for (auto it : toInsert) {
est.insert(it);
}
est.serialize(serialization);
// Test that the serialization first reports the correct length
uint64_t length = serialization.size();
// We read starting from the second char. The first char is reserved for the type
uint64_t persLength = rocksutils::uint64FromPersistent(serialization.data() + 1);
CHECK(persLength == length);
// We first have an uint64_t representing the length.
// This has to be extracted BEFORE initialisation.
StringRef ref(serialization.data(), persLength);
RocksDBCuckooIndexEstimator<uint64_t> copy(ref);
// After serialisation => deserialisation
// both estimates have to be identical
CHECK(est.nrUsed() == copy.nrUsed());
CHECK(est.nrCuckood() == copy.nrCuckood());
CHECK(est.computeEstimate() == copy.computeEstimate());
// Now let us remove the same elements in both
bool coin = false;
for (auto it : toInsert) {
if (coin) {
est.remove(it);
copy.remove(it);
}
coin = !coin;
}
// We cannot relibly check inserts because the cuckoo has a random factor
// Still all values have to be identical
CHECK(est.nrUsed() == copy.nrUsed());
CHECK(est.nrCuckood() == copy.nrCuckood());
CHECK(est.computeEstimate() == copy.computeEstimate());
}
// @brief generate tests
}