1
0
Fork 0

method for getOrSkipSome for AggregateBlock, now working.

This commit is contained in:
James 2014-08-12 20:06:18 +02:00
parent 98393aca94
commit eed06f6512
1 changed files with 51 additions and 48 deletions

View File

@ -608,7 +608,8 @@ namespace triagens {
return TRI_ERROR_NO_ERROR;
}
else {
if (! getBlock(atLeast - skipped, std::max(atMost - skipped, DefaultBatchSize))) {
if (! getBlock(atLeast - skipped,
std::max(atMost - skipped, DefaultBatchSize))) {
_done = true;
break; // must still put things in the result from the collector . . .
}
@ -1088,7 +1089,7 @@ namespace triagens {
// if we get here, then _buffer.front() exists
AqlItemBlock* cur = _buffer.front();
if (atMost > _documents.size() - _posInAllDocs) {
if (atMost >= skipped + _documents.size() - _posInAllDocs) {
skipped += _documents.size() - _posInAllDocs;
_posInAllDocs = 0;
@ -1104,14 +1105,13 @@ namespace triagens {
}
}
else {
_posInAllDocs += atMost - skipped;
skipped = atMost;
_posInAllDocs += atMost;
}
}
return skipped;
}
private:
////////////////////////////////////////////////////////////////////////////////
@ -2132,37 +2132,33 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
/// @brief getSome
////////////////////////////////////////////////////////////////////////////////
/*int getOrSkipSome (size_t atLeast, size_t atMost, bool skip, AqlItemBlock*& result,
size_t& skipped) {
//~J not yet working!
int getOrSkipSome (size_t atLeast, size_t atMost, bool skipping,
AqlItemBlock*& result, size_t& skipped) {
if (_done) {
return nullptr;
return TRI_ERROR_NO_ERROR;
}
if (_buffer.empty()) {
if (! ExecutionBlock::getBlock(atLeast, atMost)) {
_done = true;
return nullptr;
return TRI_ERROR_NO_ERROR;
}
_pos = 0; // this is in the first block
}
// If we get here, we do have _buffer.front()
AqlItemBlock* cur = _buffer.front();
if(!skipping){
size_t const curRegs = cur->getNrRegs();
result = new AqlItemBlock(atMost, _varOverview->nrRegs[_depth]);
TRI_ASSERT(curRegs <= result->getNrRegs());
inheritRegisters(cur, result, _pos);
}
size_t const curRegs = cur->getNrRegs();
auto res = new AqlItemBlock(atMost, _varOverview->nrRegs[_depth]);
TRI_ASSERT(curRegs <= res->getNrRegs());
inheritRegisters(cur, res, _pos);
size_t j = 0;
while (j < atMost) {
while (skipped < atMost) {
// read the next input tow
bool newGroup = false;
@ -2190,18 +2186,18 @@ namespace triagens {
if (newGroup) {
if (! _currentGroup.groupValues[0].isEmpty()) {
// need to emit the current group first
emitGroup(cur, res, j);
if(!skipping){
// need to emit the current group first
emitGroup(cur, result, skipped);
}
// increase output row count
++j;
++skipped;
if (j == atMost) {
if (skipped == atMost) {
// output is full
// do NOT advance input pointer
return res;
return TRI_ERROR_NO_ERROR;
}
}
@ -2214,11 +2210,13 @@ namespace triagens {
_currentGroup.collections[i] = cur->getDocumentCollection((*it).second);
++i;
}
_currentGroup.setFirstRow(_pos);
if(!skipping){
_currentGroup.setFirstRow(_pos);
}
}
if(!skipping){
_currentGroup.setLastRow(_pos);
}
_currentGroup.setLastRow(_pos);
if (++_pos >= cur->size()) {
_buffer.pop_front();
@ -2233,16 +2231,19 @@ namespace triagens {
// no more input. we're done
try {
// emit last buffered group
emitGroup(cur, res, j);
++j;
if(!skipping){
emitGroup(cur, result, skipped);
++skipped;
TRI_ASSERT(skipped > 0);
result->shrink(skipped);
}
else {
++skipped;
}
delete cur;
cur = nullptr;
TRI_ASSERT(j > 0);
res->shrink(j);
_done = true;
return res;
return TRI_ERROR_NO_ERROR;
}
catch (...) {
delete cur;
@ -2260,18 +2261,19 @@ namespace triagens {
}
}
if(!skipping){
TRI_ASSERT(skipped > 0);
result->shrink(skipped);
}
TRI_ASSERT(j > 0);
res->shrink(j);
return res;
}*/
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief skipSome
////////////////////////////////////////////////////////////////////////////////
size_t skipSome (size_t atLeast, size_t atMost) {
/* size_t skipSome (size_t atLeast, size_t atMost) {
if (_done) {
return 0;
}
@ -2363,7 +2365,7 @@ namespace triagens {
}
return skipped;
}
} */
private:
@ -2373,7 +2375,7 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
void emitGroup (AqlItemBlock const* cur,
AqlItemBlock* res,
AqlItemBlock*& res,
size_t row) {
size_t i = 0;
@ -2386,7 +2388,8 @@ namespace triagens {
// set the group values
_currentGroup.addValues(cur, _groupRegister);
res->setValue(row, _groupRegister, AqlValue::CreateFromBlocks(_currentGroup.groupBlocks, _variableNames));
res->setValue(row, _groupRegister,
AqlValue::CreateFromBlocks(_currentGroup.groupBlocks, _variableNames));
}
// reset the group so a new one can start