mirror of https://gitee.com/bigwinds/arangodb
first version of EnumerateListBlock.
It is missing the method for getSome, and only works for json lists.
This commit is contained in:
parent
7c7901fd5b
commit
2ac9adcf1c
|
@ -1032,6 +1032,237 @@ namespace triagens {
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
// --SECTION-- EnumerateListBlock
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class EnumerateListBlock : public ExecutionBlock {
|
||||||
|
|
||||||
|
public:
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief constructor
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
EnumerateListBlock (AQL_TRANSACTION_V8* trx,
|
||||||
|
EnumerateListNode const* ep)
|
||||||
|
: ExecutionBlock(trx, ep), _trx(nullptr) {
|
||||||
|
|
||||||
|
int res = _trx->begin();
|
||||||
|
|
||||||
|
if (res != TRI_ERROR_NO_ERROR) {
|
||||||
|
// transaction failure
|
||||||
|
delete _trx;
|
||||||
|
THROW_ARANGO_EXCEPTION(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief destructor
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
~EnumerateListBlock () {
|
||||||
|
if (_trx != nullptr) {
|
||||||
|
// finalize our own transaction
|
||||||
|
delete _trx;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief initialize fetching of documents
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
// void initDocuments () {
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief continue fetching of documents
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
//bool moreDocuments () {
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief initialize, here we get the inVariable
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
int initialize () {
|
||||||
|
int res = ExecutionBlock::initialize();
|
||||||
|
|
||||||
|
if (res != TRI_ERROR_NO_ERROR) {
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto en = reinterpret_cast<EnumerateListNode const*>(_exeNode);
|
||||||
|
|
||||||
|
// get the inVariable register id . . .
|
||||||
|
// staticAnalysis has been run, so _varOverview is set up
|
||||||
|
|
||||||
|
auto it = _varOverview->varInfo.find(en->_inVariable->id);
|
||||||
|
if (it == _varOverview->varInfo.end()){
|
||||||
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||||
|
}
|
||||||
|
|
||||||
|
_inVarRegId = (*it).second.registerId;
|
||||||
|
|
||||||
|
_index = 0; // index in _inVariable for current run
|
||||||
|
|
||||||
|
return TRI_ERROR_NO_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief execute, here we release our docs from this collection
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
int execute () {
|
||||||
|
int res = ExecutionBlock::execute();
|
||||||
|
|
||||||
|
if (res != TRI_ERROR_NO_ERROR) {
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
// handle local data (if any)
|
||||||
|
|
||||||
|
return TRI_ERROR_NO_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief getOne
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
AqlItemBlock* getOne () {
|
||||||
|
|
||||||
|
if (_done) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_buffer.empty()) {
|
||||||
|
if (! ExecutionBlock::getBlock(1, DefaultBatchSize)) {
|
||||||
|
_done = true;
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
_pos = 0; // this is in the first block
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we make it here, then _buffer.front() exists
|
||||||
|
AqlItemBlock* cur = _buffer.front();
|
||||||
|
|
||||||
|
// copy stuff from the incoming block . . .
|
||||||
|
auto res = new AqlItemBlock(1, _varOverview->nrRegs[_depth]);
|
||||||
|
TRI_ASSERT(cur->getNrRegs() <= res->getNrRegs());
|
||||||
|
for (RegisterId i = 0; i < cur->getNrRegs(); i++) {
|
||||||
|
res->setValue(0, i, cur->getValue(_pos, i).clone());
|
||||||
|
res->setDocumentCollection(i, cur->getDocumentCollection(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
// get the thing we are looping over
|
||||||
|
triagens::basics::Json inVariable = cur->getValue(_pos, _inVarRegId)._json;
|
||||||
|
|
||||||
|
// add the new register value and corresponding doc. collection
|
||||||
|
res->setValue(0, cur->getNrRegs(),
|
||||||
|
AqlValue(new basics::Json(inVariable.at(_pos).json())));
|
||||||
|
res->setDocumentCollection(cur->getNrRegs(), _trx->documentCollection());
|
||||||
|
|
||||||
|
// advance read position in the current block
|
||||||
|
if (++_pos == cur->size() ) {
|
||||||
|
delete cur;
|
||||||
|
_buffer.pop_front();
|
||||||
|
_pos = 0;
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief getSome
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
/* AqlItemBlock* getSome (size_t atLeast, size_t atMost) {
|
||||||
|
if (_done) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_buffer.empty()) {
|
||||||
|
if (! ExecutionBlock::getBlock(DefaultBatchSize, DefaultBatchSize)) {
|
||||||
|
_done = true;
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
_pos = 0; // this is in the first block
|
||||||
|
_posInAllDocs = 0; // Note that we know _allDocs.size() > 0,
|
||||||
|
// otherwise _done would be true already
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we get here, we do have _buffer.front()
|
||||||
|
AqlItemBlock* cur = _buffer.front();
|
||||||
|
size_t const curRegs = cur->getNrRegs();
|
||||||
|
|
||||||
|
size_t available = _documents.size() - _posInAllDocs;
|
||||||
|
size_t toSend = std::min(atMost, available);
|
||||||
|
|
||||||
|
auto res = new AqlItemBlock(toSend, _varOverview->nrRegs[_depth]);
|
||||||
|
TRI_ASSERT(curRegs <= res->getNrRegs());
|
||||||
|
|
||||||
|
// only copy 1st row of registers inherited from previous frame(s)
|
||||||
|
for (RegisterId i = 0; i < curRegs; i++) {
|
||||||
|
res->setValue(0, i, cur->getValue(_pos, i).clone());
|
||||||
|
}
|
||||||
|
res->getDocumentCollections().at(curRegs)
|
||||||
|
= _trx->documentCollection();
|
||||||
|
|
||||||
|
for (size_t j = 0; j < toSend; j++) {
|
||||||
|
if (j > 0) {
|
||||||
|
// re-use already copied aqlvalues
|
||||||
|
for (RegisterId i = 0; i < curRegs; i++) {
|
||||||
|
res->setValue(j, i, res->getValue(0, i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The result is in the first variable of this depth,
|
||||||
|
// we do not need to do a lookup in _varOverview->varInfo,
|
||||||
|
// but can just take cur->getNrRegs() as registerId:
|
||||||
|
res->setValue(j, curRegs, AqlValue(reinterpret_cast<TRI_df_marker_t const*>(_documents[_posInAllDocs++].getDataPtr())));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Advance read position:
|
||||||
|
if (_posInAllDocs >= _documents.size()) {
|
||||||
|
// we have exhausted our local documents buffer
|
||||||
|
_posInAllDocs = 0;
|
||||||
|
|
||||||
|
// fetch more documents into our buffer
|
||||||
|
if (! moreDocuments()) {
|
||||||
|
// nothing more to read, re-initialize fetching of documents
|
||||||
|
initDocuments();
|
||||||
|
if (++_pos >= cur->size()) {
|
||||||
|
_buffer.pop_front();
|
||||||
|
delete cur;
|
||||||
|
_pos = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
} */
|
||||||
|
|
||||||
|
private:
|
||||||
|
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief current position in the _inVariable
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
size_t _index;
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief the register index containing the inVariable of the EnumerateListNode
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
RegisterId _inVarRegId;
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief currently ongoing transaction
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
triagens::arango::SingleCollectionReadOnlyTransaction<triagens::arango::V8TransactionContext<true>>* _trx;
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
// -----------------------------------------------------------------------------
|
// -----------------------------------------------------------------------------
|
||||||
// --SECTION-- CalculationBlock
|
// --SECTION-- CalculationBlock
|
||||||
// -----------------------------------------------------------------------------
|
// -----------------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue