mirror of https://gitee.com/bigwinds/arangodb
cloned EnumerateCollectionBlock into IndexRangeBlock
This commit is contained in:
parent
871351f803
commit
f932e9277c
|
@ -704,6 +704,180 @@ size_t EnumerateCollectionBlock::skipSome (size_t atLeast, size_t atMost) {
|
|||
return skipped;
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- class IndexRangeBlock
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
IndexRangeBlock::IndexRangeBlock (ExecutionEngine* engine,
|
||||
IndexRangeNode const* ep)
|
||||
: ExecutionBlock(engine, ep),
|
||||
_collection(ep->_collection),
|
||||
_posInAllDocs(0) {
|
||||
}
|
||||
|
||||
IndexRangeBlock::~IndexRangeBlock () {
|
||||
}
|
||||
|
||||
bool IndexRangeBlock::moreDocuments () {
|
||||
if (_documents.empty()) {
|
||||
_documents.reserve(DefaultBatchSize);
|
||||
}
|
||||
|
||||
_documents.clear();
|
||||
|
||||
int res = _trx->readIncremental(_trx->trxCollection(_collection->cid()),
|
||||
_documents,
|
||||
_internalSkip,
|
||||
static_cast<TRI_voc_size_t>(DefaultBatchSize),
|
||||
0,
|
||||
TRI_QRY_NO_LIMIT,
|
||||
&_totalCount);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
THROW_ARANGO_EXCEPTION(res);
|
||||
}
|
||||
|
||||
return (! _documents.empty());
|
||||
}
|
||||
|
||||
int IndexRangeBlock::initialize () {
|
||||
return ExecutionBlock::initialize();
|
||||
}
|
||||
|
||||
int IndexRangeBlock::initCursor (AqlItemBlock* items, size_t pos) {
|
||||
int res = ExecutionBlock::initCursor(items, pos);
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
return res;
|
||||
}
|
||||
|
||||
initDocuments();
|
||||
|
||||
if (_totalCount == 0) {
|
||||
_done = true;
|
||||
}
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief getSome
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AqlItemBlock* IndexRangeBlock::getSome (size_t atLeast,
|
||||
size_t atMost) {
|
||||
if (_done) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (_buffer.empty()) {
|
||||
if (! ExecutionBlock::getBlock(DefaultBatchSize, DefaultBatchSize)) {
|
||||
_done = true;
|
||||
return nullptr;
|
||||
}
|
||||
_pos = 0; // this is in the first block
|
||||
_posInAllDocs = 0; // Note that we know _allDocs.size() > 0,
|
||||
// otherwise _done would be true already
|
||||
}
|
||||
|
||||
// If we get here, we do have _buffer.front()
|
||||
AqlItemBlock* cur = _buffer.front();
|
||||
size_t const curRegs = cur->getNrRegs();
|
||||
|
||||
size_t available = _documents.size() - _posInAllDocs;
|
||||
size_t toSend = std::min(atMost, available);
|
||||
|
||||
unique_ptr<AqlItemBlock> res(new AqlItemBlock(toSend, _varOverview->nrRegs[_depth]));
|
||||
// automatically freed if we throw
|
||||
TRI_ASSERT(curRegs <= res->getNrRegs());
|
||||
|
||||
// only copy 1st row of registers inherited from previous frame(s)
|
||||
inheritRegisters(cur, res.get(), _pos);
|
||||
|
||||
// set our collection for our output register
|
||||
res->setDocumentCollection(curRegs, _trx->documentCollection(_collection->cid()));
|
||||
|
||||
for (size_t j = 0; j < toSend; j++) {
|
||||
if (j > 0) {
|
||||
// re-use already copied aqlvalues
|
||||
for (RegisterId i = 0; i < curRegs; i++) {
|
||||
res->setValue(j, i, res->getValue(0, i));
|
||||
// Note: if this throws, then all values will be deleted
|
||||
// properly since the first one is.
|
||||
}
|
||||
}
|
||||
|
||||
// The result is in the first variable of this depth,
|
||||
// we do not need to do a lookup in _varOverview->varInfo,
|
||||
// but can just take cur->getNrRegs() as registerId:
|
||||
res->setValue(j, curRegs,
|
||||
AqlValue(reinterpret_cast<TRI_df_marker_t
|
||||
const*>(_documents[_posInAllDocs++].getDataPtr())));
|
||||
// No harm done, if the setValue throws!
|
||||
}
|
||||
|
||||
// Advance read position:
|
||||
if (_posInAllDocs >= _documents.size()) {
|
||||
// we have exhausted our local documents buffer
|
||||
_posInAllDocs = 0;
|
||||
|
||||
// fetch more documents into our buffer
|
||||
if (! moreDocuments()) {
|
||||
// nothing more to read, re-initialize fetching of documents
|
||||
initDocuments();
|
||||
if (++_pos >= cur->size()) {
|
||||
_buffer.pop_front(); // does not throw
|
||||
delete cur;
|
||||
_pos = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
return res.release();
|
||||
}
|
||||
|
||||
size_t IndexRangeBlock::skipSome (size_t atLeast, size_t atMost) {
|
||||
|
||||
size_t skipped = 0;
|
||||
|
||||
if (_done) {
|
||||
return skipped;
|
||||
}
|
||||
|
||||
while (skipped < atLeast) {
|
||||
if (_buffer.empty()) {
|
||||
if (! getBlock(DefaultBatchSize, DefaultBatchSize)) {
|
||||
_done = true;
|
||||
return skipped;
|
||||
}
|
||||
_pos = 0; // this is in the first block
|
||||
_posInAllDocs = 0; // Note that we know _allDocs.size() > 0,
|
||||
// otherwise _done would be true already
|
||||
}
|
||||
|
||||
// if we get here, then _buffer.front() exists
|
||||
AqlItemBlock* cur = _buffer.front();
|
||||
|
||||
if (atMost >= skipped + _documents.size() - _posInAllDocs) {
|
||||
skipped += _documents.size() - _posInAllDocs;
|
||||
_posInAllDocs = 0;
|
||||
|
||||
// fetch more documents into our buffer
|
||||
if (! moreDocuments()) {
|
||||
// nothing more to read, re-initialize fetching of documents
|
||||
initDocuments();
|
||||
if (++_pos >= cur->size()) {
|
||||
_buffer.pop_front(); // does not throw
|
||||
delete cur;
|
||||
_pos = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
_posInAllDocs += atMost - skipped;
|
||||
skipped = atMost;
|
||||
}
|
||||
}
|
||||
return skipped;
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- class EnumerateListBlock
|
||||
|
|
|
@ -561,6 +561,96 @@ namespace triagens {
|
|||
|
||||
};
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- IndexRangeBlock
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
class IndexRangeBlock : public ExecutionBlock {
|
||||
|
||||
public:
|
||||
|
||||
IndexRangeBlock (ExecutionEngine* engine,
|
||||
IndexRangeNode const* ep);
|
||||
|
||||
~IndexRangeBlock ();
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief initialize fetching of documents
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void initDocuments () {
|
||||
_internalSkip = 0;
|
||||
if (! moreDocuments()) {
|
||||
_done = true;
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief continue fetching of documents
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool moreDocuments ();
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief initialize, here we fetch all docs from the database
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int initialize ();
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief initCursor, here we release our docs from this collection
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int initCursor (AqlItemBlock* items, size_t pos);
|
||||
|
||||
AqlItemBlock* getSome (size_t atLeast, size_t atMost);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
// skip between atLeast and atMost, returns the number actually skipped . . .
|
||||
// will only return less than atLeast if there aren't atLeast many
|
||||
// things to skip overall.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t skipSome (size_t atLeast, size_t atMost);
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- private variables
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
private:
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief collection
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Collection* _collection;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief total number of documents in the collection
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
uint32_t _totalCount;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief internal skip value
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
TRI_voc_size_t _internalSkip;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief document buffer
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::vector<TRI_doc_mptr_copy_t> _documents;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief current position in _allDocs
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t _posInAllDocs;
|
||||
|
||||
};
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- EnumerateListBlock
|
||||
// -----------------------------------------------------------------------------
|
||||
|
|
|
@ -93,6 +93,11 @@ struct Instanciator : public WalkerWorker<ExecutionNode> {
|
|||
static_cast<SingletonNode const*>(en));
|
||||
break;
|
||||
}
|
||||
case ExecutionNode::INDEX_RANGE: {
|
||||
eb = new IndexRangeBlock(engine,
|
||||
static_cast<IndexRangeNode const*>(en));
|
||||
break;
|
||||
}
|
||||
case ExecutionNode::ENUMERATE_COLLECTION: {
|
||||
eb = new EnumerateCollectionBlock(engine,
|
||||
static_cast<EnumerateCollectionNode const*>(en));
|
||||
|
|
Loading…
Reference in New Issue