1
0
Fork 0

First proper draft of ClusterComm library.

This commit is contained in:
Max Neunhoeffer 2013-12-12 13:53:16 +01:00
parent d11a1ac2d3
commit 47a1c315b8
3 changed files with 194 additions and 71 deletions

View File

@ -25,17 +25,35 @@
/// @author Copyright 2013, triagens GmbH, Cologne, Germany /// @author Copyright 2013, triagens GmbH, Cologne, Germany
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
#include <arangod/Cluster/ClusterComm.h> #include "Cluster/ClusterComm.h"
#include "VocBase/server.h"
using namespace triagens::arango; using namespace triagens::arango;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- // --SECTION-- ClusterComm class
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
//////////////////////////////////////////////////////////////////////////////// ClusterComm::ClusterComm ( ) {
/// @brief // ...
//////////////////////////////////////////////////////////////////////////////// }
ClusterComm* ClusterComm::_theinstance = 0;
ClusterComm* ClusterComm::instance( ) {
// This does not have to be thread-safe, because we guarantee that
// this is called very early in the startup phase when there is still
// a single thread.
if (0 == _theinstance) {
_theinstance = new ClusterComm( ); // this now happens exactly once
}
return _theinstance;
}
OperationID ClusterComm::getOperationID( ) {
return TRI_NewTickServer();
}
// Local Variables: // Local Variables:
// mode: outline-minor // mode: outline-minor

View File

