diff --git a/arangod/Cluster/ApplicationCluster.cpp b/arangod/Cluster/ApplicationCluster.cpp index c6027b95b6..89402c129f 100644 --- a/arangod/Cluster/ApplicationCluster.cpp +++ b/arangod/Cluster/ApplicationCluster.cpp @@ -201,6 +201,9 @@ bool ApplicationCluster::start () { _myId.c_str()); } + // register our own address + ServerState::instance()->setAddress(_myAddress); + // now we can validate --cluster.my-address const string unified = triagens::rest::Endpoint::getUnifiedForm(_myAddress); diff --git a/arangod/Cluster/ClusterComm.cpp b/arangod/Cluster/ClusterComm.cpp index cfeb8422c6..b482bc904b 100644 --- a/arangod/Cluster/ClusterComm.cpp +++ b/arangod/Cluster/ClusterComm.cpp @@ -125,6 +125,8 @@ ClusterComm::getConnection(ServerID& serverID) { allConnections[serverID] = s; } } + + assert(s != 0); // Now get an unused one: { @@ -138,6 +140,7 @@ ClusterComm::getConnection(ServerID& serverID) { // We need to open a new one: string a = ClusterState::instance()->getServerEndpoint(serverID); + if (a == "") { // Unknown server address, probably not yet connected return 0; @@ -305,6 +308,7 @@ ClusterCommResult* ClusterComm::asyncRequest ( op->shardID = shardID; op->serverID = ClusterState::instance()->getResponsibleServer( shardID); + op->status = CL_COMM_SUBMITTED; op->reqtype = reqtype; op->path = path; @@ -312,8 +316,8 @@ ClusterCommResult* ClusterComm::asyncRequest ( op->bodyLength = bodyLength; op->headerFields = headerFields; op->callback = callback; - op->timeout = timeout == 0.0 ? 1e50 : timeout; - + op->timeout = timeout == 0.0 ? 3600.0 : timeout; + ClusterCommResult* res = new ClusterCommResult(); *res = *static_cast(op); @@ -677,9 +681,9 @@ void ClusterCommThread::run () { = cc->getConnection(server); if (0 == connection) { op->status = CL_COMM_ERROR; + LOG_ERROR("cannot create connection to server '%s'", server.c_str()); } else { - LOG_TRACE("sending %s request to DB server '%s': %s", triagens::rest::HttpRequest::translateMethod(op->reqtype).c_str(), server.c_str(), op->body); diff --git a/arangod/Cluster/ServerState.cpp b/arangod/Cluster/ServerState.cpp index 2f94353de0..2ab3e0857b 100644 --- a/arangod/Cluster/ServerState.cpp +++ b/arangod/Cluster/ServerState.cpp @@ -51,7 +51,9 @@ static ServerState* Instance = 0; //////////////////////////////////////////////////////////////////////////////// ServerState::ServerState () - : _lock(), + : _id(), + _address(), + _lock(), _role(ROLE_UNDEFINED), _state(STATE_UNDEFINED) { diff --git a/arangod/Cluster/ServerState.h b/arangod/Cluster/ServerState.h index be5b6ca256..2641a892b8 100644 --- a/arangod/Cluster/ServerState.h +++ b/arangod/Cluster/ServerState.h @@ -173,6 +173,26 @@ namespace triagens { _id = id; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief get the server address +//////////////////////////////////////////////////////////////////////////////// + + inline std::string getAddress () const { + return _address; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief set the server address +//////////////////////////////////////////////////////////////////////////////// + + void setAddress (std::string const& address) { + // address can be set just once + assert(_address.empty()); + assert(! address.empty()); + + _address = address; + } + //////////////////////////////////////////////////////////////////////////////// /// @brief get the current state //////////////////////////////////////////////////////////////////////////////// @@ -215,6 +235,12 @@ namespace triagens { std::string _id; +//////////////////////////////////////////////////////////////////////////////// +/// @brief the server's own address. can be set just once +//////////////////////////////////////////////////////////////////////////////// + + std::string _address; + //////////////////////////////////////////////////////////////////////////////// /// @brief r/w lock for state //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/V8Server/v8-actions.cpp b/arangod/V8Server/v8-actions.cpp index adcead550c..29d13080ca 100644 --- a/arangod/V8Server/v8-actions.cpp +++ b/arangod/V8Server/v8-actions.cpp @@ -41,6 +41,14 @@ #include "V8/v8-utils.h" #include "V8Server/ApplicationV8.h" #include "V8Server/v8-vocbase.h" +#include "VocBase/server.h" + +#ifdef TRI_ENABLE_CLUSTER + +#include "Cluster/ClusterComm.h" +#include "Cluster/ServerState.h" + +#endif using namespace std; using namespace triagens::basics; @@ -904,8 +912,6 @@ static v8::Handle JS_ExecuteGlobalContextFunction (v8::Arguments cons #ifdef TRI_ENABLE_CLUSTER -#include "Cluster/ClusterComm.h" - static v8::Handle JS_ShardingTest (v8::Arguments const& argv) { v8::Isolate* isolate; @@ -919,30 +925,52 @@ static v8::Handle JS_ShardingTest (v8::Arguments const& argv) { TRI_V8_EXCEPTION_USAGE(scope, "SYS_SHARDING_TEST(, )"); } + const string clientTransactionId = StringUtils::itoa(TRI_NewTickServer()); + ClusterComm* cc = ClusterComm::instance(); + + if (cc == 0) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "clustercomm object not found"); + } + map* headerFields = new map; - (*headerFields)["X-ClientTransactionID"] = "BlaBlubb"; + (*headerFields)["X-ClientTransactionID"] = clientTransactionId; + (*headerFields)["X-Arango-Async"] = "store"; + (*headerFields)["X-Arango-Coordinator"] = ServerState::instance()->getAddress(); ClusterCommResult const* res = - cc->asyncRequest("ClientBla", 12345, "shardBlubb", + cc->asyncRequest(clientTransactionId, TRI_NewTickServer(), "shardBlubb", triagens::rest::HttpRequest::HTTP_REQUEST_GET, "/_admin/time", NULL, 0, headerFields, 0, 0); - OperationID opID = res->operationID; + + if (res == 0) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "couldn't queue async request"); + } + LOG_DEBUG("JS_ShardingTest: request has been submitted"); + + OperationID opID = res->operationID; delete res; // Wait until the request has actually been sent: while (true) { res = cc->enquire(opID); - if (res->status >= CL_COMM_SENT) { - delete res; + if (res == 0) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "couldn't enquire operation"); + } + + ClusterCommOpStatus status = res->status; + + delete res; + + if (status >= CL_COMM_SENT) { break; } - delete res; LOG_DEBUG("JS_ShardingTest: request not yet sent"); - usleep(1000000); + usleep(500000); } + LOG_DEBUG("JS_ShardingTest: request has been sent"); cc->drop("", 0, opID, ""); diff --git a/lib/HttpServer/AsyncJobManager.h b/lib/HttpServer/AsyncJobManager.h index 69a79a1e64..bc30232e66 100644 --- a/lib/HttpServer/AsyncJobManager.h +++ b/lib/HttpServer/AsyncJobManager.h @@ -407,10 +407,12 @@ namespace triagens { AsyncCallbackContext* ctx = 0; + std::cout << "ASYNC REQUEST\n"; bool found; char const* hdr = job->getHandler()->getRequest()->header("x-arango-coordinator", found); if (found) { + std::cout << "FOUND COORDINATOR HEADER\n"; ctx = new AsyncCallbackContext(std::string(hdr)); } diff --git a/lib/SimpleHttpClient/SimpleHttpClient.cpp b/lib/SimpleHttpClient/SimpleHttpClient.cpp index 0c85814f26..7cfe15f089 100644 --- a/lib/SimpleHttpClient/SimpleHttpClient.cpp +++ b/lib/SimpleHttpClient/SimpleHttpClient.cpp @@ -350,7 +350,10 @@ namespace triagens { else { _writeBuffer.appendText("\r\n"); } - _writeBuffer.appendText(body, bodyLength); + + if (body != 0) { + _writeBuffer.appendText(body, bodyLength); + } LOG_TRACE("Request: %s", _writeBuffer.c_str());