1
0
Fork 0

Merge branch 'devel' of github.com:arangodb/ArangoDB into pipeline

This commit is contained in:
Wilfried Goesgens 2016-08-02 09:22:46 +02:00
commit 04a1be577f
6 changed files with 68 additions and 49 deletions

View File

@ -96,17 +96,16 @@ struct log_t {
index_t index; ///< @brief Log index
term_t term; ///< @brief Log term
id_t leaderId; ///< @brief Leader's ID
buffer_t entry; ///< @brief To log
std::chrono::milliseconds timestamp; ///< @brief Timestamp
log_t(index_t idx, term_t t, id_t lid, buffer_t const& e)
: index(idx), term(t), leaderId(lid), entry(e),
log_t(index_t idx, term_t t, buffer_t const& e)
: index(idx), term(t), entry(e),
timestamp(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())) {}
friend std::ostream& operator<<(std::ostream& o, log_t const& l) {
o << l.index << " " << l.term << " " << l.leaderId << " "
o << l.index << " " << l.term << " "
<< l.entry->toString() << " " << l.timestamp.count();
return o;
}

View File

@ -248,12 +248,14 @@ bool Agent::recvAppendEntriesRPC(term_t term,
return false;
}
_state.removeConflicts(queries);
if (queries->slice().length()) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Appending "
<< queries->slice().length()
<< " entries to state machine.";
/* bool success = */
_state.log(queries, term, leaderId, prevIndex, prevTerm);
_state.log(queries, term, prevIndex, prevTerm);
_spearhead.apply(_state.slices(_lastCommitIndex + 1, leaderCommitIndex));
_readDB.apply(_state.slices(_lastCommitIndex + 1, leaderCommitIndex));
_lastCommitIndex = leaderCommitIndex;
@ -308,6 +310,7 @@ priv_rpc_ret_t Agent::sendAppendEntriesRPC(
auto const& entry = unconfirmed.at(i);
builder.add(VPackValue(VPackValueType::Object));
builder.add("index", VPackValue(entry.index));
builder.add("term", VPackValue(entry.term));
builder.add("query", VPackSlice(entry.entry->data()));
builder.close();
last = entry.index;
@ -389,7 +392,7 @@ write_ret_t Agent::write(query_t const& query) {
{
MUTEX_LOCKER(mutexLocker, _ioLock);
applied = _spearhead.apply(query);
indices = _state.log(query, applied, term(), id());
indices = _state.log(query, applied, term());
}
// Maximum log index
@ -435,7 +438,7 @@ void Agent::run() {
} else {
_appendCV.wait(); // Else wait for our moment in the sun
}
// Append entries to followers
for (arangodb::consensus::id_t i = 0; i < size(); ++i) {
if (i != id()) {
@ -482,11 +485,11 @@ void Agent::beginShutdown() {
bool Agent::lead() {
// Key value stores
rebuildDBs();
//rebuildDBs();
// Wake up run
CONDITION_LOCKER(guard, _appendCV);
guard.signal();
guard.broadcast();
return true;

View File

@ -257,7 +257,6 @@ RestHandler::status RestAgencyHandler::handleState() {
body.add(VPackValue(VPackValueType::Object));
body.add("index", VPackValue(i.index));
body.add("term", VPackValue(i.term));
body.add("leader", VPackValue(i.leaderId));
body.add("query", VPackSlice(i.entry->data()));
body.close();
}

View File

@ -58,32 +58,26 @@ State::State(std::string const& endpoint)
_endpoint(endpoint),
_collectionsChecked(false),
_collectionsLoaded(false),
_cur(0) {
std::shared_ptr<Buffer<uint8_t>> buf = std::make_shared<Buffer<uint8_t>>();
VPackSlice value = arangodb::basics::VelocyPackHelper::EmptyObjectValue();
buf->append(value.startAs<char const>(), value.byteSize());
if (!_log.size()) {
_log.push_back(log_t(arangodb::consensus::index_t(0), term_t(0),
arangodb::consensus::id_t(0), buf));
}
}
_cur(0) {}
/// Default dtor
State::~State() {}
inline std::string stringify (arangodb::consensus::index_t index) {
std::ostringstream i_str;
i_str << std::setw(20) << std::setfill('0') << index;
return i_str.str();
}
/// Persist one entry
bool State::persist(arangodb::consensus::index_t index, term_t term,
arangodb::consensus::id_t lid,
arangodb::velocypack::Slice const& entry) {
Builder body;
body.add(VPackValue(VPackValueType::Object));
std::ostringstream i_str;
i_str << std::setw(20) << std::setfill('0') << index;
body.add("_key", Value(i_str.str()));
body.add("_key", Value(stringify(index)));
body.add("term", Value(term));
body.add("leader", Value((uint32_t)lid));
body.add("request", entry);
body.close();
@ -111,8 +105,7 @@ bool State::persist(arangodb::consensus::index_t index, term_t term,
/// Log transaction (leader)
std::vector<arangodb::consensus::index_t> State::log(
query_t const& transaction, std::vector<bool> const& appl, term_t term,
arangodb::consensus::id_t lid) {
query_t const& transaction, std::vector<bool> const& appl, term_t term) {
std::vector<arangodb::consensus::index_t> idx(appl.size());
std::vector<bool> good = appl;
@ -130,8 +123,8 @@ std::vector<arangodb::consensus::index_t> State::log(
std::make_shared<Buffer<uint8_t>>();
buf->append((char const*)i[0].begin(), i[0].byteSize());
idx[j] = _log.back().index + 1;
_log.push_back(log_t(idx[j], term, lid, buf)); // log to RAM
persist(idx[j], term, lid, i[0]); // log to disk
_log.push_back(log_t(idx[j], term, buf)); // log to RAM
persist(idx[j], term, i[0]); // log to disk
++j;
}
}
@ -142,7 +135,7 @@ std::vector<arangodb::consensus::index_t> State::log(
/// Log transactions (follower)
arangodb::consensus::index_t State::log(
query_t const& transactions, term_t term, arangodb::consensus::id_t lid,
query_t const& transactions, term_t term,
arangodb::consensus::index_t prevLogIndex, term_t prevLogTerm) {
if (transactions->slice().type() != VPackValueType::Array) {
@ -155,13 +148,14 @@ arangodb::consensus::index_t State::log(
for (auto const& i : VPackArrayIterator(transactions->slice())) {
try {
auto idx = i.get("index").getUInt();
auto trm = i.get("term").getUInt();
if (highest < idx) {
highest = idx;
}
std::shared_ptr<Buffer<uint8_t>> buf = std::make_shared<Buffer<uint8_t>>();
buf->append((char const*)i.get("query").begin(),i.get("query").byteSize());
_log.push_back(log_t(idx, term, lid, buf));
persist(idx, term, lid, i.get("query")); // to disk
_log.push_back(log_t(idx, trm, buf));
persist(idx, trm, i.get("query")); // to disk
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__;
}
@ -172,6 +166,25 @@ arangodb::consensus::index_t State::log(
}
void State::removeConflicts (query_t const& transactions) {
VPackSlice slice = transactions->slice();
TRI_ASSERT(slice.isArray());
if (slice.length() > 0) {
try {
auto idx = slice[0].get("index").getUInt();
if (idx-_cur < _log.size()) {
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Removing " << _log.size()-idx+_cur
<< " entries from log starting with " << idx;
_log.erase(_log.begin()+idx);
}
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__;
}
}
}
/// Get log entries from indices "start" to "end"
std::vector<log_t> State::get(arangodb::consensus::index_t start,
arangodb::consensus::index_t end) const {
@ -306,6 +319,12 @@ bool State::createCollection(std::string const& name) {
return true;
}
template<class T> std::ostream& operator<< (std::ostream& o, std::deque<T> const& d) {
for (auto const& i : d ) {
o << i;
}
return o;
}
/// Load collections
bool State::loadCollections(TRI_vocbase_t* vocbase, bool waitForSync) {
@ -319,14 +338,13 @@ bool State::loadCollections(TRI_vocbase_t* vocbase, bool waitForSync) {
std::shared_ptr<Buffer<uint8_t>> buf = std::make_shared<Buffer<uint8_t>>();
VPackSlice value = arangodb::basics::VelocyPackHelper::EmptyObjectValue();
buf->append(value.startAs<char const>(), value.byteSize());
_log.push_back(log_t(arangodb::consensus::index_t(0), term_t(0),
arangodb::consensus::id_t(0), buf));
persist(
0, 0, (std::numeric_limits<arangodb::consensus::id_t>::max)(), value);
_log.push_back(log_t(arangodb::consensus::index_t(0), term_t(0), buf));
persist(0, 0, value);
}
return true;
}
LOG(WARN) << "... done";
return false;
}
@ -414,7 +432,6 @@ bool State::loadRemaining() {
log_t(
std::stoi(i.get(StaticStrings::KeyString).copyString()),
static_cast<term_t>(i.get("term").getUInt()),
static_cast<arangodb::consensus::id_t>(i.get("leader").getUInt()),
tmp));
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) <<

View File

@ -56,14 +56,12 @@ class State {
/// @brief Log entries (leader)
std::vector<index_t> log(query_t const& query,
std::vector<bool> const& indices, term_t term,
arangodb::consensus::id_t lid);
std::vector<bool> const& indices, term_t term);
/// @brief Log entries (followers)
index_t log(query_t const& queries, term_t term,
arangodb::consensus::id_t leaderId, index_t prevLogIndex,
term_t prevLogTerm);
index_t log(query_t const& queries, term_t term, index_t prevLogIndex,
term_t prevLogTerm);
/// @brief Find entry at index with term
bool find(index_t index, term_t term);
@ -93,19 +91,22 @@ class State {
friend std::ostream& operator<<(std::ostream& os, State const& s) {
for (auto const& i : s._log)
LOG_TOPIC(INFO, Logger::AGENCY)
<< "index(" << i.index << ") term(" << i.term << ") leader: ("
<< i.leaderId << ") query(" << VPackSlice(i.entry->data()).toJson()
<< ")";
<< "index(" << i.index << ") term(" << i.term << ") query("
<< VPackSlice(i.entry->data()).toJson() << ")";
return os;
}
bool compact(arangodb::consensus::index_t cind);
void removeConflicts(query_t const&);
private:
bool snapshot();
/// @brief Save currentTerm, votedFor, log entries
bool persist(index_t index, term_t term, arangodb::consensus::id_t lid,
bool persist(index_t index, term_t term,
arangodb::velocypack::Slice const& entry);
/// @brief Load collection from persistent store

View File

@ -25,8 +25,8 @@ SFRE=2.5
COMP=1000
BASE=4001
rm -rf agency
mkdir agency
#rm -rf agency
mkdir -p agency
echo -n "Starting agency ... "
if [ $NRAGENTS -gt 1 ]; then
for aid in `seq 0 $(( $NRAGENTS - 2 ))`; do