1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
Simon Grätzer 2017-04-28 17:37:42 +02:00
commit f38b203f94
5 changed files with 97 additions and 10 deletions

View File

@ -184,7 +184,8 @@ bool AddFollower::start() {
// Randomly choose enough servers:
std::vector<std::string> chosen;
for (size_t i = 0; i < desiredReplFactor - actualReplFactor; ++i) {
size_t pos = arangodb::RandomGenerator::interval(0, available.size() - 1);
size_t pos = arangodb::RandomGenerator::interval(static_cast<int64_t>(0),
available.size() - 1);
chosen.push_back(available[pos]);
if (pos < available.size() - 1) {
available[pos] = available[available.size() - 1];

View File

@ -1402,14 +1402,28 @@ AgencyCommResult AgencyComm::sendWithFailover(
"Failed agency comm (" << result._statusCode << ")! " <<
"Inquiring about clientId " << clientId << ".";
AgencyCommResult inq = send(
connection.get(), method, conTimeout, "/_api/agency/inquire",
b.toJson(), "");
AgencyCommResult inq;
std::shared_ptr<VPackBuilder> bodyBuilder;
VPackSlice outer;
while (true) {
inq = send(
connection.get(), method, conTimeout, "/_api/agency/inquire",
b.toJson(), "");
if (!inq.successful()) {
break;
}
bodyBuilder = VPackParser::fromJson(inq._body);
outer = bodyBuilder->slice();
if (!outer.isString() || outer.copyString() != "ongoing") {
break;
}
// We do not really know what has happened, so we have to ask
// again later!
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
if (inq.successful()) {
auto bodyBuilder = VPackParser::fromJson(inq._body);
auto const& outer = bodyBuilder->slice();
if (outer.isArray() && outer.length() > 0) {
bool success = false;
for (auto const& inner : VPackArrayIterator(outer)) {

View File

@ -639,6 +639,7 @@ trans_ret_t Agent::transact(query_t const& queries) {
// Apply to spearhead and get indices for log entries
auto qs = queries->slice();
addTrxsOngoing(qs); // remember that these are ongoing
auto ret = std::make_shared<arangodb::velocypack::Builder>();
size_t failed = 0;
ret->openArray();
@ -669,6 +670,8 @@ trans_ret_t Agent::transact(query_t const& queries) {
}
}
removeTrxsOngoing(qs);
// (either no writes or all preconditions failed)
/* if (maxind == 0) {
ret->clear();
@ -741,22 +744,37 @@ inquire_ret_t Agent::inquire(query_t const& query) {
auto si = _state.inquire(query);
bool found = false;
auto builder = std::make_shared<VPackBuilder>();
{
VPackArrayBuilder b(builder.get());
for (auto const& i : si) {
VPackArrayBuilder bb(builder.get());
for (auto const& j : i) {
found = true;
VPackObjectBuilder bbb(builder.get());
builder->add("index", VPackValue(j.index));
builder->add("term", VPackValue(j.term));
builder->add("query", VPackSlice(j.entry->data()));
builder->add("index", VPackValue(j.index));
}
}
}
ret = inquire_ret_t(true, id(), builder);
if (!found) {
return ret;
}
// Check ongoing ones:
for (auto const& s : VPackArrayIterator(query->slice())) {
std::string ss = s.copyString();
if (isTrxOngoing(ss)) {
ret.result->clear();
ret.result->add(VPackValue("ongoing"));
}
}
return ret;
}
@ -773,6 +791,8 @@ write_ret_t Agent::write(query_t const& query) {
return write_ret_t(false, leader);
}
addTrxsOngoing(query->slice()); // remember that these are ongoing
// Apply to spearhead and get indices for log entries
{
MUTEX_LOCKER(ioLocker, _ioLock);
@ -785,9 +805,10 @@ write_ret_t Agent::write(query_t const& query) {
applied = _spearhead.apply(query);
indices = _state.log(query, applied, term());
}
removeTrxsOngoing(query->slice());
// Maximum log index
index_t maxind = 0;
if (!indices.empty()) {
@ -1436,4 +1457,40 @@ query_t Agent::buildDB(arangodb::consensus::index_t index) {
}
/// @brief Remember the client ids of ongoing write transactions so that
/// a later inquire() can report them as "ongoing".
/// Expects trxs to be an array of transactions; only entries of the form
/// [query, precondition, clientId] (length 3, object query, string id)
/// are recorded — all others are silently ignored.
void Agent::addTrxsOngoing(Slice trxs) {
  try {
    MUTEX_LOCKER(guard, _trxsLock);
    for (auto const& trx : VPackArrayIterator(trxs)) {
      // Validate shape before indexing: check that the entry is an array
      // of length 3 before touching trx[0]/trx[2], instead of relying on
      // the catch-all below to absorb out-of-bounds access.
      if (trx.isArray() && trx.length() == 3 && trx[0].isObject() &&
          trx[2].isString()) {
        // only those are interesting:
        _ongoingTrxs.insert(trx[2].copyString());
      }
    }
  } catch (...) {
    // Best effort: a malformed transaction list is ignored entirely.
  }
}
/// @brief Forget the client ids of transactions previously registered via
/// addTrxsOngoing, once they have been applied/logged.
/// Expects trxs to be an array of transactions; only entries of the form
/// [query, precondition, clientId] (length 3, object query, string id)
/// are considered — all others are silently ignored.
void Agent::removeTrxsOngoing(Slice trxs) {
  try {
    MUTEX_LOCKER(guard, _trxsLock);
    for (auto const& trx : VPackArrayIterator(trxs)) {
      // Validate shape before indexing: check that the entry is an array
      // of length 3 before touching trx[0]/trx[2], instead of relying on
      // the catch-all below to absorb out-of-bounds access.
      if (trx.isArray() && trx.length() == 3 && trx[0].isObject() &&
          trx[2].isString()) {
        // only those are interesting:
        _ongoingTrxs.erase(trx[2].copyString());
      }
    }
  } catch (...) {
    // Best effort: a malformed transaction list is ignored entirely.
  }
}
/// @brief Report whether the transaction with the given client id is
/// currently registered as ongoing. Returns false on any internal error.
bool Agent::isTrxOngoing(std::string& id) {
  try {
    MUTEX_LOCKER(guard, _trxsLock);
    // Direct membership test; std::set guarantees at most one match.
    return _ongoingTrxs.count(id) > 0;
  } catch (...) {
    return false;
  }
}
}} // namespace

View File

@ -108,6 +108,15 @@ class Agent : public arangodb::Thread,
/// @brief Attempt read/write transaction
trans_ret_t transact(query_t const&) override;
/// @brief Put trxs into list of ongoing ones.
void addTrxsOngoing(Slice trxs);
/// @brief Remove trxs from list of ongoing ones.
void removeTrxsOngoing(Slice trxs);
/// @brief Check whether a trx is ongoing.
bool isTrxOngoing(std::string& id);
/// @brief Received by followers to replicate log entries ($5.3);
/// also used as heartbeat ($5.2).
bool recvAppendEntriesRPC(term_t term, std::string const& leaderId,
@ -343,6 +352,12 @@ class Agent : public arangodb::Thread,
/// @brief Keep track of when I last took on leadership
TimePoint _leaderSince;
/// @brief Ids of ongoing transactions, used for inquire:
std::set<std::string> _ongoingTrxs;
// lock for _ongoingTrxs
arangodb::Mutex _trxsLock;
};
}
}

View File

@ -392,7 +392,7 @@ bool CleanOutServer::scheduleMoveShards(std::shared_ptr<Builder>& trx) {
}
toServer = serversCopy.at(arangodb::RandomGenerator::interval(
0, serversCopy.size()-1));
static_cast<int64_t>(0), serversCopy.size()-1));
// Schedule move into trx:
MoveShard(_snapshot, _agent, _jobId + "-" + std::to_string(sub++),