mirror of https://gitee.com/bigwinds/arangodb
Separate out ConnectionManager from ClusterComm.
This commit is contained in:
parent
f50452259e
commit
fc96df2244
|
@ -27,6 +27,7 @@
|
|||
|
||||
#include "ApplicationCluster.h"
|
||||
#include "Rest/Endpoint.h"
|
||||
#include "SimpleHttpClient/ConnectionManager.h"
|
||||
#include "Cluster/HeartbeatThread.h"
|
||||
#include "Cluster/ServerState.h"
|
||||
#include "Cluster/ClusterInfo.h"
|
||||
|
@ -201,6 +202,9 @@ bool ApplicationCluster::start () {
|
|||
|
||||
ServerState::instance()->setState(ServerState::STATE_STARTUP);
|
||||
|
||||
// initialise ConnectionManager library
|
||||
httpclient::ConnectionManager::instance()->initialise();
|
||||
|
||||
// the agency about our state
|
||||
AgencyComm comm;
|
||||
comm.sendServerState();
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
#include "Basics/WriteLocker.h"
|
||||
#include "Basics/ConditionLocker.h"
|
||||
#include "Basics/StringUtils.h"
|
||||
#include "lib/SimpleHttpClient/ConnectionManager.h"
|
||||
|
||||
#include "VocBase/server.h"
|
||||
|
||||
|
@ -40,18 +41,6 @@ using namespace triagens::arango;
|
|||
// --SECTION-- ClusterComm connection options
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief global options for connections
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterCommOptions ClusterComm::_globalConnectionOptions = {
|
||||
15.0, // connectTimeout
|
||||
3.0, // requestTimeout
|
||||
3, // numRetries
|
||||
5.0, // singleRequestTimeout
|
||||
0 // sslProtocol
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief global callback for asynchronous REST handler
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -90,11 +79,6 @@ ClusterComm::~ClusterComm () {
|
|||
delete _backgroundThread;
|
||||
_backgroundThread = 0;
|
||||
cleanupAllQueues();
|
||||
WRITE_LOCKER(allLock);
|
||||
map<ServerID,ServerConnections*>::iterator i;
|
||||
for (i = allConnections.begin(); i != allConnections.end(); ++i) {
|
||||
delete i->second;
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -133,229 +117,6 @@ OperationID ClusterComm::getOperationID () {
|
|||
return TRI_NewTickServer();
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief destructor for SingleServerConnection class
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterComm::SingleServerConnection::~SingleServerConnection () {
|
||||
delete connection;
|
||||
delete endpoint;
|
||||
lastUsed = 0;
|
||||
serverID = "";
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief destructor of ServerConnections class
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterComm::ServerConnections::~ServerConnections () {
|
||||
vector<SingleServerConnection*>::iterator i;
|
||||
WRITE_LOCKER(lock);
|
||||
|
||||
unused.clear();
|
||||
for (i = connections.begin();i != connections.end();++i) {
|
||||
delete *i;
|
||||
}
|
||||
connections.clear();
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief open or get a previously cached connection to a server
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterComm::SingleServerConnection*
|
||||
ClusterComm::getConnection(ServerID& serverID) {
|
||||
map<ServerID,ServerConnections*>::iterator i;
|
||||
ServerConnections* s;
|
||||
SingleServerConnection* c;
|
||||
|
||||
// First find a connections list:
|
||||
{
|
||||
WRITE_LOCKER(allLock);
|
||||
|
||||
i = allConnections.find(serverID);
|
||||
if (i != allConnections.end()) {
|
||||
s = i->second;
|
||||
}
|
||||
else {
|
||||
s = new ServerConnections();
|
||||
allConnections[serverID] = s;
|
||||
}
|
||||
}
|
||||
|
||||
assert(s != 0);
|
||||
|
||||
// Now get an unused one:
|
||||
{
|
||||
WRITE_LOCKER(s->lock);
|
||||
if (!s->unused.empty()) {
|
||||
c = s->unused.back();
|
||||
s->unused.pop_back();
|
||||
return c;
|
||||
}
|
||||
}
|
||||
|
||||
// We need to open a new one:
|
||||
string a = ClusterInfo::instance()->getServerEndpoint(serverID);
|
||||
|
||||
if (a == "") {
|
||||
// Unknown server address, probably not yet connected
|
||||
return 0;
|
||||
}
|
||||
triagens::rest::Endpoint* e = triagens::rest::Endpoint::clientFactory(a);
|
||||
if (0 == e) {
|
||||
return 0;
|
||||
}
|
||||
triagens::httpclient::GeneralClientConnection*
|
||||
g = triagens::httpclient::GeneralClientConnection::factory(
|
||||
e,
|
||||
_globalConnectionOptions._requestTimeout,
|
||||
_globalConnectionOptions._connectTimeout,
|
||||
_globalConnectionOptions._connectRetries,
|
||||
_globalConnectionOptions._sslProtocol);
|
||||
if (0 == g) {
|
||||
delete e;
|
||||
return 0;
|
||||
}
|
||||
c = new SingleServerConnection(g,e,serverID);
|
||||
if (0 == c) {
|
||||
delete g;
|
||||
delete e;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Now put it into our administration:
|
||||
{
|
||||
WRITE_LOCKER(s->lock);
|
||||
s->connections.push_back(c);
|
||||
}
|
||||
c->lastUsed = time(0);
|
||||
return c;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief return leased connection to a server
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void ClusterComm::returnConnection(SingleServerConnection* c) {
|
||||
map<ServerID,ServerConnections*>::iterator i;
|
||||
ServerConnections* s;
|
||||
|
||||
// First find the collections list:
|
||||
{
|
||||
WRITE_LOCKER(allLock);
|
||||
|
||||
i = allConnections.find(c->serverID);
|
||||
if (i != allConnections.end()) {
|
||||
s = i->second;
|
||||
}
|
||||
else {
|
||||
// How strange! We just destroy the connection in despair!
|
||||
delete c;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
c->lastUsed = time(0);
|
||||
|
||||
// Now mark it as unused:
|
||||
{
|
||||
WRITE_LOCKER(s->lock);
|
||||
s->unused.push_back(c);
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief report a leased connection as being broken
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void ClusterComm::brokenConnection(SingleServerConnection* c) {
|
||||
map<ServerID,ServerConnections*>::iterator i;
|
||||
ServerConnections* s;
|
||||
|
||||
// First find the collections list:
|
||||
{
|
||||
WRITE_LOCKER(allLock);
|
||||
|
||||
i = allConnections.find(c->serverID);
|
||||
if (i != allConnections.end()) {
|
||||
s = i->second;
|
||||
}
|
||||
else {
|
||||
// How strange! We just destroy the connection in despair!
|
||||
delete c;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Now find it to get rid of it:
|
||||
{
|
||||
WRITE_LOCKER(s->lock);
|
||||
vector<SingleServerConnection*>::iterator i;
|
||||
for (i = s->connections.begin(); i != s->connections.end(); ++i) {
|
||||
if (*i == c) {
|
||||
// Got it, now remove it:
|
||||
s->connections.erase(i);
|
||||
delete c;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// How strange! We should have known this one!
|
||||
delete c;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief closes all connections that have been unused for more than
|
||||
/// limit seconds
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void ClusterComm::closeUnusedConnections (double limit) {
|
||||
WRITE_LOCKER(allLock);
|
||||
map<ServerID,ServerConnections*>::iterator s;
|
||||
list<SingleServerConnection*>::iterator i;
|
||||
list<SingleServerConnection*>::iterator prev;
|
||||
ServerConnections* sc;
|
||||
time_t t;
|
||||
bool haveprev;
|
||||
|
||||
t = time(0);
|
||||
for (s = allConnections.begin(); s != allConnections.end(); ++s) {
|
||||
sc = s->second;
|
||||
{
|
||||
WRITE_LOCKER(sc->lock);
|
||||
haveprev = false;
|
||||
for (i = sc->unused.begin(); i != sc->unused.end(); ) {
|
||||
if (t - (*i)->lastUsed > limit) {
|
||||
vector<SingleServerConnection*>::iterator j;
|
||||
for (j = sc->connections.begin(); j != sc->connections.end(); ++j) {
|
||||
if (*j == *i) {
|
||||
sc->connections.erase(j);
|
||||
break;
|
||||
}
|
||||
}
|
||||
delete (*i);
|
||||
sc->unused.erase(i);
|
||||
if (haveprev) {
|
||||
i = prev; // will be incremented in next iteration
|
||||
i++;
|
||||
haveprev = false;
|
||||
}
|
||||
else {
|
||||
i = sc->unused.begin();
|
||||
}
|
||||
}
|
||||
else {
|
||||
prev = i;
|
||||
++i;
|
||||
haveprev = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief submit an HTTP request to a shard asynchronously.
|
||||
///
|
||||
|
@ -438,7 +199,8 @@ ClusterCommResult* ClusterComm::asyncRequest (
|
|||
}
|
||||
op->headerFields = headerFields;
|
||||
op->callback = callback;
|
||||
op->endTime = timeout == 0.0 ? now()+24*60*60.0 : now()+timeout;
|
||||
op->endTime = timeout == 0.0 ? TRI_microtime()+24*60*60.0
|
||||
: TRI_microtime()+timeout;
|
||||
|
||||
ClusterCommResult* res = new ClusterCommResult();
|
||||
*res = *static_cast<ClusterCommResult*>(op);
|
||||
|
@ -498,7 +260,7 @@ ClusterCommResult* ClusterComm::syncRequest (
|
|||
body = 0;
|
||||
}
|
||||
|
||||
double currentTime = now();
|
||||
double currentTime = TRI_microtime();
|
||||
double endTime = timeout == 0.0 ? currentTime+24*60*60.0
|
||||
: currentTime+timeout;
|
||||
|
||||
|
@ -521,52 +283,61 @@ ClusterCommResult* ClusterComm::syncRequest (
|
|||
}
|
||||
|
||||
// We need a connection to this server:
|
||||
SingleServerConnection* connection = getConnection(res->serverID);
|
||||
if (0 == connection) {
|
||||
string endpoint = ClusterInfo::instance()->getServerEndpoint(res->serverID);
|
||||
if (endpoint == "") {
|
||||
res->status = CL_COMM_ERROR;
|
||||
LOG_ERROR("cannot create connection to server '%s'",
|
||||
res->serverID.c_str());
|
||||
LOG_ERROR("cannot find endpoint of server '%s'", res->serverID.c_str());
|
||||
}
|
||||
else {
|
||||
if (0 != body) {
|
||||
LOG_DEBUG("sending %s request to DB server '%s': %s",
|
||||
triagens::rest::HttpRequest::translateMethod(reqtype).c_str(),
|
||||
res->serverID.c_str(), body);
|
||||
}
|
||||
else {
|
||||
LOG_DEBUG("sending %s request to DB server '%s'",
|
||||
triagens::rest::HttpRequest::translateMethod(reqtype).c_str(),
|
||||
res->serverID.c_str());
|
||||
}
|
||||
triagens::httpclient::SimpleHttpClient* client
|
||||
= new triagens::httpclient::SimpleHttpClient(
|
||||
connection->connection,
|
||||
endTime-currentTime, false);
|
||||
|
||||
res->result = client->request(reqtype, path, body, bodyLength,
|
||||
headerFields);
|
||||
if (res->result == 0 || ! res->result->isComplete()) {
|
||||
brokenConnection(connection);
|
||||
httpclient::ConnectionManager* cm
|
||||
= httpclient::ConnectionManager::instance();
|
||||
httpclient::ConnectionManager::SingleServerConnection* connection
|
||||
= cm->leaseConnection(endpoint);
|
||||
if (0 == connection) {
|
||||
res->status = CL_COMM_ERROR;
|
||||
LOG_ERROR("cannot create connection to server '%s'",
|
||||
res->serverID.c_str());
|
||||
}
|
||||
else {
|
||||
returnConnection(connection);
|
||||
if (res->result->wasHttpError()) {
|
||||
res->status = CL_COMM_ERROR;
|
||||
if (0 != body) {
|
||||
LOG_DEBUG("sending %s request to DB server '%s': %s",
|
||||
triagens::rest::HttpRequest::translateMethod(reqtype).c_str(),
|
||||
res->serverID.c_str(), body);
|
||||
}
|
||||
else if (client->getErrorMessage() ==
|
||||
"Request timeout reached") {
|
||||
res->status = CL_COMM_TIMEOUT;
|
||||
else {
|
||||
LOG_DEBUG("sending %s request to DB server '%s'",
|
||||
triagens::rest::HttpRequest::translateMethod(reqtype).c_str(),
|
||||
res->serverID.c_str());
|
||||
}
|
||||
else if (client->getErrorMessage() != "") {
|
||||
res->status = CL_COMM_ERROR;
|
||||
triagens::httpclient::SimpleHttpClient* client
|
||||
= new triagens::httpclient::SimpleHttpClient(
|
||||
connection->connection,
|
||||
endTime-currentTime, false);
|
||||
client->keepConnectionOnDestruction(true);
|
||||
|
||||
res->result = client->request(reqtype, path, body, bodyLength,
|
||||
headerFields);
|
||||
if (! res->result->isComplete()) {
|
||||
cm->brokenConnection(connection);
|
||||
if (client->getErrorMessage() == "Request timeout reached") {
|
||||
res->status = CL_COMM_TIMEOUT;
|
||||
}
|
||||
else {
|
||||
res->status = CL_COMM_ERROR;
|
||||
}
|
||||
}
|
||||
else {
|
||||
cm->returnConnection(connection);
|
||||
if (res->result->wasHttpError()) {
|
||||
res->status = CL_COMM_ERROR;
|
||||
}
|
||||
}
|
||||
delete client;
|
||||
}
|
||||
if (res->status == CL_COMM_SENDING) {
|
||||
// Everything was OK
|
||||
res->status = CL_COMM_SENT;
|
||||
}
|
||||
delete client;
|
||||
}
|
||||
if (res->status == CL_COMM_SENDING) {
|
||||
// Everything was OK
|
||||
res->status = CL_COMM_SENT;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
@ -688,7 +459,7 @@ ClusterCommResult* ClusterComm::wait (
|
|||
endtime = 1.0e50; // this is the Sankt Nimmerleinstag
|
||||
}
|
||||
else {
|
||||
endtime = now() + timeout;
|
||||
endtime = TRI_microtime() + timeout;
|
||||
}
|
||||
|
||||
if (0 != operationID) {
|
||||
|
@ -722,7 +493,7 @@ ClusterCommResult* ClusterComm::wait (
|
|||
// It is in the receive queue but still waiting, now wait actually
|
||||
}
|
||||
// Here it could either be in the receive or the send queue, let's wait
|
||||
timeleft = endtime - now();
|
||||
timeleft = endtime - TRI_microtime();
|
||||
if (timeleft <= 0) break;
|
||||
somethingReceived.wait(uint64_t(timeleft * 1000000.0));
|
||||
}
|
||||
|
@ -773,7 +544,7 @@ ClusterCommResult* ClusterComm::wait (
|
|||
return res;
|
||||
}
|
||||
// Here it could either be in the receive or the send queue, let's wait
|
||||
timeleft = endtime - now();
|
||||
timeleft = endtime - TRI_microtime();
|
||||
if (timeleft <= 0) break;
|
||||
somethingReceived.wait(uint64_t(timeleft * 1000000.0));
|
||||
}
|
||||
|
@ -882,8 +653,16 @@ void ClusterComm::asyncAnswer (string& coordinatorHeader,
|
|||
coordinatorID = coordinatorHeader.substr(start,pos-start);
|
||||
|
||||
// Now find the connection to which the request goes from the coordinatorID:
|
||||
ClusterComm::SingleServerConnection* connection
|
||||
= getConnection(coordinatorID);
|
||||
httpclient::ConnectionManager* cm = httpclient::ConnectionManager::instance();
|
||||
string endpoint = ClusterInfo::instance()->getServerEndpoint(coordinatorID);
|
||||
if (endpoint == "") {
|
||||
LOG_ERROR("asyncAnswer: cannot find endpoint for server '%s'",
|
||||
coordinatorID.c_str());
|
||||
return;
|
||||
}
|
||||
|
||||
httpclient::ConnectionManager::SingleServerConnection* connection
|
||||
= cm->leaseConnection(endpoint);
|
||||
if (0 == connection) {
|
||||
LOG_ERROR("asyncAnswer: cannot create connection to server '%s'",
|
||||
coordinatorID.c_str());
|
||||
|
@ -902,24 +681,23 @@ void ClusterComm::asyncAnswer (string& coordinatorHeader,
|
|||
|
||||
triagens::httpclient::SimpleHttpClient* client
|
||||
= new triagens::httpclient::SimpleHttpClient(
|
||||
connection->connection,
|
||||
_globalConnectionOptions._singleRequestTimeout,
|
||||
false);
|
||||
connection->connection, 3600.0, false);
|
||||
client->keepConnectionOnDestruction(true);
|
||||
|
||||
// We add this result to the operation struct without acquiring
|
||||
// a lock, since we know that only we do such a thing:
|
||||
httpclient::SimpleHttpResult* result =
|
||||
client->request(rest::HttpRequest::HTTP_REQUEST_PUT,
|
||||
"/_api/shard-comm", body, len, headers);
|
||||
if (client->getErrorMessage() != "") {
|
||||
brokenConnection(connection);
|
||||
if (! result->isComplete()) {
|
||||
cm->brokenConnection(connection);
|
||||
}
|
||||
else {
|
||||
returnConnection(connection);
|
||||
cm->returnConnection(connection);
|
||||
}
|
||||
// We cannot deal with a bad result here, so forget about it in any case.
|
||||
delete result;
|
||||
delete client;
|
||||
returnConnection(connection);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -1036,8 +814,8 @@ bool ClusterComm::moveFromSendToReceived (OperationID operationID) {
|
|||
}
|
||||
if (op->status == CL_COMM_SENDING) {
|
||||
// Note that in the meantime the status could have changed to
|
||||
// CL_COMM_ERROR or indeed to CL_COMM_RECEIVED in these cases, we do
|
||||
// not want to overwrite this result
|
||||
// CL_COMM_ERROR, CL_COMM_TIMEOUT or indeed to CL_COMM_RECEIVED in
|
||||
// these cases, we do not want to overwrite this result
|
||||
op->status = CL_COMM_SENT;
|
||||
}
|
||||
received.push_back(op);
|
||||
|
@ -1141,7 +919,7 @@ void ClusterCommThread::run () {
|
|||
// sent the request (happens in moveFromSendToReceived).
|
||||
|
||||
// Have we already reached the timeout?
|
||||
double currentTime = cc->now();
|
||||
double currentTime = TRI_microtime();
|
||||
if (op->endTime <= currentTime) {
|
||||
op->status = CL_COMM_TIMEOUT;
|
||||
}
|
||||
|
@ -1151,53 +929,63 @@ void ClusterCommThread::run () {
|
|||
}
|
||||
else {
|
||||
// We need a connection to this server:
|
||||
ClusterComm::SingleServerConnection* connection
|
||||
= cc->getConnection(op->serverID);
|
||||
if (0 == connection) {
|
||||
string endpoint
|
||||
= ClusterInfo::instance()->getServerEndpoint(op->serverID);
|
||||
if (endpoint == "") {
|
||||
op->status = CL_COMM_ERROR;
|
||||
LOG_ERROR("cannot create connection to server '%s'",
|
||||
LOG_ERROR("cannot find endpoint for server '%s'",
|
||||
op->serverID.c_str());
|
||||
}
|
||||
else {
|
||||
if (0 != op->body) {
|
||||
LOG_DEBUG("sending %s request to DB server '%s': %s",
|
||||
triagens::rest::HttpRequest::translateMethod(op->reqtype)
|
||||
.c_str(), op->serverID.c_str(), op->body);
|
||||
}
|
||||
else {
|
||||
LOG_DEBUG("sending %s request to DB server '%s'",
|
||||
triagens::rest::HttpRequest::translateMethod(op->reqtype)
|
||||
.c_str(), op->serverID.c_str());
|
||||
}
|
||||
|
||||
triagens::httpclient::SimpleHttpClient* client
|
||||
= new triagens::httpclient::SimpleHttpClient(
|
||||
connection->connection,
|
||||
op->endTime-currentTime, false);
|
||||
|
||||
// We add this result to the operation struct without acquiring
|
||||
// a lock, since we know that only we do such a thing:
|
||||
op->result = client->request(op->reqtype, op->path, op->body,
|
||||
op->bodyLength, *(op->headerFields));
|
||||
|
||||
if (op->result == 0 || ! op->result->isComplete()) {
|
||||
cc->brokenConnection(connection);
|
||||
httpclient::ConnectionManager* cm
|
||||
= httpclient::ConnectionManager::instance();
|
||||
httpclient::ConnectionManager::SingleServerConnection* connection
|
||||
= cm->leaseConnection(endpoint);
|
||||
if (0 == connection) {
|
||||
op->status = CL_COMM_ERROR;
|
||||
LOG_ERROR("cannot create connection to server '%s'",
|
||||
op->serverID.c_str());
|
||||
}
|
||||
else {
|
||||
cc->returnConnection(connection);
|
||||
if (op->result->wasHttpError()) {
|
||||
op->status = CL_COMM_ERROR;
|
||||
if (0 != op->body) {
|
||||
LOG_DEBUG("sending %s request to DB server '%s': %s",
|
||||
triagens::rest::HttpRequest::translateMethod(op->reqtype)
|
||||
.c_str(), op->serverID.c_str(), op->body);
|
||||
}
|
||||
else if (client->getErrorMessage() ==
|
||||
"Request timeout reached") {
|
||||
op->status = CL_COMM_TIMEOUT;
|
||||
else {
|
||||
LOG_DEBUG("sending %s request to DB server '%s'",
|
||||
triagens::rest::HttpRequest::translateMethod(op->reqtype)
|
||||
.c_str(), op->serverID.c_str());
|
||||
}
|
||||
else if (client->getErrorMessage() != "") {
|
||||
op->status = CL_COMM_ERROR;
|
||||
|
||||
triagens::httpclient::SimpleHttpClient* client
|
||||
= new triagens::httpclient::SimpleHttpClient(
|
||||
connection->connection,
|
||||
op->endTime-currentTime, false);
|
||||
client->keepConnectionOnDestruction(true);
|
||||
|
||||
// We add this result to the operation struct without acquiring
|
||||
// a lock, since we know that only we do such a thing:
|
||||
op->result = client->request(op->reqtype, op->path, op->body,
|
||||
op->bodyLength, *(op->headerFields));
|
||||
|
||||
if (! op->result->isComplete()) {
|
||||
cm->brokenConnection(connection);
|
||||
if (client->getErrorMessage() == "Request timeout reached") {
|
||||
op->status = CL_COMM_TIMEOUT;
|
||||
}
|
||||
else {
|
||||
op->status = CL_COMM_ERROR;
|
||||
}
|
||||
}
|
||||
else {
|
||||
cm->returnConnection(connection);
|
||||
if (op->result->wasHttpError()) {
|
||||
op->status = CL_COMM_ERROR;
|
||||
}
|
||||
}
|
||||
delete client;
|
||||
}
|
||||
delete client;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1212,7 +1000,7 @@ void ClusterCommThread::run () {
|
|||
// just now, so we can check on our receive queue to detect timeouts:
|
||||
|
||||
{
|
||||
double currentTime = cc->now();
|
||||
double currentTime = TRI_microtime();
|
||||
basics::ConditionLocker locker(&cc->somethingReceived);
|
||||
ClusterComm::QueueIterator q;
|
||||
for (q = cc->received.begin(); q != cc->received.end(); ++q) {
|
||||
|
|
|
@ -191,17 +191,6 @@ namespace triagens {
|
|||
}
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief options for cluster operations
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
struct ClusterCommOptions {
|
||||
double _connectTimeout;
|
||||
double _requestTimeout;
|
||||
size_t _connectRetries;
|
||||
double _singleRequestTimeout;
|
||||
uint32_t _sslProtocol;
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief global callback for asynchronous REST handler
|
||||
|
@ -353,96 +342,12 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
|
|||
|
||||
static ClusterComm* _theinstance;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief global options for connections
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static ClusterCommOptions _globalConnectionOptions;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief produces an operation ID which is unique in this process
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static OperationID getOperationID ();
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief get timestamp
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static double now () {
|
||||
struct timeval tv;
|
||||
gettimeofday(&tv, 0);
|
||||
|
||||
double sec = (double) tv.tv_sec; // seconds
|
||||
double usc = (double) tv.tv_usec; // microseconds
|
||||
|
||||
return sec + usc / 1000000.0;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief class to administrate one connection to a server
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
struct SingleServerConnection {
|
||||
httpclient::GeneralClientConnection* connection;
|
||||
rest::Endpoint* endpoint;
|
||||
time_t lastUsed;
|
||||
ServerID serverID;
|
||||
|
||||
SingleServerConnection (httpclient::GeneralClientConnection* c,
|
||||
rest::Endpoint* e,
|
||||
ServerID s)
|
||||
: connection(c), endpoint(e), lastUsed(0), serverID(s) {}
|
||||
~SingleServerConnection ();
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief class to administrate all connections to a server
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
struct ServerConnections {
|
||||
vector<SingleServerConnection*> connections;
|
||||
list<SingleServerConnection*> unused;
|
||||
triagens::basics::ReadWriteLock lock;
|
||||
|
||||
ServerConnections () {}
|
||||
~ServerConnections (); // closes all connections
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief map to store all connections to all servers with corresponding lock
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// We keep connections to servers open but do not care
|
||||
// if they are closed. The key is the server ID.
|
||||
map<ServerID,ServerConnections*> allConnections;
|
||||
triagens::basics::ReadWriteLock allLock;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief open or get a previously cached connection to a server
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
SingleServerConnection* getConnection(ServerID& serverID);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief return leased connection to a server
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void returnConnection(SingleServerConnection* singleConnection);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief report a leased connection as being broken
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void brokenConnection(SingleServerConnection* singleConnection);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief closes all connections that have been unused for more than
|
||||
/// limit seconds
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void closeUnusedConnections(double limit);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief send queue with lock and index
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -8159,7 +8159,6 @@ static v8::Handle<v8::Value> JS_ListDatabases (v8::Arguments const& argv) {
|
|||
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief helper function for the agency
|
||||
///
|
||||
|
@ -8167,10 +8166,16 @@ static v8::Handle<v8::Value> JS_ListDatabases (v8::Arguments const& argv) {
|
|||
/// name.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static int CreateDatabaseInAgency(string const& place, string const& name) {
|
||||
static int CreateDatabaseInAgency(string const& place, string const& name,
|
||||
vector<ServerID>* DBServers) {
|
||||
AgencyComm ac;
|
||||
AgencyCommLocker locker(place,"WRITE");
|
||||
AgencyCommResult res;
|
||||
if (0 != DBServers) {
|
||||
ClusterInfo* ci = ClusterInfo::instance();
|
||||
ci->loadDBServers(); // to make sure we know about all of them
|
||||
*DBServers = ci->getDBServers();
|
||||
}
|
||||
res = ac.casValue(place+"/Collections/"+name+"/Lock",string("UNLOCKED"),
|
||||
false, 0.0, 0.0);
|
||||
if (res.successful()) {
|
||||
|
@ -8224,17 +8229,22 @@ static v8::Handle<v8::Value> JS_CreateDatabase_Coordinator (v8::Arguments const&
|
|||
|
||||
const string name = TRI_ObjectToString(argv[0]);
|
||||
|
||||
ClusterInfo* ci = ClusterInfo::instance();
|
||||
//ClusterInfo* ci = ClusterInfo::instance();
|
||||
ClusterComm* cc = ClusterComm::instance();
|
||||
AgencyComm ac;
|
||||
|
||||
int ourerrno = TRI_ERROR_NO_ERROR;
|
||||
|
||||
ourerrno = CreateDatabaseInAgency("Target",name);
|
||||
ourerrno = CreateDatabaseInAgency("Target",name,0);
|
||||
if (ourerrno == TRI_ERROR_NO_ERROR) { // everything OK in /Target
|
||||
ourerrno = CreateDatabaseInAgency("Plan",name);
|
||||
vector<ServerID> DBServers;
|
||||
// We will get the list of DBServers whilst holding the lock to
|
||||
// modify "/Plan/Collections". Therefore, everybody who is on the
|
||||
// list will be told, everybody who is starting later will see the
|
||||
// entry in "/Plan/Collections/..." and will create the database on
|
||||
// startup.
|
||||
ourerrno = CreateDatabaseInAgency("Plan",name,&DBServers);
|
||||
if (ourerrno == TRI_ERROR_NO_ERROR) {
|
||||
vector<ServerID> DBServers = ci->getDBServers();
|
||||
vector<ServerID>::iterator it;
|
||||
// build request to be sent to all servers
|
||||
|
||||
|
@ -8253,44 +8263,37 @@ static v8::Handle<v8::Value> JS_CreateDatabase_Coordinator (v8::Arguments const&
|
|||
jsonstr.size(), new map<string, string>, 0, 0.0);
|
||||
delete res;
|
||||
}
|
||||
cout << "CDB: Have sent " << DBServers.size() << " requests." << endl;
|
||||
unsigned int done = 0;
|
||||
while (done < DBServers.size()) {
|
||||
res = cc->wait("", coordTransactionID, 0, "", 0.0);
|
||||
if (res->status == CL_COMM_RECEIVED) {
|
||||
if (res->answer_code == triagens::rest::HttpResponse::OK) {
|
||||
cout << "CDB: answer OK" << endl;
|
||||
done++;
|
||||
delete res;
|
||||
}
|
||||
else if (res->answer_code == triagens::rest::HttpResponse::CONFLICT) {
|
||||
cout << "CDB: answer CONFLICT" << endl;
|
||||
ourerrno = TRI_ERROR_ARANGO_DUPLICATE_NAME;
|
||||
delete res;
|
||||
break;
|
||||
}
|
||||
else {
|
||||
cout << "CDB: answer BAD" << endl;
|
||||
ourerrno = TRI_ERROR_INTERNAL;
|
||||
delete res;
|
||||
break;
|
||||
}
|
||||
}
|
||||
else {
|
||||
cout << "CDB: CL_COMM_ERROR" << endl;
|
||||
delete res;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (done == DBServers.size()) {
|
||||
ourerrno = CreateDatabaseInAgency("Current",name);
|
||||
ourerrno = CreateDatabaseInAgency("Current",name,0);
|
||||
if (ourerrno == TRI_ERROR_NO_ERROR) {
|
||||
cout << "CDB: All done" << endl;
|
||||
return scope.Close(v8::True());
|
||||
}
|
||||
}
|
||||
cc->drop( "CreateDatabase", coordTransactionID, 0, "" );
|
||||
cout << "CDB: Aborting..." << endl;
|
||||
for (it = DBServers.begin(); it != DBServers.end(); ++it) {
|
||||
res = cc->asyncRequest("CreateDB", coordTransactionID,
|
||||
"server:"+*it,
|
||||
|
@ -8302,7 +8305,6 @@ static v8::Handle<v8::Value> JS_CreateDatabase_Coordinator (v8::Arguments const&
|
|||
done = 0;
|
||||
while (done < DBServers.size()) {
|
||||
res = cc->wait("", coordTransactionID, 0, "", 0.0);
|
||||
cout << "CDB: Got answer" << endl;
|
||||
delete res;
|
||||
done++;
|
||||
}
|
||||
|
@ -8470,6 +8472,66 @@ static v8::Handle<v8::Value> JS_CreateDatabase (v8::Arguments const& argv) {
|
|||
return scope.Close(v8::True());
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief drop a database, case of a coordinator in a cluster
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
|
||||
static v8::Handle<v8::Value> JS_DropDatabase_Coordinator (v8::Arguments const& argv) {
|
||||
v8::HandleScope scope;
|
||||
|
||||
// Arguments are already checked, there is exactly one argument
|
||||
|
||||
const string name = TRI_ObjectToString(argv[0]);
|
||||
|
||||
ClusterInfo* ci = ClusterInfo::instance();
|
||||
ClusterComm* cc = ClusterComm::instance();
|
||||
AgencyComm ac;
|
||||
AgencyCommResult acres;
|
||||
|
||||
int ourerrno = TRI_ERROR_NO_ERROR;
|
||||
|
||||
{
|
||||
AgencyCommLocker locker("Target","WRITE");
|
||||
// FIXME: need to check that locking worked!
|
||||
|
||||
// Now nobody can create or remove a database, so we can check that
|
||||
// the one we want to drop does indeed exist:
|
||||
acres = ac.getValues("Current/Collections/"+name+"/Lock", false);
|
||||
if (!acres.successful()) {
|
||||
TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
|
||||
}
|
||||
}
|
||||
|
||||
// Now let's lock it.
|
||||
// We cannot use a locker here, because we want to remove all of
|
||||
// Current/Collections/<db-name> before we are done and we must not
|
||||
// unlock the Lock after that.
|
||||
if (!ac.lockWrite("Current/Collections/"+name, 24*3600.0, 24*3600.0)) {
|
||||
TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL);
|
||||
}
|
||||
// res = ac.getValues("Current/Collections/"+name+"/Lock, false);
|
||||
|
||||
// If this fails or the DB does not exist, return an error
|
||||
// Remove entry Plan/Collections/<name> using Plan/Lock
|
||||
// get list of DBServers during the lock
|
||||
// (from now on new DBServers will no longer create a database)
|
||||
// this is the point of no return
|
||||
// tell all DBServers to drop database
|
||||
// note errors, but there is nothing we can do about it if things go wrong
|
||||
// only count and reports the servers with errors
|
||||
// Remove entry Target/Collections/<name>, use Target/Lock
|
||||
// Remove entry Current/Collections/<name> using Current/Lock
|
||||
// (from now on coordinators will understand that the database is gone
|
||||
// Release Plan/Lock
|
||||
// Report error
|
||||
|
||||
return scope.Close(v8::True());
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief drop an existing database
|
||||
///
|
||||
|
@ -8502,6 +8564,13 @@ static v8::Handle<v8::Value> JS_DropDatabase (v8::Arguments const& argv) {
|
|||
TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_USE_SYSTEM_DATABASE);
|
||||
}
|
||||
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
// If we are a coordinator in a cluster, we have to behave differently:
|
||||
if (ServerState::instance()->isCoordinator()) {
|
||||
return JS_DropDatabase_Coordinator(argv);
|
||||
}
|
||||
#endif
|
||||
|
||||
const string name = TRI_ObjectToString(argv[0]);
|
||||
TRI_v8_global_t* v8g = (TRI_v8_global_t*) v8::Isolate::GetCurrent()->GetData();
|
||||
|
||||
|
|
|
@ -106,7 +106,8 @@ lib_libarango_client_a_SOURCES = \
|
|||
lib/SimpleHttpClient/ClientConnection.cpp \
|
||||
lib/SimpleHttpClient/SslClientConnection.cpp \
|
||||
lib/SimpleHttpClient/SimpleHttpClient.cpp \
|
||||
lib/SimpleHttpClient/SimpleHttpResult.cpp
|
||||
lib/SimpleHttpClient/SimpleHttpResult.cpp \
|
||||
lib/SimpleHttpClient/ConnectionManager.cpp
|
||||
|
||||
################################################################################
|
||||
### @brief library "libarango.a", front-end part
|
||||
|
@ -162,7 +163,8 @@ lib_libarango_v8_a_SOURCES = \
|
|||
lib/SimpleHttpClient/ClientConnection.cpp \
|
||||
lib/SimpleHttpClient/SslClientConnection.cpp \
|
||||
lib/SimpleHttpClient/SimpleHttpClient.cpp \
|
||||
lib/SimpleHttpClient/SimpleHttpResult.cpp
|
||||
lib/SimpleHttpClient/SimpleHttpResult.cpp \
|
||||
lib/SimpleHttpClient/ConnectionManager.cpp
|
||||
|
||||
|
||||
################################################################################
|
||||
|
|
|
@ -50,11 +50,6 @@ namespace triagens {
|
|||
// --SECTION-- typedefs
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @addtogroup httpclient
|
||||
/// @{
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
protected:
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -63,19 +58,10 @@ namespace triagens {
|
|||
|
||||
enum { READBUFFER_SIZE = 8192 };
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @}
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- constructors / destructors
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @addtogroup httpclient
|
||||
/// @{
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private:
|
||||
|
||||
GeneralClientConnection (GeneralClientConnection const&);
|
||||
|
@ -98,19 +84,10 @@ namespace triagens {
|
|||
|
||||
virtual ~GeneralClientConnection ();
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @}
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- public methods
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @addtogroup httpclient
|
||||
/// @{
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public:
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -179,19 +156,10 @@ namespace triagens {
|
|||
|
||||
bool handleRead (double, triagens::basics::StringBuffer&);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @}
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- protected virtual methods
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @addtogroup httpclient
|
||||
/// @{
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
protected:
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -230,19 +198,10 @@ namespace triagens {
|
|||
|
||||
virtual bool readable () = 0;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @}
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- protected variables
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @addtogroup httpclient
|
||||
/// @{
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
protected:
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -281,10 +240,6 @@ namespace triagens {
|
|||
|
||||
bool _isConnected;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @}
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue