1
0
Fork 0

Make EnumerateCollectionBlock even more lazy.

This commit is contained in:
Max Neunhoeffer 2014-10-24 14:10:30 +02:00
parent 4848840703
commit 93ceafa686
2 changed files with 72 additions and 34 deletions

View File

@ -644,23 +644,27 @@ EnumerateCollectionBlock::EnumerateCollectionBlock (ExecutionEngine* engine,
: ExecutionBlock(engine, ep), : ExecutionBlock(engine, ep),
_collection(ep->_collection), _collection(ep->_collection),
_totalCount(0), _totalCount(0),
_posInAllDocs(0) { _posInDocuments(0),
_atBeginning(false) {
} }
EnumerateCollectionBlock::~EnumerateCollectionBlock () { EnumerateCollectionBlock::~EnumerateCollectionBlock () {
} }
bool EnumerateCollectionBlock::moreDocuments () { bool EnumerateCollectionBlock::moreDocuments (size_t hint) {
if (_documents.empty()) {
_documents.reserve(DefaultBatchSize); if (hint < DefaultBatchSize) {
hint = DefaultBatchSize;
} }
_documents.clear(); std::vector<TRI_doc_mptr_copy_t> newDocs;
newDocs.reserve(hint);
int res = _trx->readIncremental(_trx->trxCollection(_collection->cid()), int res = _trx->readIncremental(_trx->trxCollection(_collection->cid()),
_documents, newDocs,
_internalSkip, _internalSkip,
static_cast<TRI_voc_size_t>(DefaultBatchSize), static_cast<TRI_voc_size_t>(hint),
0, 0,
TRI_QRY_NO_LIMIT, TRI_QRY_NO_LIMIT,
&_totalCount); &_totalCount);
@ -669,9 +673,18 @@ bool EnumerateCollectionBlock::moreDocuments () {
THROW_ARANGO_EXCEPTION(res); THROW_ARANGO_EXCEPTION(res);
} }
_engine->_stats.scannedFull += static_cast<int64_t>(_documents.size()); _engine->_stats.scannedFull += static_cast<int64_t>(newDocs.size());
return (! _documents.empty()); if (newDocs.empty()) {
return false;
}
_documents.swap(newDocs);
_atBeginning = _internalSkip == 0;
_posInDocuments = 0;
return true;
} }
int EnumerateCollectionBlock::initialize () { int EnumerateCollectionBlock::initialize () {
@ -686,10 +699,6 @@ int EnumerateCollectionBlock::initializeCursor (AqlItemBlock* items, size_t pos)
initializeDocuments(); initializeDocuments();
if (_totalCount == 0) {
_done = true;
}
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
@ -699,6 +708,12 @@ int EnumerateCollectionBlock::initializeCursor (AqlItemBlock* items, size_t pos)
AqlItemBlock* EnumerateCollectionBlock::getSome (size_t, // atLeast, AqlItemBlock* EnumerateCollectionBlock::getSome (size_t, // atLeast,
size_t atMost) { size_t atMost) {
// Invariants:
// As soon as we notice that _totalCount == 0, we set _done = true.
// Otherwise, outside of this method (or skipSome), _documents is
// either empty (at the beginning, with _posInDocuments == 0 and
// _atBeginning == false), or is non-empty and
// _posInDocuments < _documents.size()
if (_done) { if (_done) {
return nullptr; return nullptr;
} }
@ -709,15 +724,23 @@ AqlItemBlock* EnumerateCollectionBlock::getSome (size_t, // atLeast,
return nullptr; return nullptr;
} }
_pos = 0; // this is in the first block _pos = 0; // this is in the first block
_posInAllDocs = 0; // Note that we know _allDocs.size() > 0, initializeDocuments();
// otherwise _done would be true already
} }
// If we get here, we do have _buffer.front() // If we get here, we do have _buffer.front()
AqlItemBlock* cur = _buffer.front(); AqlItemBlock* cur = _buffer.front();
size_t const curRegs = cur->getNrRegs(); size_t const curRegs = cur->getNrRegs();
size_t available = _documents.size() - _posInAllDocs; // Get more documents from collection if _documents is empty:
if (_posInDocuments >= _documents.size()) {
if (! moreDocuments(atMost)) {
TRI_ASSERT(_totalCount == 0);
_done = true;
return nullptr;
}
}
size_t available = _documents.size() - _posInDocuments;
size_t toSend = (std::min)(atMost, available); size_t toSend = (std::min)(atMost, available);
unique_ptr<AqlItemBlock> res(new AqlItemBlock(toSend, getPlanNode()->getRegisterPlan()->nrRegs[getPlanNode()->getDepth()])); unique_ptr<AqlItemBlock> res(new AqlItemBlock(toSend, getPlanNode()->getRegisterPlan()->nrRegs[getPlanNode()->getDepth()]));
@ -745,17 +768,15 @@ AqlItemBlock* EnumerateCollectionBlock::getSome (size_t, // atLeast,
// but can just take cur->getNrRegs() as registerId: // but can just take cur->getNrRegs() as registerId:
res->setValue(j, static_cast<triagens::aql::RegisterId>(curRegs), res->setValue(j, static_cast<triagens::aql::RegisterId>(curRegs),
AqlValue(reinterpret_cast<TRI_df_marker_t AqlValue(reinterpret_cast<TRI_df_marker_t
const*>(_documents[_posInAllDocs++].getDataPtr()))); const*>(_documents[_posInDocuments++].getDataPtr())));
// No harm done, if the setValue throws! // No harm done, if the setValue throws!
} }
// Advance read position: // Advance read position:
if (_posInAllDocs >= _documents.size()) { if (_posInDocuments >= _documents.size()) {
// we have exhausted our local documents buffer // we have exhausted our local documents buffer
_posInAllDocs = 0;
// fetch more documents into our buffer // fetch more documents into our buffer
if (! moreDocuments()) { if (! moreDocuments(atMost)) {
// nothing more to read, re-initialize fetching of documents // nothing more to read, re-initialize fetching of documents
initializeDocuments(); initializeDocuments();
if (++_pos >= cur->size()) { if (++_pos >= cur->size()) {
@ -784,19 +805,26 @@ size_t EnumerateCollectionBlock::skipSome (size_t atLeast, size_t atMost) {
return skipped; return skipped;
} }
_pos = 0; // this is in the first block _pos = 0; // this is in the first block
_posInAllDocs = 0; // Note that we know _allDocs.size() > 0, initializeDocuments();
// otherwise _done would be true already
} }
// if we get here, then _buffer.front() exists // if we get here, then _buffer.front() exists
AqlItemBlock* cur = _buffer.front(); AqlItemBlock* cur = _buffer.front();
if (atMost >= skipped + _documents.size() - _posInAllDocs) { // Get more documents from collection if _documents is empty:
skipped += _documents.size() - _posInAllDocs; if (_posInDocuments >= _documents.size()) {
_posInAllDocs = 0; if (! moreDocuments(atMost)) {
TRI_ASSERT(_totalCount == 0);
_done = true;
return skipped;
}
}
if (atMost >= skipped + _documents.size() - _posInDocuments) {
skipped += _documents.size() - _posInDocuments;
// fetch more documents into our buffer // fetch more documents into our buffer
if (! moreDocuments()) { if (! moreDocuments(atMost - skipped)) {
// nothing more to read, re-initialize fetching of documents // nothing more to read, re-initialize fetching of documents
initializeDocuments(); initializeDocuments();
if (++_pos >= cur->size()) { if (++_pos >= cur->size()) {
@ -807,7 +835,7 @@ size_t EnumerateCollectionBlock::skipSome (size_t atLeast, size_t atMost) {
} }
} }
else { else {
_posInAllDocs += atMost - skipped; _posInDocuments += atMost - skipped;
skipped = atMost; skipped = atMost;
} }
} }

View File

@ -442,16 +442,17 @@ namespace triagens {
void initializeDocuments () { void initializeDocuments () {
_internalSkip = 0; _internalSkip = 0;
if (! moreDocuments()) { if (!_atBeginning) {
_done = true; _documents.clear();
} }
_posInDocuments = 0;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief continue fetching of documents /// @brief continue fetching of documents
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
bool moreDocuments (); bool moreDocuments (size_t hint);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief initialize, here we fetch all docs from the database /// @brief initialize, here we fetch all docs from the database
@ -460,11 +461,15 @@ namespace triagens {
int initialize () override; int initialize () override;
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief initCursor, here we release our docs from this collection /// @brief initializeCursor
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
int initializeCursor (AqlItemBlock* items, size_t pos) override; int initializeCursor (AqlItemBlock* items, size_t pos) override;
////////////////////////////////////////////////////////////////////////////////
/// @brief getSome
////////////////////////////////////////////////////////////////////////////////
AqlItemBlock* getSome (size_t atLeast, size_t atMost) override; AqlItemBlock* getSome (size_t atLeast, size_t atMost) override;
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -506,11 +511,16 @@ namespace triagens {
std::vector<TRI_doc_mptr_copy_t> _documents; std::vector<TRI_doc_mptr_copy_t> _documents;
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief current position in _allDocs /// @brief current position in _documents
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
size_t _posInAllDocs; size_t _posInDocuments;
////////////////////////////////////////////////////////////////////////////////
/// @brief current position in _documents
////////////////////////////////////////////////////////////////////////////////
bool _atBeginning;
}; };
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------