diff --git a/arangod/Aql/DistributeExecutor.cpp b/arangod/Aql/DistributeExecutor.cpp index 4a8255222b..f2011af567 100644 --- a/arangod/Aql/DistributeExecutor.cpp +++ b/arangod/Aql/DistributeExecutor.cpp @@ -25,6 +25,7 @@ #include "Aql/ClusterNodes.h" #include "Aql/Collection.h" #include "Aql/ExecutionEngine.h" +#include "Aql/Query.h" #include "Aql/RegisterPlan.h" #include "Basics/StaticStrings.h" #include "VocBase/LogicalCollection.h" @@ -78,6 +79,9 @@ std::pair ExecutionBlockImpl ExecutionBlockImpl::getSomeForShardWithoutTrace( size_t atMost, std::string const& shardId) { + if (getQuery().killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } // NOTE: We do not need to retain these, the getOrSkipSome is required to! size_t skipped = 0; SharedAqlItemBlockPtr result = nullptr; @@ -101,6 +105,9 @@ std::pair ExecutionBlockImpl::skipSo std::pair ExecutionBlockImpl::skipSomeForShardWithoutTrace( size_t atMost, std::string const& shardId) { + if (getQuery().killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } // NOTE: We do not need to retain these, the getOrSkipSome is required to! size_t skipped = 0; SharedAqlItemBlockPtr result = nullptr; @@ -214,6 +221,7 @@ bool ExecutionBlockImpl::hasMoreForClientId(size_t clientId) /// current one. std::pair ExecutionBlockImpl::getBlockForClient( size_t atMost, size_t clientId) { + if (_buffer.empty()) { _index = 0; // position in _buffer _pos = 0; // position in _buffer.at(_index) @@ -224,6 +232,9 @@ std::pair ExecutionBlockImpl::getBlock while (buf.size() < atMost) { if (_index == _buffer.size()) { + if (getQuery().killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } auto res = getBlock(atMost); if (res.first == ExecutionState::WAITING) { return {res.first, false}; @@ -262,6 +273,10 @@ std::pair ExecutionBlockImpl::getBlock /// attributes of the Aql value to determine to which shard /// the row should be sent and return its clientId size_t ExecutionBlockImpl::sendToClient(SharedAqlItemBlockPtr cur) { + if (getQuery().killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } + // inspect cur in row _pos and check to which shard it should be sent . . AqlValue val = cur->getValueReference(_pos, _regId); @@ -363,6 +378,8 @@ size_t ExecutionBlockImpl::sendToClient(SharedAqlItemBlockPt return getClientId(shardId); } +Query const& ExecutionBlockImpl::getQuery() const noexcept { return _query; } + /// @brief create a new document key std::string ExecutionBlockImpl::createKey(VPackSlice input) const { return _logCol->createKey(input); diff --git a/arangod/Aql/DistributeExecutor.h b/arangod/Aql/DistributeExecutor.h index ea91488e90..003e3fda1f 100644 --- a/arangod/Aql/DistributeExecutor.h +++ b/arangod/Aql/DistributeExecutor.h @@ -36,6 +36,8 @@ class DistributeNode; // ExecutionBlockImpl, so this class only exists to identify the specialization. class DistributeExecutor {}; +class Query; + /** * @brief See ExecutionBlockImpl.h for documentation. */ @@ -97,6 +99,8 @@ class ExecutionBlockImpl : public BlocksWithClients { std::string createKey(arangodb::velocypack::Slice) const; ExecutorInfos const& infos() const; + + Query const& getQuery() const noexcept; private: ExecutorInfos _infos; diff --git a/arangod/Aql/EngineInfoContainerCoordinator.cpp b/arangod/Aql/EngineInfoContainerCoordinator.cpp index 4d730850ab..f6479f17f8 100644 --- a/arangod/Aql/EngineInfoContainerCoordinator.cpp +++ b/arangod/Aql/EngineInfoContainerCoordinator.cpp @@ -145,11 +145,11 @@ QueryId EngineInfoContainerCoordinator::closeSnippet() { ExecutionEngineResult EngineInfoContainerCoordinator::buildEngines( Query& query, QueryRegistry* registry, std::string const& dbname, std::unordered_set const& restrictToShards, - MapRemoteToSnippet const& dbServerQueryIds) const { + MapRemoteToSnippet const& dbServerQueryIds, + std::vector& coordinatorQueryIds) const { TRI_ASSERT(_engineStack.size() == 1); TRI_ASSERT(_engineStack.top() == 0); - std::vector coordinatorQueryIds{}; // destroy all query snippets in case of error auto guard = scopeGuard([&dbname, ®istry, &coordinatorQueryIds]() { for (auto const& it : coordinatorQueryIds) { diff --git a/arangod/Aql/EngineInfoContainerCoordinator.h b/arangod/Aql/EngineInfoContainerCoordinator.h index baa6f4e85e..392fe4e759 100644 --- a/arangod/Aql/EngineInfoContainerCoordinator.h +++ b/arangod/Aql/EngineInfoContainerCoordinator.h @@ -100,7 +100,8 @@ class EngineInfoContainerCoordinator { ExecutionEngineResult buildEngines(Query& query, QueryRegistry* registry, std::string const& dbname, std::unordered_set const& restrictToShards, - MapRemoteToSnippet const& dbServerQueryIds) const; + MapRemoteToSnippet const& dbServerQueryIds, + std::vector& coordinatorQueryIds) const; private: // @brief List of EngineInfos to distribute accross the cluster diff --git a/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp b/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp index 50355f61d5..83c26cd69d 100644 --- a/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp +++ b/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp @@ -193,22 +193,22 @@ EngineInfoContainerDBServerServerBased::EngineInfoContainerDBServerServerBased(Q // NOTE: We need to start with _lastSnippetID > 0. 0 is reserved for GraphNodes } -void EngineInfoContainerDBServerServerBased::injectVertexColletions(GraphNode* graphNode){ - auto const& vCols = graphNode->vertexColls(); - if (vCols.empty()) { - std::map const* allCollections = - _query.collections()->collections(); - auto& resolver = _query.resolver(); - for (auto const& it : *allCollections) { - // If resolver cannot resolve this collection - // it has to be a view. - if (!resolver.getCollection(it.first)) { - continue; - } - // All known edge collections will be ignored by this call! - graphNode->injectVertexCollection(it.second); +void EngineInfoContainerDBServerServerBased::injectVertexColletions(GraphNode* graphNode) { + auto const& vCols = graphNode->vertexColls(); + if (vCols.empty()) { + std::map const* allCollections = + _query.collections()->collections(); + auto& resolver = _query.resolver(); + for (auto const& it : *allCollections) { + // If resolver cannot resolve this collection + // it has to be a view. + if (!resolver.getCollection(it.first)) { + continue; } + // All known edge collections will be ignored by this call! + graphNode->injectVertexCollection(it.second); } + } } // Insert a new node into the last engine on the stack diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index 54e71a345e..a479456226 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -40,8 +40,13 @@ #include "Aql/WalkerWorker.h" #include "Basics/ScopeGuard.h" #include "Cluster/ServerState.h" +#include "Futures/Utilities.h" #include "Logger/Logger.h" +#include "Logger/LogMacros.h" +#include "Network/Methods.h" #include "Network/NetworkFeature.h" +#include "Network/Utils.h" +#include "RestServer/QueryRegistryFeature.h" using namespace arangodb; using namespace arangodb::aql; @@ -473,6 +478,7 @@ struct DistributedQueryInstanciator final : public WalkerWorker { _dbserverParts.cleanupEngines(pool, TRI_ERROR_INTERNAL, _query.vocbase().name(), queryIds); }); + std::unordered_map nodeAliases; ExecutionEngineResult res = _dbserverParts.buildEngines(queryIds, nodeAliases); if (res.fail()) { @@ -481,8 +487,10 @@ struct DistributedQueryInstanciator final : public WalkerWorker { // The coordinator engines cannot decide on lock issues later on, // however every engine gets injected the list of locked shards. + std::vector coordinatorQueryIds{}; res = _coordinatorParts.buildEngines(_query, registry, _query.vocbase().name(), - _query.queryOptions().shardIds, queryIds); + _query.queryOptions().shardIds, queryIds, + coordinatorQueryIds); if (res.ok()) { TRI_ASSERT(_query.engine() != nullptr); @@ -490,12 +498,58 @@ struct DistributedQueryInstanciator final : public WalkerWorker { cleanupGuard.cancel(); } + _query.engine()->snippetMapping(std::move(queryIds), std::move(coordinatorQueryIds)); + return res; } }; +void ExecutionEngine::kill() { + // kill coordinator parts + // TODO: this doesn't seem to be necessary and sometimes even show adverse effects + // so leaving this deactivated for now + // auto queryRegistry = QueryRegistryFeature::registry(); + // if (queryRegistry != nullptr) { + // for (auto const& id : _coordinatorQueryIds) { + // queryRegistry->kill(&(_query.vocbase()), id); + // } + // } + + // kill DB server parts + // RemoteNodeId -> DBServerId -> [snippetId] + NetworkFeature const& nf = _query.vocbase().server().getFeature(); + network::ConnectionPool* pool = nf.pool(); + if (pool == nullptr) { + return; + } + + VPackBuffer body; + std::vector futures; + + for (auto const& it : _dbServerMapping) { + for (auto const& it2 : it.second) { + for (auto const& snippetId : it2.second) { + network::Headers headers; + TRI_ASSERT(it2.first.substr(0, 7) == "server:"); + auto future = network::sendRequest(pool, it2.first, fuerte::RestVerb::Delete, + "/_api/aql/kill/" + snippetId, body, std::move(headers)); + futures.emplace_back(std::move(future)); + } + } + } + + if (!futures.empty()) { + // killing is best-effort + // we are ignoring all errors intentionally here + futures::collectAll(futures).get(); + } +} + std::pair ExecutionEngine::initializeCursor(SharedAqlItemBlockPtr&& items, size_t pos) { + if (_query.killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } InputAqlItemRow inputRow{CreateInvalidInputRowHint{}}; if (items != nullptr) { inputRow = InputAqlItemRow{std::move(items), pos}; @@ -509,6 +563,9 @@ std::pair ExecutionEngine::initializeCursor(SharedAqlIte } std::pair ExecutionEngine::getSome(size_t atMost) { + if (_query.killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } if (!_initializeCursorCalled) { auto res = initializeCursor(nullptr, 0); if (res.first == ExecutionState::WAITING) { @@ -519,6 +576,9 @@ std::pair ExecutionEngine::getSome(size_t } std::pair ExecutionEngine::skipSome(size_t atMost) { + if (_query.killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } if (!_initializeCursorCalled) { auto res = initializeCursor(nullptr, 0); if (res.first == ExecutionState::WAITING) { diff --git a/arangod/Aql/ExecutionEngine.h b/arangod/Aql/ExecutionEngine.h index b53df0f08a..b3b70ff34d 100644 --- a/arangod/Aql/ExecutionEngine.h +++ b/arangod/Aql/ExecutionEngine.h @@ -72,6 +72,16 @@ class ExecutionEngine { /// @brief get the query TEST_VIRTUAL Query* getQuery() const; + + /// @brief server to snippet mapping + void snippetMapping(MapRemoteToSnippet&& dbServerMapping, + std::vector&& coordinatorQueryIds) { + _dbServerMapping = std::move(dbServerMapping); + _coordinatorQueryIds = std::move(coordinatorQueryIds); + } + + /// @brief kill the query + void kill(); /// @brief initializeCursor, could be called multiple times std::pair initializeCursor(SharedAqlItemBlockPtr&& items, size_t pos); @@ -132,6 +142,12 @@ class ExecutionEngine { /// @brief whether or not shutdown() was executed bool _wasShutdown; + + /// @brief server to snippet mapping + MapRemoteToSnippet _dbServerMapping; + + /// @brief ids of all coordinator query snippets + std::vector _coordinatorQueryIds; }; } // namespace aql } // namespace arangodb diff --git a/arangod/Aql/GraphNode.cpp b/arangod/Aql/GraphNode.cpp index bbaa3f928b..0deda90265 100644 --- a/arangod/Aql/GraphNode.cpp +++ b/arangod/Aql/GraphNode.cpp @@ -418,9 +418,9 @@ GraphNode::GraphNode(ExecutionPlan* plan, size_t id, TRI_vocbase_t* vocbase, GraphNode::~GraphNode() = default; std::string const& GraphNode::collectionToShardName(std::string const& collName) const { - if(_collectionToShard.empty()){ + if (_collectionToShard.empty()) { return collName; - }; + } auto found = _collectionToShard.find(collName); TRI_ASSERT(found != _collectionToShard.cend()); diff --git a/arangod/Aql/ModificationExecutor.cpp b/arangod/Aql/ModificationExecutor.cpp index cde853e8e9..7a95e081ec 100644 --- a/arangod/Aql/ModificationExecutor.cpp +++ b/arangod/Aql/ModificationExecutor.cpp @@ -94,7 +94,7 @@ ModificationExecutor::produceRows(OutputAqlItemRow& outpu TRI_IF_FAILURE("ModificationBlock::getSome") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } - + TRI_ASSERT(_modifier._block != nullptr); // prepares modifier for single row output diff --git a/arangod/Aql/NoResultsExecutor.cpp b/arangod/Aql/NoResultsExecutor.cpp index d9c225def0..1a52cf4290 100644 --- a/arangod/Aql/NoResultsExecutor.cpp +++ b/arangod/Aql/NoResultsExecutor.cpp @@ -34,7 +34,7 @@ constexpr bool NoResultsExecutor::Properties::preservesOrder; constexpr BlockPassthrough NoResultsExecutor::Properties::allowsBlockPassthrough; constexpr bool NoResultsExecutor::Properties::inputSizeRestrictsOutputSize; -NoResultsExecutor::NoResultsExecutor(Fetcher& fetcher, ExecutorInfos& infos){}; +NoResultsExecutor::NoResultsExecutor(Fetcher& fetcher, ExecutorInfos& infos) {} NoResultsExecutor::~NoResultsExecutor() = default; std::pair NoResultsExecutor::produceRows(OutputAqlItemRow& output) { diff --git a/arangod/Aql/Query.cpp b/arangod/Aql/Query.cpp index 7247ccb068..13d2fc6a7c 100644 --- a/arangod/Aql/Query.cpp +++ b/arangod/Aql/Query.cpp @@ -210,6 +210,7 @@ Query::~Query() { << " this: " << (uintptr_t)this; } + // this will reset _trx, so _trx is invalid after here cleanupPlanAndEngineSync(TRI_ERROR_INTERNAL); exitContext(); @@ -275,7 +276,14 @@ Query* Query::clone(QueryPart part, bool withPlan) { } /// @brief set the query to killed -void Query::kill() { _killed = true; } +void Query::kill() { + _killed = true; + if (_engine != nullptr) { + // killing is best effort... + // intentionally ignoring the result of this call here + _engine->kill(); + } +} void Query::setExecutionTime() { if (_engine != nullptr) { @@ -573,6 +581,10 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& queryResult) TRI_ASSERT(registry != nullptr); try { + if (_killed) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } + bool useQueryCache = canUseQueryCache(); switch (_executionPhase) { @@ -898,6 +910,10 @@ ExecutionState Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry, } } } + + if (_killed) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } } builder->close(); @@ -1428,6 +1444,13 @@ ExecutionState Query::cleanupPlanAndEngine(int errorCode, VPackBuilder* statsBui _engine.reset(); } + // the following call removes the query from the list of currently + // running queries. so whoever fetches that list will not see a Query that + // is about to shut down/be destroyed + if (_profile != nullptr) { + _profile->unregisterFromQueryList(); + } + // If the transaction was not committed, it is automatically aborted _trx = nullptr; diff --git a/arangod/Aql/Query.h b/arangod/Aql/Query.h index 62aaae144a..75d590c0eb 100644 --- a/arangod/Aql/Query.h +++ b/arangod/Aql/Query.h @@ -104,12 +104,14 @@ class Query { /// @brief clone a query /// note: as a side-effect, this will also create and start a transaction for /// the query - TEST_VIRTUAL Query* clone(QueryPart, bool); + TEST_VIRTUAL Query* clone(QueryPart, bool withPlan); constexpr static uint64_t DontCache = 0; /// @brief whether or not the query is killed inline bool killed() const { return _killed; } + + void setKilled() { _killed = true; } /// @brief set the query to killed void kill(); diff --git a/arangod/Aql/QueryExecutionState.cpp b/arangod/Aql/QueryExecutionState.cpp index e5f757cd4c..d82b337bcf 100644 --- a/arangod/Aql/QueryExecutionState.cpp +++ b/arangod/Aql/QueryExecutionState.cpp @@ -40,6 +40,7 @@ static std::string const StateNames[] = { "executing", // EXECUTION "finalizing", // FINALIZATION "finished", // FINISHED + "killed", // KILLED "invalid" // INVALID }; diff --git a/arangod/Aql/QueryExecutionState.h b/arangod/Aql/QueryExecutionState.h index baddc90d59..27252ff6b1 100644 --- a/arangod/Aql/QueryExecutionState.h +++ b/arangod/Aql/QueryExecutionState.h @@ -44,6 +44,7 @@ enum class ValueType { EXECUTION, FINALIZATION, FINISHED, + KILLED, INVALID_STATE }; diff --git a/arangod/Aql/QueryList.cpp b/arangod/Aql/QueryList.cpp index db8450e307..de98a548db 100644 --- a/arangod/Aql/QueryList.cpp +++ b/arangod/Aql/QueryList.cpp @@ -27,6 +27,7 @@ #include "Aql/QueryProfile.h" #include "Basics/Exceptions.h" #include "Basics/ReadLocker.h" +#include "Basics/Result.h" #include "Basics/WriteLocker.h" #include "Basics/system-functions.h" #include "Logger/LogMacros.h" @@ -109,14 +110,12 @@ void QueryList::remove(Query* query) { } WRITE_LOCKER(writeLocker, _lock); - auto it = _current.find(query->id()); - if (it == _current.end()) { + if (_current.erase(query->id()) == 0) { + // not found return; } - _current.erase(it); - bool const isStreaming = query->queryOptions().stream; double threshold = (isStreaming ? _slowStreamingQueryThreshold : _slowQueryThreshold); @@ -173,7 +172,7 @@ void QueryList::remove(Query* query) { _slow.emplace_back(query->id(), std::move(q), _trackBindVars ? query->bindParameters() : nullptr, started, now - started, - QueryExecutionState::ValueType::FINISHED, isStreaming); + query->killed() ? QueryExecutionState::ValueType::KILLED : QueryExecutionState::ValueType::FINISHED, isStreaming); if (++_slowCount > _maxSlowQueries) { // free first element @@ -186,13 +185,13 @@ void QueryList::remove(Query* query) { } /// @brief kills a query -int QueryList::kill(TRI_voc_tick_t id) { - WRITE_LOCKER(writeLocker, _lock); +Result QueryList::kill(TRI_voc_tick_t id) { + READ_LOCKER(writeLocker, _lock); auto it = _current.find(id); if (it == _current.end()) { - return TRI_ERROR_QUERY_NOT_FOUND; + return {TRI_ERROR_QUERY_NOT_FOUND}; } Query* query = (*it).second; @@ -200,27 +199,32 @@ int QueryList::kill(TRI_voc_tick_t id) { << "killing AQL query " << id << " '" << query->queryString() << "'"; query->kill(); - return TRI_ERROR_NO_ERROR; + return Result(); } -/// @brief kills all currently running queries -uint64_t QueryList::killAll(bool silent) { +/// @brief kills all currently running queries that match the filter function +/// (i.e. the filter should return true for a queries to be killed) +uint64_t QueryList::kill(std::function const& filter, bool silent) { uint64_t killed = 0; - WRITE_LOCKER(writeLocker, _lock); + READ_LOCKER(readLocker, _lock); for (auto& it : _current) { - Query* query = it.second; + Query& query = *(it.second); + + if (!filter(query)) { + continue; + } if (silent) { LOG_TOPIC("f7722", TRACE, arangodb::Logger::FIXME) - << "killing AQL query " << query->id() << " '" << query->queryString() << "'"; + << "killing AQL query " << query.id() << " '" << query.queryString() << "'"; } else { LOG_TOPIC("90113", WARN, arangodb::Logger::FIXME) - << "killing AQL query " << query->id() << " '" << query->queryString() << "'"; + << "killing AQL query " << query.id() << " '" << query.queryString() << "'"; } - query->kill(); + query.kill(); ++killed; } @@ -254,7 +258,9 @@ std::vector QueryList::listCurrent() { result.emplace_back(query->id(), extractQueryString(query, maxLength), _trackBindVars ? query->bindParameters() : nullptr, started, - now - started, query->state(), query->queryOptions().stream); + now - started, + query->killed() ? QueryExecutionState::ValueType::KILLED : query->state(), + query->queryOptions().stream); } } diff --git a/arangod/Aql/QueryList.h b/arangod/Aql/QueryList.h index 6a62fad992..c7daffc126 100644 --- a/arangod/Aql/QueryList.h +++ b/arangod/Aql/QueryList.h @@ -39,6 +39,7 @@ namespace velocypack { class Builder; } class QueryRegistryFeature; +class Result; namespace aql { @@ -186,10 +187,11 @@ class QueryList { void remove(Query*); /// @brief kills a query - int kill(TRI_voc_tick_t); + Result kill(TRI_voc_tick_t id); - /// @brief kills all currently running queries - uint64_t killAll(bool silent); + /// @brief kills all currently running queries that match the filter function + /// (i.e. the filter should return true for a queries to be killed) + uint64_t kill(std::function const& filter, bool silent); /// @brief return the list of running queries std::vector listCurrent(); diff --git a/arangod/Aql/QueryProfile.cpp b/arangod/Aql/QueryProfile.cpp index 1f8ad8c180..7480bd2c0e 100644 --- a/arangod/Aql/QueryProfile.cpp +++ b/arangod/Aql/QueryProfile.cpp @@ -37,21 +37,27 @@ using namespace arangodb::aql; /// @brief create a profile QueryProfile::QueryProfile(Query* query) - : _query(query), _lastStamp(query->startTime()), _tracked(false) { + : _query(query), + _lastStamp(query->startTime()), + _tracked(false) { for (auto& it : _timers) { it = 0.0; // reset timers } - auto queryList = query->vocbase().queryList(); - - try { - _tracked = queryList->insert(query); - } catch (...) { - } + registerInQueryList(query); } /// @brief destroy a profile QueryProfile::~QueryProfile() { + unregisterFromQueryList(); +} + +void QueryProfile::registerInQueryList(Query* query) { + auto queryList = query->vocbase().queryList(); + _tracked = queryList->insert(query); +} + +void QueryProfile::unregisterFromQueryList() noexcept { // only remove from list when the query was inserted into it... if (_tracked) { auto queryList = _query->vocbase().queryList(); @@ -60,6 +66,8 @@ QueryProfile::~QueryProfile() { queryList->remove(_query); } catch (...) { } + + _tracked = false; } } @@ -67,7 +75,8 @@ QueryProfile::~QueryProfile() { double QueryProfile::setStateDone(QueryExecutionState::ValueType state) { double const now = TRI_microtime(); - if (state != QueryExecutionState::ValueType::INVALID_STATE) { + if (state != QueryExecutionState::ValueType::INVALID_STATE && + state != QueryExecutionState::ValueType::KILLED) { // record duration of state _timers[static_cast(state)] = now - _lastStamp; } diff --git a/arangod/Aql/QueryProfile.h b/arangod/Aql/QueryProfile.h index b73b3b247d..8e6ada06e2 100644 --- a/arangod/Aql/QueryProfile.h +++ b/arangod/Aql/QueryProfile.h @@ -42,11 +42,14 @@ struct QueryProfile { QueryProfile(QueryProfile const&) = delete; QueryProfile& operator=(QueryProfile const&) = delete; - explicit QueryProfile(Query*); + explicit QueryProfile(Query* query); ~QueryProfile(); public: + /// @brief unregister the query from the list of queries, if entered + void unregisterFromQueryList() noexcept; + double setStateDone(QueryExecutionState::ValueType); /// @brief sets the absolute end time for an execution state @@ -59,6 +62,9 @@ struct QueryProfile { /// @brief convert the profile to VelocyPack void toVelocyPack(arangodb::velocypack::Builder&) const; + + private: + void registerInQueryList(Query* query); private: Query* _query; @@ -71,7 +77,7 @@ struct QueryProfile { // as we reserve a statically sized array for it static_assert(static_cast(QueryExecutionState::ValueType::INITIALIZATION) == 0, "unexpected min QueryExecutionState enum value"); -static_assert(static_cast(QueryExecutionState::ValueType::INVALID_STATE) < 10, +static_assert(static_cast(QueryExecutionState::ValueType::INVALID_STATE) < 11, "unexpected max QueryExecutionState enum value"); } // namespace aql diff --git a/arangod/Aql/QueryRegistry.cpp b/arangod/Aql/QueryRegistry.cpp index 3cf3783e67..63847fe746 100644 --- a/arangod/Aql/QueryRegistry.cpp +++ b/arangod/Aql/QueryRegistry.cpp @@ -101,6 +101,24 @@ void QueryRegistry::insert(QueryId id, Query* query, double ttl, } } +/// @brief kill a query +bool QueryRegistry::kill(TRI_vocbase_t* vocbase, QueryId id) { + READ_LOCKER(writeLocker, _lock); + + auto m = _queries.find(vocbase->name()); + if (m == _queries.end()) { + return false; + } + auto q = m->second.find(id); + if (q == m->second.end()) { + return false; + } + + std::unique_ptr& qi = q->second; + qi->_query->setKilled(); + return true; +} + /// @brief open Query* QueryRegistry::open(TRI_vocbase_t* vocbase, QueryId id) { LOG_TOPIC("8c204", DEBUG, arangodb::Logger::AQL) << "Open query with id " << id; diff --git a/arangod/Aql/QueryRegistry.h b/arangod/Aql/QueryRegistry.h index edf4213184..82ace1fdd6 100644 --- a/arangod/Aql/QueryRegistry.h +++ b/arangod/Aql/QueryRegistry.h @@ -43,6 +43,10 @@ class QueryRegistry { TEST_VIRTUAL ~QueryRegistry(); public: + /// @brief kills a query by id - returns true if the query was found and + /// false otherwise + bool kill(TRI_vocbase_t* vocbase, QueryId id); + /// @brief insert, this inserts the query for the vocbase /// and the id into the registry. It is in error if there is already /// a query for this and combination and an exception will diff --git a/arangod/Aql/RemoteExecutor.cpp b/arangod/Aql/RemoteExecutor.cpp index 01dbe671ba..fe581035f4 100644 --- a/arangod/Aql/RemoteExecutor.cpp +++ b/arangod/Aql/RemoteExecutor.cpp @@ -165,6 +165,10 @@ std::pair ExecutionBlockImpl::skipSomeWi // we were called with an error need to throw it. THROW_ARANGO_EXCEPTION(res); } + + if (getQuery().killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } if (_lastResponse != nullptr) { TRI_ASSERT(_lastError.ok()); @@ -233,6 +237,10 @@ std::pair ExecutionBlockImpl::initialize // will initialize the cursor lazily return {ExecutionState::DONE, TRI_ERROR_NO_ERROR}; } + + if (getQuery().killed()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); + } if (_lastResponse != nullptr || _lastError.fail()) { // We have an open result still. diff --git a/arangod/Aql/RestAqlHandler.cpp b/arangod/Aql/RestAqlHandler.cpp index a05f2b3213..8eecf8e011 100644 --- a/arangod/Aql/RestAqlHandler.cpp +++ b/arangod/Aql/RestAqlHandler.cpp @@ -392,6 +392,12 @@ bool RestAqlHandler::registerTraverserEngines(VPackSlice const traverserEngines, return true; } +// DELETE method for /_api/aql/kill/, (internal) +bool RestAqlHandler::killQuery(std::string const& idString) { + _qId = arangodb::basics::StringUtils::uint64(idString); + return _queryRegistry->kill(&_vocbase, _qId); +} + // PUT method for /_api/aql//, (internal) // this is using the part of the cursor API with side effects. // : can be "lock" or "getSome" or "skip" or "initializeCursor" or @@ -533,7 +539,27 @@ RestStatus RestAqlHandler::execute() { } break; } - case rest::RequestType::DELETE_REQ: + case rest::RequestType::DELETE_REQ: { + if (suffixes.size() != 2) { + std::string msg("Unknown DELETE API: "); + msg += arangodb::basics::StringUtils::join(suffixes, '/'); + LOG_TOPIC("f1993", ERR, arangodb::Logger::AQL) << msg; + generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND, + std::move(msg)); + } else { + if (killQuery(suffixes[1])) { + VPackBuilder answerBody; + { + VPackObjectBuilder guard(&answerBody); + answerBody.add("error", VPackValue(false)); + } + sendResponse(rest::ResponseCode::OK, answerBody.slice()); + } else { + generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_QUERY_NOT_FOUND); + } + } + break; + } case rest::RequestType::HEAD: case rest::RequestType::PATCH: case rest::RequestType::OPTIONS: diff --git a/arangod/Aql/RestAqlHandler.h b/arangod/Aql/RestAqlHandler.h index d2e6e9268e..a28e98158d 100644 --- a/arangod/Aql/RestAqlHandler.h +++ b/arangod/Aql/RestAqlHandler.h @@ -55,6 +55,9 @@ class RestAqlHandler : public RestVocbaseBaseHandler { RestStatus continueExecute() override; public: + // DELETE method for /_api/aql/kill/, (internal) + bool killQuery(std::string const& idString); + // PUT method for /_api/aql//, this is using // the part of the cursor API with side effects. // : can be "getSome" or "skip". @@ -122,8 +125,7 @@ class RestAqlHandler : public RestVocbaseBaseHandler { RestStatus handleUseQuery(std::string const&, Query*, arangodb::velocypack::Slice const); // parseVelocyPackBody, returns a nullptr and produces an error - // response if - // parse was not successful. + // response if parse was not successful. std::shared_ptr parseVelocyPackBody(); private: diff --git a/arangod/Graph/BaseOptions.cpp b/arangod/Graph/BaseOptions.cpp index 0eb256cd60..dd649ea860 100644 --- a/arangod/Graph/BaseOptions.cpp +++ b/arangod/Graph/BaseOptions.cpp @@ -315,7 +315,7 @@ void BaseOptions::serializeVariables(VPackBuilder& builder) const { _ctx->serializeAllVariables(_trx, builder); } -void BaseOptions::setCollectionToShard(std::mapconst& in){ +void BaseOptions::setCollectionToShard(std::map const& in) { _collectionToShard = std::move(in); } diff --git a/arangod/IResearch/IResearchAnalyzerFeature.cpp b/arangod/IResearch/IResearchAnalyzerFeature.cpp index 47334aa385..195e7eab26 100644 --- a/arangod/IResearch/IResearchAnalyzerFeature.cpp +++ b/arangod/IResearch/IResearchAnalyzerFeature.cpp @@ -1337,7 +1337,7 @@ arangodb::Result IResearchAnalyzerFeature::ensure( // ensure analyzer existence if (res.ok()) { result = std::make_pair(pool, itr.second); - // cppcheck-suppress unreadVariable + // cppcheck-suppress unreadVariable erase = false; // successful pool creation, cleanup not required } diff --git a/arangod/IResearch/IResearchView.h b/arangod/IResearch/IResearchView.h index 73ef993154..8fb8c66e0c 100644 --- a/arangod/IResearch/IResearchView.h +++ b/arangod/IResearch/IResearchView.h @@ -153,7 +153,7 @@ class IResearchView final: public arangodb::LogicalView { /// if mode == Find && list found doesn't match then return nullptr /// @param key the specified key will be as snapshot indentifier /// in a transaction - /// (nullptr == view address will be used) + /// (nullptr == view address will be used) /// @return pointer to an index reader containing the datastore record /// snapshot associated with 'state' /// (nullptr == no view snapshot associated with the specified state) diff --git a/arangod/MMFiles/MMFilesWalRecoverState.cpp b/arangod/MMFiles/MMFilesWalRecoverState.cpp index dcce50db4e..8ea8876e4a 100644 --- a/arangod/MMFiles/MMFilesWalRecoverState.cpp +++ b/arangod/MMFiles/MMFilesWalRecoverState.cpp @@ -1220,7 +1220,7 @@ bool MMFilesWalRecoverState::ReplayMarker(MMFilesMarker const* marker, vocbase = nullptr; - arangodb::CreateDatabaseInfo info(state->databaseFeature.server()); + arangodb::CreateDatabaseInfo info(state->databaseFeature.server()); auto res = info.load(payloadSlice, VPackSlice::emptyArraySlice()); if (res.fail()) { THROW_ARANGO_EXCEPTION(res); diff --git a/arangod/Pregel/Recovery.cpp b/arangod/Pregel/Recovery.cpp index bf1be4fc94..27d9417692 100644 --- a/arangod/Pregel/Recovery.cpp +++ b/arangod/Pregel/Recovery.cpp @@ -40,14 +40,9 @@ using namespace arangodb; using namespace arangodb::pregel; RecoveryManager::RecoveryManager(ClusterInfo& ci) - : _ci(ci) {} //(AgencyCallbackRegistry* registry){} -// : _agencyCallbackRegistry(registry) + : _ci(ci) {} RecoveryManager::~RecoveryManager() { - // for (auto const& call : _agencyCallbacks) { - // _agencyCallbackRegistry->unregisterCallback(call.second); - // } - // _agencyCallbacks.clear(); _listeners.clear(); } @@ -58,12 +53,6 @@ void RecoveryManager::stopMonitoring(Conductor* listener) { if (pair.second.find(listener) != pair.second.end()) { pair.second.erase(listener); } - // if (pair.second.size() == 0) { - // std::shared_ptr callback = - // _agencyCallbacks[pair.first]; - // _agencyCallbackRegistry->unregisterCallback(callback); - // _agencyCallbacks.erase(pair.first); - // } } } diff --git a/arangod/RestHandler/RestJobHandler.cpp b/arangod/RestHandler/RestJobHandler.cpp index ff0d22945d..170b65d520 100644 --- a/arangod/RestHandler/RestJobHandler.cpp +++ b/arangod/RestHandler/RestJobHandler.cpp @@ -100,8 +100,7 @@ void RestJobHandler::putJob() { // return the original response // plus a new header - static std::string const xArango = "x-arango-async-id"; - _response->setHeaderNC(xArango, value); + _response->setHeaderNC(StaticStrings::AsyncId, value); } void RestJobHandler::putJobMethod() { diff --git a/arangod/RestHandler/RestQueryHandler.cpp b/arangod/RestHandler/RestQueryHandler.cpp index 22a7bde90b..9780ac754c 100644 --- a/arangod/RestHandler/RestQueryHandler.cpp +++ b/arangod/RestHandler/RestQueryHandler.cpp @@ -150,7 +150,7 @@ bool RestQueryHandler::readQuery() { return true; } -bool RestQueryHandler::deleteQuerySlow() { +void RestQueryHandler::deleteQuerySlow() { auto queryList = _vocbase.queryList(); queryList->clearSlow(); @@ -162,18 +162,16 @@ bool RestQueryHandler::deleteQuerySlow() { result.close(); generateResult(rest::ResponseCode::OK, result.slice()); - - return true; } -bool RestQueryHandler::deleteQuery(std::string const& name) { +void RestQueryHandler::deleteQuery(std::string const& name) { auto id = StringUtils::uint64(name); auto queryList = _vocbase.queryList(); TRI_ASSERT(queryList != nullptr); - auto res = queryList->kill(id); + Result res = queryList->kill(id); - if (res == TRI_ERROR_NO_ERROR) { + if (res.ok()) { VPackBuilder result; result.add(VPackValue(VPackValueType::Object)); result.add(StaticStrings::Error, VPackValue(false)); @@ -182,29 +180,28 @@ bool RestQueryHandler::deleteQuery(std::string const& name) { generateResult(rest::ResponseCode::OK, result.slice()); } else { - generateError(GeneralResponse::responseCode(res), res, - "cannot kill query '" + name + "': " + TRI_errno_string(res)); + generateError(GeneralResponse::responseCode(res.errorNumber()), res.errorNumber(), + "cannot kill query '" + name + "': " + res.errorMessage()); } - - return true; } /// @brief interrupts a query -bool RestQueryHandler::deleteQuery() { +void RestQueryHandler::deleteQuery() { auto const& suffixes = _request->suffixes(); if (suffixes.size() != 1) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "expecting DELETE /_api/query/ or /_api/query/slow"); - return true; + return; } auto const& name = suffixes[0]; if (name == "slow") { - return deleteQuerySlow(); + deleteQuerySlow(); + } else { + deleteQuery(name); } - return deleteQuery(name); } bool RestQueryHandler::replaceProperties() { diff --git a/arangod/RestHandler/RestQueryHandler.h b/arangod/RestHandler/RestQueryHandler.h index a25854c341..30690cf029 100644 --- a/arangod/RestHandler/RestQueryHandler.h +++ b/arangod/RestHandler/RestQueryHandler.h @@ -70,19 +70,19 @@ class RestQueryHandler : public RestVocbaseBaseHandler { /// @brief removes the slow log ////////////////////////////////////////////////////////////////////////////// - bool deleteQuerySlow(); + void deleteQuerySlow(); ////////////////////////////////////////////////////////////////////////////// /// @brief interrupts a named query ////////////////////////////////////////////////////////////////////////////// - bool deleteQuery(std::string const& name); + void deleteQuery(std::string const& name); ////////////////////////////////////////////////////////////////////////////// /// @brief interrupts a query ////////////////////////////////////////////////////////////////////////////// - bool deleteQuery(); + void deleteQuery(); ////////////////////////////////////////////////////////////////////////////// /// @brief changes the settings diff --git a/arangod/RestHandler/RestTransactionHandler.cpp b/arangod/RestHandler/RestTransactionHandler.cpp index c395ef71c0..d7eafbf153 100644 --- a/arangod/RestHandler/RestTransactionHandler.cpp +++ b/arangod/RestHandler/RestTransactionHandler.cpp @@ -213,21 +213,36 @@ void RestTransactionHandler::executeAbort() { generateError(rest::ResponseCode::BAD, TRI_ERROR_BAD_PARAMETER); return; } - - TRI_voc_tid_t tid = basics::StringUtils::uint64(_request->suffixes()[0]); - if (tid == 0) { - generateError(rest::ResponseCode::BAD, TRI_ERROR_BAD_PARAMETER, - "bad transaction ID"); - return; - } + transaction::Manager* mgr = transaction::ManagerFeature::manager(); TRI_ASSERT(mgr != nullptr); - - Result res = mgr->abortManagedTrx(tid); - if (res.fail()) { - generateError(res); + + if (_request->suffixes()[0] == "write") { + // abort all transactions + bool const fanout = ServerState::instance()->isCoordinator() && !_request->parsedValue("local", false); + ExecContext const& exec = ExecContext::current(); + Result res = mgr->abortAllManagedWriteTrx(exec.user(), fanout); + + if (res.ok()) { + generateOk(rest::ResponseCode::OK, VPackSlice::emptyObjectSlice()); + } else { + generateError(res); + } } else { - generateTransactionResult(rest::ResponseCode::OK, tid, transaction::Status::ABORTED); + TRI_voc_tid_t tid = basics::StringUtils::uint64(_request->suffixes()[0]); + if (tid == 0) { + generateError(rest::ResponseCode::BAD, TRI_ERROR_BAD_PARAMETER, + "bad transaction ID"); + return; + } + + Result res = mgr->abortManagedTrx(tid); + + if (res.fail()) { + generateError(res); + } else { + generateTransactionResult(rest::ResponseCode::OK, tid, transaction::Status::ABORTED); + } } } diff --git a/arangod/RestServer/BootstrapFeature.cpp b/arangod/RestServer/BootstrapFeature.cpp index 052b6e1c8c..0a594a30e6 100644 --- a/arangod/RestServer/BootstrapFeature.cpp +++ b/arangod/RestServer/BootstrapFeature.cpp @@ -45,13 +45,18 @@ namespace { static std::string const FEATURE_NAME("Bootstrap"); +static std::string const boostrapKey = "Bootstrap"; +} + +namespace arangodb { +namespace aql { +class Query; +} } using namespace arangodb; using namespace arangodb::options; -static std::string const boostrapKey = "Bootstrap"; - BootstrapFeature::BootstrapFeature(application_features::ApplicationServer& server) : ApplicationFeature(server, ::FEATURE_NAME), _isReady(false), _bark(false) { startsAfter(); @@ -90,7 +95,7 @@ void raceForClusterBootstrap(BootstrapFeature& feature) { AgencyComm agency; auto& ci = feature.server().getFeature().clusterInfo(); while (true) { - AgencyCommResult result = agency.getValues(boostrapKey); + AgencyCommResult result = agency.getValues(::boostrapKey); if (!result.successful()) { // Error in communication, note that value not found is not an error LOG_TOPIC("2488f", TRACE, Logger::STARTUP) @@ -100,7 +105,7 @@ void raceForClusterBootstrap(BootstrapFeature& feature) { } VPackSlice value = result.slice()[0].get( - std::vector({AgencyCommManager::path(), boostrapKey})); + std::vector({AgencyCommManager::path(), ::boostrapKey})); if (value.isString()) { // key was found and is a string std::string boostrapVal = value.copyString(); @@ -110,7 +115,7 @@ void raceForClusterBootstrap(BootstrapFeature& feature) { << "raceForClusterBootstrap: bootstrap already done"; return; } else if (boostrapVal == ServerState::instance()->getId()) { - agency.removeValues(boostrapKey, false); + agency.removeValues(::boostrapKey, false); } LOG_TOPIC("49437", DEBUG, Logger::STARTUP) << "raceForClusterBootstrap: somebody else does the bootstrap"; @@ -121,7 +126,7 @@ void raceForClusterBootstrap(BootstrapFeature& feature) { // No value set, we try to do the bootstrap ourselves: VPackBuilder b; b.add(VPackValue(arangodb::ServerState::instance()->getId())); - result = agency.casValue(boostrapKey, b.slice(), false, 300, 15); + result = agency.casValue(::boostrapKey, b.slice(), false, 300, 15); if (!result.successful()) { LOG_TOPIC("a1ecb", DEBUG, Logger::STARTUP) << "raceForClusterBootstrap: lost race, somebody else will bootstrap"; @@ -141,7 +146,7 @@ void raceForClusterBootstrap(BootstrapFeature& feature) { if (dbservers.size() == 0) { LOG_TOPIC("0ad1c", TRACE, Logger::STARTUP) << "raceForClusterBootstrap: no DBservers, waiting"; - agency.removeValues(boostrapKey, false); + agency.removeValues(::boostrapKey, false); std::this_thread::sleep_for(std::chrono::seconds(1)); continue; } @@ -156,7 +161,7 @@ void raceForClusterBootstrap(BootstrapFeature& feature) { if (upgradeRes.fail()) { LOG_TOPIC("8903f", ERR, Logger::STARTUP) << "Problems with cluster bootstrap, " << "marking as not successful."; - agency.removeValues(boostrapKey, false); + agency.removeValues(::boostrapKey, false); std::this_thread::sleep_for(std::chrono::seconds(1)); continue; } @@ -177,7 +182,7 @@ void raceForClusterBootstrap(BootstrapFeature& feature) { b.clear(); b.add(VPackValue(arangodb::ServerState::instance()->getId() + ": done")); - result = agency.setValue(boostrapKey, b.slice(), 0); + result = agency.setValue(::boostrapKey, b.slice(), 0); if (result.successful()) { return; } @@ -355,7 +360,7 @@ void BootstrapFeature::unprepare() { TRI_vocbase_t* vocbase = databaseFeature.useDatabase(name); if (vocbase != nullptr) { - vocbase->queryList()->killAll(true); + vocbase->queryList()->kill([](aql::Query&) { return true; }, true); vocbase->release(); } } diff --git a/arangod/RestServer/DatabaseFeature.cpp b/arangod/RestServer/DatabaseFeature.cpp index ceaf223fc3..477bb5f1a0 100644 --- a/arangod/RestServer/DatabaseFeature.cpp +++ b/arangod/RestServer/DatabaseFeature.cpp @@ -594,10 +594,18 @@ Result DatabaseFeature::registerPostRecoveryCallback(std::function&& c return Result(); } + +void DatabaseFeature::enumerate(std::function const& callback) { + auto unuser(_databasesProtector.use()); + auto theLists = _databasesLists.load(); + + for (auto& p : theLists->_databases) { + callback(p.second); + } +} /// @brief create a new database -Result DatabaseFeature::createDatabase(CreateDatabaseInfo&& info, TRI_vocbase_t*& result){ - +Result DatabaseFeature::createDatabase(CreateDatabaseInfo&& info, TRI_vocbase_t*& result) { std::string name = info.getName(); auto dbId = info.getId(); VPackBuilder markerBuilder; @@ -1270,7 +1278,7 @@ int DatabaseFeature::iterateDatabases(VPackSlice const& databases) { arangodb::CreateDatabaseInfo info(server()); info.allowSystemDB(true); auto res = info.load(it, VPackSlice::emptyArraySlice()); - if(res.fail()){ + if (res.fail()) { THROW_ARANGO_EXCEPTION(res); } auto database = engine->openDatabase(std::move(info), _upgrade); diff --git a/arangod/RestServer/DatabaseFeature.h b/arangod/RestServer/DatabaseFeature.h index caf3aa47a1..0f89274ad7 100644 --- a/arangod/RestServer/DatabaseFeature.h +++ b/arangod/RestServer/DatabaseFeature.h @@ -84,11 +84,11 @@ class DatabaseFeature : public application_features::ApplicationFeature { void unprepare() override final; // used by catch tests - #ifdef ARANGODB_USE_GOOGLE_TESTS - inline int loadDatabases(velocypack::Slice const& databases) { - return iterateDatabases(databases); - } - #endif +#ifdef ARANGODB_USE_GOOGLE_TESTS + inline int loadDatabases(velocypack::Slice const& databases) { + return iterateDatabases(databases); + } +#endif /// @brief will be called when the recovery phase has run /// this will call the engine-specific recoveryDone() procedures @@ -96,6 +96,9 @@ class DatabaseFeature : public application_features::ApplicationFeature { /// the replication appliers) for all databases void recoveryDone(); + /// @brief enumerate all databases + void enumerate(std::function const& callback); + ////////////////////////////////////////////////////////////////////////////// /// @brief register a callback /// if StorageEngine.inRecovery() -> call at start of recoveryDone() diff --git a/arangod/Scheduler/Scheduler.h b/arangod/Scheduler/Scheduler.h index 0ae3e8277d..254cda7368 100644 --- a/arangod/Scheduler/Scheduler.h +++ b/arangod/Scheduler/Scheduler.h @@ -91,7 +91,7 @@ class Scheduler { explicit WorkItem(std::function&& handler, RequestLane lane, Scheduler* scheduler) - : _handler(std::move(handler)), _lane(lane), _disable(false), _scheduler(scheduler){}; + : _handler(std::move(handler)), _lane(lane), _disable(false), _scheduler(scheduler) {} private: // This is not copyable or movable diff --git a/arangod/Statistics/ServerStatistics.h b/arangod/Statistics/ServerStatistics.h index 2d1fc0c4ad..8c6bf485f7 100644 --- a/arangod/Statistics/ServerStatistics.h +++ b/arangod/Statistics/ServerStatistics.h @@ -25,12 +25,11 @@ #define ARANGOD_STATISTICS_SERVER_STATISTICS_H 1 #include -#include - +#include struct TransactionStatistics { TransactionStatistics() : _transactionsStarted(0), _transactionsAborted(0) - , _transactionsCommitted(0), _intermediateCommits(0) {}; + , _transactionsCommitted(0), _intermediateCommits(0) {} std::atomic _transactionsStarted; std::atomic _transactionsAborted; diff --git a/arangod/Transaction/ClusterUtils.cpp b/arangod/Transaction/ClusterUtils.cpp index 7d1546c562..9bec0c39e3 100644 --- a/arangod/Transaction/ClusterUtils.cpp +++ b/arangod/Transaction/ClusterUtils.cpp @@ -39,7 +39,7 @@ void abortLeaderTransactionsOnShard(TRI_voc_cid_t cid) { transaction::Manager* mgr = transaction::ManagerFeature::manager(); TRI_ASSERT(mgr != nullptr); - bool didWork = mgr->abortManagedTrx([cid](TransactionState const& state) -> bool { + bool didWork = mgr->abortManagedTrx([cid](TransactionState const& state, std::string const& /*user*/) -> bool { if (transaction::isLeaderTransactionId(state.id())) { TransactionCollection* tcoll = state.collection(cid, AccessMode::Type::NONE); return tcoll != nullptr; @@ -54,7 +54,7 @@ void abortFollowerTransactionsOnShard(TRI_voc_cid_t cid) { transaction::Manager* mgr = transaction::ManagerFeature::manager(); TRI_ASSERT(mgr != nullptr); - bool didWork = mgr->abortManagedTrx([cid](TransactionState const& state) -> bool { + bool didWork = mgr->abortManagedTrx([cid](TransactionState const& state, std::string const& /*user*/) -> bool { if (transaction::isFollowerTransactionId(state.id())) { TransactionCollection* tcoll = state.collection(cid, AccessMode::Type::NONE); return tcoll != nullptr; @@ -76,7 +76,7 @@ void abortTransactionsWithFailedServers(ClusterInfo& ci) { if (ServerState::instance()->isCoordinator()) { // abort all transactions using a lead server - didWork = mgr->abortManagedTrx([&](TransactionState const& state) -> bool { + didWork = mgr->abortManagedTrx([&](TransactionState const& state, std::string const& /*user*/) -> bool { for (ServerID const& sid : failed) { if (state.knowsServer(sid)) { return true; @@ -96,7 +96,7 @@ void abortTransactionsWithFailedServers(ClusterInfo& ci) { } // abort all transaction started by a certain coordinator - didWork = mgr->abortManagedTrx([&](TransactionState const& state) -> bool { + didWork = mgr->abortManagedTrx([&](TransactionState const& state, std::string const& /*user*/) -> bool { uint32_t serverId = TRI_ExtractServerIdFromTick(state.id()); if (serverId != 0) { ServerID coordId = ci.getCoordinatorByShortID(serverId); diff --git a/arangod/Transaction/Manager.cpp b/arangod/Transaction/Manager.cpp index 9ec0251f87..d058d7a9b6 100644 --- a/arangod/Transaction/Manager.cpp +++ b/arangod/Transaction/Manager.cpp @@ -23,7 +23,10 @@ #include "Manager.h" +#include "Aql/Query.h" +#include "Aql/QueryList.h" #include "Basics/ReadLocker.h" +#include "Basics/ScopeGuard.h" #include "Basics/WriteLocker.h" #include "Basics/system-functions.h" #include "Cluster/ClusterFeature.h" @@ -34,6 +37,8 @@ #include "Logger/LogMacros.h" #include "Network/Methods.h" #include "Network/NetworkFeature.h" +#include "Network/Utils.h" +#include "RestServer/DatabaseFeature.h" #include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/StorageEngine.h" #include "StorageEngine/TransactionState.h" @@ -125,6 +130,9 @@ void Manager::registerTransaction(TRI_voc_tid_t transactionId, _transactions[bucket]._activeTransactions.emplace(transactionId, std::move(data)); } catch (...) { _nrRunning.fetch_sub(1, std::memory_order_relaxed); + if (!isReadOnlyTransaction) { + _rwLock.unlockRead(); + } throw; } } @@ -133,6 +141,13 @@ void Manager::registerTransaction(TRI_voc_tid_t transactionId, // unregisters a transaction void Manager::unregisterTransaction(TRI_voc_tid_t transactionId, bool markAsFailed, bool isReadOnlyTransaction) { + // always perform an unlock when we leave this function + auto guard = scopeGuard([this, &isReadOnlyTransaction]() { + if (!isReadOnlyTransaction) { + _rwLock.unlockRead(); + } + }); + uint64_t r = _nrRunning.fetch_sub(1, std::memory_order_relaxed); TRI_ASSERT(r > 0); @@ -147,9 +162,6 @@ void Manager::unregisterTransaction(TRI_voc_tid_t transactionId, bool markAsFail _transactions[bucket]._failedTransactions.emplace(transactionId); } } - if (!isReadOnlyTransaction) { - _rwLock.unlockRead(); - } } // return the set of failed transactions @@ -830,7 +842,7 @@ bool Manager::garbageCollect(bool abortAll) { } /// @brief abort all transactions matching -bool Manager::abortManagedTrx(std::function cb) { +bool Manager::abortManagedTrx(std::function cb) { ::arangodb::containers::SmallVector::allocator_type::arena_type arena; ::arangodb::containers::SmallVector toAbort{arena}; @@ -844,7 +856,7 @@ bool Manager::abortManagedTrx(std::function cb) { if (mtrx.type == MetaType::Managed) { TRI_ASSERT(mtrx.state != nullptr); TRY_READ_LOCKER(tryGuard, mtrx.rwlock); // needs lock to access state - if (tryGuard.isLocked() && cb(*mtrx.state)) { + if (tryGuard.isLocked() && cb(*mtrx.state, mtrx.user)) { toAbort.emplace_back(it->first); } } @@ -952,5 +964,93 @@ void Manager::toVelocyPack(VPackBuilder& builder, std::string const& database, }); } +Result Manager::abortAllManagedWriteTrx(std::string const& username, bool fanout) { + LOG_TOPIC("bba16", INFO, Logger::QUERIES) << "aborting all " << (fanout ? "" : "local ") << "write transactions"; + Result res; + + DatabaseFeature& databaseFeature = _feature.server().getFeature(); + databaseFeature.enumerate([](TRI_vocbase_t* vocbase) { + auto queryList = vocbase->queryList(); + TRI_ASSERT(queryList != nullptr); + // we are only interested in killed write queries + queryList->kill([](aql::Query& query) { + auto* state = query.trx()->state(); + return state && !state->isReadOnlyTransaction(); + }, false); + }); + + // abort local transactions + abortManagedTrx([](TransactionState const& state, std::string const& user) { + return ::authorized(user) && !state.isReadOnlyTransaction(); + }); + + if (fanout && + ServerState::instance()->isCoordinator()) { + auto& ci = _feature.server().getFeature().clusterInfo(); + + NetworkFeature const& nf = _feature.server().getFeature(); + network::ConnectionPool* pool = nf.pool(); + if (pool == nullptr) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN); + } + + std::vector futures; + auto auth = AuthenticationFeature::instance(); + + network::RequestOptions options; + options.timeout = network::Timeout(30.0); + + VPackBuffer body; + + for (auto const& coordinator : ci.getCurrentCoordinators()) { + if (coordinator == ServerState::instance()->getId()) { + // ourselves! + continue; + } + + network::Headers headers; + if (auth != nullptr && auth->isActive()) { + if (!username.empty()) { + VPackBuilder builder; + { + VPackObjectBuilder payload{&builder}; + payload->add("preferred_username", VPackValue(username)); + } + VPackSlice slice = builder.slice(); + headers.emplace(StaticStrings::Authorization, + "bearer " + auth->tokenCache().generateJwt(slice)); + } else { + headers.emplace(StaticStrings::Authorization, + "bearer " + auth->tokenCache().jwtToken()); + } + } + + auto f = network::sendRequest(pool, "server:" + coordinator, fuerte::RestVerb::Delete, + "/_db/_system/_api/transaction/write?local=true", + body, std::move(headers), options); + futures.emplace_back(std::move(f)); + } + + if (!futures.empty()) { + auto responses = futures::collectAll(futures).get(); + for (auto const& it : responses) { + if (!it.hasValue()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE); + } + auto& resp = it.get(); + if (resp.response && resp.response->statusCode() != fuerte::StatusOK) { + auto slices = resp.response->slices(); + if (!slices.empty()) { + VPackSlice slice = slices[0]; + res.reset(network::resultFromBody(slice, TRI_ERROR_FAILED)); + } + } + } + } + } + + return res; +} + } // namespace transaction } // namespace arangodb diff --git a/arangod/Transaction/Manager.h b/arangod/Transaction/Manager.h index 68ef0a5e64..17ff6c2647 100644 --- a/arangod/Transaction/Manager.h +++ b/arangod/Transaction/Manager.h @@ -156,7 +156,10 @@ class Manager final { bool garbageCollect(bool abortAll); /// @brief abort all transactions matching - bool abortManagedTrx(std::function); + bool abortManagedTrx(std::function); + + /// @brief abort all managed write transactions + Result abortAllManagedWriteTrx(std::string const& username, bool fanout); /// @brief convert the list of running transactions to a VelocyPack array /// the array must be opened already. diff --git a/arangod/V8Server/v8-actions.cpp b/arangod/V8Server/v8-actions.cpp index b94297ccc3..206b122532 100644 --- a/arangod/V8Server/v8-actions.cpp +++ b/arangod/V8Server/v8-actions.cpp @@ -141,7 +141,7 @@ class v8_action_t final : public TRI_action_t { // and execute it { - // cppcheck-suppress redundantPointerOp + // cppcheck-suppress redundantPointerOp MUTEX_LOCKER(mutexLocker, *dataLock); if (*data != nullptr) { @@ -166,7 +166,7 @@ class v8_action_t final : public TRI_action_t { } { - // cppcheck-suppress redundantPointerOp + // cppcheck-suppress redundantPointerOp MUTEX_LOCKER(mutexLocker, *dataLock); *data = nullptr; } diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index adc6bf5632..35fbbe490d 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -1041,9 +1041,9 @@ static void JS_QueriesKillAql(v8::FunctionCallbackInfo const& args) { auto* queryList = vocbase.queryList(); TRI_ASSERT(queryList != nullptr); - auto res = queryList->kill(id); + Result res = queryList->kill(id); - if (res == TRI_ERROR_NO_ERROR) { + if (res.ok()) { TRI_V8_RETURN_TRUE(); } diff --git a/lib/Rest/GeneralResponse.h b/lib/Rest/GeneralResponse.h index 00d9cc6261..769c1f0a7c 100644 --- a/lib/Rest/GeneralResponse.h +++ b/lib/Rest/GeneralResponse.h @@ -94,7 +94,7 @@ class GeneralResponse { explicit GeneralResponse(ResponseCode); public: - virtual ~GeneralResponse() {} + virtual ~GeneralResponse() = default; public: // response codes are http response codes, but they are used in other @@ -165,9 +165,9 @@ class GeneralResponse { virtual int reservePayload(std::size_t size) { return TRI_ERROR_NO_ERROR; } /// used for head - bool generateBody() const { return _generateBody; }; + bool generateBody() const { return _generateBody; } /// used for head - virtual bool setGenerateBody(bool) { return _generateBody; }; + virtual bool setGenerateBody(bool) { return _generateBody; } virtual int deflate(size_t size = 16384) = 0; diff --git a/tests/Aql/EngineInfoContainerCoordinatorTest.cpp b/tests/Aql/EngineInfoContainerCoordinatorTest.cpp index 408edd9f6f..7b0c294423 100644 --- a/tests/Aql/EngineInfoContainerCoordinatorTest.cpp +++ b/tests/Aql/EngineInfoContainerCoordinatorTest.cpp @@ -139,8 +139,9 @@ TEST(EngineInfoContainerTest, it_should_create_an_executionengine_for_the_first_ EngineInfoContainerCoordinator testee; testee.addNode(&sNode); + std::vector coordinatorQueryIds{}; ExecutionEngineResult result = - testee.buildEngines(query, ®istry, dbname, restrictToShards, queryIds); + testee.buildEngines(query, ®istry, dbname, restrictToShards, queryIds, coordinatorQueryIds); ASSERT_TRUE(result.ok()); ExecutionEngine* engine = result.engine(); @@ -296,8 +297,9 @@ TEST(EngineInfoContainerTest, // Close the second snippet testee.closeSnippet(); + std::vector coordinatorQueryIds{}; ExecutionEngineResult result = - testee.buildEngines(query, ®istry, dbname, restrictToShards, queryIds); + testee.buildEngines(query, ®istry, dbname, restrictToShards, queryIds, coordinatorQueryIds); ASSERT_TRUE(result.ok()); ExecutionEngine* engine = result.engine(); @@ -532,8 +534,9 @@ TEST(EngineInfoContainerTest, snippets_are_a_stack_insert_node_always_into_top_s testee.addNode(&tbNode); + std::vector coordinatorQueryIds{}; ExecutionEngineResult result = - testee.buildEngines(query, ®istry, dbname, restrictToShards, queryIds); + testee.buildEngines(query, ®istry, dbname, restrictToShards, queryIds, coordinatorQueryIds); ASSERT_TRUE(result.ok()); ExecutionEngine* engine = result.engine(); @@ -699,8 +702,9 @@ TEST(EngineInfoContainerTest, error_cases_cloning_of_a_query_fails_throws_an_err }) .Throw(arangodb::basics::Exception(TRI_ERROR_DEBUG, __FILE__, __LINE__)); + std::vector coordinatorQueryIds{}; ExecutionEngineResult result = - testee.buildEngines(query, ®istry, dbname, restrictToShards, queryIds); + testee.buildEngines(query, ®istry, dbname, restrictToShards, queryIds, coordinatorQueryIds); ASSERT_FALSE(result.ok()); // Make sure we check the right thing here ASSERT_EQ(result.errorNumber(), TRI_ERROR_DEBUG); @@ -866,8 +870,9 @@ TEST(EngineInfoContainerTest, error_cases_cloning_of_a_query_fails_returns_a_nul return nullptr; }); + std::vector coordinatorQueryIds{}; ExecutionEngineResult result = - testee.buildEngines(query, ®istry, dbname, restrictToShards, queryIds); + testee.buildEngines(query, ®istry, dbname, restrictToShards, queryIds, coordinatorQueryIds); ASSERT_FALSE(result.ok()); // Make sure we check the right thing here ASSERT_EQ(result.errorNumber(), TRI_ERROR_INTERNAL); diff --git a/tests/Transaction/Manager-test.cpp b/tests/Transaction/Manager-test.cpp index 82208aa887..efaf2db78f 100644 --- a/tests/Transaction/Manager-test.cpp +++ b/tests/Transaction/Manager-test.cpp @@ -414,7 +414,7 @@ TEST_F(TransactionManagerTest, abort_transactions_with_matcher) { ASSERT_EQ(mgr->getManagedTrxStatus(tid), transaction::Status::RUNNING); // - mgr->abortManagedTrx([](TransactionState const& state) -> bool { + mgr->abortManagedTrx([](TransactionState const& state, std::string const& /*user*/) -> bool { TransactionCollection* tcoll = state.collection(42, AccessMode::Type::NONE); return tcoll != nullptr; }); diff --git a/tests/js/client/shell/shell-aql-kill.js b/tests/js/client/shell/shell-aql-kill.js new file mode 100644 index 0000000000..d991a95f0b --- /dev/null +++ b/tests/js/client/shell/shell-aql-kill.js @@ -0,0 +1,140 @@ +/* jshint globalstrict:false, strict:false, maxlen: 200 */ +/* global assertTrue, assertEqual, arango */ + +// ////////////////////////////////////////////////////////////////////////////// +// / @brief ArangoTransaction sTests +// / +// / +// / DISCLAIMER +// / +// / Copyright 2018 ArangoDB GmbH, Cologne, Germany +// / +// / Licensed under the Apache License, Version 2.0 (the "License") +// / you may not use this file except in compliance with the License. +// / You may obtain a copy of the License at +// / +// / http://www.apache.org/licenses/LICENSE-2.0 +// / +// / Unless required by applicable law or agreed to in writing, software +// / distributed under the License is distributed on an "AS IS" BASIS, +// / WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// / See the License for the specific language governing permissions and +// / limitations under the License. +// / +// / Copyright holder is triAGENS GmbH, Cologne, Germany +// / +// / @author Jan Steemann +// ////////////////////////////////////////////////////////////////////////////// + +var jsunity = require('jsunity'); +var internal = require('internal'); +var arangodb = require('@arangodb'); +var db = arangodb.db; + +// ////////////////////////////////////////////////////////////////////////////// +// / @brief test suite +// ////////////////////////////////////////////////////////////////////////////// + +function aqlKillSuite () { + 'use strict'; + const cn = "UnitTestsCollection"; + + return { + + setUpAll: function () { + db._drop(cn); + db._create(cn, { numberOfShards: 3 }); + }, + + tearDownAll: function () { + db._drop(cn); + }, + + testAbortReadQuery: function () { + let result = arango.POST_RAW("/_api/cursor", { + query: "FOR i IN 1..10000000 RETURN SLEEP(1)" + }, { + "x-arango-async" : "store" + }); + + let jobId = result.headers["x-arango-async-id"]; + + let queryId = 0; + let tries = 0; + while (++tries < 30) { + let queries = require("@arangodb/aql/queries").current(); + queries.filter(function(data) { + if (data.query.indexOf("SLEEP(1)") !== -1) { + queryId = data.id; + } + }); + if (queryId > 0) { + break; + } + + require("internal").wait(1, false); + } + + assertTrue(queryId > 0); + + result = arango.DELETE("/_api/query/" + queryId); + assertEqual(result.code, 200); + + tries = 0; + while (++tries < 30) { + result = arango.PUT_RAW("/_api/job/" + jobId, {}); + if (result.code === 410) { + break; + } + require("internal").wait(1, false); + } + assertEqual(410, result.code); + }, + + testAbortWriteQuery: function () { + let result = arango.POST_RAW("/_api/cursor", { + query: "FOR i IN 1..10000000 INSERT {} INTO " + cn + }, { + "x-arango-async" : "store" + }); + + let jobId = result.headers["x-arango-async-id"]; + + let queryId = 0; + let tries = 0; + while (++tries < 30) { + let queries = require("@arangodb/aql/queries").current(); + queries.filter(function(data) { + if (data.query.indexOf(cn) !== -1) { + queryId = data.id; + } + }); + if (queryId > 0) { + break; + } + + require("internal").wait(1, false); + } + + assertTrue(queryId > 0); + + result = arango.DELETE("/_api/query/" + queryId); + assertEqual(result.code, 200); + + tries = 0; + while (++tries < 30) { + result = arango.PUT_RAW("/_api/job/" + jobId, {}); + if (result.code === 410) { + break; + } + require("internal").wait(1, false); + } + assertEqual(410, result.code); + }, + + }; +} + +jsunity.run(aqlKillSuite); + +return jsunity.done(); diff --git a/tests/js/client/shell/shell-transaction.js b/tests/js/client/shell/shell-transaction.js index cd5ca2fbfb..c0210b79e7 100644 --- a/tests/js/client/shell/shell-transaction.js +++ b/tests/js/client/shell/shell-transaction.js @@ -1,5 +1,5 @@ /* jshint globalstrict:false, strict:false, maxlen: 200 */ -/* global fail, assertTrue, assertFalse, assertEqual, assertNotUndefined */ +/* global fail, assertTrue, assertFalse, assertEqual, assertNotUndefined, arango */ // ////////////////////////////////////////////////////////////////////////////// // / @brief ArangoTransaction sTests @@ -649,7 +649,110 @@ function transactionInvocationSuite () { try { trx.abort(); } catch (err) {} }); } - } + }, + + // ////////////////////////////////////////////////////////////////////////////// + // / @brief test: abort write transactions + // ////////////////////////////////////////////////////////////////////////////// + + testAbortWriteTransactions: function () { + db._create(cn, {numberOfShards: 2}); + let trx1, trx2, trx3; + + let obj = { + collections: { + write: [ cn ] + } + }; + + try { + trx1 = db._createTransaction(obj); + trx2 = db._createTransaction(obj); + trx3 = db._createTransaction(obj); + + let trx = db._transactions(); + // the following assertions are not safe, as transactions have + // an idle timeout of 10 seconds, and we cannot guarantee any + // runtime performance in our test environment + // assertInList(trx, trx1); + // assertInList(trx, trx2); + // assertInList(trx, trx3); + + let result = arango.DELETE("/_api/transaction/write"); + assertEqual(result.code, 200); + + trx = db._transactions(); + assertNotInList(trx, trx1); + assertNotInList(trx, trx2); + assertNotInList(trx, trx3); + } finally { + if (trx1 && trx1._id) { + try { trx1.abort(); } catch (err) {} + } + if (trx2 && trx2._id) { + try { trx2.abort(); } catch (err) {} + } + if (trx3 && trx3._id) { + try { trx3.abort(); } catch (err) {} + } + } + }, + + // ////////////////////////////////////////////////////////////////////////////// + // / @brief test: abort write transactions + // ////////////////////////////////////////////////////////////////////////////// + + testAbortWriteTransactionAQL: function () { + db._create(cn, {numberOfShards: 2}); + let trx1; + + let obj = { + collections: { + write: [ cn ] + } + }; + + try { + trx1 = db._createTransaction(obj); + let result = arango.POST_RAW("/_api/cursor", { + query: "FOR i IN 1..10000000 INSERT {} INTO " + cn + }, { + "x-arango-trx-id" : trx1._id, + "x-arango-async" : "store" + }); + + let jobId = result.headers["x-arango-async-id"]; + + let tries = 0; + while (++tries < 60) { + result = arango.PUT_RAW("/_api/job/" + jobId, {}); + if (result.code === 204) { + break; + } + require("internal").wait(0.5, false); + } + + let trx = db._transactions(); + assertInList(trx, trx1); + + result = arango.DELETE("/_api/transaction/write"); + assertEqual(result.code, 200); + + tries = 0; + while (++tries < 60) { + result = arango.PUT_RAW("/_api/job/" + jobId, {}); + if (result.code === 410) { + break; + } + require("internal").wait(0.5, false); + } + assertEqual(410, result.code); + } finally { + if (trx1 && trx1._id) { + try { trx1.abort(); } catch (err) {} + } + } + }, }; } @@ -4047,7 +4150,6 @@ function transactionAQLStreamSuite () { }; } - // ////////////////////////////////////////////////////////////////////////////// // / @brief test suite // ////////////////////////////////////////////////////////////////////////////// diff --git a/tests/js/common/shell/shell-document.js b/tests/js/common/shell/shell-document.js index 80251588f5..8f2cf102f1 100644 --- a/tests/js/common/shell/shell-document.js +++ b/tests/js/common/shell/shell-document.js @@ -2383,36 +2383,6 @@ function DatabaseDocumentSuiteReturnStuff () { }, - -//////////////////////////////////////////////////////////////////////////////// -/// @brief test new features from 3.0 -//////////////////////////////////////////////////////////////////////////////// - -/* Not Functional in arangosh connected to coordinator. - testNewFeatures : function () { - if (! require("@arangodb/cluster").isCluster()) { - var x = collection.insert({Hallo: 12}, { silent: true }); - assertEqual(true, x); - x = collection.insert([{Hallo: 13}], { silent: true }); - assertEqual(true, x); - x = collection.insert({Hallo:14}); - var y = collection.replace(x._key, {Hallo:15}, { silent: true }); - assertEqual(true, y); - y = db._replace(x._id, {Hallo: 16}, {silent: true}); - assertEqual(true, y); - y = collection.update(x._key, {Hallo:17}, { silent: true }); - assertEqual(true, y); - y = db._update(x._id, {Hallo:18}, { silent: true }); - assertEqual(true, y); - y = collection.remove(x._key, { silent: true }); - assertEqual(true, y); - x = collection.insert({Hallo:19}); - y = db._remove(x._id, {silent: true}); - assertEqual(true, y); - } - } -*/ - }; } diff --git a/tests/js/common/shell/shell-statistics.js b/tests/js/common/shell/shell-statistics.js index 34504cef31..88f3d584c2 100644 --- a/tests/js/common/shell/shell-statistics.js +++ b/tests/js/common/shell/shell-statistics.js @@ -93,7 +93,7 @@ function CommonStatisticsSuite() { db._query(`FOR i IN 1..3 INSERT { "ulf" : i } IN ${c.name()}`, {}, { "intermediateCommitCount" : 2}); let stats2 = internal.serverStatistics(); - if(db._engine().name === "rocksdb" && !internal.isCluster()) { + if (db._engine().name === "rocksdb" && !internal.isCluster()) { assertTrue(stats1.transactions.intermediateCommits < stats2.transactions.intermediateCommits); } else { assertEqual(stats1.transactions.intermediateCommits, 0); @@ -115,7 +115,7 @@ function CommonStatisticsSuite() { assertMatch(/abort on purpose/, err.errorMessage); } - if(db._engine().name === "rocksdb" && !internal.isCluster()) { + if (db._engine().name === "rocksdb" && !internal.isCluster()) { assertTrue(stats1.transactions.intermediateCommits <= stats2.transactions.intermediateCommits); } else { assertEqual(stats1.transactions.intermediateCommits, 0);