From 950c3f40adc637a2232dee2fe9d84bd54ef89fc5 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Fri, 3 Feb 2017 10:43:30 +0100 Subject: [PATCH] fix cluster AQL statistics --- arangod/Aql/ClusterBlocks.cpp | 23 ++++++++++++----------- arangod/Aql/ClusterBlocks.h | 4 ---- arangod/Aql/ExecutionEngine.cpp | 10 +++++++--- arangod/Aql/ExecutionPlan.cpp | 12 ++++++++++-- arangod/Aql/ExecutionPlan.h | 4 ++++ arangod/Aql/ExecutionStats.h | 25 +++++++++++++------------ arangod/Aql/Query.cpp | 24 +++++++++++------------- arangod/Aql/Query.h | 2 +- arangod/Aql/RestAqlHandler.cpp | 19 ------------------- arangod/Cluster/ClusterMethods.cpp | 1 + 10 files changed, 59 insertions(+), 65 deletions(-) diff --git a/arangod/Aql/ClusterBlocks.cpp b/arangod/Aql/ClusterBlocks.cpp index c077b3adbc..1cb85a8f5f 100644 --- a/arangod/Aql/ClusterBlocks.cpp +++ b/arangod/Aql/ClusterBlocks.cpp @@ -33,6 +33,7 @@ #include "Aql/AqlValue.h" #include "Aql/BlockCollector.h" #include "Aql/ExecutionEngine.h" +#include "Aql/ExecutionStats.h" #include "Basics/Exceptions.h" #include "Basics/StaticStrings.h" #include "Basics/StringBuffer.h" @@ -1328,7 +1329,7 @@ int RemoteBlock::initializeCursor(AqlItemBlock* items, size_t pos) { responseBodyBuf.c_str(), responseBodyBuf.length()); VPackSlice slice = builder->slice(); - + if (slice.hasKey("code")) { return slice.get("code").getNumericValue(); } @@ -1362,9 +1363,14 @@ int RemoteBlock::shutdown(int errorCode) { std::shared_ptr builder = VPackParser::fromJson(responseBodyBuf.c_str(), responseBodyBuf.length()); VPackSlice slice = builder->slice(); - - // read "warnings" attribute if present and add it to our query + if (slice.isObject()) { + if (slice.hasKey("stats")) { + ExecutionStats newStats(slice.get("stats")); + _engine->_stats.add(newStats); + } + + // read "warnings" attribute if present and add it to our query VPackSlice warnings = slice.get("warnings"); if (warnings.isArray()) { auto query = _engine->getQuery(); @@ -1415,19 +1421,14 @@ AqlItemBlock* RemoteBlock::getSome(size_t atLeast, size_t atMost) { res->result->getBodyVelocyPack(); VPackSlice responseBody = responseBodyBuilder->slice(); - ExecutionStats newStats(responseBody.get("stats")); - - _engine->_stats.addDelta(_deltaStats, newStats); - _deltaStats = newStats; - if (VelocyPackHelper::getBooleanValue(responseBody, "exhausted", true)) { traceGetSomeEnd(nullptr); return nullptr; } - auto r = new arangodb::aql::AqlItemBlock(_engine->getQuery()->resourceMonitor(), responseBody); - traceGetSomeEnd(r); - return r; + auto r = std::make_unique(_engine->getQuery()->resourceMonitor(), responseBody); + traceGetSomeEnd(r.get()); + return r.release(); // cppcheck-suppress style DEBUG_END_BLOCK(); diff --git a/arangod/Aql/ClusterBlocks.h b/arangod/Aql/ClusterBlocks.h index 947b19613e..4194bb994b 100644 --- a/arangod/Aql/ClusterBlocks.h +++ b/arangod/Aql/ClusterBlocks.h @@ -28,7 +28,6 @@ #include "Aql/ClusterNodes.h" #include "Aql/ExecutionBlock.h" #include "Aql/ExecutionNode.h" -#include "Aql/ExecutionStats.h" #include "Rest/GeneralRequest.h" namespace arangodb { @@ -339,9 +338,6 @@ class RemoteBlock : public ExecutionBlock { /// @brief the ID of the query on the server as a string std::string _queryId; - /// @brief the ID of the query on the server as a string - ExecutionStats _deltaStats; - /// @brief whether or not this block will forward initialize, /// initializeCursor or shutDown requests bool const _isResponsibleForInitializeCursor; diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index c4dd24bbc5..fea5cee911 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -529,6 +529,7 @@ struct CoordinatorInstanciator : public WalkerWorker { VPackBuilder tmp; query->ast()->variables()->toVelocyPack(tmp); + result.add("initialize", VPackValue(false)); result.add("variables", tmp.slice()); result.add("collections", VPackValue(VPackValueType::Array)); @@ -1133,7 +1134,7 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan( bool const isCoordinator = arangodb::ServerState::instance()->isCoordinator(role); bool const isDBServer = arangodb::ServerState::instance()->isDBServer(role); - + TRI_ASSERT(queryRegistry != nullptr); ExecutionEngine* engine = nullptr; @@ -1354,8 +1355,11 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan( } engine->_root = root; - root->initialize(); - root->initializeCursor(nullptr, 0); + + if (plan->isResponsibleForInitialize()) { + root->initialize(); + root->initializeCursor(nullptr, 0); + } return engine; } catch (...) { diff --git a/arangod/Aql/ExecutionPlan.cpp b/arangod/Aql/ExecutionPlan.cpp index 77231ded83..e0be4a6c44 100644 --- a/arangod/Aql/ExecutionPlan.cpp +++ b/arangod/Aql/ExecutionPlan.cpp @@ -177,6 +177,7 @@ ExecutionPlan::ExecutionPlan(Ast* ast) : _ids(), _root(nullptr), _varUsageComputed(false), + _isResponsibleForInitialize(true), _nextId(0), _ast(ast), _lastLimitNode(nullptr), @@ -280,6 +281,7 @@ ExecutionPlan* ExecutionPlan::clone() { plan->_root = _root->clone(plan.get(), true, false); plan->_nextId = _nextId; plan->_appliedRules = _appliedRules; + plan->_isResponsibleForInitialize = _isResponsibleForInitialize; CloneNodeAdder adder(plan.get()); plan->_root->walk(&adder); @@ -348,6 +350,7 @@ void ExecutionPlan::toVelocyPack(VPackBuilder& builder, Ast* ast, bool verbose) size_t nrItems = 0; builder.add("estimatedCost", VPackValue(_root->getCost(nrItems))); builder.add("estimatedNrItems", VPackValue(nrItems)); + builder.add("initialize", VPackValue(_isResponsibleForInitialize)); builder.close(); } @@ -1882,17 +1885,22 @@ void ExecutionPlan::insertDependency(ExecutionNode* oldNode, /// @brief create a plan from VPack ExecutionNode* ExecutionPlan::fromSlice(VPackSlice const& slice) { - ExecutionNode* ret = nullptr; - if (!slice.isObject()) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "plan slice is not an object"); } + if (slice.hasKey("initialize")) { + // whether or not this plan (or fragment) is responsible for calling initialize + _isResponsibleForInitialize = slice.get("initialize").getBoolean(); + } + VPackSlice nodes = slice.get("nodes"); if (!nodes.isArray()) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "plan \"nodes\" attribute is not an array"); } + + ExecutionNode* ret = nullptr; // first, re-create all nodes from the Slice, using the node ids // no dependency links will be set up in this step diff --git a/arangod/Aql/ExecutionPlan.h b/arangod/Aql/ExecutionPlan.h index 63b04cb2cb..39de6a2c35 100644 --- a/arangod/Aql/ExecutionPlan.h +++ b/arangod/Aql/ExecutionPlan.h @@ -75,6 +75,8 @@ class ExecutionPlan { /// @brief check if the plan is empty inline bool empty() const { return (_root == nullptr); } + + bool isResponsibleForInitialize() const { return _isResponsibleForInitialize; } /// @brief note that an optimizer rule was applied inline void addAppliedRule(int level) { _appliedRules.emplace_back(level); } @@ -299,6 +301,8 @@ class ExecutionPlan { /// @brief flag to indicate whether the variable usage is computed bool _varUsageComputed; + bool _isResponsibleForInitialize; + /// @brief auto-increment sequence for node ids size_t _nextId; diff --git a/arangod/Aql/ExecutionStats.h b/arangod/Aql/ExecutionStats.h index 2e92edb4a3..4aaf4b5b45 100644 --- a/arangod/Aql/ExecutionStats.h +++ b/arangod/Aql/ExecutionStats.h @@ -58,21 +58,22 @@ struct ExecutionStats { scannedIndex += summand.scannedIndex; filtered += summand.filtered; httpRequests += summand.httpRequests; - fullCount += summand.fullCount; + if (summand.fullCount > 0) { + // fullCount may be negative, don't add it then + fullCount += summand.fullCount; + } // intentionally no modification of executionTime } - /// @brief sumarize the delta of two other sets of ExecutionStats to us - void addDelta(ExecutionStats const& lastStats, - ExecutionStats const& newStats) { - writesExecuted += newStats.writesExecuted - lastStats.writesExecuted; - writesIgnored += newStats.writesIgnored - lastStats.writesIgnored; - scannedFull += newStats.scannedFull - lastStats.scannedFull; - scannedIndex += newStats.scannedIndex - lastStats.scannedIndex; - filtered += newStats.filtered - lastStats.filtered; - httpRequests += newStats.httpRequests - lastStats.httpRequests; - fullCount += newStats.fullCount - lastStats.fullCount; - // intentionally no modification of executionTime + void clear() { + writesExecuted = 0; + writesIgnored = 0; + scannedFull = 0; + scannedIndex = 0; + filtered = 0; + httpRequests = 0; + fullCount = -1; + executionTime = 0.0; } /// @brief number of successfully executed write operations diff --git a/arangod/Aql/Query.cpp b/arangod/Aql/Query.cpp index 535b9d9fbd..ad973ad648 100644 --- a/arangod/Aql/Query.cpp +++ b/arangod/Aql/Query.cpp @@ -731,14 +731,12 @@ QueryResult Query::execute(QueryRegistry* registry) { } _trx->commit(); + + result.context = _trx->transactionContext(); _engine->_stats.setExecutionTime(TRI_microtime() - _startTime); auto stats = std::make_shared(); - _engine->_stats.toVelocyPack(*(stats.get())); - - result.context = _trx->transactionContext(); - - cleanupPlanAndEngine(TRI_ERROR_NO_ERROR); + cleanupPlanAndEngine(TRI_ERROR_NO_ERROR, stats.get()); enterState(FINALIZATION); @@ -913,18 +911,15 @@ QueryResultV8 Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry) { _trx->commit(); - _engine->_stats.setExecutionTime(TRI_microtime() - _startTime); - auto stats = std::make_shared(); - _engine->_stats.toVelocyPack(*(stats.get())); - - result.context = _trx->transactionContext(); - LOG_TOPIC(DEBUG, Logger::QUERIES) << TRI_microtime() - _startTime << " " << "Query::executeV8: before cleanupPlanAndEngine" << " this: " << (uintptr_t) this; - cleanupPlanAndEngine(TRI_ERROR_NO_ERROR); + result.context = _trx->transactionContext(); + _engine->_stats.setExecutionTime(TRI_microtime() - _startTime); + auto stats = std::make_shared(); + cleanupPlanAndEngine(TRI_ERROR_NO_ERROR, stats.get()); enterState(FINALIZATION); @@ -1387,10 +1382,13 @@ std::string Query::getStateString() const { } /// @brief cleanup plan and engine for current query -void Query::cleanupPlanAndEngine(int errorCode) { +void Query::cleanupPlanAndEngine(int errorCode, VPackBuilder* statsBuilder) { if (_engine != nullptr) { try { _engine->shutdown(errorCode); + if (statsBuilder != nullptr) { + _engine->_stats.toVelocyPack(*statsBuilder); + } } catch (...) { // shutdown may fail but we must not throw here // (we're also called from the destructor) diff --git a/arangod/Aql/Query.h b/arangod/Aql/Query.h index 1dea32cd8c..6717a2af71 100644 --- a/arangod/Aql/Query.h +++ b/arangod/Aql/Query.h @@ -378,7 +378,7 @@ class Query { void enterState(ExecutionState); /// @brief cleanup plan and engine for current query - void cleanupPlanAndEngine(int); + void cleanupPlanAndEngine(int, VPackBuilder* statsBuilder = nullptr); /// @brief create a TransactionContext std::shared_ptr createTransactionContext(); diff --git a/arangod/Aql/RestAqlHandler.cpp b/arangod/Aql/RestAqlHandler.cpp index 8e19b2cb5c..c4aac31141 100644 --- a/arangod/Aql/RestAqlHandler.cpp +++ b/arangod/Aql/RestAqlHandler.cpp @@ -697,7 +697,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query, try { res = query->trx()->lockCollections(); } catch (...) { - LOG(ERR) << "lock lead to an exception"; generateError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, "lock lead to an exception"); @@ -726,15 +725,10 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query, if (items.get() == nullptr) { answerBuilder.add("exhausted", VPackValue(true)); answerBuilder.add("error", VPackValue(false)); - answerBuilder.add(VPackValue("stats")); - query->getStats(answerBuilder); } else { try { items->toVelocyPack(query->trx(), answerBuilder); - answerBuilder.add(VPackValue("stats")); - query->getStats(answerBuilder); } catch (...) { - LOG(ERR) << "cannot transform AqlItemBlock to VelocyPack"; generateError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, "cannot transform AqlItemBlock to VelocyPack"); @@ -760,7 +754,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query, skipped = block->skipSomeForShard(atLeast, atMost, shardId); } } catch (...) { - LOG(ERR) << "skipSome lead to an exception"; generateError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, "skipSome lead to an exception"); @@ -768,8 +761,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query, } answerBuilder.add("skipped", VPackValue(static_cast(skipped))); answerBuilder.add("error", VPackValue(false)); - answerBuilder.add(VPackValue("stats")); - query->getStats(answerBuilder); } else if (operation == "skip") { auto number = VelocyPackHelper::getNumericValue(querySlice, "number", 1); @@ -789,10 +780,7 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query, } answerBuilder.add("exhausted", VPackValue(exhausted)); answerBuilder.add("error", VPackValue(false)); - answerBuilder.add(VPackValue("stats")); - query->getStats(answerBuilder); } catch (...) { - LOG(ERR) << "skip lead to an exception"; generateError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, "skip lead to an exception"); @@ -803,7 +791,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query, try { res = query->engine()->initialize(); } catch (...) { - LOG(ERR) << "initialize lead to an exception"; generateError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, "initialize lead to an exception"); @@ -825,7 +812,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query, res = query->engine()->initializeCursor(items.get(), pos); } } catch (...) { - LOG(ERR) << "initializeCursor lead to an exception"; generateError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, "initializeCursor lead to an exception"); @@ -833,8 +819,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query, } answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR)); answerBuilder.add("code", VPackValue(static_cast(res))); - answerBuilder.add(VPackValue("stats")); - query->getStats(answerBuilder); } else if (operation == "shutdown") { int res = TRI_ERROR_INTERNAL; int errorCode = VelocyPackHelper::getNumericValue( @@ -854,7 +838,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query, _queryRegistry->destroy(_vocbase, _qId, errorCode); _qId = 0; } catch (...) { - LOG(ERR) << "shutdown lead to an exception"; generateError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, "shutdown lead to an exception"); @@ -863,7 +846,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query, answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR)); answerBuilder.add("code", VPackValue(res)); } else { - LOG(ERR) << "Unknown operation!"; generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND); return; @@ -875,7 +857,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query, generateError(rest::ResponseCode::BAD, e.code()); return; } catch (...) { - LOG(ERR) << "OUT OF MEMORY when handling query."; generateError(rest::ResponseCode::BAD, TRI_ERROR_OUT_OF_MEMORY); return; } diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp index 4e28253aa3..d26962c82d 100644 --- a/arangod/Cluster/ClusterMethods.cpp +++ b/arangod/Cluster/ClusterMethods.cpp @@ -2033,6 +2033,7 @@ int flushWalOnAllDBServers(bool waitForSync, bool waitForCollector) { } if (nrok != (int)DBservers.size()) { + LOG(WARN) << "could not flush WAL on all servers. confirmed: " << nrok << ", expected: " << DBservers.size(); return TRI_ERROR_INTERNAL; }