diff --git a/arangod/Cluster/ApplicationCluster.cpp b/arangod/Cluster/ApplicationCluster.cpp index d3fed81ec3..76dee178ec 100644 --- a/arangod/Cluster/ApplicationCluster.cpp +++ b/arangod/Cluster/ApplicationCluster.cpp @@ -27,6 +27,7 @@ #include "ApplicationCluster.h" #include "Rest/Endpoint.h" +#include "SimpleHttpClient/ConnectionManager.h" #include "Cluster/HeartbeatThread.h" #include "Cluster/ServerState.h" #include "Cluster/ClusterInfo.h" @@ -201,6 +202,9 @@ bool ApplicationCluster::start () { ServerState::instance()->setState(ServerState::STATE_STARTUP); + // initialise ConnectionManager library + httpclient::ConnectionManager::instance()->initialise(); + // the agency about our state AgencyComm comm; comm.sendServerState(); diff --git a/arangod/Cluster/ClusterComm.cpp b/arangod/Cluster/ClusterComm.cpp index 87496301ed..3b1d0c1a8b 100644 --- a/arangod/Cluster/ClusterComm.cpp +++ b/arangod/Cluster/ClusterComm.cpp @@ -31,6 +31,7 @@ #include "Basics/WriteLocker.h" #include "Basics/ConditionLocker.h" #include "Basics/StringUtils.h" +#include "lib/SimpleHttpClient/ConnectionManager.h" #include "VocBase/server.h" @@ -40,18 +41,6 @@ using namespace triagens::arango; // --SECTION-- ClusterComm connection options // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @brief global options for connections -//////////////////////////////////////////////////////////////////////////////// - -ClusterCommOptions ClusterComm::_globalConnectionOptions = { - 15.0, // connectTimeout - 3.0, // requestTimeout - 3, // numRetries - 5.0, // singleRequestTimeout - 0 // sslProtocol -}; - //////////////////////////////////////////////////////////////////////////////// /// @brief global callback for asynchronous REST handler //////////////////////////////////////////////////////////////////////////////// @@ -90,11 +79,6 @@ ClusterComm::~ClusterComm () { delete 
_backgroundThread; _backgroundThread = 0; cleanupAllQueues(); - WRITE_LOCKER(allLock); - map::iterator i; - for (i = allConnections.begin(); i != allConnections.end(); ++i) { - delete i->second; - } } //////////////////////////////////////////////////////////////////////////////// @@ -133,229 +117,6 @@ OperationID ClusterComm::getOperationID () { return TRI_NewTickServer(); } -//////////////////////////////////////////////////////////////////////////////// -/// @brief destructor for SingleServerConnection class -//////////////////////////////////////////////////////////////////////////////// - -ClusterComm::SingleServerConnection::~SingleServerConnection () { - delete connection; - delete endpoint; - lastUsed = 0; - serverID = ""; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief destructor of ServerConnections class -//////////////////////////////////////////////////////////////////////////////// - -ClusterComm::ServerConnections::~ServerConnections () { - vector::iterator i; - WRITE_LOCKER(lock); - - unused.clear(); - for (i = connections.begin();i != connections.end();++i) { - delete *i; - } - connections.clear(); -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief open or get a previously cached connection to a server -//////////////////////////////////////////////////////////////////////////////// - -ClusterComm::SingleServerConnection* -ClusterComm::getConnection(ServerID& serverID) { - map::iterator i; - ServerConnections* s; - SingleServerConnection* c; - - // First find a connections list: - { - WRITE_LOCKER(allLock); - - i = allConnections.find(serverID); - if (i != allConnections.end()) { - s = i->second; - } - else { - s = new ServerConnections(); - allConnections[serverID] = s; - } - } - - assert(s != 0); - - // Now get an unused one: - { - WRITE_LOCKER(s->lock); - if (!s->unused.empty()) { - c = s->unused.back(); - s->unused.pop_back(); - return c; - } - } 
- - // We need to open a new one: - string a = ClusterInfo::instance()->getServerEndpoint(serverID); - - if (a == "") { - // Unknown server address, probably not yet connected - return 0; - } - triagens::rest::Endpoint* e = triagens::rest::Endpoint::clientFactory(a); - if (0 == e) { - return 0; - } - triagens::httpclient::GeneralClientConnection* - g = triagens::httpclient::GeneralClientConnection::factory( - e, - _globalConnectionOptions._requestTimeout, - _globalConnectionOptions._connectTimeout, - _globalConnectionOptions._connectRetries, - _globalConnectionOptions._sslProtocol); - if (0 == g) { - delete e; - return 0; - } - c = new SingleServerConnection(g,e,serverID); - if (0 == c) { - delete g; - delete e; - return 0; - } - - // Now put it into our administration: - { - WRITE_LOCKER(s->lock); - s->connections.push_back(c); - } - c->lastUsed = time(0); - return c; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief return leased connection to a server -//////////////////////////////////////////////////////////////////////////////// - -void ClusterComm::returnConnection(SingleServerConnection* c) { - map::iterator i; - ServerConnections* s; - - // First find the collections list: - { - WRITE_LOCKER(allLock); - - i = allConnections.find(c->serverID); - if (i != allConnections.end()) { - s = i->second; - } - else { - // How strange! We just destroy the connection in despair! 
- delete c; - return; - } - } - - c->lastUsed = time(0); - - // Now mark it as unused: - { - WRITE_LOCKER(s->lock); - s->unused.push_back(c); - } -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief report a leased connection as being broken -//////////////////////////////////////////////////////////////////////////////// - -void ClusterComm::brokenConnection(SingleServerConnection* c) { - map::iterator i; - ServerConnections* s; - - // First find the collections list: - { - WRITE_LOCKER(allLock); - - i = allConnections.find(c->serverID); - if (i != allConnections.end()) { - s = i->second; - } - else { - // How strange! We just destroy the connection in despair! - delete c; - return; - } - } - - // Now find it to get rid of it: - { - WRITE_LOCKER(s->lock); - vector::iterator i; - for (i = s->connections.begin(); i != s->connections.end(); ++i) { - if (*i == c) { - // Got it, now remove it: - s->connections.erase(i); - delete c; - return; - } - } - } - - // How strange! We should have known this one! 
- delete c; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief closes all connections that have been unused for more than -/// limit seconds -//////////////////////////////////////////////////////////////////////////////// - -void ClusterComm::closeUnusedConnections (double limit) { - WRITE_LOCKER(allLock); - map::iterator s; - list::iterator i; - list::iterator prev; - ServerConnections* sc; - time_t t; - bool haveprev; - - t = time(0); - for (s = allConnections.begin(); s != allConnections.end(); ++s) { - sc = s->second; - { - WRITE_LOCKER(sc->lock); - haveprev = false; - for (i = sc->unused.begin(); i != sc->unused.end(); ) { - if (t - (*i)->lastUsed > limit) { - vector::iterator j; - for (j = sc->connections.begin(); j != sc->connections.end(); ++j) { - if (*j == *i) { - sc->connections.erase(j); - break; - } - } - delete (*i); - sc->unused.erase(i); - if (haveprev) { - i = prev; // will be incremented in next iteration - i++; - haveprev = false; - } - else { - i = sc->unused.begin(); - } - } - else { - prev = i; - ++i; - haveprev = true; - } - } - } - } -} - //////////////////////////////////////////////////////////////////////////////// /// @brief submit an HTTP request to a shard asynchronously. /// @@ -438,7 +199,8 @@ ClusterCommResult* ClusterComm::asyncRequest ( } op->headerFields = headerFields; op->callback = callback; - op->endTime = timeout == 0.0 ? now()+24*60*60.0 : now()+timeout; + op->endTime = timeout == 0.0 ? TRI_microtime()+24*60*60.0 + : TRI_microtime()+timeout; ClusterCommResult* res = new ClusterCommResult(); *res = *static_cast(op); @@ -498,7 +260,7 @@ ClusterCommResult* ClusterComm::syncRequest ( body = 0; } - double currentTime = now(); + double currentTime = TRI_microtime(); double endTime = timeout == 0.0 ? 
currentTime+24*60*60.0 : currentTime+timeout; @@ -521,52 +283,61 @@ ClusterCommResult* ClusterComm::syncRequest ( } // We need a connection to this server: - SingleServerConnection* connection = getConnection(res->serverID); - if (0 == connection) { + string endpoint = ClusterInfo::instance()->getServerEndpoint(res->serverID); + if (endpoint == "") { res->status = CL_COMM_ERROR; - LOG_ERROR("cannot create connection to server '%s'", - res->serverID.c_str()); + LOG_ERROR("cannot find endpoint of server '%s'", res->serverID.c_str()); } else { - if (0 != body) { - LOG_DEBUG("sending %s request to DB server '%s': %s", - triagens::rest::HttpRequest::translateMethod(reqtype).c_str(), - res->serverID.c_str(), body); - } - else { - LOG_DEBUG("sending %s request to DB server '%s'", - triagens::rest::HttpRequest::translateMethod(reqtype).c_str(), - res->serverID.c_str()); - } - triagens::httpclient::SimpleHttpClient* client - = new triagens::httpclient::SimpleHttpClient( - connection->connection, - endTime-currentTime, false); - - res->result = client->request(reqtype, path, body, bodyLength, - headerFields); - if (res->result == 0 || ! 
res->result->isComplete()) { - brokenConnection(connection); + httpclient::ConnectionManager* cm + = httpclient::ConnectionManager::instance(); + httpclient::ConnectionManager::SingleServerConnection* connection + = cm->leaseConnection(endpoint); + if (0 == connection) { res->status = CL_COMM_ERROR; + LOG_ERROR("cannot create connection to server '%s'", + res->serverID.c_str()); } else { - returnConnection(connection); - if (res->result->wasHttpError()) { - res->status = CL_COMM_ERROR; + if (0 != body) { + LOG_DEBUG("sending %s request to DB server '%s': %s", + triagens::rest::HttpRequest::translateMethod(reqtype).c_str(), + res->serverID.c_str(), body); } - else if (client->getErrorMessage() == - "Request timeout reached") { - res->status = CL_COMM_TIMEOUT; + else { + LOG_DEBUG("sending %s request to DB server '%s'", + triagens::rest::HttpRequest::translateMethod(reqtype).c_str(), + res->serverID.c_str()); } - else if (client->getErrorMessage() != "") { - res->status = CL_COMM_ERROR; + triagens::httpclient::SimpleHttpClient* client + = new triagens::httpclient::SimpleHttpClient( + connection->connection, + endTime-currentTime, false); + client->keepConnectionOnDestruction(true); + + res->result = client->request(reqtype, path, body, bodyLength, + headerFields); + if (! 
res->result->isComplete()) { + cm->brokenConnection(connection); + if (client->getErrorMessage() == "Request timeout reached") { + res->status = CL_COMM_TIMEOUT; + } + else { + res->status = CL_COMM_ERROR; + } } + else { + cm->returnConnection(connection); + if (res->result->wasHttpError()) { + res->status = CL_COMM_ERROR; + } + } + delete client; + } + if (res->status == CL_COMM_SENDING) { + // Everything was OK + res->status = CL_COMM_SENT; } - delete client; - } - if (res->status == CL_COMM_SENDING) { - // Everything was OK - res->status = CL_COMM_SENT; } return res; } @@ -688,7 +459,7 @@ ClusterCommResult* ClusterComm::wait ( endtime = 1.0e50; // this is the Sankt Nimmerleinstag } else { - endtime = now() + timeout; + endtime = TRI_microtime() + timeout; } if (0 != operationID) { @@ -722,7 +493,7 @@ ClusterCommResult* ClusterComm::wait ( // It is in the receive queue but still waiting, now wait actually } // Here it could either be in the receive or the send queue, let's wait - timeleft = endtime - now(); + timeleft = endtime - TRI_microtime(); if (timeleft <= 0) break; somethingReceived.wait(uint64_t(timeleft * 1000000.0)); } @@ -773,7 +544,7 @@ ClusterCommResult* ClusterComm::wait ( return res; } // Here it could either be in the receive or the send queue, let's wait - timeleft = endtime - now(); + timeleft = endtime - TRI_microtime(); if (timeleft <= 0) break; somethingReceived.wait(uint64_t(timeleft * 1000000.0)); } @@ -882,8 +653,16 @@ void ClusterComm::asyncAnswer (string& coordinatorHeader, coordinatorID = coordinatorHeader.substr(start,pos-start); // Now find the connection to which the request goes from the coordinatorID: - ClusterComm::SingleServerConnection* connection - = getConnection(coordinatorID); + httpclient::ConnectionManager* cm = httpclient::ConnectionManager::instance(); + string endpoint = ClusterInfo::instance()->getServerEndpoint(coordinatorID); + if (endpoint == "") { + LOG_ERROR("asyncAnswer: cannot find endpoint for server '%s'", + 
coordinatorID.c_str()); + return; + } + + httpclient::ConnectionManager::SingleServerConnection* connection + = cm->leaseConnection(endpoint); if (0 == connection) { LOG_ERROR("asyncAnswer: cannot create connection to server '%s'", coordinatorID.c_str()); @@ -902,24 +681,23 @@ void ClusterComm::asyncAnswer (string& coordinatorHeader, triagens::httpclient::SimpleHttpClient* client = new triagens::httpclient::SimpleHttpClient( - connection->connection, - _globalConnectionOptions._singleRequestTimeout, - false); + connection->connection, 3600.0, false); + client->keepConnectionOnDestruction(true); // We add this result to the operation struct without acquiring // a lock, since we know that only we do such a thing: httpclient::SimpleHttpResult* result = client->request(rest::HttpRequest::HTTP_REQUEST_PUT, "/_api/shard-comm", body, len, headers); - if (client->getErrorMessage() != "") { - brokenConnection(connection); + if (! result->isComplete()) { + cm->brokenConnection(connection); } else { - returnConnection(connection); + cm->returnConnection(connection); } + // We cannot deal with a bad result here, so forget about it in any case. delete result; delete client; - returnConnection(connection); } //////////////////////////////////////////////////////////////////////////////// @@ -1036,8 +814,8 @@ bool ClusterComm::moveFromSendToReceived (OperationID operationID) { } if (op->status == CL_COMM_SENDING) { // Note that in the meantime the status could have changed to - // CL_COMM_ERROR or indeed to CL_COMM_RECEIVED in these cases, we do - // not want to overwrite this result + // CL_COMM_ERROR, CL_COMM_TIMEOUT or indeed to CL_COMM_RECEIVED in + // these cases, we do not want to overwrite this result op->status = CL_COMM_SENT; } received.push_back(op); @@ -1141,7 +919,7 @@ void ClusterCommThread::run () { // sent the request (happens in moveFromSendToReceived). // Have we already reached the timeout? 
- double currentTime = cc->now(); + double currentTime = TRI_microtime(); if (op->endTime <= currentTime) { op->status = CL_COMM_TIMEOUT; } @@ -1151,53 +929,63 @@ void ClusterCommThread::run () { } else { // We need a connection to this server: - ClusterComm::SingleServerConnection* connection - = cc->getConnection(op->serverID); - if (0 == connection) { + string endpoint + = ClusterInfo::instance()->getServerEndpoint(op->serverID); + if (endpoint == "") { op->status = CL_COMM_ERROR; - LOG_ERROR("cannot create connection to server '%s'", + LOG_ERROR("cannot find endpoint for server '%s'", op->serverID.c_str()); } else { - if (0 != op->body) { - LOG_DEBUG("sending %s request to DB server '%s': %s", - triagens::rest::HttpRequest::translateMethod(op->reqtype) - .c_str(), op->serverID.c_str(), op->body); - } - else { - LOG_DEBUG("sending %s request to DB server '%s'", - triagens::rest::HttpRequest::translateMethod(op->reqtype) - .c_str(), op->serverID.c_str()); - } - - triagens::httpclient::SimpleHttpClient* client - = new triagens::httpclient::SimpleHttpClient( - connection->connection, - op->endTime-currentTime, false); - - // We add this result to the operation struct without acquiring - // a lock, since we know that only we do such a thing: - op->result = client->request(op->reqtype, op->path, op->body, - op->bodyLength, *(op->headerFields)); - - if (op->result == 0 || ! 
op->result->isComplete()) { - cc->brokenConnection(connection); + httpclient::ConnectionManager* cm + = httpclient::ConnectionManager::instance(); + httpclient::ConnectionManager::SingleServerConnection* connection + = cm->leaseConnection(endpoint); + if (0 == connection) { op->status = CL_COMM_ERROR; + LOG_ERROR("cannot create connection to server '%s'", + op->serverID.c_str()); } else { - cc->returnConnection(connection); - if (op->result->wasHttpError()) { - op->status = CL_COMM_ERROR; + if (0 != op->body) { + LOG_DEBUG("sending %s request to DB server '%s': %s", + triagens::rest::HttpRequest::translateMethod(op->reqtype) + .c_str(), op->serverID.c_str(), op->body); } - else if (client->getErrorMessage() == - "Request timeout reached") { - op->status = CL_COMM_TIMEOUT; + else { + LOG_DEBUG("sending %s request to DB server '%s'", + triagens::rest::HttpRequest::translateMethod(op->reqtype) + .c_str(), op->serverID.c_str()); } - else if (client->getErrorMessage() != "") { - op->status = CL_COMM_ERROR; + + triagens::httpclient::SimpleHttpClient* client + = new triagens::httpclient::SimpleHttpClient( + connection->connection, + op->endTime-currentTime, false); + client->keepConnectionOnDestruction(true); + + // We add this result to the operation struct without acquiring + // a lock, since we know that only we do such a thing: + op->result = client->request(op->reqtype, op->path, op->body, + op->bodyLength, *(op->headerFields)); + + if (! 
op->result->isComplete()) { + cm->brokenConnection(connection); + if (client->getErrorMessage() == "Request timeout reached") { + op->status = CL_COMM_TIMEOUT; + } + else { + op->status = CL_COMM_ERROR; + } } + else { + cm->returnConnection(connection); + if (op->result->wasHttpError()) { + op->status = CL_COMM_ERROR; + } + } + delete client; } - delete client; } } } @@ -1212,7 +1000,7 @@ void ClusterCommThread::run () { // just now, so we can check on our receive queue to detect timeouts: { - double currentTime = cc->now(); + double currentTime = TRI_microtime(); basics::ConditionLocker locker(&cc->somethingReceived); ClusterComm::QueueIterator q; for (q = cc->received.begin(); q != cc->received.end(); ++q) { diff --git a/arangod/Cluster/ClusterComm.h b/arangod/Cluster/ClusterComm.h index 6c4d8b9b9d..1002a10d66 100644 --- a/arangod/Cluster/ClusterComm.h +++ b/arangod/Cluster/ClusterComm.h @@ -191,17 +191,6 @@ namespace triagens { } }; -//////////////////////////////////////////////////////////////////////////////// -/// @brief options for cluster operations -//////////////////////////////////////////////////////////////////////////////// - - struct ClusterCommOptions { - double _connectTimeout; - double _requestTimeout; - size_t _connectRetries; - double _singleRequestTimeout; - uint32_t _sslProtocol; - }; //////////////////////////////////////////////////////////////////////////////// /// @brief global callback for asynchronous REST handler @@ -353,96 +342,12 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response); static ClusterComm* _theinstance; -//////////////////////////////////////////////////////////////////////////////// -/// @brief global options for connections -//////////////////////////////////////////////////////////////////////////////// - - static ClusterCommOptions _globalConnectionOptions; - //////////////////////////////////////////////////////////////////////////////// /// @brief produces an operation ID which is 
unique in this process //////////////////////////////////////////////////////////////////////////////// static OperationID getOperationID (); -//////////////////////////////////////////////////////////////////////////////// -/// @brief get timestamp -//////////////////////////////////////////////////////////////////////////////// - - static double now () { - struct timeval tv; - gettimeofday(&tv, 0); - - double sec = (double) tv.tv_sec; // seconds - double usc = (double) tv.tv_usec; // microseconds - - return sec + usc / 1000000.0; - } - -//////////////////////////////////////////////////////////////////////////////// -/// @brief class to administrate one connection to a server -//////////////////////////////////////////////////////////////////////////////// - - struct SingleServerConnection { - httpclient::GeneralClientConnection* connection; - rest::Endpoint* endpoint; - time_t lastUsed; - ServerID serverID; - - SingleServerConnection (httpclient::GeneralClientConnection* c, - rest::Endpoint* e, - ServerID s) - : connection(c), endpoint(e), lastUsed(0), serverID(s) {} - ~SingleServerConnection (); - }; - -//////////////////////////////////////////////////////////////////////////////// -/// @brief class to administrate all connections to a server -//////////////////////////////////////////////////////////////////////////////// - - struct ServerConnections { - vector connections; - list unused; - triagens::basics::ReadWriteLock lock; - - ServerConnections () {} - ~ServerConnections (); // closes all connections - }; - -//////////////////////////////////////////////////////////////////////////////// -/// @brief map to store all connections to all servers with corresponding lock -//////////////////////////////////////////////////////////////////////////////// - - // We keep connections to servers open but do not care - // if they are closed. The key is the server ID. 
- map allConnections; - triagens::basics::ReadWriteLock allLock; - -//////////////////////////////////////////////////////////////////////////////// -/// @brief open or get a previously cached connection to a server -//////////////////////////////////////////////////////////////////////////////// - - SingleServerConnection* getConnection(ServerID& serverID); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief return leased connection to a server -//////////////////////////////////////////////////////////////////////////////// - - void returnConnection(SingleServerConnection* singleConnection); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief report a leased connection as being broken -//////////////////////////////////////////////////////////////////////////////// - - void brokenConnection(SingleServerConnection* singleConnection); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief closes all connections that have been unused for more than -/// limit seconds -//////////////////////////////////////////////////////////////////////////////// - - void closeUnusedConnections(double limit); - //////////////////////////////////////////////////////////////////////////////// /// @brief send queue with lock and index //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index 2197572852..5d3d97a1a2 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -8159,7 +8159,6 @@ static v8::Handle JS_ListDatabases (v8::Arguments const& argv) { #ifdef TRI_ENABLE_CLUSTER - //////////////////////////////////////////////////////////////////////////////// /// @brief helper function for the agency /// @@ -8167,10 +8166,16 @@ static v8::Handle JS_ListDatabases (v8::Arguments const& argv) { /// name. 
//////////////////////////////////////////////////////////////////////////////// -static int CreateDatabaseInAgency(string const& place, string const& name) { +static int CreateDatabaseInAgency(string const& place, string const& name, + vector* DBServers) { AgencyComm ac; AgencyCommLocker locker(place,"WRITE"); AgencyCommResult res; + if (0 != DBServers) { + ClusterInfo* ci = ClusterInfo::instance(); + ci->loadDBServers(); // to make sure we know about all of them + *DBServers = ci->getDBServers(); + } res = ac.casValue(place+"/Collections/"+name+"/Lock",string("UNLOCKED"), false, 0.0, 0.0); if (res.successful()) { @@ -8224,17 +8229,22 @@ static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& const string name = TRI_ObjectToString(argv[0]); - ClusterInfo* ci = ClusterInfo::instance(); + //ClusterInfo* ci = ClusterInfo::instance(); ClusterComm* cc = ClusterComm::instance(); AgencyComm ac; int ourerrno = TRI_ERROR_NO_ERROR; - ourerrno = CreateDatabaseInAgency("Target",name); + ourerrno = CreateDatabaseInAgency("Target",name,0); if (ourerrno == TRI_ERROR_NO_ERROR) { // everything OK in /Target - ourerrno = CreateDatabaseInAgency("Plan",name); + vector DBServers; + // We will get the list of DBServers whilst holding the lock to + // modify "/Plan/Collections". Therefore, everybody who is on the + // list will be told, everybody who is starting later will see the + // entry in "/Plan/Collections/..." and will create the database on + // startup. + ourerrno = CreateDatabaseInAgency("Plan",name,&DBServers); if (ourerrno == TRI_ERROR_NO_ERROR) { - vector DBServers = ci->getDBServers(); vector::iterator it; // build request to be sent to all servers @@ -8253,44 +8263,37 @@ static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& jsonstr.size(), new map, 0, 0.0); delete res; } - cout << "CDB: Have sent " << DBServers.size() << " requests." 
<< endl; unsigned int done = 0; while (done < DBServers.size()) { res = cc->wait("", coordTransactionID, 0, "", 0.0); if (res->status == CL_COMM_RECEIVED) { if (res->answer_code == triagens::rest::HttpResponse::OK) { - cout << "CDB: answer OK" << endl; done++; delete res; } else if (res->answer_code == triagens::rest::HttpResponse::CONFLICT) { - cout << "CDB: answer CONFLICT" << endl; ourerrno = TRI_ERROR_ARANGO_DUPLICATE_NAME; delete res; break; } else { - cout << "CDB: answer BAD" << endl; ourerrno = TRI_ERROR_INTERNAL; delete res; break; } } else { - cout << "CDB: CL_COMM_ERROR" << endl; delete res; break; } } if (done == DBServers.size()) { - ourerrno = CreateDatabaseInAgency("Current",name); + ourerrno = CreateDatabaseInAgency("Current",name,0); if (ourerrno == TRI_ERROR_NO_ERROR) { - cout << "CDB: All done" << endl; return scope.Close(v8::True()); } } cc->drop( "CreateDatabase", coordTransactionID, 0, "" ); - cout << "CDB: Aborting..." << endl; for (it = DBServers.begin(); it != DBServers.end(); ++it) { res = cc->asyncRequest("CreateDB", coordTransactionID, "server:"+*it, @@ -8302,7 +8305,6 @@ static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& done = 0; while (done < DBServers.size()) { res = cc->wait("", coordTransactionID, 0, "", 0.0); - cout << "CDB: Got answer" << endl; delete res; done++; } @@ -8470,6 +8472,66 @@ static v8::Handle JS_CreateDatabase (v8::Arguments const& argv) { return scope.Close(v8::True()); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief drop a database, case of a coordinator in a cluster (incomplete: so far only the existence check and the write lock on Current/Collections/{name} are implemented) +//////////////////////////////////////////////////////////////////////////////// + +#ifdef TRI_ENABLE_CLUSTER + +static v8::Handle JS_DropDatabase_Coordinator (v8::Arguments const& argv) { + v8::HandleScope scope; + + // Arguments are already checked, there is exactly one argument + + const string name = TRI_ObjectToString(argv[0]); + + ClusterInfo* ci = ClusterInfo::instance(); + 
ClusterComm* cc = ClusterComm::instance(); + AgencyComm ac; + AgencyCommResult acres; + + int ourerrno = TRI_ERROR_NO_ERROR; + + { + AgencyCommLocker locker("Target","WRITE"); + // FIXME: need to check that locking of "Target" worked! + + // Now nobody can create or remove a database, so we can check that + // the one we want to drop does indeed exist: + acres = ac.getValues("Current/Collections/"+name+"/Lock", false); + if (!acres.successful()) { + TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); + } + } + + // Now let's lock it. + // We cannot use a locker here, because we want to remove all of + // Current/Collections/{name} before we are done and we must not + // unlock the Lock after that. + if (!ac.lockWrite("Current/Collections/"+name, 24*3600.0, 24*3600.0)) { + TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + } + // res = ac.getValues("Current/Collections/"+name+"/Lock", false); + + // TODO (the steps below are not implemented yet): if this fails or the DB does not exist, return an error + // Remove entry Plan/Collections/{name} using Plan/Lock + // get list of DBServers during the lock + // (from now on new DBServers will no longer create a database) + // this is the point of no return + // tell all DBServers to drop database + // note errors, but there is nothing we can do about it if things go wrong + // only count and report the servers with errors + // Remove entry Target/Collections/{name}, using Target/Lock + // Remove entry Current/Collections/{name} using Current/Lock + // (from now on coordinators will understand that the database is gone) + // Release Plan/Lock + // Report error + + return scope.Close(v8::True()); +} + +#endif + //////////////////////////////////////////////////////////////////////////////// /// @brief drop an existing database /// @@ -8502,6 +8564,13 @@ static v8::Handle JS_DropDatabase (v8::Arguments const& argv) { TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_USE_SYSTEM_DATABASE); } +#ifdef TRI_ENABLE_CLUSTER + // If we are a coordinator in a cluster, we have to behave differently: + if 
(ServerState::instance()->isCoordinator()) { + return JS_DropDatabase_Coordinator(argv); + } +#endif + const string name = TRI_ObjectToString(argv[0]); TRI_v8_global_t* v8g = (TRI_v8_global_t*) v8::Isolate::GetCurrent()->GetData(); diff --git a/lib/Makefile.files b/lib/Makefile.files index f772fa331d..46e3fc9cba 100644 --- a/lib/Makefile.files +++ b/lib/Makefile.files @@ -106,7 +106,8 @@ lib_libarango_client_a_SOURCES = \ lib/SimpleHttpClient/ClientConnection.cpp \ lib/SimpleHttpClient/SslClientConnection.cpp \ lib/SimpleHttpClient/SimpleHttpClient.cpp \ - lib/SimpleHttpClient/SimpleHttpResult.cpp + lib/SimpleHttpClient/SimpleHttpResult.cpp \ + lib/SimpleHttpClient/ConnectionManager.cpp ################################################################################ ### @brief library "libarango.a", front-end part @@ -162,7 +163,8 @@ lib_libarango_v8_a_SOURCES = \ lib/SimpleHttpClient/ClientConnection.cpp \ lib/SimpleHttpClient/SslClientConnection.cpp \ lib/SimpleHttpClient/SimpleHttpClient.cpp \ - lib/SimpleHttpClient/SimpleHttpResult.cpp + lib/SimpleHttpClient/SimpleHttpResult.cpp \ + lib/SimpleHttpClient/ConnectionManager.cpp ################################################################################ diff --git a/lib/SimpleHttpClient/GeneralClientConnection.h b/lib/SimpleHttpClient/GeneralClientConnection.h index 6991405388..c642d5d6ce 100644 --- a/lib/SimpleHttpClient/GeneralClientConnection.h +++ b/lib/SimpleHttpClient/GeneralClientConnection.h @@ -50,11 +50,6 @@ namespace triagens { // --SECTION-- typedefs // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @addtogroup httpclient -/// @{ -//////////////////////////////////////////////////////////////////////////////// - protected: //////////////////////////////////////////////////////////////////////////////// @@ -63,19 +58,10 @@ namespace triagens { enum { READBUFFER_SIZE = 8192 }; 
-//////////////////////////////////////////////////////////////////////////////// -/// @} -//////////////////////////////////////////////////////////////////////////////// - // ----------------------------------------------------------------------------- // --SECTION-- constructors / destructors // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @addtogroup httpclient -/// @{ -//////////////////////////////////////////////////////////////////////////////// - private: GeneralClientConnection (GeneralClientConnection const&); @@ -98,19 +84,10 @@ namespace triagens { virtual ~GeneralClientConnection (); -//////////////////////////////////////////////////////////////////////////////// -/// @} -//////////////////////////////////////////////////////////////////////////////// - // ----------------------------------------------------------------------------- // --SECTION-- public methods // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @addtogroup httpclient -/// @{ -//////////////////////////////////////////////////////////////////////////////// - public: //////////////////////////////////////////////////////////////////////////////// @@ -179,19 +156,10 @@ namespace triagens { bool handleRead (double, triagens::basics::StringBuffer&); -//////////////////////////////////////////////////////////////////////////////// -/// @} -//////////////////////////////////////////////////////////////////////////////// - // ----------------------------------------------------------------------------- // --SECTION-- protected virtual methods // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @addtogroup httpclient -/// @{ 
-//////////////////////////////////////////////////////////////////////////////// - protected: //////////////////////////////////////////////////////////////////////////////// @@ -230,19 +198,10 @@ namespace triagens { virtual bool readable () = 0; -//////////////////////////////////////////////////////////////////////////////// -/// @} -//////////////////////////////////////////////////////////////////////////////// - // ----------------------------------------------------------------------------- // --SECTION-- protected variables // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @addtogroup httpclient -/// @{ -//////////////////////////////////////////////////////////////////////////////// - protected: //////////////////////////////////////////////////////////////////////////////// @@ -281,10 +240,6 @@ namespace triagens { bool _isConnected; -//////////////////////////////////////////////////////////////////////////////// -/// @} -//////////////////////////////////////////////////////////////////////////////// - }; } }