1
0
Fork 0

Merge branch 'sharding' of https://github.com/triAGENS/ArangoDB into sharding

This commit is contained in:
Frank Celler 2014-02-17 16:55:17 +01:00
commit 57796c96d1
18 changed files with 495 additions and 322 deletions

View File

@ -21,7 +21,7 @@ BUILT_SOURCES += @LIBEV_LIBS@
mkdir @top_srcdir@/3rdParty/libev/BUILD || true mkdir @top_srcdir@/3rdParty/libev/BUILD || true
cd @top_srcdir@/3rdParty/libev/BUILD && ../configure --disable-shared cd @top_srcdir@/3rdParty/libev/BUILD && ../configure --disable-shared
cd @top_srcdir@/3rdParty/libev/BUILD && $(MAKE) cd @top_srcdir@/3rdParty/libev/BUILD && $(MAKE) CFLAGS="-D EV_CHILD_ENABLE=0"
touch @srcdir@/.libev-build-@TRI_BITS@ touch @srcdir@/.libev-build-@TRI_BITS@

View File

@ -17,7 +17,11 @@ describe ArangoDB do
before do before do
@cn = "UnitTestsCollectionRange" @cn = "UnitTestsCollectionRange"
ArangoDB.drop_collection(@cn) ArangoDB.drop_collection(@cn)
@cid = ArangoDB.create_collection(@cn, false)
body = "{ \"name\" : \"#{@cn}\", \"numberOfShards\" : 8 }"
doc = ArangoDB.post("/_api/collection", :body => body)
doc.code.should eq(200)
@cid = doc.parsed_response['id']
end end
after do after do
@ -53,7 +57,7 @@ describe ArangoDB do
doc.parsed_response['hasMore'].should eq(false) doc.parsed_response['hasMore'].should eq(false)
doc.parsed_response['result'].length.should eq(2) doc.parsed_response['result'].length.should eq(2)
doc.parsed_response['count'].should eq(2) doc.parsed_response['count'].should eq(2)
doc.parsed_response['result'].map{|i| i['i']}.should =~ [2,3] doc.parsed_response['result'].map{|i| i['i']}.should eq([2,3])
# closed range # closed range
cmd = api + "/range" cmd = api + "/range"
@ -67,7 +71,60 @@ describe ArangoDB do
doc.parsed_response['hasMore'].should eq(false) doc.parsed_response['hasMore'].should eq(false)
doc.parsed_response['result'].length.should eq(3) doc.parsed_response['result'].length.should eq(3)
doc.parsed_response['count'].should eq(3) doc.parsed_response['count'].should eq(3)
doc.parsed_response['result'].map{|i| i['i']}.should =~ [2,3,4] doc.parsed_response['result'].map{|i| i['i']}.should eq([2,3,4])
end
it "finds the examples, big result" do
# create data
(0..499).each do |i|
body = "{ \"i\" : #{i} }"
doc = ArangoDB.post("/_api/document?collection=#{@cn}", :body => body)
doc.code.should eq(202)
end
# create index
cmd = "/_api/index?collection=#{@cn}"
body = "{ \"type\" : \"skiplist\", \"unique\" : false, \"fields\" : [ \"i\" ] }"
doc = ArangoDB.log_post("#{prefix}-range", cmd, :body => body)
doc.code.should eq(201)
# range
cmd = api + "/range"
body = "{ \"collection\" : \"#{@cn}\", \"attribute\" : \"i\", \"left\" : 5, \"right\" : 498 }"
doc = ArangoDB.log_put("#{prefix}-range", cmd, :body => body)
cmp = [ ]
(5..497).each do |i|
cmp.push i
end
doc.code.should eq(201)
doc.headers['content-type'].should eq("application/json; charset=utf-8")
doc.parsed_response['error'].should eq(false)
doc.parsed_response['code'].should eq(201)
doc.parsed_response['hasMore'].should eq(false)
doc.parsed_response['result'].length.should eq(493)
doc.parsed_response['count'].should eq(493)
doc.parsed_response['result'].map{|i| i['i']}.should eq(cmp)
# closed range
body = "{ \"collection\" : \"#{@cn}\", \"attribute\" : \"i\", \"left\" : 2, \"right\" : 498, \"closed\" : true }"
doc = ArangoDB.log_put("#{prefix}-range", cmd, :body => body)
cmp = [ ]
(2..498).each do |i|
cmp.push i
end
doc.code.should eq(201)
doc.headers['content-type'].should eq("application/json; charset=utf-8")
doc.parsed_response['error'].should eq(false)
doc.parsed_response['code'].should eq(201)
doc.parsed_response['hasMore'].should eq(false)
doc.parsed_response['result'].length.should eq(497)
doc.parsed_response['count'].should eq(497)
doc.parsed_response['result'].map{|i| i['i']}.should eq(cmp)
end end
end end

View File

