From dca42efb2e7d7a49ec60afd70e6d751f26ebcc47 Mon Sep 17 00:00:00 2001 From: Andreas Streichardt Date: Thu, 17 Mar 2016 14:48:33 +0000 Subject: [PATCH 1/6] Rework cluster state handling --- arangod/Cluster/ApplicationCluster.cpp | 94 +++++++------------------- arangod/Cluster/ServerState.cpp | 92 +++++++++++++++++++++++-- arangod/Cluster/ServerState.h | 7 +- 3 files changed, 113 insertions(+), 80 deletions(-) diff --git a/arangod/Cluster/ApplicationCluster.cpp b/arangod/Cluster/ApplicationCluster.cpp index 1dea185ef1..5d45d1f753 100644 --- a/arangod/Cluster/ApplicationCluster.cpp +++ b/arangod/Cluster/ApplicationCluster.cpp @@ -355,17 +355,16 @@ bool ApplicationCluster::open() { if (!enabled()) { return true; } - - ServerState::RoleEnum role = ServerState::instance()->getRole(); - - // tell the agency that we are ready - { - AgencyComm comm; - AgencyCommResult result; - + + AgencyComm comm; + AgencyCommResult result; + + bool success; + do { AgencyCommLocker locker("Current", "WRITE"); - - if (locker.successful()) { + + success = locker.successful(); + if (success) { VPackBuilder builder; try { VPackObjectBuilder b(&builder); @@ -382,69 +381,22 @@ bool ApplicationCluster::open() { locker.unlock(); LOG(FATAL) << "unable to register server in agency: http code: " << result.httpCode() << ", body: " << result.body(); FATAL_ERROR_EXIT(); } - - if (role == ServerState::ROLE_COORDINATOR) { - VPackBuilder builder; - try { - builder.add(VPackValue("none")); - } catch (...) { - locker.unlock(); - LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT(); - } - - ServerState::instance()->setState(ServerState::STATE_SERVING); - - // register coordinator - AgencyCommResult result = - comm.setValue("Current/Coordinators/" + _myId, builder.slice(), 0.0); - - if (!result.successful()) { - locker.unlock(); - LOG(FATAL) << "unable to register coordinator in agency"; FATAL_ERROR_EXIT(); - } - } else if (role == ServerState::ROLE_PRIMARY) { - VPackBuilder builder; - try { - builder.add(VPackValue("none")); - } catch (...) { - locker.unlock(); - LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT(); - } - - ServerState::instance()->setState(ServerState::STATE_SERVINGASYNC); - - // register server - AgencyCommResult result = - comm.setValue("Current/DBServers/" + _myId, builder.slice(), 0.0); - - if (!result.successful()) { - locker.unlock(); - LOG(FATAL) << "unable to register db server in agency"; FATAL_ERROR_EXIT(); - } - } else if (role == ServerState::ROLE_SECONDARY) { - std::string keyName = std::string("\"") + _myId + std::string("\""); - VPackBuilder builder; - try { - builder.add(VPackValue(keyName)); - } catch (...) { - locker.unlock(); - LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT(); - } - - ServerState::instance()->setState(ServerState::STATE_SYNCING); - - // register server - AgencyCommResult result = comm.setValue( - "Current/DBServers/" + ServerState::instance()->getPrimaryId(), - builder.slice(), 0.0); - - if (!result.successful()) { - locker.unlock(); - LOG(FATAL) << "unable to register secondary db server in agency"; FATAL_ERROR_EXIT(); - } + + if (success) { + break; } - } + sleep(1); + } while (true); + ServerState::RoleEnum role = ServerState::instance()->getRole(); + + if (role == ServerState::ROLE_COORDINATOR) { + ServerState::instance()->setState(ServerState::STATE_SERVING); + } else if (role == ServerState::ROLE_PRIMARY) { + ServerState::instance()->setState(ServerState::STATE_SERVINGASYNC); + } else if (role == ServerState::ROLE_SECONDARY) { + ServerState::instance()->setState(ServerState::STATE_SYNCING); + } return true; } diff --git a/arangod/Cluster/ServerState.cpp b/arangod/Cluster/ServerState.cpp index 394d1ea9fb..8a86b4b6ca 100644 --- a/arangod/Cluster/ServerState.cpp +++ b/arangod/Cluster/ServerState.cpp @@ -37,6 +37,12 @@ using namespace arangodb::basics; /// running //////////////////////////////////////////////////////////////////////////////// +static bool isClusterRole(ServerState::RoleEnum role) { + return (role == ServerState::ROLE_PRIMARY || + role == ServerState::ROLE_SECONDARY || + role == ServerState::ROLE_COORDINATOR); +} + static ServerState Instance; ServerState::ServerState() @@ -227,10 +233,8 @@ bool ServerState::isDBServer(ServerState::RoleEnum role) { bool ServerState::isRunningInCluster() { auto role = loadRole(); - - return (role == ServerState::ROLE_PRIMARY || - role == ServerState::ROLE_SECONDARY || - role == ServerState::ROLE_COORDINATOR); + + return isClusterRole(role); } //////////////////////////////////////////////////////////////////////////////// @@ -1056,3 +1060,83 @@ ServerState::RoleEnum ServerState::checkServersList(std::string const& id) { return role; } + +////////////////////////////////////////////////////////////////////////////// +/// @brief store the server role +////////////////////////////////////////////////////////////////////////////// +void ServerState::storeRole(RoleEnum role) +{ + _role.store(role, std::memory_order_release); + + if (!isClusterRole(role)) { + return; + } + AgencyComm comm; + AgencyCommResult result; + std::unique_ptr locker; + + do { + locker.reset(new AgencyCommLocker("Current", "WRITE")); + if (locker->successful()) { + break; + } + sleep(1); + LOG(DEBUG) << "Couldn't aquire lock on Current. Retrying"; + } while (true); + + if (role == ServerState::ROLE_COORDINATOR) { + VPackBuilder builder; + try { + builder.add(VPackValue("none")); + } catch (...) { + locker->unlock(); + LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT(); + } + + + // register coordinator + AgencyCommResult result = + comm.setValue("Current/Coordinators/" + _id, builder.slice(), 0.0); + + if (!result.successful()) { + locker->unlock(); + LOG(FATAL) << "unable to register coordinator in agency"; FATAL_ERROR_EXIT(); + } + } else if (role == ServerState::ROLE_PRIMARY) { + VPackBuilder builder; + try { + builder.add(VPackValue("none")); + } catch (...) { + locker->unlock(); + LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT(); + } + + // register server + AgencyCommResult result = + comm.setValue("Current/DBServers/" + _id, builder.slice(), 0.0); + + if (!result.successful()) { + locker->unlock(); + LOG(FATAL) << "unable to register db server in agency"; FATAL_ERROR_EXIT(); + } + } else if (role == ServerState::ROLE_SECONDARY) { + std::string keyName = _id; + VPackBuilder builder; + try { + builder.add(VPackValue(keyName)); + } catch (...) { + locker->unlock(); + LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT(); + } + + // register server + AgencyCommResult result = comm.setValue( + "Current/DBServers/" + ServerState::instance()->getPrimaryId(), + builder.slice(), 0.0); + + if (!result.successful()) { + locker->unlock(); + LOG(FATAL) << "unable to register secondary db server in agency"; FATAL_ERROR_EXIT(); + } + } +} diff --git a/arangod/Cluster/ServerState.h b/arangod/Cluster/ServerState.h index 3570fe34b4..18effd70a6 100644 --- a/arangod/Cluster/ServerState.h +++ b/arangod/Cluster/ServerState.h @@ -376,12 +376,9 @@ class ServerState { } ////////////////////////////////////////////////////////////////////////////// - /// @brief atomically stores the server role + /// @brief store the server role ////////////////////////////////////////////////////////////////////////////// - - void storeRole(RoleEnum role) { - _role.store(role, std::memory_order_release); - } + void storeRole(RoleEnum role); ////////////////////////////////////////////////////////////////////////////// /// @brief determine the server role From ddea85ff44751ce429b685e0242c6433e1a540d2 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Thu, 17 Mar 2016 23:32:25 +0100 Subject: [PATCH 2/6] RestEndpointServer is unavailable on windows. --- lib/Rest/Endpoint.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/Rest/Endpoint.cpp b/lib/Rest/Endpoint.cpp index f583162dc0..793b90a327 100644 --- a/lib/Rest/Endpoint.cpp +++ b/lib/Rest/Endpoint.cpp @@ -219,7 +219,11 @@ Endpoint* Endpoint::factory(const Endpoint::EndpointType type, return nullptr; } +#ifndef _WIN32 return new EndpointSrv(specification.substr(6)); +#else + return nullptr; +#endif } else if (!StringUtils::isPrefix(domainType, "tcp://")) { From 90862b60816de944b62aa9710df2aa9db05da7f0 Mon Sep 17 00:00:00 2001 From: Andreas Streichardt Date: Thu, 17 Mar 2016 22:39:15 +0000 Subject: [PATCH 3/6] Proper secondary => Primary failover --- arangod/Cluster/ServerState.cpp | 181 +++++++------ arangod/Cluster/ServerState.h | 7 +- js/actions/api-cluster.js | 61 ++--- js/server/modules/@arangodb/cluster.js | 344 +++++++++++++------------ 4 files changed, 307 insertions(+), 286 deletions(-) diff --git a/arangod/Cluster/ServerState.cpp b/arangod/Cluster/ServerState.cpp index 8a86b4b6ca..91204f30a4 100644 --- a/arangod/Cluster/ServerState.cpp +++ b/arangod/Cluster/ServerState.cpp @@ -178,10 +178,25 @@ void ServerState::setAuthentication(std::string const& username, std::string ServerState::getAuthentication() { return _authentication; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief find and set our role +//////////////////////////////////////////////////////////////////////////////// +void ServerState::findAndSetRoleBlocking() { + while (true) { + auto role = determineRole(_localInfo, _id); + std::string roleString = roleToString(role); + LOG(DEBUG) << "Found my role: " << roleString; + + if (storeRole(role)) { + break; + } + sleep(1); + } +} + //////////////////////////////////////////////////////////////////////////////// /// @brief flush the server state (used for testing) //////////////////////////////////////////////////////////////////////////////// - void ServerState::flush() { { WRITE_LOCKER(writeLocker, _lock); @@ -192,8 +207,8 @@ void ServerState::flush() { _address = ClusterInfo::instance()->getTargetServerEndpoint(_id); } - - storeRole(determineRole(_localInfo, _id)); + + findAndSetRoleBlocking(); } //////////////////////////////////////////////////////////////////////////////// @@ -276,15 +291,8 @@ ServerState::RoleEnum ServerState::getRole() { LOG(DEBUG) << "Have stored " << builder.slice().toJson() << " under Current/NewServers/" << _localInfo << " in agency."; } - // role not yet set - role = determineRole(info, id); - std::string roleString = roleToString(role); - - LOG(DEBUG) << "Found my role: " << roleString; - - storeRole(role); - - return role; + findAndSetRoleBlocking(); + return loadRole(); } //////////////////////////////////////////////////////////////////////////////// @@ -355,8 +363,8 @@ bool ServerState::registerWithRole(ServerState::RoleEnum role) { } _id = id; - storeRole(role); - + + findAndSetRoleBlocking(); LOG(DEBUG) << "We successfully announced ourselves as " << roleToString(role) << " and our id is " << id; return true; @@ -783,7 +791,9 @@ bool ServerState::redetermineRole() { RoleEnum oldRole = loadRole(); if (role != oldRole) { LOG(INFO) << "Changed role to: " << roleString; - storeRole(role); + if (!storeRole(role)) { + return false; + } return true; } if (_idOfPrimary != saveIdOfPrimary) { @@ -810,21 +820,12 @@ ServerState::RoleEnum ServerState::determineRole(std::string const& info, LOG(DEBUG) << "Learned my own Id: " << id; setId(id); } - - ServerState::RoleEnum role = checkServersList(id); - ServerState::RoleEnum role2 = checkCoordinatorsList(id); - + + ServerState::RoleEnum role = checkCoordinatorsList(id); if (role == ServerState::ROLE_UNDEFINED) { - // role is still unknown. check if we are a coordinator - role = role2; - } else { - // we are a primary or a secondary. - // now we double-check that we are not a coordinator as well - if (role2 != ServerState::ROLE_UNDEFINED) { - role = ServerState::ROLE_UNDEFINED; - } + role = checkServersList(id); } - + // mop: role might still be undefined return role; } @@ -1064,79 +1065,77 @@ ServerState::RoleEnum ServerState::checkServersList(std::string const& id) { ////////////////////////////////////////////////////////////////////////////// /// @brief store the server role ////////////////////////////////////////////////////////////////////////////// -void ServerState::storeRole(RoleEnum role) +bool ServerState::storeRole(RoleEnum role) { - _role.store(role, std::memory_order_release); + if (isClusterRole(role)) { + AgencyComm comm; + AgencyCommResult result; + std::unique_ptr locker; - if (!isClusterRole(role)) { - return; - } - AgencyComm comm; - AgencyCommResult result; - std::unique_ptr locker; - - do { locker.reset(new AgencyCommLocker("Current", "WRITE")); - if (locker->successful()) { - break; - } - sleep(1); - LOG(DEBUG) << "Couldn't aquire lock on Current. Retrying"; - } while (true); - - if (role == ServerState::ROLE_COORDINATOR) { - VPackBuilder builder; - try { - builder.add(VPackValue("none")); - } catch (...) { - locker->unlock(); - LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT(); + if (!locker->successful()) { + return false; } + if (role == ServerState::ROLE_COORDINATOR) { + VPackBuilder builder; + try { + builder.add(VPackValue("none")); + } catch (...) { + locker->unlock(); + LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT(); + } - // register coordinator - AgencyCommResult result = - comm.setValue("Current/Coordinators/" + _id, builder.slice(), 0.0); + // register coordinator + AgencyCommResult result = + comm.setValue("Current/Coordinators/" + _id, builder.slice(), 0.0); - if (!result.successful()) { - locker->unlock(); - LOG(FATAL) << "unable to register coordinator in agency"; FATAL_ERROR_EXIT(); - } - } else if (role == ServerState::ROLE_PRIMARY) { - VPackBuilder builder; - try { - builder.add(VPackValue("none")); - } catch (...) { - locker->unlock(); - LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT(); - } + if (!result.successful()) { + locker->unlock(); + LOG(FATAL) << "unable to register coordinator in agency"; FATAL_ERROR_EXIT(); + } + } else if (role == ServerState::ROLE_PRIMARY) { + VPackBuilder builder; + try { + builder.add(VPackValue("none")); + } catch (...) { + locker->unlock(); + LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT(); + } - // register server - AgencyCommResult result = - comm.setValue("Current/DBServers/" + _id, builder.slice(), 0.0); + // register server + AgencyCommResult result = + comm.setValue("Current/DBServers/" + _id, builder.slice(), 0.0); - if (!result.successful()) { - locker->unlock(); - LOG(FATAL) << "unable to register db server in agency"; FATAL_ERROR_EXIT(); - } - } else if (role == ServerState::ROLE_SECONDARY) { - std::string keyName = _id; - VPackBuilder builder; - try { - builder.add(VPackValue(keyName)); - } catch (...) { - locker->unlock(); - LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT(); - } + if (!result.successful()) { + locker->unlock(); + LOG(FATAL) << "unable to register db server in agency"; FATAL_ERROR_EXIT(); + } + } else if (role == ServerState::ROLE_SECONDARY) { + std::string keyName = _id; + VPackBuilder builder; + try { + builder.add(VPackValue(keyName)); + } catch (...) { + locker->unlock(); + LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT(); + } + + // register server + AgencyCommResult result = comm.casValue( + "Current/DBServers/" + ServerState::instance()->getPrimaryId(), + builder.slice(), + true, + 0.0, + 0.0); - // register server - AgencyCommResult result = comm.setValue( - "Current/DBServers/" + ServerState::instance()->getPrimaryId(), - builder.slice(), 0.0); - - if (!result.successful()) { - locker->unlock(); - LOG(FATAL) << "unable to register secondary db server in agency"; FATAL_ERROR_EXIT(); + if (!result.successful()) { + locker->unlock(); + // mop: fail gracefully (allow retry) + return false; + } } } + _role.store(role, std::memory_order_release); + return true; } diff --git a/arangod/Cluster/ServerState.h b/arangod/Cluster/ServerState.h index 18effd70a6..e289c1f95d 100644 --- a/arangod/Cluster/ServerState.h +++ b/arangod/Cluster/ServerState.h @@ -374,11 +374,16 @@ class ServerState { RoleEnum loadRole() { return static_cast(_role.load(std::memory_order_consume)); } + + ////////////////////////////////////////////////////////////////////////////// + /// @brief determine role and save role blocking + ////////////////////////////////////////////////////////////////////////////// + void findAndSetRoleBlocking(); ////////////////////////////////////////////////////////////////////////////// /// @brief store the server role ////////////////////////////////////////////////////////////////////////////// - void storeRole(RoleEnum role); + bool storeRole(RoleEnum role); ////////////////////////////////////////////////////////////////////////////// /// @brief determine the server role diff --git a/js/actions/api-cluster.js b/js/actions/api-cluster.js index 590c3806b9..03e5324c07 100644 --- a/js/actions/api-cluster.js +++ b/js/actions/api-cluster.js @@ -34,6 +34,7 @@ var actions = require("@arangodb/actions"); var cluster = require("@arangodb/cluster"); var internal = require("internal"); +var _ = require("lodash"); //////////////////////////////////////////////////////////////////////////////// @@ -921,42 +922,34 @@ actions.defineHttp({ function changeAllShardReponsibilities (oldServer, newServer) { // This is only called when we have the write lock and we "only" have to // make sure that either all or none of the shards are moved. - var l = ArangoAgency.get("Plan/Collections", true, false); - var ll = Object.keys(l); - - var i = 0; - var c; - var oldShards = []; - var shards; - var names; - var j; + var collections = ArangoAgency.get("Plan/Collections", true, false); + var done = {}; try { - while (i < ll.length) { - c = l[ll[i]]; // A collection entry - shards = c.shards; - names = Object.keys(shards); - // Poor man's deep copy: - oldShards.push(JSON.parse(JSON.stringify(shards))); - for (j = 0; j < names.length; j++) { - if (shards[names[j]] === oldServer) { - shards[names[j]] = newServer; + Object.keys(collections).forEach(function(collectionKey) { + var collection = collections[collectionKey]; + var old = _.cloneDeep(collection); + + Object.keys(collection.shards).forEach(function(shardKey) { + var servers = collection.shards[shardKey]; + collection.shards[shardKey] = servers.map(function(server) { + if (server == oldServer) { + return newServer; + } else { + return server; } - } - ArangoAgency.set(ll[i], c, 0); - i += 1; - } - } - catch (e) { - i -= 1; - while (i >= 0) { - c = l[ll[i]]; - c.shards = oldShards[i]; - try { - ArangoAgency.set(ll[i], c, 0); - } - catch (e2) { - } - i -= 1; + }); + }); + ArangoAgency.set(collectionKey, collection, 0); + done[collectionKey] = old; + }); + } catch (e) { + // mop: rollback + try { + Object.keys(done).forEach(function(collectionKey) { + ArangoAgency.set(collectionKey, done[collectionKey], 0); + }); + } catch (e2) { + console.error("Got error during rolback", e2); } throw e; } diff --git a/js/server/modules/@arangodb/cluster.js b/js/server/modules/@arangodb/cluster.js index d3d5c49f1c..8c8f205023 100644 --- a/js/server/modules/@arangodb/cluster.js +++ b/js/server/modules/@arangodb/cluster.js @@ -541,9 +541,9 @@ function handleDatabaseChanges (plan) { /// @brief create collections if they exist in the plan but not locally //////////////////////////////////////////////////////////////////////////////// -function createLocalCollections (plannedCollections, planVersion) { +function createLocalCollections (plannedCollections, planVersion, takeOverResponsibility) { var ourselves = global.ArangoServerState.id(); - + var createCollectionAgency = function (database, shard, collInfo, error) { var payload = { error: error.error, errorNum: error.errorNum, @@ -551,10 +551,15 @@ function createLocalCollections (plannedCollections, planVersion) { indexes: collInfo.indexes, servers: [ ourselves ], planVersion: planVersion }; + global.ArangoAgency.set("Current/Collections/" + database + "/" + collInfo.planId + "/" + shard, payload); }; + + // mop: just a function alias but this way one at least knows what it is supposed to do :S + var takeOver = createCollectionAgency; + var db = require("internal").db; db._useDatabase("_system"); @@ -579,198 +584,214 @@ function createLocalCollections (plannedCollections, planVersion) { var collection; // diff the collections - for (collection in collections) { - if (collections.hasOwnProperty(collection)) { - var collInfo = collections[collection]; - var shards = collInfo.shards; - var shard; + Object.keys(collections).forEach(function(collection) { + var collInfo = collections[collection]; + var shards = collInfo.shards; + var shard; - collInfo.planId = collInfo.id; - var save = [collInfo.id, collInfo.name]; - delete collInfo.id; // must not actually set it here - delete collInfo.name; // name is now shard + collInfo.planId = collInfo.id; + var save = [collInfo.id, collInfo.name]; + delete collInfo.id; // must not actually set it here + delete collInfo.name; // name is now shard - for (shard in shards) { - if (shards.hasOwnProperty(shard)) { - if (shards[shard][0] === ourselves) { - // found a shard we are responsible for + for (shard in shards) { + if (shards.hasOwnProperty(shard)) { + var didWrite = false; + if (shards[shard][0] === ourselves) { + // found a shard we are responsible for - var error = { error: false, errorNum: 0, - errorMessage: "no error" }; + var error = { error: false, errorNum: 0, + errorMessage: "no error" }; - if (! localCollections.hasOwnProperty(shard)) { - // must create this shard - console.info("creating local shard '%s/%s' for central '%s/%s'", - database, - shard, - database, - collInfo.planId); + if (! localCollections.hasOwnProperty(shard)) { + // must create this shard + console.info("creating local shard '%s/%s' for central '%s/%s'", + database, + shard, + database, + collInfo.planId); - try { - if (collInfo.type === ArangoCollection.TYPE_EDGE) { - db._createEdgeCollection(shard, collInfo); - } - else { - db._create(shard, collInfo); - } + try { + if (collInfo.type === ArangoCollection.TYPE_EDGE) { + db._createEdgeCollection(shard, collInfo); } - catch (err2) { - error = { error: true, errorNum: err2.errorNum, - errorMessage: err2.errorMessage }; - console.error("creating local shard '%s/%s' for central '%s/%s' failed: %s", - database, - shard, - database, - collInfo.planId, - JSON.stringify(err2)); + else { + db._create(shard, collInfo); } + } + catch (err2) { + error = { error: true, errorNum: err2.errorNum, + errorMessage: err2.errorMessage }; + console.error("creating local shard '%s/%s' for central '%s/%s' failed: %s", + database, + shard, + database, + collInfo.planId, + JSON.stringify(err2)); + } + writeLocked({ part: "Current" }, + createCollectionAgency, + [ database, shard, collInfo, error ]); + didWrite = true; + } + else { + if (localCollections[shard].status !== collInfo.status) { + console.info("detected status change for local shard '%s/%s'", + database, + shard); + + if (collInfo.status === ArangoCollection.STATUS_UNLOADED) { + console.info("unloading local shard '%s/%s'", + database, + shard); + db._collection(shard).unload(); + } + else if (collInfo.status === ArangoCollection.STATUS_LOADED) { + console.info("loading local shard '%s/%s'", + database, + shard); + db._collection(shard).load(); + } writeLocked({ part: "Current" }, createCollectionAgency, [ database, shard, collInfo, error ]); + didWrite = true; } - else { - if (localCollections[shard].status !== collInfo.status) { - console.info("detected status change for local shard '%s/%s'", - database, - shard); - if (collInfo.status === ArangoCollection.STATUS_UNLOADED) { - console.info("unloading local shard '%s/%s'", - database, - shard); - db._collection(shard).unload(); - } - else if (collInfo.status === ArangoCollection.STATUS_LOADED) { - console.info("loading local shard '%s/%s'", - database, - shard); - db._collection(shard).load(); - } - writeLocked({ part: "Current" }, - createCollectionAgency, - [ database, shard, collInfo, error ]); + // collection exists, now compare collection properties + var properties = { }; + var cmp = [ "journalSize", "waitForSync", "doCompact", + "indexBuckets" ]; + for (i = 0; i < cmp.length; ++i) { + var p = cmp[i]; + if (localCollections[shard][p] !== collInfo[p]) { + // property change + properties[p] = collInfo[p]; } + } - // collection exists, now compare collection properties - var properties = { }; - var cmp = [ "journalSize", "waitForSync", "doCompact", - "indexBuckets" ]; - for (i = 0; i < cmp.length; ++i) { - var p = cmp[i]; - if (localCollections[shard][p] !== collInfo[p]) { - // property change - properties[p] = collInfo[p]; - } + if (Object.keys(properties).length > 0) { + console.info("updating properties for local shard '%s/%s'", + database, + shard); + + try { + db._collection(shard).properties(properties); } + catch (err3) { + error = { error: true, errorNum: err3.errorNum, + errorMessage: err3.errorMessage }; + } + writeLocked({ part: "Current" }, + createCollectionAgency, + [ database, shard, collInfo, error ]); + didWrite = true; + } + } - if (Object.keys(properties).length > 0) { - console.info("updating properties for local shard '%s/%s'", + if (error.error) { + if (takeOverResponsibility && !didWrite) { + writeLocked({ part: "Current" }, + takeOver, + [ database, shard, collInfo, error ]); + } + continue; // No point to look for properties and + // indices, if the creation has not worked + } + + var indexes = getIndexMap(shard); + var idx; + var index; + + if (collInfo.hasOwnProperty("indexes")) { + for (i = 0; i < collInfo.indexes.length; ++i) { + index = collInfo.indexes[i]; + + var changed = false; + + if (index.type !== "primary" && index.type !== "edge" && + ! indexes.hasOwnProperty(index.id)) { + console.info("creating index '%s/%s': %s", database, - shard); + shard, + JSON.stringify(index)); try { - db._collection(shard).properties(properties); + arangodb.db._collection(shard).ensureIndex(index); + index.error = false; + index.errorNum = 0; + index.errorMessage = ""; } - catch (err3) { - error = { error: true, errorNum: err3.errorNum, - errorMessage: err3.errorMessage }; + catch (err5) { + index.error = true; + index.errorNum = err5.errorNum; + index.errorMessage = err5.errorMessage; } + + changed = true; + } + if (changed) { writeLocked({ part: "Current" }, createCollectionAgency, [ database, shard, collInfo, error ]); + didWrite = true; } } - if (error.error) { - continue; // No point to look for properties and - // indices, if the creation has not worked - } + var changed2 = false; + for (idx in indexes) { + if (indexes.hasOwnProperty(idx)) { + // found an index in the index map, check if it must be deleted - var indexes = getIndexMap(shard); - var idx; - var index; - - if (collInfo.hasOwnProperty("indexes")) { - for (i = 0; i < collInfo.indexes.length; ++i) { - index = collInfo.indexes[i]; - - var changed = false; - - if (index.type !== "primary" && index.type !== "edge" && - ! indexes.hasOwnProperty(index.id)) { - console.info("creating index '%s/%s': %s", - database, - shard, - JSON.stringify(index)); - - try { - arangodb.db._collection(shard).ensureIndex(index); - index.error = false; - index.errorNum = 0; - index.errorMessage = ""; - } - catch (err5) { - index.error = true; - index.errorNum = err5.errorNum; - index.errorMessage = err5.errorMessage; - } - - changed = true; - } - if (changed) { - writeLocked({ part: "Current" }, - createCollectionAgency, - [ database, shard, collInfo, error ]); - } - } - - var changed2 = false; - for (idx in indexes) { - if (indexes.hasOwnProperty(idx)) { - // found an index in the index map, check if it must be deleted - - if (indexes[idx].type !== "primary" && indexes[idx].type !== "edge") { - var found = false; - for (i = 0; i < collInfo.indexes.length; ++i) { - if (collInfo.indexes[i].id === idx) { - found = true; - break; - } - } - - if (! found) { - // found an index to delete locally - changed2 = true; - index = indexes[idx]; - - console.info("dropping index '%s/%s': %s", - database, - shard, - JSON.stringify(index)); - - arangodb.db._collection(shard).dropIndex(index); - - delete indexes[idx]; - collInfo.indexes.splice(i, i); + if (indexes[idx].type !== "primary" && indexes[idx].type !== "edge") { + var found = false; + for (i = 0; i < collInfo.indexes.length; ++i) { + if (collInfo.indexes[i].id === idx) { + found = true; + break; } } + + if (! found) { + // found an index to delete locally + changed2 = true; + index = indexes[idx]; + + console.info("dropping index '%s/%s': %s", + database, + shard, + JSON.stringify(index)); + + arangodb.db._collection(shard).dropIndex(index); + + delete indexes[idx]; + collInfo.indexes.splice(i, i); + } } } - if (changed2) { - writeLocked({ part: "Current" }, - createCollectionAgency, - [ database, shard, collInfo, error ]); - } } + if (changed2) { + writeLocked({ part: "Current" }, + createCollectionAgency, + [ database, shard, collInfo, error ]); + didWrite = true; + } + } + if (takeOverResponsibility && !didWrite) { + console.info("HMMMM WRITE"); + writeLocked({ part: "Current" }, + takeOver, + [ database, shard, collInfo, error ]); } } } - collInfo.id = save[0]; - collInfo.name = save[1]; } - } + collInfo.id = save[0]; + collInfo.name = save[1]; + + }); } catch (err) { // always return to previous database @@ -1062,20 +1083,20 @@ function synchronizeLocalFollowerCollections (plannedCollections) { /// @brief handle collection changes //////////////////////////////////////////////////////////////////////////////// -function handleCollectionChanges (plan) { +function handleCollectionChanges (plan, takeOverResponsibility) { var plannedCollections = getByPrefix3d(plan, "Plan/Collections/"); var ok = true; try { - createLocalCollections(plannedCollections, plan["Plan/Version"]); + createLocalCollections(plannedCollections, plan["Plan/Version"], takeOverResponsibility); dropLocalCollections(plannedCollections); cleanupCurrentCollections(plannedCollections); synchronizeLocalFollowerCollections(plannedCollections); } catch (err) { console.error("Caught error in handleCollectionChanges: " + - JSON.stringify(err)); + JSON.stringify(err), JSON.stringify(err.stack)); ok = false; } return ok; @@ -1183,8 +1204,11 @@ function handleChanges (plan, current) { } else { // role === "SECONDARY" if (plan.hasOwnProperty("Plan/DBServers/"+myId)) { - // Ooops! We are now a primary! changed = ArangoServerState.redetermineRole(); + if (!changed) { + // mop: oops...changing role has failed. retry next time. + return false; + } } else { var found = null; @@ -1219,7 +1243,7 @@ function handleChanges (plan, current) { if (role === "PRIMARY" || role === "COORDINATOR") { // Note: This is only ever called for DBservers (primary and secondary), // we keep the coordinator case here just in case... - success = handleCollectionChanges(plan, current); + success = handleCollectionChanges(plan, changed); } else { success = setupReplication(); From 970f38390be82cf6267261978545b21f9f0fea20 Mon Sep 17 00:00:00 2001 From: Frank Celler Date: Fri, 18 Mar 2016 09:34:49 +0100 Subject: [PATCH 4/6] removed transaction key --- js/client/modules/@arangodb/testing.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/js/client/modules/@arangodb/testing.js b/js/client/modules/@arangodb/testing.js index 79d3f49cc7..90d4062a87 100644 --- a/js/client/modules/@arangodb/testing.js +++ b/js/client/modules/@arangodb/testing.js @@ -1,4 +1,4 @@ -/*jshint strict: false, sub: true */ +*jshint strict: false, sub: true */ /*global print, arango */ 'use strict'; @@ -2347,7 +2347,8 @@ testFuncs.arangob = function(options) { break; } - let args = benchTodo; + let args = _.clone(benchTodo); + delete args.transaction; if (options.hasOwnProperty('benchargs')) { args = _.extend(args, options.benchargs); From def5b35a844f7a7c09572ea9a2a2ab20a3243ac5 Mon Sep 17 00:00:00 2001 From: Frank Celler Date: Fri, 18 Mar 2016 11:02:42 +0100 Subject: [PATCH 5/6] jslint fixes --- js/actions/api-cluster.js | 2 +- js/client/modules/@arangodb/testing.js | 2 +- js/server/modules/@arangodb/cluster.js | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/js/actions/api-cluster.js b/js/actions/api-cluster.js index 03e5324c07..d6a84e7b17 100644 --- a/js/actions/api-cluster.js +++ b/js/actions/api-cluster.js @@ -932,7 +932,7 @@ function changeAllShardReponsibilities (oldServer, newServer) { Object.keys(collection.shards).forEach(function(shardKey) { var servers = collection.shards[shardKey]; collection.shards[shardKey] = servers.map(function(server) { - if (server == oldServer) { + if (server === oldServer) { return newServer; } else { return server; diff --git a/js/client/modules/@arangodb/testing.js b/js/client/modules/@arangodb/testing.js index 90d4062a87..8cf3d3153d 100644 --- a/js/client/modules/@arangodb/testing.js +++ b/js/client/modules/@arangodb/testing.js @@ -1,4 +1,4 @@ -*jshint strict: false, sub: true */ +/*jshint strict: false, sub: true */ /*global print, arango */ 'use strict'; diff --git a/js/server/modules/@arangodb/cluster.js b/js/server/modules/@arangodb/cluster.js index 8c8f205023..f7f3f8ff58 100644 --- a/js/server/modules/@arangodb/cluster.js +++ b/js/server/modules/@arangodb/cluster.js @@ -581,7 +581,6 @@ function createLocalCollections (plannedCollections, planVersion, takeOverRespon var localCollections = getLocalCollections(); var collections = plannedCollections[database]; - var collection; // diff the collections Object.keys(collections).forEach(function(collection) { From 172ff37b43af83e470188855b79a40969018fbd8 Mon Sep 17 00:00:00 2001 From: Michael Hackstein Date: Fri, 18 Mar 2016 11:07:53 +0100 Subject: [PATCH 6/6] Fixed used after free in IndexBlock --- arangod/Aql/IndexBlock.cpp | 11 ++++++----- arangod/Aql/IndexBlock.h | 6 ++++++ 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/arangod/Aql/IndexBlock.cpp b/arangod/Aql/IndexBlock.cpp index abe4539ebe..8ac5dace6e 100644 --- a/arangod/Aql/IndexBlock.cpp +++ b/arangod/Aql/IndexBlock.cpp @@ -42,6 +42,7 @@ using Json = arangodb::basics::Json; IndexBlock::IndexBlock(ExecutionEngine* engine, IndexNode const* en) : ExecutionBlock(engine, en), _collection(en->collection()), + _result(std::make_shared(TRI_ERROR_NO_ERROR)), _posInDocs(0), _currentIndex(0), _indexes(en->getIndexes()), @@ -377,17 +378,17 @@ bool IndexBlock::readIndex(size_t atMost) { bool isReverse = (static_cast(getPlanNode()))->_reverse; bool isLastIndex = (_currentIndex == lastIndexNr && !isReverse) || (_currentIndex == 0 && isReverse); - auto result = std::make_shared(TRI_ERROR_NO_ERROR); + auto _result = std::make_shared(TRI_ERROR_NO_ERROR); while (_cursor != nullptr) { if (!_cursor->hasMore()) { startNextCursor(); continue; } - _cursor->getMore(result, atMost, true); - if (result->failed()) { - THROW_ARANGO_EXCEPTION(result->code); + _cursor->getMore(_result, atMost, true); + if (_result->failed()) { + THROW_ARANGO_EXCEPTION(_result->code); } - VPackSlice slice = result->slice(); + VPackSlice slice = _result->slice(); TRI_ASSERT(slice.isArray()); size_t length = static_cast(slice.length()); diff --git a/arangod/Aql/IndexBlock.h b/arangod/Aql/IndexBlock.h index b3c2387b05..1ac75db323 100644 --- a/arangod/Aql/IndexBlock.h +++ b/arangod/Aql/IndexBlock.h @@ -130,6 +130,12 @@ class IndexBlock : public ExecutionBlock { Collection const* _collection; + ////////////////////////////////////////////////////////////////////////////// + /// @brief document result + ////////////////////////////////////////////////////////////////////////////// + + std::shared_ptr _result; + ////////////////////////////////////////////////////////////////////////////// /// @brief document buffer //////////////////////////////////////////////////////////////////////////////