diff --git a/arangod/Aql/QueryRegistry.cpp b/arangod/Aql/QueryRegistry.cpp index 4a87c3adfc..32c3eac2cb 100644 --- a/arangod/Aql/QueryRegistry.cpp +++ b/arangod/Aql/QueryRegistry.cpp @@ -64,50 +64,48 @@ QueryRegistry::~QueryRegistry() { } } } - + /// @brief insert void QueryRegistry::insert(QueryId id, Query* query, double ttl) { TRI_ASSERT(query != nullptr); TRI_ASSERT(query->trx() != nullptr); auto vocbase = query->vocbase(); + + // create the query info object outside of the lock + auto p = std::make_unique(id, query, ttl); - WRITE_LOCKER(writeLocker, _lock); + // now insert into table of running queries + { + WRITE_LOCKER(writeLocker, _lock); - auto m = _queries.find(vocbase->name()); - if (m == _queries.end()) { - m = _queries.emplace(vocbase->name(), - std::unordered_map()).first; + auto m = _queries.find(vocbase->name()); + if (m == _queries.end()) { + m = _queries.emplace(vocbase->name(), + std::unordered_map()).first; + + TRI_ASSERT(_queries.find(vocbase->name()) != _queries.end()); + } + + auto q = m->second.find(id); + + if (q != m->second.end()) { + THROW_ARANGO_EXCEPTION_MESSAGE( + TRI_ERROR_INTERNAL, "query with given vocbase and id already there"); + } - TRI_ASSERT(_queries.find(vocbase->name()) != _queries.end()); - } - auto q = m->second.find(id); - if (q == m->second.end()) { - auto p = std::make_unique(); - p->_vocbase = vocbase; - p->_id = id; - p->_query = query; - p->_isOpen = false; - p->_timeToLive = ttl; - p->_expires = TRI_microtime() + ttl; m->second.emplace(id, p.get()); p.release(); + } - TRI_ASSERT(_queries.find(vocbase->name())->second.find(id) != - _queries.find(vocbase->name())->second.end()); - - // If we have set _noLockHeaders, we need to unset it: - if (CollectionLockState::_noLockHeaders != nullptr) { - if (CollectionLockState::_noLockHeaders == query->engine()->lockedShards()) { - CollectionLockState::_noLockHeaders = nullptr; - } - // else { - // We have not set it, just leave it alone. This happens in particular - // on the DBServers, who do not set lockedShards() themselves. - // } + // If we have set _noLockHeaders, we need to unset it: + if (CollectionLockState::_noLockHeaders != nullptr) { + if (CollectionLockState::_noLockHeaders == query->engine()->lockedShards()) { + CollectionLockState::_noLockHeaders = nullptr; } - } else { - THROW_ARANGO_EXCEPTION_MESSAGE( - TRI_ERROR_INTERNAL, "query with given vocbase and id already there"); + // else { + // We have not set it, just leave it alone. This happens in particular + // on the DBServers, who do not set lockedShards() themselves. + // } } } @@ -192,50 +190,60 @@ void QueryRegistry::close(TRI_vocbase_t* vocbase, QueryId id, double ttl) { /// @brief destroy void QueryRegistry::destroy(std::string const& vocbase, QueryId id, int errorCode) { - WRITE_LOCKER(writeLocker, _lock); + std::unique_ptr queryInfo; + + { + WRITE_LOCKER(writeLocker, _lock); - auto m = _queries.find(vocbase); - if (m == _queries.end()) { - m = _queries.emplace(vocbase, std::unordered_map()) - .first; - } - auto q = m->second.find(id); - if (q == m->second.end()) { - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, - "query with given vocbase and id not found"); - } - QueryInfo* qi = q->second; + auto m = _queries.find(vocbase); - if (qi->_isOpen) { - qi->_query->killed(true); - return; + if (m == _queries.end()) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, + "query with given vocbase and id not found"); + } + + auto q = m->second.find(id); + + if (q == m->second.end()) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, + "query with given vocbase and id not found"); + } + + if (q->second->_isOpen) { + // query in use by another thread/request + q->second->_query->killed(true); + return; + } + + // move query into our unique ptr, so we can process it outside + // of the lock + queryInfo.reset(q->second); + + // remove query from the table of running queries + q->second = nullptr; + m->second.erase(q); } - // If the query is open, we can delete it right away, if not, we need + TRI_ASSERT(queryInfo != nullptr); + TRI_ASSERT(!queryInfo->_isOpen); + + // If the query was open, we can delete it right away, if not, we need // to register the transaction with the current context and adjust // the debugging counters for transactions: - if (!qi->_isOpen) { - // If we had set _noLockHeaders, we need to reset it: - if (qi->_query->engine()->lockedShards() != nullptr) { - if (CollectionLockState::_noLockHeaders == nullptr) { - CollectionLockState::_noLockHeaders = qi->_query->engine()->lockedShards(); - } else { - LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "Found strange lockedShards in thread, not overwriting!"; - } + + // If we had set _noLockHeaders, we need to reset it: + if (queryInfo->_query->engine()->lockedShards() != nullptr) { + if (CollectionLockState::_noLockHeaders == nullptr) { + CollectionLockState::_noLockHeaders = queryInfo->_query->engine()->lockedShards(); + } else { + LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "Found strange lockedShards in thread, not overwriting!"; } } if (errorCode == TRI_ERROR_NO_ERROR) { // commit the operation - qi->_query->trx()->commit(); + queryInfo->_query->trx()->commit(); } - - // Now we can delete it: - delete qi->_query; - delete qi; - - q->second = nullptr; - m->second.erase(q); } /// @brief destroy @@ -301,3 +309,15 @@ void QueryRegistry::destroyAll() { } } } + +QueryRegistry::QueryInfo::QueryInfo(QueryId id, Query* query, double ttl) + : _vocbase(query->vocbase()), + _id(id), + _query(query), + _isOpen(false), + _timeToLive(ttl), + _expires(TRI_microtime() + ttl) {} + +QueryRegistry::QueryInfo::~QueryInfo() { + delete _query; +} diff --git a/arangod/Aql/QueryRegistry.h b/arangod/Aql/QueryRegistry.h index e601019e1d..e7b8bee2bf 100644 --- a/arangod/Aql/QueryRegistry.h +++ b/arangod/Aql/QueryRegistry.h @@ -85,6 +85,9 @@ class QueryRegistry { private: /// @brief a struct for all information regarding one query in the registry struct QueryInfo { + QueryInfo(QueryId id, Query* query, double ttl); + ~QueryInfo(); + TRI_vocbase_t* _vocbase; // the vocbase QueryId _id; // id of the query Query* _query; // the actual query pointer