1
0
Fork 0

adding SortBlock, something not working.

This commit is contained in:
James 2014-08-06 13:36:50 +02:00
parent 3e902323dd
commit 5ac060e687
3 changed files with 179 additions and 2 deletions

View File

@ -1046,7 +1046,7 @@ namespace triagens {
std::unordered_set<Variable*> inVars = _expression->variables();
for (auto it = inVars.begin(); it != inVars.end(); ++it) {
for (auto it = inVars.begin(); it != inVars.end(); ++it) {
_inVars.push_back(*it);
auto it2 = _varOverview->varInfo.find((*it)->id);
TRI_ASSERT(it2 != _varOverview->varInfo.end());
@ -1410,6 +1410,171 @@ namespace triagens {
};
// -----------------------------------------------------------------------------
// --SECTION-- SortBlock
// -----------------------------------------------------------------------------
class SortBlock : public ExecutionBlock {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
SortBlock (AQL_TRANSACTION_V8* trx,
ExecutionNode const* ep)
: ExecutionBlock(trx, ep) {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
virtual ~SortBlock () {};
////////////////////////////////////////////////////////////////////////////////
/// @brief initialize
////////////////////////////////////////////////////////////////////////////////
int initialize () {
int res = ExecutionBlock::initialize();
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
auto en = static_cast<SortNode const*>(getPlanNode());
for( auto p: en->_elements){
//We know that staticAnalysis has been run, so _varOverview is set up
auto it = _varOverview->varInfo.find(p.first->id);
TRI_ASSERT(it != _varOverview->varInfo.end());
_sortRegisters.push_back(make_pair(it->second.registerId, p.second));
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief execute
////////////////////////////////////////////////////////////////////////////////
virtual int execute () {
int res = ExecutionBlock::execute();
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
// suck all blocks into _buffer
while (getBlock(DefaultBatchSize, DefaultBatchSize)) {
}
if (_buffer.size()==0){
_done = true;
return TRI_ERROR_NO_ERROR;
}
// coords[i][j] is the <j>th row of the <i>th block
std::vector<std::pair<size_t, size_t>> coords;
size_t sum = 0;
for (auto block:_buffer){
sum += block->size();
}
coords.reserve(sum);
// install the coords
size_t count = 0;
for (auto block:_buffer) {
for (size_t i = 0; i < block->size(); i++) {
coords.push_back(std::make_pair(count, i));
}
count++;
}
// comparison function
OurLessThan ourLessThan(_buffer, _sortRegisters);
// sort coords
std::sort(coords.begin(), coords.end(), ourLessThan);
//copy old points from _buffer into newbuf
std::vector<AqlItemBlock*> newbuf;
for (auto x: _buffer) {
newbuf.push_back(x);
}
_buffer.clear();
count = 0;
RegisterId nrregs = newbuf.front()->getNrRegs();
//install the rearranged values from <newbuf> into <_buffer>
while (count < sum) {
size_t size_next
= (sum - count > DefaultBatchSize ? DefaultBatchSize : sum - count);
AqlItemBlock* next = new AqlItemBlock(size_next, nrregs);
for (size_t i = 0; i < size_next; i++){
count++;
for (RegisterId j = 0; j < nrregs; j++){
next->setValue(i, j,
newbuf[coords[count].first]->getValue(coords[count].second, j));
newbuf[coords[count].first]->eraseValue(coords[count].second, j);
}
}
_buffer.push_back(next);
}
for ( auto x: newbuf){
delete x;
}
return TRI_ERROR_NO_ERROR;
}
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief OurLessThan
////////////////////////////////////////////////////////////////////////////////
class OurLessThan {
public:
OurLessThan (std::deque<AqlItemBlock*>& buffer,
std::vector<std::pair<RegisterId, bool>>& sortRegisters)
: _buffer(buffer), _sortRegisters(sortRegisters) {
}
bool operator() (std::pair<size_t, size_t> const& a,
std::pair<size_t, size_t> const& b) {
for(auto reg: _sortRegisters){
int cmp = _buffer[a.first]->getValue(a.second, reg.first).compare(
_buffer[b.first]->getValue(b.second, reg.first));
if(cmp == -1) {
return reg.second;
}
else if (cmp == 1) {
return !reg.second;
}
}
return false;
}
private:
std::deque<AqlItemBlock*>& _buffer;
std::vector<std::pair<RegisterId, bool>>& _sortRegisters;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief pairs, consisting of variable and sort direction
/// (true = ascending | false = descending)
////////////////////////////////////////////////////////////////////////////////
std::vector<std::pair<RegisterId, bool>> _sortRegisters;
};
// -----------------------------------------------------------------------------
// --SECTION-- LimitBlock

View File

@ -111,6 +111,11 @@ struct Instanciator : public ExecutionNode::WalkerWorker {
root = eb;
break;
}
case ExecutionNode::SORT: {
eb = new SortBlock(engine->getTransaction(),
static_cast<SortNode const*>(en));
break;
}
default: {
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
}

View File

@ -211,8 +211,15 @@ namespace triagens {
return false;
}
}
};
////////////////////////////////////////////////////////////////////////////////
/// @brief 3-way comparison
////////////////////////////////////////////////////////////////////////////////
int compare (AqlValue const& val){
return 0;
}
};
}
}