From 7c23c9527a45af007bba09cce11a355021da738a Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Fri, 17 Jan 2014 10:53:29 +0100 Subject: [PATCH 1/2] removed nrShards attribute from Json --- arangod/V8Server/v8-vocbase.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index 2bfd114952..a86dea504f 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -1901,7 +1901,6 @@ static v8::Handle CreateCollectionCoordinator ( TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "shardKeys", JsonHelper::stringList(TRI_UNKNOWN_MEM_ZONE, shardKeys)); TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "shards", JsonHelper::stringObject(TRI_UNKNOWN_MEM_ZONE, shards)); - TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "nrShards", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, numberOfShards)); string errorMsg; int myerrno = ci->createCollectionCoordinator( databaseName, cid, From 07c0ed3babeea4485cda52cafb5c8cf684ef9839 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Fri, 17 Jan 2014 12:43:34 +0100 Subject: [PATCH 2/2] creation/removal of local shards --- arangod/Cluster/ApplicationCluster.cpp | 4 +- arangod/V8Server/v8-vocbase.cpp | 4 +- init-cluster.sh | 2 +- js/server/modules/org/arangodb/cluster.js | 371 ++++++++++++++++++++-- 4 files changed, 349 insertions(+), 32 deletions(-) diff --git a/arangod/Cluster/ApplicationCluster.cpp b/arangod/Cluster/ApplicationCluster.cpp index be9d0c3a2e..9c23840ae7 100644 --- a/arangod/Cluster/ApplicationCluster.cpp +++ b/arangod/Cluster/ApplicationCluster.cpp @@ -308,7 +308,9 @@ bool ApplicationCluster::open () { if (! result.successful()) { locker.unlock(); - LOG_FATAL_AND_EXIT("unable to register server in agency"); + LOG_FATAL_AND_EXIT("unable to register server in agency: http code: %d, body: %s", + (int) result.httpCode(), + result.body().c_str()); } if (role == ServerState::ROLE_COORDINATOR) { diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index a86dea504f..b873350e95 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -1890,10 +1890,12 @@ static v8::Handle CreateCollectionCoordinator ( TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "name", TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, name.c_str(), name.size())); TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "type", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, (int) collectionType)); TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "status", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, (int) TRI_VOC_COL_STATUS_LOADED)); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "deleted", TRI_CreateBooleanJson(TRI_UNKNOWN_MEM_ZONE, parameter._deleted)); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "doCompact", TRI_CreateBooleanJson(TRI_UNKNOWN_MEM_ZONE, parameter._doCompact)); TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "isSystem", TRI_CreateBooleanJson(TRI_UNKNOWN_MEM_ZONE, parameter._isSystem)); TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "isVolatile", TRI_CreateBooleanJson(TRI_UNKNOWN_MEM_ZONE, parameter._isVolatile)); TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "waitForSync", TRI_CreateBooleanJson(TRI_UNKNOWN_MEM_ZONE, parameter._waitForSync)); - TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "maximalSize", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, parameter._maximalSize)); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "journalSize", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, parameter._maximalSize)); if (parameter._keyOptions != 0) { TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "keyOptions", TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, parameter._keyOptions)); diff --git a/init-cluster.sh b/init-cluster.sh index 3d8a10324c..8e46115fd3 100755 --- a/init-cluster.sh +++ b/init-cluster.sh @@ -16,7 +16,7 @@ curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Pav curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Perry" -d "value=\"tcp://127.0.0.1:8531\"" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Claus" -d "value=\"tcp://127.0.0.1:8529\"" || exit 1 -curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Collections/@Usystem/5678" -d 'value={"status":3,"shards":{"shardBlubb": "Pavel"},"shardKeys":["xyz"],"indexes":{},"name":"testCollection","type":2,"id":"5678","doCompact":true,"isSystem":false,"isVolatile":false,"waitForSync":false,"maximalSize":1048576,"keyOptions":{"type":"traditional","allowUserKeys":true}}' || exit 1 +#curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Collections/@Usystem/5678" -d 'value={"status":3,"shards":{"shardBlubb": "Pavel"},"shardKeys":["xyz"],"indexes":{},"name":"testCollection","type":2,"id":"5678","doCompact":true,"isSystem":false,"isVolatile":false,"waitForSync":false,"maximalSize":1048576,"keyOptions":{"type":"traditional","allowUserKeys":true}}' || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Databases/@Usystem/Pavel" -d 'value={"name":"system"}}' || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Databases/@Usystem/Perry" -d 'value={"name":"system"}}' || exit 1 diff --git a/js/server/modules/org/arangodb/cluster.js b/js/server/modules/org/arangodb/cluster.js index 5dc79a4d3c..4c2364f867 100644 --- a/js/server/modules/org/arangodb/cluster.js +++ b/js/server/modules/org/arangodb/cluster.js @@ -29,9 +29,15 @@ //////////////////////////////////////////////////////////////////////////////// var console = require("console"); -var db = require("org/arangodb").db; +var arangodb = require("org/arangodb"); +var db = arangodb.db; +var ArangoCollection = arangodb.ArangoCollection; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief get values from Plan or Current by a prefix +//////////////////////////////////////////////////////////////////////////////// -function getByPrefix (values, prefix) { +function getByPrefix (values, prefix, multiDimensional) { var result = { }; var a; var n = prefix.length; @@ -39,13 +45,60 @@ function getByPrefix (values, prefix) { for (a in values) { if (values.hasOwnProperty(a)) { if (a.substr(0, n) === prefix) { - result[a.substr(n)] = values[a]; + var key = a.substr(n); + + if (multiDimensional) { + var parts = key.split('/'); + if (! result.hasOwnProperty(parts[0])) { + result[parts[0]] = { }; + } + result[parts[0]][parts[1]] = values[a]; + } + else { + result[key] = values[a]; + } } } } return result; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief return a shardId => server map +//////////////////////////////////////////////////////////////////////////////// + +function getShardMap (plannedCollections) { + var shardMap = { }; + + var database; + + for (database in plannedCollections) { + if (plannedCollections.hasOwnProperty(database)) { + var collections = plannedCollections[database]; + var collection; + + for (collection in collections) { + if (collections.hasOwnProperty(collection)) { + var shards = collections[collection].shards; + var shard; + + for (shard in shards) { + if (shards.hasOwnProperty(shard)) { + shardMap[shard] = shards[shard]; + } + } + } + } + } + } + + return shardMap; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief execute an action under a write-lock +//////////////////////////////////////////////////////////////////////////////// + function writeLocked (lockInfo, cb, args) { var timeout = lockInfo.timeout; if (timeout === undefined) { @@ -70,58 +123,317 @@ function writeLocked (lockInfo, cb, args) { } } -function handleDatabaseChanges (plan, current) { - var plannedDatabases = getByPrefix(plan, "Plan/Databases/"); - // var currentDatabases = getByPrefix(current, "Current/Databases/"); - var localDatabases = db._listDatabases(); - - var createDatabase = function (payload) { - ArangoAgency.set("Current/Databases/" + payload.name + "/" + ArangoServerState.id(), payload); - }; - - var dropDatabase = function (payload) { - try { - ArangoAgency.remove("Current/Databases/" + payload.name + "/" + ArangoServerState.id()); - } - catch (err) { - } - }; +//////////////////////////////////////////////////////////////////////////////// +/// @brief return a hash with the local databases +//////////////////////////////////////////////////////////////////////////////// +function getLocalDatabases () { + var result = { }; + + db._listDatabases().forEach(function (database) { + result[database] = { name: database }; + }); + + return result; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief return a hash with the local collections +//////////////////////////////////////////////////////////////////////////////// + +function getLocalCollections () { + var result = { }; + + db._collections().forEach(function (collection) { + var name = collection.name(); + + if (name.substr(0, 1) !== '_') { + result[name] = { + name: name, + type: collection.type(), + status: collection.status() + }; + + // merge properties + var properties = collection.properties(); + var p; + for (p in properties) { + if (properties.hasOwnProperty(p)) { + result[name][p] = properties[p]; + } + } + } + }); + + return result; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief create databases if they exist in the plan but not locally +//////////////////////////////////////////////////////////////////////////////// + +function createLocalDatabases (plannedDatabases) { + var ourselves = ArangoServerState.id(); + + var createDatabaseAgency = function (payload) { + ArangoAgency.set("Current/Databases/" + payload.name + "/" + ourselves, + payload); + }; + + var localDatabases = getLocalDatabases(); var name; // check which databases need to be created locally for (name in plannedDatabases) { if (plannedDatabases.hasOwnProperty(name)) { - if (localDatabases.indexOf(name) === -1) { + if (! localDatabases.hasOwnProperty(name)) { // must create database var payload = plannedDatabases[name]; // TODO: handle options and user information - console.info("creating local database '%s'", payload.name); db._createDatabase(payload.name); - writeLocked({ part: "Current" }, createDatabase, [ payload ]); + writeLocked({ part: "Current" }, + createDatabaseAgency, + [ payload ]); } } } +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief drop databases if they do exist locally but not in the plan +//////////////////////////////////////////////////////////////////////////////// + +function dropLocalDatabases (plannedDatabases) { + var ourselves = ArangoServerState.id(); + + var dropDatabaseAgency = function (payload) { + try { + ArangoAgency.remove("Current/Databases/" + payload.name + "/" + ourselves); + } + catch (err) { + // ignore errors + } + }; + + var localDatabases = getLocalDatabases(); + var name; // check which databases need to be deleted locally - localDatabases.forEach (function (name) { - if (! plannedDatabases.hasOwnProperty(name)) { - // must drop database + for (name in localDatabases) { + if (localDatabases.hasOwnProperty(name)) { + if (! plannedDatabases.hasOwnProperty(name)) { + // must drop database - console.info("dropping local database '%s'", name); - db._dropDatabase(name); + console.info("dropping local database '%s'", name); + db._dropDatabase(name); - writeLocked({ part: "Current" }, dropDatabase, [ { name: name } ]); + writeLocked({ part: "Current" }, + dropDatabaseAgency, + [ { name: name } ]); + } } - }); + } } +//////////////////////////////////////////////////////////////////////////////// +/// @brief handle database changes +//////////////////////////////////////////////////////////////////////////////// + +function handleDatabaseChanges (plan, current) { + var plannedDatabases = getByPrefix(plan, "Plan/Databases/"); + + db._useDatabase("_system"); + createLocalDatabases(plannedDatabases); + dropLocalDatabases(plannedDatabases); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief create collections if they exist in the plan but not locally +//////////////////////////////////////////////////////////////////////////////// + +function createLocalCollections (plannedCollections) { + var ourselves = ArangoServerState.id(); + + var createCollectionAgency = function (database, payload) { + ArangoAgency.set("Current/Collections/" + database + "/" + payload.name + "/" + ourselves, + payload); + }; + + var localDatabases = getLocalDatabases(); + var database; + + // iterate over all matching databases + for (database in plannedCollections) { + if (plannedCollections.hasOwnProperty(database)) { + if (localDatabases.hasOwnProperty(database)) { + // save old database name + var previousDatabase = db._name(); + // switch into other database + db._useDatabase(database); + + try { + // iterate over collections of database + var localCollections = getLocalCollections(); + + var collections = plannedCollections[database]; + var collection; + + // diff the collections + for (collection in collections) { + if (collections.hasOwnProperty(collection)) { + var payload = collections[collection]; + var shards = payload.shards; + var shard; + + for (shard in shards) { + if (shards.hasOwnProperty(shard)) { + if (shards[shard] === ourselves) { + // found a shard we are responsible for + + if (! localCollections.hasOwnProperty(shard)) { + // must create this shard + console.info("creating local shard '%s/%s'", database, shard); + + if (payload.type === ArangoCollection.TYPE_EDGE) { + db._createEdgeCollection(shard, payload); + } + else { + db._create(shard, payload); + } + + writeLocked({ part: "Current" }, + createCollectionAgency, + [ database, payload ]); + } + else { + // collection exists, now compare collection properties + var properties = { }; + var cmp = [ "journalSize", "waitForSync", "doCompact" ], i; + for (i = 0; i < cmp.length; ++i) { + var p = cmp[i]; + if (localCollections[shard][p] !== payload[p]) { + // property change + properties[p] = payload[p]; + } + } + + if (Object.keys(properties).length > 0) { + console.info("updating properties for local shard '%s/%s'", + database, + shard); + db._collection(shard).properties(properties); + + writeLocked({ part: "Current" }, + createCollectionAgency, + [ database, payload ]); + } + } + } + } + } + } + } + } + catch (err) { + // always return to previous database + db._useDatabase(previousDatabase); + throw err; + } + + } + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief drop collections if they exist locally but not in the plan +//////////////////////////////////////////////////////////////////////////////// + +function dropLocalCollections (plannedCollections) { + var ourselves = ArangoServerState.id(); + + var dropCollectionAgency = function (database, name) { + try { + ArangoAgency.remove("Current/Collections/" + database + "/" + name + "/" + ourselves); + } + catch (err) { + // ignore errors + } + }; + + var shardMap = getShardMap(plannedCollections); + + var localDatabases = getLocalDatabases(); + var database; + + // iterate over all databases + for (database in localDatabases) { + if (localDatabases.hasOwnProperty(database)) { + var removeAll = ! plannedCollections.hasOwnProperty(database); + + // save old database name + var previousDatabase = db._name(); + // switch into other database + db._useDatabase(database); + + try { + // iterate over collections of database + var collections = getLocalCollections(); + var collection; + + for (collection in collections) { + if (collections.hasOwnProperty(collection)) { + // found a local collection + // check if it is in the plan and we are responsible for it + + var remove = removeAll || + (! shardMap.hasOwnProperty(collection)) || + (shardMap[collection] !== ourselves); + + if (remove) { + console.info("dropping local shard '%s/%s'", database, collection); + db._drop(collection); + + writeLocked({ part: "Current" }, + dropCollectionAgency, + [ database, collection ]); + } + } + } + } + catch (err) { + db._useDatabase("_system"); + throw err; + } + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief handle collection changes +//////////////////////////////////////////////////////////////////////////////// + +function handleCollectionChanges (plan, current) { + var plannedCollections = getByPrefix(plan, "Plan/Collections/", true); + + db._useDatabase("_system"); + createLocalCollections(plannedCollections); + + db._useDatabase("_system"); + dropLocalCollections(plannedCollections); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief change handling trampoline function +//////////////////////////////////////////////////////////////////////////////// + function handleChanges (plan, current) { handleDatabaseChanges(plan, current); + handleCollectionChanges(plan, current); } @@ -175,6 +487,7 @@ var handlePlanChange = function () { console.info("plan change handling successful"); } catch (err) { + require("internal").print(err); console.error("plan change handling failed"); } };