1
0
Fork 0

Recapsulating MUTEX in key value Store

This commit is contained in:
Kaveh Vahedipour 2016-11-04 11:42:08 +01:00
parent 5efb3d2ce8
commit 62492195e9
4 changed files with 46 additions and 30 deletions

View File

@ -496,11 +496,11 @@ bool Agent::load() {
}
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Reassembling spearhead and read stores.";
{
MUTEX_LOCKER(commitLock, _ioLock);
// {
// MUTEX_LOCKER(commitLock, _ioLock);
_spearhead.apply(
_state.slices(_lastCommitIndex + 1), _lastCommitIndex, _constituent.term());
}
// }
{
CONDITION_LOCKER(guard, _appendCV);
@ -910,7 +910,7 @@ void Agent::notify(query_t const& message) {
// Rebuild key value stores
bool Agent::rebuildDBs() {
MUTEX_LOCKER(mutexLocker, _ioLock);
_spearhead.apply(_state.slices(_lastCommitIndex + 1), _lastCommitIndex,

View File

@ -367,7 +367,7 @@ bool Inception::estimateRAFTInterval() {
2.0, true);
}
}
std::this_thread::sleep_for(std::chrono::duration<double,std::milli>(5));
std::this_thread::sleep_for(std::chrono::duration<double,std::milli>(1));
}
auto s = system_clock::now();
@ -468,9 +468,9 @@ bool Inception::estimateRAFTInterval() {
}
}
maxmean = 1.e-3*std::ceil(1.e3*(.25 + 1.0e-3*(maxmean+3*maxstdev)));
maxmean = 1.e-3*std::ceil(1.e3*(.2 + 1.0e-3*(maxmean+3*maxstdev)));
LOG_TOPIC(INFO, Logger::AGENCY)
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Auto-adapting RAFT timing to: {" << maxmean
<< ", " << 5.0*maxmean << "}s";

View File

@ -272,7 +272,7 @@ std::vector<log_t> State::get(arangodb::consensus::index_t start,
std::vector<VPackSlice> State::slices(arangodb::consensus::index_t start,
arangodb::consensus::index_t end) const {
std::vector<VPackSlice> slices;
// MUTEX_LOCKER(mutexLocker, _logLock); // Cannot be read lock (Compaction)
MUTEX_LOCKER(mutexLocker, _logLock); // Cannot be read lock (Compaction)
if (_log.empty()) {
return slices;
@ -562,34 +562,38 @@ bool State::loadRemaining() {
}
auto result = queryResult.result->slice();
index_t back = 0;
{
MUTEX_LOCKER(logLock, _logLock);
if (result.isArray()) {
MUTEX_LOCKER(logLock, _logLock);
if (result.isArray()) {
_log.clear();
for (auto const& i : VPackArrayIterator(result)) {
buffer_t tmp = std::make_shared<arangodb::velocypack::Buffer<uint8_t>>();
auto ii = i.resolveExternals();
auto req = ii.get("request");
tmp->append(req.startAs<char const>(), req.byteSize());
try {
_log.push_back(
_log.clear();
for (auto const& i : VPackArrayIterator(result)) {
buffer_t tmp = std::make_shared<arangodb::velocypack::Buffer<uint8_t>>();
auto ii = i.resolveExternals();
auto req = ii.get("request");
tmp->append(req.startAs<char const>(), req.byteSize());
try {
_log.push_back(
log_t(std::stoi(ii.get(StaticStrings::KeyString).copyString()),
static_cast<term_t>(ii.get("term").getUInt()), tmp));
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY)
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY)
<< "Failed to convert " +
ii.get(StaticStrings::KeyString).copyString() +
" to integer via std::stoi."
ii.get(StaticStrings::KeyString).copyString() +
" to integer via std::stoi."
<< e.what();
}
}
}
TRI_ASSERT(!_log.empty());
back = _log.back().index;
}
_agent->rebuildDBs();
TRI_ASSERT(!_log.empty());
_agent->lastCommitted(_log.back().index);
_agent->lastCommitted(back);
return true;
}

View File

@ -36,6 +36,15 @@ function shuffle() {
done
}
function isuint () {
re='^[0-9]+$'
if ! [[ $1 =~ $re ]] ; then
return 1;
else
return 0
fi
}
NRAGENTS=3
POOLSZ=""
TRANSPORT="tcp"
@ -133,7 +142,7 @@ if [ "$GOSSIP_MODE" = "0" ]; then
GOSSIP_PEERS=" --agency.endpoint $TRANSPORT://localhost:$BASE"
fi
rm -rf agency
#rm -rf agency
mkdir -p agency
PIDS=""
@ -180,10 +189,13 @@ for aid in "${aaid[@]}"; do
GOSSIP_PEERS+=" --agency.endpoint $TRANSPORT://localhost:$port"
fi
if [ $count -lt $POOLSZ ]; then
sleep $START_DELAYS
if isuint $START_DELAYS; then
printf "fixed delay %02ds " "$START_DELAYS"
sleep $START_DELAYS
fi
if [ "$RANDOM_DELAYS" == "true" ] ; then
delay=$(( RANDOM % 16 ))
printf " delaying %s seconds" "$delay"
printf "random delay %02ds" "$delay"
sleep $delay
fi
((count+=1))