
Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

# Conflicts:
#	arangod/RocksDBEngine/RocksDBCollection.cpp
Simon Grätzer 2017-05-15 17:36:09 +02:00
commit 6472d7cc94
49 changed files with 2171 additions and 306 deletions

View File

@ -166,6 +166,8 @@ void AgencyOperation::toVelocyPack(VPackBuilder& builder) const {
if (_opType.value == AgencyValueOperationType::OBSERVE ||
_opType.value == AgencyValueOperationType::UNOBSERVE) {
builder.add("url", _value);
} else if (_opType.value == AgencyValueOperationType::ERASE) {
builder.add("val", _value);
} else {
builder.add("new", _value);
}

View File

@ -84,7 +84,7 @@ enum class AgencyReadOperationType { READ };
// --SECTION-- AgencyValueOperationType
// -----------------------------------------------------------------------------
enum class AgencyValueOperationType { SET, OBSERVE, UNOBSERVE, PUSH, PREPEND };
enum class AgencyValueOperationType { ERASE, SET, OBSERVE, UNOBSERVE, PUSH, PREPEND };
// -----------------------------------------------------------------------------
// --SECTION-- AgencySimpleOperationType
@ -130,6 +130,8 @@ class AgencyOperationType {
return "push";
case AgencyValueOperationType::PREPEND:
return "prepend";
case AgencyValueOperationType::ERASE:
return "erase";
default:
return "unknown_operation_type";
}

View File

@ -464,24 +464,55 @@ bool Node::handle<PUSH>(VPackSlice const& slice) {
return true;
}
/// Remove element from any place in array by value array
template <>
bool Node::handle<ERASE>(VPackSlice const& slice) {
if (!slice.hasKey("val")) {
/// Remove element from any place in array by value or position
template <> bool Node::handle<ERASE>(VPackSlice const& slice) {
bool haveVal = slice.hasKey("val");
bool havePos = slice.hasKey("pos");
if (!haveVal && !havePos) {
LOG_TOPIC(WARN, Logger::AGENCY)
<< "Operator erase without value to be erased: " << slice.toJson();
<< "Operator erase without value or position to be erased is illegal: "
<< slice.toJson();
return false;
} else if (haveVal && havePos) {
LOG_TOPIC(WARN, Logger::AGENCY)
<< "Operator erase with value and position to be erased is illegal: "
<< slice.toJson();
return false;
} else if (havePos &&
(!slice.get("pos").isUInt() && !slice.get("pos").isSmallInt())) {
LOG_TOPIC(WARN, Logger::AGENCY)
<< "Operator erase with non-positive integer position is illegal: "
<< slice.toJson();
return false;
}
Builder tmp;
{ VPackArrayBuilder t(&tmp);
if (this->slice().isArray()) {
for (auto const& old : VPackArrayIterator(this->slice())) {
if (old != slice.get("val")) {
tmp.add(old);
if (haveVal) {
for (auto const& old : VPackArrayIterator(this->slice())) {
if (old != slice.get("val")) {
tmp.add(old);
}
}
} else {
size_t pos = slice.get("pos").getNumber<size_t>();
if (pos >= this->slice().length()) {
return false;
}
size_t n = 0;
for (const auto& old : VPackArrayIterator(this->slice())) {
if (n != pos) {
tmp.add(old);
}
++n;
}
}
}
}
*this = tmp.slice();
return true;
}
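
For illustration only (not part of this commit): a standalone sketch of the erase semantics implemented above, written with plain velocypack. The helper name applyErase is made up; given an array, {"val": X} removes every element equal to X, while {"pos": N} removes the element at index N.

#include <velocypack/Builder.h>
#include <velocypack/Iterator.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>

// Sketch: mirrors Node::handle<ERASE> for an array node passed in as `array`.
static VPackBuilder applyErase(VPackSlice array, VPackSlice op) {
  VPackBuilder result;
  {
    VPackArrayBuilder guard(&result);
    if (op.hasKey("val")) {
      // erase by value: keep every element that does not match "val"
      for (auto const& old : VPackArrayIterator(array)) {
        if (old != op.get("val")) {
          result.add(old);
        }
      }
    } else {
      // erase by position: drop the element at index "pos"
      size_t pos = op.get("pos").getNumber<size_t>();
      size_t n = 0;
      for (auto const& old : VPackArrayIterator(array)) {
        if (n++ != pos) {
          result.add(old);
        }
      }
    }
  }
  return result;
}

// applyErase(["a","b","c"], {"val":"b"}) yields ["a","c"];
// applyErase(["a","b","c"], {"pos":0})   yields ["b","c"].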

View File

@ -237,98 +237,102 @@ int arangodb::aql::CompareAstNodes(AstNode const* lhs, AstNode const* rhs,
if (diff < 0) {
return -1;
} else if (diff > 0) {
return 1;
}
return 1;
}
switch (lType) {
case VPackValueType::Null: {
return 0;
}
TRI_ASSERT(false);
return 0;
}
if (lType == VPackValueType::Null) {
return 0;
}
case VPackValueType::Bool: {
int diff = static_cast<int>(lhs->getIntValue() - rhs->getIntValue());
if (lType == VPackValueType::Bool) {
int diff = static_cast<int>(lhs->getIntValue() - rhs->getIntValue());
if (diff != 0) {
if (diff < 0) {
return -1;
if (diff != 0) {
if (diff < 0) {
return -1;
}
return 1;
}
return 1;
}
return 0;
}
if (lType == VPackValueType::Int || lType == VPackValueType::Double) {
// TODO
double d = lhs->getDoubleValue() - rhs->getDoubleValue();
if (d < 0.0) {
return -1;
} else if (d > 0.0) {
return 1;
}
return 0;
}
if (lType == VPackValueType::String) {
if (compareUtf8) {
return TRI_compare_utf8(lhs->getStringValue(), lhs->getStringLength(),
rhs->getStringValue(), rhs->getStringLength());
}
size_t const minLength =
(std::min)(lhs->getStringLength(), rhs->getStringLength());
int res = memcmp(lhs->getStringValue(), rhs->getStringValue(), minLength);
if (res != 0) {
return res;
return 0;
}
int diff = static_cast<int>(lhs->getStringLength()) -
static_cast<int>(rhs->getStringLength());
if (diff != 0) {
return diff < 0 ? -1 : 1;
case VPackValueType::Int:
case VPackValueType::Double: {
// TODO
double d = lhs->getDoubleValue() - rhs->getDoubleValue();
if (d != 0.0) {
if (d < 0.0) {
return -1;
}
return 1;
}
return 0;
}
return 0;
}
if (lType == VPackValueType::Array) {
size_t const numLhs = lhs->numMembers();
size_t const numRhs = rhs->numMembers();
size_t const n = ((numLhs > numRhs) ? numRhs : numLhs);
case VPackValueType::String: {
if (compareUtf8) {
return TRI_compare_utf8(lhs->getStringValue(), lhs->getStringLength(),
rhs->getStringValue(), rhs->getStringLength());
}
size_t const minLength =
(std::min)(lhs->getStringLength(), rhs->getStringLength());
for (size_t i = 0; i < n; ++i) {
int res = arangodb::aql::CompareAstNodes(lhs->getMember(i),
rhs->getMember(i), compareUtf8);
int res = memcmp(lhs->getStringValue(), rhs->getStringValue(), minLength);
if (res != 0) {
return res;
}
int diff = static_cast<int>(lhs->getStringLength()) -
static_cast<int>(rhs->getStringLength());
if (diff != 0) {
return diff < 0 ? -1 : 1;
}
return 0;
}
if (numLhs < numRhs) {
return -1;
} else if (numLhs > numRhs) {
return 1;
case VPackValueType::Array: {
size_t const numLhs = lhs->numMembers();
size_t const numRhs = rhs->numMembers();
size_t const n = ((numLhs > numRhs) ? numRhs : numLhs);
for (size_t i = 0; i < n; ++i) {
int res = arangodb::aql::CompareAstNodes(lhs->getMember(i),
rhs->getMember(i), compareUtf8);
if (res != 0) {
return res;
}
}
if (numLhs < numRhs) {
return -1;
} else if (numLhs > numRhs) {
return 1;
}
return 0;
}
case VPackValueType::Object: {
// this is a rather exceptional case, so we can
// afford the inefficiency to convert the node to VPack
// for comparison (this saves us from writing our own compare function
// for array AstNodes)
auto l = lhs->toVelocyPackValue();
auto r = rhs->toVelocyPackValue();
return basics::VelocyPackHelper::compare(l->slice(), r->slice(), compareUtf8);
}
default: {
// all things equal
return 0;
}
return 0;
}
if (lType == VPackValueType::Object) {
// this is a rather exceptional case, so we can
// afford the inefficiency to convert the node to VPack
// for comparison (this saves us from writing our own compare function
// for array AstNodes)
auto l = lhs->toVelocyPackValue();
auto r = rhs->toVelocyPackValue();
return basics::VelocyPackHelper::compare(l->slice(), r->slice(), compareUtf8);
}
// all things equal
return 0;
}
/// @brief returns whether or not the string is empty

View File

@ -279,8 +279,7 @@ std::shared_ptr<Table> Cache::table() { return _table; }
void Cache::beginShutdown() {
_state.lock();
if (!_state.isSet(State::Flag::shutdown) &&
!_state.isSet(State::Flag::shuttingDown)) {
if (!_state.isSet(State::Flag::shutdown, State::Flag::shuttingDown)) {
_state.toggleFlag(State::Flag::shuttingDown);
}
_state.unlock();

View File

@ -59,7 +59,7 @@ CacheManagerFeature::CacheManagerFeature(
_cacheSize((TRI_PhysicalMemory >= (static_cast<uint64_t>(4) << 30))
? ((TRI_PhysicalMemory - (static_cast<uint64_t>(2) << 30)) * 0.3)
: (256 << 20)),
_rebalancingInterval(2 * 1000 * 1000) {
_rebalancingInterval(static_cast<uint64_t>(2 * 1000 * 1000)) {
setOptional(true);
requiresElevatedPrivileges(false);
startsAfter("Scheduler");

View File

@ -57,9 +57,13 @@ bool CachedValue::sameKey(void const* k, uint32_t kSize) const {
return (0 == memcmp(key(), k, keySize));
}
void CachedValue::lease() { refCount++; }
void CachedValue::lease() { ++refCount; }
void CachedValue::release() { refCount--; }
void CachedValue::release() {
if (--refCount == UINT32_MAX) {
TRI_ASSERT(false);
}
}
bool CachedValue::isFreeable() { return (refCount.load() == 0); }

View File

@ -26,6 +26,8 @@
using namespace arangodb::cache;
Finding::Finding() : _value(nullptr) {}
Finding::Finding(CachedValue* v) : _value(v) {
if (_value != nullptr) {
_value->lease();
@ -83,6 +85,16 @@ Finding::~Finding() {
void Finding::release() {
if (_value != nullptr) {
_value->release();
// reset value so we do not unintentionally release multiple times
_value = nullptr;
}
}
void Finding::set(CachedValue* v) {
TRI_ASSERT(_value == nullptr);
_value = v;
if (v != nullptr) {
_value->lease();
}
}

View File

@ -41,7 +41,8 @@ namespace cache {
////////////////////////////////////////////////////////////////////////////////
class Finding {
public:
Finding(CachedValue* v);
Finding();
explicit Finding(CachedValue* v);
Finding(Finding const& other);
Finding(Finding&& other);
Finding& operator=(Finding const& other);
@ -53,6 +54,12 @@ class Finding {
//////////////////////////////////////////////////////////////////////////////
void reset(CachedValue* v);
//////////////////////////////////////////////////////////////////////////////
/// @brief Sets the underlying CachedValue pointer. Assumes that the Finding
/// is currently empty
//////////////////////////////////////////////////////////////////////////////
void set(CachedValue* v);
//////////////////////////////////////////////////////////////////////////////
/// @brief Specifies whether the value was found. If not, value is nullptr.
//////////////////////////////////////////////////////////////////////////////

View File

@ -64,7 +64,7 @@ class FrequencyBuffer {
//////////////////////////////////////////////////////////////////////////////
/// @brief Initialize with the given capacity.
//////////////////////////////////////////////////////////////////////////////
FrequencyBuffer(uint64_t capacity)
explicit FrequencyBuffer(uint64_t capacity)
: _current(0),
_capacity(0),
_mask(0),
@ -91,7 +91,7 @@ class FrequencyBuffer {
//////////////////////////////////////////////////////////////////////////////
/// @brief Reports the memory usage in bytes.
//////////////////////////////////////////////////////////////////////////////
uint64_t memoryUsage() {
uint64_t memoryUsage() const {
return ((_capacity * sizeof(T)) + sizeof(FrequencyBuffer<T>) +
sizeof(std::vector<T>));
}

View File

@ -447,14 +447,14 @@ void Manager::reportAccess(std::shared_ptr<Cache> cache) {
void Manager::reportHitStat(Stat stat) {
switch (stat) {
case Stat::findHit: {
_findHits++;
++_findHits;
if (_enableWindowedStats && _findStats.get() != nullptr) {
_findStats->insertRecord(static_cast<uint8_t>(Stat::findHit));
}
break;
}
case Stat::findMiss: {
_findMisses++;
++_findMisses;
if (_enableWindowedStats && _findStats.get() != nullptr) {
_findStats->insertRecord(static_cast<uint8_t>(Stat::findMiss));
}
@ -466,14 +466,12 @@ void Manager::reportHitStat(Stat stat) {
bool Manager::isOperational() const {
TRI_ASSERT(_state.isLocked());
return (!_state.isSet(State::Flag::shutdown) &&
!_state.isSet(State::Flag::shuttingDown));
return !_state.isSet(State::Flag::shutdown, State::Flag::shuttingDown);
}
bool Manager::globalProcessRunning() const {
TRI_ASSERT(_state.isLocked());
return (_state.isSet(State::Flag::rebalancing) ||
_state.isSet(State::Flag::resizing));
return _state.isSet(State::Flag::rebalancing, State::Flag::resizing);
}
boost::asio::io_service* Manager::ioService() { return _ioService; }

View File

@ -74,6 +74,11 @@ bool State::isSet(State::Flag flag) const {
return ((_state.load() & static_cast<uint32_t>(flag)) > 0);
}
bool State::isSet(State::Flag flag1, State::Flag flag2) const {
TRI_ASSERT(isLocked());
return ((_state.load() & (static_cast<uint32_t>(flag1) | static_cast<uint32_t>(flag2))) > 0);
}
void State::toggleFlag(State::Flag flag) {
TRI_ASSERT(isLocked());
_state ^= static_cast<uint32_t>(flag);

View File

@ -107,6 +107,7 @@ struct State {
/// @brief Checks whether the given flag is set. Requires state to be locked.
//////////////////////////////////////////////////////////////////////////////
bool isSet(State::Flag flag) const;
bool isSet(State::Flag flag1, State::Flag flag2) const;
//////////////////////////////////////////////////////////////////////////////
/// @brief Toggles the given flag. Requires state to be locked.

View File

@ -39,13 +39,11 @@
#include <chrono>
#include <list>
#include <iostream>
using namespace arangodb::cache;
Finding TransactionalCache::find(void const* key, uint32_t keySize) {
TRI_ASSERT(key != nullptr);
Finding result(nullptr);
Finding result;
uint32_t hash = hashKey(key, keySize);
bool ok;
@ -54,7 +52,7 @@ Finding TransactionalCache::find(void const* key, uint32_t keySize) {
std::tie(ok, bucket, source) = getBucket(hash, Cache::triesFast);
if (ok) {
result.reset(bucket->find(hash, key, keySize));
result.set(bucket->find(hash, key, keySize));
recordStat(result.found() ? Stat::findHit : Stat::findMiss);
bucket->unlock();
endOperation();

View File

@ -1584,28 +1584,98 @@ int ClusterInfo::ensureIndexCoordinator(
VPackSlice const& slice, bool create,
bool (*compare)(VPackSlice const&, VPackSlice const&),
VPackBuilder& resultBuilder, std::string& errorMsg, double timeout) {
AgencyComm ac;
double const realTimeout = getTimeout(timeout);
double const endTime = TRI_microtime() + realTimeout;
double const interval = getPollInterval();
std::string where =
"Current/Collections/" + databaseName + "/" + collectionID;
// check index id
uint64_t iid = 0;
VPackSlice const idxSlice = slice.get("id");
if (idxSlice.isString()) {
VPackSlice const idSlice = slice.get("id");
if (idSlice.isString()) {
// use predefined index id
iid = arangodb::basics::StringUtils::uint64(idxSlice.copyString());
iid = arangodb::basics::StringUtils::uint64(idSlice.copyString());
}
if (iid == 0) {
// no id set, create a new one!
iid = uniqid();
}
std::string const idString = arangodb::basics::StringUtils::itoa(iid);
int errorCode = ensureIndexCoordinatorWithoutRollback(
databaseName, collectionID, idString, slice, create, compare, resultBuilder, errorMsg, timeout);
if (errorCode == TRI_ERROR_NO_ERROR) {
return errorCode;
}
std::shared_ptr<VPackBuilder> planValue;
std::shared_ptr<VPackBuilder> oldPlanIndexes;
std::shared_ptr<LogicalCollection> c;
size_t tries = 0;
do {
loadPlan();
// find index in plan
planValue = nullptr;
oldPlanIndexes.reset(new VPackBuilder());
c = getCollection(databaseName, collectionID);
c->getIndexesVPack(*(oldPlanIndexes.get()), false);
VPackSlice const planIndexes = oldPlanIndexes->slice();
if (planIndexes.isArray()) {
for (auto const& index : VPackArrayIterator(planIndexes)) {
auto idPlanSlice = index.get("id");
if (idPlanSlice.isString() && idPlanSlice.copyString() == idString) {
planValue.reset(new VPackBuilder());
planValue->add(index);
break;
}
}
}
if (!planValue) {
// Index not found in the Plan anymore; somebody else may already have
// cleaned up, so there is nothing to roll back. This should not happen.
return errorCode;
}
std::string const planIndexesKey = "Plan/Collections/" + databaseName + "/" + collectionID +"/indexes";
std::vector<AgencyOperation> operations;
std::vector<AgencyPrecondition> preconditions;
if (planValue) {
AgencyOperation planEraser(planIndexesKey, AgencyValueOperationType::ERASE, planValue->slice());
TRI_ASSERT(oldPlanIndexes);
AgencyPrecondition planPrecondition(planIndexesKey, AgencyPrecondition::Type::VALUE, oldPlanIndexes->slice());
operations.push_back(planEraser);
operations.push_back(AgencyOperation("Plan/Version", AgencySimpleOperationType::INCREMENT_OP));
preconditions.push_back(planPrecondition);
}
AgencyWriteTransaction trx(operations, preconditions);
AgencyCommResult eraseResult = _agency.sendTransactionWithFailover(trx, 0.0);
if (eraseResult.successful()) {
loadPlan();
return errorCode;
}
std::chrono::duration<size_t, std::milli> waitTime(10);
std::this_thread::sleep_for(waitTime);
} while (++tries < 5);
LOG_TOPIC(ERR, Logger::CLUSTER) << "Couldn't roll back index creation of " << idString << ". Database: " << databaseName << ", Collection " << collectionID;
return errorCode;
}
int ClusterInfo::ensureIndexCoordinatorWithoutRollback(
std::string const& databaseName, std::string const& collectionID,
std::string const& idString, VPackSlice const& slice, bool create,
bool (*compare)(VPackSlice const&, VPackSlice const&),
VPackBuilder& resultBuilder, std::string& errorMsg, double timeout) {
AgencyComm ac;
double const realTimeout = getTimeout(timeout);
double const endTime = TRI_microtime() + realTimeout;
double const interval = getPollInterval();
TRI_ASSERT(resultBuilder.isEmpty());
std::string const key =
@ -1639,8 +1709,6 @@ int ClusterInfo::ensureIndexCoordinator(
std::shared_ptr<LogicalCollection> c =
getCollection(databaseName, collectionID);
READ_LOCKER(readLocker, _planProt.lock);
if (c == nullptr) {
return setErrormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg);
}
@ -1705,8 +1773,6 @@ int ClusterInfo::ensureIndexCoordinator(
return setErrormsg(TRI_ERROR_CLUSTER_AGENCY_STRUCTURE_INVALID, errorMsg);
}
std::string const idString = arangodb::basics::StringUtils::itoa(iid);
try {
VPackObjectBuilder b(newBuilder.get());
// Create a new collection VPack with the new Index
@ -1833,6 +1899,9 @@ int ClusterInfo::ensureIndexCoordinator(
// local variables. Therefore we have to protect all accesses to them
// by a mutex. We use the mutex of the condition variable in the
// AgencyCallback for this.
std::string where =
"Current/Collections/" + databaseName + "/" + collectionID;
auto agencyCallback =
std::make_shared<AgencyCallback>(ac, where, dbServerChanged, true, false);
_agencyCallbackRegistry->registerCallback(agencyCallback);

View File

@ -538,6 +538,17 @@ class ClusterInfo {
double getReloadServerListTimeout() const { return 60.0; }
//////////////////////////////////////////////////////////////////////////////
/// @brief ensure an index in coordinator.
//////////////////////////////////////////////////////////////////////////////
int ensureIndexCoordinatorWithoutRollback(
std::string const& databaseName, std::string const& collectionID,
std::string const& idSlice, arangodb::velocypack::Slice const& slice, bool create,
bool (*compare)(arangodb::velocypack::Slice const&,
arangodb::velocypack::Slice const&),
arangodb::velocypack::Builder& resultBuilder, std::string& errorMsg, double timeout);
//////////////////////////////////////////////////////////////////////////////
/// @brief object for agency communication
//////////////////////////////////////////////////////////////////////////////

View File

@ -116,8 +116,8 @@ static int runServer(int argc, char** argv, ArangoGlobalContext &context) {
"Action", "Affinity",
"Agency", "Authentication",
"Cluster", "Daemon",
"Dispatcher", "FoxxQueues",
"GeneralServer", "LoggerBufferFeature",
"FoxxQueues", "GeneralServer",
"Greetings", "LoggerBufferFeature",
"Server", "SslServer",
"Statistics", "Supervisor"};

View File

@ -201,8 +201,8 @@ void RocksDBCollection::open(bool ignoreErrors) {
RocksDBEngine* engine =
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
auto counterValue = engine->counterManager()->loadCounter(this->objectId());
LOG_TOPIC(ERR, Logger::DEVEL)
<< " number of documents: " << counterValue.added();
LOG_TOPIC(ERR, Logger::DEVEL) << " number of documents: "
<< counterValue.added();
_numberDocuments = counterValue.added() - counterValue.removed();
_revisionId = counterValue.revisionId();
@ -433,10 +433,9 @@ std::shared_ptr<Index> RocksDBCollection::createIndex(
idx->toVelocyPack(indexInfo, false, true);
int res = static_cast<RocksDBEngine*>(engine)->writeCreateCollectionMarker(
_logicalCollection->vocbase()->id(), _logicalCollection->cid(),
builder.slice(),
RocksDBLogValue::IndexCreate(_logicalCollection->vocbase()->id(),
_logicalCollection->cid(),
indexInfo.slice()));
builder.slice(), RocksDBLogValue::IndexCreate(
_logicalCollection->vocbase()->id(),
_logicalCollection->cid(), indexInfo.slice()));
if (res != TRI_ERROR_NO_ERROR) {
// We could not persist the index creation. Better abort
// Remove the Index in the local list again.
@ -516,10 +515,9 @@ int RocksDBCollection::restoreIndex(transaction::Methods* trx,
TRI_ASSERT(engine != nullptr);
int res = engine->writeCreateCollectionMarker(
_logicalCollection->vocbase()->id(), _logicalCollection->cid(),
builder.slice(),
RocksDBLogValue::IndexCreate(_logicalCollection->vocbase()->id(),
_logicalCollection->cid(),
indexInfo.slice()));
builder.slice(), RocksDBLogValue::IndexCreate(
_logicalCollection->vocbase()->id(),
_logicalCollection->cid(), indexInfo.slice()));
if (res != TRI_ERROR_NO_ERROR) {
// We could not persist the index creation. Better abort
// Remove the Index in the local list again.
@ -553,6 +551,7 @@ bool RocksDBCollection::dropIndex(TRI_idx_iid_t iid) {
WRITE_LOCKER(guard, _indexesLock);
for (auto index : _indexes) {
RocksDBIndex* cindex = static_cast<RocksDBIndex*>(index.get());
TRI_ASSERT(cindex != nullptr);
if (iid == cindex->id()) {
int rv = cindex->drop();
@ -564,7 +563,7 @@ bool RocksDBCollection::dropIndex(TRI_idx_iid_t iid) {
_indexes.erase(_indexes.begin() + i);
events::DropIndex("", std::to_string(iid), TRI_ERROR_NO_ERROR);
// toVelocyPackIgnore will take a read lock and we don't need the
// lock anymore, we will always return
// lock anymore, we will always return
guard.unlock();
VPackBuilder builder = _logicalCollection->toVelocyPackIgnore(
@ -664,6 +663,7 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
RocksDBIndex* rindex = static_cast<RocksDBIndex*>(index.get());
rindex->truncate(trx);
}
_needToPersistIndexEstimates = true;
}
/*
@ -1203,7 +1203,7 @@ void RocksDBCollection::figuresSpecific(
/// @brief creates the initial indexes for the collection
void RocksDBCollection::createInitialIndexes() {
{ // addIndex holds an internal write lock
{ // addIndex holds an internal write lock
READ_LOCKER(guard, _indexesLock);
if (!_indexes.empty()) {
return;
@ -1246,7 +1246,7 @@ void RocksDBCollection::addIndex(std::shared_ptr<arangodb::Index> idx) {
void RocksDBCollection::addIndexCoordinator(
std::shared_ptr<arangodb::Index> idx) {
WRITE_LOCKER(guard, _indexesLock);
auto const id = idx->id();
for (auto const& it : _indexes) {
if (it->id() == id) {
@ -1334,8 +1334,8 @@ arangodb::Result RocksDBCollection::fillIndexes(
// occured, this needs to happen since we are non transactional
if (!r.ok()) {
iter->reset();
rocksdb::WriteBatchWithIndex removeBatch(db->DefaultColumnFamily()->GetComparator(),
32 * 1024 * 1024);
rocksdb::WriteBatchWithIndex removeBatch(
db->DefaultColumnFamily()->GetComparator(), 32 * 1024 * 1024);
res = TRI_ERROR_NO_ERROR;
auto removeCb = [&](DocumentIdentifierToken token) {
@ -1358,6 +1358,9 @@ arangodb::Result RocksDBCollection::fillIndexes(
// Simon: Don't think so
db->Write(writeOpts, removeBatch.GetWriteBatch());
}
if (numDocsWritten > 0) {
_needToPersistIndexEstimates = true;
}
return r;
}
@ -1418,12 +1421,12 @@ RocksDBOperationResult RocksDBCollection::insertDocument(
READ_LOCKER(guard, _indexesLock);
for (std::shared_ptr<Index> const& idx : _indexes) {
innerRes.reset(idx->insert(trx, revisionId, doc, false));
// in case of no-memory, return immediately
if (innerRes.is(TRI_ERROR_OUT_OF_MEMORY)) {
return innerRes;
}
if (innerRes.fail()) {
// "prefer" unique constraint violated over other errors
if (innerRes.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) ||
@ -1444,6 +1447,7 @@ RocksDBOperationResult RocksDBCollection::insertDocument(
if (waitForSync) {
trx->state()->waitForSync(true);
}
_needToPersistIndexEstimates = true;
}
return res;
@ -1500,6 +1504,7 @@ RocksDBOperationResult RocksDBCollection::removeDocument(
if (waitForSync) {
trx->state()->waitForSync(true);
}
_needToPersistIndexEstimates = true;
}
return res;
@ -1744,6 +1749,7 @@ uint64_t RocksDBCollection::recalculateCounts() {
// need correction. The value is not changed and does not need to be synced
globalRocksEngine()->counterManager()->sync(true);
}
trx.commit();
return _numberDocuments;
}
@ -1788,6 +1794,73 @@ void RocksDBCollection::estimateSize(velocypack::Builder& builder) {
builder.close();
}
arangodb::Result RocksDBCollection::serializeIndexEstimates(
rocksdb::Transaction* rtrx) const {
if (!_needToPersistIndexEstimates) {
return {TRI_ERROR_NO_ERROR};
}
_needToPersistIndexEstimates = false;
std::string output;
rocksdb::TransactionDB* tdb = rocksutils::globalRocksDB();
for (auto index : getIndexes()) {
output.clear();
RocksDBIndex* cindex = static_cast<RocksDBIndex*>(index.get());
TRI_ASSERT(cindex != nullptr);
rocksutils::uint64ToPersistent(output, static_cast<uint64_t>(tdb->GetLatestSequenceNumber()));
cindex->serializeEstimate(output);
if (output.size() > sizeof(uint64_t)) {
RocksDBKey key =
RocksDBKey::IndexEstimateValue(cindex->objectId());
rocksdb::Slice value(output);
rocksdb::Status s = rtrx->Put(key.string(), value);
if (!s.ok()) {
LOG_TOPIC(WARN, Logger::ENGINES) << "writing index estimates failed";
rtrx->Rollback();
return rocksutils::convertStatus(s);
}
}
}
return {TRI_ERROR_NO_ERROR};
}
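
As a quick orientation (editor's note, derived from this function together with RocksDBCounterManager::readIndexEstimates later in this commit), each index estimate ends up as one RocksDB record of roughly this shape:

// key   : RocksDBKey::IndexEstimateValue(<index objectId>)
// value : 8-byte latest RocksDB sequence number (persistent uint64 encoding)
//         followed by the RocksDBCuckooIndexEstimator serialization
// Records whose payload would contain only the sequence number
// (output.size() <= sizeof(uint64_t)) are skipped.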
void RocksDBCollection::deserializeIndexEstimates(RocksDBCounterManager* mgr) {
std::vector<std::shared_ptr<Index>> toRecalculate;
for (auto const& it : getIndexes()) {
auto idx = static_cast<RocksDBIndex*>(it.get());
if (!idx->deserializeEstimate(mgr)) {
toRecalculate.push_back(it);
}
}
if (!toRecalculate.empty()) {
recalculateIndexEstimates(toRecalculate);
}
}
void RocksDBCollection::recalculateIndexEstimates() {
auto idxs = getIndexes();
recalculateIndexEstimates(idxs);
}
void RocksDBCollection::recalculateIndexEstimates(std::vector<std::shared_ptr<Index>>& indexes) {
// start transaction to get a collection lock
arangodb::SingleCollectionTransaction trx(
arangodb::transaction::StandaloneContext::Create(
_logicalCollection->vocbase()),
_logicalCollection->cid(), AccessMode::Type::EXCLUSIVE);
auto res = trx.begin();
if (res.fail()) {
THROW_ARANGO_EXCEPTION(res);
}
for (auto const& it : indexes) {
auto idx = static_cast<RocksDBIndex*>(it.get());
idx->recalculateEstimates();
}
_needToPersistIndexEstimates = true;
trx.commit();
}
void RocksDBCollection::createCache() const {
if (!_useCache || _cachePresent) {
// we leave this if we do not need the cache
@ -1826,7 +1899,7 @@ void RocksDBCollection::blackListKey(char const* data, std::size_t len) const {
bool blacklisted = false;
uint64_t attempts = 0;
while (!blacklisted) {
blacklisted = _cache->blacklist(data,(uint32_t)len);
blacklisted = _cache->blacklist(data, static_cast<uint32_t>(len));
if (attempts++ % 10 == 0) {
if (_cache->isShutdown()) {
disableCache();

View File

@ -32,6 +32,10 @@
#include "VocBase/KeyGenerator.h"
#include "VocBase/ManagedDocumentResult.h"
namespace rocksdb {
class Transaction;
}
namespace arangodb {
namespace cache {
class Cache;
@ -184,6 +188,11 @@ class RocksDBCollection final : public PhysicalCollection {
bool hasGeoIndex() { return _hasGeoIndex; }
Result serializeIndexEstimates(rocksdb::Transaction*) const;
void deserializeIndexEstimates(arangodb::RocksDBCounterManager* mgr);
void recalculateIndexEstimates();
private:
/// @brief return engine-specific figures
void figuresSpecific(
@ -220,15 +229,21 @@ class RocksDBCollection final : public PhysicalCollection {
arangodb::Result lookupRevisionVPack(TRI_voc_rid_t, transaction::Methods*,
arangodb::ManagedDocumentResult&) const;
void recalculateIndexEstimates(std::vector<std::shared_ptr<Index>>& indexes);
void createCache() const;
void disableCache() const;
inline bool useCache() const { return (_useCache && _cachePresent); }
void blackListKey(char const* data, std::size_t len) const;
private:
uint64_t const _objectId; // rocksdb-specific object id for collection
std::atomic<uint64_t> _numberDocuments;
std::atomic<TRI_voc_rid_t> _revisionId;
mutable std::atomic<bool> _needToPersistIndexEstimates;
/// upgrade write locks to exclusive locks if this flag is set
bool _hasGeoIndex;

View File

@ -132,6 +132,34 @@ void uint64ToPersistent(std::string& p, uint64_t value) {
} while (++len < sizeof(uint64_t));
}
uint16_t uint16FromPersistent(char const* p) {
uint16_t value = 0;
uint16_t x = 0;
uint8_t const* ptr = reinterpret_cast<uint8_t const*>(p);
uint8_t const* end = ptr + sizeof(uint16_t);
do {
value += static_cast<uint16_t>(*ptr++) << x;
x += 8;
} while (ptr < end);
return value;
}
void uint16ToPersistent(char* p, uint16_t value) {
char* end = p + sizeof(uint16_t);
do {
*p++ = static_cast<uint8_t>(value & 0xffU);
value >>= 8;
} while (p < end);
}
void uint16ToPersistent(std::string& p, uint16_t value) {
size_t len = 0;
do {
p.push_back(static_cast<char>(value & 0xffU));
value >>= 8;
} while (++len < sizeof(uint16_t));
}
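
A minimal round-trip sketch for the new uint16 helpers (editor's illustration; it assumes the declarations live in RocksDBEngine/RocksDBCommon.h inside namespace arangodb::rocksutils, like the existing uint64 helpers):

#include <cassert>
#include <cstdint>
#include <string>

#include "RocksDBEngine/RocksDBCommon.h"

void uint16RoundTripExample() {
  using namespace arangodb::rocksutils;
  std::string out;
  uint16ToPersistent(out, 0x1234);   // appends the low byte first: 0x34, 0x12
  assert(out.size() == sizeof(uint16_t));
  uint16_t restored = uint16FromPersistent(out.data());
  assert(restored == 0x1234);
}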
bool hasObjectIds(VPackSlice const& inputSlice) {
bool rv = false;
if (inputSlice.isObject()) {

View File

@ -86,6 +86,10 @@ uint64_t uint64FromPersistent(char const* p);
void uint64ToPersistent(char* p, uint64_t value);
void uint64ToPersistent(std::string& out, uint64_t value);
uint16_t uint16FromPersistent(char const* p);
void uint16ToPersistent(char* p, uint16_t value);
void uint16ToPersistent(std::string& out, uint16_t value);
std::pair<VPackSlice, std::unique_ptr<VPackBuffer<uint8_t>>> stripObjectIds(
VPackSlice const& inputSlice, bool checkBeforeCopy = true);

View File

@ -23,15 +23,22 @@
#include "RocksDBCounterManager.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/ReadLocker.h"
#include "Basics/StringUtils.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/WriteLocker.h"
#include "Logger/Logger.h"
#include "RestServer/DatabaseFeature.h"
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h"
#include "RocksDBEngine/RocksDBEdgeIndex.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBKeyBounds.h"
#include "RocksDBEngine/RocksDBVPackIndex.h"
#include "RocksDBEngine/RocksDBValue.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "VocBase/ticks.h"
#include <rocksdb/utilities/transaction_db.h>
@ -44,6 +51,7 @@
#include <velocypack/velocypack-aliases.h>
using namespace arangodb;
using namespace arangodb::application_features;
RocksDBCounterManager::CMValue::CMValue(VPackSlice const& slice)
: _sequenceNum(0), _count(0), _revisionId(0) {
@ -74,6 +82,7 @@ void RocksDBCounterManager::CMValue::serialize(VPackBuilder& b) const {
RocksDBCounterManager::RocksDBCounterManager(rocksdb::DB* db)
: _syncing(false), _db(db) {
readSettings();
readIndexEstimates();
readCounterValues();
if (!_counters.empty()) {
@ -135,7 +144,8 @@ void RocksDBCounterManager::updateCounter(uint64_t objectId,
}
}
arangodb::Result RocksDBCounterManager::setAbsoluteCounter(uint64_t objectId, uint64_t value) {
arangodb::Result RocksDBCounterManager::setAbsoluteCounter(uint64_t objectId,
uint64_t value) {
arangodb::Result res;
WRITE_LOCKER(guard, _rwLock);
auto it = _counters.find(objectId);
@ -219,7 +229,7 @@ Result RocksDBCounterManager::sync(bool force) {
return rocksutils::convertStatus(s);
}
}
// now write global settings
b.clear();
b.openObject();
@ -241,6 +251,46 @@ Result RocksDBCounterManager::sync(bool force) {
return rocksutils::convertStatus(s);
}
// Now persist the index estimates:
{
for (auto const& pair : copy) {
auto dbColPair = rocksutils::mapObjectToCollection(pair.first);
if (dbColPair.second == 0 && dbColPair.first == 0) {
// collection with this objectId is not known. Skip it.
continue;
}
auto dbfeature =
ApplicationServer::getFeature<DatabaseFeature>("Database");
TRI_ASSERT(dbfeature != nullptr);
auto vocbase = dbfeature->useDatabase(dbColPair.first);
if (vocbase == nullptr) {
// Bad state, we have references to a database that is not known
// anymore.
// However, let's just skip it in production; we are not allowed to crash.
// If we cannot find this info during recovery we can either recompute
// or start fresh.
continue;
}
auto collection = vocbase->lookupCollection(dbColPair.second);
if (collection == nullptr) {
// Bad state, we have references to a collection that is not known
// anymore.
// However, let's just skip it in production; we are not allowed to crash.
// If we cannot find this info during recovery we can either recompute
// or start fresh.
continue;
}
std::string estimateSerialisation;
auto rocksCollection =
static_cast<RocksDBCollection*>(collection->getPhysical());
TRI_ASSERT(rocksCollection != nullptr);
Result res = rocksCollection->serializeIndexEstimates(rtrx.get());
if (!res.ok()) {
return res;
}
}
}
// we have to commit all counters in one batch
s = rtrx->Commit();
if (s.ok()) {
@ -271,7 +321,7 @@ void RocksDBCounterManager::readSettings() {
basics::VelocyPackHelper::stringUInt64(slice.get("tick"));
LOG_TOPIC(TRACE, Logger::ENGINES) << "using last tick: " << lastTick;
TRI_UpdateTickServer(lastTick);
if (slice.hasKey("hlc")) {
uint64_t lastHlc =
basics::VelocyPackHelper::stringUInt64(slice.get("hlc"));
@ -286,6 +336,55 @@ void RocksDBCounterManager::readSettings() {
}
}
void RocksDBCounterManager::readIndexEstimates() {
WRITE_LOCKER(guard, _rwLock);
RocksDBKeyBounds bounds = RocksDBKeyBounds::IndexEstimateValues();
rocksdb::Comparator const* cmp = _db->GetOptions().comparator;
rocksdb::ReadOptions readOptions;
std::unique_ptr<rocksdb::Iterator> iter(_db->NewIterator(readOptions));
iter->Seek(bounds.start());
for (; iter->Valid() && cmp->Compare(iter->key(), bounds.end()) < 0;
iter->Next()) {
uint64_t objectId = RocksDBKey::counterObjectId(iter->key());
uint64_t lastSeqNumber =
rocksutils::uint64FromPersistent(iter->value().data());
StringRef estimateSerialisation(iter->value().data() + sizeof(uint64_t),
iter->value().size() - sizeof(uint64_t));
// If this hits we have two estimates for the same index
TRI_ASSERT(_estimators.find(objectId) == _estimators.end());
_estimators.emplace(
objectId,
std::make_pair(lastSeqNumber,
std::make_unique<RocksDBCuckooIndexEstimator<uint64_t>>(
estimateSerialisation)));
}
}
std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>>
RocksDBCounterManager::stealIndexEstimator(uint64_t objectId) {
std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>> res(nullptr);
auto it = _estimators.find(objectId);
if (it != _estimators.end()) {
// We swap out the stored estimate in order to move it to the caller
res.swap(it->second.second);
// Drop the now empty estimator
_estimators.erase(objectId);
}
return res;
}
void RocksDBCounterManager::clearIndexEstimators() {
// We call this to remove all index estimators that have been stored but
// were not read by recovery.
// TODO REMOVE RocksDB Keys of all not stolen values?
_estimators.clear();
}
/// Parse counter values from rocksdb
void RocksDBCounterManager::readCounterValues() {
WRITE_LOCKER(guard, _rwLock);
@ -312,14 +411,26 @@ struct WBReader final : public rocksdb::WriteBatch::Handler {
// must be set by the counter manager
std::unordered_map<uint64_t, rocksdb::SequenceNumber> seqStart;
std::unordered_map<uint64_t, RocksDBCounterManager::CounterAdjustment> deltas;
std::unordered_map<
uint64_t,
std::pair<uint64_t,
std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>>>>*
_estimators;
rocksdb::SequenceNumber currentSeqNum;
uint64_t _maxTick = 0;
uint64_t _maxHLC = 0;
WBReader() : currentSeqNum(0) {}
WBReader(std::unordered_map<
uint64_t,
std::pair<uint64_t,
std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>>>>*
estimators)
: _estimators(estimators), currentSeqNum(0) {}
~WBReader() {
// update ticks after parsing wal
LOG_TOPIC(TRACE, Logger::ENGINES) << "max tick found in WAL: " << _maxTick << ", last HLC value: " << _maxHLC;
LOG_TOPIC(TRACE, Logger::ENGINES) << "max tick found in WAL: " << _maxTick
<< ", last HLC value: " << _maxHLC;
TRI_UpdateTickServer(_maxTick);
TRI_HybridLogicalClock(_maxHLC);
@ -367,15 +478,16 @@ struct WBReader final : public rocksdb::WriteBatch::Handler {
storeMaxHLC(RocksDBKey::revisionId(key));
} else if (type == RocksDBEntryType::PrimaryIndexValue) {
// document key
StringRef ref = RocksDBKey::primaryKey(key);
StringRef ref = RocksDBKey::primaryKey(key);
TRI_ASSERT(!ref.empty());
// check if the key is numeric
if (ref[0] >= '1' && ref[0] <= '9') {
if (ref[0] >= '1' && ref[0] <= '9') {
// numeric start byte. looks good
try {
// extract uint64_t value from key. this will throw if the key
// is non-numeric
uint64_t tick = basics::StringUtils::uint64_check(ref.data(), ref.size());
uint64_t tick =
basics::StringUtils::uint64_check(ref.data(), ref.size());
// if no previous _maxTick set or the numeric value found is
// "near" our previous _maxTick, then we update it
if (tick > _maxTick && (_maxTick == 0 || tick - _maxTick < 2048)) {
@ -415,6 +527,32 @@ struct WBReader final : public rocksdb::WriteBatch::Handler {
it->second._added++;
it->second._revisionId = revisionId;
}
} else {
// We have to adjust the estimate with an insert
switch (RocksDBKey::type(key)) {
case RocksDBEntryType::IndexValue: {
uint64_t objectId = RocksDBKey::counterObjectId(key);
auto it = _estimators->find(objectId);
if (it != _estimators->end() && it->second.first < currentSeqNum) {
// We track estimates for this index
uint64_t hash = RocksDBVPackIndex::HashForKey(key);
it->second.second->insert(hash);
}
break;
}
case RocksDBEntryType::EdgeIndexValue: {
uint64_t objectId = RocksDBKey::counterObjectId(key);
auto it = _estimators->find(objectId);
if (it != _estimators->end() && it->second.first < currentSeqNum) {
// We track estimates for this index
uint64_t hash = RocksDBEdgeIndex::HashForKey(key);
it->second.second->insert(hash);
}
break;
}
default:
break;
}
}
}
@ -429,6 +567,32 @@ struct WBReader final : public rocksdb::WriteBatch::Handler {
it->second._removed++;
it->second._revisionId = revisionId;
}
} else {
// We have to adjust the estimate with an remove
switch (RocksDBKey::type(key)) {
case RocksDBEntryType::IndexValue: {
uint64_t objectId = RocksDBKey::counterObjectId(key);
auto it = _estimators->find(objectId);
if (it != _estimators->end() && it->second.first < currentSeqNum) {
// We track estimates for this index
uint64_t hash = RocksDBVPackIndex::HashForKey(key);
it->second.second->remove(hash);
}
break;
}
case RocksDBEntryType::EdgeIndexValue: {
uint64_t objectId = RocksDBKey::counterObjectId(key);
auto it = _estimators->find(objectId);
if (it != _estimators->end() && it->second.first < currentSeqNum) {
// We track estimates for this index
uint64_t hash = RocksDBEdgeIndex::HashForKey(key);
it->second.second->remove(hash);
}
break;
}
default:
break;
}
}
}
@ -442,8 +606,8 @@ bool RocksDBCounterManager::parseRocksWAL() {
rocksdb::SequenceNumber start = UINT64_MAX;
// Tell the WriteBatch reader the transaction markers to look for
auto handler = std::make_unique<WBReader>();
auto handler = std::make_unique<WBReader>(&_estimators);
for (auto const& pair : _counters) {
handler->seqStart.emplace(pair.first, pair.second._sequenceNum);
start = std::min(start, pair.second._sequenceNum);

View File

@ -28,6 +28,7 @@
#include "Basics/Common.h"
#include "Basics/ReadWriteLock.h"
#include "Basics/Result.h"
#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h"
#include "RocksDBEngine/RocksDBTypes.h"
#include "VocBase/voc-types.h"
@ -78,8 +79,9 @@ class RocksDBCounterManager {
/// the sequence number used
void updateCounter(uint64_t objectId, CounterAdjustment const&);
//does not modify seq or revisionid
arangodb::Result setAbsoluteCounter(uint64_t objectId, uint64_t absouluteCount);
// does not modify seq or revisionid
arangodb::Result setAbsoluteCounter(uint64_t objectId,
uint64_t absouluteCount);
/// Thread-Safe remove a counter
void removeCounter(uint64_t objectId);
@ -87,7 +89,22 @@ class RocksDBCounterManager {
/// Thread-Safe force sync
arangodb::Result sync(bool force);
void readSettings();
// Steal the index estimator that the recovery has built up to inject it into
// an index.
// NOTE: If this returns nullptr the recovery was not able to find any
// estimator for this index.
std::unique_ptr<arangodb::RocksDBCuckooIndexEstimator<uint64_t>>
stealIndexEstimator(uint64_t indexObjectId);
// Free up all index estimators that were not read by any index.
// This is to save some memory.
// NOTE: After calling this the stored estimates of all estimators that
// have not yet been read will be dropped, and no attempt will be made
// to reread them. So call it only after ALL indexes for all databases
// have been created in memory.
void clearIndexEstimators();
protected:
struct CMValue {
@ -105,6 +122,9 @@ class RocksDBCounterManager {
};
void readCounterValues();
void readSettings();
void readIndexEstimates();
bool parseRocksWAL();
//////////////////////////////////////////////////////////////////////////////
@ -112,6 +132,17 @@ class RocksDBCounterManager {
//////////////////////////////////////////////////////////////////////////////
std::unordered_map<uint64_t, CMValue> _counters;
//////////////////////////////////////////////////////////////////////////////
/// @brief Index Estimator container.
/// Note the elements in this container will be moved into the
/// index classes and are only temporarily stored here during recovery.
//////////////////////////////////////////////////////////////////////////////
std::unordered_map<
uint64_t,
std::pair<uint64_t,
std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>>>>
_estimators;
//////////////////////////////////////////////////////////////////////////////
/// @brief synced sequence numbers
//////////////////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,617 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2017 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Daniel Larkin
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_ROCKSDB_ROCKSDB_INDEX_ESTIMATOR_H
#define ARANGOD_ROCKSDB_ROCKSDB_INDEX_ESTIMATOR_H 1
#include "Basics/Common.h"
#include "Basics/ReadLocker.h"
#include "Basics/StringRef.h"
#include "Basics/WriteLocker.h"
#include "Basics/fasthash.h"
#include "Logger/Logger.h"
#include "RocksDBEngine/RocksDBCommon.h"
// In the following template:
// Key is the key type, it must be copyable and movable, furthermore, Key
// must be default constructible (without arguments) as empty and
// must have an empty() method to indicate that the instance is
// empty.
// If using fasthash64 on all bytes of the object is not
// a suitable hash function, one has to instantiate the template
// with two hash function types as 3rd and 4th argument. If
// std::equal_to<Key> is not implemented or does not behave correctly,
// one has to supply a comparison class as well.
// This class is not thread-safe!
namespace arangodb {
// C++ wrapper for the hash function:
template <class T, uint64_t Seed>
class HashWithSeed {
public:
uint64_t operator()(T const& t) const {
// Some implementation like Fnv or xxhash looking at bytes in type T,
// taking the seed into account.
auto p = reinterpret_cast<void const*>(&t);
return fasthash64(p, sizeof(T), Seed);
}
};
template <class Key, class HashKey = HashWithSeed<Key, 0xdeadbeefdeadbeefULL>,
class Fingerprint = HashWithSeed<Key, 0xabcdefabcdef1234ULL>,
class HashShort = HashWithSeed<uint16_t, 0xfedcbafedcba4321ULL>,
class CompKey = std::equal_to<Key>>
class RocksDBCuckooIndexEstimator {
// Note that the following has to be a power of two and at least 4!
static constexpr uint32_t SlotsPerBucket = 4;
private:
// Helper class to abstract away where which data is stored.
struct Slot {
private:
uint16_t* _data;
public:
Slot(uint16_t* data) : _data(data) {}
~Slot() {
// Not responsible for anything
}
bool operator==(const Slot& other) { return _data == other._data; }
uint16_t* fingerprint() { return _data; }
uint16_t* counter() { return _data + 1; }
void reset() {
*fingerprint() = 0;
*counter() = 0;
}
bool isEqual(uint16_t fp) { return ((*fingerprint()) == fp); }
bool isEmpty() { return (*fingerprint()) == 0; }
};
enum SerializeFormat : char {
// To describe this format we use | as a separator for readability, but it
// is NOT a printed character in the serialized string
// NOCOMPRESSION: type|length|size|nrUsed|nrCuckood|nrTotal|niceSize|logSize|base
NOCOMPRESSION = '0'
};
public:
RocksDBCuckooIndexEstimator(uint64_t size)
: _randState(0x2636283625154737ULL),
_slotSize(2 * sizeof(uint16_t)), // Sort out offsets and alignments
_nrUsed(0),
_nrCuckood(0),
_nrTotal(0),
_maxRounds(16) {
// Inflate size so that we have some padding to avoid failure
size *= 2.0;
size = (size >= 1024) ? size : 1024; // want 256 buckets minimum
// First find the smallest power of two that is not smaller than size:
size /= SlotsPerBucket;
_size = size;
initializeDefault();
}
RocksDBCuckooIndexEstimator(arangodb::StringRef const serialized)
: _randState(0x2636283625154737ULL),
_slotSize(2 * sizeof(uint16_t)), // Sort out offsets and alignments
_nrUsed(0),
_nrCuckood(0),
_nrTotal(0),
_maxRounds(16) {
switch (serialized.front()) {
case SerializeFormat::NOCOMPRESSION: {
deserializeUncompressed(serialized);
break;
}
default: {
LOG_TOPIC(ERR, arangodb::Logger::ENGINES) << "Unable to restore the "
"index estimates. Invalid "
"format persisted.";
initializeDefault();
}
}
}
~RocksDBCuckooIndexEstimator() { delete[] _allocBase; }
RocksDBCuckooIndexEstimator(RocksDBCuckooIndexEstimator const&) = delete;
RocksDBCuckooIndexEstimator(RocksDBCuckooIndexEstimator&&) = delete;
RocksDBCuckooIndexEstimator& operator=(RocksDBCuckooIndexEstimator const&) =
delete;
RocksDBCuckooIndexEstimator& operator=(RocksDBCuckooIndexEstimator&&) =
delete;
void serialize(std::string& serialized) const {
// This format is always hard coded and the serialisation has to support
// older formats
// for backwards compatibility
// We always have to start with the type and then the length
serialized += SerializeFormat::NOCOMPRESSION;
uint64_t serialLength =
(sizeof(SerializeFormat) + sizeof(uint64_t) + sizeof(_size) + sizeof(_nrUsed) +
sizeof(_nrCuckood) + sizeof(_nrTotal) + sizeof(_niceSize) +
sizeof(_logSize) + (_size * _slotSize * SlotsPerBucket));
serialized.reserve(sizeof(uint64_t) + serialLength);
// We always prepend the length, so parsing is easier
rocksutils::uint64ToPersistent(serialized, serialLength);
{
// Sorry we need a consistent state, so we have to read-lock from here on...
READ_LOCKER(locker, _bucketLock);
// Add all member variables
rocksutils::uint64ToPersistent(serialized, _size);
rocksutils::uint64ToPersistent(serialized, _nrUsed);
rocksutils::uint64ToPersistent(serialized, _nrCuckood);
rocksutils::uint64ToPersistent(serialized, _nrTotal);
rocksutils::uint64ToPersistent(serialized, _niceSize);
rocksutils::uint64ToPersistent(serialized, _logSize);
// Add the data blob
// Size is as follows: nrOfBuckets * SlotsPerBucket * SlotSize
TRI_ASSERT((_size * _slotSize * SlotsPerBucket) <= _allocSize);
for (uint64_t i = 0; i < (_size * _slotSize * SlotsPerBucket) / sizeof(uint16_t);
++i) {
rocksutils::uint16ToPersistent(serialized, *(reinterpret_cast<uint16_t*>(_base + i * 2)));
}
}
}
void clear() {
WRITE_LOCKER(locker, _bucketLock);
// Reset Stats
_nrTotal = 0;
_nrCuckood = 0;
_nrUsed = 0;
// Reset filter content
// Now initialize all slots in all buckets with zero data:
for (uint32_t b = 0; b < _size; ++b) {
for (size_t i = 0; i < SlotsPerBucket; ++i) {
Slot f = findSlot(b, i);
f.reset();
}
}
}
double computeEstimate() {
READ_LOCKER(locker, _bucketLock);
if (_nrTotal == 0) {
// If we do not have any documents we have a rather constant estimate.
return 1;
}
// _nrUsed; These are known to be distinct values
// _nrCuckood; These are eventually distinct documents with unknown state
return (double)(_nrUsed + ((double)_nrCuckood * 3 * _nrUsed / _nrTotal)) /
_nrTotal;
}
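// Worked example (editor's illustration, not part of the original source):
// with _nrUsed = 80 distinct fingerprints, _nrCuckood = 5 expunged entries
// and _nrTotal = 100 tracked insertions, the estimate is
// (80 + 5 * 3 * 80 / 100) / 100 = 0.92, i.e. roughly 92% of the indexed
// values are expected to be distinct.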
bool lookup(Key const& k) const {
// look up a key, return either false if no pair with key k is
// found or true.
uint64_t hash1 = _hasherKey(k);
uint64_t pos1 = hashToPos(hash1);
uint16_t fingerprint = keyToFingerprint(k);
// We compute the second hash already here to allow the result to
// survive a mispredicted branch in the first loop. Is this sensible?
uint64_t hash2 = _hasherPosFingerprint(pos1, fingerprint);
uint64_t pos2 = hashToPos(hash2);
bool found = false;
{
READ_LOCKER(guard, _bucketLock);
findSlotNoCuckoo(pos1, pos2, fingerprint, found);
}
return found;
}
bool insert(Key& k) {
// insert the key k
//
// The inserted key will have its fingerprint input entered in the table. If
// there is a collision and a fingerprint needs to be cuckooed, a certain
// number of attempts will be made. After that, a given fingerprint may
// simply be expunged. If something is expunged, the function will return
// false, otherwise true.
uint64_t hash1 = _hasherKey(k);
uint64_t pos1 = hashToPos(hash1);
uint16_t fingerprint = keyToFingerprint(k);
// We compute the second hash already here to let it survive a
// mispredicted
// branch in the first loop:
uint64_t hash2 = _hasherPosFingerprint(pos1, fingerprint);
uint64_t pos2 = hashToPos(hash2);
{
WRITE_LOCKER(guard, _bucketLock);
Slot slot = findSlotCuckoo(pos1, pos2, fingerprint);
if (slot.isEmpty()) {
// Free slot insert ourself.
*slot.fingerprint() = fingerprint;
*slot.counter() = 1; // We are the first element
_nrUsed++;
} else {
TRI_ASSERT(slot.isEqual(fingerprint));
if (*slot.counter() < UINT16_MAX) {
// just to avoid overflow...
(*slot.counter())++;
}
}
}
_nrTotal++;
return true;
}
bool remove(Key const& k) {
// remove one element with key k, if one is in the table. Return true if
// a key was removed and false otherwise.
// look up a key, return either false if no pair with key k is
// found or true.
uint64_t hash1 = _hasherKey(k);
uint64_t pos1 = hashToPos(hash1);
uint16_t fingerprint = keyToFingerprint(k);
// We compute the second hash already here to allow the result to
// survive a mispredicted branch in the first loop. Is this sensible?
uint64_t hash2 = _hasherPosFingerprint(pos1, fingerprint);
uint64_t pos2 = hashToPos(hash2);
bool found = false;
_nrTotal--;
{
WRITE_LOCKER(guard, _bucketLock);
Slot slot = findSlotNoCuckoo(pos1, pos2, fingerprint, found);
if (found) {
if (*slot.counter() <= 1) {
// We remove the last one of those, free slot
slot.reset();
_nrUsed--;
} else {
// Just decrease the counter
(*slot.counter())--;
}
return true;
}
}
// If we get here we assume that the element was once inserted, but removed
// by cuckoo
// Reduce nrCuckood;
if (_nrCuckood > 0) {
--_nrCuckood;
}
return false;
}
uint64_t capacity() const { return _size * SlotsPerBucket; }
uint64_t nrUsed() const { return _nrUsed; }
uint64_t nrCuckood() const { return _nrCuckood; }
uint64_t memoryUsage() const {
return sizeof(RocksDBCuckooIndexEstimator) + _allocSize;
}
private: // methods
Slot findSlotNoCuckoo(uint64_t pos1, uint64_t pos2, uint16_t fp,
bool& found) const {
found = false;
Slot s = findSlotNoCuckoo(pos1, fp, found);
if (found) {
return s;
}
// Not found by first hash. use second hash.
return findSlotNoCuckoo(pos2, fp, found);
}
// Find a slot for this fingerprint
// This function guarantees the following:
// If this fingerprint is stored already, the slot will be
// pointing to this fingerprint.
// If this fingerprint is NOT stored already, the returned slot
// will be empty and can be filled with the fingerprint.
// In order to create an empty slot this function tries to
// cuckoo neighboring elements; if that does not succeed,
// it expunges a random element occupying a position.
Slot findSlotCuckoo(uint64_t pos1, uint64_t pos2, uint16_t fp) {
Slot firstEmpty(nullptr);
bool foundEmpty = false;
for (uint64_t i = 0; i < SlotsPerBucket; ++i) {
Slot slot = findSlot(pos1, i);
if (slot.isEqual(fp)) {
// Found we are done, short-circuit.
return slot;
}
if (!foundEmpty && slot.isEmpty()) {
foundEmpty = true;
firstEmpty = slot;
}
}
for (uint64_t i = 0; i < SlotsPerBucket; ++i) {
Slot slot = findSlot(pos2, i);
if (slot.isEqual(fp)) {
// Found we are done, short-circuit.
return slot;
}
if (!foundEmpty && slot.isEmpty()) {
foundEmpty = true;
firstEmpty = slot;
}
}
// Value not yet inserted.
if (foundEmpty) {
// But we found an empty slot
return firstEmpty;
}
// We also did not find an empty slot, now the cuckoo goes...
uint16_t counter =
0; // We initially write a 0 in here, because the caller will
// Increase the counter by one
uint8_t r = pseudoRandomChoice();
if ((r & 1) != 0) {
std::swap(pos1, pos2);
}
// Now expunge a random element from any of these slots:
// and place our own into it.
// We have to keep the reference to the cuckood slot here.
r = pseudoRandomChoice();
uint64_t i = r & (SlotsPerBucket - 1);
firstEmpty = findSlot(pos1, i);
uint16_t fDummy = *firstEmpty.fingerprint();
uint16_t cDummy = *firstEmpty.counter();
*firstEmpty.fingerprint() = fp;
*firstEmpty.counter() = counter;
fp = fDummy;
counter = cDummy;
uint64_t hash2 = _hasherPosFingerprint(pos1, fp);
pos2 = hashToPos(hash2);
// Now let the cuckoo fly and find a place for the poor one we just took
// out.
for (uint64_t i = 0; i < SlotsPerBucket; ++i) {
Slot slot = findSlot(pos2, i);
if (slot.isEmpty()) {
// Yeah we found an empty place already
*slot.fingerprint() = fp;
*slot.counter() = counter;
++_nrUsed;
return firstEmpty;
}
}
// Bad luck, let us try to move to a different slot.
for (unsigned attempt = 1; attempt < _maxRounds; attempt++) {
std::swap(pos1, pos2);
// Now expunge a random element from any of these slots:
r = pseudoRandomChoice();
uint64_t i = r & (SlotsPerBucket - 1);
// We expunge the element at position pos1 and slot i:
Slot slot = findSlot(pos1, i);
if (slot == firstEmpty) {
// We have to keep this one in place.
// Take a different one
i = (i + 1) % SlotsPerBucket;
slot = findSlot(pos1, i);
}
fDummy = *slot.fingerprint();
cDummy = *slot.counter();
*slot.fingerprint() = fp;
*slot.counter() = counter;
fp = fDummy;
counter = cDummy;
hash2 = _hasherPosFingerprint(pos1, fp);
pos2 = hashToPos(hash2);
for (uint64_t i = 0; i < SlotsPerBucket; ++i) {
Slot slot = findSlot(pos2, i);
if (slot.isEmpty()) {
// Finally an empty place
*slot.fingerprint() = fp;
*slot.counter() = counter;
++_nrUsed;
return firstEmpty;
}
}
}
// If we get here we had to remove one of the elements.
// Let's increase the cuckoo counter
_nrCuckood++;
return firstEmpty;
}
// Do not use the output if found == false
Slot findSlotNoCuckoo(uint64_t pos, uint16_t fp, bool& found) const {
found = false;
for (uint64_t i = 0; i < SlotsPerBucket; ++i) {
Slot slot = findSlot(pos, i);
if (fp == *slot.fingerprint()) {
found = true;
return slot;
}
}
return Slot{nullptr};
}
Slot findSlot(uint64_t pos, uint64_t slot) const {
char* address = _base + _slotSize * (pos * SlotsPerBucket + slot);
auto ret = reinterpret_cast<uint16_t*>(address);
return Slot(ret);
}
uint64_t hashToPos(uint64_t hash) const {
uint64_t relevantBits = (hash >> _sizeShift) & _sizeMask;
return ((relevantBits < _size) ? relevantBits : (relevantBits - _size));
}
uint16_t keyToFingerprint(Key const& k) const {
uint64_t hash = _fingerprint(k);
uint16_t fingerprint = (uint16_t)(
(hash ^ (hash >> 16) ^ (hash >> 32) ^ (hash >> 48)) & 0xFFFF);
return (fingerprint ? fingerprint : 1);
}
uint64_t _hasherPosFingerprint(uint64_t pos, uint16_t fingerprint) const {
return ((pos << _sizeShift) ^ _hasherShort(fingerprint));
}
uint8_t pseudoRandomChoice() {
_randState = _randState * 997 + 17; // ignore overflows
return static_cast<uint8_t>((_randState >> 37) & 0xff);
}
void deserializeUncompressed(arangodb::StringRef const& serialized) {
// Assert that we have at least the member variables
TRI_ASSERT(serialized.size() >= (sizeof(SerializeFormat) + sizeof(uint64_t) + sizeof(_size) + sizeof(_nrUsed) +
sizeof(_nrCuckood) + sizeof(_nrTotal) + sizeof(_niceSize) +
sizeof(_logSize) ));
char const* current = serialized.data();
TRI_ASSERT(*current == SerializeFormat::NOCOMPRESSION);
current++; // Skip format char
uint64_t length = rocksutils::uint64FromPersistent(current);
current += sizeof(uint64_t);
// Validate that the serialized format is exactly as long as we expect it to be
TRI_ASSERT(serialized.size() == length);
_size = rocksutils::uint64FromPersistent(current);
current += sizeof(_size);
_nrUsed = rocksutils::uint64FromPersistent(current);
current += sizeof(_nrUsed);
_nrCuckood = rocksutils::uint64FromPersistent(current);
current += sizeof(_nrCuckood);
_nrTotal = rocksutils::uint64FromPersistent(current);
current += sizeof(_nrTotal);
_niceSize = rocksutils::uint64FromPersistent(current);
current += sizeof(_niceSize);
_logSize = rocksutils::uint64FromPersistent(current);
current += sizeof(_logSize);
deriveSizesAndAlloc();
// Validate that we have enough data in the serialized format.
TRI_ASSERT(serialized.size() ==
(sizeof(SerializeFormat) + sizeof( uint64_t) + sizeof(_size) + sizeof(_nrUsed) +
sizeof(_nrCuckood) + sizeof(_nrTotal) + sizeof(_niceSize) +
sizeof(_logSize) + (_size * _slotSize * SlotsPerBucket)));
// Insert the raw data
// Size is as follows: nrOfBuckets * SlotsPerBucket * SlotSize
TRI_ASSERT((_size * _slotSize * SlotsPerBucket) <= _allocSize);
for (uint64_t i = 0; i < (_size * _slotSize * SlotsPerBucket) / sizeof(uint16_t);
++i) {
*(reinterpret_cast<uint16_t*>(_base + i * 2)) = rocksutils::uint16FromPersistent(current + (i * sizeof(uint16_t)));
}
}
void initializeDefault() {
_niceSize = 256;
_logSize = 8;
while (_niceSize < _size) {
_niceSize <<= 1;
_logSize += 1;
}
deriveSizesAndAlloc();
// Now initialize all slots in all buckets with zero data:
for (uint32_t b = 0; b < _size; ++b) {
for (size_t i = 0; i < SlotsPerBucket; ++i) {
Slot f = findSlot(b, i);
f.reset();
}
}
}
void deriveSizesAndAlloc() {
_sizeMask = _niceSize - 1;
_sizeShift = (64 - _logSize) / 2;
_allocSize = _size * _slotSize * SlotsPerBucket +
64; // give 64 bytes padding to enable 64-byte alignment
_allocBase = new char[_allocSize];
_base = reinterpret_cast<char*>(
(reinterpret_cast<uintptr_t>(_allocBase) + 63) &
~((uintptr_t)0x3fu)); // to actually implement the 64-byte alignment,
// shift base pointer within allocated space to
// 64-byte boundary
}
private: // member variables
uint64_t _randState; // pseudo random state for expunging
size_t _slotSize; // total size of a slot
uint64_t _logSize; // logarithm (base 2) of number of buckets
uint64_t _size; // actual number of buckets
uint64_t _niceSize; // smallest power of 2 at least number of buckets, ==
// 2^_logSize
uint64_t _sizeMask; // used to mask out some bits from the hash
uint32_t _sizeShift; // used to shift the bits down to get a position
uint64_t _allocSize; // number of allocated bytes,
// == _size * SlotsPerBucket * _slotSize + 64
char* _base; // pointer to allocated space, 64-byte aligned
char* _allocBase; // base of original allocation
uint64_t _nrUsed; // number of pairs stored in the table
uint64_t _nrCuckood; // number of elements that have been removed by cuckoo
uint64_t _nrTotal; // number of elements included in total
unsigned _maxRounds; // maximum number of cuckoo rounds on insertion
HashKey _hasherKey; // Instance to compute the first hash function
Fingerprint _fingerprint; // Instance to compute a fingerprint of a key
HashShort _hasherShort; // Instance to compute the second hash function
arangodb::basics::ReadWriteLock mutable _bucketLock;
};
} // namespace arangodb
#endif
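A minimal usage sketch of the estimator API above, mirroring how the RocksDB indexes further down in this commit drive it (insert/remove with a 64-bit hash per index entry, computeEstimate for the selectivity, serialize for the counter manager). The size 2048 and the literal hash values are illustrative only; the real callers pass RocksDBIndex::ESTIMATOR_SIZE and hashes of the indexed values.

#include <iostream>
#include <string>
#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h"  // assumes the ArangoDB source tree

void estimatorUsageSketch() {
  arangodb::RocksDBCuckooIndexEstimator<uint64_t> est(2048);
  est.insert(42);   // first entry with this indexed value
  est.insert(42);   // duplicate value: the slot counter grows, the distinct count does not
  est.insert(7);
  // selectivity = distinct values / total values, here 2 / 3
  std::cout << est.computeEstimate() << std::endl;
  est.remove(7);    // removals keep the estimate in sync with the index
  std::string blob;
  est.serialize(blob);  // persisted via the counter manager, restored on recovery
}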

View File

@ -39,6 +39,7 @@
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBCounterManager.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBKeyBounds.h"
#include "RocksDBEngine/RocksDBToken.h"
@ -116,15 +117,17 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
// acquire RocksDB collection
RocksDBToken token;
auto iterateChachedValues = [this,&cb,&limit,&token](){
//LOG_TOPIC(ERR, Logger::FIXME) << "value found in cache ";
auto iterateCachedValues = [this,&cb,&limit,&token](){
while(_arrayIterator.valid()){
StringRef edgeKey(_arrayIterator.value());
if(lookupDocumentAndUseCb(edgeKey, cb, limit, token, true)){
_arrayIterator++;
return true; // more documents - function will be re-entered
VPackSlice edgeKey(_arrayIterator.value());
cb(RocksDBToken(edgeKey.getUInt()));
--limit;
++_arrayIterator;
if (limit == 0) {
          _doUpdateArrayIterator = false;  // limit hit, continue with next batch
return true;
}
_arrayIterator++;
}
//reset cache iterator before handling next from/to
@ -137,7 +140,6 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
TRI_ASSERT(limit > 0);
StringRef fromTo = getFromToFromIterator(_keysIterator);
bool foundInCache = false;
//LOG_TOPIC(ERR, Logger::FIXME) << "fromTo" << fromTo;
if (_useCache && _doUpdateArrayIterator){
// try to find cached value
@ -162,7 +164,7 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
_arrayIterator = VPackArrayIterator(_arraySlice);
// iterate until batch size limit is hit
bool continueWithNextBatch = iterateChachedValues();
bool continueWithNextBatch = iterateCachedValues();
if(continueWithNextBatch){
return true; // exit and continue with next batch
}
@ -171,7 +173,7 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
// resuming old iterator
foundInCache = true; // do not look up key again!
_doUpdateArrayIterator = true;
bool continueWithNextBatch = iterateChachedValues();
bool continueWithNextBatch = iterateCachedValues();
if(continueWithNextBatch){
return true; // exit and continue with next batch
}
@ -197,17 +199,18 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
while (_iterator->Valid() && (_index->_cmp->Compare(_iterator->key(), _bounds.end()) < 0)) {
StringRef edgeKey = RocksDBKey::primaryKey(_iterator->key());
// lookup real document
bool continueWithNextBatch = lookupDocumentAndUseCb(edgeKey, cb, limit, token, false);
// build cache value for from/to
if(_useCache){
if (_cacheValueSize <= cacheValueSizeLimit){
_cacheValueBuilder.add(VPackValue(std::string(edgeKey.data(),edgeKey.size())));
_cacheValueBuilder.add(VPackValue(token.revisionId()));
++_cacheValueSize;
}
}
// lookup real document
bool continueWithNextBatch = lookupDocumentAndUseCb(edgeKey, cb, limit, token, false);
_iterator->Next();
//check batch size limit
if(continueWithNextBatch){
@ -272,19 +275,31 @@ void RocksDBEdgeIndexIterator::reset() {
// ============================= Index ====================================
uint64_t RocksDBEdgeIndex::HashForKey(const rocksdb::Slice& key) {
std::hash<StringRef> hasher;
// NOTE: This function needs to use the same hashing on the
// indexed VPack as the initial inserter does
StringRef tmp = RocksDBKey::vertexId(key);
return static_cast<uint64_t>(hasher(tmp));
}
RocksDBEdgeIndex::RocksDBEdgeIndex(TRI_idx_iid_t iid,
arangodb::LogicalCollection* collection,
VPackSlice const& info,
std::string const& attr)
: RocksDBIndex(iid
,collection
,std::vector<std::vector<AttributeName>>({{AttributeName(attr, false)}})
,false // unique
,false // sparse
,basics::VelocyPackHelper::stringUInt64(info, "objectId")
,!ServerState::instance()->isCoordinator() // useCache
)
, _directionAttr(attr) {
: RocksDBIndex(iid, collection, std::vector<std::vector<AttributeName>>(
{{AttributeName(attr, false)}}),
false, false,
basics::VelocyPackHelper::stringUInt64(info, "objectId"),
!ServerState::instance()->isCoordinator() /*useCache*/
),
_directionAttr(attr),
_estimator(nullptr) {
if (!ServerState::instance()->isCoordinator()) {
// We activate the estimator only on DBServers
_estimator = std::make_unique<RocksDBCuckooIndexEstimator<uint64_t>>(RocksDBIndex::ESTIMATOR_SIZE);
TRI_ASSERT(_estimator != nullptr);
}
TRI_ASSERT(iid != 0);
TRI_ASSERT(_objectId != 0);
// if we never hit the assertions we need to remove the
@ -306,25 +321,11 @@ double RocksDBEdgeIndex::selectivityEstimate(
return 0.1;
}
if (attribute != nullptr) {
// the index attribute is given here
// now check if we can restrict the selectivity estimation to the correct
// part of the index
if (attribute->compare(_directionAttr) == 0) {
// _from
return 0.2; //_edgesFrom->selectivity();
} else {
return 0;
}
// other attribute. now return the average selectivity
if (attribute != nullptr && attribute->compare(_directionAttr)) {
return 0;
}
// return average selectivity of the two index parts
// double estimate = (_edgesFrom->selectivity() + _edgesTo->selectivity()) *
// 0.5;
// TRI_ASSERT(estimate >= 0.0 &&
// estimate <= 1.00001); // floating-point tolerance
return 0.1;
TRI_ASSERT(_estimator != nullptr);
return _estimator->computeEstimate();
}
/// @brief return the memory usage for the index
@ -354,7 +355,7 @@ int RocksDBEdgeIndex::insert(transaction::Methods* trx,
VPackSlice primaryKey = doc.get(StaticStrings::KeyString);
VPackSlice fromTo = doc.get(_directionAttr);
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
auto fromToRef=StringRef(fromTo);
auto fromToRef = StringRef(fromTo);
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromToRef,
StringRef(primaryKey));
//blacklist key in cache
@ -367,6 +368,9 @@ int RocksDBEdgeIndex::insert(transaction::Methods* trx,
rocksdb::Status status =
rtrx->Put(rocksdb::Slice(key.string()), rocksdb::Slice());
if (status.ok()) {
std::hash<StringRef> hasher;
uint64_t hash = static_cast<uint64_t>(hasher(fromToRef));
_estimator->insert(hash);
return TRI_ERROR_NO_ERROR;
} else {
return rocksutils::convertStatus(status).errorNumber();
@ -383,7 +387,7 @@ int RocksDBEdgeIndex::remove(transaction::Methods* trx,
bool isRollback) {
VPackSlice primaryKey = doc.get(StaticStrings::KeyString);
VPackSlice fromTo = doc.get(_directionAttr);
auto fromToRef=StringRef(fromTo);
auto fromToRef = StringRef(fromTo);
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromToRef,
StringRef(primaryKey));
@ -396,6 +400,9 @@ int RocksDBEdgeIndex::remove(transaction::Methods* trx,
rocksdb::Transaction* rtrx = state->rocksTransaction();
rocksdb::Status status = rtrx->Delete(rocksdb::Slice(key.string()));
if (status.ok()) {
std::hash<StringRef> hasher;
uint64_t hash = static_cast<uint64_t>(hasher(fromToRef));
_estimator->remove(hash);
return TRI_ERROR_NO_ERROR;
} else {
return rocksutils::convertStatus(status).errorNumber();
@ -618,10 +625,48 @@ int RocksDBEdgeIndex::cleanup() {
return TRI_ERROR_NO_ERROR;
}
void RocksDBEdgeIndex::serializeEstimate(std::string& output) const {
TRI_ASSERT(_estimator != nullptr);
_estimator->serialize(output);
}
bool RocksDBEdgeIndex::deserializeEstimate(RocksDBCounterManager* mgr) {
TRI_ASSERT(!ServerState::instance()->isCoordinator());
// We simply drop the current estimator and steal the one from recovery
  // We are then safe from resizing issues in our _estimator format
// and will use the old size.
TRI_ASSERT(mgr != nullptr);
auto tmp = mgr->stealIndexEstimator(_objectId);
if (tmp == nullptr) {
// We expected to receive a stored index estimate, however we got none.
// We use the freshly created estimator but have to recompute it.
return false;
}
_estimator.swap(tmp);
TRI_ASSERT(_estimator != nullptr);
return true;
}
void RocksDBEdgeIndex::recalculateEstimates() {
TRI_ASSERT(_estimator != nullptr);
_estimator->clear();
auto bounds = RocksDBKeyBounds::EdgeIndex(_objectId);
rocksutils::iterateBounds(bounds, [&](rocksdb::Iterator* it) {
uint64_t hash = RocksDBEdgeIndex::HashForKey(it->key());
_estimator->insert(hash);
});
}
Result RocksDBEdgeIndex::postprocessRemove(transaction::Methods* trx,
rocksdb::Slice const& key,
rocksdb::Slice const& value) {
//blacklist keys during truncate
blackListKey(key.data(), key.size());
uint64_t hash = RocksDBEdgeIndex::HashForKey(key);
_estimator->remove(hash);
return {TRI_ERROR_NO_ERROR};
}
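The NOTE on HashForKey above hides an invariant worth spelling out: the hash fed into the estimator during regular inserts (std::hash over the raw _from/_to value) must equal the hash derived from a persisted key during recalculateEstimates(), otherwise a rebuilt estimator would drift away from the one maintained online. A hedged sketch of the two paths, assuming RocksDBKey::vertexId() yields exactly the bytes that were hashed at insert time:

uint64_t onlineHashSketch(arangodb::StringRef const& fromTo) {
  std::hash<arangodb::StringRef> hasher;  // same hasher as in insert()/remove()
  return static_cast<uint64_t>(hasher(fromTo));
}

uint64_t recoveryHashSketch(rocksdb::Slice const& persistedKey) {
  // hashes RocksDBKey::vertexId(persistedKey), i.e. the stored _from/_to value
  return arangodb::RocksDBEdgeIndex::HashForKey(persistedKey);
}
// invariant: onlineHashSketch(fromTo) == recoveryHashSketch(key built from fromTo)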

View File

@ -27,6 +27,7 @@
#include "Basics/Common.h"
#include "Indexes/Index.h"
#include "Indexes/IndexIterator.h"
#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h"
#include "RocksDBEngine/RocksDBIndex.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBKeyBounds.h"
@ -85,6 +86,8 @@ class RocksDBEdgeIndex final : public RocksDBIndex {
friend class RocksDBEdgeIndexIterator;
public:
static uint64_t HashForKey(const rocksdb::Slice& key);
RocksDBEdgeIndex() = delete;
RocksDBEdgeIndex(TRI_idx_iid_t, arangodb::LogicalCollection*,
@ -156,6 +159,13 @@ class RocksDBEdgeIndex final : public RocksDBIndex {
arangodb::velocypack::Builder&) const override;
int cleanup() override;
void serializeEstimate(std::string& output) const override;
bool deserializeEstimate(arangodb::RocksDBCounterManager* mgr) override;
void recalculateEstimates() override;
private:
/// @brief create the iterator
IndexIterator* createEqIterator(transaction::Methods*, ManagedDocumentResult*,
@ -171,6 +181,12 @@ class RocksDBEdgeIndex final : public RocksDBIndex {
arangodb::aql::AstNode const* valNode) const;
std::string _directionAttr;
/// @brief A fixed size library to estimate the selectivity of the index.
/// On insertion of a document we have to insert it into the estimator,
/// On removal we have to remove it in the estimator as well.
std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>> _estimator;
};
} // namespace arangodb

View File

@ -290,6 +290,21 @@ void RocksDBEngine::stop() {
return;
}
replicationManager()->dropAll();
if (_backgroundThread) {
// stop the press
_backgroundThread->beginShutdown();
if (_counterManager) {
_counterManager->sync(true);
}
// wait until background thread stops
while (_backgroundThread->isRunning()) {
usleep(10000);
}
_backgroundThread.reset();
}
}
void RocksDBEngine::unprepare() {
@ -298,15 +313,6 @@ void RocksDBEngine::unprepare() {
}
if (_db) {
if (_backgroundThread && _backgroundThread->isRunning()) {
// stop the press
_backgroundThread->beginShutdown();
_backgroundThread.reset();
}
if (_counterManager) {
_counterManager->sync(true);
}
// now prune all obsolete WAL files
determinePrunableWalFiles(0);
pruneWalFiles();
@ -1206,7 +1212,7 @@ TRI_vocbase_t* RocksDBEngine::openExistingDatabase(TRI_voc_tick_t id,
VPackSlice slice = builder.slice();
TRI_ASSERT(slice.isArray());
for (auto const& it : VPackArrayIterator(slice)) {
// we found a collection that is still active
TRI_ASSERT(!it.get("id").isNone() || !it.get("cid").isNone());
@ -1222,6 +1228,7 @@ TRI_vocbase_t* RocksDBEngine::openExistingDatabase(TRI_voc_tick_t id,
static_cast<RocksDBCollection*>(collection->getPhysical());
TRI_ASSERT(physical != nullptr);
physical->deserializeIndexEstimates(counterManager());
LOG_TOPIC(DEBUG, arangodb::Logger::FIXME)
<< "added document collection '" << collection->name() << "'";
}
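openExistingDatabase() now also asks every collection to restore its index estimates from the counter manager. The collection-level deserializeIndexEstimates() is not part of this diff; a plausible per-index sketch, based only on the deserializeEstimate()/recalculateEstimates() contract added to RocksDBIndex below (how the collection actually stores its indexes is an assumption here):

void deserializeIndexEstimatesSketch(arangodb::RocksDBCounterManager* mgr,
                                     std::vector<arangodb::RocksDBIndex*> const& indexes) {
  for (auto* idx : indexes) {
    if (!idx->deserializeEstimate(mgr)) {
      // nothing was persisted for this index: rebuild the estimate from its key range
      idx->recalculateEstimates();
    }
  }
}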

View File

@ -24,6 +24,7 @@
#ifndef ARANGOD_ROCKSDB_ROCKSDB_HASH_INDEX_H
#define ARANGOD_ROCKSDB_ROCKSDB_HASH_INDEX_H 1
#include "Basics/VelocyPackHelper.h"
#include "RocksDBEngine/RocksDBVPackIndex.h"
namespace arangodb {
@ -44,6 +45,7 @@ class RocksDBHashIndex final : public RocksDBVPackIndex {
bool matchesDefinition(VPackSlice const& info) const override;
bool isSorted() const override { return true; }
};
}

View File

@ -38,6 +38,12 @@
using namespace arangodb;
using namespace arangodb::rocksutils;
// This is the number of distinct elements the index estimator can reliably store
// This correlates directly with the memory usage of the estimator:
// memory == ESTIMATOR_SIZE * 6 bytes
uint64_t const arangodb::RocksDBIndex::ESTIMATOR_SIZE = 4096;
RocksDBIndex::RocksDBIndex(
TRI_idx_iid_t id, LogicalCollection* collection,
std::vector<std::vector<arangodb::basics::AttributeName>> const& attributes,
@ -162,6 +168,22 @@ int RocksDBIndex::drop() {
return TRI_ERROR_NO_ERROR;
}
void RocksDBIndex::serializeEstimate(std::string&) const {
// All indexes that do not have an estimator do not serialize anything.
}
bool RocksDBIndex::deserializeEstimate(RocksDBCounterManager*) {
// All indexes that do not have an estimator do not deserialize anything.
// So the estimate is always recreatable.
  // We do not take anything from the counter manager here.
return true;
}
void RocksDBIndex::recalculateEstimates() {
// Nothing to do.
return;
}
void RocksDBIndex::truncate(transaction::Methods* trx) {
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
rocksdb::Transaction* rtrx = state->rocksTransaction();
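A back-of-the-envelope check of the ESTIMATOR_SIZE comment above, assuming the six bytes per element split into a 2-byte fingerprint plus a 4-byte counter as in the cuckoo estimator header:

#include <cstdint>

constexpr uint64_t kEstimatorSize = 4096;                              // ESTIMATOR_SIZE
constexpr uint64_t kSlotBytes = sizeof(uint16_t) + sizeof(uint32_t);   // 2 + 4 = 6
static_assert(kEstimatorSize * kSlotBytes == 24576,
              "roughly 24 KiB of estimator memory per index");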

View File

@ -41,8 +41,16 @@ class Cache;
}
class LogicalCollection;
class RocksDBComparator;
class RocksDBCounterManager;
class RocksDBIndex : public Index {
protected:
// This is the number of distinct elements the index estimator can reliably store
  // This correlates directly with the memory usage of the estimator:
  // memory == ESTIMATOR_SIZE * 6 bytes
static uint64_t const ESTIMATOR_SIZE;
protected:
RocksDBIndex(TRI_idx_iid_t, LogicalCollection*,
std::vector<std::vector<arangodb::basics::AttributeName>> const&
@ -89,6 +97,12 @@ class RocksDBIndex : public Index {
void createCache();
void disableCache();
virtual void serializeEstimate(std::string& output) const;
virtual bool deserializeEstimate(RocksDBCounterManager* mgr);
virtual void recalculateEstimates();
protected:
// Will be called during truncate to allow the index to update selectivity
// estimates, blacklist keys, etc.

View File

@ -111,6 +111,9 @@ RocksDBKey RocksDBKey::ReplicationApplierConfig(TRI_voc_tick_t databaseId) {
return RocksDBKey(RocksDBEntryType::ReplicationApplierConfig, databaseId);
}
RocksDBKey RocksDBKey::IndexEstimateValue(uint64_t collectionObjectId) {
return RocksDBKey(RocksDBEntryType::IndexEstimateValue, collectionObjectId);
}
// ========================= Member methods ===========================
RocksDBEntryType RocksDBKey::type(RocksDBKey const& key) {
@ -168,12 +171,11 @@ arangodb::StringRef RocksDBKey::primaryKey(RocksDBKey const& key) {
arangodb::StringRef RocksDBKey::primaryKey(rocksdb::Slice const& slice) {
return primaryKey(slice.data(), slice.size());
}
std::string RocksDBKey::vertexId(RocksDBKey const& key) {
StringRef RocksDBKey::vertexId(RocksDBKey const& key) {
return vertexId(key._buffer.data(), key._buffer.size());
}
std::string RocksDBKey::vertexId(rocksdb::Slice const& slice) {
StringRef RocksDBKey::vertexId(rocksdb::Slice const& slice) {
return vertexId(slice.data(), slice.size());
}
@ -214,6 +216,7 @@ RocksDBKey::RocksDBKey(RocksDBEntryType type, uint64_t first)
switch (_type) {
case RocksDBEntryType::Database:
case RocksDBEntryType::CounterValue:
case RocksDBEntryType::IndexEstimateValue:
case RocksDBEntryType::ReplicationApplierConfig: {
size_t length = sizeof(char) + sizeof(uint64_t);
_buffer.reserve(length);
@ -456,7 +459,7 @@ arangodb::StringRef RocksDBKey::primaryKey(char const* data, size_t size) {
}
}
std::string RocksDBKey::vertexId(char const* data, size_t size) {
StringRef RocksDBKey::vertexId(char const* data, size_t size) {
TRI_ASSERT(data != nullptr);
TRI_ASSERT(size >= sizeof(char));
RocksDBEntryType type = static_cast<RocksDBEntryType>(data[0]);
@ -466,7 +469,7 @@ std::string RocksDBKey::vertexId(char const* data, size_t size) {
size_t keySize = static_cast<size_t>(data[size - 1]);
size_t idSize = size - (sizeof(char) + sizeof(uint64_t) + sizeof(char) +
keySize + sizeof(uint8_t));
return std::string(data + sizeof(char) + sizeof(uint64_t), idSize);
return StringRef(data + sizeof(char) + sizeof(uint64_t), idSize);
}
default:
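Design note on the std::string to StringRef change above: the returned StringRef is only a view into the key's buffer, so callers have to keep the RocksDBKey (or the rocksdb::Slice) alive while the vertex id is in use and copy it explicitly if it must outlive the key. A small sketch of the intended pattern:

void vertexIdUsageSketch(arangodb::RocksDBKey const& key) {
  arangodb::StringRef vertex = arangodb::RocksDBKey::vertexId(key);  // no allocation, no copy
  // ... use `vertex` while `key` is still in scope ...
  std::string owned(vertex.data(), vertex.size());  // copy only if it must outlive `key`
  (void)owned;
}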

View File

@ -135,6 +135,12 @@ class RocksDBKey {
//////////////////////////////////////////////////////////////////////////////
static RocksDBKey ReplicationApplierConfig(TRI_voc_tick_t databaseId);
//////////////////////////////////////////////////////////////////////////////
/// @brief Create a fully-specified key for index estimate values of
/// a collection
//////////////////////////////////////////////////////////////////////////////
static RocksDBKey IndexEstimateValue(uint64_t objectId);
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief Extracts the type from a key
@ -211,8 +217,8 @@ class RocksDBKey {
///
/// May be called only on EdgeIndexValue keys. Other types will throw.
//////////////////////////////////////////////////////////////////////////////
static std::string vertexId(RocksDBKey const&);
static std::string vertexId(rocksdb::Slice const&);
static StringRef vertexId(RocksDBKey const&);
static StringRef vertexId(rocksdb::Slice const&);
//////////////////////////////////////////////////////////////////////////////
/// @brief Extracts the indexed VelocyPack values from a key
@ -264,7 +270,7 @@ class RocksDBKey {
static TRI_voc_cid_t viewId(char const* data, size_t size);
static TRI_voc_rid_t revisionId(char const* data, size_t size);
static StringRef primaryKey(char const* data, size_t size);
static std::string vertexId(char const* data, size_t size);
static StringRef vertexId(char const* data, size_t size);
static VPackSlice indexedVPack(char const* data, size_t size);
private:

View File

@ -119,6 +119,10 @@ RocksDBKeyBounds RocksDBKeyBounds::CounterValues() {
return RocksDBKeyBounds(RocksDBEntryType::CounterValue);
}
RocksDBKeyBounds RocksDBKeyBounds::IndexEstimateValues() {
return RocksDBKeyBounds(RocksDBEntryType::IndexEstimateValue);
}
RocksDBKeyBounds RocksDBKeyBounds::FulltextIndexPrefix(
uint64_t indexId, arangodb::StringRef const& word) {
// I did not want to pass a bool to the constructor for this
@ -203,7 +207,8 @@ RocksDBKeyBounds::RocksDBKeyBounds(RocksDBEntryType type)
break;
}
case RocksDBEntryType::CounterValue: {
case RocksDBEntryType::CounterValue:
case RocksDBEntryType::IndexEstimateValue: {
size_t length = sizeof(char) + sizeof(uint64_t);
_startBuffer.reserve(length);
_startBuffer.push_back(static_cast<char>(_type));
@ -214,7 +219,6 @@ RocksDBKeyBounds::RocksDBKeyBounds(RocksDBEntryType type)
uint64ToPersistent(_endBuffer, UINT64_MAX);
break;
}
default:
THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);
}
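The new IndexEstimateValues() bounds cover every key that starts with the IndexEstimateValue type byte, so one range scan visits the persisted estimator blob of every index. A hedged sketch using the iterateBounds helper seen elsewhere in this commit (the callback body is illustrative only):

void scanPersistedEstimatorsSketch() {
  auto bounds = arangodb::RocksDBKeyBounds::IndexEstimateValues();
  arangodb::rocksutils::iterateBounds(bounds, [](rocksdb::Iterator* it) {
    rocksdb::Slice key  = it->key();    // type byte '=' followed by the index object id
    rocksdb::Slice blob = it->value();  // serialized RocksDBCuckooIndexEstimator
    (void)key; (void)blob;              // handed to the counter manager in the real code
  });
}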

View File

@ -123,6 +123,11 @@ class RocksDBKeyBounds {
//////////////////////////////////////////////////////////////////////////////
static RocksDBKeyBounds CounterValues();
//////////////////////////////////////////////////////////////////////////////
/// @brief Bounds for all index estimate values
//////////////////////////////////////////////////////////////////////////////
static RocksDBKeyBounds IndexEstimateValues();
//////////////////////////////////////////////////////////////////////////////
/// @brief Bounds for all entries of a fulltext index, matching prefixes
//////////////////////////////////////////////////////////////////////////////

View File

@ -103,6 +103,12 @@ static rocksdb::Slice ReplicationApplierConfig(
reinterpret_cast<std::underlying_type<RocksDBEntryType>::type*>(
&replicationApplierConfig),
1);
static RocksDBEntryType indexEstimateValue = RocksDBEntryType::IndexEstimateValue;
static rocksdb::Slice IndexEstimateValue(
reinterpret_cast<std::underlying_type<RocksDBEntryType>::type*>(
&indexEstimateValue),
1);
}
char const* arangodb::rocksDBEntryTypeName(arangodb::RocksDBEntryType type) {
@ -120,6 +126,7 @@ char const* arangodb::rocksDBEntryTypeName(arangodb::RocksDBEntryType type) {
case arangodb::RocksDBEntryType::ReplicationApplierConfig: return "ReplicationApplierConfig";
case arangodb::RocksDBEntryType::FulltextIndexValue: return "FulltextIndexValue";
case arangodb::RocksDBEntryType::GeoIndexValue: return "GeoIndexValue";
case arangodb::RocksDBEntryType::IndexEstimateValue: return "IndexEstimateValue";
}
return "Invalid";
}
@ -175,6 +182,8 @@ rocksdb::Slice const& arangodb::rocksDBSlice(RocksDBEntryType const& type) {
return SettingsValue;
case RocksDBEntryType::ReplicationApplierConfig:
return ReplicationApplierConfig;
case RocksDBEntryType::IndexEstimateValue:
return IndexEstimateValue;
}
return Document; // avoids warning - errorslice instead ?!

View File

@ -48,7 +48,8 @@ enum class RocksDBEntryType : char {
SettingsValue = '9',
ReplicationApplierConfig = ':',
FulltextIndexValue = ';',
GeoIndexValue = '<'
GeoIndexValue = '<',
IndexEstimateValue = '='
};
char const* rocksDBEntryTypeName(RocksDBEntryType);

View File

@ -34,6 +34,7 @@
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBComparator.h"
#include "RocksDBEngine/RocksDBCounterManager.h"
#include "RocksDBEngine/RocksDBPrimaryIndex.h"
#include "RocksDBEngine/RocksDBToken.h"
#include "RocksDBEngine/RocksDBTransactionState.h"
@ -157,13 +158,27 @@ bool RocksDBVPackIndexIterator::next(TokenCallback const& cb, size_t limit) {
return true;
}
uint64_t RocksDBVPackIndex::HashForKey(const rocksdb::Slice& key) {
// NOTE: This function needs to use the same hashing on the
// indexed VPack as the initial inserter does
VPackSlice tmp = RocksDBKey::indexedVPack(key);
return tmp.normalizedHash();
}
/// @brief create the index
RocksDBVPackIndex::RocksDBVPackIndex(TRI_idx_iid_t iid,
arangodb::LogicalCollection* collection,
arangodb::velocypack::Slice const& info)
: RocksDBIndex(iid, collection, info),
_useExpansion(false),
_allowPartialIndex(true) {
_allowPartialIndex(true),
_estimator(nullptr) {
if (!_unique && !ServerState::instance()->isCoordinator()) {
// We activate the estimator for all non unique-indexes.
// And only on DBServers
_estimator = std::make_unique<RocksDBCuckooIndexEstimator<uint64_t>>(RocksDBIndex::ESTIMATOR_SIZE);
TRI_ASSERT(_estimator != nullptr);
}
TRI_ASSERT(!_fields.empty());
TRI_ASSERT(iid != 0);
@ -181,6 +196,13 @@ RocksDBVPackIndex::RocksDBVPackIndex(TRI_idx_iid_t iid,
/// @brief destroy the index
RocksDBVPackIndex::~RocksDBVPackIndex() {}
double RocksDBVPackIndex::selectivityEstimate(arangodb::StringRef const*) const {
if (_unique) {
return 1.0; // only valid if unique
}
return _estimator->computeEstimate();
}
size_t RocksDBVPackIndex::memory() const {
rocksdb::TransactionDB* db = rocksutils::globalRocksDB();
RocksDBKeyBounds bounds = _unique ? RocksDBKeyBounds::UniqueIndex(_objectId)
@ -236,7 +258,8 @@ bool RocksDBVPackIndex::implicitlyUnique() const {
int RocksDBVPackIndex::fillElement(VPackBuilder& leased,
TRI_voc_rid_t revisionId,
VPackSlice const& doc,
std::vector<RocksDBKey>& elements) {
std::vector<RocksDBKey>& elements,
std::vector<uint64_t>& hashes) {
if (doc.isNone()) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME)
<< "encountered invalid marker with slice of type None";
@ -292,12 +315,13 @@ int RocksDBVPackIndex::fillElement(VPackBuilder& leased,
// - Value: empty
elements.push_back(
RocksDBKey::IndexValue(_objectId, key, leased.slice()));
hashes.push_back(leased.slice().normalizedHash());
}
} else {
// other path for handling array elements, too
std::vector<VPackSlice> sliceStack;
buildIndexValues(leased, doc, 0, elements, sliceStack);
buildIndexValues(leased, doc, 0, elements, sliceStack, hashes);
}
return TRI_ERROR_NO_ERROR;
@ -306,7 +330,8 @@ int RocksDBVPackIndex::fillElement(VPackBuilder& leased,
void RocksDBVPackIndex::addIndexValue(VPackBuilder& leased,
VPackSlice const& document,
std::vector<RocksDBKey>& elements,
std::vector<VPackSlice>& sliceStack) {
std::vector<VPackSlice>& sliceStack,
std::vector<uint64_t>& hashes) {
leased.clear();
leased.openArray();
for (VPackSlice const& s : sliceStack) {
@ -326,6 +351,7 @@ void RocksDBVPackIndex::addIndexValue(VPackBuilder& leased,
// + primary key
// - Value: empty
elements.push_back(RocksDBKey::IndexValue(_objectId, key, leased.slice()));
hashes.push_back(leased.slice().normalizedHash());
}
}
@ -334,12 +360,13 @@ void RocksDBVPackIndex::buildIndexValues(VPackBuilder& leased,
VPackSlice const document,
size_t level,
std::vector<RocksDBKey>& elements,
std::vector<VPackSlice>& sliceStack) {
std::vector<VPackSlice>& sliceStack,
std::vector<uint64_t>& hashes) {
// Invariant: level == sliceStack.size()
// Stop the recursion:
if (level == _paths.size()) {
addIndexValue(leased, document, elements, sliceStack);
addIndexValue(leased, document, elements, sliceStack, hashes);
return;
}
@ -353,7 +380,7 @@ void RocksDBVPackIndex::buildIndexValues(VPackBuilder& leased,
} else {
sliceStack.emplace_back(slice);
}
buildIndexValues(leased, document, level + 1, elements, sliceStack);
buildIndexValues(leased, document, level + 1, elements, sliceStack, hashes);
sliceStack.pop_back();
return;
}
@ -374,7 +401,7 @@ void RocksDBVPackIndex::buildIndexValues(VPackBuilder& leased,
for (size_t i = level; i < _paths.size(); i++) {
sliceStack.emplace_back(illegalSlice);
}
addIndexValue(leased, document, elements, sliceStack);
addIndexValue(leased, document, elements, sliceStack, hashes);
for (size_t i = level; i < _paths.size(); i++) {
sliceStack.pop_back();
}
@ -409,7 +436,7 @@ void RocksDBVPackIndex::buildIndexValues(VPackBuilder& leased,
if (it == seen.end()) {
seen.insert(something);
sliceStack.emplace_back(something);
buildIndexValues(leased, document, level + 1, elements, sliceStack);
buildIndexValues(leased, document, level + 1, elements, sliceStack, hashes);
sliceStack.pop_back();
}
};
@ -472,10 +499,11 @@ int RocksDBVPackIndex::insert(transaction::Methods* trx,
TRI_voc_rid_t revisionId, VPackSlice const& doc,
bool isRollback) {
std::vector<RocksDBKey> elements;
std::vector<uint64_t> hashes;
int res;
try {
transaction::BuilderLeaser leased(trx);
res = fillElement(*(leased.get()), revisionId, doc, elements);
res = fillElement(*(leased.get()), revisionId, doc, elements, hashes);
} catch (...) {
res = TRI_ERROR_OUT_OF_MEMORY;
}
@ -530,6 +558,12 @@ int RocksDBVPackIndex::insert(transaction::Methods* trx,
}
}
for (auto& it : hashes) {
    // The estimator is only used for non-unique indexes
TRI_ASSERT(!_unique);
_estimator->insert(it);
}
return res;
}
@ -537,10 +571,11 @@ int RocksDBVPackIndex::insertRaw(rocksdb::WriteBatchWithIndex* writeBatch,
TRI_voc_rid_t revisionId,
VPackSlice const& doc) {
std::vector<RocksDBKey> elements;
std::vector<uint64_t> hashes;
int res;
try {
VPackBuilder leased;
res = fillElement(leased, revisionId, doc, elements);
res = fillElement(leased, revisionId, doc, elements, hashes);
} catch (...) {
return TRI_ERROR_OUT_OF_MEMORY;
}
@ -569,6 +604,14 @@ int RocksDBVPackIndex::insertRaw(rocksdb::WriteBatchWithIndex* writeBatch,
writeBatch->Put(key.string(), value.string());
}
}
for (auto& it : hashes) {
    // The estimator is only used for non-unique indexes
TRI_ASSERT(!_unique);
_estimator->insert(it);
}
return res;
}
@ -577,11 +620,12 @@ int RocksDBVPackIndex::remove(transaction::Methods* trx,
TRI_voc_rid_t revisionId, VPackSlice const& doc,
bool isRollback) {
std::vector<RocksDBKey> elements;
std::vector<uint64_t> hashes;
int res;
try {
transaction::BuilderLeaser leased(trx);
res = fillElement(*(leased.get()), revisionId, doc, elements);
res = fillElement(*(leased.get()), revisionId, doc, elements, hashes);
} catch (...) {
res = TRI_ERROR_OUT_OF_MEMORY;
}
@ -602,6 +646,12 @@ int RocksDBVPackIndex::remove(transaction::Methods* trx,
}
}
for (auto& it : hashes) {
    // The estimator is only used for non-unique indexes
TRI_ASSERT(!_unique);
_estimator->remove(it);
}
return res;
}
@ -609,11 +659,12 @@ int RocksDBVPackIndex::removeRaw(rocksdb::WriteBatchWithIndex* writeBatch,
TRI_voc_rid_t revisionId,
VPackSlice const& doc) {
std::vector<RocksDBKey> elements;
std::vector<uint64_t> hashes;
int res;
try {
VPackBuilder leased;
res = fillElement(leased, revisionId, doc, elements);
res = fillElement(leased, revisionId, doc, elements, hashes);
} catch (...) {
res = TRI_ERROR_OUT_OF_MEMORY;
}
@ -627,6 +678,12 @@ int RocksDBVPackIndex::removeRaw(rocksdb::WriteBatchWithIndex* writeBatch,
writeBatch->Delete(elements[i].string());
}
for (auto& it : hashes) {
    // The estimator is only used for non-unique indexes
TRI_ASSERT(!_unique);
_estimator->remove(it);
}
return TRI_ERROR_NO_ERROR;
}
@ -1413,11 +1470,56 @@ int RocksDBVPackIndex::cleanup() {
return TRI_ERROR_NO_ERROR;
}
void RocksDBVPackIndex::serializeEstimate(std::string& output) const {
TRI_ASSERT(!ServerState::instance()->isCoordinator());
if (!_unique) {
TRI_ASSERT(_estimator != nullptr);
_estimator->serialize(output);
}
}
bool RocksDBVPackIndex::deserializeEstimate(RocksDBCounterManager* mgr) {
TRI_ASSERT(!ServerState::instance()->isCoordinator());
if (_unique) {
return true;
}
// We simply drop the current estimator and steal the one from recovery
  // We are then safe from resizing issues in our _estimator format
// and will use the old size.
TRI_ASSERT(mgr != nullptr);
auto tmp = mgr->stealIndexEstimator(_objectId);
if (tmp == nullptr) {
// We expected to receive a stored index estimate, however we got none.
// We use the freshly created estimator but have to recompute it.
return false;
}
_estimator.swap(tmp);
TRI_ASSERT(_estimator != nullptr);
return true;
}
void RocksDBVPackIndex::recalculateEstimates() {
if (unique()) {
return;
}
TRI_ASSERT(_estimator != nullptr);
_estimator->clear();
auto bounds = RocksDBKeyBounds::IndexEntries(_objectId);
rocksutils::iterateBounds(bounds, [&](rocksdb::Iterator* it) {
uint64_t hash = RocksDBVPackIndex::HashForKey(it->key());
_estimator->insert(hash);
});
}
Result RocksDBVPackIndex::postprocessRemove(transaction::Methods* trx,
rocksdb::Slice const& key,
rocksdb::Slice const& value) {
if (!unique()) {
// TODO: update selectivity estimate
uint64_t hash = RocksDBVPackIndex::HashForKey(key);
_estimator->remove(hash);
}
return {TRI_ERROR_NO_ERROR};
}
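A worked example of the selectivityEstimate() behaviour added above, using the figures from the recovery test later in this commit: a non-unique hash index over 1000 documents that only ever contain 4 distinct attribute combinations ends up at 4 / 1000 = 0.004, while a unique index always reports 1.0 without consulting the estimator. The helper below is illustrative, not ArangoDB code:

double expectedEstimateSketch(bool unique, uint64_t distinctEntries, uint64_t totalEntries) {
  if (unique) {
    return 1.0;  // a unique index matches at most one document per lookup
  }
  return static_cast<double>(distinctEntries) / static_cast<double>(totalEntries);
}
// expectedEstimateSketch(false, 4, 1000) == 0.004, matching est3 in the recovery test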

View File

@ -29,6 +29,7 @@
#include "Aql/AstNode.h"
#include "Basics/Common.h"
#include "Indexes/IndexIterator.h"
#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h"
#include "RocksDBEngine/RocksDBIndex.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBKeyBounds.h"
@ -101,6 +102,8 @@ class RocksDBVPackIndex : public RocksDBIndex {
friend class RocksDBVPackIndexIterator;
public:
static uint64_t HashForKey(const rocksdb::Slice& key);
RocksDBVPackIndex() = delete;
RocksDBVPackIndex(TRI_idx_iid_t, LogicalCollection*,
@ -110,13 +113,7 @@ class RocksDBVPackIndex : public RocksDBIndex {
bool hasSelectivityEstimate() const override { return true; }
double selectivityEstimate(
arangodb::StringRef const* = nullptr) const override {
if (_unique) {
return 1.0; // only valid if unique
}
return 0.2; // TODO: fix this hard-coded estimate
}
double selectivityEstimate(arangodb::StringRef const* = nullptr) const override;
size_t memory() const override;
@ -179,6 +176,12 @@ class RocksDBVPackIndex : public RocksDBIndex {
int cleanup() override;
void serializeEstimate(std::string& output) const override;
bool deserializeEstimate(arangodb::RocksDBCounterManager* mgr) override;
void recalculateEstimates() override;
protected:
Result postprocessRemove(transaction::Methods* trx, rocksdb::Slice const& key,
rocksdb::Slice const& value) override;
@ -209,21 +212,26 @@ protected:
/// @brief helper function to insert a document into any index type
int fillElement(velocypack::Builder& leased, TRI_voc_rid_t revisionId,
VPackSlice const& doc, std::vector<RocksDBKey>& elements);
VPackSlice const& doc, std::vector<RocksDBKey>& elements,
std::vector<uint64_t>& hashes);
/// @brief helper function to build the key and value for rocksdb from the
/// vector of slices
/// @param hashes list of VPackSlice hashes for the estimator.
void addIndexValue(velocypack::Builder& leased, VPackSlice const& document,
std::vector<RocksDBKey>& elements,
std::vector<VPackSlice>& sliceStack);
std::vector<VPackSlice>& sliceStack,
std::vector<uint64_t>& hashes);
/// @brief helper function to create a set of value combinations to insert
/// into the rocksdb index.
/// @param elements vector of resulting index entries
/// @param sliceStack working list of values to insert into the index
/// @param hashes list of VPackSlice hashes for the estimator.
void buildIndexValues(velocypack::Builder& leased, VPackSlice const document,
size_t level, std::vector<RocksDBKey>& elements,
std::vector<VPackSlice>& sliceStack);
std::vector<VPackSlice>& sliceStack,
std::vector<uint64_t>& hashes);
private:
std::unique_ptr<FixedSizeAllocator> _allocator;
@ -239,6 +247,12 @@ protected:
/// @brief whether or not partial indexing is allowed
bool _allowPartialIndex;
/// @brief A fixed size library to estimate the selectivity of the index.
/// On insertion of a document we have to insert it into the estimator,
/// On removal we have to remove it in the estimator as well.
std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>> _estimator;
};
} // namespace arangodb

View File

@ -70,9 +70,14 @@ void EngineSelectorFeature::prepare() {
// file if engine in file does not match command-line option
if (basics::FileUtils::isRegularFile(_engineFilePath)) {
std::string content = basics::FileUtils::slurp(_engineFilePath);
if (content != _engine) {
LOG_TOPIC(FATAL, Logger::STARTUP) << "content of 'ENGINE' file '" << _engineFilePath << "' and command-line/configuration option value do not match: '" << content << "' != '" << _engine << "'. please validate the command-line/configuration option value of '--server.storage-engine' or use a different database directory if the change is intentional";
try {
std::string content = basics::FileUtils::slurp(_engineFilePath);
if (content != _engine) {
LOG_TOPIC(FATAL, Logger::STARTUP) << "content of 'ENGINE' file '" << _engineFilePath << "' and command-line/configuration option value do not match: '" << content << "' != '" << _engine << "'. please validate the command-line/configuration option value of '--server.storage-engine' or use a different database directory if the change is intentional";
FATAL_ERROR_EXIT();
}
} catch (std::exception const& ex) {
LOG_TOPIC(FATAL, Logger::STARTUP) << "unable to read content of 'ENGINE' file '" << _engineFilePath << "': " << ex.what() << ". please make sure the file/directory is readable for the arangod process and user";
FATAL_ERROR_EXIT();
}
}
@ -105,7 +110,7 @@ void EngineSelectorFeature::start() {
try {
basics::FileUtils::spit(_engineFilePath, _engine);
} catch (std::exception const& ex) {
LOG_TOPIC(FATAL, Logger::STARTUP) << "unable to write 'ENGINE' file '" << _engineFilePath << "': " << ex.what();
LOG_TOPIC(FATAL, Logger::STARTUP) << "unable to write 'ENGINE' file '" << _engineFilePath << "': " << ex.what() << ". please make sure the file/directory is writable for the arangod process and user";
FATAL_ERROR_EXIT();
}
}

View File

@ -605,7 +605,9 @@ function agencyTestSuite () {
////////////////////////////////////////////////////////////////////////////////
testOpErase : function () {
writeAndCheck([[{"/version":{"op":"delete"}}]]);
writeAndCheck([[{"/a":[0,1,2,3,4,5,6,7,8,9]}]]); // none before
assertEqual(readAndCheck([["/a"]]), [{a:[0,1,2,3,4,5,6,7,8,9]}]);
writeAndCheck([[{"a":{"op":"erase","val":3}}]]);
@ -629,6 +631,28 @@ function agencyTestSuite () {
writeAndCheck([[{"a":{"op":"erase","val":6}}],
[{"a":{"op":"erase","val":8}}]]);
assertEqual(readAndCheck([["/a"]]), [{a:[]}]);
writeAndCheck([[{"/a":[0,1,2,3,4,5,6,7,8,9]}]]); // none before
assertEqual(readAndCheck([["/a"]]), [{a:[0,1,2,3,4,5,6,7,8,9]}]);
writeAndCheck([[{"a":{"op":"erase","pos":3}}]]);
assertEqual(readAndCheck([["/a"]]), [{a:[0,1,2,4,5,6,7,8,9]}]);
writeAndCheck([[{"a":{"op":"erase","pos":0}}]]);
assertEqual(readAndCheck([["/a"]]), [{a:[1,2,4,5,6,7,8,9]}]);
writeAndCheck([[{"a":{"op":"erase","pos":0}}]]);
assertEqual(readAndCheck([["/a"]]), [{a:[2,4,5,6,7,8,9]}]);
writeAndCheck([[{"a":{"op":"erase","pos":2}}]]);
assertEqual(readAndCheck([["/a"]]), [{a:[2,4,6,7,8,9]}]);
writeAndCheck([[{"a":{"op":"erase","pos":4}}]]);
assertEqual(readAndCheck([["/a"]]), [{a:[2,4,6,7,9]}]);
writeAndCheck([[{"a":{"op":"erase","pos":2}}]]);
assertEqual(readAndCheck([["/a"]]), [{a:[2,4,7,9]}]);
writeAndCheck([[{"a":{"op":"erase","pos":2}}]]);
assertEqual(readAndCheck([["/a"]]), [{a:[2,4,9]}]);
writeAndCheck([[{"a":{"op":"erase","pos":0}}]]);
assertEqual(readAndCheck([["/a"]]), [{a:[4,9]}]);
writeAndCheck([[{"a":{"op":"erase","pos":1}}],
[{"a":{"op":"erase","pos":0}}]]);
assertEqual(readAndCheck([["/a"]]), [{a:[]}]);
},
////////////////////////////////////////////////////////////////////////////////

View File

@ -33,8 +33,22 @@ var arangodb = require("@arangodb");
var db = arangodb.db;
var internal = require("internal");
var wait = internal.wait;
var print = internal.print;
var ArangoCollection = arangodb.ArangoCollection;
var mmfilesEngine = false;
if (db._engine().name === "mmfiles") {
mmfilesEngine = true;
}
var printed=false;
function printNotImplemented(message){
if(!printed){
printed = true;
print("test for " + message + " not implemented");
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite: buckets
////////////////////////////////////////////////////////////////////////////////
@ -60,9 +74,20 @@ function EdgeIndexBucketsSuite () {
db._drop(en1);
db._drop(en2);
db._drop(en3);
edge1 = db._createEdgeCollection(en1, { indexBuckets: 1 });
edge2 = db._createEdgeCollection(en2, { indexBuckets: 16 });
edge3 = db._createEdgeCollection(en3, { indexBuckets: 128 });
var options = {};
if (mmfilesEngine){
options = { indexBuckets: 1 };
}
edge1 = db._createEdgeCollection(en1, options);
if (mmfilesEngine){
options = { indexBuckets: 16 };
}
edge2 = db._createEdgeCollection(en2, options);
if (mmfilesEngine){
options = { indexBuckets: 128 };
}
edge3 = db._createEdgeCollection(en3, options);
db._drop(vn);
vertex = db._create(vn);
@ -93,7 +118,7 @@ function EdgeIndexBucketsSuite () {
edge1.save(vn + "/v" + i, vn + "/v" + j, { });
edge2.save(vn + "/v" + i, vn + "/v" + j, { });
edge3.save(vn + "/v" + i, vn + "/v" + j, { });
}
}
}
// unload collections
@ -183,7 +208,7 @@ function EdgeIndexBucketsSuite () {
ref = edge1.inEdges(from).length;
assertEqual(ref, edge2.inEdges(from).length);
assertEqual(ref, edge3.inEdges(from).length);
ref = edge1.outEdges(to).length;
assertEqual(ref, edge2.outEdges(to).length);
assertEqual(ref, edge3.outEdges(to).length);
@ -237,6 +262,38 @@ function EdgeIndexSuite () {
wait(0.0);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test batch size limit
////////////////////////////////////////////////////////////////////////////////
testIndexBatchsizeLimit : function () {
[20, 900, 1000, 1100, 2000].forEach( function(n){
var toKeys = [];
for (var i = 0; i < n; ++i) {
var to = "b" + n + "/"+i;
edge.insert({_from : "a/" + n, _to : to});
toKeys.push(to);
}
assertEqual(n,edge.byExample({ _from : "a/" + n }).toArray().length, "compare 1");
assertEqual(n,edge.byExample({ _from : "a/" + n }).toArray().length, "compare 2");
var rv = edge.byExample({ _from : "a/" + n }).toArray();
assertEqual(n,rv.length, "compare 3");
//assert equal values
if(n <= 1001){
var keys = rv.map(function(x){ return x._to; });
keys.sort();
toKeys.sort();
keys.forEach(function(x,i){
assertEqual(x,toKeys[i], "compare exact values");
});
}
});
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test index presence
////////////////////////////////////////////////////////////////////////////////
@ -255,9 +312,14 @@ function EdgeIndexSuite () {
////////////////////////////////////////////////////////////////////////////////
testIndexSelectivityEmpty : function () {
printed = false;
var edgeIndex = edge.getIndexes()[1];
assertTrue(edgeIndex.hasOwnProperty("selectivityEstimate"));
assertEqual(1, edgeIndex.selectivityEstimate);
if(mmfilesEngine){
assertEqual(1, edgeIndex.selectivityEstimate);
} else {
printNotImplemented("selectivityEstimate");
}
},
////////////////////////////////////////////////////////////////////////////////
@ -265,10 +327,15 @@ function EdgeIndexSuite () {
////////////////////////////////////////////////////////////////////////////////
testIndexSelectivityOneDoc : function () {
printed = false;
edge.save(v1, v2, { });
var edgeIndex = edge.getIndexes()[1];
assertTrue(edgeIndex.hasOwnProperty("selectivityEstimate"));
assertEqual(1, edgeIndex.selectivityEstimate);
if(mmfilesEngine){
assertEqual(1, edgeIndex.selectivityEstimate);
} else {
printNotImplemented("selectivityEstimate");
}
},
////////////////////////////////////////////////////////////////////////////////
@ -276,6 +343,7 @@ function EdgeIndexSuite () {
////////////////////////////////////////////////////////////////////////////////
testIndexSelectivityDuplicateDocs : function () {
printed = false;
var i, c, edgeIndex, expectedSelectivity;
for (i = 0; i < 1000; ++i) {
@ -283,7 +351,11 @@ function EdgeIndexSuite () {
edgeIndex = edge.getIndexes()[1];
expectedSelectivity = 1 / (i + 1);
// allow for some floating-point deviations
assertTrue(Math.abs(expectedSelectivity - edgeIndex.selectivityEstimate) <= 0.001);
if(mmfilesEngine){
assertTrue(Math.abs(expectedSelectivity - edgeIndex.selectivityEstimate) <= 0.001);
} else {
printNotImplemented("selectivityEstimate");
}
}
var n = edge.count();
@ -298,7 +370,11 @@ function EdgeIndexSuite () {
c = 1000 - (i + 1);
expectedSelectivity = (c === 0 ? 1 : 1 / c);
// allow for some floating-point deviations
assertTrue(Math.abs(expectedSelectivity - edgeIndex.selectivityEstimate) <= 0.001);
if(mmfilesEngine){
assertTrue(Math.abs(expectedSelectivity - edgeIndex.selectivityEstimate) <= 0.001);
} else {
printNotImplemented("selectivityEstimate");
}
}
},
@ -319,11 +395,16 @@ function EdgeIndexSuite () {
////////////////////////////////////////////////////////////////////////////////
testIndexSelectivityUniqueDocsFrom : function () {
printed = false;
for (var i = 0; i < 1000; ++i) {
edge.save(vn + "/from" + i, vn + "/1", { });
var edgeIndex = edge.getIndexes()[1];
var expectedSelectivity = (1 + (1 / (i + 1))) * 0.5;
assertTrue(Math.abs(expectedSelectivity - edgeIndex.selectivityEstimate) <= 0.001);
var expectedSelectivity = (1 + (1 / (i + 1))) * 0.5;
if(mmfilesEngine){
assertTrue(Math.abs(expectedSelectivity - edgeIndex.selectivityEstimate) <= 0.001);
} else {
printNotImplemented("selectivityEstimate");
}
}
},
@ -332,16 +413,22 @@ function EdgeIndexSuite () {
////////////////////////////////////////////////////////////////////////////////
testIndexSelectivityRepeatingDocs : function () {
printed = false;
for (var i = 0; i < 1000; ++i) {
if (i > 0) {
var edgeIndex = edge.getIndexes()[1];
var expectedSelectivity = (1 + (Math.min(i, 20) / i)) * 0.5;
assertTrue(Math.abs(expectedSelectivity - edgeIndex.selectivityEstimate) <= 0.001);
var expectedSelectivity = (1 + (Math.min(i, 20) / i)) * 0.5;
if(mmfilesEngine){
assertTrue(Math.abs(expectedSelectivity - edgeIndex.selectivityEstimate) <= 0.001);
} else {
printNotImplemented("selectivityEstimate");
}
}
edge.save(vn + "/from" + (i % 20), vn + "/to" + i, { });
}
}
};
}
@ -350,8 +437,7 @@ function EdgeIndexSuite () {
/// @brief executes the test suite
////////////////////////////////////////////////////////////////////////////////
jsunity.run(EdgeIndexBucketsSuite);
jsunity.run(EdgeIndexSuite);
jsunity.run(EdgeIndexBucketsSuite);
return jsunity.done();

View File

@ -28,32 +28,39 @@
// / @author Copyright 2012, triAGENS GmbH, Cologne, Germany
// //////////////////////////////////////////////////////////////////////////////
var db = require('@arangodb').db;
var internal = require('internal');
var jsunity = require('jsunity');
const db = require('@arangodb').db;
const internal = require('internal');
const jsunity = require('jsunity');
const colName1 = 'UnitTestsRecovery1';
const colName2 = 'UnitTestsRecovery2';
const colName3 = 'UnitTestsRecovery3';
const est1 = 1; // The index is de-facto unique so estimate 1
const est2 = 1; // This index is unique. Estimate 1
const est3 = 4 / 1000; // This index has 4 different values and stores 1000 documents
function runSetup () {
'use strict';
internal.debugClearFailAt();
db._drop('UnitTestsRecovery1');
var c = db._create('UnitTestsRecovery1'), i;
db._drop(colName1);
var c = db._create(colName1), i;
c.ensureHashIndex('value');
for (i = 0; i < 1000; ++i) {
c.save({ value: i });
}
db._drop('UnitTestsRecovery2');
c = db._create('UnitTestsRecovery2');
db._drop(colName2);
c = db._create(colName2);
c.ensureUniqueConstraint('a.value');
for (i = 0; i < 1000; ++i) {
c.save({ a: { value: i } });
}
db._drop('UnitTestsRecovery3');
c = db._create('UnitTestsRecovery3');
db._drop(colName3);
c = db._create(colName3);
c.ensureHashIndex('a', 'b');
for (i = 0; i < 500; ++i) {
@ -84,39 +91,82 @@ function recoverySuite () {
// / @brief test whether we can restore the trx data
// //////////////////////////////////////////////////////////////////////////////
testIndexesHash: function () {
var c = db._collection('UnitTestsRecovery1'), idx, i;
idx = c.getIndexes()[1];
testSingleAttributeHashIndexInfo: function() {
let c = db._collection(colName1);
let idx = c.getIndexes()[1];
assertFalse(idx.unique);
assertFalse(idx.sparse);
assertEqual([ 'value' ], idx.fields);
for (i = 0; i < 1000; ++i) {
},
testSingleAttributeHashIndexByExample: function() {
let c = db._collection(colName1);
for (let i = 0; i < 1000; ++i) {
assertEqual(1, c.byExample({ value: i }).toArray().length);
}
assertEqual(1, db._query("FOR doc IN UnitTestsRecovery1 FILTER doc.value == 0 RETURN doc").toArray().length);
},
c = db._collection('UnitTestsRecovery2');
idx = c.getIndexes()[1];
testSingleAttributeHashIndexAql: function() {
assertEqual(1, db._query(`FOR doc IN ${colName1} FILTER doc.value == 0 RETURN doc`).toArray().length);
},
testSingleAttributeHashIndexEstimate: function () {
let c = db._collection(colName1);
let idx = c.getIndexes()[1];
assertEqual(est1, idx.selectivityEstimate);
},
testNestedAttributeHashIndexInfo: function() {
let c = db._collection(colName2);
let idx = c.getIndexes()[1];
assertTrue(idx.unique);
assertFalse(idx.sparse);
assertEqual([ 'a.value' ], idx.fields);
for (i = 0; i < 1000; ++i) {
},
testNestedAttributeHashIndexByExample: function() {
let c = db._collection(colName2);
for (let i = 0; i < 1000; ++i) {
assertEqual(1, c.byExample({ 'a.value': i }).toArray().length);
}
assertEqual(1, db._query("FOR doc IN UnitTestsRecovery2 FILTER doc.a.value == 0 RETURN doc").toArray().length);
},
c = db._collection('UnitTestsRecovery3');
idx = c.getIndexes()[1];
testNestedAttributeHashIndexAql: function() {
assertEqual(1, db._query(`FOR doc IN ${colName2} FILTER doc.a.value == 0 RETURN doc`).toArray().length);
},
testNestedAttributeHashIndexEstimate: function () {
let c = db._collection(colName2);
let idx = c.getIndexes()[1];
assertEqual(est2, idx.selectivityEstimate);
},
testManyAttributesHashIndexInfo: function() {
let c = db._collection(colName3);
let idx = c.getIndexes()[1];
assertFalse(idx.unique);
assertFalse(idx.sparse);
assertEqual([ 'a', 'b' ], idx.fields);
},
testManyAttributesHashIndexByExample: function() {
let c = db._collection(colName3);
assertEqual(250, c.byExample({ a: 1, b: 1 }).toArray().length);
assertEqual(250, c.byExample({ a: 1, b: 2 }).toArray().length);
assertEqual(250, c.byExample({ a: 2, b: 1 }).toArray().length);
assertEqual(250, c.byExample({ a: 2, b: 2 }).toArray().length);
assertEqual(250, db._query("FOR doc IN UnitTestsRecovery3 FILTER doc.a == 1 && doc.b == 1 RETURN doc").toArray().length);
}
},
testManyAttributesHashIndexAql: function() {
assertEqual(250, db._query(`FOR doc IN ${colName3} FILTER doc.a == 1 && doc.b == 1 RETURN doc`).toArray().length);
},
testManyAttributesHashIndexEstimate: function () {
let c = db._collection(colName3);
let idx = c.getIndexes()[1];
assertEqual(est3, idx.selectivityEstimate);
},
};
}

View File

@ -0,0 +1,80 @@
/*global describe, it, ArangoAgency, afterEach */
////////////////////////////////////////////////////////////////////////////////
/// @brief cluster collection creation tests
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2012 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Andreas Streichardt
/// @author Copyright 2017, ArangoDB GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
'use strict';
const expect = require('chai').expect;
var internal = require("internal");
var db = require("org/arangodb").db;
describe('Cluster collection creation options', function() {
afterEach(function() {
db._drop('testi');
});
it('should always throw an error when creating a faulty index', function() {
db._create("testi", {numberOfShards: 1});
db.testi.save({"test": 1});
db.testi.save({"test": 1});
expect(function() {
db.testi.ensureIndex({ type: "hash", fields: [ "test" ], unique: true });
}).to.throw();
// before fixing it the second call would return the faulty index
expect(function() {
db.testi.ensureIndex({ type: "hash", fields: [ "test" ], unique: true });
}).to.throw();
});
it('should cleanup current after creating a faulty index', function() {
db._create("testi", {numberOfShards: 1});
let current = ArangoAgency.get('Current/Collections/_system');
let plan = ArangoAgency.get('Plan/Collections/_system');
let collectionId = Object.values(plan.arango.Plan.Collections['_system']).reduce((result, collectionDef) => {
if (result) {
return result;
}
if (collectionDef.name === 'testi') {
return collectionDef.id;
}
}, undefined);
db.testi.save({"test": 1});
db.testi.save({"test": 1});
expect(function() {
db.testi.ensureIndex({ type: "hash", fields: [ "test" ], unique: true });
}).to.throw();
// wait for the schmutz
internal.wait(1.0);
current = ArangoAgency.get('Current/Collections/_system/' + collectionId);
Object.values(current.arango.Current.Collections['_system'][collectionId]).forEach(entry => {
expect(entry.indexes).to.have.lengthOf(1);
});
});
});

View File

@ -326,9 +326,7 @@ function SkipListCorrSuite() {
}
// should not have created an index
// TODO: FIXME 04052017: re-activate this check!! waiting for mop to add rollback code
// in case an index cannot be created on a shard
// assertEqual(coll.getIndexes().length, 1);
assertEqual(coll.getIndexes().length, 1);
}
};
}

View File

@ -17,6 +17,7 @@ function help() {
echo " -o/--xterm-options XTerm options (default: --geometry=80x43)"
echo " -b/--offset-ports Offset ports (default: 0, i.e. A:4001, C:8530, D:8629)"
echo " -r/--rocksdb-engine Use Rocksdb engine (default: false )"
echo " -q/--source-dir ArangoDB source dir (default: .)"
echo ""
echo "EXAMPLES:"
echo " $0"
@ -44,6 +45,7 @@ SECONDARIES=0
BUILD="build"
JWT_SECRET=""
PORT_OFFSET=0
SRC_DIR="."
while [[ ${1} ]]; do
case "${1}" in
@ -87,6 +89,10 @@ while [[ ${1} ]]; do
JWT_SECRET=${2}
shift
;;
    -q|--source-dir|--src-dir)
SRC_DIR=${2}
shift
;;
-x|--xterm)
XTERM=${2}
shift

View File

@ -19,7 +19,8 @@ printf " # db servers: %s," "$NRDBSERVERS"
printf " # coordinators: %s," "$NRCOORDINATORS"
printf " transport: %s\n" "$TRANSPORT"
if [ ! -d arangod ] || [ ! -d arangosh ] || [ ! -d UnitTests ] ; then
echo $SRC_DIR
if [ ! -d ${SRC_DIR}/arangod ] || [ ! -d ${SRC_DIR}/arangosh ] || [ ! -d ${SRC_DIR}/UnitTests ] ; then
echo Must be started in the main ArangoDB source directory.
exit 1
fi
@ -29,11 +30,6 @@ if [[ $(( $NRAGENTS % 2 )) == 0 ]]; then
exit 1
fi
if [ ! -d arangod ] || [ ! -d arangosh ] || [ ! -d UnitTests ] ; then
echo "Must be started in the main ArangoDB source directory! Bailing out."
exit 1
fi
if [ ! -z "$INTERACTIVE_MODE" ] ; then
if [ "$INTERACTIVE_MODE" == "C" ] ; then
COORDINATORCONSOLE=1
@ -102,9 +98,9 @@ for aid in `seq 0 $(( $NRAGENTS - 1 ))`; do
--agency.supervision-grace-period 15 \
--agency.wait-for-sync false \
--database.directory cluster/data$port \
--javascript.app-path ./js/apps \
--javascript.startup-directory ./js \
--javascript.module-directory ./enterprise/js \
--javascript.app-path ${SRC_DIR}/js/apps \
--javascript.startup-directory ${SRC_DIR}/js \
--javascript.module-directory ${SRC_DIR}/enterprise/js \
--javascript.v8-contexts 1 \
--server.endpoint $TRANSPORT://$ANYWHERE:$port \
--server.statistics false \
@ -143,8 +139,8 @@ start() {
--log.level $LOG_LEVEL \
--server.statistics true \
--server.threads 5 \
--javascript.startup-directory ./js \
--javascript.module-directory ./enterprise/js \
--javascript.startup-directory ${SRC_DIR}/js \
--javascript.module-directory ${SRC_DIR}/enterprise/js \
--javascript.app-path cluster/apps$PORT \
--log.force-direct true \
--log.level cluster=$LOG_LEVEL_CLUSTER \
@ -176,9 +172,9 @@ startTerminal() {
--log.level $LOG_LEVEL \
--server.statistics true \
--server.threads 5 \
--javascript.startup-directory ./js \
--javascript.module-directory ./enterprise/js \
--javascript.app-path ./js/apps \
--javascript.startup-directory ${SRC_DIR}/js \
--javascript.module-directory ${SRC_DIR}/enterprise/js \
--javascript.app-path ${SRC_DIR}/js/apps \
$STORAGE_ENGINE \
$DEFAULT_REPLICATION \
$AUTHENTICATION \
@ -207,9 +203,9 @@ startDebugger() {
--log.level $LOG_LEVEL \
--server.statistics false \
--server.threads 5 \
--javascript.startup-directory ./js \
--javascript.module-directory ./enterprise/js \
--javascript.app-path ./js/apps \
--javascript.startup-directory ${SRC_DIR}/js \
--javascript.module-directory ${SRC_DIR}/enterprise/js \
--javascript.app-path ${SRC_DIR}/js/apps \
$STORAGE_ENGINE \
$DEFAULT_REPLICATION \
$SSLKEYFILE \
@ -238,9 +234,9 @@ startRR() {
--log.level $LOG_LEVEL \
--server.statistics true \
--server.threads 5 \
--javascript.startup-directory ./js \
--javascript.module-directory ./enterprise/js \
--javascript.app-path ./js/apps \
--javascript.startup-directory ${SRC_DIR}/js \
--javascript.module-directory ${SRC_DIR}/enterprise/js \
--javascript.app-path ${SRC_DIR}/js/apps \
$STORAGE_ENGINE \
$DEFAULT_REPLICATION \
$AUTHENTICATION \
@ -331,13 +327,13 @@ if [ "$SECONDARIES" == "1" ] ; then
--cluster.my-id $CLUSTER_ID \
--log.file cluster/$PORT.log \
--server.statistics true \
--javascript.startup-directory ./js \
--javascript.module-directory ./enterprise/js \
--javascript.startup-directory ${SRC_DIR}/js \
--javascript.module-directory ${SRC_DIR}/enterprise/js \
$STORAGE_ENGINE \
$DEFAULT_REPLICATION \
$AUTHENTICATION \
$SSLKEYFILE \
--javascript.app-path ./js/apps \
--javascript.app-path ${SRC_DIR}/js/apps \
> cluster/$PORT.stdout 2>&1 &
let index=$index+1
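
For reference, a minimal usage sketch of the new source-directory option (the concrete script names are not visible in this excerpt; scripts/startLocalCluster.sh is assumed, and the path is illustrative; SRC_DIR defaults to "."):

./scripts/startLocalCluster.sh -q /path/to/arangodb-source
./scripts/startLocalCluster.sh --src-dir /path/to/arangodb-source -r

Both forms set SRC_DIR, which the startup code above uses to prefix the --javascript.* paths so a cluster can be started from a build directory outside the source tree.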

159
scripts/tmux_test_starter Executable file
View File

@ -0,0 +1,159 @@
#!/bin/bash
# Author Jan Christoph Uhde
main(){
source_file=~/.tmux_starter_suites #may not contain spaces
if [[ -f $source_file ]]; then
. $source_file
fi
local suite=${1:-all}
local tasks="suite_$suite"
# check that the requested suite function exists
[[ $(type -t "$tasks") == function ]] || die "suite $suite not defined"
local session_name="$($tasks 'name')"
local panes=$($tasks 'num')
kill_old_session "$session_name"
tmux new-session -d -s "$session_name" || die "unable to spawn session"
local rows=$(( (panes+1) / 2 ))
local cols=$((((panes>1)) * 2))
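# layout sketch: panes are arranged in up to two columns,
# e.g. 6 panes -> 3 rows x 2 cols, 7 panes -> 4 rows x 2 cols (one slot left idle)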
spawn_panes "$session_name" $rows $cols
tmux select-pane -t $session_name.0
execute_tasks "$(pwd)" $tasks
tmux -2 attach-session -t $session_name
}
# task definitions
suite_all(){
local count="$1"
local args_default=""
local args="$2"
local tests=""
case $1 in
num)
echo "6"
return
;;
name)
echo "test_all"
return
;;
0)
echo "./scripts/quickieTest.sh $args && exit 0"
return
;;
1)
tests="shell_server shell_client"
;;
2)
tests="shell_server_aql"
;;
3)
tests="http_server server_http"
;;
4)
tests="dump importing export arangobench upgrade"
;;
5)
tests="replication_sync replication_static replication_ongoing http_replication shell_replication"
;;
6)
tests="agency cluster_sync"
;;
*)
esac
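# e.g. for pane 1 this expands to roughly: ./scripts/unittest shell_server shell_client && exit 0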
echo "./scripts/unittest $tests $args_default $args && exit 0"
}
suite_all_rocksdb(){
local count="$1"
local args_default=""
local args="$2"
local tests=""
case $1 in
name)
echo "test_all_rocksdb"
return
;;
*)
suite_all "$1" "--storageEngine rocksdb"
;;
esac
}
# tmux
kill_old_session(){
local session_name="$1"
if tmux has-session -t $session_name 2> /dev/null; then
echo "Session $session_name exists. Kill it? [y/N]"
read kill_sess
echo "killsess '$kill_sess'"
if [[ ($kill_sess == "y") || ($kill_sess == "Y") ]]; then
tmux kill-session -t $session_name
else
die "Session not created because it already exists. Exiting."
fi
fi
}
spawn_panes(){
local session_name="$1"
local rows=$2
local cols=$3
# Create rows
local row=$rows
while (( row > 1 )); do
frac=$(echo "scale=2;1/$row" | bc)
percent=$(echo "($frac * 100)/1" | bc)
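# splitting 1/row of the remaining space off pane 0 on each pass gives
# roughly equal-height rows (e.g. 25%, then 33%, then 50% for four rows)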
tmux select-pane -t $session_name.0 || die "could not select pane 0 of session $session_name"
tmux split-window -v -p $percent
((row--))
done
# Create columns
if ((cols > 1)); then
local count=$((rows - 1))
while ((count >= 0)); do
tmux select-pane -t $session_name.$count || die "could not select pane $session_name.$count"
tmux split-window -h -p 50
(( count-- ))
done
fi
}
execute_tasks(){
cd $1 || die
local tasks="$2"
local args="$3"
local count=0
while (( count < $($tasks 'num') )); do
local exec_cmd="$($tasks $count "$args")"
echo "running: ${exec_cmd[@]}"
tmux send-keys -t $session_name.$count "${exec_cmd[@]}" Enter
(( count++ ))
done
}
# helper
die(){
echo "FATAL: ${1-this should not happen}"
exit 1
}
cores(){
# Determine automatically on Mac or Linux
if [[ $(uname) == 'Darwin' ]]; then
echo "$(sysctl hw.ncpu | awk '{print $2}')"
else
echo "$(nproc)"
fi
}
main "$@"
exit 0
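
A minimal usage sketch for the starter above (run from the ArangoDB source root; suite names resolve to the suite_<name> functions defined in the script or in ~/.tmux_starter_suites):

./scripts/tmux_test_starter                # default suite "all"
./scripts/tmux_test_starter all_rocksdb    # same test groups, with --storageEngine rocksdb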

View File

@ -53,6 +53,7 @@ add_executable(
Cache/TransactionsWithBackingStore.cpp
Geo/georeg.cpp
Pregel/typedbuffer.cpp
RocksDBEngine/IndexEstimatorTest.cpp
main.cpp
)

View File

@ -0,0 +1,135 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite for CuckooFilter based index selectivity estimator
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2012 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Michael Hackstein
/// @author Copyright 2017, ArangoDB GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "Basics/Common.h"
#include "Basics/StringRef.h"
#include "catch.hpp"
#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h"
using namespace arangodb;
// -----------------------------------------------------------------------------
// --SECTION-- test suite
// -----------------------------------------------------------------------------
// @brief setup
TEST_CASE("IndexEstimator", "[indexestimator]") {
// @brief Test insert unique correctness
SECTION("test_unique_values") {
std::vector<uint64_t> toInsert(100);
uint64_t i = 0;
RocksDBCuckooIndexEstimator<uint64_t> est(2048);
std::generate(toInsert.begin(), toInsert.end(), [&i] {return i++;});
for (auto it : toInsert) {
est.insert(it);
}
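// all 100 inserted values are distinct: nrUsed() is 100 and the selectivity estimate is exactly 1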
CHECK(est.nrUsed() == 100);
CHECK(est.computeEstimate() == 1);
for (size_t k = 0; k < 10; ++k) {
est.remove(toInsert[k]);
}
CHECK(est.nrUsed() == 90);
CHECK(est.computeEstimate() == 1);
}
SECTION("test_multiple_values") {
std::vector<uint64_t> toInsert(100);
uint64_t i = 0;
RocksDBCuckooIndexEstimator<uint64_t> est(2048);
std::generate(toInsert.begin(), toInsert.end(), [&i] {return (i++) % 10;});
for (auto it : toInsert) {
est.insert(it);
}
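// 100 inserts but only 10 distinct values: nrUsed() is 10 and the estimate is 10/100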
CHECK(est.nrUsed() == 10);
CHECK(est.nrCuckood() == 0);
CHECK(est.computeEstimate() == (double) 10 / 100);
for (size_t k = 0; k < 10; ++k) {
est.remove(toInsert[k]);
}
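// one occurrence of each of the first 10 values was removed: 90 stored, still 10 distinct -> 10/90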
CHECK(est.nrCuckood() == 0);
CHECK(est.computeEstimate() == (double) 10 / 90);
}
SECTION("test_serialize_deserialize") {
std::vector<uint64_t> toInsert(10000);
uint64_t i = 0;
std::string serialization;
RocksDBCuckooIndexEstimator<uint64_t> est(2048);
std::generate(toInsert.begin(), toInsert.end(), [&i] {return i++;});
for (auto it : toInsert) {
est.insert(it);
}
est.serialize(serialization);
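// serialization layout exercised below: [1 byte type marker][uint64 total length][estimator data]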
// Test that the serialization first reports the correct length
uint64_t length = serialization.size();
// We read starting from the second char. The first char is reserved for the type
uint64_t persLength = rocksutils::uint64FromPersistent(serialization.data() + 1);
CHECK(persLength == length);
// We first have an uint64_t representing the length.
// This has to be extracted BEFORE initialisation.
StringRef ref(serialization.data(), persLength);
RocksDBCuckooIndexEstimator<uint64_t> copy(ref);
// After serialisation => deserialisation
// both estimates have to be identical
CHECK(est.nrUsed() == copy.nrUsed());
CHECK(est.nrCuckood() == copy.nrCuckood());
CHECK(est.computeEstimate() == copy.computeEstimate());
// Now let us remove the same elements in both
bool coin = false;
for (auto it : toInsert) {
if (coin) {
est.remove(it);
copy.remove(it);
}
coin = !coin;
}
// We cannot reliably check inserts because cuckoo displacement has a random factor
// Still all values have to be identical
CHECK(est.nrUsed() == copy.nrUsed());
CHECK(est.nrCuckood() == copy.nrCuckood());
CHECK(est.computeEstimate() == copy.computeEstimate());
}
// @brief generate tests
}
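
As a hedged pointer for running the new suite: the Catch tag registered above is [indexestimator], so assuming the unit-test binary is built as arangodbtests (the executable name is not visible in this excerpt), the estimator tests can be run in isolation with something like:

./arangodbtests "[indexestimator]"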