1
0
Fork 0

Merge branch 'sharding' of ssh://github.com/triAGENS/ArangoDB into sharding

This commit is contained in:
Max Neunhoeffer 2014-01-29 16:23:45 +01:00
commit 22f0ab02db
4 changed files with 171 additions and 42 deletions

View File

@ -1138,8 +1138,10 @@ static void PrepareClusterCommRequest (
TRI_v8_global_t* v8g = (TRI_v8_global_t*)
v8::Isolate::GetCurrent()->GetData();
assert(argv.Length() >= 4);
reqType = triagens::rest::HttpRequest::HTTP_REQUEST_GET;
if (argv.Length() > 0 && argv[0]->IsString()) {
if (argv[0]->IsString()) {
TRI_Utf8ValueNFC UTF8(TRI_UNKNOWN_MEM_ZONE, argv[0]);
string methstring = *UTF8;
reqType = triagens::rest::HttpRequest::translateMethod(methstring);
@ -1148,28 +1150,11 @@ static void PrepareClusterCommRequest (
}
}
destination.clear();
if (argv.Length() > 1) {
destination = TRI_ObjectToString(argv[1]);
}
if (destination == "") {
destination = "shard:shardBlubb";
}
destination = TRI_ObjectToString(argv[1]);
string dbname;
if (argv.Length() > 2) {
dbname = TRI_ObjectToString(argv[2]);
}
if (dbname == "") {
dbname = "_system";
}
path.clear();
if (argv.Length() > 3) {
path = TRI_ObjectToString(argv[3]);
}
if (path == "") {
path = "/_admin/version";
}
string dbname = TRI_ObjectToString(argv[2]);
path = TRI_ObjectToString(argv[3]);
path = "/_db/" + dbname + path;
body.clear();
@ -1404,7 +1389,7 @@ static v8::Handle<v8::Value> JS_SyncRequest (v8::Arguments const& argv) {
CoordTransactionID coordTransactionID;
double timeout;
PrepareClusterCommRequest(argv, reqType, destination, path, body,headerFields,
PrepareClusterCommRequest(argv, reqType, destination, path, body, headerFields,
clientTransactionID, coordTransactionID, timeout);
ClusterCommResult const* res;
@ -1435,7 +1420,7 @@ static v8::Handle<v8::Value> JS_Enquire (v8::Arguments const& argv) {
v8::HandleScope scope;
if (argv.Length() != 1) {
TRI_V8_EXCEPTION_USAGE(scope, "wait(operationID)");
TRI_V8_EXCEPTION_USAGE(scope, "enquire(operationID)");
}
if (ServerState::instance()->getRole() != ServerState::ROLE_COORDINATOR) {

View File

@ -1,5 +1,5 @@
/*jslint indent: 2, nomen: true, maxlen: 100, sloppy: true, vars: true, white: true, plusplus: true */
/*global ArangoAgency, ArangoServerState, require, exports */
/*global ArangoAgency, ArangoClusterComm, ArangoClusterInfo, ArangoServerState, require, exports */
////////////////////////////////////////////////////////////////////////////////
/// @brief JavaScript cluster functionality
@ -30,7 +30,6 @@
var console = require("console");
var arangodb = require("org/arangodb");
var db = arangodb.db;
var ArangoCollection = arangodb.ArangoCollection;
////////////////////////////////////////////////////////////////////////////////
@ -173,6 +172,7 @@ function writeLocked (lockInfo, cb, args) {
function getLocalDatabases () {
var result = { };
var db = require("internal").db;
db._listDatabases().forEach(function (database) {
result[database] = { name: database };
@ -187,6 +187,7 @@ function getLocalDatabases () {
function getLocalCollections () {
var result = { };
var db = require("internal").db;
db._collections().forEach(function (collection) {
var name = collection.name();
@ -226,6 +227,7 @@ function createLocalDatabases (plannedDatabases) {
payload);
};
var db = require("internal").db;
db._useDatabase("_system");
var localDatabases = getLocalDatabases();
@ -282,6 +284,7 @@ function dropLocalDatabases (plannedDatabases) {
}
};
var db = require("internal").db;
db._useDatabase("_system");
var localDatabases = getLocalDatabases();
@ -320,6 +323,7 @@ function cleanupCurrentDatabases () {
}
};
var db = require("internal").db;
db._useDatabase("_system");
var all = ArangoAgency.get("Current/Databases", true);
@ -370,6 +374,7 @@ function createLocalCollections (plannedCollections) {
payload);
};
var db = require("internal").db;
db._useDatabase("_system");
var localDatabases = getLocalDatabases();
var database;
@ -529,6 +534,7 @@ function dropLocalCollections (plannedCollections) {
}
};
var db = require("internal").db;
db._useDatabase("_system");
var shardMap = getShardMap(plannedCollections);
@ -599,6 +605,7 @@ function cleanupCurrentCollections (plannedCollections) {
}
};
var db = require("internal").db;
db._useDatabase("_system");
var all = ArangoAgency.get("Current/Collections", true);
@ -661,13 +668,74 @@ function handleChanges (plan, current) {
handleDatabaseChanges(plan, current);
handleCollectionChanges(plan, current);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief retrieve a list of shards for a collection
////////////////////////////////////////////////////////////////////////////////
var shardList = function (dbName, collectionName) {
var ci = ArangoClusterInfo.getCollectionInfo(dbName, collectionName);
if (ci === undefined || typeof ci !== 'object') {
throw "unable to determine shard list for '" + dbName + "/" + collectionName + "'";
}
var shards = [ ], shard;
for (shard in ci.shards) {
if (ci.shards.hasOwnProperty(shard)) {
shards.push(shard);
}
}
return shards;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief wait for a distributed response
////////////////////////////////////////////////////////////////////////////////
var wait = function (data, shards) {
var received = [ ];
while (received.length < shards.length) {
var result = ArangoClusterComm.wait(data);
var status = result.status;
if (status === "ERROR") {
throw "received an error";
}
else if (status === "TIMEOUT") {
throw "received a timeout";
}
else if (status === "DROPPED") {
throw "operation was dropped";
}
else if (status === "RECEIVED") {
received.push(result);
}
else {
// something else... wait without GC
require("internal").print(result);
require("internal").wait(0.1, false);
}
}
return received;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not clustering is enabled
////////////////////////////////////////////////////////////////////////////////
var isCluster = function () {
return (typeof ArangoServerState !== "undefined" &&
ArangoServerState.initialised());
};
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not we are a coordinator
////////////////////////////////////////////////////////////////////////////////
var isCoordinator = function () {
if (! isCluster()) {
return false;
@ -718,6 +786,8 @@ var handlePlanChange = function () {
}
};
exports.shardList = shardList;
exports.wait = wait;
exports.isCluster = isCluster;
exports.isCoordinator = isCoordinator;
exports.role = role;

View File

@ -1,5 +1,5 @@
/*jslint indent: 2, nomen: true, maxlen: 100, sloppy: true, vars: true, white: true, plusplus: true */
/*global require, exports */
/*global require, exports, ArangoClusterComm, ArangoClusterInfo */
////////////////////////////////////////////////////////////////////////////////
/// @brief Arango Simple Query Language
@ -63,14 +63,74 @@ var SimpleQueryWithin = sq.SimpleQueryWithin;
////////////////////////////////////////////////////////////////////////////////
SimpleQueryAll.prototype.execute = function () {
var documents;
if (this._execution === null) {
if (this._skip === null) {
this._skip = 0;
}
var documents;
var cluster = require("org/arangodb/cluster");
if (cluster.isCoordinator()) {
var dbName = require("internal").db._name();
var shards = cluster.shardList(dbName, this._collection.name());
var coord = { coordTransactionID: ArangoClusterInfo.uniqid() };
var options = { coordTransactionID: coord.coordTransactionID, timeout: 360 };
shards.forEach(function (shard) {
ArangoClusterComm.asyncRequest("put",
"shard:" + shard,
dbName,
"/_api/simple/all",
JSON.stringify({
collection: shard,
skip: 0,
limit: this._skip + this._limit
}),
{ },
options);
});
documents = this._collection.ALL(this._skip, this._limit);
var _documents = [ ], total = 0;
var result = cluster.wait(coord, shards);
var toSkip = this._skip, toLimit = this._limit;
result.forEach(function(part) {
var body = JSON.parse(part.body);
total += body.total;
if (toSkip > 0) {
if (toSkip >= body.result.length) {
toSkip -= body.result.length;
return;
}
body.result = body.result.slice(toSkip);
toSkip = 0;
}
if (toLimit !== null && toLimit !== undefined) {
if (body.result.length >= toLimit) {
body.result = body.result.slice(0, toLimit);
toLimit = 0;
}
else {
toLimit -= body.result.length;
}
}
_documents = _documents.concat(body.result);
});
documents = {
documents: _documents,
count: _documents.length,
total: total
};
}
else {
documents = this._collection.ALL(this._skip, this._limit);
}
this._execution = new GeneralArrayCursor(documents.documents);
this._countQuery = documents.count;

View File

@ -2154,27 +2154,41 @@ static v8::Handle<v8::Value> JS_Wait (v8::Arguments const& argv) {
v8::HandleScope scope;
// extract arguments
if (argv.Length() != 1) {
TRI_V8_EXCEPTION_USAGE(scope, "wait(<seconds>)");
if (argv.Length() < 1) {
TRI_V8_EXCEPTION_USAGE(scope, "wait(<seconds>, <gc>)");
}
double n = TRI_ObjectToDouble(argv[0]);
double until = TRI_microtime() + n;
v8::V8::LowMemoryNotification();
while(! v8::V8::IdleNotification()) {
bool gc = true; // default is to trigger the gc
if (argv.Length() > 1) {
gc = TRI_ObjectToBoolean(argv[1]);
}
size_t i = 0;
while (TRI_microtime() < until) {
if (++i % 1000 == 0) {
// garbage collection only every x iterations, otherwise we'll use too much CPU
v8::V8::LowMemoryNotification();
while(! v8::V8::IdleNotification()) {
}
if (gc) {
// wait with gc
v8::V8::LowMemoryNotification();
while(! v8::V8::IdleNotification()) {
}
usleep(100);
size_t i = 0;
while (TRI_microtime() < until) {
if (++i % 1000 == 0) {
// garbage collection only every x iterations, otherwise we'll use too much CPU
v8::V8::LowMemoryNotification();
while(! v8::V8::IdleNotification()) {
}
}
usleep(100);
}
}
else {
// wait without gc
while (TRI_microtime() < until) {
usleep(100);
}
}
return scope.Close(v8::Undefined());