From 5c80816352e2ba6f55c55d89cbf465b5b7f30b46 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Thu, 19 Dec 2013 16:15:54 +0100 Subject: [PATCH] Can do one roundtrip. Client -REST-> Coordinator -> sendQueue -REST-> DBServer -> recvQueue -REST-> Client --- arangod/Cluster/ClusterComm.cpp | 63 ++++++++++++++++++++++++++------ arangod/Cluster/ClusterComm.h | 39 +++++++++++++------- arangod/Cluster/ClusterState.cpp | 58 +++++++++++++++++++++++++---- arangod/Cluster/ClusterState.h | 2 + arangod/V8Server/v8-actions.cpp | 31 +++++++++++++++- 5 files changed, 158 insertions(+), 35 deletions(-) diff --git a/arangod/Cluster/ClusterComm.cpp b/arangod/Cluster/ClusterComm.cpp index 22572d699e..2015655771 100644 --- a/arangod/Cluster/ClusterComm.cpp +++ b/arangod/Cluster/ClusterComm.cpp @@ -304,12 +304,13 @@ ClusterCommResult* ClusterComm::asyncRequest ( op->serverID = ClusterState::instance()->getResponsibleServer( shardID); op->status = CL_COMM_SUBMITTED; + op->reqtype = reqtype; op->path = path; op->body = body; op->bodyLength = bodyLength; op->headerFields = headerFields; op->callback = callback; - op->timeout = timeout; + op->timeout = timeout == 0.0 ? 1e50 : timeout; ClusterCommResult* res = new ClusterCommResult(); *res = *static_cast(op); @@ -339,7 +340,7 @@ bool ClusterComm::match ( shardID == op->shardID) ); } -ClusterCommResult* ClusterComm::enquire (OperationID const operationID) { +ClusterCommResult const* ClusterComm::enquire (OperationID const operationID) { IndexIterator i; ClusterCommOperation* op = 0; ClusterCommResult* res; @@ -355,6 +356,7 @@ ClusterCommResult* ClusterComm::enquire (OperationID const operationID) { } op = *(i->second); *res = *static_cast(op); + res->doNotDeleteOnDestruction(); return res; } } @@ -376,6 +378,7 @@ ClusterCommResult* ClusterComm::enquire (OperationID const operationID) { } op = *(i->second); *res = *static_cast(op); + res->doNotDeleteOnDestruction(); return res; } } @@ -438,7 +441,7 @@ ClusterCommResult* ClusterComm::wait ( // Here it could either be in the receive or the send queue, let's wait timeleft = endtime - now(); if (timeleft <= 0) break; - somethingReceived.wait(uint64_t(timeleft * 1000.0)); + somethingReceived.wait(uint64_t(timeleft * 1000000.0)); } // This place is only reached on timeout } @@ -489,7 +492,7 @@ ClusterCommResult* ClusterComm::wait ( // Here it could either be in the receive or the send queue, let's wait timeleft = endtime - now(); if (timeleft <= 0) break; - somethingReceived.wait(uint64_t(timeleft * 1000.0)); + somethingReceived.wait(uint64_t(timeleft * 1000000.0)); } // This place is only reached on timeout } @@ -580,10 +583,12 @@ bool ClusterComm::moveFromSendToReceived (OperationID operationID) { assert(op->operationID == operationID); toSendByOpID.erase(i); toSend.erase(q); - if (CL_COMM_DROPPING == op->status) { + if (op->dropped) { return false; } - op->status = CL_COMM_SENT; + if (op->status == CL_COMM_SENDING) { + op->status = CL_COMM_SENT; + } received.push_back(op); q = received.end(); q--; @@ -650,12 +655,46 @@ void ClusterCommThread::run () { assert(op->status == CL_COMM_SUBMITTED); op->status = CL_COMM_SENDING; } - // We release the lock, if it is dropped now, the status just goes - // to CL_COMM_DROPPING, we find out about this after we have sent - // the request. - LOG_DEBUG("ClusterComm faking a send"); - + // We release the lock, if the operation is dropped now, the + // `dropped` flag is set. We find out about this after we have + // sent the request (happens in moveFromSendToReceived). + + // First find the server to which the request goes from the shardID: + ServerID server = ClusterState::instance()->getResponsibleServer( + op->shardID); + if (server == "") { + op->status = CL_COMM_ERROR; + } + else { + // We need a connection to this server: + ClusterComm::SingleServerConnection* connection + = cc->getConnection(server); + if (0 == connection) { + op->status = CL_COMM_ERROR; + } + else { + + LOG_TRACE("sending %s request to DB server '%s': %s", + triagens::rest::HttpRequest::translateMethod(op->reqtype).c_str(), + server.c_str(), op->body); + + { + triagens::httpclient::SimpleHttpClient client( + connection->connection, + op->timeout, false); + + // We add this result to the operation struct without acquiring + // a lock, since we know that only we do such a thing: + op->result = client.request(op->reqtype, op->path, op->body, + op->bodyLength, *(op->headerFields)); + // FIXME: handle case that connection was no good and the request + // failed. + } + cc->returnConnection(connection); + } + } + if (!cc->moveFromSendToReceived(op->operationID)) { // It was dropped in the meantime, so forget about it: delete op; @@ -666,7 +705,7 @@ void ClusterCommThread::run () { // a request to terminate the thread: { basics::ConditionLocker locker(&cc->somethingToSend); - locker.wait(100); + locker.wait(10000000); } } diff --git a/arangod/Cluster/ClusterComm.h b/arangod/Cluster/ClusterComm.h index dba09e272f..8dab156d38 100644 --- a/arangod/Cluster/ClusterComm.h +++ b/arangod/Cluster/ClusterComm.h @@ -66,15 +66,16 @@ namespace triagens { enum ClusterCommOpStatus { CL_COMM_SUBMITTED = 1, // initial request queued, but not yet sent CL_COMM_SENDING = 2, // in the process of sending - CL_COMM_DROPPING = 3, // was dropped during send, will be dropped - CL_COMM_SENT = 4, // initial request sent, response available - CL_COMM_TIMEOUT = 5, // no answer received until timeout - CL_COMM_RECEIVED = 6, // answer received - CL_COMM_DROPPED = 7 // nothing known about operation, was dropped + CL_COMM_SENT = 3, // initial request sent, response available + CL_COMM_TIMEOUT = 4, // no answer received until timeout + CL_COMM_RECEIVED = 5, // answer received + CL_COMM_DROPPED = 6, // nothing known about operation, was dropped // or actually already collected + CL_COMM_ERROR = 7, // original request could not be sent }; struct ClusterCommResult { + bool _deleteOnDestruction; ClientTransactionID clientTransactionID; CoordTransactionID coordTransactionID; OperationID operationID; @@ -84,16 +85,23 @@ namespace triagens { // The field result is != 0 ifs status is >= CL_COMM_SENT. // Note that if status is CL_COMM_TIMEOUT, then the result // field is a response object that only says "timeout" + bool dropped; // this is set to true, if the operation + // is dropped whilst in state CL_COMM_SENDING + // it is then actually dropped when sent httpclient::SimpleHttpResult* result; // the field answer is != 0 iff status is == CL_COMM_RECEIVED rest::HttpRequest* answer; - ClusterCommResult () : result(0), answer(0) {} + ClusterCommResult () + : _deleteOnDestruction(true), dropped(false), result(0), answer(0) {} + void doNotDeleteOnDestruction () { + _deleteOnDestruction = false; + } virtual ~ClusterCommResult () { - if (0 != result) { + if (_deleteOnDestruction && 0 != result) { delete result; } - if (0 != answer) { + if (_deleteOnDestruction && 0 != answer) { delete answer; } } @@ -115,6 +123,7 @@ namespace triagens { typedef double ClusterCommTimeout; // in milliseconds struct ClusterCommOperation : public ClusterCommResult { + rest::HttpRequest::HttpRequestType reqtype; string path; char const* body; size_t bodyLength; @@ -124,14 +133,14 @@ namespace triagens { ClusterCommOperation () {} virtual ~ClusterCommOperation () { - if (0 != body) { + if (_deleteOnDestruction && 0 != body) { TRI_Free(TRI_UNKNOWN_MEM_ZONE, reinterpret_cast(const_cast(body))); } - if (0 != headerFields) { + if (_deleteOnDestruction && 0 != headerFields) { delete headerFields; } - if (0 != callback) { + if (_deleteOnDestruction && 0 != callback) { delete callback; } @@ -304,14 +313,16 @@ namespace triagens { /// /// This call never blocks and returns information about a specific operation /// given by `operationID`. Note that if the `status` is >= `CL_COMM_SENT`, -/// then `result` field in the returned object is set, if the `status` +/// then the `result` field in the returned object is set, if the `status` /// is `CL_COMM_RECEIVED`, then `answer` is set. However, in both cases /// the ClusterComm library retains the operation in its queues! Therefore, /// you have to use @ref wait or @ref drop to dequeue. Do not delete -/// `result` and `answer` before doing this! +/// `result` and `answer` before doing this! However, you have to delete +/// the ClusterCommResult pointer you get, it will automatically refrain +/// from deleting `result` and `answer`. //////////////////////////////////////////////////////////////////////////////// - ClusterCommResult* enquire (OperationID const operationID); + ClusterCommResult const* enquire (OperationID const operationID); //////////////////////////////////////////////////////////////////////////////// /// @brief wait for one answer matching the criteria diff --git a/arangod/Cluster/ClusterState.cpp b/arangod/Cluster/ClusterState.cpp index 5bde0f0555..ea876d0cfa 100644 --- a/arangod/Cluster/ClusterState.cpp +++ b/arangod/Cluster/ClusterState.cpp @@ -28,6 +28,7 @@ #include "Cluster/ClusterState.h" #include "BasicsC/logging.h" +#include "Basics/ReadLocker.h" #include "Basics/WriteLocker.h" using namespace triagens::arango; @@ -62,22 +63,64 @@ ClusterState::~ClusterState () { } void ClusterState::loadServerInformation () { + AgencyCommResult res; while (true) { - // tue die schmutzige Arbeit, verlasse mit return, sobald OK - return; // FIXME ... - LOG_WARNING("ClusterState: Could not (re-)load agency data about servers"); + { + WRITE_LOCKER(lock); + res = _agency.getValues("State/ServersRegistered", true); + if (res.successful()) { + if (res.flattenJson(serverAddresses,"State/ServersRegistered/", false)) { + LOG_DEBUG("State/ServersRegistered loaded successfully"); + map::iterator i; + cout << "Servers registered:" << endl; + for (i = serverAddresses.begin(); i != serverAddresses.end(); ++i) { + cout << " " << i->first << " with address " << i->second << endl; + } + return; + } + else { + LOG_DEBUG("State/ServersRegistered not loaded successfully"); + } + } + else { + LOG_DEBUG("Error whilst loading State/ServersRegistered"); + } + } + usleep(100); } } void ClusterState::loadShardInformation () { + AgencyCommResult res; while (true) { - // tue die schmutzige Arbeit, verlasse mit return, sobald OK - return; // FIXME ... - LOG_WARNING("ClusterState: Could not (re-)load agency data about shards"); + { + WRITE_LOCKER(lock); + res = _agency.getValues("State/Shards", true); + if (res.successful()) { + if (res.flattenJson(shards,"State/Shards/", false)) { + LOG_DEBUG("State/Shards loaded successfully"); + map::iterator i; + cout << "Shards:" << endl; + for (i = shards.begin(); i != shards.end(); ++i) { + cout << " " << i->first << " with responsible server " + << i->second << endl; + } + return; + } + else { + LOG_DEBUG("State/ServersRegistered not loaded successfully"); + } + } + else { + LOG_DEBUG("Error whilst loading State/ServersRegistered"); + } + } + usleep(100); } } std::string ClusterState::getServerEndpoint (ServerID const& serverID) { + READ_LOCKER(lock); map::iterator i = serverAddresses.find(serverID); if (i != serverAddresses.end()) { return i->second; @@ -92,11 +135,12 @@ std::string ClusterState::getServerEndpoint (ServerID const& serverID) { ServerID ClusterState::getResponsibleServer (ShardID const& shardID) { + READ_LOCKER(lock); map::iterator i = shards.find(shardID); if (i != shards.end()) { return i->second; } - loadServerInformation(); + loadShardInformation(); i = shards.find(shardID); if (i != shards.end()) { return i->second; diff --git a/arangod/Cluster/ClusterState.h b/arangod/Cluster/ClusterState.h index 145a985c7a..bad40829d5 100644 --- a/arangod/Cluster/ClusterState.h +++ b/arangod/Cluster/ClusterState.h @@ -154,6 +154,8 @@ namespace triagens { map serverAddresses; // from State/ServersRegistered map shards; // from State/Shards + triagens::basics::ReadWriteLock lock; + }; } // end namespace arango diff --git a/arangod/V8Server/v8-actions.cpp b/arangod/V8Server/v8-actions.cpp index 8ffd9f6209..adcead550c 100644 --- a/arangod/V8Server/v8-actions.cpp +++ b/arangod/V8Server/v8-actions.cpp @@ -904,6 +904,8 @@ static v8::Handle JS_ExecuteGlobalContextFunction (v8::Arguments cons #ifdef TRI_ENABLE_CLUSTER +#include "Cluster/ClusterComm.h" + static v8::Handle JS_ShardingTest (v8::Arguments const& argv) { v8::Isolate* isolate; @@ -914,10 +916,35 @@ static v8::Handle JS_ShardingTest (v8::Arguments const& argv) { v8g = (TRI_v8_global_t*) isolate->GetData(); if (argv.Length() != 2) { - TRI_V8_EXCEPTION_USAGE(scope, "SYS_TEST_SHARDING(, )"); + TRI_V8_EXCEPTION_USAGE(scope, "SYS_SHARDING_TEST(, )"); } - LOG_DEBUG("JS_ShardingTest: we are back in C++"); + ClusterComm* cc = ClusterComm::instance(); + map* headerFields = new map; + (*headerFields)["X-ClientTransactionID"] = "BlaBlubb"; + + ClusterCommResult const* res = + cc->asyncRequest("ClientBla", 12345, "shardBlubb", + triagens::rest::HttpRequest::HTTP_REQUEST_GET, + "/_admin/time", NULL, 0, headerFields, 0, 0); + OperationID opID = res->operationID; + LOG_DEBUG("JS_ShardingTest: request has been submitted"); + delete res; + + // Wait until the request has actually been sent: + while (true) { + res = cc->enquire(opID); + if (res->status >= CL_COMM_SENT) { + delete res; + break; + } + delete res; + LOG_DEBUG("JS_ShardingTest: request not yet sent"); + + usleep(1000000); + } + LOG_DEBUG("JS_ShardingTest: request has been sent"); + cc->drop("", 0, opID, ""); return scope.Close(v8::Undefined()); }