diff --git a/arangod/Cluster/ClusterComm.cpp b/arangod/Cluster/ClusterComm.cpp index e2e61f6eac..314556840d 100644 --- a/arangod/Cluster/ClusterComm.cpp +++ b/arangod/Cluster/ClusterComm.cpp @@ -995,41 +995,63 @@ size_t ClusterComm::performRequests(std::vector& requests, std::unordered_map opIDtoIndex; try { - while (now <= endTime) { + while (true) { + now = TRI_microtime(); + if (now > endTime) { + break; + } if (nrDone >= requests.size()) { // All good, report return nrGood; } + double actionNeeded = endTime; + // First send away what is due: for (size_t i = 0; i < requests.size(); i++) { - if (!requests[i].done && now >= dueTime[i]) { - if (requests[i].headerFields.get() == nullptr) { - requests[i].headerFields = std::make_unique< - std::unordered_map>(); - } - LOG_TOPIC(TRACE, logTopic) - << "ClusterComm::performRequests: sending request to " - << requests[i].destination << ":" << requests[i].path - << "body:" << requests[i].body; - - dueTime[i] = endTime + 10; - double localTimeout = endTime - now; - OperationID opId = asyncRequest( - "", coordinatorTransactionID, requests[i].destination, - requests[i].requestType, requests[i].path, requests[i].body, - requests[i].headerFields, nullptr, localTimeout, false, - 2.0); + if (!requests[i].done) { + if (now >= dueTime[i]) { + if (requests[i].headerFields.get() == nullptr) { + requests[i].headerFields = std::make_unique< + std::unordered_map>(); + } + LOG_TOPIC(TRACE, logTopic) + << "ClusterComm::performRequests: sending request to " + << requests[i].destination << ":" << requests[i].path + << "body:" << requests[i].body; + + dueTime[i] = endTime + 10; + double localTimeout = endTime - now; + OperationID opId = asyncRequest( + "", coordinatorTransactionID, requests[i].destination, + requests[i].requestType, requests[i].path, requests[i].body, + requests[i].headerFields, nullptr, localTimeout, false, + 2.0); - opIDtoIndex.insert(std::make_pair(opId, i)); - // It is possible that an error occurs right away, we will notice - // below after wait(), though, and retry in due course. + opIDtoIndex.insert(std::make_pair(opId, i)); + // It is possible that an error occurs right away, we will notice + // below after wait(), though, and retry in due course. + } else if (dueTime[i] < actionNeeded) { + actionNeeded = dueTime[i]; + } } } - now = TRI_microtime(); - auto res = - wait("", coordinatorTransactionID, 0, "", endTime - now); + auto res = wait("", coordinatorTransactionID, 0, "", actionNeeded - now); + // wait could have taken some time, so we need to update now now + now = TRI_microtime(); + // note that this is needed further below from here, so it is *not* + // good enough to only update at the top of the loop! + + if (res.status == CL_COMM_DROPPED) { + // this indicates that no request for this coordinatorTransactionID + // is in flight, this is possible, since we might have scheduled + // a retry later than now and simply wait till then + if (now < actionNeeded) { + usleep((actionNeeded - now) * 1000000); + } + continue; + } auto it = opIDtoIndex.find(res.operationID); if (it == opIDtoIndex.end()) { @@ -1052,32 +1074,33 @@ size_t ClusterComm::performRequests(std::vector& requests, << requests[index].path << " with return code " << (int)res.answer_code; } else if (res.status == CL_COMM_BACKEND_UNAVAILABLE || - (res.status == CL_COMM_TIMEOUT && !res.sendWasComplete)) { - requests[index].result = res; + (res.status == CL_COMM_TIMEOUT && !res.sendWasComplete)) { + requests[index].result = res; - // In this case we will retry: - dueTime[index] = - (std::min)(10.0, (std::max)(0.2, 2 * (now - startTime))) + now; - if (dueTime[index] >= endTime) { - requests[index].done = true; - nrDone++; - } - LOG_TOPIC(ERR, Logger::CLUSTER) << "ClusterComm::performRequests: " - << "got BACKEND_UNAVAILABLE or TIMEOUT from " - << requests[index].destination << ":" << requests[index].path; - } else { // a "proper error" - requests[index].result = res; + // In this case we will retry: + double tryAgainAfter = now - startTime; + if (tryAgainAfter < 0.2) { + tryAgainAfter = 0.2; + } else if (tryAgainAfter > 10.0) { + tryAgainAfter = 10.0; + } + dueTime[index] = tryAgainAfter + now; + if (dueTime[index] >= endTime) { requests[index].done = true; nrDone++; - LOG_TOPIC(ERR, Logger::CLUSTER) << "ClusterComm::performRequests: " - << "got no answer from " << requests[index].destination << ":" - << requests[index].path << " with error " << res.status; - } - if (nrDone >= requests.size()) { - // We are done, all results are in! - return nrGood; } + LOG_TOPIC(ERR, Logger::CLUSTER) << "ClusterComm::performRequests: " + << "got BACKEND_UNAVAILABLE or TIMEOUT from " + << requests[index].destination << ":" << requests[index].path; + } else { // a "proper error" + requests[index].result = res; + requests[index].done = true; + nrDone++; + LOG_TOPIC(ERR, Logger::CLUSTER) << "ClusterComm::performRequests: " + << "got no answer from " << requests[index].destination << ":" + << requests[index].path << " with error " << res.status; } + } } catch (...) { LOG_TOPIC(ERR, Logger::CLUSTER) << "ClusterComm::performRequests: " << "caught exception, ignoring...";