1
0
Fork 0

Improve ClusterComm library by allowing to talk to DB servers directly.

This commit is contained in:
Max Neunhoeffer 2014-01-09 14:55:29 +01:00
parent e01ac6ff5b
commit 65c3cc6484
3 changed files with 104 additions and 73 deletions

View File

@ -375,12 +375,21 @@ void ClusterComm::closeUnusedConnections (double limit) {
/// the memory when the operation has been finished. It is the caller's
/// responsibility to free the memory to which `body` points after the
/// operation has finally terminated.
///
/// Arguments: `clientTransactionID` is a string coming from the client
/// and describing the transaction the client is doing, `coordTransactionID`
/// is a number describing the transaction the coordinator is doing,
/// `destination` is a string that either starts with "shard:" followed
/// by a shardID identifying the shard this request is sent to,
/// actually, this is internally translated into a server ID. It is also
/// possible to specify a DB server ID directly here in the form of "server:"
/// followed by a serverID.
////////////////////////////////////////////////////////////////////////////////
//
ClusterCommResult* ClusterComm::asyncRequest (
ClientTransactionID const clientTransactionID,
CoordTransactionID const coordTransactionID,
ShardID const shardID,
string const& destination,
rest::HttpRequest::HttpRequestType reqtype,
string const path,
char const* body,
@ -395,9 +404,19 @@ ClusterCommResult* ClusterComm::asyncRequest (
do {
op->operationID = getOperationID();
} while (op->operationID == 0); // just to make sure
op->shardID = shardID;
op->serverID = ClusterInfo::instance()->getResponsibleServer(
shardID);
if (destination.substr(0,6) == "shard:") {
op->shardID = destination.substr(6);
op->serverID = ClusterInfo::instance()->getResponsibleServer(op->shardID);
LOG_DEBUG("Responsible server: %s", op->serverID.c_str());
}
else if (destination.substr(0,7) == "server:") {
op->shardID = "";
op->serverID = destination.substr(7);
}
else {
op->shardID = "";
op->serverID = "";
}
// Add the header fields for asynchronous mode:
(*headerFields)["X-Arango-Async"] = "store";
@ -447,12 +466,19 @@ ClusterCommResult* ClusterComm::asyncRequest (
/// object that only says "timeout". Note that the ClusterComm library
/// does not keep a record of this operation, in particular, you cannot
/// use @ref enquire to ask about it.
///
/// Arguments: `clientTransactionID` is a string coming from the client
/// and describing the transaction the client is doing, `coordTransactionID`
/// is a number describing the transaction the coordinator is doing,
/// shardID is a string that identifies the shard this request is sent to,
/// actually, this is internally translated into a server ID. It is also
/// possible to specify a DB server ID directly here.
////////////////////////////////////////////////////////////////////////////////
ClusterCommResult* ClusterComm::syncRequest (
ClientTransactionID const& clientTransactionID,
CoordTransactionID const coordTransactionID,
ShardID const& shardID,
string const& destination,
triagens::rest::HttpRequest::HttpRequestType reqtype,
string const& path,
char const* body,
@ -466,7 +492,6 @@ ClusterCommResult* ClusterComm::syncRequest (
do {
res->operationID = getOperationID();
} while (res->operationID == 0); // just to make sure
res->shardID = shardID;
res->status = CL_COMM_SENDING;
if (0 == bodyLength) {
@ -477,57 +502,67 @@ ClusterCommResult* ClusterComm::syncRequest (
double endTime = timeout == 0.0 ? currentTime+24*60*60.0
: currentTime+timeout;
res->serverID = ClusterInfo::instance()->getResponsibleServer(shardID);
LOG_DEBUG("Responsible server: %s", res->serverID.c_str());
if (res->serverID == "") {
res->status = CL_COMM_ERROR;
if (destination.substr(0,6) == "shard:") {
res->shardID = destination.substr(6);
res->serverID = ClusterInfo::instance()->getResponsibleServer(res->shardID);
LOG_DEBUG("Responsible server: %s", res->serverID.c_str());
if (res->serverID == "") {
res->status = CL_COMM_ERROR;
return res;
}
}
else if (destination.substr(0,7) == "server:") {
res->shardID = "";
res->serverID = destination.substr(7);
}
else {
// We need a connection to this server:
SingleServerConnection* connection = getConnection(res->serverID);
if (0 == connection) {
res->status = CL_COMM_ERROR;
LOG_ERROR("cannot create connection to server '%s'",
res->serverID.c_str());
res->status = CL_COMM_ERROR;
return res;
}
// We need a connection to this server:
SingleServerConnection* connection = getConnection(res->serverID);
if (0 == connection) {
res->status = CL_COMM_ERROR;
LOG_ERROR("cannot create connection to server '%s'",
res->serverID.c_str());
}
else {
if (0 != body) {
LOG_DEBUG("sending %s request to DB server '%s': %s",
triagens::rest::HttpRequest::translateMethod(reqtype).c_str(),
res->serverID.c_str(), body);
}
else {
if (0 != body) {
LOG_DEBUG("sending %s request to DB server '%s': %s",
triagens::rest::HttpRequest::translateMethod(reqtype).c_str(),
res->serverID.c_str(), body);
}
else {
LOG_DEBUG("sending %s request to DB server '%s'",
triagens::rest::HttpRequest::translateMethod(reqtype).c_str(),
res->serverID.c_str());
}
triagens::httpclient::SimpleHttpClient* client
= new triagens::httpclient::SimpleHttpClient(
connection->connection,
endTime-currentTime, false);
LOG_DEBUG("sending %s request to DB server '%s'",
triagens::rest::HttpRequest::translateMethod(reqtype).c_str(),
res->serverID.c_str());
}
triagens::httpclient::SimpleHttpClient* client
= new triagens::httpclient::SimpleHttpClient(
connection->connection,
endTime-currentTime, false);
res->result = client->request(reqtype, path, body, bodyLength,
headerFields);
if (res->result == 0 || ! res->result->isComplete()) {
brokenConnection(connection);
res->result = client->request(reqtype, path, body, bodyLength,
headerFields);
if (res->result == 0 || ! res->result->isComplete()) {
brokenConnection(connection);
res->status = CL_COMM_ERROR;
}
else {
returnConnection(connection);
if (res->result->wasHttpError()) {
res->status = CL_COMM_ERROR;
}
else {
returnConnection(connection);
if (res->result->wasHttpError()) {
res->status = CL_COMM_ERROR;
}
else if (client->getErrorMessage() ==
"Request timeout reached") {
res->status = CL_COMM_TIMEOUT;
}
else if (client->getErrorMessage() != "") {
res->status = CL_COMM_ERROR;
}
else if (client->getErrorMessage() ==
"Request timeout reached") {
res->status = CL_COMM_TIMEOUT;
}
else if (client->getErrorMessage() != "") {
res->status = CL_COMM_ERROR;
}
delete client;
}
delete client;
}
if (res->status == CL_COMM_SENDING) {
// Everything was OK
@ -1105,32 +1140,28 @@ void ClusterCommThread::run () {
op->status = CL_COMM_TIMEOUT;
}
else {
// First find the server to which the request goes from the shardID:
ServerID server = ClusterInfo::instance()->getResponsibleServer(
op->shardID);
LOG_DEBUG("Responsible server: %s", server.c_str());
if (server == "") {
if (op->serverID == "") {
op->status = CL_COMM_ERROR;
}
else {
// We need a connection to this server:
ClusterComm::SingleServerConnection* connection
= cc->getConnection(server);
= cc->getConnection(op->serverID);
if (0 == connection) {
op->status = CL_COMM_ERROR;
LOG_ERROR("cannot create connection to server '%s'",
server.c_str());
op->serverID.c_str());
}
else {
if (0 != op->body) {
LOG_DEBUG("sending %s request to DB server '%s': %s",
triagens::rest::HttpRequest::translateMethod(op->reqtype)
.c_str(), server.c_str(), op->body);
.c_str(), op->serverID.c_str(), op->body);
}
else {
LOG_DEBUG("sending %s request to DB server '%s'",
triagens::rest::HttpRequest::translateMethod(op->reqtype)
.c_str(), server.c_str());
.c_str(), op->serverID.c_str());
}
triagens::httpclient::SimpleHttpClient* client

View File

@ -274,7 +274,7 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
ClusterCommResult* asyncRequest (
ClientTransactionID const clientTransactionID,
CoordTransactionID const coordTransactionID,
ShardID const shardID,
string const& destination,
rest::HttpRequest::HttpRequestType reqtype,
string const path,
char const* body,
@ -290,7 +290,7 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
ClusterCommResult* syncRequest (
ClientTransactionID const& clientTransactionID,
CoordTransactionID const coordTransactionID,
ShardID const& shardID,
string const& destination,
rest::HttpRequest::HttpRequestType reqtype,
string const& path,
char const* body,

View File

@ -1013,7 +1013,7 @@ static v8::Handle<v8::Value> JS_StatusServerState (v8::Arguments const& argv) {
static void PrepareClusterCommRequest (
v8::Arguments const& argv,
triagens::rest::HttpRequest::HttpRequestType& reqType,
ShardID& shardID,
string& destination,
string& path,
string& body,
map<string, string>* headerFields,
@ -1034,12 +1034,12 @@ static void PrepareClusterCommRequest (
}
}
shardID.clear();
destination.clear();
if (argv.Length() > 1) {
shardID = TRI_ObjectToString(argv[1]);
destination = TRI_ObjectToString(argv[1]);
}
if (shardID == "") {
shardID = "shardBlubb";
if (destination == "") {
destination = "shard:shardBlubb";
}
string dbname;
@ -1206,7 +1206,7 @@ static v8::Handle<v8::Value> JS_AsyncRequest (v8::Arguments const& argv) {
if (argv.Length() < 4 || argv.Length() > 7) {
TRI_V8_EXCEPTION_USAGE(scope, "asyncRequest("
"reqType, shardID, dbname, path, body, headers, options)");
"reqType, destination, dbname, path, body, headers, options)");
}
// Possible options:
// - clientTransactionID (string)
@ -1225,7 +1225,7 @@ static v8::Handle<v8::Value> JS_AsyncRequest (v8::Arguments const& argv) {
}
triagens::rest::HttpRequest::HttpRequestType reqType;
ShardID shardID;
string destination;
string path;
string body;
map<string, string>* headerFields = new map<string, string>;
@ -1233,12 +1233,12 @@ static v8::Handle<v8::Value> JS_AsyncRequest (v8::Arguments const& argv) {
CoordTransactionID coordTransactionID;
double timeout;
PrepareClusterCommRequest(argv, reqType, shardID, path, body, headerFields,
PrepareClusterCommRequest(argv, reqType, destination, path, body,headerFields,
clientTransactionID, coordTransactionID, timeout);
ClusterCommResult const* res;
res = cc->asyncRequest(clientTransactionID, coordTransactionID, shardID,
res = cc->asyncRequest(clientTransactionID, coordTransactionID, destination,
reqType, path, body.c_str(), body.size(),
headerFields, 0, timeout);
@ -1264,7 +1264,7 @@ static v8::Handle<v8::Value> JS_SyncRequest (v8::Arguments const& argv) {
if (argv.Length() < 4 || argv.Length() > 7) {
TRI_V8_EXCEPTION_USAGE(scope, "syncRequest("
"reqType, shardID, dbname, path, body, headers, options)");
"reqType, destination, dbname, path, body, headers, options)");
}
// Possible options:
// - clientTransactionID (string)
@ -1283,7 +1283,7 @@ static v8::Handle<v8::Value> JS_SyncRequest (v8::Arguments const& argv) {
}
triagens::rest::HttpRequest::HttpRequestType reqType;
ShardID shardID;
string destination;
string path;
string body;
map<string, string>* headerFields = new map<string, string>;
@ -1291,12 +1291,12 @@ static v8::Handle<v8::Value> JS_SyncRequest (v8::Arguments const& argv) {
CoordTransactionID coordTransactionID;
double timeout;
PrepareClusterCommRequest(argv, reqType, shardID, path, body, headerFields,
PrepareClusterCommRequest(argv, reqType, destination, path, body,headerFields,
clientTransactionID, coordTransactionID, timeout);
ClusterCommResult const* res;
res = cc->syncRequest(clientTransactionID, coordTransactionID, shardID,
res = cc->syncRequest(clientTransactionID, coordTransactionID, destination,
reqType, path, body.c_str(), body.size(),
*headerFields, timeout);