1
0
Fork 0

failover handling for agency connections

This commit is contained in:
Jan Steemann 2013-12-10 14:33:34 +01:00
parent dd5a3a9e60
commit 57223ecaba
4 changed files with 270 additions and 277 deletions

View File

@ -84,7 +84,8 @@ AgencyEndpoint::~AgencyEndpoint () {
////////////////////////////////////////////////////////////////////////////////
AgencyCommResult::AgencyCommResult ()
: _message(),
: _location(),
_message(),
_body(),
_index(0),
_statusCode(0) {
@ -106,7 +107,7 @@ AgencyCommResult::~AgencyCommResult () {
/// if there is no error, an empty string will be returned
////////////////////////////////////////////////////////////////////////////////
std::string AgencyCommResult::getErrorMessage () const {
std::string AgencyCommResult::errorMessage () const {
std::string result;
TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, _body.c_str());
@ -193,25 +194,23 @@ bool AgencyCommResult::processJsonNode (TRI_json_t const* node,
// get "value" attribute
TRI_json_t const* value = TRI_LookupArrayJson(node, "value");
if (! TRI_IsStringJson(value)) {
return false;
}
if (TRI_IsStringJson(value)) {
if (! prefix.empty()) {
if (returnIndex) {
// return "modifiedIndex"
TRI_json_t const* modifiedIndex = TRI_LookupArrayJson(node, "modifiedIndex");
if (! prefix.empty()) {
if (returnIndex) {
// return "modifiedIndex"
TRI_json_t const* modifiedIndex = TRI_LookupArrayJson(node, "modifiedIndex");
if (! TRI_IsNumberJson(modifiedIndex)) {
return false;
if (! TRI_IsNumberJson(modifiedIndex)) {
return false;
}
// convert the number to an integer
out[prefix] = triagens::basics::StringUtils::itoa((uint64_t) modifiedIndex->_value._number);
}
else {
// otherwise return value
out[prefix] = std::string(value->_value._string.data, value->_value._string.length - 1);
}
// convert the number to an integer
out[prefix] = triagens::basics::StringUtils::itoa((uint64_t) modifiedIndex->_value._number);
}
else {
// otherwise return value
out[prefix] = std::string(value->_value._string.data, value->_value._string.length - 1);
}
}
}
@ -471,6 +470,32 @@ bool AgencyComm::removeEndpoint (std::string const& endpointSpecification) {
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief checks if an endpoint is present
////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::hasEndpoint (std::string const& endpointSpecification) {
{
READ_LOCKER(AgencyComm::_globalLock);
// check if we have got this endpoint
std::list<AgencyEndpoint*>::iterator it = _globalEndpoints.begin();
while (it != _globalEndpoints.end()) {
AgencyEndpoint const* agencyEndpoint = (*it);
if (agencyEndpoint->_endpoint->getSpecification() == endpointSpecification) {
return true;
}
++it;
}
}
// not found
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief sets the global prefix for all operations
////////////////////////////////////////////////////////////////////////////////
@ -575,43 +600,39 @@ AgencyEndpoint* AgencyComm::createAgencyEndpoint (std::string const& endpointSpe
// --SECTION-- public methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a directory in the backend
///
/// Sends a PUT request with an empty body to the URL built for `key` with
/// "?dir=true" appended, using the global request timeout. The request goes
/// through sendWithFailover with the watch flag set to false. The boolean
/// returned by sendWithFailover is not inspected here; the result object it
/// was handed is returned as-is.
////////////////////////////////////////////////////////////////////////////////
AgencyCommResult AgencyComm::createDirectory (std::string const& key) {
  std::string const directoryUrl = buildUrl(key) + "?dir=true";
  std::string const emptyBody;

  AgencyCommResult response;

  sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_PUT,
                   _globalConnectionOptions._requestTimeout,
                   response,
                   directoryUrl,
                   emptyBody,
                   false);

  return response;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief sets a value in the backend
////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::setValue (std::string const& key,
std::string const& value) {
AgencyCommResult AgencyComm::setValue (std::string const& key,
std::string const& value) {
AgencyCommResult result;
size_t numEndpoints;
{
READ_LOCKER(AgencyComm::_globalLock);
numEndpoints = AgencyComm::_globalEndpoints.size();
assert(numEndpoints > 0);
}
size_t tries = 0;
while (tries++ < numEndpoints) {
AgencyEndpoint* agencyEndpoint = popEndpoint();
send(agencyEndpoint->_connection,
triagens::rest::HttpRequest::HTTP_REQUEST_PUT,
_globalConnectionOptions._requestTimeout,
result,
buildUrl(key),
"value=" + triagens::basics::StringUtils::urlEncode(value));
sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_PUT,
_globalConnectionOptions._requestTimeout,
result,
buildUrl(key),
"value=" + triagens::basics::StringUtils::urlEncode(value),
false);
if (requeueEndpoint(agencyEndpoint, result.successful())) {
// we're done
return true;
}
// otherwise, try next
}
// if we get here, we could not send data to any endpoint successfully
return false;
return result;
}
////////////////////////////////////////////////////////////////////////////////
@ -624,36 +645,16 @@ AgencyCommResult AgencyComm::getValues (std::string const& key,
if (recursive) {
url += "?recursive=true";
}
AgencyCommResult result;
size_t numEndpoints;
{
READ_LOCKER(AgencyComm::_globalLock);
numEndpoints = AgencyComm::_globalEndpoints.size();
assert(numEndpoints > 0);
}
size_t tries = 0;
while (tries++ < numEndpoints) {
AgencyEndpoint* agencyEndpoint = popEndpoint();
send(agencyEndpoint->_connection,
triagens::rest::HttpRequest::HTTP_REQUEST_GET,
_globalConnectionOptions._requestTimeout * 1000.0 * 1000.0,
result,
url);
if (requeueEndpoint(agencyEndpoint, result.successful())) {
// we're done
break;
}
// otherwise, try next
}
// if we get here, we could not send data to any endpoint successfully
sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_GET,
_globalConnectionOptions._requestTimeout,
result,
url,
"",
false);
return result;
}
@ -661,43 +662,23 @@ AgencyCommResult AgencyComm::getValues (std::string const& key,
/// @brief removes one or multiple values from the backend
////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::removeValues (std::string const& key,
bool recursive) {
AgencyCommResult AgencyComm::removeValues (std::string const& key,
bool recursive) {
std::string url(buildUrl(key));
if (recursive) {
url += "?recursive=true";
}
AgencyCommResult result;
size_t numEndpoints;
{
READ_LOCKER(AgencyComm::_globalLock);
numEndpoints = AgencyComm::_globalEndpoints.size();
assert(numEndpoints > 0);
}
sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_DELETE,
_globalConnectionOptions._requestTimeout,
result,
url,
"",
false);
size_t tries = 0;
while (tries++ < numEndpoints) {
AgencyEndpoint* agencyEndpoint = popEndpoint();
send(agencyEndpoint->_connection,
triagens::rest::HttpRequest::HTTP_REQUEST_DELETE,
_globalConnectionOptions._requestTimeout,
result,
url);
if (requeueEndpoint(agencyEndpoint, result.successful())) {
// we're done
return true;
}
// otherwise, try next
}
// if we get here, we could not send data to any endpoint successfully
return false;
return result;
}
////////////////////////////////////////////////////////////////////////////////
@ -705,43 +686,19 @@ bool AgencyComm::removeValues (std::string const& key,
/// the CAS condition is whether or not a previous value existed for the key
////////////////////////////////////////////////////////////////////////////////
int AgencyComm::casValue (std::string const& key,
std::string const& value,
bool prevExists) {
AgencyCommResult AgencyComm::casValue (std::string const& key,
std::string const& value,
bool prevExists) {
AgencyCommResult result;
size_t numEndpoints;
{
READ_LOCKER(AgencyComm::_globalLock);
numEndpoints = AgencyComm::_globalEndpoints.size();
assert(numEndpoints > 0);
}
size_t tries = 0;
while (tries++ < numEndpoints) {
AgencyEndpoint* agencyEndpoint = popEndpoint();
send(agencyEndpoint->_connection,
triagens::rest::HttpRequest::HTTP_REQUEST_PUT,
_globalConnectionOptions._requestTimeout,
result,
buildUrl(key) + "?prevExists=" + (prevExists ? "true" : "false"),
"value=" + triagens::basics::StringUtils::urlEncode(value));
sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_PUT,
_globalConnectionOptions._requestTimeout,
result,
buildUrl(key) + "?prevExists=" + (prevExists ? "true" : "false"),
"value=" + triagens::basics::StringUtils::urlEncode(value),
false);
if (requeueEndpoint(agencyEndpoint, result.successful())) {
// we're done
return true;
}
// otherwise, try next
}
// if we get here, we could not send data to any endpoint successfully
return false;
return 0;
return result;
}
////////////////////////////////////////////////////////////////////////////////
@ -750,43 +707,19 @@ int AgencyComm::casValue (std::string const& key,
/// identical to `oldValue`
////////////////////////////////////////////////////////////////////////////////
int AgencyComm::casValue (std::string const& key,
std::string const& oldValue,
std::string const& newValue) {
AgencyCommResult AgencyComm::casValue (std::string const& key,
std::string const& oldValue,
std::string const& newValue) {
AgencyCommResult result;
size_t numEndpoints;
{
READ_LOCKER(AgencyComm::_globalLock);
numEndpoints = AgencyComm::_globalEndpoints.size();
assert(numEndpoints > 0);
}
sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_PUT,
_globalConnectionOptions._requestTimeout,
result,
buildUrl(key) + "?prevValue=" + triagens::basics::StringUtils::urlEncode(oldValue),
"value=" + triagens::basics::StringUtils::urlEncode(newValue),
false);
size_t tries = 0;
while (tries++ < numEndpoints) {
AgencyEndpoint* agencyEndpoint = popEndpoint();
send(agencyEndpoint->_connection,
triagens::rest::HttpRequest::HTTP_REQUEST_PUT,
_globalConnectionOptions._requestTimeout,
result,
buildUrl(key) + "?prevValue=" + triagens::basics::StringUtils::urlEncode(oldValue),
"value=" + triagens::basics::StringUtils::urlEncode(newValue));
if (requeueEndpoint(agencyEndpoint, result.successful())) {
// we're done
return true;
}
// otherwise, try next
}
// if we get here, we could not send data to any endpoint successfully
return false;
return 0;
return result;
}
////////////////////////////////////////////////////////////////////////////////
@ -805,34 +738,14 @@ AgencyCommResult AgencyComm::watchValue (std::string const& key,
}
AgencyCommResult result;
size_t numEndpoints;
sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_GET,
timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout,
result,
url,
"",
true);
{
READ_LOCKER(AgencyComm::_globalLock);
numEndpoints = AgencyComm::_globalEndpoints.size();
assert(numEndpoints > 0);
}
size_t tries = 0;
while (tries++ < numEndpoints) {
AgencyEndpoint* agencyEndpoint = popEndpoint();
send(agencyEndpoint->_connection,
triagens::rest::HttpRequest::HTTP_REQUEST_GET,
timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout,
result,
url);
if (requeueEndpoint(agencyEndpoint, result.successful())) {
// we're done
break;
}
// otherwise, try next
}
// if we get here, we could not send data to any endpoint successfully
return result;
}
@ -921,26 +834,104 @@ bool AgencyComm::requeueEndpoint (AgencyEndpoint* agencyEndpoint,
std::string AgencyComm::buildUrl (std::string const& relativePart) const {
return AgencyComm::AGENCY_URL_PREFIX + _globalPrefix + relativePart;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief sends an HTTP request to the agency, handling failover
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief sends data to the URL w/o body
////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::send (triagens::httpclient::GeneralClientConnection* connection,
triagens::rest::HttpRequest::HttpRequestType method,
double timeout,
AgencyCommResult& result,
std::string const& url) {
// only these methods can be called without a body
assert(method == triagens::rest::HttpRequest::HTTP_REQUEST_DELETE ||
method == triagens::rest::HttpRequest::HTTP_REQUEST_GET ||
method == triagens::rest::HttpRequest::HTTP_REQUEST_HEAD);
bool AgencyComm::sendWithFailover (triagens::rest::HttpRequest::HttpRequestType method,
const double timeout,
AgencyCommResult& result,
std::string const& url,
std::string const& body,
bool isWatch) {
size_t numEndpoints;
return send(connection, method, timeout, result, url, "");
}
{
READ_LOCKER(AgencyComm::_globalLock);
numEndpoints = AgencyComm::_globalEndpoints.size();
assert(numEndpoints > 0);
}
size_t tries = 0;
std::string realUrl = url;
while (tries++ < numEndpoints) {
AgencyEndpoint* agencyEndpoint = popEndpoint();
send(agencyEndpoint->_connection,
method,
timeout,
result,
realUrl,
body);
if (result._statusCode == 307) {
// sometimes the agency will return a 307 (temporary redirect)
// in this case we have to pick it up and use the new location returned
// put the current connection to the end of the list
requeueEndpoint(agencyEndpoint, false);
// a 307 does not count as a success
assert(! result.successful());
std::string endpoint;
// transform location into an endpoint
if (result.location().substr(0, 7) == "http://") {
endpoint = "tcp://" + result.location().substr(7);
}
else if (result.location().substr(0, 8) == "https://") {
endpoint = "ssl://" + result.location().substr(8);
}
else {
// invalid endpoint, return an error
return false;
}
const size_t delim = endpoint.find('/', 6);
if (delim == std::string::npos) {
// invalid location header
return false;
}
realUrl = endpoint.substr(delim);
endpoint = endpoint.substr(0, delim);
LOG_WARNING("handling failover from '%s' to '%s'",
agencyEndpoint->_endpoint->getSpecification().c_str(),
endpoint.c_str());
if (! AgencyComm::hasEndpoint(endpoint)) {
// redirection to an unknown endpoint
LOG_ERROR("found redirection to unknown endpoint '%s'. Will not follow!",
endpoint.c_str());
return false;
}
// if we get here, we'll just use the next endpoint from the list
continue;
}
// watches might time out, this still counts as a success
const bool wasSuccessful = result.successful() || (isWatch && result._statusCode == 0);
if (requeueEndpoint(agencyEndpoint, wasSuccessful)) {
// we're done
return true;
}
// otherwise, try next
}
// if we get here, we could not send data to any endpoint successfully
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief sends data to the URL w/ body
/// @brief sends data to the URL
////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::send (triagens::httpclient::GeneralClientConnection* connection,
@ -952,6 +943,14 @@ bool AgencyComm::send (triagens::httpclient::GeneralClientConnection* connection
assert(connection != 0);
if (method == triagens::rest::HttpRequest::HTTP_REQUEST_GET ||
method == triagens::rest::HttpRequest::HTTP_REQUEST_HEAD ||
method == triagens::rest::HttpRequest::HTTP_REQUEST_DELETE) {
assert(body.empty());
}
assert(! url.empty());
result._statusCode = 0;
LOG_TRACE("sending %s request to agency at endpoint '%s', url '%s': %s",
@ -989,45 +988,20 @@ bool AgencyComm::send (triagens::httpclient::GeneralClientConnection* connection
delete response;
return false;
}
/*
if (response->getHttpReturnCode() == 307) {
std::cout << "GOT a 307\n\n";
// sometimes the agency will return a 307 (temporary redirect)
// in this case we have to pick it up and use the new location returned
if (response->getHttpReturnCode() == 307) {
// temporary redirect. now save location header
bool found = false;
std::string location = response->getHeaderField("location", found);
std::cout << "LOCATION: " << location << "\n\n";
result._location = response->getHeaderField("location", found);
if (! found) {
// 307 without a "location" header is just rubbish
// a 307 without a location header does not make any sense
delete response;
return false;
}
// transform location into an endpoint
if (location.substr(0, 7) == "http://") {
location = "tcp://" + location.substr(7);
}
else if (location.substr(0, 8) == "https://") {
location = "ssl://" + location.substr(7);
}
std::cout << "NEW LOCATION: " << location << "\n\n";
const size_t delim = location.find('/', 6);
if (delim == std::string::npos) {
// invalid location header
delete response;
return false;
}
std::string endpoint = location.substr(0, delim);
std::string newUrl = location.substr(delim);
std::cout << "NEW ENDPOINT: " << endpoint << "\n\n";
std::cout << "NEW URL: " << newUrl << "\n\n";
}
*/
result._message = response->getHttpReturnMessage();
result._body = response->getBody().str();
result._index = 0;

View File

@ -145,7 +145,15 @@ namespace triagens {
/// if there is no error, an empty string will be returned
////////////////////////////////////////////////////////////////////////////////
std::string getErrorMessage () const;
std::string errorMessage () const;
////////////////////////////////////////////////////////////////////////////////
/// @brief return the location header (might be empty)
////////////////////////////////////////////////////////////////////////////////
const std::string location () const {
return _location;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief recursively flatten the JSON response into a map
@ -170,6 +178,7 @@ namespace triagens {
public:
std::string _location;
std::string _message;
std::string _body;
uint64_t _index;
@ -234,13 +243,18 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
static bool removeEndpoint (std::string const&);
////////////////////////////////////////////////////////////////////////////////
/// @brief checks if an endpoint is present
////////////////////////////////////////////////////////////////////////////////
static bool hasEndpoint (std::string const&);
////////////////////////////////////////////////////////////////////////////////
/// @brief get a stringified version of the endpoints
////////////////////////////////////////////////////////////////////////////////
static const std::string getEndpointsString ();
////////////////////////////////////////////////////////////////////////////////
/// @brief sets the global prefix for all operations
////////////////////////////////////////////////////////////////////////////////
@ -267,12 +281,18 @@ namespace triagens {
// --SECTION-- public methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a directory in the backend
////////////////////////////////////////////////////////////////////////////////
AgencyCommResult createDirectory (std::string const&);
////////////////////////////////////////////////////////////////////////////////
/// @brief sets a value in the back end
////////////////////////////////////////////////////////////////////////////////
bool setValue (std::string const&,
std::string const&);
AgencyCommResult setValue (std::string const&,
std::string const&);
////////////////////////////////////////////////////////////////////////////////
/// @brief gets one or multiple values from the back end
@ -285,17 +305,17 @@ namespace triagens {
/// @brief removes one or multiple values from the back end
////////////////////////////////////////////////////////////////////////////////
bool removeValues (std::string const&,
bool);
AgencyCommResult removeValues (std::string const&,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief compares and swaps a single value in the backend
/// the CAS condition is whether or not a previous value existed for the key
////////////////////////////////////////////////////////////////////////////////
int casValue (std::string const&,
std::string const&,
bool);
AgencyCommResult casValue (std::string const&,
std::string const&,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief compares and swaps a single value in the back end
@ -303,9 +323,9 @@ namespace triagens {
/// identical to `oldValue`
////////////////////////////////////////////////////////////////////////////////
int casValue (std::string const&,
std::string const&,
std::string const&);
AgencyCommResult casValue (std::string const&,
std::string const&,
std::string const&);
////////////////////////////////////////////////////////////////////////////////
/// @brief blocks on a change of a single value in the back end
@ -341,17 +361,18 @@ namespace triagens {
std::string buildUrl (std::string const&) const;
////////////////////////////////////////////////////////////////////////////////
/// @brief sends data to the URL w/o body
/// @brief sends an HTTP request to the agency, handling failover
////////////////////////////////////////////////////////////////////////////////
bool send (triagens::httpclient::GeneralClientConnection*,
triagens::rest::HttpRequest::HttpRequestType,
double,
AgencyCommResult&,
std::string const&);
bool sendWithFailover (triagens::rest::HttpRequest::HttpRequestType,
double,
AgencyCommResult&,
std::string const&,
std::string const&,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief sends data to the URL w/ body
/// @brief sends data to the URL
////////////////////////////////////////////////////////////////////////////////
bool send (triagens::httpclient::GeneralClientConnection*,

View File

@ -223,7 +223,7 @@ bool ApplicationCluster::start () {
}
if (! _heartbeat->init() || ! _heartbeat->start()) {
LOG_FATAL_AND_EXIT("could not connect to agency endpoints (%s)",
LOG_FATAL_AND_EXIT("heartbeat could not connect to agency endpoints (%s)",
endpoints.c_str());
}
@ -309,7 +309,7 @@ ServerState::RoleEnum ApplicationCluster::checkCoordinatorsList () const {
"got status code %d, message: %s",
endpoints.c_str(),
result._statusCode,
result.getErrorMessage().c_str());
result.errorMessage().c_str());
}
std::map<std::string, std::string> out;
@ -345,9 +345,9 @@ ServerState::RoleEnum ApplicationCluster::checkServersList () const {
"got status code %d, message: %s",
endpoints.c_str(),
result._statusCode,
result.getErrorMessage().c_str());
result.errorMessage().c_str());
}
std::map<std::string, std::string> out;
if (! result.flattenJson(out, "TmpConfig/DBServers/", false)) {
LOG_FATAL_AND_EXIT("Got an invalid JSON response for TmpConfig/DBServers");

View File

@ -115,13 +115,11 @@ void HeartbeatThread::run () {
if (result.successful()) {
// value has changed!
handleStateChange(result, lastCommandIndex);
// sleep a while
CONDITION_LOCKER(guard, _condition);
guard.wait(_interval);
}
else {
// value did not change, but we already blocked waiting for a change...
@ -245,9 +243,9 @@ bool HeartbeatThread::sendState () {
// return value is intentionally not handled
// if sending the current state fails, we'll just try again in the next iteration
bool result = _agency.setValue("State/ServerStates/" + _myId, value);
AgencyCommResult result(_agency.setValue("State/ServerStates/" + _myId, value));
if (result) {
if (result.successful()) {
_numFails = 0;
}
else {
@ -258,7 +256,7 @@ bool HeartbeatThread::sendState () {
}
}
return result;
return result.successful();
}
// Local Variables: