1
0
Fork 0

create independent executeLockedRead and executeLockedWrite to speed performance (#4178)

This commit is contained in:
Matthew Von-Maszewski 2017-12-29 07:36:48 -05:00 committed by Max Neunhöffer
parent 1c01d07dbd
commit 41d1bfce23
4 changed files with 34 additions and 15 deletions

View File

@ -394,7 +394,7 @@ void Agent::sendAppendEntriesRPC() {
auto startTime = steady_clock::now();
SteadyTimePoint earliestPackage;
SteadyTimePoint lastAcked;
{
t = this->term();
MUTEX_LOCKER(tiLocker, _tiLock);
@ -452,7 +452,7 @@ void Agent::sendAppendEntriesRPC() {
_lastSent[followerId].time_since_epoch().count() != 0) {
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Note: sent out last AppendEntriesRPC "
<< "to follower " << followerId << " more than minPing ago: "
<< "to follower " << followerId << " more than minPing ago: "
<< m.count() << " lastAcked: "
<< duration_cast<duration<double>>(lastAcked.time_since_epoch()).count();
}
@ -539,7 +539,7 @@ void Agent::sendAppendEntriesRPC() {
resign();
return;
}
// Postpone sending the next message for 30 seconds or until an
// error or successful result occurs.
earliestPackage = steady_clock::now() + std::chrono::seconds(30);
@ -813,7 +813,7 @@ bool Agent::challengeLeadership() {
/// Get last acknowledged responses on leader
query_t Agent::lastAckedAgo() const {
std::unordered_map<std::string, SteadyTimePoint> lastAcked;
{
MUTEX_LOCKER(tiLocker, _tiLock);
@ -1251,7 +1251,7 @@ void Agent::beginShutdown() {
bool Agent::prepareLead() {
{
// Erase _earliestPackage, which allows for immediate sending of
// AppendEntriesRPC when we become a leader.
@ -1529,7 +1529,14 @@ arangodb::consensus::index_t Agent::readDB(Node& node) const {
return _commitIndex;
}
void Agent::executeLocked(std::function<void()> const& cb) {
void Agent::executeLockedRead(std::function<void()> const& cb) {
_tiLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
READ_LOCKER(oLocker, _outputLock);
cb();
}
void Agent::executeLockedWrite(std::function<void()> const& cb) {
_tiLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
WRITE_LOCKER(oLocker, _outputLock);

View File

@ -199,18 +199,32 @@ class Agent : public arangodb::Thread,
State const& state() const;
/// @brief execute a callback while holding _ioLock
void executeLocked(std::function<void()> const& cb);
/// and read lock for _readDB
void executeLockedRead(std::function<void()> const& cb);
/// @brief execute a callback while holding _ioLock
/// and write lock for _readDB
void executeLockedWrite(std::function<void()> const& cb);
/// @brief Get read store and compaction index
index_t readDB(Node&) const;
/// @brief Get read store
/// WARNING: this assumes caller holds appropriate
/// locks or will use executeLockedRead() or
/// executeLockedWrite() with a lambda function
Store const& readDB() const;
/// @brief Get spearhead store
/// WARNING: this assumes caller holds appropriate
/// locks or will use executeLockedRead() or
/// executeLockedWrite() with a lambda function
Store const& spearhead() const;
/// @brief Get transient store
/// WARNING: this assumes caller holds appropriate
/// locks or will use executeLockedRead() or
/// executeLockedWrite() with a lambda function
Store const& transient() const;
/// @brief Serve active agent interface
@ -313,7 +327,7 @@ class Agent : public arangodb::Thread,
/// answers to appendEntriesRPC messages come in on the leader, and when
/// appendEntriesRPC calls are received on the follower. In each case
/// we hold the _ioLock when _commitIndex is changed. Reading and writing
/// must be done under the write lock of _outputLog and the mutex of
/// must be done under the write lock of _outputLog and the mutex of
/// _waitForCV to allow a thread to wait for a change using that
/// condition variable.
index_t _commitIndex;
@ -420,7 +434,7 @@ class Agent : public arangodb::Thread,
/// @brief Keep track of when I last took on leadership
SteadyTimePoint _leaderSince;
/// @brief Ids of ongoing transactions, used for inquire:
std::unordered_set<std::string> _ongoingTrxs;

View File

@ -158,7 +158,7 @@ RestStatus RestAgencyHandler::handleStores() {
{
VPackObjectBuilder b(&body);
{
_agent->executeLocked([&]() {
_agent->executeLockedRead([&]() {
body.add(VPackValue("spearhead"));
{
VPackArrayBuilder bb(&body);
@ -167,9 +167,7 @@ RestStatus RestAgencyHandler::handleStores() {
body.add(VPackValue("read_db"));
{
VPackArrayBuilder bb(&body);
_agent->executeLocked([&]() {
_agent->readDB().dumpToBuilder(body);
});
_agent->readDB().dumpToBuilder(body);
}
body.add(VPackValue("transient"));
{

View File

@ -542,7 +542,7 @@ bool Supervision::updateSnapshot() {
return false;
}
_agent->executeLocked([&]() {
_agent->executeLockedRead([&]() {
if (_agent->readDB().has(_agencyPrefix)) {
_snapshot = _agent->readDB().get(_agencyPrefix);
}
@ -582,7 +582,7 @@ void Supervision::run() {
bool done = false;
MUTEX_LOCKER(locker, _lock);
_agent->executeLocked([&]() {
_agent->executeLockedRead([&]() {
if (_agent->readDB().has(supervisionNode)) {
try {
_snapshot = _agent->readDB().get(supervisionNode);