1
0
Fork 0

Various changes

This commit is contained in:
Simon Grätzer 2017-05-19 16:43:00 +02:00
parent 024dc652ee
commit a99611e7c0
16 changed files with 186 additions and 185 deletions

View File

@ -630,23 +630,22 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
// TODO FIXME -- improve transaction size
// TODO FIXME -- intermediate commit
TRI_ASSERT(_objectId != 0);
rocksdb::Comparator const* cmp = globalRocksEngine()->cmp();
TRI_voc_cid_t cid = _logicalCollection->cid();
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
RocksDBMethods* mthd = state->rocksdbMethods();
// rocksdb::Transaction* rtrx = state->rocksTransaction();
// delete documents
RocksDBKeyBounds documentBounds =
RocksDBKeyBounds::CollectionDocuments(this->objectId());
auto const end = documentBounds.end();
std::unique_ptr<rocksdb::Iterator> iter = mthd->NewIterator();
RocksDBMethods* mthd = state->rocksdbMethods();
rocksdb::ReadOptions ro = mthd->readOptions();
rocksdb::Slice const end = documentBounds.end();
ro.iterate_upper_bound = &end;
std::unique_ptr<rocksdb::Iterator> iter = mthd->NewIterator(ro, RocksDBColumnFamily::none());
iter->Seek(documentBounds.start());
while (iter->Valid() && cmp->Compare(iter->key(), end) < 0) {
while (iter->Valid()) {
TRI_voc_rid_t revisionId = RocksDBKey::revisionId(iter->key());
VPackSlice key =
VPackSlice(iter->value().data()).get(StaticStrings::KeyString);
@ -657,7 +656,7 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
// add possible log statement
state->prepareOperation(cid, revisionId, StringRef(key),
TRI_VOC_DOCUMENT_OPERATION_REMOVE);
Result r = mthd->Delete(iter->key());
Result r = mthd->Delete(RocksDBColumnFamily::none(), iter->key());
if (!r.ok()) {
THROW_ARANGO_EXCEPTION(r);
}
@ -687,7 +686,6 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
void RocksDBCollection::truncateNoTrx(transaction::Methods* trx) {
TRI_ASSERT(_objectId != 0);
rocksdb::Comparator const* cmp = globalRocksEngine()->cmp();
TRI_voc_cid_t cid = _logicalCollection->cid();
rocksdb::TransactionDB *db = rocksutils::globalRocksDB();
@ -699,12 +697,14 @@ void RocksDBCollection::truncateNoTrx(transaction::Methods* trx) {
// isolate against newer writes
rocksdb::ReadOptions readOptions;
rocksdb::Slice end = documentBounds.end();
readOptions.upper_bound = &end;
readOptions.snapshot = state->rocksTransaction()->GetSnapshot();
std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(readOptions));
std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(RocksDBFamily::none(), readOptions));
iter->Seek(documentBounds.start());
while (iter->Valid() && cmp->Compare(iter->key(), documentBounds.end()) < 0) {
while (iter->Valid()) < 0) {
TRI_voc_rid_t revisionId = RocksDBKey::revisionId(iter->key());
VPackSlice key =
VPackSlice(iter->value().data()).get(StaticStrings::KeyString);
@ -1409,7 +1409,7 @@ RocksDBOperationResult RocksDBCollection::insertDocument(
blackListKey(key.string().data(), static_cast<uint32_t>(key.string().size()));
RocksDBMethods* mthd = rocksutils::toRocksMethods(trx);
res = mthd->Put(key, value.string());
res = mthd->Put(RocksDBColumnFamily::none(), key, value.string());
if (!res.ok()) {
// set keysize that is passed up to the crud operations
res.keySize(key.string().size());
@ -1467,7 +1467,7 @@ RocksDBOperationResult RocksDBCollection::removeDocument(
// Simon: actually we do, because otherwise the counter recovery is broken
// if (!isUpdate) {
RocksDBMethods* mthd = rocksutils::toRocksMethods(trx);
RocksDBOperationResult res = mthd->Delete(key);
RocksDBOperationResult res = mthd->Delete(RocksDBColumnFamily::none(), key);
if (!res.ok()) {
return res;
}
@ -1582,7 +1582,7 @@ arangodb::Result RocksDBCollection::lookupRevisionVPack(
}
RocksDBMethods* mthd = rocksutils::toRocksMethods(trx);
Result res = mthd->Get(key, &value);
Result res = mthd->Get(RocksDBColumnFamily::none(), key, &value);
TRI_ASSERT(value.data());
if (res.ok()) {
if (withCache && useCache()) {

View File

@ -30,7 +30,7 @@ namespace arangodb {
struct RocksDBColumnFamily {
friend class RocksDBEngine;
static rocksdb::ColumnFamilyHandle* none() { return _index; }
static rocksdb::ColumnFamilyHandle* none() { return _none; }
static rocksdb::ColumnFamilyHandle* index() { return _index; }

View File

@ -124,12 +124,11 @@ std::vector<std::pair<RocksDBKey, RocksDBValue>> viewKVPairs(
template <typename T> // T is a invokeable that takes a rocksdb::Iterator*
void iterateBounds(
RocksDBKeyBounds const& bounds, T callback,
rocksdb::ReadOptions const& options = rocksdb::ReadOptions{}) {
auto cmp = globalRocksEngine()->cmp();
auto const end = bounds.end();
rocksdb::ReadOptions options = rocksdb::ReadOptions()) {
rocksdb::Slice const end = bounds.end();
options.iterate_upper_bound = &end;
std::unique_ptr<rocksdb::Iterator> it(globalRocksDB()->NewIterator(options));
for (it->Seek(bounds.start());
it->Valid() && cmp->Compare(it->key(), end) < 0; it->Next()) {
for (it->Seek(bounds.start()); it->Valid(); it->Next()) {
callback(it.get());
}
}

View File

@ -29,64 +29,31 @@
#include "RocksDBEngine/RocksDBPrefixExtractor.h"
#include "RocksDBEngine/RocksDBTypes.h"
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>
using namespace arangodb;
using namespace arangodb::velocypack;
RocksDBComparator::RocksDBComparator() {}
RocksDBComparator::~RocksDBComparator() {}
int RocksDBComparator::Compare(rocksdb::Slice const& lhs,
rocksdb::Slice const& rhs) const {
RocksDBEntryType t = RocksDBKey::type(lhs);
TRI_ASSERT(t == RocksDBEntryType::IndexValue
|| t == RocksDBEntryType::UniqueIndexValue);
// type is first byte of every key
if (lhs[0] != rhs[0]) {
return ((lhs[0] < rhs[0]) ? -1 : 1);
}
return compareIndexValues(lhs, rhs);
/*switch (RocksDBKey::type(lhs)) {
case RocksDBEntryType::IndexValue:
case RocksDBEntryType::UniqueIndexValue: {
}
default: {
return compareLexicographic(lhs, rhs);
}
}*/
}
bool RocksDBComparator::Equal(rocksdb::Slice const& lhs, rocksdb::Slice const& rhs) const {
if (lhs[0] != rhs[0]) {
return false;
}
switch (RocksDBKey::type(lhs)) {
case RocksDBEntryType::IndexValue:
case RocksDBEntryType::UniqueIndexValue: {
return (compareIndexValues(lhs, rhs) == 0);
}
default: {
if (lhs.size() != rhs.size()) {
return false;
}
return (memcmp(lhs.data(), rhs.data(), lhs.size()) == 0);
}
}
inline static VPackSlice indexedVPack(rocksdb::Slice const& slice) {
TRI_ASSERT(slice.size() > (sizeof(char) + sizeof(uint64_t)));
return VPackSlice(slice.data() + sizeof(char) + sizeof(uint64_t));
}
int RocksDBComparator::compareIndexValues(rocksdb::Slice const& lhs,
rocksdb::Slice const& rhs) const {
int result =
memcmp((lhs.data() + sizeof(char)), (rhs.data() + sizeof(char)), sizeof(uint64_t));
if (result != 0) {
return result;
TRI_ASSERT(RocksDBKey::type(lhs) == RocksDBEntryType::IndexValue
|| RocksDBKey::type(lhs) == RocksDBEntryType::UniqueIndexValue);
TRI_ASSERT(RocksDBKey::type(rhs) == RocksDBKey::type(rhs));
constexpr size_t prefixLength = RocksDBPrefixExtractor::getIndexPrefixLength();
int r = memcmp(lhs.data(), rhs.data(), prefixLength);
if (r != 0) {
return r;
}
size_t prefixLength = RocksDBPrefixExtractor::getPrefixLength(
static_cast<RocksDBEntryType>(lhs[0]));
if (lhs.size() == prefixLength || rhs.size() == prefixLength) {
if (lhs.size() == rhs.size()) {
return 0;
@ -98,12 +65,12 @@ int RocksDBComparator::compareIndexValues(rocksdb::Slice const& lhs,
TRI_ASSERT(lhs.size() > sizeof(char) + sizeof(uint64_t));
TRI_ASSERT(rhs.size() > sizeof(char) + sizeof(uint64_t));
VPackSlice const lSlice = RocksDBKey::indexedVPack(lhs);
VPackSlice const rSlice = RocksDBKey::indexedVPack(rhs);
VPackSlice const lSlice = indexedVPack(lhs);
VPackSlice const rSlice = indexedVPack(rhs);
result = compareIndexedValues(lSlice, rSlice);
if (result != 0) {
return result;
r = compareIndexedValues(lSlice, rSlice);
if (r != 0) {
return r;
}
constexpr size_t offset = sizeof(char) + sizeof(uint64_t);
@ -116,9 +83,9 @@ int RocksDBComparator::compareIndexValues(rocksdb::Slice const& lhs,
size_t minSize = ((lSize <= rSize) ? lSize : rSize);
if (minSize > 0) {
result = memcmp(lBase, rBase, minSize);
if (result != 0) {
return result;
r = memcmp(lBase, rBase, minSize);
if (r != 0) {
return r;
}
}
@ -134,18 +101,21 @@ int RocksDBComparator::compareIndexedValues(VPackSlice const& lhs,
TRI_ASSERT(lhs.isArray());
TRI_ASSERT(rhs.isArray());
size_t const lLength = lhs.length();
size_t const rLength = rhs.length();
size_t const n = lLength < rLength ? rLength : lLength;
for (size_t i = 0; i < n; ++i) {
VPackArrayIterator lhsIter(lhs);
VPackArrayIterator rhsIter(rhs);
size_t const lLength = lhsIter.size();
size_t const rLength = rhsIter.size();
while (lhsIter.valid() || rhsIter.valid()) {
size_t i = lhsIter.index();
int res = arangodb::basics::VelocyPackHelper::compare(
(i < lLength ? lhs[i] : VPackSlice::noneSlice()),
(i < rLength ? rhs[i] : VPackSlice::noneSlice()), true);
(i < lLength ? *lhsIter : VPackSlice::noneSlice()),
(i < rLength ? *rhsIter : VPackSlice::noneSlice()), true);
if (res != 0) {
return res;
}
++lhsIter;
++rhsIter;
}
if (lLength != rLength) {

View File

@ -46,9 +46,13 @@ class RocksDBComparator final : public rocksdb::Comparator {
/// > 0 if lhs > rhs
/// 0 if lhs == rhs
//////////////////////////////////////////////////////////////////////////////
int Compare(rocksdb::Slice const& lhs, rocksdb::Slice const& rhs) const override;
int Compare(rocksdb::Slice const& lhs, rocksdb::Slice const& rhs) const override {
return compareIndexValues(lhs, rhs);
}
bool Equal(rocksdb::Slice const& lhs, rocksdb::Slice const& rhs) const override;
bool Equal(rocksdb::Slice const& lhs, rocksdb::Slice const& rhs) const override {
return (compareIndexValues(lhs, rhs) == 0);
}
// SECTION: API compatibility
char const* Name() const override { return "ArangoRocksDBComparator2"; }

View File

@ -437,7 +437,12 @@ struct WBReader final : public rocksdb::WriteBatch::Handler {
TRI_HybridLogicalClock(_maxHLC);
}
bool prepKey(const rocksdb::Slice& key) {
bool prepKey(uint32_t column_family_id, const rocksdb::Slice& key) {
// TODO this needs changing if we decide to add more column fams
if (column_family_id != 0) {
return false;
}
if (RocksDBKey::type(key) == RocksDBEntryType::Document) {
uint64_t objectId = RocksDBKey::counterObjectId(key);
auto const& it = seqStart.find(objectId);
@ -516,9 +521,10 @@ struct WBReader final : public rocksdb::WriteBatch::Handler {
}
}
void Put(const rocksdb::Slice& key, const rocksdb::Slice& value) override {
rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key,
const rocksdb::Slice& value) override {
updateMaxTick(key, value);
if (prepKey(key)) {
if (prepKey(column_family_id, key)) {
uint64_t objectId = RocksDBKey::counterObjectId(key);
uint64_t revisionId = RocksDBKey::revisionId(key);
@ -555,10 +561,11 @@ struct WBReader final : public rocksdb::WriteBatch::Handler {
break;
}
}
return rocksdb::Status();
}
void Delete(const rocksdb::Slice& key) override {
if (prepKey(key)) {
rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override {
if (prepKey(column_family_id, key)) {
uint64_t objectId = RocksDBKey::counterObjectId(key);
uint64_t revisionId = RocksDBKey::revisionId(key);
@ -595,9 +602,12 @@ struct WBReader final : public rocksdb::WriteBatch::Handler {
break;
}
}
return rocksdb::Status();
}
void SingleDelete(const rocksdb::Slice& key) override { Delete(key); }
rocksdb::Status SingleDeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override {
return DeleteCF(column_family_id, key);
}
};
/// parse the WAL with the above handler parser class

View File

@ -67,7 +67,8 @@ RocksDBEdgeIndexIterator::RocksDBEdgeIndexIterator(
_keys(keys.get()),
_keysIterator(_keys->slice()),
_index(index),
_iterator(rocksutils::toRocksMethods(trx)->NewIterator()),
_iterator(
rocksutils::toRocksMethods(trx)->NewIterator(index->columnFamily())),
_arrayIterator(VPackSlice::emptyArraySlice()),
_bounds(RocksDBKeyBounds::EdgeIndex(0)),
_doUpdateBounds(true),
@ -202,7 +203,8 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
StringRef edgeKey = RocksDBKey::primaryKey(_iterator->key());
// lookup real document
bool continueWithNextBatch = lookupDocumentAndUseCb(edgeKey, cb, limit, token);
bool continueWithNextBatch =
lookupDocumentAndUseCb(edgeKey, cb, limit, token);
// build cache value for from/to
if (_useCache) {
if (_cacheValueSize <= cacheValueSizeLimit) {
@ -241,17 +243,18 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
}
// acquire the document token through the primary index
bool RocksDBEdgeIndexIterator::lookupDocumentAndUseCb(
StringRef primaryKey, TokenCallback const& cb,
size_t& limit, RocksDBToken& token){
//we pass the token in as ref to avoid allocations
bool RocksDBEdgeIndexIterator::lookupDocumentAndUseCb(StringRef primaryKey,
TokenCallback const& cb,
size_t& limit,
RocksDBToken& token) {
// we pass the token in as ref to avoid allocations
auto rocksColl = toRocksDBCollection(_collection);
Result res = rocksColl->lookupDocumentToken(_trx, primaryKey, token);
if (res.ok()) {
cb(token);
--limit;
if (limit == 0) {
_doUpdateBounds=false; //limit hit continue with next batch
_doUpdateBounds = false; // limit hit continue with next batch
return true;
}
} // TODO do we need to handle failed lookups here?
@ -360,7 +363,7 @@ int RocksDBEdgeIndex::insert(transaction::Methods* trx,
// acquire rocksdb transaction
RocksDBMethods* mthd = rocksutils::toRocksMethods(trx);
Result r = mthd->Put(rocksdb::Slice(key.string()), rocksdb::Slice(),
Result r = mthd->Put(_cf, rocksdb::Slice(key.string()), rocksdb::Slice(),
rocksutils::index);
if (r.ok()) {
std::hash<StringRef> hasher;
@ -392,7 +395,7 @@ int RocksDBEdgeIndex::remove(transaction::Methods* trx,
// acquire rocksdb transaction
RocksDBMethods* mthd = rocksutils::toRocksMethods(trx);
Result res = mthd->Delete(rocksdb::Slice(key.string()));
Result res = mthd->Delete(_cf, rocksdb::Slice(key.string()));
if (res.ok()) {
std::hash<StringRef> hasher;
uint64_t hash = static_cast<uint64_t>(hasher(fromToRef));
@ -423,7 +426,7 @@ void RocksDBEdgeIndex::batchInsert(
RocksDBKey::EdgeIndexValue(_objectId, fromToRef, StringRef(primaryKey));
blackListKey(fromToRef);
Result r = mthd->Put(rocksdb::Slice(key.string()), rocksdb::Slice(),
Result r = mthd->Put(_cf, rocksdb::Slice(key.string()), rocksdb::Slice(),
rocksutils::index);
if (!r.ok()) {
queue->setStatus(r.errorNumber());

View File

@ -60,7 +60,7 @@ DocumentIdentifierToken RocksDBFulltextIndex::toDocumentIdentifierToken(
RocksDBFulltextIndex::RocksDBFulltextIndex(
TRI_idx_iid_t iid, arangodb::LogicalCollection* collection,
VPackSlice const& info)
: RocksDBIndex(iid, collection, info, RocksDBColumnFamily::none()),
: RocksDBIndex(iid, collection, info, RocksDBColumnFamily::none()),
_minWordLength(TRI_FULLTEXT_MIN_WORD_LENGTH_DEFAULT) {
TRI_ASSERT(iid != 0);
@ -197,8 +197,8 @@ int RocksDBFulltextIndex::insert(transaction::Methods* trx,
if (words.empty()) {
return TRI_ERROR_NO_ERROR;
}
RocksDBMethods *mthd = rocksutils::toRocksMethods(trx);
RocksDBMethods* mthd = rocksutils::toRocksMethods(trx);
// now we are going to construct the value to insert into rocksdb
// unique indexes have a different key structure
StringRef docKey(doc.get(StaticStrings::KeyString));
@ -210,7 +210,7 @@ int RocksDBFulltextIndex::insert(transaction::Methods* trx,
RocksDBKey key =
RocksDBKey::FulltextIndexValue(_objectId, StringRef(word), docKey);
Result r = mthd->Put(key, value.string(), rocksutils::index);
Result r = mthd->Put(_cf, key, value.string(), rocksutils::index);
if (!r.ok()) {
res = r.errorNumber();
break;
@ -227,8 +227,7 @@ int RocksDBFulltextIndex::insert(transaction::Methods* trx,
return res;
}
int RocksDBFulltextIndex::insertRaw(RocksDBMethods* batch,
TRI_voc_rid_t,
int RocksDBFulltextIndex::insertRaw(RocksDBMethods* batch, TRI_voc_rid_t,
arangodb::velocypack::Slice const& doc) {
std::set<std::string> words = wordlist(doc);
if (words.empty()) {
@ -243,7 +242,7 @@ int RocksDBFulltextIndex::insertRaw(RocksDBMethods* batch,
for (std::string const& word : words) {
RocksDBKey key =
RocksDBKey::FulltextIndexValue(_objectId, StringRef(word), docKey);
batch->Put(key, value.string());
batch->Put(_cf, key, value.string());
}
return TRI_ERROR_NO_ERROR;
@ -259,7 +258,7 @@ int RocksDBFulltextIndex::remove(transaction::Methods* trx,
return TRI_ERROR_OUT_OF_MEMORY;
}
RocksDBMethods *mthd = rocksutils::toRocksMethods(trx);
RocksDBMethods* mthd = rocksutils::toRocksMethods(trx);
// now we are going to construct the value to insert into rocksdb
// unique indexes have a different key structure
StringRef docKey(doc.get(StaticStrings::KeyString));
@ -268,7 +267,7 @@ int RocksDBFulltextIndex::remove(transaction::Methods* trx,
RocksDBKey key =
RocksDBKey::FulltextIndexValue(_objectId, StringRef(word), docKey);
Result r = mthd->Delete(key);
Result r = mthd->Delete(_cf, key);
if (!r.ok()) {
res = r.errorNumber();
break;
@ -277,8 +276,7 @@ int RocksDBFulltextIndex::remove(transaction::Methods* trx,
return res;
}
int RocksDBFulltextIndex::removeRaw(RocksDBMethods* batch,
TRI_voc_rid_t,
int RocksDBFulltextIndex::removeRaw(RocksDBMethods* batch, TRI_voc_rid_t,
arangodb::velocypack::Slice const& doc) {
std::set<std::string> words = wordlist(doc);
// now we are going to construct the value to insert into rocksdb
@ -287,7 +285,7 @@ int RocksDBFulltextIndex::removeRaw(RocksDBMethods* batch,
for (std::string const& word : words) {
RocksDBKey key =
RocksDBKey::FulltextIndexValue(_objectId, StringRef(word), docKey);
batch->Delete(key);
batch->Delete(_cf, key);
}
return TRI_ERROR_NO_ERROR;
}
@ -508,16 +506,16 @@ static RocksDBKeyBounds MakeBounds(uint64_t oid,
Result RocksDBFulltextIndex::applyQueryToken(transaction::Methods* trx,
FulltextQueryToken const& token,
std::set<std::string>& resultSet) {
RocksDBMethods *mthds = rocksutils::toRocksMethods(trx);
RocksDBMethods* mthds = rocksutils::toRocksMethods(trx);
// why can't I have an assignment operator when I want one
RocksDBKeyBounds bounds = MakeBounds(_objectId, token);
rocksdb::Slice upper = bounds.end();
rocksdb::ReadOptions ro = mthds->readOptions();
ro.iterate_upper_bound = &upper;
std::unique_ptr<rocksdb::Iterator> iter = mthds->NewIterator(ro, _cf);
iter->Seek(bounds.start());
// set is used to perform an intersection with the result set
std::set<std::string> intersect;
// apply left to right logic, merging all current results with ALL previous

View File

@ -281,7 +281,7 @@ void GeoIndex_clearRocks(GeoIdx* gi) {
}
inline void RocksRead(GeoIx * gix, RocksDBKey const& key, std::string *val) {
arangodb::Result r = gix->rocksMethods->Get(key, val);
arangodb::Result r = gix->rocksMethods->Get(RocksDBColumnFamily::none(), key, val);
if (!r.ok()) {
THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage());
}
@ -300,7 +300,8 @@ inline void RocksWrite(GeoIx * gix,
THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage());
}
} else {
arangodb::Result r = gix->rocksMethods->Put(key, slice, rocksutils::index);
arangodb::Result r = gix->rocksMethods->Put(RocksDBColumnFamily::none(),
key, slice, rocksutils::index);
if (!r.ok()) {
THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage());
}
@ -308,7 +309,8 @@ inline void RocksWrite(GeoIx * gix,
}
inline void RocksDelete(GeoIx* gix, RocksDBKey const& key) {
arangodb::Result r = gix->rocksMethods->Delete(key);
arangodb::Result r = gix->rocksMethods->Delete(RocksDBColumnFamily::none(),
key);
if (!r.ok()) {
THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage());
}

View File

@ -202,7 +202,7 @@ void RocksDBIndex::truncate(transaction::Methods* trx) {
iter->Seek(indexBounds.start());
while (iter->Valid()) {
Result r = mthds->Delete(iter->key());
Result r = mthds->Delete(_cf, iter->key());
if (!r.ok()) {
THROW_ARANGO_EXCEPTION(r);
}

View File

@ -24,8 +24,8 @@
#define ARANGOD_ROCKSDB_ROCKSDB_METHODS_H 1
#include "Basics/Result.h"
#include "RocksDBCommon.h"
#include "RocksDBColumnFamily.h"
#include "RocksDBCommon.h"
namespace rocksdb {
class Transaction;
@ -67,37 +67,23 @@ class RocksDBMethods {
rocksdb::ReadOptions const& readOptions();
bool Exists(RocksDBKey const& key) {
return this->Exists(RocksDBColumnFamily::none(), key);
}
virtual bool Exists(rocksdb::ColumnFamilyHandle*, RocksDBKey const&) = 0;
arangodb::Result Get(RocksDBKey const& key,
std::string* val){
return this->Get(RocksDBColumnFamily::none(), key, val);
}
virtual arangodb::Result Get(rocksdb::ColumnFamilyHandle*, RocksDBKey const&,
std::string*) = 0;
arangodb::Result Put(RocksDBKey const& key, rocksdb::Slice const& val,
rocksutils::StatusHint hint = rocksutils::StatusHint::none){
return this->Put(RocksDBColumnFamily::none(), key, val, hint);
}
virtual arangodb::Result Put(
rocksdb::ColumnFamilyHandle*, RocksDBKey const&, rocksdb::Slice const&,
rocksutils::StatusHint hint = rocksutils::StatusHint::none) = 0;
// virtual arangodb::Result Merge(RocksDBKey const&, rocksdb::Slice const&) =
// 0;
arangodb::Result Delete(RocksDBKey const& key){
return this->Delete(RocksDBColumnFamily::none(), key);
}
virtual arangodb::Result Delete(rocksdb::ColumnFamilyHandle*,
RocksDBKey const&) = 0;
std::unique_ptr<rocksdb::Iterator> NewIterator() {
return this->NewIterator(this->readOptions(), RocksDBColumnFamily::none());
std::unique_ptr<rocksdb::Iterator> NewIterator(
rocksdb::ColumnFamilyHandle* cf) {
return this->NewIterator(this->readOptions(), cf);
}
virtual std::unique_ptr<rocksdb::Iterator> NewIterator(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) = 0;
virtual std::unique_ptr<rocksdb::Iterator> NewIterator(
rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) = 0;
virtual void SetSavePoint() = 0;
virtual arangodb::Result RollbackToSavePoint() = 0;

View File

@ -47,6 +47,7 @@ class RocksDBPrefixExtractor final : public rocksdb::SliceTransform {
bool InRange(rocksdb::Slice const& dst) const;
static size_t getPrefixLength(RocksDBEntryType type);
static constexpr size_t getIndexPrefixLength() { return 9; }
private:
const std::string _name;

View File

@ -367,7 +367,7 @@ RocksDBToken RocksDBPrimaryIndex::lookupKey(transaction::Methods* trx,
auto& options = mthds->readOptions();
TRI_ASSERT(options.snapshot != nullptr);
arangodb::Result r = mthds->Get(key, value.buffer());
arangodb::Result r = mthds->Get(_cf, key, value.buffer());
if (!r.ok()) {
return RocksDBToken();
}
@ -403,13 +403,13 @@ int RocksDBPrimaryIndex::insert(transaction::Methods* trx,
// acquire rocksdb transaction
RocksDBMethods* mthd = rocksutils::toRocksMethods(trx);
if (mthd->Exists(key)) {
if (mthd->Exists(_cf, key)) {
return TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED;
}
blackListKey(key.string().data(), static_cast<uint32_t>(key.string().size()));
Result status = mthd->Put(key, value.string(), rocksutils::index);
Result status = mthd->Put(_cf, key, value.string(), rocksutils::index);
return status.errorNumber();
}
@ -429,7 +429,7 @@ int RocksDBPrimaryIndex::remove(transaction::Methods* trx,
// acquire rocksdb transaction
RocksDBMethods* mthds = rocksutils::toRocksMethods(trx);
Result r = mthds->Delete(key);
Result r = mthds->Delete(_cf, key);
//rocksutils::convertStatus(status, rocksutils::StatusHint::index);
return r.errorNumber();
}

View File

@ -84,7 +84,7 @@ class WALParser : public rocksdb::WriteBatch::Handler {
TRI_DEFER(_lastLogType = type);
LOG_TOPIC(_LOG, Logger::ROCKSDB) << "tick: " << _currentSequence << " "
<< rocksDBLogTypeName(type);
<< rocksDBLogTypeName(type);
tick();
switch (type) {
case RocksDBLogType::DatabaseCreate: {
@ -168,7 +168,7 @@ class WALParser : public rocksdb::WriteBatch::Handler {
_seenBeginTransaction = true;
_currentDbId = RocksDBLogValue::databaseId(blob);
_currentTrxId = RocksDBLogValue::transactionId(blob);
_builder.openObject();
_builder.add("tick", VPackValue(std::to_string(_currentSequence)));
_builder.add(
@ -181,7 +181,8 @@ class WALParser : public rocksdb::WriteBatch::Handler {
}
case RocksDBLogType::DocumentOperationsPrologue: {
_currentCollectionId = RocksDBLogValue::collectionId(blob);
LOG_TOPIC(_LOG, Logger::ROCKSDB) << "Collection: " << _currentCollectionId;
LOG_TOPIC(_LOG, Logger::ROCKSDB) << "Collection: "
<< _currentCollectionId;
break;
}
case RocksDBLogType::DocumentRemove: {
@ -206,14 +207,16 @@ class WALParser : public rocksdb::WriteBatch::Handler {
}
}
void Put(rocksdb::Slice const& key, rocksdb::Slice const& value) override {
rocksdb::Status PutCF(uint32_t column_family_id, rocksdb::Slice const& key,
rocksdb::Slice const& value) override {
RocksDBEntryType type = RocksDBKey::type(key);
LOG_TOPIC(_LOG, Logger::ROCKSDB) << "tick: " << _currentSequence << " "
<< rocksDBEntryTypeName(type);
LOG_TOPIC(_LOG, Logger::ROCKSDB) << "PUT: key:" << key.ToString() << " value: " << value.ToString();
<< rocksDBEntryTypeName(type);
LOG_TOPIC(_LOG, Logger::ROCKSDB) << "PUT: key:" << key.ToString()
<< " value: " << value.ToString();
tick();
if (!shouldHandleKey(key)) {
return;
if (!shouldHandleKey(column_family_id, key)) {
return rocksdb::Status();
}
switch (type) {
case RocksDBEntryType::Collection: {
@ -279,20 +282,28 @@ class WALParser : public rocksdb::WriteBatch::Handler {
default:
break; // shouldn't get here?
}
return rocksdb::Status();
}
void Delete(rocksdb::Slice const& key) override { handleDeletion(key); }
rocksdb::Status DeleteCF(uint32_t column_family_id,
rocksdb::Slice const& key) override {
return handleDeletion(column_family_id, key);
}
void SingleDelete(rocksdb::Slice const& key) override { handleDeletion(key); }
rocksdb::Status SingleDeleteCF(uint32_t column_family_id,
rocksdb::Slice const& key) override {
return handleDeletion(column_family_id, key);
}
void handleDeletion(rocksdb::Slice const& key) {
rocksdb::Status handleDeletion(uint32_t column_family_id,
rocksdb::Slice const& key) {
RocksDBEntryType type = RocksDBKey::type(key);
LOG_TOPIC(_LOG, Logger::ROCKSDB) << "tick: " << _currentSequence << " "
<< rocksDBEntryTypeName(type);
<< rocksDBEntryTypeName(type);
tick();
if (!shouldHandleKey(key)) {
return;
if (!shouldHandleKey(column_family_id, key)) {
return rocksdb::Status();
}
switch (type) {
case RocksDBEntryType::Collection: {
@ -300,7 +311,7 @@ class WALParser : public rocksdb::WriteBatch::Handler {
if (_lastLogType == RocksDBLogType::CollectionDrop) {
TRI_ASSERT(_currentDbId != 0 && _currentCollectionId != 0);
LOG_TOPIC(_LOG, Logger::ROCKSDB) << "CID: " << _currentCollectionId;
_builder.openObject();
_builder.add("tick", VPackValue(std::to_string(_currentSequence)));
_builder.add("type", VPackValue(REPLICATION_COLLECTION_DROP));
@ -317,16 +328,16 @@ class WALParser : public rocksdb::WriteBatch::Handler {
case RocksDBEntryType::Document: {
// document removes, because of a drop is not transactional and
// should not appear in the WAL. Allso fixes
if (!shouldHandleKey(key) || !(_seenBeginTransaction || _singleOp)) {
return;
if (!(_seenBeginTransaction || _singleOp)) {
return rocksdb::Status();
}
// TODO somehow fix counters if we optimize the DELETE in
// documentRemove on updates
if (_lastLogType != RocksDBLogType::DocumentRemove &&
_lastLogType != RocksDBLogType::SingleRemove) {
return;
return rocksdb::Status();
}
//TRI_ASSERT(_lastLogType == RocksDBLogType::DocumentRemove ||
// TRI_ASSERT(_lastLogType == RocksDBLogType::DocumentRemove ||
// _lastLogType == RocksDBLogType::SingleRemove);
TRI_ASSERT(!_seenBeginTransaction || _currentTrxId != 0);
TRI_ASSERT(_currentDbId != 0 && _currentCollectionId != 0);
@ -358,6 +369,7 @@ class WALParser : public rocksdb::WriteBatch::Handler {
default:
break; // shouldn't get here?
}
return rocksdb::Status();
}
void startNewBatch(rocksdb::SequenceNumber startSequence) {
@ -365,17 +377,17 @@ class WALParser : public rocksdb::WriteBatch::Handler {
_startSequence = startSequence;
_currentSequence = startSequence;
}
void writeCommitMarker() {
if (_seenBeginTransaction) {
LOG_TOPIC(_LOG, Logger::PREGEL) << "tick: " << _currentSequence
<< " commit transaction";
<< " commit transaction";
_builder.openObject();
_builder.add("tick", VPackValue(std::to_string(_currentSequence)));
_builder.add(
"type",
VPackValue(static_cast<uint64_t>(REPLICATION_TRANSACTION_COMMIT)));
"type",
VPackValue(static_cast<uint64_t>(REPLICATION_TRANSACTION_COMMIT)));
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
_builder.add("tid", VPackValue(std::to_string(_currentTrxId)));
_builder.close();
@ -408,7 +420,13 @@ class WALParser : public rocksdb::WriteBatch::Handler {
}
}
bool shouldHandleKey(rocksdb::Slice const& key) const {
bool shouldHandleKey(uint32_t column_family_id,
rocksdb::Slice const& key) const {
// TODO reconsider if we add new colum fams
if (column_family_id != 0) {
return false;
}
TRI_voc_cid_t cid;
switch (RocksDBKey::type(key)) {
case RocksDBEntryType::Collection: {
@ -524,7 +542,8 @@ RocksDBReplicationResult rocksutils::tailWal(TRI_vocbase_t* vocbase,
s = batch.writeBatchPtr->Iterate(handler.get());
if (s.ok()) {
lastWrittenTick = handler->endBatch();
LOG_TOPIC(_LOG, Logger::ROCKSDB) << "End WriteBatch written-tick: " << lastWrittenTick;
LOG_TOPIC(_LOG, Logger::ROCKSDB) << "End WriteBatch written-tick: "
<< lastWrittenTick;
handler->endBatch();
if (!fromTickIncluded && lastTick >= tickStart && lastTick <= tickEnd) {
fromTickIncluded = true;

View File

@ -173,8 +173,8 @@ RocksDBVPackIndex::RocksDBVPackIndex(TRI_idx_iid_t iid,
_useExpansion(false),
_allowPartialIndex(true),
_estimator(nullptr) {
_cf = _unique ? RocksDBColumnFamily::uniqueIndex() :
RocksDBColumnFamily::index();
_cf = _unique ? RocksDBColumnFamily::uniqueIndex()
: RocksDBColumnFamily::index();
if (!_unique && !ServerState::instance()->isCoordinator()) {
// We activate the estimator for all non unique-indexes.
// And only on DBServers
@ -337,7 +337,7 @@ void RocksDBVPackIndex::addIndexValue(VPackBuilder& leased,
std::vector<VPackSlice>& sliceStack,
std::vector<uint64_t>& hashes) {
leased.clear();
leased.openArray();
leased.openArray(true); // unindexed
for (VPackSlice const& s : sliceStack) {
leased.add(s);
}
@ -348,13 +348,15 @@ void RocksDBVPackIndex::addIndexValue(VPackBuilder& leased,
// Unique VPack index values are stored as follows:
// - Key: 7 + 8-byte object ID of index + VPack array with index value(s)
// - Value: primary key
elements.emplace_back(RocksDBKey::UniqueIndexValue(_objectId, leased.slice()));
elements.emplace_back(
RocksDBKey::UniqueIndexValue(_objectId, leased.slice()));
} else {
// Non-unique VPack index values are stored as follows:
// - Key: 6 + 8-byte object ID of index + VPack array with index value(s)
// + primary key
// - Value: empty
elements.emplace_back(RocksDBKey::IndexValue(_objectId, key, leased.slice()));
elements.emplace_back(
RocksDBKey::IndexValue(_objectId, key, leased.slice()));
hashes.push_back(leased.slice().normalizedHash());
}
}
@ -531,13 +533,14 @@ int RocksDBVPackIndex::insert(transaction::Methods* trx,
if (_unique) {
RocksDBValue existing =
RocksDBValue::Empty(RocksDBEntryType::UniqueIndexValue);
if (mthds->Exists(key)) {
if (mthds->Exists(_cf, key)) {
res = TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED;
}
}
if (res == TRI_ERROR_NO_ERROR) {
arangodb::Result r = mthds->Put(key, value.string(), rocksutils::index);
arangodb::Result r =
mthds->Put(_cf, key, value.string(), rocksutils::index);
if (!r.ok()) {
// auto status =
// rocksutils::convertStatus(s, rocksutils::StatusHint::index);
@ -547,7 +550,7 @@ int RocksDBVPackIndex::insert(transaction::Methods* trx,
if (res != TRI_ERROR_NO_ERROR) {
for (size_t j = 0; j < i; ++j) {
mthds->Delete(elements[j]);
mthds->Delete(_cf, elements[j]);
}
if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED && !_unique) {
@ -568,7 +571,7 @@ int RocksDBVPackIndex::insert(transaction::Methods* trx,
return res;
}
int RocksDBVPackIndex::insertRaw(RocksDBMethods* writeBatch,
int RocksDBVPackIndex::insertRaw(RocksDBMethods* batch,
TRI_voc_rid_t revisionId,
VPackSlice const& doc) {
std::vector<RocksDBKey> elements;
@ -593,12 +596,12 @@ int RocksDBVPackIndex::insertRaw(RocksDBMethods* writeBatch,
for (RocksDBKey const& key : elements) {
if (_unique) {
rocksdb::ReadOptions readOpts;
if (writeBatch->Exists(key)) {
if (batch->Exists(_cf, key)) {
res = TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED;
}
}
if (res == TRI_ERROR_NO_ERROR) {
writeBatch->Put(key, value.string(), rocksutils::index);
batch->Put(_cf, key, value.string(), rocksutils::index);
}
}
@ -634,7 +637,7 @@ int RocksDBVPackIndex::remove(transaction::Methods* trx,
size_t const count = elements.size();
for (size_t i = 0; i < count; ++i) {
arangodb::Result r = mthds->Delete(elements[i]);
arangodb::Result r = mthds->Delete(_cf, elements[i]);
if (!r.ok()) {
res = r.errorNumber();
}
@ -669,7 +672,7 @@ int RocksDBVPackIndex::removeRaw(RocksDBMethods* writeBatch,
size_t const count = elements.size();
for (size_t i = 0; i < count; ++i) {
writeBatch->Delete(elements[i]);
writeBatch->Delete(_cf, elements[i]);
}
for (auto& it : hashes) {

View File

@ -524,6 +524,12 @@ void ApplicationServer::prepare() {
void ApplicationServer::start() {
LOG_TOPIC(TRACE, Logger::STARTUP) << "ApplicationServer::start";
usleep(1000000);
usleep(1000000);
usleep(1000000);
usleep(1000000);
usleep(1000000);
int res = TRI_ERROR_NO_ERROR;