From 70a1d2712159f35d82db4d865ea030406949ceb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Gra=CC=88tzer?= Date: Mon, 15 May 2017 17:35:16 +0200 Subject: [PATCH] intermediate commits --- arangod/RocksDBEngine/CMakeLists.txt | 1 + arangod/RocksDBEngine/RocksDBCollection.cpp | 26 +- arangod/RocksDBEngine/RocksDBCommon.h | 13 +- arangod/RocksDBEngine/RocksDBGeoIndex.cpp | 17 +- arangod/RocksDBEngine/RocksDBGeoIndexImpl.cpp | 73 +---- arangod/RocksDBEngine/RocksDBGeoIndexImpl.h | 12 +- arangod/RocksDBEngine/RocksDBMethods.cpp | 128 +++++++++ arangod/RocksDBEngine/RocksDBMethods.h | 114 ++++++++ .../RocksDBTransactionCollection.cpp | 8 + .../RocksDBTransactionCollection.h | 4 +- .../RocksDBEngine/RocksDBTransactionState.cpp | 256 +++++++++--------- .../RocksDBEngine/RocksDBTransactionState.h | 45 ++- 12 files changed, 428 insertions(+), 269 deletions(-) create mode 100644 arangod/RocksDBEngine/RocksDBMethods.cpp create mode 100644 arangod/RocksDBEngine/RocksDBMethods.h diff --git a/arangod/RocksDBEngine/CMakeLists.txt b/arangod/RocksDBEngine/CMakeLists.txt index ec68d50470..1cdc30e951 100644 --- a/arangod/RocksDBEngine/CMakeLists.txt +++ b/arangod/RocksDBEngine/CMakeLists.txt @@ -21,6 +21,7 @@ set(ROCKSDB_SOURCES RocksDBEngine/RocksDBKey.cpp RocksDBEngine/RocksDBKeyBounds.cpp RocksDBEngine/RocksDBLogValue.cpp + RocksDBEngine/RocksDBMethods.cpp RocksDBEngine/RocksDBPrefixExtractor.cpp RocksDBEngine/RocksDBPrimaryIndex.cpp RocksDBEngine/RocksDBReplicationCommon.cpp diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index 096a66a58d..22aa65690f 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -42,6 +42,7 @@ #include "RocksDBEngine/RocksDBEngine.h" #include "RocksDBEngine/RocksDBKey.h" #include "RocksDBEngine/RocksDBLogValue.h" +#include "RocksDBEngine/RocksDBMethods.h" #include "RocksDBEngine/RocksDBPrimaryIndex.h" #include "RocksDBEngine/RocksDBToken.h" 
#include "RocksDBEngine/RocksDBTransactionCollection.h" @@ -73,11 +74,6 @@ namespace { static std::string const Empty; -static inline rocksdb::Transaction* rocksTransaction( - arangodb::transaction::Methods* trx) { - return static_cast(trx->state()) - ->rocksTransaction(); -} } // namespace RocksDBCollection::RocksDBCollection(LogicalCollection* collection, @@ -625,13 +621,13 @@ void RocksDBCollection::truncate(transaction::Methods* trx, TRI_voc_cid_t cid = _logicalCollection->cid(); RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); - rocksdb::Transaction* rtrx = state->rocksTransaction(); + RocksDBMethods *mthd = state->rocksdbMethods(); + //rocksdb::Transaction* rtrx = state->rocksTransaction(); // delete documents RocksDBKeyBounds documentBounds = RocksDBKeyBounds::CollectionDocuments(this->objectId()); - std::unique_ptr iter( - rtrx->GetIterator(state->readOptions())); + std::unique_ptr iter = mthd->NewIterator(); iter->Seek(documentBounds.start()); while (iter->Valid() && cmp->Compare(iter->key(), documentBounds.end()) < 0) { @@ -645,10 +641,9 @@ void RocksDBCollection::truncate(transaction::Methods* trx, // add possible log statement state->prepareOperation(cid, revisionId, StringRef(key), TRI_VOC_DOCUMENT_OPERATION_REMOVE); - rocksdb::Status s = rtrx->Delete(iter->key()); - if (!s.ok()) { - auto converted = convertStatus(s); - THROW_ARANGO_EXCEPTION(converted); + Result r = mthd->Delete(iter->key()); + if (!r.ok()) { + THROW_ARANGO_EXCEPTION(r); } // report size of key RocksDBOperationResult result = @@ -660,11 +655,6 @@ void RocksDBCollection::truncate(transaction::Methods* trx, THROW_ARANGO_EXCEPTION(result); } - // force intermediate commit - if (result.commitRequired()) { - // force commit - } - iter->Next(); } @@ -867,7 +857,7 @@ int RocksDBCollection::insert(arangodb::transaction::Methods* trx, RocksDBTransactionState* state = toRocksTransactionState(trx); - RocksDBSavePoint guard(rocksTransaction(trx), + RocksDBSavePoint 
guard(rocksutils::toRocksMethods(trx), trx->isSingleOperationTransaction(), [&state]() { state->resetLogState(); }); diff --git a/arangod/RocksDBEngine/RocksDBCommon.h b/arangod/RocksDBEngine/RocksDBCommon.h index 5aaf5c9e8d..9c25c00a5e 100644 --- a/arangod/RocksDBEngine/RocksDBCommon.h +++ b/arangod/RocksDBEngine/RocksDBCommon.h @@ -49,30 +49,27 @@ namespace arangodb { class RocksDBOperationResult : public Result { public: - explicit RocksDBOperationResult() : Result(), _keySize(0), _commitRequired(false) {} + explicit RocksDBOperationResult() : Result(), _keySize(0) {} RocksDBOperationResult(Result const& other) - : _keySize(0), _commitRequired(false) { + : _keySize(0) { cloneData(other); } - RocksDBOperationResult(Result&& other) : _keySize(0), _commitRequired(false) { + RocksDBOperationResult(Result&& other) : _keySize(0) { cloneData(std::move(other)); } uint64_t keySize() const { return _keySize; } void keySize(uint64_t s) { _keySize = s; } - bool commitRequired() const { return _commitRequired; } - void commitRequired(bool cr) { _commitRequired = cr; } - protected: uint64_t _keySize; - bool _commitRequired; }; class TransactionState; class RocksDBTransactionState; +class RocksDBMethods; class RocksDBKeyBounds; class RocksDBEngine; namespace transaction { @@ -93,6 +90,8 @@ std::pair>> stripObjectIds( VPackSlice const& inputSlice, bool checkBeforeCopy = true); RocksDBTransactionState* toRocksTransactionState(transaction::Methods* trx); +RocksDBMethods* toRocksMethods(transaction::Methods* trx); + rocksdb::TransactionDB* globalRocksDB(); RocksDBEngine* globalRocksEngine(); arangodb::Result globalRocksDBPut( diff --git a/arangod/RocksDBEngine/RocksDBGeoIndex.cpp b/arangod/RocksDBEngine/RocksDBGeoIndex.cpp index b015cb9527..2f7d26c9bd 100644 --- a/arangod/RocksDBEngine/RocksDBGeoIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBGeoIndex.cpp @@ -476,10 +476,7 @@ int RocksDBGeoIndex::internalInsert(TRI_voc_rid_t revisionId, int 
RocksDBGeoIndex::insert(transaction::Methods* trx, TRI_voc_rid_t revisionId, VPackSlice const& doc, bool isRollback) { // acquire rocksdb transaction - RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); - rocksdb::Transaction* rtrx = state->rocksTransaction(); - - GeoIndex_setRocksTransaction(_geoIndex, rtrx); + GeoIndex_setRocksMethods(_geoIndex, rocksutils::toRocksMethods(trx)); int res = this->internalInsert(revisionId, doc); GeoIndex_clearRocks(_geoIndex); return res; @@ -488,7 +485,7 @@ int RocksDBGeoIndex::insert(transaction::Methods* trx, TRI_voc_rid_t revisionId, int RocksDBGeoIndex::insertRaw(rocksdb::WriteBatchWithIndex* batch, TRI_voc_rid_t revisionId, arangodb::velocypack::Slice const& doc) { - GeoIndex_setRocksBatch(_geoIndex, batch); + GeoIndex_setRocksMethods(_geoIndex, rocksutils::toRocksMethods(trx)); /* FIXME(review): no `trx` is in scope here, insertRaw only receives `batch`; this needs a WriteBatchWithIndex-backed RocksDBMethods */ int res = this->internalInsert(revisionId, doc); GeoIndex_clearRocks(_geoIndex); return res; @@ -556,11 +553,9 @@ int RocksDBGeoIndex::internalRemove(TRI_voc_rid_t revisionId, int RocksDBGeoIndex::remove(transaction::Methods* trx, TRI_voc_rid_t revisionId, VPackSlice const& doc, bool isRollback) { - // acquire rocksdb transaction - RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); - rocksdb::Transaction* rtrx = state->rocksTransaction(); - - GeoIndex_setRocksTransaction(_geoIndex, rtrx); + // acquire rocksdb methods + RocksDBMethods *methods = rocksutils::toRocksMethods(trx); + GeoIndex_setRocksMethods(_geoIndex, methods); int res = this->internalRemove(revisionId, doc); GeoIndex_clearRocks(_geoIndex); return res; @@ -569,7 +564,7 @@ int RocksDBGeoIndex::remove(transaction::Methods* trx, TRI_voc_rid_t revisionId, int RocksDBGeoIndex::removeRaw(rocksdb::WriteBatchWithIndex* batch, TRI_voc_rid_t revisionId, arangodb::velocypack::Slice const& doc) { - GeoIndex_setRocksBatch(_geoIndex, batch); + GeoIndex_setRocksMethods(_geoIndex, rocksutils::toRocksMethods(trx)); /* FIXME(review): no `trx` is in scope here, removeRaw only receives `batch`; this needs a WriteBatchWithIndex-backed RocksDBMethods */ int res = 
this->internalRemove(revisionId, doc); GeoIndex_clearRocks(_geoIndex); return res; diff --git a/arangod/RocksDBEngine/RocksDBGeoIndexImpl.cpp b/arangod/RocksDBEngine/RocksDBGeoIndexImpl.cpp index 866ceea097..4615244f37 100644 --- a/arangod/RocksDBEngine/RocksDBGeoIndexImpl.cpp +++ b/arangod/RocksDBEngine/RocksDBGeoIndexImpl.cpp @@ -28,9 +28,8 @@ #include #include -#include -#include -#include +#include "RocksDBEngine/RocksDBGeoIndexImpl.h" +#include "RocksDBEngine/RocksDBMethods.h" /* Radius of the earth used for distances */ #define EARTHRADIAN 6371000.0 @@ -132,8 +131,7 @@ typedef struct { GeoIndexFixed fixed; /* fixed point data */ int nextFreePot; /* pots allocated */ int nextFreeSlot; /* slots allocated */ - rocksdb::Transaction *rocksTransaction; - rocksdb::WriteBatchWithIndex *rocksBatch; + RocksDBMethods *rocksMethods; //GeoPot* ypots; /* the pots themselves */// gone //GeoCoordinate* gxc; /* the slots themselves */// gone //size_t _memoryUsed; /* the amount of memory currently used */// gone @@ -271,40 +269,20 @@ namespace arangodb { namespace rocksdbengine { /* CRUD interface */ - -void GeoIndex_setRocksTransaction(GeoIdx* gi, - rocksdb::Transaction* trx) { - GeoIx* gix = (GeoIx*)gi; - gix->rocksTransaction = trx; -} -void GeoIndex_setRocksBatch(GeoIdx* gi, - rocksdb::WriteBatchWithIndex* wb) { +void GeoIndex_setRocksMethods(GeoIdx* gi, RocksDBMethods* trx) { GeoIx* gix = (GeoIx*)gi; - gix->rocksBatch = wb; + gix->rocksMethods = trx; } void GeoIndex_clearRocks(GeoIdx* gi) { GeoIx* gix = (GeoIx*)gi; - gix->rocksTransaction = nullptr; - gix->rocksBatch = nullptr; + gix->rocksMethods = nullptr; } inline void RocksRead(GeoIx * gix, RocksDBKey const& key, std::string *val) { - rocksdb::Status s; - rocksdb::ReadOptions opts; - if (gix->rocksTransaction != nullptr) { - s = gix->rocksTransaction->Get(opts, key.string(), val); - } else { - rocksdb::TransactionDB *db = rocksutils::globalRocksDB(); - if (gix->rocksBatch != nullptr) { - s = 
gix->rocksBatch->GetFromBatchAndDB(db, opts, key.string(), val); - } else { - s = db->Get(opts, key.string(), val); - } - } - if (!s.ok()) { - arangodb::Result r = rocksutils::convertStatus(s, rocksutils::index); + arangodb::Result r = gix->rocksMethods->Get(key, val); + if (!r.ok()) { THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage()); } } @@ -312,39 +290,15 @@ inline void RocksRead(GeoIx * gix, RocksDBKey const& key, std::string *val) { inline void RocksWrite(GeoIx * gix, RocksDBKey const& key, rocksdb::Slice const& slice) { - rocksdb::Status s; - if (gix->rocksTransaction != nullptr) { - s = gix->rocksTransaction->Put(key.string(), slice); - } else { - rocksdb::TransactionDB *db = rocksutils::globalRocksDB(); - if (gix->rocksBatch != nullptr) { - gix->rocksBatch->Put(key.string(), slice); - } else { - rocksdb::WriteOptions opts; - s = db->Put(opts, key.string(), slice); - } - } - if (!s.ok()) { - arangodb::Result r = rocksutils::convertStatus(s, rocksutils::index); + arangodb::Result r = gix->rocksMethods->Put(key, slice); + if (!r.ok()) { THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage()); } } inline void RocksDelete(GeoIx* gix, RocksDBKey const& key) { - rocksdb::Status s; - if (gix->rocksTransaction != nullptr) { - s = gix->rocksTransaction->Delete(key.string()); - } else { - rocksdb::TransactionDB *db = rocksutils::globalRocksDB(); - if (gix->rocksBatch != nullptr) { - gix->rocksBatch->Delete(key.string()); - } else { - rocksdb::WriteOptions opts; - s = db->Delete(opts, key.string()); - } - } - if (!s.ok()) { - arangodb::Result r = rocksutils::convertStatus(s, rocksutils::index); + arangodb::Result r = gix->rocksMethods->Delete(key); + if (!r.ok()) { THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage()); } } @@ -466,8 +420,7 @@ GeoIdx* GeoIndex_new(uint64_t objectId, return (GeoIdx*)gix; } // need to set these to null - gix->rocksTransaction = nullptr; - gix->rocksBatch = nullptr; + gix->rocksMethods = nullptr; /* 
set up the fixed points structure */ diff --git a/arangod/RocksDBEngine/RocksDBGeoIndexImpl.h b/arangod/RocksDBEngine/RocksDBGeoIndexImpl.h index 6ae10a33d4..c8265fe9d4 100644 --- a/arangod/RocksDBEngine/RocksDBGeoIndexImpl.h +++ b/arangod/RocksDBEngine/RocksDBGeoIndexImpl.h @@ -30,12 +30,9 @@ #include "Basics/Common.h" #include -namespace rocksdb { - class Transaction; - class WriteBatchWithIndex; -} - -namespace arangodb { namespace rocksdbengine { +namespace arangodb { +class RocksDBMethods; +namespace rocksdbengine { /* first the things that a user might want to change */ @@ -115,8 +112,7 @@ void GeoIndex_INDEXDUMP(GeoIdx* gi, FILE* f); int GeoIndex_INDEXVALID(GeoIdx* gi); #endif -void GeoIndex_setRocksTransaction(GeoIdx* gi, rocksdb::Transaction*); -void GeoIndex_setRocksBatch(GeoIdx* gi, rocksdb::WriteBatchWithIndex*); +void GeoIndex_setRocksMethods(GeoIdx* gi, RocksDBMethods*); void GeoIndex_clearRocks(GeoIdx* gi); }} diff --git a/arangod/RocksDBEngine/RocksDBMethods.cpp b/arangod/RocksDBEngine/RocksDBMethods.cpp new file mode 100644 index 0000000000..3f719edd99 --- /dev/null +++ b/arangod/RocksDBEngine/RocksDBMethods.cpp @@ -0,0 +1,128 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2017 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. 
+/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#include "RocksDBMethods.h" +#include "RocksDBEngine/RocksDBCommon.h" +#include "RocksDBEngine/RocksDBTransactionState.h" + +#include +#include +#include +#include +#include +#include + +using namespace arangodb; + +// ================= RocksDBSavePoint ================== + +RocksDBSavePoint::RocksDBSavePoint( + RocksDBMethods* trx, bool handled, + std::function const& rollbackCallback) + : _trx(trx), _rollbackCallback(rollbackCallback), _handled(handled) { + TRI_ASSERT(trx != nullptr); + if (!_handled) { + _trx->SetSavePoint(); + } +} + +RocksDBSavePoint::~RocksDBSavePoint() { + if (!_handled) { + rollback(); + } +} + +void RocksDBSavePoint::commit() { + // note: _handled may already be true here + _handled = true; // this will prevent the rollback +} + +void RocksDBSavePoint::rollback() { + TRI_ASSERT(!_handled); + _trx->RollbackToSavePoint(); + _handled = true; // in order to not roll back again by accident + _rollbackCallback(); +} + +// =================== RocksDBReadOnlyMethods ==================== + +RocksDBReadOnlyMethods::RocksDBReadOnlyMethods(RocksDBTransactionState* state) + : _state(state) { + _db = rocksutils::globalRocksDB(); +} + +arangodb::Result RocksDBReadOnlyMethods::Get(RocksDBKey const& key, + std::string* val) { + rocksdb::Status s = _db->Get(_state->_rocksReadOptions, key.string(), val); + return s.ok() ? 
arangodb::Result() : rocksutils::convertStatus(s); +} + +arangodb::Result RocksDBReadOnlyMethods::Put(RocksDBKey const& key, + rocksdb::Slice const& val) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_READ_ONLY); +} + +arangodb::Result RocksDBReadOnlyMethods::Delete(RocksDBKey const& key) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_READ_ONLY); +} + +std::unique_ptr RocksDBReadOnlyMethods::NewIterator() { + return std::unique_ptr( + _db->NewIterator(_state->_rocksReadOptions)); +} + +// =================== RocksDBTrxMethods ==================== + +RocksDBTrxMethods::RocksDBTrxMethods(RocksDBTransactionState* state) + : _state(state) {} + +arangodb::Result RocksDBTrxMethods::Get(RocksDBKey const& key, + std::string* val) { + rocksdb::Status s = _state->_rocksTransaction->Get(_state->_rocksReadOptions, + key.string(), val); + return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s); +} + +arangodb::Result RocksDBTrxMethods::Put(RocksDBKey const& key, + rocksdb::Slice const& val) { + rocksdb::Status s = _state->_rocksTransaction->Put(key.string(), val); + return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s); +} + +arangodb::Result RocksDBTrxMethods::Delete(RocksDBKey const& key) { + rocksdb::Status s = _state->_rocksTransaction->Delete(key.string()); + return s.ok() ? 
arangodb::Result() : rocksutils::convertStatus(s); +} + +std::unique_ptr RocksDBTrxMethods::NewIterator() { + return std::unique_ptr( + _state->_rocksTransaction->GetIterator(_state->_rocksReadOptions)); +} + +void RocksDBTrxMethods::SetSavePoint() { + _state->_rocksTransaction->SetSavePoint(); +} + +arangodb::Result RocksDBTrxMethods::RollbackToSavePoint() { + return rocksutils::convertStatus( + _state->_rocksTransaction->RollbackToSavePoint()); +} diff --git a/arangod/RocksDBEngine/RocksDBMethods.h b/arangod/RocksDBEngine/RocksDBMethods.h new file mode 100644 index 0000000000..071afb3838 --- /dev/null +++ b/arangod/RocksDBEngine/RocksDBMethods.h @@ -0,0 +1,114 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2017 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. 
+/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_ROCKSDB_ROCKSDB_METHODS_H +#define ARANGOD_ROCKSDB_ROCKSDB_METHODS_H 1 + +#include "Basics/Result.h" + +namespace rocksdb { +class Transaction; +class Slice; +class Iterator; +class TransactionDB; +} // namespace rocksdb + +namespace arangodb { + +class RocksDBKey; +class RocksDBMethods; +class RocksDBTransactionState; + +class RocksDBSavePoint { + public: + RocksDBSavePoint(RocksDBMethods* trx, bool handled, + std::function const& rollbackCallback); + ~RocksDBSavePoint(); + + void commit(); + + private: + void rollback(); + + private: + RocksDBMethods* _trx; + std::function const _rollbackCallback; + bool _handled; +}; + +class RocksDBMethods { + public: + // RocksDBOperations(rocksdb::ReadOptions ro, rocksdb::WriteOptions wo) : + // _readOptions(ro), _writeOptions(wo) {} + virtual ~RocksDBMethods() {} + virtual arangodb::Result Get(RocksDBKey const&, std::string*) = 0; + virtual arangodb::Result Put(RocksDBKey const&, rocksdb::Slice const&) = 0; + // virtual arangodb::Result Merge(RocksDBKey const&, rocksdb::Slice const&) = + // 0; + virtual arangodb::Result Delete(RocksDBKey const&) = 0; + virtual std::unique_ptr NewIterator() = 0; + + virtual void SetSavePoint() = 0; + virtual arangodb::Result RollbackToSavePoint() = 0; +}; + +class RocksDBReadOnlyMethods : public RocksDBMethods { + public: + RocksDBReadOnlyMethods(RocksDBTransactionState* state); + + arangodb::Result Get(RocksDBKey const& key, std::string* val) override; + + arangodb::Result Put(RocksDBKey const& key, + rocksdb::Slice const& val) override; + arangodb::Result Delete(RocksDBKey const& key) override; + std::unique_ptr NewIterator() override; + + void SetSavePoint() override {} + arangodb::Result RollbackToSavePoint() override { return arangodb::Result(); } + + private: + RocksDBTransactionState* _state; + 
rocksdb::TransactionDB* _db; +}; + +/// transactional operations +class RocksDBTrxMethods : public RocksDBMethods { + public: + RocksDBTrxMethods(RocksDBTransactionState* state); + + arangodb::Result Get(RocksDBKey const& key, std::string* val) override; + + arangodb::Result Put(RocksDBKey const& key, + rocksdb::Slice const& val) override; + arangodb::Result Delete(RocksDBKey const& key) override; + std::unique_ptr NewIterator() override; + + void SetSavePoint() override; + arangodb::Result RollbackToSavePoint() override; + + private: + RocksDBTransactionState* _state; +}; + +} // namespace arangodb + +#endif diff --git a/arangod/RocksDBEngine/RocksDBTransactionCollection.cpp b/arangod/RocksDBEngine/RocksDBTransactionCollection.cpp index 3e8e23e85e..37416dec94 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBTransactionCollection.cpp @@ -258,6 +258,14 @@ void RocksDBTransactionCollection::addOperation( _operationSize += operationSize; } +void RocksDBTransactionCollection::commitCounts() { + _initialNumberDocuments += _numInserts - _numRemoves; /* fold the committed net change into the running count instead of overwriting it */ + _operationSize = 0; + _numInserts = 0; + _numUpdates = 0; + _numRemoves = 0; +} + /// @brief lock a collection int RocksDBTransactionCollection::doLock(AccessMode::Type type, int nestingLevel) { if (!AccessMode::isWriteOrExclusive(type)) { diff --git a/arangod/RocksDBEngine/RocksDBTransactionCollection.h b/arangod/RocksDBEngine/RocksDBTransactionCollection.h index 5b04c4d320..3e0af84139 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionCollection.h +++ b/arangod/RocksDBEngine/RocksDBTransactionCollection.h @@ -76,8 +76,8 @@ class RocksDBTransactionCollection final : public TransactionCollection { uint64_t numRemoves() const { return _numRemoves; } /// @brief add an operation for a transaction collection - void addOperation(TRI_voc_document_operation_e operationType, uint64_t operationSize, TRI_voc_rid_t revisionId) ; - void resetCounts(); + void 
addOperation(TRI_voc_document_operation_e operationType, uint64_t operationSize, TRI_voc_rid_t revisionId); + void commitCounts(); private: /// @brief request a lock for a collection diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.cpp b/arangod/RocksDBEngine/RocksDBTransactionState.cpp index f26611789a..640899485f 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.cpp +++ b/arangod/RocksDBEngine/RocksDBTransactionState.cpp @@ -34,6 +34,7 @@ #include "RocksDBEngine/RocksDBCounterManager.h" #include "RocksDBEngine/RocksDBEngine.h" #include "RocksDBEngine/RocksDBLogValue.h" +#include "RocksDBEngine/RocksDBMethods.h" #include "RocksDBEngine/RocksDBTransactionCollection.h" #include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/StorageEngine.h" @@ -56,34 +57,6 @@ using namespace arangodb; // for the RocksDB engine we do not need any additional data struct RocksDBTransactionData final : public TransactionData {}; -RocksDBSavePoint::RocksDBSavePoint( - rocksdb::Transaction* trx, bool handled, - std::function const& rollbackCallback) - : _trx(trx), _rollbackCallback(rollbackCallback), _handled(handled) { - TRI_ASSERT(trx != nullptr); - if (!_handled) { - _trx->SetSavePoint(); - } -} - -RocksDBSavePoint::~RocksDBSavePoint() { - if (!_handled) { - rollback(); - } -} - -void RocksDBSavePoint::commit() { - // note: _handled may already be true here - _handled = true; // this will prevent the rollback -} - -void RocksDBSavePoint::rollback() { - TRI_ASSERT(!_handled); - _trx->RollbackToSavePoint(); - _handled = true; // in order to not roll back again by accident - _rollbackCallback(); -} - /// @brief transaction type RocksDBTransactionState::RocksDBTransactionState( TRI_vocbase_t* vocbase, uint64_t maxTransSize, @@ -92,7 +65,6 @@ RocksDBTransactionState::RocksDBTransactionState( : TransactionState(vocbase), _rocksReadOptions(), _cacheTx(nullptr), - _transactionSize(0), _maxTransactionSize(maxTransSize), 
_intermediateTransactionSize(intermediateTransactionSize), _intermediateTransactionNumber(intermediateTransactionNumber), @@ -156,25 +128,14 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) { // start cache transaction _cacheTx = CacheManagerFeature::MANAGER->beginTransaction(isReadOnlyTransaction()); - - // start rocks transaction - StorageEngine* engine = EngineSelectorFeature::ENGINE; - rocksdb::TransactionDB* db = static_cast(engine)->db(); - _rocksTransaction.reset(db->BeginTransaction( - _rocksWriteOptions, rocksdb::TransactionOptions())); - _rocksTransaction->SetSnapshot(); - _rocksReadOptions.snapshot = _rocksTransaction->GetSnapshot(); - _rocksReadOptions.prefix_same_as_start = true; - - if (!isReadOnlyTransaction() && - !hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) { - RocksDBLogValue header = - RocksDBLogValue::BeginTransaction(_vocbase->id(), _id); - _rocksTransaction->PutLogData(header.slice()); -#ifdef ARANGODB_ENABLE_MAINTAINER_MODE - ++_numLogdata; -#endif + + if (isReadOnlyTransaction()) { + _rocksMethods.reset(new RocksDBReadOnlyMethods(this)); /* the only declared constructor takes the owning transaction state */ + } else { + createTransaction(); + _rocksMethods.reset(new RocksDBTrxMethods(this)); } + } else { TRI_ASSERT(_status == transaction::Status::RUNNING); } @@ -182,6 +143,102 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) { return result; } +void RocksDBTransactionState::createTransaction() { + TRI_ASSERT(!_rocksTransaction); + // TODO intermediates + + // start rocks transaction + rocksdb::TransactionDB* db = rocksutils::globalRocksDB(); + _rocksTransaction.reset(db->BeginTransaction( + _rocksWriteOptions, rocksdb::TransactionOptions())); + _rocksTransaction->SetSnapshot(); + _rocksReadOptions.snapshot = _rocksTransaction->GetSnapshot(); + _rocksReadOptions.prefix_same_as_start = true; + + if (!hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) { + RocksDBLogValue header = + RocksDBLogValue::BeginTransaction(_vocbase->id(), _id); + 
_rocksTransaction->PutLogData(header.slice()); +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + ++_numLogdata; +#endif + } +} + +arangodb::Result RocksDBTransactionState::internalCommit() { + TRI_ASSERT(_rocksTransaction != nullptr); + + arangodb::Result result; + if (_rocksTransaction->GetNumKeys() > 0) { + // set wait for sync flag if required + if (waitForSync()) { + _rocksWriteOptions.sync = true; + _rocksTransaction->SetWriteOptions(_rocksWriteOptions); + } + + // double t1 = TRI_microtime(); + result = rocksutils::convertStatus(_rocksTransaction->Commit()); + // double t2 = TRI_microtime(); + // if (t2 - t1 > 0.25) { + // LOG_TOPIC(ERR, Logger::FIXME) + // << "COMMIT TOOK: " << (t2 - t1) + // << " S. NUMINSERTS: " << _numInserts + // << ", NUMUPDATES: " << _numUpdates + // << ", NUMREMOVES: " << _numRemoves + // << ", TRANSACTIONSIZE: " << _transactionSize; + // } + rocksdb::SequenceNumber latestSeq = + rocksutils::globalRocksDB()->GetLatestSequenceNumber(); + if (!result.ok()) { + return result; + } + + if (_cacheTx != nullptr) { + // note: endTransaction() will delete _cacheTx! 
+ CacheManagerFeature::MANAGER->endTransaction(_cacheTx); + _cacheTx = nullptr; + } + + for (auto& trxCollection : _collections) { + RocksDBTransactionCollection* collection = + static_cast(trxCollection); + int64_t adjustment = + collection->numInserts() - collection->numRemoves(); + + if (collection->numInserts() != 0 || collection->numRemoves() != 0 || + collection->revision() != 0) { + RocksDBCollection* coll = static_cast( + trxCollection->collection()->getPhysical()); + coll->adjustNumberDocuments(adjustment); + coll->setRevision(collection->revision()); + RocksDBEngine* engine = + static_cast(EngineSelectorFeature::ENGINE); + + RocksDBCounterManager::CounterAdjustment update( + latestSeq, collection->numInserts(), collection->numRemoves(), + collection->revision()); + engine->counterManager()->updateCounter(coll->objectId(), update); + } + + // we need this in case of an intermediate commit. The number of + // initial documents is adjusted and numInserts / removes is set to 0 + collection->commitCounts(); + } + } else { + // don't write anything if the transaction is empty + result = rocksutils::convertStatus(_rocksTransaction->Rollback()); + + if (_cacheTx != nullptr) { + // note: endTransaction() will delete _cacheTx! 
+ CacheManagerFeature::MANAGER->endTransaction(_cacheTx); + _cacheTx = nullptr; + } + } + + _rocksTransaction.reset(); + return result; +} + /// @brief commit a transaction Result RocksDBTransactionState::commitTransaction( transaction::Methods* activeTrx) { @@ -193,86 +250,20 @@ Result RocksDBTransactionState::commitTransaction( return Result(TRI_ERROR_DEBUG); } - arangodb::Result result; - + arangodb::Result res; if (_nestingLevel == 0) { if (_rocksTransaction != nullptr) { - // if (hasOperations()) { - if (_rocksTransaction->GetNumKeys() > 0) { - // set wait for sync flag if required - if (waitForSync()) { - _rocksWriteOptions.sync = true; - _rocksTransaction->SetWriteOptions(_rocksWriteOptions); - } - - // TODO wait for response on github issue to see how we can use the - // sequence number - // double t1 = TRI_microtime(); - result = rocksutils::convertStatus(_rocksTransaction->Commit()); - - // double t2 = TRI_microtime(); - // if (t2 - t1 > 0.25) { - // LOG_TOPIC(ERR, Logger::FIXME) - // << "COMMIT TOOK: " << (t2 - t1) - // << " S. NUMINSERTS: " << _numInserts - // << ", NUMUPDATES: " << _numUpdates - // << ", NUMREMOVES: " << _numRemoves - // << ", TRANSACTIONSIZE: " << _transactionSize; - // } - rocksdb::SequenceNumber latestSeq = - rocksutils::globalRocksDB()->GetLatestSequenceNumber(); - if (!result.ok()) { - abortTransaction(activeTrx); - return result; - } - - if (_cacheTx != nullptr) { - // note: endTransaction() will delete _cacheTx! 
- CacheManagerFeature::MANAGER->endTransaction(_cacheTx); - _cacheTx = nullptr; - } - - for (auto& trxCollection : _collections) { - RocksDBTransactionCollection* collection = - static_cast(trxCollection); - int64_t adjustment = - collection->numInserts() - collection->numRemoves(); - - if (collection->numInserts() != 0 || collection->numRemoves() != 0 || - collection->revision() != 0) { - RocksDBCollection* coll = static_cast( - trxCollection->collection()->getPhysical()); - coll->adjustNumberDocuments(adjustment); - coll->setRevision(collection->revision()); - RocksDBEngine* engine = - static_cast(EngineSelectorFeature::ENGINE); - - RocksDBCounterManager::CounterAdjustment update( - latestSeq, collection->numInserts(), collection->numRemoves(), - collection->revision()); - engine->counterManager()->updateCounter(coll->objectId(), update); - } - } - } else { - // don't write anything if the transaction is empty - result = rocksutils::convertStatus(_rocksTransaction->Rollback()); - - if (_cacheTx != nullptr) { - // note: endTransaction() will delete _cacheTx! 
- CacheManagerFeature::MANAGER->endTransaction(_cacheTx); - _cacheTx = nullptr; - } + res = internalCommit(); + if (!res.ok()) { + abortTransaction(activeTrx); return res; /* keep the early exit of the pre-patch code: an aborted transaction must not be marked COMMITTED below */ } - - _rocksTransaction.reset(); } - updateStatus(transaction::Status::COMMITTED); } unuseCollections(_nestingLevel); - return result; + return res; } /// @brief abort and rollback a transaction @@ -385,7 +376,8 @@ RocksDBOperationResult RocksDBTransactionState::addOperation( uint64_t keySize) { RocksDBOperationResult res; - uint64_t newSize = _transactionSize + operationSize + keySize; + size_t currentSize = _rocksTransaction->GetWriteBatch()->GetWriteBatch()->GetDataSize(); + uint64_t newSize = currentSize + operationSize + keySize; if (_maxTransactionSize < newSize) { // we hit the transaction size limit std::string message = @@ -428,9 +420,7 @@ RocksDBOperationResult RocksDBTransactionState::addOperation( break; } - _transactionSize = newSize; auto numOperations = _numInserts + _numUpdates + _numRemoves; - // signal if intermediate commit is required // this will be done if intermediate transactions are enabled // and either the "number of operations" or the "transaction size" @@ -438,25 +428,23 @@ RocksDBOperationResult RocksDBTransactionState::addOperation( if (_intermediateTransactionEnabled && (_intermediateTransactionNumber <= numOperations || _intermediateTransactionSize <= newSize)) { - res.commitRequired(true); + // intermediate commit: write out what was collected so far, then continue in a fresh rocksdb transaction (internalCommit() resets _rocksTransaction on success) + arangodb::Result commitRes = internalCommit(); + if (!commitRes.ok()) { + return RocksDBOperationResult(commitRes); + } + createTransaction(); } return res; } +RocksDBMethods* RocksDBTransactionState::rocksdbMethods() { + TRI_ASSERT(_rocksMethods); + return _rocksMethods.get(); +} + uint64_t RocksDBTransactionState::sequenceNumber() const { return static_cast( _rocksTransaction->GetSnapshot()->GetSequenceNumber()); } - -/* -class RocksDBBatchTrx : public RocksDBBatch { - arangodb::Result Get(RocksDBKey const&, std::string*) override { - - } - - arangodb::Result Put(RocksDBKey const&, rocksdb::Slice const&) override { - - } - 
arangodb::Result Delete(RocksDBKey const&) override ; -};*/ diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.h b/arangod/RocksDBEngine/RocksDBTransactionState.h index f867b3f3b7..94c908a526 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.h +++ b/arangod/RocksDBEngine/RocksDBTransactionState.h @@ -54,32 +54,14 @@ namespace transaction { class Methods; } class TransactionCollection; - -class RocksDBSavePoint { - public: - RocksDBSavePoint(rocksdb::Transaction* trx, bool handled, std::function const& rollbackCallback); - ~RocksDBSavePoint(); - - void commit(); - - private: - void rollback(); - - private: - rocksdb::Transaction* _trx; - std::function const _rollbackCallback; - bool _handled; -}; +class RocksDBMethods; -/*class RocksDBKey; -struct RocksDBBatch { - virtual arangodb::Result Get(RocksDBKey const&, std::string*) = 0; - virtual arangodb::Result Put(RocksDBKey const&, rocksdb::Slice const&) = 0; - virtual arangodb::Result Delete(RocksDBKey const&) = 0; -};*/ - /// @brief transaction type class RocksDBTransactionState final : public TransactionState { + friend class RocksDBReadOnlyMethods; + friend class RocksDBTrxMethods; + //friend struct RocksDBIntermediateOps; + public: explicit RocksDBTransactionState(TRI_vocbase_t* vocbase, uint64_t maxOperationSize, @@ -122,24 +104,29 @@ class RocksDBTransactionState final : public TransactionState { TRI_voc_document_operation_e operationType, uint64_t operationSize, uint64_t keySize); - rocksdb::Transaction* rocksTransaction() { + RocksDBMethods* rocksdbMethods(); + /*rocksdb::Transaction* rocksTransaction() { TRI_ASSERT(_rocksTransaction != nullptr); return _rocksTransaction.get(); } - rocksdb::ReadOptions const& readOptions() const { return _rocksReadOptions; } - - rocksdb::WriteOptions const& writeOptions() const { return _rocksWriteOptions; } + rocksdb::WriteOptions const& writeOptions() const { return _rocksWriteOptions; }*/ uint64_t sequenceNumber() const; + +private: + + void 
createTransaction(); + arangodb::Result internalCommit(); private: std::unique_ptr _rocksTransaction; rocksdb::WriteOptions _rocksWriteOptions; rocksdb::ReadOptions _rocksReadOptions; cache::Transaction* _cacheTx; - // current transaction size - uint64_t _transactionSize; + + std::unique_ptr _rocksMethods; + // a transaction may not become bigger than this value uint64_t _maxTransactionSize; // if a transaction gets bigger than this value and intermediate transactions