From b55c7e22f94e729f88fe456d846aac52e48b6af6 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Thu, 19 Dec 2013 09:06:45 +0100 Subject: [PATCH] Basic queueing logic ready. Actual sending still todo. --- arangod/Cluster/ClusterComm.cpp | 351 +++++++++++++++++++++++++++---- arangod/Cluster/ClusterComm.h | 170 ++++++++++----- arangod/Cluster/ClusterState.cpp | 4 +- arangod/Cluster/ClusterState.h | 8 +- 4 files changed, 432 insertions(+), 101 deletions(-) diff --git a/arangod/Cluster/ClusterComm.cpp b/arangod/Cluster/ClusterComm.cpp index 8e1b78edc6..22572d699e 100644 --- a/arangod/Cluster/ClusterComm.cpp +++ b/arangod/Cluster/ClusterComm.cpp @@ -283,71 +283,314 @@ void ClusterComm::closeUnusedConnections () { } ClusterCommResult* ClusterComm::asyncRequest ( - ClientTransactionID const& clientTransactionID, - TransactionID const coordTransactionID, - ShardID const& shardID, + ClientTransactionID const clientTransactionID, + CoordTransactionID const coordTransactionID, + ShardID const shardID, rest::HttpRequest::HttpRequestType reqtype, - string const& path, - char const * body, + string const path, + char const* body, size_t const bodyLength, - map const& headerFields, + map* headerFields, ClusterCommCallback* callback, ClusterCommTimeout timeout) { - OperationID opID = getOperationID(); - - // Build HTTPRequest - // Build ClusterCommOperation object - // Put into queue - // signal on condition variable - // Build ClusterCommResult object - // return - return 0; + ClusterCommOperation* op = new ClusterCommOperation(); + op->clientTransactionID = clientTransactionID; + op->coordTransactionID = coordTransactionID; + do { + op->operationID = getOperationID(); + } while (op->operationID == 0); // just to make sure + op->shardID = shardID; + op->serverID = ClusterState::instance()->getResponsibleServer( + shardID); + op->status = CL_COMM_SUBMITTED; + op->path = path; + op->body = body; + op->bodyLength = bodyLength; + op->headerFields = headerFields; + op->callback = 
callback; + op->timeout = timeout; + + ClusterCommResult* res = new ClusterCommResult(); + *res = *static_cast(op); + + { + basics::ConditionLocker locker(&somethingToSend); + toSend.push_back(op); + list::iterator i = toSend.end(); + toSendByOpID[op->operationID] = --i; + } + somethingToSend.signal(); + + return res; } -bool ClusterComm::match (ClientTransactionID const& clientTransactionID, - TransactionID const coordTransactionID, - OperationID const operationID, +bool ClusterComm::match ( + ClientTransactionID const& clientTransactionID, + CoordTransactionID const coordTransactionID, ShardID const& shardID, ClusterCommOperation* op) { - // First check operationID, if given, can return false already - // then check other IDs. - return true; + + return ( (clientTransactionID == "" || + clientTransactionID == op->clientTransactionID) && + (0 == coordTransactionID || + coordTransactionID == op->coordTransactionID) && + (shardID == "" || + shardID == op->shardID) ); } -ClusterCommResult* enquire (OperationID const operationID) { - // Find operation by its ID (fast) - // build ClusterCommResult object and return it. +ClusterCommResult* ClusterComm::enquire (OperationID const operationID) { + IndexIterator i; + ClusterCommOperation* op = 0; + ClusterCommResult* res; + + // First look into the send queue: + { + basics::ConditionLocker locker(&somethingToSend); + i = toSendByOpID.find(operationID); + if (i != toSendByOpID.end()) { + res = new ClusterCommResult(); + if (0 == res) { + return 0; + } + op = *(i->second); + *res = *static_cast(op); + return res; + } + } + + // Note that operations only ever move from the send queue to the + // receive queue and never in the other direction. Therefore it is + // OK to use two different locks here, since we look first in the + // send queue and then in the receive queue; we can never miss + // an operation that is actually there. 
+ + // If the above did not give anything, look into the receive queue: + { + basics::ConditionLocker locker(&somethingReceived); + i = receivedByOpID.find(operationID); + if (i != receivedByOpID.end()) { // compare with the end of the map that was searched + res = new ClusterCommResult(); + if (0 == res) { + return 0; + } + op = *(i->second); + *res = *static_cast<ClusterCommResult*>(op); + return res; + } + } + return 0; } ClusterCommResult* ClusterComm::wait ( ClientTransactionID const& clientTransactionID, - TransactionID const coordTransactionID, + CoordTransactionID const coordTransactionID, OperationID const operationID, ShardID const& shardID, ClusterCommTimeout timeout) { - // Only look at received queue, match, return the first with CL_COMM_RECEIVED - // dequeue it - // Initialise remaining time - // If nothing found, use condition variable and wait to get more with - // possible timeout, if timeout, return empty - // otherwise check again, if ... - return 0; + + IndexIterator i; + QueueIterator q; + ClusterCommOperation* op = 0; + ClusterCommResult* res = 0; + double endtime; + double timeleft; + bool found; + + if (0.0 == timeout) { + endtime = 1.0e50; // this is the Sankt Nimmerleinstag + } + else { + endtime = now() + timeout; + } + + if (0 != operationID) { + // In this case we only have to look into at most one operation. 
+ basics::ConditionLocker locker(&somethingReceived); + while (true) { // will be left by return or break on timeout + i = receivedByOpID.find(operationID); + if (i == receivedByOpID.end()) { + // It could be that the operation is still in the send queue: + basics::ConditionLocker sendlocker(&somethingToSend); + i = toSendByOpID.find(operationID); + if (i == toSendByOpID.end()) { + // Nothing known about this operation, return with failure: + res = new ClusterCommResult(); + res->operationID = operationID; + res->status = CL_COMM_DROPPED; + return res; + } + } + else { + // It is in the receive queue, now look at the status: + q = i->second; + op = *q; + if (op->status >= CL_COMM_TIMEOUT) { + // It is done, let's remove it from the queue and return it: + receivedByOpID.erase(i); + received.erase(q); + res = static_cast(op); + return res; + } + // It is in the receive queue but still waiting, now wait actually + } + // Here it could either be in the receive or the send queue, let's wait + timeleft = endtime - now(); + if (timeleft <= 0) break; + somethingReceived.wait(uint64_t(timeleft * 1000.0)); + } + // This place is only reached on timeout + } + else { + // here, operationID == 0, so we have to do matching, we are only + // interested, if at least one operation matches, if it is ready, + // we return it immediately, otherwise, we report an error or wait. + basics::ConditionLocker locker(&somethingReceived); + while (true) { // will be left by return or break on timeout + found = false; + for (q = received.begin(); q != received.end(); q++) { + op = *q; + if (match(clientTransactionID, coordTransactionID, shardID, op)) { + found = true; + if (op->status >= CL_COMM_TIMEOUT) { + // It is done, let's remove it from the queue and return it: + i = receivedByOpID.find(op->operationID); // cannot fail! 
+ assert(i != receivedByOpID.end()); + assert(i->second == q); + receivedByOpID.erase(i); + received.erase(q); + res = static_cast(op); + return res; + } + } + } + // If we found nothing, we have to look through the send queue: + if (!found) { + basics::ConditionLocker sendlocker(&somethingToSend); + for (q = toSend.begin(); q != toSend.end(); q++) { + op = *q; + if (match(clientTransactionID, coordTransactionID, shardID, op)) { + found = true; + break; + } + } + } + if (!found) { + // Nothing known about this operation, return with failure: + res = new ClusterCommResult(); + res->clientTransactionID = clientTransactionID; + res->coordTransactionID = coordTransactionID; + res->operationID = operationID; + res->shardID = shardID; + res->status = CL_COMM_DROPPED; + return res; + } + // Here it could either be in the receive or the send queue, let's wait + timeleft = endtime - now(); + if (timeleft <= 0) break; + somethingReceived.wait(uint64_t(timeleft * 1000.0)); + } + // This place is only reached on timeout + } + // Now we have to react on timeout: + res = new ClusterCommResult(); + res->clientTransactionID = clientTransactionID; + res->coordTransactionID = coordTransactionID; + res->operationID = operationID; + res->shardID = shardID; + res->status = CL_COMM_TIMEOUT; + return res; } -void ClusterComm::drop (ClientTransactionID const& clientTransactionID, - TransactionID const coordTransactionID, + +void ClusterComm::drop ( + ClientTransactionID const& clientTransactionID, + CoordTransactionID const coordTransactionID, OperationID const operationID, ShardID const& shardID) { - // Look at both send queue and recv queue, delete everything found + + QueueIterator q; + QueueIterator nextq; + IndexIterator i; + ClusterCommOperation* op; + + // First look through the send queue: + { + basics::ConditionLocker sendlocker(&somethingToSend); + for (q = toSend.begin(); q != toSend.end(); ) { + op = *q; + if ((0 != operationID && operationID == op->operationID) || + 
+ match(clientTransactionID, coordTransactionID, shardID, op)) { + nextq = q; + nextq++; + i = toSendByOpID.find(op->operationID); // cannot fail + assert(i != toSendByOpID.end()); + assert(q == i->second); + toSendByOpID.erase(i); // i belongs to the send index + toSend.erase(q); + q = nextq; + } + else { + q++; + } + } + } + // Now look through the receive queue: + { + basics::ConditionLocker locker(&somethingReceived); + for (q = received.begin(); q != received.end(); ) { + op = *q; + if ((0 != operationID && operationID == op->operationID) || + match(clientTransactionID, coordTransactionID, shardID, op)) { + nextq = q; + nextq++; + i = receivedByOpID.find(op->operationID); // cannot fail + assert(i != receivedByOpID.end()); + assert(q == i->second); + receivedByOpID.erase(i); + received.erase(q); // q belongs to the receive queue + q = nextq; + } + else { + q++; + } + } + } } int ClusterComm::processAnswer(rest::HttpRequest* answer) { + // find matching operation, report if found, otherwise drop return TRI_ERROR_NO_ERROR; } +// Move an operation from the send to the receive queue: +bool ClusterComm::moveFromSendToReceived (OperationID operationID) { + QueueIterator q; + IndexIterator i; + ClusterCommOperation* op; + + basics::ConditionLocker locker(&somethingReceived); + basics::ConditionLocker sendlocker(&somethingToSend); + i = toSendByOpID.find(operationID); // cannot fail + assert(i != toSendByOpID.end()); + q = i->second; + op = *q; + assert(op->operationID == operationID); + toSendByOpID.erase(i); + toSend.erase(q); + if (CL_COMM_DROPPING == op->status) { + return false; + } + op->status = CL_COMM_SENT; + received.push_back(op); + q = received.end(); + q--; + receivedByOpID[operationID] = q; + return true; +} + // ----------------------------------------------------------------------------- // --SECTION-- ClusterCommThread // ----------------------------------------------------------------------------- @@ -385,12 +628,46 @@ ClusterCommThread::~ClusterCommThread () { 
//////////////////////////////////////////////////////////////////////////////// void ClusterCommThread::run () { + ClusterComm::QueueIterator q; + ClusterComm::IndexIterator i; + ClusterCommOperation* op; + ClusterComm* cc = ClusterComm::instance(); + LOG_TRACE("starting ClusterComm thread"); while (! _stop) { - usleep(2000000); - // FIXME: ... LOG_DEBUG("ClusterComm alive"); + + // First check the sending queue, as long as it is not empty, we send + // a request via SimpleHttpClient: + while (true) { // will be left by break when queue is empty + { + basics::ConditionLocker locker(&cc->somethingToSend); + if (cc->toSend.empty()) { + break; + } + op = cc->toSend.front(); + assert(op->status == CL_COMM_SUBMITTED); + op->status = CL_COMM_SENDING; + } + // We release the lock, if it is dropped now, the status just goes + // to CL_COMM_DROPPING, we find out about this after we have sent + // the request. + + LOG_DEBUG("ClusterComm faking a send"); + + if (!cc->moveFromSendToReceived(op->operationID)) { + // It was dropped in the meantime, so forget about it: + delete op; + } + } + + // Now wait for the condition variable, but use a timeout to notice + // a request to terminate the thread: + { + basics::ConditionLocker locker(&cc->somethingToSend); + locker.wait(100); + } } // another thread is waiting for this value to shut down properly diff --git a/arangod/Cluster/ClusterComm.h b/arangod/Cluster/ClusterComm.h index 3e1bb3e70c..dba09e272f 100644 --- a/arangod/Cluster/ClusterComm.h +++ b/arangod/Cluster/ClusterComm.h @@ -28,6 +28,7 @@ #ifndef TRIAGENS_CLUSTER_COMM_H #define TRIAGENS_CLUSTER_COMM_H 1 +#include "BasicsC/common.h" #include "Basics/Common.h" #include "Basics/ReadWriteLock.h" #include "Basics/ConditionVariable.h" @@ -59,21 +60,23 @@ namespace triagens { // ----------------------------------------------------------------------------- typedef string ClientTransactionID; // Transaction ID from client - typedef TRI_voc_tick_t TransactionID; // Coordinator 
transaction ID + typedef TRI_voc_tick_t CoordTransactionID; // Coordinator transaction ID typedef TRI_voc_tick_t OperationID; // Coordinator operation ID enum ClusterCommOpStatus { CL_COMM_SUBMITTED = 1, // initial request queued, but not yet sent - CL_COMM_SENT = 2, // initial request sent, response available - CL_COMM_TIMEOUT = 3, // no answer received until timeout - CL_COMM_RECEIVED = 4, // answer received - CL_COMM_DROPPED = 5 // nothing known about operation, was dropped + CL_COMM_SENDING = 2, // in the process of sending + CL_COMM_DROPPING = 3, // was dropped during send, will be dropped + CL_COMM_SENT = 4, // initial request sent, response available + CL_COMM_TIMEOUT = 5, // no answer received until timeout + CL_COMM_RECEIVED = 6, // answer received + CL_COMM_DROPPED = 7 // nothing known about operation, was dropped // or actually already collected }; struct ClusterCommResult { ClientTransactionID clientTransactionID; - TransactionID transactionID; + CoordTransactionID coordTransactionID; OperationID operationID; ShardID shardID; ServerID serverID; // the actual server ID of the sender @@ -96,21 +99,10 @@ namespace triagens { } }; - struct ClusterCommOperation : public ClusterCommResult { - rest::HttpRequest* question; - - ClusterCommOperation () {} - virtual ~ClusterCommOperation () { - if (0 != question) { - delete question; - } - } - }; - - class ClusterCommCallback { + struct ClusterCommCallback { // The idea is that one inherits from this class and implements // the callback. 
- + ClusterCommCallback () {} virtual ~ClusterCommCallback (); @@ -122,6 +114,30 @@ namespace triagens { typedef double ClusterCommTimeout; // in milliseconds + struct ClusterCommOperation : public ClusterCommResult { + string path; + char const* body; + size_t bodyLength; + map<string, string>* headerFields; + ClusterCommCallback* callback; + ClusterCommTimeout timeout; + + ClusterCommOperation () : body(0), bodyLength(0), headerFields(0), callback(0), timeout(0.0) {} // the destructor tests these pointers + virtual ~ClusterCommOperation () { + if (0 != body) { + TRI_Free(TRI_UNKNOWN_MEM_ZONE, + reinterpret_cast<void*>(const_cast<char*>(body))); + } + if (0 != headerFields) { + delete headerFields; + } + if (0 != callback) { + delete callback; + } + + } + }; + struct ClusterCommOptions { double _connectTimeout; double _requestTimeout; @@ -135,6 +151,8 @@ namespace triagens { class ClusterComm { + friend class ClusterCommThread; + // ----------------------------------------------------------------------------- // --SECTION-- constructors and destructors // ----------------------------------------------------------------------------- @@ -197,18 +215,22 @@ namespace triagens { /// queued (`status` is CL_COMM_SUBMITTED). Use @ref enquire below to get /// information about the progress. The actual answer is then delivered /// either in the callback or via poll. The caller has to call delete on -/// the resulting ClusterCommResult*. +/// the resulting ClusterCommResult*. The library takes ownership of +/// the pointers `body`, `headerFields` and `callback` and releases +/// the memory when the operation has been finished. One has to use +/// TRI_Allocate with memory zone TRI_UNKNOWN_MEM_ZONE to allocate the +/// memory to which `body` points. 
//////////////////////////////////////////////////////////////////////////////// ClusterCommResult* asyncRequest ( - ClientTransactionID const& clientTransactionID, - TransactionID const coordTransactionID, - ShardID const& shardID, + ClientTransactionID const clientTransactionID, + CoordTransactionID const coordTransactionID, + ShardID const shardID, rest::HttpRequest::HttpRequestType reqtype, - string const& path, - char const * body, + string const path, + char const* body, size_t const bodyLength, - map const& headerFields, + map * headerFields, ClusterCommCallback* callback, ClusterCommTimeout timeout); @@ -227,11 +249,11 @@ namespace triagens { ClusterCommResult* syncRequest ( ClientTransactionID const& clientTransactionID, - TransactionID const coordTransactionID, + CoordTransactionID const coordTransactionID, ShardID const& shardID, rest::HttpRequest::HttpRequestType reqtype, string const& path, - char const * body, + char const* body, size_t const bodyLength, map const& headerFields, ClusterCommTimeout timeout); @@ -242,15 +264,21 @@ namespace triagens { /// /// This behaves as @ref asyncRequest except that the actual request is /// taken from `req`. We have to add a few headers and can use callback -/// and timeout. The caller has to delete the result. +/// and timeout. The caller has to delete the result. The library takes +/// ownership of the pointers `headerFields` and `callback` and +/// releases the memory when the operation has been finished. Note that +/// ClusterComm creates a copy of the relevant parts of the HTTP request +/// object `req`, simply because it can neither delete nor not delete +/// `req` and its children itself. 
+ //////////////////////////////////////////////////////////////////////////////// ClusterCommResult* asyncDelegate ( rest::HttpRequest const& req, - TransactionID const coordTransactionID, - ShardID const& shardID, - const string& path, - const map& headerFields, + CoordTransactionID const coordTransactionID, + ShardID const shardID, + string const path, + map const* headerFields, ClusterCommCallback* callback, ClusterCommTimeout timeout); @@ -265,32 +293,12 @@ namespace triagens { ClusterCommResult* syncDelegate ( rest::HttpRequest const& req, - TransactionID const coordTransactionID, + CoordTransactionID const coordTransactionID, ShardID const& shardID, const string& path, const map& headerFields, ClusterCommTimeout timeout); -//////////////////////////////////////////////////////////////////////////////// -/// @brief wait for one answer matching the criteria -/// -/// If clientTransactionID is empty, then any answer with any -/// clientTransactionID matches. If coordTransactionID is 0, then -/// any answer with any coordTransactionID matches. If shardID is -/// empty, then any answer from any ShardID matches. If operationID -/// is 0, then any answer with any operationID matches. If `timeout` -/// is given, the result can be 0 indicating that no matching answer -/// was available until the timeout was hit. The caller has to delete -/// the result, if it is not 0. 
-//////////////////////////////////////////////////////////////////////////////// - - ClusterCommResult* wait ( - ClientTransactionID const& clientTransactionID, - TransactionID const coordTransactionID, - OperationID const operationID, - ShardID const& shardID, - ClusterCommTimeout timeout = 0.0); - //////////////////////////////////////////////////////////////////////////////// /// @brief check on the status of an operation /// @@ -305,6 +313,27 @@ namespace triagens { ClusterCommResult* enquire (OperationID const operationID); +//////////////////////////////////////////////////////////////////////////////// +/// @brief wait for one answer matching the criteria +/// +/// If clientTransactionID is empty, then any answer with any +/// clientTransactionID matches. If coordTransactionID is 0, then +/// any answer with any coordTransactionID matches. If shardID is +/// empty, then any answer from any ShardID matches. If operationID +/// is 0, then any answer with any operationID matches. +/// If no operation matching the criteria is known, the result has +/// status CL_COMM_DROPPED. If `timeout` is given and no matching answer +/// was available until the timeout was hit, the result has status +/// CL_COMM_TIMEOUT. The caller has to delete the result. 
+//////////////////////////////////////////////////////////////////////////////// + + ClusterCommResult* wait ( + ClientTransactionID const& clientTransactionID, + CoordTransactionID const coordTransactionID, + OperationID const operationID, + ShardID const& shardID, + ClusterCommTimeout timeout = 0.0); + //////////////////////////////////////////////////////////////////////////////// /// @brief ignore and drop current and future answers matching /// @@ -320,7 +349,7 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// void drop (ClientTransactionID const& clientTransactionID, - TransactionID const coordTransactionID, + CoordTransactionID const coordTransactionID, OperationID const operationID, ShardID const& shardID); @@ -367,7 +396,21 @@ namespace triagens { static OperationID getOperationID (); - static int const maxConnectionsPerServer = 10; +//////////////////////////////////////////////////////////////////////////////// +/// @brief get timestamp +//////////////////////////////////////////////////////////////////////////////// + + static double now () { + struct timeval tv; + gettimeofday(&tv, 0); + + double sec = (double) tv.tv_sec; // seconds + double usc = (double) tv.tv_usec; // microseconds + + return sec + usc / 1000000.0; + } + + static int const maxConnectionsPerServer = 2; struct SingleServerConnection { httpclient::GeneralClientConnection* connection; @@ -413,13 +456,24 @@ namespace triagens { map::iterator> receivedByOpID; triagens::basics::ConditionVariable somethingReceived; + // Note: If you really have to lock both `somethingToSend` + // and `somethingReceived` at the same time (usually you should + // not have to!), then: first lock `somethingReceived`, then + // lock `somethingToSend` in this order! 
+ + // We frequently need the following lengthy types: + typedef list::iterator QueueIterator; + typedef map::iterator IndexIterator; + // An internal function to match an operation: bool match (ClientTransactionID const& clientTransactionID, - TransactionID const coordTransactionID, - OperationID const operationID, + CoordTransactionID const coordTransactionID, ShardID const& shardID, ClusterCommOperation* op); + // Move an operation from the send to the receive queue: + bool moveFromSendToReceived (OperationID operationID); + // Finally, our background communications thread: ClusterCommThread *_backgroundThread; }; // end of class ClusterComm diff --git a/arangod/Cluster/ClusterState.cpp b/arangod/Cluster/ClusterState.cpp index f42a346b59..5bde0f0555 100644 --- a/arangod/Cluster/ClusterState.cpp +++ b/arangod/Cluster/ClusterState.cpp @@ -77,7 +77,7 @@ void ClusterState::loadShardInformation () { } } -std::string ClusterState::getServerEndpoint (ServerID& serverID) { +std::string ClusterState::getServerEndpoint (ServerID const& serverID) { map::iterator i = serverAddresses.find(serverID); if (i != serverAddresses.end()) { return i->second; @@ -90,7 +90,7 @@ std::string ClusterState::getServerEndpoint (ServerID& serverID) { return string(""); } -ServerID ClusterState::getResponsibleServer (ShardID& shardID) +ServerID ClusterState::getResponsibleServer (ShardID const& shardID) { map::iterator i = shards.find(shardID); if (i != shards.end()) { diff --git a/arangod/Cluster/ClusterState.h b/arangod/Cluster/ClusterState.h index d18ebbed21..145a985c7a 100644 --- a/arangod/Cluster/ClusterState.h +++ b/arangod/Cluster/ClusterState.h @@ -118,26 +118,26 @@ namespace triagens { /// @brief find the endpoint of a server from its ID //////////////////////////////////////////////////////////////////////////////// - string getServerEndpoint(ServerID& serverID); + string getServerEndpoint(ServerID const& serverID); 
//////////////////////////////////////////////////////////////////////////////// /// @brief ask about a collection //////////////////////////////////////////////////////////////////////////////// - string getCollectionInfo(CollectionID& collectionID); + string getCollectionInfo(CollectionID const& collectionID); //////////////////////////////////////////////////////////////////////////////// /// @brief get all shards in a collection //////////////////////////////////////////////////////////////////////////////// - void getShardsCollection(CollectionID& collectionID, + void getShardsCollection(CollectionID const& collectionID, vector &shards); //////////////////////////////////////////////////////////////////////////////// /// @brief find the server who is responsible for a shard //////////////////////////////////////////////////////////////////////////////// - ServerID getResponsibleServer(ShardID& shardID); + ServerID getResponsibleServer(ShardID const& shardID); //////////////////////////////////////////////////////////////////////////////// /// @brief get a number of cluster-wide unique IDs, returns the first