From fabfeae14dd56cf1c37186e6a6b24e0f731030bb Mon Sep 17 00:00:00 2001
From: Max Neunhoeffer
Date: Mon, 4 Aug 2014 16:34:37 +0200
Subject: [PATCH] Add LimitNode. Implement skip generically.

---
 arangod/Aql/ExecutionBlock.h    | 228 +++++++++++++++++++++++++++++++-
 arangod/Aql/ExecutionEngine.cpp |   6 +-
 arangod/Aql/ExecutionNode.h     |   4 +-
 3 files changed, 230 insertions(+), 8 deletions(-)

diff --git a/arangod/Aql/ExecutionBlock.h b/arangod/Aql/ExecutionBlock.h
index a6ec48189e..c7e07b0a11 100644
--- a/arangod/Aql/ExecutionBlock.h
+++ b/arangod/Aql/ExecutionBlock.h
@@ -449,7 +449,9 @@ namespace triagens {
         }
 
         virtual bool hasMore () {
-          // FIXME: do this correctly!
+          if (_done) {
+            return false;
+          }
           if (_buffer.size() > 0) {
             return true;
           }
@@ -461,8 +463,51 @@ namespace triagens {
         }
 
         virtual bool skip (size_t number) {
-          // NOT YET IMPLEMENTED
-          return false;
+          if (_done) {
+            return false;
+          }
+
+          // Here, if _buffer.size() is > 0 then _pos points to a valid place
+          // in it.
+
+          size_t skipped = 0;
+          while (skipped < number) {
+            if (_buffer.empty()) {
+              if (! getBlock(number - skipped, number - skipped)) {
+                _done = true;
+                return false;
+              }
+              _pos = 0;
+            }
+            AqlItemBlock* cur = _buffer.front();
+            if (cur->size() - _pos + skipped > number) {
+              // The current block is too large:
+              _pos += number - skipped;
+              skipped = number;
+            }
+            else {
+              // The current block fits into our result, but it might be
+              // half-eaten already:
+              skipped += cur->size() - _pos;
+              delete cur;
+              _buffer.pop_front();
+              _pos = 0;
+            }
+          }
+
+          // When we get here, we have skipped enough documents and
+          // kept our data structures OK.
+          // If _buffer.size() == 0, we have to try to get another block
+          // just to be sure to get the return value right:
+          if (_buffer.size() > 0) {
+            return true;
+          }
+
+          if (! getBlock(1000,1000)) {
+            _done = true;
+            return false;
+          }
+          return true;
         }
 
         virtual int64_t count () {
@@ -496,7 +541,13 @@ namespace triagens {
         std::vector<ExecutionBlock*> _dependencies;
 
 ////////////////////////////////////////////////////////////////////////////////
-/// @brief this is our buffer for the items, it is a deque of AqlItemBlocks
+/// @brief this is our buffer for the items, it is a deque of AqlItemBlocks.
+/// We keep the following invariant between this and the other two variables
+/// _pos and _done: If _buffer.size() != 0, then 0 <= _pos < _buffer[0]->size()
+/// and _buffer[0][_pos] is the next item to be handed on. If _done is true,
+/// then no more documents will ever be returned. _done will be set to
+/// true if and only if we have no more data ourselves (i.e. _buffer.size()==0)
+/// and we have unsuccessfully tried to get another block from our dependency.
 ////////////////////////////////////////////////////////////////////////////////
 
         std::deque<AqlItemBlock*> _buffer;
@@ -1041,7 +1092,7 @@ namespace triagens {
     };
 
 // -----------------------------------------------------------------------------
-// --SECTION--                                                      FilterBlock
+// --SECTION--                                                       FilterBlock
 // -----------------------------------------------------------------------------
 
     class FilterBlock : public ExecutionBlock {
@@ -1297,6 +1348,157 @@ namespace triagens {
 
     };
 
+// -----------------------------------------------------------------------------
+// --SECTION--                                                        LimitBlock
+// -----------------------------------------------------------------------------
+
+    class LimitBlock : public ExecutionBlock {
+
+      public:
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief constructor
+////////////////////////////////////////////////////////////////////////////////
+
+        LimitBlock (LimitNode const* ep)
+          : ExecutionBlock(ep), _offset(ep->_offset), _limit(ep->_limit),
+            _count(0), _state(0) {  // start in the beginning
+        }
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief destructor
+////////////////////////////////////////////////////////////////////////////////
+
+        ~LimitBlock () {
+        }
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief initialize
+////////////////////////////////////////////////////////////////////////////////
+
+        int initialize () {
+          int res = ExecutionBlock::initialize();
+          if (res != TRI_ERROR_NO_ERROR) {
+            return res;
+          }
+          _state = 0;
+          _count = 0;
+          return TRI_ERROR_NO_ERROR;
+        }
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief getOne, hands on at most _limit items after skipping _offset items
+////////////////////////////////////////////////////////////////////////////////
+
+        virtual AqlItemBlock* getOne () {
+          if (_state == 2) {
+            return nullptr;
+          }
+
+          if (_state == 0) {
+            // The offset is skipped only once, on the very first call:
+            if (_offset > 0) {
+              ExecutionBlock::skip(_offset);
+            }
+            // Leave state 0 also for _offset == 0 and honour _limit == 0:
+            _state = 1;
+            _count = 0;
+            if (_limit == 0) {
+              _state = 2;
+              return nullptr;
+            }
+          }
+
+          // If we get to here, _state == 1 and _count < _limit
+
+          // Fetch one from above, possibly using our _buffer:
+          auto res = ExecutionBlock::getOne();
+          if (res == nullptr) {
+            _state = 2;
+            return res;
+          }
+          TRI_ASSERT(res->size() == 1);
+
+          if (++_count >= _limit) {
+            _state = 2;
+          }
+
+          return res;
+        }
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief getSome, like getOne but hands on a block of several items
+////////////////////////////////////////////////////////////////////////////////
+
+        virtual AqlItemBlock* getSome (size_t atLeast,
+                                       size_t atMost) {
+          if (_state == 2) {
+            return nullptr;
+          }
+
+          if (_state == 0) {
+            // The offset is skipped only once, on the very first call:
+            if (_offset > 0) {
+              ExecutionBlock::skip(_offset);
+            }
+            // Leave state 0 also for _offset == 0 and honour _limit == 0:
+            _state = 1;
+            _count = 0;
+            if (_limit == 0) {
+              _state = 2;
+              return nullptr;
+            }
+          }
+
+          // If we get to here, _state == 1 and _count < _limit
+
+          // Never ask for more than what is left of the limit:
+          if (atMost > _limit - _count) {
+            atMost = _limit - _count;
+            if (atLeast > atMost) {
+              atLeast = atMost;
+            }
+          }
+
+          auto res = ExecutionBlock::getSome(atLeast, atMost);
+          if (res == nullptr) {
+            // Nothing more to come from above, same as in getOne:
+            _state = 2;
+            return res;
+          }
+          _count += res->size();
+          if (_count >= _limit) {
+            _state = 2;
+          }
+
+          return res;
+        }
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief _offset, number of items to skip before handing on anything
+////////////////////////////////////////////////////////////////////////////////
+
+        size_t _offset;
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief _limit, maximal number of items to hand on
+////////////////////////////////////////////////////////////////////////////////
+
+        size_t _limit;
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief _count, number of items already handed on
+////////////////////////////////////////////////////////////////////////////////
+
+        size_t _count;
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief _state, 0 is beginning, 1 is after offset, 2 is done
+////////////////////////////////////////////////////////////////////////////////
+
+        int _state;
+    };
+
 // -----------------------------------------------------------------------------
 // --SECTION--                                                       ReturnBlock
 // -----------------------------------------------------------------------------
@@ -1305,14 +1507,26 @@ namespace triagens {
 
       public:
 
+////////////////////////////////////////////////////////////////////////////////
+/// @brief constructor
+////////////////////////////////////////////////////////////////////////////////
+
         ReturnBlock (ReturnNode const* ep)
           : ExecutionBlock(ep) {
 
         }
 
+////////////////////////////////////////////////////////////////////////////////
+/// @brief destructor
+////////////////////////////////////////////////////////////////////////////////
+
         ~ReturnBlock () {
         }
 
+////////////////////////////////////////////////////////////////////////////////
+/// @brief getOne
+////////////////////////////////////////////////////////////////////////////////
+
         virtual AqlItemBlock* getOne () {
           // Fetch one from above, possibly using our _buffer:
           auto res = ExecutionBlock::getOne();
@@ -1333,6 +1547,10 @@ namespace triagens {
           return stripped;
         }
 
+////////////////////////////////////////////////////////////////////////////////
+/// @brief getSome
+////////////////////////////////////////////////////////////////////////////////
+
         virtual AqlItemBlock* getSome (size_t atLeast,
                                        size_t atMost) {
           auto res = ExecutionBlock::getSome(atLeast, atMost);
diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp
index 28ba7daa85..bd3810dcc6 100644
--- a/arangod/Aql/ExecutionEngine.cpp
+++ b/arangod/Aql/ExecutionEngine.cpp
@@ -95,7 +95,11 @@ struct Instanciator : public ExecutionNode::WalkerWorker {
         eb = new FilterBlock(static_cast<FilterNode const*>(en));
         break;
       }
-      case ExecutionNode::ROOT: {
+      case ExecutionNode::LIMIT: {
+        eb = new LimitBlock(static_cast<LimitNode const*>(en));
+        break;
+      }
+      case ExecutionNode::RETURN: {
         eb = new ReturnBlock(static_cast<ReturnNode const*>(en));
         root = eb;
         break;
diff --git a/arangod/Aql/ExecutionNode.h b/arangod/Aql/ExecutionNode.h
index 471f1a2248..5e39542996 100644
--- a/arangod/Aql/ExecutionNode.h
+++ b/arangod/Aql/ExecutionNode.h
@@ -82,7 +82,7 @@ namespace triagens {
           REMOVE,
           REPLACE,
           UPDATE,
-          ROOT                    // done
+          RETURN                  // done
         };
 
 // -----------------------------------------------------------------------------
@@ -1113,7 +1113,7 @@ namespace triagens {
 ////////////////////////////////////////////////////////////////////////////////
 
         virtual NodeType getType () const {
-          return ROOT;
+          return RETURN;
         }
 
 ////////////////////////////////////////////////////////////////////////////////