@ -8910,7 +8910,8 @@ static v8::Handle<v8::Value> JS_CreateDatabase (v8::Arguments const& argv) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_USE_SYSTEM_DATABASE); TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_USE_SYSTEM_DATABASE);
} }
return scope.Close(JS_CreateDatabase_Coordinator(argv)); v8::Handle<v8::Value> ret = JS_CreateDatabase_Coordinator(argv);
return scope.Close(ret);
} }
#endif #endif

View File

@ -16,10 +16,6 @@ curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Pav
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Perry" -d "value=\"tcp://127.0.0.1:8531\"" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Perry" -d "value=\"tcp://127.0.0.1:8531\"" || exit 1
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Claus" -d "value=\"tcp://127.0.0.1:8529\"" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Claus" -d "value=\"tcp://127.0.0.1:8529\"" || exit 1
#curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Collections/@Usystem/5678" -d 'value={"status":3,"shards":{"shardBlubb": "Pavel"},"shardKeys":["xyz"],"indexes":{},"name":"testCollection","type":2,"id":"5678","doCompact":true,"isSystem":false,"isVolatile":false,"waitForSync":false,"maximalSize":1048576,"keyOptions":{"type":"traditional","allowUserKeys":true}}' || exit 1
#curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Databases/@Usystem/Pavel" -d 'value={"name":"system"}}' || exit 1
#curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Databases/@Usystem/Perry" -d 'value={"name":"system"}}' || exit 1
echo echo
echo start arangod with: echo start arangod with:
echo "Pavel: bin/arangod --cluster.my-id Pavel --cluster.agency-prefix $NAME --cluster.agency-endpoint tcp://127.0.0.1:4001 --server.endpoint tcp://127.0.0.1:8530 data-pavel" echo "Pavel: bin/arangod --cluster.my-id Pavel --cluster.agency-prefix $NAME --cluster.agency-endpoint tcp://127.0.0.1:4001 --server.endpoint tcp://127.0.0.1:8530 data-pavel"

View File

@ -233,7 +233,7 @@ function getLocalCollections () {
} }
} }
result[name] = p; result[name] = data;
} }
}); });

View File

