diff --git a/arangod/Cluster/v8-cluster.cpp b/arangod/Cluster/v8-cluster.cpp index b40fa5efbd..122d438c83 100644 --- a/arangod/Cluster/v8-cluster.cpp +++ b/arangod/Cluster/v8-cluster.cpp @@ -1138,8 +1138,10 @@ static void PrepareClusterCommRequest ( TRI_v8_global_t* v8g = (TRI_v8_global_t*) v8::Isolate::GetCurrent()->GetData(); + assert(argv.Length() >= 4); + reqType = triagens::rest::HttpRequest::HTTP_REQUEST_GET; - if (argv.Length() > 0 && argv[0]->IsString()) { + if (argv[0]->IsString()) { TRI_Utf8ValueNFC UTF8(TRI_UNKNOWN_MEM_ZONE, argv[0]); string methstring = *UTF8; reqType = triagens::rest::HttpRequest::translateMethod(methstring); @@ -1148,28 +1150,11 @@ static void PrepareClusterCommRequest ( } } - destination.clear(); - if (argv.Length() > 1) { - destination = TRI_ObjectToString(argv[1]); - } - if (destination == "") { - destination = "shard:shardBlubb"; - } + destination = TRI_ObjectToString(argv[1]); - string dbname; - if (argv.Length() > 2) { - dbname = TRI_ObjectToString(argv[2]); - } - if (dbname == "") { - dbname = "_system"; - } - path.clear(); - if (argv.Length() > 3) { - path = TRI_ObjectToString(argv[3]); - } - if (path == "") { - path = "/_admin/version"; - } + string dbname = TRI_ObjectToString(argv[2]); + + path = TRI_ObjectToString(argv[3]); path = "/_db/" + dbname + path; body.clear(); @@ -1404,7 +1389,7 @@ static v8::Handle JS_SyncRequest (v8::Arguments const& argv) { CoordTransactionID coordTransactionID; double timeout; - PrepareClusterCommRequest(argv, reqType, destination, path, body,headerFields, + PrepareClusterCommRequest(argv, reqType, destination, path, body, headerFields, clientTransactionID, coordTransactionID, timeout); ClusterCommResult const* res; @@ -1435,7 +1420,7 @@ static v8::Handle JS_Enquire (v8::Arguments const& argv) { v8::HandleScope scope; if (argv.Length() != 1) { - TRI_V8_EXCEPTION_USAGE(scope, "wait(operationID)"); + TRI_V8_EXCEPTION_USAGE(scope, "enquire(operationID)"); } if (ServerState::instance()->getRole() != ServerState::ROLE_COORDINATOR) { diff --git a/js/server/modules/org/arangodb/cluster.js b/js/server/modules/org/arangodb/cluster.js index 23439501f4..44acbf6781 100644 --- a/js/server/modules/org/arangodb/cluster.js +++ b/js/server/modules/org/arangodb/cluster.js @@ -1,5 +1,5 @@ /*jslint indent: 2, nomen: true, maxlen: 100, sloppy: true, vars: true, white: true, plusplus: true */ -/*global ArangoAgency, ArangoServerState, require, exports */ +/*global ArangoAgency, ArangoClusterComm, ArangoClusterInfo, ArangoServerState, require, exports */ //////////////////////////////////////////////////////////////////////////////// /// @brief JavaScript cluster functionality @@ -30,7 +30,6 @@ var console = require("console"); var arangodb = require("org/arangodb"); -var db = arangodb.db; var ArangoCollection = arangodb.ArangoCollection; //////////////////////////////////////////////////////////////////////////////// @@ -173,6 +172,7 @@ function writeLocked (lockInfo, cb, args) { function getLocalDatabases () { var result = { }; + var db = require("internal").db; db._listDatabases().forEach(function (database) { result[database] = { name: database }; @@ -187,6 +187,7 @@ function getLocalDatabases () { function getLocalCollections () { var result = { }; + var db = require("internal").db; db._collections().forEach(function (collection) { var name = collection.name(); @@ -226,6 +227,7 @@ function createLocalDatabases (plannedDatabases) { payload); }; + var db = require("internal").db; db._useDatabase("_system"); var localDatabases = getLocalDatabases(); @@ -282,6 +284,7 @@ function dropLocalDatabases (plannedDatabases) { } }; + var db = require("internal").db; db._useDatabase("_system"); var localDatabases = getLocalDatabases(); @@ -320,6 +323,7 @@ function cleanupCurrentDatabases () { } }; + var db = require("internal").db; db._useDatabase("_system"); var all = ArangoAgency.get("Current/Databases", true); @@ -370,6 +374,7 @@ function createLocalCollections (plannedCollections) { payload); }; + var db = require("internal").db; db._useDatabase("_system"); var localDatabases = getLocalDatabases(); var database; @@ -529,6 +534,7 @@ function dropLocalCollections (plannedCollections) { } }; + var db = require("internal").db; db._useDatabase("_system"); var shardMap = getShardMap(plannedCollections); @@ -599,6 +605,7 @@ function cleanupCurrentCollections (plannedCollections) { } }; + var db = require("internal").db; db._useDatabase("_system"); var all = ArangoAgency.get("Current/Collections", true); @@ -661,13 +668,74 @@ function handleChanges (plan, current) { handleDatabaseChanges(plan, current); handleCollectionChanges(plan, current); } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief retrieve a list of shards for a collection +//////////////////////////////////////////////////////////////////////////////// +var shardList = function (dbName, collectionName) { + var ci = ArangoClusterInfo.getCollectionInfo(dbName, collectionName); + + if (ci === undefined || typeof ci !== 'object') { + throw "unable to determine shard list for '" + dbName + "/" + collectionName + "'"; + } + + var shards = [ ], shard; + for (shard in ci.shards) { + if (ci.shards.hasOwnProperty(shard)) { + shards.push(shard); + } + } + + return shards; +}; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief wait for a distributed response +//////////////////////////////////////////////////////////////////////////////// + +var wait = function (data, shards) { + var received = [ ]; + + while (received.length < shards.length) { + var result = ArangoClusterComm.wait(data); + var status = result.status; + + if (status === "ERROR") { + throw "received an error"; + } + else if (status === "TIMEOUT") { + throw "received a timeout"; + } + else if (status === "DROPPED") { + throw "operation was dropped"; + } + else if (status === "RECEIVED") { + received.push(result); + } + else { + // something else... wait without GC + require("internal").print(result); + require("internal").wait(0.1, false); + } + } + + return received; +}; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief whether or not clustering is enabled +//////////////////////////////////////////////////////////////////////////////// var isCluster = function () { return (typeof ArangoServerState !== "undefined" && ArangoServerState.initialised()); }; +//////////////////////////////////////////////////////////////////////////////// +/// @brief whether or not we are a coordinator +//////////////////////////////////////////////////////////////////////////////// + var isCoordinator = function () { if (! isCluster()) { return false; @@ -718,6 +786,8 @@ var handlePlanChange = function () { } }; +exports.shardList = shardList; +exports.wait = wait; exports.isCluster = isCluster; exports.isCoordinator = isCoordinator; exports.role = role; diff --git a/js/server/modules/org/arangodb/simple-query.js b/js/server/modules/org/arangodb/simple-query.js index ce115781bb..8b39e695fc 100644 --- a/js/server/modules/org/arangodb/simple-query.js +++ b/js/server/modules/org/arangodb/simple-query.js @@ -1,5 +1,5 @@ /*jslint indent: 2, nomen: true, maxlen: 100, sloppy: true, vars: true, white: true, plusplus: true */ -/*global require, exports */ +/*global require, exports, ArangoClusterComm, ArangoClusterInfo */ //////////////////////////////////////////////////////////////////////////////// /// @brief Arango Simple Query Language @@ -63,14 +63,74 @@ var SimpleQueryWithin = sq.SimpleQueryWithin; //////////////////////////////////////////////////////////////////////////////// SimpleQueryAll.prototype.execute = function () { - var documents; - if (this._execution === null) { if (this._skip === null) { this._skip = 0; } + + var documents; + var cluster = require("org/arangodb/cluster"); + + if (cluster.isCoordinator()) { + var dbName = require("internal").db._name(); + var shards = cluster.shardList(dbName, this._collection.name()); + var coord = { coordTransactionID: ArangoClusterInfo.uniqid() }; + var options = { coordTransactionID: coord.coordTransactionID, timeout: 360 }; + + shards.forEach(function (shard) { + ArangoClusterComm.asyncRequest("put", + "shard:" + shard, + dbName, + "/_api/simple/all", + JSON.stringify({ + collection: shard, + skip: 0, + limit: this._skip + this._limit + }), + { }, + options); + }); - documents = this._collection.ALL(this._skip, this._limit); + var _documents = [ ], total = 0; + var result = cluster.wait(coord, shards); + var toSkip = this._skip, toLimit = this._limit; + + result.forEach(function(part) { + var body = JSON.parse(part.body); + total += body.total; + + if (toSkip > 0) { + if (toSkip >= body.result.length) { + toSkip -= body.result.length; + return; + } + + body.result = body.result.slice(toSkip); + toSkip = 0; + } + + if (toLimit !== null && toLimit !== undefined) { + if (body.result.length >= toLimit) { + body.result = body.result.slice(0, toLimit); + toLimit = 0; + } + else { + toLimit -= body.result.length; + } + } + + _documents = _documents.concat(body.result); + }); + + documents = { + documents: _documents, + count: _documents.length, + total: total + }; + } + else { + documents = this._collection.ALL(this._skip, this._limit); + } this._execution = new GeneralArrayCursor(documents.documents); this._countQuery = documents.count; diff --git a/lib/V8/v8-utils.cpp b/lib/V8/v8-utils.cpp index e141c461f5..53afc4aa3e 100644 --- a/lib/V8/v8-utils.cpp +++ b/lib/V8/v8-utils.cpp @@ -2154,27 +2154,41 @@ static v8::Handle JS_Wait (v8::Arguments const& argv) { v8::HandleScope scope; // extract arguments - if (argv.Length() != 1) { - TRI_V8_EXCEPTION_USAGE(scope, "wait()"); + if (argv.Length() < 1) { + TRI_V8_EXCEPTION_USAGE(scope, "wait(, )"); } double n = TRI_ObjectToDouble(argv[0]); double until = TRI_microtime() + n; - v8::V8::LowMemoryNotification(); - while(! v8::V8::IdleNotification()) { + bool gc = true; // default is to trigger the gc + if (argv.Length() > 1) { + gc = TRI_ObjectToBoolean(argv[1]); } - size_t i = 0; - while (TRI_microtime() < until) { - if (++i % 1000 == 0) { - // garbage collection only every x iterations, otherwise we'll use too much CPU - v8::V8::LowMemoryNotification(); - while(! v8::V8::IdleNotification()) { - } + if (gc) { + // wait with gc + v8::V8::LowMemoryNotification(); + while(! v8::V8::IdleNotification()) { } - usleep(100); + size_t i = 0; + while (TRI_microtime() < until) { + if (++i % 1000 == 0) { + // garbage collection only every x iterations, otherwise we'll use too much CPU + v8::V8::LowMemoryNotification(); + while(! v8::V8::IdleNotification()) { + } + } + + usleep(100); + } + } + else { + // wait without gc + while (TRI_microtime() < until) { + usleep(100); + } } return scope.Close(v8::Undefined());