From 47a1c315b83b39156145598c26260cc16814ccf1 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Thu, 12 Dec 2013 13:53:16 +0100 Subject: [PATCH] First proper draft of ClusterComm library. --- arangod/Cluster/ClusterComm.cpp | 28 +++- arangod/Cluster/ClusterComm.h | 234 +++++++++++++++++++++++--------- arangod/Makefile.files | 3 +- 3 files changed, 194 insertions(+), 71 deletions(-) diff --git a/arangod/Cluster/ClusterComm.cpp b/arangod/Cluster/ClusterComm.cpp index 7e619bc79c..f2af7c3a59 100644 --- a/arangod/Cluster/ClusterComm.cpp +++ b/arangod/Cluster/ClusterComm.cpp @@ -25,17 +25,35 @@ /// @author Copyright 2013, triagens GmbH, Cologne, Germany //////////////////////////////////////////////////////////////////////////////// -#include +#include "Cluster/ClusterComm.h" + +#include "VocBase/server.h" using namespace triagens::arango; // ----------------------------------------------------------------------------- -// --SECTION-- +// --SECTION-- ClusterComm class // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @brief -//////////////////////////////////////////////////////////////////////////////// +ClusterComm::ClusterComm ( ) { + // ... +} + +ClusterComm* ClusterComm::_theinstance = 0; + +ClusterComm* ClusterComm::instance( ) { + // This does not have to be thread-safe, because we guarantee that + // this is called very early in the startup phase when there is still + // a single thread. + if (0 == _theinstance) { + _theinstance = new ClusterComm( ); // this now happens exactly once + } + return _theinstance; +} + +OperationID ClusterComm::getOperationID( ) { + return TRI_NewTickServer(); +} // Local Variables: // mode: outline-minor diff --git a/arangod/Cluster/ClusterComm.h b/arangod/Cluster/ClusterComm.h index decba86248..a8e3f7b4ca 100644 --- a/arangod/Cluster/ClusterComm.h +++ b/arangod/Cluster/ClusterComm.h @@ -29,8 +29,11 @@ #define TRIAGENS_CLUSTER_COMM_H 1 #include "Basics/Common.h" -#include "Basics/ReadWriteLock.h" +#include "Basics/Mutex.h" #include "Rest/HttpRequest.h" +#include "lib/SimpleHttpClient/SimpleHttpResult.h" +#include "lib/SimpleHttpClient/SimpleHttpClient.h" +#include "VocBase/voc-types.h" #ifdef __cplusplus extern "C" { @@ -39,30 +42,46 @@ extern "C" { namespace triagens { namespace arango { - typedef string ClientTID; - typedef string CoordTID; - typedef int32_t OpID; - typedef string ShardID; - typedef string ServerID; +// ----------------------------------------------------------------------------- +// --SECTION-- some types for ClusterComm +// ----------------------------------------------------------------------------- - class ClusterCommResult { - OpID opID; - // All the stuff from the HTTP request result + typedef string ClientTransactionID; // Transaction ID from client + typedef TRI_voc_tick_t TransactionID; // Coordinator transaction ID + typedef TRI_voc_tick_t OperationID; // Coordinator operation ID + typedef string ShardID; // ID of a shard + typedef string ServerID; // ID of a server + struct ClusterCommResult { + ClientTransactionID clientTransactionID; + TransactionID transactionID; + OperationID operationID; + ShardID shardID; + ServerID serverID; // the actual server ID of the sender + httpclient::SimpleHttpResult* result; + + ClusterCommResult () {} + ~ClusterCommResult () { + if (0 != result) { + delete result; + } + } }; class ClusterCommCallback { // The idea is that one inherits from this class and implements // the callback. - ClusterCommCallback(); - ~ClusterCommCallback(); + ClusterCommCallback () {} + virtual ~ClusterCommCallback (); - // Result indicates whether or not the returned result shall be queued - virtual bool operator() (ClusterCommResult *); + // Result indicates whether or not the returned result shall be queued. + // If one returns false here, one has to call delete on the + // ClusterCommResult*. + virtual bool operator() (ClusterCommResult*); }; - typedef uint64_t ClusterCommTimeout; // in microseconds + typedef double ClusterCommTimeout; // in milliseconds // ----------------------------------------------------------------------------- // --SECTION-- ClusterComm @@ -74,90 +93,181 @@ namespace triagens { // --SECTION-- constructors and destructors // ----------------------------------------------------------------------------- - public: - //////////////////////////////////////////////////////////////////////////////// /// @brief initialises library +/// +/// We are a singleton class, therefore nobody is allowed to create +/// new instances or copy them, except we ourselves. //////////////////////////////////////////////////////////////////////////////// ClusterComm ( ); + ClusterComm (ClusterComm const&); // not implemented + void operator= (ClusterComm const&); // not implemented //////////////////////////////////////////////////////////////////////////////// /// @brief shuts down library //////////////////////////////////////////////////////////////////////////////// + public: + ~ClusterComm ( ); // ----------------------------------------------------------------------------- // --SECTION-- public methods // ----------------------------------------------------------------------------- +//////////////////////////////////////////////////////////////////////////////// +/// @brief get the unique instance +//////////////////////////////////////////////////////////////////////////////// + + static ClusterComm* instance( ); + //////////////////////////////////////////////////////////////////////////////// /// @brief produces an operation ID which is unique in this process //////////////////////////////////////////////////////////////////////////////// - OpID getOpID ( ); + static OperationID getOperationID ( ); //////////////////////////////////////////////////////////////////////////////// -/// @brief submit an HTTP request to a shard synchronously +/// @brief submit an HTTP request to a shard asynchronously. +/// +/// Actually, it does one synchronous HTTP request but also creates +/// an entry in a list of expected answers. One either has to use a +/// callback for the answer, or poll for it, or drop it to prevent +/// memory leaks. The result of this call is what happened in the +/// original request, which is usually just a "202 Accepted". The actual +/// answer is then delivered either in the callback or via poll. The +/// caller has to call delete on the resulting ClusterCommResult*. //////////////////////////////////////////////////////////////////////////////// - int request (const ClientTID& clientTID, - const CoordTID& coordTID, - const OpID& opID, - const ShardID& shardID, - rest::HttpRequest::HttpRequestType reqtype, - const string& path, - const char* body, - size_t bodyLength, - const map& headerFields, - ClusterCommCallback* callback, - ClusterCommTimeout& timeout); + ClusterCommResult* asyncRequest ( + ClientTransactionID const& clientTransactionID, + TransactionID const coordTransactionID, + ShardID const& shardID, + string const& replyToPath, + rest::HttpRequest::HttpRequestType reqtype, + string const& path, + char const * body, + size_t const bodyLength, + map const& headerFields, + ClusterCommCallback* callback, + ClusterCommTimeout timeout); //////////////////////////////////////////////////////////////////////////////// -/// @brief poll one answer for a given opID +/// @brief submit a single HTTP request to a shard synchronously. //////////////////////////////////////////////////////////////////////////////// - ClusterCommResult* poll (const ClientTID& clientTID, - const CoordTID& coordTID, - const OpID& opID, - const ShardID& shardID, - bool blocking = false, - ClusterCommTimeout timeout = 0); + ClusterCommResult* syncRequest ( + ClientTransactionID const& clientTransactionID, + TransactionID const coordTransactionID, + ShardID const& shardID, + rest::HttpRequest::HttpRequestType reqtype, + string const& path, + char const * body, + size_t const bodyLength, + map const& headerFields); //////////////////////////////////////////////////////////////////////////////// -/// @brief poll many answers for some given opIDs +/// @brief forward an HTTP request we got from the client on to a shard +/// asynchronously. +/// +/// We have to add a few headers and can use callback and timeout. This +/// creates an entry in a list of expected answers. One either has to +/// use a callback for the answer, or poll for it, or drop it to prevent +/// memory leaks. The result is what happened in the original request, +/// which is usually just a "202 Accepted". The actual answer is then +/// delivered either in the callback or via poll. The caller has to +/// delete the result eventually. +//////////////////////////////////////////////////////////////////////////////// + + ClusterCommResult* asyncDelegate ( + rest::HttpRequest const& req, + TransactionID const coordTransactionID, + ShardID const& shardID, + string const& replyToPath, + const string& path, + const map& headerFields, + ClusterCommCallback* callback, + ClusterCommTimeout timeout); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief forward an HTTP request we got from the client on to a shard +/// synchronously. +//////////////////////////////////////////////////////////////////////////////// + + ClusterCommResult* syncDelegate ( + rest::HttpRequest const& req, + TransactionID const coordTransactionID, + ShardID const& shardID, + const string& path, + const map& headerFields); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief poll one answer matching the criteria +/// +/// If clientTransactionID is empty, then any answer with any +/// clientTransactionID matches. If coordTransactionID is 0, then +/// any answer with any coordTransactionID matches. If shardID is +/// empty, then any answer from any ShardID matches. If operationID +/// is 0, then any answer with any operationID matches. If the answer +/// is not 0, then the caller has to call delete on it. +//////////////////////////////////////////////////////////////////////////////// + + ClusterCommResult* poll ( + ClientTransactionID const& clientTransactionID, + TransactionID const coordTransactionID, + OperationID const operationID, + ShardID const& shardID, + bool blocking = false, + ClusterCommTimeout timeout = 0.0); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief poll many answers matching the criteria +/// +/// If clientTransactionID is empty, then any answer with any +/// clientTransactionID matches. If coordTransactionID is 0, then +/// any answer with any coordTransactionID matches. If shardID is +/// empty, then any answer from any ShardID matches. If operationID +/// is 0, then any answer with any operationID matches. At most maxAnswers +/// results are returned. If the answer is not 0, then the caller has to +/// call delete on it. //////////////////////////////////////////////////////////////////////////////// vector multipoll ( - const ClientTID& clientTID, - const CoordTID& coordTID, - const OpID& opID, - const ShardID& shardID, - int maxanswers = 0, - bool blocking = false, - ClusterCommTimeout timeout = 0); + ClientTransactionID const& clientTransactionID, + TransactionID const coordTransactionID, + OperationID const operationID, + ShardID const& shardID, + int const maxAnswers = 0, + bool blocking = false, + ClusterCommTimeout timeout = 0); //////////////////////////////////////////////////////////////////////////////// -/// @brief ignore and drop current and future answers +/// @brief ignore and drop current and future answers matching +/// +/// If clientTransactionID is empty, then any answer with any +/// clientTransactionID matches. If coordTransactionID is 0, then +/// any answer with any coordTransactionID matches. If shardID is +/// empty, then any answer from any ShardID matches. If operationID +/// is 0, then any answer with any operationID matches. At most maxAnswers +/// results are returned. If the answer is not 0, then the caller has to +/// call delete on it. //////////////////////////////////////////////////////////////////////////////// - void drop (const ClientTID& clientTID, - const CoordTID& coordTID, - const OpID& opID, - const ShardID& shardID); + void drop (ClientTransactionID const& clientTransactionID, + TransactionID const coordTransactionID, + OperationID const operationID, + ShardID const& shardID); //////////////////////////////////////////////////////////////////////////////// -/// @brief send an answer HTTP request to a coordinator +/// @brief send an answer HTTP request to a coordinator, which contains +/// in its body a HttpResponse that we already have. //////////////////////////////////////////////////////////////////////////////// - int answer (rest::HttpRequest& request, - rest::HttpRequest::HttpRequestType reqtype, - const string& path, - const char* body, - size_t bodyLength, - const map& headerFields, - ClusterCommTimeout& timeout); + httpclient::SimpleHttpResult* asyncAnswer ( + rest::HttpRequest& origRequest, + rest::HttpResponse& resonseToSend); + // ----------------------------------------------------------------------------- // --SECTION-- private data @@ -166,16 +276,10 @@ namespace triagens { private: //////////////////////////////////////////////////////////////////////////////// -/// @brief process global last used operation ID +/// @brief the pointer to the singleton instance //////////////////////////////////////////////////////////////////////////////// - static OpID _lastUsedOpID; - -//////////////////////////////////////////////////////////////////////////////// -/// @brief global lock to protect _lastUsedOpID -//////////////////////////////////////////////////////////////////////////////// - - static triagens::basics::ReadWriteLock _lock; + static ClusterComm* _theinstance; }; // end of class ClusterComm diff --git a/arangod/Makefile.files b/arangod/Makefile.files index dfecaae02e..f1ef6d7c5e 100644 --- a/arangod/Makefile.files +++ b/arangod/Makefile.files @@ -113,7 +113,8 @@ bin_arangod_SOURCES += \ arangod/Cluster/AgencyComm.cpp \ arangod/Cluster/ApplicationCluster.cpp \ arangod/Cluster/HeartbeatThread.cpp \ - arangod/Cluster/ServerState.cpp + arangod/Cluster/ServerState.cpp \ + arangod/Cluster/ClusterComm.cpp endif