1
0
Fork 0

Finish the role "SECONDARY" in a cluster. Configure replication automatically.

This commit is contained in:
Max Neunhoeffer 2015-08-12 10:54:30 +02:00
parent da73b7e214
commit b844ff3468
5 changed files with 126 additions and 3 deletions

View File

@ -444,8 +444,26 @@ bool ApplicationCluster::open () {
}
}
else if (role == ServerState::ROLE_SECONDARY) {
locker.unlock();
LOG_FATAL_AND_EXIT("secondary server tasks are currently not implemented");
std::string keyName = std::string("\"") + _myId + std::string("\"");
TRI_json_t* json = TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE,
keyName.c_str(), keyName.size());
if (json == nullptr) {
locker.unlock();
LOG_FATAL_AND_EXIT("out of memory");
}
ServerState::instance()->setState(ServerState::STATE_SYNCING);
// register server
AgencyCommResult result = comm.setValue("Current/DBServers/" +
ServerState::instance()->getPrimaryId(), json, 0.0);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
if (! result.successful()) {
locker.unlock();
LOG_FATAL_AND_EXIT("unable to register secondary db server in agency");
}
}
}

View File

@ -72,6 +72,7 @@ ServerState::ServerState ()
_authentication(),
_lock(),
_role(),
_idOfPrimary(""),
_state(STATE_UNDEFINED),
_initialized(false),
_clusterEnabled(false) {
@ -364,6 +365,15 @@ std::string ServerState::getId () {
return _id;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the server id
////////////////////////////////////////////////////////////////////////////////
std::string ServerState::getPrimaryId () {
READ_LOCKER(_lock);
return _idOfPrimary;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief set the server id
////////////////////////////////////////////////////////////////////////////////
@ -949,6 +959,7 @@ ServerState::RoleEnum ServerState::checkServersList (std::string const& id) {
if (name == id) {
role = ServerState::ROLE_SECONDARY;
_idOfPrimary = it->first;
break;
}

View File

@ -237,6 +237,12 @@ namespace triagens {
std::string getId ();
////////////////////////////////////////////////////////////////////////////////
/// @brief for a secondary get the server id of its primary
////////////////////////////////////////////////////////////////////////////////
std::string getPrimaryId ();
////////////////////////////////////////////////////////////////////////////////
/// @brief get the server description
////////////////////////////////////////////////////////////////////////////////
@ -566,6 +572,12 @@ namespace triagens {
std::atomic<int> _role;
////////////////////////////////////////////////////////////////////////////////
/// @brief a secondary stores the ID of its primary here:
////////////////////////////////////////////////////////////////////////////////
std::string _idOfPrimary;
////////////////////////////////////////////////////////////////////////////////
/// @brief the current state
////////////////////////////////////////////////////////////////////////////////

View File

@ -1201,6 +1201,23 @@ static void JS_IdServerState (const v8::FunctionCallbackInfo<v8::Value>& args) {
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the primary servers id (only for secondaries)
////////////////////////////////////////////////////////////////////////////////
static void JS_IdOfPrimaryServerState (const v8::FunctionCallbackInfo<v8::Value>& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
if (args.Length() != 0) {
TRI_V8_THROW_EXCEPTION_USAGE("idOfPrimary()");
}
const std::string id = ServerState::instance()->getPrimaryId();
TRI_V8_RETURN_STD_STRING(id);
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the servers description
////////////////////////////////////////////////////////////////////////////////
@ -2167,6 +2184,7 @@ void TRI_InitV8Cluster (v8::Isolate* isolate, v8::Handle<v8::Context> context) {
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("flush"), JS_FlushServerState, true);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("localInfo"), JS_LocalInfoServerState);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("id"), JS_IdServerState);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("idOfPrimary"), JS_IdOfPrimaryServerState);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("description"), JS_DescriptionServerState);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("dataPath"), JS_DataPathServerState);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("logPath"), JS_LogPathServerState);

View File

@ -1,3 +1,4 @@
/*global ArangoServerState, ArangoClusterInfo */
'use strict';
////////////////////////////////////////////////////////////////////////////////
@ -325,6 +326,23 @@ function dropLocalDatabases (plannedDatabases) {
// must drop database
console.info("dropping local database '%s'", name);
// Do we have to stop a replication applier first?
if (ArangoServerState.role() === "SECONDARY") {
try {
db._useDatabase(name);
var rep = require("org/arangodb/replication");
var state = rep.applier.state();
if (state.state.running === true) {
console.info("stopping replication applier first");
rep.applier.stop();
}
db._useDatabase("_system");
}
catch (err) {
db._useDatabase("_system");
}
}
db._dropDatabase(name);
writeLocked({ part: "Current" },
@ -773,13 +791,59 @@ function handleCollectionChanges (plan) {
cleanupCurrentCollections(plannedCollections);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief make sure that replication is set up for all databases
////////////////////////////////////////////////////////////////////////////////
function setupReplication () {
console.debug("Setting up replication...");
var db = require("internal").db;
var rep = require("org/arangodb/replication");
var dbs = db._listDatabases();
var i;
try {
for (i = 0; i < dbs.length; i++) {
var database = dbs[i];
console.debug("Checking replication of database "+database);
db._useDatabase(database);
var state = rep.applier.state();
if (state.state.running === false) {
var endpoint = ArangoClusterInfo.getServerEndpoint(
ArangoServerState.idOfPrimary());
var config = { "endpoint": endpoint, "includeSystem": false };
rep.applier.properties(config);
console.info("Starting synchronisation...");
var res = rep.sync(config);
console.info("Last log tick: "+res.lastLogTick+
", starting replication...");
var res2 = rep.applier.start(res.lastLogTick);
console.info("Result of replication start: "+res2);
}
}
}
catch (err) {
db._useDatabase("_system");
}
db._useDatabase("_system");
}
////////////////////////////////////////////////////////////////////////////////
/// @brief change handling trampoline function
////////////////////////////////////////////////////////////////////////////////
function handleChanges (plan, current) {
handleDatabaseChanges(plan, current);
handleCollectionChanges(plan, current);
var role = ArangoServerState.role();
if (role === "PRIMARY" || role === "COORDINATOR") {
// Note: This is only ever called for DBservers (primary and secondary),
// we keep the coordinator case here just in case...
handleCollectionChanges(plan, current);
}
else {
setupReplication();
}
}
////////////////////////////////////////////////////////////////////////////////