1
0
Fork 0

Merge branch 'devel' of github.com:arangodb/arangodb into devel

* 'devel' of github.com:arangodb/arangodb:
  attempt to fix some compile warnings caused by Boost header files
  fix some log topics
  fix segfault
  sendAppendEntries does resonable estimation of follower time needs leading to less frequent spamming of followers
  fix issue caused by wrong merge
  fix global loglevel adjustment
  fix typo
This commit is contained in:
Jan Christoph Uhde 2017-02-10 12:14:05 +01:00
commit fcda5f167f
15 changed files with 130 additions and 74 deletions

View File

@ -9,14 +9,14 @@ set(boost_src "${CMAKE_CURRENT_SOURCE_DIR}/${boost_version}")
option(USE_BOOST_UNITTESTS "use boost unit-tests" ON)
if(false) #for now we do not use the system's boost but our own instead!
add_library(boost_boost INTERFACE)
target_include_directories(boost_boost PUBLIC SYSTEM ${Boost_INCLUDE_DIRS})
target_include_directories(boost_boost SYSTEM PUBLIC ${Boost_INCLUDE_DIRS})
# we are good
# create imported targets?!
else()
#create interface target for boost header only libraries
message(STATUS "using 3rdParty BOOST")
add_library(boost_boost INTERFACE)
target_include_directories(boost_boost INTERFACE SYSTEM "${boost_src}")
target_include_directories(boost_boost SYSTEM INTERFACE "${boost_src}")
#build boost_system - there seems to be just one cpp file:)
add_library(boost_system STATIC

View File

@ -45,7 +45,7 @@ AgencyFeature::AgencyFeature(application_features::ApplicationServer* server)
_supervision(false),
_waitForSync(true),
_supervisionFrequency(5.0),
_compactionStepSize(2000),
_compactionStepSize(200000),
_compactionKeepSize(500),
_supervisionGracePeriod(15.0),
_cmdLineTimings(false)
@ -233,7 +233,7 @@ void AgencyFeature::start() {
_agent.reset(new consensus::Agent(consensus::config_t(
_size, _poolSize, _minElectionTimeout, _maxElectionTimeout, endpoint,
_agencyEndpoints, _supervision, false, _supervisionFrequency,
_agencyEndpoints, _supervision, _waitForSync, _supervisionFrequency,
_compactionStepSize, _compactionKeepSize, _supervisionGracePeriod,
_cmdLineTimings)));

View File

@ -205,17 +205,20 @@ Agent::raft_commit_t Agent::waitFor(index_t index, double timeout) {
}
// AgentCallback reports id of follower and its highest processed index
void Agent::reportIn(std::string const& peerId, index_t index) {
void Agent::reportIn(std::string const& peerId, index_t index, size_t toLog) {
{
// Enforce _lastCommitIndex, _readDB and compaction to progress atomically
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
// Update last acknowledged answer
_lastAcked[peerId] = system_clock::now();
if (index > _confirmed[peerId]) { // progress this follower?
_confirmed[peerId] = index;
if (toLog > 0) { // We want to reset the wait time only if a package callback
_earliestPackage[peerId] = system_clock::now();
}
}
if (index > _lastCommitIndex) { // progress last commit?
@ -238,9 +241,10 @@ void Agent::reportIn(std::string const& peerId, index_t index) {
_lastCommitIndex + 1, index), _lastCommitIndex, _constituent.term());
_lastCommitIndex = index;
_leaderCommitIndex = index;
_lastAppliedIndex = index;
MUTEX_LOCKER(liLocker, _liLock);
_leaderCommitIndex = index;
if (_leaderCommitIndex >= _nextCompationAfter) {
_compactor.wakeUp();
}
@ -279,7 +283,7 @@ bool Agent::recvAppendEntriesRPC(
}
{
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _liLock);
_leaderCommitIndex = leaderCommitIndex;
}
@ -288,7 +292,7 @@ bool Agent::recvAppendEntriesRPC(
// State machine, _lastCommitIndex to advance atomically
if (nqs > 0) {
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
size_t ndups = _state.removeConflicts(queries);
@ -318,6 +322,9 @@ bool Agent::recvAppendEntriesRPC(
/// Leader's append entries
void Agent::sendAppendEntriesRPC() {
std::chrono::duration<int, std::ratio<1, 1000000>> const dt (
(_config.waitForSync() ? 10000 : 500));
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr only happens during controlled shutdown
@ -335,7 +342,7 @@ void Agent::sendAppendEntriesRPC() {
index_t last_confirmed, lastCommitIndex;
{
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
t = this->term();
last_confirmed = _confirmed[followerId];
lastCommitIndex = _lastCommitIndex;
@ -366,21 +373,25 @@ void Agent::sendAppendEntriesRPC() {
<< "&prevLogTerm=" << unconfirmed.front().term << "&leaderCommit="
<< lastCommitIndex;
size_t toLog = 0;
// Body
Builder builder;
builder.add(VPackValue(VPackValueType::Array));
for (size_t i = 1; i < unconfirmed.size(); ++i) {
auto const& entry = unconfirmed.at(i);
builder.add(VPackValue(VPackValueType::Object));
builder.add("index", VPackValue(entry.index));
builder.add("term", VPackValue(entry.term));
builder.add("query", VPackSlice(entry.entry->data()));
builder.add("clientId", VPackValue(entry.clientId));
builder.close();
highest = entry.index;
if ((system_clock::now() - _earliestPackage[followerId]).count() > 0) {
for (size_t i = 1; i < unconfirmed.size(); ++i) {
auto const& entry = unconfirmed.at(i);
builder.add(VPackValue(VPackValueType::Object));
builder.add("index", VPackValue(entry.index));
builder.add("term", VPackValue(entry.term));
builder.add("query", VPackSlice(entry.entry->data()));
builder.add("clientId", VPackValue(entry.clientId));
builder.close();
highest = entry.index;
++toLog;
}
}
builder.close();
// Verbose output
if (unconfirmed.size() > 1) {
LOG_TOPIC(TRACE, Logger::AGENCY)
@ -401,13 +412,28 @@ void Agent::sendAppendEntriesRPC() {
"1", 1, _config.poolAt(followerId),
arangodb::rest::RequestType::POST, path.str(),
std::make_shared<std::string>(builder.toJson()), headerFields,
std::make_shared<AgentCallback>(this, followerId, highest),
0.7 * _config.minPing(), true);
std::make_shared<AgentCallback>(this, followerId, highest, toLog),
5.0 * _config.maxPing(), true);
// _lastSent, _lastHighest: local and single threaded access
_lastSent[followerId] = system_clock::now();
_lastHighest[followerId] = highest;
_lastSent[followerId] = system_clock::now();
_lastHighest[followerId] = highest;
if (toLog > 0) {
_earliestPackage[followerId] = system_clock::now() + toLog * dt;
LOG_TOPIC(TRACE, Logger::AGENCY)
<< "Appending " << unconfirmed.size() - 1 << " entries up to index "
<< highest << " to follower " << followerId << ". Message: "
<< builder.toJson()
<< ". Next real log contact to " << followerId<< " in: "
<< std::chrono::duration<double, std::milli>(
_earliestPackage[followerId]-system_clock::now()).count() << "ms";
} else {
LOG_TOPIC(TRACE, Logger::AGENCY)
<< "Just keeping follower " << followerId
<< " devout with " << builder.toJson();
}
}
}
}
@ -445,7 +471,7 @@ query_t Agent::activate(query_t const& everything) {
}
{
MUTEX_LOCKER(mutexLocker, _ioLock); // Atomicity
MUTEX_LOCKER(ioLocker, _ioLock); // Atomicity
if (!compact.isEmptyArray()) {
_readDB = compact.get("readDB");
}
@ -579,7 +605,7 @@ query_t Agent::lastAckedAgo() const {
std::map<std::string, TimePoint> lastAcked;
{
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
lastAcked = _lastAcked;
}
@ -615,7 +641,7 @@ trans_ret_t Agent::transact(query_t const& queries) {
ret->openArray();
{
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
// Only leader else redirect
if (challengeLeadership()) {
@ -675,7 +701,7 @@ trans_ret_t Agent::transient(query_t const& queries) {
{
VPackArrayBuilder b(ret.get());
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
// Only leader else redirect
if (challengeLeadership()) {
@ -707,7 +733,7 @@ inquire_ret_t Agent::inquire(query_t const& query) {
return inquire_ret_t(false, leader);
}
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
auto si = _state.inquire(query);
@ -744,7 +770,7 @@ write_ret_t Agent::write(query_t const& query) {
// Apply to spearhead and get indices for log entries
{
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
// Only leader else redirect
if (multihost && challengeLeadership()) {
@ -777,7 +803,7 @@ read_ret_t Agent::read(query_t const& query) {
return read_ret_t(false, leader);
}
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
// Only leader else redirect
if (challengeLeadership()) {
@ -836,7 +862,7 @@ void Agent::reportActivated(
if (state->slice().get("success").getBoolean()) {
{
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
_confirmed.erase(failed);
auto commitIndex = state->slice().get("commitId").getNumericValue<index_t>();
_confirmed[replacement] = commitIndex;
@ -854,7 +880,7 @@ void Agent::reportActivated(
}
} else {
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
myterm = _constituent.term();
}
@ -903,7 +929,7 @@ void Agent::detectActiveAgentFailures() {
std::map<std::string, TimePoint> lastAcked;
{
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
lastAcked = _lastAcked;
}
@ -978,7 +1004,7 @@ void Agent::prepareLead() {
// Reset last acknowledged
{
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
for (auto const& i : _config.active()) {
_lastAcked[i] = system_clock::now();
}
@ -999,7 +1025,7 @@ void Agent::lead() {
// Agency configuration
term_t myterm;
{
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
myterm = _constituent.term();
}
@ -1137,7 +1163,8 @@ void Agent::notify(query_t const& message) {
// Rebuild key value stores
arangodb::consensus::index_t Agent::rebuildDBs() {
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
MUTEX_LOCKER(liLocker, _liLock);
// Apply logs from last applied index to leader's commit index
LOG_TOPIC(DEBUG, Logger::AGENCY)
@ -1168,14 +1195,15 @@ void Agent::compact() {
/// Last commit index
arangodb::consensus::index_t Agent::lastCommitted() const {
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
return _lastCommitIndex;
}
/// Last commit index
void Agent::lastCommitted(arangodb::consensus::index_t lastCommitIndex) {
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
_lastCommitIndex = lastCommitIndex;
MUTEX_LOCKER(liLocker, _liLock);
_leaderCommitIndex = lastCommitIndex;
}
@ -1190,7 +1218,7 @@ Store const& Agent::readDB() const { return _readDB; }
/// Get readdb
arangodb::consensus::index_t Agent::readDB(Node& node) const {
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
node = _readDB.get();
return _lastCommitIndex;
}
@ -1201,13 +1229,14 @@ Store const& Agent::transient() const { return _transient; }
/// Rebuild from persisted state
Agent& Agent::operator=(VPackSlice const& compaction) {
// Catch up with compacted state
MUTEX_LOCKER(mutexLocker, _ioLock);
MUTEX_LOCKER(ioLocker, _ioLock);
_spearhead = compaction.get("readDB");
_readDB = compaction.get("readDB");
// Catch up with commit
try {
_lastCommitIndex = std::stoul(compaction.get("_key").copyString());
MUTEX_LOCKER(liLocker, _liLock);
_leaderCommitIndex = _lastCommitIndex;
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__;

View File

@ -145,7 +145,7 @@ class Agent : public arangodb::Thread {
void beginShutdown() override final;
/// @brief Report appended entries from AgentCallback
void reportIn(std::string const& id, index_t idx);
void reportIn(std::string const&, index_t, size_t = 0);
/// @brief Wait for slaves to confirm appended entries
raft_commit_t waitFor(index_t last_entry, double timeout = 2.0);
@ -294,6 +294,7 @@ class Agent : public arangodb::Thread {
std::map<std::string, TimePoint> _lastAcked;
std::map<std::string, TimePoint> _lastSent;
std::map<std::string, TimePoint> _earliestPackage;
/**< @brief RAFT consistency lock:
_spearhead
@ -305,6 +306,9 @@ class Agent : public arangodb::Thread {
*/
mutable arangodb::Mutex _ioLock;
// lock for _leaderCommitIndex
mutable arangodb::Mutex _liLock;
// @brief guard _activator
mutable arangodb::Mutex _activatorLock;

View File

@ -28,10 +28,11 @@
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
AgentCallback::AgentCallback() : _agent(0), _last(0), _startTime(0.0) {}
AgentCallback::AgentCallback() :
_agent(0), _last(0), _startTime(0.0), _toLog(0) {}
AgentCallback::AgentCallback(Agent* agent, std::string const& slaveID,
index_t last)
index_t last, index_t toLog)
: _agent(agent), _last(last), _slaveID(slaveID),
_startTime(TRI_microtime()) {}
@ -40,7 +41,7 @@ void AgentCallback::shutdown() { _agent = 0; }
bool AgentCallback::operator()(arangodb::ClusterCommResult* res) {
if (res->status == CL_COMM_SENT) {
if (_agent) {
_agent->reportIn(_slaveID, _last);
_agent->reportIn(_slaveID, _last, _toLog);
}
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Got good callback from AppendEntriesRPC: "

View File

@ -36,7 +36,7 @@ class AgentCallback : public arangodb::ClusterCommCallback {
public:
AgentCallback();
AgentCallback(Agent*, std::string const&, index_t);
AgentCallback(Agent*, std::string const&, index_t, size_t);
virtual bool operator()(arangodb::ClusterCommResult*) override final;
@ -47,6 +47,7 @@ class AgentCallback : public arangodb::ClusterCommCallback {
index_t _last;
std::string _slaveID;
double _startTime;
size_t _toLog;
};
}
} // namespace

View File

@ -109,36 +109,46 @@ bool State::persist(arangodb::consensus::index_t index, term_t term,
return (res == TRI_ERROR_NO_ERROR);
}
/// Log transaction (leader)
std::vector<arangodb::consensus::index_t> State::log(
query_t const& transaction, std::vector<bool> const& appl, term_t term) {
std::vector<arangodb::consensus::index_t> idx(appl.size());
std::vector<bool> good = appl;
query_t const& transactions, std::vector<bool> const& applicable, term_t term) {
std::vector<arangodb::consensus::index_t> idx(applicable.size());
size_t j = 0;
auto const& slice = transaction->slice();
auto const& slice = transactions->slice();
if (!slice.isArray()) {
THROW_ARANGO_EXCEPTION_MESSAGE(30000,
"Agency request syntax is [[<queries>]]");
THROW_ARANGO_EXCEPTION_MESSAGE(
30000, "Agency syntax requires array of transactions [[<queries>]]");
}
if (slice.length() != good.size()) {
THROW_ARANGO_EXCEPTION_MESSAGE(30000,
"Agency request syntax is [[<queries>]]");
}
MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order
TRI_ASSERT(slice.length() == applicable.size());
MUTEX_LOCKER(mutexLocker, _logLock);
for (auto const& i : VPackArrayIterator(slice)) {
TRI_ASSERT(i.isArray());
std::string clientId;
if (good[j]) {
if (!i.isArray()) {
THROW_ARANGO_EXCEPTION_MESSAGE(
30000,
"Transaction syntax is [{<operations>}, {<preconditions>}, \"clientId\"]"
);
}
if (applicable[j]) {
std::shared_ptr<Buffer<uint8_t>> buf =
std::make_shared<Buffer<uint8_t>>();
buf->append((char const*)i[0].begin(), i[0].byteSize());
std::string clientId;
if (i.length()==3) {
clientId = i[2].copyString();
}
TRI_ASSERT(!_log.empty()); // log must not ever be empty
idx[j] = _log.back().index + 1;
_log.push_back(log_t(idx[j], term, buf, clientId)); // log to RAM
@ -146,10 +156,12 @@ std::vector<arangodb::consensus::index_t> State::log(
std::pair<std::string, arangodb::consensus::index_t>(clientId, idx[j]));
persist(idx[j], term, i[0], clientId); // log to disk
}
++j;
}
return idx;
}
/// Log transaction (leader)

View File

@ -138,18 +138,15 @@ void DatabaseManagerThread::run() {
// regular database
// ---------------------------
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "physically removing database directory '"
<< engine->databasePath(database) << "' of database '"
<< database->name() << "'";
std::string path;
// remove apps directory for database
auto appPath = dealer->appPath();
if (database->isOwnAppsDirectory() && !appPath.empty()) {
path = arangodb::basics::FileUtils::buildFilename(
std::string path = arangodb::basics::FileUtils::buildFilename(
arangodb::basics::FileUtils::buildFilename(appPath, "_db"),
database->name());
@ -160,6 +157,8 @@ void DatabaseManagerThread::run() {
TRI_RemoveDirectory(path.c_str());
}
}
engine->dropDatabase(database);
}
delete database;

View File

@ -201,7 +201,7 @@ function DatabaseSuite () {
}
}
if (tries > 15) {
require("internal").printf("[WARNING] waited " + tries * 2 +" seconds for " + path + " do disappear");
require("internal").printf("[WARNING] waited " + tries * 2 + " seconds for " + path + " to disappear");
}
// yes, we know this test fails in windows now and then.
assertFalse(fs.exists(path));

View File

@ -58,6 +58,8 @@ LogTopic Logger::QUERIES("queries", LogLevel::INFO);
LogTopic Logger::REPLICATION("replication", LogLevel::INFO);
LogTopic Logger::REQUESTS("requests", LogLevel::FATAL); // suppress
LogTopic Logger::STARTUP("startup", LogLevel::INFO);
LogTopic Logger::SUPERVISION("supervision", LogLevel::INFO);
LogTopic Logger::SYSCALL("syscall", LogLevel::WARN);
LogTopic Logger::THREADS("threads", LogLevel::WARN);
LogTopic Logger::V8("v8", LogLevel::WARN);

View File

@ -111,6 +111,9 @@ void Logger::setLogLevel(std::string const& levelName) {
if (isGeneral) {
Logger::setLogLevel(level);
// setting the log level for topic "fixme" is required here, too,
// as "fixme" is the previous general log topic...
LogTopic::setLogLevel(std::string("fixme"), level);
} else {
LogTopic::setLogLevel(v[0], level);
}

View File

@ -145,6 +145,7 @@ class Logger {
static LogTopic REQUESTS;
static LogTopic STARTUP;
static LogTopic SUPERVISION;
static LogTopic SYSCALL;
static LogTopic THREADS;
static LogTopic V8;

View File

@ -1769,9 +1769,13 @@ static void JS_Log(v8::FunctionCallbackInfo<v8::Value> const& args) {
msg = ls + "!" + msg;
}
LogTopic* topic = ts.empty() ? &Logger::FIXME : LogTopic::lookup(ts);
LogTopic* topic = ts.empty() ? nullptr : LogTopic::lookup(ts);
LOG_TOPIC_RAW(ll, *topic) << msg;
if (topic == nullptr) {
LOG_TOPIC_RAW(ll, Logger::FIXME) << msg;
} else {
LOG_TOPIC_RAW(ll, *topic) << msg;
}
TRI_V8_RETURN_UNDEFINED();
TRI_V8_TRY_CATCH_END

View File

@ -155,8 +155,8 @@ if [ ! -z "$INTERACTIVE_MODE" ] ; then
fi
SFRE=5.0
COMP=1000
KEEP=100
COMP=200000
KEEP=500
MINT=0.2
MAXT=1.0
AG_BASE=$(( $PORT_OFFSET + 4001 ))

View File

@ -148,11 +148,11 @@ if [[ $(( $NRAGENTS % 2 )) == 0 ]]; then
exit 1
fi
MINP=0.2
MAXP=1.0
MINP=0.5
MAXP=2.5
SFRE=2.5
COMP=100
KEEP=10
COMP=200000
KEEP=500
BASE=5000
if [ "$GOSSIP_MODE" = "0" ]; then