diff --git a/arangod/Cluster/ApplicationCluster.cpp b/arangod/Cluster/ApplicationCluster.cpp index 1bd2d60aa6..35dbea5743 100644 --- a/arangod/Cluster/ApplicationCluster.cpp +++ b/arangod/Cluster/ApplicationCluster.cpp @@ -444,8 +444,26 @@ bool ApplicationCluster::open () { } } else if (role == ServerState::ROLE_SECONDARY) { - locker.unlock(); - LOG_FATAL_AND_EXIT("secondary server tasks are currently not implemented"); + std::string keyName = std::string("\"") + _myId + std::string("\""); + TRI_json_t* json = TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, + keyName.c_str(), keyName.size()); + + if (json == nullptr) { + locker.unlock(); + LOG_FATAL_AND_EXIT("out of memory"); + } + + ServerState::instance()->setState(ServerState::STATE_SYNCING); + + // register server + AgencyCommResult result = comm.setValue("Current/DBServers/" + + ServerState::instance()->getPrimaryId(), json, 0.0); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + + if (! result.successful()) { + locker.unlock(); + LOG_FATAL_AND_EXIT("unable to register secondary db server in agency"); + } } } diff --git a/arangod/Cluster/ServerState.cpp b/arangod/Cluster/ServerState.cpp index b896ecd1ec..f81e9adda1 100644 --- a/arangod/Cluster/ServerState.cpp +++ b/arangod/Cluster/ServerState.cpp @@ -72,6 +72,7 @@ ServerState::ServerState () _authentication(), _lock(), _role(), + _idOfPrimary(""), _state(STATE_UNDEFINED), _initialized(false), _clusterEnabled(false) { @@ -364,6 +365,15 @@ std::string ServerState::getId () { return _id; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief get the server id +//////////////////////////////////////////////////////////////////////////////// + +std::string ServerState::getPrimaryId () { + READ_LOCKER(_lock); + return _idOfPrimary; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief set the server id //////////////////////////////////////////////////////////////////////////////// @@ -949,6 +959,7 @@ ServerState::RoleEnum ServerState::checkServersList (std::string const& id) { if (name == id) { role = ServerState::ROLE_SECONDARY; + _idOfPrimary = it->first; break; } diff --git a/arangod/Cluster/ServerState.h b/arangod/Cluster/ServerState.h index e0fae55c2a..a4f1aef692 100644 --- a/arangod/Cluster/ServerState.h +++ b/arangod/Cluster/ServerState.h @@ -237,6 +237,12 @@ namespace triagens { std::string getId (); +//////////////////////////////////////////////////////////////////////////////// +/// @brief for a secondary get the server id of its primary +//////////////////////////////////////////////////////////////////////////////// + + std::string getPrimaryId (); + //////////////////////////////////////////////////////////////////////////////// /// @brief get the server description //////////////////////////////////////////////////////////////////////////////// @@ -566,6 +572,12 @@ namespace triagens { std::atomic _role; +//////////////////////////////////////////////////////////////////////////////// +/// @brief a secondary stores the ID of its primary here: +//////////////////////////////////////////////////////////////////////////////// + + std::string _idOfPrimary; + //////////////////////////////////////////////////////////////////////////////// /// @brief the current state //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cluster/v8-cluster.cpp b/arangod/Cluster/v8-cluster.cpp index d7754a343a..bbe75c9d44 100644 --- a/arangod/Cluster/v8-cluster.cpp +++ b/arangod/Cluster/v8-cluster.cpp @@ -1201,6 +1201,23 @@ static void JS_IdServerState (const v8::FunctionCallbackInfo& args) { TRI_V8_TRY_CATCH_END } +//////////////////////////////////////////////////////////////////////////////// +/// @brief return the primary servers id (only for secondaries) +//////////////////////////////////////////////////////////////////////////////// + +static void JS_IdOfPrimaryServerState (const v8::FunctionCallbackInfo& args) { + TRI_V8_TRY_CATCH_BEGIN(isolate); + v8::HandleScope scope(isolate); + + if (args.Length() != 0) { + TRI_V8_THROW_EXCEPTION_USAGE("idOfPrimary()"); + } + + const std::string id = ServerState::instance()->getPrimaryId(); + TRI_V8_RETURN_STD_STRING(id); + TRI_V8_TRY_CATCH_END +} + //////////////////////////////////////////////////////////////////////////////// /// @brief return the servers description //////////////////////////////////////////////////////////////////////////////// @@ -2167,6 +2184,7 @@ void TRI_InitV8Cluster (v8::Isolate* isolate, v8::Handle context) { TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("flush"), JS_FlushServerState, true); TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("localInfo"), JS_LocalInfoServerState); TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("id"), JS_IdServerState); + TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("idOfPrimary"), JS_IdOfPrimaryServerState); TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("description"), JS_DescriptionServerState); TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("dataPath"), JS_DataPathServerState); TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("logPath"), JS_LogPathServerState); diff --git a/js/server/modules/org/arangodb/cluster.js b/js/server/modules/org/arangodb/cluster.js index a0ea61a809..fe7d3baef2 100644 --- a/js/server/modules/org/arangodb/cluster.js +++ b/js/server/modules/org/arangodb/cluster.js @@ -1,3 +1,4 @@ +/*global ArangoServerState, ArangoClusterInfo */ 'use strict'; //////////////////////////////////////////////////////////////////////////////// @@ -325,6 +326,23 @@ function dropLocalDatabases (plannedDatabases) { // must drop database console.info("dropping local database '%s'", name); + + // Do we have to stop a replication applier first? + if (ArangoServerState.role() === "SECONDARY") { + try { + db._useDatabase(name); + var rep = require("org/arangodb/replication"); + var state = rep.applier.state(); + if (state.state.running === true) { + console.info("stopping replication applier first"); + rep.applier.stop(); + } + db._useDatabase("_system"); + } + catch (err) { + db._useDatabase("_system"); + } + } db._dropDatabase(name); writeLocked({ part: "Current" }, @@ -773,13 +791,59 @@ function handleCollectionChanges (plan) { cleanupCurrentCollections(plannedCollections); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief make sure that replication is set up for all databases +//////////////////////////////////////////////////////////////////////////////// + +function setupReplication () { + console.debug("Setting up replication..."); + + var db = require("internal").db; + var rep = require("org/arangodb/replication"); + var dbs = db._listDatabases(); + var i; + try { + for (i = 0; i < dbs.length; i++) { + var database = dbs[i]; + console.debug("Checking replication of database "+database); + db._useDatabase(database); + + var state = rep.applier.state(); + if (state.state.running === false) { + var endpoint = ArangoClusterInfo.getServerEndpoint( + ArangoServerState.idOfPrimary()); + var config = { "endpoint": endpoint, "includeSystem": false }; + rep.applier.properties(config); + console.info("Starting synchronisation..."); + var res = rep.sync(config); + console.info("Last log tick: "+res.lastLogTick+ + ", starting replication..."); + var res2 = rep.applier.start(res.lastLogTick); + console.info("Result of replication start: "+res2); + } + } + } + catch (err) { + db._useDatabase("_system"); + } + db._useDatabase("_system"); +} + //////////////////////////////////////////////////////////////////////////////// /// @brief change handling trampoline function //////////////////////////////////////////////////////////////////////////////// function handleChanges (plan, current) { handleDatabaseChanges(plan, current); - handleCollectionChanges(plan, current); + var role = ArangoServerState.role(); + if (role === "PRIMARY" || role === "COORDINATOR") { + // Note: This is only ever called for DBservers (primary and secondary), + // we keep the coordinator case here just in case... + handleCollectionChanges(plan, current); + } + else { + setupReplication(); + } } ////////////////////////////////////////////////////////////////////////////////