1
0
Fork 0

Merge branch 'devel' of ssh://github.com/triAGENS/ArangoDB into devel

This commit is contained in:
Max Neunhoeffer 2014-09-26 13:23:33 +02:00
commit e9bac83e67
15 changed files with 302 additions and 55 deletions

View File

@ -5,9 +5,15 @@
@startDocuBlock collectionDatabaseName
!SUBSECTION Create
<!-- arangod/V8Server/v8-vocbase.cpp -->
<!-- arangod/V8Server/v8-vocindex.cpp -->
@startDocuBlock collectionDatabaseCreate
<!-- arangod/V8Server/v8-vocindex.cpp -->
@startDocuBlock collectionCreateEdgeCollection
<!-- arangod/V8Server/v8-vocindex.cpp -->
@startDocuBlock collectionCreateDocumentCollection
!SUBSECTION All Collections
<!-- arangod/V8Server/v8-vocbase.cpp -->
@startDocuBlock collectionDatabaseNameAll

View File

@ -256,7 +256,7 @@ nobase_pkgdataACTIONS_DATA = $(shell find @srcdir@/js/actions -name "*.js" -prin
nobase_pkgdataCOMMON_DATA = $(shell find @srcdir@/js/common -name "*.js" -print)
nobase_pkgdataSERVER_DATA = $(shell find @srcdir@/js/server -name "*.js" -print)
nobase_pkgdataCLIENT_DATA = $(shell find @srcdir@/js/client -name "*.js" -print)
nobase_pkgdataNODE_DATA = $(shell find @srcdir@/js/node -type f "(" -name .travis.yml -o -name .npmignore -o -print ")")
nobase_pkgdataNODE_DATA = $(shell find @srcdir@/js/node -type f "(" -name .travis.yml -o -name .npmignore -o -print ")" | grep -v "\(htmlparser2\|js-yaml\)/test/" )
nobase_pkgdataAPPS_DATA = $(shell find @srcdir@/js/apps/system -type f "(" -path "*/test/*" -o -path "*/test_data/*" -o -path "*/coverage/*" -o -print ")")
if ENABLE_MRUBY

View File

@ -1366,7 +1366,7 @@ void IndexRangeBlock::readEdgeIndex (IndexOrCondition const& ranges) {
TRI_document_collection_t* document = _collection->documentCollection();
std::string key;
TRI_edge_direction_e direction;
TRI_edge_direction_e direction = TRI_EDGE_IN; // must set a default to satisfy compiler
for (auto x: ranges.at(0)) {
if (x._attr == std::string(TRI_VOC_ATTRIBUTE_FROM)) {
@ -2251,10 +2251,9 @@ int AggregateBlock::getOrSkipSome (size_t atLeast,
unique_ptr<AqlItemBlock> res;
if(!skipping){
size_t const curRegs = cur->getNrRegs();
res.reset(new AqlItemBlock(atMost, _varOverview->nrRegs[_depth]));
TRI_ASSERT(curRegs <= res->getNrRegs());
TRI_ASSERT(cur->getNrRegs() <= res->getNrRegs());
inheritRegisters(cur, res.get(), _pos);
}
@ -3492,6 +3491,26 @@ int GatherBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
return TRI_ERROR_NO_ERROR;
}
int GatherBlock::shutdown() {
//don't call default shutdown method since it does the wrong thing to _buffer
for (auto it = _dependencies.begin(); it != _dependencies.end(); ++it) {
int res = (*it)->shutdown();
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
}
for (std::deque<AqlItemBlock*> x: _buffer) {
for (AqlItemBlock* y: x) {
delete y;
}
x.clear();
}
_buffer.clear();
return TRI_ERROR_NO_ERROR;
}
int64_t GatherBlock::count () const {
int64_t sum = 0;
for (auto x: _dependencies) {
@ -3535,7 +3554,7 @@ bool GatherBlock::hasMore () {
if (_done) {
return false;
}
for (auto i = 0; i < _buffer.size(); i++){
for (size_t i = 0; i < _buffer.size(); i++){
if (!_buffer.at(i).empty()) {
return true;
} else if (getBlock(i, DefaultBatchSize, DefaultBatchSize)) {
@ -3724,7 +3743,7 @@ size_t GatherBlock::skipSome (size_t atLeast, size_t atMost) {
// renew the comparison function
OurLessThan ourLessThan(_trx, _buffer, _sortRegisters, colls);
}
_pos.at(val.first) = make_pair(val.first,0);
_pos.at(val.first) = make_pair(val.first, 0);
}
}
@ -3756,9 +3775,45 @@ bool GatherBlock::OurLessThan::operator() (std::pair<size_t, size_t> const& a,
// --SECTION-- class ScatterBlock
// -----------------------------------------------------------------------------
bool ScatterBlock::hasMoreForClient (size_t clientId){
TRI_ASSERT(0 <= clientId && clientId < _nrClients);
ScatterBlock::ScatterBlock (ExecutionEngine* engine,
ScatterNode const* ep,
std::vector<std::string> shardIds)
: ExecutionBlock(engine, ep),
_nrClients(shardIds.size()) {
for (size_t i = 0; i < _nrClients; i++) {
_shardIdMap.insert(make_pair(shardIds.at(i), i));
}
}
size_t ScatterBlock::getClientId(std::string shardId) {
auto it = _shardIdMap.find(shardId);
if (it == _shardIdMap.end()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"AQL: unknown shard id");
}
return ((*it).second);
}
int ScatterBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
int res = ExecutionBlock::initializeCursor(items, pos);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
for (size_t i = 0; i < _nrClients; i++) {
_posForClient.push_back(std::make_pair(0, 0));
_doneForClient.push_back(false);
}
return TRI_ERROR_NO_ERROR;
}
bool ScatterBlock::hasMoreForShard (std::string shardId){
size_t clientId = getClientId(shardId);
if (_doneForClient.at(clientId)) {
return false;
}
@ -3782,21 +3837,23 @@ bool ScatterBlock::hasMore () {
return false;
}
for(size_t i = 0; i < _nrClients; i++){
if (hasMoreForClient(i)){
for(auto x: _shardIdMap) {
if (hasMoreForShard(x.first)){
return true;
}
}
_done = true;
return false;
}
int ScatterBlock::getOrSkipSomeForClient (size_t atLeast,
int ScatterBlock::getOrSkipSomeForShard (size_t atLeast,
size_t atMost, bool skipping, AqlItemBlock*& result,
size_t& skipped, size_t clientId){
size_t& skipped, std::string shardId){
TRI_ASSERT(0 < atLeast && atLeast <= atMost);
TRI_ASSERT(result == nullptr && skipped == 0);
TRI_ASSERT(0 <= clientId && clientId < _nrClients);
size_t clientId = getClientId(shardId);
if (_doneForClient.at(clientId)) {
return TRI_ERROR_NO_ERROR;
@ -3850,21 +3907,21 @@ int ScatterBlock::getOrSkipSomeForClient (size_t atLeast,
return TRI_ERROR_NO_ERROR;
}
AqlItemBlock* ScatterBlock::getSomeForClient (
size_t atLeast, size_t atMost, size_t clientId) {
AqlItemBlock* ScatterBlock::getSomeForShard (
size_t atLeast, size_t atMost, std::string shardId) {
size_t skipped = 0;
AqlItemBlock* result = nullptr;
int out = getOrSkipSome(atLeast, atMost, false, result, skipped);
int out = getOrSkipSomeForShard(atLeast, atMost, false, result, skipped, shardId);
if (out != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(out);
}
return result;
}
size_t ScatterBlock::skipSomeForClient (size_t atLeast, size_t atMost, size_t clientId) {
size_t ScatterBlock::skipSomeForShard (size_t atLeast, size_t atMost, std::string shardId) {
size_t skipped = 0;
AqlItemBlock* result = nullptr;
int out = getOrSkipSome(atLeast, atMost, true, result, skipped);
int out = getOrSkipSomeForShard(atLeast, atMost, true, result, skipped, shardId);
TRI_ASSERT(result == nullptr);
if (out != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(out);

View File

@ -1517,6 +1517,9 @@ public:
size_t skipSome (size_t, size_t);
// need our own shutdown method since our _buffer is different
int shutdown ();
private:
// the block is simple if we do not do merge sort . . .
@ -1579,9 +1582,7 @@ public:
ScatterBlock (ExecutionEngine* engine,
ScatterNode const* ep,
size_t nrClients)
: ExecutionBlock(engine, ep), _nrClients(nrClients){
}
std::vector<std::string> shardIds);
~ScatterBlock () {
}
@ -1590,7 +1591,7 @@ public:
return ExecutionBlock::initialize();
}
//int initializeCursor (AqlItemBlock* items, size_t pos);
int initializeCursor (AqlItemBlock* items, size_t pos);
int64_t remaining () {
return _dependencies[0]->remaining();
@ -1600,27 +1601,32 @@ public:
virtual AqlItemBlock* getSome (size_t atLeast, size_t atMost) {
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
}
virtual size_t skipSome (size_t atLeast, size_t atMost) {
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
}
bool hasMoreForClient (size_t clientId);
bool hasMoreForShard (std::string shardId);
int getOrSkipSomeForClient (size_t atLeast, size_t atMost,
bool skipping, AqlItemBlock*& result, size_t& skipped, size_t clientId);
int getOrSkipSomeForShard (size_t atLeast, size_t atMost,
bool skipping, AqlItemBlock*& result, size_t& skipped, std::string shardId);
size_t skipSomeForClient (size_t atLeast, size_t atMost, size_t clientId);
size_t skipSomeForShard (size_t atLeast, size_t atMost, std::string shardId);
AqlItemBlock* getSomeForClient (size_t atLeast, size_t atMost, size_t clientId);
AqlItemBlock* getSomeForShard (size_t atLeast, size_t atMost, std::string shardId);
private:
size_t getClientId(std::string shardId);
//_posForClient.at(i).second is the nr of rows of
//_buffer.at(posForClient.at(i).first) sent to the client with id <i>.
std::vector<std::pair<size_t,size_t>> _posForClient;
std::vector<bool> _doneForClient;
std::unordered_map<std::string, size_t> _shardIdMap;
size_t _nrClients;

View File

@ -1608,6 +1608,15 @@ void GatherNode::toJsonHelper (triagens::basics::Json& nodes,
return;
}
triagens::basics::Json values(triagens::basics::Json::List, _elements.size());
for (auto it = _elements.begin(); it != _elements.end(); ++it) {
triagens::basics::Json element(triagens::basics::Json::Array);
element("inVariable", (*it).first->toJson())
("ascending", triagens::basics::Json((*it).second));
values(element);
}
json("elements", values);
// And add it:
nodes(json);
}

View File

@ -2724,6 +2724,10 @@ namespace triagens {
return _elements;
}
void setElements (std::vector<std::pair<Variable const*, bool>> const src) {
_elements = src;
}
private:
////////////////////////////////////////////////////////////////////////////////

View File

@ -453,14 +453,18 @@ void Optimizer::setupRules () {
distributeInCluster,
distributeInCluster_pass10,
false);
}
if (triagens::arango::ServerState::instance()->isCoordinator()) {
// distribute operations in cluster
registerRule("distribute-filtercalc-to-cluster",
distributeFilternCalcToCluster,
distributeFilternCalcToCluster_pass10,
false);
registerRule("distribute-sort-to-cluster",
distributeSortToCluster,
distributeSortToCluster_pass10,
false);
}
}

View File

@ -141,7 +141,11 @@ namespace triagens {
// move FilterNodes & Calculation nodes inbetween
// scatter(remote) <-> gather(remote) so they're
// distributed to the cluster nodes.
distributeFilternCalcToCluster_pass10 = 1010
distributeFilternCalcToCluster_pass10 = 1010,
// move SortNodes into the distribution.
// adjust gathernode to also contain the sort criterions.
distributeSortToCluster_pass10 = 1020
};
public:

View File

@ -1683,11 +1683,10 @@ int triagens::aql::distributeInCluster (Optimizer* opt,
////////////////////////////////////////////////////////////////////////////////
/// @brief move filters up in the plan
/// @brief move filters up into the cluster distribution part of the plan
/// this rule modifies the plan in place
/// filters are moved as far up in the plan as possible to make result sets
/// as small as possible as early as possible
/// filters are not pushed beyond limits
////////////////////////////////////////////////////////////////////////////////
int triagens::aql::distributeFilternCalcToCluster (Optimizer* opt,
@ -1763,6 +1762,89 @@ int triagens::aql::distributeFilternCalcToCluster (Optimizer* opt,
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief move sorts up into the cluster distribution part of the plan
/// this rule modifies the plan in place
/// sorts are moved as far up in the plan as possible to make result sets
/// as small as possible as early as possible
///
/// filters are not pushed beyond limits
////////////////////////////////////////////////////////////////////////////////
int triagens::aql::distributeSortToCluster (Optimizer* opt,
ExecutionPlan* plan,
Optimizer::Rule const* rule) {
bool modified = false;
std::vector<ExecutionNode*> nodes
= plan->findNodesOfType(triagens::aql::ExecutionNode::GATHER, true);
for (auto n : nodes) {
auto remoteNodeList = n->getDependencies();
auto gatherNode = static_cast<GatherNode*>(n);
TRI_ASSERT(remoteNodeList.size() > 0);
auto rn = remoteNodeList[0];
auto parents = n->getParents();
if (parents.size() < 1) {
continue;
}
while (1) {
bool stopSearching = false;
auto inspectNode = parents[0];
switch (inspectNode->getType()) {
case EN::ENUMERATE_LIST:
case EN::SINGLETON:
case EN::AGGREGATE:
case EN::INSERT:
case EN::REMOVE:
case EN::REPLACE:
case EN::UPDATE:
case EN::CALCULATION:
case EN::FILTER:
parents = inspectNode->getParents();
continue;
case EN::SUBQUERY:
case EN::RETURN:
case EN::NORESULTS:
case EN::SCATTER:
case EN::GATHER:
case EN::ILLEGAL:
//do break
case EN::REMOTE:
case EN::LIMIT:
case EN::INDEX_RANGE:
case EN::ENUMERATE_COLLECTION:
stopSearching = true;
break;
case EN::SORT:
auto thisSortNode = static_cast<SortNode*>(inspectNode);
// remember our cursor...
parents = inspectNode->getParents();
// then unlink the filter/calculator from the plan
plan->unlinkNode(inspectNode);
// and re-insert into plan in front of the remoteNode
plan->insertDependency(rn, inspectNode);
gatherNode->setElements(thisSortNode->getElements());
modified = true;
//ready to rumble!
};
if (stopSearching) {
break;
}
}
}
if (modified) {
plan->findVarUsage();
}
opt->addPlan(plan, rule->level, modified);
return TRI_ERROR_NO_ERROR;
}
// Local Variables:
// mode: outline-minor
// outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|// --SECTION--\\|/// @\\}\\)"

View File

@ -113,6 +113,10 @@ namespace triagens {
int distributeInCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*);
int distributeFilternCalcToCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*);
int distributeSortToCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*);
int distributeSortToCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*);
} // namespace aql
} // namespace triagens

View File

@ -1131,6 +1131,75 @@ static v8::Handle<v8::Value> JS_NthQuery (v8::Arguments const& argv) {
return scope.Close(result);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief selects documents from a collection, hashing the document key and
/// only returning these documents which fall into a specific partition
////////////////////////////////////////////////////////////////////////////////
static v8::Handle<v8::Value> JS_Nth2Query (v8::Arguments const& argv) {
v8::HandleScope scope;
// expecting two arguments
if (argv.Length() != 2 || ! argv[0]->IsNumber() || ! argv[1]->IsNumber()) {
TRI_V8_EXCEPTION_USAGE(scope, "NTH2(<partitionId>, <numberOfPartitions>)");
}
TRI_vocbase_col_t const* col;
col = TRI_UnwrapClass<TRI_vocbase_col_t>(argv.Holder(), TRI_GetVocBaseColType());
if (col == nullptr) {
TRI_V8_EXCEPTION_INTERNAL(scope, "cannot extract collection");
}
TRI_SHARDING_COLLECTION_NOT_YET_IMPLEMENTED(scope, col);
uint64_t const partitionId = TRI_ObjectToUInt64(argv[0], false);
uint64_t const numberOfPartitions = TRI_ObjectToUInt64(argv[1], false);
if (partitionId >= numberOfPartitions || numberOfPartitions == 0) {
TRI_V8_EXCEPTION_PARAMETER(scope, "invalid value for <partitionId> or <numberOfPartitions>");
}
uint32_t total = 0;
vector<TRI_doc_mptr_copy_t> docs;
V8ReadTransaction trx(col->_vocbase, col->_cid);
int res = trx.begin();
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_EXCEPTION(scope, res);
}
res = trx.readPartition(docs, partitionId, numberOfPartitions, &total);
TRI_ASSERT(docs.empty() || trx.hasBarrier());
res = trx.finish(res);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_EXCEPTION(scope, res);
}
size_t const n = docs.size();
uint32_t count = 0;
// setup result
v8::Handle<v8::Object> result = v8::Object::New();
v8::Handle<v8::Array> documents = v8::Array::New((int) n);
// reserve full capacity in one go
result->Set(v8::String::New("documents"), documents);
for (size_t i = 0; i < n; ++i) {
char const* key = TRI_EXTRACT_MARKER_KEY(static_cast<TRI_df_marker_t const*>(docs[i].getDataPtr()));
documents->Set(count++, v8::String::New(key));
}
result->Set(v8::String::New("total"), v8::Number::New(total));
result->Set(v8::String::New("count"), v8::Number::New(count));
return scope.Close(result);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief selects documents from a collection, using an offset into the
/// primary index. this can be used for incremental access
@ -2406,6 +2475,7 @@ void TRI_InitV8Queries (v8::Handle<v8::Context> context) {
// internal method. not intended to be used by end-users
TRI_AddMethodVocbase(rt, "NTH", JS_NthQuery, true);
TRI_AddMethodVocbase(rt, "NTH2", JS_Nth2Query, true);
// internal method. not intended to be used by end-users
TRI_AddMethodVocbase(rt, "OFFSET", JS_OffsetQuery, true);

View File

@ -1162,7 +1162,7 @@ static v8::Handle<v8::Value> JS_KeySetCas (v8::Arguments const& argv) {
auto h = &(static_cast<UserStructures*>(vocbase->_userStructures)->hashes);
int res;
bool match;
bool match = false;
{
READ_LOCKER(h->lock);
@ -1668,11 +1668,6 @@ void TRI_FreeUserStructuresVocBase (TRI_vocbase_t* vocbase) {
void TRI_InitV8UserStructures (v8::Handle<v8::Context> context) {
v8::HandleScope scope;
v8::Isolate* isolate = v8::Isolate::GetCurrent();
TRI_v8_global_t* v8g = static_cast<TRI_v8_global_t*>(isolate->GetData());
TRI_ASSERT(v8g != nullptr);
TRI_AddGlobalFunctionVocbase(context, "KEYSPACE_CREATE", JS_KeyspaceCreate);
TRI_AddGlobalFunctionVocbase(context, "KEYSPACE_DROP", JS_KeyspaceDrop);
TRI_AddGlobalFunctionVocbase(context, "KEYSPACE_COUNT", JS_KeyspaceCount);

View File

@ -1673,7 +1673,7 @@ static v8::Handle<v8::Value> CreateVocBase (v8::Arguments const& argv,
/// *false*, then the key generator will solely be responsible for
/// generating keys and supplying own key values in the *_key* attribute
/// of documents is considered an error.
/// * *{increment*: increment value for *autoincrement* key generator.
/// * *increment*: increment value for *autoincrement* key generator.
/// Not used for other key generator types.
/// * *offset*: initial offset value for *autoincrement* key generator.
/// Not used for other key generator types.
@ -1705,6 +1705,12 @@ static v8::Handle<v8::Value> CreateVocBase (v8::Arguments const& argv,
/// attribute and this can only be done efficiently if this is the
/// only shard key by delegating to the individual shards.
///
/// `db._create(collection-name, properties, type)`
///
/// Specifies the optional *type* of the collection, it can either be *document*
/// or *edge*. On default it is document. Instead of giving a type you can also use
/// *db._createEdgeCollection* or *db._createDocumentCollection*.
///
/// @EXAMPLES
///
/// With defaults:
@ -1751,14 +1757,11 @@ static v8::Handle<v8::Value> JS_CreateVocbase (v8::Arguments const& argv) {
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a new document collection
/// @startDocuBlock collectionCreateDocumentaion
/// @startDocuBlock collectionCreateDocumentCollection
/// `db._createDocumentCollection(collection-name)`
///
/// `db._createDocumentCollection(collection-name, properties)`
///
/// Creates a new document collection named *collection-name*.
/// This is an alias for @ref JS_CreateVocbase, with the difference that the
/// collection type is not automatically detected.
/// Creates a new document collection named *collection-name*. If the
/// document name already exists and error is thrown.
/// @endDocuBlock
////////////////////////////////////////////////////////////////////////////////
@ -1772,7 +1775,7 @@ static v8::Handle<v8::Value> JS_CreateDocumentCollectionVocbase (v8::Arguments c
/// `db._createEdgeCollection(collection-name)`
///
/// Creates a new edge collection named *collection-name*. If the
/// collection name already exists, then an error is thrown. The default value
/// collection name already exists an error is thrown. The default value
/// for *waitForSync* is *false*.
///
/// `db._createEdgeCollection(collection-name, properties)`
@ -1781,14 +1784,11 @@ static v8::Handle<v8::Value> JS_CreateDocumentCollectionVocbase (v8::Arguments c
///
/// * *waitForSync* (optional, default *false*): If *true* creating
/// a document will only return after the data was synced to disk.
/// * *journalSize* (optional, default is a @ref CommandLineArangod
/// * *journalSize* (optional, default is
/// "configuration parameter"): The maximal size of
/// a journal or datafile. Note that this also limits the maximal
/// size of a single object. Must be at least 1MB.
/// size of a single object and must be at least 1MB.
///
/// @EXAMPLES
///
/// See @ref JS_CreateVocbase for example.
/// @endDocuBlock
////////////////////////////////////////////////////////////////////////////////

View File

@ -374,9 +374,15 @@ static TRI_shape_t const* FindShape (TRI_shaper_t* shaper,
TRI_shape_t const* result = reinterpret_cast<TRI_shape_t const*>(m);
void* f = TRI_InsertKeyAssociativeSynced(&s->_shapeIds, &sid, (void*) m, false);
if (f != nullptr) {
LOG_ERROR("logic error when inserting shape");
}
TRI_ASSERT(f == nullptr);
f = TRI_InsertElementAssociativeSynced(&s->_shapeDictionary, (void*) m, false);
if (f != nullptr) {
LOG_ERROR("logic error when inserting shape");
}
TRI_ASSERT(f == nullptr);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, shape);

View File

@ -597,7 +597,7 @@ testFuncs.single_server = function (options) {
print("Shutting down...");
shutdownInstance(instanceInfo,options);
print("done.");
return result
return result;
}
else {
return { status: false, message: "No test specified!"};
@ -622,7 +622,7 @@ testFuncs.single_client = function (options) {
print("Shutting down...");
shutdownInstance(instanceInfo,options);
print("done.");
return result
return result;
}
else {
return { status: false, message: "No test specified!"};