1
0
Fork 0

optimizations

This commit is contained in:
jsteemann 2017-05-17 23:41:16 +02:00
parent 9b8a641e77
commit 226920d7fa
21 changed files with 335 additions and 268 deletions

View File

@ -21,8 +21,10 @@
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#include "MMFilesTransactionState.h"
#include "Aql/QueryCache.h"
#include "Basics/Exceptions.h"
#include "Basics/RocksDBUtils.h"
#include "Logger/Logger.h"
#include "MMFiles/MMFilesCollection.h"
#include "MMFiles/MMFilesDatafileHelper.h"
@ -30,8 +32,6 @@
#include "MMFiles/MMFilesLogfileManager.h"
#include "MMFiles/MMFilesPersistentIndexFeature.h"
#include "MMFiles/MMFilesTransactionCollection.h"
#include "MMFilesTransactionState.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "StorageEngine/TransactionCollection.h"
#include "Transaction/Methods.h"
#include "VocBase/LogicalCollection.h"

View File

@ -26,6 +26,7 @@
#include "Basics/Exceptions.h"
#include "Basics/ReadLocker.h"
#include "Basics/Result.h"
#include "Basics/RocksDBUtils.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringUtils.h"
#include "Basics/VelocyPackHelper.h"
@ -33,7 +34,6 @@
#include "Indexes/IndexIterator.h"
#include "Logger/Logger.h"
#include "RestServer/DatabaseFeature.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "SimpleHttpClient/SimpleHttpClient.h"
#include "SimpleHttpClient/SimpleHttpResult.h"
#include "StorageEngine/EngineSelectorFeature.h"

View File

