diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index 7a3a1d01d4..a9a3e6881d 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -26,7 +26,9 @@ #include "Basics/Result.h" #include "Basics/StaticStrings.h" #include "Basics/VelocyPackHelper.h" +#include "Basics/WriteLocker.h" #include "Cluster/ClusterMethods.h" +#include "Cluster/CollectionLockState.h" #include "Indexes/Index.h" #include "Indexes/IndexIterator.h" #include "RestServer/DatabaseFeature.h" @@ -1254,3 +1256,148 @@ void RocksDBCollection::adjustNumberDocuments(int64_t adjustment) { _numberDocuments += static_cast(adjustment); } } + +/// @brief write locks a collection, with a timeout +int RocksDBCollection::beginWriteTimed(bool useDeadlockDetector, + double timeout) { + if (CollectionLockState::_noLockHeaders != nullptr) { + auto it = + CollectionLockState::_noLockHeaders->find(_logicalCollection->name()); + if (it != CollectionLockState::_noLockHeaders->end()) { + // do not lock by command + // LOCKING-DEBUG + // std::cout << "BeginWriteTimed blocked: " << _name << + // std::endl; + return TRI_ERROR_NO_ERROR; + } + } + + // LOCKING-DEBUG + // std::cout << "BeginWriteTimed: " << document->_info._name << std::endl; + int iterations = 0; + bool wasBlocked = false; + uint64_t waitTime = 0; // indicate that times uninitialized + double startTime = 0.0; + + while (true) { + TRY_WRITE_LOCKER(locker, _exclusiveLock); + + if (locker.isLocked()) { + // register writer + if (useDeadlockDetector) { + _logicalCollection->vocbase()->_deadlockDetector.addWriter( + _logicalCollection, wasBlocked); + } + // keep lock and exit loop + locker.steal(); + return TRI_ERROR_NO_ERROR; + } + + if (useDeadlockDetector) { + try { + if (!wasBlocked) { + // insert writer + wasBlocked = true; + if (_logicalCollection->vocbase()->_deadlockDetector.setWriterBlocked( + _logicalCollection) == TRI_ERROR_DEADLOCK) { + // deadlock + LOG_TOPIC(TRACE, arangodb::Logger::FIXME) + << "deadlock detected while trying to acquire " + "write-lock on collection '" + << _logicalCollection->name() << "'"; + return TRI_ERROR_DEADLOCK; + } + LOG_TOPIC(TRACE, arangodb::Logger::FIXME) + << "waiting for write-lock on collection '" + << _logicalCollection->name() << "'"; + } else if (++iterations >= 5) { + // periodically check for deadlocks + TRI_ASSERT(wasBlocked); + iterations = 0; + if (_logicalCollection->vocbase()->_deadlockDetector.detectDeadlock( + _logicalCollection, true) == TRI_ERROR_DEADLOCK) { + // deadlock + _logicalCollection->vocbase()->_deadlockDetector.unsetWriterBlocked( + _logicalCollection); + LOG_TOPIC(TRACE, arangodb::Logger::FIXME) + << "deadlock detected while trying to acquire " + "write-lock on collection '" + << _logicalCollection->name() << "'"; + return TRI_ERROR_DEADLOCK; + } + } + } catch (...) { + // clean up! + if (wasBlocked) { + _logicalCollection->vocbase()->_deadlockDetector.unsetWriterBlocked( + _logicalCollection); + } + // always exit + return TRI_ERROR_OUT_OF_MEMORY; + } + } + + double now = TRI_microtime(); + + if (waitTime == 0) { // initialize times + // set end time for lock waiting + if (timeout <= 0.0) { + timeout = defaultLockTimeout; + } + startTime = now; + waitTime = 1; + } + + if (now > startTime + timeout) { + if (useDeadlockDetector) { + _logicalCollection->vocbase()->_deadlockDetector.unsetWriterBlocked( + _logicalCollection); + } + LOG_TOPIC(TRACE, arangodb::Logger::FIXME) + << "timed out after " << timeout + << " s waiting for write-lock on collection '" + << _logicalCollection->name() << "'"; + return TRI_ERROR_LOCK_TIMEOUT; + } + + if (now - startTime < 0.001) { + std::this_thread::yield(); + } else { + usleep(static_cast(waitTime)); + if (waitTime < 500000) { + waitTime *= 2; + } + } + } +} + +/// @brief write unlocks a collection +int RocksDBCollection::endWrite(bool useDeadlockDetector) { + if (CollectionLockState::_noLockHeaders != nullptr) { + auto it = + CollectionLockState::_noLockHeaders->find(_logicalCollection->name()); + if (it != CollectionLockState::_noLockHeaders->end()) { + // do not lock by command + // LOCKING-DEBUG + // std::cout << "EndWrite blocked: " << _name << + // std::endl; + return TRI_ERROR_NO_ERROR; + } + } + + if (useDeadlockDetector) { + // unregister writer + try { + _logicalCollection->vocbase()->_deadlockDetector.unsetWriter( + _logicalCollection); + } catch (...) { + // must go on here to unlock the lock + } + } + + // LOCKING-DEBUG + // std::cout << "EndWrite: " << _name << std::endl; + _exclusiveLock.unlockWrite(); + + return TRI_ERROR_NO_ERROR; +} diff --git a/arangod/RocksDBEngine/RocksDBCollection.h b/arangod/RocksDBEngine/RocksDBCollection.h index f892773ff0..dfc759cdf3 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.h +++ b/arangod/RocksDBEngine/RocksDBCollection.h @@ -43,6 +43,8 @@ struct RocksDBToken; class RocksDBCollection final : public PhysicalCollection { friend class RocksDBEngine; friend class RocksDBVPackIndex; + + constexpr static double defaultLockTimeout = 10.0 * 60.0; public: static inline RocksDBCollection* toRocksDBCollection( @@ -179,7 +181,11 @@ class RocksDBCollection final : public PhysicalCollection { Result lookupDocumentToken(transaction::Methods* trx, arangodb::StringRef key, RocksDBToken& token); - + + int beginWriteTimed(bool useDeadlockDetector, double timeout = 0.0); + + int endWrite(bool useDeadlockDetector); + private: /// @brief return engine-specific figures void figuresSpecific( diff --git a/arangod/RocksDBEngine/RocksDBTransactionCollection.cpp b/arangod/RocksDBEngine/RocksDBTransactionCollection.cpp index 250cca030e..435e575ab2 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBTransactionCollection.cpp @@ -175,7 +175,7 @@ void RocksDBTransactionCollection::release() { } } -void RocksDBTransactionCollection::resetCounts(){ +void RocksDBTransactionCollection::resetCounts() { // _initialNumberDocuments; -- HAS TO BE UPDATED _operationSize = 0; _numInserts = 0; @@ -183,7 +183,6 @@ void RocksDBTransactionCollection::resetCounts(){ _numRemoves = 0; } - /// @brief add an operation for a transaction collection void RocksDBTransactionCollection::addOperation( TRI_voc_document_operation_e operationType, uint64_t operationSize, diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.cpp b/arangod/RocksDBEngine/RocksDBTransactionState.cpp index 9aebfd547d..21c7552d99 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.cpp +++ b/arangod/RocksDBEngine/RocksDBTransactionState.cpp @@ -129,10 +129,6 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) { _rocksWriteOptions, rocksdb::TransactionOptions())); _rocksTransaction->SetSnapshot(); _rocksReadOptions.snapshot = _rocksTransaction->GetSnapshot(); - - /*LOG_TOPIC(ERR, Logger::FIXME) - << "#" << _id << " BEGIN (read-only: " << isReadOnlyTransaction() - << ")";*/ } else { TRI_ASSERT(_status == transaction::Status::RUNNING); } @@ -333,8 +329,8 @@ RocksDBOperationResult RocksDBTransactionState::addOperation( return res; } -void RocksDBTransactionState::reset(){ - //only rest when already commited +void RocksDBTransactionState::reset() { + //only reset when already commited TRI_ASSERT(_status == transaction::Status::COMMITTED); //reset count _transactionSize = 0; @@ -352,5 +348,4 @@ void RocksDBTransactionState::reset(){ // start new transaction beginTransaction(transaction::Hints()); - }