1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into generic-col-types

This commit is contained in:
jsteemann 2016-09-08 12:53:36 +02:00
commit c488375e36
14 changed files with 287 additions and 69 deletions

View File

@ -28,23 +28,26 @@
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
ActivationCallback::ActivationCallback() : _agent(0), _last(0) {}
ActivationCallback::ActivationCallback() : _agent(nullptr){}
ActivationCallback::ActivationCallback(Agent* agent, std::string const& slaveID,
index_t last)
: _agent(agent), _last(last), _slaveID(slaveID) {}
ActivationCallback::ActivationCallback(
Agent* agent, std::string const& failed, std::string const& replacement)
: _agent(agent),
_failed(failed),
_replacement(replacement) {}
void ActivationCallback::shutdown() { _agent = 0; }
void ActivationCallback::shutdown() { _agent = nullptr; }
bool ActivationCallback::operator()(arangodb::ClusterCommResult* res) {
if (res->status == CL_COMM_SENT) {
if (_agent) {
_agent->reportIn(_slaveID, _last);
auto v = res->result->getBodyVelocyPack();
_agent->reportActivated(_failed, _replacement, v);
}
} else {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "comm_status(" << res->status
<< "), last(" << _last << "), follower("
<< _slaveID << ")";
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "activation_comm_status(" << res->status << "), replacement("
<< _replacement << ")";
}
return true;
}

View File

@ -36,7 +36,7 @@ class ActivationCallback : public arangodb::ClusterCommCallback {
public:
ActivationCallback();
ActivationCallback(Agent*, std::string const&, index_t);
ActivationCallback(Agent*, std::string const&, std::string const&);
virtual bool operator()(arangodb::ClusterCommResult*) override final;
@ -44,8 +44,8 @@ class ActivationCallback : public arangodb::ClusterCommCallback {
private:
Agent* _agent;
index_t _last;
std::string _slaveID;
std::string _failed;
std::string _replacement;
};
}
} // namespace

View File

