mirror of https://gitee.com/bigwinds/arangodb
aggregate!
This commit is contained in:
parent
9e459d986a
commit
61d8998909
|
@ -1369,7 +1369,7 @@ AstNode* Ast::optimizeLet (AstNode* node,
|
|||
}
|
||||
}
|
||||
else if (pass == 1) {
|
||||
if (! v->isReferenceCounted()) {
|
||||
if (! v->isReferenceCounted() && false) {
|
||||
// this optimizes away the assignment of variables which are never read
|
||||
// (i.e. assigned-only variables). this is currently not free of side-effects:
|
||||
// for example, in the following query, the variable 'x' would be optimized
|
||||
|
@ -1381,6 +1381,9 @@ AstNode* Ast::optimizeLet (AstNode* node,
|
|||
// TODO: also decrease the refcount of all variables used in the expression
|
||||
// (i.e. getReferencedVariables(expression)). this might produce further unused
|
||||
// variables
|
||||
|
||||
// TODO: COLLECT needs all variables in the scope if they do not appear directly
|
||||
// in any expression
|
||||
return createNodeNop();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -152,9 +152,12 @@ namespace triagens {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
struct VarInfo {
|
||||
unsigned int depth;
|
||||
RegisterId registerId;
|
||||
VarInfo(int depth, int registerId) : depth(depth), registerId(registerId) {}
|
||||
unsigned int const depth;
|
||||
RegisterId const registerId;
|
||||
|
||||
VarInfo () = delete;
|
||||
VarInfo (int depth, int registerId) : depth(depth), registerId(registerId) {
|
||||
}
|
||||
};
|
||||
|
||||
struct VarOverview : public WalkerWorker {
|
||||
|
@ -547,7 +550,7 @@ namespace triagens {
|
|||
if (nr == 0) {
|
||||
return true;
|
||||
}
|
||||
return !hasMore();
|
||||
return ! hasMore();
|
||||
}
|
||||
|
||||
virtual int64_t count () {
|
||||
|
@ -1807,7 +1810,8 @@ namespace triagens {
|
|||
AggregateBlock (AQL_TRANSACTION_V8* trx,
|
||||
ExecutionNode const* ep)
|
||||
: ExecutionBlock(trx, ep),
|
||||
_collectDetails(false) {
|
||||
_groupRegister(0),
|
||||
_variableNames() {
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -1843,17 +1847,29 @@ namespace triagens {
|
|||
auto it = _varOverview->varInfo.find(en->_outVariable->id);
|
||||
TRI_ASSERT(it != _varOverview->varInfo.end());
|
||||
_groupRegister = (*it).second.registerId;
|
||||
_collectDetails = true;
|
||||
|
||||
TRI_ASSERT(_groupRegister > 0);
|
||||
|
||||
// construct a mapping of all register ids to variable names
|
||||
// we need this mapping to generate the grouped output
|
||||
|
||||
for (size_t i = 0; i < _varOverview->varInfo.size(); ++i) {
|
||||
_variableNames.push_back(""); // init with some default value
|
||||
}
|
||||
|
||||
// iterate over all our variables
|
||||
for (auto it = _varOverview->varInfo.begin(); it != _varOverview->varInfo.end(); ++it) {
|
||||
// find variable in the global variable map
|
||||
auto itVar = en->_variableMap.find((*it).first);
|
||||
|
||||
if (itVar != en->_variableMap.end()) {
|
||||
_variableNames[(*it).second.registerId] = (*itVar).second;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// reserve space for the current row
|
||||
_currentGroup.groupValues.reserve(_aggregateRegisters.size());
|
||||
_currentGroup.collections.reserve(_aggregateRegisters.size());
|
||||
|
||||
for (size_t i = 0; i < _aggregateRegisters.size(); ++i) {
|
||||
_currentGroup.groupValues[i] = AqlValue();
|
||||
_currentGroup.collections[i] = nullptr;
|
||||
}
|
||||
_currentGroup.initialize(_aggregateRegisters.size());
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
@ -1867,7 +1883,6 @@ namespace triagens {
|
|||
return nullptr;
|
||||
}
|
||||
|
||||
std::cout << "AGGREGATE::GETSOME\n";
|
||||
if (_buffer.empty()) {
|
||||
if (! ExecutionBlock::getBlock(DefaultBatchSize, DefaultBatchSize)) {
|
||||
|
||||
|
@ -1876,7 +1891,6 @@ std::cout << "AGGREGATE::GETSOME\n";
|
|||
// auto res = new AqlItemBlock(1, _varOverview->nrRegs[_depth]);
|
||||
// emitRow(res, 0, _previousRow);
|
||||
// clearGroup();
|
||||
std::cout << "AGGREGATE::GETSOME - DONE\n";
|
||||
|
||||
_done = true;
|
||||
return nullptr;
|
||||
|
@ -1891,7 +1905,6 @@ std::cout << "AGGREGATE::GETSOME - DONE\n";
|
|||
auto res = new AqlItemBlock(atMost, _varOverview->nrRegs[_depth]);
|
||||
TRI_ASSERT(curRegs <= res->getNrRegs());
|
||||
|
||||
std::cout << "POS: " << _pos << "\n";
|
||||
inheritRegisters(cur, res, _pos);
|
||||
|
||||
size_t j = 0;
|
||||
|
@ -1903,11 +1916,9 @@ std::cout << "POS: " << _pos << "\n";
|
|||
if (_currentGroup.groupValues[0].isEmpty()) {
|
||||
// we never had any previous group
|
||||
newGroup = true;
|
||||
std::cout << "NEED TO CREATE NEW GROUP\n";
|
||||
}
|
||||
else {
|
||||
// we already had a group, check if the group has changed
|
||||
std::cout << "HAVE A GROUP\n";
|
||||
size_t i = 0;
|
||||
|
||||
for (auto it = _aggregateRegisters.begin(); it != _aggregateRegisters.end(); ++it) {
|
||||
|
@ -1925,17 +1936,10 @@ std::cout << "POS: " << _pos << "\n";
|
|||
}
|
||||
|
||||
if (newGroup) {
|
||||
std::cout << "CREATING GROUP...\n";
|
||||
if (! _currentGroup.groupValues[0].isEmpty()) {
|
||||
// need to emit the current group first
|
||||
std::cout << "EMITTING OLD GROUP INTO ROW #" << j << "\n";
|
||||
size_t i = 0;
|
||||
for (auto it = _aggregateRegisters.begin(); it != _aggregateRegisters.end(); ++it) {
|
||||
std::cout << "REGISTER #" << (*it).first << "\n";
|
||||
res->setValue(j, (*it).first, _currentGroup.groupValues[i]);
|
||||
++i;
|
||||
}
|
||||
|
||||
emitGroup(cur, res, j);
|
||||
// increase output row count
|
||||
++j;
|
||||
}
|
||||
|
||||
|
@ -1944,54 +1948,149 @@ std::cout << "POS: " << _pos << "\n";
|
|||
for (auto it = _aggregateRegisters.begin(); it != _aggregateRegisters.end(); ++it) {
|
||||
_currentGroup.groupValues[i] = cur->getValue(_pos, (*it).second).clone();
|
||||
_currentGroup.collections[i] = cur->getDocumentCollection((*it).second);
|
||||
std::cout << "GROUP VALUE #" << i << ": " << _currentGroup.groupValues[i].toString(_currentGroup.collections[i]) << "\n";
|
||||
++i;
|
||||
}
|
||||
|
||||
_currentGroup.firstRow = _pos;
|
||||
}
|
||||
|
||||
if (_collectDetails) {
|
||||
// _currentGroup.groupDetails.
|
||||
}
|
||||
_currentGroup.lastRow = _pos;
|
||||
|
||||
if (++_pos >= cur->size()) {
|
||||
_buffer.pop_front();
|
||||
delete cur;
|
||||
_pos = 0;
|
||||
std::cout << "SHRINKING BLOCK TO " << j << " ROWS\n";
|
||||
res->shrink(j);
|
||||
return res;
|
||||
|
||||
bool hasMore = ExecutionBlock::getBlock(DefaultBatchSize, DefaultBatchSize);
|
||||
|
||||
if (! hasMore) {
|
||||
try {
|
||||
emitGroup(cur, res, j);
|
||||
++j;
|
||||
delete cur;
|
||||
cur = nullptr;
|
||||
|
||||
TRI_ASSERT(j > 0);
|
||||
res->shrink(j);
|
||||
|
||||
_done = true;
|
||||
return res;
|
||||
}
|
||||
catch (...) {
|
||||
delete cur;
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
// hasMore
|
||||
|
||||
delete cur;
|
||||
cur = _buffer.front();
|
||||
}
|
||||
}
|
||||
|
||||
std::cout << "SHRINKING BLOCK TO " << j << " ROWS\n";
|
||||
TRI_ASSERT(j > 0);
|
||||
res->shrink(j);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief writes the current group data into the result
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void emitGroup (AqlItemBlock const* cur,
|
||||
AqlItemBlock* res,
|
||||
size_t row) {
|
||||
size_t i = 0;
|
||||
for (auto it = _aggregateRegisters.begin(); it != _aggregateRegisters.end(); ++it) {
|
||||
res->setValue(row, (*it).first, _currentGroup.groupValues[i]);
|
||||
++i;
|
||||
}
|
||||
|
||||
if (_groupRegister > 0) {
|
||||
// emit group details
|
||||
TRI_ASSERT(_currentGroup.firstRow <= _currentGroup.lastRow);
|
||||
|
||||
auto block = cur->slice(_currentGroup.firstRow, _currentGroup.lastRow + 1);
|
||||
try {
|
||||
_currentGroup.groupBlocks.push_back(block);
|
||||
}
|
||||
catch (...) {
|
||||
delete block;
|
||||
throw;
|
||||
}
|
||||
|
||||
// finally set the group details
|
||||
res->setValue(row, _groupRegister, AqlValue::createFromBlocks(_currentGroup.groupBlocks, _variableNames));
|
||||
|
||||
// and reset the group so a new one can start
|
||||
_currentGroup.reset();
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief pairs, consisting of out register and in register
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::vector<std::pair<RegisterId, RegisterId>> _aggregateRegisters;
|
||||
|
||||
struct {
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief details about the current group
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
struct Group {
|
||||
std::vector<AqlValue> groupValues;
|
||||
std::vector<TRI_document_collection_t const*> collections;
|
||||
}
|
||||
_currentGroup;
|
||||
|
||||
std::vector<AqlItemBlock*> groupBlocks;
|
||||
|
||||
size_t firstRow;
|
||||
size_t lastRow;
|
||||
|
||||
~Group () {
|
||||
reset();
|
||||
}
|
||||
|
||||
void initialize (size_t capacity) {
|
||||
groupValues.reserve(capacity);
|
||||
collections.reserve(capacity);
|
||||
|
||||
for (size_t i = 0; i < capacity; ++i) {
|
||||
groupValues[i] = AqlValue();
|
||||
collections[i] = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
void reset () {
|
||||
for (auto it = groupBlocks.begin(); it != groupBlocks.end(); ++it) {
|
||||
delete (*it);
|
||||
}
|
||||
groupBlocks.clear();
|
||||
}
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief details about the current group
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Group _currentGroup;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief the optional register that contains the values for each group
|
||||
/// if no values should be returned, then this has a value of 0
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
RegisterId _groupRegister;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief whether or not elements should be collected for each group
|
||||
/// @brief list of variables names for the registers
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool _collectDetails;
|
||||
std::vector<std::string> _variableNames;
|
||||
|
||||
};
|
||||
|
||||
|
|
|
@ -138,7 +138,7 @@ namespace triagens {
|
|||
/// @brief get all dependencies
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
vector<ExecutionNode*> getDependencies () const {
|
||||
std::vector<ExecutionNode*> getDependencies () const {
|
||||
return _dependencies;
|
||||
}
|
||||
|
||||
|
@ -149,6 +149,7 @@ namespace triagens {
|
|||
|
||||
bool removeDependency (ExecutionNode* ep) {
|
||||
auto it = _dependencies.begin();
|
||||
|
||||
while (it != _dependencies.end()) {
|
||||
if (*it == ep) {
|
||||
_dependencies.erase(it);
|
||||
|
@ -1023,8 +1024,12 @@ namespace triagens {
|
|||
public:
|
||||
|
||||
AggregateNode (std::vector<std::pair<Variable const*, Variable const*>> aggregateVariables,
|
||||
Variable const* outVariable)
|
||||
: ExecutionNode(), _aggregateVariables(aggregateVariables), _outVariable(outVariable) {
|
||||
Variable const* outVariable,
|
||||
std::unordered_map<VariableId, std::string const> const& variableMap)
|
||||
: ExecutionNode(),
|
||||
_aggregateVariables(aggregateVariables),
|
||||
_outVariable(outVariable),
|
||||
_variableMap(variableMap) {
|
||||
// outVariable can be a nullptr
|
||||
}
|
||||
|
||||
|
@ -1057,7 +1062,7 @@ namespace triagens {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
virtual ExecutionNode* clone () const {
|
||||
auto c = new AggregateNode(_aggregateVariables, _outVariable);
|
||||
auto c = new AggregateNode(_aggregateVariables, _outVariable, _variableMap);
|
||||
cloneDependencies(c);
|
||||
return static_cast<ExecutionNode*>(c);
|
||||
}
|
||||
|
@ -1080,6 +1085,12 @@ namespace triagens {
|
|||
|
||||
Variable const* _outVariable;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief map of all variable ids and names (needed to construct group data)
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::unordered_map<VariableId, std::string const> const _variableMap;
|
||||
|
||||
};
|
||||
|
||||
|
||||
|
|
|
@ -414,7 +414,7 @@ ExecutionNode* ExecutionPlan::fromNodeCollect (Ast const* ast,
|
|||
outVariable = static_cast<Variable*>(v->getData());
|
||||
}
|
||||
|
||||
auto en = addNode(new AggregateNode(aggregateVariables, outVariable));
|
||||
auto en = addNode(new AggregateNode(aggregateVariables, outVariable, ast->variables()->variables(false)));
|
||||
|
||||
return addDependency(previous, en);
|
||||
}
|
||||
|
|
|
@ -287,6 +287,47 @@ Json AqlValue::toJson (TRI_document_collection_t const* document) const {
|
|||
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief create an AqlValue from a vector of AqlItemBlock*s
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AqlValue AqlValue::createFromBlocks (std::vector<AqlItemBlock*> const& src,
|
||||
std::vector<std::string> const& variableNames) {
|
||||
size_t totalSize = 0;
|
||||
|
||||
for (auto it = src.begin(); it != src.end(); ++it) {
|
||||
totalSize += (*it)->size();
|
||||
}
|
||||
|
||||
auto json = new Json(Json::List, totalSize);
|
||||
|
||||
try {
|
||||
for (auto it = src.begin(); it != src.end(); ++it) {
|
||||
auto current = (*it);
|
||||
RegisterId const n = current->getNrRegs();
|
||||
|
||||
for (size_t i = 0; i < current->size(); ++i) {
|
||||
Json values(Json::Array);
|
||||
|
||||
for (RegisterId j = 0; j < n; ++j) {
|
||||
if (variableNames[j][0] != '\0') {
|
||||
// temporaries don't have a name and won't be included
|
||||
values.set(variableNames[j].c_str(), current->getValue(i, j).toJson(current->getDocumentCollection(j)));
|
||||
}
|
||||
}
|
||||
|
||||
json->add(values);
|
||||
}
|
||||
}
|
||||
|
||||
return AqlValue(json);
|
||||
}
|
||||
catch (...) {
|
||||
delete json;
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief splice multiple blocks, note that the new block now owns all
|
||||
/// AqlValue pointers in the old blocks, therefore, the latter are all
|
||||
|
|
|
@ -180,11 +180,18 @@ namespace triagens {
|
|||
|
||||
triagens::basics::Json toJson (TRI_document_collection_t const*) const;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief create an AqlValue from a vector of AqlItemBlock*s
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static AqlValue createFromBlocks (std::vector<AqlItemBlock*> const&,
|
||||
std::vector<std::string> const&);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief a quick method to decide whether a value is empty
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool isEmpty () {
|
||||
inline bool isEmpty () const {
|
||||
return _type == EMPTY;
|
||||
}
|
||||
|
||||
|
@ -447,7 +454,7 @@ namespace triagens {
|
|||
/// @brief slice/clone
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AqlItemBlock* slice (size_t from, size_t to) {
|
||||
AqlItemBlock* slice (size_t from, size_t to) const {
|
||||
TRI_ASSERT(from < to && to <= _nrItems);
|
||||
|
||||
std::unordered_map<AqlValue, AqlValue> cache;
|
||||
|
@ -461,7 +468,7 @@ namespace triagens {
|
|||
}
|
||||
for (size_t row = from; row < to; row++) {
|
||||
for (RegisterId col = 0; col < _nrRegs; col++) {
|
||||
AqlValue& a(_data[row * _nrRegs + col]);
|
||||
AqlValue const& a(_data[row * _nrRegs + col]);
|
||||
|
||||
if (! a.isEmpty()) {
|
||||
auto it = cache.find(a);
|
||||
|
|
|
@ -59,6 +59,25 @@ VariableGenerator::~VariableGenerator () {
|
|||
// --SECTION-- public functions
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief return a map of all variable ids with their names
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::unordered_map<VariableId, std::string const> VariableGenerator::variables (bool includeTemporaries) const {
|
||||
std::unordered_map<VariableId, std::string const> result;
|
||||
|
||||
for (auto it = _variables.begin(); it != _variables.end(); ++it) {
|
||||
// check if we should include this variable...
|
||||
if (! includeTemporaries && ! (*it).second->isUserDefined()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
result.insert(std::make_pair((*it).first, (*it).second->name));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief generate a variable
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -66,6 +66,12 @@ namespace triagens {
|
|||
|
||||
public:
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief return a map of all variable ids with their names
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::unordered_map<VariableId, std::string const> variables (bool) const;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief generate a variable
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -98,7 +104,6 @@ namespace triagens {
|
|||
|
||||
std::string nextName () const;
|
||||
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- private functions
|
||||
// -----------------------------------------------------------------------------
|
||||
|
|
Loading…
Reference in New Issue