@ -641,10 +641,12 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
// delete documents
RocksDBKeyBounds documentBounds =
RocksDBKeyBounds::CollectionDocuments(this->objectId());
auto const end = documentBounds.end();
std::unique_ptr<rocksdb::Iterator> iter = mthd->NewIterator();
iter->Seek(documentBounds.start());
while (iter->Valid() && cmp->Compare(iter->key(), documentBounds.end()) < 0) {
while (iter->Valid() && cmp->Compare(iter->key(), end) < 0) {
TRI_voc_rid_t revisionId = RocksDBKey::revisionId(iter->key());
VPackSlice key =
VPackSlice(iter->value().data()).get(StaticStrings::KeyString);
@ -810,7 +812,18 @@ bool RocksDBCollection::readDocument(transaction::Methods* trx,
// TODO: why do we have read(), readDocument() and lookupKey()?
auto tkn = static_cast<RocksDBToken const*>(&token);
TRI_voc_rid_t revisionId = tkn->revisionId();
auto res = lookupRevisionVPack(revisionId, trx, result);
auto res = lookupRevisionVPack(revisionId, trx, result, true);
return res.ok();
}
// read using a token, bypassing the cache
// Variant of readDocument() that never consults (nor populates) the
// in-memory document cache; useful e.g. during index fill, where touching
// each document once would only pollute the cache.
bool RocksDBCollection::readDocumentNoCache(transaction::Methods* trx,
                                            DocumentIdentifierToken const& token,
                                            ManagedDocumentResult& result) {
  // TODO: why do we have read(), readDocument() and lookupKey()?
  auto tkn = static_cast<RocksDBToken const*>(&token);
  TRI_voc_rid_t revisionId = tkn->revisionId();
  // final 'false' argument disables the cache path in lookupRevisionVPack
  auto res = lookupRevisionVPack(revisionId, trx, result, false);
  return res.ok();
}
@ -881,7 +894,7 @@ int RocksDBCollection::insert(arangodb::transaction::Methods* trx,
res = insertDocument(trx, revisionId, newSlice, options.waitForSync);
if (res.ok()) {
Result lookupResult = lookupRevisionVPack(revisionId, trx, mdr);
Result lookupResult = lookupRevisionVPack(revisionId, trx, mdr, false);
if (lookupResult.fail()) {
return lookupResult.errorNumber();
@ -913,11 +926,10 @@ int RocksDBCollection::update(arangodb::transaction::Methods* trx,
TRI_voc_rid_t const& revisionId,
arangodb::velocypack::Slice const key) {
resultMarkerTick = 0;
RocksDBOperationResult res;
bool const isEdgeCollection =
(_logicalCollection->type() == TRI_COL_TYPE_EDGE);
res = lookupDocument(trx, key, previous);
RocksDBOperationResult res = lookupDocument(trx, key, previous);
if (res.fail()) {
return res.errorNumber();
@ -970,6 +982,8 @@ int RocksDBCollection::update(arangodb::transaction::Methods* trx,
return TRI_ERROR_CLUSTER_MUST_NOT_CHANGE_SHARDING_ATTRIBUTES;
}
}
VPackSlice const newDoc(builder->slice());
RocksDBSavePoint guard(rocksutils::toRocksMethods(trx),
trx->isSingleOperationTransaction(),
@ -978,20 +992,15 @@ int RocksDBCollection::update(arangodb::transaction::Methods* trx,
// add possible log statement under guard
state->prepareOperation(_logicalCollection->cid(), revisionId, StringRef(),
TRI_VOC_DOCUMENT_OPERATION_UPDATE);
VPackSlice const newDoc(builder->slice());
res = updateDocument(trx, oldRevisionId, oldDoc, revisionId, newDoc,
options.waitForSync);
if (res.ok()) {
RocksDBOperationResult result = lookupRevisionVPack(revisionId, trx, mdr);
if (result.fail()) {
return result.errorNumber();
}
mdr.setManaged(newDoc.begin(), revisionId);
TRI_ASSERT(!mdr.empty());
// report document and key size
result = state->addOperation(_logicalCollection->cid(), revisionId,
RocksDBOperationResult result = state->addOperation(_logicalCollection->cid(), revisionId,
TRI_VOC_DOCUMENT_OPERATION_UPDATE,
newDoc.byteSize(), res.keySize());
@ -1015,7 +1024,6 @@ int RocksDBCollection::replace(
arangodb::velocypack::Slice const toSlice) {
resultMarkerTick = 0;
Result res;
bool const isEdgeCollection =
(_logicalCollection->type() == TRI_COL_TYPE_EDGE);
@ -1026,7 +1034,7 @@ int RocksDBCollection::replace(
}
// get the previous revision
res.reset(lookupDocument(trx, key, previous).errorNumber());
Result res = lookupDocument(trx, key, previous).errorNumber();
if (res.fail()) {
return res.errorNumber();
@ -1075,23 +1083,20 @@ int RocksDBCollection::replace(
// add possible log statement under guard
state->prepareOperation(_logicalCollection->cid(), revisionId, StringRef(),
TRI_VOC_DOCUMENT_OPERATION_REPLACE);
VPackSlice const newDoc(builder->slice());
RocksDBOperationResult opResult =
updateDocument(trx, oldRevisionId, oldDoc, revisionId,
VPackSlice(builder->slice()), options.waitForSync);
newDoc, options.waitForSync);
if (opResult.ok()) {
RocksDBOperationResult result = lookupRevisionVPack(revisionId, trx, mdr);
if (!result.ok()) {
return result.errorNumber();
}
mdr.setManaged(newDoc.begin(), revisionId);
TRI_ASSERT(!mdr.empty());
// report document and key size
result = state->addOperation(_logicalCollection->cid(), revisionId,
RocksDBOperationResult result = state->addOperation(_logicalCollection->cid(), revisionId,
TRI_VOC_DOCUMENT_OPERATION_REPLACE,
VPackSlice(builder->slice()).byteSize(),
newDoc.byteSize(),
opResult.keySize());
// transaction size limit reached -- fail
@ -1296,12 +1301,11 @@ arangodb::Result RocksDBCollection::fillIndexes(
// write batch will be reset each 5000 documents
rocksdb::WriteBatchWithIndex batch(db->DefaultColumnFamily()->GetComparator(),
32 * 1024 * 1024);
rocksdb::ReadOptions readOptions;
RocksDBBatchedMethods batched(state, &batch);
int res = TRI_ERROR_NO_ERROR;
auto cb = [&](DocumentIdentifierToken token) {
if (res == TRI_ERROR_NO_ERROR && this->readDocument(trx, token, mmdr)) {
if (res == TRI_ERROR_NO_ERROR && this->readDocumentNoCache(trx, token, mmdr)) {
res = ridx->insertRaw(&batched, mmdr.lastRevisionId(),
VPackSlice(mmdr.vpack()));
if (res == TRI_ERROR_NO_ERROR) {
@ -1310,6 +1314,7 @@ arangodb::Result RocksDBCollection::fillIndexes(
}
};
rocksdb::WriteOptions writeOpts;
Result r;
bool hasMore = true;
while (hasMore) {
@ -1322,7 +1327,6 @@ arangodb::Result RocksDBCollection::fillIndexes(
r = Result(res);
break;
}
rocksdb::WriteOptions writeOpts;
rocksdb::Status s = db->Write(writeOpts, batch.GetWriteBatch());
if (!s.ok()) {
r = rocksutils::convertStatus(s, rocksutils::StatusHint::index);
@ -1518,7 +1522,7 @@ RocksDBOperationResult RocksDBCollection::lookupDocument(
TRI_voc_rid_t revisionId = token.revisionId();
if (revisionId > 0) {
res = lookupRevisionVPack(revisionId, trx, mdr);
res = lookupRevisionVPack(revisionId, trx, mdr, true);
} else {
res.reset(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
}
@ -1559,14 +1563,15 @@ Result RocksDBCollection::lookupDocumentToken(transaction::Methods* trx,
arangodb::Result RocksDBCollection::lookupRevisionVPack(
TRI_voc_rid_t revisionId, transaction::Methods* trx,
arangodb::ManagedDocumentResult& mdr) const {
arangodb::ManagedDocumentResult& mdr,
bool withCache) const {
TRI_ASSERT(trx->state()->isRunning());
TRI_ASSERT(_objectId != 0);
auto key = RocksDBKey::Document(_objectId, revisionId);
std::string value;
if (useCache()) {
if (withCache && useCache()) {
TRI_ASSERT(_cache != nullptr);
// check cache first for fast path
auto f = _cache->find(key.string().data(),
@ -1583,7 +1588,7 @@ arangodb::Result RocksDBCollection::lookupRevisionVPack(
Result res = mthd->Get(key, &value);
TRI_ASSERT(value.data());
if (res.ok()) {
if (useCache()) {
if (withCache && useCache()) {
TRI_ASSERT(_cache != nullptr);
// write entry back to cache
auto entry = cache::CachedValue::construct(
@ -1726,11 +1731,14 @@ uint64_t RocksDBCollection::recalculateCounts() {
if (res.fail()) {
THROW_ARANGO_EXCEPTION(res);
}
rocksdb::ReadOptions readOptions;
readOptions.fill_cache = false;
// count documents
auto documentBounds = RocksDBKeyBounds::CollectionDocuments(_objectId);
_numberDocuments = rocksutils::countKeyRange(
globalRocksDB(), rocksdb::ReadOptions(), documentBounds);
globalRocksDB(), readOptions, documentBounds);
// update counter manager value
res = globalRocksEngine()->counterManager()->setAbsoluteCounter(

View File

@ -134,6 +134,10 @@ class RocksDBCollection final : public PhysicalCollection {
bool readDocument(transaction::Methods* trx,
DocumentIdentifierToken const& token,
ManagedDocumentResult& result) override;
bool readDocumentNoCache(transaction::Methods* trx,
DocumentIdentifierToken const& token,
ManagedDocumentResult& result);
int insert(arangodb::transaction::Methods* trx,
arangodb::velocypack::Slice const newSlice,
@ -227,7 +231,8 @@ class RocksDBCollection final : public PhysicalCollection {
arangodb::velocypack::Slice const& newDoc, bool& waitForSync) const;
arangodb::Result lookupRevisionVPack(TRI_voc_rid_t, transaction::Methods*,
arangodb::ManagedDocumentResult&) const;
arangodb::ManagedDocumentResult&,
bool withCache) const;
void recalculateIndexEstimates(std::vector<std::shared_ptr<Index>>& indexes);

View File

@ -23,8 +23,10 @@
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBCommon.h"
#include "Basics/RocksDBUtils.h"
#include "Basics/StringRef.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "Logger/Logger.h"
#include "RocksDBEngine/RocksDBComparator.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBKey.h"
@ -37,73 +39,10 @@
#include <rocksdb/convenience.h>
#include <rocksdb/utilities/transaction_db.h>
#include <velocypack/Iterator.h>
#include "Logger/Logger.h"
namespace arangodb {
namespace rocksutils {
// Translates a rocksdb::Status into an ArangoDB Result (error number plus
// message). The optional StatusHint disambiguates kNotFound, which maps to
// a different "not found" error code depending on what was being looked up
// (collection, database, document, index, view, or WAL).
arangodb::Result convertStatus(rocksdb::Status const& status, StatusHint hint) {
  switch (status.code()) {
    case rocksdb::Status::Code::kOk:
      return {TRI_ERROR_NO_ERROR};
    case rocksdb::Status::Code::kNotFound:
      switch (hint) {
        case StatusHint::collection:
          return {TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, status.ToString()};
        case StatusHint::database:
          return {TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, status.ToString()};
        case StatusHint::document:
          return {TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND, status.ToString()};
        case StatusHint::index:
          return {TRI_ERROR_ARANGO_INDEX_NOT_FOUND, status.ToString()};
        case StatusHint::view:
          return {TRI_ERROR_ARANGO_VIEW_NOT_FOUND, status.ToString()};
        case StatusHint::wal:
          // suppress this error if the WAL is queried for changes that are
          // not available
          return {TRI_ERROR_NO_ERROR};
        default:
          return {TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND, status.ToString()};
      }
    case rocksdb::Status::Code::kCorruption:
      return {TRI_ERROR_ARANGO_CORRUPTED_DATAFILE, status.ToString()};
    case rocksdb::Status::Code::kNotSupported:
      return {TRI_ERROR_NOT_IMPLEMENTED, status.ToString()};
    case rocksdb::Status::Code::kInvalidArgument:
      return {TRI_ERROR_BAD_PARAMETER, status.ToString()};
    case rocksdb::Status::Code::kIOError:
      if (status.subcode() == rocksdb::Status::SubCode::kNoSpace) {
        return {TRI_ERROR_ARANGO_FILESYSTEM_FULL, status.ToString()};
      }
      return {TRI_ERROR_ARANGO_IO_ERROR, status.ToString()};
    case rocksdb::Status::Code::kMergeInProgress:
      return {TRI_ERROR_ARANGO_MERGE_IN_PROGRESS, status.ToString()};
    case rocksdb::Status::Code::kIncomplete:
      return {TRI_ERROR_INTERNAL, "'incomplete' error in storage engine"};
    case rocksdb::Status::Code::kShutdownInProgress:
      return {TRI_ERROR_SHUTTING_DOWN, status.ToString()};
    case rocksdb::Status::Code::kTimedOut:
      // NOTE(review): both branches below currently map to the same error
      // code; the subcode check is kept for a future, more specific mapping
      if (status.subcode() == rocksdb::Status::SubCode::kMutexTimeout ||
          status.subcode() == rocksdb::Status::SubCode::kLockTimeout) {
        // TODO: maybe add a separate error code/message here
        return {TRI_ERROR_LOCK_TIMEOUT, status.ToString()};
      }
      return {TRI_ERROR_LOCK_TIMEOUT, status.ToString()};
    case rocksdb::Status::Code::kAborted:
      return {TRI_ERROR_TRANSACTION_ABORTED, status.ToString()};
    case rocksdb::Status::Code::kBusy:
      if (status.subcode() == rocksdb::Status::SubCode::kDeadlock) {
        return {TRI_ERROR_DEADLOCK};
      }
      return {TRI_ERROR_ARANGO_CONFLICT};
    case rocksdb::Status::Code::kExpired:
      return {TRI_ERROR_INTERNAL, "key expired; TTL was set in error"};
    case rocksdb::Status::Code::kTryAgain:
      return {TRI_ERROR_ARANGO_TRY_AGAIN, status.ToString()};
    default:
      return {TRI_ERROR_INTERNAL, "unknown RocksDB status code"};
  }
}
uint64_t uint64FromPersistent(char const* p) {
uint64_t value = 0;
uint64_t x = 0;
@ -160,68 +99,6 @@ void uint16ToPersistent(std::string& p, uint16_t value) {
} while (++len < sizeof(uint16_t));
}
// Recursively checks whether the given VelocyPack value contains an
// "objectId" attribute at any nesting level (inside objects or arrays).
// Returns true as soon as one is found.
bool hasObjectIds(VPackSlice const& inputSlice) {
  bool rv = false;
  if (inputSlice.isObject()) {
    for (auto const& objectPair :
         arangodb::velocypack::ObjectIterator(inputSlice)) {
      if (arangodb::StringRef(objectPair.key) == "objectId") {
        return true;
      }
      // descend into the attribute value
      rv = hasObjectIds(objectPair.value);
      if (rv) {
        return rv;
      }
    }
  } else if (inputSlice.isArray()) {
    for (auto const& slice : arangodb::velocypack::ArrayIterator(inputSlice)) {
      // rv holds the previous iteration's result; the final iteration's
      // result is returned via the fall-through below
      if (rv) {
        return rv;
      }
      rv = hasObjectIds(slice);
    }
  }
  return rv;
}
// Recursively copies inputSlice into the given builder, dropping every
// object attribute named "objectId" along the way. Non-container values are
// copied verbatim. Returns the builder to allow chaining.
VPackBuilder& stripObjectIdsImpl(VPackBuilder& builder, VPackSlice const& inputSlice) {
  if (inputSlice.isObject()) {
    builder.openObject();
    for (auto const& objectPair :
         arangodb::velocypack::ObjectIterator(inputSlice)) {
      if (arangodb::StringRef(objectPair.key) == "objectId") {
        // skip the attribute entirely
        continue;
      }
      builder.add(objectPair.key);
      stripObjectIdsImpl(builder, objectPair.value);
    }
    builder.close();
  } else if (inputSlice.isArray()) {
    builder.openArray();
    for (auto const& slice : arangodb::velocypack::ArrayIterator(inputSlice)) {
      stripObjectIdsImpl(builder, slice);
    }
    builder.close();
  } else {
    // scalar (or other non-container) value: copy as-is
    builder.add(inputSlice);
  }
  return builder;
}
// Returns a slice equal to inputSlice but with all "objectId" attributes
// removed recursively, together with the buffer owning the copy (nullptr if
// no copy was made). If checkBeforeCopy is true and the input contains no
// "objectId" attribute, the input slice is returned as-is without copying.
std::pair<VPackSlice, std::unique_ptr<VPackBuffer<uint8_t>>> stripObjectIds(
    VPackSlice const& inputSlice, bool checkBeforeCopy) {
  std::unique_ptr<VPackBuffer<uint8_t>> buffer = nullptr;
  if (checkBeforeCopy) {
    if (!hasObjectIds(inputSlice)) {
      // fast path: nothing to strip, no copy necessary
      return {inputSlice, std::move(buffer)};
    }
  }
  buffer.reset(new VPackBuffer<uint8_t>);
  VPackBuilder builder(*buffer);
  stripObjectIdsImpl(builder, inputSlice);
  // the returned slice points into the returned buffer; the caller must keep
  // the buffer alive for as long as the slice is used
  return {VPackSlice(buffer->data()), std::move(buffer)};
}
RocksDBTransactionState* toRocksTransactionState(transaction::Methods* trx) {
TRI_ASSERT(trx != nullptr);
TransactionState* state = trx->state();
@ -288,7 +165,7 @@ std::pair<TRI_voc_tick_t, TRI_voc_cid_t> mapObjectToCollection(
std::size_t countKeyRange(rocksdb::DB* db, rocksdb::ReadOptions const& opts,
RocksDBKeyBounds const& bounds) {
const rocksdb::Comparator* cmp = db->GetOptions().comparator;
rocksdb::Comparator const* cmp = db->GetOptions().comparator;
std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(opts));
std::size_t count = 0;
@ -316,20 +193,20 @@ Result removeLargeRange(rocksdb::TransactionDB* db,
&upper);
if (!status.ok()) {
// if file deletion failed, we will still iterate over the remaining
// keys, so we
// don't need to abort and raise an error here
// keys, so we don't need to abort and raise an error here
LOG_TOPIC(WARN, arangodb::Logger::FIXME)
<< "RocksDB file deletion failed";
}
}
// go on and delete the remaining keys (delete files in range does not
// necessarily
// find them all, just complete files)
const rocksdb::Comparator* cmp = db->GetOptions().comparator;
// necessarily find them all, just complete files)
rocksdb::Comparator const* cmp = db->GetOptions().comparator;
rocksdb::WriteBatch batch;
rocksdb::ReadOptions readOptions;
readOptions.fill_cache = false;
std::unique_ptr<rocksdb::Iterator> it(
db->NewIterator(rocksdb::ReadOptions()));
db->NewIterator(readOptions));
it->Seek(lower);
while (it->Valid() && cmp->Compare(it->key(), upper) < 0) {

View File

@ -28,6 +28,7 @@
#include "Basics/Common.h"
#include "Basics/Result.h"
#include "Basics/RocksDBUtils.h"
#include "RocksDBEngine/RocksDBComparator.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBKey.h"
@ -77,11 +78,6 @@ class Methods;
}
namespace rocksutils {
enum StatusHint { none, document, collection, view, index, database, wal };
arangodb::Result convertStatus(rocksdb::Status const&,
StatusHint hint = StatusHint::none);
uint64_t uint64FromPersistent(char const* p);
void uint64ToPersistent(char* p, uint64_t value);
void uint64ToPersistent(std::string& out, uint64_t value);
@ -90,9 +86,6 @@ uint16_t uint16FromPersistent(char const* p);
void uint16ToPersistent(char* p, uint16_t value);
void uint16ToPersistent(std::string& out, uint16_t value);
std::pair<VPackSlice, std::unique_ptr<VPackBuffer<uint8_t>>> stripObjectIds(
VPackSlice const& inputSlice, bool checkBeforeCopy = true);
RocksDBTransactionState* toRocksTransactionState(transaction::Methods* trx);
RocksDBMethods* toRocksMethods(transaction::Methods* trx);
@ -133,9 +126,10 @@ void iterateBounds(
RocksDBKeyBounds const& bounds, T callback,
rocksdb::ReadOptions const& options = rocksdb::ReadOptions{}) {
auto cmp = globalRocksEngine()->cmp();
auto const end = bounds.end();
std::unique_ptr<rocksdb::Iterator> it(globalRocksDB()->NewIterator(options));
for (it->Seek(bounds.start());
it->Valid() && cmp->Compare(it->key(), bounds.end()) < 0; it->Next()) {
it->Valid() && cmp->Compare(it->key(), end) < 0; it->Next()) {
callback(it.get());
}
}

View File

@ -29,7 +29,6 @@
#include "Basics/StringRef.h"
#include "Basics/WriteLocker.h"
#include "Basics/fasthash.h"
#include "Logger/Logger.h"
#include "RocksDBEngine/RocksDBCommon.h"
@ -74,7 +73,7 @@ class RocksDBCuckooIndexEstimator {
uint16_t* _data;
public:
Slot(uint16_t* data) : _data(data) {}
explicit Slot(uint16_t* data) : _data(data) {}
~Slot() {
// Not responsible for anything
@ -82,7 +81,7 @@ class RocksDBCuckooIndexEstimator {
bool operator==(const Slot& other) { return _data == other._data; }
uint16_t* fingerprint() { return _data; }
uint16_t* fingerprint() const { return _data; }
uint16_t* counter() { return _data + 1; }
@ -91,9 +90,9 @@ class RocksDBCuckooIndexEstimator {
*counter() = 0;
}
bool isEqual(uint16_t fp) { return ((*fingerprint()) == fp); }
bool isEqual(uint16_t fp) const { return ((*fingerprint()) == fp); }
bool isEmpty() { return (*fingerprint()) == 0; }
bool isEmpty() const { return (*fingerprint()) == 0; }
};
enum SerializeFormat : char {
@ -108,6 +107,8 @@ class RocksDBCuckooIndexEstimator {
RocksDBCuckooIndexEstimator(uint64_t size)
: _randState(0x2636283625154737ULL),
_slotSize(2 * sizeof(uint16_t)), // Sort out offsets and alignments
_base(nullptr),
_allocBase(nullptr),
_nrUsed(0),
_nrCuckood(0),
_nrTotal(0),
@ -125,6 +126,8 @@ class RocksDBCuckooIndexEstimator {
RocksDBCuckooIndexEstimator(arangodb::StringRef const serialized)
: _randState(0x2636283625154737ULL),
_slotSize(2 * sizeof(uint16_t)), // Sort out offsets and alignments
_base(nullptr),
_allocBase(nullptr),
_nrUsed(0),
_nrCuckood(0),
_nrTotal(0),
@ -237,7 +240,7 @@ class RocksDBCuckooIndexEstimator {
return found;
}
bool insert(Key& k) {
bool insert(Key const& k) {
// insert the key k
//
// The inserted key will have its fingerprint input entered in the table. If
@ -271,8 +274,8 @@ class RocksDBCuckooIndexEstimator {
(*slot.counter())++;
}
}
_nrTotal++;
}
_nrTotal++;
return true;
}
@ -290,9 +293,9 @@ class RocksDBCuckooIndexEstimator {
uint64_t pos2 = hashToPos(hash2);
bool found = false;
_nrTotal--;
{
WRITE_LOCKER(guard, _bucketLock);
_nrTotal--;
Slot slot = findSlotNoCuckoo(pos1, pos2, fingerprint, found);
if (found) {
if (*slot.counter() <= 1) {
@ -305,27 +308,30 @@ class RocksDBCuckooIndexEstimator {
}
return true;
}
}
// If we get here we assume that the element was once inserted, but removed
// by cuckoo
// Reduce nrCuckood;
if (_nrCuckood > 0) {
--_nrCuckood;
// If we get here we assume that the element was once inserted, but removed
// by cuckoo
// Reduce nrCuckood;
if (_nrCuckood > 0) {
--_nrCuckood;
}
}
return false;
}
uint64_t capacity() const { return _size * SlotsPerBucket; }
// not thread safe. called only during tests
uint64_t nrUsed() const { return _nrUsed; }
// not thread safe. called only during tests
uint64_t nrCuckood() const { return _nrCuckood; }
private: // methods
uint64_t memoryUsage() const {
return sizeof(RocksDBCuckooIndexEstimator) + _allocSize;
}
private: // methods
Slot findSlotNoCuckoo(uint64_t pos1, uint64_t pos2, uint16_t fp,
bool& found) const {
found = false;

View File

@ -196,8 +196,10 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
_doUpdateBounds = true;
}
auto const end = _bounds.end();
while (_iterator->Valid() &&
(_index->_cmp->Compare(_iterator->key(), _bounds.end()) < 0)) {
(_index->_cmp->Compare(_iterator->key(), end) < 0)) {
StringRef edgeKey = RocksDBKey::primaryKey(_iterator->key());
// lookup real document

View File

@ -520,7 +520,8 @@ Result RocksDBFulltextIndex::applyQueryToken(transaction::Methods* trx,
// TODO: set options.iterate_upper_bound and remove compare?
// apply left to right logic, merging all current results with ALL previous
while (iter->Valid() && _cmp->Compare(iter->key(), bounds.end()) < 0) {
auto const end = bounds.end();
while (iter->Valid() && _cmp->Compare(iter->key(), end) < 0) {
rocksdb::Status s = iter->status();
if (!s.ok()) {
return rocksutils::convertStatus(s);

View File

@ -55,12 +55,8 @@ RocksDBIndex::RocksDBIndex(
_cachePresent(false),
_useCache(useCache) {
if (_useCache) {
//LOG_TOPIC(ERR, Logger::FIXME) << "creating cache";
createCache();
} else {
//LOG_TOPIC(ERR, Logger::FIXME) << "not creating cache";
}
}
}
RocksDBIndex::RocksDBIndex(TRI_idx_iid_t id, LogicalCollection* collection,
@ -94,14 +90,15 @@ void RocksDBIndex::toVelocyPackFigures(VPackBuilder& builder) const {
TRI_ASSERT(builder.isOpenObject());
Index::toVelocyPackFigures(builder);
builder.add("cacheInUse", VPackValue(useCache()));
if(useCache()){
if (useCache()) {
builder.add("cacheSize", VPackValue(_cache->size()));
double rate =_cache->hitRates().first;
auto hitRates = _cache->hitRates();
double rate = hitRates.first;
rate = std::isnan(rate) ? 0.0 : rate;
builder.add("cacheLiftimeHitRate", VPackValue(rate));
rate =_cache->hitRates().second;
builder.add("cacheLifeTimeHitRate", VPackValue(rate));
rate = hitRates.second;
rate = std::isnan(rate) ? 0.0 : rate;
builder.add("cacheWindowHitRate", VPackValue(rate));
builder.add("cacheWindowedHitRate", VPackValue(rate));
} else {
builder.add("cacheSize", VPackValue(0));
}
@ -162,7 +159,6 @@ int RocksDBIndex::drop() {
// Try to drop the cache as well.
if (_cachePresent) {
try {
TRI_ASSERT(_cachePresent);
TRI_ASSERT(CacheManagerFeature::MANAGER != nullptr);
CacheManagerFeature::MANAGER->destroyCache(_cache);
// Reset flag

View File

@ -42,7 +42,16 @@ class RocksDBKey {
RocksDBKey() = delete;
RocksDBKey(rocksdb::Slice slice)
: _type(static_cast<RocksDBEntryType>(slice.data()[0])),
_buffer(slice.data(), slice.size()){};
_buffer(slice.data(), slice.size()) {}
RocksDBKey(RocksDBKey const& other)
: _type(other._type), _buffer(other._buffer) {}
RocksDBKey(RocksDBKey&& other)
: _type(other._type), _buffer(std::move(other._buffer)) {}
RocksDBKey& operator=(RocksDBKey const& other) = delete;
RocksDBKey& operator=(RocksDBKey&& other) = delete;
//////////////////////////////////////////////////////////////////////////////
/// @brief Create a fully-specified database key

View File

@ -112,47 +112,6 @@ std::unique_ptr<rocksdb::Iterator> RocksDBReadOnlyMethods::NewIterator(
return std::unique_ptr<rocksdb::Iterator>(_db->NewIterator(opts));
}
// =================== RocksDBGlobalMethods ====================
/*
RocksDBGlobalMethods::RocksDBGlobalMethods(RocksDBTransactionState* state)
: RocksDBMethods(state) {
_db = rocksutils::globalRocksDB();
}
bool RocksDBGlobalMethods::Exists(RocksDBKey const& key) {
std::string val; // do not care about value
bool mayExists =
_db->KeyMayExist(_state->_rocksReadOptions, key.string(), &val, nullptr);
if (mayExists) {
rocksdb::Status s = _db->Get(_state->_rocksReadOptions, key.string(), &val);
return !s.IsNotFound(); }
return false;
}
arangodb::Result RocksDBGlobalMethods::Get(RocksDBKey const& key,
std::string* val) {
rocksdb::Status s = _db->Get(_state->_rocksReadOptions, key.string(), val);
return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s);
}
arangodb::Result RocksDBGlobalMethods::Put(RocksDBKey const& key,
rocksdb::Slice const& val,
rocksutils::StatusHint h) {
rocksdb::Status s = _db->Put(_state->_rocksWriteOptions, key.string(), val);
return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s, h);
}
arangodb::Result RocksDBGlobalMethods::Delete(RocksDBKey const& key) {
rocksdb::Status s = _db->Delete(_state->_rocksWriteOptions, key.string());
return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s);}
std::unique_ptr<rocksdb::Iterator> RocksDBGlobalMethods::NewIterator(
rocksdb::ReadOptions
const& opts) {
return std::unique_ptr<rocksdb::Iterator>(_db->NewIterator(opts));
}*/
// =================== RocksDBTrxMethods ====================
RocksDBTrxMethods::RocksDBTrxMethods(RocksDBTransactionState* state)

View File

@ -61,9 +61,7 @@ class RocksDBSavePoint {
class RocksDBMethods {
public:
// RocksDBOperations(rocksdb::ReadOptions ro, rocksdb::WriteOptions wo) :
// _readOptions(ro), _writeOptions(wo) {}
RocksDBMethods(RocksDBTransactionState* state) : _state(state) {}
explicit RocksDBMethods(RocksDBTransactionState* state) : _state(state) {}
virtual ~RocksDBMethods() {}
rocksdb::ReadOptions const& readOptions();
@ -90,7 +88,7 @@ class RocksDBMethods {
// only implements GET and NewIterator
class RocksDBReadOnlyMethods : public RocksDBMethods {
public:
RocksDBReadOnlyMethods(RocksDBTransactionState* state);
explicit RocksDBReadOnlyMethods(RocksDBTransactionState* state);
bool Exists(RocksDBKey const&) override;
arangodb::Result Get(RocksDBKey const& key, std::string* val) override;
@ -135,7 +133,7 @@ private:
/// transaction wrapper, uses the current rocksdb transaction
class RocksDBTrxMethods : public RocksDBMethods {
public:
RocksDBTrxMethods(RocksDBTransactionState* state);
explicit RocksDBTrxMethods(RocksDBTransactionState* state);
bool Exists(RocksDBKey const&) override;
arangodb::Result Get(RocksDBKey const& key, std::string* val) override;

View File

@ -95,7 +95,7 @@ class RocksDBAllIndexIterator final : public IndexIterator {
private:
bool const _reverse;
std::unique_ptr<rocksdb::Iterator> _iterator;
RocksDBKeyBounds _bounds;
RocksDBKeyBounds const _bounds;
};
class RocksDBAnyIndexIterator final : public IndexIterator {
@ -120,7 +120,7 @@ class RocksDBAnyIndexIterator final : public IndexIterator {
RocksDBComparator const* _cmp;
std::unique_ptr<rocksdb::Iterator> _iterator;
RocksDBKeyBounds _bounds;
RocksDBKeyBounds const _bounds;
uint64_t _total;
uint64_t _returned;
};

View File

@ -107,12 +107,6 @@ class RocksDBTransactionState final : public TransactionState {
uint64_t keySize);
RocksDBMethods* rocksdbMethods();
/*rocksdb::Transaction* rocksTransaction() {
TRI_ASSERT(_rocksTransaction != nullptr);
return _rocksTransaction.get();
}
rocksdb::ReadOptions const& readOptions() const { return _rocksReadOptions; }
rocksdb::WriteOptions const& writeOptions() const { return _rocksWriteOptions; }*/
uint64_t sequenceNumber() const;

View File

@ -309,7 +309,7 @@ int RocksDBVPackIndex::fillElement(VPackBuilder& leased,
// value(s)
// + separator (NUL) byte
// - Value: primary key
elements.push_back(
elements.emplace_back(
RocksDBKey::UniqueIndexValue(_objectId, leased.slice()));
} else {
// Non-unique VPack index values are stored as follows:
@ -317,7 +317,7 @@ int RocksDBVPackIndex::fillElement(VPackBuilder& leased,
// value(s)
// + separator (NUL) byte + primary key
// - Value: empty
elements.push_back(
elements.emplace_back(
RocksDBKey::IndexValue(_objectId, key, leased.slice()));
hashes.push_back(leased.slice().normalizedHash());
}
@ -348,13 +348,13 @@ void RocksDBVPackIndex::addIndexValue(VPackBuilder& leased,
// Unique VPack index values are stored as follows:
// - Key: 7 + 8-byte object ID of index + VPack array with index value(s)
// - Value: primary key
elements.push_back(RocksDBKey::UniqueIndexValue(_objectId, leased.slice()));
elements.emplace_back(RocksDBKey::UniqueIndexValue(_objectId, leased.slice()));
} else {
// Non-unique VPack index values are stored as follows:
// - Key: 6 + 8-byte object ID of index + VPack array with index value(s)
// + primary key
// - Value: empty
elements.push_back(RocksDBKey::IndexValue(_objectId, key, leased.slice()));
elements.emplace_back(RocksDBKey::IndexValue(_objectId, key, leased.slice()));
hashes.push_back(leased.slice().normalizedHash());
}
}

View File

@ -102,7 +102,10 @@ class RocksDBValue {
RocksDBValue(RocksDBEntryType type, rocksdb::Slice slice)
: _type(type), _buffer(slice.data(), slice.size()) {}
RocksDBValue(RocksDBValue&& other)
: _type(other._type), _buffer(std::move(other._buffer)) {}
private:
RocksDBValue();
explicit RocksDBValue(RocksDBEntryType type);

View File

@ -56,8 +56,8 @@ function verifyCache(index){
expect(index.figures.cacheInUse).to.be.true;
expect(index.figures).to.be.ok;
expect(index.figures.cacheSize).to.be.a('number');
expect(index.figures.cacheLiftimeHitRate).to.be.a('number');
expect(index.figures.cacheWindowHitRate).to.be.a('number');
expect(index.figures.cacheLifeTimeHitRate).to.be.a('number');
expect(index.figures.cacheWindowedHitRate).to.be.a('number');
}

162
lib/Basics/RocksDBUtils.cpp Normal file
View File

@ -0,0 +1,162 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Daniel H. Larkin
/// @author Jan Steemann
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBUtils.h"
#include "Basics/StringRef.h"
#include <rocksdb/comparator.h>
#include <rocksdb/convenience.h>
#include <rocksdb/utilities/transaction_db.h>
#include <velocypack/Iterator.h>
namespace arangodb {
namespace rocksutils {
// Recursively checks whether the given VelocyPack value contains an
// "objectId" attribute at any nesting level (inside objects or arrays).
// Returns true as soon as one is found.
static bool hasObjectIds(VPackSlice const& inputSlice) {
  if (inputSlice.isObject()) {
    for (auto const& attribute :
         arangodb::velocypack::ObjectIterator(inputSlice)) {
      // either the attribute itself is named "objectId", or one is nested
      // somewhere inside its value
      if (arangodb::StringRef(attribute.key) == "objectId" ||
          hasObjectIds(attribute.value)) {
        return true;
      }
    }
    return false;
  }
  if (inputSlice.isArray()) {
    for (auto const& element :
         arangodb::velocypack::ArrayIterator(inputSlice)) {
      if (hasObjectIds(element)) {
        return true;
      }
    }
  }
  // scalar values cannot contain attributes
  return false;
}
/// @brief copies the input value into the builder, recursively omitting every
/// object attribute named "objectId". Returns the builder for chaining.
static VPackBuilder& stripObjectIdsImpl(VPackBuilder& builder, VPackSlice const& inputSlice) {
  if (inputSlice.isObject()) {
    // rebuild the object, keeping only non-"objectId" members
    builder.openObject();
    for (auto const& member :
         arangodb::velocypack::ObjectIterator(inputSlice)) {
      if (arangodb::StringRef(member.key) != "objectId") {
        builder.add(member.key);
        stripObjectIdsImpl(builder, member.value);
      }
    }
    builder.close();
  } else if (inputSlice.isArray()) {
    // rebuild the array with every element stripped recursively
    builder.openArray();
    for (auto const& element : arangodb::velocypack::ArrayIterator(inputSlice)) {
      stripObjectIdsImpl(builder, element);
    }
    builder.close();
  } else {
    // scalar value: copy verbatim
    builder.add(inputSlice);
  }
  return builder;
}
/// @brief translates a rocksdb::Status into an arangodb::Result.
/// @param status the RocksDB status to translate
/// @param hint disambiguates kNotFound, which RocksDB reports uniformly for
///        missing databases, collections, documents, indexes and views
/// @return a Result carrying the matching ArangoDB error number and, in most
///         cases, the RocksDB status text as the error message
arangodb::Result convertStatus(rocksdb::Status const& status, StatusHint hint) {
  switch (status.code()) {
    case rocksdb::Status::Code::kOk:
      return {TRI_ERROR_NO_ERROR};
    case rocksdb::Status::Code::kNotFound:
      // kNotFound is ambiguous; the caller-supplied hint selects the
      // specific "not found" error code
      switch (hint) {
        case StatusHint::collection:
          return {TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, status.ToString()};
        case StatusHint::database:
          return {TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, status.ToString()};
        case StatusHint::document:
          return {TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND, status.ToString()};
        case StatusHint::index:
          return {TRI_ERROR_ARANGO_INDEX_NOT_FOUND, status.ToString()};
        case StatusHint::view:
          return {TRI_ERROR_ARANGO_VIEW_NOT_FOUND, status.ToString()};
        case StatusHint::wal:
          // suppress this error if the WAL is queried for changes that are not available
          return {TRI_ERROR_NO_ERROR};
        default:
          // no (or unknown) hint: fall back to "document not found"
          return {TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND, status.ToString()};
      }
    case rocksdb::Status::Code::kCorruption:
      return {TRI_ERROR_ARANGO_CORRUPTED_DATAFILE, status.ToString()};
    case rocksdb::Status::Code::kNotSupported:
      return {TRI_ERROR_NOT_IMPLEMENTED, status.ToString()};
    case rocksdb::Status::Code::kInvalidArgument:
      return {TRI_ERROR_BAD_PARAMETER, status.ToString()};
    case rocksdb::Status::Code::kIOError:
      // distinguish "disk full" from generic I/O failures via the subcode
      if (status.subcode() == rocksdb::Status::SubCode::kNoSpace) {
        return {TRI_ERROR_ARANGO_FILESYSTEM_FULL, status.ToString()};
      }
      return {TRI_ERROR_ARANGO_IO_ERROR, status.ToString()};
    case rocksdb::Status::Code::kMergeInProgress:
      return {TRI_ERROR_ARANGO_MERGE_IN_PROGRESS, status.ToString()};
    case rocksdb::Status::Code::kIncomplete:
      return {TRI_ERROR_INTERNAL, "'incomplete' error in storage engine"};
    case rocksdb::Status::Code::kShutdownInProgress:
      return {TRI_ERROR_SHUTTING_DOWN, status.ToString()};
    case rocksdb::Status::Code::kTimedOut:
      // NOTE: both branches currently map to the same error code; the
      // subcode check is a placeholder for a more specific mapping
      if (status.subcode() == rocksdb::Status::SubCode::kMutexTimeout ||
          status.subcode() == rocksdb::Status::SubCode::kLockTimeout) {
        // TODO: maybe add a separator error code/message here
        return {TRI_ERROR_LOCK_TIMEOUT, status.ToString()};
      }
      return {TRI_ERROR_LOCK_TIMEOUT, status.ToString()};
    case rocksdb::Status::Code::kAborted:
      return {TRI_ERROR_TRANSACTION_ABORTED, status.ToString()};
    case rocksdb::Status::Code::kBusy:
      // a detected deadlock gets its own error; any other busy condition
      // surfaces as a conflict
      if (status.subcode() == rocksdb::Status::SubCode::kDeadlock) {
        return {TRI_ERROR_DEADLOCK};
      }
      return {TRI_ERROR_ARANGO_CONFLICT};
    case rocksdb::Status::Code::kExpired:
      return {TRI_ERROR_INTERNAL, "key expired; TTL was set in error"};
    case rocksdb::Status::Code::kTryAgain:
      return {TRI_ERROR_ARANGO_TRY_AGAIN, status.ToString()};
    default:
      return {TRI_ERROR_INTERNAL, "unknown RocksDB status code"};
  }
}
/// @brief returns a VelocyPack value equal to inputSlice but with every
/// "objectId" attribute removed (recursively).
/// @param inputSlice the value to strip
/// @param checkBeforeCopy if true, scan first and avoid the copy entirely
///        when no "objectId" occurs anywhere in the input
/// @return when no copy was needed, the original slice and an empty buffer
///         pointer; otherwise a slice pointing INTO the returned buffer —
///         the caller must keep the buffer alive while using the slice
std::pair<VPackSlice, std::unique_ptr<VPackBuffer<uint8_t>>> stripObjectIds(
    VPackSlice const& inputSlice, bool checkBeforeCopy) {
  std::unique_ptr<VPackBuffer<uint8_t>> buffer;
  if (checkBeforeCopy && !hasObjectIds(inputSlice)) {
    // nothing to strip: hand back the input without copying
    return {inputSlice, std::move(buffer)};
  }
  buffer.reset(new VPackBuffer<uint8_t>);
  VPackBuilder builder(*buffer);
  stripObjectIdsImpl(builder, inputSlice);
  return {VPackSlice(buffer->data()), std::move(buffer)};
}
}
}

52
lib/Basics/RocksDBUtils.h Normal file
View File

@ -0,0 +1,52 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Daniel H. Larkin
/// @author Jan Steemann
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGO_ROCKSDB_UTILS_H
#define ARANGO_ROCKSDB_UTILS_H 1
#include "Basics/Common.h"
#include "Basics/Result.h"
#include <rocksdb/status.h>
#include <velocypack/Buffer.h>
#include <velocypack/Builder.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
namespace arangodb {
namespace rocksutils {
// Hint for convertStatus(): tells the translation which kind of entity a
// RocksDB kNotFound status refers to, so the matching ArangoDB error code
// (database/collection/document/index/view not found) can be chosen; 'wal'
// suppresses the error entirely.
enum StatusHint { none, document, collection, view, index, database, wal };
// Converts a rocksdb::Status into an arangodb::Result carrying a matching
// ArangoDB error number and the RocksDB status text as message.
arangodb::Result convertStatus(rocksdb::Status const&,
StatusHint hint = StatusHint::none);
// Returns the input value with all "objectId" attributes removed
// (recursively). If checkBeforeCopy is true and nothing needs stripping,
// the input slice is returned as-is with an empty buffer pointer; otherwise
// the returned slice points into the returned buffer, which the caller must
// keep alive while the slice is in use.
std::pair<VPackSlice, std::unique_ptr<VPackBuffer<uint8_t>>> stripObjectIds(
VPackSlice const& inputSlice, bool checkBeforeCopy = true);
} // namespace rocksutils
} // namespace arangodb
#endif

View File

@ -136,6 +136,7 @@ add_library(${LIB_ARANGO} STATIC
Basics/OpenFilesTracker.cpp
Basics/ReadWriteLockCPP11.cpp
Basics/RocksDBLogger.cpp
Basics/RocksDBUtils.cpp
Basics/StaticStrings.cpp
Basics/StringBuffer.cpp
Basics/StringHeap.cpp