diff --git a/arangod/RocksDBEngine/RocksDBEngine.cpp b/arangod/RocksDBEngine/RocksDBEngine.cpp index a2c669d7bf..ddfdf69300 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.cpp +++ b/arangod/RocksDBEngine/RocksDBEngine.cpp @@ -243,8 +243,8 @@ TransactionState* RocksDBEngine::createTransactionState( TransactionCollection* RocksDBEngine::createTransactionCollection( TransactionState* state, TRI_voc_cid_t cid, AccessMode::Type accessType, - int /*nestingLevel*/) { - return new RocksDBTransactionCollection(state, cid, accessType); + int nestingLevel) { + return new RocksDBTransactionCollection(state, cid, accessType, nestingLevel); } void RocksDBEngine::addParametersForNewCollection(VPackBuilder& builder, diff --git a/arangod/RocksDBEngine/RocksDBTransactionCollection.cpp b/arangod/RocksDBEngine/RocksDBTransactionCollection.cpp index 043b8e469f..3b8dea4380 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBTransactionCollection.cpp @@ -35,9 +35,11 @@ using namespace arangodb; RocksDBTransactionCollection::RocksDBTransactionCollection( - TransactionState* trx, TRI_voc_cid_t cid, AccessMode::Type accessType) + TransactionState* trx, TRI_voc_cid_t cid, AccessMode::Type accessType, int nestingLevel) : TransactionCollection(trx, cid), _accessType(accessType), + _lockType(AccessMode::Type::NONE), + _nestingLevel(nestingLevel), _initialNumberDocuments(0), _revision(0), _operationSize(0), @@ -48,33 +50,43 @@ RocksDBTransactionCollection::RocksDBTransactionCollection( RocksDBTransactionCollection::~RocksDBTransactionCollection() {} /// @brief request a main-level lock for a collection -int RocksDBTransactionCollection::lock() { return TRI_ERROR_NO_ERROR; } +int RocksDBTransactionCollection::lock() { return lock(_accessType, 0); } /// @brief request a lock for a collection int RocksDBTransactionCollection::lock(AccessMode::Type accessType, - int /*nestingLevel*/) { + int nestingLevel) { if (isWrite(accessType) && !isWrite(_accessType)) { // wrong lock type return TRI_ERROR_INTERNAL; } - return TRI_ERROR_NO_ERROR; + if (isLocked()) { + // already locked + return TRI_ERROR_NO_ERROR; + } + + return doLock(accessType, nestingLevel); } /// @brief request an unlock for a collection int RocksDBTransactionCollection::unlock(AccessMode::Type accessType, - int /*nestingLevel*/) { + int nestingLevel) { if (isWrite(accessType) && !isWrite(_accessType)) { // wrong lock type: write-unlock requested but collection is read-only return TRI_ERROR_INTERNAL; } - return TRI_ERROR_NO_ERROR; + if (!isLocked()) { + // already unlocked + return TRI_ERROR_NO_ERROR; + } + + return doUnlock(accessType, nestingLevel); } /// @brief check if a collection is locked in a specific mode in a transaction bool RocksDBTransactionCollection::isLocked(AccessMode::Type accessType, - int /*nestingLevel*/) const { + int nestingLevel) const { if (isWrite(accessType) && !isWrite(_accessType)) { // wrong lock type LOG_TOPIC(WARN, arangodb::Logger::FIXME) @@ -82,14 +94,12 @@ bool RocksDBTransactionCollection::isLocked(AccessMode::Type accessType, return false; } - // TODO - return false; + return isLocked(); } /// @brief check whether a collection is locked at all bool RocksDBTransactionCollection::isLocked() const { - // TODO - return false; + return (_lockType != AccessMode::Type::NONE); } /// @brief whether or not any write operations for the collection happened @@ -127,12 +137,21 @@ int RocksDBTransactionCollection::updateUsage(AccessMode::Type accessType, // upgrade collection type to write-access _accessType = accessType; } + + if (nestingLevel < _nestingLevel) { + _nestingLevel = nestingLevel; + } // all correct return TRI_ERROR_NO_ERROR; } int RocksDBTransactionCollection::use(int nestingLevel) { + if (_nestingLevel != nestingLevel) { + // only process our own collections + return TRI_ERROR_NO_ERROR; + } + if (_collection == nullptr) { TRI_vocbase_col_status_e status; LOG_TRX(_transaction, nestingLevel) << "using collection " << _cid; @@ -159,12 +178,29 @@ int RocksDBTransactionCollection::use(int nestingLevel) { static_cast(_collection->getPhysical())->revision(); } + if (isExclusive(_accessType) && !isLocked()) { + // r/w lock the collection + int res = doLock(_accessType, nestingLevel); + + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + } + return TRI_ERROR_NO_ERROR; } -void RocksDBTransactionCollection::unuse(int /*nestingLevel*/) {} +void RocksDBTransactionCollection::unuse(int nestingLevel) { + // nothing to do here. we're postponing the unlocking until release() +} void RocksDBTransactionCollection::release() { + if (isLocked()) { + // unlock our own r/w locks + doUnlock(_accessType, 0); + _lockType = AccessMode::Type::NONE; + } + // the top level transaction releases all collections if (_collection != nullptr) { // unuse collection, remove usage-lock @@ -206,3 +242,99 @@ void RocksDBTransactionCollection::addOperation( } _operationSize += operationSize; } + +/// @brief lock a collection +int RocksDBTransactionCollection::doLock(AccessMode::Type type, int nestingLevel) { + if (!isExclusive(type)) { + return TRI_ERROR_NO_ERROR; + } + + TRI_ASSERT(_collection != nullptr); + + if (CollectionLockState::_noLockHeaders != nullptr) { + std::string collName(_collection->name()); + auto it = CollectionLockState::_noLockHeaders->find(collName); + if (it != CollectionLockState::_noLockHeaders->end()) { + // do not lock by command + // LOCKING-DEBUG + // std::cout << "LockCollection blocked: " << collName << std::endl; + return TRI_ERROR_NO_ERROR; + } + } + + TRI_ASSERT(!isLocked()); + + LogicalCollection* collection = _collection; + TRI_ASSERT(collection != nullptr); + + auto physical = static_cast(collection->getPhysical()); + TRI_ASSERT(physical != nullptr); + + double timeout = _transaction->timeout(); + if (_transaction->hasHint(transaction::Hints::Hint::TRY_LOCK)) { + // give up early if we cannot acquire the lock instantly + timeout = 0.00000001; + } + + bool const useDeadlockDetector = !_transaction->hasHint(transaction::Hints::Hint::SINGLE_OPERATION); + + LOG_TRX(_transaction, nestingLevel) << "write-locking collection " << _cid; + int res = physical->beginWriteTimed(useDeadlockDetector, timeout); + + if (res == TRI_ERROR_NO_ERROR) { + _lockType = type; + } else if (res == TRI_ERROR_LOCK_TIMEOUT && timeout >= 0.1) { + LOG_TOPIC(WARN, Logger::QUERIES) << "timed out after " << timeout << " s waiting for " << AccessMode::typeString(type) << "-lock on collection '" << _collection->name() << "'"; + } else if (res == TRI_ERROR_DEADLOCK) { + LOG_TOPIC(WARN, Logger::QUERIES) << "deadlock detected while trying to acquire " << AccessMode::typeString(type) << "-lock on collection '" << _collection->name() << "'"; + } + + return res; +} + +/// @brief unlock a collection +int RocksDBTransactionCollection::doUnlock(AccessMode::Type type, int nestingLevel) { + if (!isExclusive(type) || !isExclusive(_lockType)) { + return TRI_ERROR_NO_ERROR; + } + + if (_transaction->hasHint(transaction::Hints::Hint::LOCK_NEVER)) { + // never unlock + return TRI_ERROR_NO_ERROR; + } + + TRI_ASSERT(_collection != nullptr); + + if (CollectionLockState::_noLockHeaders != nullptr) { + std::string collName(_collection->name()); + auto it = CollectionLockState::_noLockHeaders->find(collName); + if (it != CollectionLockState::_noLockHeaders->end()) { + // do not lock by command + // LOCKING-DEBUG + // std::cout << "UnlockCollection blocked: " << collName << std::endl; + return TRI_ERROR_NO_ERROR; + } + } + + TRI_ASSERT(isLocked()); + + if (_nestingLevel < nestingLevel) { + // only process our own collections + return TRI_ERROR_NO_ERROR; + } + + bool const useDeadlockDetector = !_transaction->hasHint(transaction::Hints::Hint::SINGLE_OPERATION); + + LogicalCollection* collection = _collection; + TRI_ASSERT(collection != nullptr); + + auto physical = static_cast(collection->getPhysical()); + TRI_ASSERT(physical != nullptr); + + LOG_TRX(_transaction, nestingLevel) << "write-unlocking collection " << _cid; + physical->endWrite(useDeadlockDetector); + + _lockType = AccessMode::Type::NONE; + + return TRI_ERROR_NO_ERROR; +} diff --git a/arangod/RocksDBEngine/RocksDBTransactionCollection.h b/arangod/RocksDBEngine/RocksDBTransactionCollection.h index 08cc3c3c0a..6bb3805864 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionCollection.h +++ b/arangod/RocksDBEngine/RocksDBTransactionCollection.h @@ -40,7 +40,7 @@ class TransactionState; class RocksDBTransactionCollection final : public TransactionCollection { public: - RocksDBTransactionCollection(TransactionState* trx, TRI_voc_cid_t cid, AccessMode::Type accessType); + RocksDBTransactionCollection(TransactionState* trx, TRI_voc_cid_t cid, AccessMode::Type accessType, int nestingLevel); ~RocksDBTransactionCollection(); /// @brief request a main-level lock for a collection @@ -78,9 +78,18 @@ class RocksDBTransactionCollection final : public TransactionCollection { /// @brief add an operation for a transaction collection void addOperation(TRI_voc_document_operation_e operationType, uint64_t operationSize, TRI_voc_rid_t revisionId) ; void resetCounts(); + + private: + /// @brief request a lock for a collection + int doLock(AccessMode::Type, int nestingLevel); + + /// @brief request an unlock for a collection + int doUnlock(AccessMode::Type, int nestingLevel); private: AccessMode::Type _accessType; // access type (read|write) + AccessMode::Type _lockType; // collection lock type, used for exclusive locks + int _nestingLevel; // the transaction level that added this collection uint64_t _initialNumberDocuments; TRI_voc_rid_t _revision; uint64_t _operationSize; diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.cpp b/arangod/RocksDBEngine/RocksDBTransactionState.cpp index 89c9a56730..7f846d3d68 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.cpp +++ b/arangod/RocksDBEngine/RocksDBTransactionState.cpp @@ -103,9 +103,27 @@ RocksDBTransactionState::~RocksDBTransactionState() { Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) { LOG_TRX(this, _nestingLevel) << "beginning " << AccessMode::typeString(_type) << " transaction"; - if (_nestingLevel == 0) { - TRI_ASSERT(_status == transaction::Status::CREATED); + + Result result = useCollections(_nestingLevel); + if (result.ok()) { + // all valid + if (_nestingLevel == 0) { + updateStatus(transaction::Status::RUNNING); + } + } else { + // something is wrong + if (_nestingLevel == 0) { + updateStatus(transaction::Status::ABORTED); + } + + // free what we have got so far + unuseCollections(_nestingLevel); + + return result; + } + + if (_nestingLevel == 0) { // get a new id _id = TRI_NewTickServer(); @@ -132,24 +150,7 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) { } else { TRI_ASSERT(_status == transaction::Status::RUNNING); } - - Result result = useCollections(_nestingLevel); - - if (result.ok()) { - // all valid - if (_nestingLevel == 0) { - updateStatus(transaction::Status::RUNNING); - } - } else { - // something is wrong - if (_nestingLevel == 0) { - updateStatus(transaction::Status::ABORTED); - } - - // free what we have got so far - unuseCollections(_nestingLevel); - } - + return result; } diff --git a/arangod/StorageEngine/TransactionCollection.h b/arangod/StorageEngine/TransactionCollection.h index 3604dbb7bf..85900a141b 100644 --- a/arangod/StorageEngine/TransactionCollection.h +++ b/arangod/StorageEngine/TransactionCollection.h @@ -82,6 +82,10 @@ class TransactionCollection { virtual void release() = 0; protected: + inline bool isExclusive(AccessMode::Type type) const { + return (type == AccessMode::Type::EXCLUSIVE); + } + inline bool isWrite(AccessMode::Type type) const { return (type == AccessMode::Type::WRITE || type == AccessMode::Type::EXCLUSIVE); }