1
0
Fork 0

Merge branch 'spdvpk' of ssh://github.com/ArangoDB/ArangoDB into spdvpk

This commit is contained in:
Max Neunhoeffer 2016-03-18 11:09:04 +01:00
commit a9b47e1608
9 changed files with 374 additions and 309 deletions

View File

@ -42,6 +42,7 @@ using Json = arangodb::basics::Json;
IndexBlock::IndexBlock(ExecutionEngine* engine, IndexNode const* en) IndexBlock::IndexBlock(ExecutionEngine* engine, IndexNode const* en)
: ExecutionBlock(engine, en), : ExecutionBlock(engine, en),
_collection(en->collection()), _collection(en->collection()),
_result(std::make_shared<OperationResult>(TRI_ERROR_NO_ERROR)),
_posInDocs(0), _posInDocs(0),
_currentIndex(0), _currentIndex(0),
_indexes(en->getIndexes()), _indexes(en->getIndexes()),
@ -377,17 +378,17 @@ bool IndexBlock::readIndex(size_t atMost) {
bool isReverse = (static_cast<IndexNode const*>(getPlanNode()))->_reverse; bool isReverse = (static_cast<IndexNode const*>(getPlanNode()))->_reverse;
bool isLastIndex = (_currentIndex == lastIndexNr && !isReverse) || bool isLastIndex = (_currentIndex == lastIndexNr && !isReverse) ||
(_currentIndex == 0 && isReverse); (_currentIndex == 0 && isReverse);
auto result = std::make_shared<OperationResult>(TRI_ERROR_NO_ERROR); auto _result = std::make_shared<OperationResult>(TRI_ERROR_NO_ERROR);
while (_cursor != nullptr) { while (_cursor != nullptr) {
if (!_cursor->hasMore()) { if (!_cursor->hasMore()) {
startNextCursor(); startNextCursor();
continue; continue;
} }
_cursor->getMore(result, atMost, true); _cursor->getMore(_result, atMost, true);
if (result->failed()) { if (_result->failed()) {
THROW_ARANGO_EXCEPTION(result->code); THROW_ARANGO_EXCEPTION(_result->code);
} }
VPackSlice slice = result->slice(); VPackSlice slice = _result->slice();
TRI_ASSERT(slice.isArray()); TRI_ASSERT(slice.isArray());
size_t length = static_cast<size_t>(slice.length()); size_t length = static_cast<size_t>(slice.length());

View File

@ -130,6 +130,12 @@ class IndexBlock : public ExecutionBlock {
Collection const* _collection; Collection const* _collection;
//////////////////////////////////////////////////////////////////////////////
/// @brief document result
//////////////////////////////////////////////////////////////////////////////
std::shared_ptr<OperationResult> _result;
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/// @brief document buffer /// @brief document buffer
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////

View File

@ -355,16 +355,15 @@ bool ApplicationCluster::open() {
return true; return true;
} }
ServerState::RoleEnum role = ServerState::instance()->getRole();
// tell the agency that we are ready
{
AgencyComm comm; AgencyComm comm;
AgencyCommResult result; AgencyCommResult result;
bool success;
do {
AgencyCommLocker locker("Current", "WRITE"); AgencyCommLocker locker("Current", "WRITE");
if (locker.successful()) { success = locker.successful();
if (success) {
VPackBuilder builder; VPackBuilder builder;
try { try {
VPackObjectBuilder b(&builder); VPackObjectBuilder b(&builder);
@ -382,68 +381,21 @@ bool ApplicationCluster::open() {
LOG(FATAL) << "unable to register server in agency: http code: " << result.httpCode() << ", body: " << result.body(); FATAL_ERROR_EXIT(); LOG(FATAL) << "unable to register server in agency: http code: " << result.httpCode() << ", body: " << result.body(); FATAL_ERROR_EXIT();
} }
if (success) {
break;
}
sleep(1);
} while (true);
ServerState::RoleEnum role = ServerState::instance()->getRole();
if (role == ServerState::ROLE_COORDINATOR) { if (role == ServerState::ROLE_COORDINATOR) {
VPackBuilder builder;
try {
builder.add(VPackValue("none"));
} catch (...) {
locker.unlock();
LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT();
}
ServerState::instance()->setState(ServerState::STATE_SERVING); ServerState::instance()->setState(ServerState::STATE_SERVING);
// register coordinator
AgencyCommResult result =
comm.setValue("Current/Coordinators/" + _myId, builder.slice(), 0.0);
if (!result.successful()) {
locker.unlock();
LOG(FATAL) << "unable to register coordinator in agency"; FATAL_ERROR_EXIT();
}
} else if (role == ServerState::ROLE_PRIMARY) { } else if (role == ServerState::ROLE_PRIMARY) {
VPackBuilder builder;
try {
builder.add(VPackValue("none"));
} catch (...) {
locker.unlock();
LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT();
}
ServerState::instance()->setState(ServerState::STATE_SERVINGASYNC); ServerState::instance()->setState(ServerState::STATE_SERVINGASYNC);
// register server
AgencyCommResult result =
comm.setValue("Current/DBServers/" + _myId, builder.slice(), 0.0);
if (!result.successful()) {
locker.unlock();
LOG(FATAL) << "unable to register db server in agency"; FATAL_ERROR_EXIT();
}
} else if (role == ServerState::ROLE_SECONDARY) { } else if (role == ServerState::ROLE_SECONDARY) {
std::string keyName = std::string("\"") + _myId + std::string("\"");
VPackBuilder builder;
try {
builder.add(VPackValue(keyName));
} catch (...) {
locker.unlock();
LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT();
}
ServerState::instance()->setState(ServerState::STATE_SYNCING); ServerState::instance()->setState(ServerState::STATE_SYNCING);
// register server
AgencyCommResult result = comm.setValue(
"Current/DBServers/" + ServerState::instance()->getPrimaryId(),
builder.slice(), 0.0);
if (!result.successful()) {
locker.unlock();
LOG(FATAL) << "unable to register secondary db server in agency"; FATAL_ERROR_EXIT();
} }
}
}
return true; return true;
} }

View File

@ -37,6 +37,12 @@ using namespace arangodb::basics;
/// running /// running
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static bool isClusterRole(ServerState::RoleEnum role) {
return (role == ServerState::ROLE_PRIMARY ||
role == ServerState::ROLE_SECONDARY ||
role == ServerState::ROLE_COORDINATOR);
}
static ServerState Instance; static ServerState Instance;
ServerState::ServerState() ServerState::ServerState()
@ -172,10 +178,25 @@ void ServerState::setAuthentication(std::string const& username,
std::string ServerState::getAuthentication() { return _authentication; } std::string ServerState::getAuthentication() { return _authentication; }
////////////////////////////////////////////////////////////////////////////////
/// @brief find and set our role
////////////////////////////////////////////////////////////////////////////////
void ServerState::findAndSetRoleBlocking() {
while (true) {
auto role = determineRole(_localInfo, _id);
std::string roleString = roleToString(role);
LOG(DEBUG) << "Found my role: " << roleString;
if (storeRole(role)) {
break;
}
sleep(1);
}
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief flush the server state (used for testing) /// @brief flush the server state (used for testing)
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void ServerState::flush() { void ServerState::flush() {
{ {
WRITE_LOCKER(writeLocker, _lock); WRITE_LOCKER(writeLocker, _lock);
@ -187,7 +208,7 @@ void ServerState::flush() {
_address = ClusterInfo::instance()->getTargetServerEndpoint(_id); _address = ClusterInfo::instance()->getTargetServerEndpoint(_id);
} }
storeRole(determineRole(_localInfo, _id)); findAndSetRoleBlocking();
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -228,9 +249,7 @@ bool ServerState::isDBServer(ServerState::RoleEnum role) {
bool ServerState::isRunningInCluster() { bool ServerState::isRunningInCluster() {
auto role = loadRole(); auto role = loadRole();
return (role == ServerState::ROLE_PRIMARY || return isClusterRole(role);
role == ServerState::ROLE_SECONDARY ||
role == ServerState::ROLE_COORDINATOR);
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -272,15 +291,8 @@ ServerState::RoleEnum ServerState::getRole() {
LOG(DEBUG) << "Have stored " << builder.slice().toJson() << " under Current/NewServers/" << _localInfo << " in agency."; LOG(DEBUG) << "Have stored " << builder.slice().toJson() << " under Current/NewServers/" << _localInfo << " in agency.";
} }
// role not yet set findAndSetRoleBlocking();
role = determineRole(info, id); return loadRole();
std::string roleString = roleToString(role);
LOG(DEBUG) << "Found my role: " << roleString;
storeRole(role);
return role;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -351,8 +363,8 @@ bool ServerState::registerWithRole(ServerState::RoleEnum role) {
} }
_id = id; _id = id;
storeRole(role);
findAndSetRoleBlocking();
LOG(DEBUG) << "We successfully announced ourselves as " << roleToString(role) << " and our id is " << id; LOG(DEBUG) << "We successfully announced ourselves as " << roleToString(role) << " and our id is " << id;
return true; return true;
@ -779,7 +791,9 @@ bool ServerState::redetermineRole() {
RoleEnum oldRole = loadRole(); RoleEnum oldRole = loadRole();
if (role != oldRole) { if (role != oldRole) {
LOG(INFO) << "Changed role to: " << roleString; LOG(INFO) << "Changed role to: " << roleString;
storeRole(role); if (!storeRole(role)) {
return false;
}
return true; return true;
} }
if (_idOfPrimary != saveIdOfPrimary) { if (_idOfPrimary != saveIdOfPrimary) {
@ -807,20 +821,11 @@ ServerState::RoleEnum ServerState::determineRole(std::string const& info,
setId(id); setId(id);
} }
ServerState::RoleEnum role = checkServersList(id); ServerState::RoleEnum role = checkCoordinatorsList(id);
ServerState::RoleEnum role2 = checkCoordinatorsList(id);
if (role == ServerState::ROLE_UNDEFINED) { if (role == ServerState::ROLE_UNDEFINED) {
// role is still unknown. check if we are a coordinator role = checkServersList(id);
role = role2;
} else {
// we are a primary or a secondary.
// now we double-check that we are not a coordinator as well
if (role2 != ServerState::ROLE_UNDEFINED) {
role = ServerState::ROLE_UNDEFINED;
} }
} // mop: role might still be undefined
return role; return role;
} }
@ -1056,3 +1061,81 @@ ServerState::RoleEnum ServerState::checkServersList(std::string const& id) {
return role; return role;
} }
//////////////////////////////////////////////////////////////////////////////
/// @brief store the server role
//////////////////////////////////////////////////////////////////////////////
bool ServerState::storeRole(RoleEnum role)
{
if (isClusterRole(role)) {
AgencyComm comm;
AgencyCommResult result;
std::unique_ptr<AgencyCommLocker> locker;
locker.reset(new AgencyCommLocker("Current", "WRITE"));
if (!locker->successful()) {
return false;
}
if (role == ServerState::ROLE_COORDINATOR) {
VPackBuilder builder;
try {
builder.add(VPackValue("none"));
} catch (...) {
locker->unlock();
LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT();
}
// register coordinator
AgencyCommResult result =
comm.setValue("Current/Coordinators/" + _id, builder.slice(), 0.0);
if (!result.successful()) {
locker->unlock();
LOG(FATAL) << "unable to register coordinator in agency"; FATAL_ERROR_EXIT();
}
} else if (role == ServerState::ROLE_PRIMARY) {
VPackBuilder builder;
try {
builder.add(VPackValue("none"));
} catch (...) {
locker->unlock();
LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT();
}
// register server
AgencyCommResult result =
comm.setValue("Current/DBServers/" + _id, builder.slice(), 0.0);
if (!result.successful()) {
locker->unlock();
LOG(FATAL) << "unable to register db server in agency"; FATAL_ERROR_EXIT();
}
} else if (role == ServerState::ROLE_SECONDARY) {
std::string keyName = _id;
VPackBuilder builder;
try {
builder.add(VPackValue(keyName));
} catch (...) {
locker->unlock();
LOG(FATAL) << "out of memory"; FATAL_ERROR_EXIT();
}
// register server
AgencyCommResult result = comm.casValue(
"Current/DBServers/" + ServerState::instance()->getPrimaryId(),
builder.slice(),
true,
0.0,
0.0);
if (!result.successful()) {
locker->unlock();
// mop: fail gracefully (allow retry)
return false;
}
}
}
_role.store(role, std::memory_order_release);
return true;
}

View File

@ -376,12 +376,14 @@ class ServerState {
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/// @brief atomically stores the server role /// @brief determine role and save role blocking
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
void findAndSetRoleBlocking();
void storeRole(RoleEnum role) { //////////////////////////////////////////////////////////////////////////////
_role.store(role, std::memory_order_release); /// @brief store the server role
} //////////////////////////////////////////////////////////////////////////////
bool storeRole(RoleEnum role);
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/// @brief determine the server role /// @brief determine the server role

View File

@ -34,6 +34,7 @@
var actions = require("@arangodb/actions"); var actions = require("@arangodb/actions");
var cluster = require("@arangodb/cluster"); var cluster = require("@arangodb/cluster");
var internal = require("internal"); var internal = require("internal");
var _ = require("lodash");
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -921,42 +922,34 @@ actions.defineHttp({
function changeAllShardReponsibilities (oldServer, newServer) { function changeAllShardReponsibilities (oldServer, newServer) {
// This is only called when we have the write lock and we "only" have to // This is only called when we have the write lock and we "only" have to
// make sure that either all or none of the shards are moved. // make sure that either all or none of the shards are moved.
var l = ArangoAgency.get("Plan/Collections", true, false); var collections = ArangoAgency.get("Plan/Collections", true, false);
var ll = Object.keys(l); var done = {};
try {
Object.keys(collections).forEach(function(collectionKey) {
var collection = collections[collectionKey];
var old = _.cloneDeep(collection);
var i = 0; Object.keys(collection.shards).forEach(function(shardKey) {
var c; var servers = collection.shards[shardKey];
var oldShards = []; collection.shards[shardKey] = servers.map(function(server) {
var shards; if (server === oldServer) {
var names; return newServer;
var j; } else {
return server;
}
});
});
ArangoAgency.set(collectionKey, collection, 0);
done[collectionKey] = old;
});
} catch (e) {
// mop: rollback
try { try {
while (i < ll.length) { Object.keys(done).forEach(function(collectionKey) {
c = l[ll[i]]; // A collection entry ArangoAgency.set(collectionKey, done[collectionKey], 0);
shards = c.shards; });
names = Object.keys(shards); } catch (e2) {
// Poor man's deep copy: console.error("Got error during rolback", e2);
oldShards.push(JSON.parse(JSON.stringify(shards)));
for (j = 0; j < names.length; j++) {
if (shards[names[j]] === oldServer) {
shards[names[j]] = newServer;
}
}
ArangoAgency.set(ll[i], c, 0);
i += 1;
}
}
catch (e) {
i -= 1;
while (i >= 0) {
c = l[ll[i]];
c.shards = oldShards[i];
try {
ArangoAgency.set(ll[i], c, 0);
}
catch (e2) {
}
i -= 1;
} }
throw e; throw e;
} }

View File

@ -2357,7 +2357,8 @@ testFuncs.arangob = function(options) {
break; break;
} }
let args = benchTodo; let args = _.clone(benchTodo);
delete args.transaction;
if (options.hasOwnProperty('benchargs')) { if (options.hasOwnProperty('benchargs')) {
args = _.extend(args, options.benchargs); args = _.extend(args, options.benchargs);

View File

@ -541,7 +541,7 @@ function handleDatabaseChanges (plan) {
/// @brief create collections if they exist in the plan but not locally /// @brief create collections if they exist in the plan but not locally
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
function createLocalCollections (plannedCollections, planVersion) { function createLocalCollections (plannedCollections, planVersion, takeOverResponsibility) {
var ourselves = global.ArangoServerState.id(); var ourselves = global.ArangoServerState.id();
var createCollectionAgency = function (database, shard, collInfo, error) { var createCollectionAgency = function (database, shard, collInfo, error) {
@ -551,11 +551,16 @@ function createLocalCollections (plannedCollections, planVersion) {
indexes: collInfo.indexes, indexes: collInfo.indexes,
servers: [ ourselves ], servers: [ ourselves ],
planVersion: planVersion }; planVersion: planVersion };
global.ArangoAgency.set("Current/Collections/" + database + "/" + global.ArangoAgency.set("Current/Collections/" + database + "/" +
collInfo.planId + "/" + shard, collInfo.planId + "/" + shard,
payload); payload);
}; };
// mop: just a function alias but this way one at least knows what it is supposed to do :S
var takeOver = createCollectionAgency;
var db = require("internal").db; var db = require("internal").db;
db._useDatabase("_system"); db._useDatabase("_system");
var localDatabases = getLocalDatabases(); var localDatabases = getLocalDatabases();
@ -576,11 +581,9 @@ function createLocalCollections (plannedCollections, planVersion) {
var localCollections = getLocalCollections(); var localCollections = getLocalCollections();
var collections = plannedCollections[database]; var collections = plannedCollections[database];
var collection;
// diff the collections // diff the collections
for (collection in collections) { Object.keys(collections).forEach(function(collection) {
if (collections.hasOwnProperty(collection)) {
var collInfo = collections[collection]; var collInfo = collections[collection];
var shards = collInfo.shards; var shards = collInfo.shards;
var shard; var shard;
@ -592,6 +595,7 @@ function createLocalCollections (plannedCollections, planVersion) {
for (shard in shards) { for (shard in shards) {
if (shards.hasOwnProperty(shard)) { if (shards.hasOwnProperty(shard)) {
var didWrite = false;
if (shards[shard][0] === ourselves) { if (shards[shard][0] === ourselves) {
// found a shard we are responsible for // found a shard we are responsible for
@ -628,6 +632,7 @@ function createLocalCollections (plannedCollections, planVersion) {
writeLocked({ part: "Current" }, writeLocked({ part: "Current" },
createCollectionAgency, createCollectionAgency,
[ database, shard, collInfo, error ]); [ database, shard, collInfo, error ]);
didWrite = true;
} }
else { else {
if (localCollections[shard].status !== collInfo.status) { if (localCollections[shard].status !== collInfo.status) {
@ -650,6 +655,7 @@ function createLocalCollections (plannedCollections, planVersion) {
writeLocked({ part: "Current" }, writeLocked({ part: "Current" },
createCollectionAgency, createCollectionAgency,
[ database, shard, collInfo, error ]); [ database, shard, collInfo, error ]);
didWrite = true;
} }
// collection exists, now compare collection properties // collection exists, now compare collection properties
@ -679,10 +685,16 @@ function createLocalCollections (plannedCollections, planVersion) {
writeLocked({ part: "Current" }, writeLocked({ part: "Current" },
createCollectionAgency, createCollectionAgency,
[ database, shard, collInfo, error ]); [ database, shard, collInfo, error ]);
didWrite = true;
} }
} }
if (error.error) { if (error.error) {
if (takeOverResponsibility && !didWrite) {
writeLocked({ part: "Current" },
takeOver,
[ database, shard, collInfo, error ]);
}
continue; // No point to look for properties and continue; // No point to look for properties and
// indices, if the creation has not worked // indices, if the creation has not worked
} }
@ -722,6 +734,7 @@ function createLocalCollections (plannedCollections, planVersion) {
writeLocked({ part: "Current" }, writeLocked({ part: "Current" },
createCollectionAgency, createCollectionAgency,
[ database, shard, collInfo, error ]); [ database, shard, collInfo, error ]);
didWrite = true;
} }
} }
@ -761,16 +774,23 @@ function createLocalCollections (plannedCollections, planVersion) {
writeLocked({ part: "Current" }, writeLocked({ part: "Current" },
createCollectionAgency, createCollectionAgency,
[ database, shard, collInfo, error ]); [ database, shard, collInfo, error ]);
didWrite = true;
} }
} }
if (takeOverResponsibility && !didWrite) {
console.info("HMMMM WRITE");
writeLocked({ part: "Current" },
takeOver,
[ database, shard, collInfo, error ]);
}
} }
} }
} }
collInfo.id = save[0]; collInfo.id = save[0];
collInfo.name = save[1]; collInfo.name = save[1];
}
} });
} }
catch (err) { catch (err) {
// always return to previous database // always return to previous database
@ -1062,20 +1082,20 @@ function synchronizeLocalFollowerCollections (plannedCollections) {
/// @brief handle collection changes /// @brief handle collection changes
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
function handleCollectionChanges (plan) { function handleCollectionChanges (plan, takeOverResponsibility) {
var plannedCollections = getByPrefix3d(plan, "Plan/Collections/"); var plannedCollections = getByPrefix3d(plan, "Plan/Collections/");
var ok = true; var ok = true;
try { try {
createLocalCollections(plannedCollections, plan["Plan/Version"]); createLocalCollections(plannedCollections, plan["Plan/Version"], takeOverResponsibility);
dropLocalCollections(plannedCollections); dropLocalCollections(plannedCollections);
cleanupCurrentCollections(plannedCollections); cleanupCurrentCollections(plannedCollections);
synchronizeLocalFollowerCollections(plannedCollections); synchronizeLocalFollowerCollections(plannedCollections);
} }
catch (err) { catch (err) {
console.error("Caught error in handleCollectionChanges: " + console.error("Caught error in handleCollectionChanges: " +
JSON.stringify(err)); JSON.stringify(err), JSON.stringify(err.stack));
ok = false; ok = false;
} }
return ok; return ok;
@ -1183,8 +1203,11 @@ function handleChanges (plan, current) {
} }
else { // role === "SECONDARY" else { // role === "SECONDARY"
if (plan.hasOwnProperty("Plan/DBServers/"+myId)) { if (plan.hasOwnProperty("Plan/DBServers/"+myId)) {
// Ooops! We are now a primary!
changed = ArangoServerState.redetermineRole(); changed = ArangoServerState.redetermineRole();
if (!changed) {
// mop: oops...changing role has failed. retry next time.
return false;
}
} }
else { else {
var found = null; var found = null;
@ -1219,7 +1242,7 @@ function handleChanges (plan, current) {
if (role === "PRIMARY" || role === "COORDINATOR") { if (role === "PRIMARY" || role === "COORDINATOR") {
// Note: This is only ever called for DBservers (primary and secondary), // Note: This is only ever called for DBservers (primary and secondary),
// we keep the coordinator case here just in case... // we keep the coordinator case here just in case...
success = handleCollectionChanges(plan, current); success = handleCollectionChanges(plan, changed);
} }
else { else {
success = setupReplication(); success = setupReplication();

View File

@ -219,7 +219,11 @@ Endpoint* Endpoint::factory(const Endpoint::EndpointType type,
return nullptr; return nullptr;
} }
#ifndef _WIN32
return new EndpointSrv(specification.substr(6)); return new EndpointSrv(specification.substr(6));
#else
return nullptr;
#endif
} }
else if (!StringUtils::isPrefix(domainType, "tcp://")) { else if (!StringUtils::isPrefix(domainType, "tcp://")) {