From 3dc2f9b29941ffa5fe6e9c0a19178be89b5fa478 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 22 Oct 2014 16:30:06 +0200 Subject: [PATCH 01/11] fixed test --- arangod/Aql/Query.cpp | 36 +++++-------------- arangod/Aql/Query.h | 12 ------- arangod/V8Server/ApplicationV8.cpp | 6 ++-- js/server/modules/org/arangodb/aql-helper.js | 5 +-- .../aql-modify-noncluster-serializetest.js | 14 ++++---- 5 files changed, 20 insertions(+), 53 deletions(-) diff --git a/arangod/Aql/Query.cpp b/arangod/Aql/Query.cpp index ccf9747397..3e0c0bc233 100644 --- a/arangod/Aql/Query.cpp +++ b/arangod/Aql/Query.cpp @@ -146,7 +146,6 @@ Query::Query (triagens::arango::ApplicationV8* applicationV8, _trx(nullptr), _engine(nullptr), _part(part), - _clusterStatus(-1), _contextOwnedByExterior(contextOwnedByExterior) { TRI_ASSERT(_vocbase != nullptr); @@ -191,7 +190,6 @@ Query::Query (triagens::arango::ApplicationV8* applicationV8, _trx(nullptr), _engine(nullptr), _part(part), - _clusterStatus(-1), _contextOwnedByExterior(contextOwnedByExterior) { TRI_ASSERT(_vocbase != nullptr); @@ -796,22 +794,6 @@ char* Query::registerString (std::string const& p, // --SECTION-- private methods // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @brief whether or not we are running in a cluster -//////////////////////////////////////////////////////////////////////////////// - -bool Query::isRunningInCluster () { - if (_clusterStatus == -1) { - // not yet determined - _clusterStatus = 0; - if (triagens::arango::ServerState::instance()->isRunningInCluster()) { - _clusterStatus = 1; - } - } - TRI_ASSERT(_clusterStatus == 0 || _clusterStatus == 1); - return (_clusterStatus == 1); -} - //////////////////////////////////////////////////////////////////////////////// /// @brief enter a V8 context //////////////////////////////////////////////////////////////////////////////// @@ -844,17 +826,15 @@ void Query::enterContext () { void Query::exitContext () { if (! _contextOwnedByExterior) { if (_context != nullptr) { - if (isRunningInCluster()) { - // unregister transaction and resolver in context - TRI_v8_global_t* v8g = static_cast(v8::Isolate::GetCurrent()->GetData()); - auto ctx = static_cast(v8g->_transactionContext); - if (ctx != nullptr) { - ctx->unregisterTransaction(); - } - - _applicationV8->exitContext(_context); - _context = nullptr; + // unregister transaction and resolver in context + TRI_v8_global_t* v8g = static_cast(v8::Isolate::GetCurrent()->GetData()); + auto ctx = static_cast(v8g->_transactionContext); + if (ctx != nullptr) { + ctx->unregisterTransaction(); } + + _applicationV8->exitContext(_context); + _context = nullptr; } TRI_ASSERT(_context == nullptr); } diff --git a/arangod/Aql/Query.h b/arangod/Aql/Query.h index 9d42055a89..66f33111cb 100644 --- a/arangod/Aql/Query.h +++ b/arangod/Aql/Query.h @@ -383,12 +383,6 @@ namespace triagens { // --SECTION-- private methods // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @brief whether or not we are running in a cluster -//////////////////////////////////////////////////////////////////////////////// - - bool isRunningInCluster (); - //////////////////////////////////////////////////////////////////////////////// /// @brief fetch a boolean value from the options //////////////////////////////////////////////////////////////////////////////// @@ -562,12 +556,6 @@ namespace triagens { QueryPart const _part; -//////////////////////////////////////////////////////////////////////////////// -/// @brief internal variable we use to determine whether we are in a cluster -//////////////////////////////////////////////////////////////////////////////// - - short int _clusterStatus; - //////////////////////////////////////////////////////////////////////////////// /// @brief whether or not someone else has acquired a V8 context for us //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/V8Server/ApplicationV8.cpp b/arangod/V8Server/ApplicationV8.cpp index f8965e8725..087c3c92ca 100644 --- a/arangod/V8Server/ApplicationV8.cpp +++ b/arangod/V8Server/ApplicationV8.cpp @@ -286,7 +286,7 @@ ApplicationV8::ApplicationV8 (TRI_server_t* server, _gcFrequency(10.0), _v8Options(""), _startupLoader(), - _vocbase(0), + _vocbase(nullptr), _nrInstances(), _contexts(), _contextCondition(), @@ -352,7 +352,7 @@ ApplicationV8::V8Context* ApplicationV8::enterContext (std::string const& name, // in case we are in the shutdown phase, do not enter a context! // the context might have been deleted by the shutdown if (_stopping) { - return 0; + return nullptr; } LOG_TRACE("found unused V8 context"); @@ -594,7 +594,7 @@ void ApplicationV8::collectGarbage () { // there is no context to clean up, probably they all have been cleaned up // already. increase the wait time so we don't cycle too much in the GC loop // and waste CPU unnecessary - useReducedWait = (context != 0); + useReducedWait = (context != nullptr); } } diff --git a/js/server/modules/org/arangodb/aql-helper.js b/js/server/modules/org/arangodb/aql-helper.js index 20e17a02fc..0c23f1a539 100644 --- a/js/server/modules/org/arangodb/aql-helper.js +++ b/js/server/modules/org/arangodb/aql-helper.js @@ -531,8 +531,9 @@ function getQueryMultiplePlansAndExecutions (query, bindVars, testObject, debug) } results[i] = AQL_EXECUTEJSON(plans[i].plan, paramNone); - // ignore statistics for comparisons - delete results[i].stats; + // ignore these statistics for comparisons + delete results[i].stats.scannedFull; + delete results[i].stats.scannedIndex; if (debug) { require("internal").print("\n" + i + " DONE\n"); diff --git a/js/server/tests/aql-modify-noncluster-serializetest.js b/js/server/tests/aql-modify-noncluster-serializetest.js index 630cd8571d..7580fda0ed 100644 --- a/js/server/tests/aql-modify-noncluster-serializetest.js +++ b/js/server/tests/aql-modify-noncluster-serializetest.js @@ -34,7 +34,6 @@ var helper = require("org/arangodb/aql-helper"); var cluster = require("org/arangodb/cluster"); var getQueryMultiplePlansAndExecutions = helper.getQueryMultiplePlansAndExecutions; - //////////////////////////////////////////////////////////////////////////////// /// @brief test suite //////////////////////////////////////////////////////////////////////////////// @@ -95,7 +94,7 @@ function ahuacatlRemoveSuite () { var allresults = getQueryMultiplePlansAndExecutions(query, {}, this); assertEqual(100, c1.count()); - for (var i=0; i < allresults.results.length; i++) { + for (var i = 0; i < allresults.results.length; i++) { assertEqual(expected, allresults.results[i].stats, "comparing " + i + " : " + allresults.results[i].stats); } @@ -113,7 +112,7 @@ function ahuacatlRemoveSuite () { var allresults = getQueryMultiplePlansAndExecutions(query, { "@cn": cn1 }, this); assertEqual(100, c1.count()); - for (var i=0; i < allresults.results.length; i++) { + for (var i = 0; i < allresults.results.length; i++) { assertEqual(expected, allresults.results[i].stats, "comparing " + i + " : " + allresults.results[i].stats); } @@ -399,7 +398,6 @@ function ahuacatlInsertSuite () { } db._drop("UnitTestsAhuacatlEdge"); edge = db._createEdgeCollection("UnitTestsAhuacatlEdge"); - }, //////////////////////////////////////////////////////////////////////////////// @@ -424,7 +422,7 @@ function ahuacatlInsertSuite () { var allresults = getQueryMultiplePlansAndExecutions(query, {}, this); assertEqual(100, c1.count()); - for (var i=0; i < allresults.results.length; i++) { + for (var i = 0; i < allresults.results.length; i++) { assertEqual(expected, allresults.results[i].stats, "comparing " + i + " : " + allresults.results[i].stats); } @@ -439,7 +437,7 @@ function ahuacatlInsertSuite () { var allresults = getQueryMultiplePlansAndExecutions(query, { "@cn": cn1 }, this); assertEqual(100, c1.count()); - for (var i=0; i < allresults.results.length; i++) { + for (var i = 0; i < allresults.results.length; i++) { assertEqual(expected, allresults.results[i].stats, "comparing " + i + " : " + allresults.results[i].stats); } @@ -455,7 +453,7 @@ function ahuacatlInsertSuite () { var allresults = getQueryMultiplePlansAndExecutions(query, { "@cn": cn1 }, this); assertEqual(100, c1.count()); - for (var i=0; i < allresults.results.length; i++) { + for (var i = 0; i < allresults.results.length; i++) { assertEqual(expected, allresults.results[i].stats, "comparing " + i + " : " + allresults.results[i].stats); } @@ -471,7 +469,7 @@ function ahuacatlInsertSuite () { var allresults = getQueryMultiplePlansAndExecutions(query, { "@cn": cn1 }, this); assertEqual(101, c1.count()); - for (var i=0; i < allresults.results.length; i++) { + for (var i = 0; i < allresults.results.length; i++) { assertEqual(expected, allresults.results[i].stats, "comparing " + i + " : " + allresults.results[i].stats); } From 7f2373204a25b298352ba6d5d7826722f1c62a60 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 22 Oct 2014 17:25:59 +0200 Subject: [PATCH 02/11] fixed cloning of coordinator queries --- arangod/Aql/ExecutionBlock.cpp | 1 + arangod/Aql/ExecutionBlock.h | 120 +++++++++++++++----------------- arangod/Aql/ExecutionEngine.cpp | 41 ++++------- arangod/Aql/ExecutionPlan.h | 8 +++ arangod/Aql/Query.cpp | 22 +++++- arangod/Aql/Query.h | 19 ++--- arangod/Aql/QueryRegistry.cpp | 5 +- arangod/Aql/QueryRegistry.h | 3 +- arangod/Aql/RestAqlHandler.cpp | 4 +- 9 files changed, 114 insertions(+), 109 deletions(-) diff --git a/arangod/Aql/ExecutionBlock.cpp b/arangod/Aql/ExecutionBlock.cpp index c0c00da4f4..d24041cc3d 100644 --- a/arangod/Aql/ExecutionBlock.cpp +++ b/arangod/Aql/ExecutionBlock.cpp @@ -2330,6 +2330,7 @@ int AggregateBlock::getOrSkipSome (size_t atLeast, _pos = 0; bool hasMore = ! _buffer.empty(); + if (! hasMore) { hasMore = ExecutionBlock::getBlock(atLeast, atMost); } diff --git a/arangod/Aql/ExecutionBlock.h b/arangod/Aql/ExecutionBlock.h index fc73c34454..f9d781e7ba 100644 --- a/arangod/Aql/ExecutionBlock.h +++ b/arangod/Aql/ExecutionBlock.h @@ -278,7 +278,7 @@ namespace triagens { virtual int64_t remaining (); - ExecutionNode const* getPlanNode () { + ExecutionNode const* getPlanNode () const { return _exeNode; } @@ -377,7 +377,7 @@ namespace triagens { } } - int initialize () { + int initialize () override { _inputRegisterValues = nullptr; // just in case return ExecutionBlock::initialize(); } @@ -386,19 +386,19 @@ namespace triagens { /// @brief initializeCursor, store a copy of the register values coming from above //////////////////////////////////////////////////////////////////////////////// - int initializeCursor (AqlItemBlock* items, size_t pos); + int initializeCursor (AqlItemBlock* items, size_t pos) override; int shutdown (); - bool hasMore () { + bool hasMore () override final { return ! _done; } - int64_t count () const { + int64_t count () const override final { return 1; } - int64_t remaining () { + int64_t remaining () override final { return _done ? 0 : 1; } @@ -456,15 +456,15 @@ namespace triagens { /// @brief initialize, here we fetch all docs from the database //////////////////////////////////////////////////////////////////////////////// - int initialize (); + int initialize () override; //////////////////////////////////////////////////////////////////////////////// /// @brief initCursor, here we release our docs from this collection //////////////////////////////////////////////////////////////////////////////// - int initializeCursor (AqlItemBlock* items, size_t pos); + int initializeCursor (AqlItemBlock* items, size_t pos) override; - AqlItemBlock* getSome (size_t atLeast, size_t atMost); + AqlItemBlock* getSome (size_t atLeast, size_t atMost) override; //////////////////////////////////////////////////////////////////////////////// // skip between atLeast and atMost, returns the number actually skipped . . . @@ -472,7 +472,7 @@ namespace triagens { // things to skip overall. //////////////////////////////////////////////////////////////////////////////// - size_t skipSome (size_t atLeast, size_t atMost); + size_t skipSome (size_t atLeast, size_t atMost) override final; // ----------------------------------------------------------------------------- // --SECTION-- private variables @@ -529,15 +529,15 @@ namespace triagens { /// @brief initialize, here we fetch all docs from the database //////////////////////////////////////////////////////////////////////////////// - int initialize (); + int initialize () override; //////////////////////////////////////////////////////////////////////////////// /// @brief initializeCursor, here we release our docs from this collection //////////////////////////////////////////////////////////////////////////////// - int initializeCursor (AqlItemBlock* items, size_t pos); + int initializeCursor (AqlItemBlock* items, size_t pos) override; - AqlItemBlock* getSome (size_t atLeast, size_t atMost); + AqlItemBlock* getSome (size_t atLeast, size_t atMost) override; //////////////////////////////////////////////////////////////////////////////// // skip between atLeast and atMost, returns the number actually skipped . . . @@ -545,7 +545,7 @@ namespace triagens { // things to skip overall. //////////////////////////////////////////////////////////////////////////////// - virtual size_t skipSome (size_t atLeast, size_t atMost); + size_t skipSome (size_t atLeast, size_t atMost) override final; // ----------------------------------------------------------------------------- // --SECTION-- private methods @@ -650,15 +650,15 @@ namespace triagens { ~EnumerateListBlock (); - int initialize (); + int initialize () override; //////////////////////////////////////////////////////////////////////////////// /// @brief initializeCursor, here we release our docs from this collection //////////////////////////////////////////////////////////////////////////////// - int initializeCursor (AqlItemBlock* items, size_t pos); + int initializeCursor (AqlItemBlock* items, size_t pos) override; - AqlItemBlock* getSome (size_t atLeast, size_t atMost); + AqlItemBlock* getSome (size_t atLeast, size_t atMost) override; //////////////////////////////////////////////////////////////////////////////// // skip between atLeast and atMost returns the number actually skipped . . . @@ -666,7 +666,7 @@ namespace triagens { // things to skip overall. //////////////////////////////////////////////////////////////////////////////// - size_t skipSome (size_t atLeast, size_t atMost); + size_t skipSome (size_t atLeast, size_t atMost) override final; //////////////////////////////////////////////////////////////////////////////// /// @brief create an AqlValue from the inVariable using the current _index @@ -727,7 +727,7 @@ namespace triagens { ~CalculationBlock (); - int initialize (); + int initialize () override; //////////////////////////////////////////////////////////////////////////////// /// @brief doEvaluation, private helper to do the work @@ -743,8 +743,8 @@ namespace triagens { /// @brief getSome //////////////////////////////////////////////////////////////////////////////// - virtual AqlItemBlock* getSome (size_t atLeast, - size_t atMost); + AqlItemBlock* getSome (size_t atLeast, + size_t atMost) override; private: @@ -794,10 +794,10 @@ namespace triagens { ~SubqueryBlock (); - int initialize (); + int initialize () override; - virtual AqlItemBlock* getSome (size_t atLeast, - size_t atMost); + AqlItemBlock* getSome (size_t atLeast, + size_t atMost) override; //////////////////////////////////////////////////////////////////////////////// /// @brief getter for the pointer to the subquery @@ -835,7 +835,7 @@ namespace triagens { ~FilterBlock (); - int initialize (); + int initialize () override; //////////////////////////////////////////////////////////////////////////////// /// @brief internal function to actually decide @@ -860,13 +860,13 @@ namespace triagens { AqlItemBlock*& result, size_t& skipped); - bool hasMore (); + bool hasMore () override final; - int64_t count () const { + int64_t count () const override final { return -1; // refuse to work } - int64_t remaining () { + int64_t remaining () override final { return -1; // refuse to work } @@ -900,7 +900,7 @@ namespace triagens { ~AggregateBlock (); - int initialize (); + int initialize () override; private: @@ -960,7 +960,7 @@ namespace triagens { ~SortBlock (); - int initialize (); + int initialize () override; virtual int initializeCursor (AqlItemBlock* items, size_t pos); @@ -1032,7 +1032,7 @@ namespace triagens { ~LimitBlock () { } - int initialize (); + int initialize () override; int initializeCursor (AqlItemBlock* items, size_t pos); @@ -1096,8 +1096,8 @@ namespace triagens { /// @brief getSome //////////////////////////////////////////////////////////////////////////////// - virtual AqlItemBlock* getSome (size_t atLeast, - size_t atMost); + AqlItemBlock* getSome (size_t atLeast, + size_t atMost) override; }; @@ -1126,8 +1126,8 @@ namespace triagens { /// @brief getSome //////////////////////////////////////////////////////////////////////////////// - virtual AqlItemBlock* getSome (size_t atLeast, - size_t atMost); + AqlItemBlock* getSome (size_t atLeast, + size_t atMost) override final; // ----------------------------------------------------------------------------- // --SECTION-- protected methods @@ -1327,7 +1327,7 @@ namespace triagens { ~NoResultsBlock () { } - int initialize () { + int initialize () override { return ExecutionBlock::initialize(); } @@ -1337,15 +1337,15 @@ namespace triagens { int initializeCursor (AqlItemBlock* items, size_t pos); - bool hasMore () { + bool hasMore () override final { return false; } - int64_t count () const { + int64_t count () const override final { return 0; } - int64_t remaining () { + int64_t remaining () override final { return 0; } @@ -1384,7 +1384,7 @@ namespace triagens { /// @brief initialize //////////////////////////////////////////////////////////////////////////////// - int initialize (); + int initialize () override; //////////////////////////////////////////////////////////////////////////////// /// @brief shutdown: need our own method since our _buffer is different @@ -1403,33 +1403,33 @@ namespace triagens { /// dependency has count -1 //////////////////////////////////////////////////////////////////////////////// - int64_t count () const; + int64_t count () const override final; //////////////////////////////////////////////////////////////////////////////// /// @brief remaining: the sum of the remaining() of the dependencies or -1 (if /// any dependency has remaining -1 //////////////////////////////////////////////////////////////////////////////// - int64_t remaining (); + int64_t remaining () override final; //////////////////////////////////////////////////////////////////////////////// /// @brief hasMore: true if any position of _buffer hasMore and false /// otherwise. //////////////////////////////////////////////////////////////////////////////// - bool hasMore (); + bool hasMore () override final; //////////////////////////////////////////////////////////////////////////////// /// @brief getSome //////////////////////////////////////////////////////////////////////////////// - AqlItemBlock* getSome (size_t, size_t); + AqlItemBlock* getSome (size_t, size_t) override final; //////////////////////////////////////////////////////////////////////////////// /// @brief skipSome //////////////////////////////////////////////////////////////////////////////// - size_t skipSome (size_t, size_t); + size_t skipSome (size_t, size_t) override final; //////////////////////////////////////////////////////////////////////////////// /// @brief _gatherBlockPos: pairs (i, _pos in _buffer.at(i)), i.e. the same as @@ -1517,7 +1517,6 @@ namespace triagens { std::vector const& shardIds); virtual ~BlockWithClients () {} - // ----------------------------------------------------------------------------- // --SECTION-- BlockWithClients public methods @@ -1535,7 +1534,7 @@ namespace triagens { /// @brief getSome: shouldn't be used, use skipSomeForShard //////////////////////////////////////////////////////////////////////////////// - AqlItemBlock* getSome (size_t atLeast, size_t atMost) { + AqlItemBlock* getSome (size_t atLeast, size_t atMost) override final { TRI_ASSERT(false); THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); } @@ -1544,7 +1543,7 @@ namespace triagens { /// @brief skipSome: shouldn't be used, use skipSomeForShard //////////////////////////////////////////////////////////////////////////////// - size_t skipSome (size_t atLeast, size_t atMost) { + size_t skipSome (size_t atLeast, size_t atMost) override final { TRI_ASSERT(false); THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); } @@ -1553,7 +1552,7 @@ namespace triagens { /// @brief remaining //////////////////////////////////////////////////////////////////////////////// - int64_t remaining () { + int64_t remaining () override final { TRI_ASSERT(false); THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); } @@ -1562,16 +1561,7 @@ namespace triagens { /// @brief hasMore //////////////////////////////////////////////////////////////////////////////// - bool hasMore () { - TRI_ASSERT(false); - THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); - } - -//////////////////////////////////////////////////////////////////////////////// -/// @brief skip -//////////////////////////////////////////////////////////////////////////////// - - int64_t skip () { + bool hasMore () override final { TRI_ASSERT(false); THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); } @@ -1865,7 +1855,7 @@ namespace triagens { /// @brief initialize //////////////////////////////////////////////////////////////////////////////// - int initialize () final; + int initialize () override final; //////////////////////////////////////////////////////////////////////////////// /// @brief initializeCursor, could be called multiple times @@ -1884,31 +1874,31 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// AqlItemBlock* getSome (size_t atLeast, - size_t atMost) final; + size_t atMost) override final; //////////////////////////////////////////////////////////////////////////////// /// @brief skipSome //////////////////////////////////////////////////////////////////////////////// - size_t skipSome (size_t atLeast, size_t atMost) final; + size_t skipSome (size_t atLeast, size_t atMost) override final; //////////////////////////////////////////////////////////////////////////////// /// @brief hasMore //////////////////////////////////////////////////////////////////////////////// - bool hasMore () final; + bool hasMore () override final; //////////////////////////////////////////////////////////////////////////////// /// @brief count //////////////////////////////////////////////////////////////////////////////// - int64_t count () const final; + int64_t count () const override final; //////////////////////////////////////////////////////////////////////////////// /// @brief remaining //////////////////////////////////////////////////////////////////////////////// - int64_t remaining () final; + int64_t remaining () override final; //////////////////////////////////////////////////////////////////////////////// /// @brief internal method to send a request diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index 7ad712d17e..b73b5b300b 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -333,39 +333,25 @@ struct CoordinatorInstanciator : public WalkerWorker { if ((*it).location == COORDINATOR) { // create a coordinator-based engine engine = buildEngineCoordinator((*it), queryIds); - TRI_ASSERT(engine != nullptr); + + auto plan = engine->getQuery()->plan(); + TRI_ASSERT(plan != nullptr); + TRI_ASSERT(plan->empty()); + if ((*it).id > 0) { - Query* otherQuery = query->clone(PART_DEPENDENT); - otherQuery->engine(engine); - - int res = otherQuery->trx()->begin(); - if (res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION_MESSAGE(res, "could not begin transaction"); - } - - auto* newPlan = new ExecutionPlan(otherQuery->ast()); - otherQuery->setPlan(newPlan); - - // clone all variables - for (auto it2 : query->ast()->variables()->variables(true)) { - auto var = query->ast()->variables()->getVariable(it2.first); - TRI_ASSERT(var != nullptr); - otherQuery->ast()->variables()->createVariable(var); - } - ExecutionNode const* current = (*it).nodes.front(); ExecutionNode* previous = nullptr; // TODO: fix instanciation here as in DBserver case while (current != nullptr) { - auto clone = current->clone(newPlan, false, true); - newPlan->registerNode(clone); + auto clone = current->clone(plan, false, true); + plan->registerNode(clone); if (previous == nullptr) { // set the root node - newPlan->root(clone); + plan->root(clone); } else { previous->addDependency(clone); @@ -380,16 +366,13 @@ struct CoordinatorInstanciator : public WalkerWorker { current = deps[0]; } - // TODO: test if this is necessary or does harm - // newPlan->setVarUsageComputed(); - // we need to instanciate this engine in the registry // create a remote id for the engine that we can pass to // the plans to be created for the DBServers id = TRI_NewTickServer(); - queryRegistry->insert(otherQuery->vocbase(), id, otherQuery, 3600.0); + queryRegistry->insert(id, engine->getQuery(), 3600.0); } } else { @@ -576,7 +559,11 @@ struct CoordinatorInstanciator : public WalkerWorker { ExecutionEngine* buildEngineCoordinator (EngineInfo& info, std::unordered_map const& queryIds) { - std::unique_ptr engine(new ExecutionEngine(query)); + // need a new query instance on the coordinator + auto clone = query->clone(PART_DEPENDENT, false); + + std::unique_ptr engine(new ExecutionEngine(clone)); + clone->engine(engine.get()); std::unordered_map cache; RemoteNode* remoteNode = nullptr; diff --git a/arangod/Aql/ExecutionPlan.h b/arangod/Aql/ExecutionPlan.h index e44da3883b..6120b55ecc 100644 --- a/arangod/Aql/ExecutionPlan.h +++ b/arangod/Aql/ExecutionPlan.h @@ -107,6 +107,14 @@ namespace triagens { TRI_memory_zone_t* zone, bool verbose) const; +//////////////////////////////////////////////////////////////////////////////// +/// @brief check if the plan is empty +//////////////////////////////////////////////////////////////////////////////// + + inline bool empty () const { + return (_root == nullptr); + } + //////////////////////////////////////////////////////////////////////////////// /// @brief note that an optimizer rule was applied //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Aql/Query.cpp b/arangod/Aql/Query.cpp index 3e0c0bc233..2e757061b2 100644 --- a/arangod/Aql/Query.cpp +++ b/arangod/Aql/Query.cpp @@ -258,9 +258,12 @@ Query::~Query () { //////////////////////////////////////////////////////////////////////////////// /// @brief clone a query +/// note: as a side-effect, this will also create and start a transaction for +/// the query //////////////////////////////////////////////////////////////////////////////// -Query* Query::clone (QueryPart part) { +Query* Query::clone (QueryPart part, + bool withPlan) { TRI_json_t* options = nullptr; if (_options != nullptr) { @@ -287,7 +290,10 @@ Query* Query::clone (QueryPart part) { } if (_plan != nullptr) { - clone->_plan = _plan->clone(*clone); + if (withPlan) { + // clone the existing plan + clone->setPlan(_plan->clone(*clone)); + } // clone all variables for (auto it : _ast->variables()->variables(true)) { @@ -296,11 +302,23 @@ Query* Query::clone (QueryPart part) { clone->ast()->variables()->createVariable(var); } } + + if (clone->_plan == nullptr) { + // initialize an empty plan + clone->setPlan(new ExecutionPlan(ast())); + } TRI_ASSERT(clone->_trx == nullptr); clone->_trx = _trx->clone(); // A daughter transaction which does not // actually lock the collections + + int res = clone->_trx->begin(); + + if (res != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION_MESSAGE(res, "could not begin transaction"); + } + return clone.release(); } diff --git a/arangod/Aql/Query.h b/arangod/Aql/Query.h index 66f33111cb..ccd6a620e0 100644 --- a/arangod/Aql/Query.h +++ b/arangod/Aql/Query.h @@ -146,7 +146,13 @@ namespace triagens { ~Query (); - Query* clone (QueryPart); +//////////////////////////////////////////////////////////////////////////////// +/// @brief clone a query +/// note: as a side-effect, this will also create and start a transaction for +/// the query +//////////////////////////////////////////////////////////////////////////////// + + Query* clone (QueryPart, bool); // ----------------------------------------------------------------------------- // --SECTION-- public methods @@ -349,16 +355,11 @@ namespace triagens { } //////////////////////////////////////////////////////////////////////////////// -/// @brief set the transaction for the query +/// @brief get the plan for the query //////////////////////////////////////////////////////////////////////////////// - void setTrx (triagens::arango::AqlTransaction* trx) { - TRI_ASSERT(_trx == nullptr); - _trx = trx; - } - - triagens::arango::AqlTransaction* getTrx () { - return _trx; + ExecutionPlan* plan () const { + return _plan; } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Aql/QueryRegistry.cpp b/arangod/Aql/QueryRegistry.cpp index 83e6b3b517..dcc00ee900 100644 --- a/arangod/Aql/QueryRegistry.cpp +++ b/arangod/Aql/QueryRegistry.cpp @@ -70,12 +70,13 @@ QueryRegistry::~QueryRegistry () { /// @brief insert //////////////////////////////////////////////////////////////////////////////// -void QueryRegistry::insert (TRI_vocbase_t* vocbase, - QueryId id, +void QueryRegistry::insert (QueryId id, Query* query, double ttl) { + TRI_ASSERT(query != nullptr); TRI_ASSERT(query->trx() != nullptr); + auto vocbase = query->vocbase(); WRITE_LOCKER(_lock); diff --git a/arangod/Aql/QueryRegistry.h b/arangod/Aql/QueryRegistry.h index 990abfffa1..2dc68cf815 100644 --- a/arangod/Aql/QueryRegistry.h +++ b/arangod/Aql/QueryRegistry.h @@ -65,8 +65,7 @@ namespace triagens { /// query will be deleted if it is not opened for that amount of time. //////////////////////////////////////////////////////////////////////////////// - void insert (TRI_vocbase_t* vocbase, - QueryId id, + void insert (QueryId id, Query* query, double ttl = 3600.0); diff --git a/arangod/Aql/RestAqlHandler.cpp b/arangod/Aql/RestAqlHandler.cpp index 44e7474683..170a52c474 100644 --- a/arangod/Aql/RestAqlHandler.cpp +++ b/arangod/Aql/RestAqlHandler.cpp @@ -151,7 +151,7 @@ void RestAqlHandler::createQueryFromJson () { _qId = TRI_NewTickServer(); try { - _queryRegistry->insert(_vocbase, _qId, query, ttl); + _queryRegistry->insert(_qId, query, ttl); } catch (...) { LOG_ERROR("could not keep query in registry"); @@ -336,7 +336,7 @@ void RestAqlHandler::createQueryFromString () { _qId = TRI_NewTickServer(); try { - _queryRegistry->insert(_vocbase, _qId, query, ttl); + _queryRegistry->insert(_qId, query, ttl); } catch (...) { LOG_ERROR("could not keep query in registry"); From 55f3c9829cbb799fce1417803d5ff0f0b33e4361 Mon Sep 17 00:00:00 2001 From: Alan Plum Date: Wed, 22 Oct 2014 17:28:48 +0200 Subject: [PATCH 03/11] Fixed PBKDF2 usage. --- lib/V8/v8-utils.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/V8/v8-utils.cpp b/lib/V8/v8-utils.cpp index 48fcfd5382..b64636a437 100644 --- a/lib/V8/v8-utils.cpp +++ b/lib/V8/v8-utils.cpp @@ -2838,7 +2838,7 @@ static v8::Handle JS_PBKDF2 (v8::Arguments const& argv) { // extract arguments if (argv.Length() < 4 || ! argv[0]->IsString() || ! argv[1]->IsString() || ! argv[2]->IsNumber() || ! argv[3]->IsNumber()) { - TRI_V8_EXCEPTION_USAGE(scope, "PBKDF2(, , , , )"); + TRI_V8_EXCEPTION_USAGE(scope, "PBKDF2(, , , )"); } string salt = TRI_ObjectToString(argv[0]); From adf6391968c4ad98dd1d4f4a2af9bbcc366a0ea7 Mon Sep 17 00:00:00 2001 From: Willi Goesgens Date: Wed, 22 Oct 2014 17:34:39 +0200 Subject: [PATCH 04/11] Make it const, baby. --- lib/Basics/JsonHelper.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/Basics/JsonHelper.h b/lib/Basics/JsonHelper.h index 1f55176c3d..11fad8b7e3 100644 --- a/lib/Basics/JsonHelper.h +++ b/lib/Basics/JsonHelper.h @@ -870,7 +870,7 @@ namespace triagens { /// @brief checks whether *this is a Json that is equal to null. //////////////////////////////////////////////////////////////////////////////// - bool isNull () throw() { + bool isNull () const throw() { return _json != nullptr && _json->_type == TRI_JSON_NULL; } @@ -878,7 +878,7 @@ namespace triagens { /// @brief checks whether *this is a boolean Json. //////////////////////////////////////////////////////////////////////////////// - bool isBoolean () throw() { + bool isBoolean () const throw() { return TRI_IsBooleanJson(_json); } @@ -886,7 +886,7 @@ namespace triagens { /// @brief checks whether *this is a number Json. //////////////////////////////////////////////////////////////////////////////// - bool isNumber () throw() { + bool isNumber () const throw() { return TRI_IsNumberJson(_json); } @@ -894,7 +894,7 @@ namespace triagens { /// @brief checks whether *this is a string Json. //////////////////////////////////////////////////////////////////////////////// - bool isString () throw() { + bool isString () const throw() { return TRI_IsStringJson(_json); } @@ -902,7 +902,7 @@ namespace triagens { /// @brief checks whether *this is an array Json. //////////////////////////////////////////////////////////////////////////////// - bool isArray () throw() { + bool isArray () const throw() { return TRI_IsArrayJson(_json); } @@ -910,7 +910,7 @@ namespace triagens { /// @brief checks whether *this is a list Json. //////////////////////////////////////////////////////////////////////////////// - bool isList () throw() { + bool isList () const throw() { return TRI_IsListJson(_json); } From 9558f5aa0ad150f8745a3f225057b55d9e9c8dd8 Mon Sep 17 00:00:00 2001 From: Willi Goesgens Date: Wed, 22 Oct 2014 17:36:23 +0200 Subject: [PATCH 05/11] Implement passing of query statistics through cluster. --- arangod/Aql/ExecutionBlock.cpp | 3 +++ arangod/Aql/ExecutionStats.cpp | 21 ++++++++++++++++++++- arangod/Aql/ExecutionStats.h | 23 +++++++++++++++++++---- arangod/Aql/Query.cpp | 8 ++++++++ arangod/Aql/Query.h | 6 ++++++ arangod/Aql/RestAqlHandler.cpp | 1 + 6 files changed, 57 insertions(+), 5 deletions(-) diff --git a/arangod/Aql/ExecutionBlock.cpp b/arangod/Aql/ExecutionBlock.cpp index c0c00da4f4..3cb05a5767 100644 --- a/arangod/Aql/ExecutionBlock.cpp +++ b/arangod/Aql/ExecutionBlock.cpp @@ -4527,6 +4527,9 @@ AqlItemBlock* RemoteBlock::getSome (size_t atLeast, } auto items = new triagens::aql::AqlItemBlock(responseBodyJson); + + _engine->_stats.add(ExecutionStats(responseBodyJson.get("stats"))); + return items; } diff --git a/arangod/Aql/ExecutionStats.cpp b/arangod/Aql/ExecutionStats.cpp index 4b77af33be..7bdd8e441c 100644 --- a/arangod/Aql/ExecutionStats.cpp +++ b/arangod/Aql/ExecutionStats.cpp @@ -28,9 +28,10 @@ //////////////////////////////////////////////////////////////////////////////// #include "Aql/ExecutionStats.h" - +#include "Utils/Exception.h" using namespace triagens::aql; using Json = triagens::basics::Json; +using JsonHelper = triagens::basics::JsonHelper; // ----------------------------------------------------------------------------- // --SECTION-- public methods @@ -50,6 +51,24 @@ Json ExecutionStats::toJson () const { return json; } +ExecutionStats::ExecutionStats() + :writesExecuted(0), + writesIgnored(0), + scannedFull(0), + scannedIndex(0) { +} + +ExecutionStats::ExecutionStats (triagens::basics::Json const& jsonStats) { + if (!jsonStats.isArray()) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "stats is not an Array"); + } + std::cout << jsonStats.toString() << "\n"; + writesExecuted = JsonHelper::checkAndGetNumericValue(jsonStats.json(), "writesExecuted"); + writesIgnored = JsonHelper::checkAndGetNumericValue(jsonStats.json(), "writesIgnored"); + scannedFull = JsonHelper::checkAndGetNumericValue(jsonStats.json(), "scannedFull"); + scannedIndex = JsonHelper::checkAndGetNumericValue(jsonStats.json(), "scannedIndex"); +} + // ----------------------------------------------------------------------------- // --SECTION-- END-OF-FILE // ----------------------------------------------------------------------------- diff --git a/arangod/Aql/ExecutionStats.h b/arangod/Aql/ExecutionStats.h index 36a86c7c1b..3b73c39504 100644 --- a/arangod/Aql/ExecutionStats.h +++ b/arangod/Aql/ExecutionStats.h @@ -38,35 +38,50 @@ namespace triagens { struct ExecutionStats { + ExecutionStats (); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief instanciate the statistics from JSON +//////////////////////////////////////////////////////////////////////////////// + + ExecutionStats (triagens::basics::Json const& jsonStats); + //////////////////////////////////////////////////////////////////////////////// /// @brief convert the statistics to JSON //////////////////////////////////////////////////////////////////////////////// triagens::basics::Json toJson () const; + void add (ExecutionStats const& summand) { + writesExecuted += summand.writesExecuted; + writesIgnored += summand.writesIgnored; + scannedFull += summand.scannedFull; + scannedIndex += summand.scannedIndex; + } + //////////////////////////////////////////////////////////////////////////////// /// @brief number of successfully executed write operations //////////////////////////////////////////////////////////////////////////////// - int64_t writesExecuted = 0; + int64_t writesExecuted; //////////////////////////////////////////////////////////////////////////////// /// @brief number of ignored write operations (ignored due to errors) //////////////////////////////////////////////////////////////////////////////// - int64_t writesIgnored = 0; + int64_t writesIgnored; //////////////////////////////////////////////////////////////////////////////// /// @brief number of documents scanned (full-collection scan) //////////////////////////////////////////////////////////////////////////////// - int64_t scannedFull = 0; + int64_t scannedFull; //////////////////////////////////////////////////////////////////////////////// /// @brief number of documents scanned (using indexes scan) //////////////////////////////////////////////////////////////////////////////// - int64_t scannedIndex = 0; + int64_t scannedIndex; }; diff --git a/arangod/Aql/Query.cpp b/arangod/Aql/Query.cpp index 3e0c0bc233..082f8050f5 100644 --- a/arangod/Aql/Query.cpp +++ b/arangod/Aql/Query.cpp @@ -840,6 +840,14 @@ void Query::exitContext () { } } +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns statistics for current query. +//////////////////////////////////////////////////////////////////////////////// + +triagens::basics::Json Query::getStats() { + return _engine->_stats.toJson(); +} + //////////////////////////////////////////////////////////////////////////////// /// @brief fetch a boolean value from the options //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Aql/Query.h b/arangod/Aql/Query.h index 66f33111cb..2f9cb82b2a 100644 --- a/arangod/Aql/Query.h +++ b/arangod/Aql/Query.h @@ -379,6 +379,12 @@ namespace triagens { void exitContext (); +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns statistics for current query. +//////////////////////////////////////////////////////////////////////////////// + + triagens::basics::Json getStats(); + // ----------------------------------------------------------------------------- // --SECTION-- private methods // ----------------------------------------------------------------------------- diff --git a/arangod/Aql/RestAqlHandler.cpp b/arangod/Aql/RestAqlHandler.cpp index 44e7474683..630e613629 100644 --- a/arangod/Aql/RestAqlHandler.cpp +++ b/arangod/Aql/RestAqlHandler.cpp @@ -747,6 +747,7 @@ void RestAqlHandler::handleUseQuery (std::string const& operation, else { try { answerBody = items->toJson(query->trx()); + answerBody.set("stats", query->getStats()); // std::cout << "ANSWERBODY: " << JsonHelper::toString(answerBody.json()) << "\n\n"; } catch (...) { From a8b8fbcf283f4cf70a951b2906a84b63db926c7f Mon Sep 17 00:00:00 2001 From: Willi Goesgens Date: Wed, 22 Oct 2014 18:24:42 +0200 Subject: [PATCH 06/11] Properly handle statistics if multiple getSome() calls are performed. --- arangod/Aql/ExecutionBlock.cpp | 5 ++++- arangod/Aql/ExecutionBlock.h | 8 ++++++++ arangod/Aql/ExecutionStats.cpp | 2 +- arangod/Aql/ExecutionStats.h | 16 ++++++++++++++++ 4 files changed, 29 insertions(+), 2 deletions(-) diff --git a/arangod/Aql/ExecutionBlock.cpp b/arangod/Aql/ExecutionBlock.cpp index 3cb05a5767..4ea7d169d7 100644 --- a/arangod/Aql/ExecutionBlock.cpp +++ b/arangod/Aql/ExecutionBlock.cpp @@ -4528,7 +4528,10 @@ AqlItemBlock* RemoteBlock::getSome (size_t atLeast, auto items = new triagens::aql::AqlItemBlock(responseBodyJson); - _engine->_stats.add(ExecutionStats(responseBodyJson.get("stats"))); + ExecutionStats newStats(responseBodyJson.get("stats")); + + _engine->_stats.addDelta(_deltaStats, newStats); + _deltaStats = newStats; return items; } diff --git a/arangod/Aql/ExecutionBlock.h b/arangod/Aql/ExecutionBlock.h index fc73c34454..99a7306c5f 100644 --- a/arangod/Aql/ExecutionBlock.h +++ b/arangod/Aql/ExecutionBlock.h @@ -36,6 +36,7 @@ #include "Aql/ExecutionNode.h" #include "Aql/Range.h" #include "Aql/WalkerWorker.h" +#include "Aql/ExecutionStats.h" #include "Utils/AqlTransaction.h" #include "Utils/transactions.h" #include "Utils/V8TransactionContext.h" @@ -1940,6 +1941,13 @@ namespace triagens { std::string _queryId; +//////////////////////////////////////////////////////////////////////////////// +/// @brief the ID of the query on the server as a string +//////////////////////////////////////////////////////////////////////////////// + + ExecutionStats _deltaStats; + + }; } // namespace triagens::aql diff --git a/arangod/Aql/ExecutionStats.cpp b/arangod/Aql/ExecutionStats.cpp index 7bdd8e441c..0dadd9f702 100644 --- a/arangod/Aql/ExecutionStats.cpp +++ b/arangod/Aql/ExecutionStats.cpp @@ -62,7 +62,7 @@ ExecutionStats::ExecutionStats (triagens::basics::Json const& jsonStats) { if (!jsonStats.isArray()) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "stats is not an Array"); } - std::cout << jsonStats.toString() << "\n"; + writesExecuted = JsonHelper::checkAndGetNumericValue(jsonStats.json(), "writesExecuted"); writesIgnored = JsonHelper::checkAndGetNumericValue(jsonStats.json(), "writesIgnored"); scannedFull = JsonHelper::checkAndGetNumericValue(jsonStats.json(), "scannedFull"); diff --git a/arangod/Aql/ExecutionStats.h b/arangod/Aql/ExecutionStats.h index 3b73c39504..733deb177f 100644 --- a/arangod/Aql/ExecutionStats.h +++ b/arangod/Aql/ExecutionStats.h @@ -52,6 +52,10 @@ namespace triagens { triagens::basics::Json toJson () const; +//////////////////////////////////////////////////////////////////////////////// +/// @brief sumarize two sets of ExecutionStats +//////////////////////////////////////////////////////////////////////////////// + void add (ExecutionStats const& summand) { writesExecuted += summand.writesExecuted; writesIgnored += summand.writesIgnored; @@ -59,6 +63,18 @@ namespace triagens { scannedIndex += summand.scannedIndex; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief sumarize the delta of two other sets of ExecutionStats to us +//////////////////////////////////////////////////////////////////////////////// + + void addDelta (ExecutionStats const& lastStats, ExecutionStats const& newStats) { + writesExecuted += newStats.writesExecuted - lastStats.writesExecuted; + writesIgnored += newStats.writesIgnored - lastStats.writesIgnored; + scannedFull += newStats.scannedFull - lastStats.scannedFull; + scannedIndex += newStats.scannedIndex - lastStats.scannedIndex; + } + + //////////////////////////////////////////////////////////////////////////////// /// @brief number of successfully executed write operations //////////////////////////////////////////////////////////////////////////////// From 54a0f182e2aab9905ed4fb4de3d324171d502ea5 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 22 Oct 2014 19:01:56 +0200 Subject: [PATCH 07/11] call shutdown using an errorCode --- arangod/Aql/ExecutionBlock.cpp | 42 ++++++++++++++++++++-------------- arangod/Aql/ExecutionBlock.h | 12 +++++----- arangod/Aql/ExecutionEngine.h | 17 +++++++++++--- arangod/Aql/Query.cpp | 30 ++++++++++++++---------- arangod/Aql/Query.h | 2 +- arangod/Aql/QueryRegistry.cpp | 21 ++++++++++++----- arangod/Aql/QueryRegistry.h | 4 ++-- arangod/Aql/RestAqlHandler.cpp | 18 +++++++-------- 8 files changed, 89 insertions(+), 57 deletions(-) diff --git a/arangod/Aql/ExecutionBlock.cpp b/arangod/Aql/ExecutionBlock.cpp index d24041cc3d..c1aefd5d67 100644 --- a/arangod/Aql/ExecutionBlock.cpp +++ b/arangod/Aql/ExecutionBlock.cpp @@ -234,9 +234,8 @@ int ExecutionBlock::initialize () { /// @brief shutdown, will be called exactly once for the whole query //////////////////////////////////////////////////////////////////////////////// -int ExecutionBlock::shutdown () { +int ExecutionBlock::shutdown (int errorCode) { int ret = TRI_ERROR_NO_ERROR; - int res; for (auto it = _buffer.begin(); it != _buffer.end(); ++it) { delete *it; @@ -244,8 +243,9 @@ int ExecutionBlock::shutdown () { _buffer.clear(); for (auto it = _dependencies.begin(); it != _dependencies.end(); ++it) { + int res; try { - res = (*it)->shutdown(); + res = (*it)->shutdown(errorCode); } catch (...) { res = TRI_ERROR_INTERNAL; @@ -558,12 +558,18 @@ int SingletonBlock::initializeCursor (AqlItemBlock* items, size_t pos) { return TRI_ERROR_NO_ERROR; } -int SingletonBlock::shutdown () { - int res = ExecutionBlock::shutdown(); +//////////////////////////////////////////////////////////////////////////////// +/// @brief shutdown the singleton block +//////////////////////////////////////////////////////////////////////////////// + +int SingletonBlock::shutdown (int errorCode) { + int res = ExecutionBlock::shutdown(errorCode); + if (_inputRegisterValues != nullptr) { delete _inputRegisterValues; _inputRegisterValues = nullptr; } + return res; } @@ -3336,11 +3342,11 @@ int GatherBlock::initialize () { /// @brief shutdown: need our own method since our _buffer is different //////////////////////////////////////////////////////////////////////////////// -int GatherBlock::shutdown () { +int GatherBlock::shutdown (int errorCode) { // don't call default shutdown method since it does the wrong thing to // _gatherBlockBuffer for (auto it = _dependencies.begin(); it != _dependencies.end(); ++it) { - int res = (*it)->shutdown(); + int res = (*it)->shutdown(errorCode); if (res != TRI_ERROR_NO_ERROR) { return res; @@ -3726,10 +3732,11 @@ bool GatherBlock::OurLessThan::operator() (std::pair const& a, BlockWithClients::BlockWithClients (ExecutionEngine* engine, ExecutionNode const* ep, - std::vector const& shardIds) : - ExecutionBlock(engine, ep), - _nrClients(shardIds.size()), - _initOrShutdown(true) { + std::vector const& shardIds) + : ExecutionBlock(engine, ep), + _nrClients(shardIds.size()), + _initOrShutdown(true) { + _shardIdMap.reserve(_nrClients); for (size_t i = 0; i < _nrClients; i++) { _shardIdMap.emplace(std::make_pair(shardIds[i], i)); @@ -3740,12 +3747,13 @@ BlockWithClients::BlockWithClients (ExecutionEngine* engine, /// @brief shutdown //////////////////////////////////////////////////////////////////////////////// -int BlockWithClients::shutdown () { - if (!_initOrShutdown) { +int BlockWithClients::shutdown (int errorCode) { + if (! _initOrShutdown) { return TRI_ERROR_NO_ERROR; } + _initOrShutdown = false; - return ExecutionBlock::shutdown(); + return ExecutionBlock::shutdown(errorCode); } //////////////////////////////////////////////////////////////////////////////// @@ -4395,7 +4403,7 @@ RemoteBlock::~RemoteBlock () { ClusterCommResult* RemoteBlock::sendRequest ( triagens::rest::HttpRequest::HttpRequestType type, - std::string urlPart, + std::string const& urlPart, std::string const& body) const { ClusterComm* cc = ClusterComm::instance(); @@ -4475,13 +4483,13 @@ int RemoteBlock::initializeCursor (AqlItemBlock* items, size_t pos) { /// @brief shutdown, will be called exactly once for the whole query //////////////////////////////////////////////////////////////////////////////// -int RemoteBlock::shutdown () { +int RemoteBlock::shutdown (int errorCode) { // For every call we simply forward via HTTP std::unique_ptr res; res.reset(sendRequest(rest::HttpRequest::HTTP_REQUEST_PUT, "/_api/aql/shutdown/", - string())); + string("{\"code\":\"" + std::to_string(errorCode) + "\"}"))); if (throwExceptionAfterBadSyncRequest(res.get(), true)) { // artificially ignore error in case query was not found during shutdown return TRI_ERROR_NO_ERROR; diff --git a/arangod/Aql/ExecutionBlock.h b/arangod/Aql/ExecutionBlock.h index f9d781e7ba..c4e6cc66ca 100644 --- a/arangod/Aql/ExecutionBlock.h +++ b/arangod/Aql/ExecutionBlock.h @@ -185,7 +185,7 @@ namespace triagens { /// @brief shutdown, will be called exactly once for the whole query //////////////////////////////////////////////////////////////////////////////// - virtual int shutdown (); + virtual int shutdown (int); //////////////////////////////////////////////////////////////////////////////// /// @brief getOne, gets one more item @@ -388,7 +388,7 @@ namespace triagens { int initializeCursor (AqlItemBlock* items, size_t pos) override; - int shutdown (); + int shutdown (int) override final; bool hasMore () override final { return ! _done; @@ -1390,7 +1390,7 @@ namespace triagens { /// @brief shutdown: need our own method since our _buffer is different //////////////////////////////////////////////////////////////////////////////// - int shutdown (); + int shutdown (int) override final; //////////////////////////////////////////////////////////////////////////////// /// @brief initializeCursor @@ -1528,7 +1528,7 @@ namespace triagens { /// @brief shutdown //////////////////////////////////////////////////////////////////////////////// - int shutdown (); + int shutdown (int) override; //////////////////////////////////////////////////////////////////////////////// /// @brief getSome: shouldn't be used, use skipSomeForShard @@ -1867,7 +1867,7 @@ namespace triagens { /// @brief shutdown, will be called exactly once for the whole query //////////////////////////////////////////////////////////////////////////////// - int shutdown () final; + int shutdown (int) override final; //////////////////////////////////////////////////////////////////////////////// /// @brief getSome @@ -1908,7 +1908,7 @@ namespace triagens { triagens::arango::ClusterCommResult* sendRequest ( rest::HttpRequest::HttpRequestType type, - std::string urlPart, + std::string const& urlPart, std::string const& body) const; //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Aql/ExecutionEngine.h b/arangod/Aql/ExecutionEngine.h index 35386d7ab7..46d0d6aed3 100644 --- a/arangod/Aql/ExecutionEngine.h +++ b/arangod/Aql/ExecutionEngine.h @@ -131,10 +131,15 @@ namespace triagens { /// @brief shutdown, will be called exactly once for the whole query //////////////////////////////////////////////////////////////////////////////// - int shutdown () { - if (_root != nullptr) { - return _root->shutdown(); + int shutdown (int errorCode) { + if (_root != nullptr && ! _wasShutdown) { + // prevent a duplicate shutdown + int res = _root->shutdown(errorCode); + _wasShutdown = true; + + return res; } + return TRI_ERROR_NO_ERROR; } @@ -236,6 +241,12 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// Query* _query; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief whether or not shutdown() was executed +//////////////////////////////////////////////////////////////////////////////// + + bool _wasShutdown; }; } diff --git a/arangod/Aql/Query.cpp b/arangod/Aql/Query.cpp index 2e757061b2..b4edb9b5c5 100644 --- a/arangod/Aql/Query.cpp +++ b/arangod/Aql/Query.cpp @@ -210,7 +210,7 @@ Query::Query (triagens::arango::ApplicationV8* applicationV8, //////////////////////////////////////////////////////////////////////////////// Query::~Query () { - cleanupPlanAndEngine(); + cleanupPlanAndEngine(TRI_ERROR_INTERNAL); // abort the transaction if (_profile != nullptr) { delete _profile; @@ -524,19 +524,19 @@ QueryResult Query::prepare (QueryRegistry* registry) { return QueryResult(); } catch (triagens::arango::Exception const& ex) { - cleanupPlanAndEngine(); + cleanupPlanAndEngine(ex.code()); return QueryResult(ex.code(), getStateString() + ex.message()); } catch (std::bad_alloc const&) { - cleanupPlanAndEngine(); + cleanupPlanAndEngine(TRI_ERROR_OUT_OF_MEMORY); return QueryResult(TRI_ERROR_OUT_OF_MEMORY, getStateString() + TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY)); } catch (std::exception const& ex) { - cleanupPlanAndEngine(); + cleanupPlanAndEngine(TRI_ERROR_INTERNAL); return QueryResult(TRI_ERROR_INTERNAL, getStateString() + ex.what()); } catch (...) { - cleanupPlanAndEngine(); + cleanupPlanAndEngine(TRI_ERROR_INTERNAL); return QueryResult(TRI_ERROR_INTERNAL, getStateString() + TRI_errno_string(TRI_ERROR_INTERNAL)); } } @@ -578,7 +578,7 @@ QueryResult Query::execute (QueryRegistry* registry) { _trx->commit(); - cleanupPlanAndEngine(); + cleanupPlanAndEngine(TRI_ERROR_NO_ERROR); enterState(FINALIZATION); @@ -593,19 +593,19 @@ QueryResult Query::execute (QueryRegistry* registry) { return result; } catch (triagens::arango::Exception const& ex) { - cleanupPlanAndEngine(); + cleanupPlanAndEngine(ex.code()); return QueryResult(ex.code(), getStateString() + ex.message()); } catch (std::bad_alloc const&) { - cleanupPlanAndEngine(); + cleanupPlanAndEngine(TRI_ERROR_OUT_OF_MEMORY); return QueryResult(TRI_ERROR_OUT_OF_MEMORY, getStateString() + TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY)); } catch (std::exception const& ex) { - cleanupPlanAndEngine(); + cleanupPlanAndEngine(TRI_ERROR_INTERNAL); return QueryResult(TRI_ERROR_INTERNAL, getStateString() + ex.what()); } catch (...) { - cleanupPlanAndEngine(); + cleanupPlanAndEngine(TRI_ERROR_INTERNAL); return QueryResult(TRI_ERROR_INTERNAL, getStateString() + TRI_errno_string(TRI_ERROR_INTERNAL)); } } @@ -972,9 +972,15 @@ std::string Query::getStateString () const { /// @brief cleanup plan and engine for current query //////////////////////////////////////////////////////////////////////////////// -void Query::cleanupPlanAndEngine () { +void Query::cleanupPlanAndEngine (int errorCode) { if (_engine != nullptr) { - _engine->shutdown(); + try { + _engine->shutdown(errorCode); + } + catch (...) { + // shutdown may fail but we must not throw here + // (we're also called from the destructor) + } delete _engine; _engine = nullptr; } diff --git a/arangod/Aql/Query.h b/arangod/Aql/Query.h index ccd6a620e0..e3b56fa28a 100644 --- a/arangod/Aql/Query.h +++ b/arangod/Aql/Query.h @@ -426,7 +426,7 @@ namespace triagens { /// @brief cleanup plan and engine for current query //////////////////////////////////////////////////////////////////////////////// - void cleanupPlanAndEngine (); + void cleanupPlanAndEngine (int); // ----------------------------------------------------------------------------- // --SECTION-- private variables diff --git a/arangod/Aql/QueryRegistry.cpp b/arangod/Aql/QueryRegistry.cpp index dcc00ee900..fcd3c176d9 100644 --- a/arangod/Aql/QueryRegistry.cpp +++ b/arangod/Aql/QueryRegistry.cpp @@ -59,7 +59,7 @@ QueryRegistry::~QueryRegistry () { } for (auto& p : toDelete) { try { // just in case - destroy(p.first, p.second); + destroy(p.first, p.second, TRI_ERROR_TRANSACTION_ABORTED); } catch (...) { } @@ -172,7 +172,9 @@ void QueryRegistry::close (TRI_vocbase_t* vocbase, QueryId id, double ttl) { /// @brief destroy //////////////////////////////////////////////////////////////////////////////// -void QueryRegistry::destroy (std::string const& vocbase, QueryId id) { +void QueryRegistry::destroy (std::string const& vocbase, + QueryId id, + int errorCode) { WRITE_LOCKER(_lock); auto m = _queries.find(vocbase); @@ -194,6 +196,11 @@ void QueryRegistry::destroy (std::string const& vocbase, QueryId id) { triagens::arango::TransactionBase::increaseNumbers(1, 1); } + if (errorCode == TRI_ERROR_NO_ERROR) { + // commit the operation + qi->_query->trx()->commit(); + } + // Now we can delete it: delete qi->_query; delete qi; @@ -206,8 +213,10 @@ void QueryRegistry::destroy (std::string const& vocbase, QueryId id) { /// @brief destroy //////////////////////////////////////////////////////////////////////////////// -void QueryRegistry::destroy (TRI_vocbase_t* vocbase, QueryId id) { - destroy(vocbase->_name, id); +void QueryRegistry::destroy (TRI_vocbase_t* vocbase, + QueryId id, + int errorCode) { + destroy(vocbase->_name, id, errorCode); } //////////////////////////////////////////////////////////////////////////////// @@ -233,8 +242,8 @@ void QueryRegistry::expireQueries () { } } for (auto& p : toDelete) { - try { // just in case - destroy(p.first, p.second); + try { // just in case + destroy(p.first, p.second, TRI_ERROR_TRANSACTION_ABORTED); } catch (...) { } diff --git a/arangod/Aql/QueryRegistry.h b/arangod/Aql/QueryRegistry.h index 2dc68cf815..a0084300a2 100644 --- a/arangod/Aql/QueryRegistry.h +++ b/arangod/Aql/QueryRegistry.h @@ -98,9 +98,9 @@ namespace triagens { /// from the same thread that has opened it! //////////////////////////////////////////////////////////////////////////////// - void destroy (std::string const& vocbase, QueryId id); + void destroy (std::string const& vocbase, QueryId id, int errorCode); - void destroy (TRI_vocbase_t* vocbase, QueryId id); + void destroy (TRI_vocbase_t* vocbase, QueryId id, int errorCode); //////////////////////////////////////////////////////////////////////////////// /// @brief expireQueries, this deletes all expired queries from the registry diff --git a/arangod/Aql/RestAqlHandler.cpp b/arangod/Aql/RestAqlHandler.cpp index 170a52c474..bf334c153b 100644 --- a/arangod/Aql/RestAqlHandler.cpp +++ b/arangod/Aql/RestAqlHandler.cpp @@ -416,14 +416,10 @@ void RestAqlHandler::useQuery (std::string const& operation, TRI_ASSERT(_qId > 0); TRI_ASSERT(query->engine() != nullptr); - Json queryJson; - if (operation != "shutdown") { - // /shutdown does not require a body - queryJson = Json(TRI_UNKNOWN_MEM_ZONE, parseJsonBody()); - if (queryJson.isEmpty()) { - _queryRegistry->close(_vocbase, _qId); - return; - } + Json queryJson = Json(TRI_UNKNOWN_MEM_ZONE, parseJsonBody()); + if (queryJson.isEmpty()) { + _queryRegistry->close(_vocbase, _qId); + return; } try { @@ -837,9 +833,11 @@ void RestAqlHandler::handleUseQuery (std::string const& operation, } else if (operation == "shutdown") { int res = TRI_ERROR_INTERNAL; + int errorCode = JsonHelper::getNumericValue(queryJson.json(), "code", TRI_ERROR_INTERNAL); + try { - res = query->engine()->shutdown(); - _queryRegistry->destroy(_vocbase, _qId); + res = query->engine()->shutdown(errorCode); // pass errorCode to shutdown + _queryRegistry->destroy(_vocbase, _qId, errorCode); } catch (...) { LOG_ERROR("shutdown lead to an exception"); From 36e0ec5e55a99fb61d4c78f982764176dd461c29 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 22 Oct 2014 19:05:54 +0200 Subject: [PATCH 08/11] commented out code --- arangod/Aql/ExecutionEngine.cpp | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index b73b5b300b..c02f198d86 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -161,7 +161,8 @@ ExecutionEngine::ExecutionEngine (Query* query) : _stats(), _blocks(), _root(nullptr), - _query(query) { + _query(query), + _wasShutdown(false) { _blocks.reserve(8); } @@ -171,8 +172,11 @@ ExecutionEngine::ExecutionEngine (Query* query) //////////////////////////////////////////////////////////////////////////////// ExecutionEngine::~ExecutionEngine () { - if (_root != nullptr) { - _root->shutdown(); + try { + shutdown(TRI_ERROR_INTERNAL); + } + catch (...) { + // shutdown can throw - ignore it in the destructor } for (auto it = _blocks.begin(); it != _blocks.end(); ++it) { @@ -602,19 +606,21 @@ struct CoordinatorInstanciator : public WalkerWorker { } } - if (nodeType == ExecutionNode::GATHER || - nodeType == ExecutionNode::DISTRIBUTE) { - // we found a gather or distribute node + if (nodeType == ExecutionNode::GATHER /* || + nodeType == ExecutionNode::DISTRIBUTE */) { + // we found a gather node TRI_ASSERT(remoteNode != nullptr); - // now we'll create a remote node for each shard and add it to the gather|distribute node + // now we'll create a remote node for each shard and add it to the gather node Collection const* collection = nullptr; if (nodeType == ExecutionNode::GATHER) { collection = static_cast((*en))->collection(); } + /* TODO: do we need to handle distribute nodes here, too?? else if (nodeType == ExecutionNode::DISTRIBUTE) { collection = static_cast((*en))->collection(); } + */ else { THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL); } From 612abc6a201aa3f9d94124e83f153d0ddea6b49c Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Thu, 23 Oct 2014 10:18:30 +0200 Subject: [PATCH 09/11] less throwing in destructors --- arangod/Aql/QueryRegistry.cpp | 11 ++++-- arangod/RestServer/ArangoServer.cpp | 2 +- arangod/RestServer/arango.cpp | 52 ++++++++++++++++------------- arangod/VocBase/server.cpp | 4 +-- lib/Rest/AnyServer.cpp | 4 +-- 5 files changed, 42 insertions(+), 31 deletions(-) diff --git a/arangod/Aql/QueryRegistry.cpp b/arangod/Aql/QueryRegistry.cpp index fcd3c176d9..6715b4cc65 100644 --- a/arangod/Aql/QueryRegistry.cpp +++ b/arangod/Aql/QueryRegistry.cpp @@ -45,8 +45,10 @@ using namespace triagens::aql; QueryRegistry::~QueryRegistry () { std::vector> toDelete; - { - WRITE_LOCKER(_lock); + + WRITE_LOCKER(_lock); + + try { for (auto& x : _queries) { // x.first is a TRI_vocbase_t* and // x.second is a std::unordered_map @@ -57,6 +59,11 @@ QueryRegistry::~QueryRegistry () { } } } + catch (...) { + // the emplace_back() above might fail + // prevent throwing exceptions in the destructor + } + for (auto& p : toDelete) { try { // just in case destroy(p.first, p.second, TRI_ERROR_TRANSACTION_ABORTED); diff --git a/arangod/RestServer/ArangoServer.cpp b/arangod/RestServer/ArangoServer.cpp index 2116ea242d..8830ff2fc9 100644 --- a/arangod/RestServer/ArangoServer.cpp +++ b/arangod/RestServer/ArangoServer.cpp @@ -1192,7 +1192,7 @@ void ArangoServer::closeDatabases () { // stop the replication appliers so all replication transactions can end TRI_StopReplicationAppliersServer(_server); - // enfore logfile manager shutdown so we are sure no one else will + // enforce logfile manager shutdown so we are sure no one else will // write to the logs wal::LogfileManager::instance()->stop(); diff --git a/arangod/RestServer/arango.cpp b/arangod/RestServer/arango.cpp index 8c20a9813f..af81f86f15 100644 --- a/arangod/RestServer/arango.cpp +++ b/arangod/RestServer/arango.cpp @@ -47,7 +47,7 @@ using namespace triagens::arango; /// @brief ArangoDB server //////////////////////////////////////////////////////////////////////////////// -static ArangoServer* ArangoInstance = 0; +static ArangoServer* ArangoInstance = nullptr; //////////////////////////////////////////////////////////////////////////////// /// @brief running flag @@ -174,7 +174,7 @@ static void InstallServiceCommand (string command) { SC_HANDLE schSCManager = OpenSCManager(NULL, SERVICES_ACTIVE_DATABASE, SC_MANAGER_ALL_ACCESS); if (schSCManager == 0) { - cout << "FATAL: OpenSCManager failed with " << GetLastError() << endl; + cerr << "FATAL: OpenSCManager failed with " << GetLastError() << endl; exit(EXIT_FAILURE); } @@ -196,7 +196,7 @@ static void InstallServiceCommand (string command) { CloseServiceHandle(schSCManager); if (schService == 0) { - cout << "FATAL: CreateServiceA failed with " << GetLastError() << endl; + cerr << "FATAL: CreateServiceA failed with " << GetLastError() << endl; exit(EXIT_FAILURE); } @@ -220,7 +220,7 @@ static void InstallService (int argc, char* argv[]) { CHAR path[MAX_PATH]; if(! GetModuleFileNameA(NULL, path, MAX_PATH)) { - cout << "FATAL: GetModuleFileNameA failed" << endl; + cerr << "FATAL: GetModuleFileNameA failed" << endl; exit(EXIT_FAILURE); } @@ -251,7 +251,7 @@ static void DeleteService (int argc, char* argv[]) { SC_HANDLE schSCManager = OpenSCManager(NULL, SERVICES_ACTIVE_DATABASE, SC_MANAGER_ALL_ACCESS); if (schSCManager == 0) { - cout << "FATAL: OpenSCManager failed with " << GetLastError() << endl; + cerr << "FATAL: OpenSCManager failed with " << GetLastError() << endl; exit(EXIT_FAILURE); } @@ -263,12 +263,12 @@ static void DeleteService (int argc, char* argv[]) { CloseServiceHandle(schSCManager); if (schService == 0) { - cout << "FATAL: OpenServiceA failed with " << GetLastError() << endl; + cerr << "FATAL: OpenServiceA failed with " << GetLastError() << endl; exit(EXIT_FAILURE); } if (! DeleteService(schService)) { - cout << "FATAL: DeleteService failed with " << GetLastError() << endl; + cerr << "FATAL: DeleteService failed with " << GetLastError() << endl; exit(EXIT_FAILURE); } @@ -309,7 +309,7 @@ static void SetServiceStatus (DWORD dwCurrentState, DWORD dwWin32ExitCode, DWORD ss.dwControlsAccepted = 0; SetServiceStatus(ServiceStatus, &ss); - if (ArangoInstance != 0) { + if (ArangoInstance != nullptr) { ArangoInstance->beginShutdown(); } @@ -346,7 +346,7 @@ static void WINAPI ServiceCtrl (DWORD dwCtrlCode) { if (dwCtrlCode == SERVICE_CONTROL_STOP || dwCtrlCode == SERVICE_CONTROL_SHUTDOWN) { SetServiceStatus(SERVICE_STOP_PENDING, NO_ERROR, 0, 0); - if (ArangoInstance != 0) { + if (ArangoInstance != nullptr) { ArangoInstance->beginShutdown(); while (IsRunning) { @@ -402,9 +402,9 @@ static void WINAPI ServiceMain (DWORD dwArgc, LPSTR *lpszArgv) { int main (int argc, char* argv[]) { int res = 0; + bool startAsService = false; #ifdef _WIN32 - bool startAsService = false; if (1 < argc) { if (TRI_EqualString(argv[1], "--install-service")) { @@ -440,30 +440,36 @@ int main (int argc, char* argv[]) { ARGV = argv; if (! StartServiceCtrlDispatcher(ste)) { - cout << "FATAL: StartServiceCtrlDispatcher has failed with " << GetLastError() << endl; - exit(EXIT_SUCCESS); + cerr << "FATAL: StartServiceCtrlDispatcher has failed with " << GetLastError() << endl; + exit(EXIT_FAILURE); } } - else { + +#endif + + if (! startAsService) { ArangoInstance = new ArangoServer(argc, argv); res = ArangoInstance->start(); } -#else + if (ArangoInstance != nullptr) { + try { + delete ArangoInstance; + } + catch (...) { + // caught an error during shutdown + res = EXIT_FAILURE; - ArangoInstance = new ArangoServer(argc, argv); - res = ArangoInstance->start(); - -#endif - - if (ArangoInstance != 0) { - delete ArangoInstance; - ArangoInstance = 0; +#ifdef TRI_ENABLE_MAINTAINER_MODE + cerr << "Caught an exception during shutdown"; +#endif + } + ArangoInstance = nullptr; } // shutdown sub-systems TRIAGENS_REST_SHUTDOWN; - TRI_GlobalExitFunction(res, NULL); + TRI_GlobalExitFunction(res, nullptr); return res; } diff --git a/arangod/VocBase/server.cpp b/arangod/VocBase/server.cpp index 41c17b9018..197f0ed588 100644 --- a/arangod/VocBase/server.cpp +++ b/arangod/VocBase/server.cpp @@ -1988,15 +1988,13 @@ int TRI_InitDatabasesServer (TRI_server_t* server) { //////////////////////////////////////////////////////////////////////////////// int TRI_StopServer (TRI_server_t* server) { - int res; - // set shutdown flag TRI_LockMutex(&server->_createLock); server->_shutdown = true; TRI_UnlockMutex(&server->_createLock); // stop dbm thread - res = TRI_JoinThread(&server->_databaseManager); + int res = TRI_JoinThread(&server->_databaseManager); CloseDatabases(server); diff --git a/lib/Rest/AnyServer.cpp b/lib/Rest/AnyServer.cpp index 0f35f826b3..5a36d67893 100644 --- a/lib/Rest/AnyServer.cpp +++ b/lib/Rest/AnyServer.cpp @@ -241,7 +241,7 @@ AnyServer::AnyServer () _supervisorMode(false), _pidFile(""), _workingDirectory(""), - _applicationServer(0) { + _applicationServer(nullptr) { } //////////////////////////////////////////////////////////////////////////////// @@ -249,7 +249,7 @@ AnyServer::AnyServer () //////////////////////////////////////////////////////////////////////////////// AnyServer::~AnyServer () { - if (_applicationServer != 0) { + if (_applicationServer != nullptr) { delete _applicationServer; } } From fc4886e1f3786a57e11b0a7f9d42dcee965ef763 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Thu, 23 Oct 2014 10:38:31 +0200 Subject: [PATCH 10/11] added diagnostic information --- arangod/Aql/ExecutionBlock.cpp | 79 +++++++++++++++++++++++++++++++++- 1 file changed, 78 insertions(+), 1 deletion(-) diff --git a/arangod/Aql/ExecutionBlock.cpp b/arangod/Aql/ExecutionBlock.cpp index 6754752434..2f195b7568 100644 --- a/arangod/Aql/ExecutionBlock.cpp +++ b/arangod/Aql/ExecutionBlock.cpp @@ -44,6 +44,14 @@ using Json = triagens::basics::Json; using JsonHelper = triagens::basics::JsonHelper; using StringBuffer = triagens::basics::StringBuffer; +#ifdef TRI_ENABLE_MAINTAINER_MODE +#define ENTER_BLOCK try { (void) 0; +#define LEAVE_BLOCK } catch (...) { std::cout << "caught an exception in " << __FUNCTION__ << ", " << __FILE__ << ":" << __LINE__ << "!\n"; throw; } +#else +#define ENTER_BLOCK +#define LEAVE_BLOCK +#endif + // ----------------------------------------------------------------------------- // --SECTION-- struct AggregatorGroup // ----------------------------------------------------------------------------- @@ -3315,6 +3323,7 @@ GatherBlock::GatherBlock (ExecutionEngine* engine, //////////////////////////////////////////////////////////////////////////////// GatherBlock::~GatherBlock () { + ENTER_BLOCK for (std::deque& x : _gatherBlockBuffer) { for (AqlItemBlock* y: x) { delete y; @@ -3322,6 +3331,7 @@ GatherBlock::~GatherBlock () { x.clear(); } _gatherBlockBuffer.clear(); + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3329,6 +3339,7 @@ GatherBlock::~GatherBlock () { //////////////////////////////////////////////////////////////////////////////// int GatherBlock::initialize () { + ENTER_BLOCK auto res = ExecutionBlock::initialize(); if (res != TRI_ERROR_NO_ERROR) { @@ -3336,6 +3347,7 @@ int GatherBlock::initialize () { } return TRI_ERROR_NO_ERROR; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3343,6 +3355,7 @@ int GatherBlock::initialize () { //////////////////////////////////////////////////////////////////////////////// int GatherBlock::shutdown (int errorCode) { + ENTER_BLOCK // don't call default shutdown method since it does the wrong thing to // _gatherBlockBuffer for (auto it = _dependencies.begin(); it != _dependencies.end(); ++it) { @@ -3362,6 +3375,7 @@ int GatherBlock::shutdown (int errorCode) { _gatherBlockBuffer.clear(); return TRI_ERROR_NO_ERROR; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3369,6 +3383,7 @@ int GatherBlock::shutdown (int errorCode) { //////////////////////////////////////////////////////////////////////////////// int GatherBlock::initializeCursor (AqlItemBlock* items, size_t pos) { + ENTER_BLOCK int res = ExecutionBlock::initializeCursor(items, pos); if (res != TRI_ERROR_NO_ERROR) { @@ -3395,6 +3410,7 @@ int GatherBlock::initializeCursor (AqlItemBlock* items, size_t pos) { _done = false; return TRI_ERROR_NO_ERROR; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3403,6 +3419,7 @@ int GatherBlock::initializeCursor (AqlItemBlock* items, size_t pos) { //////////////////////////////////////////////////////////////////////////////// int64_t GatherBlock::count () const { + ENTER_BLOCK int64_t sum = 0; for (auto x: _dependencies) { if (x->count() == -1) { @@ -3411,6 +3428,7 @@ int64_t GatherBlock::count () const { sum += x->count(); } return sum; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3419,6 +3437,7 @@ int64_t GatherBlock::count () const { //////////////////////////////////////////////////////////////////////////////// int64_t GatherBlock::remaining () { + ENTER_BLOCK int64_t sum = 0; for (auto x : _dependencies) { if (x->remaining() == -1) { @@ -3427,6 +3446,7 @@ int64_t GatherBlock::remaining () { sum += x->remaining(); } return sum; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3435,6 +3455,7 @@ int64_t GatherBlock::remaining () { //////////////////////////////////////////////////////////////////////////////// bool GatherBlock::hasMore () { + ENTER_BLOCK if (_done) { return false; } @@ -3459,6 +3480,7 @@ bool GatherBlock::hasMore () { } _done = true; return false; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3466,6 +3488,7 @@ bool GatherBlock::hasMore () { //////////////////////////////////////////////////////////////////////////////// AqlItemBlock* GatherBlock::getSome (size_t atLeast, size_t atMost) { + ENTER_BLOCK if (_done) { return nullptr; } @@ -3579,6 +3602,7 @@ AqlItemBlock* GatherBlock::getSome (size_t atLeast, size_t atMost) { } return res.release(); + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3586,6 +3610,7 @@ AqlItemBlock* GatherBlock::getSome (size_t atLeast, size_t atMost) { //////////////////////////////////////////////////////////////////////////////// size_t GatherBlock::skipSome (size_t atLeast, size_t atMost) { + ENTER_BLOCK if (_done) { return 0; } @@ -3662,6 +3687,7 @@ size_t GatherBlock::skipSome (size_t atLeast, size_t atMost) { } return skipped; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3670,6 +3696,7 @@ size_t GatherBlock::skipSome (size_t atLeast, size_t atMost) { //////////////////////////////////////////////////////////////////////////////// bool GatherBlock::getBlock (size_t i, size_t atLeast, size_t atMost) { + ENTER_BLOCK TRI_ASSERT(0 <= i && i < _dependencies.size()); AqlItemBlock* docs = _dependencies.at(i)->getSome(atLeast, atMost); if (docs != nullptr) { @@ -3684,6 +3711,7 @@ bool GatherBlock::getBlock (size_t i, size_t atLeast, size_t atMost) { } return false; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3748,12 +3776,14 @@ BlockWithClients::BlockWithClients (ExecutionEngine* engine, //////////////////////////////////////////////////////////////////////////////// int BlockWithClients::shutdown (int errorCode) { + ENTER_BLOCK if (! _initOrShutdown) { return TRI_ERROR_NO_ERROR; } _initOrShutdown = false; return ExecutionBlock::shutdown(errorCode); + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3763,6 +3793,7 @@ int BlockWithClients::shutdown (int errorCode) { AqlItemBlock* BlockWithClients::getSomeForShard (size_t atLeast, size_t atMost, std::string const& shardId) { + ENTER_BLOCK size_t skipped = 0; AqlItemBlock* result = nullptr; int out = getOrSkipSomeForShard(atLeast, atMost, false, result, skipped, shardId); @@ -3770,6 +3801,7 @@ AqlItemBlock* BlockWithClients::getSomeForShard (size_t atLeast, THROW_ARANGO_EXCEPTION(out); } return result; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3779,6 +3811,7 @@ AqlItemBlock* BlockWithClients::getSomeForShard (size_t atLeast, size_t BlockWithClients::skipSomeForShard (size_t atLeast, size_t atMost, std::string const& shardId) { + ENTER_BLOCK size_t skipped = 0; AqlItemBlock* result = nullptr; int out = getOrSkipSomeForShard(atLeast, atMost, true, result, skipped, shardId); @@ -3787,6 +3820,7 @@ size_t BlockWithClients::skipSomeForShard (size_t atLeast, THROW_ARANGO_EXCEPTION(out); } return skipped; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3795,6 +3829,7 @@ size_t BlockWithClients::skipSomeForShard (size_t atLeast, bool BlockWithClients::skipForShard (size_t number, std::string const& shardId) { + ENTER_BLOCK size_t skipped = skipSomeForShard(number, number, shardId); size_t nr = skipped; while (nr != 0 && skipped < number) { @@ -3805,6 +3840,7 @@ bool BlockWithClients::skipForShard (size_t number, return true; } return ! hasMoreForShard(shardId); + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3813,6 +3849,7 @@ bool BlockWithClients::skipForShard (size_t number, //////////////////////////////////////////////////////////////////////////////// size_t BlockWithClients::getClientId (std::string const& shardId) { + ENTER_BLOCK if (shardId.empty()) { TRI_ASSERT(false); THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "got empty shard id"); @@ -3825,6 +3862,7 @@ size_t BlockWithClients::getClientId (std::string const& shardId) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, message); } return ((*it).second); + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3833,6 +3871,7 @@ size_t BlockWithClients::getClientId (std::string const& shardId) { //////////////////////////////////////////////////////////////////////////////// bool BlockWithClients::preInitCursor () { + ENTER_BLOCK if (! _initOrShutdown) { return false; } @@ -3846,6 +3885,7 @@ bool BlockWithClients::preInitCursor () { _initOrShutdown = false; return true; + LEAVE_BLOCK } // ----------------------------------------------------------------------------- @@ -3857,6 +3897,7 @@ bool BlockWithClients::preInitCursor () { //////////////////////////////////////////////////////////////////////////////// int ScatterBlock::initializeCursor (AqlItemBlock* items, size_t pos) { + ENTER_BLOCK if (! preInitCursor()) { return TRI_ERROR_NO_ERROR; } @@ -3872,6 +3913,7 @@ int ScatterBlock::initializeCursor (AqlItemBlock* items, size_t pos) { _posForClient.push_back(std::make_pair(0, 0)); } return TRI_ERROR_NO_ERROR; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3879,6 +3921,7 @@ int ScatterBlock::initializeCursor (AqlItemBlock* items, size_t pos) { //////////////////////////////////////////////////////////////////////////////// bool ScatterBlock::hasMoreForShard (std::string const& shardId) { + ENTER_BLOCK size_t clientId = getClientId(shardId); if (_doneForClient.at(clientId)) { @@ -3897,6 +3940,7 @@ bool ScatterBlock::hasMoreForShard (std::string const& shardId) { } } return true; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3905,6 +3949,7 @@ bool ScatterBlock::hasMoreForShard (std::string const& shardId) { //////////////////////////////////////////////////////////////////////////////// int64_t ScatterBlock::remainingForShard (std::string const& shardId) { + ENTER_BLOCK size_t clientId = getClientId(shardId); if (_doneForClient.at(clientId)) { return 0; @@ -3925,6 +3970,7 @@ int64_t ScatterBlock::remainingForShard (std::string const& shardId) { } return sum; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -3934,7 +3980,7 @@ int64_t ScatterBlock::remainingForShard (std::string const& shardId) { int ScatterBlock::getOrSkipSomeForShard (size_t atLeast, size_t atMost, bool skipping, AqlItemBlock*& result, size_t& skipped, std::string const& shardId) { - + ENTER_BLOCK TRI_ASSERT(0 < atLeast && atLeast <= atMost); TRI_ASSERT(result == nullptr && skipped == 0); @@ -3993,6 +4039,7 @@ int ScatterBlock::getOrSkipSomeForShard (size_t atLeast, } return TRI_ERROR_NO_ERROR; + LEAVE_BLOCK } // ----------------------------------------------------------------------------- @@ -4020,6 +4067,7 @@ DistributeBlock::DistributeBlock (ExecutionEngine* engine, //////////////////////////////////////////////////////////////////////////////// int DistributeBlock::initializeCursor (AqlItemBlock* items, size_t pos) { + ENTER_BLOCK if (! preInitCursor()) { return TRI_ERROR_NO_ERROR; } @@ -4036,6 +4084,7 @@ int DistributeBlock::initializeCursor (AqlItemBlock* items, size_t pos) { } return TRI_ERROR_NO_ERROR; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -4043,6 +4092,7 @@ int DistributeBlock::initializeCursor (AqlItemBlock* items, size_t pos) { //////////////////////////////////////////////////////////////////////////////// bool DistributeBlock::hasMoreForShard (std::string const& shardId) { + ENTER_BLOCK size_t clientId = getClientId(shardId); if (_doneForClient.at(clientId)) { @@ -4058,6 +4108,7 @@ bool DistributeBlock::hasMoreForShard (std::string const& shardId) { return false; } return true; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -4070,6 +4121,7 @@ int DistributeBlock::getOrSkipSomeForShard (size_t atLeast, AqlItemBlock*& result, size_t& skipped, std::string const& shardId) { + ENTER_BLOCK TRI_ASSERT(0 < atLeast && atLeast <= atMost); TRI_ASSERT(result == nullptr && skipped == 0); @@ -4175,6 +4227,7 @@ int DistributeBlock::getOrSkipSomeForShard (size_t atLeast, } return TRI_ERROR_NO_ERROR; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -4188,6 +4241,7 @@ int DistributeBlock::getOrSkipSomeForShard (size_t atLeast, bool DistributeBlock::getBlockForClient (size_t atLeast, size_t atMost, size_t clientId) { + ENTER_BLOCK if (_buffer.empty()) { _index = 0; // position in _buffer _pos = 0; // position in _buffer.at(_index) @@ -4224,6 +4278,7 @@ bool DistributeBlock::getBlockForClient (size_t atLeast, } return true; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -4233,6 +4288,7 @@ bool DistributeBlock::getBlockForClient (size_t atLeast, //////////////////////////////////////////////////////////////////////////////// size_t DistributeBlock::sendToClient (AqlValue val) { + ENTER_BLOCK TRI_json_t const* json; if (val._type == AqlValue::JSON) { json = val._json->json(); @@ -4268,6 +4324,7 @@ size_t DistributeBlock::sendToClient (AqlValue val) { TRI_ASSERT(!shardId.empty()); return getClientId(shardId); + LEAVE_BLOCK } // ----------------------------------------------------------------------------- @@ -4280,6 +4337,7 @@ size_t DistributeBlock::sendToClient (AqlValue val) { static bool throwExceptionAfterBadSyncRequest (ClusterCommResult* res, bool isShutdown) { + ENTER_BLOCK if (res->status == CL_COMM_TIMEOUT) { std::string errorMessage; errorMessage += std::string("Timeout in communication with shard '") + @@ -4367,6 +4425,7 @@ static bool throwExceptionAfterBadSyncRequest (ClusterCommResult* res, } return false; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -4405,6 +4464,7 @@ ClusterCommResult* RemoteBlock::sendRequest ( triagens::rest::HttpRequest::HttpRequestType type, std::string const& urlPart, std::string const& body) const { + ENTER_BLOCK ClusterComm* cc = ClusterComm::instance(); // Later, we probably want to set these sensibly: @@ -4427,6 +4487,7 @@ ClusterCommResult* RemoteBlock::sendRequest ( defaultTimeOut); return result; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -4434,6 +4495,7 @@ ClusterCommResult* RemoteBlock::sendRequest ( //////////////////////////////////////////////////////////////////////////////// int RemoteBlock::initialize () { + ENTER_BLOCK int res = ExecutionBlock::initialize(); if (res != TRI_ERROR_NO_ERROR) { @@ -4441,6 +4503,7 @@ int RemoteBlock::initialize () { } return TRI_ERROR_NO_ERROR; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -4448,6 +4511,7 @@ int RemoteBlock::initialize () { //////////////////////////////////////////////////////////////////////////////// int RemoteBlock::initializeCursor (AqlItemBlock* items, size_t pos) { + ENTER_BLOCK // For every call we simply forward via HTTP Json body(Json::Array, 2); @@ -4477,6 +4541,7 @@ int RemoteBlock::initializeCursor (AqlItemBlock* items, size_t pos) { responseBodyBuf.begin())); return JsonHelper::getNumericValue (responseBodyJson.json(), "code", TRI_ERROR_INTERNAL); + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -4484,6 +4549,7 @@ int RemoteBlock::initializeCursor (AqlItemBlock* items, size_t pos) { //////////////////////////////////////////////////////////////////////////////// int RemoteBlock::shutdown (int errorCode) { + ENTER_BLOCK // For every call we simply forward via HTTP std::unique_ptr res; @@ -4504,6 +4570,7 @@ int RemoteBlock::shutdown (int errorCode) { return JsonHelper::getNumericValue (responseBodyJson.json(), "code", TRI_ERROR_INTERNAL); + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -4512,6 +4579,7 @@ int RemoteBlock::shutdown (int errorCode) { AqlItemBlock* RemoteBlock::getSome (size_t atLeast, size_t atMost) { + ENTER_BLOCK // For every call we simply forward via HTTP Json body(Json::Array, 2); @@ -4543,6 +4611,7 @@ AqlItemBlock* RemoteBlock::getSome (size_t atLeast, _deltaStats = newStats; return items; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -4550,6 +4619,7 @@ AqlItemBlock* RemoteBlock::getSome (size_t atLeast, //////////////////////////////////////////////////////////////////////////////// size_t RemoteBlock::skipSome (size_t atLeast, size_t atMost) { + ENTER_BLOCK // For every call we simply forward via HTTP Json body(Json::Array, 2); @@ -4575,6 +4645,7 @@ size_t RemoteBlock::skipSome (size_t atLeast, size_t atMost) { size_t skipped = JsonHelper::getNumericValue(responseBodyJson.json(), "skipped", 0); return skipped; + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -4582,6 +4653,7 @@ size_t RemoteBlock::skipSome (size_t atLeast, size_t atMost) { //////////////////////////////////////////////////////////////////////////////// bool RemoteBlock::hasMore () { + ENTER_BLOCK // For every call we simply forward via HTTP std::unique_ptr res; res.reset(sendRequest(rest::HttpRequest::HTTP_REQUEST_GET, @@ -4599,6 +4671,7 @@ bool RemoteBlock::hasMore () { THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_AQL_COMMUNICATION); } return JsonHelper::getBooleanValue(responseBodyJson.json(), "hasMore", true); + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -4606,6 +4679,7 @@ bool RemoteBlock::hasMore () { //////////////////////////////////////////////////////////////////////////////// int64_t RemoteBlock::count () const { + ENTER_BLOCK // For every call we simply forward via HTTP std::unique_ptr res; res.reset(sendRequest(rest::HttpRequest::HTTP_REQUEST_GET, @@ -4624,6 +4698,7 @@ int64_t RemoteBlock::count () const { } return JsonHelper::getNumericValue (responseBodyJson.json(), "count", 0); + LEAVE_BLOCK } //////////////////////////////////////////////////////////////////////////////// @@ -4631,6 +4706,7 @@ int64_t RemoteBlock::count () const { //////////////////////////////////////////////////////////////////////////////// int64_t RemoteBlock::remaining () { + ENTER_BLOCK // For every call we simply forward via HTTP std::unique_ptr res; res.reset(sendRequest(rest::HttpRequest::HTTP_REQUEST_GET, @@ -4649,6 +4725,7 @@ int64_t RemoteBlock::remaining () { } return JsonHelper::getNumericValue (responseBodyJson.json(), "remaining", 0); + LEAVE_BLOCK } // Local Variables: From 8a544eefb579a957f3102732c51652fea017e220 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Thu, 23 Oct 2014 11:26:28 +0200 Subject: [PATCH 11/11] commit transaction on remote servers --- arangod/Aql/ExecutionBlock.cpp | 2 +- arangod/Aql/ExecutionBlock.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/arangod/Aql/ExecutionBlock.cpp b/arangod/Aql/ExecutionBlock.cpp index 2f195b7568..5815b58d08 100644 --- a/arangod/Aql/ExecutionBlock.cpp +++ b/arangod/Aql/ExecutionBlock.cpp @@ -4555,7 +4555,7 @@ int RemoteBlock::shutdown (int errorCode) { std::unique_ptr res; res.reset(sendRequest(rest::HttpRequest::HTTP_REQUEST_PUT, "/_api/aql/shutdown/", - string("{\"code\":\"" + std::to_string(errorCode) + "\"}"))); + string("{\"code\":" + std::to_string(errorCode) + "}"))); if (throwExceptionAfterBadSyncRequest(res.get(), true)) { // artificially ignore error in case query was not found during shutdown return TRI_ERROR_NO_ERROR; diff --git a/arangod/Aql/ExecutionBlock.h b/arangod/Aql/ExecutionBlock.h index f4b5ed4951..10a31b5a1a 100644 --- a/arangod/Aql/ExecutionBlock.h +++ b/arangod/Aql/ExecutionBlock.h @@ -1529,7 +1529,7 @@ namespace triagens { /// @brief shutdown //////////////////////////////////////////////////////////////////////////////// - int shutdown (int) override; + int shutdown (int) override; //////////////////////////////////////////////////////////////////////////////// /// @brief getSome: shouldn't be used, use skipSomeForShard