From 152bc7c556c50e9c024747b06396b6fe1e7e843f Mon Sep 17 00:00:00 2001 From: Jan Date: Wed, 6 Nov 2019 13:33:44 +0100 Subject: [PATCH] better killability of cluster AQL queries (#10360) --- CHANGELOG | 17 +++ arangod/Aql/DistributeExecutor.cpp | 17 +++ arangod/Aql/DistributeExecutor.h | 4 + .../Aql/EngineInfoContainerCoordinator.cpp | 4 +- arangod/Aql/EngineInfoContainerCoordinator.h | 3 +- arangod/Aql/ExecutionEngine.cpp | 77 +++++++++- arangod/Aql/ExecutionEngine.h | 16 ++ arangod/Aql/Query.cpp | 25 +++- arangod/Aql/Query.h | 4 +- arangod/Aql/QueryExecutionState.cpp | 1 + arangod/Aql/QueryExecutionState.h | 1 + arangod/Aql/QueryList.cpp | 40 ++--- arangod/Aql/QueryList.h | 7 +- arangod/Aql/QueryProfile.cpp | 25 +++- arangod/Aql/QueryProfile.h | 10 +- arangod/Aql/QueryRegistry.cpp | 18 +++ arangod/Aql/QueryRegistry.h | 4 + arangod/Aql/RemoteExecutor.cpp | 8 + arangod/Aql/RestAqlHandler.cpp | 28 +++- arangod/Aql/RestAqlHandler.h | 6 +- arangod/RestHandler/RestQueryHandler.cpp | 25 ++-- arangod/RestHandler/RestQueryHandler.h | 6 +- arangod/RestServer/BootstrapFeature.cpp | 2 +- arangod/Transaction/Manager.cpp | 16 +- arangod/V8Server/v8-vocbase.cpp | 4 +- .../EngineInfoContainerCoordinatorTest.cpp | 15 +- tests/js/client/shell/shell-aql-kill.js | 140 ++++++++++++++++++ tests/js/server/aql/aql-failures-cluster.js | 93 ++++++++++++ 28 files changed, 548 insertions(+), 68 deletions(-) create mode 100644 tests/js/client/shell/shell-aql-kill.js create mode 100644 tests/js/server/aql/aql-failures-cluster.js diff --git a/CHANGELOG b/CHANGELOG index ea1ac98d41..815cb63f9d 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,23 @@ v3.5.3 (XXXX-XX-XX) ------------------- +* Improve killability of some types of cluster AQL queries. Previously, several + cluster queries, especially those containing a `DistributeNode` in their + execution plans, did not respond to a kill instruction. + + This change also introduces a new query status "killed", which may now be + returned by the REST APIs at `/_api/query/current` and `/_api/query/slow` in + the `state` attribute of each query. + +* Improve shutdown of some cluster AQL queries on the coordinator in case the + query has multiple coordinator snippets (true for queries involving more than + one collection) and the database server(s) cannot be reached on query + shutdown. In this case the proper shutdown of the coordinator parts of the + query previously was deferred until the coordinator snippets were removed by + the automatic garbage collection. Now, the cleanup of the coordinator snippets + will happen much more quickly, which reduces the chances of the queries + blocking resources. + * Fixed ArangoSearch index removes being discarded on commiting consolidation results with pending removes after some segments under consolidation were already committed. diff --git a/arangod/Aql/DistributeExecutor.cpp b/arangod/Aql/DistributeExecutor.cpp index 95b84edce4..df645bced3 100644 --- a/arangod/Aql/DistributeExecutor.cpp +++ b/arangod/Aql/DistributeExecutor.cpp @@ -23,6 +23,7 @@ #include "DistributeExecutor.h" #include "Aql/Collection.h" +#include "Aql/Query.h" #include "VocBase/LogicalCollection.h" #include @@ -73,6 +74,9 @@ std::pair ExecutionBlockImpl ExecutionBlockImpl::getSomeForShardWithoutTrace( size_t atMost, std::string const& shardId) { + if (getQuery().killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } // NOTE: We do not need to retain these, the getOrSkipSome is required to! size_t skipped = 0; SharedAqlItemBlockPtr result = nullptr; @@ -96,6 +100,9 @@ std::pair ExecutionBlockImpl::skipSo std::pair ExecutionBlockImpl::skipSomeForShardWithoutTrace( size_t atMost, std::string const& shardId) { + if (getQuery().killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } // NOTE: We do not need to retain these, the getOrSkipSome is required to! size_t skipped = 0; SharedAqlItemBlockPtr result = nullptr; @@ -209,6 +216,7 @@ bool ExecutionBlockImpl::hasMoreForClientId(size_t clientId) /// current one. std::pair ExecutionBlockImpl::getBlockForClient( size_t atMost, size_t clientId) { + if (_buffer.empty()) { _index = 0; // position in _buffer _pos = 0; // position in _buffer.at(_index) @@ -219,6 +227,9 @@ std::pair ExecutionBlockImpl::getBlock while (buf.size() < atMost) { if (_index == _buffer.size()) { + if (getQuery().killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } auto res = getBlock(atMost); if (res.first == ExecutionState::WAITING) { return {res.first, false}; @@ -257,6 +268,10 @@ std::pair ExecutionBlockImpl::getBlock /// attributes of the Aql value to determine to which shard /// the row should be sent and return its clientId size_t ExecutionBlockImpl::sendToClient(SharedAqlItemBlockPtr cur) { + if (getQuery().killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } + // inspect cur in row _pos and check to which shard it should be sent . . AqlValue val = cur->getValueReference(_pos, _regId); @@ -356,6 +371,8 @@ size_t ExecutionBlockImpl::sendToClient(SharedAqlItemBlockPt return getClientId(shardId); } +Query const& ExecutionBlockImpl::getQuery() const noexcept { return _query; } + /// @brief create a new document key std::string ExecutionBlockImpl::createKey(VPackSlice input) const { return _collection->getCollection()->createKey(input); diff --git a/arangod/Aql/DistributeExecutor.h b/arangod/Aql/DistributeExecutor.h index 0d5c1c8182..4073c6ee75 100644 --- a/arangod/Aql/DistributeExecutor.h +++ b/arangod/Aql/DistributeExecutor.h @@ -35,6 +35,8 @@ class DistributeNode; // ExecutionBlockImpl, so this class only exists to identify the specialization. class DistributeExecutor {}; +class Query; + /** * @brief See ExecutionBlockImpl.h for documentation. */ @@ -97,6 +99,8 @@ class ExecutionBlockImpl : public BlocksWithClients { ExecutorInfos const& infos() const { return _infos; } + Query const& getQuery() const noexcept; + private: ExecutorInfos _infos; diff --git a/arangod/Aql/EngineInfoContainerCoordinator.cpp b/arangod/Aql/EngineInfoContainerCoordinator.cpp index 9f449502ad..f3d6fdc757 100644 --- a/arangod/Aql/EngineInfoContainerCoordinator.cpp +++ b/arangod/Aql/EngineInfoContainerCoordinator.cpp @@ -138,11 +138,11 @@ QueryId EngineInfoContainerCoordinator::closeSnippet() { ExecutionEngineResult EngineInfoContainerCoordinator::buildEngines( Query* query, QueryRegistry* registry, std::string const& dbname, std::unordered_set const& restrictToShards, - MapRemoteToSnippet const& dbServerQueryIds) const { + MapRemoteToSnippet const& dbServerQueryIds, + std::vector& coordinatorQueryIds) const { TRI_ASSERT(_engineStack.size() == 1); TRI_ASSERT(_engineStack.top() == 0); - std::vector coordinatorQueryIds{}; // destroy all query snippets in case of error auto guard = scopeGuard([&dbname, ®istry, &coordinatorQueryIds]() { for (auto const& it : coordinatorQueryIds) { diff --git a/arangod/Aql/EngineInfoContainerCoordinator.h b/arangod/Aql/EngineInfoContainerCoordinator.h index 410f2cb342..df63e23a88 100644 --- a/arangod/Aql/EngineInfoContainerCoordinator.h +++ b/arangod/Aql/EngineInfoContainerCoordinator.h @@ -99,7 +99,8 @@ class EngineInfoContainerCoordinator { ExecutionEngineResult buildEngines(Query* query, QueryRegistry* registry, std::string const& dbname, std::unordered_set const& restrictToShards, - MapRemoteToSnippet const& dbServerQueryIds) const; + MapRemoteToSnippet const& dbServerQueryIds, + std::vector& coordinatorQueryIds) const; private: // @brief List of EngineInfos to distribute accross the cluster diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index afcdbb6c9f..f98147aaf6 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -41,6 +41,7 @@ #include "Cluster/ClusterComm.h" #include "Cluster/ServerState.h" #include "Logger/Logger.h" +#include "RestServer/QueryRegistryFeature.h" using namespace arangodb; using namespace arangodb::aql; @@ -438,19 +439,59 @@ struct CoordinatorInstanciator final : public WalkerWorker { // The coordinator engines cannot decide on lock issues later on, // however every engine gets injected the list of locked shards. + std::vector coordinatorQueryIds{}; res = _coordinatorParts.buildEngines(_query, registry, _query->vocbase().name(), - _query->queryOptions().shardIds, queryIds); + _query->queryOptions().shardIds, queryIds, coordinatorQueryIds); if (res.ok()) { cleanupGuard.cancel(); } + _query->engine()->snippetMapping(std::move(queryIds), std::move(coordinatorQueryIds)); + return res; } }; +void ExecutionEngine::kill() { + // kill coordinator parts + // TODO: this doesn't seem to be necessary and sometimes even show adverse effects + // so leaving this deactivated for now + // auto queryRegistry = QueryRegistryFeature::registry(); + // if (queryRegistry != nullptr) { + // for (auto const& id : _coordinatorQueryIds) { + // queryRegistry->kill(&(_query.vocbase()), id); + // } + // } + auto cc = ClusterComm::instance(); + if (cc == nullptr) { + return; + } + + std::unordered_map headers; + CoordTransactionID coordinatorTransactionID = TRI_NewTickServer(); + auto body = std::make_shared(); + + // kill DB server parts + // RemoteNodeId -> DBServerId -> [snippetId] + + for (auto const& it : _dbServerMapping) { + for (auto const& it2 : it.second) { + for (auto const& snippetId : it2.second) { + TRI_ASSERT(it2.first.substr(0, 7) == "server:"); + cc->asyncRequest(coordinatorTransactionID, it2.first, rest::RequestType::DELETE_REQ, + "/_api/aql/kill/" + snippetId, body, headers, nullptr, + 10.0, false, 2.0); + } + } + } +} + std::pair ExecutionEngine::initializeCursor(SharedAqlItemBlockPtr&& items, size_t pos) { + if (_query->killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } InputAqlItemRow inputRow{CreateInvalidInputRowHint{}}; if (items != nullptr) { inputRow = InputAqlItemRow{std::move(items), pos}; @@ -464,6 +505,9 @@ std::pair ExecutionEngine::initializeCursor(SharedAqlIte } std::pair ExecutionEngine::getSome(size_t atMost) { + if (_query->killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } if (!_initializeCursorCalled) { auto res = initializeCursor(nullptr, 0); if (res.first == ExecutionState::WAITING) { @@ -474,6 +518,9 @@ std::pair ExecutionEngine::getSome(size_t } std::pair ExecutionEngine::skipSome(size_t atMost) { + if (_query->killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } if (!_initializeCursorCalled) { auto res = initializeCursor(nullptr, 0); if (res.first == ExecutionState::WAITING) { @@ -483,10 +530,13 @@ std::pair ExecutionEngine::skipSome(size_t atMost) { return _root->skipSome(atMost); } -Result ExecutionEngine::shutdownSync(int errorCode) noexcept { +Result ExecutionEngine::shutdownSync(int errorCode) noexcept try { Result res{TRI_ERROR_INTERNAL}; ExecutionState state = ExecutionState::WAITING; try { + TRI_IF_FAILURE("ExecutionEngine::shutdownSync") { + THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); + } std::shared_ptr sharedState = _query->sharedState(); if (sharedState != nullptr) { sharedState->setContinueCallback(); @@ -498,10 +548,33 @@ Result ExecutionEngine::shutdownSync(int errorCode) noexcept { } } } + } catch (basics::Exception const& ex) { + res.reset(ex.code(), std::string("unable to shutdown query: ") + ex.what()); + } catch (std::exception const& ex) { + res.reset(TRI_ERROR_INTERNAL, std::string("unable to shutdown query: ") + ex.what()); } catch (...) { res.reset(TRI_ERROR_INTERNAL); } + + if (res.fail() && ServerState::instance()->isCoordinator()) { + // shutdown attempt has failed... + // in a cluster, try to at least abort all other coordinator parts + auto queryRegistry = QueryRegistryFeature::registry(); + if (queryRegistry != nullptr) { + for (auto const& id : _coordinatorQueryIds) { + try { + queryRegistry->destroy(_query->vocbase().name(), id, errorCode, false); + } catch (...) { + // we want to abort all parts, even if aborting other parts fails + } + } + } + } + return res; +} catch (...) { + // nothing we can do here... + return Result(TRI_ERROR_INTERNAL, "unable to shutdown query"); } /// @brief shutdown, will be called exactly once for the whole query diff --git a/arangod/Aql/ExecutionEngine.h b/arangod/Aql/ExecutionEngine.h index b9c6f071f2..614bedb8c4 100644 --- a/arangod/Aql/ExecutionEngine.h +++ b/arangod/Aql/ExecutionEngine.h @@ -67,6 +67,16 @@ class ExecutionEngine { /// @brief get the query TEST_VIRTUAL Query* getQuery() const { return _query; } + /// @brief server to snippet mapping + void snippetMapping(MapRemoteToSnippet&& dbServerMapping, + std::vector&& coordinatorQueryIds) { + _dbServerMapping = std::move(dbServerMapping); + _coordinatorQueryIds = std::move(coordinatorQueryIds); + } + + /// @brief kill the query + void kill(); + /// @brief initializeCursor, could be called multiple times std::pair initializeCursor(SharedAqlItemBlockPtr&& items, size_t pos); @@ -133,6 +143,12 @@ class ExecutionEngine { /// @brief whether or not shutdown() was executed bool _wasShutdown; + + /// @brief server to snippet mapping + MapRemoteToSnippet _dbServerMapping; + + /// @brief ids of all coordinator query snippets + std::vector _coordinatorQueryIds; }; } // namespace aql } // namespace arangodb diff --git a/arangod/Aql/Query.cpp b/arangod/Aql/Query.cpp index e5b0a7d408..75ee950a59 100644 --- a/arangod/Aql/Query.cpp +++ b/arangod/Aql/Query.cpp @@ -206,6 +206,7 @@ Query::~Query() { << " this: " << (uintptr_t)this; } + // this will reset _trx, so _trx is invalid after here cleanupPlanAndEngineSync(TRI_ERROR_INTERNAL); exitContext(); @@ -270,7 +271,14 @@ Query* Query::clone(QueryPart part, bool withPlan) { } /// @brief set the query to killed -void Query::kill() { _killed = true; } +void Query::kill() { + _killed = true; + if (_engine != nullptr) { + // killing is best effort... + // intentionally ignoring the result of this call here + _engine->kill(); + } +} void Query::setExecutionTime() { if (_engine != nullptr) { @@ -568,6 +576,10 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& queryResult) TRI_ASSERT(registry != nullptr); try { + if (_killed) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } + bool useQueryCache = canUseQueryCache(); switch (_executionPhase) { @@ -887,6 +899,10 @@ ExecutionState Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry, } } } + + if (_killed) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } } builder->close(); @@ -1398,6 +1414,13 @@ ExecutionState Query::cleanupPlanAndEngine(int errorCode, VPackBuilder* statsBui _engine.reset(); } + // the following call removes the query from the list of currently + // running queries. so whoever fetches that list will not see a Query that + // is about to shut down/be destroyed + if (_profile != nullptr) { + _profile->unregisterFromQueryList(); + } + // If the transaction was not committed, it is automatically aborted _trx = nullptr; diff --git a/arangod/Aql/Query.h b/arangod/Aql/Query.h index 23350903cf..9fe2cdbaae 100644 --- a/arangod/Aql/Query.h +++ b/arangod/Aql/Query.h @@ -105,12 +105,14 @@ class Query { /// @brief clone a query /// note: as a side-effect, this will also create and start a transaction for /// the query - TEST_VIRTUAL Query* clone(QueryPart, bool); + TEST_VIRTUAL Query* clone(QueryPart, bool withPlan); constexpr static uint64_t DontCache = 0; /// @brief whether or not the query is killed inline bool killed() const { return _killed; } + + void setKilled() { _killed = true; } /// @brief set the query to killed void kill(); diff --git a/arangod/Aql/QueryExecutionState.cpp b/arangod/Aql/QueryExecutionState.cpp index 35444fc370..0c0e66f333 100644 --- a/arangod/Aql/QueryExecutionState.cpp +++ b/arangod/Aql/QueryExecutionState.cpp @@ -36,6 +36,7 @@ static std::string const StateNames[] = { "executing", // EXECUTION "finalizing", // FINALIZATION "finished", // FINISHED + "killed", // KILLED "invalid" // INVALID }; diff --git a/arangod/Aql/QueryExecutionState.h b/arangod/Aql/QueryExecutionState.h index ea2f599949..e6df4b50e5 100644 --- a/arangod/Aql/QueryExecutionState.h +++ b/arangod/Aql/QueryExecutionState.h @@ -43,6 +43,7 @@ enum class ValueType { EXECUTION, FINALIZATION, FINISHED, + KILLED, INVALID_STATE }; diff --git a/arangod/Aql/QueryList.cpp b/arangod/Aql/QueryList.cpp index 8635ca4d51..02e4b457f9 100644 --- a/arangod/Aql/QueryList.cpp +++ b/arangod/Aql/QueryList.cpp @@ -27,6 +27,7 @@ #include "Aql/QueryProfile.h" #include "Basics/Exceptions.h" #include "Basics/ReadLocker.h" +#include "Basics/Result.h" #include "Basics/WriteLocker.h" #include "Logger/Logger.h" #include "RestServer/QueryRegistryFeature.h" @@ -114,14 +115,12 @@ void QueryList::remove(Query* query) { } WRITE_LOCKER(writeLocker, _lock); - auto it = _current.find(query->id()); - if (it == _current.end()) { + if (_current.erase(query->id()) == 0) { + // not found return; } - _current.erase(it); - bool const isStreaming = query->queryOptions().stream; double threshold = (isStreaming ? _slowStreamingQueryThreshold : _slowQueryThreshold); @@ -178,7 +177,7 @@ void QueryList::remove(Query* query) { _slow.emplace_back(query->id(), std::move(q), _trackBindVars ? query->bindParameters() : nullptr, started, now - started, - QueryExecutionState::ValueType::FINISHED, isStreaming); + query->killed() ? QueryExecutionState::ValueType::KILLED : QueryExecutionState::ValueType::FINISHED, isStreaming); if (++_slowCount > _maxSlowQueries) { // free first element @@ -191,13 +190,13 @@ void QueryList::remove(Query* query) { } /// @brief kills a query -int QueryList::kill(TRI_voc_tick_t id) { - WRITE_LOCKER(writeLocker, _lock); +Result QueryList::kill(TRI_voc_tick_t id) { + READ_LOCKER(writeLocker, _lock); auto it = _current.find(id); if (it == _current.end()) { - return TRI_ERROR_QUERY_NOT_FOUND; + return {TRI_ERROR_QUERY_NOT_FOUND}; } Query* query = (*it).second; @@ -205,27 +204,32 @@ int QueryList::kill(TRI_voc_tick_t id) { << "killing AQL query " << id << " '" << query->queryString() << "'"; query->kill(); - return TRI_ERROR_NO_ERROR; + return Result(); } -/// @brief kills all currently running queries -uint64_t QueryList::killAll(bool silent) { +/// @brief kills all currently running queries that match the filter function +/// (i.e. the filter should return true for a queries to be killed) +uint64_t QueryList::kill(std::function const& filter, bool silent) { uint64_t killed = 0; - WRITE_LOCKER(writeLocker, _lock); + READ_LOCKER(readLocker, _lock); for (auto& it : _current) { - Query* query = it.second; + Query& query = *(it.second); + + if (!filter(query)) { + continue; + } if (silent) { LOG_TOPIC("f7722", TRACE, arangodb::Logger::FIXME) - << "killing AQL query " << query->id() << " '" << query->queryString() << "'"; + << "killing AQL query " << query.id() << " '" << query.queryString() << "'"; } else { LOG_TOPIC("90113", WARN, arangodb::Logger::FIXME) - << "killing AQL query " << query->id() << " '" << query->queryString() << "'"; + << "killing AQL query " << query.id() << " '" << query.queryString() << "'"; } - query->kill(); + query.kill(); ++killed; } @@ -259,7 +263,9 @@ std::vector QueryList::listCurrent() { result.emplace_back(query->id(), extractQueryString(query, maxLength), _trackBindVars ? query->bindParameters() : nullptr, started, - now - started, query->state(), query->queryOptions().stream); + now - started, + query->killed() ? QueryExecutionState::ValueType::KILLED : query->state(), + query->queryOptions().stream); } } diff --git a/arangod/Aql/QueryList.h b/arangod/Aql/QueryList.h index e5dfc56b16..abb4287d89 100644 --- a/arangod/Aql/QueryList.h +++ b/arangod/Aql/QueryList.h @@ -184,10 +184,11 @@ class QueryList { void remove(Query*); /// @brief kills a query - int kill(TRI_voc_tick_t); + Result kill(TRI_voc_tick_t id); - /// @brief kills all currently running queries - uint64_t killAll(bool silent); + /// @brief kills all currently running queries that match the filter function + /// (i.e. the filter should return true for a queries to be killed) + uint64_t kill(std::function const& filter, bool silent); /// @brief return the list of running queries std::vector listCurrent(); diff --git a/arangod/Aql/QueryProfile.cpp b/arangod/Aql/QueryProfile.cpp index 1820281e2a..842ff3034a 100644 --- a/arangod/Aql/QueryProfile.cpp +++ b/arangod/Aql/QueryProfile.cpp @@ -36,21 +36,27 @@ using namespace arangodb::aql; /// @brief create a profile QueryProfile::QueryProfile(Query* query) - : _query(query), _lastStamp(query->startTime()), _tracked(false) { + : _query(query), + _lastStamp(query->startTime()), + _tracked(false) { for (auto& it : _timers) { it = 0.0; // reset timers } - auto queryList = query->vocbase().queryList(); - - try { - _tracked = queryList->insert(query); - } catch (...) { - } + registerInQueryList(query); } /// @brief destroy a profile QueryProfile::~QueryProfile() { + unregisterFromQueryList(); +} + +void QueryProfile::registerInQueryList(Query* query) { + auto queryList = query->vocbase().queryList(); + _tracked = queryList->insert(query); +} + +void QueryProfile::unregisterFromQueryList() noexcept { // only remove from list when the query was inserted into it... if (_tracked) { auto queryList = _query->vocbase().queryList(); @@ -59,6 +65,8 @@ QueryProfile::~QueryProfile() { queryList->remove(_query); } catch (...) { } + + _tracked = false; } } @@ -66,7 +74,8 @@ QueryProfile::~QueryProfile() { double QueryProfile::setStateDone(QueryExecutionState::ValueType state) { double const now = TRI_microtime(); - if (state != QueryExecutionState::ValueType::INVALID_STATE) { + if (state != QueryExecutionState::ValueType::INVALID_STATE && + state != QueryExecutionState::ValueType::KILLED) { // record duration of state _timers[static_cast(state)] = now - _lastStamp; } diff --git a/arangod/Aql/QueryProfile.h b/arangod/Aql/QueryProfile.h index b73b3b247d..8e6ada06e2 100644 --- a/arangod/Aql/QueryProfile.h +++ b/arangod/Aql/QueryProfile.h @@ -42,11 +42,14 @@ struct QueryProfile { QueryProfile(QueryProfile const&) = delete; QueryProfile& operator=(QueryProfile const&) = delete; - explicit QueryProfile(Query*); + explicit QueryProfile(Query* query); ~QueryProfile(); public: + /// @brief unregister the query from the list of queries, if entered + void unregisterFromQueryList() noexcept; + double setStateDone(QueryExecutionState::ValueType); /// @brief sets the absolute end time for an execution state @@ -59,6 +62,9 @@ struct QueryProfile { /// @brief convert the profile to VelocyPack void toVelocyPack(arangodb::velocypack::Builder&) const; + + private: + void registerInQueryList(Query* query); private: Query* _query; @@ -71,7 +77,7 @@ struct QueryProfile { // as we reserve a statically sized array for it static_assert(static_cast(QueryExecutionState::ValueType::INITIALIZATION) == 0, "unexpected min QueryExecutionState enum value"); -static_assert(static_cast(QueryExecutionState::ValueType::INVALID_STATE) < 10, +static_assert(static_cast(QueryExecutionState::ValueType::INVALID_STATE) < 11, "unexpected max QueryExecutionState enum value"); } // namespace aql diff --git a/arangod/Aql/QueryRegistry.cpp b/arangod/Aql/QueryRegistry.cpp index 30a242a1c6..42eb7739d0 100644 --- a/arangod/Aql/QueryRegistry.cpp +++ b/arangod/Aql/QueryRegistry.cpp @@ -100,6 +100,24 @@ void QueryRegistry::insert(QueryId id, Query* query, double ttl, } } +/// @brief kill a query +bool QueryRegistry::kill(TRI_vocbase_t* vocbase, QueryId id) { + READ_LOCKER(writeLocker, _lock); + + auto m = _queries.find(vocbase->name()); + if (m == _queries.end()) { + return false; + } + auto q = m->second.find(id); + if (q == m->second.end()) { + return false; + } + + std::unique_ptr& qi = q->second; + qi->_query->setKilled(); + return true; +} + /// @brief open Query* QueryRegistry::open(TRI_vocbase_t* vocbase, QueryId id) { LOG_TOPIC("8c204", DEBUG, arangodb::Logger::AQL) << "Open query with id " << id; diff --git a/arangod/Aql/QueryRegistry.h b/arangod/Aql/QueryRegistry.h index edf4213184..82ace1fdd6 100644 --- a/arangod/Aql/QueryRegistry.h +++ b/arangod/Aql/QueryRegistry.h @@ -43,6 +43,10 @@ class QueryRegistry { TEST_VIRTUAL ~QueryRegistry(); public: + /// @brief kills a query by id - returns true if the query was found and + /// false otherwise + bool kill(TRI_vocbase_t* vocbase, QueryId id); + /// @brief insert, this inserts the query for the vocbase /// and the id into the registry. It is in error if there is already /// a query for this and combination and an exception will diff --git a/arangod/Aql/RemoteExecutor.cpp b/arangod/Aql/RemoteExecutor.cpp index 44fe02ba5a..9229268dbf 100644 --- a/arangod/Aql/RemoteExecutor.cpp +++ b/arangod/Aql/RemoteExecutor.cpp @@ -149,6 +149,10 @@ std::pair ExecutionBlockImpl::skipSomeWi // we were called with an error need to throw it. THROW_ARANGO_EXCEPTION(res); } + + if (getQuery().killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } if (_lastResponse != nullptr) { TRI_ASSERT(_lastError.ok()); @@ -214,6 +218,10 @@ std::pair ExecutionBlockImpl::initialize // will initialize the cursor lazily return {ExecutionState::DONE, TRI_ERROR_NO_ERROR}; } + + if (getQuery().killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } if (_lastResponse != nullptr || _lastError.fail()) { // We have an open result still. diff --git a/arangod/Aql/RestAqlHandler.cpp b/arangod/Aql/RestAqlHandler.cpp index b4ec56ddde..c81e055c99 100644 --- a/arangod/Aql/RestAqlHandler.cpp +++ b/arangod/Aql/RestAqlHandler.cpp @@ -470,6 +470,12 @@ void RestAqlHandler::createQueryFromVelocyPack() { sendResponse(rest::ResponseCode::ACCEPTED, answerBody.slice()); } +// DELETE method for /_api/aql/kill/, (internal) +bool RestAqlHandler::killQuery(std::string const& idString) { + _qId = arangodb::basics::StringUtils::uint64(idString); + return _queryRegistry->kill(&_vocbase, _qId); +} + // PUT method for /_api/aql//, (internal) // this is using the part of the cursor API with side effects. // : can be "lock" or "getSome" or "skip" or "initializeCursor" or @@ -606,7 +612,27 @@ RestStatus RestAqlHandler::execute() { } break; } - case rest::RequestType::DELETE_REQ: + case rest::RequestType::DELETE_REQ: { + if (suffixes.size() != 2) { + std::string msg("Unknown DELETE API: "); + msg += arangodb::basics::StringUtils::join(suffixes, '/'); + LOG_TOPIC("f1993", ERR, arangodb::Logger::AQL) << msg; + generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND, + std::move(msg)); + } else { + if (killQuery(suffixes[1])) { + VPackBuilder answerBody; + { + VPackObjectBuilder guard(&answerBody); + answerBody.add("error", VPackValue(false)); + } + sendResponse(rest::ResponseCode::OK, answerBody.slice()); + } else { + generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_QUERY_NOT_FOUND); + } + } + break; + } case rest::RequestType::HEAD: case rest::RequestType::PATCH: case rest::RequestType::OPTIONS: diff --git a/arangod/Aql/RestAqlHandler.h b/arangod/Aql/RestAqlHandler.h index 8ce92abac7..3956d9de4d 100644 --- a/arangod/Aql/RestAqlHandler.h +++ b/arangod/Aql/RestAqlHandler.h @@ -54,6 +54,9 @@ class RestAqlHandler : public RestVocbaseBaseHandler { RestStatus continueExecute() override; public: + // DELETE method for /_api/aql/kill/, (internal) + bool killQuery(std::string const& idString); + // POST method for /_api/aql/instantiate // The body is a VelocyPack with attributes "plan" for the execution plan and // "options" for the options, all exactly as in AQL_EXECUTEJSON. @@ -125,8 +128,7 @@ class RestAqlHandler : public RestVocbaseBaseHandler { RestStatus handleUseQuery(std::string const&, Query*, arangodb::velocypack::Slice const); // parseVelocyPackBody, returns a nullptr and produces an error - // response if - // parse was not successful. + // response if parse was not successful. std::shared_ptr parseVelocyPackBody(); private: diff --git a/arangod/RestHandler/RestQueryHandler.cpp b/arangod/RestHandler/RestQueryHandler.cpp index de39df1cad..63e84027a7 100644 --- a/arangod/RestHandler/RestQueryHandler.cpp +++ b/arangod/RestHandler/RestQueryHandler.cpp @@ -149,7 +149,7 @@ bool RestQueryHandler::readQuery() { return true; } -bool RestQueryHandler::deleteQuerySlow() { +void RestQueryHandler::deleteQuerySlow() { auto queryList = _vocbase.queryList(); queryList->clearSlow(); @@ -161,18 +161,16 @@ bool RestQueryHandler::deleteQuerySlow() { result.close(); generateResult(rest::ResponseCode::OK, result.slice()); - - return true; } -bool RestQueryHandler::deleteQuery(std::string const& name) { +void RestQueryHandler::deleteQuery(std::string const& name) { auto id = StringUtils::uint64(name); auto queryList = _vocbase.queryList(); TRI_ASSERT(queryList != nullptr); - auto res = queryList->kill(id); + Result res = queryList->kill(id); - if (res == TRI_ERROR_NO_ERROR) { + if (res.ok()) { VPackBuilder result; result.add(VPackValue(VPackValueType::Object)); result.add(StaticStrings::Error, VPackValue(false)); @@ -181,29 +179,28 @@ bool RestQueryHandler::deleteQuery(std::string const& name) { generateResult(rest::ResponseCode::OK, result.slice()); } else { - generateError(GeneralResponse::responseCode(res), res, - "cannot kill query '" + name + "': " + TRI_errno_string(res)); + generateError(GeneralResponse::responseCode(res.errorNumber()), res.errorNumber(), + "cannot kill query '" + name + "': " + res.errorMessage()); } - - return true; } /// @brief interrupts a query -bool RestQueryHandler::deleteQuery() { +void RestQueryHandler::deleteQuery() { auto const& suffixes = _request->suffixes(); if (suffixes.size() != 1) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "expecting DELETE /_api/query/ or /_api/query/slow"); - return true; + return; } auto const& name = suffixes[0]; if (name == "slow") { - return deleteQuerySlow(); + deleteQuerySlow(); + } else { + deleteQuery(name); } - return deleteQuery(name); } bool RestQueryHandler::replaceProperties() { diff --git a/arangod/RestHandler/RestQueryHandler.h b/arangod/RestHandler/RestQueryHandler.h index 5bc39cc7da..cff8c6c2e4 100644 --- a/arangod/RestHandler/RestQueryHandler.h +++ b/arangod/RestHandler/RestQueryHandler.h @@ -69,19 +69,19 @@ class RestQueryHandler : public RestVocbaseBaseHandler { /// @brief removes the slow log ////////////////////////////////////////////////////////////////////////////// - bool deleteQuerySlow(); + void deleteQuerySlow(); ////////////////////////////////////////////////////////////////////////////// /// @brief interrupts a named query ////////////////////////////////////////////////////////////////////////////// - bool deleteQuery(std::string const& name); + void deleteQuery(std::string const& name); ////////////////////////////////////////////////////////////////////////////// /// @brief interrupts a query ////////////////////////////////////////////////////////////////////////////// - bool deleteQuery(); + void deleteQuery(); ////////////////////////////////////////////////////////////////////////////// /// @brief changes the settings diff --git a/arangod/RestServer/BootstrapFeature.cpp b/arangod/RestServer/BootstrapFeature.cpp index 4ff4fede2b..e79437d1b0 100644 --- a/arangod/RestServer/BootstrapFeature.cpp +++ b/arangod/RestServer/BootstrapFeature.cpp @@ -376,7 +376,7 @@ void BootstrapFeature::unprepare() { TRI_vocbase_t* vocbase = databaseFeature->useDatabase(name); if (vocbase != nullptr) { - vocbase->queryList()->killAll(true); + vocbase->queryList()->kill([](aql::Query&) { return true; }, true); vocbase->release(); } } diff --git a/arangod/Transaction/Manager.cpp b/arangod/Transaction/Manager.cpp index b61b7e23d6..db546e5ff3 100644 --- a/arangod/Transaction/Manager.cpp +++ b/arangod/Transaction/Manager.cpp @@ -23,7 +23,10 @@ #include "Manager.h" +#include "Aql/Query.h" +#include "Aql/QueryList.h" #include "Basics/ReadLocker.h" +#include "Basics/ScopeGuard.h" #include "Basics/WriteLocker.h" #include "Cluster/ClusterComm.h" #include "Cluster/ClusterInfo.h" @@ -130,6 +133,9 @@ void Manager::registerTransaction(TRI_voc_tid_t transactionId, _transactions[bucket]._activeTransactions.emplace(transactionId, std::move(data)); } catch (...) { _nrRunning.fetch_sub(1, std::memory_order_relaxed); + if (!isReadOnlyTransaction) { + _rwLock.unlockRead(); + } throw; } } @@ -137,6 +143,13 @@ void Manager::registerTransaction(TRI_voc_tid_t transactionId, // unregisters a transaction void Manager::unregisterTransaction(TRI_voc_tid_t transactionId, bool markAsFailed, bool isReadOnlyTransaction) { + // always perform an unlock when we leave this function + auto guard = scopeGuard([this, &isReadOnlyTransaction]() { + if (!isReadOnlyTransaction) { + _rwLock.unlockRead(); + } + }); + uint64_t r = _nrRunning.fetch_sub(1, std::memory_order_relaxed); TRI_ASSERT(r > 0); @@ -151,9 +164,6 @@ void Manager::unregisterTransaction(TRI_voc_tid_t transactionId, bool markAsFail _transactions[bucket]._failedTransactions.emplace(transactionId); } } - if (!isReadOnlyTransaction) { - _rwLock.unlockRead(); - } } // return the set of failed transactions diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index 3ffaba22b7..a29e31d62f 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -1039,9 +1039,9 @@ static void JS_QueriesKillAql(v8::FunctionCallbackInfo const& args) { auto* queryList = vocbase.queryList(); TRI_ASSERT(queryList != nullptr); - auto res = queryList->kill(id); + Result res = queryList->kill(id); - if (res == TRI_ERROR_NO_ERROR) { + if (res.ok()) { TRI_V8_RETURN_TRUE(); } diff --git a/tests/Aql/EngineInfoContainerCoordinatorTest.cpp b/tests/Aql/EngineInfoContainerCoordinatorTest.cpp index dfaba7ceae..a9ead317e5 100644 --- a/tests/Aql/EngineInfoContainerCoordinatorTest.cpp +++ b/tests/Aql/EngineInfoContainerCoordinatorTest.cpp @@ -139,8 +139,9 @@ TEST(EngineInfoContainerTest, it_should_create_an_executionengine_for_the_first_ EngineInfoContainerCoordinator testee; testee.addNode(&sNode); + std::vector coordinatorQueryIds{}; ExecutionEngineResult result = - testee.buildEngines(&query, ®istry, dbname, restrictToShards, queryIds); + testee.buildEngines(&query, ®istry, dbname, restrictToShards, queryIds, coordinatorQueryIds); ASSERT_TRUE(result.ok()); ExecutionEngine* engine = result.engine(); @@ -296,8 +297,9 @@ TEST(EngineInfoContainerTest, // Close the second snippet testee.closeSnippet(); + std::vector coordinatorQueryIds{}; ExecutionEngineResult result = - testee.buildEngines(&query, ®istry, dbname, restrictToShards, queryIds); + testee.buildEngines(&query, ®istry, dbname, restrictToShards, queryIds, coordinatorQueryIds); ASSERT_TRUE(result.ok()); ExecutionEngine* engine = result.engine(); @@ -532,8 +534,9 @@ TEST(EngineInfoContainerTest, snippets_are_a_stack_insert_node_always_into_top_s testee.addNode(&tbNode); + std::vector coordinatorQueryIds{}; ExecutionEngineResult result = - testee.buildEngines(&query, ®istry, dbname, restrictToShards, queryIds); + testee.buildEngines(&query, ®istry, dbname, restrictToShards, queryIds, coordinatorQueryIds); ASSERT_TRUE(result.ok()); ExecutionEngine* engine = result.engine(); @@ -699,8 +702,9 @@ TEST(EngineInfoContainerTest, error_cases_cloning_of_a_query_fails_throws_an_err }) .Throw(arangodb::basics::Exception(TRI_ERROR_DEBUG, __FILE__, __LINE__)); + std::vector coordinatorQueryIds{}; ExecutionEngineResult result = - testee.buildEngines(&query, ®istry, dbname, restrictToShards, queryIds); + testee.buildEngines(&query, ®istry, dbname, restrictToShards, queryIds, coordinatorQueryIds); ASSERT_TRUE(!result.ok()); // Make sure we check the right thing here ASSERT_TRUE(result.errorNumber() == TRI_ERROR_DEBUG); @@ -866,8 +870,9 @@ TEST(EngineInfoContainerTest, error_cases_cloning_of_a_query_fails_returns_a_nul return nullptr; }); + std::vector coordinatorQueryIds{}; ExecutionEngineResult result = - testee.buildEngines(&query, ®istry, dbname, restrictToShards, queryIds); + testee.buildEngines(&query, ®istry, dbname, restrictToShards, queryIds, coordinatorQueryIds); ASSERT_TRUE(!result.ok()); // Make sure we check the right thing here ASSERT_TRUE(result.errorNumber() == TRI_ERROR_INTERNAL); diff --git a/tests/js/client/shell/shell-aql-kill.js b/tests/js/client/shell/shell-aql-kill.js new file mode 100644 index 0000000000..d991a95f0b --- /dev/null +++ b/tests/js/client/shell/shell-aql-kill.js @@ -0,0 +1,140 @@ +/* jshint globalstrict:false, strict:false, maxlen: 200 */ +/* global assertTrue, assertEqual, arango */ + +// ////////////////////////////////////////////////////////////////////////////// +// / @brief ArangoTransaction sTests +// / +// / +// / DISCLAIMER +// / +// / Copyright 2018 ArangoDB GmbH, Cologne, Germany +// / +// / Licensed under the Apache License, Version 2.0 (the "License") +// / you may not use this file except in compliance with the License. +// / You may obtain a copy of the License at +// / +// / http://www.apache.org/licenses/LICENSE-2.0 +// / +// / Unless required by applicable law or agreed to in writing, software +// / distributed under the License is distributed on an "AS IS" BASIS, +// / WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// / See the License for the specific language governing permissions and +// / limitations under the License. +// / +// / Copyright holder is triAGENS GmbH, Cologne, Germany +// / +// / @author Jan Steemann +// ////////////////////////////////////////////////////////////////////////////// + +var jsunity = require('jsunity'); +var internal = require('internal'); +var arangodb = require('@arangodb'); +var db = arangodb.db; + +// ////////////////////////////////////////////////////////////////////////////// +// / @brief test suite +// ////////////////////////////////////////////////////////////////////////////// + +function aqlKillSuite () { + 'use strict'; + const cn = "UnitTestsCollection"; + + return { + + setUpAll: function () { + db._drop(cn); + db._create(cn, { numberOfShards: 3 }); + }, + + tearDownAll: function () { + db._drop(cn); + }, + + testAbortReadQuery: function () { + let result = arango.POST_RAW("/_api/cursor", { + query: "FOR i IN 1..10000000 RETURN SLEEP(1)" + }, { + "x-arango-async" : "store" + }); + + let jobId = result.headers["x-arango-async-id"]; + + let queryId = 0; + let tries = 0; + while (++tries < 30) { + let queries = require("@arangodb/aql/queries").current(); + queries.filter(function(data) { + if (data.query.indexOf("SLEEP(1)") !== -1) { + queryId = data.id; + } + }); + if (queryId > 0) { + break; + } + + require("internal").wait(1, false); + } + + assertTrue(queryId > 0); + + result = arango.DELETE("/_api/query/" + queryId); + assertEqual(result.code, 200); + + tries = 0; + while (++tries < 30) { + result = arango.PUT_RAW("/_api/job/" + jobId, {}); + if (result.code === 410) { + break; + } + require("internal").wait(1, false); + } + assertEqual(410, result.code); + }, + + testAbortWriteQuery: function () { + let result = arango.POST_RAW("/_api/cursor", { + query: "FOR i IN 1..10000000 INSERT {} INTO " + cn + }, { + "x-arango-async" : "store" + }); + + let jobId = result.headers["x-arango-async-id"]; + + let queryId = 0; + let tries = 0; + while (++tries < 30) { + let queries = require("@arangodb/aql/queries").current(); + queries.filter(function(data) { + if (data.query.indexOf(cn) !== -1) { + queryId = data.id; + } + }); + if (queryId > 0) { + break; + } + + require("internal").wait(1, false); + } + + assertTrue(queryId > 0); + + result = arango.DELETE("/_api/query/" + queryId); + assertEqual(result.code, 200); + + tries = 0; + while (++tries < 30) { + result = arango.PUT_RAW("/_api/job/" + jobId, {}); + if (result.code === 410) { + break; + } + require("internal").wait(1, false); + } + assertEqual(410, result.code); + }, + + }; +} + +jsunity.run(aqlKillSuite); + +return jsunity.done(); diff --git a/tests/js/server/aql/aql-failures-cluster.js b/tests/js/server/aql/aql-failures-cluster.js new file mode 100644 index 0000000000..2ea750422d --- /dev/null +++ b/tests/js/server/aql/aql-failures-cluster.js @@ -0,0 +1,93 @@ +/*jshint globalstrict:false, strict:false, maxlen: 400 */ +/*global fail, assertEqual, AQL_EXECUTE */ + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test failure scenarios +/// +/// @file +/// +/// DISCLAIMER +/// +/// Copyright 2010-2012 triagens GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is triAGENS GmbH, Cologne, Germany +/// +/// @author Jan Steemann +/// @author Michael Hackstein +/// @author Copyright 2012, triAGENS GmbH, Cologne, Germany +//////////////////////////////////////////////////////////////////////////////// + +var jsunity = require("jsunity"); +var arangodb = require("@arangodb"); +var db = arangodb.db; +var internal = require("internal"); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test suite +//////////////////////////////////////////////////////////////////////////////// + +function ahuacatlFailureSuite () { + 'use strict'; + const cn = "UnitTestsAhuacatlFailures"; + const en = "UnitTestsAhuacatlEdgeFailures"; + + return { + + setUpAll: function () { + internal.debugClearFailAt(); + db._drop(cn); + db._create(cn); + db._drop(en); + db._createEdgeCollection(en); + }, + + setUp: function () { + internal.debugClearFailAt(); + }, + + tearDownAll: function () { + internal.debugClearFailAt(); + db._drop(cn); + db._drop(en); + }, + + tearDown: function() { + internal.debugClearFailAt(); + }, + + testShutdownSync : function () { + internal.debugSetFailAt("ExecutionEngine::shutdownSync"); + + let res = AQL_EXECUTE("FOR doc IN " + cn + " RETURN doc").json; + // no real test expectations here, just that the query works and doesn't fail on shutdown + assertEqual(0, res.length); + }, + + testShutdownSyncDiamond : function () { + internal.debugSetFailAt("ExecutionEngine::shutdownSync"); + + let res = AQL_EXECUTE("FOR doc1 IN " + cn + " FOR doc2 IN " + en + " FILTER doc1._key == doc2._key RETURN doc1").json; + // no real test expectations here, just that the query works and doesn't fail on shutdown + assertEqual(0, res.length); + }, + + }; +} + +if (internal.debugCanUseFailAt()) { + jsunity.run(ahuacatlFailureSuite); +} + +return jsunity.done();