@ -29,8 +29,11 @@
#define TRIAGENS_CLUSTER_COMM_H 1 #define TRIAGENS_CLUSTER_COMM_H 1
#include "Basics/Common.h" #include "Basics/Common.h"
#include "Basics/ReadWriteLock.h" #include "Basics/Mutex.h"
#include "Rest/HttpRequest.h" #include "Rest/HttpRequest.h"
#include "lib/SimpleHttpClient/SimpleHttpResult.h"
#include "lib/SimpleHttpClient/SimpleHttpClient.h"
#include "VocBase/voc-types.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -39,30 +42,46 @@ extern "C" {
namespace triagens { namespace triagens {
namespace arango { namespace arango {
typedef string ClientTID; // -----------------------------------------------------------------------------
typedef string CoordTID; // --SECTION-- some types for ClusterComm
typedef int32_t OpID; // -----------------------------------------------------------------------------
typedef string ShardID;
typedef string ServerID;
class ClusterCommResult { typedef string ClientTransactionID; // Transaction ID from client
OpID opID; typedef TRI_voc_tick_t TransactionID; // Coordinator transaction ID
// All the stuff from the HTTP request result typedef TRI_voc_tick_t OperationID; // Coordinator operation ID
typedef string ShardID; // ID of a shard
typedef string ServerID; // ID of a server
struct ClusterCommResult {
ClientTransactionID clientTransactionID;
TransactionID transactionID;
OperationID operationID;
ShardID shardID;
ServerID serverID; // the actual server ID of the sender
httpclient::SimpleHttpResult* result;
ClusterCommResult () {}
~ClusterCommResult () {
if (0 != result) {
delete result;
}
}
}; };
class ClusterCommCallback { class ClusterCommCallback {
// The idea is that one inherits from this class and implements // The idea is that one inherits from this class and implements
// the callback. // the callback.
ClusterCommCallback(); ClusterCommCallback () {}
~ClusterCommCallback(); virtual ~ClusterCommCallback ();
// Result indicates whether or not the returned result shall be queued // Result indicates whether or not the returned result shall be queued.
virtual bool operator() (ClusterCommResult *); // If one returns false here, one has to call delete on the
// ClusterCommResult*.
virtual bool operator() (ClusterCommResult*);
}; };
typedef uint64_t ClusterCommTimeout; // in microseconds typedef double ClusterCommTimeout; // in milliseconds
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- ClusterComm // --SECTION-- ClusterComm
@ -74,90 +93,181 @@ namespace triagens {
// --SECTION-- constructors and destructors // --SECTION-- constructors and destructors
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
public:
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief initialises library /// @brief initialises library
///
/// We are a singleton class, therefore nobody is allowed to create
/// new instances or copy them, except we ourselves.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
ClusterComm ( ); ClusterComm ( );
ClusterComm (ClusterComm const&); // not implemented
void operator= (ClusterComm const&); // not implemented
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief shuts down library /// @brief shuts down library
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
public:
~ClusterComm ( ); ~ClusterComm ( );
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- public methods // --SECTION-- public methods
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief get the unique instance
////////////////////////////////////////////////////////////////////////////////
static ClusterComm* instance( );
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief produces an operation ID which is unique in this process /// @brief produces an operation ID which is unique in this process
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
OpID getOpID ( ); static OperationID getOperationID ( );
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief submit an HTTP request to a shard synchronously /// @brief submit an HTTP request to a shard asynchronously.
///
/// Actually, it does one synchronous HTTP request but also creates
/// an entry in a list of expected answers. One either has to use a
/// callback for the answer, or poll for it, or drop it to prevent
/// memory leaks. The result of this call is what happened in the
/// original request, which is usually just a "202 Accepted". The actual
/// answer is then delivered either in the callback or via poll. The
/// caller has to call delete on the resulting ClusterCommResult*.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
int request (const ClientTID& clientTID, ClusterCommResult* asyncRequest (
const CoordTID& coordTID, ClientTransactionID const& clientTransactionID,
const OpID& opID, TransactionID const coordTransactionID,
const ShardID& shardID, ShardID const& shardID,
string const& replyToPath,
rest::HttpRequest::HttpRequestType reqtype, rest::HttpRequest::HttpRequestType reqtype,
string const& path,
char const * body,
size_t const bodyLength,
map<string, string> const& headerFields,
ClusterCommCallback* callback,
ClusterCommTimeout timeout);
////////////////////////////////////////////////////////////////////////////////
/// @brief submit a single HTTP request to a shard synchronously.
////////////////////////////////////////////////////////////////////////////////
ClusterCommResult* syncRequest (
ClientTransactionID const& clientTransactionID,
TransactionID const coordTransactionID,
ShardID const& shardID,
rest::HttpRequest::HttpRequestType reqtype,
string const& path,
char const * body,
size_t const bodyLength,
map<string, string> const& headerFields);
////////////////////////////////////////////////////////////////////////////////
/// @brief forward an HTTP request we got from the client on to a shard
/// asynchronously.
///
/// We have to add a few headers and can use callback and timeout. This
/// creates an entry in a list of expected answers. One either has to
/// use a callback for the answer, or poll for it, or drop it to prevent
/// memory leaks. The result is what happened in the original request,
/// which is usually just a "202 Accepted". The actual answer is then
/// delivered either in the callback or via poll. The caller has to
/// delete the result eventually.
////////////////////////////////////////////////////////////////////////////////
ClusterCommResult* asyncDelegate (
rest::HttpRequest const& req,
TransactionID const coordTransactionID,
ShardID const& shardID,
string const& replyToPath,
const string& path, const string& path,
const char* body,
size_t bodyLength,
const map<string, string>& headerFields, const map<string, string>& headerFields,
ClusterCommCallback* callback, ClusterCommCallback* callback,
ClusterCommTimeout& timeout); ClusterCommTimeout timeout);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief poll one answer for a given opID /// @brief forward an HTTP request we got from the client on to a shard
/// synchronously.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
ClusterCommResult* poll (const ClientTID& clientTID, ClusterCommResult* syncDelegate (
const CoordTID& coordTID, rest::HttpRequest const& req,
const OpID& opID, TransactionID const coordTransactionID,
const ShardID& shardID, ShardID const& shardID,
const string& path,
const map<string, string>& headerFields);
////////////////////////////////////////////////////////////////////////////////
/// @brief poll one answer matching the criteria
///
/// If clientTransactionID is empty, then any answer with any
/// clientTransactionID matches. If coordTransactionID is 0, then
/// any answer with any coordTransactionID matches. If shardID is
/// empty, then any answer from any ShardID matches. If operationID
/// is 0, then any answer with any operationID matches. If the answer
/// is not 0, then the caller has to call delete on it.
////////////////////////////////////////////////////////////////////////////////
ClusterCommResult* poll (
ClientTransactionID const& clientTransactionID,
TransactionID const coordTransactionID,
OperationID const operationID,
ShardID const& shardID,
bool blocking = false, bool blocking = false,
ClusterCommTimeout timeout = 0); ClusterCommTimeout timeout = 0.0);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief poll many answers for some given opIDs /// @brief poll many answers matching the criteria
///
/// If clientTransactionID is empty, then any answer with any
/// clientTransactionID matches. If coordTransactionID is 0, then
/// any answer with any coordTransactionID matches. If shardID is
/// empty, then any answer from any ShardID matches. If operationID
/// is 0, then any answer with any operationID matches. At most maxAnswers
/// results are returned. If the answer is not 0, then the caller has to
/// call delete on it.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
vector<ClusterCommResult*> multipoll ( vector<ClusterCommResult*> multipoll (
const ClientTID& clientTID, ClientTransactionID const& clientTransactionID,
const CoordTID& coordTID, TransactionID const coordTransactionID,
const OpID& opID, OperationID const operationID,
const ShardID& shardID, ShardID const& shardID,
int maxanswers = 0, int const maxAnswers = 0,
bool blocking = false, bool blocking = false,
ClusterCommTimeout timeout = 0); ClusterCommTimeout timeout = 0);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief ignore and drop current and future answers /// @brief ignore and drop current and future answers matching
///
/// If clientTransactionID is empty, then any answer with any
/// clientTransactionID matches. If coordTransactionID is 0, then
/// any answer with any coordTransactionID matches. If shardID is
/// empty, then any answer from any ShardID matches. If operationID
/// is 0, then any answer with any operationID matches. At most maxAnswers
/// results are returned. If the answer is not 0, then the caller has to
/// call delete on it.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void drop (const ClientTID& clientTID, void drop (ClientTransactionID const& clientTransactionID,
const CoordTID& coordTID, TransactionID const coordTransactionID,
const OpID& opID, OperationID const operationID,
const ShardID& shardID); ShardID const& shardID);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief send an answer HTTP request to a coordinator /// @brief send an answer HTTP request to a coordinator, which contains
/// in its body a HttpResponse that we already have.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
int answer (rest::HttpRequest& request, httpclient::SimpleHttpResult* asyncAnswer (
rest::HttpRequest::HttpRequestType reqtype, rest::HttpRequest& origRequest,
const string& path, rest::HttpResponse& resonseToSend);
const char* body,
size_t bodyLength,
const map<string, string>& headerFields,
ClusterCommTimeout& timeout);
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- private data // --SECTION-- private data
@ -166,16 +276,10 @@ namespace triagens {
private: private:
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief process global last used operation ID /// @brief the pointer to the singleton instance
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static OpID _lastUsedOpID; static ClusterComm* _theinstance;
////////////////////////////////////////////////////////////////////////////////
/// @brief global lock to protect _lastUsedOpID
////////////////////////////////////////////////////////////////////////////////
static triagens::basics::ReadWriteLock _lock;
}; // end of class ClusterComm }; // end of class ClusterComm

View File

@ -113,7 +113,8 @@ bin_arangod_SOURCES += \
arangod/Cluster/AgencyComm.cpp \ arangod/Cluster/AgencyComm.cpp \
arangod/Cluster/ApplicationCluster.cpp \ arangod/Cluster/ApplicationCluster.cpp \
arangod/Cluster/HeartbeatThread.cpp \ arangod/Cluster/HeartbeatThread.cpp \
arangod/Cluster/ServerState.cpp arangod/Cluster/ServerState.cpp \
arangod/Cluster/ClusterComm.cpp
endif endif