@ -36,6 +36,7 @@
using namespace arangodb::application_features;
using namespace arangodb::velocypack;
using namespace std::chrono;
namespace arangodb {
namespace consensus {
@ -49,7 +50,8 @@ Agent::Agent(config_t const& config)
_readDB(this),
_serveActiveAgent(false),
_nextCompationAfter(_config.compactionStepSize()),
_inception(std::make_unique<Inception>(this)) {
_inception(std::make_unique<Inception>(this)),
_activator(nullptr) {
_state.configure(this);
_constituent.configure(this);
}
@ -100,6 +102,11 @@ bool Agent::start() {
return true;
}
/// Hand out everything the state machine holds, exactly as produced by
/// State::allLogs()
query_t Agent::allLogs() const {
  query_t everything = _state.allLogs();
  return everything;
}
/// This agent's term
term_t Agent::term() const { return _constituent.term(); }
@ -126,9 +133,6 @@ std::string Agent::leaderID() const { return _constituent.leaderID(); }
/// Are we leading?
bool Agent::leading() const { return _constituent.leading(); }
/// Activate a standby agent
bool Agent::activateStandbyAgent() { return true; }
/// Start constituent personality
void Agent::startConstituent() {
activateAgency();
@ -169,7 +173,7 @@ bool Agent::waitFor(index_t index, double timeout) {
void Agent::reportIn(std::string const& id, index_t index) {
MUTEX_LOCKER(mutexLocker, _ioLock);
_lastAcked[id] = std::chrono::system_clock::now();
_lastAcked[id] = system_clock::now();
if (index > _confirmed[id]) { // progress this follower?
_confirmed[id] = index;
@ -279,8 +283,8 @@ void Agent::sendAppendEntriesRPC() {
std::vector<log_t> unconfirmed = _state.get(last_confirmed);
index_t highest = unconfirmed.back().index;
std::chrono::duration<double> m =
std::chrono::system_clock::now() - _lastSent[followerId];
duration<double> m =
system_clock::now() - _lastSent[followerId];
if (highest == _lastHighest[followerId]
&& 0.5 * _config.minPing() > m.count()) {
@ -328,7 +332,7 @@ void Agent::sendAppendEntriesRPC() {
{
MUTEX_LOCKER(mutexLocker, _ioLock);
_lastSent[followerId] = std::chrono::system_clock::now();
_lastSent[followerId] = system_clock::now();
_lastHighest[followerId] = highest;
}
}
@ -392,16 +396,21 @@ bool Agent::load() {
reportIn(id(), _state.lastLog().index);
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting spearhead worker.";
_spearhead.start();
_readDB.start();
if (!this->isStopping()) {
_spearhead.start();
_readDB.start();
}
TRI_ASSERT(queryRegistry != nullptr);
if (size() == 1) {
activateAgency();
}
_constituent.start(vocbase, queryRegistry);
if (_config.supervision()) {
if (!this->isStopping()) {
_constituent.start(vocbase, queryRegistry);
}
if (!this->isStopping() && _config.supervision()) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting cluster sanity facilities";
_supervision.start(this);
}
@ -414,8 +423,8 @@ bool Agent::challengeLeadership() {
// Still leading?
size_t good = 0;
for (auto const& i : _lastAcked) {
std::chrono::duration<double> m =
std::chrono::system_clock::now() - i.second;
duration<double> m =
system_clock::now() - i.second;
if (0.9 * _config.minPing() > m.count()) {
++good;
}
@ -478,24 +487,91 @@ read_ret_t Agent::read(query_t const& query) {
return read_ret_t(true, _constituent.leaderID(), success, result);
}
/// Send out append entries to followers regularly or on event
void Agent::run() {
CONDITION_LOCKER(guard, _appendCV);
using namespace std::chrono;
auto tp = system_clock::now();
// Only run in case we are in multi-host mode
while (!this->isStopping() && size() > 1) {
// Leader working only
if (leading()) {
_appendCV.wait(1000);
} else {
_appendCV.wait();
}
// Append entries to followers
sendAppendEntriesRPC();
// Append entries to followers
sendAppendEntriesRPC();
// Detect faulty agent and replace
// if possible and only if not already activating
if (_activator == nullptr &&
duration<double>(system_clock::now() - tp).count() > 5.0) {
detectActiveAgentFailures();
tp = system_clock::now();
}
} else {
_appendCV.wait(1000000);
updateConfiguration();
}
}
}
void Agent::reportActivated(
std::string const& failed, std::string const& replacement, query_t state) {
_config.swapActiveMember(failed, replacement);
MUTEX_LOCKER(mutexLocker, _ioLock);
_confirmed.erase(failed);
auto commitIndex = state->slice().get("commitIndex").getNumericValue<index_t>();
_confirmed[replacement] = commitIndex;
_lastAcked[replacement] = system_clock::now();
}
/// Give up on the pending activation by dropping the current AgentActivator,
/// which lets Agent::run() pass its `_activator == nullptr` check again.
/// Neither agent id is used here at the moment.
///
/// NOTE(review): AgentActivator::run() calls this from the activator's own
/// thread and logs `_replacement` afterwards; destroying the activator here
/// looks like it frees the object that is still running -- confirm lifetime.
void Agent::failedActivation(
  std::string const& failed, std::string const& replacement) {
  _activator.reset();
}
void Agent::detectActiveAgentFailures() {
// Detect faulty agent if pool larger than agency
if (_config.poolSize() > _config.size()) {
std::vector<std::string> active = _config.active();
for (auto const& id : active) {
auto ds = duration<double>(
system_clock::now() - _lastAcked.at(id)).count();
if (ds > 10.0) {
std::string repl = _config.nextAgentInLine();
LOG(WARN) << "Active agent " << id << " has failed. << "
<< repl << " will be promoted to active agency membership";
MUTEX_LOCKER(mutexLocker, _ioLock);
_activator =
std::unique_ptr<AgentActivator>(new AgentActivator(this, id, repl));
}
}
}
}
/// Update this agent's configuration; invoked from the non-leading branch of
/// Agent::run(). Currently an empty stub.
void Agent::updateConfiguration() {
  // TODO: first ask the last known leader
}
/// Orderly shutdown
void Agent::beginShutdown() {
Thread::beginShutdown();
@ -536,7 +612,7 @@ bool Agent::lead() {
}
for (auto const& i : _config.active()) {
_lastAcked[i] = std::chrono::system_clock::now();
_lastAcked[i] = system_clock::now();
}
// Agency configuration

View File

@ -115,11 +115,14 @@ class Agent : public arangodb::Thread {
/// @brief Persisted agents
bool persistedAgents();
/// @brief Gossip in
bool activeAgency();
/// @brief Activate new agent in pool to replace failed
void reportActivated(std::string const&, std::string const&, query_t);
/// @brief Give up on a pending activation of a replacement agent
void failedActivation(std::string const&, std::string const&);
/// @brief Gossip in
bool activeStandbyAgent();
bool activeAgency();
/// @brief Start orderly shutdown of threads
void beginShutdown() override final;
@ -157,10 +160,20 @@ class Agent : public arangodb::Thread {
/// @brief Get notification as inactive pool member
void notify(query_t const&);
/// @brief Detect active agent failures
void detectActiveAgentFailures();
/// @brief All there is in the state machine
query_t allLogs() const;
/// @brief State reads persisted state and prepares the agent
friend class State;
private:
/// @brief Update my configuration as passive agent
void updateConfiguration();
/// @brief Find out, if we've had acknowledged RPCs recent enough
bool challengeLeadership();
@ -170,9 +183,6 @@ class Agent : public arangodb::Thread {
/// @brief Activate this agent in single agent mode.
bool activateAgency();
/// @brief Activate new agent in pool to replace failed
bool activateStandbyAgent();
/// @brief Assignment of persisted state
Agent& operator=(VPackSlice const&);

View File

@ -31,38 +31,60 @@
#include <thread>
using namespace arangodb::consensus;
using namespace std::chrono;
AgentActivator::AgentActivator() : Thread("AgentActivator"), _agent(nullptr) {}
AgentActivator::AgentActivator(Agent* agent, std::string const& peerId)
: Thread("AgentActivator"), _agent(agent), _peerId(peerId) {}
AgentActivator::AgentActivator(Agent* agent, std::string const& failed,
std::string const& replacement)
: Thread("AgentActivator"),
_agent(agent),
_failed(failed),
_replacement(replacement) {}
// Shutdown if not already
AgentActivator::~AgentActivator() { shutdown(); }
AgentActivator::~AgentActivator() {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Done activating " << _replacement;
shutdown();
}
void AgentActivator::run() {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting activation of " << _peerId;
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting activation of " << _replacement;
std::string const path = privApiPrefix + "activate";
auto const started = system_clock::now();
auto timeout = seconds(60);
auto const& endpoint = _agent->config().pool().at(_replacement);
CONDITION_LOCKER(guard, _cv);
while (!this->isStopping()) {
auto const& pool = _agent->config().pool();
Builder builder;
size_t highest = 0;
// All snapshots and all logs
query_t allLogs = _agent->allLogs();
auto headerFields =
std::make_unique<std::unordered_map<std::string, std::string>>();
arangodb::ClusterComm::instance()->asyncRequest(
"1", 1, pool.at(_peerId), rest::RequestType::POST,
path, std::make_shared<std::string>(builder.toJson()), headerFields,
std::make_shared<ActivationCallback>(_agent, _peerId, highest), 5.0, true,
1.0);
"1", 1, endpoint, rest::RequestType::POST, path,
std::make_shared<std::string>(allLogs->toJson()), headerFields,
std::make_shared<ActivationCallback>(_agent, _failed, _replacement),
5.0, true, 1.0);
_cv.wait(10000000); // 10 sec
if ((std::chrono::system_clock::now() - started) > timeout) {
_agent->failedActivation(_failed, _replacement);
LOG_TOPIC(WARN, Logger::AGENCY)
<< "Timed out while activating agent " << _replacement;
break;
}
}
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Done activating " << _peerId;
}
void AgentActivator::beginShutdown() {
Thread::beginShutdown();
CONDITION_LOCKER(guard, _cv);
guard.broadcast();
}

View File

@ -39,17 +39,22 @@ namespace consensus {
class Agent;
class AgentActivator : public Thread {
public:
public:
AgentActivator();
AgentActivator(Agent*, std::string const&);
AgentActivator(Agent*, std::string const&, std::string const&);
~AgentActivator();
void beginShutdown() override;
void run() override;
private:
private:
Agent* _agent;
std::string _peerId;
std::string _failed;
std::string _replacement;
arangodb::basics::ConditionVariable _cv;
};
}

View File

@ -194,6 +194,34 @@ bool config_t::addToPool(std::pair<std::string, std::string> const& i) {
return true;
}
/// Replace every occurrence of `failed` in the list of active agents by
/// `repl`, holding the configuration's write lock.
///
/// @param failed id of the agent to take out of the active list
/// @param repl   id of the agent to put in its place
/// @return false if the replacement raised an exception, true otherwise
///         (also when `failed` was not part of the active list)
bool config_t::swapActiveMember(
  std::string const& failed, std::string const& repl) {
  WRITE_LOCKER(writeLocker, _lock);
  try {
    std::replace(_active.begin(), _active.end(), failed, repl);
  } catch (std::exception const& e) {
    // Message used to lack the blank before "failed" and dropped the reason
    LOG_TOPIC(ERR, Logger::AGENCY)
      << "Replacing " << failed << " with " << repl
      << " failed miserably: " << e.what();
    return false;
  }
  return true;
}
std::string config_t::nextAgentInLine() const {
READ_LOCKER(writeLocker, _lock);
if (_poolSize > _agencySize) {
for (const auto& p : _pool) {
if (std::find(_active.begin(), _active.end(), p.first) == _active.end()) {
return p.first;
}
}
}
return ""; // No one left
}
size_t config_t::compactionStepSize() const {
READ_LOCKER(readLocker, _lock);
return _compactionStepSize;

View File

@ -160,8 +160,18 @@ struct config_t {
/// @brief get active agents
std::vector<std::string> active() const;
/// @brief Get minimum RAFT timeout
double minPing() const;
/// @brief Get maximum RAFT timeout
double maxPing() const;
/// @brief Replace a failed active agent by its replacement in the active list
bool swapActiveMember(std::string const&, std::string const&);
/// @brief Get next agent in line of succession
std::string nextAgentInLine() const;
};
}
}

View File

@ -229,9 +229,8 @@ std::string Constituent::endpoint(std::string id) const {
/// @brief Vote
bool Constituent::vote(term_t term, std::string id, index_t prevLogIndex,
term_t prevLogTerm, bool appendEntries) {
if(_vocbase==nullptr) {
return false;
}
TRI_ASSERT(_vocbase);
term_t t = 0;
std::string lid;

View File

@ -49,8 +49,11 @@ void Inception::gossip() {
std::chrono::seconds timeout(120);
size_t i = 0;
while (!this->isStopping()) {
config_t config = _agent->config(); // get a copy of conf
CONDITION_LOCKER(guard, _cv);
while (!this->isStopping() && !_agent->isStopping()) {
config_t config = _agent->config(); // get a copy of conf
query_t out = std::make_shared<Builder>();
out->openObject();
out->add("endpoint", VPackValue(config.endpoint()));
@ -88,7 +91,7 @@ void Inception::gossip() {
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(250));
_cv.wait(100000);
if ((std::chrono::system_clock::now() - s) > timeout) {
if (config.poolComplete()) {
@ -157,3 +160,9 @@ void Inception::run() {
gossip();
}
}
void Inception::beginShutdown() {
Thread::beginShutdown();
CONDITION_LOCKER(guard, _cv);
guard.broadcast();
}

View File

@ -44,6 +44,7 @@ class Inception : public Thread {
explicit Inception(Agent*);
virtual ~Inception();
void beginShutdown() override;
void run() override;
private:
@ -51,6 +52,8 @@ class Inception : public Thread {
void gossip();
Agent* _agent;
arangodb::basics::ConditionVariable _cv;
};
}
}

View File

@ -731,3 +731,51 @@ bool State::persistActiveAgents(query_t const& active, query_t const& pool) {
return true;
}
/// Package the complete persisted state into a single object.
///
/// Holding _logLock, runs one AQL query over `compact` and one over `log`
/// (each sorted by _key) and returns an object carrying the two result sets
/// under "compact" and "logs". A failing query is reported through
/// THROW_ARANGO_EXCEPTION_MESSAGE; a failure while adding a result set to the
/// package is only logged, so that part is then missing from the result.
query_t State::allLogs() const {
  MUTEX_LOCKER(mutexLocker, _logLock);

  // Neither query takes bind parameters; both share one empty object
  auto noBindVars = std::make_shared<VPackBuilder>();
  noBindVars->openObject();
  noBindVars->close();

  std::string const compactionAql("FOR c IN compact SORT c._key RETURN c");
  std::string const logAql("FOR l IN log SORT l._key RETURN l");

  arangodb::aql::Query compactionQuery(
    false, _vocbase, compactionAql.c_str(), compactionAql.size(), noBindVars,
    nullptr, arangodb::aql::PART_MAIN);
  arangodb::aql::Query logQuery(
    false, _vocbase, logAql.c_str(), logAql.size(), noBindVars, nullptr,
    arangodb::aql::PART_MAIN);

  auto compactions =
    compactionQuery.execute(QueryRegistryFeature::QUERY_REGISTRY);
  if (compactions.code != TRI_ERROR_NO_ERROR) {
    THROW_ARANGO_EXCEPTION_MESSAGE(compactions.code, compactions.details);
  }

  auto entries = logQuery.execute(QueryRegistryFeature::QUERY_REGISTRY);
  if (entries.code != TRI_ERROR_NO_ERROR) {
    THROW_ARANGO_EXCEPTION_MESSAGE(entries.code, entries.details);
  }

  auto package = std::make_shared<VPackBuilder>();
  package->openObject();

  try {
    package->add("compact", compactions.result->slice());
  } catch (std::exception const&) {
    LOG_TOPIC(ERR, Logger::AGENCY)
      << "Failed to assemble compaction part of everything package";
  }

  try {
    package->add("logs", entries.result->slice());
  } catch (std::exception const&) {
    LOG_TOPIC(ERR, Logger::AGENCY)
      << "Failed to assemble remaining part of everything package";
  }

  package->close();
  return package;
}

View File

@ -110,6 +110,9 @@ class State {
/// @brief Persist active agency in pool
bool persistActiveAgents(query_t const& active, query_t const& pool);
/// @brief Get everything from the state machine
query_t allLogs() const;
private:
/// @brief Save currentTerm, votedFor, log entries
bool persist(index_t index, term_t term,

View File

@ -521,7 +521,9 @@ void Store::run() {
}
toClear = clearExpired();
_agent->write(toClear);
if (_agent && !_agent->isStopping()) {
_agent->write(toClear);
}
}
}