diff --git a/arangod/Aql/ClusterBlocks.cpp b/arangod/Aql/ClusterBlocks.cpp index 45a4f9c929..311cc363b0 100644 --- a/arangod/Aql/ClusterBlocks.cpp +++ b/arangod/Aql/ClusterBlocks.cpp @@ -61,7 +61,7 @@ GatherBlock::GatherBlock(ExecutionEngine* engine, GatherNode const* en) : ExecutionBlock(engine, en), _sortRegisters(), _isSimple(en->getElements().empty()), - _heap(en->_sortmode == 'h' ? new Heap : nullptr ) { + _heap(en->_sortmode == 'h' ? new Heap : nullptr) { if (!_isSimple) { for (auto const& p : en->getElements()) { @@ -155,13 +155,16 @@ int GatherBlock::initializeCursor(AqlItemBlock* items, size_t pos) { } _gatherBlockBuffer.clear(); _gatherBlockPos.clear(); - _gatherBlockBuffer.reserve(_dependencies.size()); _gatherBlockPos.reserve(_dependencies.size()); for (size_t i = 0; i < _dependencies.size(); i++) { _gatherBlockBuffer.emplace_back(); _gatherBlockPos.emplace_back(std::make_pair(i, 0)); } + + if (_heap) { + _heap->clear(); + } } if (_dependencies.empty()) { @@ -276,21 +279,22 @@ AqlItemBlock* GatherBlock::getSome(size_t atLeast, size_t atMost) { TRI_ASSERT(_gatherBlockBuffer.size() == _dependencies.size()); TRI_ASSERT(_gatherBlockBuffer.size() == _gatherBlockPos.size()); - for (size_t i = 0; i < _dependencies.size(); i++) { - if (_gatherBlockBuffer.at(i).empty()) { + for (size_t i = 0; i < _dependencies.size(); ++i) { + if (_gatherBlockBuffer[i].empty()) { if (getBlock(i, atLeast, atMost)) { index = i; - _gatherBlockPos.at(i) = std::make_pair(i, 0); + _gatherBlockPos[i] = std::make_pair(i, 0); } } else { index = i; } - auto const& cur = _gatherBlockBuffer.at(i); + auto const& cur = _gatherBlockBuffer[i]; if (!cur.empty()) { - available += cur.at(0)->size() - _gatherBlockPos.at(i).second; - for (size_t j = 1; j < cur.size(); j++) { - available += cur.at(j)->size(); + TRI_ASSERT(cur[0]->size() >= _gatherBlockPos[i].second); + available += cur[0]->size() - _gatherBlockPos[i].second; + for (size_t j = 1; j < cur.size(); ++j) { + available += cur[j]->size(); } } } @@ -308,27 +312,27 @@ AqlItemBlock* GatherBlock::getSome(size_t atLeast, size_t atMost) { // comparison function OurLessThan ourLessThan(_trx, _gatherBlockBuffer, _sortRegisters); - auto ourGreater = [&ourLessThan](std::pair& a - ,std::pair& b){ - return ourLessThan(b,a); + auto ourGreater = [&ourLessThan](std::pair& a, std::pair& b) { + return ourLessThan(b, a); }; + TRI_ASSERT(!_gatherBlockBuffer.at(index).empty()); AqlItemBlock* example = _gatherBlockBuffer.at(index).front(); size_t nrRegs = example->getNrRegs(); // automatically deleted if things go wrong std::unique_ptr res(requestBlock(toSend, static_cast(nrRegs))); - if (_heap && _heap->size() !=_dependencies.size() ){ + if (_heap && _heap->size() != _dependencies.size()) { auto& heap = *_heap; - std::copy(_gatherBlockPos.begin(),_gatherBlockPos.end(),std::back_inserter(heap)); - std::make_heap(heap.begin(), heap.end(),ourGreater); + std::copy(_gatherBlockPos.begin(), _gatherBlockPos.end(), std::back_inserter(heap)); + std::make_heap(heap.begin(), heap.end(), ourGreater); } for (size_t i = 0; i < toSend; i++) { // get the next smallest row from the buffer . . . std::pair val; - if(_heap){ + if (_heap) { val = _heap->front(); } else { val = *(std::min_element( _gatherBlockPos.begin(), _gatherBlockPos.end(), ourLessThan)); @@ -336,7 +340,8 @@ AqlItemBlock* GatherBlock::getSome(size_t atLeast, size_t atMost) { // copy the row in to the outgoing block . . . for (RegisterId col = 0; col < nrRegs; col++) { - AqlValue const& x( _gatherBlockBuffer.at(val.first).front()->getValueReference(val.second, col)); + TRI_ASSERT(!_gatherBlockBuffer[val.first].empty()); + AqlValue const& x(_gatherBlockBuffer[val.first].front()->getValueReference(val.second, col)); if (!x.isEmpty()) { auto it = cache.find(x); @@ -356,24 +361,25 @@ AqlItemBlock* GatherBlock::getSome(size_t atLeast, size_t atMost) { } _gatherBlockPos.at(val.first).second++; - if(_heap){ + if (_heap) { auto& heap = *_heap; - std::pop_heap(heap.begin(), heap.end(),ourGreater); // remove element from heap but not from vector + std::pop_heap(heap.begin(), heap.end(), ourGreater); // remove element from heap but not from vector heap.back().second++; //advance position in itemblock of removed element before it is re-inserted later } // renew the _gatherBlockPos and clean up the buffer if necessary - if ( _gatherBlockPos.at(val.first).second == _gatherBlockBuffer.at(val.first).front()->size() ) { - AqlItemBlock* cur = _gatherBlockBuffer.at(val.first).front(); + if (_gatherBlockPos.at(val.first).second == _gatherBlockBuffer.at(val.first).front()->size()) { + TRI_ASSERT(!_gatherBlockBuffer[val.first].empty()); + AqlItemBlock* cur = _gatherBlockBuffer[val.first].front(); returnBlock(cur); - _gatherBlockBuffer.at(val.first).pop_front(); - _gatherBlockPos.at(val.first) = {val.first, 0}; // .second = 0 ? + _gatherBlockBuffer[val.first].pop_front(); + _gatherBlockPos[val.first] = {val.first, 0}; - if( _heap) { + if (_heap) { _heap->back().second = 0; } - if (_gatherBlockBuffer.at(val.first).empty()) { + if (_gatherBlockBuffer[val.first].empty()) { // if we pulled everything from the buffer, we need to fetch // more data for the shard for which we have no more local // values. @@ -384,8 +390,8 @@ AqlItemBlock* GatherBlock::getSome(size_t atLeast, size_t atMost) { } } - if(_heap) { - std::push_heap(_heap->begin(), _heap->end(),ourGreater); //re-insert element + if (_heap) { + std::push_heap(_heap->begin(), _heap->end(), ourGreater); //re-insert element } } @@ -408,7 +414,7 @@ size_t GatherBlock::skipSome(size_t atLeast, size_t atMost) { auto skipped = _dependencies.at(_atDep)->skipSome(atLeast, atMost); while (skipped == 0 && _atDep < _dependencies.size() - 1) { _atDep++; - skipped = _dependencies.at(_atDep)->skipSome(atLeast, atMost); + skipped = _dependencies[_atDep]->skipSome(atLeast, atMost); } if (skipped == 0) { _done = true; @@ -422,9 +428,9 @@ size_t GatherBlock::skipSome(size_t atLeast, size_t atMost) { // pull more blocks from dependencies . . . for (size_t i = 0; i < _dependencies.size(); i++) { - if (_gatherBlockBuffer.at(i).empty()) { + if (_gatherBlockBuffer[i].empty()) { if (getBlock(i, atLeast, atMost)) { - _gatherBlockPos.at(i) = std::make_pair(i, 0); + _gatherBlockPos[i] = std::make_pair(i, 0); } } @@ -453,13 +459,14 @@ size_t GatherBlock::skipSome(size_t atLeast, size_t atMost) { _gatherBlockPos.begin(), _gatherBlockPos.end(), ourLessThan)); // renew the _gatherBlockPos and clean up the buffer if necessary - _gatherBlockPos.at(val.first).second++; - if (_gatherBlockPos.at(val.first).second == - _gatherBlockBuffer.at(val.first).front()->size()) { - AqlItemBlock* cur = _gatherBlockBuffer.at(val.first).front(); + _gatherBlockPos[val.first].second++; + if (_gatherBlockPos[val.first].second == + _gatherBlockBuffer[val.first].front()->size()) { + TRI_ASSERT(!_gatherBlockBuffer[val.first].empty()); + AqlItemBlock* cur = _gatherBlockBuffer[val.first].front(); returnBlock(cur); - _gatherBlockBuffer.at(val.first).pop_front(); - _gatherBlockPos.at(val.first) = std::make_pair(val.first, 0); + _gatherBlockBuffer[val.first].pop_front(); + _gatherBlockPos[val.first] = std::make_pair(val.first, 0); } } @@ -477,7 +484,7 @@ bool GatherBlock::getBlock(size_t i, size_t atLeast, size_t atMost) { TRI_ASSERT(!_isSimple); std::unique_ptr docs(_dependencies.at(i)->getSome(atLeast, atMost)); - if (docs != nullptr) { + if (docs != nullptr && docs->size() > 0) { _gatherBlockBuffer.at(i).emplace_back(docs.get()); docs.release(); return true; @@ -499,6 +506,8 @@ bool GatherBlock::OurLessThan::operator()(std::pair const& a, if (_gatherBlockBuffer[b.first].empty()) { return true; } + TRI_ASSERT(!_gatherBlockBuffer[a.first].empty()); + TRI_ASSERT(!_gatherBlockBuffer[b.first].empty()); for (auto const& reg : _sortRegisters) { // Fast path if there is no attributePath: diff --git a/arangod/Aql/ClusterBlocks.h b/arangod/Aql/ClusterBlocks.h index 51d62aca77..3f8fc312b9 100644 --- a/arangod/Aql/ClusterBlocks.h +++ b/arangod/Aql/ClusterBlocks.h @@ -34,7 +34,6 @@ namespace arangodb { namespace transaction { class Methods; } -; struct ClusterCommResult; namespace aql { @@ -117,7 +116,7 @@ class GatherBlock : public ExecutionBlock { std::vector>& _gatherBlockBuffer; std::vector& _sortRegisters; }; - using Heap = std::vector>; + using Heap = std::vector>; std::unique_ptr _heap; };