1
0
Fork 0

preparations for locks

This commit is contained in:
jsteemann 2017-04-18 17:27:37 +02:00
parent 2f574a75be
commit 69576ca79e
4 changed files with 157 additions and 10 deletions

View File

@ -26,7 +26,9 @@
#include "Basics/Result.h"
#include "Basics/StaticStrings.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/WriteLocker.h"
#include "Cluster/ClusterMethods.h"
#include "Cluster/CollectionLockState.h"
#include "Indexes/Index.h"
#include "Indexes/IndexIterator.h"
#include "RestServer/DatabaseFeature.h"
@ -1254,3 +1256,148 @@ void RocksDBCollection::adjustNumberDocuments(int64_t adjustment) {
_numberDocuments += static_cast<uint64_t>(adjustment);
}
}
/// @brief write locks a collection, with a timeout
int RocksDBCollection::beginWriteTimed(bool useDeadlockDetector,
double timeout) {
if (CollectionLockState::_noLockHeaders != nullptr) {
auto it =
CollectionLockState::_noLockHeaders->find(_logicalCollection->name());
if (it != CollectionLockState::_noLockHeaders->end()) {
// do not lock by command
// LOCKING-DEBUG
// std::cout << "BeginWriteTimed blocked: " << _name <<
// std::endl;
return TRI_ERROR_NO_ERROR;
}
}
// LOCKING-DEBUG
// std::cout << "BeginWriteTimed: " << document->_info._name << std::endl;
int iterations = 0;
bool wasBlocked = false;
uint64_t waitTime = 0; // indicate that times uninitialized
double startTime = 0.0;
while (true) {
TRY_WRITE_LOCKER(locker, _exclusiveLock);
if (locker.isLocked()) {
// register writer
if (useDeadlockDetector) {
_logicalCollection->vocbase()->_deadlockDetector.addWriter(
_logicalCollection, wasBlocked);
}
// keep lock and exit loop
locker.steal();
return TRI_ERROR_NO_ERROR;
}
if (useDeadlockDetector) {
try {
if (!wasBlocked) {
// insert writer
wasBlocked = true;
if (_logicalCollection->vocbase()->_deadlockDetector.setWriterBlocked(
_logicalCollection) == TRI_ERROR_DEADLOCK) {
// deadlock
LOG_TOPIC(TRACE, arangodb::Logger::FIXME)
<< "deadlock detected while trying to acquire "
"write-lock on collection '"
<< _logicalCollection->name() << "'";
return TRI_ERROR_DEADLOCK;
}
LOG_TOPIC(TRACE, arangodb::Logger::FIXME)
<< "waiting for write-lock on collection '"
<< _logicalCollection->name() << "'";
} else if (++iterations >= 5) {
// periodically check for deadlocks
TRI_ASSERT(wasBlocked);
iterations = 0;
if (_logicalCollection->vocbase()->_deadlockDetector.detectDeadlock(
_logicalCollection, true) == TRI_ERROR_DEADLOCK) {
// deadlock
_logicalCollection->vocbase()->_deadlockDetector.unsetWriterBlocked(
_logicalCollection);
LOG_TOPIC(TRACE, arangodb::Logger::FIXME)
<< "deadlock detected while trying to acquire "
"write-lock on collection '"
<< _logicalCollection->name() << "'";
return TRI_ERROR_DEADLOCK;
}
}
} catch (...) {
// clean up!
if (wasBlocked) {
_logicalCollection->vocbase()->_deadlockDetector.unsetWriterBlocked(
_logicalCollection);
}
// always exit
return TRI_ERROR_OUT_OF_MEMORY;
}
}
double now = TRI_microtime();
if (waitTime == 0) { // initialize times
// set end time for lock waiting
if (timeout <= 0.0) {
timeout = defaultLockTimeout;
}
startTime = now;
waitTime = 1;
}
if (now > startTime + timeout) {
if (useDeadlockDetector) {
_logicalCollection->vocbase()->_deadlockDetector.unsetWriterBlocked(
_logicalCollection);
}
LOG_TOPIC(TRACE, arangodb::Logger::FIXME)
<< "timed out after " << timeout
<< " s waiting for write-lock on collection '"
<< _logicalCollection->name() << "'";
return TRI_ERROR_LOCK_TIMEOUT;
}
if (now - startTime < 0.001) {
std::this_thread::yield();
} else {
usleep(static_cast<TRI_usleep_t>(waitTime));
if (waitTime < 500000) {
waitTime *= 2;
}
}
}
}
/// @brief write unlocks a collection
int RocksDBCollection::endWrite(bool useDeadlockDetector) {
if (CollectionLockState::_noLockHeaders != nullptr) {
auto it =
CollectionLockState::_noLockHeaders->find(_logicalCollection->name());
if (it != CollectionLockState::_noLockHeaders->end()) {
// do not lock by command
// LOCKING-DEBUG
// std::cout << "EndWrite blocked: " << _name <<
// std::endl;
return TRI_ERROR_NO_ERROR;
}
}
if (useDeadlockDetector) {
// unregister writer
try {
_logicalCollection->vocbase()->_deadlockDetector.unsetWriter(
_logicalCollection);
} catch (...) {
// must go on here to unlock the lock
}
}
// LOCKING-DEBUG
// std::cout << "EndWrite: " << _name << std::endl;
_exclusiveLock.unlockWrite();
return TRI_ERROR_NO_ERROR;
}

