1
0
Fork 0

Various memory leaks and callback improvements.

This commit is contained in:
Max Neunhoeffer 2013-12-27 13:15:52 +01:00
parent f075ba8ae6
commit 62976629f0
5 changed files with 161 additions and 47 deletions

View File

@ -113,21 +113,23 @@ std::string AgencyCommResult::errorMessage () const {
TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, _body.c_str());
if (0 == json) {
result = "OUT OF MEMORY";
return result;
}
if (! TRI_IsArrayJson(json)) {
if (json != 0) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
}
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return result;
}
// get "message" attribute
TRI_json_t const* message = TRI_LookupArrayJson(json, "message");
if (! TRI_IsStringJson(message)) {
return result;
if (TRI_IsStringJson(message)) {
result = std::string(message->_value._string.data, message->_value._string.length - 1);
}
result = std::string(message->_value._string.data, message->_value._string.length - 1);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return result;

View File

@ -73,9 +73,16 @@ ClusterComm::ClusterComm () {
}
// Destructor: calls stop() and shutdown() on the background
// communications thread and deletes it, then deletes every
// ServerConnections object registered in allConnections and finally
// releases whatever is still held in the send and receive queues.
ClusterComm::~ClusterComm () {
  // FIXME: Delete all stuff in queues, close all connections

  // Get rid of the background thread first:
  _backgroundThread->stop();
  _backgroundThread->shutdown();
  delete _backgroundThread;
  _backgroundThread = 0;

  // NOTE(review): WRITE_LOCKER presumably holds allLock until the end of
  // the enclosing scope, i.e. also during cleanupAllQueues() - confirm.
  WRITE_LOCKER(allLock);
  map<ServerID,ServerConnections*>::iterator pos = allConnections.begin();
  while (pos != allConnections.end()) {
    delete pos->second;
    ++pos;
  }

  cleanupAllQueues();
}
ClusterComm* ClusterComm::_theinstance = 0;
@ -122,7 +129,9 @@ ClusterComm::getConnection(ServerID& serverID) {
ServerConnections* s;
SingleServerConnection* c;
// First find a collections list:
cout << "getConnection: find connections list" << endl;
// First find a connections list:
{
WRITE_LOCKER(allLock);
@ -138,6 +147,8 @@ ClusterComm::getConnection(ServerID& serverID) {
assert(s != 0);
cout << "getConnection: find unused one" << endl;
// Now get an unused one:
{
WRITE_LOCKER(s->lock);
@ -148,9 +159,13 @@ ClusterComm::getConnection(ServerID& serverID) {
}
}
cout << "getConnection: need to open a new one" << endl;
// We need to open a new one:
string a = ClusterState::instance()->getServerEndpoint(serverID);
cout << "getConnection: have server endpoint: " << a << endl;
if (a == "") {
// Unknown server address, probably not yet connected
return 0;
@ -159,6 +174,7 @@ ClusterComm::getConnection(ServerID& serverID) {
if (0 == e) {
return 0;
}
cout << "getConnection: made endpoint " << endl;
triagens::httpclient::GeneralClientConnection*
g = triagens::httpclient::GeneralClientConnection::factory(
e,
@ -170,6 +186,7 @@ ClusterComm::getConnection(ServerID& serverID) {
delete e;
return 0;
}
cout << "getConnection: made general connection" << endl;
c = new SingleServerConnection(g,e,serverID);
if (0 == c) {
delete g;
@ -177,12 +194,15 @@ ClusterComm::getConnection(ServerID& serverID) {
return 0;
}
cout << "getConnection: made singleserverconnection" << endl;
// Now put it into our administration:
{
WRITE_LOCKER(s->lock);
s->connections.push_back(c);
}
c->lastUsed = time(0);
cout << "getConnection: registered it" << endl;
return c;
}
@ -252,7 +272,7 @@ void ClusterComm::brokenConnection(SingleServerConnection* c) {
delete c;
}
void ClusterComm::closeUnusedConnections () {
void ClusterComm::closeUnusedConnections (double limit) {
WRITE_LOCKER(allLock);
map<ServerID,ServerConnections*>::iterator s;
list<SingleServerConnection*>::iterator i;
@ -268,7 +288,7 @@ void ClusterComm::closeUnusedConnections () {
WRITE_LOCKER(sc->lock);
haveprev = false;
for (i = sc->unused.begin(); i != sc->unused.end(); ) {
if (t - (*i)->lastUsed > 120) {
if (t - (*i)->lastUsed > limit) {
vector<SingleServerConnection*>::iterator j;
for (j = sc->connections.begin(); j != sc->connections.end(); j++) {
if (*j == *i) {
@ -631,11 +651,12 @@ void ClusterComm::asyncAnswer (string& coordinatorHeader,
start = pos+1;
operationID = basics::StringUtils::uint64(coordinatorHeader.substr(start));
cout << "Hallo4" << endl;
cout << "AsyncAnswer: need connection" << endl;
// Now find the connection to which the request goes from the coordinatorID:
ClusterComm::SingleServerConnection* connection
= getConnection(coordinatorID);
cout << "AsyncAnswer: got answer about connection" << endl;
if (0 == connection) {
cout << "asyncAnswer: did not get connection" << endl;
LOG_ERROR("asyncAnswer: cannot create connection to server '%s'",
@ -648,12 +669,14 @@ void ClusterComm::asyncAnswer (string& coordinatorHeader,
char const* body = responseToSend->body().c_str();
size_t len = responseToSend->body().length();
cout << "asyncAnswer: sending PUT request to DB server " << coordinatorID << endl;
LOG_TRACE("asyncAnswer: sending PUT request to DB server '%s'",
coordinatorID.c_str());
cout << "asyncAnswer: initialising client" << endl;
triagens::httpclient::SimpleHttpClient client(
triagens::httpclient::SimpleHttpClient* client
= new triagens::httpclient::SimpleHttpClient(
connection->connection,
_globalConnectionOptions._singleRequestTimeout,
false);
@ -663,13 +686,15 @@ void ClusterComm::asyncAnswer (string& coordinatorHeader,
// We add this result to the operation struct without acquiring
// a lock, since we know that only we do such a thing:
httpclient::SimpleHttpResult* result =
client.request(rest::HttpRequest::HTTP_REQUEST_PUT,
"/_api/shard-comm", body, len, headers);
client->request(rest::HttpRequest::HTTP_REQUEST_PUT,
"/_api/shard-comm", body, len, headers);
cout << "In asyncAnswer, error msg: " << endl
<< client.getErrorMessage() << endl
<< client->getErrorMessage() << endl
<< result->getResultTypeMessage() << endl;
// FIXME: handle case that connection was no good and the request
// failed.
delete result;
delete client;
returnConnection(connection);
}
@ -718,11 +743,20 @@ string ClusterComm::processAnswer(string& coordinatorHeader,
op = *(i->second);
op->answer = answer;
op->status = CL_COMM_RECEIVED;
somethingReceived.broadcast();
// Do we have to do a callback?
if (0 != op->callback) {
if ((*op->callback)(static_cast<ClusterCommResult*>(op))) {
// This is fully processed, so let's remove it from the queue:
QueueIterator q = i->second;
receivedByOpID.erase(i);
received.erase(q);
delete op;
}
}
}
else {
// We have to look in the send queue as well, as it might not yet
// have been moved to the received queue. Note however, that it must
// have been moved to the received queue. Note however that it must
// have been fully sent, so this is highly unlikely, but not impossible.
basics::ConditionLocker sendlocker(&somethingToSend);
i = toSendByOpID.find(operationID);
@ -730,7 +764,15 @@ string ClusterComm::processAnswer(string& coordinatorHeader,
op = *(i->second);
op->answer = answer;
op->status = CL_COMM_RECEIVED;
somethingReceived.broadcast();
if (0 != op->callback) {
if ((*op->callback)(static_cast<ClusterCommResult*>(op))) {
// This is fully processed, so let's remove it from the queue:
QueueIterator q = i->second;
toSendByOpID.erase(i);
toSend.erase(q);
delete op;
}
}
}
else {
// Nothing known about the request, get rid of it:
@ -740,6 +782,8 @@ string ClusterComm::processAnswer(string& coordinatorHeader,
}
}
// Finally tell the others:
somethingReceived.broadcast();
cout << "end of processAnswer" << endl;
return string("");
}
@ -786,6 +830,26 @@ bool ClusterComm::moveFromSendToReceived (OperationID operationID) {
return true;
}
void ClusterComm::cleanupAllQueues() {
QueueIterator i;
{
basics::ConditionLocker locker(&somethingToSend);
for (i = toSend.begin(); i != toSend.end(); ++i) {
delete (*i);
}
toSendByOpID.clear();
toSend.clear();
}
{
basics::ConditionLocker locker(&somethingReceived);
for (i = received.begin(); i != received.end(); ++i) {
delete (*i);
}
receivedByOpID.clear();
received.clear();
}
}
// -----------------------------------------------------------------------------
// --SECTION-- ClusterCommThread
// -----------------------------------------------------------------------------
@ -863,10 +927,10 @@ void ClusterCommThread::run () {
}
else {
// We need a connection to this server:
cout << "Urgh1" << endl;
cout << "Sender: need connection" << endl;
ClusterComm::SingleServerConnection* connection
= cc->getConnection(server);
cout << "Urgh2" << endl;
cout << "Sender: got answer about connection" << endl;
if (0 == connection) {
op->status = CL_COMM_ERROR;
LOG_ERROR("cannot create connection to server '%s'", server.c_str());
@ -879,19 +943,21 @@ void ClusterCommThread::run () {
{
cout << "initialising client" << endl;
triagens::httpclient::SimpleHttpClient client(
connection->connection,
op->timeout, false);
triagens::httpclient::SimpleHttpClient* client
= new triagens::httpclient::SimpleHttpClient(connection->connection,
op->timeout, false);
// We add this result to the operation struct without acquiring
// a lock, since we know that only we do such a thing:
op->result = client.request(op->reqtype, op->path, op->body,
op->bodyLength, *(op->headerFields));
op->result = client->request(op->reqtype, op->path, op->body,
op->bodyLength, *(op->headerFields));
cout << "Sending msg:" << endl
<< client.getErrorMessage() << endl
<< client->getErrorMessage() << endl
<< op->result->getResultTypeMessage() << endl;
delete client;
// FIXME: handle case that connection was no good and the request
// failed.
}
cc->returnConnection(connection);
}

View File

@ -80,12 +80,14 @@ namespace triagens {
ShardID shardID;
ServerID serverID; // the actual server ID of the sender
ClusterCommOpStatus status;
bool dropped; // this is set to true, if the operation
// is dropped whilst in state CL_COMM_SENDING
// it is then actually dropped when it has
// been sent
// The field result is != 0 iff status is >= CL_COMM_SENT.
// Note that if status is CL_COMM_TIMEOUT, then the result
// field is a response object that only says "timeout"
bool dropped; // this is set to true, if the operation
// is dropped whilst in state CL_COMM_SENDING
// it is then actually dropped when sent
httpclient::SimpleHttpResult* result;
// the field answer is != 0 iff status is == CL_COMM_RECEIVED
rest::HttpRequest* answer;
@ -107,15 +109,18 @@ namespace triagens {
struct ClusterCommCallback {
// The idea is that one inherits from this class and implements
// the callback.
// the callback. Note however that the callback is called whilst
// holding the lock for the receiving (or indeed also the sending)
// queue! Therefore the operation should be quick.
ClusterCommCallback () {}
virtual ~ClusterCommCallback ();
virtual ~ClusterCommCallback () {};
// Result indicates whether or not the returned result shall be queued.
// If one returns false here, one has to call delete on the
// ClusterCommResult*.
virtual bool operator() (ClusterCommResult*);
// Result indicates whether or not the returned result is already
// fully processed. If so, it is removed from all queues. In this
// case the object is automatically destructed, so that the
// callback must not call delete in any case.
virtual bool operator() (ClusterCommResult*) = 0;
};
typedef double ClusterCommTimeout; // in milliseconds
@ -459,7 +464,9 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
SingleServerConnection* getConnection(ServerID& serverID);
void returnConnection(SingleServerConnection* singleConnection);
void brokenConnection(SingleServerConnection* singleConnection);
void closeUnusedConnections();
// The following closes all connections that have been unused for
// more than limit seconds
void closeUnusedConnections(double limit);
// The data structures for our internal queues:
@ -491,6 +498,9 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
// Move an operation from the send to the receive queue:
bool moveFromSendToReceived (OperationID operationID);
// Cleanup all queues:
void cleanupAllQueues();
// Finally, our background communications thread:
ClusterCommThread *_backgroundThread;
}; // end of class ClusterComm

View File

@ -912,6 +912,19 @@ static v8::Handle<v8::Value> JS_ExecuteGlobalContextFunction (v8::Arguments cons
#ifdef TRI_ENABLE_CLUSTER
// Minimal ClusterCommCallback implementation for the sharding test:
// when invoked it prints the ID of the operation it was called for
// together with the message given at construction time.
class CallbackTest : public ClusterCommCallback {
    string _msg;   // text printed on every invocation

  public:
    CallbackTest(string msg)
      : _msg(msg) {
    }

    virtual ~CallbackTest() {
    }

    // Prints the operation ID and the stored message. Always returns
    // false, so the result is reported as not yet fully processed.
    virtual bool operator() (ClusterCommResult* res) {
      cout << "ClusterCommCallback called on operation ";
      cout << res->operationID;
      cout << "Msg: " << _msg << endl;
      bool const fullyProcessed = false;   // Keep it in the queue
      return fullyProcessed;
    }
};
static v8::Handle<v8::Value> JS_ShardingTest (v8::Arguments const& argv) {
v8::Isolate* isolate;
@ -938,17 +951,20 @@ static v8::Handle<v8::Value> JS_ShardingTest (v8::Arguments const& argv) {
ClusterCommResult const* res =
cc->asyncRequest(clientTransactionId, TRI_NewTickServer(), "shardBlubb",
triagens::rest::HttpRequest::HTTP_REQUEST_GET,
"/_admin/time", NULL, 0, headerFields, 0, 0);
"/_admin/time", NULL, 0, headerFields,
new CallbackTest("Bla"), 0);
if (res == 0) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "couldn't queue async request");
}
LOG_DEBUG("JS_ShardingTest: request has been submitted");
cout << "JS_ShardingTest: request has been submitted" << endl;
OperationID opID = res->operationID;
delete res;
ClusterCommOpStatus status;
// Wait until the request has actually been sent:
while (true) {
res = cc->enquire(opID);
@ -956,20 +972,37 @@ static v8::Handle<v8::Value> JS_ShardingTest (v8::Arguments const& argv) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "couldn't enquire operation");
}
ClusterCommOpStatus status = res->status;
status = res->status;
delete res;
if (status >= CL_COMM_SENT) {
break;
}
LOG_DEBUG("JS_ShardingTest: request not yet sent");
cout << "JS_ShardingTest: request not yet sent" << endl;
usleep(50000);
}
LOG_DEBUG("JS_ShardingTest: request has been sent");
cc->drop("", 0, opID, "");
cout << "JS_ShardingTest: request has been sent, status: " << status << endl;
res = cc->wait("", 0, opID, "");
if (res->status == CL_COMM_RECEIVED) {
cout << "JS_ShardingTest: have answer" << endl;
cout << "HTTP request type: "
<< res->answer->translateMethod(res->answer->requestType()) << endl;
cout << "HTTP headers:" << endl;
map<string,string> headers =res->answer->headers();
map<string,string>::iterator i;
for (i = headers.begin(); i != headers.end(); ++i) {
cout << " " << i->first << ":" << i->second << endl;
}
cout << "HTTP body:" << endl << res->answer->body() << endl;
}
else {
cout << "JS_ShardingTest: error code " << res->status << endl;
}
delete res;
return scope.Close(v8::Undefined());
}

View File

@ -445,8 +445,8 @@ namespace triagens {
if (it == _jobs.end()) {
// job is already deleted.
// do nothing here. the dispatcher will throw away the handler, which
// will also dispose the response
// do nothing here. the dispatcher will throw away the handler,
// which will also dispose the response
}
else {
response = handler->stealResponse();
@ -465,9 +465,12 @@ namespace triagens {
}
}
// if callback is set, execute it now (outside of the wr-lock)
if (ctx != 0 && response != 0 && 0 != callback) {
callback(ctx->getCoordinatorHeader(), response);
if (response != 0) {
// if callback is set, execute it now (outside of the wr-lock)
if (0 != ctx && 0 != callback) {
callback(ctx->getCoordinatorHeader(), response);
}
delete response;
}
}