diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index 26c9a7b40d..28510ae9b3 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -54,12 +54,20 @@ using namespace arangodb; using namespace arangodb::rocksutils; +namespace { + static std::string const Empty; -rocksdb::TransactionDB* db() { +static rocksdb::TransactionDB* db() { StorageEngine* engine = EngineSelectorFeature::ENGINE; return static_cast(engine)->db(); } + +static inline rocksdb::Transaction* rocksTransaction(arangodb::transaction::Methods* trx) { + return static_cast(trx->state())->rocksTransaction(); +} + +} RocksDBCollection::RocksDBCollection(LogicalCollection* collection, VPackSlice const& info) @@ -661,8 +669,10 @@ int RocksDBCollection::insertDocument(arangodb::transaction::Methods* trx, RocksDBKey key(RocksDBKey::Document(_objectId, revisionId)); RocksDBValue value(RocksDBValue::Document(doc)); - rocksdb::WriteBatch writeBatch; - writeBatch.Put(key.string(), value.value()); + rocksdb::Transaction* rtrx = rocksTransaction(trx); + RocksDBSavePoint guard(rtrx); + + rtrx->Put(key.string(), value.value()); auto indexes = _indexes; size_t const n = indexes.size(); @@ -688,24 +698,13 @@ int RocksDBCollection::insertDocument(arangodb::transaction::Methods* trx, } if (result != TRI_ERROR_NO_ERROR) { - rocksdb::WriteOptions writeOptions; - if (_logicalCollection->waitForSync()) { waitForSync = true; } if (waitForSync) { trx->state()->waitForSync(true); - - // handle waitForSync for single operations here - if (trx->state()->isSingleOperation()) { - writeOptions.sync = true; - } } - - StorageEngine* engine = EngineSelectorFeature::ENGINE; - rocksdb::TransactionDB* db = static_cast(engine)->db(); - db->Write(writeOptions, &writeBatch); } return result; @@ -720,8 +719,11 @@ int RocksDBCollection::removeDocument(arangodb::transaction::Methods* trx, auto key = RocksDBKey::Document(_objectId, revisionId); - rocksdb::WriteBatch writeBatch; - writeBatch.Delete(key.string()); + rocksdb::Transaction* rtrx = rocksTransaction(trx); + + RocksDBSavePoint guard(rtrx); + + rtrx->Delete(key.string()); auto indexes = _indexes; size_t const n = indexes.size(); @@ -740,24 +742,13 @@ int RocksDBCollection::removeDocument(arangodb::transaction::Methods* trx, } if (result != TRI_ERROR_NO_ERROR) { - rocksdb::WriteOptions writeOptions; - if (_logicalCollection->waitForSync()) { waitForSync = true; } if (waitForSync) { trx->state()->waitForSync(true); - - // handle waitForSync for single operations here - if (trx->state()->isSingleOperation()) { - writeOptions.sync = true; - } } - - StorageEngine* engine = EngineSelectorFeature::ENGINE; - rocksdb::TransactionDB* db = static_cast(engine)->db(); - db->Write(writeOptions, &writeBatch); } return result; @@ -799,11 +790,16 @@ Result RocksDBCollection::lookupDocumentToken(transaction::Methods* trx, return outToken.revisionId() > 0 ? Result() : Result(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND); } -void RocksDBCollection::lookupRevisionVPack(TRI_voc_rid_t revisionId, transaction::Methods* trx,arangodb::ManagedDocumentResult& result){ - auto key = RocksDBKey::Document(_objectId,revisionId); +void RocksDBCollection::lookupRevisionVPack(TRI_voc_rid_t revisionId, transaction::Methods* trx,arangodb::ManagedDocumentResult& result) { + LOG_TOPIC(ERR, Logger::FIXME) << "LOOKING UP DOCUMENT: " << _objectId << ", REV: " << revisionId; + auto key = RocksDBKey::Document(_objectId, revisionId); std::string value; TRI_ASSERT(value.data()); auto* state = toRocksTransactionState(trx); - state->rocksTransaction()->Get(state->readOptions(), key.string(), &value); - result.setManaged(std::move(value), revisionId); + rocksdb::Status status = state->rocksTransaction()->Get(state->readOptions(), key.string(), &value); + + if (status.ok()) { + LOG_TOPIC(ERR, Logger::FIXME) << "FOUND"; + result.setManaged(std::move(value), revisionId); + } } diff --git a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp index 821e77c46f..f02d7de907 100644 --- a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp @@ -88,8 +88,8 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) { } // aquire rocksdb transaction - RocksDBTransactionState *state = rocksutils::toRocksTransactionState(_trx); - rocksdb::Transaction *rtrx = state->rocksTransaction(); + RocksDBTransactionState* state = rocksutils::toRocksTransactionState(_trx); + rocksdb::Transaction* rtrx = state->rocksTransaction(); auto rocksColl = RocksDBCollection::toRocksDBCollection(_collection); while (limit > 0) { @@ -254,8 +254,8 @@ int RocksDBEdgeIndex::remove(transaction::Methods* trx, primaryKey.copyString()); // aquire rocksdb transaction - RocksDBTransactionState *state = rocksutils::toRocksTransactionState(trx); - rocksdb::Transaction *rtrx = state->rocksTransaction(); + RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); + rocksdb::Transaction* rtrx = state->rocksTransaction(); rocksdb::Status status = rtrx->Delete(rocksdb::Slice(key.string())); if (status.ok()) { @@ -274,7 +274,7 @@ void RocksDBEdgeIndex::batchInsert( // aquire rocksdb transaction RocksDBTransactionState *state = rocksutils::toRocksTransactionState(trx); - rocksdb::Transaction *rtrx = state->rocksTransaction(); + rocksdb::Transaction* rtrx = state->rocksTransaction(); for (std::pair const& doc : documents) { VPackSlice primaryKey = doc.second.get(StaticStrings::KeyString); diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.cpp b/arangod/RocksDBEngine/RocksDBTransactionState.cpp index e6667d1595..f0418148d3 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.cpp +++ b/arangod/RocksDBEngine/RocksDBTransactionState.cpp @@ -44,8 +44,25 @@ using namespace arangodb; -struct RocksDBTransactionData final : public TransactionData { -}; +struct RocksDBTransactionData final : public TransactionData {}; + +RocksDBSavePoint::RocksDBSavePoint(rocksdb::Transaction* trx) + : _trx(trx), _committed(false) {} + +RocksDBSavePoint::~RocksDBSavePoint() { + if (!_committed) { + rollback(); + } +} + +void RocksDBSavePoint::commit() { + _committed = true; // this will prevent the rollback +} + +void RocksDBSavePoint::rollback() { + _trx->RollbackToSavePoint(); + _committed = true; // in order to not roll back again by accident +} /// @brief transaction type RocksDBTransactionState::RocksDBTransactionState(TRI_vocbase_t* vocbase) @@ -74,7 +91,7 @@ int RocksDBTransactionState::beginTransaction(transaction::Hints hints) { StorageEngine* engine = EngineSelectorFeature::ENGINE; rocksdb::TransactionDB* db = static_cast(engine)->db(); - _rocksTransaction.reset(db->BeginTransaction(rocksdb::WriteOptions(), rocksdb::TransactionOptions())); + _rocksTransaction.reset(db->BeginTransaction(_rocksWriteOptions, rocksdb::TransactionOptions())); } else { TRI_ASSERT(_status == transaction::Status::RUNNING); } @@ -109,6 +126,11 @@ int RocksDBTransactionState::commitTransaction(transaction::Methods* activeTrx) if (_nestingLevel == 0) { if (_rocksTransaction != nullptr) { + // set wait for sync flag if required + if (waitForSync()) { + _rocksWriteOptions.sync = true; + } + auto status = _rocksTransaction->Commit(); if (!status.ok()) { diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.h b/arangod/RocksDBEngine/RocksDBTransactionState.h index 809ef58c83..cbaf165f7e 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.h +++ b/arangod/RocksDBEngine/RocksDBTransactionState.h @@ -24,7 +24,6 @@ #ifndef ARANGOD_ROCKSDB_ROCKSDB_TRANSACTION_STATE_H #define ARANGOD_ROCKSDB_ROCKSDB_TRANSACTION_STATE_H 1 - #include "Basics/Common.h" #include "Basics/SmallVector.h" #include "StorageEngine/TransactionState.h" @@ -32,10 +31,12 @@ #include "Transaction/Methods.h" #include "VocBase/AccessMode.h" #include "VocBase/voc-types.h" -#include "rocksdb/status.h" -#include "rocksdb/options.h" -struct TRI_vocbase_t; +#include +#include + +struct TRI_vocbase_t; + namespace rocksdb { class Transaction; } @@ -48,6 +49,19 @@ namespace transaction { class Methods; } class TransactionCollection; + +class RocksDBSavePoint { + public: + explicit RocksDBSavePoint(rocksdb::Transaction* trx); + ~RocksDBSavePoint(); + + void commit(); + void rollback(); + + private: + rocksdb::Transaction* _trx; + bool _committed; +}; /// @brief transaction type class RocksDBTransactionState final : public TransactionState { @@ -81,6 +95,7 @@ class RocksDBTransactionState final : public TransactionState { private: std::unique_ptr _rocksTransaction; + rocksdb::WriteOptions _rocksWriteOptions; rocksdb::ReadOptions _rocksReadOptions; bool _hasOperations; }; diff --git a/arangod/VocBase/ManagedDocumentResult.h b/arangod/VocBase/ManagedDocumentResult.h index d3edb0cd12..09c7c64e7b 100644 --- a/arangod/VocBase/ManagedDocumentResult.h +++ b/arangod/VocBase/ManagedDocumentResult.h @@ -24,11 +24,11 @@ #ifndef ARANGOD_VOC_BASE_MANAGED_DOCUMENT_RESULT_H #define ARANGOD_VOC_BASE_MANAGED_DOCUMENT_RESULT_H 1 -#include "velocypack/Slice.h" -#include "velocypack/Buffer.h" -#include "velocypack/velocypack-aliases.h" #include "Basics/Common.h" +#include +#include + namespace arangodb { class ManagedDocumentResult {