1
0
Fork 0

fix heap sort in GatherBlock (#3908)

This commit is contained in:
Jan 2017-12-07 10:34:44 +01:00 committed by Frank Celler
parent 7aed168a0d
commit 1789977d51
2 changed files with 47 additions and 39 deletions

View File

@ -61,7 +61,7 @@ GatherBlock::GatherBlock(ExecutionEngine* engine, GatherNode const* en)
: ExecutionBlock(engine, en), : ExecutionBlock(engine, en),
_sortRegisters(), _sortRegisters(),
_isSimple(en->getElements().empty()), _isSimple(en->getElements().empty()),
_heap(en->_sortmode == 'h' ? new Heap : nullptr ) { _heap(en->_sortmode == 'h' ? new Heap : nullptr) {
if (!_isSimple) { if (!_isSimple) {
for (auto const& p : en->getElements()) { for (auto const& p : en->getElements()) {
@ -155,13 +155,16 @@ int GatherBlock::initializeCursor(AqlItemBlock* items, size_t pos) {
} }
_gatherBlockBuffer.clear(); _gatherBlockBuffer.clear();
_gatherBlockPos.clear(); _gatherBlockPos.clear();
_gatherBlockBuffer.reserve(_dependencies.size()); _gatherBlockBuffer.reserve(_dependencies.size());
_gatherBlockPos.reserve(_dependencies.size()); _gatherBlockPos.reserve(_dependencies.size());
for (size_t i = 0; i < _dependencies.size(); i++) { for (size_t i = 0; i < _dependencies.size(); i++) {
_gatherBlockBuffer.emplace_back(); _gatherBlockBuffer.emplace_back();
_gatherBlockPos.emplace_back(std::make_pair(i, 0)); _gatherBlockPos.emplace_back(std::make_pair(i, 0));
} }
if (_heap) {
_heap->clear();
}
} }
if (_dependencies.empty()) { if (_dependencies.empty()) {
@ -276,21 +279,22 @@ AqlItemBlock* GatherBlock::getSome(size_t atLeast, size_t atMost) {
TRI_ASSERT(_gatherBlockBuffer.size() == _dependencies.size()); TRI_ASSERT(_gatherBlockBuffer.size() == _dependencies.size());
TRI_ASSERT(_gatherBlockBuffer.size() == _gatherBlockPos.size()); TRI_ASSERT(_gatherBlockBuffer.size() == _gatherBlockPos.size());
for (size_t i = 0; i < _dependencies.size(); i++) { for (size_t i = 0; i < _dependencies.size(); ++i) {
if (_gatherBlockBuffer.at(i).empty()) { if (_gatherBlockBuffer[i].empty()) {
if (getBlock(i, atLeast, atMost)) { if (getBlock(i, atLeast, atMost)) {
index = i; index = i;
_gatherBlockPos.at(i) = std::make_pair(i, 0); _gatherBlockPos[i] = std::make_pair(i, 0);
} }
} else { } else {
index = i; index = i;
} }
auto const& cur = _gatherBlockBuffer.at(i); auto const& cur = _gatherBlockBuffer[i];
if (!cur.empty()) { if (!cur.empty()) {
available += cur.at(0)->size() - _gatherBlockPos.at(i).second; TRI_ASSERT(cur[0]->size() >= _gatherBlockPos[i].second);
for (size_t j = 1; j < cur.size(); j++) { available += cur[0]->size() - _gatherBlockPos[i].second;
available += cur.at(j)->size(); for (size_t j = 1; j < cur.size(); ++j) {
available += cur[j]->size();
} }
} }
} }
@ -308,27 +312,27 @@ AqlItemBlock* GatherBlock::getSome(size_t atLeast, size_t atMost) {
// comparison function // comparison function
OurLessThan ourLessThan(_trx, _gatherBlockBuffer, _sortRegisters); OurLessThan ourLessThan(_trx, _gatherBlockBuffer, _sortRegisters);
auto ourGreater = [&ourLessThan](std::pair<std::size_t, std::size_t>& a auto ourGreater = [&ourLessThan](std::pair<std::size_t, std::size_t>& a, std::pair<std::size_t, std::size_t>& b) {
,std::pair<std::size_t, std::size_t>& b){ return ourLessThan(b, a);
return ourLessThan(b,a);
}; };
TRI_ASSERT(!_gatherBlockBuffer.at(index).empty());
AqlItemBlock* example = _gatherBlockBuffer.at(index).front(); AqlItemBlock* example = _gatherBlockBuffer.at(index).front();
size_t nrRegs = example->getNrRegs(); size_t nrRegs = example->getNrRegs();
// automatically deleted if things go wrong // automatically deleted if things go wrong
std::unique_ptr<AqlItemBlock> res(requestBlock(toSend, static_cast<arangodb::aql::RegisterId>(nrRegs))); std::unique_ptr<AqlItemBlock> res(requestBlock(toSend, static_cast<arangodb::aql::RegisterId>(nrRegs)));
if (_heap && _heap->size() !=_dependencies.size() ){ if (_heap && _heap->size() != _dependencies.size()) {
auto& heap = *_heap; auto& heap = *_heap;
std::copy(_gatherBlockPos.begin(),_gatherBlockPos.end(),std::back_inserter(heap)); std::copy(_gatherBlockPos.begin(), _gatherBlockPos.end(), std::back_inserter(heap));
std::make_heap(heap.begin(), heap.end(),ourGreater); std::make_heap(heap.begin(), heap.end(), ourGreater);
} }
for (size_t i = 0; i < toSend; i++) { for (size_t i = 0; i < toSend; i++) {
// get the next smallest row from the buffer . . . // get the next smallest row from the buffer . . .
std::pair<size_t, size_t> val; std::pair<size_t, size_t> val;
if(_heap){ if (_heap) {
val = _heap->front(); val = _heap->front();
} else { } else {
val = *(std::min_element( _gatherBlockPos.begin(), _gatherBlockPos.end(), ourLessThan)); val = *(std::min_element( _gatherBlockPos.begin(), _gatherBlockPos.end(), ourLessThan));
@ -336,7 +340,8 @@ AqlItemBlock* GatherBlock::getSome(size_t atLeast, size_t atMost) {
// copy the row in to the outgoing block . . . // copy the row in to the outgoing block . . .
for (RegisterId col = 0; col < nrRegs; col++) { for (RegisterId col = 0; col < nrRegs; col++) {
AqlValue const& x( _gatherBlockBuffer.at(val.first).front()->getValueReference(val.second, col)); TRI_ASSERT(!_gatherBlockBuffer[val.first].empty());
AqlValue const& x(_gatherBlockBuffer[val.first].front()->getValueReference(val.second, col));
if (!x.isEmpty()) { if (!x.isEmpty()) {
auto it = cache.find(x); auto it = cache.find(x);
@ -356,24 +361,25 @@ AqlItemBlock* GatherBlock::getSome(size_t atLeast, size_t atMost) {
} }
_gatherBlockPos.at(val.first).second++; _gatherBlockPos.at(val.first).second++;
if(_heap){ if (_heap) {
auto& heap = *_heap; auto& heap = *_heap;
std::pop_heap(heap.begin(), heap.end(),ourGreater); // remove element from heap but not from vector std::pop_heap(heap.begin(), heap.end(), ourGreater); // remove element from heap but not from vector
heap.back().second++; //advance position in itemblock of removed element before it is re-inserted later heap.back().second++; //advance position in itemblock of removed element before it is re-inserted later
} }
// renew the _gatherBlockPos and clean up the buffer if necessary // renew the _gatherBlockPos and clean up the buffer if necessary
if ( _gatherBlockPos.at(val.first).second == _gatherBlockBuffer.at(val.first).front()->size() ) { if (_gatherBlockPos.at(val.first).second == _gatherBlockBuffer.at(val.first).front()->size()) {
AqlItemBlock* cur = _gatherBlockBuffer.at(val.first).front(); TRI_ASSERT(!_gatherBlockBuffer[val.first].empty());
AqlItemBlock* cur = _gatherBlockBuffer[val.first].front();
returnBlock(cur); returnBlock(cur);
_gatherBlockBuffer.at(val.first).pop_front(); _gatherBlockBuffer[val.first].pop_front();
_gatherBlockPos.at(val.first) = {val.first, 0}; // .second = 0 ? _gatherBlockPos[val.first] = {val.first, 0};
if( _heap) { if (_heap) {
_heap->back().second = 0; _heap->back().second = 0;
} }
if (_gatherBlockBuffer.at(val.first).empty()) { if (_gatherBlockBuffer[val.first].empty()) {
// if we pulled everything from the buffer, we need to fetch // if we pulled everything from the buffer, we need to fetch
// more data for the shard for which we have no more local // more data for the shard for which we have no more local
// values. // values.
@ -384,8 +390,8 @@ AqlItemBlock* GatherBlock::getSome(size_t atLeast, size_t atMost) {
} }
} }
if(_heap) { if (_heap) {
std::push_heap(_heap->begin(), _heap->end(),ourGreater); //re-insert element std::push_heap(_heap->begin(), _heap->end(), ourGreater); //re-insert element
} }
} }
@ -408,7 +414,7 @@ size_t GatherBlock::skipSome(size_t atLeast, size_t atMost) {
auto skipped = _dependencies.at(_atDep)->skipSome(atLeast, atMost); auto skipped = _dependencies.at(_atDep)->skipSome(atLeast, atMost);
while (skipped == 0 && _atDep < _dependencies.size() - 1) { while (skipped == 0 && _atDep < _dependencies.size() - 1) {
_atDep++; _atDep++;
skipped = _dependencies.at(_atDep)->skipSome(atLeast, atMost); skipped = _dependencies[_atDep]->skipSome(atLeast, atMost);
} }
if (skipped == 0) { if (skipped == 0) {
_done = true; _done = true;
@ -422,9 +428,9 @@ size_t GatherBlock::skipSome(size_t atLeast, size_t atMost) {
// pull more blocks from dependencies . . . // pull more blocks from dependencies . . .
for (size_t i = 0; i < _dependencies.size(); i++) { for (size_t i = 0; i < _dependencies.size(); i++) {
if (_gatherBlockBuffer.at(i).empty()) { if (_gatherBlockBuffer[i].empty()) {
if (getBlock(i, atLeast, atMost)) { if (getBlock(i, atLeast, atMost)) {
_gatherBlockPos.at(i) = std::make_pair(i, 0); _gatherBlockPos[i] = std::make_pair(i, 0);
} }
} }
@ -453,13 +459,14 @@ size_t GatherBlock::skipSome(size_t atLeast, size_t atMost) {
_gatherBlockPos.begin(), _gatherBlockPos.end(), ourLessThan)); _gatherBlockPos.begin(), _gatherBlockPos.end(), ourLessThan));
// renew the _gatherBlockPos and clean up the buffer if necessary // renew the _gatherBlockPos and clean up the buffer if necessary
_gatherBlockPos.at(val.first).second++; _gatherBlockPos[val.first].second++;
if (_gatherBlockPos.at(val.first).second == if (_gatherBlockPos[val.first].second ==
_gatherBlockBuffer.at(val.first).front()->size()) { _gatherBlockBuffer[val.first].front()->size()) {
AqlItemBlock* cur = _gatherBlockBuffer.at(val.first).front(); TRI_ASSERT(!_gatherBlockBuffer[val.first].empty());
AqlItemBlock* cur = _gatherBlockBuffer[val.first].front();
returnBlock(cur); returnBlock(cur);
_gatherBlockBuffer.at(val.first).pop_front(); _gatherBlockBuffer[val.first].pop_front();
_gatherBlockPos.at(val.first) = std::make_pair(val.first, 0); _gatherBlockPos[val.first] = std::make_pair(val.first, 0);
} }
} }
@ -477,7 +484,7 @@ bool GatherBlock::getBlock(size_t i, size_t atLeast, size_t atMost) {
TRI_ASSERT(!_isSimple); TRI_ASSERT(!_isSimple);
std::unique_ptr<AqlItemBlock> docs(_dependencies.at(i)->getSome(atLeast, atMost)); std::unique_ptr<AqlItemBlock> docs(_dependencies.at(i)->getSome(atLeast, atMost));
if (docs != nullptr) { if (docs != nullptr && docs->size() > 0) {
_gatherBlockBuffer.at(i).emplace_back(docs.get()); _gatherBlockBuffer.at(i).emplace_back(docs.get());
docs.release(); docs.release();
return true; return true;
@ -499,6 +506,8 @@ bool GatherBlock::OurLessThan::operator()(std::pair<size_t, size_t> const& a,
if (_gatherBlockBuffer[b.first].empty()) { if (_gatherBlockBuffer[b.first].empty()) {
return true; return true;
} }
TRI_ASSERT(!_gatherBlockBuffer[a.first].empty());
TRI_ASSERT(!_gatherBlockBuffer[b.first].empty());
for (auto const& reg : _sortRegisters) { for (auto const& reg : _sortRegisters) {
// Fast path if there is no attributePath: // Fast path if there is no attributePath:

View File

@ -34,7 +34,6 @@ namespace arangodb {
namespace transaction { namespace transaction {
class Methods; class Methods;
} }
;
struct ClusterCommResult; struct ClusterCommResult;
namespace aql { namespace aql {
@ -117,7 +116,7 @@ class GatherBlock : public ExecutionBlock {
std::vector<std::deque<AqlItemBlock*>>& _gatherBlockBuffer; std::vector<std::deque<AqlItemBlock*>>& _gatherBlockBuffer;
std::vector<SortElementBlock>& _sortRegisters; std::vector<SortElementBlock>& _sortRegisters;
}; };
using Heap = std::vector<std::pair<std::size_t,std::size_t>>; using Heap = std::vector<std::pair<std::size_t, std::size_t>>;
std::unique_ptr<Heap> _heap; std::unique_ptr<Heap> _heap;
}; };