From 2ac9adcf1cbbd6fa2a8dd3b0412b3d53d138a8ee Mon Sep 17 00:00:00 2001 From: James Date: Thu, 7 Aug 2014 15:05:59 +0200 Subject: [PATCH] first version of EnumerateListBlock. It is missing the method for getSome, and only works for json lists. --- arangod/Aql/ExecutionBlock.h | 231 +++++++++++++++++++++++++++++++++++ 1 file changed, 231 insertions(+) diff --git a/arangod/Aql/ExecutionBlock.h b/arangod/Aql/ExecutionBlock.h index 80df468eea..a8bbea6df7 100644 --- a/arangod/Aql/ExecutionBlock.h +++ b/arangod/Aql/ExecutionBlock.h @@ -1032,6 +1032,237 @@ namespace triagens { }; +// ----------------------------------------------------------------------------- +// --SECTION-- EnumerateListBlock +// ----------------------------------------------------------------------------- + + class EnumerateListBlock : public ExecutionBlock { + + public: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief constructor +//////////////////////////////////////////////////////////////////////////////// + + EnumerateListBlock (AQL_TRANSACTION_V8* trx, + EnumerateListNode const* ep) + : ExecutionBlock(trx, ep), _trx(nullptr) { + + int res = _trx->begin(); + + if (res != TRI_ERROR_NO_ERROR) { + // transaction failure + delete _trx; + THROW_ARANGO_EXCEPTION(res); + } + + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief destructor +//////////////////////////////////////////////////////////////////////////////// + + ~EnumerateListBlock () { + if (_trx != nullptr) { + // finalize our own transaction + delete _trx; + } + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief initialize fetching of documents +//////////////////////////////////////////////////////////////////////////////// + + // void initDocuments () { + +//////////////////////////////////////////////////////////////////////////////// +/// @brief continue fetching of documents +//////////////////////////////////////////////////////////////////////////////// + + //bool moreDocuments () { + +//////////////////////////////////////////////////////////////////////////////// +/// @brief initialize, here we get the inVariable +//////////////////////////////////////////////////////////////////////////////// + + int initialize () { + int res = ExecutionBlock::initialize(); + + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + + auto en = reinterpret_cast(_exeNode); + + // get the inVariable register id . . . + // staticAnalysis has been run, so _varOverview is set up + + auto it = _varOverview->varInfo.find(en->_inVariable->id); + if (it == _varOverview->varInfo.end()){ + THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL); + } + + _inVarRegId = (*it).second.registerId; + + _index = 0; // index in _inVariable for current run + + return TRI_ERROR_NO_ERROR; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief execute, here we release our docs from this collection +//////////////////////////////////////////////////////////////////////////////// + + int execute () { + int res = ExecutionBlock::execute(); + + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + + // handle local data (if any) + + return TRI_ERROR_NO_ERROR; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief getOne +//////////////////////////////////////////////////////////////////////////////// + + AqlItemBlock* getOne () { + + if (_done) { + return nullptr; + } + + if (_buffer.empty()) { + if (! ExecutionBlock::getBlock(1, DefaultBatchSize)) { + _done = true; + return nullptr; + } + _pos = 0; // this is in the first block + } + + // if we make it here, then _buffer.front() exists + AqlItemBlock* cur = _buffer.front(); + + // copy stuff from the incoming block . . . + auto res = new AqlItemBlock(1, _varOverview->nrRegs[_depth]); + TRI_ASSERT(cur->getNrRegs() <= res->getNrRegs()); + for (RegisterId i = 0; i < cur->getNrRegs(); i++) { + res->setValue(0, i, cur->getValue(_pos, i).clone()); + res->setDocumentCollection(i, cur->getDocumentCollection(i)); + } + + // get the thing we are looping over + triagens::basics::Json inVariable = cur->getValue(_pos, _inVarRegId)._json; + + // add the new register value and corresponding doc. collection + res->setValue(0, cur->getNrRegs(), + AqlValue(new basics::Json(inVariable.at(_pos).json()))); + res->setDocumentCollection(cur->getNrRegs(), _trx->documentCollection()); + + // advance read position in the current block + if (++_pos == cur->size() ) { + delete cur; + _buffer.pop_front(); + _pos = 0; + } + return res; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief getSome +//////////////////////////////////////////////////////////////////////////////// + + /* AqlItemBlock* getSome (size_t atLeast, size_t atMost) { + if (_done) { + return nullptr; + } + + if (_buffer.empty()) { + if (! ExecutionBlock::getBlock(DefaultBatchSize, DefaultBatchSize)) { + _done = true; + return nullptr; + } + _pos = 0; // this is in the first block + _posInAllDocs = 0; // Note that we know _allDocs.size() > 0, + // otherwise _done would be true already + } + + // If we get here, we do have _buffer.front() + AqlItemBlock* cur = _buffer.front(); + size_t const curRegs = cur->getNrRegs(); + + size_t available = _documents.size() - _posInAllDocs; + size_t toSend = std::min(atMost, available); + + auto res = new AqlItemBlock(toSend, _varOverview->nrRegs[_depth]); + TRI_ASSERT(curRegs <= res->getNrRegs()); + + // only copy 1st row of registers inherited from previous frame(s) + for (RegisterId i = 0; i < curRegs; i++) { + res->setValue(0, i, cur->getValue(_pos, i).clone()); + } + res->getDocumentCollections().at(curRegs) + = _trx->documentCollection(); + + for (size_t j = 0; j < toSend; j++) { + if (j > 0) { + // re-use already copied aqlvalues + for (RegisterId i = 0; i < curRegs; i++) { + res->setValue(j, i, res->getValue(0, i)); + } + } + + // The result is in the first variable of this depth, + // we do not need to do a lookup in _varOverview->varInfo, + // but can just take cur->getNrRegs() as registerId: + res->setValue(j, curRegs, AqlValue(reinterpret_cast(_documents[_posInAllDocs++].getDataPtr()))); + } + + // Advance read position: + if (_posInAllDocs >= _documents.size()) { + // we have exhausted our local documents buffer + _posInAllDocs = 0; + + // fetch more documents into our buffer + if (! moreDocuments()) { + // nothing more to read, re-initialize fetching of documents + initDocuments(); + if (++_pos >= cur->size()) { + _buffer.pop_front(); + delete cur; + _pos = 0; + } + } + } + return res; + } */ + + private: + + +//////////////////////////////////////////////////////////////////////////////// +/// @brief current position in the _inVariable +//////////////////////////////////////////////////////////////////////////////// + + size_t _index; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief the register index containing the inVariable of the EnumerateListNode +//////////////////////////////////////////////////////////////////////////////// + + RegisterId _inVarRegId; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief currently ongoing transaction +//////////////////////////////////////////////////////////////////////////////// + + triagens::arango::SingleCollectionReadOnlyTransaction>* _trx; + + }; + // ----------------------------------------------------------------------------- // --SECTION-- CalculationBlock // -----------------------------------------------------------------------------