diff --git a/arangod/Agency/AgencyCommon.h b/arangod/Agency/AgencyCommon.h index e14a063dae..b44108cf15 100644 --- a/arangod/Agency/AgencyCommon.h +++ b/arangod/Agency/AgencyCommon.h @@ -102,8 +102,8 @@ struct log_t { }; -static std::string const pubApiPrefix = "/api/agency/"; -static std::string const privApiPrefix = "/api/agency_priv/"; +static std::string const pubApiPrefix = "/_api/agency/"; +static std::string const privApiPrefix = "/_api/agency_priv/"; /// @brief Private RPC return type struct priv_rpc_ret_t { diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index b6a693f6ae..7eddf15115 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -356,27 +356,44 @@ query_t Agent::activate(query_t const& everything) { auto ret = std::make_shared(); ret->openObject(); - if (active()) { - ret->add("success", VPackValue(false)); + Slice slice = everything->slice(); + + if (slice.isObject()) { + + if (active()) { + ret->add("success", VPackValue(false)); + } else { + + MUTEX_LOCKER(mutexLocker, _ioLock); + Slice compact = slice.get("compact"); + + Slice logs = slice.get("logs"); + + if (!compact.isEmptyArray()) { + _readDB = compact.get("readDB"); + } + + std::vector batch; + for (auto const& q : VPackArrayIterator(logs)) { + batch.push_back(q.get("request")); + } + _readDB.apply(batch); + _spearhead = _readDB; + + //_state.persistReadDB(everything->slice().get("compact").get("_key")); + //_state.log((everything->slice().get("logs")); + + ret->add("success", VPackValue(true)); + ret->add("commitId", VPackValue(_lastCommitIndex)); + } + } else { - MUTEX_LOCKER(mutexLocker, _ioLock); + LOG_TOPIC(ERR, Logger::AGENCY) + << "Activation failed. \"Everything\" must be an object, is however " + << slice.typeName(); - _readDB = everything->slice().get("compact").get("readDB"); - std::vector batch; - for (auto const& q : VPackArrayIterator(everything->slice().get("logs"))) { - batch.push_back(q.get("request")); - } - _readDB.apply(batch); - _spearhead = _readDB; - - //_state.persistReadDB(everything->slice().get("compact").get("_key")); - //_state.log((everything->slice().get("logs")); - - ret->add("success", VPackValue(true)); - ret->add("commitId", VPackValue(_lastCommitIndex)); } - ret->close(); return ret; @@ -572,12 +589,37 @@ void Agent::run() { void Agent::reportActivated( std::string const& failed, std::string const& replacement, query_t state) { - _config.swapActiveMember(failed, replacement); - MUTEX_LOCKER(mutexLocker, _ioLock); - _confirmed.erase(failed); - auto commitIndex = state->slice().get("commitIndex").getNumericValue(); - _confirmed[replacement] = commitIndex; - _lastAcked[replacement] = system_clock::now(); + if (state->slice().get("success").getBoolean()) { + MUTEX_LOCKER(mutexLocker, _ioLock); + _confirmed.erase(failed); + auto commitIndex = state->slice().get("commitId").getNumericValue(); + _confirmed[replacement] = commitIndex; + _lastAcked[replacement] = system_clock::now(); + _config.swapActiveMember(failed, replacement); + if (_activator->isRunning()) { + _activator->beginShutdown(); + } + _activator.reset(nullptr); + } + + // Agency configuration + auto agency = std::make_shared(); + agency->openArray(); + agency->openArray(); + agency->openObject(); + agency->add(".agency", VPackValue(VPackValueType::Object)); + agency->add("term", VPackValue(term())); + agency->add("id", VPackValue(id())); + agency->add("active", _config.activeToBuilder()->slice()); + agency->add("pool", _config.poolToBuilder()->slice()); + agency->close(); + agency->close(); + agency->close(); + agency->close(); + write(agency); + + // Notify inactive pool + notifyInactive(); } @@ -593,15 +635,17 @@ void Agent::detectActiveAgentFailures() { if (_config.poolSize() > _config.size()) { std::vector active = _config.active(); for (auto const& id : active) { - auto ds = duration( - system_clock::now() - _lastAcked.at(id)).count(); - if (ds > 10.0) { - std::string repl = _config.nextAgentInLine(); - LOG(WARN) << "Active agent " << id << " has failed. << " - << repl << " will be promoted to active agency membership"; - _activator = - std::unique_ptr(new AgentActivator(this, id, repl)); - _activator->start(); + if (id != this->id()) { + auto ds = duration( + system_clock::now() - _lastAcked.at(id)).count(); + if (ds > 10.0) { + std::string repl = _config.nextAgentInLine(); + LOG_TOPIC(DEBUG, Logger::AGENCY) << "Active agent " << id << " has failed. << " + << repl << " will be promoted to active agency membership"; + _activator = + std::unique_ptr(new AgentActivator(this, id, repl)); + _activator->start(); + } } } } @@ -686,7 +730,7 @@ bool Agent::lead() { // Notify inactive pool members of configuration change() void Agent::notifyInactive() const { if (_config.poolSize() > _config.size()) { - size_t size = _config.size(), counter = 0; + std::map pool = _config.pool(); std::string path = "/_api/agency_priv/inform"; @@ -699,16 +743,19 @@ void Agent::notifyInactive() const { out.close(); for (auto const& p : pool) { - ++counter; - if (counter > size) { + + if (p.first != id()) { auto headerFields = - std::make_unique>(); + std::make_unique>(); + arangodb::ClusterComm::instance()->asyncRequest( "1", 1, p.second, arangodb::rest::RequestType::POST, path, std::make_shared(out.toJson()), headerFields, nullptr, 1.0, true); } + } + } } @@ -745,6 +792,7 @@ void Agent::notify(query_t const& message) { _config.update(message); _state.persistActiveAgents(_config.activeToBuilder(), _config.poolToBuilder()); + } // Rebuild key value stores diff --git a/arangod/Agency/AgentActivator.cpp b/arangod/Agency/AgentActivator.cpp index f343a281ee..199a4da50d 100644 --- a/arangod/Agency/AgentActivator.cpp +++ b/arangod/Agency/AgentActivator.cpp @@ -63,7 +63,6 @@ void AgentActivator::run() { // All snapshots and all logs query_t allLogs = _agent->allLogs(); - LOG(WARN) << allLogs->toJson(); auto headerFields = std::make_unique>(); @@ -81,6 +80,7 @@ void AgentActivator::run() { << "Timed out while activating agent " << _replacement; break; } + } } diff --git a/arangod/Agency/FailedServer.cpp b/arangod/Agency/FailedServer.cpp index 05b75df6ca..d915e57ae0 100644 --- a/arangod/Agency/FailedServer.cpp +++ b/arangod/Agency/FailedServer.cpp @@ -113,8 +113,6 @@ bool FailedServer::start() { pending.close(); pending.close(); - LOG(WARN) << pending.toJson(); - // Transact to agency write_ret_t res = transact(_agent, pending); @@ -207,8 +205,6 @@ bool FailedServer::create() { _jb->close(); _jb->close(); - LOG(WARN) << _jb->toJson(); - write_ret_t res = transact(_agent, *_jb); if (res.accepted && res.indices.size() == 1 && res.indices[0]) { diff --git a/arangod/Agency/RestAgencyHandler.cpp b/arangod/Agency/RestAgencyHandler.cpp index 03ef24b149..e04c4c70d7 100644 --- a/arangod/Agency/RestAgencyHandler.cpp +++ b/arangod/Agency/RestAgencyHandler.cpp @@ -149,7 +149,7 @@ RestHandler::status RestAgencyHandler::handleWrite() { body.close(); generateResult(rest::ResponseCode::SERVICE_UNAVAILABLE, body.slice()); - LOG_TOPIC(ERR, Logger::AGENCY) << "We don't know who the leader is"; + LOG_TOPIC(DEBUG, Logger::AGENCY) << "We don't know who the leader is"; return status::DONE; } std::this_thread::sleep_for(duration_t(100)); @@ -225,7 +225,7 @@ RestHandler::status RestAgencyHandler::handleWrite() { body.close(); generateResult(rest::ResponseCode::SERVICE_UNAVAILABLE, body.slice()); - LOG_TOPIC(ERR, Logger::AGENCY) << "We don't know who the leader is"; + LOG_TOPIC(DEBUG, Logger::AGENCY) << "We don't know who the leader is"; return status::DONE; } else { @@ -260,7 +260,7 @@ inline RestHandler::status RestAgencyHandler::handleRead() { body.close(); generateResult(rest::ResponseCode::SERVICE_UNAVAILABLE, body.slice()); - LOG_TOPIC(ERR, Logger::AGENCY) << "We don't know who the leader is"; + LOG_TOPIC(DEBUG, Logger::AGENCY) << "We don't know who the leader is"; return status::DONE; } std::this_thread::sleep_for(duration_t(100)); @@ -283,7 +283,7 @@ inline RestHandler::status RestAgencyHandler::handleRead() { body.close(); generateResult(rest::ResponseCode::SERVICE_UNAVAILABLE, body.slice()); - LOG_TOPIC(ERR, Logger::AGENCY) << "We don't know who the leader is"; + LOG_TOPIC(DEBUG, Logger::AGENCY) << "We don't know who the leader is"; return status::DONE; } else { diff --git a/arangod/Agency/RestAgencyPrivHandler.cpp b/arangod/Agency/RestAgencyPrivHandler.cpp index b42fadc0f5..1f1ba57c5f 100644 --- a/arangod/Agency/RestAgencyPrivHandler.cpp +++ b/arangod/Agency/RestAgencyPrivHandler.cpp @@ -134,7 +134,6 @@ RestHandler::status RestAgencyPrivHandler::execute() { if (_request->requestType() != rest::RequestType::POST) { return reportMethodNotAllowed(); } - arangodb::velocypack::Options options; query_t everything; try { @@ -143,8 +142,14 @@ RestHandler::status RestAgencyPrivHandler::execute() { LOG_TOPIC(ERR, Logger::AGENCY) << "Failure getting activation body: e.what()"; } - - _agent->activate(everything); + try { + query_t res = _agent->activate(everything); + for (auto const& i : VPackObjectIterator(res->slice())) { + result.add(i.key.copyString(),i.value); + } + } catch (std::exception const& e) { + LOG_TOPIC(ERR, Logger::AGENCY) << "Activation failed: " << e.what(); + } } else if (_request->suffix()[0] == "gossip") { if (_request->requestType() != rest::RequestType::POST) { diff --git a/js/client/modules/@arangodb/testing.js b/js/client/modules/@arangodb/testing.js index b580b8f24f..b8c51473b2 100644 --- a/js/client/modules/@arangodb/testing.js +++ b/js/client/modules/@arangodb/testing.js @@ -1301,7 +1301,7 @@ function startInstanceCluster (instanceInfo, protocol, options, return [subArgs, subDir]; }; - options.agencySize = 1; +// options.agencySize = 1 options.agencyWaitForSync = false; startInstanceAgency(instanceInfo, protocol, options, ...makeArgs('agency', {})); @@ -1468,12 +1468,12 @@ function startInstanceAgency (instanceInfo, protocol, options, fs.makeDirectoryRecursive(dir); fs.makeDirectoryRecursive(instanceArgs['database.directory']); instanceInfo.arangods.push(startArango(protocol, options, instanceArgs, rootDir, 'agent')); - } - instanceInfo.endpoint = instanceInfo.arangods[instanceInfo.arangods.length - 1].endpoint; instanceInfo.url = instanceInfo.arangods[instanceInfo.arangods.length - 1].url; instanceInfo.role = 'agent'; print('Agency Endpoint: ' + instanceInfo.endpoint); + } + return instanceInfo; } diff --git a/scripts/quickieTest.sh b/scripts/quickieTest.sh index 0a31203e01..40c6b0aebc 100755 --- a/scripts/quickieTest.sh +++ b/scripts/quickieTest.sh @@ -2,4 +2,4 @@ scripts/unittest shell_server --test js/common/tests/shell/shell-quickie.js scripts/unittest shell_server --test js/common/tests/shell/shell-quickie.js --cluster true scripts/unittest shell_client --test js/common/tests/shell/shell-quickie.js -scripts/unittest shell_client --test js/common/tests/shell/shell-quickie.js --cluster true +scripts/unittest shell_client --test js/common/tests/shell/shell-quickie.js --cluster true --agencySize 1