1
0
Fork 0

ClusterInfo enjoys clientIds

This commit is contained in:
Kaveh Vahedipour 2017-01-19 14:51:29 +01:00
parent 54ccffc0ee
commit 67cd7deaaa
3 changed files with 35 additions and 24 deletions

View File

@ -1075,9 +1075,9 @@ AgencyCommResult AgencyComm::sendTransactionWithFailover(
AgencyCommResult result = sendWithFailover(
arangodb::rest::RequestType::POST,
(timeout == 0.0 ? AgencyCommManager::CONNECTION_OPTIONS._requestTimeout
: timeout),
url, builder.slice().toJson());
(timeout == 0.0) ?
AgencyCommManager::CONNECTION_OPTIONS._requestTimeout : timeout,
url, builder.slice().toJson(), transaction.getClientId());
if (!result.successful() && result.httpCode() !=
(int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
@ -1290,7 +1290,8 @@ void AgencyComm::updateEndpoints(arangodb::velocypack::Slice const& current) {
AgencyCommResult AgencyComm::sendWithFailover(
arangodb::rest::RequestType method, double const timeout,
std::string const& initialUrl, std::string const& body) {
std::string const& initialUrl, std::string const& body,
std::string const& clientId) {
std::string endpoint;
std::unique_ptr<GeneralClientConnection> connection =
@ -1331,7 +1332,7 @@ AgencyCommResult AgencyComm::sendWithFailover(
++tries;
if (connection == nullptr) {
AgencyCommResult result(400, "No endpoints for agency found.");
AgencyCommResult result(400, "No endpoints for agency found.", clientId);
LOG_TOPIC(ERR, Logger::AGENCYCOMM) << result._message;
return result;
}
@ -1348,7 +1349,7 @@ AgencyCommResult AgencyComm::sendWithFailover(
// try to send; if we fail completely, do not retry
try {
result = send(connection.get(), method, conTimeout, url, body);
result = send(connection.get(), method, conTimeout, url, body, clientId);
} catch (...) {
AgencyCommManager::MANAGER->failed(std::move(connection), endpoint);
endpoint.clear();
@ -1408,7 +1409,7 @@ AgencyCommResult AgencyComm::sendWithFailover(
AgencyCommResult AgencyComm::send(
arangodb::httpclient::GeneralClientConnection* connection,
arangodb::rest::RequestType method, double timeout, std::string const& url,
std::string const& body) {
std::string const& body, std::string const& clientId) {
TRI_ASSERT(connection != nullptr);
if (method == arangodb::rest::RequestType::GET ||
@ -1422,6 +1423,9 @@ AgencyCommResult AgencyComm::send(
AgencyCommResult result;
result._connected = false;
result._statusCode = 0;
if (!clientId.empty()) {
result._clientId = clientId;
}
LOG_TOPIC(TRACE, Logger::AGENCYCOMM)
<< "sending " << arangodb::HttpRequest::translateMethod(method)

View File

@ -261,6 +261,7 @@ class AgencyCommResult {
private:
std::shared_ptr<velocypack::Builder> _vpack;
public:
std::string _clientId;
};
@ -637,7 +638,8 @@ class AgencyComm {
bool ensureStructureInitialized();
AgencyCommResult sendWithFailover(arangodb::rest::RequestType, double,
std::string const&, std::string const&);
std::string const&, std::string const&,
std::string const& clientId = std::string());
private:
bool lock(std::string const&, double, double,
@ -646,7 +648,8 @@ class AgencyComm {
bool unlock(std::string const&, arangodb::velocypack::Slice const&, double);
AgencyCommResult send(httpclient::GeneralClientConnection*, rest::RequestType,
double, std::string const&, std::string const&);
double, std::string const&, std::string const&,
std::string const& clientId = std::string());
bool tryInitializeStructure(std::string const& jwtSecret);

View File

@ -903,10 +903,11 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
(int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
return setErrormsg(TRI_ERROR_ARANGO_DUPLICATE_NAME, errorMsg);
}
errorMsg = std::string("Failed to create database in ") + __FILE__ + ":" + std::to_string(__LINE__);
errorMsg = std::string("Failed to create database with ") +
res._clientId + " at " + __FILE__ + ":" + std::to_string(__LINE__);
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE_IN_PLAN,
errorMsg);
}
}
// Now update our own cache of planned databases:
loadPlan();
@ -1171,6 +1172,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
LOG_TOPIC(ERR, Logger::CLUSTER) << "Could not get agency dump!";
}
} else {
errorMsg += std::string("\nClientId ") + res._clientId;
errorMsg += std::string("\n") + __FILE__ + std::to_string(__LINE__);
errorMsg += std::string("\n") + res.errorMessage();
errorMsg += std::string("\n") + res.errorDetails();
@ -1295,9 +1297,9 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
res = ac.sendTransactionWithFailover(trans);
if (!res.successful()) {
LOG(ERR) << "###################### WAS ERLAUBE? ####################";
AgencyCommResult ag = ac.getValues("");
if (ag.successful()) {
LOG_TOPIC(ERR, Logger::CLUSTER) << "ClientId: " << res._clientId;
LOG_TOPIC(ERR, Logger::CLUSTER) << "Agency dump:\n"
<< ag.slice().toJson();
} else {
@ -1761,6 +1763,7 @@ int ClusterInfo::ensureIndexCoordinator(
AgencyCommResult result = ac.sendTransactionWithFailover(trx, 0.0);
if (!result.successful()) {
errorMsg += "ClientId: " + result._clientId;
errorMsg += std::string(" ") + __FILE__ + ":" + std::to_string(__LINE__);
resultBuilder = *resBuilder;
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_INDEX_IN_PLAN;
@ -1981,6 +1984,7 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
AgencyCommResult result = ac.sendTransactionWithFailover(trx, 0.0);
if (!result.successful()) {
errorMsg += "ClientId: " + result._clientId;
errorMsg += std::string(" ") + __FILE__ + ":" + std::to_string(__LINE__);
events::DropIndex(collectionID, idString,
TRI_ERROR_CLUSTER_COULD_NOT_DROP_INDEX_IN_PLAN);
@ -2050,20 +2054,20 @@ void ClusterInfo::loadServers() {
result.slice()[0].get(
std::vector<std::string>(
{AgencyCommManager::path(), "Current", "ServersRegistered"}));
velocypack::Slice serversAliases =
result.slice()[0].get(
std::vector<std::string>(
{AgencyCommManager::path(), "Target", "MapUniqueToShortID"}));
if (serversRegistered.isObject()) {
if (serversRegistered.isObject()) {
decltype(_servers) newServers;
decltype(_serverAliases) newAliases;
size_t i = 0;
for (auto const& res : VPackObjectIterator(serversRegistered)) {
velocypack::Slice slice = res.value;
if (slice.isObject() && slice.hasKey("endpoint")) {
std::string server =
arangodb::basics::VelocyPackHelper::getStringValue(
@ -2080,7 +2084,7 @@ void ClusterInfo::loadServers() {
newServers.emplace(std::make_pair(res.key.copyString(), server));
}
}
// Now set the new value:
{
WRITE_LOCKER(writeLocker, _serversProt.lock);
@ -2092,13 +2096,13 @@ void ClusterInfo::loadServers() {
return;
}
}
LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< "Error while loading " << prefixServers
<< " httpCode: " << result.httpCode()
<< " errorCode: " << result.errorCode()
<< " errorMessage: " << result.errorMessage()
<< " body: " << result.body();
<< "Error while loading " << prefixServers
<< " httpCode: " << result.httpCode()
<< " errorCode: " << result.errorCode()
<< " errorMessage: " << result.errorMessage()
<< " body: " << result.body();
}
////////////////////////////////////////////////////////////////////////////////