mirror of https://gitee.com/bigwinds/arangodb
ported fixes to devel
This commit is contained in:
parent
8afc46134b
commit
cd4cbae6ce
|
@ -814,7 +814,14 @@ uint64_t AqlValue::hash(arangodb::AqlTransaction* trx,
|
|||
auto shaper = document->getShaper();
|
||||
TRI_shaped_json_t shaped;
|
||||
TRI_EXTRACT_SHAPED_JSON_MARKER(shaped, _marker);
|
||||
Json json(shaper->memoryZone(), TRI_JsonShapedJson(shaper, &shaped));
|
||||
|
||||
auto v = TRI_JsonShapedJson(shaper, &shaped);
|
||||
|
||||
if (v == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
Json json(shaper->memoryZone(), v);
|
||||
|
||||
// append the internal attributes
|
||||
|
||||
|
@ -998,7 +1005,13 @@ Json AqlValue::extractObjectMember(
|
|||
bool ok = shaper->extractShapedJson(&document, 0, pid, &json, &shape);
|
||||
|
||||
if (ok && shape != nullptr) {
|
||||
return Json(TRI_UNKNOWN_MEM_ZONE, TRI_JsonShapedJson(shaper, &json));
|
||||
auto v = TRI_JsonShapedJson(shaper, &json);
|
||||
|
||||
if (v == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
return Json(TRI_UNKNOWN_MEM_ZONE, v);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -304,6 +304,10 @@ Query* Query::clone(QueryPart part, bool withPlan) {
|
|||
|
||||
if (_options != nullptr) {
|
||||
options.reset(TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, _options));
|
||||
|
||||
if (options == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<Query> clone;
|
||||
|
@ -639,6 +643,10 @@ QueryResult Query::execute(QueryRegistry* registry) {
|
|||
res.json = TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, cacheEntry->_queryResult);
|
||||
res.cached = true;
|
||||
|
||||
if (res.json == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
@ -685,10 +693,21 @@ QueryResult Query::execute(QueryRegistry* registry) {
|
|||
|
||||
if (_warnings.empty()) {
|
||||
// finally store the generated result in the query cache
|
||||
QueryCache::instance()->store(
|
||||
std::unique_ptr<TRI_json_t> copy(TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, jsonResult.json()));
|
||||
|
||||
if (copy == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
auto result = QueryCache::instance()->store(
|
||||
_vocbase, queryStringHash, _queryString, _queryLength,
|
||||
TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, jsonResult.json()),
|
||||
copy.get(),
|
||||
_trx->collectionNames());
|
||||
|
||||
if (result != nullptr) {
|
||||
// result now belongs to cache
|
||||
copy.release();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// iterate over result and return it
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,8 +1,8 @@
|
|||
/* A Bison parser, made by GNU Bison 3.0.4. */
|
||||
/* A Bison parser, made by GNU Bison 3.0.2. */
|
||||
|
||||
/* Bison interface for Yacc-like parsers in C
|
||||
|
||||
Copyright (C) 1984, 1989-1990, 2000-2015 Free Software Foundation, Inc.
|
||||
Copyright (C) 1984, 1989-1990, 2000-2013 Free Software Foundation, Inc.
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
|
@ -117,10 +117,10 @@ extern int Aqldebug;
|
|||
|
||||
/* Value type. */
|
||||
#if ! defined YYSTYPE && ! defined YYSTYPE_IS_DECLARED
|
||||
|
||||
typedef union YYSTYPE YYSTYPE;
|
||||
union YYSTYPE
|
||||
{
|
||||
#line 18 "arangod/Aql/grammar.y" /* yacc.c:1915 */
|
||||
#line 18 "arangod/Aql/grammar.y" /* yacc.c:1909 */
|
||||
|
||||
arangodb::aql::AstNode* node;
|
||||
struct {
|
||||
|
@ -130,10 +130,8 @@ union YYSTYPE
|
|||
bool boolval;
|
||||
int64_t intval;
|
||||
|
||||
#line 134 "arangod/Aql/grammar.hpp" /* yacc.c:1915 */
|
||||
#line 134 "arangod/Aql/grammar.hpp" /* yacc.c:1909 */
|
||||
};
|
||||
|
||||
typedef union YYSTYPE YYSTYPE;
|
||||
# define YYSTYPE_IS_TRIVIAL 1
|
||||
# define YYSTYPE_IS_DECLARED 1
|
||||
#endif
|
||||
|
|
|
@ -115,8 +115,17 @@ static TRI_index_operator_t* buildRangeOperator(VPackSlice const& lowerBound,
|
|||
VocShaper* shaper) {
|
||||
std::unique_ptr<TRI_index_operator_t> lowerOperator(buildBoundOperator(
|
||||
lowerBound, lowerBoundInclusive, false, parameters, shaper));
|
||||
|
||||
if (lowerOperator == nullptr && !lowerBound.isNone()) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
std::unique_ptr<TRI_index_operator_t> upperOperator(buildBoundOperator(
|
||||
upperBound, upperBoundInclusive, true, parameters, shaper));
|
||||
|
||||
if (upperOperator == nullptr && !upperBound.isNone()) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
if (lowerOperator == nullptr) {
|
||||
return upperOperator.release();
|
||||
|
|
|
@ -1866,7 +1866,6 @@ static ExplicitTransaction* BeginTransaction(
|
|||
int res = trx->begin();
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
trx->finish(res);
|
||||
THROW_ARANGO_EXCEPTION(res);
|
||||
}
|
||||
|
||||
|
|
|
@ -79,7 +79,6 @@ TRI_document_collection_t::TRI_document_collection_t()
|
|||
_headersPtr(nullptr),
|
||||
_keyGenerator(nullptr),
|
||||
_uncollectedLogfileEntries(0),
|
||||
_currentWriterThread(0),
|
||||
_cleanupIndexes(0) {
|
||||
_tickMax = 0;
|
||||
|
||||
|
@ -150,6 +149,14 @@ int TRI_document_collection_t::beginRead() {
|
|||
// LOCKING-DEBUG
|
||||
// std::cout << "BeginRead: " << document->_info._name << std::endl;
|
||||
TRI_READ_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(this);
|
||||
|
||||
try {
|
||||
_vocbase->_deadlockDetector.addReader(this, false);
|
||||
}
|
||||
catch (...) {
|
||||
TRI_READ_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(this);
|
||||
return TRI_ERROR_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
@ -169,6 +176,13 @@ int TRI_document_collection_t::endRead() {
|
|||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
_vocbase->_deadlockDetector.unsetReader(this);
|
||||
}
|
||||
catch (...) {
|
||||
}
|
||||
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "EndRead: " << document->_info._name << std::endl;
|
||||
TRI_READ_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(this);
|
||||
|
@ -195,9 +209,15 @@ int TRI_document_collection_t::beginWrite() {
|
|||
// LOCKING_DEBUG
|
||||
// std::cout << "BeginWrite: " << document->_info._name << std::endl;
|
||||
TRI_WRITE_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(this);
|
||||
|
||||
|
||||
// register writer
|
||||
_currentWriterThread.store(TRI_CurrentThreadId());
|
||||
try {
|
||||
_vocbase->_deadlockDetector.addWriter(this, false);
|
||||
}
|
||||
catch (...) {
|
||||
TRI_WRITE_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(this);
|
||||
return TRI_ERROR_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
@ -218,13 +238,19 @@ int TRI_document_collection_t::endWrite() {
|
|||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
// unregister writer
|
||||
try {
|
||||
_vocbase->_deadlockDetector.unsetWriter(this);
|
||||
}
|
||||
catch (...) {
|
||||
// must go on here to unlock the lock
|
||||
}
|
||||
|
||||
// LOCKING-DEBUG
|
||||
// std::cout << "EndWrite: " << document->_info._name << std::endl;
|
||||
TRI_WRITE_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(this);
|
||||
|
||||
// unregister writer
|
||||
_currentWriterThread.store(0);
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
|
@ -260,7 +286,7 @@ int TRI_document_collection_t::beginReadTimed(uint64_t timeout,
|
|||
try {
|
||||
if (!wasBlocked) {
|
||||
// insert reader
|
||||
if (_vocbase->_deadlockDetector.setReaderBlocked(this)) {
|
||||
if (_vocbase->_deadlockDetector.setReaderBlocked(this) == TRI_ERROR_DEADLOCK) {
|
||||
// deadlock
|
||||
LOG_TRACE(
|
||||
"deadlock detected while trying to acquire read-lock on "
|
||||
|
@ -275,9 +301,9 @@ int TRI_document_collection_t::beginReadTimed(uint64_t timeout,
|
|||
// periodically check for deadlocks
|
||||
TRI_ASSERT(wasBlocked);
|
||||
iterations = 0;
|
||||
if (_vocbase->_deadlockDetector.isDeadlocked(this)) {
|
||||
if (_vocbase->_deadlockDetector.detectDeadlock(this, false) == TRI_ERROR_DEADLOCK) {
|
||||
// deadlock
|
||||
_vocbase->_deadlockDetector.setReaderUnblocked(this);
|
||||
_vocbase->_deadlockDetector.unsetReaderBlocked(this);
|
||||
LOG_TRACE(
|
||||
"deadlock detected while trying to acquire read-lock on "
|
||||
"collection '%s'",
|
||||
|
@ -288,7 +314,7 @@ int TRI_document_collection_t::beginReadTimed(uint64_t timeout,
|
|||
} catch (...) {
|
||||
// clean up!
|
||||
if (wasBlocked) {
|
||||
_vocbase->_deadlockDetector.setReaderUnblocked(this);
|
||||
_vocbase->_deadlockDetector.unsetReaderBlocked(this);
|
||||
return TRI_ERROR_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
@ -302,16 +328,20 @@ int TRI_document_collection_t::beginReadTimed(uint64_t timeout,
|
|||
waited += sleepPeriod;
|
||||
|
||||
if (waited > timeout) {
|
||||
_vocbase->_deadlockDetector.setReaderUnblocked(this);
|
||||
_vocbase->_deadlockDetector.unsetReaderBlocked(this);
|
||||
LOG_TRACE("timed out waiting for read-lock on collection '%s'",
|
||||
_info.namec_str());
|
||||
return TRI_ERROR_LOCK_TIMEOUT;
|
||||
}
|
||||
}
|
||||
|
||||
// when we are here, we've got the read lock
|
||||
if (wasBlocked) {
|
||||
_vocbase->_deadlockDetector.setReaderUnblocked(this);
|
||||
|
||||
try {
|
||||
// when we are here, we've got the read lock
|
||||
_vocbase->_deadlockDetector.addReader(this, wasBlocked);
|
||||
}
|
||||
catch (...) {
|
||||
TRI_READ_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(this);
|
||||
return TRI_ERROR_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
|
@ -348,8 +378,8 @@ int TRI_document_collection_t::beginWriteTimed(uint64_t timeout,
|
|||
while (!TRI_TRY_WRITE_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(this)) {
|
||||
try {
|
||||
if (!wasBlocked) {
|
||||
// insert writer (with method named "setReaderBlocked"..., but it works)
|
||||
if (_vocbase->_deadlockDetector.setReaderBlocked(this)) {
|
||||
// insert writer
|
||||
if (_vocbase->_deadlockDetector.setWriterBlocked(this) == TRI_ERROR_DEADLOCK) {
|
||||
// deadlock
|
||||
LOG_TRACE(
|
||||
"deadlock detected while trying to acquire write-lock on "
|
||||
|
@ -364,9 +394,9 @@ int TRI_document_collection_t::beginWriteTimed(uint64_t timeout,
|
|||
// periodically check for deadlocks
|
||||
TRI_ASSERT(wasBlocked);
|
||||
iterations = 0;
|
||||
if (_vocbase->_deadlockDetector.isDeadlocked(this)) {
|
||||
if (_vocbase->_deadlockDetector.detectDeadlock(this, true) == TRI_ERROR_DEADLOCK) {
|
||||
// deadlock
|
||||
_vocbase->_deadlockDetector.setReaderUnblocked(this);
|
||||
_vocbase->_deadlockDetector.unsetWriterBlocked(this);
|
||||
LOG_TRACE(
|
||||
"deadlock detected while trying to acquire write-lock on "
|
||||
"collection '%s'",
|
||||
|
@ -377,7 +407,7 @@ int TRI_document_collection_t::beginWriteTimed(uint64_t timeout,
|
|||
} catch (...) {
|
||||
// clean up!
|
||||
if (wasBlocked) {
|
||||
_vocbase->_deadlockDetector.setReaderUnblocked(this);
|
||||
_vocbase->_deadlockDetector.unsetWriterBlocked(this);
|
||||
return TRI_ERROR_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
@ -391,20 +421,21 @@ int TRI_document_collection_t::beginWriteTimed(uint64_t timeout,
|
|||
waited += sleepPeriod;
|
||||
|
||||
if (waited > timeout) {
|
||||
_vocbase->_deadlockDetector.setReaderUnblocked(this);
|
||||
_vocbase->_deadlockDetector.unsetWriterBlocked(this);
|
||||
LOG_TRACE("timed out waiting for write-lock on collection '%s'",
|
||||
_info.namec_str());
|
||||
return TRI_ERROR_LOCK_TIMEOUT;
|
||||
}
|
||||
}
|
||||
|
||||
// when we are here, we've got the write lock
|
||||
if (wasBlocked) {
|
||||
_vocbase->_deadlockDetector.setReaderUnblocked(this);
|
||||
try {
|
||||
// register writer
|
||||
_vocbase->_deadlockDetector.addWriter(this, wasBlocked);
|
||||
}
|
||||
catch (...) {
|
||||
TRI_WRITE_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(this);
|
||||
return TRI_ERROR_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
// register writer
|
||||
_currentWriterThread.store(TRI_CurrentThreadId());
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
|
|
@ -272,10 +272,6 @@ struct TRI_document_collection_t : public TRI_collection_t {
|
|||
VocShaper* getShaper() const;
|
||||
#endif
|
||||
|
||||
inline TRI_tid_t getCurrentWriterThread() const {
|
||||
return _currentWriterThread.load();
|
||||
}
|
||||
|
||||
void setNextCompactionStartIndex(size_t);
|
||||
size_t getNextCompactionStartIndex();
|
||||
void setCompactionStatus(char const*);
|
||||
|
@ -313,8 +309,6 @@ struct TRI_document_collection_t : public TRI_collection_t {
|
|||
TRI_read_write_lock_t _compactionLock;
|
||||
double _lastCompaction;
|
||||
|
||||
std::atomic<TRI_tid_t> _currentWriterThread;
|
||||
|
||||
// ...........................................................................
|
||||
// this condition variable protects the _journalsCondition
|
||||
// ...........................................................................
|
||||
|
|
|
@ -2082,6 +2082,10 @@ TRI_shaped_json_t* TRI_ShapedJsonJson(VocShaper* shaper, TRI_json_t const* json,
|
|||
|
||||
TRI_json_t* TRI_JsonShapedJson(VocShaper* shaper,
|
||||
TRI_shaped_json_t const* shaped) {
|
||||
#ifdef TRI_ENABLE_MAINTAINER_MODE
|
||||
TRI_ASSERT(shaped != nullptr);
|
||||
#endif
|
||||
|
||||
TRI_shape_t const* shape = shaper->lookupShapeId(shaped->_sid);
|
||||
|
||||
if (shape == nullptr) {
|
||||
|
|
|
@ -42,123 +42,267 @@ class DeadlockDetector {
|
|||
DeadlockDetector& operator=(DeadlockDetector const&) = delete;
|
||||
|
||||
public:
|
||||
bool isDeadlocked(T const* value) {
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief add a thread to the list of blocked threads
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int detectDeadlock(T const* value, bool isWrite) {
|
||||
auto tid = TRI_CurrentThreadId();
|
||||
std::unordered_set<TRI_tid_t> watchFor({tid});
|
||||
|
||||
std::vector<TRI_tid_t> stack;
|
||||
|
||||
TRI_tid_t writerTid = value->getCurrentWriterThread();
|
||||
|
||||
if (writerTid == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
stack.push_back(writerTid);
|
||||
|
||||
MUTEX_LOCKER(mutexLocker, _readersLock);
|
||||
|
||||
while (!stack.empty()) {
|
||||
TRI_tid_t current = stack.back();
|
||||
stack.pop_back();
|
||||
|
||||
watchFor.emplace(current);
|
||||
auto it2 = _readersBlocked.find(current);
|
||||
|
||||
if (it2 == _readersBlocked.end()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (watchFor.find((*it2).second) != watchFor.end()) {
|
||||
// deadlock!
|
||||
return true;
|
||||
}
|
||||
|
||||
stack.push_back((*it2).second);
|
||||
}
|
||||
|
||||
// no deadlock found
|
||||
return false;
|
||||
MUTEX_LOCKER(mutexLocker, _lock);
|
||||
return detectDeadlock(value, tid, isWrite);
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief insert a reader into the list of blocked readers
|
||||
/// returns true if a deadlock was detected and false otherwise
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief add a reader to the list of blocked readers
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool setReaderBlocked(T const* value) {
|
||||
auto tid = TRI_CurrentThreadId();
|
||||
std::unordered_set<TRI_tid_t> watchFor({tid});
|
||||
int setReaderBlocked(T const* value) { return setBlocked(value, false); }
|
||||
|
||||
std::vector<TRI_tid_t> stack;
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief add a writer to the list of blocked writers
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
TRI_tid_t writerTid = value->getCurrentWriterThread();
|
||||
int setWriterBlocked(T const* value) { return setBlocked(value, true); }
|
||||
|
||||
if (writerTid == 0) {
|
||||
return false;
|
||||
}
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief remove a reader from the list of blocked readers
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
stack.push_back(writerTid);
|
||||
void unsetReaderBlocked(T const* value) { unsetBlocked(value, false); }
|
||||
|
||||
MUTEX_LOCKER(mutexLocker, _readersLock);
|
||||
_readersBlocked.emplace(tid, writerTid);
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief remove a writer from the list of blocked writers
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
try {
|
||||
while (!stack.empty()) {
|
||||
TRI_tid_t current = stack.back();
|
||||
stack.pop_back();
|
||||
void unsetWriterBlocked(T const* value) { unsetBlocked(value, true); }
|
||||
|
||||
watchFor.emplace(current);
|
||||
auto it2 = _readersBlocked.find(current);
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief add a reader to the list of active readers
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
if (it2 == _readersBlocked.end()) {
|
||||
return false;
|
||||
void addReader(T const* value, bool wasBlockedBefore) {
|
||||
addActive(value, false, wasBlockedBefore);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief add a writer to the list of active writers
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void addWriter(T const* value, bool wasBlockedBefore) {
|
||||
addActive(value, true, wasBlockedBefore);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief unregister a reader from the list of active readers
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void unsetReader(T const* value) { unsetActive(value, false); }
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief unregister a writer from the list of active writers
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void unsetWriter(T const* value) { unsetActive(value, true); }
|
||||
|
||||
private:
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief add a thread to the list of blocked threads
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int detectDeadlock(T const* value, TRI_tid_t tid, bool isWrite) const {
|
||||
struct StackValue {
|
||||
StackValue(T const* value, TRI_tid_t tid, bool isWrite)
|
||||
: value(value), tid(tid), isWrite(isWrite) {}
|
||||
T const* value;
|
||||
TRI_tid_t tid;
|
||||
bool isWrite;
|
||||
};
|
||||
|
||||
std::unordered_set<TRI_tid_t> visited;
|
||||
std::vector<StackValue> stack;
|
||||
stack.emplace_back(StackValue(value, tid, isWrite));
|
||||
|
||||
while (!stack.empty()) {
|
||||
StackValue top = stack.back(); // intentionally copy StackValue
|
||||
stack.pop_back();
|
||||
|
||||
if (!top.isWrite) {
|
||||
// we are a reader
|
||||
auto it = _active.find(top.value);
|
||||
|
||||
if (it != _active.end()) {
|
||||
bool other = (*it).second.second;
|
||||
|
||||
if (other) {
|
||||
// other is a writer
|
||||
TRI_tid_t otherTid = *((*it).second.first.begin());
|
||||
|
||||
if (visited.find(otherTid) != visited.end()) {
|
||||
return TRI_ERROR_DEADLOCK;
|
||||
}
|
||||
|
||||
auto it2 = _blocked.find(otherTid);
|
||||
|
||||
if (it2 != _blocked.end()) {
|
||||
// writer thread is blocking...
|
||||
stack.emplace_back(
|
||||
StackValue((*it2).second.first, otherTid, other));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// we are a writer
|
||||
auto it = _active.find(top.value);
|
||||
|
||||
if (watchFor.find((*it2).second) != watchFor.end()) {
|
||||
// deadlock!
|
||||
_readersBlocked.erase(tid);
|
||||
return true;
|
||||
if (it != _active.end()) {
|
||||
// other is either a reader or a writer
|
||||
for (auto const& otherTid : (*it).second.first) {
|
||||
if (visited.find(otherTid) != visited.end()) {
|
||||
return TRI_ERROR_DEADLOCK;
|
||||
}
|
||||
|
||||
auto it2 = _blocked.find(otherTid);
|
||||
|
||||
if (it2 != _blocked.end()) {
|
||||
// writer thread is blocking...
|
||||
stack.emplace_back(StackValue((*it2).second.first, otherTid,
|
||||
(*it).second.second));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stack.push_back((*it2).second);
|
||||
}
|
||||
|
||||
// no deadlock found
|
||||
return false;
|
||||
visited.emplace(top.tid);
|
||||
}
|
||||
|
||||
// no deadlock
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief add a thread to the list of blocked threads
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int setBlocked(T const* value, bool isWrite) {
|
||||
auto tid = TRI_CurrentThreadId();
|
||||
|
||||
MUTEX_LOCKER(mutexLocker, _lock);
|
||||
|
||||
auto it = _blocked.find(tid);
|
||||
|
||||
if (it != _blocked.end()) {
|
||||
// we're already blocking. should never happend
|
||||
return TRI_ERROR_DEADLOCK;
|
||||
}
|
||||
|
||||
_blocked.emplace(tid, std::make_pair(value, isWrite));
|
||||
|
||||
try {
|
||||
int res = detectDeadlock(value, tid, isWrite);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
// clean up
|
||||
_blocked.erase(tid);
|
||||
}
|
||||
|
||||
return res;
|
||||
} catch (...) {
|
||||
// clean up and re-throw
|
||||
_readersBlocked.erase(tid);
|
||||
// clean up
|
||||
_blocked.erase(tid);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief remove a reader from the list of blocked readers
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief remove a thread from the list of blocked threads
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void setReaderUnblocked(T const* value) noexcept {
|
||||
void unsetBlocked(T const* value, bool isWrite) {
|
||||
auto tid = TRI_CurrentThreadId();
|
||||
|
||||
try {
|
||||
MUTEX_LOCKER(mutexLocker, _readersLock);
|
||||
_readersBlocked.erase(tid);
|
||||
} catch (...) {
|
||||
MUTEX_LOCKER(mutexLocker, _lock);
|
||||
|
||||
_blocked.erase(tid);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief unregister a thread from the list of active threads
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void unsetActive(T const* value, bool isWrite) {
|
||||
auto tid = TRI_CurrentThreadId();
|
||||
|
||||
MUTEX_LOCKER(mutexLocker, _lock);
|
||||
auto it = _active.find(value);
|
||||
|
||||
if (it == _active.end()) {
|
||||
// should not happen, but definitely nothing to do here
|
||||
return;
|
||||
}
|
||||
|
||||
if (isWrite) {
|
||||
TRI_ASSERT((*it).second.second);
|
||||
TRI_ASSERT((*it).second.first.size() == 1);
|
||||
// remove whole entry
|
||||
_active.erase(value);
|
||||
} else {
|
||||
TRI_ASSERT(!(*it).second.second);
|
||||
TRI_ASSERT((*it).second.first.size() >= 1);
|
||||
|
||||
(*it).second.first.erase(tid);
|
||||
if ((*it).second.first.empty()) {
|
||||
// remove last reader
|
||||
_active.erase(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief add a reader/writer to the list of active threads
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void addActive(T const* value, bool isWrite, bool wasBlockedBefore) {
|
||||
auto tid = TRI_CurrentThreadId();
|
||||
|
||||
MUTEX_LOCKER(mutexLocker, _lock);
|
||||
auto it = _active.find(value);
|
||||
|
||||
if (it == _active.end()) {
|
||||
_active.emplace(
|
||||
value, std::make_pair(std::unordered_set<TRI_tid_t>({tid}), isWrite));
|
||||
} else {
|
||||
TRI_ASSERT(!(*it).second.first.empty());
|
||||
TRI_ASSERT(!(*it).second.second);
|
||||
TRI_ASSERT(!isWrite);
|
||||
auto result = (*it).second.first.emplace(tid);
|
||||
TRI_ASSERT(result.second);
|
||||
}
|
||||
|
||||
if (wasBlockedBefore) {
|
||||
_blocked.erase(tid);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief lock for managing the readers
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief lock for managing the data structures
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
arangodb::Mutex _readersLock;
|
||||
arangodb::Mutex _lock;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief readers that are blocked on writers
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief threads currently blocked
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::unordered_map<TRI_tid_t, TRI_tid_t> _readersBlocked;
|
||||
std::unordered_map<TRI_tid_t, std::pair<T const*, bool>> _blocked;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief threads currently holding locks
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::unordered_map<T const*, std::pair<std::unordered_set<TRI_tid_t>, bool>>
|
||||
_active;
|
||||
};
|
||||
|
||||
} // namespace arangodb::basics
|
||||
|
|
|
@ -394,6 +394,10 @@ TRI_json_t* TRI_CreateStringJson(TRI_memory_zone_t* zone, char* value,
|
|||
|
||||
TRI_json_t* TRI_CreateStringCopyJson(TRI_memory_zone_t* zone, char const* value,
|
||||
size_t length) {
|
||||
if (value == nullptr) {
|
||||
// initial string should be valid...
|
||||
return nullptr;
|
||||
}
|
||||
TRI_json_t* result =
|
||||
static_cast<TRI_json_t*>(TRI_Allocate(zone, sizeof(TRI_json_t), false));
|
||||
|
||||
|
@ -425,6 +429,11 @@ void TRI_InitStringJson(TRI_json_t* result, char* value, size_t length) {
|
|||
|
||||
int TRI_InitStringCopyJson(TRI_memory_zone_t* zone, TRI_json_t* result,
|
||||
char const* value, size_t length) {
|
||||
if (value == nullptr) {
|
||||
// initial string should be valid...
|
||||
return TRI_ERROR_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
char* copy = TRI_DuplicateString(zone, value, length);
|
||||
|
||||
if (copy == nullptr) {
|
||||
|
|
Loading…
Reference in New Issue