From 3e84a3b650ab43d3a14ed37ec70079dfbae25772 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 22 Jan 2014 08:37:54 +0100 Subject: [PATCH 1/9] don't throw errors on startup on coordinator --- js/server/version-check.js | 511 ++++++++++++++++++++----------------- 1 file changed, 273 insertions(+), 238 deletions(-) diff --git a/js/server/version-check.js b/js/server/version-check.js index 01680b2be3..89feed3561 100644 --- a/js/server/version-check.js +++ b/js/server/version-check.js @@ -47,6 +47,7 @@ var fs = require("fs"); var console = require("console"); var userManager = require("org/arangodb/users"); + var cluster = require("org/arangodb/cluster"); var db = internal.db; // whether or not we are initialising an empty / a new database @@ -259,292 +260,326 @@ }); } - // set up the collection _users - addTask("setupUsers", "setup _users collection", function () { - return createSystemCollection("_users", { waitForSync : true }); - }); + if (! cluster.isCoordinator()) { + // set up the collection _users + addTask("setupUsers", "setup _users collection", function () { + return createSystemCollection("_users", { waitForSync : true }); + }); + } - // create a unique index on "user" attribute in _users - addTask("createUsersIndex", - "create index on 'user' attribute in _users collection", - function () { + if (! cluster.isCoordinator()) { + // create a unique index on "user" attribute in _users + addTask("createUsersIndex", + "create index on 'user' attribute in _users collection", + function () { + var users = getCollection("_users"); + if (! users) { + return false; + } + + users.ensureUniqueConstraint("user"); + + return true; + } + ); + } + + if (! cluster.isCoordinator()) { + // add a default root user with no passwd + addTask("addDefaultUser", "add default root user", function () { var users = getCollection("_users"); if (! users) { return false; } - users.ensureUniqueConstraint("user"); + if (args && args.users) { + args.users.forEach(function(user) { + userManager.save(user.username, user.passwd, user.active, user.extra || { }); + }); + } + + if (users.count() === 0) { + // only add account if user has not created his/her own accounts already + userManager.save("root", "", true); + } return true; }); + } - // add a default root user with no passwd - addTask("addDefaultUser", "add default root user", function () { - var users = getCollection("_users"); - if (! users) { - return false; - } - - if (args && args.users) { - args.users.forEach(function(user) { - userManager.save(user.username, user.passwd, user.active, user.extra || { }); - }); - } - - if (users.count() === 0) { - // only add account if user has not created his/her own accounts already - userManager.save("root", "", true); - } - - return true; - }); + if (! cluster.isCoordinator()) { + // set up the collection _graphs + addTask("setupGraphs", "setup _graphs collection", function () { + return createSystemCollection("_graphs", { waitForSync : true }); + }); + } - // set up the collection _graphs - addTask("setupGraphs", "setup _graphs collection", function () { - return createSystemCollection("_graphs", { waitForSync : true }); - }); - - // create a unique index on name attribute in _graphs - addTask("createGraphsIndex", - "create index on name attribute in _graphs collection", - function () { - var graphs = getCollection("_graphs"); + if (! 
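+  // All of the setup and upgrade tasks in this file are now wrapped in this
+  // same guard, so that coordinators skip tasks which create or alter local
+  // collections. A minimal sketch of the recurring pattern (the task and
+  // collection names here are placeholders, not actual tasks):
+  //
+  //   if (! cluster.isCoordinator()) {
+  //     addTask("exampleTask", "example description", function () {
+  //       return createSystemCollection("_example");
+  //     });
+  //   }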
cluster.isCoordinator()) { + // create a unique index on name attribute in _graphs + addTask("createGraphsIndex", + "create index on name attribute in _graphs collection", + function () { + var graphs = getCollection("_graphs"); - if (! graphs) { + if (! graphs) { + return false; + } + + graphs.ensureUniqueConstraint("name"); + + return true; + } + ); + } + + if (! cluster.isCoordinator()) { + // make distinction between document and edge collections + addUpgradeTask("addCollectionVersion", + "set new collection type for edge collections and update collection version", + function () { + var collections = db._collections(); + var i; + + for (i in collections) { + if (collections.hasOwnProperty(i)) { + var collection = collections[i]; + + try { + if (collection.version() > 1) { + // already upgraded + continue; + } + + if (collection.type() === 3) { + // already an edge collection + collection.setAttribute("version", 2); + continue; + } + + if (collection.count() > 0) { + var isEdge = true; + // check the 1st 50 documents from a collection + var documents = collection.ALL(0, 50); + var j; + + for (j in documents) { + if (documents.hasOwnProperty(j)) { + var doc = documents[j]; + + // check if documents contain both _from and _to attributes + if (! doc.hasOwnProperty("_from") || ! doc.hasOwnProperty("_to")) { + isEdge = false; + break; + } + } + } + + if (isEdge) { + collection.setAttribute("type", 3); + logger.log("made collection '" + collection.name() + " an edge collection"); + } + } + collection.setAttribute("version", 2); + } + catch (e) { + logger.error("could not upgrade collection '" + collection.name() + "'"); + return false; + } + } + } + + return true; + } + ); + } + + if (! cluster.isCoordinator()) { + // create the _modules collection + addTask("createModules", "setup _modules collection", function () { + return createSystemCollection("_modules"); + }); + } + + if (! cluster.isCoordinator()) { + // create the _routing collection + addTask("createRouting", "setup _routing collection", function () { + // needs to be big enough for assets + return createSystemCollection("_routing", { journalSize: 32 * 1024 * 1024 }); + }); + } + + if (! cluster.isCoordinator()) { + // create the default route in the _routing collection + addTask("insertRedirectionsAll", "insert default routes for admin interface", function () { + var routing = getCollection("_routing"); + + if (! routing) { return false; } - graphs.ensureUniqueConstraint("name"); + // first, check for "old" redirects + routing.toArray().forEach(function (doc) { + // check for specific redirects + if (doc.url && doc.action && doc.action.options && doc.action.options.destination) { + if (doc.url.match(/^\/(_admin\/(html|aardvark))?/) && + doc.action.options.destination.match(/_admin\/(html|aardvark)/)) { + // remove old, non-working redirect + routing.remove(doc); + } + } + }); + + // add redirections to new location + [ "/", "/_admin/html", "/_admin/html/index.html" ].forEach (function (src) { + routing.save({ + url: src, + action: { + "do": "org/arangodb/actions/redirectRequest", + options: { + permanently: true, + destination: "/_db/" + db._name() + "/_admin/aardvark/index.html" + } + }, + priority: -1000000 + }); + }); return true; }); - - // make distinction between document and edge collections - addUpgradeTask("addCollectionVersion", - "set new collection type for edge collections and update collection version", - function () { + } + + if (! 
cluster.isCoordinator()) { + // update markers in all collection datafiles to key markers + addUpgradeTask("upgradeMarkers12", "update markers in all collection datafiles", function () { var collections = db._collections(); var i; - + for (i in collections) { if (collections.hasOwnProperty(i)) { var collection = collections[i]; try { - if (collection.version() > 1) { + if (collection.version() >= 3) { // already upgraded continue; } - if (collection.type() === 3) { - // already an edge collection - collection.setAttribute("version", 2); - continue; + if (collection.upgrade()) { + // success + collection.setAttribute("version", 3); } - - if (collection.count() > 0) { - var isEdge = true; - // check the 1st 50 documents from a collection - var documents = collection.ALL(0, 50); - var j; - - for (j in documents) { - if (documents.hasOwnProperty(j)) { - var doc = documents[j]; - - // check if documents contain both _from and _to attributes - if (! doc.hasOwnProperty("_from") || ! doc.hasOwnProperty("_to")) { - isEdge = false; - break; - } - } - } - - if (isEdge) { - collection.setAttribute("type", 3); - logger.log("made collection '" + collection.name() + " an edge collection"); - } + else { + // fail + logger.error("could not upgrade collection datafiles for '" + + collection.name() + "'"); + return false; } - collection.setAttribute("version", 2); } catch (e) { - logger.error("could not upgrade collection '" + collection.name() + "'"); - return false; - } - } - } - - return true; - }); - - // create the _modules collection - addTask("createModules", "setup _modules collection", function () { - return createSystemCollection("_modules"); - }); - - // create the _routing collection - addTask("createRouting", "setup _routing collection", function () { - // needs to be big enough for assets - return createSystemCollection("_routing", { journalSize: 32 * 1024 * 1024 }); - }); - - // create the default route in the _routing collection - addTask("insertRedirectionsAll", "insert default routes for admin interface", function () { - var routing = getCollection("_routing"); - - if (! 
routing) { - return false; - } - - // first, check for "old" redirects - routing.toArray().forEach(function (doc) { - // check for specific redirects - if (doc.url && doc.action && doc.action.options && doc.action.options.destination) { - if (doc.url.match(/^\/(_admin\/(html|aardvark))?/) && - doc.action.options.destination.match(/_admin\/(html|aardvark)/)) { - // remove old, non-working redirect - routing.remove(doc); - } - } - }); - - // add redirections to new location - [ "/", "/_admin/html", "/_admin/html/index.html" ].forEach (function (src) { - routing.save({ - url: src, - action: { - "do": "org/arangodb/actions/redirectRequest", - options: { - permanently: true, - destination: "/_db/" + db._name() + "/_admin/aardvark/index.html" - } - }, - priority: -1000000 - }); - }); - - return true; - }); - - // update markers in all collection datafiles to key markers - addUpgradeTask("upgradeMarkers12", "update markers in all collection datafiles", function () { - var collections = db._collections(); - var i; - - for (i in collections) { - if (collections.hasOwnProperty(i)) { - var collection = collections[i]; - - try { - if (collection.version() >= 3) { - // already upgraded - continue; - } - - if (collection.upgrade()) { - // success - collection.setAttribute("version", 3); - } - else { - // fail - logger.error("could not upgrade collection datafiles for '" + logger.error("could not upgrade collection datafiles for '" + collection.name() + "'"); return false; } } - catch (e) { - logger.error("could not upgrade collection datafiles for '" - + collection.name() + "'"); + } + + return true; + }); + } + + if (! cluster.isCoordinator()) { + // set up the collection _aal + addTask("setupAal", "setup _aal collection", function () { + return createSystemCollection("_aal", { waitForSync : true }); + }); + + // create a unique index on collection attribute in _aal + addTask("createAalIndex", + "create index on collection attribute in _aal collection", + function () { + var aal = getCollection("_aal"); + + if (! aal) { return false; } - } - } - return true; - }); - - // set up the collection _aal - addTask("setupAal", "setup _aal collection", function () { - return createSystemCollection("_aal", { waitForSync : true }); - }); + aal.ensureUniqueConstraint("name", "version"); + + return true; + }); + } - // create a unique index on collection attribute in _aal - addTask("createAalIndex", - "create index on collection attribute in _aal collection", - function () { - var aal = getCollection("_aal"); + if (! cluster.isCoordinator()) { + // set up the collection _aqlfunctions + addTask("setupAqlFunctions", "setup _aqlfunctions collection", function () { + return createSystemCollection("_aqlfunctions"); + }); + } - if (! aal) { + if (! cluster.isCoordinator()) { + // set up the collection _trx + addTask("setupTrx", "setup _trx collection", function () { + return createSystemCollection("_trx", { waitForSync : false }); + }); + } + + if (! cluster.isCoordinator()) { + // set up the collection _replication + addTask("setupReplication", "setup _replication collection", function () { + return createSystemCollection("_replication", { waitForSync : false }); + }); + } + + if (! cluster.isCoordinator()) { + // migration aql function names + addUpgradeTask("migrateAqlFunctions", "migrate _aqlfunctions name", function () { + var funcs = getCollection('_aqlfunctions'); + + if (! 
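+      // This migration rewrites legacy AQL function keys: single colons used
+      // as namespace separators become double colons, and the new document
+      // is stored under the upper-cased key. Illustrative sample values:
+      //
+      //   old _key "myns:func"  ->  name "myns::func", _key "MYNS::FUNC"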
funcs) { return false; } - aal.ensureUniqueConstraint("name", "version"); + var result = true; + funcs.toArray().forEach(function(f) { + var oldKey = f._key; + var newKey = oldKey.replace(/:{1,}/g, '::'); + + if (oldKey !== newKey) { + try { + var doc = { + _key: newKey.toUpperCase(), + name: newKey, + code: f.code, + isDeterministic: f.isDeterministic + }; + + funcs.save(doc); + funcs.remove(oldKey); + } + catch (err) { + result = false; + } + } + }); + + return result; + }); + } + + if (! cluster.isCoordinator()) { + addUpgradeTask("removeOldFoxxRoutes", "Remove all old Foxx Routes", function () { + var potentialFoxxes = getCollection('_routing'); + + potentialFoxxes.iterate(function (maybeFoxx) { + if (maybeFoxx.foxxMount) { + // This is a Foxx! Let's delete it + potentialFoxxes.remove(maybeFoxx._id); + } + }); return true; - }); - - // set up the collection _aqlfunctions - addTask("setupAqlFunctions", "setup _aqlfunctions collection", function () { - return createSystemCollection("_aqlfunctions"); - }); - - // set up the collection _trx - addTask("setupTrx", "setup _trx collection", function () { - return createSystemCollection("_trx", { waitForSync : false }); - }); - - // set up the collection _replication - addTask("setupReplication", "setup _replication collection", function () { - return createSystemCollection("_replication", { waitForSync : false }); - }); - - // migration aql function names - addUpgradeTask("migrateAqlFunctions", "migrate _aqlfunctions name", function () { - var funcs = getCollection('_aqlfunctions'); - - if (! funcs) { - return false; - } - - var result = true; - funcs.toArray().forEach(function(f) { - var oldKey = f._key; - var newKey = oldKey.replace(/:{1,}/g, '::'); - - if (oldKey !== newKey) { - try { - var doc = { - _key: newKey.toUpperCase(), - name: newKey, - code: f.code, - isDeterministic: f.isDeterministic - }; - - funcs.save(doc); - funcs.remove(oldKey); - } - catch (err) { - result = false; - } - } }); - - return result; - }); - - addUpgradeTask("removeOldFoxxRoutes", "Remove all old Foxx Routes", function () { - var potentialFoxxes = getCollection('_routing'); - - potentialFoxxes.iterate(function (maybeFoxx) { - if (maybeFoxx.foxxMount) { - // This is a Foxx! Let's delete it - potentialFoxxes.remove(maybeFoxx._id); - } - }); - - return true; - }); - + } // loop through all tasks and execute them From d92d23d3c10b7518a7af51fe8539a1ccf9ea1300 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 22 Jan 2014 10:19:00 +0100 Subject: [PATCH 2/9] dont fail on removing collection info --- arangod/Cluster/ClusterInfo.cpp | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index eddfbf17aa..64aef2ca66 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -969,22 +969,23 @@ int ClusterInfo::dropDatabaseCoordinator (string const& name, string& errorMsg, return setErrormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg); } - res = ac.removeValues("Plan/Collections/" + name, true); - - if (! 
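  // Note on the reordering below: the database entry is now removed from
  // Plan/Databases first, and the removal of Plan/Collections afterwards
  // tolerates NOT_FOUND, because a database without planned collections
  // has no such key in the agency and dropping it must not fail then.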
res.successful()) { - return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_DATABASE_IN_PLAN, - errorMsg); - } - res = ac.removeValues("Plan/Databases/"+name, false); if (!res.successful()) { - if (res._statusCode == rest::HttpResponse::NOT_FOUND) { + if (res.httpCode() == (int) rest::HttpResponse::NOT_FOUND) { return setErrormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg); } return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_DATABASE_IN_PLAN, errorMsg); } + + res = ac.removeValues("Plan/Collections/" + name, true); + + if (! res.successful() && res.httpCode() != (int) rest::HttpResponse::NOT_FOUND) { + return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_DATABASE_IN_PLAN, + errorMsg); + } + } // Now wait for it to appear and be complete: From 8a74c2864e5cde08e0341ec27a7bd14c2cbd051a Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 22 Jan 2014 11:41:32 +0100 Subject: [PATCH 3/9] cleanup of current/collections --- init-cluster.sh | 4 +- js/server/modules/org/arangodb/cluster.js | 137 ++++++++++++++++++---- 2 files changed, 119 insertions(+), 22 deletions(-) diff --git a/init-cluster.sh b/init-cluster.sh index 8e46115fd3..b0e13c2e8e 100755 --- a/init-cluster.sh +++ b/init-cluster.sh @@ -17,8 +17,8 @@ curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Per curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Claus" -d "value=\"tcp://127.0.0.1:8529\"" || exit 1 #curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Collections/@Usystem/5678" -d 'value={"status":3,"shards":{"shardBlubb": "Pavel"},"shardKeys":["xyz"],"indexes":{},"name":"testCollection","type":2,"id":"5678","doCompact":true,"isSystem":false,"isVolatile":false,"waitForSync":false,"maximalSize":1048576,"keyOptions":{"type":"traditional","allowUserKeys":true}}' || exit 1 -curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Databases/@Usystem/Pavel" -d 'value={"name":"system"}}' || exit 1 -curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Databases/@Usystem/Perry" -d 'value={"name":"system"}}' || exit 1 +#curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Databases/@Usystem/Pavel" -d 'value={"name":"system"}}' || exit 1 +#curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Databases/@Usystem/Perry" -d 'value={"name":"system"}}' || exit 1 echo echo start arangod with: diff --git a/js/server/modules/org/arangodb/cluster.js b/js/server/modules/org/arangodb/cluster.js index 05496f4668..a4a3a46b43 100644 --- a/js/server/modules/org/arangodb/cluster.js +++ b/js/server/modules/org/arangodb/cluster.js @@ -37,7 +37,26 @@ var ArangoCollection = arangodb.ArangoCollection; /// @brief get values from Plan or Current by a prefix //////////////////////////////////////////////////////////////////////////////// -function getByPrefix (values, prefix, multiDimensional) { +function getByPrefix (values, prefix) { + var result = { }; + var a; + var n = prefix.length; + + for (a in values) { + if (values.hasOwnProperty(a)) { + if (a.substr(0, n) === prefix) { + result[a.substr(n)] = values[a]; + } + } + } + return result; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief get values from Plan or Current by a prefix +//////////////////////////////////////////////////////////////////////////////// + +function getByPrefix3d (values, prefix) { var result = { }; var a; var n = prefix.length; @@ -46,16 +65,41 @@ function getByPrefix (values, prefix, multiDimensional) { if (values.hasOwnProperty(a)) { if 
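+      // getByPrefix3d/getByPrefix4d strip the prefix and split the rest of
+      // the key on '/' into nested objects. Example with a sample agency key:
+      //
+      //   { "Current/Databases/mydb/DBServer1": v }
+      //   --> getByPrefix3d(values, "Current/Databases/")
+      //   --> { mydb: { DBServer1: v } }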
(a.substr(0, n) === prefix) { var key = a.substr(n); - - if (multiDimensional) { - var parts = key.split('/'); + var parts = key.split('/'); + if (parts.length >= 2) { if (! result.hasOwnProperty(parts[0])) { result[parts[0]] = { }; } result[parts[0]][parts[1]] = values[a]; } - else { - result[key] = values[a]; + } + } + } + return result; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief get values from Plan or Current by a prefix +//////////////////////////////////////////////////////////////////////////////// + +function getByPrefix4d (values, prefix) { + var result = { }; + var a; + var n = prefix.length; + + for (a in values) { + if (values.hasOwnProperty(a)) { + if (a.substr(0, n) === prefix) { + var key = a.substr(n); + var parts = key.split('/'); + if (parts.length >= 3) { + if (! result.hasOwnProperty(parts[0])) { + result[parts[0]] = { }; + } + if (! result[parts[0]].hasOwnProperty(parts[1])) { + result[parts[0]][parts[1]] = { }; + } + result[parts[0]][parts[1]][parts[2]] = values[a]; } } } @@ -190,30 +234,31 @@ function createLocalDatabases (plannedDatabases) { // check which databases need to be created locally for (name in plannedDatabases) { if (plannedDatabases.hasOwnProperty(name)) { + var payload = plannedDatabases[name]; + payload.error = false; + payload.errorNum = 0; + payload.errorMessage = "no error"; + if (! localDatabases.hasOwnProperty(name)) { // must create database - var payload = plannedDatabases[name]; // TODO: handle options and user information console.info("creating local database '%s'", payload.name); try { db._createDatabase(payload.name); - payload.error = false; - payload.errorNum = 0; - payload.errorMessage = "no error"; } catch (err) { payload.error = true; payload.errorNum = err.errorNum; payload.errorMessage = err.errorMessage; } - - writeLocked({ part: "Current" }, - createDatabaseAgency, - [ payload ]); } + + writeLocked({ part: "Current" }, + createDatabaseAgency, + [ payload ]); } } } @@ -262,7 +307,7 @@ function dropLocalDatabases (plannedDatabases) { function cleanupCurrentDatabases () { var ourselves = ArangoServerState.id(); - + var dropDatabaseAgency = function (payload) { try { ArangoAgency.remove("Current/Databases/" + payload.name + "/" + ourselves); @@ -275,7 +320,7 @@ function cleanupCurrentDatabases () { db._useDatabase("_system"); var all = ArangoAgency.get("Current/Databases", true); - var currentDatabases = getByPrefix(all, "Current/Databases/", true); + var currentDatabases = getByPrefix3d(all, "Current/Databases/"); var localDatabases = getLocalDatabases(); var name; @@ -286,7 +331,7 @@ function cleanupCurrentDatabases () { if (currentDatabases[name].hasOwnProperty(ourselves)) { // we are entered for a database that we don't have locally - console.info("remvoing entry for local database '%s'", name); + console.info("cleaning up entry for unknown database '%s'", name); writeLocked({ part: "Current" }, dropDatabaseAgency, @@ -509,8 +554,60 @@ function dropLocalCollections (plannedCollections) { } } -function cleanupCurrentCollections () { +//////////////////////////////////////////////////////////////////////////////// +/// @brief clean up what's in Current/Collections for ourselves +//////////////////////////////////////////////////////////////////////////////// + +function cleanupCurrentCollections (plannedCollections) { + var ourselves = ArangoServerState.id(); + + var dropCollectionAgency = function (database, collection, shardID) { + try { + 
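+      // removes this server's entry under
+      // Current/Collections/<database>/<collection>/<shardID>; errors are
+      // ignored on purpose, as the entry may already be gone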
ArangoAgency.remove("Current/Collections/" + database + "/" + collection + "/" + shardID); + } + catch (err) { + // ignore errors + } + }; + db._useDatabase("_system"); + + var all = ArangoAgency.get("Current/Collections", true); + var currentCollections = getByPrefix4d(all, "Current/Collections/"); + var shardMap = getShardMap(plannedCollections); + var database; + + for (database in currentCollections) { + if (currentCollections.hasOwnProperty(database)) { + var collections = currentCollections[database]; + var collection; + + for (collection in collections) { + if (collections.hasOwnProperty(collection)) { + var shards = collections[collection]; + var shard; + + for (shard in shards) { + if (shards.hasOwnProperty(shard)) { + if (! shardMap.hasOwnProperty(shard) || + shardMap[shard] !== ourselves) { + + console.info("cleaning up entry for unknown shard '%s' of '%s/%s", + shard, + database, + collection); + + writeLocked({ part: "Current" }, + dropCollectionAgency, + [ database, collection, shard ]); + } + } + } + } + } + + } + } } //////////////////////////////////////////////////////////////////////////////// @@ -518,11 +615,11 @@ function cleanupCurrentCollections () { //////////////////////////////////////////////////////////////////////////////// function handleCollectionChanges (plan, current) { - var plannedCollections = getByPrefix(plan, "Plan/Collections/", true); + var plannedCollections = getByPrefix3d(plan, "Plan/Collections/"); createLocalCollections(plannedCollections); dropLocalCollections(plannedCollections); - cleanupCurrentCollections(); + cleanupCurrentCollections(plannedCollections); } //////////////////////////////////////////////////////////////////////////////// From 2d44e85f6d32e6682663843465904bc1c5406a0c Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Wed, 22 Jan 2014 16:20:05 +0100 Subject: [PATCH 4/9] Add hashing of TRI_json_t to uint64_t. --- lib/BasicsC/json-utilities.c | 109 +++++++++++++++++++++++++++++++++++ lib/BasicsC/json-utilities.h | 15 +++++ 2 files changed, 124 insertions(+) diff --git a/lib/BasicsC/json-utilities.c b/lib/BasicsC/json-utilities.c index 658c5deb73..49d83c79e1 100644 --- a/lib/BasicsC/json-utilities.c +++ b/lib/BasicsC/json-utilities.c @@ -27,6 +27,7 @@ #include "BasicsC/json-utilities.h" #include "BasicsC/string-buffer.h" +#include "BasicsC/hashes.h" // ----------------------------------------------------------------------------- // --SECTION-- private functions @@ -766,6 +767,114 @@ TRI_json_t* TRI_MergeJson (TRI_memory_zone_t* zone, return result; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief compute a hash value for a JSON document, starting with a given +/// initial hash value. Note that a NULL pointer for json hashes to the +/// same value as a json pointer that points to a JSON value `null`. 
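+/// Objects (TRI_JSON_ARRAY) hash order-independently: the hashes of their
+/// attribute/value pairs are combined with XOR. Lists (TRI_JSON_LIST) hash
+/// order-dependently, chaining the hash value through all elements.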
+////////////////////////////////////////////////////////////////////////////////
+
+static uint64_t HashJsonRecursive (uint64_t hash, TRI_json_t const* object) {
+  size_t n;
+  size_t i;
+  uint64_t tmphash;
+  TRI_json_t const* subjson;
+
+  if (0 == object) {
+    return TRI_FnvHashBlock(hash, "null", 4);   // strlen("null")
+  }
+  switch (object->_type) {
+    case TRI_JSON_UNUSED: {
+      return hash;
+    }
+
+    case TRI_JSON_NULL: {
+      return TRI_FnvHashBlock(hash, "null", 4);   // strlen("null")
+    }
+
+    case TRI_JSON_BOOLEAN: {
+      if (object->_value._boolean) {
+        return TRI_FnvHashBlock(hash, "true", 4);   // strlen("true")
+      }
+      else {
+        return TRI_FnvHashBlock(hash, "false", 5);   // strlen("false")
+      }
+    }
+
+    case TRI_JSON_NUMBER: {
+      return TRI_FnvHashBlock(hash, (char const*) &(object->_value._number),
+                              sizeof(object->_value._number));
+    }
+
+    case TRI_JSON_STRING:
+    case TRI_JSON_STRING_REFERENCE: {
+      return TRI_FnvHashBlock(hash, object->_value._string.data,
+                              object->_value._string.length);
+    }
+
+    case TRI_JSON_ARRAY: {
+      n = object->_value._objects._length;
+      tmphash = hash;
+      for (i = 0; i < n; i += 2) {
+        subjson = (const TRI_json_t*) TRI_AtVector(&object->_value._objects, i);
+        assert(TRI_IsStringJson(subjson));
+        tmphash ^= HashJsonRecursive(hash, subjson);
+        subjson = (const TRI_json_t*) TRI_AtVector(&object->_value._objects,
+                                                   i+1);
+        tmphash ^= HashJsonRecursive(hash, subjson);
+      }
+      return tmphash;
+    }
+
+    case TRI_JSON_LIST: {
+      n = object->_value._objects._length;
+      for (i = 0; i < n; ++i) {
+        subjson = (const TRI_json_t*) TRI_AtVector(&object->_value._objects, i);
+        hash = HashJsonRecursive(hash, subjson);
+      }
+      return hash;
+    }
+  }
+  return hash;   // never reached
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief compute a hash value for a JSON document. Note that a NULL pointer
+/// for json hashes to the same value as a json pointer that points to a
+/// JSON value `null`.
+////////////////////////////////////////////////////////////////////////////////
+
+uint64_t TRI_HashJson (TRI_json_t const* json) {
+  return HashJsonRecursive(TRI_FnvHashBlockInitial(), json);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief compute a hash value for a JSON document depending on a list
+/// of attributes. This is used for sharding to map documents to shards.
+///
+/// The attributes array `attributes` has to contain exactly `nrAttributes`
+/// pointers to zero-terminated strings.
+/// Note that all JSON values given for `json` that are not JSON arrays
+/// hash to the same value, which is not the same value a JSON array gets
+/// that does not contain any of the specified attributes.
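+///
+/// Worked example (sample values): with `attributes` = { "a", "b" }, the
+/// document {"a": 1, "c": 3} hashes the number value 1 for "a" and, since
+/// the lookup of the missing attribute "b" returns a NULL pointer, the
+/// string "null" for "b". The value of "c" does not affect the result.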
+//////////////////////////////////////////////////////////////////////////////// + +uint64_t TRI_HashJsonByAttributes (TRI_json_t const* json, + char const *attributes[], + int nrAttributes) { + int i; + TRI_json_t const* subjson; + uint64_t hash; + + hash = TRI_FnvHashBlockInitial(); + if (TRI_IsArrayJson(json)) { + for (i = 0; i < nrAttributes; i++) { + subjson = TRI_LookupArrayJson(json, attributes[i]); + hash = HashJsonRecursive(hash, subjson); + } + } + return hash; +} + //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// diff --git a/lib/BasicsC/json-utilities.h b/lib/BasicsC/json-utilities.h index 3b703af22a..2c7df7afae 100644 --- a/lib/BasicsC/json-utilities.h +++ b/lib/BasicsC/json-utilities.h @@ -137,6 +137,21 @@ TRI_json_t* TRI_MergeJson (TRI_memory_zone_t*, const TRI_json_t* const, const bool); +//////////////////////////////////////////////////////////////////////////////// +/// @brief compute a hash value for a JSON document. +//////////////////////////////////////////////////////////////////////////////// + +uint64_t TRI_HashJson (TRI_json_t const* json); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief compute a hash value for a JSON document depending on a list +/// of attributes. +//////////////////////////////////////////////////////////////////////////////// + +uint64_t TRI_HashJsonByAttributes (TRI_json_t const* json, + char const *attributes[], + int nrAttributes); + //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// From 0b2ff098f36adba9aa0b7a148b0b532f1109cede Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Wed, 22 Jan 2014 16:20:34 +0100 Subject: [PATCH 5/9] Start to use shared_ptr from boost. --- lib/Basics/Common.h | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/lib/Basics/Common.h b/lib/Basics/Common.h index de57744137..8cc0605f28 100644 --- a/lib/Basics/Common.h +++ b/lib/Basics/Common.h @@ -88,6 +88,15 @@ namespace triagens { }; } +// ----------------------------------------------------------------------------- +// --SECTION-- modern C++ stuff +// ----------------------------------------------------------------------------- + +#include + +#define TRI_shared_ptr boost::shared_ptr + + //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// From 8d3282f237692438126c59611ae0e80eaa15e87b Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Wed, 22 Jan 2014 16:21:26 +0100 Subject: [PATCH 6/9] Add support to read off responsible shard of a document. Untested, but compiles. 
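A minimal usage sketch (not part of the patch; error handling is omitted, and
the collection ID is assumed to be known already):

    TRI_json_t* doc = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE,
                                     "{\"a\": 1, \"b\": \"x\"}");
    // maps the document to one of the collection's shards; an empty
    // result signals an error (e.g. unknown collection)
    ShardID shardID = ClusterInfo::instance()->getResponsibleShard(
                          collectionID, doc, true);
    TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, doc);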
--- arangod/Cluster/ClusterInfo.cpp | 79 +++++++++++++++++++++++++++++++++ arangod/Cluster/ClusterInfo.h | 16 ++++++- 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index eddfbf17aa..55a3cd86af 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -30,6 +30,7 @@ #include "BasicsC/conversions.h" #include "BasicsC/json.h" +#include "BasicsC/json-utilities.h" #include "BasicsC/logging.h" #include "Basics/JsonHelper.h" #include "Basics/ReadLocker.h" @@ -550,6 +551,7 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) { WRITE_LOCKER(_lock); _collections.clear(); + _shards.clear(); std::map::iterator it = result._values.begin(); @@ -583,6 +585,20 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) { (*it).second._json = 0; const CollectionInfo collectionData(json); + vector* shardKeys = new vector; + *shardKeys = collectionData.shardKeys(); + _shardKeys.insert( + make_pair > > + (collection, TRI_shared_ptr > (shardKeys))); + map shardIDs = collectionData.shardIds(); + vector* shards = new vector; + map::iterator it3; + for (it3 = shardIDs.begin(); it3 != shardIDs.end(); ++it3) { + shards->push_back(it3->first); + } + _shards.insert( + make_pair > > + (collection,TRI_shared_ptr >(shards))); // insert the collection into the existing map @@ -1408,6 +1424,69 @@ ServerID ClusterInfo::getResponsibleServer (ShardID const& shardID) { return ServerID(""); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief find the shard that is responsible for a document, which is given +/// as a TRI_json_t const*. +/// +/// There are two modes, one assumes that the document is given as a +/// whole (`docComplete`==`true`), in this case, the non-existence of +/// values for some of the sharding attributes is silently ignored +/// and treated as if these values were `null`. In the second mode +/// (`docComplete`==false) leads to an error which is reported by +/// returning an empty string as the shardID. +//////////////////////////////////////////////////////////////////////////////// + +ShardID ClusterInfo::getResponsibleShard (CollectionID const& collectionID, + TRI_json_t const* json, + bool docComplete) { + // Note that currently we take the number of shards and the shardKeys + // from Plan, since they are immutable. Later we will have to switch + // this to Current, when we allow to add and remove shards. 
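+  //
+  // The lookup below tries at most twice: if the collection is not found in
+  // the cached maps on the first pass, the planned collections are reloaded
+  // from the agency and the lookup is retried once.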
+  if (!_collectionsValid) {
+    loadPlannedCollections();
+  }
+
+  int tries = 0;
+  TRI_shared_ptr<vector<string> > shardKeysPtr;
+  char const** shardKeys = 0;
+  int nrShardKeys = 0;
+  TRI_shared_ptr<vector<ShardID> > shards;
+
+  while (++tries <= 2) {
+    {
+      // Get the sharding keys and the number of shards:
+      READ_LOCKER(_lock);
+      map<CollectionID, TRI_shared_ptr<vector<ShardID> > >::iterator it
+          = _shards.find(collectionID);
+      if (it != _shards.end()) {
+        shards = it->second;
+        map<CollectionID, TRI_shared_ptr<vector<string> > >::iterator it2
+            = _shardKeys.find(collectionID);
+        if (it2 != _shardKeys.end()) {
+          shardKeysPtr = it2->second;
+          shardKeys = new char const * [shardKeysPtr->size()];
+          if (shardKeys != 0) {
+            size_t i;
+            for (i = 0; i < shardKeysPtr->size(); ++i) {
+              shardKeys[i] = shardKeysPtr->at(i).c_str();
+            }
+            nrShardKeys = (int) shardKeysPtr->size();
+            break;   // all OK
+          }
+        }
+      }
+    }
+    loadPlannedCollections();
+  }
+  if (0 == shardKeys) {
+    return string("");
+  }
+
+  uint64_t hash = TRI_HashJsonByAttributes(json, shardKeys, nrShardKeys);
+  delete[] shardKeys;
+
+  return shards->at(hash % shards->size());
+}
+
 // Local Variables:
 // mode: outline-minor
 // outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|// --SECTION--\\|/// @\\}\\)"
diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h
index 377b6ff0fc..5c38bf5f96 100644
--- a/arangod/Cluster/ClusterInfo.h
+++ b/arangod/Cluster/ClusterInfo.h
@@ -881,6 +881,13 @@ namespace triagens {
 
         ServerID getResponsibleServer (ShardID const&);
 
+////////////////////////////////////////////////////////////////////////////////
+/// @brief find the shard that is responsible for a document
+////////////////////////////////////////////////////////////////////////////////
+
+        ShardID getResponsibleShard (CollectionID const&, TRI_json_t const*,
+                                     bool docComplete);
+
       private:
 
////////////////////////////////////////////////////////////////////////////////
@@ -948,7 +955,14 @@ namespace triagens {
         // from Current/DBServers
         bool _DBServersValid;
         std::map<ShardID, ServerID> _shardIds;
-        // from Plan/Collections/ ???
+ // from Current/Collections/ + std::map > > + _shards; + // from Plan/Collections/ + // (may later come from Current/Colletions/ ) + std::map > > + _shardKeys; + // from Plan/Collections/ // ----------------------------------------------------------------------------- // --SECTION-- private static variables From 39fd7611edc16a3ec91a16111aef72d1c9a91da9 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 22 Jan 2014 17:20:06 +0100 Subject: [PATCH 7/9] load/unload of collections --- arangod/Cluster/ClusterInfo.cpp | 65 +++++++++++++++++++++++ arangod/Cluster/ClusterInfo.h | 8 +++ arangod/V8Server/v8-vocbase.cpp | 61 ++++++++++++++++++--- js/actions/api-database.js | 25 ++++++--- js/server/modules/org/arangodb/cluster.js | 26 ++++++++- 5 files changed, 168 insertions(+), 17 deletions(-) diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index 64aef2ca66..a72f18521d 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -1204,6 +1204,71 @@ int ClusterInfo::dropCollectionCoordinator (string const& databaseName, return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief set collection status in coordinator +//////////////////////////////////////////////////////////////////////////////// + +int ClusterInfo::setCollectionStatusCoordinator (string const& databaseName, + string const& collectionID, + TRI_vocbase_col_status_e status) { + AgencyComm ac; + AgencyCommResult res; + + AgencyCommLocker locker("Plan", "WRITE"); + + if (! locker.successful()) { + return TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN; + } + + if (! ac.exists("Plan/Databases/" + databaseName)) { + return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND; + } + + res = ac.getValues("Plan/Collections/" + databaseName + "/" + collectionID, false); + + if (! res.successful()) { + return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND; + } + + res.parse("", false); + std::map::const_iterator it = res._values.begin(); + + if (it == res._values.end()) { + return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND; + } + + TRI_json_t* json = (*it).second._json; + if (json == 0) { + return TRI_ERROR_OUT_OF_MEMORY; + } + + TRI_vocbase_col_status_e old = triagens::basics::JsonHelper::getNumericValue(json, "status", TRI_VOC_COL_STATUS_CORRUPTED); + + if (old == status) { + // no status change + return TRI_ERROR_NO_ERROR; + } + + TRI_json_t* copy = TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, json); + if (copy == 0) { + return TRI_ERROR_OUT_OF_MEMORY; + } + + TRI_DeleteArrayJson(TRI_UNKNOWN_MEM_ZONE, copy, "status"); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, copy, "status", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, status)); + + res = ac.setValue("Plan/Collections/" + databaseName + "/" + collectionID, copy, 0.0); + + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, copy); + + if (res.successful()) { + loadPlannedCollections(false); + return TRI_ERROR_NO_ERROR; + } + + return TRI_ERROR_INTERNAL; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief (re-)load the information about servers from the agency /// Usually one does not have to call this directly. 
diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h index 377b6ff0fc..2478fab403 100644 --- a/arangod/Cluster/ClusterInfo.h +++ b/arangod/Cluster/ClusterInfo.h @@ -837,6 +837,14 @@ namespace triagens { string& errorMsg, double timeout); +//////////////////////////////////////////////////////////////////////////////// +/// @brief set collection status in coordinator +//////////////////////////////////////////////////////////////////////////////// + + int setCollectionStatusCoordinator (string const& databaseName, + string const& collectionID, + TRI_vocbase_col_status_e status); + //////////////////////////////////////////////////////////////////////////////// /// @brief (re-)load the information about all DBservers from the agency /// Usually one does not have to call this directly. diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index c3100144c5..1aa0260f9b 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -4902,7 +4902,12 @@ static v8::Handle JS_CountVocbaseCol (v8::Arguments const& argv) { TRI_V8_EXCEPTION_INTERNAL(scope, "cannot extract collection"); } - TRI_SHARDING_COLLECTION_NOT_YET_IMPLEMENTED(scope, collection); +#ifdef TRI_ENABLE_CLUSTER + if (ServerState::instance()->isCoordinator()) { + // TODO: fix this + return scope.Close(v8::Number::New(0)); + } +#endif CollectionNameResolver resolver(collection->_vocbase); ReadTransactionType trx(collection->_vocbase, resolver, collection->_cid); @@ -6114,6 +6119,31 @@ static v8::Handle JS_GetIndexesVocbaseCol (v8::Arguments const& argv) static v8::Handle JS_LoadVocbaseCol (v8::Arguments const& argv) { v8::HandleScope scope; +#ifdef TRI_ENABLE_CLUSTER + if (ServerState::instance()->isCoordinator()) { + TRI_vocbase_col_t const* collection = TRI_UnwrapClass(argv.Holder(), WRP_VOCBASE_COL_TYPE); + + if (collection == 0) { + TRI_V8_EXCEPTION_INTERNAL(scope, "cannot extract collection"); + } + + string const databaseName(collection->_dbName); + + if (! ClusterInfo::instance()->doesDatabaseExist(databaseName)) { + TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database"); + } + + const std::string cid = StringUtils::itoa(collection->_cid); + int res = ClusterInfo::instance()->setCollectionStatusCoordinator(databaseName, cid, TRI_VOC_COL_STATUS_LOADED); + + if (res != TRI_ERROR_NO_ERROR) { + TRI_V8_EXCEPTION(scope, res); + } + + return scope.Close(v8::Undefined()); + } +#endif + v8::Handle err; TRI_vocbase_col_t const* collection = UseCollection(argv.Holder(), &err); @@ -6251,8 +6281,8 @@ static v8::Handle JS_PropertiesVocbaseCol (v8::Arguments const& argv) #ifdef TRI_ENABLE_CLUSTER if (! 
collection->_isLocal) { - char const* originalDatabase = GetCurrentDatabaseName(); - TRI_col_info_t info = ClusterInfo::instance()->getCollectionProperties(std::string(originalDatabase), StringUtils::itoa(collection->_cid)); + string const databaseName(collection->_dbName); + TRI_col_info_t info = ClusterInfo::instance()->getCollectionProperties(databaseName, StringUtils::itoa(collection->_cid)); // return the current parameter set v8::Handle result = v8::Object::New(); @@ -7179,13 +7209,30 @@ static v8::Handle JS_UnloadVocbaseCol (v8::Arguments const& argv) { if (collection == 0) { TRI_V8_EXCEPTION_INTERNAL(scope, "cannot extract collection"); } - - TRI_SHARDING_COLLECTION_NOT_YET_IMPLEMENTED(scope, collection); - int res = TRI_UnloadCollectionVocBase(collection->_vocbase, collection, false); + int res; + +#ifdef TRI_ENABLE_CLUSTER + if (ServerState::instance()->isCoordinator()) { + string const databaseName(collection->_dbName); + + if (! ClusterInfo::instance()->doesDatabaseExist(databaseName)) { + TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database"); + } + + res = ClusterInfo::instance()->setCollectionStatusCoordinator(databaseName, StringUtils::itoa(collection->_cid), TRI_VOC_COL_STATUS_UNLOADED); + } + else { + res = TRI_UnloadCollectionVocBase(collection->_vocbase, collection, false); + } +#else + + res = TRI_UnloadCollectionVocBase(collection->_vocbase, collection, false); + +#endif if (res != TRI_ERROR_NO_ERROR) { - TRI_V8_EXCEPTION_MESSAGE(scope, res, "cannot unload collection"); + TRI_V8_EXCEPTION(scope, res); } return scope.Close(v8::Undefined()); diff --git a/js/actions/api-database.js b/js/actions/api-database.js index 4dc2d5e029..723e0fdff0 100644 --- a/js/actions/api-database.js +++ b/js/actions/api-database.js @@ -1,5 +1,5 @@ /*jslint indent: 2, nomen: true, maxlen: 150, sloppy: true, vars: true, white: true, plusplus: true, stupid: true */ -/*global require */ +/*global require, ArangoAgency */ //////////////////////////////////////////////////////////////////////////////// /// @brief database management @@ -30,6 +30,7 @@ var arangodb = require("org/arangodb"); var actions = require("org/arangodb/actions"); +var cluster = require("org/arangodb/cluster"); var API = "_api/database"; @@ -179,13 +180,21 @@ function get_api_database (req, res) { result = arangodb.db._listDatabases(username, password, auth); } else if (req.suffix[0] === 'current') { - // information about the current database - result = { - name: arangodb.db._name(), - id: arangodb.db._id(), - path: arangodb.db._path(), - isSystem: arangodb.db._isSystem() - }; + if (cluster.isCoordinator()) { + // fetch database information from Agency + var values = ArangoAgency.get("Plan/Databases/" + req.originalDatabase, false); + // TODO: check if this information is sufficient + result = values["Plan/Databases/" + req.originalDatabase]; + } + else { + // information about the current database + result = { + name: arangodb.db._name(), + id: arangodb.db._id(), + path: arangodb.db._path(), + isSystem: arangodb.db._isSystem() + }; + } } else { actions.resultBad(req, res, arangodb.ERROR_HTTP_BAD_PARAMETER); diff --git a/js/server/modules/org/arangodb/cluster.js b/js/server/modules/org/arangodb/cluster.js index a4a3a46b43..5165213055 100644 --- a/js/server/modules/org/arangodb/cluster.js +++ b/js/server/modules/org/arangodb/cluster.js @@ -432,6 +432,29 @@ function createLocalCollections (plannedCollections) { [ database, shard, payload ]); } else { + if (localCollections[shard].status !== 
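+          // the Plan is authoritative for the status: if the planned status
+          // differs from the local shard's status, load or unload the shard
+          // accordingly and report the outcome back to Current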
payload.status) { + console.info("detected status change for local shard '%s/%s'", + database, + shard); + + if (payload.status === ArangoCollection.STATUS_UNLOADED) { + console.info("unloading local shard '%s/%s'", + database, + shard); + db._collection(shard).unload(); + } + else if (payload.status === ArangoCollection.STATUS_LOADED) { + console.info("loading local shard '%s/%s'", + database, + shard); + db._collection(shard).load(); + } + + writeLocked({ part: "Current" }, + createCollectionAgency, + [ database, shard, payload ]); + } + // collection exists, now compare collection properties var properties = { }; var cmp = [ "journalSize", "waitForSync", "doCompact" ], i; @@ -442,7 +465,7 @@ function createLocalCollections (plannedCollections) { properties[p] = payload[p]; } } - + if (Object.keys(properties).length > 0) { console.info("updating properties for local shard '%s/%s'", database, @@ -453,7 +476,6 @@ function createLocalCollections (plannedCollections) { payload.error = false; payload.errorNum = 0; payload.errorMessage = "no error"; - } catch (err3) { payload.error = true; From 27276205c41345489a10341dcc688799c7cd06a7 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 22 Jan 2014 17:46:26 +0100 Subject: [PATCH 8/9] collection property changes --- arangod/Cluster/ClusterInfo.cpp | 63 +++++++++++++++++++++++ arangod/Cluster/ClusterInfo.h | 8 +++ arangod/V8Server/v8-vocbase.cpp | 88 +++++++++++++++++++++++++++------ 3 files changed, 143 insertions(+), 16 deletions(-) diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index 03ddf84db7..16b8c50505 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -1220,6 +1220,69 @@ int ClusterInfo::dropCollectionCoordinator (string const& databaseName, return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief set collection properties in coordinator +//////////////////////////////////////////////////////////////////////////////// + +int ClusterInfo::setCollectionPropertiesCoordinator (string const& databaseName, + string const& collectionID, + TRI_col_info_t const* info) { + AgencyComm ac; + AgencyCommResult res; + + AgencyCommLocker locker("Plan", "WRITE"); + + if (! locker.successful()) { + return TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN; + } + + if (! ac.exists("Plan/Databases/" + databaseName)) { + return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND; + } + + res = ac.getValues("Plan/Collections/" + databaseName + "/" + collectionID, false); + + if (! 
res.successful()) { + return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND; + } + + res.parse("", false); + std::map::const_iterator it = res._values.begin(); + + if (it == res._values.end()) { + return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND; + } + + TRI_json_t* json = (*it).second._json; + if (json == 0) { + return TRI_ERROR_OUT_OF_MEMORY; + } + + TRI_json_t* copy = TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, json); + if (copy == 0) { + return TRI_ERROR_OUT_OF_MEMORY; + } + + TRI_DeleteArrayJson(TRI_UNKNOWN_MEM_ZONE, copy, "doCompact"); + TRI_DeleteArrayJson(TRI_UNKNOWN_MEM_ZONE, copy, "journalSize"); + TRI_DeleteArrayJson(TRI_UNKNOWN_MEM_ZONE, copy, "waitForSync"); + + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, copy, "doCompact", TRI_CreateBooleanJson(TRI_UNKNOWN_MEM_ZONE, info->_doCompact)); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, copy, "journalSize", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, info->_maximalSize)); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, copy, "waitForSync", TRI_CreateBooleanJson(TRI_UNKNOWN_MEM_ZONE, info->_waitForSync)); + + res = ac.setValue("Plan/Collections/" + databaseName + "/" + collectionID, copy, 0.0); + + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, copy); + + if (res.successful()) { + loadPlannedCollections(false); + return TRI_ERROR_NO_ERROR; + } + + return TRI_ERROR_INTERNAL; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief set collection status in coordinator //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h index 828fe4c883..b710b780e7 100644 --- a/arangod/Cluster/ClusterInfo.h +++ b/arangod/Cluster/ClusterInfo.h @@ -837,6 +837,14 @@ namespace triagens { string& errorMsg, double timeout); +//////////////////////////////////////////////////////////////////////////////// +/// @brief set collection properties in coordinator +//////////////////////////////////////////////////////////////////////////////// + + int setCollectionPropertiesCoordinator (string const& databaseName, + string const& collectionID, + TRI_col_info_t const*); + //////////////////////////////////////////////////////////////////////////////// /// @brief set collection status in coordinator //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index 1aa0260f9b..e91545cb69 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -2031,13 +2031,13 @@ static v8::Handle CreateVocBase (v8::Arguments const& argv, #ifdef TRI_ENABLE_CLUSTER if (ServerState::instance()->isCoordinator()) { - char const* originalDatabase = GetCurrentDatabaseName(); - if (! ClusterInfo::instance()->doesDatabaseExist(originalDatabase)) { + char const* databaseName = GetCurrentDatabaseName(); + if (! ClusterInfo::instance()->doesDatabaseExist(databaseName)) { TRI_FreeCollectionInfoOptions(¶meter); TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database"); } - v8::Handle result = CreateCollectionCoordinator(argv, collectionType, originalDatabase, parameter, vocbase); + v8::Handle result = CreateCollectionCoordinator(argv, collectionType, databaseName, parameter, vocbase); TRI_FreeCollectionInfoOptions(¶meter); return scope.Close(result); @@ -6281,9 +6281,65 @@ static v8::Handle JS_PropertiesVocbaseCol (v8::Arguments const& argv) #ifdef TRI_ENABLE_CLUSTER if (! 
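  // Coordinator case: the properties live in the agency Plan, not in a local
  // collection. Only doCompact, journalSize and waitForSync are updatable
  // here; setCollectionPropertiesCoordinator() rewrites exactly these three
  // keys under Plan/Collections/<database>/<collectionID>.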
collection->_isLocal) { - string const databaseName(collection->_dbName); + std::string const databaseName = std::string(collection->_dbName); TRI_col_info_t info = ClusterInfo::instance()->getCollectionProperties(databaseName, StringUtils::itoa(collection->_cid)); + if (0 < argv.Length()) { + v8::Handle par = argv[0]; + + if (par->IsObject()) { + v8::Handle po = par->ToObject(); + + // extract doCompact flag + if (po->Has(v8g->DoCompactKey)) { + info._doCompact = TRI_ObjectToBoolean(po->Get(v8g->DoCompactKey)); + } + + // extract sync flag + if (po->Has(v8g->WaitForSyncKey)) { + info._waitForSync = TRI_ObjectToBoolean(po->Get(v8g->WaitForSyncKey)); + } + + // extract the journal size + if (po->Has(v8g->JournalSizeKey)) { + info._maximalSize = (TRI_voc_size_t) TRI_ObjectToUInt64(po->Get(v8g->JournalSizeKey), false); + + if (info._maximalSize < TRI_JOURNAL_MINIMAL_SIZE) { + if (info._keyOptions != 0) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, info._keyOptions); + } + TRI_V8_EXCEPTION_PARAMETER(scope, ".journalSize too small"); + } + } + + if (po->Has(v8g->IsVolatileKey)) { + if (TRI_ObjectToBoolean(po->Get(v8g->IsVolatileKey)) != info._isVolatile) { + if (info._keyOptions != 0) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, info._keyOptions); + } + TRI_V8_EXCEPTION_PARAMETER(scope, "isVolatile option cannot be changed at runtime"); + } + } + + if (info._isVolatile && info._waitForSync) { + if (info._keyOptions != 0) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, info._keyOptions); + } + TRI_V8_EXCEPTION_PARAMETER(scope, "volatile collections do not support the waitForSync option"); + } + } + + int res = ClusterInfo::instance()->setCollectionPropertiesCoordinator(databaseName, StringUtils::itoa(collection->_cid), &info); + + if (res != TRI_ERROR_NO_ERROR) { + if (info._keyOptions != 0) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, info._keyOptions); + } + TRI_V8_EXCEPTION(scope, res); + } + } + + // return the current parameter set v8::Handle result = v8::Object::New(); @@ -7045,13 +7101,13 @@ static v8::Handle JS_StatusVocbaseCol (v8::Arguments const& argv) { #ifdef TRI_ENABLE_CLUSTER if (ServerState::instance()->isCoordinator()) { - char const* originalDatabase = GetCurrentDatabaseName(); + std::string const databaseName = std::string(collection->_dbName); - if (! ClusterInfo::instance()->doesDatabaseExist(originalDatabase)) { + if (! ClusterInfo::instance()->doesDatabaseExist(databaseName)) { TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database"); } - CollectionInfo const& ci = ClusterInfo::instance()->getCollection(originalDatabase, StringUtils::itoa(collection->_cid)); + CollectionInfo const& ci = ClusterInfo::instance()->getCollection(databaseName, StringUtils::itoa(collection->_cid)); return scope.Close(v8::Number::New((int) ci.status())); } // fallthru intentional @@ -7168,13 +7224,13 @@ static v8::Handle JS_TypeVocbaseCol (v8::Arguments const& argv) { #ifdef TRI_ENABLE_CLUSTER if (ServerState::instance()->isCoordinator()) { - char const* originalDatabase = GetCurrentDatabaseName(); + std::string const databaseName = std::string(collection->_dbName); - if (! ClusterInfo::instance()->doesDatabaseExist(originalDatabase)) { + if (! 
ClusterInfo::instance()->doesDatabaseExist(databaseName)) { TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database"); } - CollectionInfo const& ci = ClusterInfo::instance()->getCollection(originalDatabase, StringUtils::itoa(collection->_cid)); + CollectionInfo const& ci = ClusterInfo::instance()->getCollection(databaseName, StringUtils::itoa(collection->_cid)); return scope.Close(v8::Number::New((int) ci.type())); } // fallthru intentional @@ -7559,13 +7615,13 @@ static v8::Handle JS_CollectionsVocbase (v8::Arguments const& argv) { // if we are a coordinator, we need to fetch the collection info from the agency if (ServerState::instance()->isCoordinator()) { - char const* originalDatabase = GetCurrentDatabaseName(); + char const* databaseName = GetCurrentDatabaseName(); - if (! ClusterInfo::instance()->doesDatabaseExist(originalDatabase)) { + if (! ClusterInfo::instance()->doesDatabaseExist(databaseName)) { TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database"); } - colls = GetCollectionsCluster(vocbase, originalDatabase); + colls = GetCollectionsCluster(vocbase, databaseName); } else { colls = TRI_CollectionsVocBase(vocbase); @@ -7620,10 +7676,10 @@ static v8::Handle JS_CompletionsVocbase (v8::Arguments const& argv) { #ifdef TRI_ENABLE_CLUSTER TRI_vector_string_t names; if (ServerState::instance()->isCoordinator()) { - char const* originalDatabase = GetCurrentDatabaseName(); + char const* databaseName = GetCurrentDatabaseName(); - if (ClusterInfo::instance()->doesDatabaseExist(originalDatabase)) { - names = GetCollectionNamesCluster(vocbase, originalDatabase); + if (ClusterInfo::instance()->doesDatabaseExist(databaseName)) { + names = GetCollectionNamesCluster(vocbase, databaseName); } else { TRI_InitVectorString(&names, TRI_UNKNOWN_MEM_ZONE); From 3887303a82cad354641fca89fa0c0c928023ca22 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Wed, 22 Jan 2014 23:57:41 +0100 Subject: [PATCH 9/9] Prepare creation of a document in the coordinator. --- arangod/RestHandler/RestDocumentHandler.cpp | 29 +++++++++++++++++++++ arangod/RestHandler/RestDocumentHandler.h | 8 ++++++ 2 files changed, 37 insertions(+) diff --git a/arangod/RestHandler/RestDocumentHandler.cpp b/arangod/RestHandler/RestDocumentHandler.cpp index 02cf7246bf..0987a15856 100644 --- a/arangod/RestHandler/RestDocumentHandler.cpp +++ b/arangod/RestHandler/RestDocumentHandler.cpp @@ -37,6 +37,12 @@ #include "VocBase/vocbase.h" #include "Utils/Barrier.h" +#ifdef TRI_ENABLE_CLUSTER +#include "Cluster/ServerState.h" +#include "Cluster/ClusterInfo.h" +#include "Cluster/ClusterComm.h" +#endif + using namespace std; using namespace triagens::basics; using namespace triagens::rest; @@ -317,6 +323,12 @@ bool RestDocumentHandler::createDocument () { return false; } +#ifdef TRI_ENABLE_CLUSTER + if (ServerState::instance()->isCoordinator()) { + return createDocumentCoordinator(collection, waitForSync, json); + } +#endif + if (! 
checkCreateCollection(collection, getCollectionType())) { TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); return false; @@ -384,6 +396,23 @@ bool RestDocumentHandler::createDocument () { return true; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates a document, coordinator case in a cluster +//////////////////////////////////////////////////////////////////////////////// + +#ifdef TRI_ENABLE_CLUSTER +bool RestDocumentHandler::createDocumentCoordinator (char const* collection, + bool waitForSync, + TRI_json_t* json) { + // Find collectionID from collection, which is the name + // ask ClusterInfo for the responsible shard + // send a synchronous request to that shard using ClusterComm + // if not successful prepare error and return false + // prepare successful answer (created or accepted depending on waitForSync) + return true; +} +#endif + //////////////////////////////////////////////////////////////////////////////// /// @brief reads a single or all documents /// diff --git a/arangod/RestHandler/RestDocumentHandler.h b/arangod/RestHandler/RestDocumentHandler.h index 79dbe53d1e..909ad3668d 100644 --- a/arangod/RestHandler/RestDocumentHandler.h +++ b/arangod/RestHandler/RestDocumentHandler.h @@ -113,6 +113,14 @@ namespace triagens { return TRI_COL_TYPE_DOCUMENT; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates a document, coordinator case in a cluster +//////////////////////////////////////////////////////////////////////////////// + +bool createDocumentCoordinator (char const* collection, + bool waitForSync, + TRI_json_t* json); + //////////////////////////////////////////////////////////////////////////////// /// @brief creates a document ////////////////////////////////////////////////////////////////////////////////
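A note on createDocumentCoordinator above: the implementation is still a stub
that only lists its intended steps as comments. A rough sketch of those steps,
kept entirely as comments because neither the collection name lookup nor the
ClusterComm interface appears in this patch series (both calls below are
assumptions):

    // 1. resolve the collection name to its cluster-wide collection ID
    //    (hypothetical helper, not shown in these patches)
    // CollectionID collectionID = lookupCollectionID(collection);
    //
    // 2. ask ClusterInfo for the shard responsible for the complete document
    // ShardID shardID = ClusterInfo::instance()->getResponsibleShard(
    //                       collectionID, json, true);
    //
    // 3. forward the insert to the DBserver owning shardID via a synchronous
    //    ClusterComm request and map its reply onto this HTTP response
    //    (created or accepted, depending on waitForSync)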