diff --git a/arangod/Agency/AgencyComm.h b/arangod/Agency/AgencyComm.h index 050af36876..7afb7258a2 100644 --- a/arangod/Agency/AgencyComm.h +++ b/arangod/Agency/AgencyComm.h @@ -261,7 +261,7 @@ class AgencyCommResult { std::string _body; std::string _realBody; - std::map _values; + std::unordered_map _values; int _statusCode; bool _connected; bool _sent; @@ -606,7 +606,7 @@ class AgencyCommManager { // should call `failed` such that the manager can switch to a new // current endpoint. In case a redirect is received, one has to inform // the manager by calling `redirect`. - std::map>> _unusedConnections; }; diff --git a/arangod/Agency/AgencyCommon.h b/arangod/Agency/AgencyCommon.h index 18765da9ea..716b9f1a1e 100644 --- a/arangod/Agency/AgencyCommon.h +++ b/arangod/Agency/AgencyCommon.h @@ -113,10 +113,11 @@ struct log_t { std::string const& clientId = std::string()) : index(idx), term(t), - entry(e), clientId(clientId), timestamp(std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch())) {} + std::chrono::system_clock::now().time_since_epoch())) { + entry = std::make_shared>(*e.get()); + } friend std::ostream& operator<<(std::ostream& o, log_t const& l) { o << l.index << " " << l.term << " " << VPackSlice(l.entry->data()).toJson() diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 39193a4923..e4cc836211 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -199,6 +199,7 @@ AgentInterface::raft_commit_t Agent::waitFor(index_t index, double timeout) { while (true) { /// success? { + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(lockIndex, _ioLock); if (_commitIndex >= index) { return Agent::raft_commit_t::OK; @@ -208,6 +209,7 @@ AgentInterface::raft_commit_t Agent::waitFor(index_t index, double timeout) { // timeout if (!_waitForCV.wait(static_cast(1.0e6 * timeout))) { if (leading()) { + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(lockIndex, _ioLock); return (_commitIndex >= index) ? Agent::raft_commit_t::OK : Agent::raft_commit_t::TIMEOUT; @@ -235,6 +237,7 @@ void Agent::reportIn(std::string const& peerId, index_t index, size_t toLog) { { // Enforce _lastCommitIndex, _readDB and compaction to progress atomically + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); // Update last acknowledged answer @@ -276,6 +279,7 @@ void Agent::reportIn(std::string const& peerId, index_t index, size_t toLog) { true /* inform others by callbacks */ ); } + // TODO: why _liLock here, should by _ioLock, and we already have it MUTEX_LOCKER(liLocker, _liLock); _commitIndex = index; if (_commitIndex >= _nextCompactionAfter) { @@ -376,7 +380,7 @@ bool Agent::recvAppendEntriesRPC( } // Now the log is empty, but this will soon be rectified. { - MUTEX_LOCKER(liLocker, _liLock); + MUTEX_LOCKER(ioLocker, _ioLock); _nextCompactionAfter = (std::min)(_nextCompactionAfter, snapshotIndex + _config.compactionStepSize()); } @@ -389,6 +393,7 @@ bool Agent::recvAppendEntriesRPC( bool ok = true; if (nqs > 0) { + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); size_t ndups = _state.removeConflicts(queries, gotSnapshot); @@ -417,12 +422,16 @@ bool Agent::recvAppendEntriesRPC( } } + bool wakeup; { + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); _commitIndex = std::min(leaderCommitIndex, _lastApplied); + + wakeup = (_commitIndex >= _nextCompactionAfter); } - - if (_commitIndex >= _nextCompactionAfter) { + + if (wakeup) { _compactor.wakeUp(); } @@ -451,11 +460,16 @@ void Agent::sendAppendEntriesRPC() { index_t lastConfirmed, commitIndex; auto startTime = system_clock::now(); + time_point earliestPackage, lastAcked; + { + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); t = this->term(); lastConfirmed = _confirmed[followerId]; commitIndex = _commitIndex; + lastAcked = _lastAcked[followerId]; + earliestPackage = _earliestPackage[followerId]; } duration lockTime = system_clock::now() - startTime; if (lockTime.count() > 0.1) { @@ -463,7 +477,7 @@ void Agent::sendAppendEntriesRPC() { << "Reading lastConfirmed took too long: " << lockTime.count(); } - std::vector unconfirmed = _state.get(lastConfirmed); + std::vector unconfirmed = _state.get(lastConfirmed, lastConfirmed+99); lockTime = system_clock::now() - startTime; if (lockTime.count() > 0.2) { @@ -500,8 +514,7 @@ void Agent::sendAppendEntriesRPC() { _lastSent[followerId].time_since_epoch().count() != 0) { LOG_TOPIC(WARN, Logger::AGENCY) << "Oops, sent out last heartbeat " << "to follower " << followerId << " more than minPing ago: " - << m.count() << " lastAcked: " - << timepointToString(_lastAcked[followerId]) + << m.count() << " lastAcked: " << timepointToString(lastAcked) << " lastSent: " << timepointToString(_lastSent[followerId]); } index_t lowest = unconfirmed.front().index; @@ -553,7 +566,7 @@ void Agent::sendAppendEntriesRPC() { Builder builder; builder.add(VPackValue(VPackValueType::Array)); if ( - ((system_clock::now() - _earliestPackage[followerId]).count() > 0)) { + ((system_clock::now() - earliestPackage).count() > 0)) { if (needSnapshot) { { VPackObjectBuilder guard(&builder); builder.add(VPackValue("readDB")); @@ -581,10 +594,15 @@ void Agent::sendAppendEntriesRPC() { builder.close(); // Really leading? - if (challengeLeadership()) { - _constituent.candidate(); - _preparing = false; - return; + { + MUTEX_LOCKER(ioLocker, _ioLock); + + if (challengeLeadership()) { + ioLocker.unlock(); + _constituent.candidate(); + _preparing = false; + return; + } } // Verbose output @@ -609,19 +627,21 @@ void Agent::sendAppendEntriesRPC() { std::max(1.0e-3 * toLog * dt.count(), _config.minPing() * _config.timeoutMult()), true); - // _lastSent, _lastHighest: local and single threaded access _lastSent[followerId] = system_clock::now(); _lastHighest[followerId] = highest; if (toLog > 0) { - _earliestPackage[followerId] = system_clock::now() + toLog * dt; + earliestPackage = system_clock::now() + toLog * dt; + { + MUTEX_LOCKER(ioLocker, _ioLock); + _earliestPackage[followerId] = earliestPackage; + } LOG_TOPIC(DEBUG, Logger::AGENCY) << "Appending " << unconfirmed.size() - 1 << " entries up to index " - << highest << " to follower " << followerId << ". Message: " - << builder.toJson() + << highest << " to follower " << followerId << ". Next real log contact to " << followerId<< " in: " << std::chrono::duration( - _earliestPackage[followerId]-system_clock::now()).count() << "ms"; + earliestPackage-system_clock::now()).count() << "ms"; } else { LOG_TOPIC(TRACE, Logger::AGENCY) << "Just keeping follower " << followerId @@ -659,12 +679,15 @@ query_t Agent::activate(query_t const& everything) { Slice logs = slice.get("logs"); - std::vector batch; + VPackBuilder batch; + batch.openArray(); for (auto const& q : VPackArrayIterator(logs)) { - batch.push_back(q.get("request")); + batch.add(q.get("request")); } + batch.close(); { + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); // Atomicity if (!compact.isEmptyArray()) { _readDB = compact.get("readDB"); @@ -676,9 +699,6 @@ query_t Agent::activate(query_t const& everything) { _spearhead = _readDB; } - //_state.persistReadDB(everything->slice().get("compact").get("_key")); - //_state.log((everything->slice().get("logs")); - ret->add("success", VPackValue(true)); ret->add("commitId", VPackValue(commitIndex)); } @@ -755,6 +775,8 @@ void Agent::load() { _compactor.start(); LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting spearhead worker."; + + // Single threaded startup no need locking _spearhead.start(); _readDB.start(); @@ -776,6 +798,7 @@ void Agent::load() { /// Still leading? Under MUTEX from ::read or ::write bool Agent::challengeLeadership() { + _ioLock.assertLockedByCurrentThread(); size_t good = 0; @@ -793,8 +816,9 @@ bool Agent::challengeLeadership() { /// Get last acknowledged responses on leader query_t Agent::lastAckedAgo() const { - std::map lastAcked; + std::unordered_map lastAcked; { + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); lastAcked = _lastAcked; } @@ -840,6 +864,7 @@ trans_ret_t Agent::transact(query_t const& queries) { ret->openArray(); { + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); // Only leader else redirect @@ -898,6 +923,7 @@ trans_ret_t Agent::transient(query_t const& queries) { { VPackArrayBuilder b(ret.get()); + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); // Only leader else redirect @@ -931,6 +957,7 @@ inquire_ret_t Agent::inquire(query_t const& query) { return inquire_ret_t(false, leader); } + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); auto si = _state.inquire(query); @@ -1009,6 +1036,7 @@ write_ret_t Agent::write(query_t const& query, bool discardStartup) { } } + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); // Only leader else redirect @@ -1056,6 +1084,7 @@ read_ret_t Agent::read(query_t const& query) { } } + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); // Only leader else redirect if (challengeLeadership()) { @@ -1118,6 +1147,7 @@ void Agent::reportActivated( if (state->slice().get("success").getBoolean()) { { + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); _confirmed.erase(failed); auto commitIndex = state->slice().get("commitId").getNumericValue(); @@ -1136,6 +1166,7 @@ void Agent::reportActivated( } } else { + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); myterm = _constituent.term(); } @@ -1182,8 +1213,9 @@ void Agent::failedActivation( void Agent::detectActiveAgentFailures() { // Detect faulty agent if pool larger than agency - std::map lastAcked; + std::unordered_map lastAcked; { + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); lastAcked = _lastAcked; } @@ -1235,8 +1267,11 @@ void Agent::beginShutdown() { _compactor.beginShutdown(); // Stop key value stores - _spearhead.beginShutdown(); - _readDB.beginShutdown(); + { + MUTEX_LOCKER(ioLocker, _ioLock); + _spearhead.beginShutdown(); + _readDB.beginShutdown(); + } // Wake up all waiting rest handlers { @@ -1265,6 +1300,7 @@ bool Agent::prepareLead() { // Reset last acknowledged { + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); for (auto const& i : _config.active()) { _lastAcked[i] = system_clock::now(); @@ -1288,6 +1324,7 @@ void Agent::lead() { // Agency configuration term_t myterm; { + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); myterm = _constituent.term(); } @@ -1297,15 +1334,26 @@ void Agent::lead() { // Notify inactive pool notifyInactive(); + index_t commitIndex; + { + MUTEX_LOCKER(ioLocker, _ioLock); + commitIndex = _commitIndex; + } + { CONDITION_LOCKER(guard, _waitForCV); - while(_commitIndex != _state.lastIndex()) { + while(commitIndex != _state.lastIndex()) { _waitForCV.wait(10000); + MUTEX_LOCKER(ioLocker, _ioLock); + commitIndex = _commitIndex; } } - _spearhead = _readDB; - + { + MUTEX_LOCKER(ioLocker, _ioLock); + _spearhead = _readDB; + } + } // When did we take on leader ship? @@ -1321,7 +1369,7 @@ void Agent::notifyInactive() const { return; } - std::map pool = _config.pool(); + std::unordered_map pool = _config.pool(); std::string path = "/_api/agency_priv/inform"; Builder out; @@ -1348,7 +1396,6 @@ void Agent::notifyInactive() const { } void Agent::updatePeerEndpoint(query_t const& message) { - VPackSlice slice = message->slice(); if (!slice.isObject() || slice.length() == 0) { @@ -1379,9 +1426,12 @@ void Agent::updatePeerEndpoint(query_t const& message) { } void Agent::updatePeerEndpoint(std::string const& id, std::string const& ep) { - if (_config.updateEndpoint(id, ep)) { + MUTEX_LOCKER(ioLocker, _ioLock); + if (!challengeLeadership()) { + ioLocker.unlock(); + persistConfiguration(term()); notifyInactive(); } @@ -1433,6 +1483,7 @@ void Agent::notify(query_t const& message) { // Rebuild key value stores arangodb::consensus::index_t Agent::rebuildDBs() { + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); index_t lastCompactionIndex; @@ -1476,13 +1527,24 @@ void Agent::compact() { // since one usually would like to keep a part of the recent log. Therefore // we cannot use the _readDB ever, since we have to compute a state of the // key/value space well before _lastAppliedIndex anyway: - _nextCompactionAfter += _config.compactionStepSize(); + index_t commitIndex = 0; - if (_commitIndex > _config.compactionKeepSize()) { + { + MUTEX_LOCKER(ioLocker, _ioLock); + _nextCompactionAfter += _config.compactionStepSize(); + commitIndex = _commitIndex; + } + + if (commitIndex > _config.compactionKeepSize()) { // If the keep size is too large, we do not yet compact - if (!_state.compact(_commitIndex - _config.compactionKeepSize())) { + // TODO: check if there is at problem that we call State::compact() + // now with a commit index that may have been slightly modified by other + // threads + // TODO: the question is if we have to lock out others while we + // call compact or while we grab _commitIndex and then call compact + if (!_state.compact(commitIndex - _config.compactionKeepSize())) { LOG_TOPIC(WARN, Logger::AGENCY) << "Compaction for index " - << _commitIndex - _config.compactionKeepSize() + << commitIndex - _config.compactionKeepSize() << " did not work."; } } @@ -1492,6 +1554,7 @@ void Agent::compact() { /// Last commit index std::pair Agent::lastCommitted() const { + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); MUTEX_LOCKER(liLocker, _liLock); return std::pair( @@ -1500,6 +1563,7 @@ Agent::lastCommitted() const { /// Last commit index void Agent::lastCommitted(arangodb::consensus::index_t lastCommitIndex) { + _liLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); _commitIndex = lastCommitIndex; MUTEX_LOCKER(liLocker, _liLock); @@ -1513,7 +1577,13 @@ log_t Agent::lastLog() const { return _state.lastLog(); } Store const& Agent::spearhead() const { return _spearhead; } /// Get readdb -Store const& Agent::readDB() const { return _readDB; } +/// intentionally no lock is acquired here, so we can return +/// a const reference +/// the caller has to make sure the lock is actually held +Store const& Agent::readDB() const { + _ioLock.assertLockedByCurrentThread(); + return _readDB; +} /// Get readdb arangodb::consensus::index_t Agent::readDB(Node& node) const { @@ -1522,8 +1592,19 @@ arangodb::consensus::index_t Agent::readDB(Node& node) const { return _commitIndex; } +void Agent::executeLocked(std::function const& cb) { + MUTEX_LOCKER(ioLocker, _ioLock); + cb(); +} + /// Get transient -Store const& Agent::transient() const { return _transient; } +/// intentionally no lock is acquired here, so we can return +/// a const reference +/// the caller has to make sure the lock is actually held +Store const& Agent::transient() const { + _ioLock.assertLockedByCurrentThread(); + return _transient; +} /// Rebuild from persisted state Agent& Agent::operator=(VPackSlice const& compaction) { @@ -1603,7 +1684,7 @@ query_t Agent::gossip(query_t const& in, bool isCallback, size_t version) { LOG_TOPIC(TRACE, Logger::AGENCY) << "Received gossip " << slice.toJson(); - std::map incoming; + std::unordered_map incoming; for (auto const& pair : VPackObjectIterator(pslice)) { if (!pair.value.isString()) { THROW_ARANGO_EXCEPTION_MESSAGE( @@ -1695,32 +1776,39 @@ bool Agent::ready() const { } query_t Agent::buildDB(arangodb::consensus::index_t index) { - Store store(this); index_t oldIndex; term_t term; if (!_state.loadLastCompactedSnapshot(store, oldIndex, term)) { THROW_ARANGO_EXCEPTION(TRI_ERROR_AGENCY_CANNOT_REBUILD_DBS); } - - if (index > _commitIndex) { - LOG_TOPIC(INFO, Logger::AGENCY) - << "Cannot snapshot beyond leaderCommitIndex: " << _commitIndex; - index = _commitIndex; - } else if (index < oldIndex) { - LOG_TOPIC(INFO, Logger::AGENCY) - << "Cannot snapshot before last compaction index: " << oldIndex; - index = oldIndex; + + { + MUTEX_LOCKER(ioLocker, _ioLock); + if (index > _commitIndex) { + LOG_TOPIC(INFO, Logger::AGENCY) + << "Cannot snapshot beyond leaderCommitIndex: " << _commitIndex; + index = _commitIndex; + } else if (index < oldIndex) { + LOG_TOPIC(INFO, Logger::AGENCY) + << "Cannot snapshot before last compaction index: " << oldIndex; + index = oldIndex; + } } - std::vector logs; { MUTEX_LOCKER(mutexLocker, _compactionLock); if (index > oldIndex) { - logs = _state.slices(oldIndex+1, index); + auto logs = _state.slices(oldIndex+1, index); + store.applyLogEntries(logs, index, term, + false /* do not perform callbacks */); + } else { + VPackBuilder logs; + logs.openArray(); + logs.close(); + store.applyLogEntries(logs, index, term, + false /* do not perform callbacks */); } - store.applyLogEntries(logs, index, term, - false /* do not perform callbacks */); } auto builder = std::make_shared(); diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index b9c0ce940b..23d87d443b 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -187,6 +187,9 @@ class Agent : public arangodb::Thread, /// @brief State machine State const& state() const; + /// @brief execute a callback while holding _ioLock + void executeLocked(std::function const& cb); + /// @brief Get read store and compaction index index_t readDB(Node&) const; @@ -325,18 +328,19 @@ class Agent : public arangodb::Thread, arangodb::basics::ConditionVariable _waitForCV; /// @brief Confirmed indices of all members of agency - std::map _confirmed; - std::map _lastHighest; + std::unordered_map _confirmed; + std::unordered_map _lastHighest; - std::map _lastAcked; - std::map _lastSent; - std::map _earliestPackage; + std::unordered_map _lastAcked; + std::unordered_map _lastSent; + std::unordered_map _earliestPackage; /**< @brief RAFT consistency lock: _spearhead - _read_db - _lastCommitedIndex (log index) + _readDB + _commitIndex (log index) _lastAcked + _lastSent _confirmed _nextCompactionAfter */ @@ -345,6 +349,11 @@ class Agent : public arangodb::Thread, // lock for _leaderCommitIndex mutable arangodb::Mutex _liLock; + // note: when both _ioLock and _liLock are acquired, + // the locking order must be: + // 1) _ioLock + // 2) _liLock + // @brief guard _activator mutable arangodb::Mutex _activatorLock; @@ -368,7 +377,7 @@ class Agent : public arangodb::Thread, TimePoint _leaderSince; /// @brief Ids of ongoing transactions, used for inquire: - std::set _ongoingTrxs; + std::unordered_set _ongoingTrxs; // lock for _ongoingTrxs arangodb::Mutex _trxsLock; diff --git a/arangod/Agency/AgentConfiguration.cpp b/arangod/Agency/AgentConfiguration.cpp index 14176526f3..b37482001e 100644 --- a/arangod/Agency/AgentConfiguration.cpp +++ b/arangod/Agency/AgentConfiguration.cpp @@ -194,7 +194,7 @@ void config_t::setTimeoutMult(int64_t m) { } } -std::map config_t::pool() const { +std::unordered_map config_t::pool() const { READ_LOCKER(readLocker, _lock); return _pool; } @@ -392,7 +392,7 @@ bool config_t::updateEndpoint(std::string const& id, std::string const& ep) { void config_t::update(query_t const& message) { VPackSlice slice = message->slice(); - std::map pool; + std::unordered_map pool; bool changed = false; for (auto const& p : VPackObjectIterator(slice.get(poolStr))) { auto const& id = p.key.copyString(); diff --git a/arangod/Agency/AgentConfiguration.h b/arangod/Agency/AgentConfiguration.h index 5f36050f0e..75e4c008e7 100644 --- a/arangod/Agency/AgentConfiguration.h +++ b/arangod/Agency/AgentConfiguration.h @@ -65,7 +65,7 @@ struct config_t { double _maxPing; int64_t _timeoutMult; std::string _endpoint; - std::map _pool; + std::unordered_map _pool; std::vector _gossipPeers; std::vector _active; bool _supervision; @@ -173,7 +173,7 @@ struct config_t { std::string endpoint() const; /// @brief copy of pool - std::map pool() const; + std::unordered_map pool() const; /// @brief get one pair out of pool std::string poolAt(std::string const& id) const; diff --git a/arangod/Agency/Constituent.cpp b/arangod/Agency/Constituent.cpp index 7da3d189e4..0162bbeb71 100644 --- a/arangod/Agency/Constituent.cpp +++ b/arangod/Agency/Constituent.cpp @@ -105,6 +105,8 @@ void Constituent::term(term_t t) { void Constituent::termNoLock(term_t t) { // Only call this when you have the _castLock + _castLock.assertLockedByCurrentThread(); + term_t tmp = _term; _term = t; @@ -200,6 +202,8 @@ void Constituent::follow(term_t t) { } void Constituent::followNoLock(term_t t) { + _castLock.assertLockedByCurrentThread(); + _term = t; _role = FOLLOWER; @@ -373,14 +377,14 @@ bool Constituent::vote(term_t termOfPeer, std::string id, index_t prevLogIndex, } TRI_ASSERT(_vocbase != nullptr); + + MUTEX_LOCKER(guard, _castLock); LOG_TOPIC(TRACE, Logger::AGENCY) << "vote(termOfPeer: " << termOfPeer << ", leaderId: " << id << ", prev-log-index: " << prevLogIndex << ", prev-log-term: " << prevLogTerm << ") in (my) term " << _term; - MUTEX_LOCKER(guard, _castLock); - if (termOfPeer > _term) { termNoLock(termOfPeer); @@ -505,6 +509,7 @@ void Constituent::callElection() { while (true) { if (steady_clock::now() >= timeout) { // Timeout. + MUTEX_LOCKER(locker, _castLock); follow(_term); break; } @@ -522,8 +527,12 @@ void Constituent::callElection() { if (slc.isObject() && slc.hasKey("term") && slc.hasKey("voteGranted")) { // Follow right away? - term_t t = slc.get("term").getUInt(); - if (t > _term) { + term_t t = slc.get("term").getUInt(), term; + { + MUTEX_LOCKER(locker, _castLock); + term = _term; + } + if (t > term) { follow(t); break; } @@ -542,7 +551,12 @@ void Constituent::callElection() { } // Count the vote as a nay if (++nay >= majority) { // Network: majority against? - follow(_term); + term_t term; + { + MUTEX_LOCKER(locker, _castLock); + term = _term; + } + follow(term); break; } @@ -641,6 +655,7 @@ void Constituent::run() { } if (size() == 1) { + MUTEX_LOCKER(guard, _castLock); _leaderID = _agent->config().id(); LOG_TOPIC(DEBUG, Logger::AGENCY) << "Set _leaderID to " << _leaderID << " in term " << _term; @@ -651,7 +666,14 @@ void Constituent::run() { _role = FOLLOWER; } while (!this->isStopping()) { - if (_role == FOLLOWER) { + + role_t role; + { + MUTEX_LOCKER(guard, _castLock); + role = _role; + } + + if (role == FOLLOWER) { static double const M = 1.0e6; int64_t a = static_cast(M * _agent->config().minPing() * _agent->config().timeoutMult()); @@ -707,7 +729,8 @@ void Constituent::run() { candidate(); _agent->unprepareLead(); } - } else if (_role == CANDIDATE) { + + } else if (role == CANDIDATE) { callElection(); // Run for office } else { int32_t left = diff --git a/arangod/Agency/Inception.cpp b/arangod/Agency/Inception.cpp index 2a20affc38..77a03b819f 100644 --- a/arangod/Agency/Inception.cpp +++ b/arangod/Agency/Inception.cpp @@ -373,23 +373,7 @@ bool Inception::restartingActiveAgent() { } void Inception::reportIn(query_t const& query) { - - VPackSlice slice = query->slice(); - - TRI_ASSERT(slice.isObject()); - TRI_ASSERT(slice.hasKey("mean")); - TRI_ASSERT(slice.hasKey("stdev")); - TRI_ASSERT(slice.hasKey("min")); - TRI_ASSERT(slice.hasKey("max")); - - MUTEX_LOCKER(lock, _mLock); - _measurements.push_back( - std::vector( - {slice.get("mean").getNumber(), - slice.get("stdev").getNumber(), - slice.get("max").getNumber(), - slice.get("min").getNumber()} )); - + // does nothing at the moment } diff --git a/arangod/Agency/Inception.h b/arangod/Agency/Inception.h index 4f424a0ed1..a5e2ed591b 100644 --- a/arangod/Agency/Inception.h +++ b/arangod/Agency/Inception.h @@ -74,13 +74,8 @@ public: Agent* _agent; //< @brief The agent arangodb::basics::ConditionVariable _cv; //< @brief For proper shutdown - std::vector _pings; //< @brief pings - std::map _acked; //< @brief acknowledged config version + std::unordered_map _acked; //< @brief acknowledged config version mutable arangodb::Mutex _vLock; //< @brieg Guard _acked - mutable arangodb::Mutex _pLock; //< @brief Guard _pings - std::vector> _measurements; //< @brief measurements - mutable arangodb::Mutex _mLock; //< @brief Guard _measurements - }; }} diff --git a/arangod/Agency/Job.cpp b/arangod/Agency/Job.cpp index bf21799936..b65a4d66b8 100644 --- a/arangod/Agency/Job.cpp +++ b/arangod/Agency/Job.cpp @@ -287,13 +287,13 @@ std::string Job::findNonblockedCommonHealthyInSyncFollower( // Which is in "GOOD auto cs = clones(snap, db, col, shrd); // clones auto nclones = cs.size(); // #clones - std::map good; + std::unordered_map good; for (const auto& i : snap(healthPrefix).children()) { good[i.first] = ((*i.second)("Status").toJson() == "GOOD"); } - std::map currentServers; + std::unordered_map currentServers; for (const auto& clone : cs) { auto currentShardPath = curColPrefix + db + "/" + clone.collection + "/" diff --git a/arangod/Agency/Node.h b/arangod/Agency/Node.h index 582510e5d8..3c03b8645a 100644 --- a/arangod/Agency/Node.h +++ b/arangod/Agency/Node.h @@ -76,7 +76,7 @@ class Node { typedef std::vector PathType; // @brief Child nodes - typedef std::map> Children; + typedef std::unordered_map> Children; /// @brief Construct with name explicit Node(std::string const& name); diff --git a/arangod/Agency/RemoveFollower.cpp b/arangod/Agency/RemoveFollower.cpp index e999b265ea..7f4678efd6 100644 --- a/arangod/Agency/RemoveFollower.cpp +++ b/arangod/Agency/RemoveFollower.cpp @@ -162,7 +162,7 @@ bool RemoveFollower::start() { = clones(_snapshot, _database, _collection, _shard); // Now find some new servers to remove: - std::map overview; // get an overview over the servers + std::unordered_map overview; // get an overview over the servers // -1 : not "GOOD", can be in sync, or leader, or not // >=0: number of servers for which it is in sync or confirmed leader bool leaderBad = false; diff --git a/arangod/Agency/RestAgencyHandler.cpp b/arangod/Agency/RestAgencyHandler.cpp index dab2b93c48..f15e626f70 100644 --- a/arangod/Agency/RestAgencyHandler.cpp +++ b/arangod/Agency/RestAgencyHandler.cpp @@ -186,21 +186,23 @@ RestStatus RestAgencyHandler::handleStores() { { VPackObjectBuilder b(&body); { - body.add(VPackValue("spearhead")); - { - VPackArrayBuilder bb(&body); - _agent->spearhead().dumpToBuilder(body); - } - body.add(VPackValue("read_db")); - { - VPackArrayBuilder bb(&body); - _agent->readDB().dumpToBuilder(body); - } - body.add(VPackValue("transient")); - { - VPackArrayBuilder bb(&body); - _agent->transient().dumpToBuilder(body); - } + _agent->executeLocked([&]() { + body.add(VPackValue("spearhead")); + { + VPackArrayBuilder bb(&body); + _agent->spearhead().dumpToBuilder(body); + } + body.add(VPackValue("read_db")); + { + VPackArrayBuilder bb(&body); + _agent->readDB().dumpToBuilder(body); + } + body.add(VPackValue("transient")); + { + VPackArrayBuilder bb(&body); + _agent->transient().dumpToBuilder(body); + } + }); } } generateResult(rest::ResponseCode::OK, body.slice()); diff --git a/arangod/Agency/State.cpp b/arangod/Agency/State.cpp index 7f536676e2..dc7dd2f1d1 100644 --- a/arangod/Agency/State.cpp +++ b/arangod/Agency/State.cpp @@ -135,8 +135,6 @@ bool State::persist(index_t index, term_t term, std::vector State::log( query_t const& transactions, std::vector const& applicable, term_t term) { - TRI_ASSERT(!_log.empty()); // log must not ever be empty - std::vector idx(applicable.size()); size_t j = 0; auto const& slice = transactions->slice(); @@ -149,6 +147,8 @@ std::vector State::log( TRI_ASSERT(slice.length() == applicable.size()); MUTEX_LOCKER(mutexLocker, _logLock); + TRI_ASSERT(!_log.empty()); // log must never be empty + for (auto const& i : VPackArrayIterator(slice)) { if (!i.isArray()) { @@ -182,6 +182,8 @@ index_t State::logNonBlocking( index_t idx, velocypack::Slice const& slice, term_t term, std::string const& clientId, bool leading) { + _logLock.assertLockedByCurrentThread(); + TRI_ASSERT(!_log.empty()); // log must not ever be empty auto buf = std::make_shared>(); @@ -236,7 +238,6 @@ index_t State::logNonBlocking( /// Log transactions (follower) index_t State::log(query_t const& transactions, size_t ndups) { - VPackSlice slices = transactions->slice(); TRI_ASSERT(slices.isArray()); @@ -244,6 +245,7 @@ index_t State::log(query_t const& transactions, size_t ndups) { size_t nqs = slices.length(); TRI_ASSERT(nqs > ndups); + std::string clientId; MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order @@ -256,14 +258,12 @@ index_t State::log(query_t const& transactions, size_t ndups) { slice.get("term").getUInt(), slice.get("clientId").copyString())==0) { break; } - } return _log.empty() ? 0 : _log.back().index; } -size_t State::removeConflicts(query_t const& transactions, - bool gotSnapshot) { +size_t State::removeConflicts(query_t const& transactions, bool gotSnapshot) { // Under MUTEX in Agent // Note that this will ignore a possible snapshot in the first position! // This looks through the transactions and skips over those that are @@ -346,8 +346,8 @@ size_t State::removeConflicts(query_t const& transactions, } /// Get log entries from indices "start" to "end" -std::vector State::get(index_t start, - index_t end) const { +std::vector State::get(index_t start, index_t end) const { + std::vector entries; MUTEX_LOCKER(mutexLocker, _logLock); // Cannot be read lock (Compaction) @@ -355,15 +355,28 @@ std::vector State::get(index_t start, return entries; } - if (end == (std::numeric_limits::max)() || end > _log.back().index) { + // start must be greater than or equal to the lowest index + // and smaller than or equal to the largest index + if (start < _log[0].index) { + start = _log.front().index; + } else if (start > _log.back().index) { + start = _log.back().index; + } + + // end must be greater than or equal to start + // and smaller than or equal to the largest index + if (end <= start) { + end = start; + } else if ( + end == (std::numeric_limits::max)() || end > _log.back().index) { end = _log.back().index; } - if (start < _log[0].index) { - start = _log[0].index; - } + // subtract offset _cur + start -= _cur; + end -= (_cur-1); - for (size_t i = start - _cur; i <= end - _cur; ++i) { + for (size_t i = start; i < end; ++i) { entries.push_back(_log[i]); } @@ -440,36 +453,41 @@ bool State::has(index_t index, term_t term) const { /// Get vector of past transaction from 'start' to 'end' -std::vector State::slices(index_t start, - index_t end) const { - std::vector slices; +VPackBuilder State::slices(index_t start, + index_t end) const { + VPackBuilder slices; + slices.openArray(); + MUTEX_LOCKER(mutexLocker, _logLock); // Cannot be read lock (Compaction) - if (_log.empty()) { - return slices; - } + if (!_log.empty()) { + if (start < _log.front().index) { // no start specified + start = _log.front().index; + } - if (start < _log.front().index) { // no start specified - start = _log.front().index; - } + if (start > _log.back().index) { // no end specified + slices.close(); + return slices; + } - if (start > _log.back().index) { // no end specified - return slices; - } + if (end == (std::numeric_limits::max)() || + end > _log.back().index) { + end = _log.back().index; + } - if (end == (std::numeric_limits::max)() || - end > _log.back().index) { - end = _log.back().index; - } - - for (size_t i = start - _cur; i <= end - _cur; ++i) { - try { - slices.push_back(VPackSlice(_log.at(i).entry->data())); - } catch (std::exception const&) { - break; + for (size_t i = start - _cur; i <= end - _cur; ++i) { + try { + slices.add(VPackSlice(_log.at(i).entry->data())); + } catch (std::exception const&) { + break; + } } } + mutexLocker.unlock(); + + slices.close(); + return slices; } @@ -662,8 +680,9 @@ bool State::loadCompacted() { VPackSlice result = queryResult.result->slice(); + MUTEX_LOCKER(logLock, _logLock); + if (result.isArray() && result.length()) { - MUTEX_LOCKER(logLock, _logLock); for (auto const& i : VPackArrayIterator(result)) { auto ii = i.resolveExternals(); buffer_t tmp = std::make_shared>(); @@ -680,7 +699,10 @@ bool State::loadCompacted() { // We can be sure that every compacted snapshot only contains index entries // that have been written and agreed upon by an absolute majority of agents. if (!_log.empty()) { - _agent->lastCommitted(lastLog().index); + index_t lastIndex = _log.back().index; + + logLock.unlock(); + _agent->lastCommitted(lastIndex); } return true; @@ -917,10 +939,6 @@ bool State::compactPersisted(index_t cind) { THROW_ARANGO_EXCEPTION_MESSAGE(queryResult.code, queryResult.details); } - if (queryResult.code != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION_MESSAGE(queryResult.code, queryResult.details); - } - return true; } @@ -1136,19 +1154,17 @@ query_t State::allLogs() const { } std::vector> State::inquire(query_t const& query) const { - - std::vector> result; - MUTEX_LOCKER(mutexLocker, _logLock); // Cannot be read lock (Compaction) - if (!query->slice().isArray()) { THROW_ARANGO_EXCEPTION_MESSAGE( - 210002, + 20001, std::string("Inquiry handles a list of string clientIds: [] ") + ". We got " + query->toJson()); - return result; } - + + std::vector> result; size_t pos = 0; + + MUTEX_LOCKER(mutexLocker, _logLock); // Cannot be read lock (Compaction) for (auto const& i : VPackArrayIterator(query->slice())) { if (!i.isString()) { @@ -1168,11 +1184,9 @@ std::vector> State::inquire(query_t const& query) const { result.push_back(transactions); pos++; - } return result; - } // Index of last log entry diff --git a/arangod/Agency/State.h b/arangod/Agency/State.h index 781b025ce6..03565d9f52 100644 --- a/arangod/Agency/State.h +++ b/arangod/Agency/State.h @@ -99,7 +99,7 @@ class State { /// @brief Get complete logged commands by lower and upper bounds. /// Default: [first, last] - std::vector slices( + arangodb::velocypack::Builder slices( index_t = 0, index_t = (std::numeric_limits::max)()) const; /// @brief log entry at index i diff --git a/arangod/Agency/Store.cpp b/arangod/Agency/Store.cpp index 9544c7835e..e5cce4eed3 100644 --- a/arangod/Agency/Store.cpp +++ b/arangod/Agency/Store.cpp @@ -1,3 +1,4 @@ + //////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// @@ -116,15 +117,6 @@ inline static bool endpointPathFromUrl(std::string const& url, Store::Store(Agent* agent, std::string const& name) : Thread(name), _agent(agent), _node(name, this) {} -/// Move constructor. note: this is not thread-safe! -Store::Store(Store&& other) - : Thread(other._node.name()), - _agent(std::move(other._agent)), - _timeTable(std::move(other._timeTable)), - _observerTable(std::move(other._observerTable)), - _observedTable(std::move(other._observedTable)), - _node(std::move(other._node)) {} - /// Copy assignment operator Store& Store::operator=(Store const& rhs) { if (&rhs != this) { @@ -250,7 +242,7 @@ check_ret_t Store::applyTransaction(Slice const& query) { /// template std::multimap std::ostream& operator<<(std::ostream& os, - std::multimap const& m) { + std::unordered_multimap const& m) { for (auto const& i : m) { os << i.first << ": " << i.second << std::endl; } @@ -271,22 +263,30 @@ struct notify_t { /// Apply (from logs) std::vector Store::applyLogEntries( - std::vector const& queries, index_t index, + arangodb::velocypack::Builder const& queries, index_t index, term_t term, bool inform) { std::vector applied; // Apply log entries { + VPackArrayIterator queriesIterator(queries.slice()); + MUTEX_LOCKER(storeLocker, _storeLock); - for (auto const& i : queries) { - applied.push_back(applies(i)); + + while (queriesIterator.valid()) { + applied.push_back(applies(queriesIterator.value())); + queriesIterator.next(); } } if (inform && _agent->leading()) { // Find possibly affected callbacks std::multimap> in; - for (auto const& i : queries) { + VPackArrayIterator queriesIterator(queries.slice()); + + while (queriesIterator.valid()) { + VPackSlice const& i = queriesIterator.value(); + for (auto const& j : VPackObjectIterator(i)) { if (j.value.isObject() && j.value.hasKey("op")) { std::string oper = j.value.get("op").copyString(); @@ -315,6 +315,8 @@ std::vector Store::applyLogEntries( } } } + + queriesIterator.next(); } // Sort by URLS to avoid multiple callbacks @@ -573,19 +575,20 @@ query_t Store::clearExpired() const { query_t tmp = std::make_shared(); { VPackArrayBuilder t(tmp.get()); MUTEX_LOCKER(storeLocker, _storeLock); - for (auto it = _timeTable.cbegin(); it != _timeTable.cend(); ++it) { - if (it->first < std::chrono::system_clock::now()) { - VPackArrayBuilder ttt(tmp.get()); - { VPackObjectBuilder tttt(tmp.get()); - tmp->add(VPackValue(it->second)); - { VPackObjectBuilder ttttt(tmp.get()); - tmp->add("op", VPackValue("delete")); - }} - } else { - break; + if (!_timeTable.empty()) { + for (auto it = _timeTable.cbegin(); it != _timeTable.cend(); ++it) { + if (it->first < std::chrono::system_clock::now()) { + VPackArrayBuilder ttt(tmp.get()); + { VPackObjectBuilder tttt(tmp.get()); + tmp->add(VPackValue(it->second)); + { VPackObjectBuilder ttttt(tmp.get()); + tmp->add("op", VPackValue("delete")); + }} + } else { + break; + } } } - } return tmp; } @@ -752,25 +755,25 @@ std::multimap const& Store::timeTable() const { } /// Observer table -std::multimap& Store::observerTable() { +std::unordered_multimap& Store::observerTable() { _storeLock.assertLockedByCurrentThread(); return _observerTable; } /// Observer table -std::multimap const& Store::observerTable() const { +std::unordered_multimap const& Store::observerTable() const { _storeLock.assertLockedByCurrentThread(); return _observerTable; } /// Observed table -std::multimap& Store::observedTable() { +std::unordered_multimap& Store::observedTable() { _storeLock.assertLockedByCurrentThread(); return _observedTable; } /// Observed table -std::multimap const& Store::observedTable() const { +std::unordered_multimap const& Store::observedTable() const { _storeLock.assertLockedByCurrentThread(); return _observedTable; } diff --git a/arangod/Agency/Store.h b/arangod/Agency/Store.h index 10ab77cbad..6f00d856ae 100644 --- a/arangod/Agency/Store.h +++ b/arangod/Agency/Store.h @@ -99,7 +99,7 @@ class Store : public arangodb::Thread { check_ret_t applyTransaction(Slice const& query); /// @brief Apply log entries in query, also process callbacks - std::vector applyLogEntries(std::vector const& query, + std::vector applyLogEntries(arangodb::velocypack::Builder const& query, index_t index, term_t term, bool inform); /// @brief Read specified query from store @@ -151,10 +151,10 @@ class Store : public arangodb::Thread { std::multimap& timeTable(); std::multimap const& timeTable() const; - std::multimap& observerTable(); - std::multimap const& observerTable() const; - std::multimap& observedTable(); - std::multimap const& observedTable() const; + std::unordered_multimap& observerTable(); + std::unordered_multimap const& observerTable() const; + std::unordered_multimap& observedTable(); + std::unordered_multimap const& observedTable() const; /// @brief Check precondition check_ret_t check(arangodb::velocypack::Slice const&, CheckMode = FIRST_FAIL) const; @@ -180,8 +180,8 @@ class Store : public arangodb::Thread { std::multimap _timeTable; /// @brief Table of observers in tree (only used in root node) - std::multimap _observerTable; - std::multimap _observedTable; + std::unordered_multimap _observerTable; + std::unordered_multimap _observedTable; /// @brief Root node Node _node; diff --git a/arangod/Agency/Supervision.cpp b/arangod/Agency/Supervision.cpp index 4f48397659..8ed02616b5 100644 --- a/arangod/Agency/Supervision.cpp +++ b/arangod/Agency/Supervision.cpp @@ -76,6 +76,7 @@ static std::string const foxxmaster = "/Current/Foxxmaster"; void Supervision::upgradeOne(Builder& builder) { + _lock.assertLockedByCurrentThread(); // "/arango/Agency/Definition" not exists or is 0 if (!_snapshot.has("Agency/Definition")) { { VPackArrayBuilder trx(&builder); @@ -97,6 +98,7 @@ void Supervision::upgradeOne(Builder& builder) { } void Supervision::upgradeZero(Builder& builder) { + _lock.assertLockedByCurrentThread(); // "/arango/Target/FailedServers" is still an array Slice fails = _snapshot(failedServersPrefix).slice(); if (_snapshot(failedServersPrefix).slice().isArray()) { @@ -118,6 +120,7 @@ void Supervision::upgradeZero(Builder& builder) { // Upgrade agency, guarded by wakeUp void Supervision::upgradeAgency() { + _lock.assertLockedByCurrentThread(); Builder builder; { @@ -149,6 +152,7 @@ void Supervision::upgradeAgency() { // Check all DB servers, guarded above doChecks std::vector Supervision::checkDBServers() { + _lock.assertLockedByCurrentThread(); std::vector ret; auto const& machinesPlanned = _snapshot(planDBServersPrefix).children(); auto const& serversRegistered = @@ -310,7 +314,8 @@ std::vector Supervision::checkDBServers() { // Check all coordinators, guarded above doChecks std::vector Supervision::checkCoordinators() { - + _lock.assertLockedByCurrentThread(); + std::vector ret; auto const& machinesPlanned = _snapshot(planCoordinatorsPrefix).children(); auto const& serversRegistered = @@ -457,18 +462,20 @@ std::vector Supervision::checkCoordinators() { // Update local agency snapshot, guarded by callers bool Supervision::updateSnapshot() { + _lock.assertLockedByCurrentThread(); if (_agent == nullptr || this->isStopping()) { return false; } - if (_agent->readDB().has(_agencyPrefix)) { - _snapshot = _agent->readDB().get(_agencyPrefix); - } - - if (_agent->transient().has(_agencyPrefix)) { - _transient = _agent->transient().get(_agencyPrefix); - } + _agent->executeLocked([&]() { + if (_agent->readDB().has(_agencyPrefix)) { + _snapshot = _agent->readDB().get(_agencyPrefix); + } + if (_agent->transient().has(_agencyPrefix)) { + _transient = _agent->transient().get(_agencyPrefix); + } + }); return true; @@ -476,6 +483,7 @@ bool Supervision::updateSnapshot() { // All checks, guarded by main thread bool Supervision::doChecks() { + _lock.assertLockedByCurrentThread(); checkDBServers(); checkCoordinators(); return true; @@ -494,19 +502,27 @@ void Supervision::run() { CONDITION_LOCKER(guard, _cv); _cv.wait(static_cast(1000000 * _frequency)); } - + + bool done = false; MUTEX_LOCKER(locker, _lock); - if (_agent->readDB().has(supervisionNode)) { - try { - _snapshot = _agent->readDB().get(supervisionNode); - if (_snapshot.children().size() > 0) { - break; + _agent->executeLocked([&]() { + if (_agent->readDB().has(supervisionNode)) { + try { + _snapshot = _agent->readDB().get(supervisionNode); + if (_snapshot.children().size() > 0) { + done = true; + } + } catch (...) { + LOG_TOPIC(WARN, Logger::SUPERVISION) << + "Main node in agency gone. Contact your db administrator."; } - } catch (...) { - LOG_TOPIC(WARN, Logger::SUPERVISION) << - "Main node in agency gone. Contact your db administrator."; } + }); + + if (done) { + break; } + LOG_TOPIC(DEBUG, Logger::SUPERVISION) << "Waiting for ArangoDB to " "initialize its data."; } @@ -518,13 +534,13 @@ void Supervision::run() { while (!this->isStopping()) { - // Get bunch of job IDs from agency for future jobs - if (_agent->leading() && (_jobId == 0 || _jobId == _jobIdMax)) { - getUniqueIds(); // cannot fail but only hang - } - { MUTEX_LOCKER(locker, _lock); + + // Get bunch of job IDs from agency for future jobs + if (_agent->leading() && (_jobId == 0 || _jobId == _jobIdMax)) { + getUniqueIds(); // cannot fail but only hang + } updateSnapshot(); @@ -562,12 +578,14 @@ void Supervision::run() { // Guarded by caller bool Supervision::isShuttingDown() { + _lock.assertLockedByCurrentThread(); return (_snapshot.has("Shutdown") && _snapshot("Shutdown").isBool()) ? _snapshot("/Shutdown").getBool() : false; } // Guarded by caller std::string Supervision::serverHealth(std::string const& serverName) { + _lock.assertLockedByCurrentThread(); std::string const serverStatus(healthPrefix + serverName + "/Status"); return (_snapshot.has(serverStatus)) ? _snapshot(serverStatus).getString() : std::string(); @@ -575,6 +593,8 @@ std::string Supervision::serverHealth(std::string const& serverName) { // Guarded by caller void Supervision::handleShutdown() { + _lock.assertLockedByCurrentThread(); + _selfShutdown = true; LOG_TOPIC(DEBUG, Logger::SUPERVISION) << "Waiting for clients to shut down"; auto const& serversRegistered = @@ -622,6 +642,7 @@ void Supervision::handleShutdown() { // Guarded by caller bool Supervision::handleJobs() { + _lock.assertLockedByCurrentThread(); // Do supervision shrinkCluster(); @@ -633,6 +654,7 @@ bool Supervision::handleJobs() { // Guarded by caller void Supervision::workJobs() { + _lock.assertLockedByCurrentThread(); for (auto const& todoEnt : _snapshot(toDoPrefix).children()) { JobContext( @@ -648,6 +670,7 @@ void Supervision::workJobs() { void Supervision::enforceReplication() { + _lock.assertLockedByCurrentThread(); auto const& plannedDBs = _snapshot(planColPrefix).children(); for (const auto& db_ : plannedDBs) { // Planned databases @@ -721,6 +744,7 @@ void Supervision::enforceReplication() { } void Supervision::fixPrototypeChain(Builder& migrate) { + _lock.assertLockedByCurrentThread(); auto const& snap = _snapshot; @@ -763,6 +787,7 @@ void Supervision::fixPrototypeChain(Builder& migrate) { // Shrink cluster if applicable, guarded by caller void Supervision::shrinkCluster() { + _lock.assertLockedByCurrentThread(); auto const& todo = _snapshot(toDoPrefix).children(); auto const& pending = _snapshot(pendingPrefix).children(); @@ -863,6 +888,7 @@ bool Supervision::start(Agent* agent) { static std::string const syncLatest = "/Sync/LatestID"; void Supervision::getUniqueIds() { + _lock.assertLockedByCurrentThread(); size_t n = 10000; @@ -907,28 +933,3 @@ void Supervision::beginShutdown() { CONDITION_LOCKER(guard, _cv); guard.broadcast(); } - - -void Supervision::missingPrototype() { - - auto const& plannedDBs = _snapshot(planColPrefix).children(); - //auto available = Job::availableServers(_snapshot); - - // key: prototype, value: clone - //std::multimap likeness; - - for (const auto& db_ : plannedDBs) { // Planned databases - auto const& db = *(db_.second); - - for (const auto& col_ : db.children()) { // Planned collections - auto const& col = *(col_.second); - - auto prototype = col("distributeShardsLike").slice().copyString(); - if (prototype.empty()) { - continue; - } - - } - } -} - diff --git a/arangod/Agency/Supervision.h b/arangod/Agency/Supervision.h index a461605b9d..62497a35dd 100644 --- a/arangod/Agency/Supervision.h +++ b/arangod/Agency/Supervision.h @@ -132,9 +132,6 @@ class Supervision : public arangodb::Thread { /// @brief Upgrade agency to supervision overhaul jobs void upgradeOne(VPackBuilder&); - /// @brief Check for inconsistencies in distributeShardsLike - void missingPrototype(); - /// @brief Check for inconsistencies in replication factor vs dbs entries void enforceReplication(); diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index c40b329a18..2474bf1b68 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -1326,9 +1326,9 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, if (!res.successful()) { if (res.httpCode() == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) { + auto result = res.slice(); AgencyCommResult ag = ac.getValues("/"); - auto result = res.slice(); if (result.isArray() && result.length() > 0) { if (result[0].isObject()) { auto tres = result[0]; diff --git a/js/client/tests/agency/agency-test.js b/js/client/tests/agency/agency-test.js index 76c15aaa8f..51a4e9bbc3 100644 --- a/js/client/tests/agency/agency-test.js +++ b/js/client/tests/agency/agency-test.js @@ -942,7 +942,7 @@ function agencyTestSuite () { require("console").warn("Provoking second log compaction for now with", count3, "keys, from log entry", cur + count + count2, "on."); doCountTransactions(count3, count + count2); - }/*, + }, //////////////////////////////////////////////////////////////////////////////// /// @brief Huge transaction package @@ -955,7 +955,7 @@ function agencyTestSuite () { } writeAndCheck(huge); assertEqual(readAndCheck([["a"]]), [{"a":20000}]); - }*/ + } }; } diff --git a/js/common/bootstrap/errors.js b/js/common/bootstrap/errors.js index d4f301dfef..3a6921fc05 100644 --- a/js/common/bootstrap/errors.js +++ b/js/common/bootstrap/errors.js @@ -307,6 +307,7 @@ "ERROR_CANNOT_DROP_SMART_COLLECTION" : { "code" : 4002, "message" : "cannot drop this smart collection" }, "ERROR_KEY_MUST_BE_PREFIXED_WITH_SMART_GRAPH_ATTRIBUTE" : { "code" : 4003, "message" : "in smart vertex collections _key must be prefixed with the value of the smart graph attribute" }, "ERROR_ILLEGAL_SMART_GRAPH_ATTRIBUTE" : { "code" : 4004, "message" : "attribute cannot be used as smart graph attribute" }, + "ERROR_AGENCY_INQUIRY_SYNTAX" : { "code" : 20001, "message" : "Illegal inquiry syntax" }, "ERROR_AGENCY_INFORM_MUST_BE_OBJECT" : { "code" : 20011, "message" : "Inform message must be an object." }, "ERROR_AGENCY_INFORM_MUST_CONTAIN_TERM" : { "code" : 20012, "message" : "Inform message must contain uint parameter 'term'" }, "ERROR_AGENCY_INFORM_MUST_CONTAIN_ID" : { "code" : 20013, "message" : "Inform message must contain string parameter 'id'" }, diff --git a/js/server/tests/resilience/moving-shards-cluster.js b/js/server/tests/resilience/moving-shards-cluster.js index 21716298e5..df541ce49c 100644 --- a/js/server/tests/resilience/moving-shards-cluster.js +++ b/js/server/tests/resilience/moving-shards-cluster.js @@ -112,6 +112,7 @@ function MovingShardsSuite () { var request = require("@arangodb/request"); var endpointToURL = require("@arangodb/cluster").endpointToURL; var url = endpointToURL(coordEndpoint); + var res; try { var envelope = @@ -120,7 +121,7 @@ function MovingShardsSuite () { } catch (err) { console.error( "Exception for POST /_admin/cluster/cleanOutServer:", err.stack); - return []; + return {}; } var body = res.body; if (typeof body === "string") { diff --git a/lib/Basics/Mutex.cpp b/lib/Basics/Mutex.cpp index fc76580dc2..c85ca9ee4a 100644 --- a/lib/Basics/Mutex.cpp +++ b/lib/Basics/Mutex.cpp @@ -129,6 +129,10 @@ void Mutex::unlock() { void Mutex::assertLockedByCurrentThread() { TRI_ASSERT(_holder == Thread::currentThreadId()); } + +void Mutex::assertNotLockedByCurrentThread() { + TRI_ASSERT(_holder != Thread::currentThreadId()); +} #endif // ----------------------------------------------------------------------------- diff --git a/lib/Basics/Mutex.h b/lib/Basics/Mutex.h index 46e6d5d0dd..f9a12bf147 100644 --- a/lib/Basics/Mutex.h +++ b/lib/Basics/Mutex.h @@ -51,8 +51,10 @@ class Mutex { // nothing in non-maintainer mode and will do nothing for non-posix locks #ifdef ARANGODB_ENABLE_MAINTAINER_MODE void assertLockedByCurrentThread(); + void assertNotLockedByCurrentThread(); #else inline void assertLockedByCurrentThread() {} + inline void assertNotLockedByCurrentThread() {} #endif private: diff --git a/lib/Basics/errors.dat b/lib/Basics/errors.dat index baae74b4ee..7dbdee7e71 100755 --- a/lib/Basics/errors.dat +++ b/lib/Basics/errors.dat @@ -429,6 +429,7 @@ ERROR_ILLEGAL_SMART_GRAPH_ATTRIBUTE,4004,"attribute cannot be used as smart grap ## Agency errors ################################################################################ +ERROR_AGENCY_INQUIRY_SYNTAX,20001,"Illegal inquiry syntax","Inquiry handles a list of string clientIds: [,...]." ERROR_AGENCY_INFORM_MUST_BE_OBJECT,20011,"Inform message must be an object.","The inform message in the agency must be an object." ERROR_AGENCY_INFORM_MUST_CONTAIN_TERM,20012,"Inform message must contain uint parameter 'term'","The inform message in the agency must contain a uint parameter 'term'." ERROR_AGENCY_INFORM_MUST_CONTAIN_ID,20013,"Inform message must contain string parameter 'id'","The inform message in the agency must contain a string parameter 'id'." diff --git a/lib/Basics/voc-errors.cpp b/lib/Basics/voc-errors.cpp index a13093e4d5..3b8ac11522 100644 --- a/lib/Basics/voc-errors.cpp +++ b/lib/Basics/voc-errors.cpp @@ -169,9 +169,9 @@ void TRI_InitializeErrorMessages () { REG_ERROR(ERROR_CLUSTER_SHARD_LEADER_REFUSES_REPLICATION, "a shard leader refuses to perform a replication operation"); REG_ERROR(ERROR_CLUSTER_SHARD_FOLLOWER_REFUSES_OPERATION, "a shard follower refuses to perform an operation that is not a replication"); REG_ERROR(ERROR_CLUSTER_SHARD_LEADER_RESIGNED, "a (former) shard leader refuses to perform an operation, because it has resigned in the meantime"); + REG_ERROR(ERROR_CLUSTER_AGENCY_COMMUNICATION_FAILED, "some agency operation failed"); REG_ERROR(ERROR_CLUSTER_DISTRIBUTE_SHARDS_LIKE_REPLICATION_FACTOR, "conflicting replication factor with distributeShardsLike parameter assignment"); REG_ERROR(ERROR_CLUSTER_DISTRIBUTE_SHARDS_LIKE_NUMBER_OF_SHARDS, "conflicting shard number with distributeShardsLike parameter assignment"); - REG_ERROR(ERROR_CLUSTER_AGENCY_COMMUNICATION_FAILED, "some agency operation failed"); REG_ERROR(ERROR_QUERY_KILLED, "query killed"); REG_ERROR(ERROR_QUERY_PARSE, "%s"); REG_ERROR(ERROR_QUERY_EMPTY, "query is empty"); @@ -303,6 +303,7 @@ void TRI_InitializeErrorMessages () { REG_ERROR(ERROR_CANNOT_DROP_SMART_COLLECTION, "cannot drop this smart collection"); REG_ERROR(ERROR_KEY_MUST_BE_PREFIXED_WITH_SMART_GRAPH_ATTRIBUTE, "in smart vertex collections _key must be prefixed with the value of the smart graph attribute"); REG_ERROR(ERROR_ILLEGAL_SMART_GRAPH_ATTRIBUTE, "attribute cannot be used as smart graph attribute"); + REG_ERROR(ERROR_AGENCY_INQUIRY_SYNTAX, "Illegal inquiry syntax"); REG_ERROR(ERROR_AGENCY_INFORM_MUST_BE_OBJECT, "Inform message must be an object."); REG_ERROR(ERROR_AGENCY_INFORM_MUST_CONTAIN_TERM, "Inform message must contain uint parameter 'term'"); REG_ERROR(ERROR_AGENCY_INFORM_MUST_CONTAIN_ID, "Inform message must contain string parameter 'id'"); diff --git a/lib/Basics/voc-errors.h b/lib/Basics/voc-errors.h index 11910b26f6..d1c2176ce6 100644 --- a/lib/Basics/voc-errors.h +++ b/lib/Basics/voc-errors.h @@ -723,6 +723,8 @@ /// - 4004: @LIT{attribute cannot be used as smart graph attribute} /// The given smartGraph attribute is illegal and connot be used for /// sharding. All system attributes are forbidden. +/// - 20001: @LIT{Illegal inquiry syntax} +/// Inquiry handles a list of string clientIds: [,...]. /// - 20011: @LIT{Inform message must be an object.} /// The inform message in the agency must be an object. /// - 20012: @LIT{Inform message must contain uint parameter 'term'} @@ -2494,7 +2496,8 @@ void TRI_InitializeErrorMessages (); //////////////////////////////////////////////////////////////////////////////// /// @brief 1493: ERROR_CLUSTER_DISTRIBUTE_SHARDS_LIKE_REPLICATION_FACTOR /// -/// conflicting replication factor with distributeShardsLike parameter assignment +/// conflicting replication factor with distributeShardsLike parameter +/// assignment /// /// Will be raised if intended replication factor does not match that of the /// prototype shard given in ditributeShardsLike parameter. @@ -3851,6 +3854,16 @@ void TRI_InitializeErrorMessages (); #define TRI_ERROR_ILLEGAL_SMART_GRAPH_ATTRIBUTE (4004) +//////////////////////////////////////////////////////////////////////////////// +/// @brief 20001: ERROR_AGENCY_INQUIRY_SYNTAX +/// +/// Illegal inquiry syntax +/// +/// Inquiry handles a list of string clientIds: [,...]. +//////////////////////////////////////////////////////////////////////////////// + +#define TRI_ERROR_AGENCY_INQUIRY_SYNTAX (20001) + //////////////////////////////////////////////////////////////////////////////// /// @brief 20011: ERROR_AGENCY_INFORM_MUST_BE_OBJECT ///