1
0
Fork 0

multi-host agency in tests

This commit is contained in:
Kaveh Vahedipour 2016-09-09 14:46:54 +02:00
parent 64da7d0fc7
commit 16a35ee15a
8 changed files with 103 additions and 54 deletions

View File

@ -102,8 +102,8 @@ struct log_t {
};
static std::string const pubApiPrefix = "/api/agency/";
static std::string const privApiPrefix = "/api/agency_priv/";
static std::string const pubApiPrefix = "/_api/agency/";
static std::string const privApiPrefix = "/_api/agency_priv/";
/// @brief Private RPC return type
struct priv_rpc_ret_t {

View File

@ -356,15 +356,25 @@ query_t Agent::activate(query_t const& everything) {
auto ret = std::make_shared<Builder>();
ret->openObject();
Slice slice = everything->slice();
if (slice.isObject()) {
if (active()) {
ret->add("success", VPackValue(false));
} else {
MUTEX_LOCKER(mutexLocker, _ioLock);
Slice compact = slice.get("compact");
Slice logs = slice.get("logs");
if (!compact.isEmptyArray()) {
_readDB = compact.get("readDB");
}
_readDB = everything->slice().get("compact").get("readDB");
std::vector<Slice> batch;
for (auto const& q : VPackArrayIterator(everything->slice().get("logs"))) {
for (auto const& q : VPackArrayIterator(logs)) {
batch.push_back(q.get("request"));
}
_readDB.apply(batch);
@ -377,6 +387,13 @@ query_t Agent::activate(query_t const& everything) {
ret->add("commitId", VPackValue(_lastCommitIndex));
}
} else {
LOG_TOPIC(ERR, Logger::AGENCY)
<< "Activation failed. \"Everything\" must be an object, is however "
<< slice.typeName();
}
ret->close();
return ret;
@ -572,12 +589,37 @@ void Agent::run() {
void Agent::reportActivated(
std::string const& failed, std::string const& replacement, query_t state) {
_config.swapActiveMember(failed, replacement);
if (state->slice().get("success").getBoolean()) {
MUTEX_LOCKER(mutexLocker, _ioLock);
_confirmed.erase(failed);
auto commitIndex = state->slice().get("commitIndex").getNumericValue<index_t>();
auto commitIndex = state->slice().get("commitId").getNumericValue<index_t>();
_confirmed[replacement] = commitIndex;
_lastAcked[replacement] = system_clock::now();
_config.swapActiveMember(failed, replacement);
if (_activator->isRunning()) {
_activator->beginShutdown();
}
_activator.reset(nullptr);
}
// Agency configuration
auto agency = std::make_shared<Builder>();
agency->openArray();
agency->openArray();
agency->openObject();
agency->add(".agency", VPackValue(VPackValueType::Object));
agency->add("term", VPackValue(term()));
agency->add("id", VPackValue(id()));
agency->add("active", _config.activeToBuilder()->slice());
agency->add("pool", _config.poolToBuilder()->slice());
agency->close();
agency->close();
agency->close();
agency->close();
write(agency);
// Notify inactive pool
notifyInactive();
}
@ -593,11 +635,12 @@ void Agent::detectActiveAgentFailures() {
if (_config.poolSize() > _config.size()) {
std::vector<std::string> active = _config.active();
for (auto const& id : active) {
if (id != this->id()) {
auto ds = duration<double>(
system_clock::now() - _lastAcked.at(id)).count();
if (ds > 10.0) {
std::string repl = _config.nextAgentInLine();
LOG(WARN) << "Active agent " << id << " has failed. << "
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Active agent " << id << " has failed. << "
<< repl << " will be promoted to active agency membership";
_activator =
std::unique_ptr<AgentActivator>(new AgentActivator(this, id, repl));
@ -605,6 +648,7 @@ void Agent::detectActiveAgentFailures() {
}
}
}
}
}
@ -686,7 +730,7 @@ bool Agent::lead() {
// Notify inactive pool members of configuration change()
void Agent::notifyInactive() const {
if (_config.poolSize() > _config.size()) {
size_t size = _config.size(), counter = 0;
std::map<std::string, std::string> pool = _config.pool();
std::string path = "/_api/agency_priv/inform";
@ -699,16 +743,19 @@ void Agent::notifyInactive() const {
out.close();
for (auto const& p : pool) {
++counter;
if (counter > size) {
if (p.first != id()) {
auto headerFields =
std::make_unique<std::unordered_map<std::string, std::string>>();
arangodb::ClusterComm::instance()->asyncRequest(
"1", 1, p.second, arangodb::rest::RequestType::POST,
path, std::make_shared<std::string>(out.toJson()), headerFields,
nullptr, 1.0, true);
}
}
}
}
@ -745,6 +792,7 @@ void Agent::notify(query_t const& message) {
_config.update(message);
_state.persistActiveAgents(_config.activeToBuilder(),
_config.poolToBuilder());
}
// Rebuild key value stores

View File

@ -63,7 +63,6 @@ void AgentActivator::run() {
// All snapshots and all logs
query_t allLogs = _agent->allLogs();
LOG(WARN) << allLogs->toJson();
auto headerFields =
std::make_unique<std::unordered_map<std::string, std::string>>();
@ -81,6 +80,7 @@ void AgentActivator::run() {
<< "Timed out while activating agent " << _replacement;
break;
}
}
}

View File

@ -113,8 +113,6 @@ bool FailedServer::start() {
pending.close();
pending.close();
LOG(WARN) << pending.toJson();
// Transact to agency
write_ret_t res = transact(_agent, pending);
@ -207,8 +205,6 @@ bool FailedServer::create() {
_jb->close();
_jb->close();
LOG(WARN) << _jb->toJson();
write_ret_t res = transact(_agent, *_jb);
if (res.accepted && res.indices.size() == 1 && res.indices[0]) {

View File

@ -149,7 +149,7 @@ RestHandler::status RestAgencyHandler::handleWrite() {
body.close();
generateResult(rest::ResponseCode::SERVICE_UNAVAILABLE,
body.slice());
LOG_TOPIC(ERR, Logger::AGENCY) << "We don't know who the leader is";
LOG_TOPIC(DEBUG, Logger::AGENCY) << "We don't know who the leader is";
return status::DONE;
}
std::this_thread::sleep_for(duration_t(100));
@ -225,7 +225,7 @@ RestHandler::status RestAgencyHandler::handleWrite() {
body.close();
generateResult(rest::ResponseCode::SERVICE_UNAVAILABLE,
body.slice());
LOG_TOPIC(ERR, Logger::AGENCY) << "We don't know who the leader is";
LOG_TOPIC(DEBUG, Logger::AGENCY) << "We don't know who the leader is";
return status::DONE;
} else {
@ -260,7 +260,7 @@ inline RestHandler::status RestAgencyHandler::handleRead() {
body.close();
generateResult(rest::ResponseCode::SERVICE_UNAVAILABLE,
body.slice());
LOG_TOPIC(ERR, Logger::AGENCY) << "We don't know who the leader is";
LOG_TOPIC(DEBUG, Logger::AGENCY) << "We don't know who the leader is";
return status::DONE;
}
std::this_thread::sleep_for(duration_t(100));
@ -283,7 +283,7 @@ inline RestHandler::status RestAgencyHandler::handleRead() {
body.close();
generateResult(rest::ResponseCode::SERVICE_UNAVAILABLE,
body.slice());
LOG_TOPIC(ERR, Logger::AGENCY) << "We don't know who the leader is";
LOG_TOPIC(DEBUG, Logger::AGENCY) << "We don't know who the leader is";
return status::DONE;
} else {

View File

@ -134,7 +134,6 @@ RestHandler::status RestAgencyPrivHandler::execute() {
if (_request->requestType() != rest::RequestType::POST) {
return reportMethodNotAllowed();
}
arangodb::velocypack::Options options;
query_t everything;
try {
@ -143,8 +142,14 @@ RestHandler::status RestAgencyPrivHandler::execute() {
LOG_TOPIC(ERR, Logger::AGENCY)
<< "Failure getting activation body: e.what()";
}
_agent->activate(everything);
try {
query_t res = _agent->activate(everything);
for (auto const& i : VPackObjectIterator(res->slice())) {
result.add(i.key.copyString(),i.value);
}
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) << "Activation failed: " << e.what();
}
} else if (_request->suffix()[0] == "gossip") {
if (_request->requestType() != rest::RequestType::POST) {

View File

@ -1301,7 +1301,7 @@ function startInstanceCluster (instanceInfo, protocol, options,
return [subArgs, subDir];
};
options.agencySize = 1;
// options.agencySize = 1
options.agencyWaitForSync = false;
startInstanceAgency(instanceInfo, protocol, options, ...makeArgs('agency', {}));
@ -1468,12 +1468,12 @@ function startInstanceAgency (instanceInfo, protocol, options,
fs.makeDirectoryRecursive(dir);
fs.makeDirectoryRecursive(instanceArgs['database.directory']);
instanceInfo.arangods.push(startArango(protocol, options, instanceArgs, rootDir, 'agent'));
}
instanceInfo.endpoint = instanceInfo.arangods[instanceInfo.arangods.length - 1].endpoint;
instanceInfo.url = instanceInfo.arangods[instanceInfo.arangods.length - 1].url;
instanceInfo.role = 'agent';
print('Agency Endpoint: ' + instanceInfo.endpoint);
}
return instanceInfo;
}

View File

@ -2,4 +2,4 @@
scripts/unittest shell_server --test js/common/tests/shell/shell-quickie.js
scripts/unittest shell_server --test js/common/tests/shell/shell-quickie.js --cluster true
scripts/unittest shell_client --test js/common/tests/shell/shell-quickie.js
scripts/unittest shell_client --test js/common/tests/shell/shell-quickie.js --cluster true
scripts/unittest shell_client --test js/common/tests/shell/shell-quickie.js --cluster true --agencySize 1