diff --git a/arangod/Agency/AgencyComm.cpp b/arangod/Agency/AgencyComm.cpp index 0e411dd641..863fe7281b 100644 --- a/arangod/Agency/AgencyComm.cpp +++ b/arangod/Agency/AgencyComm.cpp @@ -1316,7 +1316,7 @@ AgencyCommResult AgencyComm::sendWithFailover( AgencyCommResult result; std::string url = initialUrl; - std::chrono::duration waitInterval (.25); // seconds + std::chrono::duration waitInterval (.0); // seconds auto started = std::chrono::steady_clock::now(); auto timeOut = std::chrono::steady_clock::now() + std::chrono::duration(timeout); diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index 689ce19afe..3c16877c69 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -701,12 +701,14 @@ int RocksDBCollection::insert(arangodb::transaction::Methods* trx, TRI_voc_rid_t revisionId = transaction::helpers::extractRevFromDocument(newSlice); - - RocksDBSavePoint guard(rocksTransaction(trx), - trx->isSingleOperationTransaction()); - + RocksDBTransactionState* state = static_cast(trx->state()); + + RocksDBSavePoint guard(rocksTransaction(trx), + trx->isSingleOperationTransaction(), + [&state]() { state->resetLogState(); }); + state->prepareOperation(_logicalCollection->cid(), revisionId, StringRef(), TRI_VOC_DOCUMENT_OPERATION_INSERT); @@ -810,7 +812,8 @@ int RocksDBCollection::update(arangodb::transaction::Methods* trx, } RocksDBSavePoint guard(rocksTransaction(trx), - trx->isSingleOperationTransaction()); + trx->isSingleOperationTransaction(), + [&state]() { state->resetLogState(); }); // add possible log statement under guard state->prepareOperation(_logicalCollection->cid(), revisionId, StringRef(), @@ -912,7 +915,8 @@ int RocksDBCollection::replace( } RocksDBSavePoint guard(rocksTransaction(trx), - trx->isSingleOperationTransaction()); + trx->isSingleOperationTransaction(), + [&state]() { state->resetLogState(); }); // add possible log statement under guard state->prepareOperation(_logicalCollection->cid(), revisionId, StringRef(), @@ -998,13 +1002,15 @@ int RocksDBCollection::remove(arangodb::transaction::Methods* trx, return res; } } - - RocksDBSavePoint guard(rocksTransaction(trx), - trx->isSingleOperationTransaction()); - - // add possible log statement under guard + RocksDBTransactionState* state = static_cast(trx->state()); + + RocksDBSavePoint guard(rocksTransaction(trx), + trx->isSingleOperationTransaction(), + [&state]() { state->resetLogState(); }); + + // add possible log statement under guard state->prepareOperation(_logicalCollection->cid(), revisionId, StringRef(key), TRI_VOC_DOCUMENT_OPERATION_REMOVE); diff --git a/arangod/RocksDBEngine/RocksDBCounterManager.cpp b/arangod/RocksDBEngine/RocksDBCounterManager.cpp index da9c190b6b..1c84588efc 100644 --- a/arangod/RocksDBEngine/RocksDBCounterManager.cpp +++ b/arangod/RocksDBEngine/RocksDBCounterManager.cpp @@ -229,10 +229,8 @@ void RocksDBCounterManager::readSettings() { if (!result.empty()) { try { - std::shared_ptr builder = VPackParser::fromJson(result); - VPackSlice s = builder->slice(); - - uint64_t lastTick = basics::VelocyPackHelper::stringUInt64(s.get("tick")); + uint64_t lastTick = basics::VelocyPackHelper::stringUInt64(slice.get("tick")); + LOG_TOPIC(TRACE, Logger::ENGINES) << "using last tick: " << lastTick; TRI_UpdateTickServer(lastTick); } catch (...) { LOG_TOPIC(WARN, Logger::ENGINES) << "unable to read initial settings: invalid data"; diff --git a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp index c1ade6cf77..561b166367 100644 --- a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp @@ -66,6 +66,7 @@ RocksDBEdgeIndexIterator::RocksDBEdgeIndexIterator( keys.release(); // now we have ownership for _keys TRI_ASSERT(_keys->slice().isArray()); RocksDBTransactionState* state = rocksutils::toRocksTransactionState(_trx); + TRI_ASSERT(state != nullptr); rocksdb::Transaction* rtrx = state->rocksTransaction(); _iterator.reset(rtrx->GetIterator(state->readOptions())); updateBounds(); @@ -95,6 +96,8 @@ RocksDBEdgeIndexIterator::~RocksDBEdgeIndexIterator() { } bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) { + TRI_ASSERT(_trx->state()->isRunning()); + if (limit == 0 || !_keysIterator.valid()) { // No limit no data, or we are actually done. The last call should have // returned false diff --git a/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp b/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp index 901a71461f..f0c36b3de3 100644 --- a/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp @@ -121,8 +121,10 @@ RocksDBAllIndexIterator::RocksDBAllIndexIterator( _bounds(RocksDBKeyBounds::PrimaryIndex(index->objectId())) { // acquire rocksdb transaction RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); + TRI_ASSERT(state != nullptr); + rocksdb::Transaction* rtrx = state->rocksTransaction(); - auto& options = state->readOptions(); + auto const& options = state->readOptions(); TRI_ASSERT(options.snapshot != nullptr); _iterator.reset(rtrx->GetIterator(options)); @@ -134,6 +136,8 @@ RocksDBAllIndexIterator::RocksDBAllIndexIterator( } bool RocksDBAllIndexIterator::next(TokenCallback const& cb, size_t limit) { + TRI_ASSERT(_trx->state()->isRunning()); + if (limit == 0 || !_iterator->Valid() || outOfRange()) { // No limit no data, or we are actually done. The last call should have // returned false @@ -163,6 +167,8 @@ bool RocksDBAllIndexIterator::next(TokenCallback const& cb, size_t limit) { /// special method to expose the document key for incremental replication bool RocksDBAllIndexIterator::nextWithKey(TokenKeyCallback const& cb, size_t limit) { + TRI_ASSERT(_trx->state()->isRunning()); + if (limit == 0 || !_iterator->Valid() || outOfRange()) { // No limit no data, or we are actually done. The last call should have // returned false @@ -189,6 +195,8 @@ bool RocksDBAllIndexIterator::nextWithKey(TokenKeyCallback const& cb, size_t lim } void RocksDBAllIndexIterator::reset() { + TRI_ASSERT(_trx->state()->isRunning()); + if (_reverse) { _iterator->SeekForPrev(_bounds.end()); } else { @@ -197,6 +205,8 @@ void RocksDBAllIndexIterator::reset() { } bool RocksDBAllIndexIterator::outOfRange() const { + TRI_ASSERT(_trx->state()->isRunning()); + if (_reverse) { return _cmp->Compare(_iterator->key(), _bounds.start()) < 0; } else { @@ -216,8 +226,10 @@ RocksDBAnyIndexIterator::RocksDBAnyIndexIterator( _returned(0) { // acquire rocksdb transaction RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); + TRI_ASSERT(state != nullptr); + rocksdb::Transaction* rtrx = state->rocksTransaction(); - auto& options = state->readOptions(); + auto const& options = state->readOptions(); TRI_ASSERT(options.snapshot != nullptr); _iterator.reset(rtrx->GetIterator(options)); @@ -244,6 +256,8 @@ RocksDBAnyIndexIterator::RocksDBAnyIndexIterator( } bool RocksDBAnyIndexIterator::next(TokenCallback const& cb, size_t limit) { + TRI_ASSERT(_trx->state()->isRunning()); + if (limit == 0 || !_iterator->Valid() || outOfRange()) { // No limit no data, or we are actually done. The last call should have // returned false @@ -272,6 +286,7 @@ bool RocksDBAnyIndexIterator::next(TokenCallback const& cb, size_t limit) { void RocksDBAnyIndexIterator::reset() { _iterator->Seek(_bounds.start()); } bool RocksDBAnyIndexIterator::outOfRange() const { + TRI_ASSERT(_trx->state()->isRunning()); return _cmp->Compare(_iterator->key(), _bounds.end()) > 0; } diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.cpp b/arangod/RocksDBEngine/RocksDBTransactionState.cpp index d5cc9528ef..039b84cb35 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.cpp +++ b/arangod/RocksDBEngine/RocksDBTransactionState.cpp @@ -55,11 +55,8 @@ using namespace arangodb; // for the RocksDB engine we do not need any additional data struct RocksDBTransactionData final : public TransactionData {}; -RocksDBSavePoint::RocksDBSavePoint(rocksdb::Transaction* trx) - : RocksDBSavePoint(trx, false) {} - -RocksDBSavePoint::RocksDBSavePoint(rocksdb::Transaction* trx, bool handled) - : _trx(trx), _handled(handled) { +RocksDBSavePoint::RocksDBSavePoint(rocksdb::Transaction* trx, bool handled, std::function const& rollbackCallback) + : _trx(trx), _rollbackCallback(rollbackCallback), _handled(handled) { TRI_ASSERT(trx != nullptr); if (!_handled) { _trx->SetSavePoint(); @@ -81,6 +78,7 @@ void RocksDBSavePoint::rollback() { TRI_ASSERT(!_handled); _trx->RollbackToSavePoint(); _handled = true; // in order to not roll back again by accident + _rollbackCallback(); } /// @brief transaction type @@ -159,7 +157,7 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) { _rocksTransaction->SetSnapshot(); _rocksReadOptions.snapshot = _rocksTransaction->GetSnapshot(); - if (!hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) { + if (!isReadOnlyTransaction() && !hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) { RocksDBLogValue header = RocksDBLogValue::BeginTransaction(_vocbase->id(), _id); _rocksTransaction->PutLogData(header.slice()); @@ -187,50 +185,60 @@ Result RocksDBTransactionState::commitTransaction( if (_nestingLevel == 0) { if (_rocksTransaction != nullptr) { + if (hasOperations()) { // set wait for sync flag if required - if (waitForSync()) { - _rocksWriteOptions.sync = true; - _rocksTransaction->SetWriteOptions(_rocksWriteOptions); - } + if (waitForSync()) { + _rocksWriteOptions.sync = true; + _rocksTransaction->SetWriteOptions(_rocksWriteOptions); + } + + // TODO wait for response on github issue to see how we can use the + // sequence number + result = rocksutils::convertStatus(_rocksTransaction->Commit()); + rocksdb::SequenceNumber latestSeq = + rocksutils::globalRocksDB()->GetLatestSequenceNumber(); + if (!result.ok()) { + abortTransaction(activeTrx); + return result; + } - // TODO wait for response on github issue to see how we can use the - // sequence number - result = rocksutils::convertStatus(_rocksTransaction->Commit()); - rocksdb::SequenceNumber latestSeq = - rocksutils::globalRocksDB()->GetLatestSequenceNumber(); - if (!result.ok()) { - abortTransaction(activeTrx); - return result; - } + if (_cacheTx != nullptr) { + // note: endTransaction() will delete _cacheTx! + CacheManagerFeature::MANAGER->endTransaction(_cacheTx); + _cacheTx = nullptr; + } - if (_cacheTx != nullptr) { - // note: endTransaction() will delete _cacheTx! - CacheManagerFeature::MANAGER->endTransaction(_cacheTx); - _cacheTx = nullptr; - } + for (auto& trxCollection : _collections) { + RocksDBTransactionCollection* collection = + static_cast(trxCollection); + int64_t adjustment = + collection->numInserts() - collection->numRemoves(); - rocksdb::Snapshot const* snap = this->_rocksReadOptions.snapshot; - TRI_ASSERT(snap != nullptr); + if (collection->numInserts() != 0 || collection->numRemoves() != 0 || + collection->revision() != 0) { + RocksDBCollection* coll = static_cast( + trxCollection->collection()->getPhysical()); + coll->adjustNumberDocuments(adjustment); + coll->setRevision(collection->revision()); + RocksDBEngine* engine = + static_cast(EngineSelectorFeature::ENGINE); - for (auto& trxCollection : _collections) { - RocksDBTransactionCollection* collection = - static_cast(trxCollection); - int64_t adjustment = - collection->numInserts() - collection->numRemoves(); - - if (collection->numInserts() != 0 || collection->numRemoves() != 0 || - collection->revision() != 0) { - RocksDBCollection* coll = static_cast( - trxCollection->collection()->getPhysical()); - coll->adjustNumberDocuments(adjustment); - coll->setRevision(collection->revision()); - RocksDBEngine* engine = - static_cast(EngineSelectorFeature::ENGINE); - - RocksDBCounterManager::CounterAdjustment update( - latestSeq, collection->numInserts(), collection->numRemoves(), - collection->revision()); - engine->counterManager()->updateCounter(coll->objectId(), update); + RocksDBCounterManager::CounterAdjustment update( + latestSeq, collection->numInserts(), collection->numRemoves(), + collection->revision()); + engine->counterManager()->updateCounter(coll->objectId(), update); + } + } + } else { + // don't write anything if the transaction is empty + // TODO: calling Rollback() here does not work for some reason but it should. + // must investigate further!! + result = rocksutils::convertStatus(_rocksTransaction->Commit()); + + if (_cacheTx != nullptr) { + // note: endTransaction() will delete _cacheTx! + CacheManagerFeature::MANAGER->endTransaction(_cacheTx); + _cacheTx = nullptr; } } @@ -257,13 +265,14 @@ Result RocksDBTransactionState::abortTransaction( if (_rocksTransaction != nullptr) { rocksdb::Status status = _rocksTransaction->Rollback(); result = rocksutils::convertStatus(status); - _rocksTransaction.reset(); - } + + if (_cacheTx != nullptr) { + // note: endTransaction() will delete _cacheTx! + CacheManagerFeature::MANAGER->endTransaction(_cacheTx); + _cacheTx = nullptr; + } - if (_cacheTx != nullptr) { - // note: endTransaction() will delete _cacheTx! - CacheManagerFeature::MANAGER->endTransaction(_cacheTx); - _cacheTx = nullptr; + _rocksTransaction.reset(); } updateStatus(transaction::Status::ABORTED); @@ -283,7 +292,9 @@ Result RocksDBTransactionState::abortTransaction( void RocksDBTransactionState::prepareOperation( TRI_voc_cid_t collectionId, TRI_voc_rid_t revisionId, StringRef const& key, TRI_voc_document_operation_e operationType) { - + + TRI_ASSERT(!isReadOnlyTransaction()); + bool singleOp = hasHint(transaction::Hints::Hint::SINGLE_OPERATION); // single operations should never call this method twice TRI_ASSERT(!singleOp || _lastUsedCollection == 0); @@ -321,7 +332,7 @@ void RocksDBTransactionState::prepareOperation( _lastUsedCollection = collectionId; } - // we need to the remove log entry, if we don't have the single optimization + // we need to log the remove log entry, if we don't have the single optimization if (!singleOp && operationType == TRI_VOC_DOCUMENT_OPERATION_REMOVE) { RocksDBLogValue logValue = RocksDBLogValue::DocumentRemove(key); diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.h b/arangod/RocksDBEngine/RocksDBTransactionState.h index 55d283d7a8..0af2a1cc64 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.h +++ b/arangod/RocksDBEngine/RocksDBTransactionState.h @@ -57,8 +57,7 @@ class TransactionCollection; class RocksDBSavePoint { public: - explicit RocksDBSavePoint(rocksdb::Transaction* trx); - RocksDBSavePoint(rocksdb::Transaction* trx, bool handled); + RocksDBSavePoint(rocksdb::Transaction* trx, bool handled, std::function const& rollbackCallback); ~RocksDBSavePoint(); void commit(); @@ -68,6 +67,7 @@ class RocksDBSavePoint { private: rocksdb::Transaction* _trx; + std::function const _rollbackCallback; bool _handled; }; @@ -94,6 +94,9 @@ class RocksDBTransactionState final : public TransactionState { uint64_t numUpdates() const { return _numUpdates; } uint64_t numRemoves() const { return _numRemoves; } + /// @brief reset previous log state after a rollback to safepoint + void resetLogState() { _lastUsedCollection = 0; } + inline bool hasOperations() const { return (_numInserts > 0 || _numRemoves > 0 || _numUpdates > 0); } diff --git a/arangod/RocksDBEngine/RocksDBVPackIndex.cpp b/arangod/RocksDBEngine/RocksDBVPackIndex.cpp index 633459e003..73a11336e7 100644 --- a/arangod/RocksDBEngine/RocksDBVPackIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBVPackIndex.cpp @@ -90,7 +90,8 @@ RocksDBVPackIndexIterator::RocksDBVPackIndexIterator( left, right)) { RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); rocksdb::Transaction* rtrx = state->rocksTransaction(); - auto options = state->readOptions(); + TRI_ASSERT(state != nullptr); + auto const& options = state->readOptions(); _iterator.reset(rtrx->GetIterator(options)); if (reverse) { @@ -102,6 +103,8 @@ RocksDBVPackIndexIterator::RocksDBVPackIndexIterator( /// @brief Reset the cursor void RocksDBVPackIndexIterator::reset() { + TRI_ASSERT(_trx->state()->isRunning()); + if (_reverse) { _iterator->SeekForPrev(_bounds.end()); } else { @@ -110,6 +113,8 @@ void RocksDBVPackIndexIterator::reset() { } bool RocksDBVPackIndexIterator::outOfRange() const { + TRI_ASSERT(_trx->state()->isRunning()); + if (_reverse) { return (_cmp->Compare(_iterator->key(), _bounds.start()) < 0); } else { @@ -118,6 +123,8 @@ bool RocksDBVPackIndexIterator::outOfRange() const { } bool RocksDBVPackIndexIterator::next(TokenCallback const& cb, size_t limit) { + TRI_ASSERT(_trx->state()->isRunning()); + if (limit == 0 || !_iterator->Valid() || outOfRange()) { // No limit no data, or we are actually done. The last call should have // returned false diff --git a/js/server/tests/dump/dump-authentication.js b/js/server/tests/dump/dump-authentication.js index c8ec587c75..3192dc0101 100644 --- a/js/server/tests/dump/dump-authentication.js +++ b/js/server/tests/dump/dump-authentication.js @@ -66,7 +66,9 @@ function dumpTestSuite () { assertEqual(2, c.type()); // document assertTrue(p.waitForSync); assertFalse(p.isVolatile); - assertEqual(256, p.indexBuckets); + if (db._engine().name === "mmfiles") { + assertEqual(256, p.indexBuckets); + } assertEqual(1, c.getIndexes().length); // just primary index assertEqual("primary", c.getIndexes()[0].type); @@ -139,7 +141,9 @@ function dumpTestSuite () { assertEqual(2, c.type()); // document assertFalse(p.waitForSync); assertFalse(p.isVolatile); - assertEqual(16, p.indexBuckets); + if (db._engine().name === "mmfiles") { + assertEqual(16, p.indexBuckets); + } assertEqual(1, c.getIndexes().length); // just primary index assertEqual("primary", c.getIndexes()[0].type); diff --git a/js/server/tests/replication/replication-ongoing.js b/js/server/tests/replication/replication-ongoing.js index 31f3f35f9c..b735462fdf 100644 --- a/js/server/tests/replication/replication-ongoing.js +++ b/js/server/tests/replication/replication-ongoing.js @@ -164,8 +164,14 @@ function ReplicationSuite() { var printed = false; while (true) { - if (!slaveFuncOngoing(state)) { - return; + var r = slaveFuncOngoing(state); + if (r === "wait") { + // special return code that tells us to hang on + internal.wait(0.5, false); + continue; + } + if (!r) { + break; } var slaveState = replication.applier.state(); @@ -666,6 +672,9 @@ function ReplicationSuite() { try { require("@arangodb/tasks").get(state.task); // task exists + connectToSlave(); + internal.wait(0.5, false); + return "wait"; } catch (err) { // task does not exist. we're done state.checksum = collectionChecksum(cn); @@ -674,10 +683,6 @@ function ReplicationSuite() { connectToSlave(); return false; } - - connectToSlave(); - internal.wait(0.5, false); - return true; }, function(state) { @@ -745,6 +750,12 @@ function ReplicationSuite() { try { require("@arangodb/tasks").get(state.task); // task exists + connectToSlave(); + + internal.wait(0.5, false); + replication.applier.start(); + assertTrue(replication.applier.state().state.running); + return "wait"; } catch (err) { // task does not exist. we're done state.checksum = collectionChecksum(cn); @@ -753,13 +764,6 @@ function ReplicationSuite() { connectToSlave(); return false; } - - connectToSlave(); - - internal.wait(0.5, false); - replication.applier.start(); - assertTrue(replication.applier.state().state.running); - return true; }, function(state) {