diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index d6afb614ee..b48acd2080 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -394,7 +394,7 @@ void Agent::sendAppendEntriesRPC() { auto startTime = steady_clock::now(); SteadyTimePoint earliestPackage; SteadyTimePoint lastAcked; - + { t = this->term(); MUTEX_LOCKER(tiLocker, _tiLock); @@ -452,7 +452,7 @@ void Agent::sendAppendEntriesRPC() { _lastSent[followerId].time_since_epoch().count() != 0) { LOG_TOPIC(DEBUG, Logger::AGENCY) << "Note: sent out last AppendEntriesRPC " - << "to follower " << followerId << " more than minPing ago: " + << "to follower " << followerId << " more than minPing ago: " << m.count() << " lastAcked: " << duration_cast>(lastAcked.time_since_epoch()).count(); } @@ -539,7 +539,7 @@ void Agent::sendAppendEntriesRPC() { resign(); return; } - + // Postpone sending the next message for 30 seconds or until an // error or successful result occurs. earliestPackage = steady_clock::now() + std::chrono::seconds(30); @@ -813,7 +813,7 @@ bool Agent::challengeLeadership() { /// Get last acknowledged responses on leader query_t Agent::lastAckedAgo() const { - + std::unordered_map lastAcked; { MUTEX_LOCKER(tiLocker, _tiLock); @@ -1251,7 +1251,7 @@ void Agent::beginShutdown() { bool Agent::prepareLead() { - + { // Erase _earliestPackage, which allows for immediate sending of // AppendEntriesRPC when we become a leader. @@ -1529,7 +1529,14 @@ arangodb::consensus::index_t Agent::readDB(Node& node) const { return _commitIndex; } -void Agent::executeLocked(std::function const& cb) { +void Agent::executeLockedRead(std::function const& cb) { + _tiLock.assertNotLockedByCurrentThread(); + MUTEX_LOCKER(ioLocker, _ioLock); + READ_LOCKER(oLocker, _outputLock); + cb(); +} + +void Agent::executeLockedWrite(std::function const& cb) { _tiLock.assertNotLockedByCurrentThread(); MUTEX_LOCKER(ioLocker, _ioLock); WRITE_LOCKER(oLocker, _outputLock); diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index 0a7fba46cf..5a6f52a92c 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -199,18 +199,32 @@ class Agent : public arangodb::Thread, State const& state() const; /// @brief execute a callback while holding _ioLock - void executeLocked(std::function const& cb); + /// and read lock for _readDB + void executeLockedRead(std::function const& cb); + + /// @brief execute a callback while holding _ioLock + /// and write lock for _readDB + void executeLockedWrite(std::function const& cb); /// @brief Get read store and compaction index index_t readDB(Node&) const; /// @brief Get read store + /// WARNING: this assumes caller holds appropriate + /// locks or will use executeLockedRead() or + /// executeLockedWrite() with a lambda function Store const& readDB() const; /// @brief Get spearhead store + /// WARNING: this assumes caller holds appropriate + /// locks or will use executeLockedRead() or + /// executeLockedWrite() with a lambda function Store const& spearhead() const; /// @brief Get transient store + /// WARNING: this assumes caller holds appropriate + /// locks or will use executeLockedRead() or + /// executeLockedWrite() with a lambda function Store const& transient() const; /// @brief Serve active agent interface @@ -313,7 +327,7 @@ class Agent : public arangodb::Thread, /// answers to appendEntriesRPC messages come in on the leader, and when /// appendEntriesRPC calls are received on the follower. In each case /// we hold the _ioLock when _commitIndex is changed. Reading and writing - /// must be done under the write lock of _outputLog and the mutex of + /// must be done under the write lock of _outputLog and the mutex of /// _waitForCV to allow a thread to wait for a change using that /// condition variable. index_t _commitIndex; @@ -420,7 +434,7 @@ class Agent : public arangodb::Thread, /// @brief Keep track of when I last took on leadership SteadyTimePoint _leaderSince; - + /// @brief Ids of ongoing transactions, used for inquire: std::unordered_set _ongoingTrxs; diff --git a/arangod/Agency/RestAgencyHandler.cpp b/arangod/Agency/RestAgencyHandler.cpp index 2f24851b8e..ebd4289a54 100644 --- a/arangod/Agency/RestAgencyHandler.cpp +++ b/arangod/Agency/RestAgencyHandler.cpp @@ -158,7 +158,7 @@ RestStatus RestAgencyHandler::handleStores() { { VPackObjectBuilder b(&body); { - _agent->executeLocked([&]() { + _agent->executeLockedRead([&]() { body.add(VPackValue("spearhead")); { VPackArrayBuilder bb(&body); @@ -167,9 +167,7 @@ RestStatus RestAgencyHandler::handleStores() { body.add(VPackValue("read_db")); { VPackArrayBuilder bb(&body); - _agent->executeLocked([&]() { - _agent->readDB().dumpToBuilder(body); - }); + _agent->readDB().dumpToBuilder(body); } body.add(VPackValue("transient")); { diff --git a/arangod/Agency/Supervision.cpp b/arangod/Agency/Supervision.cpp index 7543beb786..3fb2b7a1b4 100644 --- a/arangod/Agency/Supervision.cpp +++ b/arangod/Agency/Supervision.cpp @@ -542,7 +542,7 @@ bool Supervision::updateSnapshot() { return false; } - _agent->executeLocked([&]() { + _agent->executeLockedRead([&]() { if (_agent->readDB().has(_agencyPrefix)) { _snapshot = _agent->readDB().get(_agencyPrefix); } @@ -582,7 +582,7 @@ void Supervision::run() { bool done = false; MUTEX_LOCKER(locker, _lock); - _agent->executeLocked([&]() { + _agent->executeLockedRead([&]() { if (_agent->readDB().has(supervisionNode)) { try { _snapshot = _agent->readDB().get(supervisionNode);