mirror of https://gitee.com/bigwinds/arangodb
first stab at exclusive locks
This commit is contained in:
parent
2322079bed
commit
3727e7fb7a
|
@ -243,8 +243,8 @@ TransactionState* RocksDBEngine::createTransactionState(
|
||||||
|
|
||||||
TransactionCollection* RocksDBEngine::createTransactionCollection(
|
TransactionCollection* RocksDBEngine::createTransactionCollection(
|
||||||
TransactionState* state, TRI_voc_cid_t cid, AccessMode::Type accessType,
|
TransactionState* state, TRI_voc_cid_t cid, AccessMode::Type accessType,
|
||||||
int /*nestingLevel*/) {
|
int nestingLevel) {
|
||||||
return new RocksDBTransactionCollection(state, cid, accessType);
|
return new RocksDBTransactionCollection(state, cid, accessType, nestingLevel);
|
||||||
}
|
}
|
||||||
|
|
||||||
void RocksDBEngine::addParametersForNewCollection(VPackBuilder& builder,
|
void RocksDBEngine::addParametersForNewCollection(VPackBuilder& builder,
|
||||||
|
|
|
@ -35,9 +35,11 @@
|
||||||
using namespace arangodb;
|
using namespace arangodb;
|
||||||
|
|
||||||
RocksDBTransactionCollection::RocksDBTransactionCollection(
|
RocksDBTransactionCollection::RocksDBTransactionCollection(
|
||||||
TransactionState* trx, TRI_voc_cid_t cid, AccessMode::Type accessType)
|
TransactionState* trx, TRI_voc_cid_t cid, AccessMode::Type accessType, int nestingLevel)
|
||||||
: TransactionCollection(trx, cid),
|
: TransactionCollection(trx, cid),
|
||||||
_accessType(accessType),
|
_accessType(accessType),
|
||||||
|
_lockType(AccessMode::Type::NONE),
|
||||||
|
_nestingLevel(nestingLevel),
|
||||||
_initialNumberDocuments(0),
|
_initialNumberDocuments(0),
|
||||||
_revision(0),
|
_revision(0),
|
||||||
_operationSize(0),
|
_operationSize(0),
|
||||||
|
@ -48,33 +50,43 @@ RocksDBTransactionCollection::RocksDBTransactionCollection(
|
||||||
RocksDBTransactionCollection::~RocksDBTransactionCollection() {}
|
RocksDBTransactionCollection::~RocksDBTransactionCollection() {}
|
||||||
|
|
||||||
/// @brief request a main-level lock for a collection
|
/// @brief request a main-level lock for a collection
|
||||||
int RocksDBTransactionCollection::lock() { return TRI_ERROR_NO_ERROR; }
|
int RocksDBTransactionCollection::lock() { return lock(_accessType, 0); }
|
||||||
|
|
||||||
/// @brief request a lock for a collection
|
/// @brief request a lock for a collection
|
||||||
int RocksDBTransactionCollection::lock(AccessMode::Type accessType,
|
int RocksDBTransactionCollection::lock(AccessMode::Type accessType,
|
||||||
int /*nestingLevel*/) {
|
int nestingLevel) {
|
||||||
if (isWrite(accessType) && !isWrite(_accessType)) {
|
if (isWrite(accessType) && !isWrite(_accessType)) {
|
||||||
// wrong lock type
|
// wrong lock type
|
||||||
return TRI_ERROR_INTERNAL;
|
return TRI_ERROR_INTERNAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isLocked()) {
|
||||||
|
// already locked
|
||||||
return TRI_ERROR_NO_ERROR;
|
return TRI_ERROR_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return doLock(accessType, nestingLevel);
|
||||||
|
}
|
||||||
|
|
||||||
/// @brief request an unlock for a collection
|
/// @brief request an unlock for a collection
|
||||||
int RocksDBTransactionCollection::unlock(AccessMode::Type accessType,
|
int RocksDBTransactionCollection::unlock(AccessMode::Type accessType,
|
||||||
int /*nestingLevel*/) {
|
int nestingLevel) {
|
||||||
if (isWrite(accessType) && !isWrite(_accessType)) {
|
if (isWrite(accessType) && !isWrite(_accessType)) {
|
||||||
// wrong lock type: write-unlock requested but collection is read-only
|
// wrong lock type: write-unlock requested but collection is read-only
|
||||||
return TRI_ERROR_INTERNAL;
|
return TRI_ERROR_INTERNAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!isLocked()) {
|
||||||
|
// already unlocked
|
||||||
return TRI_ERROR_NO_ERROR;
|
return TRI_ERROR_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return doUnlock(accessType, nestingLevel);
|
||||||
|
}
|
||||||
|
|
||||||
/// @brief check if a collection is locked in a specific mode in a transaction
|
/// @brief check if a collection is locked in a specific mode in a transaction
|
||||||
bool RocksDBTransactionCollection::isLocked(AccessMode::Type accessType,
|
bool RocksDBTransactionCollection::isLocked(AccessMode::Type accessType,
|
||||||
int /*nestingLevel*/) const {
|
int nestingLevel) const {
|
||||||
if (isWrite(accessType) && !isWrite(_accessType)) {
|
if (isWrite(accessType) && !isWrite(_accessType)) {
|
||||||
// wrong lock type
|
// wrong lock type
|
||||||
LOG_TOPIC(WARN, arangodb::Logger::FIXME)
|
LOG_TOPIC(WARN, arangodb::Logger::FIXME)
|
||||||
|
@ -82,14 +94,12 @@ bool RocksDBTransactionCollection::isLocked(AccessMode::Type accessType,
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO
|
return isLocked();
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @brief check whether a collection is locked at all
|
/// @brief check whether a collection is locked at all
|
||||||
bool RocksDBTransactionCollection::isLocked() const {
|
bool RocksDBTransactionCollection::isLocked() const {
|
||||||
// TODO
|
return (_lockType != AccessMode::Type::NONE);
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @brief whether or not any write operations for the collection happened
|
/// @brief whether or not any write operations for the collection happened
|
||||||
|
@ -128,11 +138,20 @@ int RocksDBTransactionCollection::updateUsage(AccessMode::Type accessType,
|
||||||
_accessType = accessType;
|
_accessType = accessType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (nestingLevel < _nestingLevel) {
|
||||||
|
_nestingLevel = nestingLevel;
|
||||||
|
}
|
||||||
|
|
||||||
// all correct
|
// all correct
|
||||||
return TRI_ERROR_NO_ERROR;
|
return TRI_ERROR_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
int RocksDBTransactionCollection::use(int nestingLevel) {
|
int RocksDBTransactionCollection::use(int nestingLevel) {
|
||||||
|
if (_nestingLevel != nestingLevel) {
|
||||||
|
// only process our own collections
|
||||||
|
return TRI_ERROR_NO_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
if (_collection == nullptr) {
|
if (_collection == nullptr) {
|
||||||
TRI_vocbase_col_status_e status;
|
TRI_vocbase_col_status_e status;
|
||||||
LOG_TRX(_transaction, nestingLevel) << "using collection " << _cid;
|
LOG_TRX(_transaction, nestingLevel) << "using collection " << _cid;
|
||||||
|
@ -159,12 +178,29 @@ int RocksDBTransactionCollection::use(int nestingLevel) {
|
||||||
static_cast<RocksDBCollection*>(_collection->getPhysical())->revision();
|
static_cast<RocksDBCollection*>(_collection->getPhysical())->revision();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isExclusive(_accessType) && !isLocked()) {
|
||||||
|
// r/w lock the collection
|
||||||
|
int res = doLock(_accessType, nestingLevel);
|
||||||
|
|
||||||
|
if (res != TRI_ERROR_NO_ERROR) {
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return TRI_ERROR_NO_ERROR;
|
return TRI_ERROR_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
void RocksDBTransactionCollection::unuse(int /*nestingLevel*/) {}
|
void RocksDBTransactionCollection::unuse(int nestingLevel) {
|
||||||
|
// nothing to do here. we're postponing the unlocking until release()
|
||||||
|
}
|
||||||
|
|
||||||
void RocksDBTransactionCollection::release() {
|
void RocksDBTransactionCollection::release() {
|
||||||
|
if (isLocked()) {
|
||||||
|
// unlock our own r/w locks
|
||||||
|
doUnlock(_accessType, 0);
|
||||||
|
_lockType = AccessMode::Type::NONE;
|
||||||
|
}
|
||||||
|
|
||||||
// the top level transaction releases all collections
|
// the top level transaction releases all collections
|
||||||
if (_collection != nullptr) {
|
if (_collection != nullptr) {
|
||||||
// unuse collection, remove usage-lock
|
// unuse collection, remove usage-lock
|
||||||
|
@ -206,3 +242,99 @@ void RocksDBTransactionCollection::addOperation(
|
||||||
}
|
}
|
||||||
_operationSize += operationSize;
|
_operationSize += operationSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// @brief lock a collection
|
||||||
|
int RocksDBTransactionCollection::doLock(AccessMode::Type type, int nestingLevel) {
|
||||||
|
if (!isExclusive(type)) {
|
||||||
|
return TRI_ERROR_NO_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
TRI_ASSERT(_collection != nullptr);
|
||||||
|
|
||||||
|
if (CollectionLockState::_noLockHeaders != nullptr) {
|
||||||
|
std::string collName(_collection->name());
|
||||||
|
auto it = CollectionLockState::_noLockHeaders->find(collName);
|
||||||
|
if (it != CollectionLockState::_noLockHeaders->end()) {
|
||||||
|
// do not lock by command
|
||||||
|
// LOCKING-DEBUG
|
||||||
|
// std::cout << "LockCollection blocked: " << collName << std::endl;
|
||||||
|
return TRI_ERROR_NO_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TRI_ASSERT(!isLocked());
|
||||||
|
|
||||||
|
LogicalCollection* collection = _collection;
|
||||||
|
TRI_ASSERT(collection != nullptr);
|
||||||
|
|
||||||
|
auto physical = static_cast<RocksDBCollection*>(collection->getPhysical());
|
||||||
|
TRI_ASSERT(physical != nullptr);
|
||||||
|
|
||||||
|
double timeout = _transaction->timeout();
|
||||||
|
if (_transaction->hasHint(transaction::Hints::Hint::TRY_LOCK)) {
|
||||||
|
// give up early if we cannot acquire the lock instantly
|
||||||
|
timeout = 0.00000001;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool const useDeadlockDetector = !_transaction->hasHint(transaction::Hints::Hint::SINGLE_OPERATION);
|
||||||
|
|
||||||
|
LOG_TRX(_transaction, nestingLevel) << "write-locking collection " << _cid;
|
||||||
|
int res = physical->beginWriteTimed(useDeadlockDetector, timeout);
|
||||||
|
|
||||||
|
if (res == TRI_ERROR_NO_ERROR) {
|
||||||
|
_lockType = type;
|
||||||
|
} else if (res == TRI_ERROR_LOCK_TIMEOUT && timeout >= 0.1) {
|
||||||
|
LOG_TOPIC(WARN, Logger::QUERIES) << "timed out after " << timeout << " s waiting for " << AccessMode::typeString(type) << "-lock on collection '" << _collection->name() << "'";
|
||||||
|
} else if (res == TRI_ERROR_DEADLOCK) {
|
||||||
|
LOG_TOPIC(WARN, Logger::QUERIES) << "deadlock detected while trying to acquire " << AccessMode::typeString(type) << "-lock on collection '" << _collection->name() << "'";
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// @brief unlock a collection
|
||||||
|
int RocksDBTransactionCollection::doUnlock(AccessMode::Type type, int nestingLevel) {
|
||||||
|
if (!isExclusive(type) || !isExclusive(_lockType)) {
|
||||||
|
return TRI_ERROR_NO_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_transaction->hasHint(transaction::Hints::Hint::LOCK_NEVER)) {
|
||||||
|
// never unlock
|
||||||
|
return TRI_ERROR_NO_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
TRI_ASSERT(_collection != nullptr);
|
||||||
|
|
||||||
|
if (CollectionLockState::_noLockHeaders != nullptr) {
|
||||||
|
std::string collName(_collection->name());
|
||||||
|
auto it = CollectionLockState::_noLockHeaders->find(collName);
|
||||||
|
if (it != CollectionLockState::_noLockHeaders->end()) {
|
||||||
|
// do not lock by command
|
||||||
|
// LOCKING-DEBUG
|
||||||
|
// std::cout << "UnlockCollection blocked: " << collName << std::endl;
|
||||||
|
return TRI_ERROR_NO_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TRI_ASSERT(isLocked());
|
||||||
|
|
||||||
|
if (_nestingLevel < nestingLevel) {
|
||||||
|
// only process our own collections
|
||||||
|
return TRI_ERROR_NO_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool const useDeadlockDetector = !_transaction->hasHint(transaction::Hints::Hint::SINGLE_OPERATION);
|
||||||
|
|
||||||
|
LogicalCollection* collection = _collection;
|
||||||
|
TRI_ASSERT(collection != nullptr);
|
||||||
|
|
||||||
|
auto physical = static_cast<RocksDBCollection*>(collection->getPhysical());
|
||||||
|
TRI_ASSERT(physical != nullptr);
|
||||||
|
|
||||||
|
LOG_TRX(_transaction, nestingLevel) << "write-unlocking collection " << _cid;
|
||||||
|
physical->endWrite(useDeadlockDetector);
|
||||||
|
|
||||||
|
_lockType = AccessMode::Type::NONE;
|
||||||
|
|
||||||
|
return TRI_ERROR_NO_ERROR;
|
||||||
|
}
|
||||||
|
|
|
@ -40,7 +40,7 @@ class TransactionState;
|
||||||
class RocksDBTransactionCollection final : public TransactionCollection {
|
class RocksDBTransactionCollection final : public TransactionCollection {
|
||||||
public:
|
public:
|
||||||
|
|
||||||
RocksDBTransactionCollection(TransactionState* trx, TRI_voc_cid_t cid, AccessMode::Type accessType);
|
RocksDBTransactionCollection(TransactionState* trx, TRI_voc_cid_t cid, AccessMode::Type accessType, int nestingLevel);
|
||||||
~RocksDBTransactionCollection();
|
~RocksDBTransactionCollection();
|
||||||
|
|
||||||
/// @brief request a main-level lock for a collection
|
/// @brief request a main-level lock for a collection
|
||||||
|
@ -79,8 +79,17 @@ class RocksDBTransactionCollection final : public TransactionCollection {
|
||||||
void addOperation(TRI_voc_document_operation_e operationType, uint64_t operationSize, TRI_voc_rid_t revisionId) ;
|
void addOperation(TRI_voc_document_operation_e operationType, uint64_t operationSize, TRI_voc_rid_t revisionId) ;
|
||||||
void resetCounts();
|
void resetCounts();
|
||||||
|
|
||||||
|
private:
|
||||||
|
/// @brief request a lock for a collection
|
||||||
|
int doLock(AccessMode::Type, int nestingLevel);
|
||||||
|
|
||||||
|
/// @brief request an unlock for a collection
|
||||||
|
int doUnlock(AccessMode::Type, int nestingLevel);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
AccessMode::Type _accessType; // access type (read|write)
|
AccessMode::Type _accessType; // access type (read|write)
|
||||||
|
AccessMode::Type _lockType; // collection lock type, used for exclusive locks
|
||||||
|
int _nestingLevel; // the transaction level that added this collection
|
||||||
uint64_t _initialNumberDocuments;
|
uint64_t _initialNumberDocuments;
|
||||||
TRI_voc_rid_t _revision;
|
TRI_voc_rid_t _revision;
|
||||||
uint64_t _operationSize;
|
uint64_t _operationSize;
|
||||||
|
|
|
@ -103,9 +103,27 @@ RocksDBTransactionState::~RocksDBTransactionState() {
|
||||||
Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
|
Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
|
||||||
LOG_TRX(this, _nestingLevel) << "beginning " << AccessMode::typeString(_type)
|
LOG_TRX(this, _nestingLevel) << "beginning " << AccessMode::typeString(_type)
|
||||||
<< " transaction";
|
<< " transaction";
|
||||||
if (_nestingLevel == 0) {
|
|
||||||
TRI_ASSERT(_status == transaction::Status::CREATED);
|
|
||||||
|
|
||||||
|
Result result = useCollections(_nestingLevel);
|
||||||
|
|
||||||
|
if (result.ok()) {
|
||||||
|
// all valid
|
||||||
|
if (_nestingLevel == 0) {
|
||||||
|
updateStatus(transaction::Status::RUNNING);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// something is wrong
|
||||||
|
if (_nestingLevel == 0) {
|
||||||
|
updateStatus(transaction::Status::ABORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
// free what we have got so far
|
||||||
|
unuseCollections(_nestingLevel);
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_nestingLevel == 0) {
|
||||||
// get a new id
|
// get a new id
|
||||||
_id = TRI_NewTickServer();
|
_id = TRI_NewTickServer();
|
||||||
|
|
||||||
|
@ -133,23 +151,6 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
|
||||||
TRI_ASSERT(_status == transaction::Status::RUNNING);
|
TRI_ASSERT(_status == transaction::Status::RUNNING);
|
||||||
}
|
}
|
||||||
|
|
||||||
Result result = useCollections(_nestingLevel);
|
|
||||||
|
|
||||||
if (result.ok()) {
|
|
||||||
// all valid
|
|
||||||
if (_nestingLevel == 0) {
|
|
||||||
updateStatus(transaction::Status::RUNNING);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// something is wrong
|
|
||||||
if (_nestingLevel == 0) {
|
|
||||||
updateStatus(transaction::Status::ABORTED);
|
|
||||||
}
|
|
||||||
|
|
||||||
// free what we have got so far
|
|
||||||
unuseCollections(_nestingLevel);
|
|
||||||
}
|
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -82,6 +82,10 @@ class TransactionCollection {
|
||||||
virtual void release() = 0;
|
virtual void release() = 0;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
inline bool isExclusive(AccessMode::Type type) const {
|
||||||
|
return (type == AccessMode::Type::EXCLUSIVE);
|
||||||
|
}
|
||||||
|
|
||||||
inline bool isWrite(AccessMode::Type type) const {
|
inline bool isWrite(AccessMode::Type type) const {
|
||||||
return (type == AccessMode::Type::WRITE || type == AccessMode::Type::EXCLUSIVE);
|
return (type == AccessMode::Type::WRITE || type == AccessMode::Type::EXCLUSIVE);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue