1
0
Fork 0

Merge branch 'engine-api' of github.com:arangodb/arangodb into engine-api

This commit is contained in:
Dan Larkin 2017-03-28 11:42:59 -04:00
commit 77c95181d8
5 changed files with 34 additions and 39 deletions

View File

@ -38,8 +38,8 @@
#include "RocksDBEngine/RocksDBCommon.h" #include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBKey.h" #include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBToken.h" #include "RocksDBEngine/RocksDBToken.h"
#include "RocksDBEngine/RocksDBTypes.h"
#include "RocksDBEngine/RocksDBTransactionState.h" #include "RocksDBEngine/RocksDBTransactionState.h"
#include "RocksDBEngine/RocksDBTypes.h"
#include <rocksdb/db.h> #include <rocksdb/db.h>
#include <rocksdb/options.h> #include <rocksdb/options.h>
@ -86,25 +86,20 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
TRI_ASSERT(limit > 0); // Someone called with limit == 0. Api broken TRI_ASSERT(limit > 0); // Someone called with limit == 0. Api broken
return false; return false;
} }
// aquire rocksdb transaction // aquire rocksdb transaction
RocksDBTransactionState *state = rocksutils::toRocksTransactionState(_trx); RocksDBTransactionState* state = rocksutils::toRocksTransactionState(_trx);
rocksdb::Transaction *rtrx = state->rocksTransaction(); rocksdb::Transaction* rtrx = state->rocksTransaction();
auto rocksColl = RocksDBCollection::toRocksDBCollection(_collection); auto rocksColl = RocksDBCollection::toRocksDBCollection(_collection);
while (limit > 0) { while (limit > 0) {
VPackSlice fromTo = _iterator.value(); VPackSlice fromTo = _iterator.value();
TRI_ASSERT(fromTo.isString()); TRI_ASSERT(fromTo.isString());
// if (tmp.isObject()) {
// tmp = tmp.get(StaticStrings::IndexEq);
//}
RocksDBKey prefix = RocksDBKey prefix =
RocksDBKey::EdgeIndexPrefix(_index->_objectId, fromTo.copyString()); RocksDBKey::EdgeIndexPrefix(_index->_objectId, fromTo.copyString());
rocksdb::ReadOptions readOptions; std::unique_ptr<rocksdb::Iterator> iter(rtrx->GetIterator(state->readOptions()));
std::unique_ptr<rocksdb::Iterator> iter(
rtrx->GetIterator(readOptions));
rocksdb::Slice rSlice(prefix.string()); rocksdb::Slice rSlice(prefix.string());
iter->Seek(rSlice); iter->Seek(rSlice);
@ -113,7 +108,7 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
size_t edgeKeySize = iter->key().size() - rSlice.size(); size_t edgeKeySize = iter->key().size() - rSlice.size();
const char* edgeKey = iter->key().data() + rSlice.size(); const char* edgeKey = iter->key().data() + rSlice.size();
// TODO do we need to handle failed lookups here? // aquire the document token through the primary index
RocksDBToken token; RocksDBToken token;
Result res = rocksColl->lookupDocumentToken( Result res = rocksColl->lookupDocumentToken(
_trx, StringRef(edgeKey, edgeKeySize), token); _trx, StringRef(edgeKey, edgeKeySize), token);
@ -122,7 +117,7 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
if (--limit == 0) { if (--limit == 0) {
break; break;
} }
} } // TODO do we need to handle failed lookups here?
iter->Next(); iter->Next();
} }
@ -144,8 +139,8 @@ RocksDBEdgeIndex::RocksDBEdgeIndex(TRI_idx_iid_t iid,
arangodb::LogicalCollection* collection, arangodb::LogicalCollection* collection,
std::string const& attr) std::string const& attr)
: RocksDBIndex(iid, collection, std::vector<std::vector<AttributeName>>( : RocksDBIndex(iid, collection, std::vector<std::vector<AttributeName>>(
{{AttributeName(attr, false)}}), {{AttributeName(attr, false)}}),
false, false), false, false),
_directionAttr(attr) { _directionAttr(attr) {
/*std::vector<std::vector<arangodb::basics::AttributeName>>( /*std::vector<std::vector<arangodb::basics::AttributeName>>(
{{arangodb::basics::AttributeName(StaticStrings::FromString, {{arangodb::basics::AttributeName(StaticStrings::FromString,
@ -234,8 +229,9 @@ int RocksDBEdgeIndex::insert(transaction::Methods* trx,
// aquire rocksdb transaction // aquire rocksdb transaction
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
rocksdb::Transaction* rtrx = state->rocksTransaction(); rocksdb::Transaction* rtrx = state->rocksTransaction();
rocksdb::Status status = rtrx->Put(rocksdb::Slice(key.string()), rocksdb::Slice()); rocksdb::Status status =
rtrx->Put(rocksdb::Slice(key.string()), rocksdb::Slice());
if (status.ok()) { if (status.ok()) {
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} else { } else {
@ -254,9 +250,9 @@ int RocksDBEdgeIndex::remove(transaction::Methods* trx,
primaryKey.copyString()); primaryKey.copyString());
// aquire rocksdb transaction // aquire rocksdb transaction
RocksDBTransactionState *state = rocksutils::toRocksTransactionState(trx); RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
rocksdb::Transaction *rtrx = state->rocksTransaction(); rocksdb::Transaction* rtrx = state->rocksTransaction();
rocksdb::Status status = rtrx->Delete(rocksdb::Slice(key.string())); rocksdb::Status status = rtrx->Delete(rocksdb::Slice(key.string()));
if (status.ok()) { if (status.ok()) {
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
@ -271,11 +267,10 @@ void RocksDBEdgeIndex::batchInsert(
transaction::Methods* trx, transaction::Methods* trx,
std::vector<std::pair<TRI_voc_rid_t, VPackSlice>> const& documents, std::vector<std::pair<TRI_voc_rid_t, VPackSlice>> const& documents,
arangodb::basics::LocalTaskQueue* queue) { arangodb::basics::LocalTaskQueue* queue) {
// aquire rocksdb transaction // aquire rocksdb transaction
RocksDBTransactionState *state = rocksutils::toRocksTransactionState(trx); RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
rocksdb::Transaction *rtrx = state->rocksTransaction(); rocksdb::Transaction* rtrx = state->rocksTransaction();
for (std::pair<TRI_voc_rid_t, VPackSlice> const& doc : documents) { for (std::pair<TRI_voc_rid_t, VPackSlice> const& doc : documents) {
VPackSlice primaryKey = doc.second.get(StaticStrings::KeyString); VPackSlice primaryKey = doc.second.get(StaticStrings::KeyString);
VPackSlice fromTo = doc.second.get(_directionAttr); VPackSlice fromTo = doc.second.get(_directionAttr);

View File

@ -63,14 +63,15 @@ class RocksDBEdgeIndexIterator final : public IndexIterator {
arangodb::velocypack::ArrayIterator _iterator; arangodb::velocypack::ArrayIterator _iterator;
RocksDBEdgeIndex const* _index; RocksDBEdgeIndex const* _index;
}; };
class RocksDBEdgeIndex final : public RocksDBIndex { class RocksDBEdgeIndex final : public RocksDBIndex {
friend class RocksDBEdgeIndexIterator; friend class RocksDBEdgeIndexIterator;
public: public:
RocksDBEdgeIndex() = delete; RocksDBEdgeIndex() = delete;
RocksDBEdgeIndex(TRI_idx_iid_t, arangodb::LogicalCollection*, std::string const&); RocksDBEdgeIndex(TRI_idx_iid_t, arangodb::LogicalCollection*,
std::string const&);
~RocksDBEdgeIndex(); ~RocksDBEdgeIndex();

View File

@ -41,6 +41,7 @@
#include <rocksdb/options.h> #include <rocksdb/options.h>
#include <rocksdb/utilities/optimistic_transaction_db.h> #include <rocksdb/utilities/optimistic_transaction_db.h>
#include <rocksdb/utilities/transaction.h> #include <rocksdb/utilities/transaction.h>
#include <rocksdb/status.h>
using namespace arangodb; using namespace arangodb;
@ -75,6 +76,8 @@ int RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
StorageEngine* engine = EngineSelectorFeature::ENGINE; StorageEngine* engine = EngineSelectorFeature::ENGINE;
rocksdb::TransactionDB* db = static_cast<RocksDBEngine*>(engine)->db(); rocksdb::TransactionDB* db = static_cast<RocksDBEngine*>(engine)->db();
_rocksTransaction.reset(db->BeginTransaction(rocksdb::WriteOptions(), rocksdb::TransactionOptions())); _rocksTransaction.reset(db->BeginTransaction(rocksdb::WriteOptions(), rocksdb::TransactionOptions()));
// _rocksTransaction->SetSnapshot()
} else { } else {
TRI_ASSERT(_status == transaction::Status::RUNNING); TRI_ASSERT(_status == transaction::Status::RUNNING);
} }
@ -172,3 +175,4 @@ int RocksDBTransactionState::addOperation(TRI_voc_rid_t revisionId,
THROW_ARANGO_NOT_YET_IMPLEMENTED(); THROW_ARANGO_NOT_YET_IMPLEMENTED();
return 0; return 0;
} }

View File

@ -32,12 +32,14 @@
#include "Transaction/Methods.h" #include "Transaction/Methods.h"
#include "VocBase/AccessMode.h" #include "VocBase/AccessMode.h"
#include "VocBase/voc-types.h" #include "VocBase/voc-types.h"
#include "rocksdb/status.h" #include <rocksdb/options.h>
#include "rocksdb/options.h"
struct TRI_vocbase_t; struct TRI_vocbase_t;
namespace rocksdb { namespace rocksdb {
class Transaction; class Transaction;
class Slice;
class Iterator;
} }
namespace arangodb { namespace arangodb {
@ -74,17 +76,17 @@ class RocksDBTransactionState final : public TransactionState {
rocksdb::Transaction* rocksTransaction() { rocksdb::Transaction* rocksTransaction() {
return _rocksTransaction.get(); return _rocksTransaction.get();
} }
rocksdb::ReadOptions& readOptions(){ rocksdb::ReadOptions const& readOptions(){
return _rocksReadOptions; return _rocksReadOptions;
} }
private: private:
std::unique_ptr<rocksdb::Transaction> _rocksTransaction; std::unique_ptr<rocksdb::Transaction> _rocksTransaction;
rocksdb::ReadOptions _rocksReadOptions; rocksdb::ReadOptions _rocksReadOptions;
bool _hasOperations; bool _hasOperations;
}; };
} }
#endif #endif

View File

@ -532,13 +532,6 @@ void ApplicationServer::prepare() {
void ApplicationServer::start() { void ApplicationServer::start() {
LOG_TOPIC(TRACE, Logger::STARTUP) << "ApplicationServer::start"; LOG_TOPIC(TRACE, Logger::STARTUP) << "ApplicationServer::start";
usleep(1000000);
usleep(1000000);
usleep(1000000);
usleep(1000000);
usleep(1000000);
usleep(1000000);
int res = TRI_ERROR_NO_ERROR; int res = TRI_ERROR_NO_ERROR;