1
0
Fork 0

Merge branch 'aql2' of ssh://github.com/triAGENS/ArangoDB into aql2

This commit is contained in:
James 2014-08-08 14:32:31 +02:00
commit 1be93b54a9
4 changed files with 307 additions and 203 deletions

View File

@ -703,6 +703,10 @@ namespace triagens {
if (_done) { if (_done) {
return nullptr; return nullptr;
} }
std::cout << _depth << " ";
for (auto i: _varOverview->nrRegs)
std::cout << i << " ";
std::cout << "\n";
AqlItemBlock* res = new AqlItemBlock(1, _varOverview->nrRegs[_depth]); AqlItemBlock* res = new AqlItemBlock(1, _varOverview->nrRegs[_depth]);
if (_inputRegisterValues != nullptr) { if (_inputRegisterValues != nullptr) {
@ -1475,10 +1479,21 @@ namespace triagens {
return nullptr; return nullptr;
} }
for (size_t i = 0; i < res->size(); i++) { for (size_t i = 0; i < res->size(); i++) {
_subquery->initialize(); int ret;
_subquery->bind(res, i); ret = _subquery->initialize();
_subquery->execute(); if (ret == TRI_ERROR_NO_ERROR) {
ret = _subquery->bind(res, i);
}
if (ret == TRI_ERROR_NO_ERROR) {
ret = _subquery->execute();
}
if (ret != TRI_ERROR_NO_ERROR) {
delete res;
THROW_ARANGO_EXCEPTION(ret);
}
auto results = new std::vector<AqlItemBlock*>; auto results = new std::vector<AqlItemBlock*>;
try {
do { do {
auto tmp = _subquery->getSome(DefaultBatchSize, DefaultBatchSize); auto tmp = _subquery->getSome(DefaultBatchSize, DefaultBatchSize);
if (tmp == nullptr) { if (tmp == nullptr) {
@ -1487,9 +1502,21 @@ namespace triagens {
results->push_back(tmp); results->push_back(tmp);
} }
while(true); while(true);
}
catch (...) {
delete res;
delete results;
throw;
}
res->setValue(i, _outReg, AqlValue(results)); res->setValue(i, _outReg, AqlValue(results));
_subquery->shutdown();
ret = _subquery->shutdown();
if (ret != TRI_ERROR_NO_ERROR) {
delete res;
THROW_ARANGO_EXCEPTION(ret);
}
} }
return res; return res;
} }
@ -1805,29 +1832,6 @@ namespace triagens {
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
/*
void emitRow (AqlItemBlock* res,
size_t j,
AqlItemBlock* cur,
void* row) {
std::cout << "EMIT ROW: " << j << "\n";
if (j == 0) {
// first row in block
TRI_ASSERT(cur->getNrRegs() <= res->getNrRegs());
inheritRegisters(cur, res, _pos);
}
for (auto it = _aggregateRegisters.begin(); it != _aggregateRegisters.end(); ++it) {
// TODO: return the actual group values
res->setValue(j, (*it).first, AqlValue(new basics::Json(42)));
}
clearGroup();
}
*/
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief getSome /// @brief getSome
@ -1874,9 +1878,11 @@ std::cout << "POS: " << _pos << "\n";
if (_currentGroup.groupValues[0].isEmpty()) { if (_currentGroup.groupValues[0].isEmpty()) {
// we never had any previous group // we never had any previous group
newGroup = true; newGroup = true;
std::cout << "NEED TO CREATE NEW GROUP\n";
} }
else { else {
// we already had a group, check if the group has changed // we already had a group, check if the group has changed
std::cout << "HAVE A GROUP\n";
size_t i = 0; size_t i = 0;
for (auto it = _aggregateRegisters.begin(); it != _aggregateRegisters.end(); ++it) { for (auto it = _aggregateRegisters.begin(); it != _aggregateRegisters.end(); ++it) {
@ -1894,13 +1900,18 @@ std::cout << "POS: " << _pos << "\n";
} }
if (newGroup) { if (newGroup) {
std::cout << "CREATING GROUP...\n";
if (! _currentGroup.groupValues[0].isEmpty()) { if (! _currentGroup.groupValues[0].isEmpty()) {
// need to emit the current group first // need to emit the current group first
std::cout << "EMITTING OLD GROUP INTO ROW #" << j << "\n";
size_t i = 0; size_t i = 0;
for (auto it = _aggregateRegisters.begin(); it != _aggregateRegisters.end(); ++it) { for (auto it = _aggregateRegisters.begin(); it != _aggregateRegisters.end(); ++it) {
std::cout << "REGISTER #" << (*it).first << "\n";
res->setValue(j, (*it).first, _currentGroup.groupValues[i]); res->setValue(j, (*it).first, _currentGroup.groupValues[i]);
++i; ++i;
} }
++j;
} }
// construct the new group // construct the new group
@ -1908,7 +1919,7 @@ std::cout << "POS: " << _pos << "\n";
for (auto it = _aggregateRegisters.begin(); it != _aggregateRegisters.end(); ++it) { for (auto it = _aggregateRegisters.begin(); it != _aggregateRegisters.end(); ++it) {
_currentGroup.groupValues[i] = cur->getValue(_pos, (*it).second).clone(); _currentGroup.groupValues[i] = cur->getValue(_pos, (*it).second).clone();
_currentGroup.collections[i] = cur->getDocumentCollection((*it).second); _currentGroup.collections[i] = cur->getDocumentCollection((*it).second);
// std::cout << "GROUP VALUE #" << i << ": " << _currentGroup.groupValues[i].toString(_currentGroup.collections[i]) << "\n"; std::cout << "GROUP VALUE #" << i << ": " << _currentGroup.groupValues[i].toString(_currentGroup.collections[i]) << "\n";
++i; ++i;
} }
} }
@ -1917,16 +1928,19 @@ std::cout << "POS: " << _pos << "\n";
// _currentGroup.groupDetails. // _currentGroup.groupDetails.
} }
++j;
if (++_pos >= cur->size()) { if (++_pos >= cur->size()) {
_buffer.pop_front(); _buffer.pop_front();
delete cur; delete cur;
_pos = 0; _pos = 0;
std::cout << "SHRINKING BLOCK TO " << j << " ROWS\n";
res->shrink(j);
return res; return res;
} }
} }
std::cout << "SHRINKING BLOCK TO " << j << " ROWS\n";
res->shrink(j);
return res; return res;
} }

