1
0
Fork 0

corrected ttl for accepting integer seconds :)

This commit is contained in:
Kaveh Vahedipour 2016-03-31 09:08:18 +02:00
parent 0e49485c73
commit 153610c76f
6 changed files with 53 additions and 18 deletions

View File

@ -57,12 +57,6 @@ State const& Agent::state () const {
/// @brief Start all agency threads
bool Agent::start() {
LOG_TOPIC(INFO, Logger::AGENCY) << "Starting constituent personality.";
_constituent.start();
LOG_TOPIC(INFO, Logger::AGENCY) << "Starting spearhead worker.";
_spearhead.start();
LOG_TOPIC(INFO, Logger::AGENCY) << "Starting agency comm worker.";
Thread::start();
@ -106,6 +100,10 @@ bool Agent::leading() const {
return _constituent.leading();
}
void Agent::persist(term_t t, id_t i) {
// _state.persist(t, i);
}
bool Agent::waitFor (index_t index, duration_t timeout) {
if (size() == 1) // single host agency
@ -238,11 +236,23 @@ append_entries_t Agent::sendAppendEntriesRPC (id_t slave_id) {
}
bool Agent::load () {
LOG_TOPIC(INFO, Logger::AGENCY) << "Loading persistent state.";
if (!_state.loadCollections())
if (!_state.loadCollections()) {
LOG(FATAL) << "Failed to load persistent state on statup.";
}
LOG_TOPIC(INFO, Logger::AGENCY) << "Reassembling spearhead and read stores.";
_read_db.apply(_state.slices());
_spearhead.apply(_state.slices(_last_commit_index+1));
LOG_TOPIC(INFO, Logger::AGENCY) << "Starting spearhead worker.";
_spearhead.start();
_read_db.start();
LOG_TOPIC(INFO, Logger::AGENCY) << "Starting constituent personality.";
_constituent.update(0,0);
_constituent.start();
return true;
}
@ -299,6 +309,7 @@ void Agent::beginShutdown() {
Thread::beginShutdown();
_constituent.beginShutdown();
_spearhead.beginShutdown();
_read_db.beginShutdown();
CONDITION_LOCKER(guard, _cv);
guard.broadcast();
}

View File

@ -121,6 +121,9 @@ public:
return o;
}
/// @brief Persist term
void persist (term_t, id_t);
/// @brief State machine
State const& state () const;

View File

@ -185,6 +185,10 @@ bool Constituent::vote (
}
}
void Constituent::update (term_t t, id_t i) {
}
void Constituent::gossip (const constituency_t& constituency) {
// TODO: Replace lame notification by gossip protocol
}

View File

@ -94,6 +94,9 @@ public:
/// @brief Orderly shutdown of thread
void beginShutdown () override;
/// @brief Update with persisted term and voted_for
void update (term_t, id_t);
private:
/// @brief set term to new term

View File

@ -114,8 +114,12 @@ public:
return os;
}
private:
// @brief Persist term/leaderid
bool persist (term_t, id_t);
private:
bool snapshot ();
/// @brief Save currentTerm, votedFor, log entries
bool persist (index_t index, term_t term, id_t lid,

View File

@ -185,15 +185,14 @@ bool Node::addTimeToLive (long millis) {
bool Node::applies (VPackSlice const& slice) {
if (slice.type() == ValueType::Object) {
for (auto const& i : VPackObjectIterator(slice)) {
std::string key = i.key.toString();
key = key.substr(1,key.length()-2);
std::string key = i.key.copyString();
if (slice.hasKey("op")) {
std::string oper = slice.get("op").toString();
oper = oper.substr(1,oper.length()-2);
std::string oper = slice.get("op").copyString();
Slice const& self = this->slice();
if (oper == "delete") {
return _parent->removeChild(_node_name);
@ -204,7 +203,19 @@ bool Node::applies (VPackSlice const& slice) {
return false;
}
if (slice.hasKey("ttl")) {
addTimeToLive ((long)slice.get("ttl").getDouble()*1000);
long ttl = -1;
VPackSlice ttl_v = slice.get("ttl");
if (ttl_v.isNumber()) {
if (ttl_v.isDouble()) {
ttl = 1000l*static_cast<long>(slice.get("ttl").getDouble());
} else {
ttl = 1000l*slice.get("ttl").getInt();
}
addTimeToLive (ttl);
} else {
LOG_TOPIC(WARN, Logger::AGENCY) <<
"Non-number value assigned to ttl: " << ttl_v.toJson();
}
}
*this = slice.get("new");
return true;
@ -382,11 +393,10 @@ bool Store::check (VPackSlice const& slice) const {
return false;
}
for (auto const& precond : VPackObjectIterator(slice)) {
std::string path = precond.key.toString();
path = path.substr(1,path.size()-2);
std::string path = precond.key.copyString();
bool found = false;
Node node ("precond");
try {
node = (*this)(path);
found = true;