View File

@ -43,6 +43,8 @@ struct RocksDBToken;
class RocksDBCollection final : public PhysicalCollection {
friend class RocksDBEngine;
friend class RocksDBVPackIndex;
constexpr static double defaultLockTimeout = 10.0 * 60.0;
public:
static inline RocksDBCollection* toRocksDBCollection(
@ -179,7 +181,11 @@ class RocksDBCollection final : public PhysicalCollection {
Result lookupDocumentToken(transaction::Methods* trx, arangodb::StringRef key,
RocksDBToken& token);
int beginWriteTimed(bool useDeadlockDetector, double timeout = 0.0);
int endWrite(bool useDeadlockDetector);
private:
/// @brief return engine-specific figures
void figuresSpecific(

View File

@ -175,7 +175,7 @@ void RocksDBTransactionCollection::release() {
}
}
void RocksDBTransactionCollection::resetCounts(){
void RocksDBTransactionCollection::resetCounts() {
// _initialNumberDocuments; -- HAS TO BE UPDATED
_operationSize = 0;
_numInserts = 0;
@ -183,7 +183,6 @@ void RocksDBTransactionCollection::resetCounts(){
_numRemoves = 0;
}
/// @brief add an operation for a transaction collection
void RocksDBTransactionCollection::addOperation(
TRI_voc_document_operation_e operationType, uint64_t operationSize,

View File

@ -129,10 +129,6 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
_rocksWriteOptions, rocksdb::TransactionOptions()));
_rocksTransaction->SetSnapshot();
_rocksReadOptions.snapshot = _rocksTransaction->GetSnapshot();
/*LOG_TOPIC(ERR, Logger::FIXME)
<< "#" << _id << " BEGIN (read-only: " << isReadOnlyTransaction()
<< ")";*/
} else {
TRI_ASSERT(_status == transaction::Status::RUNNING);
}
@ -333,8 +329,8 @@ RocksDBOperationResult RocksDBTransactionState::addOperation(
return res;
}
void RocksDBTransactionState::reset(){
//only rest when already commited
void RocksDBTransactionState::reset() {
//only reset when already commited
TRI_ASSERT(_status == transaction::Status::COMMITTED);
//reset count
_transactionSize = 0;
@ -352,5 +348,4 @@ void RocksDBTransactionState::reset(){
// start new transaction
beginTransaction(transaction::Hints());
}