1
0
Fork 0

fixed compaction bug in RAFT

This commit is contained in:
Kaveh Vahedipour 2016-08-02 13:13:05 +02:00 committed by Wilfried Goesgens
parent 5a0560c61a
commit 8aaa8c9fdd
9 changed files with 91 additions and 21 deletions

View File

@ -695,6 +695,7 @@ add_definitions("-DARANGODB_LIBEV_VERSION=\"${LIBEV_VERSION}\"")
################################################################################
find_package(OpenSSL REQUIRED)
include_directories(${OPENSSL_INCLUDE_DIR})
add_definitions(-DARANGODB_OPENSSL_VERSION=\"${OPENSSL_VERSION}\")

View File

@ -343,19 +343,27 @@ bool Agent::load() {
ApplicationServer::getFeature<DatabaseFeature>("Database");
auto vocbase = database->vocbase();
auto queryRegistry = QueryRegistryFeature::QUERY_REGISTRY;
if (vocbase == nullptr) {
LOG_TOPIC(FATAL, Logger::AGENCY) << "could not determine _system database";
FATAL_ERROR_EXIT();
}
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Loading persistent state.";
if (!_state.loadCollections(vocbase, _config.waitForSync)) {
if (!_state.loadCollections(vocbase, queryRegistry, _config.waitForSync)) {
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Failed to load persistent state on startup.";
}
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Reassembling spearhead and read stores.";
_spearhead.apply(_state.slices(_lastCommitIndex + 1));
{
CONDITION_LOCKER(guard, _appendCV);
guard.broadcast();
}
reportIn(id(), _state.lastLog().index);
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting spearhead worker.";
@ -363,7 +371,6 @@ bool Agent::load() {
_readDB.start();
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting constituent personality.";
auto queryRegistry = QueryRegistryFeature::QUERY_REGISTRY;
TRI_ASSERT(queryRegistry != nullptr);
_constituent.start(vocbase, queryRegistry);
@ -434,7 +441,7 @@ void Agent::run() {
while (!this->isStopping() && size() > 1) {
if (leading()) { // Only if leading
_appendCV.wait(100000);
_appendCV.wait(50000);
} else {
_appendCV.wait(); // Else wait for our moment in the sun
}
@ -485,7 +492,7 @@ void Agent::beginShutdown() {
bool Agent::lead() {
// Key value stores
//rebuildDBs();
rebuildDBs();
// Wake up run
CONDITION_LOCKER(guard, _appendCV);
@ -497,10 +504,11 @@ bool Agent::lead() {
// Rebuild key value stores
bool Agent::rebuildDBs() {
MUTEX_LOCKER(mutexLocker, _ioLock);
_spearhead.apply(_state.slices());
_readDB.apply(_state.slices());
_spearhead.apply(_state.slices(_lastCommitIndex+1));
_readDB.apply(_state.slices(_lastCommitIndex+1));
return true;
@ -508,10 +516,16 @@ bool Agent::rebuildDBs() {
/// Last commit index
arangodb::consensus::index_t Agent::lastCommited() const {
arangodb::consensus::index_t Agent::lastCommitted() const {
return _lastCommitIndex;
}
/// Set the last commit index; the write happens while holding _ioLock
void Agent::lastCommitted(arangodb::consensus::index_t lastCommitIndex) {
  MUTEX_LOCKER(ioGuard, _ioLock);
  this->_lastCommitIndex = lastCommitIndex;
}
/// Last log entry, forwarded unchanged from _state.lastLog()
log_t const& Agent::lastLog() const {
  return _state.lastLog();
}

View File

@ -71,7 +71,7 @@ class Agent : public arangodb::Thread {
bool fitness() const;
/// @brief Last committed log index
arangodb::consensus::index_t lastCommited() const;
arangodb::consensus::index_t lastCommitted() const;
/// @brief Leader ID
arangodb::consensus::id_t leaderID() const;
@ -137,6 +137,9 @@ class Agent : public arangodb::Thread {
private:
Agent& operator=(VPackSlice const&);
/// @brief Set last committed log index
void lastCommitted(arangodb::consensus::index_t);
/// @brief This server (need endpoint)
TRI_server_t* _server;
@ -164,7 +167,7 @@ class Agent : public arangodb::Thread {
/// @brief Spearhead (write) kv-store
Store _spearhead;
/// @brief Commited (read) kv-store
/// @brief Committed (read) kv-store
Store _readDB;
/// @brief Condition variable for appendEntries

View File

@ -244,6 +244,7 @@ RestHandler::status RestAgencyHandler::handleConfig() {
body.add(VPackValue(VPackValueType::Object));
body.add("term", Value(_agent->term()));
body.add("leaderId", Value(_agent->leaderID()));
body.add("lastCommited", Value(_agent->lastCommitted()));
body.add("configuration", _agent->config().toBuilder()->slice());
body.close();
generateResult(GeneralResponse::ResponseCode::OK, body.slice());

View File

@ -34,6 +34,7 @@
#include <thread>
#include "Aql/Query.h"
#include "Aql/QueryRegistry.h"
#include "Basics/StaticStrings.h"
#include "Basics/VelocyPackHelper.h"
#include "RestServer/QueryRegistryFeature.h"
@ -46,6 +47,7 @@
using namespace arangodb;
using namespace arangodb::application_features;
using namespace arangodb::aql;
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
using namespace arangodb::rest;
@ -58,6 +60,7 @@ State::State(std::string const& endpoint)
_endpoint(endpoint),
_collectionsChecked(false),
_collectionsLoaded(false),
_queryRegistry(nullptr),
_cur(0) {}
@ -167,21 +170,53 @@ arangodb::consensus::index_t State::log(
/// Remove log entries that conflict with the incoming transactions.
/// Expects `transactions` to hold an array (asserted). For a non-empty array
/// the "index" attribute of the first element is read; if it lies inside the
/// current log window (idx - _cur < _log.size()), an AQL REMOVE is run against
/// the `log` collection for all keys >= idx and the in-memory _log is erased
/// from. A std::exception raised on the way is caught and logged here.
void State::removeConflicts (query_t const& transactions) {
VPackSlice slice = transactions->slice();
TRI_ASSERT(slice.isArray());
if (slice.length() > 0) {
// Empty bind-variable object: the query text below embeds idx directly.
auto bindVars = std::make_shared<VPackBuilder>();
bindVars->openObject();
bindVars->close();
try {
auto idx = slice[0].get("index").getUInt();
// NOTE(review): idx and _cur are unsigned; if idx < _cur the subtraction
// wraps around and the test fails silently -- confirm idx >= _cur holds.
if (idx-_cur < _log.size()) {
// NOTE(review): two LOG_TOPIC heads (INFO, DEBUG) follow each other here and
// two erase calls appear below; this looks like both sides of a change
// shown together -- confirm which lines are meant to be live.
LOG_TOPIC(INFO, Logger::AGENCY)
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Removing " << _log.size()-idx+_cur
<< " entries from log starting with " << idx;
// NOTE(review): this erase is not covered by _logLock, unlike the one below.
_log.erase(_log.begin()+idx);
// persisted logs
std::stringstream aql;
aql << "FOR l IN log FILTER l._key >= '" << stringify(idx)
<< "' REMOVE l IN log";
// NOTE(review): WARN-level dump of the query text -- presumably leftover
// debugging output; confirm.
LOG(WARN) << aql.str();
// NOTE(review): aql.str() returns a temporary, so the c_str() pointer is only
// valid until the end of this declaration -- confirm Query copies the text
// before execute() is called.
arangodb::aql::Query
query(false, _vocbase, aql.str().c_str(), aql.str().size(), bindVars,
nullptr, arangodb::aql::PART_MAIN);
auto queryResult = query.execute(_queryRegistry);
if (queryResult.code != TRI_ERROR_NO_ERROR) {
// Thrown inside the try block, so it is expected to land in the catch below
// (assumes the thrown type derives from std::exception -- TODO confirm).
THROW_ARANGO_EXCEPTION_MESSAGE(queryResult.code, queryResult.details);
}
VPackSlice result = queryResult.result->slice();
// NOTE(review): WARN-level dump of the query result -- presumably leftover
// debugging output; confirm.
LOG(WARN) << result.toJson();
// volatile logs
{
MUTEX_LOCKER(mutexLocker, _logLock);
// NOTE(review): assuming _log is a standard sequence container, the
// single-iterator erase removes exactly one element at offset idx-1, while
// the message above reports _log.size()-idx+_cur entries and the bounds
// check uses idx-_cur. Confirm whether a range erase starting at offset
// (idx-_cur) up to end() was intended.
_log.erase(_log.begin()+idx-1);
}
}
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__;
}
}
}
@ -327,12 +362,17 @@ template<class T> std::ostream& operator<< (std::ostream& o, std::deque<T> const
}
/// Load collections
bool State::loadCollections(TRI_vocbase_t* vocbase, bool waitForSync) {
bool State::loadCollections(TRI_vocbase_t* vocbase, QueryRegistry* queryRegistry,
bool waitForSync) {
_vocbase = vocbase;
_queryRegistry = queryRegistry;
TRI_ASSERT(_vocbase != nullptr);
_options.waitForSync = false;
_options.silent = true;
if (loadPersisted()) {
if (_log.empty()) {
std::shared_ptr<Buffer<uint8_t>> buf = std::make_shared<Buffer<uint8_t>>();
@ -343,7 +383,7 @@ bool State::loadCollections(TRI_vocbase_t* vocbase, bool waitForSync) {
}
return true;
}
LOG(WARN) << "... done";
return false;
@ -441,6 +481,9 @@ bool State::loadRemaining() {
}
}
_agent->rebuildDBs();
_agent->lastCommitted(_log.back().index);
return true;
}

View File

@ -36,6 +36,11 @@
struct TRI_vocbase_t;
namespace arangodb {
namespace aql {
class QueryRegistry;
}
namespace consensus {
class Agent;
@ -85,7 +90,7 @@ class State {
bool configure(Agent* agent);
/// @brief Load persisted data from above or start with empty log
bool loadCollections(TRI_vocbase_t*, bool);
bool loadCollections(TRI_vocbase_t*, aql::QueryRegistry*, bool);
/// @brief Pipe to ostream
friend std::ostream& operator<<(std::ostream& os, State const& s) {
@ -141,6 +146,9 @@ class State {
bool _collectionsChecked; /**< @brief Collections checked */
bool _collectionsLoaded;
aql::QueryRegistry* _queryRegistry;
size_t _compaction_step;
size_t _cur;

View File

@ -285,7 +285,7 @@ std::vector<bool> Store::apply(
Builder body; // host
body.openObject();
body.add("term", VPackValue(_agent->term()));
body.add("index", VPackValue(_agent->lastCommited()));
body.add("index", VPackValue(_agent->lastCommitted()));
auto ret = in.equal_range(url);
for (auto it = ret.first; it != ret.second; ++it) {

View File

@ -52,8 +52,8 @@ COMP=100
BASE=4001
NATH=$(( $NRDBSERVERS + $NRCOORDINATORS + $NRAGENTS ))
rm -rf cluster
mkdir cluster
#rm -rf cluster
mkdir -p cluster
echo Starting agency ...
if [ $NRAGENTS -gt 1 ]; then
for aid in `seq 0 $(( $NRAGENTS - 2 ))`; do

View File

@ -25,7 +25,7 @@ SFRE=2.5
COMP=1000
BASE=4001
#rm -rf agency
rm -rf agency
mkdir -p agency
echo -n "Starting agency ... "
if [ $NRAGENTS -gt 1 ]; then