1
0
Fork 0

Merge branch 'devel' of github.com:arangodb/ArangoDB into 3.3

This commit is contained in:
Wilfried Goesgens 2017-10-06 10:32:15 +02:00
commit 255124eb9f
29 changed files with 293 additions and 532 deletions

View File

@ -1,6 +1,10 @@
v3.3.milestone0 (2017-10-04)
----------------------------
* added config option `--log.color` to toggle colorful logging to terminal
* added config option `--log.thread-name` to additionally log thread names
* usernames must not start with `:role:`, added new options:
--server.authentication-timeout
--ldap.roles-attribute-name

View File

@ -109,7 +109,16 @@ will be printed in UTC / Zulu time. The date and time format used in logs
is always `YYYY-MM-DD HH:MM:SS`, regardless of this setting. If UTC time
is used, a `Z` will be appended to indicate Zulu time.
### Line number
### Color logging
`--log.color value`
Logging to terminal output is by default colored. Colorful logging can be
turned off by setting the value to false.
### Source file and Line number
Log line number: `--log.line-number`
@ -124,9 +133,9 @@ Log prefix: `--log.prefix prefix`
This option is used specify an prefix to logged text.
### Thread
### Threads
Log thread identifier: `--log.thread`
Log thread identifier: `--log.thread true`
Whenever log output is generated, the process ID is written as part of the
log information. Setting this option appends the thread id of the calling
@ -144,9 +153,12 @@ when no thread is logged and
when this command line option is set.
To also log thread names, it is possible to set the `--log.thread-name`
option. By default `--log.thread-name` is set to `false`.
### Role
Log role: `--log.role`
Log role: `--log.role true`
When set to `true`, this option will make the ArangoDB logger print a single
character with the server's role into each logged message. The roles are:

View File

@ -715,7 +715,8 @@ if test -n "${DOWNLOAD_SYNCER_USER}"; then
fi
if ! test -f "${BUILD_DIR}/${FN}-${SYNCER_REV}"; then
rm -f "${FN}"
curl -LJO# -H 'Accept: application/octet-stream' "${SYNCER_URL}?access_token=${OAUTH_TOKEN}"
curl -LJO# -H 'Accept: application/octet-stream' "${SYNCER_URL}?access_token=${OAUTH_TOKEN}" || \
${SRC}/Installation/Jenkins/curl_time_machine.sh "${SYNCER_URL}?access_token=${OAUTH_TOKEN}" "${FN}"
mv "${FN}" "${BUILD_DIR}/${TN}"
${MD5} < "${BUILD_DIR}/${TN}" | ${SED} "s; .*;;" > "${BUILD_DIR}/${FN}-${SYNCER_REV}"
OLD_MD5=$(cat "${BUILD_DIR}/${FN}-${SYNCER_REV}")

View File

@ -0,0 +1,9 @@
#!/bin/bash
# Work around stoneage curl versions that can't follow redirects
curl -D /tmp/headers -H 'Accept: application/octet-stream' "$1"
for URL in $(grep location: /tmp/headers |sed -e "s;\r;;" -e "s;.*: ;;"); do
curl $URL > $2
done

View File

@ -1,53 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#include "ActivationCallback.h"
#include "Agency/Agent.h"
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
ActivationCallback::ActivationCallback() : _agent(nullptr){}
ActivationCallback::ActivationCallback(
Agent* agent, std::string const& failed, std::string const& replacement)
: _agent(agent),
_failed(failed),
_replacement(replacement) {}
void ActivationCallback::shutdown() { _agent = nullptr; }
bool ActivationCallback::operator()(arangodb::ClusterCommResult* res) {
if (res->status == CL_COMM_SENT) {
if (_agent) {
auto v = res->result->getBodyVelocyPack();
_agent->reportActivated(_failed, _replacement, v);
}
} else {
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "activation_comm_status(" << res->status << "), replacement("
<< _replacement << ")";
}
return true;
}

View File

@ -1,53 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_CONSENSUS_ACTIVATION_CALLBACK_H
#define ARANGOD_CONSENSUS_ACTIVATION_CALLBACK_H 1
#include "Agency/AgencyCommon.h"
#include "Cluster/ClusterComm.h"
namespace arangodb {
namespace consensus {
class Agent;
class ActivationCallback : public arangodb::ClusterCommCallback {
public:
ActivationCallback();
ActivationCallback(Agent*, std::string const&, std::string const&);
virtual bool operator()(arangodb::ClusterCommResult*) override final;
void shutdown();
private:
Agent* _agent;
std::string _failed;
std::string _replacement;
};
}
} // namespace
#endif

View File

