mirror of https://gitee.com/bigwinds/arangodb
Moved CollectionRead/WriteLocker to MMFiles. Also moved cleanupIndexes to MMFilesCollection
This commit is contained in:
parent
dffd9fc807
commit
50368b9985
|
@ -29,6 +29,7 @@
|
|||
#include "Basics/WriteLocker.h"
|
||||
#include "Basics/files.h"
|
||||
#include "Logger/Logger.h"
|
||||
#include "MMFiles/MMFilesCollection.h"
|
||||
#include "StorageEngine/EngineSelectorFeature.h"
|
||||
#include "StorageEngine/StorageEngine.h"
|
||||
#include "Utils/CursorRepository.h"
|
||||
|
@ -100,7 +101,9 @@ void MMFilesCleanupThread::run() {
|
|||
|
||||
// clean indexes?
|
||||
if (iterations % cleanupIndexIterations() == 0 && status != TRI_VOC_COL_STATUS_DELETED) {
|
||||
collection->cleanupIndexes();
|
||||
auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
|
||||
TRI_ASSERT(physical != nullptr);
|
||||
physical->cleanupIndexes();
|
||||
}
|
||||
|
||||
cleanupCollection(collection);
|
||||
|
|
|
@ -32,6 +32,8 @@
|
|||
#include "Basics/process-utils.h"
|
||||
#include "Cluster/ClusterMethods.h"
|
||||
#include "Logger/Logger.h"
|
||||
#include "MMFiles/MMFilesCollectionReadLocker.h"
|
||||
#include "MMFiles/MMFilesCollectionWriteLocker.h"
|
||||
#include "MMFiles/MMFilesDatafile.h"
|
||||
#include "MMFiles/MMFilesDatafileHelper.h"
|
||||
#include "MMFiles/MMFilesDocumentOperation.h"
|
||||
|
@ -46,8 +48,6 @@
|
|||
#include "Transaction/Helpers.h"
|
||||
#include "Transaction/Methods.h"
|
||||
#include "Utils/CollectionNameResolver.h"
|
||||
#include "Utils/CollectionReadLocker.h"
|
||||
#include "Utils/CollectionWriteLocker.h"
|
||||
#include "Utils/OperationOptions.h"
|
||||
#include "Utils/SingleCollectionTransaction.h"
|
||||
#include "Utils/StandaloneTransactionContext.h"
|
||||
|
@ -1160,8 +1160,7 @@ int MMFilesCollection::read(transaction::Methods* trx, VPackSlice const key,
|
|||
|
||||
bool const useDeadlockDetector =
|
||||
(lock && !trx->isSingleOperationTransaction());
|
||||
CollectionReadLocker collectionLocker(_logicalCollection, useDeadlockDetector,
|
||||
lock);
|
||||
MMFilesCollectionReadLocker collectionLocker(this, useDeadlockDetector, lock);
|
||||
|
||||
int res = lookupDocument(trx, key, result);
|
||||
|
||||
|
@ -1173,6 +1172,289 @@ int MMFilesCollection::read(transaction::Methods* trx, VPackSlice const key,
|
|||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
/// @brief garbage-collect a collection's indexes
|
||||
int MMFilesCollection::cleanupIndexes() {
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
|
||||
// cleaning indexes is expensive, so only do it if the flag is set for the
|
||||
// collection
|
||||
// TODO FIXME
|
||||
if (_logicalCollection->_cleanupIndexes > 0) {
|
||||
WRITE_LOCKER(writeLocker, _idxLock);
|
||||
auto indexes = _logicalCollection->getIndexes();
|
||||
for (auto& idx : indexes) {
|
||||
if (idx->type() == arangodb::Index::TRI_IDX_TYPE_FULLTEXT_INDEX) {
|
||||
res = idx->cleanup();
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/// @brief read locks a collection, with a timeout (in µseconds)
|
||||
int MMFilesCollection::beginReadTimed(bool useDeadlockDetector,
|
||||
double timeout) {
|
||||
if (transaction::Methods::_makeNolockHeaders != nullptr) {
|
||||
auto it = transaction::Methods::_makeNolockHeaders->find(
|
||||
_logicalCollection->name());
|
||||
if (it != transaction::Methods::_makeNolockHeaders->end()) {
|
||||
// do not lock by command
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "BeginReadTimed blocked: " << _name <<
|
||||
// std::endl;
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "BeginReadTimed: " << _name << std::endl;
|
||||
int iterations = 0;
|
||||
bool wasBlocked = false;
|
||||
double end = 0.0;
|
||||
|
||||
while (true) {
|
||||
TRY_READ_LOCKER(locker, _idxLock);
|
||||
|
||||
if (locker.isLocked()) {
|
||||
// when we are here, we've got the read lock
|
||||
if (useDeadlockDetector) {
|
||||
_logicalCollection->vocbase()->_deadlockDetector.addReader(_logicalCollection, wasBlocked);
|
||||
}
|
||||
|
||||
// keep lock and exit loop
|
||||
locker.steal();
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
if (useDeadlockDetector) {
|
||||
try {
|
||||
if (!wasBlocked) {
|
||||
// insert reader
|
||||
wasBlocked = true;
|
||||
if (_logicalCollection->vocbase()->_deadlockDetector.setReaderBlocked(
|
||||
_logicalCollection) == TRI_ERROR_DEADLOCK) {
|
||||
// deadlock
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "deadlock detected while trying to acquire read-lock "
|
||||
"on collection '"
|
||||
<< _logicalCollection->name() << "'";
|
||||
return TRI_ERROR_DEADLOCK;
|
||||
}
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "waiting for read-lock on collection '" << _logicalCollection->name()
|
||||
<< "'";
|
||||
// fall-through intentional
|
||||
} else if (++iterations >= 5) {
|
||||
// periodically check for deadlocks
|
||||
TRI_ASSERT(wasBlocked);
|
||||
iterations = 0;
|
||||
if (_logicalCollection->vocbase()->_deadlockDetector.detectDeadlock(_logicalCollection, false) ==
|
||||
TRI_ERROR_DEADLOCK) {
|
||||
// deadlock
|
||||
_logicalCollection->vocbase()->_deadlockDetector.unsetReaderBlocked(_logicalCollection);
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "deadlock detected while trying to acquire read-lock "
|
||||
"on collection '"
|
||||
<< _logicalCollection->name() << "'";
|
||||
return TRI_ERROR_DEADLOCK;
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
// clean up!
|
||||
if (wasBlocked) {
|
||||
_logicalCollection->vocbase()->_deadlockDetector.unsetReaderBlocked(
|
||||
_logicalCollection);
|
||||
}
|
||||
// always exit
|
||||
return TRI_ERROR_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
if (end == 0.0) {
|
||||
// set end time for lock waiting
|
||||
if (timeout <= 0.0) {
|
||||
timeout = 15.0 * 60.0;
|
||||
}
|
||||
end = TRI_microtime() + timeout;
|
||||
TRI_ASSERT(end > 0.0);
|
||||
}
|
||||
|
||||
std::this_thread::yield();
|
||||
|
||||
TRI_ASSERT(end > 0.0);
|
||||
|
||||
if (TRI_microtime() > end) {
|
||||
if (useDeadlockDetector) {
|
||||
_logicalCollection->vocbase()->_deadlockDetector.unsetReaderBlocked(_logicalCollection);
|
||||
}
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "timed out waiting for read-lock on collection '" << _logicalCollection->name()
|
||||
<< "'";
|
||||
return TRI_ERROR_LOCK_TIMEOUT;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// @brief write locks a collection, with a timeout
|
||||
int MMFilesCollection::beginWriteTimed(bool useDeadlockDetector,
|
||||
double timeout) {
|
||||
if (transaction::Methods::_makeNolockHeaders != nullptr) {
|
||||
auto it = transaction::Methods::_makeNolockHeaders->find(_logicalCollection->name());
|
||||
if (it != transaction::Methods::_makeNolockHeaders->end()) {
|
||||
// do not lock by command
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "BeginWriteTimed blocked: " << _name <<
|
||||
// std::endl;
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "BeginWriteTimed: " << document->_info._name << std::endl;
|
||||
int iterations = 0;
|
||||
bool wasBlocked = false;
|
||||
double end = 0.0;
|
||||
|
||||
while (true) {
|
||||
TRY_WRITE_LOCKER(locker, _idxLock);
|
||||
|
||||
if (locker.isLocked()) {
|
||||
// register writer
|
||||
if (useDeadlockDetector) {
|
||||
_logicalCollection->vocbase()->_deadlockDetector.addWriter(_logicalCollection, wasBlocked);
|
||||
}
|
||||
// keep lock and exit loop
|
||||
locker.steal();
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
if (useDeadlockDetector) {
|
||||
try {
|
||||
if (!wasBlocked) {
|
||||
// insert writer
|
||||
wasBlocked = true;
|
||||
if (_logicalCollection->vocbase()->_deadlockDetector.setWriterBlocked(_logicalCollection) ==
|
||||
TRI_ERROR_DEADLOCK) {
|
||||
// deadlock
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "deadlock detected while trying to acquire "
|
||||
"write-lock on collection '"
|
||||
<< _logicalCollection->name() << "'";
|
||||
return TRI_ERROR_DEADLOCK;
|
||||
}
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME)
|
||||
<< "waiting for write-lock on collection '"
|
||||
<< _logicalCollection->name() << "'";
|
||||
} else if (++iterations >= 5) {
|
||||
// periodically check for deadlocks
|
||||
TRI_ASSERT(wasBlocked);
|
||||
iterations = 0;
|
||||
if (_logicalCollection->vocbase()->_deadlockDetector.detectDeadlock(
|
||||
_logicalCollection, true) == TRI_ERROR_DEADLOCK) {
|
||||
// deadlock
|
||||
_logicalCollection->vocbase()->_deadlockDetector.unsetWriterBlocked(_logicalCollection);
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME)
|
||||
<< "deadlock detected while trying to acquire "
|
||||
"write-lock on collection '"
|
||||
<< _logicalCollection->name() << "'";
|
||||
return TRI_ERROR_DEADLOCK;
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
// clean up!
|
||||
if (wasBlocked) {
|
||||
_logicalCollection->vocbase()->_deadlockDetector.unsetWriterBlocked(_logicalCollection);
|
||||
}
|
||||
// always exit
|
||||
return TRI_ERROR_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
std::this_thread::yield();
|
||||
|
||||
if (end == 0.0) {
|
||||
// set end time for lock waiting
|
||||
if (timeout <= 0.0) {
|
||||
timeout = 15.0 * 60.0;
|
||||
}
|
||||
end = TRI_microtime() + timeout;
|
||||
TRI_ASSERT(end > 0.0);
|
||||
}
|
||||
|
||||
std::this_thread::yield();
|
||||
|
||||
TRI_ASSERT(end > 0.0);
|
||||
|
||||
if (TRI_microtime() > end) {
|
||||
if (useDeadlockDetector) {
|
||||
_logicalCollection->vocbase()->_deadlockDetector.unsetWriterBlocked(
|
||||
_logicalCollection);
|
||||
}
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "timed out waiting for write-lock on collection '" << _logicalCollection->name()
|
||||
<< "'";
|
||||
return TRI_ERROR_LOCK_TIMEOUT;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// @brief read unlocks a collection
|
||||
int MMFilesCollection::endRead(bool useDeadlockDetector) {
|
||||
if (transaction::Methods::_makeNolockHeaders != nullptr) {
|
||||
auto it = transaction::Methods::_makeNolockHeaders->find(_logicalCollection->name());
|
||||
if (it != transaction::Methods::_makeNolockHeaders->end()) {
|
||||
// do not lock by command
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "EndRead blocked: " << _name << std::endl;
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
if (useDeadlockDetector) {
|
||||
// unregister reader
|
||||
try {
|
||||
_logicalCollection->vocbase()->_deadlockDetector.unsetReader(_logicalCollection);
|
||||
} catch (...) {
|
||||
}
|
||||
}
|
||||
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "EndRead: " << _name << std::endl;
|
||||
_idxLock.unlockRead();
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
/// @brief write unlocks a collection
|
||||
int MMFilesCollection::endWrite(bool useDeadlockDetector) {
|
||||
if (transaction::Methods::_makeNolockHeaders != nullptr) {
|
||||
auto it = transaction::Methods::_makeNolockHeaders->find(_logicalCollection->name());
|
||||
if (it != transaction::Methods::_makeNolockHeaders->end()) {
|
||||
// do not lock by command
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "EndWrite blocked: " << _name <<
|
||||
// std::endl;
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
if (useDeadlockDetector) {
|
||||
// unregister writer
|
||||
try {
|
||||
_logicalCollection->vocbase()->_deadlockDetector.unsetWriter(_logicalCollection);
|
||||
} catch (...) {
|
||||
// must go on here to unlock the lock
|
||||
}
|
||||
}
|
||||
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "EndWrite: " << _name << std::endl;
|
||||
_idxLock.unlockWrite();
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
void MMFilesCollection::truncate(transaction::Methods* trx, OperationOptions& options) {
|
||||
auto primaryIndex = _logicalCollection->primaryIndex();
|
||||
|
||||
|
@ -1264,8 +1546,8 @@ int MMFilesCollection::insert(transaction::Methods* trx,
|
|||
try {
|
||||
// TODO Do we use the CollectionLocker on LogicalCollections
|
||||
// or do we use it on the SE specific one?
|
||||
arangodb::CollectionWriteLocker collectionLocker(
|
||||
_logicalCollection, useDeadlockDetector, lock);
|
||||
arangodb::MMFilesCollectionWriteLocker collectionLocker(
|
||||
this, useDeadlockDetector, lock);
|
||||
|
||||
try {
|
||||
// insert into indexes
|
||||
|
@ -1557,8 +1839,8 @@ int MMFilesCollection::update(arangodb::transaction::Methods* trx,
|
|||
|
||||
bool const useDeadlockDetector =
|
||||
(lock && !trx->isSingleOperationTransaction());
|
||||
arangodb::CollectionWriteLocker collectionLocker(_logicalCollection,
|
||||
useDeadlockDetector, lock);
|
||||
arangodb::MMFilesCollectionWriteLocker collectionLocker(
|
||||
this, useDeadlockDetector, lock);
|
||||
|
||||
// get the previous revision
|
||||
int res = lookupDocument(trx, key, previous);
|
||||
|
@ -1689,8 +1971,8 @@ int MMFilesCollection::replace(
|
|||
|
||||
bool const useDeadlockDetector =
|
||||
(lock && !trx->isSingleOperationTransaction());
|
||||
arangodb::CollectionWriteLocker collectionLocker(_logicalCollection, useDeadlockDetector,
|
||||
lock);
|
||||
arangodb::MMFilesCollectionWriteLocker collectionLocker(
|
||||
this, useDeadlockDetector, lock);
|
||||
|
||||
// get the previous revision
|
||||
int res = lookupDocument(trx, key, previous);
|
||||
|
@ -1847,8 +2129,8 @@ int MMFilesCollection::remove(arangodb::transaction::Methods* trx, VPackSlice co
|
|||
|
||||
bool const useDeadlockDetector =
|
||||
(lock && !trx->isSingleOperationTransaction());
|
||||
arangodb::CollectionWriteLocker collectionLocker(_logicalCollection,
|
||||
useDeadlockDetector, lock);
|
||||
arangodb::MMFilesCollectionWriteLocker collectionLocker(
|
||||
this, useDeadlockDetector, lock);
|
||||
|
||||
// get the previous revision
|
||||
int res = lookupDocument(trx, key, previous);
|
||||
|
|
|
@ -181,6 +181,21 @@ class MMFilesCollection final : public PhysicalCollection {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
int cleanupIndexes();
|
||||
|
||||
////////////////////////////////////
|
||||
// -- SECTION Locking --
|
||||
///////////////////////////////////
|
||||
|
||||
int beginReadTimed(bool useDeadlockDetector, double timeout = 0.0);
|
||||
|
||||
int beginWriteTimed(bool useDeadlockDetector, double timeout = 0.0);
|
||||
|
||||
int endRead(bool useDeadlockDetector);
|
||||
|
||||
int endWrite(bool useDeadlockDetector);
|
||||
|
||||
////////////////////////////////////
|
||||
// -- SECTION DML Operations --
|
||||
///////////////////////////////////
|
||||
|
@ -321,6 +336,9 @@ class MMFilesCollection final : public PhysicalCollection {
|
|||
private:
|
||||
mutable arangodb::Ditches _ditches;
|
||||
|
||||
// lock protecting the indexes
|
||||
mutable basics::ReadWriteLock _idxLock;
|
||||
|
||||
arangodb::basics::ReadWriteLock _filesLock;
|
||||
std::vector<MMFilesDatafile*> _datafiles; // all datafiles
|
||||
std::vector<MMFilesDatafile*> _journals; // all journals
|
||||
|
|
|
@ -21,23 +21,27 @@
|
|||
/// @author Jan Steemann
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_UTILS_COLLECTION_READ_LOCKER_H
|
||||
#define ARANGOD_UTILS_COLLECTION_READ_LOCKER_H 1
|
||||
#ifndef ARANGOD_MMFILES_MMFILES_COLLECTION_READ_LOCKER_H
|
||||
#define ARANGOD_MMFILES_MMFILES_COLLECTION_READ_LOCKER_H 1
|
||||
|
||||
#include "Basics/Common.h"
|
||||
#include "Basics/Exceptions.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
#include "MMFiles/MMFilesCollection.h"
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
class CollectionReadLocker {
|
||||
class MMFilesCollectionReadLocker {
|
||||
public:
|
||||
CollectionReadLocker(CollectionReadLocker const&) = delete;
|
||||
CollectionReadLocker& operator=(CollectionReadLocker const&) = delete;
|
||||
MMFilesCollectionReadLocker(MMFilesCollectionReadLocker const&) = delete;
|
||||
MMFilesCollectionReadLocker& operator=(MMFilesCollectionReadLocker const&) =
|
||||
delete;
|
||||
|
||||
/// @brief create the locker
|
||||
CollectionReadLocker(LogicalCollection* collection, bool useDeadlockDetector, bool doLock)
|
||||
: _collection(collection), _useDeadlockDetector(useDeadlockDetector), _doLock(false) {
|
||||
MMFilesCollectionReadLocker(MMFilesCollection* collection,
|
||||
bool useDeadlockDetector, bool doLock)
|
||||
: _collection(collection),
|
||||
_useDeadlockDetector(useDeadlockDetector),
|
||||
_doLock(false) {
|
||||
if (doLock) {
|
||||
int res = _collection->beginReadTimed(_useDeadlockDetector);
|
||||
|
||||
|
@ -50,7 +54,7 @@ class CollectionReadLocker {
|
|||
}
|
||||
|
||||
/// @brief destroy the locker
|
||||
~CollectionReadLocker() { unlock(); }
|
||||
~MMFilesCollectionReadLocker() { unlock(); }
|
||||
|
||||
/// @brief release the lock
|
||||
inline void unlock() {
|
||||
|
@ -62,8 +66,8 @@ class CollectionReadLocker {
|
|||
|
||||
private:
|
||||
/// @brief collection pointer
|
||||
LogicalCollection* _collection;
|
||||
|
||||
MMFilesCollection* _collection;
|
||||
|
||||
/// @brief whether or not to use the deadlock detector
|
||||
bool const _useDeadlockDetector;
|
||||
|
|
@ -21,23 +21,26 @@
|
|||
/// @author Jan Steemann
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_UTILS_COLLECTION_WRITE_LOCKER_H
|
||||
#define ARANGOD_UTILS_COLLECTION_WRITE_LOCKER_H 1
|
||||
#ifndef ARANGOD_MMFILES_MMFILES_COLLECTION_WRITE_LOCKER_H
|
||||
#define ARANGOD_MMFILES_MMFILES_COLLECTION_WRITE_LOCKER_H 1
|
||||
|
||||
#include "Basics/Common.h"
|
||||
#include "Basics/Exceptions.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
#include "MMFiles/MMFilesCollection.h"
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
class CollectionWriteLocker {
|
||||
class MMFilesCollectionWriteLocker {
|
||||
public:
|
||||
CollectionWriteLocker(CollectionWriteLocker const&) = delete;
|
||||
CollectionWriteLocker& operator=(CollectionWriteLocker const&) = delete;
|
||||
MMFilesCollectionWriteLocker(MMFilesCollectionWriteLocker const&) = delete;
|
||||
MMFilesCollectionWriteLocker& operator=(MMFilesCollectionWriteLocker const&) = delete;
|
||||
|
||||
/// @brief create the locker
|
||||
CollectionWriteLocker(arangodb::LogicalCollection* collection, bool useDeadlockDetector, bool doLock)
|
||||
: _collection(collection), _useDeadlockDetector(useDeadlockDetector), _doLock(false) {
|
||||
MMFilesCollectionWriteLocker(arangodb::MMFilesCollection* collection,
|
||||
bool useDeadlockDetector, bool doLock)
|
||||
: _collection(collection),
|
||||
_useDeadlockDetector(useDeadlockDetector),
|
||||
_doLock(false) {
|
||||
if (doLock) {
|
||||
int res = _collection->beginWriteTimed(_useDeadlockDetector);
|
||||
|
||||
|
@ -50,7 +53,7 @@ class CollectionWriteLocker {
|
|||
}
|
||||
|
||||
/// @brief destroy the locker
|
||||
~CollectionWriteLocker() { unlock(); }
|
||||
~MMFilesCollectionWriteLocker() { unlock(); }
|
||||
|
||||
/// @brief release the lock
|
||||
inline void unlock() {
|
||||
|
@ -62,7 +65,7 @@ class CollectionWriteLocker {
|
|||
|
||||
private:
|
||||
/// @brief collection pointer
|
||||
arangodb::LogicalCollection* _collection;
|
||||
arangodb::MMFilesCollection* _collection;
|
||||
|
||||
/// @brief whether or not to use the deadlock detector
|
||||
bool const _useDeadlockDetector;
|
|
@ -309,8 +309,10 @@ MMFilesCompactorThread::CompactionInitialContext MMFilesCompactorThread::getComp
|
|||
|
||||
bool ok;
|
||||
{
|
||||
auto physical = static_cast<MMFilesCollection*>(context._collection->getPhysical());
|
||||
TRI_ASSERT(physical != nullptr);
|
||||
bool const useDeadlockDetector = false;
|
||||
int res = collection->beginReadTimed(useDeadlockDetector, 86400.0);
|
||||
int res = physical->beginReadTimed(useDeadlockDetector, 86400.0);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
ok = false;
|
||||
|
@ -321,7 +323,7 @@ MMFilesCompactorThread::CompactionInitialContext MMFilesCompactorThread::getComp
|
|||
} catch (...) {
|
||||
ok = false;
|
||||
}
|
||||
collection->endRead(useDeadlockDetector);
|
||||
physical->endRead(useDeadlockDetector);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -328,6 +328,10 @@ int MMFilesTransactionCollection::doLock(AccessMode::Type type, int nestingLevel
|
|||
|
||||
LogicalCollection* collection = _collection;
|
||||
TRI_ASSERT(collection != nullptr);
|
||||
|
||||
auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
|
||||
TRI_ASSERT(physical != nullptr);
|
||||
|
||||
double timeout = _transaction->_timeout;
|
||||
if (_transaction->_hints.has(transaction::Hints::Hint::TRY_LOCK)) {
|
||||
// give up early if we cannot acquire the lock instantly
|
||||
|
@ -339,10 +343,10 @@ int MMFilesTransactionCollection::doLock(AccessMode::Type type, int nestingLevel
|
|||
int res;
|
||||
if (!isWrite(type)) {
|
||||
LOG_TRX(_transaction, nestingLevel) << "read-locking collection " << _cid;
|
||||
res = collection->beginReadTimed(useDeadlockDetector, timeout);
|
||||
res = physical->beginReadTimed(useDeadlockDetector, timeout);
|
||||
} else { // WRITE or EXCLUSIVE
|
||||
LOG_TRX(_transaction, nestingLevel) << "write-locking collection " << _cid;
|
||||
res = collection->beginWriteTimed(useDeadlockDetector, timeout);
|
||||
res = physical->beginWriteTimed(useDeadlockDetector, timeout);
|
||||
}
|
||||
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
|
@ -394,12 +398,16 @@ int MMFilesTransactionCollection::doUnlock(AccessMode::Type type, int nestingLev
|
|||
|
||||
LogicalCollection* collection = _collection;
|
||||
TRI_ASSERT(collection != nullptr);
|
||||
|
||||
auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
|
||||
TRI_ASSERT(physical != nullptr);
|
||||
|
||||
if (!isWrite(_lockType)) {
|
||||
LOG_TRX(_transaction, nestingLevel) << "read-unlocking collection " << _cid;
|
||||
collection->endRead(useDeadlockDetector);
|
||||
physical->endRead(useDeadlockDetector);
|
||||
} else { // WRITE or EXCLUSIVE
|
||||
LOG_TRX(_transaction, nestingLevel) << "write-unlocking collection " << _cid;
|
||||
collection->endWrite(useDeadlockDetector);
|
||||
physical->endWrite(useDeadlockDetector);
|
||||
}
|
||||
|
||||
_lockType = AccessMode::Type::NONE;
|
||||
|
|
|
@ -1885,29 +1885,6 @@ void LogicalCollection::addIndexCoordinator(
|
|||
}
|
||||
}
|
||||
|
||||
/// @brief garbage-collect a collection's indexes
|
||||
int LogicalCollection::cleanupIndexes() {
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
|
||||
// cleaning indexes is expensive, so only do it if the flag is set for the
|
||||
// collection
|
||||
if (_cleanupIndexes > 0) {
|
||||
WRITE_LOCKER(writeLocker, _idxLock);
|
||||
|
||||
for (auto& idx : _indexes) {
|
||||
if (idx->type() == arangodb::Index::TRI_IDX_TYPE_FULLTEXT_INDEX) {
|
||||
res = idx->cleanup();
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
/// @brief reads an element from the document collection
|
||||
int LogicalCollection::read(transaction::Methods* trx, std::string const& key,
|
||||
ManagedDocumentResult& result, bool lock) {
|
||||
|
@ -2218,259 +2195,6 @@ void LogicalCollection::fillIndex(
|
|||
}
|
||||
}
|
||||
|
||||
/// @brief read unlocks a collection
|
||||
int LogicalCollection::endRead(bool useDeadlockDetector) {
|
||||
if (transaction::Methods::_makeNolockHeaders != nullptr) {
|
||||
auto it = transaction::Methods::_makeNolockHeaders->find(name());
|
||||
if (it != transaction::Methods::_makeNolockHeaders->end()) {
|
||||
// do not lock by command
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "EndRead blocked: " << _name << std::endl;
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
if (useDeadlockDetector) {
|
||||
// unregister reader
|
||||
try {
|
||||
_vocbase->_deadlockDetector.unsetReader(this);
|
||||
} catch (...) {
|
||||
}
|
||||
}
|
||||
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "EndRead: " << _name << std::endl;
|
||||
_idxLock.unlockRead();
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
/// @brief write unlocks a collection
|
||||
int LogicalCollection::endWrite(bool useDeadlockDetector) {
|
||||
if (transaction::Methods::_makeNolockHeaders != nullptr) {
|
||||
auto it = transaction::Methods::_makeNolockHeaders->find(name());
|
||||
if (it != transaction::Methods::_makeNolockHeaders->end()) {
|
||||
// do not lock by command
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "EndWrite blocked: " << _name <<
|
||||
// std::endl;
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
if (useDeadlockDetector) {
|
||||
// unregister writer
|
||||
try {
|
||||
_vocbase->_deadlockDetector.unsetWriter(this);
|
||||
} catch (...) {
|
||||
// must go on here to unlock the lock
|
||||
}
|
||||
}
|
||||
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "EndWrite: " << _name << std::endl;
|
||||
_idxLock.unlockWrite();
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
/// @brief read locks a collection, with a timeout (in µseconds)
|
||||
int LogicalCollection::beginReadTimed(bool useDeadlockDetector,
|
||||
double timeout) {
|
||||
if (transaction::Methods::_makeNolockHeaders != nullptr) {
|
||||
auto it = transaction::Methods::_makeNolockHeaders->find(name());
|
||||
if (it != transaction::Methods::_makeNolockHeaders->end()) {
|
||||
// do not lock by command
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "BeginReadTimed blocked: " << _name <<
|
||||
// std::endl;
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "BeginReadTimed: " << _name << std::endl;
|
||||
int iterations = 0;
|
||||
bool wasBlocked = false;
|
||||
double end = 0.0;
|
||||
|
||||
while (true) {
|
||||
TRY_READ_LOCKER(locker, _idxLock);
|
||||
|
||||
if (locker.isLocked()) {
|
||||
// when we are here, we've got the read lock
|
||||
if (useDeadlockDetector) {
|
||||
_vocbase->_deadlockDetector.addReader(this, wasBlocked);
|
||||
}
|
||||
|
||||
// keep lock and exit loop
|
||||
locker.steal();
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
if (useDeadlockDetector) {
|
||||
try {
|
||||
if (!wasBlocked) {
|
||||
// insert reader
|
||||
wasBlocked = true;
|
||||
if (_vocbase->_deadlockDetector.setReaderBlocked(this) ==
|
||||
TRI_ERROR_DEADLOCK) {
|
||||
// deadlock
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "deadlock detected while trying to acquire read-lock "
|
||||
"on collection '"
|
||||
<< name() << "'";
|
||||
return TRI_ERROR_DEADLOCK;
|
||||
}
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "waiting for read-lock on collection '" << name()
|
||||
<< "'";
|
||||
// fall-through intentional
|
||||
} else if (++iterations >= 5) {
|
||||
// periodically check for deadlocks
|
||||
TRI_ASSERT(wasBlocked);
|
||||
iterations = 0;
|
||||
if (_vocbase->_deadlockDetector.detectDeadlock(this, false) ==
|
||||
TRI_ERROR_DEADLOCK) {
|
||||
// deadlock
|
||||
_vocbase->_deadlockDetector.unsetReaderBlocked(this);
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "deadlock detected while trying to acquire read-lock "
|
||||
"on collection '"
|
||||
<< name() << "'";
|
||||
return TRI_ERROR_DEADLOCK;
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
// clean up!
|
||||
if (wasBlocked) {
|
||||
_vocbase->_deadlockDetector.unsetReaderBlocked(this);
|
||||
}
|
||||
// always exit
|
||||
return TRI_ERROR_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
if (end == 0.0) {
|
||||
// set end time for lock waiting
|
||||
if (timeout <= 0.0) {
|
||||
timeout = 15.0 * 60.0;
|
||||
}
|
||||
end = TRI_microtime() + timeout;
|
||||
TRI_ASSERT(end > 0.0);
|
||||
}
|
||||
|
||||
std::this_thread::yield();
|
||||
|
||||
TRI_ASSERT(end > 0.0);
|
||||
|
||||
if (TRI_microtime() > end) {
|
||||
if (useDeadlockDetector) {
|
||||
_vocbase->_deadlockDetector.unsetReaderBlocked(this);
|
||||
}
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "timed out waiting for read-lock on collection '" << name()
|
||||
<< "'";
|
||||
return TRI_ERROR_LOCK_TIMEOUT;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// @brief write locks a collection, with a timeout
|
||||
int LogicalCollection::beginWriteTimed(bool useDeadlockDetector,
|
||||
double timeout) {
|
||||
if (transaction::Methods::_makeNolockHeaders != nullptr) {
|
||||
auto it = transaction::Methods::_makeNolockHeaders->find(name());
|
||||
if (it != transaction::Methods::_makeNolockHeaders->end()) {
|
||||
// do not lock by command
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "BeginWriteTimed blocked: " << _name <<
|
||||
// std::endl;
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "BeginWriteTimed: " << document->_info._name << std::endl;
|
||||
int iterations = 0;
|
||||
bool wasBlocked = false;
|
||||
double end = 0.0;
|
||||
|
||||
while (true) {
|
||||
TRY_WRITE_LOCKER(locker, _idxLock);
|
||||
|
||||
if (locker.isLocked()) {
|
||||
// register writer
|
||||
if (useDeadlockDetector) {
|
||||
_vocbase->_deadlockDetector.addWriter(this, wasBlocked);
|
||||
}
|
||||
// keep lock and exit loop
|
||||
locker.steal();
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
if (useDeadlockDetector) {
|
||||
try {
|
||||
if (!wasBlocked) {
|
||||
// insert writer
|
||||
wasBlocked = true;
|
||||
if (_vocbase->_deadlockDetector.setWriterBlocked(this) ==
|
||||
TRI_ERROR_DEADLOCK) {
|
||||
// deadlock
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "deadlock detected while trying to acquire "
|
||||
"write-lock on collection '"
|
||||
<< name() << "'";
|
||||
return TRI_ERROR_DEADLOCK;
|
||||
}
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "waiting for write-lock on collection '" << name()
|
||||
<< "'";
|
||||
} else if (++iterations >= 5) {
|
||||
// periodically check for deadlocks
|
||||
TRI_ASSERT(wasBlocked);
|
||||
iterations = 0;
|
||||
if (_vocbase->_deadlockDetector.detectDeadlock(this, true) ==
|
||||
TRI_ERROR_DEADLOCK) {
|
||||
// deadlock
|
||||
_vocbase->_deadlockDetector.unsetWriterBlocked(this);
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "deadlock detected while trying to acquire "
|
||||
"write-lock on collection '"
|
||||
<< name() << "'";
|
||||
return TRI_ERROR_DEADLOCK;
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
// clean up!
|
||||
if (wasBlocked) {
|
||||
_vocbase->_deadlockDetector.unsetWriterBlocked(this);
|
||||
}
|
||||
// always exit
|
||||
return TRI_ERROR_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
std::this_thread::yield();
|
||||
|
||||
if (end == 0.0) {
|
||||
// set end time for lock waiting
|
||||
if (timeout <= 0.0) {
|
||||
timeout = 15.0 * 60.0;
|
||||
}
|
||||
end = TRI_microtime() + timeout;
|
||||
TRI_ASSERT(end > 0.0);
|
||||
}
|
||||
|
||||
std::this_thread::yield();
|
||||
|
||||
TRI_ASSERT(end > 0.0);
|
||||
|
||||
if (TRI_microtime() > end) {
|
||||
if (useDeadlockDetector) {
|
||||
_vocbase->_deadlockDetector.unsetWriterBlocked(this);
|
||||
}
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "timed out waiting for write-lock on collection '" << name()
|
||||
<< "'";
|
||||
return TRI_ERROR_LOCK_TIMEOUT;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// @brief creates a new entry in the primary index
|
||||
int LogicalCollection::insertPrimaryIndex(transaction::Methods* trx,
|
||||
TRI_voc_rid_t revisionId,
|
||||
|
|
|
@ -356,8 +356,6 @@ class LogicalCollection {
|
|||
|
||||
bool dropIndex(TRI_idx_iid_t iid, bool writeMarker);
|
||||
|
||||
int cleanupIndexes();
|
||||
|
||||
// SECTION: Index access (local only)
|
||||
|
||||
int read(transaction::Methods*, std::string const&,
|
||||
|
@ -390,13 +388,6 @@ class LogicalCollection {
|
|||
TRI_voc_rid_t newRevisionId,
|
||||
velocypack::Slice const& newDoc);
|
||||
|
||||
// TODO MOVE ME
|
||||
int beginReadTimed(bool useDeadlockDetector, double timeout = 0.0);
|
||||
int beginWriteTimed(bool useDeadlockDetector, double timeout = 0.0);
|
||||
int endRead(bool useDeadlockDetector);
|
||||
int endWrite(bool useDeadlockDetector);
|
||||
// END TODO MOVE ME
|
||||
|
||||
bool readDocument(transaction::Methods*, ManagedDocumentResult& result, DocumentIdentifierToken const& token);
|
||||
bool readDocumentConditional(transaction::Methods*, ManagedDocumentResult& result, DocumentIdentifierToken const& token, TRI_voc_tick_t maxTick, bool excludeWal);
|
||||
|
||||
|
@ -571,10 +562,6 @@ class LogicalCollection {
|
|||
mutable basics::ReadWriteLock
|
||||
_lock; // lock protecting the status and name
|
||||
|
||||
// TODO MOVE ME
|
||||
mutable basics::ReadWriteLock
|
||||
_idxLock; // lock protecting the indexes
|
||||
|
||||
mutable basics::ReadWriteLock
|
||||
_infoLock; // lock protecting the info
|
||||
|
||||
|
|
Loading…
Reference in New Issue