diff --git a/arangod/Aql/ExecutionBlock.cpp b/arangod/Aql/ExecutionBlock.cpp index db8e393d87..06b12429b0 100644 --- a/arangod/Aql/ExecutionBlock.cpp +++ b/arangod/Aql/ExecutionBlock.cpp @@ -644,23 +644,27 @@ EnumerateCollectionBlock::EnumerateCollectionBlock (ExecutionEngine* engine, : ExecutionBlock(engine, ep), _collection(ep->_collection), _totalCount(0), - _posInAllDocs(0) { + _posInDocuments(0), + _atBeginning(false) { } EnumerateCollectionBlock::~EnumerateCollectionBlock () { } -bool EnumerateCollectionBlock::moreDocuments () { - if (_documents.empty()) { - _documents.reserve(DefaultBatchSize); +bool EnumerateCollectionBlock::moreDocuments (size_t hint) { + + if (hint < DefaultBatchSize) { + hint = DefaultBatchSize; } - _documents.clear(); + std::vector newDocs; + + newDocs.reserve(hint); int res = _trx->readIncremental(_trx->trxCollection(_collection->cid()), - _documents, + newDocs, _internalSkip, - static_cast(DefaultBatchSize), + static_cast(hint), 0, TRI_QRY_NO_LIMIT, &_totalCount); @@ -669,9 +673,18 @@ bool EnumerateCollectionBlock::moreDocuments () { THROW_ARANGO_EXCEPTION(res); } - _engine->_stats.scannedFull += static_cast(_documents.size()); + _engine->_stats.scannedFull += static_cast(newDocs.size()); - return (! _documents.empty()); + if (newDocs.empty()) { + return false; + } + + _documents.swap(newDocs); + + _atBeginning = _internalSkip == 0; + _posInDocuments = 0; + + return true; } int EnumerateCollectionBlock::initialize () { @@ -686,10 +699,6 @@ int EnumerateCollectionBlock::initializeCursor (AqlItemBlock* items, size_t pos) initializeDocuments(); - if (_totalCount == 0) { - _done = true; - } - return TRI_ERROR_NO_ERROR; } @@ -699,6 +708,12 @@ int EnumerateCollectionBlock::initializeCursor (AqlItemBlock* items, size_t pos) AqlItemBlock* EnumerateCollectionBlock::getSome (size_t, // atLeast, size_t atMost) { + // Invariants: + // As soon as we notice that _totalCount == 0, we set _done = true. 
+ // Otherwise, outside of this method (or skipSome), _documents is + // either empty (at the beginning, with _posInDocuments == 0 and + // _atBeginning == false), or is non-empty and + // _posInDocuments < _documents.size() if (_done) { return nullptr; } @@ -709,15 +724,23 @@ AqlItemBlock* EnumerateCollectionBlock::getSome (size_t, // atLeast, return nullptr; } _pos = 0; // this is in the first block - _posInAllDocs = 0; // Note that we know _allDocs.size() > 0, - // otherwise _done would be true already + initializeDocuments(); } // If we get here, we do have _buffer.front() AqlItemBlock* cur = _buffer.front(); size_t const curRegs = cur->getNrRegs(); - size_t available = _documents.size() - _posInAllDocs; + // Get more documents from collection if _documents is empty: + if (_posInDocuments >= _documents.size()) { + if (! moreDocuments(atMost)) { + TRI_ASSERT(_totalCount == 0); + _done = true; + return nullptr; + } + } + + size_t available = _documents.size() - _posInDocuments; size_t toSend = (std::min)(atMost, available); unique_ptr res(new AqlItemBlock(toSend, getPlanNode()->getRegisterPlan()->nrRegs[getPlanNode()->getDepth()])); @@ -745,17 +768,15 @@ AqlItemBlock* EnumerateCollectionBlock::getSome (size_t, // atLeast, // but can just take cur->getNrRegs() as registerId: res->setValue(j, static_cast(curRegs), AqlValue(reinterpret_cast(_documents[_posInAllDocs++].getDataPtr()))); + const*>(_documents[_posInDocuments++].getDataPtr()))); // No harm done, if the setValue throws! } // Advance read position: - if (_posInAllDocs >= _documents.size()) { + if (_posInDocuments >= _documents.size()) { // we have exhausted our local documents buffer - _posInAllDocs = 0; - // fetch more documents into our buffer - if (! moreDocuments()) { + if (! 
moreDocuments(atMost)) { // nothing more to read, re-initialize fetching of documents initializeDocuments(); if (++_pos >= cur->size()) { @@ -784,19 +805,26 @@ size_t EnumerateCollectionBlock::skipSome (size_t atLeast, size_t atMost) { return skipped; } _pos = 0; // this is in the first block - _posInAllDocs = 0; // Note that we know _allDocs.size() > 0, - // otherwise _done would be true already + initializeDocuments(); } // if we get here, then _buffer.front() exists AqlItemBlock* cur = _buffer.front(); - if (atMost >= skipped + _documents.size() - _posInAllDocs) { - skipped += _documents.size() - _posInAllDocs; - _posInAllDocs = 0; + // Get more documents from collection if _documents is empty: + if (_posInDocuments >= _documents.size()) { + if (! moreDocuments(atMost)) { + TRI_ASSERT(_totalCount == 0); + _done = true; + return skipped; + } + } + + if (atMost >= skipped + _documents.size() - _posInDocuments) { + skipped += _documents.size() - _posInDocuments; // fetch more documents into our buffer - if (! moreDocuments()) { + if (! moreDocuments(atMost - skipped)) { // nothing more to read, re-initialize fetching of documents initializeDocuments(); if (++_pos >= cur->size()) { @@ -807,7 +835,7 @@ size_t EnumerateCollectionBlock::skipSome (size_t atLeast, size_t atMost) { } } else { - _posInAllDocs += atMost - skipped; + _posInDocuments += atMost - skipped; skipped = atMost; } } diff --git a/arangod/Aql/ExecutionBlock.h b/arangod/Aql/ExecutionBlock.h index feed4c3edd..eb977f2e94 100644 --- a/arangod/Aql/ExecutionBlock.h +++ b/arangod/Aql/ExecutionBlock.h @@ -442,16 +442,17 @@ namespace triagens { void initializeDocuments () { _internalSkip = 0; - if (! 
moreDocuments()) { - _done = true; + if (!_atBeginning) { + _documents.clear(); } + _posInDocuments = 0; } //////////////////////////////////////////////////////////////////////////////// /// @brief continue fetching of documents //////////////////////////////////////////////////////////////////////////////// - bool moreDocuments (); + bool moreDocuments (size_t hint); //////////////////////////////////////////////////////////////////////////////// /// @brief initialize, here we fetch all docs from the database @@ -460,11 +461,15 @@ namespace triagens { int initialize () override; //////////////////////////////////////////////////////////////////////////////// -/// @brief initCursor, here we release our docs from this collection +/// @brief initializeCursor //////////////////////////////////////////////////////////////////////////////// int initializeCursor (AqlItemBlock* items, size_t pos) override; +//////////////////////////////////////////////////////////////////////////////// +/// @brief getSome +//////////////////////////////////////////////////////////////////////////////// + AqlItemBlock* getSome (size_t atLeast, size_t atMost) override; //////////////////////////////////////////////////////////////////////////////// @@ -506,11 +511,16 @@ namespace triagens { std::vector _documents; //////////////////////////////////////////////////////////////////////////////// -/// @brief current position in _allDocs +/// @brief current position in _documents //////////////////////////////////////////////////////////////////////////////// - size_t _posInAllDocs; + size_t _posInDocuments; +//////////////////////////////////////////////////////////////////////////////// +/// @brief flag set by moreDocuments() to (_internalSkip == 0) after a successful read; while true, initializeDocuments() keeps _documents instead of clearing it +//////////////////////////////////////////////////////////////////////////////// + + bool _atBeginning; }; // -----------------------------------------------------------------------------