@ -51,7 +51,6 @@ Agent::Agent(config_t const& config)
_readDB(this),
_transient(this),
_agentNeedsWakeup(false),
_activator(nullptr),
_compactor(this),
_ready(false),
_preparing(false) {
@ -256,6 +255,10 @@ void Agent::reportIn(std::string const& peerId, index_t index, size_t toLog) {
LOG_TOPIC(WARN, Logger::AGENCY) << "Last confirmation from peer "
<< peerId << " was received more than minPing ago: " << d.count();
}
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Setting _lastAcked["
<< peerId << "] to time "
<< std::chrono::duration_cast<std::chrono::microseconds>(
t.time_since_epoch()).count();
_lastAcked[peerId] = t;
if (index > _confirmed[peerId]) { // progress this follower?
@ -288,7 +291,7 @@ void Agent::reportFailed(std::string const& slaveId, size_t toLog) {
}
/// Followers' append entries
bool Agent::recvAppendEntriesRPC(
priv_rpc_ret_t Agent::recvAppendEntriesRPC(
term_t term, std::string const& leaderId, index_t prevIndex, term_t prevTerm,
index_t leaderCommitIndex, query_t const& queries) {
@ -297,17 +300,18 @@ bool Agent::recvAppendEntriesRPC(
VPackSlice payload = queries->slice();
term_t t(this->term());
// Update commit index
if (payload.type() != VPackValueType::Array) {
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Received malformed entries for appending. Discarding!";
return false;
return priv_rpc_ret_t(false,t);
}
if (!_constituent.checkLeader(term, leaderId, prevIndex, prevTerm)) {
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Not accepting appendEntries from " << leaderId;
return false;
return priv_rpc_ret_t(false,t);
}
size_t nqs = payload.length();
@ -315,7 +319,7 @@ bool Agent::recvAppendEntriesRPC(
if (nqs == 0) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Finished empty AppendEntriesRPC from "
<< leaderId << " with term " << term;
return true;
return priv_rpc_ret_t(true,t);
}
bool ok = true;
@ -348,7 +352,7 @@ bool Agent::recvAppendEntriesRPC(
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Finished AppendEntriesRPC from "
<< leaderId << " with term " << term;
return ok;
return priv_rpc_ret_t(ok,t);
}
/// Leader's append entries
@ -511,8 +515,7 @@ void Agent::sendAppendEntriesRPC() {
// Really leading?
{
if (challengeLeadership()) {
_constituent.candidate();
_preparing = false;
resign();
return;
}
}
@ -537,7 +540,10 @@ void Agent::sendAppendEntriesRPC() {
// message if a timeout occurs.
_lastSent[followerId] = system_clock::now();
_constituent.notifyHeartbeatSent(followerId);
// _constituent.notifyHeartbeatSent(followerId);
// Do not notify constituent, because the AppendEntriesRPC here could
// take a very long time, so this must not disturb the empty ones
// being sent out.
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Appending (" << (uint64_t) (TRI_microtime() * 1000000000.0) << ") "
@ -552,6 +558,12 @@ void Agent::sendAppendEntriesRPC() {
}
void Agent::resign(term_t otherTerm) {
_constituent.follow(otherTerm);
_preparing = false;
}
/// Leader's append entries, empty ones for heartbeat, triggered by Constituent
void Agent::sendEmptyAppendEntriesRPC(std::string followerId) {
@ -591,8 +603,13 @@ void Agent::sendEmptyAppendEntriesRPC(std::string followerId) {
3 * _config.minPing() * _config.timeoutMult(), true);
_constituent.notifyHeartbeatSent(followerId);
double now = TRI_microtime();
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Sending empty appendEntriesRPC to follower " << followerId;
double diff = TRI_microtime() - now;
if (diff > 0.01) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Logging of a line took more than 1/100 of a second, this is bad:" << diff;
}
}
void Agent::advanceCommitIndex() {
@ -794,12 +811,37 @@ bool Agent::challengeLeadership() {
MUTEX_LOCKER(tiLocker, _tiLock);
size_t good = 0;
std::string const myid = id();
for (auto const& i : _lastAcked) {
duration<double> m = system_clock::now() - i.second;
if (0.9 * _config.minPing() * _config.timeoutMult() > m.count()) {
++good;
if (i.first != myid) { // do not count ourselves
duration<double> m = system_clock::now() - i.second;
LOG_TOPIC(DEBUG, Logger::AGENCY) << "challengeLeadership: found "
"_lastAcked[" << i.first << "] to be "
<< std::chrono::duration_cast<std::chrono::microseconds>(
i.second.time_since_epoch()).count()
<< " which is " << static_cast<uint64_t>(m.count() * 1000000.0)
<< " microseconds in the past.";
// This is rather arbitrary here: We used to have 0.9 here to absolutely
// ensure that a leader resigns before another one even starts an election.
// However, the Raft paper does not mention this at all. Rather, in the
// paper it is written that the leader should resign immediately if it
// sees a higher term from another server. Currently we have not
// implemented to return the follower's term with a response to
// AppendEntriesRPC, so the leader cannot find out a higher term this
// way. The leader can, however, see a higher term in the incoming
// AppendEntriesRPC a new leader sends out, and it will immediately
// resign if it sees that. For the moment, this value here can stay.
// We should soon implement sending the follower's term back with
// each response and probably get rid of this method altogether,
// but this requires a bit more thought.
if (_config.maxPing() * _config.timeoutMult() > m.count()) {
++good;
}
}
}
LOG_TOPIC(DEBUG, Logger::AGENCY) << "challengeLeadership: good=" << good;
return (good < size() / 2); // not counting myself
}
@ -856,8 +898,7 @@ trans_ret_t Agent::transact(query_t const& queries) {
{
// Only leader else redirect
if (challengeLeadership()) {
_constituent.candidate();
_preparing = false;
resign();
return trans_ret_t(false, NO_LEADER);
}
@ -919,8 +960,7 @@ trans_ret_t Agent::transient(query_t const& queries) {
// Only leader else redirect
if (challengeLeadership()) {
_constituent.candidate();
_preparing = false;
resign();
return trans_ret_t(false, NO_LEADER);
}
@ -1032,8 +1072,7 @@ write_ret_t Agent::write(query_t const& query, bool discardStartup) {
// Only leader else redirect
if (multihost && challengeLeadership()) {
_constituent.candidate();
_preparing = false;
resign();
return write_ret_t(false, NO_LEADER);
}
@ -1081,8 +1120,7 @@ read_ret_t Agent::read(query_t const& query) {
// Only leader else redirect
if (challengeLeadership()) {
_constituent.candidate();
_preparing = false;
resign();
return read_ret_t(false, NO_LEADER);
}
@ -1101,10 +1139,6 @@ read_ret_t Agent::read(query_t const& query) {
/// Send out append entries to followers regularly or on event
void Agent::run() {
using namespace std::chrono;
auto tp = system_clock::now();
// Only run in case we are in multi-host mode
while (!this->isStopping() && size() > 1) {
@ -1134,14 +1168,6 @@ void Agent::run() {
// loop often enough in cases of high load.
}
}
// Detect faulty agent and replace
// if possible and only if not already activating
if (duration<double>(system_clock::now() - tp).count() > 10.0) {
detectActiveAgentFailures();
tp = system_clock::now();
}
} else {
CONDITION_LOCKER(guard, _appendCV);
if (!_agentNeedsWakeup) {
@ -1153,41 +1179,6 @@ void Agent::run() {
}
void Agent::reportActivated(
std::string const& failed, std::string const& replacement, query_t state) {
term_t myterm = _constituent.term();
if (state->slice().get("success").getBoolean()) {
{
MUTEX_LOCKER(tiLocker, _tiLock);
_confirmed.erase(failed);
auto commitIndex = state->slice().get("commitId").getNumericValue<index_t>();
_confirmed[replacement] = commitIndex;
_lastAcked[replacement] = system_clock::now();
_config.swapActiveMember(failed, replacement);
}
{
MUTEX_LOCKER(actLock, _activatorLock);
if (_activator->isRunning()) {
_activator->beginShutdown();
}
_activator.reset(nullptr);
}
}
persistConfiguration(myterm);
// Notify inactive pool
notifyInactive();
}
void Agent::persistConfiguration(term_t t) {
// Agency configuration
@ -1211,48 +1202,6 @@ void Agent::persistConfiguration(term_t t) {
}
void Agent::failedActivation(
std::string const& failed, std::string const& replacement) {
MUTEX_LOCKER(actLock, _activatorLock);
_activator.reset(nullptr);
}
void Agent::detectActiveAgentFailures() {
// Detect faulty agent if pool larger than agency
std::unordered_map<std::string, TimePoint> lastAcked;
{
MUTEX_LOCKER(tiLocker, _tiLock);
lastAcked = _lastAcked;
}
MUTEX_LOCKER(actLock, _activatorLock);
if (_activator != nullptr) {
return;
}
if (_config.poolSize() > _config.size()) {
std::vector<std::string> active = _config.active();
for (auto const& id : active) {
if (id != this->id()) {
auto ds = duration<double>(
system_clock::now() - lastAcked.at(id)).count();
if (ds > 180.0) {
std::string repl = _config.nextAgentInLine();
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Active agent " << id << " has failed. << " << repl
<< " will be promoted to active agency membership";
_activator = std::make_unique<AgentActivator>(this, id, repl);
_activator->start();
return;
}
}
}
}
}
/// Orderly shutdown
void Agent::beginShutdown() {
Thread::beginShutdown();
@ -1317,9 +1266,6 @@ void Agent::lead() {
persistConfiguration(myterm);
// Notify inactive pool
notifyInactive();
{
CONDITION_LOCKER(guard, _waitForCV);
// Note that when we are stopping
@ -1710,12 +1656,6 @@ query_t Agent::gossip(query_t const& in, bool isCallback, size_t version) {
}
void Agent::reportMeasurement(query_t const& query) {
if (_inception != nullptr) {
_inception->reportIn(query);
}
}
void Agent::resetRAFTTimes(double min_timeout, double max_timeout) {
_config.pingTimes(min_timeout,max_timeout);
}

View File

@ -25,7 +25,6 @@
#define ARANGOD_CONSENSUS_AGENT_H 1
#include "Agency/AgencyCommon.h"
#include "Agency/AgentActivator.h"
#include "Agency/AgentCallback.h"
#include "Agency/AgentConfiguration.h"
#include "Agency/AgentInterface.h"
@ -134,13 +133,16 @@ class Agent : public arangodb::Thread,
/// @brief Received by followers to replicate log entries ($5.3);
/// also used as heartbeat ($5.2).
bool recvAppendEntriesRPC(term_t term, std::string const& leaderId,
index_t prevIndex, term_t prevTerm,
priv_rpc_ret_t recvAppendEntriesRPC(term_t term, std::string const& leaderId,
index_t prevIndex, term_t prevTerm,
index_t leaderCommitIndex, query_t const& queries);
/// @brief Resign leadership
void resign(term_t otherTerm = 0);
private:
/// @brief Invoked by leader to replicate log entries ($5.3);
/// also used as heartbeat ($5.2).
private:
void sendAppendEntriesRPC();
/// @brief check whether _confirmed indexes have been advance so that we
@ -167,12 +169,6 @@ class Agent : public arangodb::Thread,
/// @brief Persisted agents
bool persistedAgents();
/// @brief Activate new agent in pool to replace failed
void reportActivated(std::string const&, std::string const&, query_t);
/// @brief Activate new agent in pool to replace failed
void failedActivation(std::string const&, std::string const&);
/// @brief Gossip in
bool activeAgency();
@ -227,9 +223,6 @@ class Agent : public arangodb::Thread,
/// @brief Get notification as inactive pool member
void notify(query_t const&);
/// @brief Detect active agent failures
void detectActiveAgentFailures();
/// @brief All there is in the state machine
query_t allLogs() const;
@ -242,9 +235,6 @@ class Agent : public arangodb::Thread,
/// @brief Become active agent
query_t activate(query_t const&);
/// @brief Report measured round trips to inception
void reportMeasurement(query_t const&);
/// @brief Are we ready for RAFT?
bool ready() const;
@ -416,9 +406,6 @@ class Agent : public arangodb::Thread,
/// @brief Inception thread getting an agent up to join RAFT from cmd or persistence
std::unique_ptr<Inception> _inception;
/// @brief Activator thread for the leader to wake up a sleeping agent from pool
std::unique_ptr<AgentActivator> _activator;
/// @brief Compactor
Compactor _compactor;

View File

@ -1,95 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#include "AgentActivator.h"
#include <chrono>
#include <thread>
#include "Agency/Agent.h"
#include "Agency/ActivationCallback.h"
#include "Basics/ConditionLocker.h"
using namespace arangodb::consensus;
using namespace std::chrono;
AgentActivator::AgentActivator() : Thread("AgentActivator"), _agent(nullptr) {}
AgentActivator::AgentActivator(Agent* agent, std::string const& failed,
std::string const& replacement)
: Thread("AgentActivator"),
_agent(agent),
_failed(failed),
_replacement(replacement) {}
// Shutdown if not already
AgentActivator::~AgentActivator() {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Done activating " << _replacement;
shutdown();
}
void AgentActivator::run() {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting activation of " << _replacement;
std::string const path = privApiPrefix + "activate";
auto const started = system_clock::now();
auto const timeout = seconds(60);
auto const endpoint = _agent->config().pool().at(_replacement);
CONDITION_LOCKER(guard, _cv);
while (!this->isStopping()) {
// All snapshots and all logs
query_t allLogs = _agent->allLogs();
auto headerFields =
std::make_unique<std::unordered_map<std::string, std::string>>();
auto cc = arangodb::ClusterComm::instance();
if (cc != nullptr) {
// nullptr only happens on controlled shutdown
cc->asyncRequest(
"1", 1, endpoint, rest::RequestType::POST, path,
std::make_shared<std::string>(allLogs->toJson()), headerFields,
std::make_shared<ActivationCallback>(_agent, _failed, _replacement),
5.0, true, 1.0);
}
_cv.wait(10000000); // 10 sec
if ((std::chrono::system_clock::now() - started) > timeout) {
_agent->failedActivation(_failed, _replacement);
LOG_TOPIC(WARN, Logger::AGENCY)
<< "Timed out while activating agent " << _replacement;
break;
}
}
}
void AgentActivator::beginShutdown() {
Thread::beginShutdown();
CONDITION_LOCKER(guard, _cv);
guard.broadcast();
}

View File

@ -1,63 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_CONSENSUS_AGENT_ACTIVATOR_H
#define ARANGOD_CONSENSUS_AGENT_ACTIVATOR_H 1
#include <memory>
#include "Basics/Common.h"
#include "Basics/ConditionVariable.h"
#include "Basics/Thread.h"
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
namespace arangodb {
namespace consensus {
class Agent;
class AgentActivator : public Thread {
public:
AgentActivator();
AgentActivator(Agent*, std::string const&, std::string const&);
~AgentActivator();
void beginShutdown() override;
void run() override;
private:
Agent* _agent;
std::string _failed;
std::string _replacement;
arangodb::basics::ConditionVariable _cv;
};
}
}
#endif

View File

@ -42,7 +42,16 @@ bool AgentCallback::operator()(arangodb::ClusterCommResult* res) {
if (res->status == CL_COMM_SENT) {
if (_agent) {
auto body = res->result->getBodyVelocyPack();
if (!body->slice().get("success").isTrue()) {
term_t otherTerm = 0;
try {
otherTerm = body->slice().get("term").getNumber<term_t>();
} catch (std::exception const& e) {
LOG_TOPIC(WARN, Logger::AGENCY) <<
"Received agent call back without or with invalid term";
}
if (otherTerm > _agent->term()) {
_agent->resign(otherTerm);
} else if (!body->slice().get("success").isTrue()) {
LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< "Got negative answer from follower, will retry later.";
_agent->reportFailed(_slaveID, _toLog);

View File

@ -556,13 +556,13 @@ query_t config_t::toBuilder() const {
}
}
/* ret->add(VPackValue(activeStr));
ret->add(VPackValue(activeStr));
{
VPackArrayBuilder bb(ret.get());
for (auto const& i : _active) {
ret->add(VPackValue(i));
}
}*/
}
ret->add(idStr, VPackValue(_id));
ret->add(agencySizeStr, VPackValue(_agencySize));

View File

@ -203,7 +203,11 @@ void Constituent::follow(term_t t) {
void Constituent::followNoLock(term_t t) {
_castLock.assertLockedByCurrentThread();
_term = t;
if (t > 0 && t != _term) {
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Changing term from " << _term << " to " << t;
_term = t;
}
_role = FOLLOWER;
LOG_TOPIC(INFO, Logger::AGENCY) << "Set _role to FOLLOWER in term " << _term;
@ -271,7 +275,7 @@ void Constituent::candidate() {
if (_leaderID != NO_LEADER) {
_leaderID = NO_LEADER;
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Set _leaderID to NO_LEADER";
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Set _leaderID to NO_LEADER in Constituent::candidate";
}
if (_role != CANDIDATE) {
@ -739,17 +743,12 @@ void Constituent::run() {
} else if (role == CANDIDATE) {
callElection(); // Run for office
} else {
// This is 1/4th of the minPing timeout (_cv.wait() below is in
// microseconds):
uint64_t timeout =
static_cast<uint64_t>(250000.0 * _agent->config().minPing() *
_agent->config().timeoutMult());
{
CONDITION_LOCKER(guardv, _cv);
_cv.wait(timeout);
}
double interval = 0.25 * _agent->config().minPing()
* _agent->config().timeoutMult();
double now = TRI_microtime();
double nextWakeup = interval; // might be lowered below
std::string const myid = _agent->id();
for (auto const& followerId : _agent->config().active()) {
if (followerId != myid) {
@ -757,10 +756,21 @@ void Constituent::run() {
{
MUTEX_LOCKER(guard, _heartBeatMutex);
auto it = _lastHeartbeatSent.find(followerId);
if (it == _lastHeartbeatSent.end() ||
now - it->second > _agent->config().minPing()
* _agent->config().timeoutMult() / 4.0) {
if (it == _lastHeartbeatSent.end()) {
needed = true;
} else {
double diff = now - it->second;
if (diff >= interval) {
needed = true;
} else {
// diff < interval, so only needed again in interval-diff s
double waitOnly = interval - diff;
if (nextWakeup > waitOnly) {
nextWakeup = waitOnly;
}
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "No need for empty AppendEntriesRPC: " << diff;
}
}
}
if (needed) {
@ -768,6 +778,14 @@ void Constituent::run() {
}
}
}
// This is the smallest time until any of the followers need a
// new empty heartbeat:
uint64_t timeout = static_cast<uint64_t>(1000000.0 * nextWakeup);
{
CONDITION_LOCKER(guardv, _cv);
_cv.wait(timeout);
}
}
}
}

View File

@ -313,28 +313,48 @@ bool Inception::restartingActiveAgent() {
_agent->notify(agency);
return true;
}
auto const theirActive = tcc.get("active").toJson();
auto const myActive = myConfig.activeToBuilder()->toJson();
auto const theirActive = tcc.get("active");
auto const myActiveB = myConfig.activeToBuilder();
auto const myActive = myActiveB->slice();
auto i = std::find(active.begin(),active.end(),p.first);
if (i != active.end()) {
if (theirActive != myActive) {
if (!this->isStopping()) {
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "Assumed active RAFT peer and I disagree on active membership:";
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "Their active list is " << theirActive;
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "My active list is " << myActive;
FATAL_ERROR_EXIT();
if (i != active.end()) { // Member in my active list
TRI_ASSERT(theirActive.isArray());
if (theirActive.length() == myActive.length()) {
std::vector<std::string> theirActVec, myActVec;
for (auto const i : VPackArrayIterator(theirActive)) {
theirActVec.push_back(i.copyString());
}
for (auto const i : VPackArrayIterator(myActive)) {
myActVec.push_back(i.copyString());
}
std::sort(myActVec.begin(),myActVec.end());
std::sort(theirActVec.begin(),theirActVec.end());
if (theirActVec != myActVec) {
if (!this->isStopping()) {
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "Assumed active RAFT peer and I disagree on active membership:";
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "Their active list is " << theirActive.toJson();
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "My active list is " << myActive.toJson();
FATAL_ERROR_EXIT();
}
return false;
} else {
*i = "";
}
return false;
} else {
*i = "";
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "Assumed active RAFT peer and I disagree on active agency size:";
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "Their active list is " << theirActive.toJson();
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "My active list is " << myActive.toJson();
FATAL_ERROR_EXIT();
}
}
}
} catch (std::exception const& e) {
if (!this->isStopping()) {
LOG_TOPIC(FATAL, Logger::AGENCY)
@ -345,10 +365,9 @@ bool Inception::restartingActiveAgent() {
return false;
}
}
}
}
// Timed out? :(
if ((system_clock::now() - startTime) > timeout) {
@ -372,11 +391,6 @@ bool Inception::restartingActiveAgent() {
}
void Inception::reportIn(query_t const& query) {
// does nothing at the moment
}
void Inception::reportVersionForEp(std::string const& endpoint, size_t version) {
MUTEX_LOCKER(versionLocker, _vLock);
if (_acked[endpoint] < version) {

View File

@ -55,9 +55,6 @@ public:
/// @brief Defualt dtor
virtual ~Inception();
/// @brief Report in from other agents measurements
void reportIn(query_t const&);
/// @brief Report acknowledged version for peer id
void reportVersionForEp(std::string const&, size_t);

View File

@ -108,10 +108,11 @@ RestStatus RestAgencyPrivHandler::execute() {
readValue("prevLogIndex", prevLogIndex) &&
readValue("prevLogTerm", prevLogTerm) &&
readValue("leaderCommit", leaderCommit)) { // found all values
bool ret = _agent->recvAppendEntriesRPC(
term, id, prevLogIndex, prevLogTerm, leaderCommit,
_request->toVelocyPackBuilderPtr());
result.add("success", VPackValue(ret));
auto ret = _agent->recvAppendEntriesRPC(
term, id, prevLogIndex, prevLogTerm, leaderCommit,
_request->toVelocyPackBuilderPtr());
result.add("success", VPackValue(ret.success));
result.add("term", VPackValue(ret.term));
result.add("senderTimeStamp", VPackValue(senderTimeStamp));
} else {
return reportBadQuery(); // bad query
@ -140,26 +141,6 @@ RestStatus RestAgencyPrivHandler::execute() {
} else {
return reportBadQuery(); // bad query
}
} else if (suffixes[0] == "activate") { // notify
if (_request->requestType() != rest::RequestType::POST) {
return reportMethodNotAllowed();
}
query_t everything;
try {
everything = _request->toVelocyPackBuilderPtr();
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY)
<< "Failure getting activation body:" << e.what();
}
try {
query_t res = _agent->activate(everything);
for (auto const& i : VPackObjectIterator(res->slice())) {
result.add(i.key.copyString(),i.value);
}
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) << "Activation failed: " << e.what();
}
} else if (suffixes[0] == "gossip") {
if (_request->requestType() != rest::RequestType::POST) {
return reportMethodNotAllowed();
@ -173,16 +154,6 @@ RestStatus RestAgencyPrivHandler::execute() {
} catch (std::exception const& e) {
return reportBadQuery(e.what());
}
} else if (suffixes[0] == "measure") {
if (_request->requestType() != rest::RequestType::POST) {
return reportMethodNotAllowed();
}
auto query = _request->toVelocyPackBuilderPtr();
try {
_agent->reportMeasurement(query);
} catch (std::exception const& e) {
return reportBadQuery(e.what());
}
} else if (suffixes[0] == "activeAgents") {
if (_request->requestType() != rest::RequestType::GET) {
return reportMethodNotAllowed();

View File

@ -68,12 +68,10 @@ SET(ARANGOD_SOURCES
Actions/ActionFeature.cpp
Actions/RestActionHandler.cpp
Actions/actions.cpp
Agency/ActivationCallback.cpp
Agency/AddFollower.cpp
Agency/AgencyComm.cpp
Agency/AgencyFeature.cpp
Agency/Agent.cpp
Agency/AgentActivator.cpp
Agency/AgentCallback.cpp
Agency/AgentConfiguration.cpp
Agency/CleanOutServer.cpp

View File

@ -45,6 +45,7 @@ using namespace arangodb::basics;
////////////////////////////////////////////////////////////////////////////////
static thread_local uint64_t LOCAL_THREAD_NUMBER = 0;
static thread_local char const* LOCAL_THREAD_NAME = nullptr;
#if !defined(ARANGODB_HAVE_GETTID) && !defined(_WIN32)
@ -78,6 +79,8 @@ void Thread::startThread(void* arg) {
TRI_ASSERT(ptr != nullptr);
ptr->_threadNumber = LOCAL_THREAD_NUMBER;
LOCAL_THREAD_NAME = ptr->name().c_str();
if (0 <= ptr->_affinity) {
TRI_SetProcessorAffinity(&ptr->_thread, ptr->_affinity);
@ -129,6 +132,13 @@ TRI_pid_t Thread::currentProcessId() {
////////////////////////////////////////////////////////////////////////////////
uint64_t Thread::currentThreadNumber() { return LOCAL_THREAD_NUMBER; }
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the name of the current thread, if set
/// note that this function may return a nullptr
////////////////////////////////////////////////////////////////////////////////
char const* Thread::currentThreadName() { return LOCAL_THREAD_NAME; }
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the thread id
@ -212,6 +222,8 @@ Thread::~Thread() {
<< ". shutting down hard";
FATAL_ERROR_ABORT();
}
LOCAL_THREAD_NAME = nullptr;
}
////////////////////////////////////////////////////////////////////////////////
@ -431,6 +443,7 @@ void Thread::runMe() {
void Thread::cleanupMe() {
if (_deleteOnExit) {
LOCAL_THREAD_NAME = nullptr;
delete this;
}
}

View File

@ -93,6 +93,13 @@ class Thread {
//////////////////////////////////////////////////////////////////////////////
static uint64_t currentThreadNumber();
//////////////////////////////////////////////////////////////////////////////
/// @brief returns the name of the current thread, if set
/// note that this function may return a nullptr
//////////////////////////////////////////////////////////////////////////////
static char const* currentThreadName();
//////////////////////////////////////////////////////////////////////////////
/// @brief returns the thread id

View File

@ -23,6 +23,7 @@
#include "Basics/Common.h"
#include <chrono>
#include <thread>
using namespace arangodb;
@ -149,11 +150,8 @@ time_t TRI_timegm(struct tm* tm) {
}
double TRI_microtime() {
struct timeval t;
gettimeofday(&t, 0);
return (t.tv_sec) + (t.tv_usec / 1000000.0);
return std::chrono::duration<double>( // time since epoch in seconds
std::chrono::system_clock::now().time_since_epoch()).count();
}
size_t TRI_numberProcessors() {

View File

@ -53,14 +53,14 @@ class LogAppender {
virtual ~LogAppender() {}
public:
virtual bool logMessage(LogLevel, std::string const& message,
virtual void logMessage(LogLevel, std::string const& message,
size_t offset) = 0;
virtual std::string details() = 0;
public:
bool logMessage(LogLevel level, std::string const& message) {
return logMessage(level, message, 0);
void logMessage(LogLevel level, std::string const& message) {
logMessage(level, message, 0);
}
bool checkContent(std::string const& message) {

View File

@ -39,9 +39,9 @@ std::vector<std::pair<int, std::string>> LogAppenderFile::_fds = {};
LogAppenderStream::LogAppenderStream(std::string const& filename,
std::string const& filter, int fd)
: LogAppender(filter), _bufferSize(0), _fd(fd), _isTty(false) {}
: LogAppender(filter), _bufferSize(0), _fd(fd), _useColors(false) {}
bool LogAppenderStream::logMessage(LogLevel level, std::string const& message,
void LogAppenderStream::logMessage(LogLevel level, std::string const& message,
size_t offset) {
// check max. required output length
size_t const neededBufferSize = TRI_MaxLengthEscapeControlsCString(message.size());
@ -60,7 +60,7 @@ bool LogAppenderStream::logMessage(LogLevel level, std::string const& message,
_bufferSize = neededBufferSize * 2;
} catch (...) {
// if allocation fails, simply give up
return false;
return;
}
}
@ -78,8 +78,6 @@ bool LogAppenderStream::logMessage(LogLevel level, std::string const& message,
_buffer.reset();
_bufferSize = 0;
}
return _isTty;
}
LogAppenderFile::LogAppenderFile(std::string const& filename, std::string const& filter)
@ -116,20 +114,20 @@ LogAppenderFile::LogAppenderFile(std::string const& filename, std::string const&
}
}
_isTty = (isatty(_fd) == 1);
_useColors = ((isatty(_fd) == 1) && Logger::getUseColor());
}
void LogAppenderFile::writeLogMessage(LogLevel level, char const* buffer, size_t len) {
bool giveUp = false;
int fd = _fd;
while (len > 0) {
ssize_t n = TRI_WRITE(fd, buffer, static_cast<TRI_write_t>(len));
ssize_t n = TRI_WRITE(_fd, buffer, static_cast<TRI_write_t>(len));
if (n < 0) {
fprintf(stderr, "cannot log data: %s\n", TRI_LAST_ERROR_STR);
return; // give up, but do not try to log failure
} else if (n == 0) {
return; // give up, but do not try to log the failure via the Logger
}
if (n == 0) {
if (!giveUp) {
giveUp = true;
continue;
@ -141,8 +139,10 @@ void LogAppenderFile::writeLogMessage(LogLevel level, char const* buffer, size_t
}
if (level == LogLevel::FATAL) {
FILE* f = TRI_FDOPEN(fd, "w+");
FILE* f = TRI_FDOPEN(_fd, "a");
if (f != nullptr) {
// valid file pointer...
// now flush the file one last time before we shut down
fflush(f);
}
}
@ -204,6 +204,7 @@ void LogAppenderFile::closeAll() {
it.first = -1;
if (fd > STDERR_FILENO) {
fsync(fd);
TRI_TRACKED_CLOSE_FILE(fd);
}
}
@ -211,7 +212,7 @@ void LogAppenderFile::closeAll() {
LogAppenderStdStream::LogAppenderStdStream(std::string const& filename, std::string const& filter, int fd)
: LogAppenderStream(filename, filter, fd) {
_isTty = (isatty(_fd) == 1);
_useColors = ((isatty(_fd) == 1) && Logger::getUseColor());
}
LogAppenderStdStream::~LogAppenderStdStream() {
@ -221,15 +222,15 @@ LogAppenderStdStream::~LogAppenderStdStream() {
}
void LogAppenderStdStream::writeLogMessage(LogLevel level, char const* buffer, size_t len) {
writeLogMessage(_fd, _isTty, level, buffer, len, false);
writeLogMessage(_fd, _useColors, level, buffer, len, false);
}
void LogAppenderStdStream::writeLogMessage(int fd, bool isTty, LogLevel level, char const* buffer, size_t len, bool appendNewline) {
void LogAppenderStdStream::writeLogMessage(int fd, bool useColors, LogLevel level, char const* buffer, size_t len, bool appendNewline) {
// out stream
FILE* fp = (fd == STDOUT_FILENO ? stdout : stderr);
char const* nl = (appendNewline ? "\n" : "");
if (isTty) {
if (useColors) {
// joyful color output
if (level == LogLevel::FATAL || level == LogLevel::ERR) {
fprintf(fp, "%s%s%s%s", ShellColorsFeature::SHELL_COLOR_RED, buffer, ShellColorsFeature::SHELL_COLOR_RESET, nl);
@ -243,8 +244,12 @@ void LogAppenderStdStream::writeLogMessage(int fd, bool isTty, LogLevel level, c
fprintf(fp, "%s%s", buffer, nl);
}
if (level == LogLevel::FATAL || level == LogLevel::ERR || level == LogLevel::WARN) {
if (level == LogLevel::FATAL || level == LogLevel::ERR ||
level == LogLevel::WARN || level == LogLevel::INFO) {
// flush the output so it becomes visible immediately
// at least for log levels that are used seldomly
// it would probably be overkill to flush everytime we
// encounter a log message for level DEBUG or TRACE
fflush(fp);
}
}

View File

@ -32,7 +32,7 @@ class LogAppenderStream : public LogAppender {
LogAppenderStream(std::string const& filename, std::string const& filter, int fd);
~LogAppenderStream() {}
bool logMessage(LogLevel, std::string const& message,
void logMessage(LogLevel, std::string const& message,
size_t offset) override final;
virtual std::string details() override = 0;
@ -56,8 +56,8 @@ class LogAppenderStream : public LogAppender {
/// @brief file descriptor
int _fd;
/// @brief whether or not the outfile is a tty
bool _isTty;
/// @brief whether or not we should use colors
bool _useColors;
};
class LogAppenderFile : public LogAppenderStream {
@ -85,7 +85,7 @@ class LogAppenderStdStream : public LogAppenderStream {
std::string details() override final { return std::string(); }
static void writeLogMessage(int fd, bool isTty, LogLevel, char const* p, size_t length, bool appendNewline);
static void writeLogMessage(int fd, bool useColors, LogLevel, char const* p, size_t length, bool appendNewline);
private:
void writeLogMessage(LogLevel, char const*, size_t) override final;

View File

@ -81,7 +81,7 @@ LogAppenderSyslog::LogAppenderSyslog(std::string const& facility,
_opened = true;
}
bool LogAppenderSyslog::logMessage(LogLevel level, std::string const& message,
void LogAppenderSyslog::logMessage(LogLevel level, std::string const& message,
size_t offset) {
int priority = LOG_ERR;
@ -110,9 +110,6 @@ bool LogAppenderSyslog::logMessage(LogLevel level, std::string const& message,
if (_opened) {
::syslog(priority, "%s", message.c_str() + offset);
}
// not logging to TTY
return false;
}
std::string LogAppenderSyslog::details() {

View File

@ -37,7 +37,7 @@ class LogAppenderSyslog : public LogAppender {
LogAppenderSyslog(std::string const& facility, std::string const& name,
std::string const& filter);
bool logMessage(LogLevel, std::string const& message,
void logMessage(LogLevel, std::string const& message,
size_t offset) override final;
std::string details() override final;

View File

@ -56,7 +56,9 @@ std::atomic<LogLevel> Logger::_level(LogLevel::INFO);
bool Logger::_showLineNumber(false);
bool Logger::_shortenFilenames(true);
bool Logger::_showThreadIdentifier(false);
bool Logger::_showThreadName(false);
bool Logger::_threaded(false);
bool Logger::_useColor(true);
bool Logger::_useLocalTime(false);
bool Logger::_keepLogRotate(false);
bool Logger::_useMicrotime(false);
@ -181,6 +183,27 @@ void Logger::setShowThreadIdentifier(bool show) {
_showThreadIdentifier = show;
}
// NOTE: this function should not be called if the logging is active.
void Logger::setShowThreadName(bool show) {
if (_active) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"cannot change show thread name if logging is active");
}
_showThreadName = show;
}
// NOTE: this function should not be called if the logging is active.
void Logger::setUseColor(bool value) {
if (_active) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"cannot change color if logging is active");
}
_useColor = value;
}
// NOTE: this function should not be called if the logging is active.
void Logger::setUseLocalTime(bool show) {
if (_active) {
@ -291,17 +314,23 @@ void Logger::log(char const* function, char const* file, long int line,
}
// append the process / thread identifier
TRI_pid_t processId = Thread::currentProcessId();
out << '[' << Thread::currentProcessId();
if (_showThreadIdentifier) {
uint64_t threadNumber = Thread::currentThreadNumber();
snprintf(buf, sizeof(buf), "[%llu-%llu] ",
(unsigned long long)processId, (unsigned long long)threadNumber);
} else {
snprintf(buf, sizeof(buf), "[%llu] ",
(unsigned long long)processId);
out << '-' << Thread::currentThreadNumber();
}
out << buf;
// log thread name
if (_showThreadName) {
char const* threadName = Thread::currentThreadName();
if (threadName == nullptr) {
threadName = "main";
}
out << '-' << threadName;
}
out << "] ";
if (_showRole && _role != '\0') {
out << _role << ' ';

View File

@ -211,6 +211,9 @@ class Logger {
static bool getShowRole() {return _showRole;};
static void setShortenFilenames(bool);
static void setShowThreadIdentifier(bool);
static void setShowThreadName(bool);
static void setUseColor(bool);
static bool getUseColor() {return _useColor;};
static void setUseLocalTime(bool);
static bool getUseLocalTime() {return _useLocalTime;};
static void setUseMicrotime(bool);
@ -248,8 +251,10 @@ class Logger {
static bool _showLineNumber;
static bool _shortenFilenames;
static bool _showThreadIdentifier;
static bool _showThreadName;
static bool _showRole;
static bool _threaded;
static bool _useColor;
static bool _useLocalTime;
static bool _keepLogRotate;
static bool _useMicrotime;

View File

@ -71,6 +71,9 @@ void LoggerFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
new VectorParameter<StringParameter>(&_levels));
options->addSection("log", "Configure the logging");
options->addOption("--log.color", "use colors for TTY logging",
new BooleanParameter(&_useColor));
options->addOption("--log.output,-o", "log destination(s)",
new VectorParameter<StringParameter>(&_output));
@ -108,7 +111,11 @@ void LoggerFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
options->addHiddenOption("--log.thread",
"show thread identifier in log message",
new BooleanParameter(&_thread));
new BooleanParameter(&_threadId));
options->addHiddenOption("--log.thread-name",
"show thread name in log message",
new BooleanParameter(&_threadName));
options->addHiddenOption("--log.performance",
"shortcut for '--log.level performance=trace'",
@ -163,11 +170,13 @@ void LoggerFeature::prepare() {
Logger::setLogLevel(_levels);
Logger::setShowRole(_showRole);
Logger::setUseColor(_useColor);
Logger::setUseLocalTime(_useLocalTime);
Logger::setUseMicrotime(_useMicrotime);
Logger::setShowLineNumber(_lineNumber);
Logger::setShortenFilenames(_shortenFilenames);
Logger::setShowThreadIdentifier(_thread);
Logger::setShowThreadIdentifier(_threadId);
Logger::setShowThreadName(_threadName);
Logger::setOutputPrefix(_prefix);
Logger::setKeepLogrotate(_keepLogRotate);

View File

@ -47,12 +47,14 @@ class LoggerFeature final : public application_features::ApplicationFeature {
private:
std::vector<std::string> _output;
std::vector<std::string> _levels;
bool _useLocalTime = false;
std::string _prefix;
std::string _file;
bool _useLocalTime = false;
bool _useColor = true;
bool _lineNumber = false;
bool _shortenFilenames = true;
bool _thread = false;
bool _threadId = false;
bool _threadName = false;
bool _performance = false;
bool _keepLogRotate = false;
bool _foregroundTty = false;