mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'engine-api' of https://github.com/arangodb/arangodb into engine-api
This commit is contained in:
commit
9e15fb4f35
|
@ -21,7 +21,6 @@
|
|||
/// @author Jan-Christoph Uhde
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "RocksDBCollection.h"
|
||||
#include "Aql/PlanCache.h"
|
||||
#include "Basics/Result.h"
|
||||
#include "Basics/StaticStrings.h"
|
||||
|
@ -30,12 +29,16 @@
|
|||
#include "Indexes/Index.h"
|
||||
#include "Indexes/IndexIterator.h"
|
||||
#include "RestServer/DatabaseFeature.h"
|
||||
#include "RocksDBCollection.h"
|
||||
#include "RocksDBEngine/RocksDBCommon.h"
|
||||
#include "RocksDBEngine/RocksDBEngine.h"
|
||||
#include "RocksDBEngine/RocksDBKey.h"
|
||||
#include "RocksDBEngine/RocksDBPrimaryMockIndex.h"
|
||||
#include "RocksDBEngine/RocksDBToken.h"
|
||||
#include "RocksDBEngine/RocksDBValue.h"
|
||||
#include "RocksDBEngine/RocksDBTransactionState.h"
|
||||
#include "StorageEngine/EngineSelectorFeature.h"
|
||||
#include "StorageEngine/TransactionState.h"
|
||||
#include "StorageEngine/StorageEngine.h"
|
||||
#include "StorageEngine/TransactionState.h"
|
||||
#include "Transaction/Helpers.h"
|
||||
|
@ -49,8 +52,14 @@
|
|||
#include <velocypack/velocypack-aliases.h>
|
||||
|
||||
using namespace arangodb;
|
||||
using namespace arangodb::rocksutils;
|
||||
|
||||
static std::string const Empty;
|
||||
|
||||
rocksdb::TransactionDB* db() {
|
||||
StorageEngine* engine = EngineSelectorFeature::ENGINE;
|
||||
return static_cast<RocksDBEngine*>(engine)->db();
|
||||
}
|
||||
|
||||
RocksDBCollection::RocksDBCollection(LogicalCollection* collection,
|
||||
VPackSlice const& info)
|
||||
|
@ -399,12 +408,7 @@ int RocksDBCollection::insert(arangodb::transaction::Methods* trx,
|
|||
res = insertDocument(trx, revisionId, newSlice, options.waitForSync);
|
||||
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
// TODO: handle returning of result value!
|
||||
|
||||
// uint8_t const* vpack = lookupRevisionVPack(revisionId);
|
||||
// if (vpack != nullptr) {
|
||||
// result.addExisting(vpack, revisionId);
|
||||
// }
|
||||
lookupRevisionVPack(revisionId, trx, result);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
@ -483,13 +487,7 @@ int RocksDBCollection::replace(
|
|||
|
||||
res = updateDocument(trx, oldRevisionId, oldDoc, revisionId,
|
||||
VPackSlice(builder->slice()), options.waitForSync);
|
||||
/* TODO: handle result handling
|
||||
uint8_t const* vpack = lookupRevisionVPack(revisionId);
|
||||
if (vpack != nullptr) {
|
||||
result.addExisting(vpack, revisionId);
|
||||
}
|
||||
*/
|
||||
|
||||
lookupRevisionVPack(revisionId, trx, result);
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -664,7 +662,7 @@ int RocksDBCollection::insertDocument(arangodb::transaction::Methods* trx,
|
|||
RocksDBValue value(RocksDBValue::Document(doc));
|
||||
|
||||
rocksdb::WriteBatch writeBatch;
|
||||
writeBatch.Put(key.key(), value.value());
|
||||
writeBatch.Put(key.string(), value.value());
|
||||
|
||||
auto indexes = _indexes;
|
||||
size_t const n = indexes.size();
|
||||
|
@ -723,7 +721,7 @@ int RocksDBCollection::removeDocument(arangodb::transaction::Methods* trx,
|
|||
auto key = RocksDBKey::Document(_objectId, revisionId);
|
||||
|
||||
rocksdb::WriteBatch writeBatch;
|
||||
writeBatch.Delete(key.key());
|
||||
writeBatch.Delete(key.string());
|
||||
|
||||
auto indexes = _indexes;
|
||||
size_t const n = indexes.size();
|
||||
|
@ -777,12 +775,7 @@ int RocksDBCollection::lookupDocument(transaction::Methods* trx, VPackSlice key,
|
|||
TRI_voc_rid_t revisionId = token.revisionId();
|
||||
|
||||
if (revisionId > 0) {
|
||||
// TODO: add result handling!
|
||||
/* uint8_t const* vpack = lookupRevisionVPack(revisionId);
|
||||
if (vpack != nullptr) {
|
||||
result.addExisting(vpack, revisionId);
|
||||
}
|
||||
*/
|
||||
lookupRevisionVPack(revisionId, trx, result);
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
|
@ -805,3 +798,12 @@ Result RocksDBCollection::lookupDocumentToken(transaction::Methods* trx,
|
|||
outToken = primaryIndex()->lookupKey(trx, key);
|
||||
return outToken.revisionId() > 0 ? Result() : Result(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
|
||||
}
|
||||
|
||||
void RocksDBCollection::lookupRevisionVPack(TRI_voc_rid_t revisionId, transaction::Methods* trx,arangodb::ManagedDocumentResult& result){
|
||||
auto key = RocksDBKey::Document(_objectId,revisionId);
|
||||
std::string value;
|
||||
TRI_ASSERT(value.data());
|
||||
auto* state = toRocksTransactionState(trx);
|
||||
state->rocksTransaction()->Get(rocksdb::ReadOptions(), rocksdb::Slice(key.string()), &value);
|
||||
result.setManaged(std::move(value), revisionId);
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Jan-Christoph Uhde
|
||||
/// @author Jan Christoph Uhde
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_ROCKSDB_ENGINE_ROCKSDB_COLLECTION_H
|
||||
|
@ -204,6 +204,9 @@ class RocksDBCollection final : public PhysicalCollection {
|
|||
arangodb::velocypack::Slice const& oldDoc, TRI_voc_rid_t newRevisionId,
|
||||
arangodb::velocypack::Slice const& newDoc, bool& waitForSync);
|
||||
|
||||
void lookupRevisionVPack(TRI_voc_rid_t, transaction::Methods*, arangodb::ManagedDocumentResult&);
|
||||
|
||||
|
||||
private:
|
||||
uint64_t const _objectId; // rocksdb-specific object id for collection
|
||||
};
|
||||
|
|
|
@ -23,6 +23,8 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "RocksDBEngine/RocksDBCommon.h"
|
||||
#include "RocksDBEngine/RocksDBTransactionState.h"
|
||||
#include "Transaction/Methods.h"
|
||||
|
||||
namespace arangodb {
|
||||
namespace rocksutils {
|
||||
|
@ -101,6 +103,13 @@ void uint64ToPersistent(std::string& p, uint64_t value) {
|
|||
value >>= 8;
|
||||
} while (++len < sizeof(uint64_t));
|
||||
}
|
||||
|
||||
RocksDBTransactionState* toRocksTransactionState(transaction::Methods* trx) {
|
||||
TRI_ASSERT(trx != nullptr);
|
||||
TransactionState* state = trx->state();
|
||||
TRI_ASSERT(state != nullptr);
|
||||
return static_cast<RocksDBTransactionState*>(trx->state());
|
||||
}
|
||||
|
||||
} // namespace rocksutils
|
||||
} // namespace arangodb
|
||||
|
|
|
@ -31,6 +31,9 @@
|
|||
#include <rocksdb/status.h>
|
||||
|
||||
namespace arangodb {
|
||||
class TransactionState;
|
||||
class RocksDBTransactionState;
|
||||
namespace transaction { class Methods;}
|
||||
namespace rocksutils {
|
||||
|
||||
enum StatusHint { none, document, collection, view, index, database };
|
||||
|
@ -40,7 +43,8 @@ arangodb::Result convertStatus(rocksdb::Status const&,
|
|||
|
||||
uint64_t uint64FromPersistent(char const* p);
|
||||
void uint64ToPersistent(char* p, uint64_t value);
|
||||
void uint64ToPersistent(std::string& out, uint64_t value);
|
||||
void uint64ToPersistent(std::string& out, uint64_t value);
|
||||
RocksDBTransactionState* toRocksTransactionState(transaction::Methods* trx);
|
||||
|
||||
} // namespace rocksutils
|
||||
} // namespace arangodb
|
||||
|
|
|
@ -37,8 +37,9 @@
|
|||
#include "RocksDBEngine/RocksDBCollection.h"
|
||||
#include "RocksDBEngine/RocksDBCommon.h"
|
||||
#include "RocksDBEngine/RocksDBKey.h"
|
||||
#include "RocksDBEngine/RocksDBTypes.h"
|
||||
#include "RocksDBEngine/RocksDBToken.h"
|
||||
#include "RocksDBEngine/RocksDBTypes.h"
|
||||
#include "RocksDBEngine/RocksDBTransactionState.h"
|
||||
|
||||
#include <rocksdb/db.h>
|
||||
#include <rocksdb/options.h>
|
||||
|
@ -78,46 +79,51 @@ RocksDBEdgeIndexIterator::~RocksDBEdgeIndexIterator() {
|
|||
|
||||
bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
|
||||
THROW_ARANGO_NOT_YET_IMPLEMENTED();
|
||||
|
||||
|
||||
if (limit == 0 || !_iterator.valid()) {
|
||||
// No limit no data, or we are actually done. The last call should have returned false
|
||||
TRI_ASSERT(limit > 0); // Someone called with limit == 0. Api broken
|
||||
// No limit no data, or we are actually done. The last call should have
|
||||
// returned false
|
||||
TRI_ASSERT(limit > 0); // Someone called with limit == 0. Api broken
|
||||
return false;
|
||||
}
|
||||
RocksDBCollection *rocksColl = RocksDBCollection::toRocksDBCollection(_collection);
|
||||
|
||||
|
||||
// aquire rocksdb transaction
|
||||
RocksDBTransactionState *state = rocksutils::toRocksTransactionState(_trx);
|
||||
rocksdb::Transaction *rtrx = state->rocksTransaction();
|
||||
auto rocksColl = RocksDBCollection::toRocksDBCollection(_collection);
|
||||
|
||||
while (limit > 0) {
|
||||
|
||||
VPackSlice fromTo = _iterator.value();
|
||||
TRI_ASSERT(fromTo.isString());
|
||||
//if (tmp.isObject()) {
|
||||
// if (tmp.isObject()) {
|
||||
// tmp = tmp.get(StaticStrings::IndexEq);
|
||||
//}
|
||||
|
||||
RocksDBKey prefix = RocksDBKey::EdgeIndexPrefix(_index->_objectId,
|
||||
fromTo.copyString());
|
||||
|
||||
|
||||
RocksDBKey prefix =
|
||||
RocksDBKey::EdgeIndexPrefix(_index->_objectId, fromTo.copyString());
|
||||
|
||||
rocksdb::ReadOptions readOptions;
|
||||
std::unique_ptr<rocksdb::Iterator> iter(_index->_db->NewIterator(readOptions));
|
||||
|
||||
rocksdb::Slice rSlice(prefix.key());
|
||||
std::unique_ptr<rocksdb::Iterator> iter(
|
||||
rtrx->GetIterator(readOptions));
|
||||
|
||||
rocksdb::Slice rSlice(prefix.string());
|
||||
iter->Seek(rSlice);
|
||||
while (iter->Valid() && iter->key().starts_with(rSlice)) {
|
||||
|
||||
TRI_ASSERT(iter->key().size() > rSlice.size());
|
||||
size_t edgeKeySize = iter->key().size() - rSlice.size();
|
||||
const char* edgeKey = iter->key().data() + rSlice.size();
|
||||
|
||||
|
||||
// TODO do we need to handle failed lookups here?
|
||||
RocksDBToken token;
|
||||
Result res = rocksColl->lookupDocumentToken(_trx, StringRef(edgeKey, edgeKeySize), token);
|
||||
Result res = rocksColl->lookupDocumentToken(
|
||||
_trx, StringRef(edgeKey, edgeKeySize), token);
|
||||
if (res.ok()) {
|
||||
cb(token);
|
||||
if (--limit == 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
iter->Next();
|
||||
}
|
||||
if (limit > 0) {
|
||||
|
@ -130,20 +136,16 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
|
|||
return true;
|
||||
}
|
||||
|
||||
void RocksDBEdgeIndexIterator::reset() {
|
||||
_iterator.reset();
|
||||
}
|
||||
void RocksDBEdgeIndexIterator::reset() { _iterator.reset(); }
|
||||
|
||||
// ============================= Index ====================================
|
||||
|
||||
RocksDBEdgeIndex::RocksDBEdgeIndex(rocksdb::TransactionDB* db,
|
||||
TRI_idx_iid_t iid,
|
||||
RocksDBEdgeIndex::RocksDBEdgeIndex(TRI_idx_iid_t iid,
|
||||
arangodb::LogicalCollection* collection,
|
||||
std::string const& attr)
|
||||
: RocksDBIndex(iid, collection, std::vector<std::vector<AttributeName>>(
|
||||
{{AttributeName(attr, false)}}),
|
||||
false, false),
|
||||
_db(db),
|
||||
_directionAttr(attr) {
|
||||
/*std::vector<std::vector<arangodb::basics::AttributeName>>(
|
||||
{{arangodb::basics::AttributeName(StaticStrings::FromString,
|
||||
|
@ -154,7 +156,7 @@ RocksDBEdgeIndex::RocksDBEdgeIndex(rocksdb::TransactionDB* db,
|
|||
#warning fix object ID
|
||||
TRI_voc_tick_t databaseId = collection->vocbase()->id();
|
||||
RocksDBKey entry = RocksDBKey::Index(databaseId, collection->cid(), iid);
|
||||
_objectId = 3413415 + iid;//entry.key;
|
||||
_objectId = 3413415 + iid; // entry.key;
|
||||
}
|
||||
|
||||
RocksDBEdgeIndex::~RocksDBEdgeIndex() {}
|
||||
|
@ -222,16 +224,18 @@ int RocksDBEdgeIndex::insert(transaction::Methods* trx,
|
|||
} else {
|
||||
key = doc.get(StaticStrings::FromString);
|
||||
}*/
|
||||
|
||||
|
||||
VPackSlice primaryKey = doc.get(StaticStrings::KeyString);
|
||||
VPackSlice fromTo = doc.get(_directionAttr);
|
||||
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
|
||||
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromTo.copyString(),
|
||||
primaryKey.copyString());
|
||||
|
||||
rocksdb::WriteOptions writeOptions;
|
||||
rocksdb::Status status = _db->Put(
|
||||
writeOptions, rocksdb::Slice(key.key()), rocksdb::Slice());
|
||||
// aquire rocksdb transaction
|
||||
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
|
||||
rocksdb::Transaction* rtrx = state->rocksTransaction();
|
||||
|
||||
rocksdb::Status status = rtrx->Put(rocksdb::Slice(key.string()), rocksdb::Slice());
|
||||
if (status.ok()) {
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
} else {
|
||||
|
@ -248,54 +252,31 @@ int RocksDBEdgeIndex::remove(transaction::Methods* trx,
|
|||
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
|
||||
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromTo.copyString(),
|
||||
primaryKey.copyString());
|
||||
|
||||
// aquire rocksdb transaction
|
||||
RocksDBTransactionState *state = rocksutils::toRocksTransactionState(trx);
|
||||
rocksdb::Transaction *rtrx = state->rocksTransaction();
|
||||
|
||||
rocksdb::WriteOptions writeOptions;
|
||||
rocksdb::Status status =
|
||||
_db->Delete(writeOptions, rocksdb::Slice(key.key()));
|
||||
rocksdb::Status status = rtrx->Delete(rocksdb::Slice(key.string()));
|
||||
if (status.ok()) {
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
} else {
|
||||
Result res = rocksutils::convertStatus(status, rocksutils::StatusHint::index);
|
||||
Result res =
|
||||
rocksutils::convertStatus(status, rocksutils::StatusHint::index);
|
||||
return res.errorNumber();
|
||||
}
|
||||
}
|
||||
/*
|
||||
struct RDBEdgeInsertTask : public LocalTask {
|
||||
RocksDBEdgeIndex* _index;
|
||||
std::shared_ptr<rocksdb::Transaction> _rtrx;
|
||||
VPackSlice _doc;
|
||||
|
||||
RDBEdgeInsertTask(arangodb::basics::LocalTaskQueue* queue,
|
||||
RocksDBEdgeIndex* index,
|
||||
std::shared_ptr<rocksdb::Transaction> rtrx, VPackSlice doc)
|
||||
: LocalTask(queue), _index(index), _rtrx(rtrx), _doc(doc) {}
|
||||
|
||||
void run() override {
|
||||
size_t keySize;
|
||||
std::unique_ptr<char> key = buildIndexValue(
|
||||
_index->_objectId, _index->_directionAttr, _doc, keySize);
|
||||
|
||||
rocksdb::Status status =
|
||||
_rtrx->Put(rocksdb::Slice(key.get(), keySize), rocksdb::Slice());
|
||||
if (!status.ok()) {
|
||||
Result res = convertRocksDBStatus(status, StatusHint::index);
|
||||
_queue->setStatus(res.errorNumber());
|
||||
}
|
||||
}
|
||||
};*/
|
||||
|
||||
void RocksDBEdgeIndex::batchInsert(
|
||||
transaction::Methods* trx,
|
||||
std::vector<std::pair<TRI_voc_rid_t, VPackSlice>> const& documents,
|
||||
arangodb::basics::LocalTaskQueue* queue) {
|
||||
// setup rocksdb transaction
|
||||
rocksdb::WriteOptions writeOptions;
|
||||
rocksdb::TransactionOptions transactionOptions;
|
||||
std::unique_ptr<rocksdb::Transaction> rtxr(
|
||||
_db->BeginTransaction(writeOptions, transactionOptions));
|
||||
|
||||
for (std::pair<TRI_voc_rid_t, VPackSlice> doc : documents) {
|
||||
|
||||
|
||||
// aquire rocksdb transaction
|
||||
RocksDBTransactionState *state = rocksutils::toRocksTransactionState(trx);
|
||||
rocksdb::Transaction *rtrx = state->rocksTransaction();
|
||||
|
||||
for (std::pair<TRI_voc_rid_t, VPackSlice> const& doc : documents) {
|
||||
VPackSlice primaryKey = doc.second.get(StaticStrings::KeyString);
|
||||
VPackSlice fromTo = doc.second.get(_directionAttr);
|
||||
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
|
||||
|
@ -303,43 +284,14 @@ void RocksDBEdgeIndex::batchInsert(
|
|||
primaryKey.copyString());
|
||||
|
||||
rocksdb::Status status =
|
||||
rtxr->Put(rocksdb::Slice(key.key()), rocksdb::Slice());
|
||||
rtrx->Put(rocksdb::Slice(key.string()), rocksdb::Slice());
|
||||
if (!status.ok()) {
|
||||
Result res = rocksutils::convertStatus(status, rocksutils::StatusHint::index);
|
||||
Result res =
|
||||
rocksutils::convertStatus(status, rocksutils::StatusHint::index);
|
||||
queue->setStatus(res.errorNumber());
|
||||
rtxr->Rollback();
|
||||
break;
|
||||
}
|
||||
|
||||
// auto task =
|
||||
// std::make_shared<RDBEdgeInsertTask>(queue, this, rtxr, doc.second);
|
||||
// queue->enqueue(task);
|
||||
}
|
||||
rocksdb::Status status = rtxr->Commit();
|
||||
if (!status.ok()) {
|
||||
Result res = rocksutils::convertStatus(status, rocksutils::StatusHint::index);
|
||||
queue->setStatus(res.errorNumber());
|
||||
}
|
||||
|
||||
// commit in callback called after all tasks finish
|
||||
/*std::shared_ptr<LocalCallbackTask> callback(
|
||||
new LocalCallbackTask(queue, [rtxr, queue] {
|
||||
rocksdb::Status status = rtxr->Commit();
|
||||
if (!status.ok()) {
|
||||
Result res = convertRocksDBStatus(status);
|
||||
queue->setStatus(res.errorNumber());
|
||||
}
|
||||
}));
|
||||
try {
|
||||
for (std::pair<TRI_voc_rid_t, VPackSlice> doc : documents) {
|
||||
auto task =
|
||||
std::make_shared<RDBEdgeInsertTask>(queue, this, rtxr, doc.second);
|
||||
queue->enqueue(task);
|
||||
}
|
||||
} catch (...) {
|
||||
queue->setStatus(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
queue->enqueueCallback(callback);*/
|
||||
}
|
||||
|
||||
/// @brief unload the index data from memory
|
||||
|
|
|
@ -66,11 +66,11 @@ class RocksDBEdgeIndexIterator final : public IndexIterator {
|
|||
|
||||
class RocksDBEdgeIndex final : public RocksDBIndex {
|
||||
friend class RocksDBEdgeIndexIterator;
|
||||
|
||||
public:
|
||||
RocksDBEdgeIndex() = delete;
|
||||
|
||||
RocksDBEdgeIndex(rocksdb::TransactionDB*, TRI_idx_iid_t,
|
||||
arangodb::LogicalCollection*, std::string const&);
|
||||
RocksDBEdgeIndex(TRI_idx_iid_t, arangodb::LogicalCollection*, std::string const&);
|
||||
|
||||
~RocksDBEdgeIndex();
|
||||
|
||||
|
@ -145,7 +145,6 @@ class RocksDBEdgeIndex final : public RocksDBIndex {
|
|||
void handleValNode(VPackBuilder* keys,
|
||||
arangodb::aql::AstNode const* valNode) const;
|
||||
|
||||
rocksdb::TransactionDB* _db;
|
||||
std::string _directionAttr;
|
||||
uint64_t _objectId;
|
||||
};
|
||||
|
|
|
@ -356,7 +356,7 @@ int RocksDBEngine::writeCreateDatabaseMarker(TRI_voc_tick_t id,
|
|||
auto value = RocksDBValue::Database(slice);
|
||||
rocksdb::WriteOptions options; // TODO: check which options would make sense
|
||||
|
||||
rocksdb::Status res = _db->Put(options, key.key(), value.value());
|
||||
rocksdb::Status res = _db->Put(options, key.string(), value.value());
|
||||
if (res.ok()) {
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
@ -370,7 +370,7 @@ int RocksDBEngine::writeCreateCollectionMarker(TRI_voc_tick_t databaseId,
|
|||
auto value = RocksDBValue::Collection(slice);
|
||||
rocksdb::WriteOptions options; // TODO: check which options would make sense
|
||||
|
||||
rocksdb::Status res = _db->Put(options, key.key(), value.value());
|
||||
rocksdb::Status res = _db->Put(options, key.string(), value.value());
|
||||
if (res.ok()) {
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
@ -473,7 +473,7 @@ void RocksDBEngine::createIndex(TRI_vocbase_t* vocbase,
|
|||
auto value = RocksDBValue::Index(data);
|
||||
rocksdb::WriteOptions options; // TODO: check which options would make sense
|
||||
|
||||
rocksdb::Status res = _db->Put(options, key.key(), value.value());
|
||||
rocksdb::Status res = _db->Put(options, key.string(), value.value());
|
||||
if (!res.ok()) {
|
||||
// TODO: need translation for RocksDB errors
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
|
|
|
@ -336,10 +336,6 @@ std::shared_ptr<Index> RocksDBIndexFactory::prepareIndexFromSlice(
|
|||
iid = arangodb::Index::generateId();
|
||||
}
|
||||
|
||||
// no need to access this in every single index
|
||||
RocksDBEngine *engine = static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
|
||||
rocksdb::TransactionDB *db = engine->db();
|
||||
|
||||
switch (type) {
|
||||
case arangodb::Index::TRI_IDX_TYPE_PRIMARY_INDEX: {
|
||||
if (!isClusterConstructor) {
|
||||
|
@ -356,7 +352,7 @@ std::shared_ptr<Index> RocksDBIndexFactory::prepareIndexFromSlice(
|
|||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
|
||||
"cannot create edge index");
|
||||
}
|
||||
newIdx.reset(new arangodb::RocksDBEdgeIndex(db, iid, col, StaticStrings::FromString));
|
||||
newIdx.reset(new arangodb::RocksDBEdgeIndex(iid, col, StaticStrings::FromString));
|
||||
break;
|
||||
}
|
||||
//case arangodb::Index::TRI_IDX_TYPE_HASH_INDEX: {
|
||||
|
@ -388,12 +384,9 @@ void RocksDBIndexFactory::fillSystemIndexes(
|
|||
std::make_shared<arangodb::RocksDBPrimaryMockIndex>(col, builder.slice()));
|
||||
// create edges index
|
||||
if (col->type() == TRI_COL_TYPE_EDGE) {
|
||||
RocksDBEngine *engine = static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
|
||||
rocksdb::TransactionDB *db = engine->db();
|
||||
|
||||
systemIndexes.emplace_back(
|
||||
std::make_shared<arangodb::RocksDBEdgeIndex>(db, 1, col, StaticStrings::FromString));
|
||||
std::make_shared<arangodb::RocksDBEdgeIndex>(1, col, StaticStrings::FromString));
|
||||
systemIndexes.emplace_back(
|
||||
std::make_shared<arangodb::RocksDBEdgeIndex>(db, 2, col, StaticStrings::ToString));
|
||||
std::make_shared<arangodb::RocksDBEdgeIndex>(2, col, StaticStrings::ToString));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -158,7 +158,7 @@ VPackSlice RocksDBKey::indexedVPack(rocksdb::Slice const& slice) {
|
|||
return indexedVPack(slice.data(), slice.size());
|
||||
}
|
||||
|
||||
std::string const& RocksDBKey::key() const { return _buffer; }
|
||||
std::string const& RocksDBKey::string() const { return _buffer; }
|
||||
|
||||
RocksDBKey::RocksDBKey(RocksDBEntryType type, uint64_t first)
|
||||
: _type(type), _buffer() {
|
||||
|
|
|
@ -200,7 +200,7 @@ class RocksDBKey {
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Returns a reference to the full, constructed key
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
std::string const& key() const;
|
||||
std::string const& string() const;
|
||||
|
||||
private:
|
||||
RocksDBKey(RocksDBEntryType type, uint64_t first);
|
||||
|
|
|
@ -53,7 +53,7 @@ RocksDBPrimaryMockIndexIterator::RocksDBPrimaryMockIndexIterator(
|
|||
: IndexIterator(collection, trx, mmdr, index),
|
||||
_index(index),
|
||||
_keys(keys.get()),
|
||||
_iterator(_keys->slice()) {
|
||||
_iterator(_keys->slice()){
|
||||
keys.release(); // now we have ownership for _keys
|
||||
TRI_ASSERT(_keys->slice().isArray());
|
||||
}
|
||||
|
@ -67,27 +67,61 @@ RocksDBPrimaryMockIndexIterator::~RocksDBPrimaryMockIndexIterator() {
|
|||
|
||||
bool RocksDBPrimaryMockIndexIterator::next(TokenCallback const& cb,
|
||||
size_t limit) {
|
||||
THROW_ARANGO_NOT_YET_IMPLEMENTED();
|
||||
return false;
|
||||
if (limit == 0 || !_iterator.valid()) {
|
||||
// No limit no data, or we are actually done. The last call should have
|
||||
// returned false
|
||||
TRI_ASSERT(limit > 0); // Someone called with limit == 0. Api broken
|
||||
return false;
|
||||
}
|
||||
|
||||
ManagedDocumentResult result;
|
||||
while (limit > 0) {
|
||||
|
||||
RocksDBToken token = _index->lookupKey(_trx, *_iterator, result);
|
||||
cb(token);
|
||||
|
||||
_iterator.next();
|
||||
if (!_iterator.valid()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void RocksDBPrimaryMockIndexIterator::reset() { _iterator.reset(); }
|
||||
|
||||
RocksDBAllIndexIterator::RocksDBAllIndexIterator(
|
||||
LogicalCollection* collection, transaction::Methods* trx,
|
||||
ManagedDocumentResult* mmdr, RocksDBPrimaryMockIndex const* index,
|
||||
ManagedDocumentResult* mmdr, RocksDBPrimaryMockIndex const* index,
|
||||
bool reverse)
|
||||
: IndexIterator(collection, trx, mmdr, index),
|
||||
_reverse(reverse),
|
||||
_total(0) {}
|
||||
//_reverse(reverse),
|
||||
_keyRevMap(index->_keyRevMap),
|
||||
_iterator(index->_keyRevMap.begin()) {}
|
||||
|
||||
bool RocksDBAllIndexIterator::next(TokenCallback const& cb, size_t limit) {
|
||||
// TODO
|
||||
return false;
|
||||
if (limit == 0 || _iterator == _keyRevMap.end()) {
|
||||
// No limit no data, or we are actually done. The last call should have
|
||||
// returned false
|
||||
TRI_ASSERT(limit > 0); // Someone called with limit == 0. Api broken
|
||||
return false;
|
||||
}
|
||||
|
||||
while (limit > 0) {
|
||||
TRI_voc_rid_t revisionId = _iterator->second;
|
||||
cb(RocksDBToken(revisionId));
|
||||
|
||||
limit--;
|
||||
_iterator++;
|
||||
if (_iterator == _keyRevMap.end()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void RocksDBAllIndexIterator::reset() {
|
||||
// TODO
|
||||
_iterator = _keyRevMap.begin();
|
||||
}
|
||||
|
||||
RocksDBPrimaryMockIndex::RocksDBPrimaryMockIndex(
|
||||
|
@ -141,7 +175,7 @@ RocksDBToken RocksDBPrimaryMockIndex::lookupKey(transaction::Methods* trx, arang
|
|||
|
||||
RocksDBToken RocksDBPrimaryMockIndex::lookupKey(transaction::Methods* trx,
|
||||
VPackSlice slice,
|
||||
ManagedDocumentResult& result) {
|
||||
ManagedDocumentResult& result) const {
|
||||
std::string key = slice.copyString();
|
||||
std::lock_guard<std::mutex> lock(_keyRevMutex);
|
||||
LOG_TOPIC(ERR, Logger::FIXME) << "LOOKUP. THE KEY IS: " << key;
|
||||
|
|
|
@ -84,12 +84,14 @@ class RocksDBAllIndexIterator final : public IndexIterator {
|
|||
void reset() override;
|
||||
|
||||
private:
|
||||
bool const _reverse;
|
||||
uint64_t _total;
|
||||
//bool const _reverse;
|
||||
std::unordered_map<std::string, TRI_voc_rid_t> const& _keyRevMap;
|
||||
std::unordered_map<std::string, TRI_voc_rid_t>::const_iterator _iterator;
|
||||
};
|
||||
|
||||
class RocksDBPrimaryMockIndex final : public RocksDBIndex {
|
||||
friend class RocksDBPrimaryMockIndexIterator;
|
||||
friend class RocksDBAllIndexIterator;
|
||||
|
||||
public:
|
||||
RocksDBPrimaryMockIndex() = delete;
|
||||
|
@ -123,7 +125,8 @@ class RocksDBPrimaryMockIndex final : public RocksDBIndex {
|
|||
void toVelocyPackFigures(VPackBuilder&) const override;
|
||||
|
||||
RocksDBToken lookupKey(transaction::Methods* trx, arangodb::StringRef key);
|
||||
RocksDBToken lookupKey(transaction::Methods* trx, arangodb::velocypack::Slice key, ManagedDocumentResult& result);
|
||||
RocksDBToken lookupKey(transaction::Methods* trx, arangodb::velocypack::Slice key,
|
||||
ManagedDocumentResult& result) const;
|
||||
|
||||
int insert(transaction::Methods*, TRI_voc_rid_t, arangodb::velocypack::Slice const&, bool isRollback) override;
|
||||
|
||||
|
@ -150,7 +153,7 @@ class RocksDBPrimaryMockIndex final : public RocksDBIndex {
|
|||
|
||||
private:
|
||||
std::unordered_map<std::string, TRI_voc_rid_t> _keyRevMap;
|
||||
std::mutex _keyRevMutex;
|
||||
mutable std::mutex _keyRevMutex;
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -68,7 +68,11 @@ class RocksDBTransactionState final : public TransactionState {
|
|||
|
||||
/// @brief add a WAL operation for a transaction collection
|
||||
int addOperation(TRI_voc_rid_t, RocksDBDocumentOperation&, RocksDBWalMarker const* marker, bool&);
|
||||
|
||||
|
||||
rocksdb::Transaction* rocksTransaction() {
|
||||
return _rocksTransaction.get();
|
||||
}
|
||||
|
||||
private:
|
||||
std::unique_ptr<rocksdb::Transaction> _rocksTransaction;
|
||||
bool _hasOperations;
|
||||
|
|
|
@ -33,14 +33,21 @@ namespace arangodb {
|
|||
|
||||
class ManagedDocumentResult {
|
||||
public:
|
||||
ManagedDocumentResult() : _length(0), _lastRevisionId(0), _vpack(nullptr),
|
||||
_managed(false) {}
|
||||
ManagedDocumentResult() :
|
||||
_length(0),
|
||||
_lastRevisionId(0),
|
||||
_vpack(nullptr),
|
||||
_managed(false),
|
||||
_useString(false) {}
|
||||
~ManagedDocumentResult() { reset(); }
|
||||
ManagedDocumentResult(ManagedDocumentResult const& other) = delete;
|
||||
ManagedDocumentResult& operator=(ManagedDocumentResult const& other) = delete;
|
||||
|
||||
ManagedDocumentResult& operator=(ManagedDocumentResult&& other){
|
||||
if (other._managed){
|
||||
if (other._useString){
|
||||
setManaged(std::move(other._string), other._lastRevisionId);
|
||||
}
|
||||
else if (other._managed){
|
||||
reset();
|
||||
_vpack = other._vpack;
|
||||
_length = other._length;
|
||||
|
@ -63,7 +70,7 @@ class ManagedDocumentResult {
|
|||
|
||||
//add unmanaged vpack
|
||||
inline void setUnmanaged(uint8_t const* vpack, TRI_voc_rid_t revisionId) {
|
||||
if(_managed) {
|
||||
if(_managed || _useString) {
|
||||
reset();
|
||||
}
|
||||
TRI_ASSERT(_length == 0);
|
||||
|
@ -86,24 +93,39 @@ class ManagedDocumentResult {
|
|||
_managed = true;
|
||||
}
|
||||
|
||||
inline void setManaged(std::string&& str, TRI_voc_rid_t revisionId) {
|
||||
reset();
|
||||
_string = std::move(str);
|
||||
_vpack = reinterpret_cast<uint8_t*>(const_cast<char *>(_string.data()));
|
||||
_lastRevisionId = revisionId;
|
||||
_useString = true;
|
||||
}
|
||||
|
||||
inline TRI_voc_rid_t lastRevisionId() const { return _lastRevisionId; }
|
||||
|
||||
void reset() noexcept {
|
||||
if(_managed) {
|
||||
delete _vpack;
|
||||
}
|
||||
_vpack = nullptr;
|
||||
_lastRevisionId = 0;
|
||||
_managed = false;
|
||||
_length = 0;
|
||||
|
||||
if(_useString){
|
||||
_string.clear();
|
||||
}
|
||||
_useString = false;
|
||||
|
||||
_lastRevisionId = 0;
|
||||
_vpack = nullptr;
|
||||
}
|
||||
|
||||
private:
|
||||
uint64_t _length;
|
||||
TRI_voc_rid_t _lastRevisionId;
|
||||
uint8_t* _vpack;
|
||||
std::string _string;
|
||||
bool _managed;
|
||||
|
||||
bool _useString;
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -532,6 +532,13 @@ void ApplicationServer::prepare() {
|
|||
|
||||
void ApplicationServer::start() {
|
||||
LOG_TOPIC(TRACE, Logger::STARTUP) << "ApplicationServer::start";
|
||||
|
||||
usleep(1000000);
|
||||
usleep(1000000);
|
||||
usleep(1000000);
|
||||
usleep(1000000);
|
||||
usleep(1000000);
|
||||
usleep(1000000);
|
||||
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
|
||||
|
|
Loading…
Reference in New Issue