1
0
Fork 0

Fix body management in asyncRequest.

This commit is contained in:
Max Neunhoeffer 2014-01-29 13:21:53 +01:00
parent 1edcd6a9e9
commit 60f47e1f21
6 changed files with 50 additions and 59 deletions

View File

@ -133,9 +133,14 @@ OperationID ClusterComm::getOperationID () {
/// either in the callback or via poll. The caller has to call delete on /// either in the callback or via poll. The caller has to call delete on
/// the resulting ClusterCommResult*. The library takes ownerships of /// the resulting ClusterCommResult*. The library takes ownerships of
/// the pointers `headerFields` and `callback` and releases /// the pointers `headerFields` and `callback` and releases
/// the memory when the operation has been finished. It is the caller's /// the memory when the operation has been finished. If `freeBody`
/// responsibility to free the memory to which `body` points after the /// is `true`, then the library takes ownership of the pointer `body`
/// operation has finally terminated. /// as well and deletes it at the end. If `freeBody` is `false, it
/// is the caller's responsibility to ensure that the object object
/// to which `body` points is retained until the full asynchronous
/// operation is finished and has been reported back to the caller
/// and that the object is destructed after the operation has finally
/// terminated.
/// ///
/// Arguments: `clientTransactionID` is a string coming from the client /// Arguments: `clientTransactionID` is a string coming from the client
/// and describing the transaction the client is doing, `coordTransactionID` /// and describing the transaction the client is doing, `coordTransactionID`
@ -153,8 +158,8 @@ ClusterCommResult* ClusterComm::asyncRequest (
string const& destination, string const& destination,
triagens::rest::HttpRequest::HttpRequestType reqtype, triagens::rest::HttpRequest::HttpRequestType reqtype,
string const path, string const path,
char const* body, string const* body,
size_t const bodyLength, bool freeBody,
map<string, string>* headerFields, map<string, string>* headerFields,
ClusterCommCallback* callback, ClusterCommCallback* callback,
ClusterCommTimeout timeout) { ClusterCommTimeout timeout) {
@ -189,14 +194,8 @@ ClusterCommResult* ClusterComm::asyncRequest (
op->status = CL_COMM_SUBMITTED; op->status = CL_COMM_SUBMITTED;
op->reqtype = reqtype; op->reqtype = reqtype;
op->path = path; op->path = path;
if (0 != bodyLength) {
op->body = body; op->body = body;
op->bodyLength = bodyLength; op->freeBody = freeBody;
}
else {
op->body = 0;
op->bodyLength = 0;
}
op->headerFields = headerFields; op->headerFields = headerFields;
op->callback = callback; op->callback = callback;
op->endTime = timeout == 0.0 ? TRI_microtime()+24*60*60.0 op->endTime = timeout == 0.0 ? TRI_microtime()+24*60*60.0
@ -243,8 +242,7 @@ ClusterCommResult* ClusterComm::syncRequest (
string const& destination, string const& destination,
triagens::rest::HttpRequest::HttpRequestType reqtype, triagens::rest::HttpRequest::HttpRequestType reqtype,
string const& path, string const& path,
char const* body, string const& body,
size_t const bodyLength,
map<string, string> const& headerFields, map<string, string> const& headerFields,
ClusterCommTimeout timeout) { ClusterCommTimeout timeout) {
@ -256,10 +254,6 @@ ClusterCommResult* ClusterComm::syncRequest (
} while (res->operationID == 0); // just to make sure } while (res->operationID == 0); // just to make sure
res->status = CL_COMM_SENDING; res->status = CL_COMM_SENDING;
if (0 == bodyLength) {
body = 0;
}
double currentTime = TRI_microtime(); double currentTime = TRI_microtime();
double endTime = timeout == 0.0 ? currentTime+24*60*60.0 double endTime = timeout == 0.0 ? currentTime+24*60*60.0
: currentTime+timeout; : currentTime+timeout;
@ -299,23 +293,16 @@ ClusterCommResult* ClusterComm::syncRequest (
res->serverID.c_str()); res->serverID.c_str());
} }
else { else {
if (0 != body) {
LOG_DEBUG("sending %s request to DB server '%s': %s", LOG_DEBUG("sending %s request to DB server '%s': %s",
triagens::rest::HttpRequest::translateMethod(reqtype).c_str(), triagens::rest::HttpRequest::translateMethod(reqtype).c_str(),
res->serverID.c_str(), body); res->serverID.c_str(), body.c_str());
}
else {
LOG_DEBUG("sending %s request to DB server '%s'",
triagens::rest::HttpRequest::translateMethod(reqtype).c_str(),
res->serverID.c_str());
}
triagens::httpclient::SimpleHttpClient* client triagens::httpclient::SimpleHttpClient* client
= new triagens::httpclient::SimpleHttpClient( = new triagens::httpclient::SimpleHttpClient(
connection->connection, connection->connection,
endTime-currentTime, false); endTime-currentTime, false);
client->keepConnectionOnDestruction(true); client->keepConnectionOnDestruction(true);
res->result = client->request(reqtype, path, body, bodyLength, res->result = client->request(reqtype, path, body.c_str(), body.size(),
headerFields); headerFields);
if (! res->result->isComplete()) { if (! res->result->isComplete()) {
cm->brokenConnection(connection); cm->brokenConnection(connection);
@ -950,7 +937,7 @@ void ClusterCommThread::run () {
if (0 != op->body) { if (0 != op->body) {
LOG_DEBUG("sending %s request to DB server '%s': %s", LOG_DEBUG("sending %s request to DB server '%s': %s",
triagens::rest::HttpRequest::translateMethod(op->reqtype) triagens::rest::HttpRequest::translateMethod(op->reqtype)
.c_str(), op->serverID.c_str(), op->body); .c_str(), op->serverID.c_str(), op->body->c_str());
} }
else { else {
LOG_DEBUG("sending %s request to DB server '%s'", LOG_DEBUG("sending %s request to DB server '%s'",
@ -966,8 +953,15 @@ void ClusterCommThread::run () {
// We add this result to the operation struct without acquiring // We add this result to the operation struct without acquiring
// a lock, since we know that only we do such a thing: // a lock, since we know that only we do such a thing:
op->result = client->request(op->reqtype, op->path, op->body, if (0 != op->body) {
op->bodyLength, *(op->headerFields)); op->result = client->request(op->reqtype, op->path,
op->body->c_str(), op->body->size(),
*(op->headerFields));
}
else {
op->result = client->request(op->reqtype, op->path,
NULL, 0, *(op->headerFields));
}
if (! op->result->isComplete()) { if (! op->result->isComplete()) {
cm->brokenConnection(connection); cm->brokenConnection(connection);

View File

@ -173,13 +173,13 @@ namespace triagens {
struct ClusterCommOperation : public ClusterCommResult { struct ClusterCommOperation : public ClusterCommResult {
rest::HttpRequest::HttpRequestType reqtype; rest::HttpRequest::HttpRequestType reqtype;
string path; string path;
char const* body; string const* body;
size_t bodyLength; bool freeBody;
map<string, string>* headerFields; map<string, string>* headerFields;
ClusterCommCallback* callback; ClusterCommCallback* callback;
ClusterCommTimeout endTime; ClusterCommTimeout endTime;
ClusterCommOperation () : headerFields(0), callback(0) {} ClusterCommOperation () : body(0), headerFields(0), callback(0) {}
virtual ~ClusterCommOperation () { virtual ~ClusterCommOperation () {
if (_deleteOnDestruction && 0 != headerFields) { if (_deleteOnDestruction && 0 != headerFields) {
delete headerFields; delete headerFields;
@ -187,7 +187,9 @@ namespace triagens {
if (_deleteOnDestruction && 0 != callback) { if (_deleteOnDestruction && 0 != callback) {
delete callback; delete callback;
} }
if (_deleteOnDestruction && 0 != body && freeBody) {
delete body;
}
} }
}; };
@ -269,8 +271,8 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
string const& destination, string const& destination,
rest::HttpRequest::HttpRequestType reqtype, rest::HttpRequest::HttpRequestType reqtype,
string const path, string const path,
char const* body, string const* body,
size_t const bodyLength, bool freeBody,
map<string, string>* headerFields, map<string, string>* headerFields,
ClusterCommCallback* callback, ClusterCommCallback* callback,
ClusterCommTimeout timeout); ClusterCommTimeout timeout);
@ -285,8 +287,7 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
string const& destination, string const& destination,
rest::HttpRequest::HttpRequestType reqtype, rest::HttpRequest::HttpRequestType reqtype,
string const& path, string const& path,
char const* body, string const& body,
size_t const bodyLength,
map<string, string> const& headerFields, map<string, string> const& headerFields,
ClusterCommTimeout timeout); ClusterCommTimeout timeout);

View File

@ -116,8 +116,7 @@ int createDocumentOnCoordinator (
triagens::rest::HttpRequest::HTTP_REQUEST_POST, triagens::rest::HttpRequest::HTTP_REQUEST_POST,
"/_db/"+dbname+"/_api/document?collection="+ "/_db/"+dbname+"/_api/document?collection="+
StringUtils::urlEncode(shardID)+"&waitForSync="+ StringUtils::urlEncode(shardID)+"&waitForSync="+
(waitForSync ? "true" : "false"), (waitForSync ? "true" : "false"), body, headers, 60.0);
body.c_str(), body.size(), headers, 60.0);
if (res->status == CL_COMM_TIMEOUT) { if (res->status == CL_COMM_TIMEOUT) {
// No reply, we give up: // No reply, we give up:
@ -210,7 +209,7 @@ int deleteDocumentOnCoordinator (
"/_db/"+dbname+"/_api/document/"+ "/_db/"+dbname+"/_api/document/"+
StringUtils::urlEncode(shardID)+"/"+key+ StringUtils::urlEncode(shardID)+"/"+key+
"?waitForSync="+(waitForSync ? "true" : "false")+ "?waitForSync="+(waitForSync ? "true" : "false")+
revstr+policystr, NULL, 0, headers, 60.0); revstr+policystr, "", headers, 60.0);
if (res->status == CL_COMM_TIMEOUT) { if (res->status == CL_COMM_TIMEOUT) {
// No reply, we give up: // No reply, we give up:
@ -247,7 +246,7 @@ int deleteDocumentOnCoordinator (
"/_db/"+dbname+"/_api/document/"+ "/_db/"+dbname+"/_api/document/"+
StringUtils::urlEncode(it->first)+"/"+key+ StringUtils::urlEncode(it->first)+"/"+key+
"?waitForSync="+(waitForSync ? "true" : "false")+ "?waitForSync="+(waitForSync ? "true" : "false")+
revstr+policystr, NULL, 0, headers, NULL, 60.0); revstr+policystr, 0, false, headers, NULL, 60.0);
delete res; delete res;
} }
// Now listen to the results: // Now listen to the results:
@ -349,7 +348,7 @@ int getDocumentOnCoordinator (
res = cc->syncRequest("", TRI_NewTickServer(), "shard:"+shardID, reqType, res = cc->syncRequest("", TRI_NewTickServer(), "shard:"+shardID, reqType,
"/_db/"+dbname+"/_api/document/"+ "/_db/"+dbname+"/_api/document/"+
StringUtils::urlEncode(shardID)+"/"+key+ StringUtils::urlEncode(shardID)+"/"+key+
revstr, NULL, 0, headers, 60.0); revstr, "", headers, 60.0);
if (res->status == CL_COMM_TIMEOUT) { if (res->status == CL_COMM_TIMEOUT) {
// No reply, we give up: // No reply, we give up:
@ -391,7 +390,7 @@ int getDocumentOnCoordinator (
res = cc->asyncRequest("", coordTransactionID, "shard:"+it->first, reqType, res = cc->asyncRequest("", coordTransactionID, "shard:"+it->first, reqType,
"/_db/"+dbname+"/_api/document/"+ "/_db/"+dbname+"/_api/document/"+
StringUtils::urlEncode(it->first)+"/"+key+ StringUtils::urlEncode(it->first)+"/"+key+
revstr, NULL, 0, headers, NULL, 60.0); revstr, 0, false, headers, NULL, 60.0);
delete res; delete res;
} }
// Now listen to the results: // Now listen to the results:
@ -524,7 +523,7 @@ int modifyDocumentOnCoordinator (
res = cc->syncRequest("", TRI_NewTickServer(), "shard:"+shardID, reqType, res = cc->syncRequest("", TRI_NewTickServer(), "shard:"+shardID, reqType,
"/_db/"+dbname+"/_api/document/"+ "/_db/"+dbname+"/_api/document/"+
StringUtils::urlEncode(shardID)+"/"+key+ StringUtils::urlEncode(shardID)+"/"+key+
revstr, body.c_str(), body.size(), headers, 60.0); revstr, body, headers, 60.0);
if (res->status == CL_COMM_TIMEOUT) { if (res->status == CL_COMM_TIMEOUT) {
// No reply, we give up: // No reply, we give up:
@ -564,7 +563,7 @@ int modifyDocumentOnCoordinator (
res = cc->asyncRequest("", coordTransactionID, "shard:"+it->first, reqType, res = cc->asyncRequest("", coordTransactionID, "shard:"+it->first, reqType,
"/_db/"+dbname+"/_api/document/"+ "/_db/"+dbname+"/_api/document/"+
StringUtils::urlEncode(it->first)+"/"+key+revstr, StringUtils::urlEncode(it->first)+"/"+key+revstr,
body.c_str(), body.size(), headers, NULL, 60.0); &body, false, headers, NULL, 60.0);
delete res; delete res;
} }
// Now listen to the results: // Now listen to the results:

View File

@ -1341,20 +1341,19 @@ static v8::Handle<v8::Value> JS_AsyncRequest (v8::Arguments const& argv) {
triagens::rest::HttpRequest::HttpRequestType reqType; triagens::rest::HttpRequest::HttpRequestType reqType;
string destination; string destination;
string path; string path;
string body; string *body = new string();
map<string, string>* headerFields = new map<string, string>; map<string, string>* headerFields = new map<string, string>;
ClientTransactionID clientTransactionID; ClientTransactionID clientTransactionID;
CoordTransactionID coordTransactionID; CoordTransactionID coordTransactionID;
double timeout; double timeout;
PrepareClusterCommRequest(argv, reqType, destination, path, body,headerFields, PrepareClusterCommRequest(argv, reqType, destination, path,*body,headerFields,
clientTransactionID, coordTransactionID, timeout); clientTransactionID, coordTransactionID, timeout);
ClusterCommResult const* res; ClusterCommResult const* res;
res = cc->asyncRequest(clientTransactionID, coordTransactionID, destination, res = cc->asyncRequest(clientTransactionID, coordTransactionID, destination,
reqType, path, body.c_str(), body.size(), reqType, path, body, true, headerFields, 0, timeout);
headerFields, 0, timeout);
if (res == 0) { if (res == 0) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
@ -1411,8 +1410,7 @@ static v8::Handle<v8::Value> JS_SyncRequest (v8::Arguments const& argv) {
ClusterCommResult const* res; ClusterCommResult const* res;
res = cc->syncRequest(clientTransactionID, coordTransactionID, destination, res = cc->syncRequest(clientTransactionID, coordTransactionID, destination,
reqType, path, body.c_str(), body.size(), reqType, path, body, *headerFields, timeout);
*headerFields, timeout);
delete headerFields; delete headerFields;

View File

@ -898,7 +898,7 @@ static v8::Handle<v8::Value> JS_ShardingTest (v8::Arguments const& argv) {
if (asyncMode) { if (asyncMode) {
res = cc->asyncRequest(clientTransactionId,TRI_NewTickServer(),destination, res = cc->asyncRequest(clientTransactionId,TRI_NewTickServer(),destination,
reqType, path, body.c_str(), body.size(), headerFields, reqType, path, &body, false, headerFields,
new CallbackTest("Hello Callback"), timeout); new CallbackTest("Hello Callback"), timeout);
if (res == 0) { if (res == 0) {
@ -990,8 +990,7 @@ static v8::Handle<v8::Value> JS_ShardingTest (v8::Arguments const& argv) {
} }
else { // synchronous mode else { // synchronous mode
res = cc->syncRequest(clientTransactionId, TRI_NewTickServer(),destination, res = cc->syncRequest(clientTransactionId, TRI_NewTickServer(),destination,
reqType, path, body.c_str(), body.size(), reqType, path, body, *headerFields, timeout);
*headerFields, timeout);
delete headerFields; delete headerFields;
if (res != 0) { if (res != 0) {
LOG_DEBUG("JS_ShardingTest: request has been sent synchronously, " LOG_DEBUG("JS_ShardingTest: request has been sent synchronously, "

View File

@ -8514,7 +8514,7 @@ static v8::Handle<v8::Value> JS_ListDatabases_Coordinator
headers["Authentication"] = TRI_ObjectToString(argv[2]); headers["Authentication"] = TRI_ObjectToString(argv[2]);
res = cc->syncRequest("", 0, "server:"+sid, res = cc->syncRequest("", 0, "server:"+sid,
triagens::rest::HttpRequest::HTTP_REQUEST_GET, triagens::rest::HttpRequest::HTTP_REQUEST_GET,
"/_api/database/user", 0, 0, headers, 0.0); "/_api/database/user", 0, headers, 0.0);
if (res->status == CL_COMM_SENT) { if (res->status == CL_COMM_SENT) {
// We got an array back as JSON, let's parse it and build a v8 // We got an array back as JSON, let's parse it and build a v8
string body = res->result->getBody().str(); string body = res->result->getBody().str();