View File

@ -123,14 +123,41 @@ v8::Handle<v8::Value> AqlValue::toV8 (AQL_TRANSACTION_V8* trx,
} }
case DOCVEC: { case DOCVEC: {
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); // calculate the result list length
size_t totalSize = 0;
for (auto it = _vector->begin(); it != _vector->end(); ++it) {
totalSize += (*it)->size();
}
// allocate the result list
v8::Handle<v8::Array> result = v8::Array::New(static_cast<int>(totalSize));
uint32_t j = 0; // output row count
for (auto it = _vector->begin(); it != _vector->end(); ++it) {
auto current = (*it);
size_t const n = current->size();
auto vecCollection = current->getDocumentCollection(0);
for (size_t i = 0; i < n; ++i) {
result->Set(j++, current->getValue(i, 0).toV8(trx, vecCollection));
}
}
return result;
} }
case RANGE: { case RANGE: {
v8::Handle<v8::Array> values = v8::Array::New(); TRI_ASSERT(_range != nullptr);
// TODO: fill range
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); // allocate the buffer for the result
return values; int64_t const n = _range->_high - _range->_low + 1;
v8::Handle<v8::Array> result = v8::Array::New(static_cast<int>(n));
uint32_t j = 0; // output row count
for (int64_t i = _range->_low; i <= _range->_high; ++i) {
// is it safe to use a double here (precision loss)?
result->Set(j++, v8::Number::New(static_cast<double>(i)));
}
return result;
} }
case EMPTY: { case EMPTY: {
@ -281,6 +308,7 @@ AqlItemBlock* AqlItemBlock::splice (std::vector<AqlItemBlock*>& blocks) {
} }
pos += (*it)->size(); pos += (*it)->size();
} }
return res; return res;
} }
@ -288,36 +316,44 @@ AqlItemBlock* AqlItemBlock::splice (std::vector<AqlItemBlock*>& blocks) {
/// @brief 3-way comparison for AqlValue objects /// @brief 3-way comparison for AqlValue objects
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
int triagens::aql::CompareAqlValues ( AqlValue const& left, int triagens::aql::CompareAqlValues (AqlValue const& left,
TRI_document_collection_t const* leftcoll, TRI_document_collection_t const* leftcoll,
AqlValue const& right, AqlValue const& right,
TRI_document_collection_t const* rightcoll ) { TRI_document_collection_t const* rightcoll) {
if (left._type != right._type) { if (left._type != right._type) {
if (left._type == AqlValue::EMPTY) { if (left._type == AqlValue::EMPTY) {
return -1; return -1;
} }
if (right._type == AqlValue::EMPTY) { if (right._type == AqlValue::EMPTY) {
return 1; return 1;
} }
if (left._type == AqlValue::JSON && right._type == AqlValue::SHAPED) { if (left._type == AqlValue::JSON && right._type == AqlValue::SHAPED) {
triagens::basics::Json rjson = right.toJson(rightcoll); triagens::basics::Json rjson = right.toJson(rightcoll);
return TRI_CompareValuesJson(left._json->json(), rjson.json()); return TRI_CompareValuesJson(left._json->json(), rjson.json());
} }
if (left._type == AqlValue::SHAPED && right._type == AqlValue::JSON) { if (left._type == AqlValue::SHAPED && right._type == AqlValue::JSON) {
triagens::basics::Json ljson = left.toJson(leftcoll); triagens::basics::Json ljson = left.toJson(leftcoll);
return TRI_CompareValuesJson(ljson.json(), right._json->json()); return TRI_CompareValuesJson(ljson.json(), right._json->json());
} }
// No other comparisons are defined // No other comparisons are defined
TRI_ASSERT(false); TRI_ASSERT(false);
} }
// if we get here, types are equal
switch (left._type) { switch (left._type) {
case AqlValue::EMPTY: { case AqlValue::EMPTY: {
return 0; return 0;
} }
case AqlValue::JSON: { case AqlValue::JSON: {
return TRI_CompareValuesJson(left._json->json(), right._json->json()); return TRI_CompareValuesJson(left._json->json(), right._json->json());
} }
case AqlValue::SHAPED: { case AqlValue::SHAPED: {
TRI_shaped_json_t l; TRI_shaped_json_t l;
TRI_shaped_json_t r; TRI_shaped_json_t r;
@ -327,6 +363,7 @@ int triagens::aql::CompareAqlValues ( AqlValue const& left,
return TRI_CompareShapeTypes(nullptr, nullptr, &l, leftcoll->getShaper(), return TRI_CompareShapeTypes(nullptr, nullptr, &l, leftcoll->getShaper(),
nullptr, nullptr, &r, rightcoll->getShaper()); nullptr, nullptr, &r, rightcoll->getShaper());
} }
case AqlValue::DOCVEC: { case AqlValue::DOCVEC: {
// use lexicographic ordering of AqlValues regardless of block, // use lexicographic ordering of AqlValues regardless of block,
// DOCVECs have a single register coming from ReturnNode. // DOCVECs have a single register coming from ReturnNode.
@ -335,34 +372,36 @@ int triagens::aql::CompareAqlValues ( AqlValue const& left,
size_t rblock = 0; size_t rblock = 0;
size_t ritem = 0; size_t ritem = 0;
while( lblock < left._vector->size() && rblock < right._vector->size() ){ while (lblock < left._vector->size() &&
rblock < right._vector->size()) {
AqlValue lval = left._vector->at(lblock)->getValue(litem, 0); AqlValue lval = left._vector->at(lblock)->getValue(litem, 0);
AqlValue rval = right._vector->at(rblock)->getValue(ritem, 0); AqlValue rval = right._vector->at(rblock)->getValue(ritem, 0);
int cmp = CompareAqlValues(lval, int cmp = CompareAqlValues(lval,
left._vector->at(lblock)->getDocumentCollection(0), left._vector->at(lblock)->getDocumentCollection(0),
rval, rval,
right._vector->at(rblock)->getDocumentCollection(0)); right._vector->at(rblock)->getDocumentCollection(0));
if(cmp != 0){ if (cmp != 0){
return cmp; return cmp;
} }
if(++litem == left._vector->size()){ if (++litem == left._vector->size()) {
litem = 0; litem = 0;
lblock++; lblock++;
} }
if(++ritem == right._vector->size()){ if (++ritem == right._vector->size()) {
ritem = 0; ritem = 0;
rblock++; rblock++;
} }
} }
if(lblock == left._vector->size() && rblock == right._vector->size()){ if (lblock == left._vector->size() &&
rblock == right._vector->size()){
return 0; return 0;
} }
return (lblock < left._vector->size()?-1:1); return (lblock < left._vector->size() ? -1 : 1);
} }
case AqlValue::RANGE: { case AqlValue::RANGE: {
if(left._range->_low < right._range->_low){ if (left._range->_low < right._range->_low){
return -1; return -1;
} }
if (left._range->_low > right._range->_low){ if (left._range->_low > right._range->_low){
@ -376,13 +415,12 @@ int triagens::aql::CompareAqlValues ( AqlValue const& left,
} }
return 0; return 0;
} }
default: { default: {
TRI_ASSERT(false); TRI_ASSERT(false);
return 0; return 0;
} }
} }
} }
// Local Variables: // Local Variables:

View File

@ -330,9 +330,11 @@ namespace triagens {
~AqlItemBlock () { ~AqlItemBlock () {
std::unordered_set<AqlValue> cache; std::unordered_set<AqlValue> cache;
for (size_t i = 0; i < _nrItems * _nrRegs; i++) { for (size_t i = 0; i < _nrItems * _nrRegs; i++) {
if (! _data[i].isEmpty()) { if (! _data[i].isEmpty()) {
auto it = cache.find(_data[i]); auto it = cache.find(_data[i]);
if (it == cache.end()) { if (it == cache.end()) {
cache.insert(_data[i]); cache.insert(_data[i]);
_data[i].destroy(); _data[i].destroy();
@ -341,6 +343,31 @@ namespace triagens {
} }
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief shrink the block to the specified number of rows
////////////////////////////////////////////////////////////////////////////////
void shrink (size_t nrItems) {
if (nrItems == _nrItems) {
// nothing to do
return;
}
if (nrItems > _nrItems) {
// cannot use shrink() to increase the size of the block
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
// erase all stored values in the region that we freed
for (size_t i = nrItems; i < _nrItems; ++i) {
for (RegisterId j = 0; j < _nrRegs; ++j) {
eraseValue(i, j);
}
}
// adjust the size of the block
_nrItems = nrItems;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief getValue, get the value of a register /// @brief getValue, get the value of a register
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -354,7 +381,8 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void setValue (size_t index, RegisterId varNr, AqlValue value) { void setValue (size_t index, RegisterId varNr, AqlValue value) {
TRI_ASSERT(_data[index * _nrRegs + varNr].isEmpty()); TRI_ASSERT(_data.capacity() > index * _nrRegs + varNr);
TRI_ASSERT(_data.at(index * _nrRegs + varNr).isEmpty());
_data[index * _nrRegs + varNr] = value; _data[index * _nrRegs + varNr] = value;
} }
@ -423,7 +451,11 @@ namespace triagens {
TRI_ASSERT(from < to && to <= _nrItems); TRI_ASSERT(from < to && to <= _nrItems);
std::unordered_map<AqlValue, AqlValue> cache; std::unordered_map<AqlValue, AqlValue> cache;
auto res = new AqlItemBlock(to - from, _nrRegs);
AqlItemBlock* res = nullptr;
try {
res = new AqlItemBlock(to - from, _nrRegs);
for (RegisterId col = 0; col < _nrRegs; col++) { for (RegisterId col = 0; col < _nrRegs; col++) {
res->_docColls[col] = _docColls[col]; res->_docColls[col] = _docColls[col];
} }
@ -444,8 +476,14 @@ namespace triagens {
} }
} }
} }
return res; return res;
} }
catch (...) {
delete res;
throw;
}
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief slice/clone for a subset /// @brief slice/clone for a subset
@ -455,7 +493,11 @@ namespace triagens {
TRI_ASSERT(from < to && to <= chosen.size()); TRI_ASSERT(from < to && to <= chosen.size());
std::unordered_map<AqlValue, AqlValue> cache; std::unordered_map<AqlValue, AqlValue> cache;
auto res = new AqlItemBlock(to - from, _nrRegs);
AqlItemBlock* res = nullptr;
try {
res = new AqlItemBlock(to - from, _nrRegs);
for (RegisterId col = 0; col < _nrRegs; col++) { for (RegisterId col = 0; col < _nrRegs; col++) {
res->_docColls[col] = _docColls[col]; res->_docColls[col] = _docColls[col];
} }
@ -476,8 +518,14 @@ namespace triagens {
} }
} }
} }
return res; return res;
} }
catch (...) {
delete res;
throw;
}
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief splice multiple blocks, note that the new block now owns all /// @brief splice multiple blocks, note that the new block now owns all
@ -485,7 +533,7 @@ namespace triagens {
/// set to nullptr, just to be sure. /// set to nullptr, just to be sure.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static AqlItemBlock* splice(std::vector<AqlItemBlock*>& blocks); static AqlItemBlock* splice (std::vector<AqlItemBlock*>& blocks);
}; };

View File

@ -35,13 +35,17 @@
#include "Ahuacatl/ahuacatl-collections.h" #include "Ahuacatl/ahuacatl-collections.h"
#include "Ahuacatl/ahuacatl-explain.h" #include "Ahuacatl/ahuacatl-explain.h"
#include "Aql/ExecutionBlock.h"
#include "Aql/Query.h" #include "Aql/Query.h"
#include "Basics/Utf8Helper.h" #include "Basics/Utf8Helper.h"
#include "HttpServer/ApplicationEndpointServer.h"
#include "BasicsC/conversions.h"
#include "BasicsC/json-utilities.h"
#include "Utils/transactions.h"
#include "Utils/AhuacatlGuard.h" #include "Utils/AhuacatlGuard.h"
#include "Utils/AhuacatlTransaction.h" #include "Utils/AhuacatlTransaction.h"
#include "Utils/V8ResolverGuard.h" #include "Utils/V8ResolverGuard.h"
#include "HttpServer/ApplicationEndpointServer.h"
#include "V8/v8-conv.h" #include "V8/v8-conv.h"
#include "V8/v8-execution.h" #include "V8/v8-execution.h"
#include "V8/v8-utils.h" #include "V8/v8-utils.h"