diff --git a/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp b/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp index 88aabcae78..2bc74e809e 100644 --- a/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp +++ b/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp @@ -23,6 +23,7 @@ #include "EngineInfoContainerDBServerServerBased.h" +#include "Aql/AqlItemBlockSerializationFormat.h" #include "Aql/Ast.h" #include "Aql/Collection.h" #include "Aql/ExecutionNode.h" @@ -78,8 +79,7 @@ EngineInfoContainerDBServerServerBased::TraverserEngineShardLists::TraverserEngi : _node(node), _hasShard(false) { auto const& edges = _node->edgeColls(); TRI_ASSERT(!edges.empty()); - std::unordered_set const& restrictToShards = - query.queryOptions().shardIds; + std::unordered_set const& restrictToShards = query.queryOptions().shardIds; // Extract the local shards for edge collections. for (auto const& col : edges) { _edgeCollections.emplace_back( @@ -197,7 +197,7 @@ void EngineInfoContainerDBServerServerBased::injectVertexColletions(GraphNode* g auto const& vCols = graphNode->vertexColls(); if (vCols.empty()) { std::map const* allCollections = - _query.collections()->collections(); + _query.collections()->collections(); auto& resolver = _query.resolver(); for (auto const& it : *allCollections) { // If resolver cannot resolve this collection @@ -292,11 +292,11 @@ Result EngineInfoContainerDBServerServerBased::buildEngines( // Build Lookup Infos VPackBuilder infoBuilder; transaction::Methods* trx = _query.trx(); - + network::RequestOptions options; options.database = _query.vocbase().name(); options.timeout = network::Timeout(SETUP_TIMEOUT); - options.skipScheduler = true; // hack to speed up future.get() + options.skipScheduler = true; // hack to speed up future.get() options.param("ttl", std::to_string(_query.queryOptions().ttl)); for (auto const& server : dbServers) { @@ -323,6 +323,8 @@ Result EngineInfoContainerDBServerServerBased::buildEngines( TRI_ASSERT(didCreateEngine.size() == _graphNodes.size()); TRI_ASSERT(infoBuilder.isOpenObject()); + infoBuilder.add(StaticStrings::SerializationFormat, + VPackValue(static_cast(aql::SerializationFormat::SHADOWROWS))); infoBuilder.close(); // Base object TRI_ASSERT(infoBuilder.isClosed()); @@ -468,8 +470,8 @@ void EngineInfoContainerDBServerServerBased::cleanupEngines( network::RequestOptions options; options.database = dbname; options.timeout = network::Timeout(10.0); // Picked arbitrarily - options.skipScheduler = true; // hack to speed up future.get() - + options.skipScheduler = true; // hack to speed up future.get() + // Shutdown query snippets std::string url("/_api/aql/shutdown/"); VPackBuffer body; @@ -485,7 +487,7 @@ void EngineInfoContainerDBServerServerBased::cleanupEngines( for (auto const& shardId : serToSnippets.second) { // fire and forget network::sendRequest(pool, server, fuerte::RestVerb::Put, url + shardId, - /*copy*/body, options); + /*copy*/ body, options); } _query.incHttpRequests(serToSnippets.second.size()); } @@ -500,8 +502,7 @@ void EngineInfoContainerDBServerServerBased::cleanupEngines( for (auto const& engine : *allEngines) { // fire and forget network::sendRequest(pool, engine.first, fuerte::RestVerb::Delete, - url + basics::StringUtils::itoa(engine.second), - noBody, options); + url + basics::StringUtils::itoa(engine.second), noBody, options); } _query.incHttpRequests(allEngines->size()); } diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index 9dfb03a477..cdbeed882c 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -41,8 +41,8 @@ #include "Basics/ScopeGuard.h" #include "Cluster/ServerState.h" #include "Futures/Utilities.h" -#include "Logger/Logger.h" #include "Logger/LogMacros.h" +#include "Logger/Logger.h" #include "Network/Methods.h" #include "Network/NetworkFeature.h" #include "Network/Utils.h" @@ -478,7 +478,7 @@ struct DistributedQueryInstanciator final : public WalkerWorker { _dbserverParts.cleanupEngines(pool, TRI_ERROR_INTERNAL, _query.vocbase().name(), queryIds); }); - + std::unordered_map nodeAliases; ExecutionEngineResult res = _dbserverParts.buildEngines(queryIds, nodeAliases); if (res.fail()) { @@ -489,8 +489,8 @@ struct DistributedQueryInstanciator final : public WalkerWorker { // however every engine gets injected the list of locked shards. std::vector coordinatorQueryIds{}; res = _coordinatorParts.buildEngines(_query, registry, _query.vocbase().name(), - _query.queryOptions().shardIds, queryIds, - coordinatorQueryIds); + _query.queryOptions().shardIds, + queryIds, coordinatorQueryIds); if (res.ok()) { TRI_ASSERT(_query.engine() != nullptr); @@ -506,14 +506,16 @@ struct DistributedQueryInstanciator final : public WalkerWorker { void ExecutionEngine::kill() { // kill coordinator parts - // TODO: this doesn't seem to be necessary and sometimes even show adverse effects - // so leaving this deactivated for now - // auto queryRegistry = QueryRegistryFeature::registry(); - // if (queryRegistry != nullptr) { - // for (auto const& id : _coordinatorQueryIds) { - // queryRegistry->kill(&(_query.vocbase()), id); - // } - // } + // TODO: this doesn't seem to be necessary and sometimes even show adverse + // effects so leaving this deactivated for now + /* + auto queryRegistry = QueryRegistryFeature::registry(); + if (queryRegistry != nullptr) { + for (auto const& id : _coordinatorQueryIds) { + queryRegistry->kill(&(_query.vocbase()), id); + } + } + */ // kill DB server parts // RemoteNodeId -> DBServerId -> [snippetId] @@ -525,7 +527,7 @@ void ExecutionEngine::kill() { VPackBuffer body; std::vector futures; - + for (auto const& it : _dbServerMapping) { for (auto const& it2 : it.second) { for (auto const& snippetId : it2.second) { @@ -536,7 +538,7 @@ void ExecutionEngine::kill() { } } } - + if (!futures.empty()) { // killing is best-effort // we are ignoring all errors intentionally here @@ -654,7 +656,8 @@ std::pair ExecutionEngine::shutdown(int errorCode) { /// @brief create an execution engine from a plan ExecutionEngine* ExecutionEngine::instantiateFromPlan(QueryRegistry& queryRegistry, Query& query, ExecutionPlan& plan, - bool planRegisters) { + bool planRegisters, + SerializationFormat format) { auto role = arangodb::ServerState::instance()->getRole(); plan.findVarUsage(); @@ -688,7 +691,7 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan(QueryRegistry& queryRegist TRI_ASSERT(root != nullptr); } else { // instantiate the engine on a local server - engine.reset(new ExecutionEngine(query, SerializationFormat::SHADOWROWS)); + engine.reset(new ExecutionEngine(query, format)); SingleServerQueryInstanciator inst(*engine); plan.root()->walk(inst); @@ -709,7 +712,8 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan(QueryRegistry& queryRegist bool const returnInheritedResults = !arangodb::ServerState::isDBServer(role); if (returnInheritedResults) { - auto returnNode = dynamic_cast>*>(root); + auto returnNode = + dynamic_cast>*>(root); TRI_ASSERT(returnNode != nullptr); engine->resultRegister(returnNode->getOutputRegisterId()); } else { diff --git a/arangod/Aql/ExecutionEngine.h b/arangod/Aql/ExecutionEngine.h index b3b70ff34d..92fb7ebc9e 100644 --- a/arangod/Aql/ExecutionEngine.h +++ b/arangod/Aql/ExecutionEngine.h @@ -58,7 +58,9 @@ class ExecutionEngine { public: // @brief create an execution engine from a plan - static ExecutionEngine* instantiateFromPlan(QueryRegistry&, Query&, ExecutionPlan&, bool); + static ExecutionEngine* instantiateFromPlan(QueryRegistry& queryRegistry, Query& query, + ExecutionPlan& plan, bool planRegisters, + SerializationFormat format); TEST_VIRTUAL Result createBlocks(std::vector const& nodes, std::unordered_set const& restrictToShards, @@ -72,14 +74,14 @@ class ExecutionEngine { /// @brief get the query TEST_VIRTUAL Query* getQuery() const; - + /// @brief server to snippet mapping void snippetMapping(MapRemoteToSnippet&& dbServerMapping, - std::vector&& coordinatorQueryIds) { - _dbServerMapping = std::move(dbServerMapping); - _coordinatorQueryIds = std::move(coordinatorQueryIds); + std::vector&& coordinatorQueryIds) { + _dbServerMapping = std::move(dbServerMapping); + _coordinatorQueryIds = std::move(coordinatorQueryIds); } - + /// @brief kill the query void kill(); @@ -142,10 +144,10 @@ class ExecutionEngine { /// @brief whether or not shutdown() was executed bool _wasShutdown; - + /// @brief server to snippet mapping MapRemoteToSnippet _dbServerMapping; - + /// @brief ids of all coordinator query snippets std::vector _coordinatorQueryIds; }; diff --git a/arangod/Aql/Query.cpp b/arangod/Aql/Query.cpp index 97e91944ea..511e993494 100644 --- a/arangod/Aql/Query.cpp +++ b/arangod/Aql/Query.cpp @@ -276,8 +276,8 @@ Query* Query::clone(QueryPart part, bool withPlan) { } bool Query::killed() const { - if(_queryOptions.timeout > std::numeric_limits::epsilon()) { - if(TRI_microtime() > (_startTime + _queryOptions.timeout)) { + if (_queryOptions.timeout > std::numeric_limits::epsilon()) { + if (TRI_microtime() > (_startTime + _queryOptions.timeout)) { return true; } } @@ -451,7 +451,7 @@ void Query::prepare(QueryRegistry* registry, SerializationFormat format) { // this is confusing and should be fixed! std::unique_ptr engine( ExecutionEngine::instantiateFromPlan(*registry, *this, *plan, - !_queryString.empty())); + !_queryString.empty(), format)); if (_engine == nullptr) { _engine = std::move(engine); @@ -655,7 +655,7 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& queryResult) _resultBuilder->openArray(); _executionPhase = ExecutionPhase::EXECUTE; } - [[fallthrough]]; + [[fallthrough]]; case ExecutionPhase::EXECUTE: { TRI_ASSERT(_resultBuilder != nullptr); TRI_ASSERT(_resultBuilder->isOpenArray()); @@ -734,7 +734,7 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& queryResult) _executionPhase = ExecutionPhase::FINALIZE; } - [[fallthrough]]; + [[fallthrough]]; case ExecutionPhase::FINALIZE: { // will set warnings, stats, profile and cleanup plan and engine return finalize(queryResult); @@ -784,7 +784,7 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& queryResult) QueryResult Query::executeSync(QueryRegistry* registry) { std::shared_ptr ss = sharedState(); ss->resetWakeupHandler(); - + QueryResult queryResult; while (true) { auto state = execute(registry, queryResult); @@ -872,7 +872,7 @@ QueryResultV8 Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry) { options.buildUnindexedArrays = true; options.buildUnindexedObjects = true; auto builder = std::make_shared(&options); - + try { ss->resetWakeupHandler(); diff --git a/arangod/Aql/RestAqlHandler.cpp b/arangod/Aql/RestAqlHandler.cpp index 47f443ae4a..6521ffdab5 100644 --- a/arangod/Aql/RestAqlHandler.cpp +++ b/arangod/Aql/RestAqlHandler.cpp @@ -189,9 +189,8 @@ void RestAqlHandler::setupClusterQuery() { // If we have a new format then it has to be included here. // If not default to classic (old coordinator will not send it) SerializationFormat format = static_cast( - VelocyPackHelper::getNumericValue(querySlice, "serializationFormat", + VelocyPackHelper::getNumericValue(querySlice, StaticStrings::SerializationFormat, static_cast(SerializationFormat::CLASSIC))); - // Now we need to create shared_ptr // That contains the old-style cluster snippet in order // to prepare create a Query object. @@ -315,7 +314,6 @@ bool RestAqlHandler::registerSnippets( } try { - if (needToLock) { // Directly try to lock only the first snippet is required to be locked. // For all others locking is pointless @@ -340,7 +338,7 @@ bool RestAqlHandler::registerSnippets( // No need to cleanup... } - QueryId qId = query->id(); // not true in general + QueryId qId = query->id(); // not true in general TRI_ASSERT(qId > 0); _queryRegistry->insert(qId, query.get(), ttl, true, false); query.release(); @@ -443,17 +441,16 @@ RestStatus RestAqlHandler::useQuery(std::string const& operation, std::string co return RestStatus::DONE; } - if (!_query) { // the PUT verb + if (!_query) { // the PUT verb TRI_ASSERT(this->state() == RestHandler::HandlerState::EXECUTE); - + _query = findQuery(idString); if (!_query) { return RestStatus::DONE; } std::shared_ptr ss = _query->sharedState(); - ss->setWakeupHandler([self = shared_from_this()] { - return self->wakeupHandler(); - }); + ss->setWakeupHandler( + [self = shared_from_this()] { return self->wakeupHandler(); }); } TRI_ASSERT(_qId > 0); @@ -545,7 +542,7 @@ RestStatus RestAqlHandler::execute() { } break; } - + default: { generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, TRI_ERROR_NOT_IMPLEMENTED, "illegal method for /_api/aql"); @@ -640,7 +637,6 @@ Query* RestAqlHandler::findQuery(std::string const& idString) { // handle for useQuery RestStatus RestAqlHandler::handleUseQuery(std::string const& operation, VPackSlice const querySlice) { - std::string const& shardId = _request->header("shard-id"); // upon first usage, the "initializeCursor" method must be called @@ -668,7 +664,7 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation, VPackBuffer answerBuffer; VPackBuilder answerBuilder(answerBuffer); - answerBuilder.openObject(/*unindexed*/true); + answerBuilder.openObject(/*unindexed*/ true); if (operation == "getSome") { TRI_IF_FAILURE("RestAqlHandler::getSome") { @@ -756,9 +752,9 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation, answerBuilder.add(StaticStrings::Error, VPackValue(res.fail())); answerBuilder.add(StaticStrings::Code, VPackValue(res.errorNumber())); } else if (operation == "shutdown") { - int errorCode = - VelocyPackHelper::getNumericValue(querySlice, StaticStrings::Code, TRI_ERROR_INTERNAL); - + int errorCode = VelocyPackHelper::getNumericValue(querySlice, StaticStrings::Code, + TRI_ERROR_INTERNAL); + ExecutionState state; Result res; std::tie(state, res) = _query->engine()->shutdown(errorCode); @@ -787,10 +783,9 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation, generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND); return RestStatus::DONE; } - + answerBuilder.close(); - generateResult(rest::ResponseCode::OK, std::move(answerBuffer), - transactionContext); - + generateResult(rest::ResponseCode::OK, std::move(answerBuffer), transactionContext); + return RestStatus::DONE; } diff --git a/lib/Basics/StaticStrings.cpp b/lib/Basics/StaticStrings.cpp index e05c5398af..d82726fd00 100644 --- a/lib/Basics/StaticStrings.cpp +++ b/lib/Basics/StaticStrings.cpp @@ -217,8 +217,7 @@ std::string const StaticStrings::MimeTypeDump( std::string const StaticStrings::MimeTypeHtml("text/html; charset=utf-8"); std::string const StaticStrings::MimeTypeJson( "application/json; charset=utf-8"); -std::string const StaticStrings::MimeTypeJsonNoEncoding( - "application/json"); +std::string const StaticStrings::MimeTypeJsonNoEncoding("application/json"); std::string const StaticStrings::MimeTypeText("text/plain; charset=utf-8"); std::string const StaticStrings::MimeTypeVPack("application/x-velocypack"); std::string const StaticStrings::MultiPartContentType("multipart/form-data"); @@ -278,4 +277,6 @@ std::string const StaticStrings::Old("old"); std::string const StaticStrings::UpgradeEnvName( "ARANGODB_UPGRADE_DURING_RESTORE"); std::string const StaticStrings::BackupToDeleteName("DIRECTORY_TO_DELETE"); -std::string const StaticStrings::BackupSearchToDeleteName("DIRECTORY_TO_DELETE_SEARCH"); +std::string const StaticStrings::BackupSearchToDeleteName( + "DIRECTORY_TO_DELETE_SEARCH"); +std::string const StaticStrings::SerializationFormat("serializationFormat"); diff --git a/lib/Basics/StaticStrings.h b/lib/Basics/StaticStrings.h index 82ac6555de..180416dfc9 100644 --- a/lib/Basics/StaticStrings.h +++ b/lib/Basics/StaticStrings.h @@ -254,6 +254,7 @@ class StaticStrings { static std::string const UpgradeEnvName; static std::string const BackupToDeleteName; static std::string const BackupSearchToDeleteName; + static std::string const SerializationFormat; }; } // namespace arangodb