mirror of https://gitee.com/bigwinds/arangodb
Waiting for leader election in AgencyComm::sendWithFailover
This commit is contained in:
parent
0cbe27a799
commit
07cddc3560
|
@ -21,6 +21,8 @@
|
||||||
/// @author Jan Steemann
|
/// @author Jan Steemann
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
|
|
||||||
#include "AgencyComm.h"
|
#include "AgencyComm.h"
|
||||||
|
|
||||||
#include <velocypack/Iterator.h>
|
#include <velocypack/Iterator.h>
|
||||||
|
@ -327,7 +329,9 @@ void AgencyCommResult::clear() {
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
VPackSlice AgencyCommResult::slice() {
|
VPackSlice AgencyCommResult::slice() {
|
||||||
TRI_ASSERT(_vpack != nullptr);
|
if (_vpack == nullptr) { // If not initialised, initialise to none.
|
||||||
|
_vpack = std::make_shared<arangodb::velocypack::Builder>();
|
||||||
|
}
|
||||||
return _vpack->slice();
|
return _vpack->slice();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1609,6 +1613,10 @@ AgencyCommResult AgencyComm::sendWithFailover(
|
||||||
std::string const& url, std::string const& body, bool isWatch) {
|
std::string const& url, std::string const& body, bool isWatch) {
|
||||||
size_t numEndpoints;
|
size_t numEndpoints;
|
||||||
|
|
||||||
|
using namespace std::chrono;
|
||||||
|
auto start = system_clock::now();
|
||||||
|
seconds ltimeout(static_cast<int>(timeout));
|
||||||
|
|
||||||
{
|
{
|
||||||
READ_LOCKER(readLocker, AgencyComm::_globalLock);
|
READ_LOCKER(readLocker, AgencyComm::_globalLock);
|
||||||
numEndpoints = AgencyComm::_globalEndpoints.size();
|
numEndpoints = AgencyComm::_globalEndpoints.size();
|
||||||
|
@ -1629,8 +1637,11 @@ AgencyCommResult AgencyComm::sendWithFailover(
|
||||||
std::string forceEndpoint;
|
std::string forceEndpoint;
|
||||||
|
|
||||||
AgencyCommResult result;
|
AgencyCommResult result;
|
||||||
|
|
||||||
while (tries++ < numEndpoints) {
|
while (tries++ < numEndpoints) {
|
||||||
|
|
||||||
|
size_t ltries = 0;
|
||||||
|
|
||||||
AgencyEndpoint* agencyEndpoint = popEndpoint(forceEndpoint);
|
AgencyEndpoint* agencyEndpoint = popEndpoint(forceEndpoint);
|
||||||
|
|
||||||
TRI_ASSERT(agencyEndpoint != nullptr);
|
TRI_ASSERT(agencyEndpoint != nullptr);
|
||||||
|
@ -1640,22 +1651,38 @@ AgencyCommResult AgencyComm::sendWithFailover(
|
||||||
<< agencyEndpoint->_endpoint->specification() << " tries: " << tries;
|
<< agencyEndpoint->_endpoint->specification() << " tries: " << tries;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
while (true) {
|
||||||
result =
|
try {
|
||||||
|
result =
|
||||||
send(agencyEndpoint->_connection, method, timeout, realUrl, body);
|
send(agencyEndpoint->_connection, method, timeout, realUrl, body);
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
result._connected = false;
|
result._connected = false;
|
||||||
result._statusCode = 0;
|
result._statusCode = 0;
|
||||||
result._message = "could not send request to agency";
|
result._message = "could not send request to agency";
|
||||||
|
|
||||||
|
agencyEndpoint->_connection->disconnect();
|
||||||
|
|
||||||
|
requeueEndpoint(agencyEndpoint, true);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
agencyEndpoint->_connection->disconnect();
|
if (system_clock::now() - start > ltimeout) {
|
||||||
|
LOG_TOPIC(ERR, Logger::AGENCYCOMM) << "Timed out waiting for leader "
|
||||||
|
<< agencyEndpoint->_endpoint->specification() << " tries: " << ltries;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
requeueEndpoint(agencyEndpoint, true);
|
if (result._statusCode !=
|
||||||
break;
|
(int)arangodb::rest::ResponseCode::SERVICE_UNAVAILABLE) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
LOG_TOPIC(WARN, Logger::AGENCYCOMM) << "Waiting on leader election "
|
||||||
|
<< agencyEndpoint->_endpoint->specification() << " tries: " << ltries;
|
||||||
|
}
|
||||||
|
|
||||||
|
sleep(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// LOG(WARN) << result._statusCode;
|
|
||||||
|
|
||||||
if (result._statusCode ==
|
if (result._statusCode ==
|
||||||
(int)arangodb::rest::ResponseCode::TEMPORARY_REDIRECT) {
|
(int)arangodb::rest::ResponseCode::TEMPORARY_REDIRECT) {
|
||||||
// sometimes the agency will return a 307 (temporary redirect)
|
// sometimes the agency will return a 307 (temporary redirect)
|
||||||
|
|
Loading…
Reference in New Issue