diff --git a/arangod/Cluster/AgencyComm.cpp b/arangod/Cluster/AgencyComm.cpp index 7eff7a992a..7b8a8ec44e 100644 --- a/arangod/Cluster/AgencyComm.cpp +++ b/arangod/Cluster/AgencyComm.cpp @@ -69,12 +69,8 @@ AgencyEndpoint::AgencyEndpoint (triagens::rest::Endpoint* endpoint, //////////////////////////////////////////////////////////////////////////////// AgencyEndpoint::~AgencyEndpoint () { - if (_connection != nullptr) { - delete _connection; - } - if (_endpoint != nullptr) { - delete _endpoint; - } + delete _connection; + delete _endpoint; } // ----------------------------------------------------------------------------- @@ -142,24 +138,19 @@ int AgencyCommResult::httpCode () const { int AgencyCommResult::errorCode () const { int result = 0; - TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, _body.c_str()); + std::unique_ptr json(TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, _body.c_str())); - if (! TRI_IsObjectJson(json)) { - if (json != nullptr) { - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - } + if (! TRI_IsObjectJson(json.get())) { return result; } // get "errorCode" attribute - TRI_json_t const* errorCode = TRI_LookupObjectJson(json, "errorCode"); + TRI_json_t const* errorCode = TRI_LookupObjectJson(json.get(), "errorCode"); if (TRI_IsNumberJson(errorCode)) { result = (int) errorCode->_value._number; } - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - return result; } @@ -180,26 +171,23 @@ std::string AgencyCommResult::errorMessage () const { return std::string("unable to connect to agency"); } - TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, _body.c_str()); + std::unique_ptr json(TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, _body.c_str())); - if (0 == json) { + if (json == nullptr) { return std::string("Out of memory"); } - if (! TRI_IsObjectJson(json)) { - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + if (! TRI_IsObjectJson(json.get())) { return result; } // get "message" attribute - TRI_json_t const* message = TRI_LookupObjectJson(json, "message"); + TRI_json_t const* message = TRI_LookupObjectJson(json.get(), "message"); if (TRI_IsStringJson(message)) { result = std::string(message->_value._string.data, message->_value._string.length - 1); } - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - return result; } @@ -289,7 +277,7 @@ bool AgencyCommResult::parseJsonNode (TRI_json_t const* node, entry._index = 0; entry._json = 0; entry._isDir = true; - _values.emplace(std::make_pair(prefix, entry)); + _values.emplace(prefix, entry); } // is a directory, so there may be a "nodes" attribute @@ -325,7 +313,7 @@ bool AgencyCommResult::parseJsonNode (TRI_json_t const* node, entry._json = triagens::basics::JsonHelper::fromString(value->_value._string.data, value->_value._string.length - 1); entry._isDir = false; - _values.emplace(std::make_pair(prefix, entry)); + _values.emplace(prefix, entry); } } } @@ -866,27 +854,30 @@ std::string AgencyComm::generateStamp () { //////////////////////////////////////////////////////////////////////////////// AgencyEndpoint* AgencyComm::createAgencyEndpoint (std::string const& endpointSpecification) { - triagens::rest::Endpoint* endpoint = triagens::rest::Endpoint::clientFactory(endpointSpecification); + std::unique_ptr endpoint(triagens::rest::Endpoint::clientFactory(endpointSpecification)); if (endpoint == nullptr) { // could not create endpoint... return nullptr; } - triagens::httpclient::GeneralClientConnection* connection = - triagens::httpclient::GeneralClientConnection::factory(endpoint, + std::unique_ptr connection( + triagens::httpclient::GeneralClientConnection::factory(endpoint.get(), _globalConnectionOptions._requestTimeout, _globalConnectionOptions._connectTimeout, _globalConnectionOptions._connectRetries, - 0); + 0) + ); if (connection == nullptr) { - delete endpoint; - return nullptr; } - return new AgencyEndpoint(endpoint, connection); + auto ep = new AgencyEndpoint(endpoint.get(), connection.get()); + endpoint.release(); + connection.release(); + + return ep; } // ----------------------------------------------------------------------------- @@ -899,20 +890,19 @@ AgencyEndpoint* AgencyComm::createAgencyEndpoint (std::string const& endpointSpe AgencyCommResult AgencyComm::sendServerState (double ttl) { // construct JSON value { "status": "...", "time": "..." } - TRI_json_t* json = TRI_CreateObjectJson(TRI_UNKNOWN_MEM_ZONE); + std::unique_ptr json(TRI_CreateObjectJson(TRI_UNKNOWN_MEM_ZONE, 2)); if (json == nullptr) { return AgencyCommResult(); } - const std::string status = ServerState::stateToString(ServerState::instance()->getState()); - const std::string stamp = AgencyComm::generateStamp(); + std::string const status = ServerState::stateToString(ServerState::instance()->getState()); + std::string const stamp = std::move(AgencyComm::generateStamp()); - TRI_Insert3ObjectJson(TRI_UNKNOWN_MEM_ZONE, json, "status", TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, status.c_str(), status.size())); - TRI_Insert3ObjectJson(TRI_UNKNOWN_MEM_ZONE, json, "time", TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, stamp.c_str(), stamp.size())); + TRI_Insert3ObjectJson(TRI_UNKNOWN_MEM_ZONE, json.get(), "status", TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, status.c_str(), status.size())); + TRI_Insert3ObjectJson(TRI_UNKNOWN_MEM_ZONE, json.get(), "time", TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, stamp.c_str(), stamp.size())); - AgencyCommResult result(setValue("Sync/ServerStates/" + ServerState::instance()->getId(), json, ttl)); - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + AgencyCommResult result(setValue("Sync/ServerStates/" + ServerState::instance()->getId(), json.get(), ttl)); return result; } @@ -952,7 +942,7 @@ bool AgencyComm::increaseVersion (std::string const& key) { } // no version key found, now set it - TRI_json_t* json = triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, 1); + std::unique_ptr json(triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, 1)); if (json == nullptr) { return false; @@ -960,19 +950,17 @@ bool AgencyComm::increaseVersion (std::string const& key) { result.clear(); result = casValue(key, - json, + json.get(), false, 0.0, 0.0); - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - return result.successful(); } // found a version result.parse("", false); - std::map::const_iterator it = result._values.begin(); + auto it = result._values.begin(); if (it == result._values.end()) { return false; @@ -981,30 +969,26 @@ bool AgencyComm::increaseVersion (std::string const& key) { uint64_t version = triagens::basics::JsonHelper::stringUInt64((*it).second._json); // version key found, now update it - TRI_json_t* oldJson = triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, version); + std::unique_ptr oldJson(triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, version)); if (oldJson == nullptr) { return false; } - TRI_json_t* newJson = triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, version + 1); + std::unique_ptr newJson(triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, version + 1)); if (newJson == nullptr) { - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson); return false; } result.clear(); result = casValue(key, - oldJson, - newJson, + oldJson.get(), + newJson.get(), 0.0, 0.0); - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, newJson); - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson); - return result.successful(); } @@ -1204,15 +1188,13 @@ AgencyCommResult AgencyComm::watchValue (std::string const& key, bool AgencyComm::lockRead (std::string const& key, double ttl, double timeout) { - TRI_json_t* json = TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, "READ", strlen("READ")); + std::unique_ptr json(TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, "READ", strlen("READ"))); if (json == nullptr) { return false; } - bool result = lock(key, ttl, timeout, json); - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - return result; + return lock(key, ttl, timeout, json.get()); } //////////////////////////////////////////////////////////////////////////////// @@ -1222,15 +1204,13 @@ bool AgencyComm::lockRead (std::string const& key, bool AgencyComm::lockWrite (std::string const& key, double ttl, double timeout) { - TRI_json_t* json = TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, "WRITE", strlen("WRITE")); + std::unique_ptr json(TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, "WRITE", strlen("WRITE"))); if (json == nullptr) { return false; } - bool result = lock(key, ttl, timeout, json); - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - return result; + return lock(key, ttl, timeout, json.get()); } //////////////////////////////////////////////////////////////////////////////// @@ -1239,15 +1219,13 @@ bool AgencyComm::lockWrite (std::string const& key, bool AgencyComm::unlockRead (std::string const& key, double timeout) { - TRI_json_t* json = TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, "READ", strlen("READ")); + std::unique_ptr json(TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, "READ", strlen("READ"))); if (json == nullptr) { return false; } - bool result = unlock(key, json, timeout); - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - return result; + return unlock(key, json.get(), timeout); } //////////////////////////////////////////////////////////////////////////////// @@ -1256,15 +1234,13 @@ bool AgencyComm::unlockRead (std::string const& key, bool AgencyComm::unlockWrite (std::string const& key, double timeout) { - TRI_json_t* json = TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, "WRITE", strlen("WRITE")); + std::unique_ptr json(TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, "WRITE", strlen("WRITE"))); if (json == nullptr) { return false; } - bool result = unlock(key, json, timeout); - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - return result; + return unlock(key, json.get(), timeout); } //////////////////////////////////////////////////////////////////////////////// @@ -1284,12 +1260,11 @@ AgencyCommResult AgencyComm::uniqid (std::string const& key, result = getValues(key, false); if (result.httpCode() == (int) triagens::rest::HttpResponse::NOT_FOUND) { - TRI_json_t* json = TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, "0", strlen("0")); + std::unique_ptr json(TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, "0", strlen("0"))); if (json != nullptr) { // create the key on the fly - setValue(key, json, 0.0); - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + setValue(key, json.get(), 0.0); tries--; continue; @@ -1302,36 +1277,33 @@ AgencyCommResult AgencyComm::uniqid (std::string const& key, result.parse("", false); - TRI_json_t* oldJson = nullptr; + std::unique_ptr oldJson; std::map::iterator it = result._values.begin(); if (it != result._values.end()) { // steal the json - oldJson = (*it).second._json; + oldJson.reset((*it).second._json); (*it).second._json = nullptr; } else { - oldJson = TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, "0", strlen("0")); + oldJson.reset(TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, "0", strlen("0"))); } if (oldJson == nullptr) { return AgencyCommResult(); } - const uint64_t oldValue = triagens::basics::JsonHelper::stringUInt64(oldJson) + count; - const uint64_t newValue = oldValue + count; - TRI_json_t* newJson = triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, newValue); + uint64_t const oldValue = triagens::basics::JsonHelper::stringUInt64(oldJson.get()) + count; + uint64_t const newValue = oldValue + count; + std::unique_ptr newJson(triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, newValue)); if (newJson == nullptr) { - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson); return AgencyCommResult(); } result.clear(); - result = casValue(key, oldJson, newJson, 0.0, timeout); - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, newJson); - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson); + result = casValue(key, oldJson.get(), newJson.get(), 0.0, timeout); if (result.successful()) { result._index = oldValue + 1; @@ -1376,23 +1348,22 @@ bool AgencyComm::lock (std::string const& key, } unsigned long sleepTime = InitialSleepTime; - const double end = TRI_microtime() + timeout; + double const end = TRI_microtime() + timeout; + + std::unique_ptr oldJson(TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, "UNLOCKED", strlen("UNLOCKED"))); + + if (oldJson == nullptr) { + return false; + } + while (true) { - TRI_json_t* oldJson = TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, "UNLOCKED", strlen("UNLOCKED")); - - if (oldJson == nullptr) { - return false; - } - AgencyCommResult result = casValue(key + "/Lock", - oldJson, + oldJson.get(), json, ttl, timeout); - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson); - if (! result.successful() && result.httpCode() == (int) triagens::rest::HttpResponse::NOT_FOUND) { // key does not yet exist. create it now @@ -1413,9 +1384,7 @@ bool AgencyComm::lock (std::string const& key, sleepTime += InitialSleepTime; } - const double now = TRI_microtime(); - - if (now >= end) { + if (TRI_microtime() >= end) { return false; } } @@ -1436,23 +1405,21 @@ bool AgencyComm::unlock (std::string const& key, } unsigned long sleepTime = InitialSleepTime; - const double end = TRI_microtime() + timeout; + double const end = TRI_microtime() + timeout; + + std::unique_ptr newJson(TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, "UNLOCKED", strlen("UNLOCKED"))); + + if (newJson == nullptr) { + return false; + } while (true) { - TRI_json_t* newJson = TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, "UNLOCKED", strlen("UNLOCKED")); - - if (newJson == nullptr) { - return false; - } - AgencyCommResult result = casValue(key + "/Lock", json, - newJson, + newJson.get(), 0.0, timeout); - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, newJson); - if (result.successful()) { return true; } @@ -1463,9 +1430,7 @@ bool AgencyComm::unlock (std::string const& key, sleepTime += InitialSleepTime; } - const double now = TRI_microtime(); - - if (now >= end) { + if (TRI_microtime() >= end) { return false; } } @@ -1541,7 +1506,7 @@ AgencyEndpoint* AgencyComm::popEndpoint (std::string const& endpoint) { void AgencyComm::requeueEndpoint (AgencyEndpoint* agencyEndpoint, bool wasWorking) { WRITE_LOCKER(AgencyComm::_globalLock); - const size_t numEndpoints TRI_UNUSED = _globalEndpoints.size(); + size_t const numEndpoints TRI_UNUSED = _globalEndpoints.size(); TRI_ASSERT(agencyEndpoint != nullptr); TRI_ASSERT(agencyEndpoint->_busy); @@ -1594,6 +1559,7 @@ bool AgencyComm::sendWithFailover (triagens::rest::HttpRequest::HttpRequestType { READ_LOCKER(AgencyComm::_globalLock); numEndpoints = AgencyComm::_globalEndpoints.size(); + if (numEndpoints == 0) { return false; } @@ -1603,19 +1569,31 @@ bool AgencyComm::sendWithFailover (triagens::rest::HttpRequest::HttpRequestType size_t tries = 0; std::string realUrl = url; - std::string forceEndpoint = ""; + std::string forceEndpoint; while (tries++ < numEndpoints) { AgencyEndpoint* agencyEndpoint = popEndpoint(forceEndpoint); TRI_ASSERT(agencyEndpoint != nullptr); - send(agencyEndpoint->_connection, - method, - timeout, - result, - realUrl, - body); + try { + send(agencyEndpoint->_connection, + method, + timeout, + result, + realUrl, + body); + } + catch (...) { + result._connected = false; + result._statusCode = 0; + result._message = "could not send request to agency"; + + agencyEndpoint->_connection->disconnect(); + + requeueEndpoint(agencyEndpoint, true); + return false; + } if (result._statusCode == (int) triagens::rest::HttpResponse::TEMPORARY_REDIRECT) { // sometimes the agency will return a 307 (temporary redirect) @@ -1641,7 +1619,7 @@ bool AgencyComm::sendWithFailover (triagens::rest::HttpRequest::HttpRequestType return false; } - const size_t delim = endpoint.find('/', 6); + size_t const delim = endpoint.find('/', 6); if (delim == std::string::npos) { // invalid location header @@ -1684,7 +1662,7 @@ bool AgencyComm::sendWithFailover (triagens::rest::HttpRequest::HttpRequestType // we can stop iterating over endpoints if the operation succeeded, // if a watch timed out or // if the reason for failure was a client-side error - const bool canAbort = result.successful() || + bool const canAbort = result.successful() || (isWatch && result._statusCode == 0) || (result._statusCode >= 400 && result._statusCode <= 499); @@ -1747,11 +1725,13 @@ bool AgencyComm::send (triagens::httpclient::GeneralClientConnection* connection } // send the actual request - triagens::httpclient::SimpleHttpResult* response = client.request(method, - url, - body.c_str(), - body.size(), - headers); + std::unique_ptr response( + client.request(method, + url, + body.c_str(), + body.size(), + headers + )); if (response == nullptr) { connection->disconnect(); @@ -1765,7 +1745,7 @@ bool AgencyComm::send (triagens::httpclient::GeneralClientConnection* connection connection->disconnect(); result._message = "sending request to agency failed"; LOG_TRACE("sending request to agency failed"); - delete response; + return false; } @@ -1781,8 +1761,9 @@ bool AgencyComm::send (triagens::httpclient::GeneralClientConnection* connection if (! found) { // a 307 without a location header does not make any sense - delete response; + connection->disconnect(); result._message = "invalid agency response (header missing)"; + return false; } } @@ -1804,13 +1785,12 @@ bool AgencyComm::send (triagens::httpclient::GeneralClientConnection* connection result._message.c_str(), result._body.c_str()); - delete response; - - if (result._statusCode > 399) { - connection->disconnect(); + if (result.successful()) { + return true; } - return result.successful(); + connection->disconnect(); + return false; } ////////////////////////////////////////////////////////////////////////////////