diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 9b86468b17..2231a0fbbe 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -924,36 +924,66 @@ TimePoint const& Agent::leaderSince() const { // Notify inactive pool members of configuration change() void Agent::notifyInactive() const { - if (_config.poolSize() > _config.size()) { - std::map pool = _config.pool(); - std::string path = "/_api/agency_priv/inform"; + std::map pool = _config.pool(); + std::string path = "/_api/agency_priv/inform"; - Builder out; - out.openObject(); - out.add("term", VPackValue(term())); - out.add("id", VPackValue(id())); - out.add("active", _config.activeToBuilder()->slice()); - out.add("pool", _config.poolToBuilder()->slice()); - out.add("min ping", VPackValue(_config.minPing())); - out.add("max ping", VPackValue(_config.maxPing())); - out.close(); + Builder out; + out.openObject(); + out.add("term", VPackValue(term())); + out.add("id", VPackValue(id())); + out.add("active", _config.activeToBuilder()->slice()); + out.add("pool", _config.poolToBuilder()->slice()); + out.add("min ping", VPackValue(_config.minPing())); + out.add("max ping", VPackValue(_config.maxPing())); + out.close(); - for (auto const& p : pool) { + for (auto const& p : pool) { - if (p.first != id()) { - auto headerFields = - std::make_unique>(); + if (p.first != id()) { - arangodb::ClusterComm::instance()->asyncRequest( - "1", 1, p.second, arangodb::rest::RequestType::POST, - path, std::make_shared(out.toJson()), headerFields, - nullptr, 1.0, true); - } + auto headerFields = + std::make_unique>(); + arangodb::ClusterComm::instance()->asyncRequest( + "1", 1, p.second, arangodb::rest::RequestType::POST, + path, std::make_shared(out.toJson()), headerFields, + nullptr, 1.0, true); } } + +} + +void Agent::updatePeerEndpoint(query_t const& message) { + + VPackSlice slice = message->slice(); + + if (!slice.isObject() || slice.length() == 0) { + THROW_ARANGO_EXCEPTION_MESSAGE( + TRI_ERROR_AGENCY_INFORM_MUST_BE_OBJECT, + std::string("Inproper greeting: ") + slice.toJson()); + } + + std::string uuid, endpoint; + try { + uuid = slice.keyAt(0).copyString(); + } catch (std::exception const& e) { + THROW_ARANGO_EXCEPTION_MESSAGE( + TRI_ERROR_AGENCY_INFORM_MUST_BE_OBJECT, + std::string("Cannot deal with UUID: ") + e.what()); + } + + try { + endpoint = slice.valueAt(0).copyString(); + } catch (std::exception const& e) { + THROW_ARANGO_EXCEPTION_MESSAGE( + TRI_ERROR_AGENCY_INFORM_MUST_BE_OBJECT, + std::string("Cannot deal with UUID: ") + e.what()); + } + + _config.updateEndpoint(uuid, endpoint); + } void Agent::notify(query_t const& message) { @@ -989,6 +1019,7 @@ void Agent::notify(query_t const& message) { } _config.update(message); + _state.persistActiveAgents(_config.activeToBuilder(), _config.poolToBuilder()); } diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index 1cfec54b5e..186c0e3536 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -196,6 +196,8 @@ class Agent : public arangodb::Thread { /// @brief Get start time of leadership TimePoint const& leaderSince() const; + void updatePeerEndpoint(query_t const& message); + /// @brief State reads persisted state and prepares the agent friend class State; diff --git a/arangod/Agency/AgentConfiguration.cpp b/arangod/Agency/AgentConfiguration.cpp index 741c3ec2d0..dbc8a807ed 100644 --- a/arangod/Agency/AgentConfiguration.cpp +++ b/arangod/Agency/AgentConfiguration.cpp @@ -341,12 +341,27 @@ query_t config_t::poolToBuilder() const { return ret; } + +void config_t::updateEndpoint(std::string const& id, std::string const& ep) { + WRITE_LOCKER(readLocker, _lock); + if (_pool[id] != ep) { + _pool[id] = ep; + ++_version; + } +} + + void config_t::update(query_t const& message) { VPackSlice slice = message->slice(); std::map pool; bool changed = false; for (auto const& p : VPackObjectIterator(slice.get(poolStr))) { - pool[p.key.copyString()] = p.value.copyString(); + auto const& id = p.key.copyString(); + if (id != _id) { + pool[id] = p.value.copyString(); + } else { + pool[id] = _endpoint; + } } std::vector active; for (auto const& a : VPackArrayIterator(slice.get(activeStr))) { @@ -522,7 +537,6 @@ bool config_t::merge(VPackSlice const& conf) { WRITE_LOCKER(writeLocker, _lock); // All must happen under the lock or else ... _id = conf.get(idStr).copyString(); // I get my id - _pool[_id] = _endpoint; // Register my endpoint with it _startup = "persistence"; std::stringstream ss; @@ -562,7 +576,12 @@ bool config_t::merge(VPackSlice const& conf) { if (conf.hasKey(poolStr)) { // Persistence only LOG_TOPIC(DEBUG, Logger::AGENCY) << "Found agent pool in persistence:"; for (auto const& peer : VPackObjectIterator(conf.get(poolStr))) { - _pool[peer.key.copyString()] = peer.value.copyString(); + auto const& id = peer.key.copyString(); + if (id != _id) { + _pool[id] = peer.value.copyString(); + } else { + _pool[id] = _endpoint; + } } ss << conf.get(poolStr).toJson() << " (persisted)"; } else { diff --git a/arangod/Agency/AgentConfiguration.h b/arangod/Agency/AgentConfiguration.h index 8cef2c5c83..c9173528f0 100644 --- a/arangod/Agency/AgentConfiguration.h +++ b/arangod/Agency/AgentConfiguration.h @@ -193,6 +193,9 @@ struct config_t { /// @brief std::string startup() const; + + /// @brief Update an indivdual uuid's endpoint + void updateEndpoint(std::string const&, std::string const&); }; diff --git a/arangod/Agency/Inception.cpp b/arangod/Agency/Inception.cpp index 293d37fee2..e35b2d8f5c 100644 --- a/arangod/Agency/Inception.cpp +++ b/arangod/Agency/Inception.cpp @@ -161,14 +161,22 @@ bool Inception::restartingActiveAgent() { using namespace std::chrono; - auto const path = pubApiPrefix + "config"; - auto const myConfig = _agent->config(); - auto const startTime = system_clock::now(); - auto pool = myConfig.pool(); - auto active = myConfig.active(); - auto const& clientId = myConfig.id(); + auto const path = pubApiPrefix + "config"; + auto const myConfig = _agent->config(); + auto const startTime = system_clock::now(); + auto pool = myConfig.pool(); + auto active = myConfig.active(); + auto const& clientId = myConfig.id(); + auto const& clientEp = myConfig.endpoint(); auto const majority = (myConfig.size()+1)/2; + Builder greeting; + { + VPackObjectBuilder b(&greeting); + greeting.add(clientId, VPackValue(clientEp)); + } + auto const& greetstr = greeting.toJson(); + seconds const timeout(3600); CONDITION_LOCKER(guard, _cv); @@ -195,7 +203,7 @@ bool Inception::restartingActiveAgent() { if (p.first != myConfig.id() && p.first != "") { auto comres = arangodb::ClusterComm::instance()->syncRequest( - clientId, 1, p.second, rest::RequestType::GET, path, std::string(), + clientId, 1, p.second, rest::RequestType::POST, path, greetstr, std::unordered_map(), 2.0); if (comres->status == CL_COMM_SENT) { diff --git a/arangod/Agency/RestAgencyHandler.cpp b/arangod/Agency/RestAgencyHandler.cpp index e7bd618582..1309d81d3a 100644 --- a/arangod/Agency/RestAgencyHandler.cpp +++ b/arangod/Agency/RestAgencyHandler.cpp @@ -419,6 +419,16 @@ inline RestStatus RestAgencyHandler::handleRead() { } RestStatus RestAgencyHandler::handleConfig() { + if (_request->requestType() == rest::RequestType::POST) { + try { + arangodb::velocypack::Options options; + _agent->updatePeerEndpoint(_request->toVelocyPackBuilderPtr(&options)); + } catch (std::exception const& e) { + generateError( + rest::ResponseCode::SERVER_ERROR, TRI_ERROR_INTERNAL, e.what()); + return RestStatus::DONE; + } + } Builder body; body.add(VPackValue(VPackValueType::Object)); body.add("term", Value(_agent->term())); @@ -466,7 +476,8 @@ RestStatus RestAgencyHandler::execute() { } else if (suffixes[0] == "transact") { return handleTransact(); } else if (suffixes[0] == "config") { - if (_request->requestType() != rest::RequestType::GET) { + if (_request->requestType() != rest::RequestType::GET && + _request->requestType() != rest::RequestType::POST) { return reportMethodNotAllowed(); } return handleConfig(); diff --git a/arangod/Cluster/v8-cluster.cpp b/arangod/Cluster/v8-cluster.cpp index 8b258170dc..3d70a19c84 100644 --- a/arangod/Cluster/v8-cluster.cpp +++ b/arangod/Cluster/v8-cluster.cpp @@ -383,6 +383,9 @@ static void JS_SetAgency(v8::FunctionCallbackInfo const& args) { AgencyComm comm; AgencyCommResult result = comm.setValue(key, builder.slice(), ttl); + LOG(WARN) << key; + LOG(WARN) << builder.slice().toJson(); + if (!result.successful()) { THROW_AGENCY_EXCEPTION(result); } diff --git a/js/server/modules/@arangodb/cluster.js b/js/server/modules/@arangodb/cluster.js index 2a2fb37c7f..7a671fb598 100644 --- a/js/server/modules/@arangodb/cluster.js +++ b/js/server/modules/@arangodb/cluster.js @@ -40,8 +40,8 @@ var _ = require('lodash'); const inccv = {'/arango/Current/Version':{'op':'increment'}}; const incpv = {'/arango/Plan/Version':{'op':'increment'}}; -const agencyDBs = global.ArangoAgency.prefix() + '/Current/Databases/'; -const agencyCols = global.ArangoAgency.prefix() + '/Current/Collections/'; +const agencyDBs = '/' + global.ArangoAgency.prefix() + '/Current/Databases/'; +const agencyCols = '/' + global.ArangoAgency.prefix() + '/Current/Collections/'; var endpointToURL = function (endpoint) { if (endpoint.substr(0, 6) === 'ssl://') { @@ -311,7 +311,7 @@ function createLocalDatabases (plannedDatabases, currentDatabases) { var createDatabaseAgency = function (payload) { var envelope = {}; envelope[agencyDBs + payload.name + '/' + ourselves] = payload; - global.ArangoAgency.write([[envelope, inccv]]); + global.ArangoAgency.write([[envelope],[inccv]]); }; var db = require('internal').db; @@ -370,7 +370,7 @@ function dropLocalDatabases (plannedDatabases) { try { var envelope = {}; envelope[agencyDBs + payload.name + '/' + ourselves] = {"op":"delete"}; - global.ArangoAgency.write([[envelope, inccv]]); + global.ArangoAgency.write([[envelope],[inccv]]); } catch (err) { // ignore errors } @@ -425,7 +425,7 @@ function cleanupCurrentDatabases (currentDatabases) { try { var envelope = {}; envelope[agencyDBs + payload.name + '/' + ourselves] = {"op":"delete"}; - global.ArangoAgency.write([[envelope, inccv]]); + global.ArangoAgency.write([[envelope],[inccv]]); } catch (err) { // ignore errors } @@ -470,34 +470,38 @@ function handleDatabaseChanges (plan, current) { // / @brief create collections if they exist in the plan but not locally // ////////////////////////////////////////////////////////////////////////////// -function createLocalCollections (plannedCollections, planVersion, - currentCollections, - takeOverResponsibility) { +function createLocalCollections ( + plannedCollections, planVersion, currentCollections, takeOverResponsibility) { + var ourselves = global.ArangoServerState.id(); - var createCollectionAgency = function (database, shard, collInfo, error) { - var payload = { error: error.error, - errorNum: error.errorNum, - errorMessage: error.errorMessage, + var createCollectionAgency = function (database, shard, collInfo, err) { + + var payload = { + error: err.error, + errorNum: err.errorNum, + errorMessage: err.errorMessage, satellite: collInfo.replicationFactor === 0, indexes: collInfo.indexes, servers: [ ourselves ], - planVersion: planVersion }; + planVersion: planVersion }; console.debug('creating Current/Collections/' + database + '/' + collInfo.planId + '/' + shard); - global.ArangoAgency.set('Current/Collections/' + database + '/' + collInfo.planId + '/' + shard, payload); - global.ArangoAgency.increaseVersion('Current/Version'); - global.ArangoAgency.increaseVersion('Plan/Version'); var envelope = {}; envelope[agencyCols + database + '/' + collInfo.planId + '/' + shard] = payload; - //global.ArangoAgency.write([[envelope, inccv]]); + console.info(envelope); + global.ArangoAgency.set('Current/Collections/' + database + '/' + + collInfo.planId + '/' + shard, payload) + global.ArangoAgency.write([[inccv]]); console.debug('creating Current/Collections/' + database + '/' + collInfo.planId + '/' + shard + ' done.'); }; + var takeOver = createCollectionAgency; + var db = require('internal').db; db._useDatabase('_system'); @@ -505,14 +509,14 @@ function createLocalCollections (plannedCollections, planVersion, var localDatabases = getLocalDatabases(); var database; var i; - + // iterate over all matching databases for (database in plannedCollections) { if (plannedCollections.hasOwnProperty(database)) { if (localDatabases.hasOwnProperty(database)) { // switch into other database db._useDatabase(database); - + try { // iterate over collections of database var localCollections = getLocalCollections(); @@ -635,7 +639,7 @@ function createLocalCollections (plannedCollections, planVersion, if (error.error) { if (takeOverResponsibility && !didWrite) { if (isLeader) { - createCollectionAgency(database, shard, collInfo, error); + takeOver(database, shard, collInfo, error); } } continue; // No point to look for properties and @@ -718,7 +722,7 @@ function createLocalCollections (plannedCollections, planVersion, if ((takeOverResponsibility && !didWrite && isLeader) || (!didWrite && isLeader && !wasLeader)) { - createCollectionAgency(database, shard, collInfo, error); + takeOver(database, shard, collInfo, error); } } } @@ -778,7 +782,7 @@ function dropLocalCollections (plannedCollections, currentCollections) { id + '/' + shardID); var envelope = {}; envelope[agencyCols + database + '/' + id + '/' + shardID] = {"op":"delete"}; - global.ArangoAgency.write([[envelope, inccv]]); + global.ArangoAgency.write([[envelope],[inccv]]); console.debug('dropping Current/Collections/' + database + '/' + id + '/' + shardID + ' done.'); } catch (err) { @@ -888,7 +892,7 @@ function cleanupCurrentCollections (plannedCollections, currentCollections) { var envelope = {}; envelope[agencyCols + database + '/' + collection + '/' + shardID] = {"op": "delete"}; - global.ArangoAgency.write([[envelope, inccv]]); + global.ArangoAgency.write([[envelope],[inccv]]); console.debug('cleaning Current/Collections/' + database + '/' + collection + '/' + shardID + ' done.'); diff --git a/scripts/startStandAloneAgency.sh b/scripts/startStandAloneAgency.sh index c94381870a..eccbf93673 100755 --- a/scripts/startStandAloneAgency.sh +++ b/scripts/startStandAloneAgency.sh @@ -158,7 +158,7 @@ if [ "$GOSSIP_MODE" = "0" ]; then GOSSIP_PEERS=" --agency.endpoint $TRANSPORT://localhost:$BASE" fi -rm -rf agency +#rm -rf agency mkdir -p agency PIDS=""