diff --git a/arangod/Aql/EngineInfoContainerCoordinator.cpp b/arangod/Aql/EngineInfoContainerCoordinator.cpp index 8e4e1db102..7383cd80bf 100644 --- a/arangod/Aql/EngineInfoContainerCoordinator.cpp +++ b/arangod/Aql/EngineInfoContainerCoordinator.cpp @@ -146,7 +146,7 @@ ExecutionEngineResult EngineInfoContainerCoordinator::buildEngines( // destroy all query snippets in case of error auto guard = scopeGuard([&dbname, &registry, &coordinatorQueryIds]() { for (auto const& it : coordinatorQueryIds) { - registry->destroy(dbname, it, TRI_ERROR_INTERNAL); + registry->destroy(dbname, it, TRI_ERROR_INTERNAL, false); } }); diff --git a/arangod/Aql/QueryRegistry.cpp b/arangod/Aql/QueryRegistry.cpp index f28ce733df..1ac973e866 100644 --- a/arangod/Aql/QueryRegistry.cpp +++ b/arangod/Aql/QueryRegistry.cpp @@ -22,6 +22,7 @@ //////////////////////////////////////////////////////////////////////////////// #include "QueryRegistry.h" +#include "ApplicationFeatures/ApplicationServer.h" #include "Aql/AqlItemBlock.h" #include "Aql/ExecutionEngine.h" #include "Aql/Query.h" @@ -60,7 +61,7 @@ QueryRegistry::~QueryRegistry() { // holding the lock for (auto& p : toDelete) { try { // just in case - destroy(p.first, p.second, TRI_ERROR_TRANSACTION_ABORTED); + destroy(p.first, p.second, TRI_ERROR_TRANSACTION_ABORTED, false); } catch (...) { } } @@ -82,6 +83,9 @@ void QueryRegistry::insert(QueryId id, Query* query, double ttl, // now insert into table of running queries { WRITE_LOCKER(writeLocker, _lock); + if (_disallowInserts) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN); + } auto result = _queries[vocbase.name()].emplace(id, std::move(p)); if (!result.second) { @@ -119,7 +123,13 @@ Query* QueryRegistry::open(TRI_vocbase_t* vocbase, QueryId id) { qi->_isOpen = true; if (!qi->_isPrepared) { - qi->_query->prepare(this); + try { + qi->_query->prepare(this); + } catch (...)
{ + qi->_isOpen = false; + qi->_expires = 0.0; + throw; + } qi->_isPrepared = true; } @@ -152,7 +162,13 @@ void QueryRegistry::close(TRI_vocbase_t* vocbase, QueryId id, double ttl) { if (!qi->_isPrepared) { qi->_isPrepared = true; - qi->_query->prepare(this); + try { + qi->_query->prepare(this); + } catch (...) { + qi->_isOpen = false; + qi->_expires = 0.0; + throw; + } } qi->_isOpen = false; @@ -161,7 +177,7 @@ void QueryRegistry::close(TRI_vocbase_t* vocbase, QueryId id, double ttl) { } /// @brief destroy -void QueryRegistry::destroy(std::string const& vocbase, QueryId id, int errorCode) { +void QueryRegistry::destroy(std::string const& vocbase, QueryId id, int errorCode, bool ignoreOpened) { std::unique_ptr<QueryInfo> queryInfo; { @@ -181,9 +197,10 @@ void QueryRegistry::destroy(std::string const& vocbase, QueryId id, int errorCod TRI_ERROR_BAD_PARAMETER, "query with given vocbase and id not found"); } - if (q->second->_isOpen) { + if (q->second->_isOpen && !ignoreOpened) { // query in use by another thread/request q->second->_query->kill(); + q->second->_expires = 0.0; return; } @@ -213,11 +230,6 @@ void QueryRegistry::destroy(std::string const& vocbase, QueryId id, int errorCod LOG_TOPIC(DEBUG, arangodb::Logger::AQL) << "query with id " << id << " is now destroyed"; } -/// @brief destroy -void QueryRegistry::destroy(TRI_vocbase_t* vocbase, QueryId id, int errorCode) { - destroy(vocbase->name(), id, errorCode); -} - ResultT<bool> QueryRegistry::isQueryInUse(TRI_vocbase_t* vocbase, QueryId id) { LOG_TOPIC(DEBUG, arangodb::Logger::AQL) << "Test if query with id " << id << "is in use."; @@ -268,7 +280,7 @@ void QueryRegistry::expireQueries() { for (auto& p : toDelete) { try { // just in case LOG_TOPIC(DEBUG, arangodb::Logger::AQL) << "timeout for query with id " << p.second; - destroy(p.first, p.second, TRI_ERROR_TRANSACTION_ABORTED); + destroy(p.first, p.second, TRI_ERROR_TRANSACTION_ABORTED, false); } catch (...)
{ } } @@ -299,13 +311,32 @@ void QueryRegistry::destroyAll() { try { LOG_TOPIC(DEBUG, arangodb::Logger::AQL) << "Timeout for query with id " << p.second << " due to shutdown"; - destroy(p.first, p.second, TRI_ERROR_SHUTTING_DOWN); + destroy(p.first, p.second, TRI_ERROR_SHUTTING_DOWN, false); } catch (...) { // ignore any errors here } } + + size_t count = 0; + { + READ_LOCKER(readlock, _lock); + for (auto& p : _queries) { + count += p.second.size(); + } + } + if (count > 0) { + LOG_TOPIC(INFO, arangodb::Logger::AQL) + << "number of remaining queries in query registry at shutdown: " << count; + } } +void QueryRegistry::disallowInserts() { + WRITE_LOCKER(writelock, _lock); + _disallowInserts = true; + // from here on, there shouldn't be any more inserts into the registry +} + + QueryRegistry::QueryInfo::QueryInfo(QueryId id, Query* query, double ttl, bool isPrepared) : _vocbase(&(query->vocbase())), _id(id), diff --git a/arangod/Aql/QueryRegistry.h b/arangod/Aql/QueryRegistry.h index 5008e5c3f4..ffc4bb1303 100644 --- a/arangod/Aql/QueryRegistry.h +++ b/arangod/Aql/QueryRegistry.h @@ -38,7 +38,7 @@ class Query; class QueryRegistry { public: - explicit QueryRegistry(double defTTL) : _defaultTTL(defTTL) {} + explicit QueryRegistry(double defTTL) : _defaultTTL(defTTL), _disallowInserts(false) {} TEST_VIRTUAL ~QueryRegistry(); @@ -75,9 +75,10 @@ class QueryRegistry { /// from the same thread that has opened it! Note that if the query is /// "open", then this will set the "killed" flag in the query and do not /// more. - TEST_VIRTUAL void destroy(std::string const& vocbase, QueryId id, int errorCode); - - void destroy(TRI_vocbase_t* vocbase, QueryId id, int errorCode); + /// if the ignoreOpened flag is set, it means the query will be shut down + /// and removed regardless if it is in use by anything else. 
this is only + /// safe to call if the current thread is currently using the query itself + TEST_VIRTUAL void destroy(std::string const& vocbase, QueryId id, int errorCode, bool ignoreOpened); ResultT<bool> isQueryInUse(TRI_vocbase_t* vocbase, QueryId id); @@ -90,6 +91,9 @@ class QueryRegistry { /// @brief for shutdown, we need to shut down all queries: void destroyAll(); + /// @brief from here on, disallow entering new queries into the registry + void disallowInserts(); + /// @brief return the default TTL value TEST_VIRTUAL double defaultTTL() const { return _defaultTTL; } @@ -130,6 +134,8 @@ class QueryRegistry { /// @brief the default TTL value double const _defaultTTL; + + bool _disallowInserts; }; } // namespace aql diff --git a/arangod/Aql/RestAqlHandler.cpp b/arangod/Aql/RestAqlHandler.cpp index 41c188fe62..d5995000d5 100644 --- a/arangod/Aql/RestAqlHandler.cpp +++ b/arangod/Aql/RestAqlHandler.cpp @@ -830,7 +830,7 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation, Query* q closeGuard.cancel(); // delete the query from the registry - _queryRegistry->destroy(&_vocbase, _qId, errorCode); + _queryRegistry->destroy(_vocbase.name(), _qId, errorCode, false); _qId = 0; answerBuilder.add(StaticStrings::Error, VPackValue(res.fail())); answerBuilder.add(StaticStrings::Code, VPackValue(res.errorNumber())); diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index e22c7b60e5..daef4fa403 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -2733,7 +2733,9 @@ Result RestReplicationHandler::createBlockingTransaction(aql::QueryId id, if (isTombstoned(id)) { try { // Code does not matter, read only access, so we can roll back.
- queryRegistry->destroy(&_vocbase, id, TRI_ERROR_QUERY_KILLED); + // we can ignore the openness here, as it was our thread that had + // inserted the query just a couple of instructions before + queryRegistry->destroy(_vocbase.name(), id, TRI_ERROR_QUERY_KILLED, true /*ignoreOpened*/); } catch (...) { // Maybe thrown in shutdown. } @@ -2780,7 +2782,7 @@ ResultT<bool> RestReplicationHandler::cancelBlockingTransaction(aql::QueryId id) } try { // Code does not matter, read only access, so we can roll back. - queryRegistry->destroy(&_vocbase, id, TRI_ERROR_QUERY_KILLED); + queryRegistry->destroy(_vocbase.name(), id, TRI_ERROR_QUERY_KILLED, false); } catch (...) { // All errors that show up here can only be // triggered if the query is destroyed in between. diff --git a/arangod/RestServer/QueryRegistryFeature.cpp b/arangod/RestServer/QueryRegistryFeature.cpp index 3bd38755a7..7d1d5dafcf 100644 --- a/arangod/RestServer/QueryRegistryFeature.cpp +++ b/arangod/RestServer/QueryRegistryFeature.cpp @@ -177,6 +177,17 @@ void QueryRegistryFeature::prepare() { void QueryRegistryFeature::start() {} +void QueryRegistryFeature::beginShutdown() { + TRI_ASSERT(_queryRegistry != nullptr); + _queryRegistry->disallowInserts(); +} + +void QueryRegistryFeature::stop() { + TRI_ASSERT(_queryRegistry != nullptr); + _queryRegistry->disallowInserts(); + _queryRegistry->destroyAll(); +} + void QueryRegistryFeature::unprepare() { // clear the query registery QUERY_REGISTRY.store(nullptr, std::memory_order_release); diff --git a/arangod/RestServer/QueryRegistryFeature.h b/arangod/RestServer/QueryRegistryFeature.h index 00ce0f349f..4174a2fb29 100644 --- a/arangod/RestServer/QueryRegistryFeature.h +++ b/arangod/RestServer/QueryRegistryFeature.h @@ -44,6 +44,8 @@ class QueryRegistryFeature final : public application_features::ApplicationFeatu void validateOptions(std::shared_ptr<options::ProgramOptions>) override final; void prepare() override final; void start() override final; + void beginShutdown() override final; + void
stop() override final; void unprepare() override final; bool trackSlowQueries() const { return _trackSlowQueries; } diff --git a/tests/Aql/EngineInfoContainerCoordinatorTest.cpp b/tests/Aql/EngineInfoContainerCoordinatorTest.cpp index 68ca1a2ecf..5409d81df9 100644 --- a/tests/Aql/EngineInfoContainerCoordinatorTest.cpp +++ b/tests/Aql/EngineInfoContainerCoordinatorTest.cpp @@ -647,8 +647,8 @@ TEST_CASE("EngineInfoContainerCoordinator", "[aql][cluster][coordinator]") { .AlwaysReturn(&block); fakeit::When(OverloadedMethod(mockRegistry, destroy, - void(std::string const&, QueryId, int))) - .Do([&](std::string const& vocbase, QueryId id, int errorCode) { + void(std::string const&, QueryId, int, bool))) + .Do([&](std::string const& vocbase, QueryId id, int errorCode, bool ignoreOpened) { REQUIRE(vocbase == dbname); REQUIRE(id == secondId); REQUIRE(errorCode == TRI_ERROR_INTERNAL); @@ -742,7 +742,7 @@ TEST_CASE("EngineInfoContainerCoordinator", "[aql][cluster][coordinator]") { // Assert unregister of second engine. fakeit::Verify(OverloadedMethod(mockRegistry, destroy, - void(std::string const&, QueryId, int))) + void(std::string const&, QueryId, int, bool))) .Exactly(1); } @@ -788,7 +788,7 @@ TEST_CASE("EngineInfoContainerCoordinator", "[aql][cluster][coordinator]") { fakeit::Verify(Method(mockRegistry, insert)).Exactly(1); // Assert unregister of second engine. - fakeit::Verify(OverloadedMethod(mockRegistry, destroy, void(std::string const&, QueryId, int))).Exactly(0); + fakeit::Verify(OverloadedMethod(mockRegistry, destroy, void(std::string const&, QueryId, int, bool))).Exactly(0); } */