@ -494,36 +494,129 @@ SimpleQueryByExample.prototype.execute = function () {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
function rangedQuery (collection, attribute, left, right, type, skip, limit) { function rangedQuery (collection, attribute, left, right, type, skip, limit) {
var idx = collection.lookupSkiplist(attribute); var documents;
var cluster = require("org/arangodb/cluster");
if (idx === null) {
idx = collection.lookupUniqueSkiplist(attribute); if (cluster.isCoordinator()) {
var dbName = require("internal").db._name();
if (idx !== null) { var shards = cluster.shardList(dbName, collection.name());
console.debug("found unique skip-list index %s", idx.id); var coord = { coordTransactionID: ArangoClusterInfo.uniqid() };
var options = { coordTransactionID: coord.coordTransactionID, timeout: 360 };
var _limit = 0;
if (limit > 0) {
if (skip >= 0) {
_limit = skip + limit;
}
} }
shards.forEach(function (shard) {
ArangoClusterComm.asyncRequest("put",
"shard:" + shard,
dbName,
"/_api/simple/range",
JSON.stringify({
collection: shard,
attribute: attribute,
left: left,
right: right,
closed: type,
skip: 0,
limit: _limit || undefined,
batchSize: 100000000
}),
{ },
options);
});
var _documents = [ ], total = 0;
var result = cluster.wait(coord, shards);
var toSkip = skip, toLimit = limit;
if (toSkip < 0) {
// negative skip is special
toLimit = null;
}
result.forEach(function(part) {
var body = JSON.parse(part.body);
total += body.total;
if (toSkip > 0) {
if (toSkip >= body.result.length) {
toSkip -= body.result.length;
return;
}
body.result = body.result.slice(toSkip);
toSkip = 0;
}
if (toLimit !== null && toLimit !== undefined) {
if (body.result.length >= toLimit) {
body.result = body.result.slice(0, toLimit);
toLimit = 0;
}
else {
toLimit -= body.result.length;
}
}
_documents = _documents.concat(body.result);
});
if (shards.length > 1) {
var cmp = require("org/arangodb/ahuacatl").RELATIONAL_CMP;
_documents.sort(function (l, r) {
return cmp(l[attribute], r[attribute]);
});
}
if (skip < 0) {
// apply negative skip
_documents = _documents.slice(_documents.length + skip, limit || 100000000);
}
documents = {
documents: _documents,
count: _documents.length,
total: total
};
} }
else { else {
console.debug("found skip-list index %s", idx.id); var idx = collection.lookupSkiplist(attribute);
}
if (idx !== null) { if (idx === null) {
var cond = {}; idx = collection.lookupUniqueSkiplist(attribute);
if (type === 0) { if (idx !== null) {
cond[attribute] = [ [ ">=", left ], [ "<", right ] ]; console.debug("found unique skip-list index %s", idx.id);
} }
else if (type === 1) {
cond[attribute] = [ [ ">=", left ], [ "<=", right ] ];
} }
else { else {
throw "unknown type"; console.debug("found skip-list index %s", idx.id);
} }
return collection.BY_CONDITION_SKIPLIST(idx.id, cond, skip, limit); if (idx !== null) {
var cond = {};
if (type === 0) {
cond[attribute] = [ [ ">=", left ], [ "<", right ] ];
}
else if (type === 1) {
cond[attribute] = [ [ ">=", left ], [ "<=", right ] ];
}
else {
throw "unknown type";
}
documents = collection.BY_CONDITION_SKIPLIST(idx.id, cond, skip, limit);
}
else {
throw "not implemented";
}
} }
throw "not implemented"; return documents;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -385,6 +385,16 @@ testFuncs.shell_server_ahuacatl = function(options) {
return "skipped"; return "skipped";
}; };
function waitForCompletion (pid) {
var stat;
while (true) {
wait(0.1);
stat = statusExternal(pid);
if (stat.status !== "RUNNING") { break; }
}
return stat.exit;
}
testFuncs.shell_client = function(options) { testFuncs.shell_client = function(options) {
var topDir = fs.normalize(fs.join(ArangoServerState.executablePath(), var topDir = fs.normalize(fs.join(ArangoServerState.executablePath(),
"..","..")); "..",".."));
@ -404,14 +414,7 @@ testFuncs.shell_client = function(options) {
var arangosh = fs.normalize(fs.join(ArangoServerState.executablePath(), var arangosh = fs.normalize(fs.join(ArangoServerState.executablePath(),
"..","arangosh")); "..","arangosh"));
var pid = executeExternal(arangosh, args); var pid = executeExternal(arangosh, args);
var stat; results[te] = waitForCompletion(pid);
while (true) {
wait(0.1);
stat = statusExternal(pid);
if (stat.status !== "RUNNING") { break; }
}
r = stat.exit;
results[te] = r;
args.pop(); args.pop();
if (r !== 0 && !options.force) { if (r !== 0 && !options.force) {
break; break;
@ -423,6 +426,22 @@ testFuncs.shell_client = function(options) {
return results; return results;
}; };
testFuncs.config = function (options) {
var topDir = fs.normalize(fs.join(ArangoServerState.executablePath(),
"..",".."));
var results = {};
var pid, r;
pid = executeExternal(fs.join(topDir,"bin","arangod"),
["--configuration",
fs.join(topDir,"etc","arangodb","arangod.conf"),
"--help"]);
r = waitForCompletion(pid);
results.arangod = r;
print("Config test arangod...",r);
return results;
};
testFuncs.dummy = function (options) { testFuncs.dummy = function (options) {
var instanceInfo = startInstance("tcp",options); var instanceInfo = startInstance("tcp",options);
print("Startup done."); print("Startup done.");
@ -435,7 +454,7 @@ testFuncs.dummy = function (options) {
var optionsDefaults = { "cluster": false, var optionsDefaults = { "cluster": false,
"valgrind": false, "valgrind": false,
"force": false, "force": true,
"skipBoost": false, "skipBoost": false,
"skipGeo": false, "skipGeo": false,
"skipAhuacatl": false, "skipAhuacatl": false,

View File

@ -118,6 +118,9 @@
// helper function to define a task that is run on upgrades only, but not on initialisation // helper function to define a task that is run on upgrades only, but not on initialisation
// of a new empty database // of a new empty database
function addUpgradeTask (name, description, fn) { function addUpgradeTask (name, description, fn) {
if (cluster.isCoordinator()) {
return;
}
if (isInitialisation) { if (isInitialisation) {
// if we are initialising a new database, set the task to completed // if we are initialising a new database, set the task to completed
// without executing it. this saves unnecessary migrations for empty // without executing it. this saves unnecessary migrations for empty
@ -130,7 +133,7 @@
} }
} }
if (fs.exists(versionFile)) { if (!cluster.isCoordinator() && fs.exists(versionFile)) {
// VERSION file exists, read its contents // VERSION file exists, read its contents
var versionInfo = fs.read(versionFile); var versionInfo = fs.read(versionFile);
@ -262,263 +265,237 @@
}); });
} }
if (! cluster.isCoordinator()) { // set up the collection _users
// set up the collection _users addTask("setupUsers", "setup _users collection", function () {
addTask("setupUsers", "setup _users collection", function () { return createSystemCollection("_users", { waitForSync : true });
return createSystemCollection("_users", { waitForSync : true }); });
});
}
if (! cluster.isCoordinator()) { // create a unique index on "user" attribute in _users
// create a unique index on "user" attribute in _users addTask("createUsersIndex",
addTask("createUsersIndex", "create index on 'user' attribute in _users collection",
"create index on 'user' attribute in _users collection", function () {
function () {
var users = getCollection("_users");
if (! users) {
return false;
}
users.ensureUniqueConstraint("user");
return true;
}
);
}
if (! cluster.isCoordinator()) {
// add a default root user with no passwd
addTask("addDefaultUser", "add default root user", function () {
var users = getCollection("_users"); var users = getCollection("_users");
if (! users) { if (! users) {
return false; return false;
} }
if (args && args.users) { users.ensureUniqueConstraint("user");
args.users.forEach(function(user) {
userManager.save(user.username, user.passwd, user.active, user.extra || { });
});
}
if (users.count() === 0) {
// only add account if user has not created his/her own accounts already
userManager.save("root", "", true);
}
return true; return true;
}); }
} );
if (! cluster.isCoordinator()) { // add a default root user with no passwd
// set up the collection _graphs addTask("addDefaultUser", "add default root user", function () {
addTask("setupGraphs", "setup _graphs collection", function () { var users = getCollection("_users");
return createSystemCollection("_graphs", { waitForSync : true }); if (! users) {
}); return false;
} }
if (args && args.users) {
args.users.forEach(function(user) {
userManager.save(user.username, user.passwd, user.active, user.extra || { });
});
}
if (users.count() === 0) {
// only add account if user has not created his/her own accounts already
userManager.save("root", "", true);
}
return true;
});
if (! cluster.isCoordinator()) { // set up the collection _graphs
// create a unique index on name attribute in _graphs addTask("setupGraphs", "setup _graphs collection", function () {
addTask("createGraphsIndex", return createSystemCollection("_graphs", { waitForSync : true });
"create index on name attribute in _graphs collection", });
function () {
var graphs = getCollection("_graphs"); // create a unique index on name attribute in _graphs
addTask("createGraphsIndex",
"create index on name attribute in _graphs collection",
function () {
var graphs = getCollection("_graphs");
if (! graphs) { if (! graphs) {
return false;
}
graphs.ensureUniqueConstraint("name");
return true;
}
);
}
if (! cluster.isCoordinator()) {
// make distinction between document and edge collections
addUpgradeTask("addCollectionVersion",
"set new collection type for edge collections and update collection version",
function () {
var collections = db._collections();
var i;
for (i in collections) {
if (collections.hasOwnProperty(i)) {
var collection = collections[i];
try {
if (collection.version() > 1) {
// already upgraded
continue;
}
if (collection.type() === 3) {
// already an edge collection
collection.setAttribute("version", 2);
continue;
}
if (collection.count() > 0) {
var isEdge = true;
// check the 1st 50 documents from a collection
var documents = collection.ALL(0, 50);
var j;
for (j in documents) {
if (documents.hasOwnProperty(j)) {
var doc = documents[j];
// check if documents contain both _from and _to attributes
if (! doc.hasOwnProperty("_from") || ! doc.hasOwnProperty("_to")) {
isEdge = false;
break;
}
}
}
if (isEdge) {
collection.setAttribute("type", 3);
logger.log("made collection '" + collection.name() + " an edge collection");
}
}
collection.setAttribute("version", 2);
}
catch (e) {
logger.error("could not upgrade collection '" + collection.name() + "'");
return false;
}
}
}
return true;
}
);
}
if (! cluster.isCoordinator()) {
// create the _modules collection
addTask("createModules", "setup _modules collection", function () {
return createSystemCollection("_modules");
});
}
if (! cluster.isCoordinator()) {
// create the _routing collection
addTask("createRouting", "setup _routing collection", function () {
// needs to be big enough for assets
return createSystemCollection("_routing", { journalSize: 32 * 1024 * 1024 });
});
}
if (! cluster.isCoordinator()) {
// create the default route in the _routing collection
addTask("insertRedirectionsAll", "insert default routes for admin interface", function () {
var routing = getCollection("_routing");
if (! routing) {
return false; return false;
} }
// first, check for "old" redirects graphs.ensureUniqueConstraint("name");
routing.toArray().forEach(function (doc) {
// check for specific redirects
if (doc.url && doc.action && doc.action.options && doc.action.options.destination) {
if (doc.url.match(/^\/(_admin\/(html|aardvark))?/) &&
doc.action.options.destination.match(/_admin\/(html|aardvark)/)) {
// remove old, non-working redirect
routing.remove(doc);
}
}
});
// add redirections to new location
[ "/", "/_admin/html", "/_admin/html/index.html" ].forEach (function (src) {
routing.save({
url: src,
action: {
"do": "org/arangodb/actions/redirectRequest",
options: {
permanently: true,
destination: "/_db/" + db._name() + "/_admin/aardvark/index.html"
}
},
priority: -1000000
});
});
return true; return true;
}); }
} );
if (! cluster.isCoordinator()) { // make distinction between document and edge collections
// update markers in all collection datafiles to key markers addUpgradeTask("addCollectionVersion",
addUpgradeTask("upgradeMarkers12", "update markers in all collection datafiles", function () { "set new collection type for edge collections and update collection version",
function () {
var collections = db._collections(); var collections = db._collections();
var i; var i;
for (i in collections) { for (i in collections) {
if (collections.hasOwnProperty(i)) { if (collections.hasOwnProperty(i)) {
var collection = collections[i]; var collection = collections[i];
try { try {
if (collection.version() >= 3) { if (collection.version() > 1) {
// already upgraded // already upgraded
continue; continue;
} }
if (collection.upgrade()) { if (collection.type() === 3) {
// success // already an edge collection
collection.setAttribute("version", 3); collection.setAttribute("version", 2);
continue;
} }
else {
// fail if (collection.count() > 0) {
logger.error("could not upgrade collection datafiles for '" var isEdge = true;
+ collection.name() + "'"); // check the 1st 50 documents from a collection
return false; var documents = collection.ALL(0, 50);
var j;
for (j in documents) {
if (documents.hasOwnProperty(j)) {
var doc = documents[j];
// check if documents contain both _from and _to attributes
if (! doc.hasOwnProperty("_from") || ! doc.hasOwnProperty("_to")) {
isEdge = false;
break;
}
}
}
if (isEdge) {
collection.setAttribute("type", 3);
logger.log("made collection '" + collection.name() + " an edge collection");
}
} }
collection.setAttribute("version", 2);
} }
catch (e) { catch (e) {
logger.error("could not upgrade collection datafiles for '" logger.error("could not upgrade collection '" + collection.name() + "'");
+ collection.name() + "'");
return false; return false;
} }
} }
} }
return true; return true;
}); }
} );
if (! cluster.isCoordinator()) {
// set up the collection _aal
addTask("setupAal", "setup _aal collection", function () {
return createSystemCollection("_aal", { waitForSync : true });
});
}
if (! cluster.isCoordinator()) { // create the _modules collection
// create a unique index on collection attribute in _aal addTask("createModules", "setup _modules collection", function () {
addTask("createAalIndex", return createSystemCollection("_modules");
"create index on collection attribute in _aal collection", });
function () {
var aal = getCollection("_aal"); // create the _routing collection
addTask("createRouting", "setup _routing collection", function () {
// needs to be big enough for assets
return createSystemCollection("_routing", { journalSize: 32 * 1024 * 1024 });
});
// create the default route in the _routing collection
addTask("insertRedirectionsAll", "insert default routes for admin interface", function () {
var routing = getCollection("_routing");
if (! aal) { if (! routing) {
return false;
}
// first, check for "old" redirects
routing.toArray().forEach(function (doc) {
// check for specific redirects
if (doc.url && doc.action && doc.action.options && doc.action.options.destination) {
if (doc.url.match(/^\/(_admin\/(html|aardvark))?/) &&
doc.action.options.destination.match(/_admin\/(html|aardvark)/)) {
// remove old, non-working redirect
routing.remove(doc);
}
}
});
// add redirections to new location
[ "/", "/_admin/html", "/_admin/html/index.html" ].forEach (function (src) {
routing.save({
url: src,
action: {
"do": "org/arangodb/actions/redirectRequest",
options: {
permanently: true,
destination: "/_db/" + db._name() + "/_admin/aardvark/index.html"
}
},
priority: -1000000
});
});
return true;
});
// update markers in all collection datafiles to key markers
addUpgradeTask("upgradeMarkers12", "update markers in all collection datafiles", function () {
var collections = db._collections();
var i;
for (i in collections) {
if (collections.hasOwnProperty(i)) {
var collection = collections[i];
try {
if (collection.version() >= 3) {
// already upgraded
continue;
}
if (collection.upgrade()) {
// success
collection.setAttribute("version", 3);
}
else {
// fail
logger.error("could not upgrade collection datafiles for '"
+ collection.name() + "'");
return false;
}
}
catch (e) {
logger.error("could not upgrade collection datafiles for '"
+ collection.name() + "'");
return false; return false;
} }
}
}
aal.ensureUniqueConstraint("name", "version"); return true;
});
return true;
}); // set up the collection _aal
} addTask("setupAal", "setup _aal collection", function () {
return createSystemCollection("_aal", { waitForSync : true });
});
if (! cluster.isCoordinator()) { // create a unique index on collection attribute in _aal
// set up the collection _aqlfunctions addTask("createAalIndex",
addTask("setupAqlFunctions", "setup _aqlfunctions collection", function () { "create index on collection attribute in _aal collection",
return createSystemCollection("_aqlfunctions"); function () {
}); var aal = getCollection("_aal");
}
if (! aal) {
return false;
}
aal.ensureUniqueConstraint("name", "version");
return true;
});
// set up the collection _aqlfunctions
addTask("setupAqlFunctions", "setup _aqlfunctions collection", function () {
return createSystemCollection("_aqlfunctions");
});
if (! cluster.isCoordinator()) { if (! cluster.isCoordinator()) {
// set up the collection _trx // set up the collection _trx
@ -534,56 +511,52 @@
}); });
} }
if (! cluster.isCoordinator()) { // migration aql function names
// migration aql function names addUpgradeTask("migrateAqlFunctions", "migrate _aqlfunctions name", function () {
addUpgradeTask("migrateAqlFunctions", "migrate _aqlfunctions name", function () { var funcs = getCollection('_aqlfunctions');
var funcs = getCollection('_aqlfunctions');
if (! funcs) { if (! funcs) {
return false; return false;
}
var result = true;
funcs.toArray().forEach(function(f) {
var oldKey = f._key;
var newKey = oldKey.replace(/:{1,}/g, '::');
if (oldKey !== newKey) {
try {
var doc = {
_key: newKey.toUpperCase(),
name: newKey,
code: f.code,
isDeterministic: f.isDeterministic
};
funcs.save(doc);
funcs.remove(oldKey);
}
catch (err) {
result = false;
}
} }
var result = true;
funcs.toArray().forEach(function(f) {
var oldKey = f._key;
var newKey = oldKey.replace(/:{1,}/g, '::');
if (oldKey !== newKey) {
try {
var doc = {
_key: newKey.toUpperCase(),
name: newKey,
code: f.code,
isDeterministic: f.isDeterministic
};
funcs.save(doc);
funcs.remove(oldKey);
}
catch (err) {
result = false;
}
}
});
return result;
}); });
}
if (! cluster.isCoordinator()) { return result;
addUpgradeTask("removeOldFoxxRoutes", "Remove all old Foxx Routes", function () { });
var potentialFoxxes = getCollection('_routing');
potentialFoxxes.iterate(function (maybeFoxx) { addUpgradeTask("removeOldFoxxRoutes", "Remove all old Foxx Routes", function () {
if (maybeFoxx.foxxMount) { var potentialFoxxes = getCollection('_routing');
// This is a Foxx! Let's delete it
potentialFoxxes.remove(maybeFoxx._id);
}
});
return true; potentialFoxxes.iterate(function (maybeFoxx) {
if (maybeFoxx.foxxMount) {
// This is a Foxx! Let's delete it
potentialFoxxes.remove(maybeFoxx._id);
}
}); });
}
return true;
});
// loop through all tasks and execute them // loop through all tasks and execute them
@ -615,10 +588,12 @@
// success // success
lastTasks[task.name] = true; lastTasks[task.name] = true;
// save/update version info if (!cluster.isCoordinator()) {
fs.write( // save/update version info
versionFile, fs.write(
JSON.stringify({ version: currentVersion, tasks: lastTasks })); versionFile,
JSON.stringify({ version: currentVersion, tasks: lastTasks }));
}
logger.log("Task successful"); logger.log("Task successful");
} }
@ -630,10 +605,12 @@
} }
} }
// save file so version gets saved even if there are no tasks if (!cluster.isCoordinator()) {
fs.write( // save file so version gets saved even if there are no tasks
versionFile, fs.write(
JSON.stringify({ version: currentVersion, tasks: lastTasks })); versionFile,
JSON.stringify({ version: currentVersion, tasks: lastTasks }));
}
logger.log(procedure + " successfully finished"); logger.log(procedure + " successfully finished");
@ -653,6 +630,10 @@
var currentVersion = parseFloat(currentServerVersion[1]); var currentVersion = parseFloat(currentServerVersion[1]);
if (cluster.isCoordinator()) {
return runUpgrade(currentVersion);
}
if (! fs.exists(versionFile)) { if (! fs.exists(versionFile)) {
logger.info("No version information file found in database directory."); logger.info("No version information file found in database directory.");
return runUpgrade(currentVersion); return runUpgrade(currentVersion);

View File

@ -739,7 +739,8 @@ void TRI_CreateExternalProcess (const char* executable,
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
#ifndef _WIN32 #ifndef _WIN32
TRI_external_status_t TRI_CheckExternalProcess (TRI_external_id_t pid) { TRI_external_status_t TRI_CheckExternalProcess (TRI_external_id_t pid,
bool wait) {
TRI_external_status_t status; TRI_external_status_t status;
TRI_external_t* external; TRI_external_t* external;
int loc; int loc;
@ -766,7 +767,12 @@ TRI_external_status_t TRI_CheckExternalProcess (TRI_external_id_t pid) {
} }
if (external->_status == TRI_EXT_RUNNING || external->_status == TRI_EXT_STOPPED) { if (external->_status == TRI_EXT_RUNNING || external->_status == TRI_EXT_STOPPED) {
opts = WNOHANG | WUNTRACED; if (!wait) {
opts = WNOHANG | WUNTRACED;
}
else {
opts = WUNTRACED;
}
res = waitpid(external->_pid, &loc, opts); res = waitpid(external->_pid, &loc, opts);
if (res == 0) { if (res == 0) {
@ -793,7 +799,8 @@ TRI_external_status_t TRI_CheckExternalProcess (TRI_external_id_t pid) {
return status; return status;
} }
#else #else
TRI_external_status_t TRI_CheckExternalProcess (HANDLE hProcess) { TRI_external_status_t TRI_CheckExternalProcess (HANDLE hProcess,
bool wait) {
TRI_external_status_t status; TRI_external_status_t status;
TRI_external_t* external; TRI_external_t* external;
int loc; int loc;
@ -818,6 +825,10 @@ TRI_external_status_t TRI_CheckExternalProcess (HANDLE hProcess) {
} }
if (external->_status == TRI_EXT_RUNNING || external->_status == TRI_EXT_STOPPED) { if (external->_status == TRI_EXT_RUNNING || external->_status == TRI_EXT_STOPPED) {
if (wait) {
DWORD result;
result = WaitForSingleObject(hProcess, INFINITE);
}
DWORD exitCode; DWORD exitCode;
if (!GetExitCodeProcess(hProcess , &exitCode)) { if (!GetExitCodeProcess(hProcess , &exitCode)) {
LOG_WARNING("exit status could not be called for handle '%p'", hProcess); LOG_WARNING("exit status could not be called for handle '%p'", hProcess);

View File

@ -186,7 +186,7 @@ void TRI_CreateExternalProcess (const char* executable,
/// @brief returns the status of an external process /// @brief returns the status of an external process
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
TRI_external_status_t TRI_CheckExternalProcess (pid_t pid); TRI_external_status_t TRI_CheckExternalProcess (pid_t pid, bool wait);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief kills an external process /// @brief kills an external process

View File

@ -251,11 +251,18 @@ static inline void TRI_invalidatesocket (TRI_socket_t* socket) {
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief get file descriptor /// @brief get file descriptor or handle, depending on OS
///
/// Note that this returns the fileHandle under Windows which is exactly
/// the right thing we need in all but one places.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static inline int TRI_get_fd_of_socket (TRI_socket_t socket) { static inline int TRI_get_fd_or_handle_of_socket (TRI_socket_t socket) {
#ifdef _WIN32
return (int)(socket.fileHandle);
#else
return socket.fileDescriptor; return socket.fileDescriptor;
#endif
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -151,6 +151,7 @@ add_library(
Admin/RestAdminLogHandler.cpp Admin/RestAdminLogHandler.cpp
Admin/RestBaseHandler.cpp Admin/RestBaseHandler.cpp
Admin/RestJobHandler.cpp Admin/RestJobHandler.cpp
Admin/RestShutdownHandler.cpp
Admin/RestVersionHandler.cpp Admin/RestVersionHandler.cpp
ApplicationServer/ApplicationFeature.cpp ApplicationServer/ApplicationFeature.cpp
ApplicationServer/ApplicationServer.cpp ApplicationServer/ApplicationServer.cpp

View File

@ -103,7 +103,7 @@ namespace triagens {
_maximalHeaderSize(0), _maximalHeaderSize(0),
_maximalBodySize(0) { _maximalBodySize(0) {
LOG_TRACE("connection established, client %d, server ip %s, server port %d, client ip %s, client port %d", LOG_TRACE("connection established, client %d, server ip %s, server port %d, client ip %s, client port %d",
TRI_get_fd_of_socket(socket), TRI_get_fd_or_handle_of_socket(socket),
_connectionInfo.serverAddress.c_str(), _connectionInfo.serverAddress.c_str(),
(int) _connectionInfo.serverPort, (int) _connectionInfo.serverPort,
_connectionInfo.clientAddress.c_str(), _connectionInfo.clientAddress.c_str(),
@ -123,7 +123,7 @@ namespace triagens {
protected: protected:
~GeneralCommTask () { ~GeneralCommTask () {
LOG_TRACE("connection closed, client %d", (int) TRI_get_fd_of_socket(_commSocket)); LOG_TRACE("connection closed, client %d", (int) TRI_get_fd_or_handle_of_socket(_commSocket));
// free write buffers // free write buffers
for (deque<basics::StringBuffer*>::iterator i = _writeBuffers.begin(); i != _writeBuffers.end(); i++) { for (deque<basics::StringBuffer*>::iterator i = _writeBuffers.begin(); i != _writeBuffers.end(); i++) {

View File

@ -276,7 +276,7 @@ namespace triagens {
LOG_DEBUG("trying to establish secure connection"); LOG_DEBUG("trying to establish secure connection");
// convert in a SSL BIO structure // convert in a SSL BIO structure
BIO * sbio = BIO_new_socket(TRI_get_fd_of_socket(socket), BIO_NOCLOSE); BIO * sbio = BIO_new_socket(TRI_get_fd_or_handle_of_socket(socket), BIO_NOCLOSE);
if (sbio == 0) { if (sbio == 0) {
LOG_WARNING("cannot build new SSL BIO: %s", triagens::basics::lastSSLError().c_str()); LOG_WARNING("cannot build new SSL BIO: %s", triagens::basics::lastSSLError().c_str());

View File

@ -579,7 +579,10 @@ EventToken SchedulerLibev::installSocketEvent (EventLoop loop, EventType type, T
} }
ev_io* w = (ev_io*) watcher; ev_io* w = (ev_io*) watcher;
ev_io_init(w, socketCallback, TRI_get_fd_of_socket(socket), flags); // Note that we do not use TRI_get_fd_or_handle_of_socket here because even
// under Windows we want get the entry fileDescriptor here because
// of the reason that is mentioned above!
ev_io_init(w, socketCallback, socket.fileDescriptor, flags);
ev_io_start(watcher->loop, w); ev_io_start(watcher->loop, w);
return watcher->token; return watcher->token;

View File

@ -177,7 +177,7 @@ bool ClientConnection::prepare (const double timeout, const bool isWrite) const
tv.tv_usec = ((long) (timeout * 1000000.0)) % 1000000; tv.tv_usec = ((long) (timeout * 1000000.0)) % 1000000;
FD_ZERO(&fdset); FD_ZERO(&fdset);
FD_SET(TRI_get_fd_of_socket(_socket), &fdset); FD_SET(TRI_get_fd_or_handle_of_socket(_socket), &fdset);
fd_set* readFds = NULL; fd_set* readFds = NULL;
fd_set* writeFds = NULL; fd_set* writeFds = NULL;
@ -189,7 +189,7 @@ bool ClientConnection::prepare (const double timeout, const bool isWrite) const
readFds = &fdset; readFds = &fdset;
} }
int sockn = (int) (TRI_get_fd_of_socket(_socket) + 1); int sockn = (int) (TRI_get_fd_or_handle_of_socket(_socket) + 1);
int res = select(sockn, readFds, writeFds, NULL, &tv); int res = select(sockn, readFds, writeFds, NULL, &tv);
if (res > 0) { if (res > 0) {

View File

@ -162,7 +162,7 @@ bool SslClientConnection::connectSocket () {
return false; return false;
} }
if (SSL_set_fd(_ssl, TRI_get_fd_of_socket(_socket)) != 1) { if (SSL_set_fd(_ssl, TRI_get_fd_or_handle_of_socket(_socket)) != 1) {
_endpoint->disconnect(); _endpoint->disconnect();
SSL_free(_ssl); SSL_free(_ssl);
_ssl = 0; _ssl = 0;
@ -210,7 +210,7 @@ bool SslClientConnection::prepare (const double timeout, const bool isWrite) con
tv.tv_usec = ((long) (timeout * 1000000.0)) % 1000000; tv.tv_usec = ((long) (timeout * 1000000.0)) % 1000000;
FD_ZERO(&fdset); FD_ZERO(&fdset);
FD_SET(TRI_get_fd_of_socket(_socket), &fdset); FD_SET(TRI_get_fd_or_handle_of_socket(_socket), &fdset);
fd_set* readFds = NULL; fd_set* readFds = NULL;
fd_set* writeFds = NULL; fd_set* writeFds = NULL;
@ -222,7 +222,7 @@ bool SslClientConnection::prepare (const double timeout, const bool isWrite) con
readFds = &fdset; readFds = &fdset;
} }
int sockn = (int) (TRI_get_fd_of_socket(_socket) + 1); int sockn = (int) (TRI_get_fd_or_handle_of_socket(_socket) + 1);
if (select(sockn, readFds, writeFds, NULL, &tv) > 0) { if (select(sockn, readFds, writeFds, NULL, &tv) > 0) {
return true; return true;
} }

View File

@ -2544,12 +2544,16 @@ static v8::Handle<v8::Value> JS_StatusExternal (v8::Arguments const& argv) {
v8::HandleScope scope; v8::HandleScope scope;
// extract the arguments // extract the arguments
if (argv.Length() != 1) { if (argv.Length() < 1 || argv.Length() > 2) {
TRI_V8_EXCEPTION_USAGE(scope, "statusExternal(<external-identifier>)"); TRI_V8_EXCEPTION_USAGE(scope, "statusExternal(<external-identifier>, <wait>)");
} }
TRI_external_id_t pid = TRI_ObjectToUInt64(argv[0], true); TRI_external_id_t pid = TRI_ObjectToUInt64(argv[0], true);
TRI_external_status_t external = TRI_CheckExternalProcess(pid); bool wait = false;
if (argv.Length() == 2) {
wait = TRI_ObjectToBoolean(argv[1]);
}
TRI_external_status_t external = TRI_CheckExternalProcess(pid, wait);
v8::Handle<v8::Object> result = v8::Object::New(); v8::Handle<v8::Object> result = v8::Object::New();
const char* status = "UNKNOWN"; const char* status = "UNKNOWN";