1
0
Fork 0

Actually use the given format to execute a query (#10484)

* Actually use the given format to execute a query

* Send the serialization format of AQL from Coordinators.
This commit is contained in:
Michael Hackstein 2019-11-21 09:10:32 +01:00 committed by GitHub
parent 688a680023
commit 3090e49258
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 68 additions and 64 deletions

View File

@ -23,6 +23,7 @@
#include "EngineInfoContainerDBServerServerBased.h"
#include "Aql/AqlItemBlockSerializationFormat.h"
#include "Aql/Ast.h"
#include "Aql/Collection.h"
#include "Aql/ExecutionNode.h"
@ -78,8 +79,7 @@ EngineInfoContainerDBServerServerBased::TraverserEngineShardLists::TraverserEngi
: _node(node), _hasShard(false) {
auto const& edges = _node->edgeColls();
TRI_ASSERT(!edges.empty());
std::unordered_set<std::string> const& restrictToShards =
query.queryOptions().shardIds;
std::unordered_set<std::string> const& restrictToShards = query.queryOptions().shardIds;
// Extract the local shards for edge collections.
for (auto const& col : edges) {
_edgeCollections.emplace_back(
@ -197,7 +197,7 @@ void EngineInfoContainerDBServerServerBased::injectVertexColletions(GraphNode* g
auto const& vCols = graphNode->vertexColls();
if (vCols.empty()) {
std::map<std::string, Collection*> const* allCollections =
_query.collections()->collections();
_query.collections()->collections();
auto& resolver = _query.resolver();
for (auto const& it : *allCollections) {
// If resolver cannot resolve this collection
@ -292,11 +292,11 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
// Build Lookup Infos
VPackBuilder infoBuilder;
transaction::Methods* trx = _query.trx();
network::RequestOptions options;
options.database = _query.vocbase().name();
options.timeout = network::Timeout(SETUP_TIMEOUT);
options.skipScheduler = true; // hack to speed up future.get()
options.skipScheduler = true; // hack to speed up future.get()
options.param("ttl", std::to_string(_query.queryOptions().ttl));
for (auto const& server : dbServers) {
@ -323,6 +323,8 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
TRI_ASSERT(didCreateEngine.size() == _graphNodes.size());
TRI_ASSERT(infoBuilder.isOpenObject());
infoBuilder.add(StaticStrings::SerializationFormat,
VPackValue(static_cast<int>(aql::SerializationFormat::SHADOWROWS)));
infoBuilder.close(); // Base object
TRI_ASSERT(infoBuilder.isClosed());
@ -468,8 +470,8 @@ void EngineInfoContainerDBServerServerBased::cleanupEngines(
network::RequestOptions options;
options.database = dbname;
options.timeout = network::Timeout(10.0); // Picked arbitrarily
options.skipScheduler = true; // hack to speed up future.get()
options.skipScheduler = true; // hack to speed up future.get()
// Shutdown query snippets
std::string url("/_api/aql/shutdown/");
VPackBuffer<uint8_t> body;
@ -485,7 +487,7 @@ void EngineInfoContainerDBServerServerBased::cleanupEngines(
for (auto const& shardId : serToSnippets.second) {
// fire and forget
network::sendRequest(pool, server, fuerte::RestVerb::Put, url + shardId,
/*copy*/body, options);
/*copy*/ body, options);
}
_query.incHttpRequests(serToSnippets.second.size());
}
@ -500,8 +502,7 @@ void EngineInfoContainerDBServerServerBased::cleanupEngines(
for (auto const& engine : *allEngines) {
// fire and forget
network::sendRequest(pool, engine.first, fuerte::RestVerb::Delete,
url + basics::StringUtils::itoa(engine.second),
noBody, options);
url + basics::StringUtils::itoa(engine.second), noBody, options);
}
_query.incHttpRequests(allEngines->size());
}

View File

@ -41,8 +41,8 @@
#include "Basics/ScopeGuard.h"
#include "Cluster/ServerState.h"
#include "Futures/Utilities.h"
#include "Logger/Logger.h"
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Network/Methods.h"
#include "Network/NetworkFeature.h"
#include "Network/Utils.h"
@ -478,7 +478,7 @@ struct DistributedQueryInstanciator final : public WalkerWorker<ExecutionNode> {
_dbserverParts.cleanupEngines(pool, TRI_ERROR_INTERNAL,
_query.vocbase().name(), queryIds);
});
std::unordered_map<size_t, size_t> nodeAliases;
ExecutionEngineResult res = _dbserverParts.buildEngines(queryIds, nodeAliases);
if (res.fail()) {
@ -489,8 +489,8 @@ struct DistributedQueryInstanciator final : public WalkerWorker<ExecutionNode> {
// however every engine gets injected the list of locked shards.
std::vector<uint64_t> coordinatorQueryIds{};
res = _coordinatorParts.buildEngines(_query, registry, _query.vocbase().name(),
_query.queryOptions().shardIds, queryIds,
coordinatorQueryIds);
_query.queryOptions().shardIds,
queryIds, coordinatorQueryIds);
if (res.ok()) {
TRI_ASSERT(_query.engine() != nullptr);
@ -506,14 +506,16 @@ struct DistributedQueryInstanciator final : public WalkerWorker<ExecutionNode> {
void ExecutionEngine::kill() {
// kill coordinator parts
// TODO: this doesn't seem to be necessary and sometimes even show adverse effects
// so leaving this deactivated for now
// auto queryRegistry = QueryRegistryFeature::registry();
// if (queryRegistry != nullptr) {
// for (auto const& id : _coordinatorQueryIds) {
// queryRegistry->kill(&(_query.vocbase()), id);
// }
// }
// TODO: this doesn't seem to be necessary and sometimes even show adverse
// effects so leaving this deactivated for now
/*
auto queryRegistry = QueryRegistryFeature::registry();
if (queryRegistry != nullptr) {
for (auto const& id : _coordinatorQueryIds) {
queryRegistry->kill(&(_query.vocbase()), id);
}
}
*/
// kill DB server parts
// RemoteNodeId -> DBServerId -> [snippetId]
@ -525,7 +527,7 @@ void ExecutionEngine::kill() {
VPackBuffer<uint8_t> body;
std::vector<network::FutureRes> futures;
for (auto const& it : _dbServerMapping) {
for (auto const& it2 : it.second) {
for (auto const& snippetId : it2.second) {
@ -536,7 +538,7 @@ void ExecutionEngine::kill() {
}
}
}
if (!futures.empty()) {
// killing is best-effort
// we are ignoring all errors intentionally here
@ -654,7 +656,8 @@ std::pair<ExecutionState, Result> ExecutionEngine::shutdown(int errorCode) {
/// @brief create an execution engine from a plan
ExecutionEngine* ExecutionEngine::instantiateFromPlan(QueryRegistry& queryRegistry,
Query& query, ExecutionPlan& plan,
bool planRegisters) {
bool planRegisters,
SerializationFormat format) {
auto role = arangodb::ServerState::instance()->getRole();
plan.findVarUsage();
@ -688,7 +691,7 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan(QueryRegistry& queryRegist
TRI_ASSERT(root != nullptr);
} else {
// instantiate the engine on a local server
engine.reset(new ExecutionEngine(query, SerializationFormat::SHADOWROWS));
engine.reset(new ExecutionEngine(query, format));
SingleServerQueryInstanciator inst(*engine);
plan.root()->walk(inst);
@ -709,7 +712,8 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan(QueryRegistry& queryRegist
bool const returnInheritedResults = !arangodb::ServerState::isDBServer(role);
if (returnInheritedResults) {
auto returnNode = dynamic_cast<ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>*>(root);
auto returnNode =
dynamic_cast<ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>*>(root);
TRI_ASSERT(returnNode != nullptr);
engine->resultRegister(returnNode->getOutputRegisterId());
} else {

View File

@ -58,7 +58,9 @@ class ExecutionEngine {
public:
// @brief create an execution engine from a plan
static ExecutionEngine* instantiateFromPlan(QueryRegistry&, Query&, ExecutionPlan&, bool);
static ExecutionEngine* instantiateFromPlan(QueryRegistry& queryRegistry, Query& query,
ExecutionPlan& plan, bool planRegisters,
SerializationFormat format);
TEST_VIRTUAL Result createBlocks(std::vector<ExecutionNode*> const& nodes,
std::unordered_set<std::string> const& restrictToShards,
@ -72,14 +74,14 @@ class ExecutionEngine {
/// @brief get the query
TEST_VIRTUAL Query* getQuery() const;
/// @brief server to snippet mapping
void snippetMapping(MapRemoteToSnippet&& dbServerMapping,
std::vector<uint64_t>&& coordinatorQueryIds) {
_dbServerMapping = std::move(dbServerMapping);
_coordinatorQueryIds = std::move(coordinatorQueryIds);
std::vector<uint64_t>&& coordinatorQueryIds) {
_dbServerMapping = std::move(dbServerMapping);
_coordinatorQueryIds = std::move(coordinatorQueryIds);
}
/// @brief kill the query
void kill();
@ -142,10 +144,10 @@ class ExecutionEngine {
/// @brief whether or not shutdown() was executed
bool _wasShutdown;
/// @brief server to snippet mapping
MapRemoteToSnippet _dbServerMapping;
/// @brief ids of all coordinator query snippets
std::vector<uint64_t> _coordinatorQueryIds;
};

View File

@ -276,8 +276,8 @@ Query* Query::clone(QueryPart part, bool withPlan) {
}
bool Query::killed() const {
if(_queryOptions.timeout > std::numeric_limits<double>::epsilon()) {
if(TRI_microtime() > (_startTime + _queryOptions.timeout)) {
if (_queryOptions.timeout > std::numeric_limits<double>::epsilon()) {
if (TRI_microtime() > (_startTime + _queryOptions.timeout)) {
return true;
}
}
@ -451,7 +451,7 @@ void Query::prepare(QueryRegistry* registry, SerializationFormat format) {
// this is confusing and should be fixed!
std::unique_ptr<ExecutionEngine> engine(
ExecutionEngine::instantiateFromPlan(*registry, *this, *plan,
!_queryString.empty()));
!_queryString.empty(), format));
if (_engine == nullptr) {
_engine = std::move(engine);
@ -655,7 +655,7 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& queryResult)
_resultBuilder->openArray();
_executionPhase = ExecutionPhase::EXECUTE;
}
[[fallthrough]];
[[fallthrough]];
case ExecutionPhase::EXECUTE: {
TRI_ASSERT(_resultBuilder != nullptr);
TRI_ASSERT(_resultBuilder->isOpenArray());
@ -734,7 +734,7 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& queryResult)
_executionPhase = ExecutionPhase::FINALIZE;
}
[[fallthrough]];
[[fallthrough]];
case ExecutionPhase::FINALIZE: {
// will set warnings, stats, profile and cleanup plan and engine
return finalize(queryResult);
@ -784,7 +784,7 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& queryResult)
QueryResult Query::executeSync(QueryRegistry* registry) {
std::shared_ptr<SharedQueryState> ss = sharedState();
ss->resetWakeupHandler();
QueryResult queryResult;
while (true) {
auto state = execute(registry, queryResult);
@ -872,7 +872,7 @@ QueryResultV8 Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry) {
options.buildUnindexedArrays = true;
options.buildUnindexedObjects = true;
auto builder = std::make_shared<VPackBuilder>(&options);
try {
ss->resetWakeupHandler();

View File

@ -189,9 +189,8 @@ void RestAqlHandler::setupClusterQuery() {
// If we have a new format then it has to be included here.
// If not default to classic (old coordinator will not send it)
SerializationFormat format = static_cast<SerializationFormat>(
VelocyPackHelper::getNumericValue<int>(querySlice, "serializationFormat",
VelocyPackHelper::getNumericValue<int>(querySlice, StaticStrings::SerializationFormat,
static_cast<int>(SerializationFormat::CLASSIC)));
// Now we need to create shared_ptr<VPackBuilder>
// That contains the old-style cluster snippet in order
// to prepare create a Query object.
@ -315,7 +314,6 @@ bool RestAqlHandler::registerSnippets(
}
try {
if (needToLock) {
// Directly try to lock only the first snippet is required to be locked.
// For all others locking is pointless
@ -340,7 +338,7 @@ bool RestAqlHandler::registerSnippets(
// No need to cleanup...
}
QueryId qId = query->id(); // not true in general
QueryId qId = query->id(); // not true in general
TRI_ASSERT(qId > 0);
_queryRegistry->insert(qId, query.get(), ttl, true, false);
query.release();
@ -443,17 +441,16 @@ RestStatus RestAqlHandler::useQuery(std::string const& operation, std::string co
return RestStatus::DONE;
}
if (!_query) { // the PUT verb
if (!_query) { // the PUT verb
TRI_ASSERT(this->state() == RestHandler::HandlerState::EXECUTE);
_query = findQuery(idString);
if (!_query) {
return RestStatus::DONE;
}
std::shared_ptr<SharedQueryState> ss = _query->sharedState();
ss->setWakeupHandler([self = shared_from_this()] {
return self->wakeupHandler();
});
ss->setWakeupHandler(
[self = shared_from_this()] { return self->wakeupHandler(); });
}
TRI_ASSERT(_qId > 0);
@ -545,7 +542,7 @@ RestStatus RestAqlHandler::execute() {
}
break;
}
default: {
generateError(rest::ResponseCode::METHOD_NOT_ALLOWED,
TRI_ERROR_NOT_IMPLEMENTED, "illegal method for /_api/aql");
@ -640,7 +637,6 @@ Query* RestAqlHandler::findQuery(std::string const& idString) {
// handle for useQuery
RestStatus RestAqlHandler::handleUseQuery(std::string const& operation,
VPackSlice const querySlice) {
std::string const& shardId = _request->header("shard-id");
// upon first usage, the "initializeCursor" method must be called
@ -668,7 +664,7 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation,
VPackBuffer<uint8_t> answerBuffer;
VPackBuilder answerBuilder(answerBuffer);
answerBuilder.openObject(/*unindexed*/true);
answerBuilder.openObject(/*unindexed*/ true);
if (operation == "getSome") {
TRI_IF_FAILURE("RestAqlHandler::getSome") {
@ -756,9 +752,9 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation,
answerBuilder.add(StaticStrings::Error, VPackValue(res.fail()));
answerBuilder.add(StaticStrings::Code, VPackValue(res.errorNumber()));
} else if (operation == "shutdown") {
int errorCode =
VelocyPackHelper::getNumericValue<int>(querySlice, StaticStrings::Code, TRI_ERROR_INTERNAL);
int errorCode = VelocyPackHelper::getNumericValue<int>(querySlice, StaticStrings::Code,
TRI_ERROR_INTERNAL);
ExecutionState state;
Result res;
std::tie(state, res) = _query->engine()->shutdown(errorCode);
@ -787,10 +783,9 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation,
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND);
return RestStatus::DONE;
}
answerBuilder.close();
generateResult(rest::ResponseCode::OK, std::move(answerBuffer),
transactionContext);
generateResult(rest::ResponseCode::OK, std::move(answerBuffer), transactionContext);
return RestStatus::DONE;
}

View File

@ -217,8 +217,7 @@ std::string const StaticStrings::MimeTypeDump(
std::string const StaticStrings::MimeTypeHtml("text/html; charset=utf-8");
std::string const StaticStrings::MimeTypeJson(
"application/json; charset=utf-8");
std::string const StaticStrings::MimeTypeJsonNoEncoding(
"application/json");
std::string const StaticStrings::MimeTypeJsonNoEncoding("application/json");
std::string const StaticStrings::MimeTypeText("text/plain; charset=utf-8");
std::string const StaticStrings::MimeTypeVPack("application/x-velocypack");
std::string const StaticStrings::MultiPartContentType("multipart/form-data");
@ -278,4 +277,6 @@ std::string const StaticStrings::Old("old");
std::string const StaticStrings::UpgradeEnvName(
"ARANGODB_UPGRADE_DURING_RESTORE");
std::string const StaticStrings::BackupToDeleteName("DIRECTORY_TO_DELETE");
std::string const StaticStrings::BackupSearchToDeleteName("DIRECTORY_TO_DELETE_SEARCH");
std::string const StaticStrings::BackupSearchToDeleteName(
"DIRECTORY_TO_DELETE_SEARCH");
std::string const StaticStrings::SerializationFormat("serializationFormat");

View File

@ -254,6 +254,7 @@ class StaticStrings {
static std::string const UpgradeEnvName;
static std::string const BackupToDeleteName;
static std::string const BackupSearchToDeleteName;
static std::string const SerializationFormat;
};
} // namespace arangodb