mirror of https://gitee.com/bigwinds/arangodb
cleaning up, and comments in .cpp for GatherBlock
This commit is contained in:
parent
14eaac044c
commit
69f2cfd5a5
|
@ -3468,6 +3468,30 @@ int NoResultsBlock::getOrSkipSome (size_t, // atLeast
|
||||||
// --SECTION-- class GatherBlock
|
// --SECTION-- class GatherBlock
|
||||||
// -----------------------------------------------------------------------------
|
// -----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief initializeCursor
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
int GatherBlock::shutdown() {
|
||||||
|
//don't call default shutdown method since it does the wrong thing to _gatherBlockBuffer
|
||||||
|
for (auto it = _dependencies.begin(); it != _dependencies.end(); ++it) {
|
||||||
|
int res = (*it)->shutdown();
|
||||||
|
|
||||||
|
if (res != TRI_ERROR_NO_ERROR) {
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (std::deque<AqlItemBlock*>& x: _gatherBlockBuffer) {
|
||||||
|
for (AqlItemBlock* y: x) {
|
||||||
|
delete y;
|
||||||
|
}
|
||||||
|
x.clear();
|
||||||
|
}
|
||||||
|
_gatherBlockBuffer.clear();
|
||||||
|
return TRI_ERROR_NO_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
int GatherBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
|
int GatherBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
|
||||||
int res = ExecutionBlock::initializeCursor(items, pos);
|
int res = ExecutionBlock::initializeCursor(items, pos);
|
||||||
|
|
||||||
|
@ -3493,25 +3517,10 @@ int GatherBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
|
||||||
return TRI_ERROR_NO_ERROR;
|
return TRI_ERROR_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
int GatherBlock::shutdown() {
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
//don't call default shutdown method since it does the wrong thing to _gatherBlockBuffer
|
/// @brief count: the sum of the count() of the dependencies or -1 (if any
|
||||||
for (auto it = _dependencies.begin(); it != _dependencies.end(); ++it) {
|
/// dependency has count -1
|
||||||
int res = (*it)->shutdown();
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
if (res != TRI_ERROR_NO_ERROR) {
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (std::deque<AqlItemBlock*>& x: _gatherBlockBuffer) {
|
|
||||||
for (AqlItemBlock* y: x) {
|
|
||||||
delete y;
|
|
||||||
}
|
|
||||||
x.clear();
|
|
||||||
}
|
|
||||||
_gatherBlockBuffer.clear();
|
|
||||||
return TRI_ERROR_NO_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t GatherBlock::count () const {
|
int64_t GatherBlock::count () const {
|
||||||
int64_t sum = 0;
|
int64_t sum = 0;
|
||||||
|
@ -3524,6 +3533,11 @@ int64_t GatherBlock::count () const {
|
||||||
return sum;
|
return sum;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief remaining: the sum of the remaining() of the dependencies or -1 (if
|
||||||
|
/// any dependency has remaining -1
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
int64_t GatherBlock::remaining () {
|
int64_t GatherBlock::remaining () {
|
||||||
int64_t sum = 0;
|
int64_t sum = 0;
|
||||||
for (auto x: _dependencies) {
|
for (auto x: _dependencies) {
|
||||||
|
@ -3535,22 +3549,10 @@ int64_t GatherBlock::remaining () {
|
||||||
return sum;
|
return sum;
|
||||||
}
|
}
|
||||||
|
|
||||||
// get blocks from dependencies.at(i) into _gatherBlockBuffer.at(i)
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
bool GatherBlock::getBlock (size_t i, size_t atLeast, size_t atMost) {
|
/// @brief hasMore: true if any position of _buffer hasMore and false
|
||||||
TRI_ASSERT(0 < i && i < _dependencies.size());
|
/// otherwise.
|
||||||
AqlItemBlock* docs = _dependencies.at(i)->getSome(atLeast, atMost);
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
if (docs != nullptr) {
|
|
||||||
try {
|
|
||||||
_gatherBlockBuffer.at(i).push_back(docs);
|
|
||||||
}
|
|
||||||
catch (...) {
|
|
||||||
delete docs;
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool GatherBlock::hasMore () {
|
bool GatherBlock::hasMore () {
|
||||||
if (_done) {
|
if (_done) {
|
||||||
|
@ -3568,6 +3570,10 @@ bool GatherBlock::hasMore () {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief getSome
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
AqlItemBlock* GatherBlock::getSome (size_t atLeast, size_t atMost) {
|
AqlItemBlock* GatherBlock::getSome (size_t atLeast, size_t atMost) {
|
||||||
|
|
||||||
if (_done) {
|
if (_done) {
|
||||||
|
@ -3614,7 +3620,8 @@ AqlItemBlock* GatherBlock::getSome (size_t atLeast, size_t atMost) {
|
||||||
// get collections for ourLessThan . . .
|
// get collections for ourLessThan . . .
|
||||||
std::vector<TRI_document_collection_t const*> colls;
|
std::vector<TRI_document_collection_t const*> colls;
|
||||||
for (RegisterId i = 0; i < _sortRegisters.size(); i++) {
|
for (RegisterId i = 0; i < _sortRegisters.size(); i++) {
|
||||||
colls.push_back(_gatherBlockBuffer.at(index).front()->getDocumentCollection(_sortRegisters[i].first));
|
colls.push_back(_gatherBlockBuffer.at(index).front()->
|
||||||
|
getDocumentCollection(_sortRegisters[i].first));
|
||||||
}
|
}
|
||||||
|
|
||||||
// the following is similar to AqlItemBlock's slice method . . .
|
// the following is similar to AqlItemBlock's slice method . . .
|
||||||
|
@ -3626,7 +3633,9 @@ AqlItemBlock* GatherBlock::getSome (size_t atLeast, size_t atMost) {
|
||||||
AqlItemBlock* example =_gatherBlockBuffer.at(index).front();
|
AqlItemBlock* example =_gatherBlockBuffer.at(index).front();
|
||||||
size_t nrRegs = example->getNrRegs();
|
size_t nrRegs = example->getNrRegs();
|
||||||
|
|
||||||
std::unique_ptr<AqlItemBlock> res(new AqlItemBlock(toSend, static_cast<triagens::aql::RegisterId>(nrRegs))); // automatically deleted if things go wrong
|
std::unique_ptr<AqlItemBlock> res(new AqlItemBlock(toSend,
|
||||||
|
static_cast<triagens::aql::RegisterId>(nrRegs)));
|
||||||
|
// automatically deleted if things go wrong
|
||||||
|
|
||||||
for (RegisterId i = 0; i < nrRegs; i++) {
|
for (RegisterId i = 0; i < nrRegs; i++) {
|
||||||
res->setDocumentCollection(i, example->getDocumentCollection(i));
|
res->setDocumentCollection(i, example->getDocumentCollection(i));
|
||||||
|
@ -3634,11 +3643,13 @@ AqlItemBlock* GatherBlock::getSome (size_t atLeast, size_t atMost) {
|
||||||
|
|
||||||
for (size_t i = 0; i < toSend; i++) {
|
for (size_t i = 0; i < toSend; i++) {
|
||||||
// get the next smallest row from the buffer . . .
|
// get the next smallest row from the buffer . . .
|
||||||
std::pair<size_t, size_t> val = *(std::min_element(_gatherBlockPos.begin(), _gatherBlockPos.end(), ourLessThan));
|
std::pair<size_t, size_t> val = *(std::min_element(_gatherBlockPos.begin(),
|
||||||
|
_gatherBlockPos.end(), ourLessThan));
|
||||||
|
|
||||||
// copy the row in to the outgoing block . . .
|
// copy the row in to the outgoing block . . .
|
||||||
for (RegisterId col = 0; col < nrRegs; col++) {
|
for (RegisterId col = 0; col < nrRegs; col++) {
|
||||||
AqlValue const& x(_gatherBlockBuffer.at(val.first).front()->getValue(val.second, col));
|
AqlValue const&
|
||||||
|
x(_gatherBlockBuffer.at(val.first).front()->getValue(val.second, col));
|
||||||
if (! x.isEmpty()) {
|
if (! x.isEmpty()) {
|
||||||
auto it = cache.find(x);
|
auto it = cache.find(x);
|
||||||
if (it == cache.end()) {
|
if (it == cache.end()) {
|
||||||
|
@ -3660,7 +3671,8 @@ AqlItemBlock* GatherBlock::getSome (size_t atLeast, size_t atMost) {
|
||||||
|
|
||||||
// renew the buffer and comparison function if necessary . . .
|
// renew the buffer and comparison function if necessary . . .
|
||||||
_gatherBlockPos.at(val.first).second++;
|
_gatherBlockPos.at(val.first).second++;
|
||||||
if (_gatherBlockPos.at(val.first).second == _gatherBlockBuffer.at(val.first).front()->size()) {
|
if (_gatherBlockPos.at(val.first).second ==
|
||||||
|
_gatherBlockBuffer.at(val.first).front()->size()) {
|
||||||
_gatherBlockBuffer.at(val.first).pop_front();
|
_gatherBlockBuffer.at(val.first).pop_front();
|
||||||
if (_gatherBlockBuffer.at(val.first).empty()) {
|
if (_gatherBlockBuffer.at(val.first).empty()) {
|
||||||
getBlock(val.first, DefaultBatchSize, DefaultBatchSize);
|
getBlock(val.first, DefaultBatchSize, DefaultBatchSize);
|
||||||
|
@ -3675,6 +3687,10 @@ AqlItemBlock* GatherBlock::getSome (size_t atLeast, size_t atMost) {
|
||||||
return res.release();
|
return res.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief skipSome
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
size_t GatherBlock::skipSome (size_t atLeast, size_t atMost) {
|
size_t GatherBlock::skipSome (size_t atLeast, size_t atMost) {
|
||||||
|
|
||||||
if (_done) {
|
if (_done) {
|
||||||
|
@ -3727,7 +3743,8 @@ size_t GatherBlock::skipSome (size_t atLeast, size_t atMost) {
|
||||||
|
|
||||||
for (size_t i = 0; i < skipped; i++) {
|
for (size_t i = 0; i < skipped; i++) {
|
||||||
// get the next smallest row from the buffer . . .
|
// get the next smallest row from the buffer . . .
|
||||||
std::pair<size_t, size_t> val = *(std::min_element(_gatherBlockPos.begin(), _gatherBlockPos.end(), ourLessThan));
|
std::pair<size_t, size_t> val = *(std::min_element(_gatherBlockPos.begin(),
|
||||||
|
_gatherBlockPos.end(), ourLessThan));
|
||||||
// renew the buffer and comparison function if necessary . . .
|
// renew the buffer and comparison function if necessary . . .
|
||||||
_gatherBlockPos.at(val.first).second++;
|
_gatherBlockPos.at(val.first).second++;
|
||||||
if (_gatherBlockPos.at(val.first).second == _gatherBlockBuffer.at(val.first).front()->size()) {
|
if (_gatherBlockPos.at(val.first).second == _gatherBlockBuffer.at(val.first).front()->size()) {
|
||||||
|
@ -3746,15 +3763,42 @@ size_t GatherBlock::skipSome (size_t atLeast, size_t atMost) {
|
||||||
return skipped;
|
return skipped;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief getBlock: from dependency i into _gatherBlockBuffer.at(i),
|
||||||
|
/// non-simple case only
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
bool GatherBlock::getBlock (size_t i, size_t atLeast, size_t atMost) {
|
||||||
|
TRI_ASSERT(0 < i && i < _dependencies.size());
|
||||||
|
AqlItemBlock* docs = _dependencies.at(i)->getSome(atLeast, atMost);
|
||||||
|
if (docs != nullptr) {
|
||||||
|
try {
|
||||||
|
_gatherBlockBuffer.at(i).push_back(docs);
|
||||||
|
}
|
||||||
|
catch (...) {
|
||||||
|
delete docs;
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief OurLessThan: comparison method for elements of _gatherBlockPos
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
bool GatherBlock::OurLessThan::operator() (std::pair<size_t, size_t> const& a,
|
bool GatherBlock::OurLessThan::operator() (std::pair<size_t, size_t> const& a,
|
||||||
std::pair<size_t, size_t> const& b) {
|
std::pair<size_t, size_t> const& b) {
|
||||||
size_t i = 0;
|
size_t i = 0;
|
||||||
for (auto reg : _sortRegisters) {
|
for (auto reg : _sortRegisters) {
|
||||||
int cmp = AqlValue::Compare(_trx,
|
int cmp = AqlValue::Compare(
|
||||||
|
_trx,
|
||||||
_gatherBlockBuffer.at(a.first).front()->getValue(a.second, reg.first),
|
_gatherBlockBuffer.at(a.first).front()->getValue(a.second, reg.first),
|
||||||
_colls[i],
|
_colls[i],
|
||||||
_gatherBlockBuffer.at(b.first).front()->getValue(b.second, reg.first),
|
_gatherBlockBuffer.at(b.first).front()->getValue(b.second, reg.first),
|
||||||
_colls[i]);
|
_colls[i]);
|
||||||
|
|
||||||
if (cmp == -1) {
|
if (cmp == -1) {
|
||||||
return reg.second;
|
return reg.second;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1514,6 +1514,12 @@ public:
|
||||||
return ExecutionBlock::initialize();
|
return ExecutionBlock::initialize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief shutdown: need our own method since our _buffer is different
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
int shutdown ();
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief initializeCursor
|
/// @brief initializeCursor
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -1553,12 +1559,6 @@ public:
|
||||||
|
|
||||||
size_t skipSome (size_t, size_t);
|
size_t skipSome (size_t, size_t);
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief shutdown: need our own method since our _buffer is different
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
int shutdown ();
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief _pos: pairs (i, _buffer.at(i)), i.e. the same as the usual _pos but
|
/// @brief _pos: pairs (i, _buffer.at(i)), i.e. the same as the usual _pos but
|
||||||
/// one pair per dependency
|
/// one pair per dependency
|
||||||
|
@ -1568,6 +1568,13 @@ public:
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief getBlock: from dependency i into _gatherBlockBuffer.at(i),
|
||||||
|
/// non-simple case only
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
bool getBlock (size_t i, size_t atLeast, size_t atMost);
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief _gatherBlockBuffer: buffer the incoming block from each dependency
|
/// @brief _gatherBlockBuffer: buffer the incoming block from each dependency
|
||||||
/// separately
|
/// separately
|
||||||
|
@ -1593,12 +1600,6 @@ public:
|
||||||
|
|
||||||
size_t _atDep = 0;
|
size_t _atDep = 0;
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief getBlock: from dependency i, non-simple case only
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
bool getBlock (size_t i, size_t atLeast, size_t atMost);
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief pairs, consisting of variable and sort direction
|
/// @brief pairs, consisting of variable and sort direction
|
||||||
/// (true = ascending | false = descending)
|
/// (true = ascending | false = descending)
|
||||||
|
|
Loading…
Reference in New Issue