1
0
Fork 0

Fix performRequests?

This commit is contained in:
Max Neunhoeffer 2016-10-13 10:57:29 +02:00
parent 3ee28ac333
commit b993de3c42
1 changed files with 68 additions and 45 deletions

View File

@ -995,41 +995,63 @@ size_t ClusterComm::performRequests(std::vector<ClusterCommRequest>& requests,
std::unordered_map<OperationID, size_t> opIDtoIndex;
try {
while (now <= endTime) {
while (true) {
now = TRI_microtime();
if (now > endTime) {
break;
}
if (nrDone >= requests.size()) {
// All good, report
return nrGood;
}
double actionNeeded = endTime;
// First send away what is due:
for (size_t i = 0; i < requests.size(); i++) {
if (!requests[i].done && now >= dueTime[i]) {
if (requests[i].headerFields.get() == nullptr) {
requests[i].headerFields = std::make_unique<
std::unordered_map<std::string, std::string>>();
}
LOG_TOPIC(TRACE, logTopic)
<< "ClusterComm::performRequests: sending request to "
<< requests[i].destination << ":" << requests[i].path
<< "body:" << requests[i].body;
dueTime[i] = endTime + 10;
double localTimeout = endTime - now;
OperationID opId = asyncRequest(
"", coordinatorTransactionID, requests[i].destination,
requests[i].requestType, requests[i].path, requests[i].body,
requests[i].headerFields, nullptr, localTimeout, false,
2.0);
if (!requests[i].done) {
if (now >= dueTime[i]) {
if (requests[i].headerFields.get() == nullptr) {
requests[i].headerFields = std::make_unique<
std::unordered_map<std::string, std::string>>();
}
LOG_TOPIC(TRACE, logTopic)
<< "ClusterComm::performRequests: sending request to "
<< requests[i].destination << ":" << requests[i].path
<< "body:" << requests[i].body;
dueTime[i] = endTime + 10;
double localTimeout = endTime - now;
OperationID opId = asyncRequest(
"", coordinatorTransactionID, requests[i].destination,
requests[i].requestType, requests[i].path, requests[i].body,
requests[i].headerFields, nullptr, localTimeout, false,
2.0);
opIDtoIndex.insert(std::make_pair(opId, i));
// It is possible that an error occurs right away, we will notice
// below after wait(), though, and retry in due course.
opIDtoIndex.insert(std::make_pair(opId, i));
// It is possible that an error occurs right away, we will notice
// below after wait(), though, and retry in due course.
} else if (dueTime[i] < actionNeeded) {
actionNeeded = dueTime[i];
}
}
}
now = TRI_microtime();
auto res =
wait("", coordinatorTransactionID, 0, "", endTime - now);
auto res = wait("", coordinatorTransactionID, 0, "", actionNeeded - now);
// wait could have taken some time, so we need to update now now
now = TRI_microtime();
// note that this is needed further below from here, so it is *not*
// good enough to only update at the top of the loop!
if (res.status == CL_COMM_DROPPED) {
// this indicates that no request for this coordinatorTransactionID
// is in flight, this is possible, since we might have scheduled
// a retry later than now and simply wait till then
if (now < actionNeeded) {
usleep((actionNeeded - now) * 1000000);
}
continue;
}
auto it = opIDtoIndex.find(res.operationID);
if (it == opIDtoIndex.end()) {
@ -1052,32 +1074,33 @@ size_t ClusterComm::performRequests(std::vector<ClusterCommRequest>& requests,
<< requests[index].path << " with return code "
<< (int)res.answer_code;
} else if (res.status == CL_COMM_BACKEND_UNAVAILABLE ||
(res.status == CL_COMM_TIMEOUT && !res.sendWasComplete)) {
requests[index].result = res;
(res.status == CL_COMM_TIMEOUT && !res.sendWasComplete)) {
requests[index].result = res;
// In this case we will retry:
dueTime[index] =
(std::min)(10.0, (std::max)(0.2, 2 * (now - startTime))) + now;
if (dueTime[index] >= endTime) {
requests[index].done = true;
nrDone++;
}
LOG_TOPIC(ERR, Logger::CLUSTER) << "ClusterComm::performRequests: "
<< "got BACKEND_UNAVAILABLE or TIMEOUT from "
<< requests[index].destination << ":" << requests[index].path;
} else { // a "proper error"
requests[index].result = res;
// In this case we will retry:
double tryAgainAfter = now - startTime;
if (tryAgainAfter < 0.2) {
tryAgainAfter = 0.2;
} else if (tryAgainAfter > 10.0) {
tryAgainAfter = 10.0;
}
dueTime[index] = tryAgainAfter + now;
if (dueTime[index] >= endTime) {
requests[index].done = true;
nrDone++;
LOG_TOPIC(ERR, Logger::CLUSTER) << "ClusterComm::performRequests: "
<< "got no answer from " << requests[index].destination << ":"
<< requests[index].path << " with error " << res.status;
}
if (nrDone >= requests.size()) {
// We are done, all results are in!
return nrGood;
}
LOG_TOPIC(ERR, Logger::CLUSTER) << "ClusterComm::performRequests: "
<< "got BACKEND_UNAVAILABLE or TIMEOUT from "
<< requests[index].destination << ":" << requests[index].path;
} else { // a "proper error"
requests[index].result = res;
requests[index].done = true;
nrDone++;
LOG_TOPIC(ERR, Logger::CLUSTER) << "ClusterComm::performRequests: "
<< "got no answer from " << requests[index].destination << ":"
<< requests[index].path << " with error " << res.status;
}
}
} catch (...) {
LOG_TOPIC(ERR, Logger::CLUSTER) << "ClusterComm::performRequests: "
<< "caught exception, ignoring...";