1
0
Fork 0

fix cluster AQL statistics

This commit is contained in:
jsteemann 2017-02-03 10:43:30 +01:00
parent af0f52b5a1
commit 950c3f40ad
10 changed files with 59 additions and 65 deletions

View File

@ -33,6 +33,7 @@
#include "Aql/AqlValue.h"
#include "Aql/BlockCollector.h"
#include "Aql/ExecutionEngine.h"
#include "Aql/ExecutionStats.h"
#include "Basics/Exceptions.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringBuffer.h"
@ -1328,7 +1329,7 @@ int RemoteBlock::initializeCursor(AqlItemBlock* items, size_t pos) {
responseBodyBuf.c_str(), responseBodyBuf.length());
VPackSlice slice = builder->slice();
if (slice.hasKey("code")) {
return slice.get("code").getNumericValue<int>();
}
@ -1362,9 +1363,14 @@ int RemoteBlock::shutdown(int errorCode) {
std::shared_ptr<VPackBuilder> builder =
VPackParser::fromJson(responseBodyBuf.c_str(), responseBodyBuf.length());
VPackSlice slice = builder->slice();
// read "warnings" attribute if present and add it to our query
if (slice.isObject()) {
if (slice.hasKey("stats")) {
ExecutionStats newStats(slice.get("stats"));
_engine->_stats.add(newStats);
}
// read "warnings" attribute if present and add it to our query
VPackSlice warnings = slice.get("warnings");
if (warnings.isArray()) {
auto query = _engine->getQuery();
@ -1415,19 +1421,14 @@ AqlItemBlock* RemoteBlock::getSome(size_t atLeast, size_t atMost) {
res->result->getBodyVelocyPack();
VPackSlice responseBody = responseBodyBuilder->slice();
ExecutionStats newStats(responseBody.get("stats"));
_engine->_stats.addDelta(_deltaStats, newStats);
_deltaStats = newStats;
if (VelocyPackHelper::getBooleanValue(responseBody, "exhausted", true)) {
traceGetSomeEnd(nullptr);
return nullptr;
}
auto r = new arangodb::aql::AqlItemBlock(_engine->getQuery()->resourceMonitor(), responseBody);
traceGetSomeEnd(r);
return r;
auto r = std::make_unique<AqlItemBlock>(_engine->getQuery()->resourceMonitor(), responseBody);
traceGetSomeEnd(r.get());
return r.release();
// cppcheck-suppress style
DEBUG_END_BLOCK();

View File

@ -28,7 +28,6 @@
#include "Aql/ClusterNodes.h"
#include "Aql/ExecutionBlock.h"
#include "Aql/ExecutionNode.h"
#include "Aql/ExecutionStats.h"
#include "Rest/GeneralRequest.h"
namespace arangodb {
@ -339,9 +338,6 @@ class RemoteBlock : public ExecutionBlock {
/// @brief the ID of the query on the server as a string
std::string _queryId;
/// @brief the ID of the query on the server as a string
ExecutionStats _deltaStats;
/// @brief whether or not this block will forward initialize,
/// initializeCursor or shutDown requests
bool const _isResponsibleForInitializeCursor;

View File

@ -529,6 +529,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
VPackBuilder tmp;
query->ast()->variables()->toVelocyPack(tmp);
result.add("initialize", VPackValue(false));
result.add("variables", tmp.slice());
result.add("collections", VPackValue(VPackValueType::Array));
@ -1133,7 +1134,7 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan(
bool const isCoordinator =
arangodb::ServerState::instance()->isCoordinator(role);
bool const isDBServer = arangodb::ServerState::instance()->isDBServer(role);
TRI_ASSERT(queryRegistry != nullptr);
ExecutionEngine* engine = nullptr;
@ -1354,8 +1355,11 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan(
}
engine->_root = root;
root->initialize();
root->initializeCursor(nullptr, 0);
if (plan->isResponsibleForInitialize()) {
root->initialize();
root->initializeCursor(nullptr, 0);
}
return engine;
} catch (...) {

View File

@ -177,6 +177,7 @@ ExecutionPlan::ExecutionPlan(Ast* ast)
: _ids(),
_root(nullptr),
_varUsageComputed(false),
_isResponsibleForInitialize(true),
_nextId(0),
_ast(ast),
_lastLimitNode(nullptr),
@ -280,6 +281,7 @@ ExecutionPlan* ExecutionPlan::clone() {
plan->_root = _root->clone(plan.get(), true, false);
plan->_nextId = _nextId;
plan->_appliedRules = _appliedRules;
plan->_isResponsibleForInitialize = _isResponsibleForInitialize;
CloneNodeAdder adder(plan.get());
plan->_root->walk(&adder);
@ -348,6 +350,7 @@ void ExecutionPlan::toVelocyPack(VPackBuilder& builder, Ast* ast, bool verbose)
size_t nrItems = 0;
builder.add("estimatedCost", VPackValue(_root->getCost(nrItems)));
builder.add("estimatedNrItems", VPackValue(nrItems));
builder.add("initialize", VPackValue(_isResponsibleForInitialize));
builder.close();
}
@ -1882,17 +1885,22 @@ void ExecutionPlan::insertDependency(ExecutionNode* oldNode,
/// @brief create a plan from VPack
ExecutionNode* ExecutionPlan::fromSlice(VPackSlice const& slice) {
ExecutionNode* ret = nullptr;
if (!slice.isObject()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "plan slice is not an object");
}
if (slice.hasKey("initialize")) {
// whether or not this plan (or fragment) is responsible for calling initialize
_isResponsibleForInitialize = slice.get("initialize").getBoolean();
}
VPackSlice nodes = slice.get("nodes");
if (!nodes.isArray()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "plan \"nodes\" attribute is not an array");
}
ExecutionNode* ret = nullptr;
// first, re-create all nodes from the Slice, using the node ids
// no dependency links will be set up in this step

View File

@ -75,6 +75,8 @@ class ExecutionPlan {
/// @brief check if the plan is empty
inline bool empty() const { return (_root == nullptr); }
bool isResponsibleForInitialize() const { return _isResponsibleForInitialize; }
/// @brief note that an optimizer rule was applied
inline void addAppliedRule(int level) { _appliedRules.emplace_back(level); }
@ -299,6 +301,8 @@ class ExecutionPlan {
/// @brief flag to indicate whether the variable usage is computed
bool _varUsageComputed;
bool _isResponsibleForInitialize;
/// @brief auto-increment sequence for node ids
size_t _nextId;

View File

@ -58,21 +58,22 @@ struct ExecutionStats {
scannedIndex += summand.scannedIndex;
filtered += summand.filtered;
httpRequests += summand.httpRequests;
fullCount += summand.fullCount;
if (summand.fullCount > 0) {
// fullCount may be negative, don't add it then
fullCount += summand.fullCount;
}
// intentionally no modification of executionTime
}
/// @brief sumarize the delta of two other sets of ExecutionStats to us
void addDelta(ExecutionStats const& lastStats,
ExecutionStats const& newStats) {
writesExecuted += newStats.writesExecuted - lastStats.writesExecuted;
writesIgnored += newStats.writesIgnored - lastStats.writesIgnored;
scannedFull += newStats.scannedFull - lastStats.scannedFull;
scannedIndex += newStats.scannedIndex - lastStats.scannedIndex;
filtered += newStats.filtered - lastStats.filtered;
httpRequests += newStats.httpRequests - lastStats.httpRequests;
fullCount += newStats.fullCount - lastStats.fullCount;
// intentionally no modification of executionTime
void clear() {
writesExecuted = 0;
writesIgnored = 0;
scannedFull = 0;
scannedIndex = 0;
filtered = 0;
httpRequests = 0;
fullCount = -1;
executionTime = 0.0;
}
/// @brief number of successfully executed write operations

View File

@ -731,14 +731,12 @@ QueryResult Query::execute(QueryRegistry* registry) {
}
_trx->commit();
result.context = _trx->transactionContext();
_engine->_stats.setExecutionTime(TRI_microtime() - _startTime);
auto stats = std::make_shared<VPackBuilder>();
_engine->_stats.toVelocyPack(*(stats.get()));
result.context = _trx->transactionContext();
cleanupPlanAndEngine(TRI_ERROR_NO_ERROR);
cleanupPlanAndEngine(TRI_ERROR_NO_ERROR, stats.get());
enterState(FINALIZATION);
@ -913,18 +911,15 @@ QueryResultV8 Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry) {
_trx->commit();
_engine->_stats.setExecutionTime(TRI_microtime() - _startTime);
auto stats = std::make_shared<VPackBuilder>();
_engine->_stats.toVelocyPack(*(stats.get()));
result.context = _trx->transactionContext();
LOG_TOPIC(DEBUG, Logger::QUERIES)
<< TRI_microtime() - _startTime << " "
<< "Query::executeV8: before cleanupPlanAndEngine"
<< " this: " << (uintptr_t) this;
cleanupPlanAndEngine(TRI_ERROR_NO_ERROR);
result.context = _trx->transactionContext();
_engine->_stats.setExecutionTime(TRI_microtime() - _startTime);
auto stats = std::make_shared<VPackBuilder>();
cleanupPlanAndEngine(TRI_ERROR_NO_ERROR, stats.get());
enterState(FINALIZATION);
@ -1387,10 +1382,13 @@ std::string Query::getStateString() const {
}
/// @brief cleanup plan and engine for current query
void Query::cleanupPlanAndEngine(int errorCode) {
void Query::cleanupPlanAndEngine(int errorCode, VPackBuilder* statsBuilder) {
if (_engine != nullptr) {
try {
_engine->shutdown(errorCode);
if (statsBuilder != nullptr) {
_engine->_stats.toVelocyPack(*statsBuilder);
}
} catch (...) {
// shutdown may fail but we must not throw here
// (we're also called from the destructor)

View File

@ -378,7 +378,7 @@ class Query {
void enterState(ExecutionState);
/// @brief cleanup plan and engine for current query
void cleanupPlanAndEngine(int);
void cleanupPlanAndEngine(int, VPackBuilder* statsBuilder = nullptr);
/// @brief create a TransactionContext
std::shared_ptr<arangodb::TransactionContext> createTransactionContext();

View File

@ -697,7 +697,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
try {
res = query->trx()->lockCollections();
} catch (...) {
LOG(ERR) << "lock lead to an exception";
generateError(rest::ResponseCode::SERVER_ERROR,
TRI_ERROR_HTTP_SERVER_ERROR,
"lock lead to an exception");
@ -726,15 +725,10 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
if (items.get() == nullptr) {
answerBuilder.add("exhausted", VPackValue(true));
answerBuilder.add("error", VPackValue(false));
answerBuilder.add(VPackValue("stats"));
query->getStats(answerBuilder);
} else {
try {
items->toVelocyPack(query->trx(), answerBuilder);
answerBuilder.add(VPackValue("stats"));
query->getStats(answerBuilder);
} catch (...) {
LOG(ERR) << "cannot transform AqlItemBlock to VelocyPack";
generateError(rest::ResponseCode::SERVER_ERROR,
TRI_ERROR_HTTP_SERVER_ERROR,
"cannot transform AqlItemBlock to VelocyPack");
@ -760,7 +754,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
skipped = block->skipSomeForShard(atLeast, atMost, shardId);
}
} catch (...) {
LOG(ERR) << "skipSome lead to an exception";
generateError(rest::ResponseCode::SERVER_ERROR,
TRI_ERROR_HTTP_SERVER_ERROR,
"skipSome lead to an exception");
@ -768,8 +761,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
}
answerBuilder.add("skipped", VPackValue(static_cast<double>(skipped)));
answerBuilder.add("error", VPackValue(false));
answerBuilder.add(VPackValue("stats"));
query->getStats(answerBuilder);
} else if (operation == "skip") {
auto number =
VelocyPackHelper::getNumericValue<size_t>(querySlice, "number", 1);
@ -789,10 +780,7 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
}
answerBuilder.add("exhausted", VPackValue(exhausted));
answerBuilder.add("error", VPackValue(false));
answerBuilder.add(VPackValue("stats"));
query->getStats(answerBuilder);
} catch (...) {
LOG(ERR) << "skip lead to an exception";
generateError(rest::ResponseCode::SERVER_ERROR,
TRI_ERROR_HTTP_SERVER_ERROR,
"skip lead to an exception");
@ -803,7 +791,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
try {
res = query->engine()->initialize();
} catch (...) {
LOG(ERR) << "initialize lead to an exception";
generateError(rest::ResponseCode::SERVER_ERROR,
TRI_ERROR_HTTP_SERVER_ERROR,
"initialize lead to an exception");
@ -825,7 +812,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
res = query->engine()->initializeCursor(items.get(), pos);
}
} catch (...) {
LOG(ERR) << "initializeCursor lead to an exception";
generateError(rest::ResponseCode::SERVER_ERROR,
TRI_ERROR_HTTP_SERVER_ERROR,
"initializeCursor lead to an exception");
@ -833,8 +819,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
}
answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR));
answerBuilder.add("code", VPackValue(static_cast<double>(res)));
answerBuilder.add(VPackValue("stats"));
query->getStats(answerBuilder);
} else if (operation == "shutdown") {
int res = TRI_ERROR_INTERNAL;
int errorCode = VelocyPackHelper::getNumericValue<int>(
@ -854,7 +838,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
_queryRegistry->destroy(_vocbase, _qId, errorCode);
_qId = 0;
} catch (...) {
LOG(ERR) << "shutdown lead to an exception";
generateError(rest::ResponseCode::SERVER_ERROR,
TRI_ERROR_HTTP_SERVER_ERROR,
"shutdown lead to an exception");
@ -863,7 +846,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR));
answerBuilder.add("code", VPackValue(res));
} else {
LOG(ERR) << "Unknown operation!";
generateError(rest::ResponseCode::NOT_FOUND,
TRI_ERROR_HTTP_NOT_FOUND);
return;
@ -875,7 +857,6 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
generateError(rest::ResponseCode::BAD, e.code());
return;
} catch (...) {
LOG(ERR) << "OUT OF MEMORY when handling query.";
generateError(rest::ResponseCode::BAD, TRI_ERROR_OUT_OF_MEMORY);
return;
}

View File

@ -2033,6 +2033,7 @@ int flushWalOnAllDBServers(bool waitForSync, bool waitForCollector) {
}
if (nrok != (int)DBservers.size()) {
LOG(WARN) << "could not flush WAL on all servers. confirmed: " << nrok << ", expected: " << DBservers.size();
return TRI_ERROR_INTERNAL;
}