From f932e9277cc7fc3d5f3cc358d551ae8da04cda0f Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Fri, 22 Aug 2014 10:16:32 +0200 Subject: [PATCH] cloned EnumerateCollectionBlock into IndexRangeBlock --- arangod/Aql/ExecutionBlock.cpp | 174 ++++++++++++++++++++++++++++++++ arangod/Aql/ExecutionBlock.h | 90 +++++++++++++++++ arangod/Aql/ExecutionEngine.cpp | 5 + 3 files changed, 269 insertions(+) diff --git a/arangod/Aql/ExecutionBlock.cpp b/arangod/Aql/ExecutionBlock.cpp index 0cbe32d60a..7ce9374850 100644 --- a/arangod/Aql/ExecutionBlock.cpp +++ b/arangod/Aql/ExecutionBlock.cpp @@ -704,6 +704,180 @@ size_t EnumerateCollectionBlock::skipSome (size_t atLeast, size_t atMost) { return skipped; } +// ----------------------------------------------------------------------------- +// --SECTION-- class IndexRangeBlock +// ----------------------------------------------------------------------------- + +IndexRangeBlock::IndexRangeBlock (ExecutionEngine* engine, + IndexRangeNode const* ep) + : ExecutionBlock(engine, ep), + _collection(ep->_collection), + _posInAllDocs(0) { +} + +IndexRangeBlock::~IndexRangeBlock () { +} + +bool IndexRangeBlock::moreDocuments () { + if (_documents.empty()) { + _documents.reserve(DefaultBatchSize); + } + + _documents.clear(); + + int res = _trx->readIncremental(_trx->trxCollection(_collection->cid()), + _documents, + _internalSkip, + static_cast(DefaultBatchSize), + 0, + TRI_QRY_NO_LIMIT, + &_totalCount); + + if (res != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION(res); + } + + return (! _documents.empty()); +} + +int IndexRangeBlock::initialize () { + return ExecutionBlock::initialize(); +} + +int IndexRangeBlock::initCursor (AqlItemBlock* items, size_t pos) { + int res = ExecutionBlock::initCursor(items, pos); + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + + initDocuments(); + + if (_totalCount == 0) { + _done = true; + } + + return TRI_ERROR_NO_ERROR; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief getSome +//////////////////////////////////////////////////////////////////////////////// + +AqlItemBlock* IndexRangeBlock::getSome (size_t atLeast, + size_t atMost) { + if (_done) { + return nullptr; + } + + if (_buffer.empty()) { + if (! ExecutionBlock::getBlock(DefaultBatchSize, DefaultBatchSize)) { + _done = true; + return nullptr; + } + _pos = 0; // this is in the first block + _posInAllDocs = 0; // Note that we know _allDocs.size() > 0, + // otherwise _done would be true already + } + + // If we get here, we do have _buffer.front() + AqlItemBlock* cur = _buffer.front(); + size_t const curRegs = cur->getNrRegs(); + + size_t available = _documents.size() - _posInAllDocs; + size_t toSend = std::min(atMost, available); + + unique_ptr res(new AqlItemBlock(toSend, _varOverview->nrRegs[_depth])); + // automatically freed if we throw + TRI_ASSERT(curRegs <= res->getNrRegs()); + + // only copy 1st row of registers inherited from previous frame(s) + inheritRegisters(cur, res.get(), _pos); + + // set our collection for our output register + res->setDocumentCollection(curRegs, _trx->documentCollection(_collection->cid())); + + for (size_t j = 0; j < toSend; j++) { + if (j > 0) { + // re-use already copied aqlvalues + for (RegisterId i = 0; i < curRegs; i++) { + res->setValue(j, i, res->getValue(0, i)); + // Note: if this throws, then all values will be deleted + // properly since the first one is. + } + } + + // The result is in the first variable of this depth, + // we do not need to do a lookup in _varOverview->varInfo, + // but can just take cur->getNrRegs() as registerId: + res->setValue(j, curRegs, + AqlValue(reinterpret_cast(_documents[_posInAllDocs++].getDataPtr()))); + // No harm done, if the setValue throws! + } + + // Advance read position: + if (_posInAllDocs >= _documents.size()) { + // we have exhausted our local documents buffer + _posInAllDocs = 0; + + // fetch more documents into our buffer + if (! moreDocuments()) { + // nothing more to read, re-initialize fetching of documents + initDocuments(); + if (++_pos >= cur->size()) { + _buffer.pop_front(); // does not throw + delete cur; + _pos = 0; + } + } + } + return res.release(); +} + +size_t IndexRangeBlock::skipSome (size_t atLeast, size_t atMost) { + + size_t skipped = 0; + + if (_done) { + return skipped; + } + + while (skipped < atLeast) { + if (_buffer.empty()) { + if (! getBlock(DefaultBatchSize, DefaultBatchSize)) { + _done = true; + return skipped; + } + _pos = 0; // this is in the first block + _posInAllDocs = 0; // Note that we know _allDocs.size() > 0, + // otherwise _done would be true already + } + + // if we get here, then _buffer.front() exists + AqlItemBlock* cur = _buffer.front(); + + if (atMost >= skipped + _documents.size() - _posInAllDocs) { + skipped += _documents.size() - _posInAllDocs; + _posInAllDocs = 0; + + // fetch more documents into our buffer + if (! moreDocuments()) { + // nothing more to read, re-initialize fetching of documents + initDocuments(); + if (++_pos >= cur->size()) { + _buffer.pop_front(); // does not throw + delete cur; + _pos = 0; + } + } + } + else { + _posInAllDocs += atMost - skipped; + skipped = atMost; + } + } + return skipped; +} // ----------------------------------------------------------------------------- // --SECTION-- class EnumerateListBlock diff --git a/arangod/Aql/ExecutionBlock.h b/arangod/Aql/ExecutionBlock.h index 5bf90276e6..c870b90744 100644 --- a/arangod/Aql/ExecutionBlock.h +++ b/arangod/Aql/ExecutionBlock.h @@ -561,6 +561,96 @@ namespace triagens { }; +// ----------------------------------------------------------------------------- +// --SECTION-- IndexRangeBlock +// ----------------------------------------------------------------------------- + + class IndexRangeBlock : public ExecutionBlock { + + public: + + IndexRangeBlock (ExecutionEngine* engine, + IndexRangeNode const* ep); + + ~IndexRangeBlock (); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief initialize fetching of documents +//////////////////////////////////////////////////////////////////////////////// + + void initDocuments () { + _internalSkip = 0; + if (! moreDocuments()) { + _done = true; + } + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief continue fetching of documents +//////////////////////////////////////////////////////////////////////////////// + + bool moreDocuments (); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief initialize, here we fetch all docs from the database +//////////////////////////////////////////////////////////////////////////////// + + int initialize (); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief initCursor, here we release our docs from this collection +//////////////////////////////////////////////////////////////////////////////// + + int initCursor (AqlItemBlock* items, size_t pos); + + AqlItemBlock* getSome (size_t atLeast, size_t atMost); + +//////////////////////////////////////////////////////////////////////////////// +// skip between atLeast and atMost, returns the number actually skipped . . . +// will only return less than atLeast if there aren't atLeast many +// things to skip overall. +//////////////////////////////////////////////////////////////////////////////// + + size_t skipSome (size_t atLeast, size_t atMost); + +// ----------------------------------------------------------------------------- +// --SECTION-- private variables +// ----------------------------------------------------------------------------- + + private: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief collection +//////////////////////////////////////////////////////////////////////////////// + + Collection* _collection; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief total number of documents in the collection +//////////////////////////////////////////////////////////////////////////////// + + uint32_t _totalCount; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief internal skip value +//////////////////////////////////////////////////////////////////////////////// + + TRI_voc_size_t _internalSkip; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief document buffer +//////////////////////////////////////////////////////////////////////////////// + + std::vector _documents; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief current position in _allDocs +//////////////////////////////////////////////////////////////////////////////// + + size_t _posInAllDocs; + + }; + // ----------------------------------------------------------------------------- // --SECTION-- EnumerateListBlock // ----------------------------------------------------------------------------- diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index befeddea1c..444c2897bc 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -93,6 +93,11 @@ struct Instanciator : public WalkerWorker { static_cast(en)); break; } + case ExecutionNode::INDEX_RANGE: { + eb = new IndexRangeBlock(engine, + static_cast(en)); + break; + } case ExecutionNode::ENUMERATE_COLLECTION: { eb = new EnumerateCollectionBlock(engine, static_cast(en));