mirror of https://gitee.com/bigwinds/arangodb
Fix secondaries
This commit is contained in:
parent
10e653a5d4
commit
1bb8f97773
|
@ -360,18 +360,18 @@ bool ServerState::registerShortName(std::string const& id, ServerState::RoleEnum
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief try to integrate into a cluster
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
bool ServerState::integrateIntoCluster(ServerState::RoleEnum const& role,
|
||||
bool ServerState::integrateIntoCluster(ServerState::RoleEnum role,
|
||||
std::string const& myAddress,
|
||||
std::string const& myId) {
|
||||
// id supplied via command line this is deprecated
|
||||
if (!myId.empty()) {
|
||||
if (!hasPersistedId()) {
|
||||
setId(myId);
|
||||
ServerState::RoleEnum roleInAgency = getRole();
|
||||
role = getRole();
|
||||
|
||||
// we are known to the agency under our old id!
|
||||
if (roleInAgency != ServerState::ROLE_UNDEFINED) {
|
||||
registerShortName(myId, roleInAgency);
|
||||
if (role != ServerState::ROLE_UNDEFINED) {
|
||||
registerShortName(myId, role);
|
||||
writePersistedId(myId);
|
||||
} else {
|
||||
LOG_TOPIC(FATAL, Logger::STARTUP) << "started with --cluster.my-id but id unknown in agency!";
|
||||
|
@ -397,49 +397,14 @@ bool ServerState::integrateIntoCluster(ServerState::RoleEnum const& role,
|
|||
}
|
||||
setId(id);
|
||||
|
||||
registerAtAgency(comm, role, id);
|
||||
if (!registerAtAgency(comm, role, id)) {
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
||||
const std::string agencyKey = roleToAgencyKey(role);
|
||||
const std::string planKey = "Plan/" + agencyKey + "/" + id;
|
||||
const std::string currentKey = "Current/" + agencyKey + "/" + id;
|
||||
|
||||
auto builder = std::make_shared<VPackBuilder>();
|
||||
result = comm.getValues(planKey);
|
||||
bool found = true;
|
||||
if (!result.successful()) {
|
||||
found = false;
|
||||
} else {
|
||||
VPackSlice plan = result.slice()[0].get(std::vector<std::string>(
|
||||
{AgencyCommManager::path(), "Plan", agencyKey, id}));
|
||||
if (!plan.isString()) {
|
||||
found = false;
|
||||
} else {
|
||||
builder->add(plan);
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
// mop: hmm ... we are registered but not part of the Plan :O
|
||||
// create a plan for ourselves :)
|
||||
builder->add(VPackValue("none"));
|
||||
|
||||
VPackSlice plan = builder->slice();
|
||||
|
||||
comm.setValue(planKey, plan, 0.0);
|
||||
if (!result.successful()) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Couldn't create plan "
|
||||
<< result.errorMessage();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
result = comm.setValue(currentKey, builder->slice(), 0.0);
|
||||
|
||||
if (!result.successful()) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not talk to agency! "
|
||||
<< result.errorMessage();
|
||||
return false;
|
||||
}
|
||||
|
||||
_id = id;
|
||||
|
||||
findAndSetRoleBlocking();
|
||||
|
@ -554,7 +519,7 @@ bool ServerState::registerAtAgency(AgencyComm& comm,
|
|||
if (!result.successful()) {
|
||||
LOG_TOPIC(FATAL, Logger::STARTUP) << "Couldn't fetch Plan/" << agencyKey
|
||||
<< " from agency. Agency is not initialized?";
|
||||
FATAL_ERROR_EXIT();
|
||||
return false;
|
||||
}
|
||||
|
||||
VPackSlice servers = result.slice()[0].get(
|
||||
|
@ -562,7 +527,7 @@ bool ServerState::registerAtAgency(AgencyComm& comm,
|
|||
if (!servers.isObject()) {
|
||||
LOG_TOPIC(FATAL, Logger::STARTUP) << "Plan/" << agencyKey << " in agency is no object. "
|
||||
<< "Agency not initialized?";
|
||||
FATAL_ERROR_EXIT();
|
||||
return false;
|
||||
}
|
||||
|
||||
VPackSlice entry = servers.get(id);
|
||||
|
@ -570,16 +535,28 @@ bool ServerState::registerAtAgency(AgencyComm& comm,
|
|||
<< id << " found in existing keys: " << (!entry.isNone());
|
||||
|
||||
std::string planUrl = "Plan/" + agencyKey + "/" + id;
|
||||
std::string currentUrl = "Current/" + agencyKey + "/" + id;
|
||||
|
||||
AgencyGeneralTransaction reg;
|
||||
reg.operations.push_back( // Plan entry if not exists
|
||||
operationType(
|
||||
AgencyOperation(planUrl, AgencyValueOperationType::SET, builder.slice()),
|
||||
AgencyPrecondition(planUrl, AgencyPrecondition::Type::EMPTY, true)));
|
||||
|
||||
reg.operations.push_back( // Current entry if not exists
|
||||
operationType(
|
||||
AgencyOperation(currentUrl, AgencyValueOperationType::SET, builder.slice()),
|
||||
AgencyPrecondition(currentUrl, AgencyPrecondition::Type::EMPTY, true)));
|
||||
|
||||
// ok to fail (at least that was how it was before :S)
|
||||
// XXX this should probably be sent as part of the transaction below
|
||||
// ok to fail..if it failed we are already registered
|
||||
comm.sendTransactionWithFailover(reg, 0.0);
|
||||
} else {
|
||||
std::string currentUrl = "Current/" + agencyKey + "/" + _idOfPrimary;
|
||||
AgencyCommResult result = comm.setValue(currentUrl, id, 0.0);
|
||||
if (!result.successful()) {
|
||||
LOG_TOPIC(FATAL, Logger::STARTUP) << "Could not register ourselves as secondary in Current";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
std::string targetIdStr =
|
||||
|
@ -660,7 +637,6 @@ bool ServerState::registerAtAgency(AgencyComm& comm,
|
|||
}
|
||||
|
||||
LOG_TOPIC(FATAL, Logger::STARTUP) << "Couldn't register shortname for " << id;
|
||||
FATAL_ERROR_EXIT();
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -942,6 +918,17 @@ bool ServerState::redetermineRole() {
|
|||
RoleEnum oldRole = loadRole();
|
||||
if (role != oldRole) {
|
||||
LOG_TOPIC(INFO, Logger::CLUSTER) << "Changed role to: " << roleString;
|
||||
if (oldRole == ROLE_PRIMARY && role == ROLE_SECONDARY) {
|
||||
std::string oldId("Current/DBServers/" + _id);
|
||||
AgencyOperation del(oldId, AgencySimpleOperationType::DELETE_OP);
|
||||
AgencyOperation incrementVersion("Current/Version",
|
||||
AgencySimpleOperationType::INCREMENT_OP);
|
||||
|
||||
AgencyWriteTransaction trx(std::vector<AgencyOperation> {del, incrementVersion});
|
||||
|
||||
AgencyComm comm;
|
||||
comm.sendTransactionWithFailover(trx, 0.0);
|
||||
}
|
||||
if (!storeRole(role)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -1245,7 +1232,7 @@ bool ServerState::storeRole(RoleEnum role) {
|
|||
ServerState::instance()->getPrimaryId());
|
||||
AgencyOperation addMe(myId, AgencyValueOperationType::SET,
|
||||
builder.slice());
|
||||
AgencyOperation incrementVersion("Plan/Version",
|
||||
AgencyOperation incrementVersion("Current/Version",
|
||||
AgencySimpleOperationType::INCREMENT_OP);
|
||||
AgencyPrecondition precondition(myId, AgencyPrecondition::Type::EMPTY, false);
|
||||
trx.reset(new AgencyWriteTransaction({addMe, incrementVersion}, precondition));
|
||||
|
|
|
@ -139,7 +139,7 @@ class ServerState {
|
|||
/// @brief get the server role
|
||||
RoleEnum getRole();
|
||||
|
||||
bool integrateIntoCluster(RoleEnum const&, std::string const&, std::string const&);
|
||||
bool integrateIntoCluster(RoleEnum, std::string const&, std::string const&);
|
||||
|
||||
bool unregister();
|
||||
|
||||
|
|
|
@ -561,7 +561,10 @@ actions.defineHttp({
|
|||
let oldValue = ArangoAgency.get('Plan/DBServers/' + body.primary);
|
||||
actions.resultError(req, res, actions.HTTP_PRECONDITION_FAILED, 0,
|
||||
'Primary does not have the given oldSecondary as ' +
|
||||
'its secondary, current value: ' + JSON.stringify(oldValue));
|
||||
'its secondary, current value: '
|
||||
+ JSON.stringify(
|
||||
fetchKey(oldValue, 'arango', 'Plan', 'DBServers', body.primary)
|
||||
));
|
||||
return;
|
||||
}
|
||||
throw e;
|
||||
|
@ -577,27 +580,32 @@ actions.defineHttp({
|
|||
function changeAllShardReponsibilities (oldServer, newServer) {
|
||||
// This is only called when we have the write lock and we "only" have to
|
||||
// make sure that either all or none of the shards are moved.
|
||||
var collections = ArangoAgency.get('Plan/Collections');
|
||||
collections = collections.arango.Plan.Collections;
|
||||
var databases = ArangoAgency.get('Plan/Collections');
|
||||
databases = databases.arango.Plan.Collections;
|
||||
|
||||
let operations = {};
|
||||
let preconditions = {};
|
||||
Object.keys(collections).forEach(function (collectionKey) {
|
||||
var collection = collections[collectionKey];
|
||||
var old = _.cloneDeep(collection);
|
||||
Object.keys(databases).forEach(function(databaseName) {
|
||||
var collections = databases[databaseName];
|
||||
|
||||
Object.keys(collection.shards).forEach(function (shardKey) {
|
||||
var servers = collection.shards[shardKey];
|
||||
collection.shards[shardKey] = servers.map(function (server) {
|
||||
if (server === oldServer) {
|
||||
return newServer;
|
||||
} else {
|
||||
return server;
|
||||
}
|
||||
Object.keys(collections).forEach(function(collectionKey) {
|
||||
var collection = collections[collectionKey];
|
||||
|
||||
Object.keys(collection.shards).forEach(function (shardKey) {
|
||||
var servers = collection.shards[shardKey];
|
||||
var oldServers = _.cloneDeep(servers);
|
||||
servers = servers.map(function(server) {
|
||||
if (server === oldServer) {
|
||||
return newServer;
|
||||
} else {
|
||||
return server;
|
||||
}
|
||||
});
|
||||
let key = '/arango/Plan/Collections/' + databaseName + '/' + collectionKey + '/shards/' + shardKey;
|
||||
operations[key] = servers;
|
||||
preconditions[key] = {'old': oldServers};
|
||||
});
|
||||
});
|
||||
operations[collectionKey] = collection;
|
||||
preconditions[collectionKey] = old;
|
||||
});
|
||||
return {operations, preconditions};
|
||||
}
|
||||
|
@ -678,25 +686,24 @@ actions.defineHttp({
|
|||
}
|
||||
|
||||
let operations = {};
|
||||
operations['Plan/DBServers/' + body.secondary] = body.primary;
|
||||
operations['Plan/DBServers/' + body.primary] = {'op': 'delete'};
|
||||
operations['Plan/Version'] = {'op': 'increment'};
|
||||
operations['/arango/Plan/DBServers/' + body.secondary] = body.primary;
|
||||
operations['/arango/Plan/DBServers/' + body.primary] = {'op': 'delete'};
|
||||
operations['/arango/Plan/Version'] = {'op': 'increment'};
|
||||
|
||||
let preconditions = {};
|
||||
preconditions['Plan/DBServers/' + body.primary] = {'old': body.secondary};
|
||||
preconditions['/arango/Plan/DBServers/' + body.primary] = {'old': body.secondary};
|
||||
|
||||
let shardChanges = changeAllShardReponsibilities(body.primary, body.secondary);
|
||||
operations = Object.assign(operations, shardChanges.operations);
|
||||
preconditions = Object.assign(preconditions, shardChanges.preconditions);
|
||||
|
||||
|
||||
try {
|
||||
global.ArangoAgency.write([[operations, preconditions]]);
|
||||
} catch (e) {
|
||||
if (e.code === 412) {
|
||||
let oldValue = ArangoAgency.get('Plan/DBServers/' + body.primary);
|
||||
actions.resultError(req, res, actions.HTTP_PRECONDITION_FAILED, 0,
|
||||
'Primary does not have the given oldSecondary as ' +
|
||||
'its secondary, current value: ' + oldValue);
|
||||
'Could not change primary to secondary.')
|
||||
return;
|
||||
}
|
||||
throw e;
|
||||
|
|
|
@ -399,12 +399,14 @@ if [ "$SECONDARIES" == "1" ] ; then
|
|||
let index=1
|
||||
PORTTOPSE=`expr 8729 + $NRDBSERVERS - 1`
|
||||
for PORT in `seq 8729 $PORTTOPSE` ; do
|
||||
let dbserverindex=$index-1
|
||||
mkdir cluster/data$PORT
|
||||
|
||||
CLUSTER_ID="Secondary$index"
|
||||
|
||||
echo Registering secondary $CLUSTER_ID for "DBServer$index"
|
||||
curl -f -X PUT --data "{\"primary\": \"DBServer$index\", \"oldSecondary\": \"none\", \"newSecondary\": \"$CLUSTER_ID\"}" -H "Content-Type: application/json" localhost:$CO_BASE/_admin/cluster/replaceSecondary
|
||||
DBSERVER_ID=$(curl -s 127.0.0.1:$CO_BASE/_admin/cluster/health | jq '.Health | to_entries | map(select(.value.Role == "DBServer")) | .' | jq -r ".[$dbserverindex].key")
|
||||
echo Registering secondary $CLUSTER_ID for $DBSERVER_ID
|
||||
curl -s -f -X PUT --data "{\"primary\": \"$DBSERVER_ID\", \"oldSecondary\": \"none\", \"newSecondary\": \"$CLUSTER_ID\"}" -H "Content-Type: application/json" localhost:$CO_BASE/_admin/cluster/replaceSecondary
|
||||
echo Starting Secondary $CLUSTER_ID on port $PORT
|
||||
${BUILD}/bin/arangod \
|
||||
-c none \
|
||||
|
|
Loading…
Reference in New Issue