diff --git a/.gitignore b/.gitignore index 556dc0f601..e5106e9139 100644 --- a/.gitignore +++ b/.gitignore @@ -108,3 +108,5 @@ npm-debug.log log-* data-* cluster-init + +datafile-*.db diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index 8fc98e4bad..9b972fe00c 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -408,6 +408,9 @@ void RocksDBCollection::invokeOnAllElements( void RocksDBCollection::truncate(transaction::Methods* trx, OperationOptions& options) { + // TODO FIXME -- improve transaction size + // TODO FIXME -- intermediate commit + rocksdb::Comparator const* cmp = globalRocksEngine()->cmp(); TRI_voc_cid_t cid = _logicalCollection->cid(); @@ -428,11 +431,20 @@ void RocksDBCollection::truncate(transaction::Methods* trx, THROW_ARANGO_EXCEPTION(converted); } + // transaction size limit reached -- fail TRI_voc_rid_t revisionId = RocksDBKey::revisionId(iter->key()); - auto result = state->addOperation(cid, revisionId, TRI_VOC_DOCUMENT_OPERATION_REMOVE, 0, iter->key().size()); + // report size of key + RocksDBOperationResult result = state->addOperation(cid, revisionId, TRI_VOC_DOCUMENT_OPERATION_REMOVE, 0, iter->key().size()); + if (result.fail()){ THROW_ARANGO_EXCEPTION(result); } + + // force intermediate commit + if(result.commitRequired()){ + // force commit + } + iter->Next(); } @@ -475,14 +487,20 @@ void RocksDBCollection::truncate(transaction::Methods* trx, THROW_ARANGO_EXCEPTION(converted); } - //update size - auto result = state->addOperation(cid, /*ignored revisionId*/ 0, + // report index key size + RocksDBOperationResult result = state->addOperation(cid, /*ignored revisionId*/ 0, TRI_VOC_NOOP_OPERATION_UPDATE_SIZE, 0, iter->key().size() ); + // transaction size limit reached -- fail if (result.fail()){ THROW_ARANGO_EXCEPTION(result); } + + // force intermediate commit + if(result.commitRequired()){ + // force commit + } iter->Next(); } } 
@@ -537,6 +555,10 @@ int RocksDBCollection::insert(arangodb::transaction::Methods* trx, arangodb::ManagedDocumentResult& mdr, OperationOptions& options, TRI_voc_tick_t& resultMarkerTick, bool /*lock*/) { + + // TODO FIXME -- limit transaction size + // TODO FIXME -- intermediate commit + // store the tick that was used for writing the document // note that we don't need it for this engine resultMarkerTick = 0; @@ -598,9 +620,21 @@ int RocksDBCollection::insert(arangodb::transaction::Methods* trx, return lookupResult.errorNumber(); } - static_cast(trx->state()) + // report document and key size + RocksDBOperationResult result = static_cast(trx->state()) ->addOperation(_logicalCollection->cid(), revisionId, TRI_VOC_DOCUMENT_OPERATION_INSERT, newSlice.byteSize(), res.keySize()); + + // transaction size limit reached -- fail; `result` is what addOperation returned, `res` is the earlier insert result + if(result.fail()){ + THROW_ARANGO_EXCEPTION(result); + } + + // force intermediate commit + if(result.commitRequired()){ + // force commit + } + guard.commit(); } @@ -617,7 +651,7 @@ int RocksDBCollection::update(arangodb::transaction::Methods* trx, TRI_voc_rid_t const& revisionId, arangodb::velocypack::Slice const key) { - // TODO FIXME -- limit transaction size + // TODO FIXME -- intermediate commit resultMarkerTick = 0; RocksDBOperationResult res; @@ -683,22 +717,30 @@ int RocksDBCollection::update(arangodb::transaction::Methods* trx, res = updateDocument(trx, oldRevisionId, oldDoc, revisionId, newDoc, options.waitForSync); - uint64_t keySize = res.keySize(); if (res.ok()) { - Result result = lookupRevisionVPack(revisionId, trx, mdr); + RocksDBOperationResult result = lookupRevisionVPack(revisionId, trx, mdr); if (result.fail()) { return result.errorNumber(); } TRI_ASSERT(!mdr.empty()); - //update as combination of remove/insert?! - //check add operation result!!!! 
+ // report document and key size result = static_cast(trx->state()) ->addOperation(_logicalCollection->cid(), revisionId, - TRI_VOC_DOCUMENT_OPERATION_UPDATE, newDoc.byteSize(), keySize); - //result.fail -> throw + TRI_VOC_DOCUMENT_OPERATION_UPDATE, newDoc.byteSize(), res.keySize()); + + // transaction size limit reached -- fail + if(result.fail()){ + THROW_ARANGO_EXCEPTION(result); + } + + // force intermediate commit + if(result.commitRequired()){ + // force commit + } + guard.commit(); } @@ -714,7 +756,8 @@ int RocksDBCollection::replace( arangodb::velocypack::Slice const toSlice) { resultMarkerTick = 0; - //TODO FIXME -- limit transaction size + // TODO FIXME -- improve transaction size + // TODO FIXME -- intermediate commit Result res; bool const isEdgeCollection = @@ -774,7 +817,7 @@ int RocksDBCollection::replace( RocksDBOperationResult opResult = updateDocument(trx, oldRevisionId, oldDoc, revisionId, VPackSlice(builder->slice()), options.waitForSync); if (opResult.ok()) { - Result result = lookupRevisionVPack(revisionId, trx, mdr); + RocksDBOperationResult result = lookupRevisionVPack(revisionId, trx, mdr); if (!result.ok()) { return result.errorNumber(); @@ -782,10 +825,22 @@ int RocksDBCollection::replace( TRI_ASSERT(!mdr.empty()); - static_cast(trx->state()) + // report document and key size + result = static_cast(trx->state()) ->addOperation(_logicalCollection->cid(), revisionId, TRI_VOC_DOCUMENT_OPERATION_REPLACE, VPackSlice(builder->slice()).byteSize(), opResult.keySize()); + + // transaction size limit reached -- fail + if(result.fail()){ + THROW_ARANGO_EXCEPTION(result); + } + + // force intermediate commit + if(result.commitRequired()){ + // force commit + } + guard.commit(); } @@ -800,7 +855,8 @@ int RocksDBCollection::remove(arangodb::transaction::Methods* trx, TRI_voc_rid_t const& revisionId, TRI_voc_rid_t& prevRev) { - //TODO FIXME -- limit transaction size + // TODO FIXME -- improve transaction size + // TODO FIXME -- intermediate commit // 
store the tick that was used for writing the document // note that we don't need it for this engine @@ -846,10 +902,20 @@ int RocksDBCollection::remove(arangodb::transaction::Methods* trx, res = removeDocument(trx, oldRevisionId, oldDoc, options.waitForSync); if (res.ok()) { - static_cast(trx->state()) + // report key size + res = static_cast(trx->state()) ->addOperation(_logicalCollection->cid(), revisionId, - TRI_VOC_DOCUMENT_OPERATION_REMOVE, oldDoc.byteSize(), - res.keySize()); + TRI_VOC_DOCUMENT_OPERATION_REMOVE, 0, res.keySize()); + // transaction size limit reached -- fail + if(res.fail()){ + THROW_ARANGO_EXCEPTION(res); + } + + // force intermediate commit + if(res.commitRequired()){ + // force commit + } + guard.commit(); } @@ -1012,6 +1078,8 @@ RocksDBOperationResult RocksDBCollection::insertDocument(arangodb::transaction:: << " INSERT DOCUMENT FAILED. REVISIONID: " << revisionId;*/ Result converted = rocksutils::convertStatus(status, rocksutils::StatusHint::document); res = converted; + + // set keysize that is passed up to the crud operations res.keySize(key.string().size()); return res; } @@ -1151,6 +1219,8 @@ RocksDBOperationResult RocksDBCollection::updateDocument(transaction::Methods* t VPackSlice const& newDoc, bool& waitForSync) { + // keysize in return value is set by insertDocument + // Coordinator doesn't know index internals TRI_ASSERT(trx->state()->isRunning()); TRI_ASSERT(!ServerState::instance()->isCoordinator()); diff --git a/arangod/RocksDBEngine/RocksDBCommon.h b/arangod/RocksDBEngine/RocksDBCommon.h index cd00c8cea5..c1d2c546eb 100644 --- a/arangod/RocksDBEngine/RocksDBCommon.h +++ b/arangod/RocksDBEngine/RocksDBCommon.h @@ -52,24 +52,32 @@ public: RocksDBOperationResult() :Result() ,_keySize(0) + ,_commitRequired(false) {} RocksDBOperationResult(Result const& other) : _keySize(0) + ,_commitRequired(false) { cloneData(other); } RocksDBOperationResult(Result&& other) : _keySize(0) + ,_commitRequired(false) { cloneData(std::move(other)); 
} uint64_t keySize(){ return _keySize; } uint64_t keySize(uint64_t s ) { _keySize = s; return _keySize; } + + bool commitRequired(){ return _commitRequired; } + bool commitRequired(bool cr ) { _commitRequired = cr; return _commitRequired; } + protected: uint64_t _keySize; + bool _commitRequired; }; class TransactionState; diff --git a/arangod/RocksDBEngine/RocksDBEngine.cpp b/arangod/RocksDBEngine/RocksDBEngine.cpp index 846ac8558d..1e07f21fdd 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.cpp +++ b/arangod/RocksDBEngine/RocksDBEngine.cpp @@ -88,13 +88,20 @@ RocksDBEngine::~RocksDBEngine() { delete _db; } // --------------------------------- // add the storage engine's specifc options to the global list of options -void RocksDBEngine::collectOptions(std::shared_ptr options) { +void RocksDBEngine::collectOptions( + std::shared_ptr options) { options->addSection("rocksdb", "RocksDB engine specific configuration"); - _maxTransactionSize = std::numeric_limits::max(); // set sensible default value here - options->addOption("--rocksdb.max-transaction-size" - ,"transaction size limit" - ,new UInt64Parameter(&_maxTransactionSize) - ); + + // control transaction size for RocksDB engine + _maxTransactionSize = + std::numeric_limits::max(); // set sensible default value here + options->addOption("--rocksdb.max-transaction-size", "transaction size limit", + new UInt64Parameter(&_maxTransactionSize)); + + // control intermediate transactions in RocksDB + _intermediateTransactionSize = (_maxTransactionSize / 5) * 4; // transaction size that will trigger an intermediate commit + _intermediateTransactionNumber = 100 * 1000; // number operation after that a commit will be tried + _intermediateTransactionEnabled = false; } // validate the storage engine's specific options @@ -134,9 +141,9 @@ void RocksDBEngine::start() { _options.comparator = _cmp.get(); // WAL_ttl_seconds needs to be bigger than the sync interval of the count // manager - _options.WAL_ttl_seconds = 15; 
//(uint64_t)(counter_sync_seconds * 2.0); + _options.WAL_ttl_seconds = 15; //(uint64_t)(counter_sync_seconds * 2.0); // TODO: prefix_extractior + memtable_insert_with_hint_prefix - + rocksdb::Status status = rocksdb::TransactionDB::Open(_options, transactionOptions, _path, &_db); @@ -185,7 +192,9 @@ transaction::ContextData* RocksDBEngine::createTransactionContextData() { TransactionState* RocksDBEngine::createTransactionState( TRI_vocbase_t* vocbase) { - return new RocksDBTransactionState(vocbase, _maxTransactionSize); + return new RocksDBTransactionState( + vocbase, _maxTransactionSize, _intermediateTransactionEnabled, + _intermediateTransactionSize, _intermediateTransactionNumber); } TransactionCollection* RocksDBEngine::createTransactionCollection( @@ -450,15 +459,15 @@ void RocksDBEngine::prepareDropDatabase(TRI_vocbase_t* vocbase, bool useWriteMarker, int& status) { // probably not required // THROW_ARANGO_NOT_YET_IMPLEMENTED(); - - //status = saveDatabaseParameters(vocbase->id(), vocbase->name(), true); + + // status = saveDatabaseParameters(vocbase->id(), vocbase->name(), true); VPackBuilder builder; builder.openObject(); builder.add("id", VPackValue(std::to_string(vocbase->id()))); builder.add("name", VPackValue(vocbase->name())); builder.add("deleted", VPackValue(true)); builder.close(); - + status = writeCreateDatabaseMarker(vocbase->id(), builder.slice()); } @@ -515,11 +524,11 @@ arangodb::Result RocksDBEngine::persistCollection( int res = writeCreateCollectionMarker(vocbase->id(), cid, slice); result.reset(res); - + #ifdef ARANGODB_ENABLE_MAINTAINER_MODE if (result.ok()) { - RocksDBCollection *rcoll = - RocksDBCollection::toRocksDBCollection(collection->getPhysical()); + RocksDBCollection* rcoll = + RocksDBCollection::toRocksDBCollection(collection->getPhysical()); TRI_ASSERT(rcoll->numberDocuments() == 0); } #endif @@ -564,7 +573,7 @@ arangodb::Result RocksDBEngine::dropCollection( return res; // let collection exist so the remaining elements can 
still be // accessed } - + // delete collection _counterManager->removeCounter(coll->objectId()); auto key = RocksDBKey::Collection(vocbase->id(), collection->cid()); @@ -671,7 +680,6 @@ void RocksDBEngine::signalCleanup(TRI_vocbase_t*) { void RocksDBEngine::iterateDocuments( TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, std::function const& cb) { - THROW_ARANGO_NOT_YET_IMPLEMENTED(); } @@ -696,14 +704,14 @@ bool RocksDBEngine::cleanupCompactionBlockers(TRI_vocbase_t* vocbase) { /// @brief insert a compaction blocker int RocksDBEngine::insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl, TRI_voc_tick_t& id) { - //THROW_ARANGO_NOT_YET_IMPLEMENTED(); + // THROW_ARANGO_NOT_YET_IMPLEMENTED(); return TRI_ERROR_NO_ERROR; } /// @brief touch an existing compaction blocker int RocksDBEngine::extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id, double ttl) { - //THROW_ARANGO_NOT_YET_IMPLEMENTED(); + // THROW_ARANGO_NOT_YET_IMPLEMENTED(); return TRI_ERROR_NO_ERROR; } @@ -784,8 +792,8 @@ Result RocksDBEngine::dropDatabase(TRI_voc_tick_t id) { if (indexes.isArray()) { for (auto const& it : VPackArrayIterator(indexes)) { // delete index documents - uint64_t objectId = basics::VelocyPackHelper::stringUInt64( - it, "objectId"); + uint64_t objectId = + basics::VelocyPackHelper::stringUInt64(it, "objectId"); RocksDBKeyBounds bounds = RocksDBKeyBounds::IndexEntries(objectId); res = rocksutils::removeLargeRange(_db, bounds); if (res.fail()) { diff --git a/arangod/RocksDBEngine/RocksDBEngine.h b/arangod/RocksDBEngine/RocksDBEngine.h index 8381dbf986..0913b22727 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.h +++ b/arangod/RocksDBEngine/RocksDBEngine.h @@ -235,11 +235,11 @@ class RocksDBEngine final : public StorageEngine { arangodb::velocypack::Slice info) override; void addParametersForNewIndex(arangodb::velocypack::Builder& builder, arangodb::velocypack::Slice info) override; - + rocksdb::TransactionDB* db() const { return _db; } RocksDBComparator* 
cmp() const { return _cmp.get(); } - + int writeCreateCollectionMarker(TRI_voc_tick_t databaseId, TRI_voc_cid_t id, VPackSlice const& slice); @@ -266,6 +266,9 @@ class RocksDBEngine final : public StorageEngine { std::unique_ptr _counterManager; uint64_t _maxTransactionSize; + uint64_t _intermediateTransactionSize; + uint64_t _intermediateTransactionNumber; + bool _intermediateTransactionEnabled; }; } #endif diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.cpp b/arangod/RocksDBEngine/RocksDBTransactionState.cpp index 950bdfb673..07574a587c 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.cpp +++ b/arangod/RocksDBEngine/RocksDBTransactionState.cpp @@ -74,16 +74,21 @@ void RocksDBSavePoint::rollback() { } /// @brief transaction type -RocksDBTransactionState::RocksDBTransactionState(TRI_vocbase_t* vocbase, uint64_t maxTransSize) +RocksDBTransactionState::RocksDBTransactionState( + TRI_vocbase_t* vocbase, uint64_t maxTransSize, + bool intermediateTransactionEnabled, uint64_t intermediateTransactionSize, + uint64_t intermediateTransactionNumber) : TransactionState(vocbase), _rocksReadOptions(), _cacheTx(nullptr), _transactionSize(0), _maxTransactionSize(maxTransSize), + _intermediateTransactionSize(intermediateTransactionSize), + _intermediateTransactionNumber(intermediateTransactionNumber), _numInserts(0), _numUpdates(0), - _numRemoves(0) - {} + _numRemoves(0), + _intermediateTransactionEnabled(intermediateTransactionEnabled) {} /// @brief free a transaction container RocksDBTransactionState::~RocksDBTransactionState() { @@ -159,7 +164,9 @@ Result RocksDBTransactionState::commitTransaction( << " transaction"; TRI_ASSERT(_status == transaction::Status::RUNNING); - TRI_IF_FAILURE("TransactionWriteCommitMarker") { return Result(TRI_ERROR_DEBUG); } + TRI_IF_FAILURE("TransactionWriteCommitMarker") { + return Result(TRI_ERROR_DEBUG); + } arangodb::Result result; @@ -262,15 +269,15 @@ Result RocksDBTransactionState::abortTransaction( } /// @brief add an 
operation for a transaction collection -Result RocksDBTransactionState::addOperation( +RocksDBOperationResult RocksDBTransactionState::addOperation( TRI_voc_cid_t cid, TRI_voc_rid_t revisionId, - TRI_voc_document_operation_e operationType, - uint64_t operationSize, uint64_t keySize) { - Result res; + TRI_voc_document_operation_e operationType, uint64_t operationSize, + uint64_t keySize) { + RocksDBOperationResult res; uint64_t newSize = _transactionSize + operationSize + keySize; - if(_maxTransactionSize < newSize){ - //we hit the transaction size limit + if (_maxTransactionSize < newSize) { + // we hit the transaction size limit res.reset(TRI_ERROR_RESOURCE_LIMIT, "maximal transaction limit reached"); return res; } @@ -283,7 +290,7 @@ Result RocksDBTransactionState::addOperation( "collection not found in transaction state"); } - //sould not fail or fail with exception + // should not fail or fail with exception collection->addOperation(operationType, operationSize, revisionId); switch (operationType) { @@ -303,5 +310,17 @@ Result RocksDBTransactionState::addOperation( } _transactionSize = newSize; + auto numOperations = _numInserts + _numUpdates + _numRemoves; + + // signal if intermediate commit is required + // this will be done if intermediate transactions are enabled + // and either the number of operations or the transaction size + // has reached the limit + if (_intermediateTransactionEnabled && + (_intermediateTransactionNumber <= numOperations || + _intermediateTransactionSize <= newSize)) { + res.commitRequired(true); + } + return res; } diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.h b/arangod/RocksDBEngine/RocksDBTransactionState.h index 412adbf5a8..2a73defdf3 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.h +++ b/arangod/RocksDBEngine/RocksDBTransactionState.h @@ -31,6 +31,7 @@ #include "Transaction/Methods.h" #include "VocBase/AccessMode.h" #include "VocBase/voc-types.h" +#include "RocksDBEngine/RocksDBCommon.h" #include 
#include @@ -70,7 +71,12 @@ class RocksDBSavePoint { /// @brief transaction type class RocksDBTransactionState final : public TransactionState { public: - explicit RocksDBTransactionState(TRI_vocbase_t* vocbase, uint64_t maxOperationSize); + explicit RocksDBTransactionState(TRI_vocbase_t* vocbase + , uint64_t maxOperationSize + , bool intermediateTransactionEnabled + , uint64_t intermediateTransactionSize + , uint64_t intermediateTransactionNumber + ); ~RocksDBTransactionState(); /// @brief begin a transaction @@ -85,7 +91,7 @@ class RocksDBTransactionState final : public TransactionState { uint64_t numInserts() const { return _numInserts; } uint64_t numUpdates() const { return _numUpdates; } uint64_t numRemoves() const { return _numRemoves; } - + inline bool hasOperations() const { return (_numInserts > 0 || _numRemoves > 0 || _numUpdates > 0); } @@ -95,9 +101,10 @@ class RocksDBTransactionState final : public TransactionState { } /// @brief add an operation for a transaction collection - Result addOperation(TRI_voc_cid_t collectionId, TRI_voc_rid_t revisionId, - TRI_voc_document_operation_e operationType, - uint64_t operationSize, uint64_t keySize); + RocksDBOperationResult addOperation( + TRI_voc_cid_t collectionId, TRI_voc_rid_t revisionId, + TRI_voc_document_operation_e operationType, uint64_t operationSize, + uint64_t keySize); rocksdb::Transaction* rocksTransaction() { TRI_ASSERT(_rocksTransaction != nullptr); @@ -111,11 +118,18 @@ class RocksDBTransactionState final : public TransactionState { rocksdb::WriteOptions _rocksWriteOptions; rocksdb::ReadOptions _rocksReadOptions; cache::Transaction* _cacheTx; - uint64_t _transactionSize; // current transaction size - uint64_t _maxTransactionSize; // a transaction may not become bigger than this value + // current transaction size + uint64_t _transactionSize; + // a transaction may not become bigger than this value + uint64_t _maxTransactionSize; + // if a transaction gets bigger than this value and intermediate 
transactions + // are enabled then a commit will be done + uint64_t _intermediateTransactionSize; + uint64_t _intermediateTransactionNumber; uint64_t _numInserts; uint64_t _numUpdates; uint64_t _numRemoves; + bool _intermediateTransactionEnabled; }; }