1
0
Fork 0

Bug fix/agency mt fixes (#3158)

* added debugging methods

* try to fix invalid access in case of error

* remove unused members

* bugfixes and comments

* all agency fixes in

* merge bug

* partially unguarded Agent::lead fixed

* all agency fixes in

* added nrBlocked to thread startup eval

* added nrBlocked to thread startup eval

* recombination of cases in State::get

* some maps replaced with unordered_maps

* optimized maps some
This commit is contained in:
Kaveh Vahedipour 2017-08-30 10:43:51 +02:00 committed by Frank Celler
parent 73055db6b6
commit 00650e6a3f
28 changed files with 400 additions and 260 deletions

View File

@ -261,7 +261,7 @@ class AgencyCommResult {
std::string _body;
std::string _realBody;
std::map<std::string, AgencyCommResultEntry> _values;
std::unordered_map<std::string, AgencyCommResultEntry> _values;
int _statusCode;
bool _connected;
bool _sent;
@ -606,7 +606,7 @@ class AgencyCommManager {
// should call `failed` such that the manager can switch to a new
// current endpoint. In case a redirect is received, one has to inform
// the manager by calling `redirect`.
std::map<std::string,
std::unordered_map<std::string,
std::vector<std::unique_ptr<httpclient::GeneralClientConnection>>>
_unusedConnections;
};

View File

@ -113,10 +113,11 @@ struct log_t {
std::string const& clientId = std::string())
: index(idx),
term(t),
entry(e),
clientId(clientId),
timestamp(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())) {}
std::chrono::system_clock::now().time_since_epoch())) {
entry = std::make_shared<arangodb::velocypack::Buffer<uint8_t>>(*e.get());
}
friend std::ostream& operator<<(std::ostream& o, log_t const& l) {
o << l.index << " " << l.term << " " << VPackSlice(l.entry->data()).toJson()

View File

@ -199,6 +199,7 @@ AgentInterface::raft_commit_t Agent::waitFor(index_t index, double timeout) {
while (true) {
/// success?
{
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(lockIndex, _ioLock);
if (_commitIndex >= index) {
return Agent::raft_commit_t::OK;
@ -208,6 +209,7 @@ AgentInterface::raft_commit_t Agent::waitFor(index_t index, double timeout) {
// timeout
if (!_waitForCV.wait(static_cast<uint64_t>(1.0e6 * timeout))) {
if (leading()) {
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(lockIndex, _ioLock);
return (_commitIndex >= index) ?
Agent::raft_commit_t::OK : Agent::raft_commit_t::TIMEOUT;
@ -235,6 +237,7 @@ void Agent::reportIn(std::string const& peerId, index_t index, size_t toLog) {
{
// Enforce _lastCommitIndex, _readDB and compaction to progress atomically
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
// Update last acknowledged answer
@ -276,6 +279,7 @@ void Agent::reportIn(std::string const& peerId, index_t index, size_t toLog) {
true /* inform others by callbacks */ );
}
// TODO: why _liLock here? It should be _ioLock, and we already hold it
MUTEX_LOCKER(liLocker, _liLock);
_commitIndex = index;
if (_commitIndex >= _nextCompactionAfter) {
@ -376,7 +380,7 @@ bool Agent::recvAppendEntriesRPC(
}
// Now the log is empty, but this will soon be rectified.
{
MUTEX_LOCKER(liLocker, _liLock);
MUTEX_LOCKER(ioLocker, _ioLock);
_nextCompactionAfter = (std::min)(_nextCompactionAfter,
snapshotIndex + _config.compactionStepSize());
}
@ -389,6 +393,7 @@ bool Agent::recvAppendEntriesRPC(
bool ok = true;
if (nqs > 0) {
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
size_t ndups = _state.removeConflicts(queries, gotSnapshot);
@ -417,12 +422,16 @@ bool Agent::recvAppendEntriesRPC(
}
}
bool wakeup;
{
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
_commitIndex = std::min(leaderCommitIndex, _lastApplied);
wakeup = (_commitIndex >= _nextCompactionAfter);
}
if (_commitIndex >= _nextCompactionAfter) {
if (wakeup) {
_compactor.wakeUp();
}
@ -451,11 +460,16 @@ void Agent::sendAppendEntriesRPC() {
index_t lastConfirmed, commitIndex;
auto startTime = system_clock::now();
time_point<system_clock> earliestPackage, lastAcked;
{
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
t = this->term();
lastConfirmed = _confirmed[followerId];
commitIndex = _commitIndex;
lastAcked = _lastAcked[followerId];
earliestPackage = _earliestPackage[followerId];
}
duration<double> lockTime = system_clock::now() - startTime;
if (lockTime.count() > 0.1) {
@ -463,7 +477,7 @@ void Agent::sendAppendEntriesRPC() {
<< "Reading lastConfirmed took too long: " << lockTime.count();
}
std::vector<log_t> unconfirmed = _state.get(lastConfirmed);
std::vector<log_t> unconfirmed = _state.get(lastConfirmed, lastConfirmed+99);
lockTime = system_clock::now() - startTime;
if (lockTime.count() > 0.2) {
@ -500,8 +514,7 @@ void Agent::sendAppendEntriesRPC() {
_lastSent[followerId].time_since_epoch().count() != 0) {
LOG_TOPIC(WARN, Logger::AGENCY) << "Oops, sent out last heartbeat "
<< "to follower " << followerId << " more than minPing ago: "
<< m.count() << " lastAcked: "
<< timepointToString(_lastAcked[followerId])
<< m.count() << " lastAcked: " << timepointToString(lastAcked)
<< " lastSent: " << timepointToString(_lastSent[followerId]);
}
index_t lowest = unconfirmed.front().index;
@ -553,7 +566,7 @@ void Agent::sendAppendEntriesRPC() {
Builder builder;
builder.add(VPackValue(VPackValueType::Array));
if (
((system_clock::now() - _earliestPackage[followerId]).count() > 0)) {
((system_clock::now() - earliestPackage).count() > 0)) {
if (needSnapshot) {
{ VPackObjectBuilder guard(&builder);
builder.add(VPackValue("readDB"));
@ -581,10 +594,15 @@ void Agent::sendAppendEntriesRPC() {
builder.close();
// Really leading?
if (challengeLeadership()) {
_constituent.candidate();
_preparing = false;
return;
{
MUTEX_LOCKER(ioLocker, _ioLock);
if (challengeLeadership()) {
ioLocker.unlock();
_constituent.candidate();
_preparing = false;
return;
}
}
// Verbose output
@ -609,19 +627,21 @@ void Agent::sendAppendEntriesRPC() {
std::max(1.0e-3 * toLog * dt.count(),
_config.minPing() * _config.timeoutMult()), true);
// _lastSent, _lastHighest: local and single threaded access
_lastSent[followerId] = system_clock::now();
_lastHighest[followerId] = highest;
if (toLog > 0) {
_earliestPackage[followerId] = system_clock::now() + toLog * dt;
earliestPackage = system_clock::now() + toLog * dt;
{
MUTEX_LOCKER(ioLocker, _ioLock);
_earliestPackage[followerId] = earliestPackage;
}
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Appending " << unconfirmed.size() - 1 << " entries up to index "
<< highest << " to follower " << followerId << ". Message: "
<< builder.toJson()
<< highest << " to follower " << followerId
<< ". Next real log contact to " << followerId<< " in: "
<< std::chrono::duration<double, std::milli>(
_earliestPackage[followerId]-system_clock::now()).count() << "ms";
earliestPackage-system_clock::now()).count() << "ms";
} else {
LOG_TOPIC(TRACE, Logger::AGENCY)
<< "Just keeping follower " << followerId
@ -659,12 +679,15 @@ query_t Agent::activate(query_t const& everything) {
Slice logs = slice.get("logs");
std::vector<Slice> batch;
VPackBuilder batch;
batch.openArray();
for (auto const& q : VPackArrayIterator(logs)) {
batch.push_back(q.get("request"));
batch.add(q.get("request"));
}
batch.close();
{
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock); // Atomicity
if (!compact.isEmptyArray()) {
_readDB = compact.get("readDB");
@ -676,9 +699,6 @@ query_t Agent::activate(query_t const& everything) {
_spearhead = _readDB;
}
//_state.persistReadDB(everything->slice().get("compact").get("_key"));
//_state.log((everything->slice().get("logs"));
ret->add("success", VPackValue(true));
ret->add("commitId", VPackValue(commitIndex));
}
@ -755,6 +775,8 @@ void Agent::load() {
_compactor.start();
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting spearhead worker.";
// Single-threaded startup, no locking needed
_spearhead.start();
_readDB.start();
@ -776,6 +798,7 @@ void Agent::load() {
/// Still leading? Under MUTEX from ::read or ::write
bool Agent::challengeLeadership() {
_ioLock.assertLockedByCurrentThread();
size_t good = 0;
@ -793,8 +816,9 @@ bool Agent::challengeLeadership() {
/// Get last acknowledged responses on leader
query_t Agent::lastAckedAgo() const {
std::map<std::string, TimePoint> lastAcked;
std::unordered_map<std::string, TimePoint> lastAcked;
{
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
lastAcked = _lastAcked;
}
@ -840,6 +864,7 @@ trans_ret_t Agent::transact(query_t const& queries) {
ret->openArray();
{
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
// Only leader else redirect
@ -898,6 +923,7 @@ trans_ret_t Agent::transient(query_t const& queries) {
{
VPackArrayBuilder b(ret.get());
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
// Only leader else redirect
@ -931,6 +957,7 @@ inquire_ret_t Agent::inquire(query_t const& query) {
return inquire_ret_t(false, leader);
}
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
auto si = _state.inquire(query);
@ -1009,6 +1036,7 @@ write_ret_t Agent::write(query_t const& query, bool discardStartup) {
}
}
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
// Only leader else redirect
@ -1056,6 +1084,7 @@ read_ret_t Agent::read(query_t const& query) {
}
}
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
// Only leader else redirect
if (challengeLeadership()) {
@ -1118,6 +1147,7 @@ void Agent::reportActivated(
if (state->slice().get("success").getBoolean()) {
{
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
_confirmed.erase(failed);
auto commitIndex = state->slice().get("commitId").getNumericValue<index_t>();
@ -1136,6 +1166,7 @@ void Agent::reportActivated(
}
} else {
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
myterm = _constituent.term();
}
@ -1182,8 +1213,9 @@ void Agent::failedActivation(
void Agent::detectActiveAgentFailures() {
// Detect faulty agent if pool larger than agency
std::map<std::string, TimePoint> lastAcked;
std::unordered_map<std::string, TimePoint> lastAcked;
{
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
lastAcked = _lastAcked;
}
@ -1235,8 +1267,11 @@ void Agent::beginShutdown() {
_compactor.beginShutdown();
// Stop key value stores
_spearhead.beginShutdown();
_readDB.beginShutdown();
{
MUTEX_LOCKER(ioLocker, _ioLock);
_spearhead.beginShutdown();
_readDB.beginShutdown();
}
// Wake up all waiting rest handlers
{
@ -1265,6 +1300,7 @@ bool Agent::prepareLead() {
// Reset last acknowledged
{
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
for (auto const& i : _config.active()) {
_lastAcked[i] = system_clock::now();
@ -1288,6 +1324,7 @@ void Agent::lead() {
// Agency configuration
term_t myterm;
{
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
myterm = _constituent.term();
}
@ -1297,15 +1334,26 @@ void Agent::lead() {
// Notify inactive pool
notifyInactive();
index_t commitIndex;
{
MUTEX_LOCKER(ioLocker, _ioLock);
commitIndex = _commitIndex;
}
{
CONDITION_LOCKER(guard, _waitForCV);
while(_commitIndex != _state.lastIndex()) {
while(commitIndex != _state.lastIndex()) {
_waitForCV.wait(10000);
MUTEX_LOCKER(ioLocker, _ioLock);
commitIndex = _commitIndex;
}
}
_spearhead = _readDB;
{
MUTEX_LOCKER(ioLocker, _ioLock);
_spearhead = _readDB;
}
}
// When did we take on leader ship?
@ -1321,7 +1369,7 @@ void Agent::notifyInactive() const {
return;
}
std::map<std::string, std::string> pool = _config.pool();
std::unordered_map<std::string, std::string> pool = _config.pool();
std::string path = "/_api/agency_priv/inform";
Builder out;
@ -1348,7 +1396,6 @@ void Agent::notifyInactive() const {
}
void Agent::updatePeerEndpoint(query_t const& message) {
VPackSlice slice = message->slice();
if (!slice.isObject() || slice.length() == 0) {
@ -1379,9 +1426,12 @@ void Agent::updatePeerEndpoint(query_t const& message) {
}
void Agent::updatePeerEndpoint(std::string const& id, std::string const& ep) {
if (_config.updateEndpoint(id, ep)) {
MUTEX_LOCKER(ioLocker, _ioLock);
if (!challengeLeadership()) {
ioLocker.unlock();
persistConfiguration(term());
notifyInactive();
}
@ -1433,6 +1483,7 @@ void Agent::notify(query_t const& message) {
// Rebuild key value stores
arangodb::consensus::index_t Agent::rebuildDBs() {
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
index_t lastCompactionIndex;
@ -1476,13 +1527,24 @@ void Agent::compact() {
// since one usually would like to keep a part of the recent log. Therefore
// we cannot use the _readDB ever, since we have to compute a state of the
// key/value space well before _lastAppliedIndex anyway:
_nextCompactionAfter += _config.compactionStepSize();
index_t commitIndex = 0;
if (_commitIndex > _config.compactionKeepSize()) {
{
MUTEX_LOCKER(ioLocker, _ioLock);
_nextCompactionAfter += _config.compactionStepSize();
commitIndex = _commitIndex;
}
if (commitIndex > _config.compactionKeepSize()) {
// If the keep size is too large, we do not yet compact
if (!_state.compact(_commitIndex - _config.compactionKeepSize())) {
// TODO: check whether it is a problem that we now call State::compact()
// with a commit index that may have been slightly modified by other
// threads in the meantime
// TODO: the question is if we have to lock out others while we
// call compact or while we grab _commitIndex and then call compact
if (!_state.compact(commitIndex - _config.compactionKeepSize())) {
LOG_TOPIC(WARN, Logger::AGENCY) << "Compaction for index "
<< _commitIndex - _config.compactionKeepSize()
<< commitIndex - _config.compactionKeepSize()
<< " did not work.";
}
}
@ -1492,6 +1554,7 @@ void Agent::compact() {
/// Last commit index
std::pair<arangodb::consensus::index_t, arangodb::consensus::index_t>
Agent::lastCommitted() const {
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
MUTEX_LOCKER(liLocker, _liLock);
return std::pair<arangodb::consensus::index_t, arangodb::consensus::index_t>(
@ -1500,6 +1563,7 @@ Agent::lastCommitted() const {
/// Last commit index
void Agent::lastCommitted(arangodb::consensus::index_t lastCommitIndex) {
_liLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
_commitIndex = lastCommitIndex;
MUTEX_LOCKER(liLocker, _liLock);
@ -1513,7 +1577,13 @@ log_t Agent::lastLog() const { return _state.lastLog(); }
/// Returns a const reference to the _spearhead store; no lock is acquired.
/// NOTE(review): unlike readDB() const and transient() const, this accessor
/// does not call _ioLock.assertLockedByCurrentThread(). Confirm whether
/// callers are expected to hold _ioLock here as well.
Store const& Agent::spearhead() const { return _spearhead; }
/// Get readdb
Store const& Agent::readDB() const { return _readDB; }
/// Returns a const reference to the _readDB store.
/// Intentionally no lock is acquired here, so that a const reference can be
/// returned. The caller has to make sure that _ioLock is actually held by
/// the calling thread (e.g. by running inside Agent::executeLocked());
/// _ioLock.assertLockedByCurrentThread() is invoked below to check this.
Store const& Agent::readDB() const {
_ioLock.assertLockedByCurrentThread();
return _readDB;
}
/// Get readdb
arangodb::consensus::index_t Agent::readDB(Node& node) const {
@ -1522,8 +1592,19 @@ arangodb::consensus::index_t Agent::readDB(Node& node) const {
return _commitIndex;
}
/// Execute the callback `cb` while holding _ioLock.
/// _ioLock is acquired via MUTEX_LOCKER before cb is invoked, so cb may use
/// accessors that require the lock to be held, such as readDB() const and
/// transient() const.
/// NOTE(review): cb must not try to acquire _ioLock itself — presumably
/// arangodb::Mutex is not recursive; confirm.
void Agent::executeLocked(std::function<void()> const& cb) {
MUTEX_LOCKER(ioLocker, _ioLock);
cb();
}
/// Get transient
Store const& Agent::transient() const { return _transient; }
/// Returns a const reference to the _transient store.
/// Intentionally no lock is acquired here, so that a const reference can be
/// returned. The caller has to make sure that _ioLock is actually held by
/// the calling thread (e.g. by running inside Agent::executeLocked());
/// _ioLock.assertLockedByCurrentThread() is invoked below to check this.
Store const& Agent::transient() const {
_ioLock.assertLockedByCurrentThread();
return _transient;
}
/// Rebuild from persisted state
Agent& Agent::operator=(VPackSlice const& compaction) {
@ -1603,7 +1684,7 @@ query_t Agent::gossip(query_t const& in, bool isCallback, size_t version) {
LOG_TOPIC(TRACE, Logger::AGENCY) << "Received gossip " << slice.toJson();
std::map<std::string, std::string> incoming;
std::unordered_map<std::string, std::string> incoming;
for (auto const& pair : VPackObjectIterator(pslice)) {
if (!pair.value.isString()) {
THROW_ARANGO_EXCEPTION_MESSAGE(
@ -1695,32 +1776,39 @@ bool Agent::ready() const {
}
query_t Agent::buildDB(arangodb::consensus::index_t index) {
Store store(this);
index_t oldIndex;
term_t term;
if (!_state.loadLastCompactedSnapshot(store, oldIndex, term)) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_AGENCY_CANNOT_REBUILD_DBS);
}
if (index > _commitIndex) {
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Cannot snapshot beyond leaderCommitIndex: " << _commitIndex;
index = _commitIndex;
} else if (index < oldIndex) {
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Cannot snapshot before last compaction index: " << oldIndex;
index = oldIndex;
{
MUTEX_LOCKER(ioLocker, _ioLock);
if (index > _commitIndex) {
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Cannot snapshot beyond leaderCommitIndex: " << _commitIndex;
index = _commitIndex;
} else if (index < oldIndex) {
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Cannot snapshot before last compaction index: " << oldIndex;
index = oldIndex;
}
}
std::vector<VPackSlice> logs;
{
MUTEX_LOCKER(mutexLocker, _compactionLock);
if (index > oldIndex) {
logs = _state.slices(oldIndex+1, index);
auto logs = _state.slices(oldIndex+1, index);
store.applyLogEntries(logs, index, term,
false /* do not perform callbacks */);
} else {
VPackBuilder logs;
logs.openArray();
logs.close();
store.applyLogEntries(logs, index, term,
false /* do not perform callbacks */);
}
store.applyLogEntries(logs, index, term,
false /* do not perform callbacks */);
}
auto builder = std::make_shared<VPackBuilder>();

View File

@ -187,6 +187,9 @@ class Agent : public arangodb::Thread,
/// @brief State machine
State const& state() const;
/// @brief execute a callback while holding _ioLock
void executeLocked(std::function<void()> const& cb);
/// @brief Get read store and compaction index
index_t readDB(Node&) const;
@ -325,18 +328,19 @@ class Agent : public arangodb::Thread,
arangodb::basics::ConditionVariable _waitForCV;
/// @brief Confirmed indices of all members of agency
std::map<std::string, index_t> _confirmed;
std::map<std::string, index_t> _lastHighest;
std::unordered_map<std::string, index_t> _confirmed;
std::unordered_map<std::string, index_t> _lastHighest;
std::map<std::string, TimePoint> _lastAcked;
std::map<std::string, TimePoint> _lastSent;
std::map<std::string, TimePoint> _earliestPackage;
std::unordered_map<std::string, TimePoint> _lastAcked;
std::unordered_map<std::string, TimePoint> _lastSent;
std::unordered_map<std::string, TimePoint> _earliestPackage;
/**< @brief RAFT consistency lock:
_spearhead
_read_db
_lastCommitedIndex (log index)
_readDB
_commitIndex (log index)
_lastAcked
_lastSent
_confirmed
_nextCompactionAfter
*/
@ -345,6 +349,11 @@ class Agent : public arangodb::Thread,
// lock for _leaderCommitIndex
mutable arangodb::Mutex _liLock;
// note: when both _ioLock and _liLock are acquired,
// the locking order must be:
// 1) _ioLock
// 2) _liLock
// @brief guard _activator
mutable arangodb::Mutex _activatorLock;
@ -368,7 +377,7 @@ class Agent : public arangodb::Thread,
TimePoint _leaderSince;
/// @brief Ids of ongoing transactions, used for inquire:
std::set<std::string> _ongoingTrxs;
std::unordered_set<std::string> _ongoingTrxs;
// lock for _ongoingTrxs
arangodb::Mutex _trxsLock;

View File

@ -194,7 +194,7 @@ void config_t::setTimeoutMult(int64_t m) {
}
}
std::map<std::string, std::string> config_t::pool() const {
std::unordered_map<std::string, std::string> config_t::pool() const {
READ_LOCKER(readLocker, _lock);
return _pool;
}
@ -392,7 +392,7 @@ bool config_t::updateEndpoint(std::string const& id, std::string const& ep) {
void config_t::update(query_t const& message) {
VPackSlice slice = message->slice();
std::map<std::string, std::string> pool;
std::unordered_map<std::string, std::string> pool;
bool changed = false;
for (auto const& p : VPackObjectIterator(slice.get(poolStr))) {
auto const& id = p.key.copyString();

View File

@ -65,7 +65,7 @@ struct config_t {
double _maxPing;
int64_t _timeoutMult;
std::string _endpoint;
std::map<std::string, std::string> _pool;
std::unordered_map<std::string, std::string> _pool;
std::vector<std::string> _gossipPeers;
std::vector<std::string> _active;
bool _supervision;
@ -173,7 +173,7 @@ struct config_t {
std::string endpoint() const;
/// @brief copy of pool
std::map<std::string, std::string> pool() const;
std::unordered_map<std::string, std::string> pool() const;
/// @brief get one pair out of pool
std::string poolAt(std::string const& id) const;

View File

@ -105,6 +105,8 @@ void Constituent::term(term_t t) {
void Constituent::termNoLock(term_t t) {
// Only call this when you have the _castLock
_castLock.assertLockedByCurrentThread();
term_t tmp = _term;
_term = t;
@ -200,6 +202,8 @@ void Constituent::follow(term_t t) {
}
void Constituent::followNoLock(term_t t) {
_castLock.assertLockedByCurrentThread();
_term = t;
_role = FOLLOWER;
@ -373,14 +377,14 @@ bool Constituent::vote(term_t termOfPeer, std::string id, index_t prevLogIndex,
}
TRI_ASSERT(_vocbase != nullptr);
MUTEX_LOCKER(guard, _castLock);
LOG_TOPIC(TRACE, Logger::AGENCY)
<< "vote(termOfPeer: " << termOfPeer << ", leaderId: " << id
<< ", prev-log-index: " << prevLogIndex << ", prev-log-term: " << prevLogTerm
<< ") in (my) term " << _term;
MUTEX_LOCKER(guard, _castLock);
if (termOfPeer > _term) {
termNoLock(termOfPeer);
@ -505,6 +509,7 @@ void Constituent::callElection() {
while (true) {
if (steady_clock::now() >= timeout) { // Timeout.
MUTEX_LOCKER(locker, _castLock);
follow(_term);
break;
}
@ -522,8 +527,12 @@ void Constituent::callElection() {
if (slc.isObject() && slc.hasKey("term") && slc.hasKey("voteGranted")) {
// Follow right away?
term_t t = slc.get("term").getUInt();
if (t > _term) {
term_t t = slc.get("term").getUInt(), term;
{
MUTEX_LOCKER(locker, _castLock);
term = _term;
}
if (t > term) {
follow(t);
break;
}
@ -542,7 +551,12 @@ void Constituent::callElection() {
}
// Count the vote as a nay
if (++nay >= majority) { // Network: majority against?
follow(_term);
term_t term;
{
MUTEX_LOCKER(locker, _castLock);
term = _term;
}
follow(term);
break;
}
@ -641,6 +655,7 @@ void Constituent::run() {
}
if (size() == 1) {
MUTEX_LOCKER(guard, _castLock);
_leaderID = _agent->config().id();
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Set _leaderID to " << _leaderID
<< " in term " << _term;
@ -651,7 +666,14 @@ void Constituent::run() {
_role = FOLLOWER;
}
while (!this->isStopping()) {
if (_role == FOLLOWER) {
role_t role;
{
MUTEX_LOCKER(guard, _castLock);
role = _role;
}
if (role == FOLLOWER) {
static double const M = 1.0e6;
int64_t a = static_cast<int64_t>(M * _agent->config().minPing() *
_agent->config().timeoutMult());
@ -707,7 +729,8 @@ void Constituent::run() {
candidate();
_agent->unprepareLead();
}
} else if (_role == CANDIDATE) {
} else if (role == CANDIDATE) {
callElection(); // Run for office
} else {
int32_t left =

View File

@ -373,23 +373,7 @@ bool Inception::restartingActiveAgent() {
}
void Inception::reportIn(query_t const& query) {
VPackSlice slice = query->slice();
TRI_ASSERT(slice.isObject());
TRI_ASSERT(slice.hasKey("mean"));
TRI_ASSERT(slice.hasKey("stdev"));
TRI_ASSERT(slice.hasKey("min"));
TRI_ASSERT(slice.hasKey("max"));
MUTEX_LOCKER(lock, _mLock);
_measurements.push_back(
std::vector<double>(
{slice.get("mean").getNumber<double>(),
slice.get("stdev").getNumber<double>(),
slice.get("max").getNumber<double>(),
slice.get("min").getNumber<double>()} ));
// does nothing at the moment
}

View File

@ -74,13 +74,8 @@ public:
Agent* _agent; //< @brief The agent
arangodb::basics::ConditionVariable _cv; //< @brief For proper shutdown
std::vector<double> _pings; //< @brief pings
std::map<std::string,size_t> _acked; //< @brief acknowledged config version
std::unordered_map<std::string,size_t> _acked; //< @brief acknowledged config version
mutable arangodb::Mutex _vLock; //< @brief Guard _acked
mutable arangodb::Mutex _pLock; //< @brief Guard _pings
std::vector<std::vector<double>> _measurements; //< @brief measurements
mutable arangodb::Mutex _mLock; //< @brief Guard _measurements
};
}}

View File

@ -287,13 +287,13 @@ std::string Job::findNonblockedCommonHealthyInSyncFollower( // Which is in "GOOD
auto cs = clones(snap, db, col, shrd); // clones
auto nclones = cs.size(); // #clones
std::map<std::string,bool> good;
std::unordered_map<std::string,bool> good;
for (const auto& i : snap(healthPrefix).children()) {
good[i.first] = ((*i.second)("Status").toJson() == "GOOD");
}
std::map<std::string,size_t> currentServers;
std::unordered_map<std::string,size_t> currentServers;
for (const auto& clone : cs) {
auto currentShardPath =
curColPrefix + db + "/" + clone.collection + "/"

View File

@ -76,7 +76,7 @@ class Node {
typedef std::vector<std::string> PathType;
// @brief Child nodes
typedef std::map<std::string, std::shared_ptr<Node>> Children;
typedef std::unordered_map<std::string, std::shared_ptr<Node>> Children;
/// @brief Construct with name
explicit Node(std::string const& name);

View File

@ -162,7 +162,7 @@ bool RemoveFollower::start() {
= clones(_snapshot, _database, _collection, _shard);
// Now find some new servers to remove:
std::map<std::string, int> overview; // get an overview over the servers
std::unordered_map<std::string, int> overview; // get an overview over the servers
// -1 : not "GOOD", can be in sync, or leader, or not
// >=0: number of servers for which it is in sync or confirmed leader
bool leaderBad = false;

View File

@ -186,21 +186,23 @@ RestStatus RestAgencyHandler::handleStores() {
{
VPackObjectBuilder b(&body);
{
body.add(VPackValue("spearhead"));
{
VPackArrayBuilder bb(&body);
_agent->spearhead().dumpToBuilder(body);
}
body.add(VPackValue("read_db"));
{
VPackArrayBuilder bb(&body);
_agent->readDB().dumpToBuilder(body);
}
body.add(VPackValue("transient"));
{
VPackArrayBuilder bb(&body);
_agent->transient().dumpToBuilder(body);
}
_agent->executeLocked([&]() {
body.add(VPackValue("spearhead"));
{
VPackArrayBuilder bb(&body);
_agent->spearhead().dumpToBuilder(body);
}
body.add(VPackValue("read_db"));
{
VPackArrayBuilder bb(&body);
_agent->readDB().dumpToBuilder(body);
}
body.add(VPackValue("transient"));
{
VPackArrayBuilder bb(&body);
_agent->transient().dumpToBuilder(body);
}
});
}
}
generateResult(rest::ResponseCode::OK, body.slice());

View File

@ -135,8 +135,6 @@ bool State::persist(index_t index, term_t term,
std::vector<index_t> State::log(
query_t const& transactions, std::vector<bool> const& applicable, term_t term) {
TRI_ASSERT(!_log.empty()); // log must not ever be empty
std::vector<index_t> idx(applicable.size());
size_t j = 0;
auto const& slice = transactions->slice();
@ -149,6 +147,8 @@ std::vector<index_t> State::log(
TRI_ASSERT(slice.length() == applicable.size());
MUTEX_LOCKER(mutexLocker, _logLock);
TRI_ASSERT(!_log.empty()); // log must never be empty
for (auto const& i : VPackArrayIterator(slice)) {
if (!i.isArray()) {
@ -182,6 +182,8 @@ index_t State::logNonBlocking(
index_t idx, velocypack::Slice const& slice, term_t term,
std::string const& clientId, bool leading) {
_logLock.assertLockedByCurrentThread();
TRI_ASSERT(!_log.empty()); // log must not ever be empty
auto buf = std::make_shared<Buffer<uint8_t>>();
@ -236,7 +238,6 @@ index_t State::logNonBlocking(
/// Log transactions (follower)
index_t State::log(query_t const& transactions, size_t ndups) {
VPackSlice slices = transactions->slice();
TRI_ASSERT(slices.isArray());
@ -244,6 +245,7 @@ index_t State::log(query_t const& transactions, size_t ndups) {
size_t nqs = slices.length();
TRI_ASSERT(nqs > ndups);
std::string clientId;
MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order
@ -256,14 +258,12 @@ index_t State::log(query_t const& transactions, size_t ndups) {
slice.get("term").getUInt(), slice.get("clientId").copyString())==0) {
break;
}
}
return _log.empty() ? 0 : _log.back().index;
}
size_t State::removeConflicts(query_t const& transactions,
bool gotSnapshot) {
size_t State::removeConflicts(query_t const& transactions, bool gotSnapshot) {
// Under MUTEX in Agent
// Note that this will ignore a possible snapshot in the first position!
// This looks through the transactions and skips over those that are
@ -346,8 +346,8 @@ size_t State::removeConflicts(query_t const& transactions,
}
/// Get log entries from indices "start" to "end"
std::vector<log_t> State::get(index_t start,
index_t end) const {
std::vector<log_t> State::get(index_t start, index_t end) const {
std::vector<log_t> entries;
MUTEX_LOCKER(mutexLocker, _logLock); // Cannot be read lock (Compaction)
@ -355,15 +355,28 @@ std::vector<log_t> State::get(index_t start,
return entries;
}
if (end == (std::numeric_limits<uint64_t>::max)() || end > _log.back().index) {
// start must be greater than or equal to the lowest index
// and smaller than or equal to the largest index
if (start < _log[0].index) {
start = _log.front().index;
} else if (start > _log.back().index) {
start = _log.back().index;
}
// end must be greater than or equal to start
// and smaller than or equal to the largest index
if (end <= start) {
end = start;
} else if (
end == (std::numeric_limits<uint64_t>::max)() || end > _log.back().index) {
end = _log.back().index;
}
if (start < _log[0].index) {
start = _log[0].index;
}
// subtract offset _cur
start -= _cur;
end -= (_cur-1);
for (size_t i = start - _cur; i <= end - _cur; ++i) {
for (size_t i = start; i < end; ++i) {
entries.push_back(_log[i]);
}
@ -440,36 +453,41 @@ bool State::has(index_t index, term_t term) const {
/// Get vector of past transaction from 'start' to 'end'
std::vector<VPackSlice> State::slices(index_t start,
index_t end) const {
std::vector<VPackSlice> slices;
VPackBuilder State::slices(index_t start,
index_t end) const {
VPackBuilder slices;
slices.openArray();
MUTEX_LOCKER(mutexLocker, _logLock); // Cannot be read lock (Compaction)
if (_log.empty()) {
return slices;
}
if (!_log.empty()) {
if (start < _log.front().index) { // no start specified
start = _log.front().index;
}
if (start < _log.front().index) { // no start specified
start = _log.front().index;
}
if (start > _log.back().index) { // no end specified
slices.close();
return slices;
}
if (start > _log.back().index) { // no end specified
return slices;
}
if (end == (std::numeric_limits<uint64_t>::max)() ||
end > _log.back().index) {
end = _log.back().index;
}
if (end == (std::numeric_limits<uint64_t>::max)() ||
end > _log.back().index) {
end = _log.back().index;
}
for (size_t i = start - _cur; i <= end - _cur; ++i) {
try {
slices.push_back(VPackSlice(_log.at(i).entry->data()));
} catch (std::exception const&) {
break;
for (size_t i = start - _cur; i <= end - _cur; ++i) {
try {
slices.add(VPackSlice(_log.at(i).entry->data()));
} catch (std::exception const&) {
break;
}
}
}
mutexLocker.unlock();
slices.close();
return slices;
}
@ -662,8 +680,9 @@ bool State::loadCompacted() {
VPackSlice result = queryResult.result->slice();
MUTEX_LOCKER(logLock, _logLock);
if (result.isArray() && result.length()) {
MUTEX_LOCKER(logLock, _logLock);
for (auto const& i : VPackArrayIterator(result)) {
auto ii = i.resolveExternals();
buffer_t tmp = std::make_shared<arangodb::velocypack::Buffer<uint8_t>>();
@ -680,7 +699,10 @@ bool State::loadCompacted() {
// We can be sure that every compacted snapshot only contains index entries
// that have been written and agreed upon by an absolute majority of agents.
if (!_log.empty()) {
_agent->lastCommitted(lastLog().index);
index_t lastIndex = _log.back().index;
logLock.unlock();
_agent->lastCommitted(lastIndex);
}
return true;
@ -917,10 +939,6 @@ bool State::compactPersisted(index_t cind) {
THROW_ARANGO_EXCEPTION_MESSAGE(queryResult.code, queryResult.details);
}
if (queryResult.code != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION_MESSAGE(queryResult.code, queryResult.details);
}
return true;
}
@ -1136,19 +1154,17 @@ query_t State::allLogs() const {
}
std::vector<std::vector<log_t>> State::inquire(query_t const& query) const {
std::vector<std::vector<log_t>> result;
MUTEX_LOCKER(mutexLocker, _logLock); // Cannot be read lock (Compaction)
if (!query->slice().isArray()) {
THROW_ARANGO_EXCEPTION_MESSAGE(
210002,
20001,
std::string("Inquiry handles a list of string clientIds: [<clientId>] ")
+ ". We got " + query->toJson());
return result;
}
std::vector<std::vector<log_t>> result;
size_t pos = 0;
MUTEX_LOCKER(mutexLocker, _logLock); // Cannot be read lock (Compaction)
for (auto const& i : VPackArrayIterator(query->slice())) {
if (!i.isString()) {
@ -1168,11 +1184,9 @@ std::vector<std::vector<log_t>> State::inquire(query_t const& query) const {
result.push_back(transactions);
pos++;
}
return result;
}
// Index of last log entry

View File

@ -99,7 +99,7 @@ class State {
/// @brief Get complete logged commands by lower and upper bounds.
/// Default: [first, last]
std::vector<VPackSlice> slices(
arangodb::velocypack::Builder slices(
index_t = 0, index_t = (std::numeric_limits<uint64_t>::max)()) const;
/// @brief log entry at index i

View File

@ -1,3 +1,4 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
@ -116,15 +117,6 @@ inline static bool endpointPathFromUrl(std::string const& url,
/// Construct a store bound to `agent`. The same `name` is handed to the
/// Thread base and to the root node, and the root node is given a pointer
/// back to this store.
Store::Store(Agent* agent, std::string const& name)
: Thread(name), _agent(agent), _node(name, this) {}
/// Move constructor. note: this is not thread-safe!
/// Takes over the agent handle, the time table, both observer tables and the
/// root node of `other`. No lock is acquired on either object here.
Store::Store(Store&& other)
// The Thread base is constructed before any member initializer runs, so
// other._node is still intact when its name is read here.
: Thread(other._node.name()),
_agent(std::move(other._agent)),
_timeTable(std::move(other._timeTable)),
_observerTable(std::move(other._observerTable)),
_observedTable(std::move(other._observedTable)),
_node(std::move(other._node)) {}
/// Copy assignment operator
Store& Store::operator=(Store const& rhs) {
if (&rhs != this) {
@ -250,7 +242,7 @@ check_ret_t Store::applyTransaction(Slice const& query) {
/// template<class T, class U> std::multimap<std::string, std::string>
std::ostream& operator<<(std::ostream& os,
std::multimap<std::string, std::string> const& m) {
std::unordered_multimap<std::string, std::string> const& m) {
for (auto const& i : m) {
os << i.first << ": " << i.second << std::endl;
}
@ -271,22 +263,30 @@ struct notify_t {
/// Apply (from logs)
std::vector<bool> Store::applyLogEntries(
std::vector<VPackSlice> const& queries, index_t index,
arangodb::velocypack::Builder const& queries, index_t index,
term_t term, bool inform) {
std::vector<bool> applied;
// Apply log entries
{
VPackArrayIterator queriesIterator(queries.slice());
MUTEX_LOCKER(storeLocker, _storeLock);
for (auto const& i : queries) {
applied.push_back(applies(i));
while (queriesIterator.valid()) {
applied.push_back(applies(queriesIterator.value()));
queriesIterator.next();
}
}
if (inform && _agent->leading()) {
// Find possibly affected callbacks
std::multimap<std::string, std::shared_ptr<notify_t>> in;
for (auto const& i : queries) {
VPackArrayIterator queriesIterator(queries.slice());
while (queriesIterator.valid()) {
VPackSlice const& i = queriesIterator.value();
for (auto const& j : VPackObjectIterator(i)) {
if (j.value.isObject() && j.value.hasKey("op")) {
std::string oper = j.value.get("op").copyString();
@ -315,6 +315,8 @@ std::vector<bool> Store::applyLogEntries(
}
}
}
queriesIterator.next();
}
// Sort by URLS to avoid multiple callbacks
@ -573,19 +575,20 @@ query_t Store::clearExpired() const {
query_t tmp = std::make_shared<Builder>();
{ VPackArrayBuilder t(tmp.get());
MUTEX_LOCKER(storeLocker, _storeLock);
for (auto it = _timeTable.cbegin(); it != _timeTable.cend(); ++it) {
if (it->first < std::chrono::system_clock::now()) {
VPackArrayBuilder ttt(tmp.get());
{ VPackObjectBuilder tttt(tmp.get());
tmp->add(VPackValue(it->second));
{ VPackObjectBuilder ttttt(tmp.get());
tmp->add("op", VPackValue("delete"));
}}
} else {
break;
if (!_timeTable.empty()) {
for (auto it = _timeTable.cbegin(); it != _timeTable.cend(); ++it) {
if (it->first < std::chrono::system_clock::now()) {
VPackArrayBuilder ttt(tmp.get());
{ VPackObjectBuilder tttt(tmp.get());
tmp->add(VPackValue(it->second));
{ VPackObjectBuilder ttttt(tmp.get());
tmp->add("op", VPackValue("delete"));
}}
} else {
break;
}
}
}
}
return tmp;
}
@ -752,25 +755,25 @@ std::multimap<TimePoint, std::string> const& Store::timeTable() const {
}
/// Observer table
std::multimap<std::string, std::string>& Store::observerTable() {
std::unordered_multimap<std::string, std::string>& Store::observerTable() {
_storeLock.assertLockedByCurrentThread();
return _observerTable;
}
/// Observer table
std::multimap<std::string, std::string> const& Store::observerTable() const {
std::unordered_multimap<std::string, std::string> const& Store::observerTable() const {
_storeLock.assertLockedByCurrentThread();
return _observerTable;
}
/// Observed table
std::multimap<std::string, std::string>& Store::observedTable() {
std::unordered_multimap<std::string, std::string>& Store::observedTable() {
_storeLock.assertLockedByCurrentThread();
return _observedTable;
}
/// Observed table
std::multimap<std::string, std::string> const& Store::observedTable() const {
std::unordered_multimap<std::string, std::string> const& Store::observedTable() const {
_storeLock.assertLockedByCurrentThread();
return _observedTable;
}

View File

@ -99,7 +99,7 @@ class Store : public arangodb::Thread {
check_ret_t applyTransaction(Slice const& query);
/// @brief Apply log entries in query, also process callbacks
std::vector<bool> applyLogEntries(std::vector<Slice> const& query,
std::vector<bool> applyLogEntries(arangodb::velocypack::Builder const& query,
index_t index, term_t term, bool inform);
/// @brief Read specified query from store
@ -151,10 +151,10 @@ class Store : public arangodb::Thread {
std::multimap<TimePoint, std::string>& timeTable();
std::multimap<TimePoint, std::string> const& timeTable() const;
std::multimap<std::string, std::string>& observerTable();
std::multimap<std::string, std::string> const& observerTable() const;
std::multimap<std::string, std::string>& observedTable();
std::multimap<std::string, std::string> const& observedTable() const;
std::unordered_multimap<std::string, std::string>& observerTable();
std::unordered_multimap<std::string, std::string> const& observerTable() const;
std::unordered_multimap<std::string, std::string>& observedTable();
std::unordered_multimap<std::string, std::string> const& observedTable() const;
/// @brief Check precondition
check_ret_t check(arangodb::velocypack::Slice const&, CheckMode = FIRST_FAIL) const;
@ -180,8 +180,8 @@ class Store : public arangodb::Thread {
std::multimap<TimePoint, std::string> _timeTable;
/// @brief Table of observers in tree (only used in root node)
std::multimap<std::string, std::string> _observerTable;
std::multimap<std::string, std::string> _observedTable;
std::unordered_multimap<std::string, std::string> _observerTable;
std::unordered_multimap<std::string, std::string> _observedTable;
/// @brief Root node
Node _node;

View File

@ -76,6 +76,7 @@ static std::string const foxxmaster = "/Current/Foxxmaster";
void Supervision::upgradeOne(Builder& builder) {
_lock.assertLockedByCurrentThread();
// "/arango/Agency/Definition" not exists or is 0
if (!_snapshot.has("Agency/Definition")) {
{ VPackArrayBuilder trx(&builder);
@ -97,6 +98,7 @@ void Supervision::upgradeOne(Builder& builder) {
}
void Supervision::upgradeZero(Builder& builder) {
_lock.assertLockedByCurrentThread();
// "/arango/Target/FailedServers" is still an array
Slice fails = _snapshot(failedServersPrefix).slice();
if (_snapshot(failedServersPrefix).slice().isArray()) {
@ -118,6 +120,7 @@ void Supervision::upgradeZero(Builder& builder) {
// Upgrade agency, guarded by wakeUp
void Supervision::upgradeAgency() {
_lock.assertLockedByCurrentThread();
Builder builder;
{
@ -149,6 +152,7 @@ void Supervision::upgradeAgency() {
// Check all DB servers, guarded above doChecks
std::vector<check_t> Supervision::checkDBServers() {
_lock.assertLockedByCurrentThread();
std::vector<check_t> ret;
auto const& machinesPlanned = _snapshot(planDBServersPrefix).children();
auto const& serversRegistered =
@ -310,7 +314,8 @@ std::vector<check_t> Supervision::checkDBServers() {
// Check all coordinators, guarded above doChecks
std::vector<check_t> Supervision::checkCoordinators() {
_lock.assertLockedByCurrentThread();
std::vector<check_t> ret;
auto const& machinesPlanned = _snapshot(planCoordinatorsPrefix).children();
auto const& serversRegistered =
@ -457,18 +462,20 @@ std::vector<check_t> Supervision::checkCoordinators() {
// Update local agency snapshot, guarded by callers
bool Supervision::updateSnapshot() {
_lock.assertLockedByCurrentThread();
if (_agent == nullptr || this->isStopping()) {
return false;
}
if (_agent->readDB().has(_agencyPrefix)) {
_snapshot = _agent->readDB().get(_agencyPrefix);
}
if (_agent->transient().has(_agencyPrefix)) {
_transient = _agent->transient().get(_agencyPrefix);
}
_agent->executeLocked([&]() {
if (_agent->readDB().has(_agencyPrefix)) {
_snapshot = _agent->readDB().get(_agencyPrefix);
}
if (_agent->transient().has(_agencyPrefix)) {
_transient = _agent->transient().get(_agencyPrefix);
}
});
return true;
@ -476,6 +483,7 @@ bool Supervision::updateSnapshot() {
// All checks, guarded by main thread
bool Supervision::doChecks() {
_lock.assertLockedByCurrentThread();
checkDBServers();
checkCoordinators();
return true;
@ -494,19 +502,27 @@ void Supervision::run() {
CONDITION_LOCKER(guard, _cv);
_cv.wait(static_cast<uint64_t>(1000000 * _frequency));
}
bool done = false;
MUTEX_LOCKER(locker, _lock);
if (_agent->readDB().has(supervisionNode)) {
try {
_snapshot = _agent->readDB().get(supervisionNode);
if (_snapshot.children().size() > 0) {
break;
_agent->executeLocked([&]() {
if (_agent->readDB().has(supervisionNode)) {
try {
_snapshot = _agent->readDB().get(supervisionNode);
if (_snapshot.children().size() > 0) {
done = true;
}
} catch (...) {
LOG_TOPIC(WARN, Logger::SUPERVISION) <<
"Main node in agency gone. Contact your db administrator.";
}
} catch (...) {
LOG_TOPIC(WARN, Logger::SUPERVISION) <<
"Main node in agency gone. Contact your db administrator.";
}
});
if (done) {
break;
}
LOG_TOPIC(DEBUG, Logger::SUPERVISION) << "Waiting for ArangoDB to "
"initialize its data.";
}
@ -518,13 +534,13 @@ void Supervision::run() {
while (!this->isStopping()) {
// Get bunch of job IDs from agency for future jobs
if (_agent->leading() && (_jobId == 0 || _jobId == _jobIdMax)) {
getUniqueIds(); // cannot fail but only hang
}
{
MUTEX_LOCKER(locker, _lock);
// Get bunch of job IDs from agency for future jobs
if (_agent->leading() && (_jobId == 0 || _jobId == _jobIdMax)) {
getUniqueIds(); // cannot fail but only hang
}
updateSnapshot();
@ -562,12 +578,14 @@ void Supervision::run() {
// Guarded by caller
/// @brief Report the boolean "Shutdown" flag of the current snapshot.
/// @return the flag's value if the snapshot has a boolean "Shutdown" entry,
///         false otherwise. The caller must hold _lock.
bool Supervision::isShuttingDown() {
  _lock.assertLockedByCurrentThread();

  // Only trust the entry when it exists and really is a boolean
  if (_snapshot.has("Shutdown") && _snapshot("Shutdown").isBool()) {
    return _snapshot("/Shutdown").getBool();
  }
  return false;
}
// Guarded by caller
std::string Supervision::serverHealth(std::string const& serverName) {
_lock.assertLockedByCurrentThread();
std::string const serverStatus(healthPrefix + serverName + "/Status");
return (_snapshot.has(serverStatus)) ?
_snapshot(serverStatus).getString() : std::string();
@ -575,6 +593,8 @@ std::string Supervision::serverHealth(std::string const& serverName) {
// Guarded by caller
void Supervision::handleShutdown() {
_lock.assertLockedByCurrentThread();
_selfShutdown = true;
LOG_TOPIC(DEBUG, Logger::SUPERVISION) << "Waiting for clients to shut down";
auto const& serversRegistered =
@ -622,6 +642,7 @@ void Supervision::handleShutdown() {
// Guarded by caller
bool Supervision::handleJobs() {
_lock.assertLockedByCurrentThread();
// Do supervision
shrinkCluster();
@ -633,6 +654,7 @@ bool Supervision::handleJobs() {
// Guarded by caller
void Supervision::workJobs() {
_lock.assertLockedByCurrentThread();
for (auto const& todoEnt : _snapshot(toDoPrefix).children()) {
JobContext(
@ -648,6 +670,7 @@ void Supervision::workJobs() {
void Supervision::enforceReplication() {
_lock.assertLockedByCurrentThread();
auto const& plannedDBs = _snapshot(planColPrefix).children();
for (const auto& db_ : plannedDBs) { // Planned databases
@ -721,6 +744,7 @@ void Supervision::enforceReplication() {
}
void Supervision::fixPrototypeChain(Builder& migrate) {
_lock.assertLockedByCurrentThread();
auto const& snap = _snapshot;
@ -763,6 +787,7 @@ void Supervision::fixPrototypeChain(Builder& migrate) {
// Shrink cluster if applicable, guarded by caller
void Supervision::shrinkCluster() {
_lock.assertLockedByCurrentThread();
auto const& todo = _snapshot(toDoPrefix).children();
auto const& pending = _snapshot(pendingPrefix).children();
@ -863,6 +888,7 @@ bool Supervision::start(Agent* agent) {
static std::string const syncLatest = "/Sync/LatestID";
void Supervision::getUniqueIds() {
_lock.assertLockedByCurrentThread();
size_t n = 10000;
@ -907,28 +933,3 @@ void Supervision::beginShutdown() {
CONDITION_LOCKER(guard, _cv);
guard.broadcast();
}
/// @brief Walk every planned collection of every planned database and read
/// its "distributeShardsLike" attribute. The value is not acted upon here.
void Supervision::missingPrototype() {
  // Outer level: planned databases, inner level: their planned collections
  for (auto const& dbEntry : _snapshot(planColPrefix).children()) {
    for (auto const& colEntry : dbEntry.second->children()) {
      auto const& collection = *(colEntry.second);
      auto prototype = collection("distributeShardsLike").slice().copyString();
      if (prototype.empty()) {
        continue;  // this collection names no prototype
      }
    }
  }
}

View File

@ -132,9 +132,6 @@ class Supervision : public arangodb::Thread {
/// @brief Upgrade agency to supervision overhaul jobs
void upgradeOne(VPackBuilder&);
/// @brief Check for inconsistencies in distributeShardsLike
void missingPrototype();
/// @brief Check for inconsistencies in replication factor vs dbs entries
void enforceReplication();

View File

@ -1326,9 +1326,9 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
if (!res.successful()) {
if (res.httpCode() ==
(int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
auto result = res.slice();
AgencyCommResult ag = ac.getValues("/");
auto result = res.slice();
if (result.isArray() && result.length() > 0) {
if (result[0].isObject()) {
auto tres = result[0];

View File

@ -942,7 +942,7 @@ function agencyTestSuite () {
require("console").warn("Provoking second log compaction for now with",
count3, "keys, from log entry", cur + count + count2, "on.");
doCountTransactions(count3, count + count2);
}/*,
},
////////////////////////////////////////////////////////////////////////////////
/// @brief Huge transaction package
@ -955,7 +955,7 @@ function agencyTestSuite () {
}
writeAndCheck(huge);
assertEqual(readAndCheck([["a"]]), [{"a":20000}]);
}*/
}
};
}

View File

@ -307,6 +307,7 @@
"ERROR_CANNOT_DROP_SMART_COLLECTION" : { "code" : 4002, "message" : "cannot drop this smart collection" },
"ERROR_KEY_MUST_BE_PREFIXED_WITH_SMART_GRAPH_ATTRIBUTE" : { "code" : 4003, "message" : "in smart vertex collections _key must be prefixed with the value of the smart graph attribute" },
"ERROR_ILLEGAL_SMART_GRAPH_ATTRIBUTE" : { "code" : 4004, "message" : "attribute cannot be used as smart graph attribute" },
"ERROR_AGENCY_INQUIRY_SYNTAX" : { "code" : 20001, "message" : "Illegal inquiry syntax" },
"ERROR_AGENCY_INFORM_MUST_BE_OBJECT" : { "code" : 20011, "message" : "Inform message must be an object." },
"ERROR_AGENCY_INFORM_MUST_CONTAIN_TERM" : { "code" : 20012, "message" : "Inform message must contain uint parameter 'term'" },
"ERROR_AGENCY_INFORM_MUST_CONTAIN_ID" : { "code" : 20013, "message" : "Inform message must contain string parameter 'id'" },

View File

@ -112,6 +112,7 @@ function MovingShardsSuite () {
var request = require("@arangodb/request");
var endpointToURL = require("@arangodb/cluster").endpointToURL;
var url = endpointToURL(coordEndpoint);
var res;
try {
var envelope =
@ -120,7 +121,7 @@ function MovingShardsSuite () {
} catch (err) {
console.error(
"Exception for POST /_admin/cluster/cleanOutServer:", err.stack);
return [];
return {};
}
var body = res.body;
if (typeof body === "string") {

View File

@ -129,6 +129,10 @@ void Mutex::unlock() {
/// @brief Assert (via TRI_ASSERT) that the recorded holder of this mutex is
/// the calling thread.
void Mutex::assertLockedByCurrentThread() {
  auto const self = Thread::currentThreadId();
  TRI_ASSERT(_holder == self);
}
/// @brief Assert (via TRI_ASSERT) that the recorded holder of this mutex is
/// not the calling thread.
void Mutex::assertNotLockedByCurrentThread() {
  auto const self = Thread::currentThreadId();
  TRI_ASSERT(_holder != self);
}
#endif
// -----------------------------------------------------------------------------

View File

@ -51,8 +51,10 @@ class Mutex {
// nothing in non-maintainer mode and will do nothing for non-posix locks
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
void assertLockedByCurrentThread();
void assertNotLockedByCurrentThread();
#else
inline void assertLockedByCurrentThread() {}
inline void assertNotLockedByCurrentThread() {}
#endif
private:

View File

@ -429,6 +429,7 @@ ERROR_ILLEGAL_SMART_GRAPH_ATTRIBUTE,4004,"attribute cannot be used as smart grap
## Agency errors
################################################################################
ERROR_AGENCY_INQUIRY_SYNTAX,20001,"Illegal inquiry syntax","Inquiry handles a list of string clientIds: [<clientId>,...]."
ERROR_AGENCY_INFORM_MUST_BE_OBJECT,20011,"Inform message must be an object.","The inform message in the agency must be an object."
ERROR_AGENCY_INFORM_MUST_CONTAIN_TERM,20012,"Inform message must contain uint parameter 'term'","The inform message in the agency must contain a uint parameter 'term'."
ERROR_AGENCY_INFORM_MUST_CONTAIN_ID,20013,"Inform message must contain string parameter 'id'","The inform message in the agency must contain a string parameter 'id'."

View File

@ -169,9 +169,9 @@ void TRI_InitializeErrorMessages () {
REG_ERROR(ERROR_CLUSTER_SHARD_LEADER_REFUSES_REPLICATION, "a shard leader refuses to perform a replication operation");
REG_ERROR(ERROR_CLUSTER_SHARD_FOLLOWER_REFUSES_OPERATION, "a shard follower refuses to perform an operation that is not a replication");
REG_ERROR(ERROR_CLUSTER_SHARD_LEADER_RESIGNED, "a (former) shard leader refuses to perform an operation, because it has resigned in the meantime");
REG_ERROR(ERROR_CLUSTER_AGENCY_COMMUNICATION_FAILED, "some agency operation failed");
REG_ERROR(ERROR_CLUSTER_DISTRIBUTE_SHARDS_LIKE_REPLICATION_FACTOR, "conflicting replication factor with distributeShardsLike parameter assignment");
REG_ERROR(ERROR_CLUSTER_DISTRIBUTE_SHARDS_LIKE_NUMBER_OF_SHARDS, "conflicting shard number with distributeShardsLike parameter assignment");
REG_ERROR(ERROR_CLUSTER_AGENCY_COMMUNICATION_FAILED, "some agency operation failed");
REG_ERROR(ERROR_QUERY_KILLED, "query killed");
REG_ERROR(ERROR_QUERY_PARSE, "%s");
REG_ERROR(ERROR_QUERY_EMPTY, "query is empty");
@ -303,6 +303,7 @@ void TRI_InitializeErrorMessages () {
REG_ERROR(ERROR_CANNOT_DROP_SMART_COLLECTION, "cannot drop this smart collection");
REG_ERROR(ERROR_KEY_MUST_BE_PREFIXED_WITH_SMART_GRAPH_ATTRIBUTE, "in smart vertex collections _key must be prefixed with the value of the smart graph attribute");
REG_ERROR(ERROR_ILLEGAL_SMART_GRAPH_ATTRIBUTE, "attribute cannot be used as smart graph attribute");
REG_ERROR(ERROR_AGENCY_INQUIRY_SYNTAX, "Illegal inquiry syntax");
REG_ERROR(ERROR_AGENCY_INFORM_MUST_BE_OBJECT, "Inform message must be an object.");
REG_ERROR(ERROR_AGENCY_INFORM_MUST_CONTAIN_TERM, "Inform message must contain uint parameter 'term'");
REG_ERROR(ERROR_AGENCY_INFORM_MUST_CONTAIN_ID, "Inform message must contain string parameter 'id'");

View File

@ -723,6 +723,8 @@
/// - 4004: @LIT{attribute cannot be used as smart graph attribute}
/// The given smartGraph attribute is illegal and cannot be used for
/// sharding. All system attributes are forbidden.
/// - 20001: @LIT{Illegal inquiry syntax}
/// Inquiry handles a list of string clientIds: [<clientId>,...].
/// - 20011: @LIT{Inform message must be an object.}
/// The inform message in the agency must be an object.
/// - 20012: @LIT{Inform message must contain uint parameter 'term'}
@ -2494,7 +2496,8 @@ void TRI_InitializeErrorMessages ();
////////////////////////////////////////////////////////////////////////////////
/// @brief 1493: ERROR_CLUSTER_DISTRIBUTE_SHARDS_LIKE_REPLICATION_FACTOR
///
/// conflicting replication factor with distributeShardsLike parameter assignment
/// conflicting replication factor with distributeShardsLike parameter
/// assignment
///
/// Will be raised if intended replication factor does not match that of the
/// prototype shard given in distributeShardsLike parameter.
@ -3851,6 +3854,16 @@ void TRI_InitializeErrorMessages ();
#define TRI_ERROR_ILLEGAL_SMART_GRAPH_ATTRIBUTE (4004)
////////////////////////////////////////////////////////////////////////////////
/// @brief 20001: ERROR_AGENCY_INQUIRY_SYNTAX
///
/// Illegal inquiry syntax
///
/// Inquiry handles a list of string clientIds: [<clientId>,...].
////////////////////////////////////////////////////////////////////////////////
#define TRI_ERROR_AGENCY_INQUIRY_SYNTAX (20001)
////////////////////////////////////////////////////////////////////////////////
/// @brief 20011: ERROR_AGENCY_INFORM_MUST_BE_OBJECT
///