mirror of https://gitee.com/bigwinds/arangodb
Feature 3.3/small optimizations for query registry (#4259)
This commit is contained in:
parent
0a28878059
commit
f482b7aed4
|
@ -64,50 +64,48 @@ QueryRegistry::~QueryRegistry() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// @brief insert
|
||||
void QueryRegistry::insert(QueryId id, Query* query, double ttl) {
|
||||
TRI_ASSERT(query != nullptr);
|
||||
TRI_ASSERT(query->trx() != nullptr);
|
||||
auto vocbase = query->vocbase();
|
||||
|
||||
// create the query info object outside of the lock
|
||||
auto p = std::make_unique<QueryInfo>(id, query, ttl);
|
||||
|
||||
WRITE_LOCKER(writeLocker, _lock);
|
||||
// now insert into table of running queries
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _lock);
|
||||
|
||||
auto m = _queries.find(vocbase->name());
|
||||
if (m == _queries.end()) {
|
||||
m = _queries.emplace(vocbase->name(),
|
||||
std::unordered_map<QueryId, QueryInfo*>()).first;
|
||||
auto m = _queries.find(vocbase->name());
|
||||
if (m == _queries.end()) {
|
||||
m = _queries.emplace(vocbase->name(),
|
||||
std::unordered_map<QueryId, QueryInfo*>()).first;
|
||||
|
||||
TRI_ASSERT(_queries.find(vocbase->name()) != _queries.end());
|
||||
}
|
||||
|
||||
auto q = m->second.find(id);
|
||||
|
||||
if (q != m->second.end()) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(
|
||||
TRI_ERROR_INTERNAL, "query with given vocbase and id already there");
|
||||
}
|
||||
|
||||
TRI_ASSERT(_queries.find(vocbase->name()) != _queries.end());
|
||||
}
|
||||
auto q = m->second.find(id);
|
||||
if (q == m->second.end()) {
|
||||
auto p = std::make_unique<QueryInfo>();
|
||||
p->_vocbase = vocbase;
|
||||
p->_id = id;
|
||||
p->_query = query;
|
||||
p->_isOpen = false;
|
||||
p->_timeToLive = ttl;
|
||||
p->_expires = TRI_microtime() + ttl;
|
||||
m->second.emplace(id, p.get());
|
||||
p.release();
|
||||
}
|
||||
|
||||
TRI_ASSERT(_queries.find(vocbase->name())->second.find(id) !=
|
||||
_queries.find(vocbase->name())->second.end());
|
||||
|
||||
// If we have set _noLockHeaders, we need to unset it:
|
||||
if (CollectionLockState::_noLockHeaders != nullptr) {
|
||||
if (CollectionLockState::_noLockHeaders == query->engine()->lockedShards()) {
|
||||
CollectionLockState::_noLockHeaders = nullptr;
|
||||
}
|
||||
// else {
|
||||
// We have not set it, just leave it alone. This happens in particular
|
||||
// on the DBServers, who do not set lockedShards() themselves.
|
||||
// }
|
||||
// If we have set _noLockHeaders, we need to unset it:
|
||||
if (CollectionLockState::_noLockHeaders != nullptr) {
|
||||
if (CollectionLockState::_noLockHeaders == query->engine()->lockedShards()) {
|
||||
CollectionLockState::_noLockHeaders = nullptr;
|
||||
}
|
||||
} else {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(
|
||||
TRI_ERROR_INTERNAL, "query with given vocbase and id already there");
|
||||
// else {
|
||||
// We have not set it, just leave it alone. This happens in particular
|
||||
// on the DBServers, who do not set lockedShards() themselves.
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -192,50 +190,60 @@ void QueryRegistry::close(TRI_vocbase_t* vocbase, QueryId id, double ttl) {
|
|||
/// @brief destroy
|
||||
void QueryRegistry::destroy(std::string const& vocbase, QueryId id,
|
||||
int errorCode) {
|
||||
WRITE_LOCKER(writeLocker, _lock);
|
||||
std::unique_ptr<QueryInfo> queryInfo;
|
||||
|
||||
{
|
||||
WRITE_LOCKER(writeLocker, _lock);
|
||||
|
||||
auto m = _queries.find(vocbase);
|
||||
if (m == _queries.end()) {
|
||||
m = _queries.emplace(vocbase, std::unordered_map<QueryId, QueryInfo*>())
|
||||
.first;
|
||||
}
|
||||
auto q = m->second.find(id);
|
||||
if (q == m->second.end()) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
|
||||
"query with given vocbase and id not found");
|
||||
}
|
||||
QueryInfo* qi = q->second;
|
||||
auto m = _queries.find(vocbase);
|
||||
|
||||
if (qi->_isOpen) {
|
||||
qi->_query->killed(true);
|
||||
return;
|
||||
if (m == _queries.end()) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
|
||||
"query with given vocbase and id not found");
|
||||
}
|
||||
|
||||
auto q = m->second.find(id);
|
||||
|
||||
if (q == m->second.end()) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
|
||||
"query with given vocbase and id not found");
|
||||
}
|
||||
|
||||
if (q->second->_isOpen) {
|
||||
// query in use by another thread/request
|
||||
q->second->_query->killed(true);
|
||||
return;
|
||||
}
|
||||
|
||||
// move query into our unique ptr, so we can process it outside
|
||||
// of the lock
|
||||
queryInfo.reset(q->second);
|
||||
|
||||
// remove query from the table of running queries
|
||||
q->second = nullptr;
|
||||
m->second.erase(q);
|
||||
}
|
||||
|
||||
// If the query is open, we can delete it right away, if not, we need
|
||||
TRI_ASSERT(queryInfo != nullptr);
|
||||
TRI_ASSERT(!queryInfo->_isOpen);
|
||||
|
||||
// If the query was open, we can delete it right away, if not, we need
|
||||
// to register the transaction with the current context and adjust
|
||||
// the debugging counters for transactions:
|
||||
if (!qi->_isOpen) {
|
||||
// If we had set _noLockHeaders, we need to reset it:
|
||||
if (qi->_query->engine()->lockedShards() != nullptr) {
|
||||
if (CollectionLockState::_noLockHeaders == nullptr) {
|
||||
CollectionLockState::_noLockHeaders = qi->_query->engine()->lockedShards();
|
||||
} else {
|
||||
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "Found strange lockedShards in thread, not overwriting!";
|
||||
}
|
||||
|
||||
// If we had set _noLockHeaders, we need to reset it:
|
||||
if (queryInfo->_query->engine()->lockedShards() != nullptr) {
|
||||
if (CollectionLockState::_noLockHeaders == nullptr) {
|
||||
CollectionLockState::_noLockHeaders = queryInfo->_query->engine()->lockedShards();
|
||||
} else {
|
||||
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "Found strange lockedShards in thread, not overwriting!";
|
||||
}
|
||||
}
|
||||
|
||||
if (errorCode == TRI_ERROR_NO_ERROR) {
|
||||
// commit the operation
|
||||
qi->_query->trx()->commit();
|
||||
queryInfo->_query->trx()->commit();
|
||||
}
|
||||
|
||||
// Now we can delete it:
|
||||
delete qi->_query;
|
||||
delete qi;
|
||||
|
||||
q->second = nullptr;
|
||||
m->second.erase(q);
|
||||
}
|
||||
|
||||
/// @brief destroy
|
||||
|
@ -301,3 +309,15 @@ void QueryRegistry::destroyAll() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
QueryRegistry::QueryInfo::QueryInfo(QueryId id, Query* query, double ttl)
|
||||
: _vocbase(query->vocbase()),
|
||||
_id(id),
|
||||
_query(query),
|
||||
_isOpen(false),
|
||||
_timeToLive(ttl),
|
||||
_expires(TRI_microtime() + ttl) {}
|
||||
|
||||
QueryRegistry::QueryInfo::~QueryInfo() {
|
||||
delete _query;
|
||||
}
|
||||
|
|
|
@ -85,6 +85,9 @@ class QueryRegistry {
|
|||
private:
|
||||
/// @brief a struct for all information regarding one query in the registry
|
||||
struct QueryInfo {
|
||||
QueryInfo(QueryId id, Query* query, double ttl);
|
||||
~QueryInfo();
|
||||
|
||||
TRI_vocbase_t* _vocbase; // the vocbase
|
||||
QueryId _id; // id of the query
|
||||
Query* _query; // the actual query pointer
|
||||
|
|
Loading…
Reference in New Issue