1
0
Fork 0

split gather block (#5506)

* initial split into sorted and unsorted blocks

* some fixes

* move gather blocks out of header

* cleanup

* cleanup

* ensure ExecutionBlock constructor is noexcept

* extract sorting logic into separate SortingStrategy class

* ensure we not rebuild heap each getSome/skipSome call

* cleanup API attempt

* Revert "cleanup API attempt"

This reverts commit c824de1a1d29e2cb7f405138f830cc0e02e7667b.

* Revert "Revert "cleanup API attempt""

This reverts commit 65a2c95f6fdbbcbd00d6582af4d354fe339d4738.

* fix comments

* fixes after review

* more fixes after review
This commit is contained in:
Andrey Abramov 2018-06-01 22:26:51 +03:00 committed by GitHub
parent 1283e0f629
commit 1a83809b84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 626 additions and 443 deletions

View File

@ -58,11 +58,11 @@ using StringBuffer = arangodb::basics::StringBuffer;
namespace { namespace {
/// @brief OurLessThan: comparison method for elements of _gatherBlockPos /// @brief OurLessThan: comparison method for elements of SortingGatherBlock
class OurLessThan { class OurLessThan {
public: public:
OurLessThan( OurLessThan(
transaction::Methods* trx, arangodb::transaction::Methods* trx,
std::vector<std::deque<AqlItemBlock*>>& gatherBlockBuffer, std::vector<std::deque<AqlItemBlock*>>& gatherBlockBuffer,
std::vector<SortRegister>& sortRegisters) noexcept std::vector<SortRegister>& sortRegisters) noexcept
: _trx(trx), : _trx(trx),
@ -76,7 +76,7 @@ class OurLessThan {
) const; ) const;
private: private:
transaction::Methods* _trx; arangodb::transaction::Methods* _trx;
std::vector<std::deque<AqlItemBlock*>>& _gatherBlockBuffer; std::vector<std::deque<AqlItemBlock*>>& _gatherBlockBuffer;
std::vector<SortRegister>& _sortRegisters; std::vector<SortRegister>& _sortRegisters;
}; // OurLessThan }; // OurLessThan
@ -133,407 +133,84 @@ bool OurLessThan::operator()(
return false; return false;
} }
} ////////////////////////////////////////////////////////////////////////////////
/// @class HeapSorting
GatherBlock::GatherBlock(ExecutionEngine* engine, GatherNode const* en) /// @brief "Heap" sorting strategy
: ExecutionBlock(engine, en), ////////////////////////////////////////////////////////////////////////////////
_isSimple(en->elements().empty()) { class HeapSorting final : public SortingStrategy, private OurLessThan {
TRI_ASSERT(en && en->plan() && en->getRegisterPlan()); public:
HeapSorting(
if (!_isSimple) { arangodb::transaction::Methods* trx,
if (en->_sortmode == GatherNode::SortMode::Heap) { std::vector<std::deque<AqlItemBlock*>>& gatherBlockBuffer,
_heap.reset(new Heap()); std::vector<SortRegister>& sortRegisters) noexcept
} : OurLessThan(trx, gatherBlockBuffer, sortRegisters) {
// We know that planRegisters has been run, so
// getPlanNode()->_registerPlan is set up
SortRegister::fill(
*en->plan(),
*en->getRegisterPlan(),
en->elements(),
_sortRegisters
);
}
}
GatherBlock::~GatherBlock() {
for (std::deque<AqlItemBlock*>& x : _gatherBlockBuffer) {
for (AqlItemBlock* y : x) {
delete y;
}
x.clear();
}
_gatherBlockBuffer.clear();
}
/// @brief shutdown: need our own method since our _buffer is different
int GatherBlock::shutdown(int errorCode) {
DEBUG_BEGIN_BLOCK();
// don't call default shutdown method since it does the wrong thing to
// _gatherBlockBuffer
int ret = TRI_ERROR_NO_ERROR;
for (auto it = _dependencies.begin(); it != _dependencies.end(); ++it) {
int res = (*it)->shutdown(errorCode);
if (res != TRI_ERROR_NO_ERROR) {
ret = res;
}
} }
if (ret != TRI_ERROR_NO_ERROR) { virtual ValueType nextValue() override {
return ret; TRI_ASSERT(!_heap.empty());
std::push_heap(_heap.begin(), _heap.end(), *this); // re-insert element
std::pop_heap(_heap.begin(), _heap.end(), *this); // remove element from _heap but not from vector
return _heap.back();
} }
if (!_isSimple) { virtual void prepare(std::vector<ValueType>& blockPos) override {
for (std::deque<AqlItemBlock*>& x : _gatherBlockBuffer) { TRI_ASSERT(!blockPos.empty());
for (AqlItemBlock* y : x) {
delete y; if (_heap.size() == blockPos.size()) {
} return;
x.clear();
} }
_gatherBlockBuffer.clear();
_gatherBlockPos.clear(); _heap.clear();
std::copy(blockPos.begin(), blockPos.end(), std::back_inserter(_heap));
std::make_heap(_heap.begin(), _heap.end()-1, *this); // remain last element out of heap to maintain invariant
TRI_ASSERT(!_heap.empty());
} }
return TRI_ERROR_NO_ERROR; virtual void reset() noexcept override {
_heap.clear();
// cppcheck-suppress style
DEBUG_END_BLOCK();
}
/// @brief initializeCursor
int GatherBlock::initializeCursor(AqlItemBlock* items, size_t pos) {
DEBUG_BEGIN_BLOCK();
int res = ExecutionBlock::initializeCursor(items, pos);
if (res != TRI_ERROR_NO_ERROR) {
return res;
} }
_atDep = 0; bool operator()(
std::pair<size_t, size_t> const& lhs,
if (!_isSimple) { std::pair<size_t, size_t> const& rhs
for (std::deque<AqlItemBlock*>& x : _gatherBlockBuffer) { ) const {
for (AqlItemBlock* y : x) { return OurLessThan::operator()(rhs, lhs);
delete y;
}
x.clear();
}
_gatherBlockBuffer.clear();
_gatherBlockPos.clear();
_gatherBlockBuffer.reserve(_dependencies.size());
_gatherBlockPos.reserve(_dependencies.size());
for (size_t i = 0; i < _dependencies.size(); i++) {
_gatherBlockBuffer.emplace_back();
_gatherBlockPos.emplace_back(std::make_pair(i, 0));
}
if (_heap) {
_heap->clear();
}
} }
if (_dependencies.empty()) { private:
_done = true; std::vector<std::reference_wrapper<ValueType>> _heap;
} else { }; // HeapSorting
_done = false;
}
return TRI_ERROR_NO_ERROR;
// cppcheck-suppress style ////////////////////////////////////////////////////////////////////////////////
DEBUG_END_BLOCK(); /// @class MinElementSorting
} /// @brief "MinElement" sorting strategy
////////////////////////////////////////////////////////////////////////////////
/// @brief hasMore: true if any position of _buffer hasMore and false class MinElementSorting final : public SortingStrategy, public OurLessThan {
/// otherwise. public:
bool GatherBlock::hasMore() { MinElementSorting(
DEBUG_BEGIN_BLOCK(); arangodb::transaction::Methods* trx,
if (_done || _dependencies.empty()) { std::vector<std::deque<AqlItemBlock*>>& gatherBlockBuffer,
return false; std::vector<SortRegister>& sortRegisters) noexcept
: OurLessThan(trx, gatherBlockBuffer, sortRegisters) {
} }
if (_isSimple) { virtual ValueType nextValue() override {
for (size_t i = 0; i < _dependencies.size(); i++) { TRI_ASSERT(_blockPos);
if (_dependencies.at(i)->hasMore()) { return *(std::min_element(_blockPos->begin(), _blockPos->end(), *this));
return true;
}
}
} else {
for (size_t i = 0; i < _gatherBlockBuffer.size(); i++) {
if (!_gatherBlockBuffer.at(i).empty()) {
return true;
} else if (getBlock(i, DefaultBatchSize())) {
_gatherBlockPos.at(i) = std::make_pair(i, 0);
return true;
}
}
}
_done = true;
return false;
// cppcheck-suppress style
DEBUG_END_BLOCK();
}
/// @brief getSome
AqlItemBlock* GatherBlock::getSome(size_t atMost) {
DEBUG_BEGIN_BLOCK();
traceGetSomeBegin(atMost);
if (_dependencies.empty()) {
_done = true;
} }
if (_done) { virtual void prepare(std::vector<ValueType>& blockPos) override {
traceGetSomeEnd(nullptr); _blockPos = &blockPos;
return nullptr;
} }
// the simple case . . . virtual void reset() noexcept override {
if (_isSimple) { _blockPos = nullptr;
auto res = _dependencies.at(_atDep)->getSome(atMost);
while (res == nullptr && _atDep < _dependencies.size() - 1) {
_atDep++;
res = _dependencies.at(_atDep)->getSome(atMost);
}
if (res == nullptr) {
_done = true;
}
traceGetSomeEnd(res);
return res;
} }
// the non-simple case . . . private:
size_t available = 0; // nr of available rows std::vector<ValueType> const* _blockPos;
size_t index = 0; // an index of a non-empty buffer };
// pull more blocks from dependencies . . .
TRI_ASSERT(_gatherBlockBuffer.size() == _dependencies.size());
TRI_ASSERT(_gatherBlockBuffer.size() == _gatherBlockPos.size());
for (size_t i = 0; i < _dependencies.size(); ++i) {
if (_gatherBlockBuffer[i].empty()) {
if (getBlock(i, atMost)) {
index = i;
_gatherBlockPos[i] = std::make_pair(i, 0);
}
} else {
index = i;
}
auto const& cur = _gatherBlockBuffer[i];
if (!cur.empty()) {
TRI_ASSERT(cur[0]->size() >= _gatherBlockPos[i].second);
available += cur[0]->size() - _gatherBlockPos[i].second;
for (size_t j = 1; j < cur.size(); ++j) {
available += cur[j]->size();
}
}
}
if (available == 0) {
_done = true;
traceGetSomeEnd(nullptr);
return nullptr;
}
size_t toSend = (std::min)(available, atMost); // nr rows in outgoing block
// the following is similar to AqlItemBlock's slice method . . .
std::vector<std::unordered_map<AqlValue, AqlValue>> cache;
cache.resize(_gatherBlockBuffer.size());
// comparison function
OurLessThan ourLessThan(_trx, _gatherBlockBuffer, _sortRegisters);
auto ourGreater = [&ourLessThan](std::pair<std::size_t, std::size_t>& a, std::pair<std::size_t, std::size_t>& b) {
return ourLessThan(b, a);
};
TRI_ASSERT(!_gatherBlockBuffer.at(index).empty());
AqlItemBlock* example = _gatherBlockBuffer.at(index).front();
size_t nrRegs = example->getNrRegs();
// automatically deleted if things go wrong
std::unique_ptr<AqlItemBlock> res(requestBlock(toSend, static_cast<arangodb::aql::RegisterId>(nrRegs)));
if (_heap && _heap->size() != _dependencies.size()) {
auto& heap = *_heap;
std::copy(_gatherBlockPos.begin(), _gatherBlockPos.end(), std::back_inserter(heap));
std::make_heap(heap.begin(), heap.end(), ourGreater);
}
for (size_t i = 0; i < toSend; i++) {
// get the next smallest row from the buffer . . .
std::pair<size_t, size_t> val;
if (_heap) {
val = _heap->front();
} else {
val = *(std::min_element( _gatherBlockPos.begin(), _gatherBlockPos.end(), ourLessThan));
}
// copy the row in to the outgoing block . . .
for (RegisterId col = 0; col < nrRegs; col++) {
TRI_ASSERT(!_gatherBlockBuffer[val.first].empty());
AqlValue const& x(_gatherBlockBuffer[val.first].front()->getValueReference(val.second, col));
if (!x.isEmpty()) {
if (x.requiresDestruction()) {
// complex value, with ownership transfer
auto it = cache[val.first].find(x);
if (it == cache[val.first].end()) {
AqlValue y = x.clone();
try {
res->setValue(i, col, y);
} catch (...) {
y.destroy();
throw;
}
cache[val.first].emplace(x, y);
} else {
res->setValue(i, col, (*it).second);
}
} else {
// simple value, no ownership transfer needed
res->setValue(i, col, x);
}
}
}
_gatherBlockPos.at(val.first).second++;
if (_heap) {
auto& heap = *_heap;
std::pop_heap(heap.begin(), heap.end(), ourGreater); // remove element from heap but not from vector
heap.back().second++; //advance position in itemblock of removed element before it is re-inserted later
}
// renew the _gatherBlockPos and clean up the buffer if necessary
if (_gatherBlockPos.at(val.first).second == _gatherBlockBuffer.at(val.first).front()->size()) {
TRI_ASSERT(!_gatherBlockBuffer[val.first].empty());
AqlItemBlock* cur = _gatherBlockBuffer[val.first].front();
returnBlock(cur);
_gatherBlockBuffer[val.first].pop_front();
_gatherBlockPos[val.first] = {val.first, 0};
if (_heap) {
_heap->back().second = 0;
}
if (_gatherBlockBuffer[val.first].empty()) {
// if we pulled everything from the buffer, we need to fetch
// more data for the shard for which we have no more local
// values.
getBlock(val.first, atMost);
cache[val.first].clear();
// note that if getBlock() returns false here, this is not
// a problem, because the sort function used takes care of
// this
}
}
if (_heap) {
std::push_heap(_heap->begin(), _heap->end(), ourGreater); //re-insert element
}
}
traceGetSomeEnd(res.get());
return res.release();
// cppcheck-suppress style
DEBUG_END_BLOCK();
}
/// @brief skipSome
size_t GatherBlock::skipSome(size_t atMost) {
DEBUG_BEGIN_BLOCK();
if (_done) {
return 0;
}
// the simple case . . .
if (_isSimple) {
auto skipped = _dependencies.at(_atDep)->skipSome(atMost);
while (skipped == 0 && _atDep < _dependencies.size() - 1) {
_atDep++;
skipped = _dependencies[_atDep]->skipSome(atMost);
}
if (skipped == 0) {
_done = true;
}
return skipped;
}
// the non-simple case . . .
size_t available = 0; // nr of available rows
TRI_ASSERT(_dependencies.size() != 0);
// pull more blocks from dependencies . . .
for (size_t i = 0; i < _dependencies.size(); i++) {
if (_gatherBlockBuffer[i].empty()) {
if (getBlock(i, atMost)) {
_gatherBlockPos[i] = std::make_pair(i, 0);
}
}
auto cur = _gatherBlockBuffer.at(i);
if (!cur.empty()) {
available += cur.at(0)->size() - _gatherBlockPos.at(i).second;
for (size_t j = 1; j < cur.size(); j++) {
available += cur.at(j)->size();
}
}
}
if (available == 0) {
_done = true;
return 0;
}
size_t skipped = (std::min)(available, atMost); // nr rows in outgoing block
// comparison function
OurLessThan ourLessThan(_trx, _gatherBlockBuffer, _sortRegisters);
for (size_t i = 0; i < skipped; i++) {
// get the next smallest row from the buffer . . .
std::pair<size_t, size_t> val = *(std::min_element(
_gatherBlockPos.begin(), _gatherBlockPos.end(), ourLessThan));
// renew the _gatherBlockPos and clean up the buffer if necessary
_gatherBlockPos[val.first].second++;
if (_gatherBlockPos[val.first].second ==
_gatherBlockBuffer[val.first].front()->size()) {
TRI_ASSERT(!_gatherBlockBuffer[val.first].empty());
AqlItemBlock* cur = _gatherBlockBuffer[val.first].front();
returnBlock(cur);
_gatherBlockBuffer[val.first].pop_front();
_gatherBlockPos[val.first] = std::make_pair(val.first, 0);
}
}
return skipped;
// cppcheck-suppress style
DEBUG_END_BLOCK();
}
/// @brief getBlock: from dependency i into _gatherBlockBuffer.at(i),
/// non-simple case only
bool GatherBlock::getBlock(size_t i, size_t atMost) {
DEBUG_BEGIN_BLOCK();
TRI_ASSERT(i < _dependencies.size());
TRI_ASSERT(!_isSimple);
std::unique_ptr<AqlItemBlock> docs(_dependencies.at(i)->getSome(atMost));
if (docs != nullptr && docs->size() > 0) {
_gatherBlockBuffer.at(i).emplace_back(docs.get());
docs.release();
return true;
}
return false;
// cppcheck-suppress style
DEBUG_END_BLOCK();
} }
BlockWithClients::BlockWithClients(ExecutionEngine* engine, BlockWithClients::BlockWithClients(ExecutionEngine* engine,
@ -1486,3 +1163,452 @@ bool RemoteBlock::hasMore() {
// cppcheck-suppress style // cppcheck-suppress style
DEBUG_END_BLOCK(); DEBUG_END_BLOCK();
} }
// -----------------------------------------------------------------------------
// -- SECTION -- UnsortingGatherBlock
// -----------------------------------------------------------------------------
/// @brief shutdown: need our own method since our _buffer is different
int UnsortingGatherBlock::shutdown(int errorCode) {
DEBUG_BEGIN_BLOCK();
// don't call default shutdown method since it does the wrong thing to
// _gatherBlockBuffer
int ret = TRI_ERROR_NO_ERROR;
for (auto* dependency : _dependencies) {
int res = dependency->shutdown(errorCode);
if (res != TRI_ERROR_NO_ERROR) {
ret = res;
}
}
return ret;
// cppcheck-suppress style
DEBUG_END_BLOCK();
}
/// @brief initializeCursor: runs the base class initialization and, if that
/// succeeds, restarts reading at the first dependency
/// @return the result of ExecutionBlock::initializeCursor
int UnsortingGatherBlock::initializeCursor(AqlItemBlock* items, size_t pos) {
  DEBUG_BEGIN_BLOCK();
  int const rc = ExecutionBlock::initializeCursor(items, pos);

  if (rc == TRI_ERROR_NO_ERROR) {
    _atDep = 0;
    // without dependencies there is nothing to read at all
    _done = _dependencies.empty();
  }

  return rc;

  // cppcheck-suppress style
  DEBUG_END_BLOCK();
}
/// @brief hasMore: true as soon as one dependency reports more data; once no
/// dependency does, the block is marked as done and false is returned
bool UnsortingGatherBlock::hasMore() {
  DEBUG_BEGIN_BLOCK();
  if (!_done && !_dependencies.empty()) {
    // stops at the first dependency that still has data
    bool const anyLeft = std::any_of(
      _dependencies.begin(), _dependencies.end(),
      [](auto* dependency) { return dependency->hasMore(); }
    );

    if (anyLeft) {
      return true;
    }

    _done = true;
  }

  return false;

  // cppcheck-suppress style
  DEBUG_END_BLOCK();
}
/// @brief getSome: returns the next block of the current dependency, moving
/// on to the following dependencies while the current one yields nothing
/// @param atMost row limit passed on to the dependency
/// @return the fetched block, or nullptr once the last dependency is exhausted
AqlItemBlock* UnsortingGatherBlock::getSome(size_t atMost) {
  DEBUG_BEGIN_BLOCK();
  traceGetSomeBegin(atMost);

  // NOTE(review): _done is recomputed here on every call, so a _done flag set
  // by an earlier call is not honoured (skipSome does check it) - confirm
  // that this is intended
  _done = _dependencies.empty();

  if (_done) {
    traceGetSomeEnd(nullptr);
    return nullptr;
  }

  // dependencies are consumed strictly one after another
  auto* block = _dependencies[_atDep]->getSome(atMost);

  while (nullptr == block && _atDep + 1 < _dependencies.size()) {
    ++_atDep;
    block = _dependencies[_atDep]->getSome(atMost);
  }

  _done = (block == nullptr);

  traceGetSomeEnd(block);
  return block;

  // cppcheck-suppress style
  DEBUG_END_BLOCK();
}
/// @brief skipSome: skips rows of the current dependency, moving on to the
/// following dependencies while the current one has nothing left to skip
/// @param atMost row limit passed on to the dependency
/// @return number of rows skipped; 0 once everything has been consumed
size_t UnsortingGatherBlock::skipSome(size_t atMost) {
  DEBUG_BEGIN_BLOCK();
  if (_done) {
    return 0;
  }

  // dependencies are consumed strictly one after another
  auto nrSkipped = _dependencies[_atDep]->skipSome(atMost);

  while (0 == nrSkipped && _atDep + 1 < _dependencies.size()) {
    ++_atDep;
    nrSkipped = _dependencies[_atDep]->skipSome(atMost);
  }

  // nothing skipped even by the last dependency: we are finished
  _done = (nrSkipped == 0);

  return nrSkipped;

  // cppcheck-suppress style
  DEBUG_END_BLOCK();
}
// -----------------------------------------------------------------------------
// -- SECTION -- SortingGatherBlock
// -----------------------------------------------------------------------------
/// @brief creates a gather block that merges its dependencies in sort order
/// @param engine execution engine this block belongs to
/// @param en gather node to execute; asserted to carry at least one sort
///        element
SortingGatherBlock::SortingGatherBlock(
    ExecutionEngine& engine,
    GatherNode const& en)
  : ExecutionBlock(&engine, &en) {
  TRI_ASSERT(!en.elements().empty());

  // pick the strategy that corresponds to the requested sort mode; the
  // strategies only keep references to the buffers, so it is fine that
  // _sortRegisters is filled further below
  switch (en.sortMode()) {
    case GatherNode::SortMode::Heap:
      _strategy = std::make_unique<HeapSorting>(
        _trx, _gatherBlockBuffer, _sortRegisters
      );
      break;
    case GatherNode::SortMode::MinElement:
      _strategy = std::make_unique<MinElementSorting>(
        _trx, _gatherBlockBuffer, _sortRegisters
      );
      break;
    default:
      TRI_ASSERT(false);
      break;
  }
  TRI_ASSERT(_strategy);

  // We know that planRegisters has been run, so
  // getPlanNode()->_registerPlan is set up
  SortRegister::fill(
    *en.plan(),
    *en.getRegisterPlan(),
    en.elements(),
    _sortRegisters
  );
}
/// @brief shutdown: need our own method since our _buffer is different
int SortingGatherBlock::shutdown(int errorCode) {
DEBUG_BEGIN_BLOCK();
// don't call default shutdown method since it does the wrong thing to
// _gatherBlockBuffer
int ret = TRI_ERROR_NO_ERROR;
for (auto it = _dependencies.begin(); it != _dependencies.end(); ++it) {
int res = (*it)->shutdown(errorCode);
if (res != TRI_ERROR_NO_ERROR) {
ret = res;
}
}
if (ret != TRI_ERROR_NO_ERROR) {
return ret;
}
for (std::deque<AqlItemBlock*>& x : _gatherBlockBuffer) {
for (AqlItemBlock* y : x) {
delete y;
}
x.clear();
}
_gatherBlockBuffer.clear();
_gatherBlockPos.clear();
return TRI_ERROR_NO_ERROR;
// cppcheck-suppress style
DEBUG_END_BLOCK();
}
/// @brief initializeCursor: runs the base class initialization, discards all
/// buffered blocks and sets up one empty buffer and one start position per
/// dependency
/// @return the error of ExecutionBlock::initializeCursor if it failed,
///         TRI_ERROR_NO_ERROR otherwise
int SortingGatherBlock::initializeCursor(AqlItemBlock* items, size_t pos) {
  DEBUG_BEGIN_BLOCK();
  int const rc = ExecutionBlock::initializeCursor(items, pos);

  if (rc != TRI_ERROR_NO_ERROR) {
    return rc;
  }

  // throw away whatever is still buffered from a previous run
  for (auto& blocks : _gatherBlockBuffer) {
    for (AqlItemBlock* block : blocks) {
      delete block;
    }
    blocks.clear();
  }
  _gatherBlockBuffer.clear();
  _gatherBlockPos.clear();

  // one (initially empty) buffer and one position (dependency, 0) per
  // dependency
  _gatherBlockBuffer.reserve(_dependencies.size());
  _gatherBlockPos.reserve(_dependencies.size());

  for (size_t dep = 0; dep < _dependencies.size(); ++dep) {
    _gatherBlockBuffer.emplace_back();
    _gatherBlockPos.emplace_back(dep, 0);
  }

  // the strategy must forget any state derived from the old positions
  _strategy->reset();

  _done = _dependencies.empty();

  return TRI_ERROR_NO_ERROR;

  // cppcheck-suppress style
  DEBUG_END_BLOCK();
}
/// @brief hasMore: true if a dependency has buffered blocks or a new block
/// could be fetched for it; otherwise the block is marked as done and false
/// is returned
bool SortingGatherBlock::hasMore() {
  DEBUG_BEGIN_BLOCK();
  if (_done || _dependencies.empty()) {
    return false;
  }

  for (size_t dep = 0; dep < _gatherBlockBuffer.size(); ++dep) {
    if (!_gatherBlockBuffer[dep].empty()) {
      // something is still buffered for this dependency
      return true;
    }

    if (getBlock(dep, DefaultBatchSize())) {
      // a fresh block was buffered, start reading it from its first row
      _gatherBlockPos[dep] = std::make_pair(dep, 0);
      return true;
    }
  }

  _done = true;
  return false;

  // cppcheck-suppress style
  DEBUG_END_BLOCK();
}
/// @brief getSome: builds one output block of up to atMost rows by repeatedly
/// taking the row the sorting strategy selects next from the buffered
/// dependency blocks
/// @param atMost upper bound for the number of rows in the returned block;
///        also used as limit when fetching blocks from the dependencies
/// @return the newly requested output block (released to the caller), or
///         nullptr if there are no dependencies or no rows are available
AqlItemBlock* SortingGatherBlock::getSome(size_t atMost) {
DEBUG_BEGIN_BLOCK();
traceGetSomeBegin(atMost);
if (_dependencies.empty()) {
_done = true;
}
if (_done) {
traceGetSomeEnd(nullptr);
return nullptr;
}
// the non-simple case . . .
size_t available = 0; // nr of available rows
size_t index = 0; // an index of a non-empty buffer
// pull more blocks from dependencies . . .
TRI_ASSERT(_gatherBlockBuffer.size() == _dependencies.size());
TRI_ASSERT(_gatherBlockBuffer.size() == _gatherBlockPos.size());
for (size_t i = 0; i < _dependencies.size(); ++i) {
if (_gatherBlockBuffer[i].empty()) {
if (getBlock(i, atMost)) {
index = i;
_gatherBlockPos[i] = std::make_pair(i, 0);
}
} else {
index = i;
}
// count the rows buffered for dependency i: the unread remainder of the
// front block plus all rows of the blocks queued behind it
auto const& cur = _gatherBlockBuffer[i];
if (!cur.empty()) {
TRI_ASSERT(cur[0]->size() >= _gatherBlockPos[i].second);
available += cur[0]->size() - _gatherBlockPos[i].second;
for (size_t j = 1; j < cur.size(); ++j) {
available += cur[j]->size();
}
}
}
if (available == 0) {
_done = true;
traceGetSomeEnd(nullptr);
return nullptr;
}
size_t toSend = (std::min)(available, atMost); // nr rows in outgoing block
// the following is similar to AqlItemBlock's slice method . . .
// one cache per dependency, mapping a source value to the clone already
// stored in the output block, so each such value is cloned only once
std::vector<std::unordered_map<AqlValue, AqlValue>> cache;
cache.resize(_gatherBlockBuffer.size());
// any non-empty buffer serves as example for the number of registers
TRI_ASSERT(!_gatherBlockBuffer.at(index).empty());
AqlItemBlock* example = _gatherBlockBuffer[index].front();
size_t nrRegs = example->getNrRegs();
// automatically deleted if things go wrong
std::unique_ptr<AqlItemBlock> res(requestBlock(toSend, static_cast<arangodb::aql::RegisterId>(nrRegs)));
_strategy->prepare(_gatherBlockPos);
for (size_t i = 0; i < toSend; i++) {
// get the next smallest row from the buffer . . .
// val.first is the dependency index, val.second the row within the front
// block of that dependency
auto const val = _strategy->nextValue();
auto& blocks = _gatherBlockBuffer[val.first];
auto& blocksPos = _gatherBlockPos[val.first];
// copy the row in to the outgoing block . . .
for (RegisterId col = 0; col < nrRegs; col++) {
TRI_ASSERT(!blocks.empty());
AqlValue const& x = blocks.front()->getValueReference(val.second, col);
if (!x.isEmpty()) {
if (x.requiresDestruction()) {
// complex value, with ownership transfer
auto it = cache[val.first].find(x);
if (it == cache[val.first].end()) {
AqlValue y = x.clone();
try {
res->setValue(i, col, y);
} catch (...) {
// the clone was not stored in the output block, destroy it here
y.destroy();
throw;
}
cache[val.first].emplace(x, y);
} else {
// value was cloned before, reuse that clone
res->setValue(i, col, (*it).second);
}
} else {
// simple value, no ownership transfer needed
res->setValue(i, col, x);
}
}
}
// renew the _gatherBlockPos and clean up the buffer if necessary
// (the increment advances the read position of this dependency by one row)
if (++blocksPos.second == blocks.front()->size()) {
TRI_ASSERT(!blocks.empty());
AqlItemBlock* cur = blocks.front();
returnBlock(cur);
blocks.pop_front();
blocksPos.second = 0; // reset position within a dependency
if (blocks.empty()) {
// if we pulled everything from the buffer, we need to fetch
// more data for the shard for which we have no more local
// values.
getBlock(val.first, atMost);
cache[val.first].clear();
// note that if getBlock() returns false here, this is not
// a problem, because the sort function used takes care of
// this
}
}
}
traceGetSomeEnd(res.get());
return res.release();
// cppcheck-suppress style
DEBUG_END_BLOCK();
}
/// @brief skipSome: consumes up to atMost rows in the order given by the
/// sorting strategy without producing an output block
/// @param atMost upper bound for the number of rows to skip; also used as
///        limit when fetching blocks from the dependencies
/// @return number of rows skipped; 0 if the block is done or no rows are
///         available
size_t SortingGatherBlock::skipSome(size_t atMost) {
  DEBUG_BEGIN_BLOCK();
  if (_done) {
    return 0;
  }

  // the non-simple case . . .
  size_t available = 0;  // nr of available rows

  TRI_ASSERT(_dependencies.size() != 0);

  // pull more blocks from dependencies . . .
  for (size_t i = 0; i < _dependencies.size(); i++) {
    if (_gatherBlockBuffer[i].empty()) {
      if (getBlock(i, atMost)) {
        _gatherBlockPos[i] = std::make_pair(i, 0);
      }
    }

    // bind by reference, the buffer is only inspected here; taking it by
    // value would copy the whole deque for every dependency
    auto const& cur = _gatherBlockBuffer[i];

    if (!cur.empty()) {
      // unread remainder of the front block plus all blocks queued behind it
      available += cur[0]->size() - _gatherBlockPos[i].second;

      for (size_t j = 1; j < cur.size(); j++) {
        available += cur[j]->size();
      }
    }
  }

  if (available == 0) {
    _done = true;
    return 0;
  }

  size_t const skipped = (std::min)(available, atMost);  // nr rows to skip

  _strategy->prepare(_gatherBlockPos);

  for (size_t i = 0; i < skipped; i++) {
    // get the next smallest row from the buffer . . .
    auto const val = _strategy->nextValue();
    auto& blocks = _gatherBlockBuffer[val.first];
    auto& blocksPos = _gatherBlockPos[val.first];

    // check before the front block is dereferenced below
    TRI_ASSERT(!blocks.empty());

    // renew the _gatherBlockPos and clean up the buffer if necessary
    if (++blocksPos.second == blocks.front()->size()) {
      AqlItemBlock* cur = blocks.front();
      returnBlock(cur);
      blocks.pop_front();
      blocksPos.second = 0;  // reset position within a dependency
    }
  }

  return skipped;

  // cppcheck-suppress style
  DEBUG_END_BLOCK();
}
/// @brief getBlock: from dependency i into _gatherBlockBuffer.at(i),
/// non-simple case only
bool SortingGatherBlock::getBlock(size_t i, size_t atMost) {
DEBUG_BEGIN_BLOCK();
TRI_ASSERT(i < _dependencies.size());
std::unique_ptr<AqlItemBlock> docs(_dependencies[i]->getSome(atMost));
if (docs && docs->size() > 0) {
_gatherBlockBuffer[i].emplace_back(docs.get());
docs.release();
return true;
}
return false;
// cppcheck-suppress style
DEBUG_END_BLOCK();
}

View File

@ -44,56 +44,6 @@ namespace aql {
class AqlItemBlock; class AqlItemBlock;
struct Collection; struct Collection;
class ExecutionEngine; class ExecutionEngine;
class GatherBlock : public ExecutionBlock {
public:
GatherBlock(ExecutionEngine*, GatherNode const*);
~GatherBlock();
/// @brief shutdown: need our own method since our _buffer is different
int shutdown(int) override final;
/// @brief initializeCursor
int initializeCursor(AqlItemBlock* items, size_t pos) override final;
/// @brief hasMore: true if any position of _buffer hasMore and false
/// otherwise.
bool hasMore() override final;
/// @brief getSome
AqlItemBlock* getSome(size_t atMost) override final;
/// @brief skipSome
size_t skipSome(size_t atMost) override final;
protected:
/// @brief getBlock: from dependency i into _gatherBlockBuffer.at(i),
/// non-simple case only
bool getBlock(size_t i, size_t atMost);
/// @brief _gatherBlockBuffer: buffer the incoming block from each dependency
/// separately
std::vector<std::deque<AqlItemBlock*>> _gatherBlockBuffer;
private:
/// @brief _gatherBlockPos: pairs (i, _pos in _buffer.at(i)), i.e. the same as
/// the usual _pos but one pair per dependency
std::vector<std::pair<size_t, size_t>> _gatherBlockPos;
/// @brief _atDep: currently pulling blocks from _dependencies.at(_atDep),
/// simple case only
size_t _atDep = 0;
/// @brief sort elements for this block
std::vector<SortRegister> _sortRegisters;
/// @brief isSimple: the block is simple if we do not do merge sort . . .
bool const _isSimple;
using Heap = std::vector<std::pair<std::size_t, std::size_t>>;
std::unique_ptr<Heap> _heap;
};
class BlockWithClients : public ExecutionBlock { class BlockWithClients : public ExecutionBlock {
public: public:
@ -299,6 +249,106 @@ class RemoteBlock final : public ExecutionBlock {
bool const _isResponsibleForInitializeCursor; bool const _isResponsibleForInitializeCursor;
}; };
////////////////////////////////////////////////////////////////////////////////
/// @class UnsortingGatherBlock
/// @brief Execution block for gathers without order
////////////////////////////////////////////////////////////////////////////////
class UnsortingGatherBlock final : public ExecutionBlock {
 public:
  /// @brief creates the block; the gather node is asserted to carry no sort
  /// elements
  UnsortingGatherBlock(ExecutionEngine& engine, GatherNode const& en)
    : ExecutionBlock(&engine, &en) {
    TRI_ASSERT(en.elements().empty());
  }

  /// @brief shutdown: shuts down all dependencies
  int shutdown(int errorCode) override final;

  /// @brief initializeCursor: restarts reading at the first dependency
  int initializeCursor(AqlItemBlock* items, size_t pos) override final;

  /// @brief hasMore: true if any dependency has more data, false otherwise
  bool hasMore() override final;

  /// @brief getSome: next block of the current dependency
  AqlItemBlock* getSome(size_t atMost) override final;

  /// @brief skipSome: skips rows of the current dependency
  size_t skipSome(size_t atMost) override final;

 private:
  /// @brief _atDep: index of the dependency we are currently pulling from,
  /// i.e. _dependencies[_atDep]
  size_t _atDep = 0;
}; // UnsortingGatherBlock
////////////////////////////////////////////////////////////////////////////////
/// @struct SortingStrategy
/// @brief interface used by SortingGatherBlock to obtain the position of the
///        next value to consume
////////////////////////////////////////////////////////////////////////////////
struct SortingStrategy {
  /// @brief first: dependency index, second: position within a dependency
  using ValueType = std::pair<size_t, size_t>;

  virtual ~SortingStrategy() = default;

  /// @brief returns next value
  virtual ValueType nextValue() = 0;

  /// @brief prepare strategy for fetching values; does nothing by default
  virtual void prepare(std::vector<ValueType>& /*blockPos*/) { }

  /// @brief resets strategy state
  virtual void reset() = 0;
}; // SortingStrategy
////////////////////////////////////////////////////////////////////////////////
/// @class SortingGatherBlock
/// @brief Execution block for gathers with order
///
/// NOTE(review): _gatherBlockBuffer stores raw AqlItemBlock pointers that
/// shutdown() and initializeCursor() free with delete, yet no destructor is
/// declared here - confirm that nothing leaks when the block is destroyed
/// while blocks are still buffered
////////////////////////////////////////////////////////////////////////////////
class SortingGatherBlock final : public ExecutionBlock {
 public:
  /// @brief creates the block; the gather node must carry sort elements
  SortingGatherBlock(ExecutionEngine& engine, GatherNode const& en);

  /// @brief shutdown: shuts down all dependencies and frees buffered blocks
  int shutdown(int errorCode) override final;

  /// @brief initializeCursor: resets buffers, positions and the strategy
  int initializeCursor(AqlItemBlock* items, size_t pos) override final;

  /// @brief hasMore: true if any dependency has buffered or fetchable data,
  /// false otherwise
  bool hasMore() override final;

  /// @brief getSome: next output block in sort order
  AqlItemBlock* getSome(size_t atMost) override final;

  /// @brief skipSome: skips rows in sort order
  size_t skipSome(size_t atMost) override final;

 private:
  /// @brief getBlock: from dependency i into _gatherBlockBuffer.at(i)
  bool getBlock(size_t i, size_t atMost);

  /// @brief _gatherBlockBuffer: buffer the incoming block from each dependency
  /// separately
  std::vector<std::deque<AqlItemBlock*>> _gatherBlockBuffer;

  /// @brief _gatherBlockPos: pairs (i, _pos in _buffer.at(i)), i.e. the same as
  /// the usual _pos but one pair per dependency
  std::vector<SortingStrategy::ValueType> _gatherBlockPos;

  /// @brief sort elements for this block
  std::vector<SortRegister> _sortRegisters;

  /// @brief sorting strategy, chosen in the constructor from the sort mode of
  /// the gather node
  std::unique_ptr<SortingStrategy> _strategy;
}; // SortingGatherBlock
} // namespace arangodb::aql } // namespace arangodb::aql
} // namespace arangodb } // namespace arangodb

View File

@ -23,12 +23,14 @@
#include "ClusterNodes.h" #include "ClusterNodes.h"
#include "Aql/Ast.h" #include "Aql/Ast.h"
#include "Aql/AqlValue.h"
#include "Aql/Collection.h" #include "Aql/Collection.h"
#include "Aql/ClusterBlocks.h" #include "Aql/ClusterBlocks.h"
#include "Aql/ExecutionPlan.h" #include "Aql/ExecutionPlan.h"
#include "Aql/Query.h" #include "Aql/Query.h"
#include "Aql/IndexNode.h" #include "Aql/IndexNode.h"
#include "Aql/GraphNode.h" #include "Aql/GraphNode.h"
#include "Transaction/Methods.h"
#include <type_traits> #include <type_traits>
@ -341,7 +343,11 @@ std::unique_ptr<ExecutionBlock> GatherNode::createBlock(
std::unordered_map<ExecutionNode*, ExecutionBlock*> const&, std::unordered_map<ExecutionNode*, ExecutionBlock*> const&,
std::unordered_set<std::string> const& std::unordered_set<std::string> const&
) const { ) const {
return std::make_unique<GatherBlock>(&engine, this); if (elements().empty()) {
return std::make_unique<UnsortingGatherBlock>(engine, *this);
}
return std::make_unique<SortingGatherBlock>(engine, *this);
} }
/// @brief estimateCost /// @brief estimateCost

View File

@ -310,7 +310,6 @@ class DistributeNode : public ExecutionNode {
/// @brief class GatherNode /// @brief class GatherNode
class GatherNode final : public ExecutionNode { class GatherNode final : public ExecutionNode {
friend class ExecutionBlock; friend class ExecutionBlock;
friend class GatherBlock;
friend class RedundantCalculationsReplacer; friend class RedundantCalculationsReplacer;
public: public:

View File

@ -32,14 +32,16 @@
using namespace arangodb::aql; using namespace arangodb::aql;
ExecutionBlock::ExecutionBlock(ExecutionEngine* engine, ExecutionNode const* ep) ExecutionBlock::ExecutionBlock(
: _engine(engine), ExecutionEngine* engine,
_trx(engine->getQuery()->trx()), ExecutionNode const* ep)
_exeNode(ep), : _engine(engine),
_pos(0), _trx(engine->getQuery()->trx()),
_done(false), _exeNode(ep),
_profile(engine->getQuery()->queryOptions().profile), _pos(0),
_getSomeBegin(0) { _done(false),
_profile(engine->getQuery()->queryOptions().profile),
_getSomeBegin(0) {
TRI_ASSERT(_trx != nullptr); TRI_ASSERT(_trx != nullptr);
} }