From 57223ecaba57c30e10c8b1334a3eda59b76c7af9 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Tue, 10 Dec 2013 14:33:34 +0100 Subject: [PATCH] failover handling for agency connections --- arangod/Cluster/AgencyComm.cpp | 472 ++++++++++++------------- arangod/Cluster/AgencyComm.h | 59 +++- arangod/Cluster/ApplicationCluster.cpp | 8 +- arangod/Cluster/HeartbeatThread.cpp | 8 +- 4 files changed, 270 insertions(+), 277 deletions(-) diff --git a/arangod/Cluster/AgencyComm.cpp b/arangod/Cluster/AgencyComm.cpp index 855010729e..756bbc8f01 100644 --- a/arangod/Cluster/AgencyComm.cpp +++ b/arangod/Cluster/AgencyComm.cpp @@ -84,7 +84,8 @@ AgencyEndpoint::~AgencyEndpoint () { //////////////////////////////////////////////////////////////////////////////// AgencyCommResult::AgencyCommResult () - : _message(), + : _location(), + _message(), _body(), _index(0), _statusCode(0) { @@ -106,7 +107,7 @@ AgencyCommResult::~AgencyCommResult () { /// if there is no error, an empty string will be returned //////////////////////////////////////////////////////////////////////////////// -std::string AgencyCommResult::getErrorMessage () const { +std::string AgencyCommResult::errorMessage () const { std::string result; TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, _body.c_str()); @@ -193,25 +194,23 @@ bool AgencyCommResult::processJsonNode (TRI_json_t const* node, // get "value" attribute TRI_json_t const* value = TRI_LookupArrayJson(node, "value"); - if (! TRI_IsStringJson(value)) { - return false; - } + if (TRI_IsStringJson(value)) { + if (! prefix.empty()) { + if (returnIndex) { + // return "modifiedIndex" + TRI_json_t const* modifiedIndex = TRI_LookupArrayJson(node, "modifiedIndex"); - if (! prefix.empty()) { - if (returnIndex) { - // return "modifiedIndex" - TRI_json_t const* modifiedIndex = TRI_LookupArrayJson(node, "modifiedIndex"); - - if (! TRI_IsNumberJson(modifiedIndex)) { - return false; + if (! TRI_IsNumberJson(modifiedIndex)) { + return false; + } + + // convert the number to an integer + out[prefix] = triagens::basics::StringUtils::itoa((uint64_t) modifiedIndex->_value._number); + } + else { + // otherwise return value + out[prefix] = std::string(value->_value._string.data, value->_value._string.length - 1); } - - // convert the number to an integer - out[prefix] = triagens::basics::StringUtils::itoa((uint64_t) modifiedIndex->_value._number); - } - else { - // otherwise return value - out[prefix] = std::string(value->_value._string.data, value->_value._string.length - 1); } } } @@ -471,6 +470,32 @@ bool AgencyComm::removeEndpoint (std::string const& endpointSpecification) { return false; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief checks if an endpoint is present +//////////////////////////////////////////////////////////////////////////////// + +bool AgencyComm::hasEndpoint (std::string const& endpointSpecification) { + { + READ_LOCKER(AgencyComm::_globalLock); + + // check if we have got this endpoint + std::list::iterator it = _globalEndpoints.begin(); + + while (it != _globalEndpoints.end()) { + AgencyEndpoint const* agencyEndpoint = (*it); + + if (agencyEndpoint->_endpoint->getSpecification() == endpointSpecification) { + return true; + } + + ++it; + } + } + + // not found + return false; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief sets the global prefix for all operations //////////////////////////////////////////////////////////////////////////////// @@ -575,43 +600,39 @@ AgencyEndpoint* AgencyComm::createAgencyEndpoint (std::string const& endpointSpe // --SECTION-- public methods // ----------------------------------------------------------------------------- +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates a directory in the backend +//////////////////////////////////////////////////////////////////////////////// + +AgencyCommResult AgencyComm::createDirectory (std::string const& key) { + AgencyCommResult result; + + sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_PUT, + _globalConnectionOptions._requestTimeout, + result, + buildUrl(key) + "?dir=true", + "", + false); + + return result; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief sets a value in the backend //////////////////////////////////////////////////////////////////////////////// -bool AgencyComm::setValue (std::string const& key, - std::string const& value) { +AgencyCommResult AgencyComm::setValue (std::string const& key, + std::string const& value) { AgencyCommResult result; - size_t numEndpoints; - - { - READ_LOCKER(AgencyComm::_globalLock); - numEndpoints = AgencyComm::_globalEndpoints.size(); - assert(numEndpoints > 0); - } - - size_t tries = 0; - - while (tries++ < numEndpoints) { - AgencyEndpoint* agencyEndpoint = popEndpoint(); - send(agencyEndpoint->_connection, - triagens::rest::HttpRequest::HTTP_REQUEST_PUT, - _globalConnectionOptions._requestTimeout, - result, - buildUrl(key), - "value=" + triagens::basics::StringUtils::urlEncode(value)); + sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_PUT, + _globalConnectionOptions._requestTimeout, + result, + buildUrl(key), + "value=" + triagens::basics::StringUtils::urlEncode(value), + false); - if (requeueEndpoint(agencyEndpoint, result.successful())) { - // we're done - return true; - } - - // otherwise, try next - } - - // if we get here, we could not send data to any endpoint successfully - return false; + return result; } //////////////////////////////////////////////////////////////////////////////// @@ -624,36 +645,16 @@ AgencyCommResult AgencyComm::getValues (std::string const& key, if (recursive) { url += "?recursive=true"; } - + AgencyCommResult result; - size_t numEndpoints; - { - READ_LOCKER(AgencyComm::_globalLock); - numEndpoints = AgencyComm::_globalEndpoints.size(); - assert(numEndpoints > 0); - } - - size_t tries = 0; - - while (tries++ < numEndpoints) { - AgencyEndpoint* agencyEndpoint = popEndpoint(); - - send(agencyEndpoint->_connection, - triagens::rest::HttpRequest::HTTP_REQUEST_GET, - _globalConnectionOptions._requestTimeout * 1000.0 * 1000.0, - result, - url); - - if (requeueEndpoint(agencyEndpoint, result.successful())) { - // we're done - break; - } - - // otherwise, try next - } - - // if we get here, we could not send data to any endpoint successfully + sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_GET, + _globalConnectionOptions._requestTimeout, + result, + url, + "", + false); + return result; } @@ -661,43 +662,23 @@ AgencyCommResult AgencyComm::getValues (std::string const& key, /// @brief removes one or multiple values from the backend //////////////////////////////////////////////////////////////////////////////// -bool AgencyComm::removeValues (std::string const& key, - bool recursive) { +AgencyCommResult AgencyComm::removeValues (std::string const& key, + bool recursive) { std::string url(buildUrl(key)); if (recursive) { url += "?recursive=true"; } - + AgencyCommResult result; - size_t numEndpoints; - { - READ_LOCKER(AgencyComm::_globalLock); - numEndpoints = AgencyComm::_globalEndpoints.size(); - assert(numEndpoints > 0); - } + sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_DELETE, + _globalConnectionOptions._requestTimeout, + result, + url, + "", + false); - size_t tries = 0; - - while (tries++ < numEndpoints) { - AgencyEndpoint* agencyEndpoint = popEndpoint(); - - send(agencyEndpoint->_connection, - triagens::rest::HttpRequest::HTTP_REQUEST_DELETE, - _globalConnectionOptions._requestTimeout, - result, - url); - - if (requeueEndpoint(agencyEndpoint, result.successful())) { - // we're done - return true; - } - - // otherwise, try next - } - - // if we get here, we could not send data to any endpoint successfully - return false; + return result; } //////////////////////////////////////////////////////////////////////////////// @@ -705,43 +686,19 @@ bool AgencyComm::removeValues (std::string const& key, /// the CAS condition is whether or not a previous value existed for the key //////////////////////////////////////////////////////////////////////////////// -int AgencyComm::casValue (std::string const& key, - std::string const& value, - bool prevExists) { - +AgencyCommResult AgencyComm::casValue (std::string const& key, + std::string const& value, + bool prevExists) { AgencyCommResult result; - size_t numEndpoints; - - { - READ_LOCKER(AgencyComm::_globalLock); - numEndpoints = AgencyComm::_globalEndpoints.size(); - assert(numEndpoints > 0); - } - - size_t tries = 0; - - while (tries++ < numEndpoints) { - AgencyEndpoint* agencyEndpoint = popEndpoint(); - send(agencyEndpoint->_connection, - triagens::rest::HttpRequest::HTTP_REQUEST_PUT, - _globalConnectionOptions._requestTimeout, - result, - buildUrl(key) + "?prevExists=" + (prevExists ? "true" : "false"), - "value=" + triagens::basics::StringUtils::urlEncode(value)); + sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_PUT, + _globalConnectionOptions._requestTimeout, + result, + buildUrl(key) + "?prevExists=" + (prevExists ? "true" : "false"), + "value=" + triagens::basics::StringUtils::urlEncode(value), + false); - if (requeueEndpoint(agencyEndpoint, result.successful())) { - // we're done - return true; - } - - // otherwise, try next - } - - // if we get here, we could not send data to any endpoint successfully - return false; - - return 0; + return result; } //////////////////////////////////////////////////////////////////////////////// @@ -750,43 +707,19 @@ int AgencyComm::casValue (std::string const& key, /// identical to `oldValue` //////////////////////////////////////////////////////////////////////////////// -int AgencyComm::casValue (std::string const& key, - std::string const& oldValue, - std::string const& newValue) { - +AgencyCommResult AgencyComm::casValue (std::string const& key, + std::string const& oldValue, + std::string const& newValue) { AgencyCommResult result; - size_t numEndpoints; - { - READ_LOCKER(AgencyComm::_globalLock); - numEndpoints = AgencyComm::_globalEndpoints.size(); - assert(numEndpoints > 0); - } + sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_PUT, + _globalConnectionOptions._requestTimeout, + result, + buildUrl(key) + "?prevValue=" + triagens::basics::StringUtils::urlEncode(oldValue), + "value=" + triagens::basics::StringUtils::urlEncode(newValue), + false); - size_t tries = 0; - - while (tries++ < numEndpoints) { - AgencyEndpoint* agencyEndpoint = popEndpoint(); - - send(agencyEndpoint->_connection, - triagens::rest::HttpRequest::HTTP_REQUEST_PUT, - _globalConnectionOptions._requestTimeout, - result, - buildUrl(key) + "?prevValue=" + triagens::basics::StringUtils::urlEncode(oldValue), - "value=" + triagens::basics::StringUtils::urlEncode(newValue)); - - if (requeueEndpoint(agencyEndpoint, result.successful())) { - // we're done - return true; - } - - // otherwise, try next - } - - // if we get here, we could not send data to any endpoint successfully - return false; - - return 0; + return result; } //////////////////////////////////////////////////////////////////////////////// @@ -805,34 +738,14 @@ AgencyCommResult AgencyComm::watchValue (std::string const& key, } AgencyCommResult result; - size_t numEndpoints; + + sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_GET, + timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout, + result, + url, + "", + true); - { - READ_LOCKER(AgencyComm::_globalLock); - numEndpoints = AgencyComm::_globalEndpoints.size(); - assert(numEndpoints > 0); - } - - size_t tries = 0; - - while (tries++ < numEndpoints) { - AgencyEndpoint* agencyEndpoint = popEndpoint(); - - send(agencyEndpoint->_connection, - triagens::rest::HttpRequest::HTTP_REQUEST_GET, - timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout, - result, - url); - - if (requeueEndpoint(agencyEndpoint, result.successful())) { - // we're done - break; - } - - // otherwise, try next - } - - // if we get here, we could not send data to any endpoint successfully return result; } @@ -921,26 +834,104 @@ bool AgencyComm::requeueEndpoint (AgencyEndpoint* agencyEndpoint, std::string AgencyComm::buildUrl (std::string const& relativePart) const { return AgencyComm::AGENCY_URL_PREFIX + _globalPrefix + relativePart; } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief sends an HTTP request to the agency, handling failover +//////////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////////// -/// @brief sends data to the URL w/o body -//////////////////////////////////////////////////////////////////////////////// - -bool AgencyComm::send (triagens::httpclient::GeneralClientConnection* connection, - triagens::rest::HttpRequest::HttpRequestType method, - double timeout, - AgencyCommResult& result, - std::string const& url) { - // only these methods can be called without a body - assert(method == triagens::rest::HttpRequest::HTTP_REQUEST_DELETE || - method == triagens::rest::HttpRequest::HTTP_REQUEST_GET || - method == triagens::rest::HttpRequest::HTTP_REQUEST_HEAD); +bool AgencyComm::sendWithFailover (triagens::rest::HttpRequest::HttpRequestType method, + const double timeout, + AgencyCommResult& result, + std::string const& url, + std::string const& body, + bool isWatch) { + size_t numEndpoints; - return send(connection, method, timeout, result, url, ""); -} + { + READ_LOCKER(AgencyComm::_globalLock); + numEndpoints = AgencyComm::_globalEndpoints.size(); + assert(numEndpoints > 0); + } + + size_t tries = 0; + std::string realUrl = url; + + while (tries++ < numEndpoints) { + AgencyEndpoint* agencyEndpoint = popEndpoint(); + + send(agencyEndpoint->_connection, + method, + timeout, + result, + realUrl, + body); + + if (result._statusCode == 307) { + // sometimes the agency will return a 307 (temporary redirect) + // in this case we have to pick it up and use the new location returned + + // put the current connection to the end of the list + requeueEndpoint(agencyEndpoint, false); + + // a 307 does not count as a success + assert(! result.successful()); + + std::string endpoint; + + // transform location into an endpoint + if (result.location().substr(0, 7) == "http://") { + endpoint = "tcp://" + result.location().substr(7); + } + else if (result.location().substr(0, 8) == "https://") { + endpoint = "ssl://" + result.location().substr(8); + } + else { + // invalid endpoint, return an error + return false; + } + + const size_t delim = endpoint.find('/', 6); + + if (delim == std::string::npos) { + // invalid location header + return false; + } + + realUrl = endpoint.substr(delim); + endpoint = endpoint.substr(0, delim); + + LOG_WARNING("handling failover from '%s' to '%s'", + agencyEndpoint->_endpoint->getSpecification().c_str(), + endpoint.c_str()); + + if (! AgencyComm::hasEndpoint(endpoint)) { + // redirection to an unknown endpoint + LOG_ERROR("found redirection to unknown endpoint '%s'. Will not follow!", + endpoint.c_str()); + return false; + } + + // if we get here, we'll just use the next endpoint from the list + continue; + } + + // watches might time out, this still counts as a success + const bool wasSuccessful = result.successful() || (isWatch && result._statusCode == 0); + + if (requeueEndpoint(agencyEndpoint, wasSuccessful)) { + // we're done + return true; + } + + // otherwise, try next + } + + // if we get here, we could not send data to any endpoint successfully + return false; +} //////////////////////////////////////////////////////////////////////////////// -/// @brief sends data to the URL w/ body +/// @brief sends data to the URL //////////////////////////////////////////////////////////////////////////////// bool AgencyComm::send (triagens::httpclient::GeneralClientConnection* connection, @@ -952,6 +943,14 @@ bool AgencyComm::send (triagens::httpclient::GeneralClientConnection* connection assert(connection != 0); + if (method == triagens::rest::HttpRequest::HTTP_REQUEST_GET || + method == triagens::rest::HttpRequest::HTTP_REQUEST_HEAD || + method == triagens::rest::HttpRequest::HTTP_REQUEST_DELETE) { + assert(body.empty()); + } + + assert(! url.empty()); + result._statusCode = 0; LOG_TRACE("sending %s request to agency at endpoint '%s', url '%s': %s", @@ -989,45 +988,20 @@ bool AgencyComm::send (triagens::httpclient::GeneralClientConnection* connection delete response; return false; } -/* - if (response->getHttpReturnCode() == 307) { - std::cout << "GOT a 307\n\n"; - // sometimes the agency will return a 307 (temporary redirect) - // in this case we have to pick it up and use the new location returned + if (response->getHttpReturnCode() == 307) { + // temporary redirect. now save location header + bool found = false; - std::string location = response->getHeaderField("location", found); - std::cout << "LOCATION: " << location << "\n\n"; + result._location = response->getHeaderField("location", found); if (! found) { - // 307 without a "location" header is just rubbish + // a 307 without a location header does not make any sense delete response; return false; } - - // transform location into an endpoint - if (location.substr(0, 7) == "http://") { - location = "tcp://" + location.substr(7); - } - else if (location.substr(0, 8) == "https://") { - location = "ssl://" + location.substr(7); - } - std::cout << "NEW LOCATION: " << location << "\n\n"; - - const size_t delim = location.find('/', 6); - if (delim == std::string::npos) { - // invalid location header - delete response; - return false; - } - - std::string endpoint = location.substr(0, delim); - std::string newUrl = location.substr(delim); - - std::cout << "NEW ENDPOINT: " << endpoint << "\n\n"; - std::cout << "NEW URL: " << newUrl << "\n\n"; } -*/ + result._message = response->getHttpReturnMessage(); result._body = response->getBody().str(); result._index = 0; diff --git a/arangod/Cluster/AgencyComm.h b/arangod/Cluster/AgencyComm.h index c502ccce99..a2798b8807 100644 --- a/arangod/Cluster/AgencyComm.h +++ b/arangod/Cluster/AgencyComm.h @@ -145,7 +145,15 @@ namespace triagens { /// if there is no error, an empty string will be returned //////////////////////////////////////////////////////////////////////////////// - std::string getErrorMessage () const; + std::string errorMessage () const; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief return the location header (might be empty) +//////////////////////////////////////////////////////////////////////////////// + + const std::string location () const { + return _location; + } //////////////////////////////////////////////////////////////////////////////// /// @brief recursively flatten the JSON response into a map @@ -170,6 +178,7 @@ namespace triagens { public: + std::string _location; std::string _message; std::string _body; uint64_t _index; @@ -234,13 +243,18 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// static bool removeEndpoint (std::string const&); +//////////////////////////////////////////////////////////////////////////////// +/// @brief checks if an endpoint is present +//////////////////////////////////////////////////////////////////////////////// + + static bool hasEndpoint (std::string const&); //////////////////////////////////////////////////////////////////////////////// /// @brief get a stringified version of the endpoints //////////////////////////////////////////////////////////////////////////////// static const std::string getEndpointsString (); - + //////////////////////////////////////////////////////////////////////////////// /// @brief sets the global prefix for all operations //////////////////////////////////////////////////////////////////////////////// @@ -267,12 +281,18 @@ namespace triagens { // --SECTION-- public methods // ----------------------------------------------------------------------------- +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates a directory in the backend +//////////////////////////////////////////////////////////////////////////////// + + AgencyCommResult createDirectory (std::string const&); + //////////////////////////////////////////////////////////////////////////////// /// @brief sets a value in the back end //////////////////////////////////////////////////////////////////////////////// - bool setValue (std::string const&, - std::string const&); + AgencyCommResult setValue (std::string const&, + std::string const&); //////////////////////////////////////////////////////////////////////////////// /// @brief gets one or multiple values from the back end @@ -285,17 +305,17 @@ namespace triagens { /// @brief removes one or multiple values from the back end //////////////////////////////////////////////////////////////////////////////// - bool removeValues (std::string const&, - bool); + AgencyCommResult removeValues (std::string const&, + bool); //////////////////////////////////////////////////////////////////////////////// /// @brief compares and swaps a single value in the backend /// the CAS condition is whether or not a previous value existed for the key //////////////////////////////////////////////////////////////////////////////// - int casValue (std::string const&, - std::string const&, - bool); + AgencyCommResult casValue (std::string const&, + std::string const&, + bool); //////////////////////////////////////////////////////////////////////////////// /// @brief compares and swaps a single value in the back end @@ -303,9 +323,9 @@ namespace triagens { /// identical to `oldValue` //////////////////////////////////////////////////////////////////////////////// - int casValue (std::string const&, - std::string const&, - std::string const&); + AgencyCommResult casValue (std::string const&, + std::string const&, + std::string const&); //////////////////////////////////////////////////////////////////////////////// /// @brief blocks on a change of a single value in the back end @@ -341,17 +361,18 @@ namespace triagens { std::string buildUrl (std::string const&) const; //////////////////////////////////////////////////////////////////////////////// -/// @brief sends data to the URL w/o body +/// @brief sends an HTTP request to the agency, handling failover //////////////////////////////////////////////////////////////////////////////// - bool send (triagens::httpclient::GeneralClientConnection*, - triagens::rest::HttpRequest::HttpRequestType, - double, - AgencyCommResult&, - std::string const&); + bool sendWithFailover (triagens::rest::HttpRequest::HttpRequestType, + double, + AgencyCommResult&, + std::string const&, + std::string const&, + bool); //////////////////////////////////////////////////////////////////////////////// -/// @brief sends data to the URL w/ body +/// @brief sends data to the URL //////////////////////////////////////////////////////////////////////////////// bool send (triagens::httpclient::GeneralClientConnection*, diff --git a/arangod/Cluster/ApplicationCluster.cpp b/arangod/Cluster/ApplicationCluster.cpp index be06ff1662..7441d485ca 100644 --- a/arangod/Cluster/ApplicationCluster.cpp +++ b/arangod/Cluster/ApplicationCluster.cpp @@ -223,7 +223,7 @@ bool ApplicationCluster::start () { } if (! _heartbeat->init() || ! _heartbeat->start()) { - LOG_FATAL_AND_EXIT("could not connect to agency endpoints (%s)", + LOG_FATAL_AND_EXIT("heartbeat could not connect to agency endpoints (%s)", endpoints.c_str()); } @@ -309,7 +309,7 @@ ServerState::RoleEnum ApplicationCluster::checkCoordinatorsList () const { "got status code %d, message: %s", endpoints.c_str(), result._statusCode, - result.getErrorMessage().c_str()); + result.errorMessage().c_str()); } std::map out; @@ -345,9 +345,9 @@ ServerState::RoleEnum ApplicationCluster::checkServersList () const { "got status code %d, message: %s", endpoints.c_str(), result._statusCode, - result.getErrorMessage().c_str()); + result.errorMessage().c_str()); } - + std::map out; if (! result.flattenJson(out, "TmpConfig/DBServers/", false)) { LOG_FATAL_AND_EXIT("Got an invalid JSON response for TmpConfig/DBServers"); diff --git a/arangod/Cluster/HeartbeatThread.cpp b/arangod/Cluster/HeartbeatThread.cpp index 88fde8b495..d7d5cb0cf9 100644 --- a/arangod/Cluster/HeartbeatThread.cpp +++ b/arangod/Cluster/HeartbeatThread.cpp @@ -115,13 +115,11 @@ void HeartbeatThread::run () { if (result.successful()) { // value has changed! - handleStateChange(result, lastCommandIndex); // sleep a while CONDITION_LOCKER(guard, _condition); guard.wait(_interval); - } else { // value did not change, but we already blocked waiting for a change... @@ -245,9 +243,9 @@ bool HeartbeatThread::sendState () { // return value is intentionally not handled // if sending the current state fails, we'll just try again in the next iteration - bool result = _agency.setValue("State/ServerStates/" + _myId, value); + AgencyCommResult result(_agency.setValue("State/ServerStates/" + _myId, value)); - if (result) { + if (result.successful()) { _numFails = 0; } else { @@ -258,7 +256,7 @@ bool HeartbeatThread::sendState () { } } - return result; + return result.successful(); } // Local Variables: