mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'sharding' of https://github.com/triAGENS/ArangoDB into sharding
This commit is contained in:
commit
1889fa52a0
|
@ -40,6 +40,10 @@ using namespace triagens::arango;
|
|||
// --SECTION-- ClusterComm connection options
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief global options for connections
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterCommOptions ClusterComm::_globalConnectionOptions = {
|
||||
15.0, // connectTimeout
|
||||
3.0, // requestTimeout
|
||||
|
@ -62,6 +66,10 @@ void triagens::arango::ClusterCommRestCallback(string& coordinator,
|
|||
// --SECTION-- ClusterComm class
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief ClusterComm constructor
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterComm::ClusterComm () {
|
||||
_backgroundThread = new ClusterCommThread();
|
||||
if (0 == _backgroundThread) {
|
||||
|
@ -72,6 +80,10 @@ ClusterComm::ClusterComm () {
|
|||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief ClusterComm destructor
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterComm::~ClusterComm () {
|
||||
_backgroundThread->stop();
|
||||
_backgroundThread->shutdown();
|
||||
|
@ -85,8 +97,16 @@ ClusterComm::~ClusterComm () {
|
|||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief the actual singleton instance
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterComm* ClusterComm::_theinstance = 0;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief getter for our singleton instance
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterComm* ClusterComm::instance () {
|
||||
// This does not have to be thread-safe, because we guarantee that
|
||||
// this is called very early in the startup phase when there is still
|
||||
|
@ -97,14 +117,26 @@ ClusterComm* ClusterComm::instance () {
|
|||
return _theinstance;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief only used to trigger creation
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void ClusterComm::initialise () {
|
||||
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief produces an operation ID which is unique in this process
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
OperationID ClusterComm::getOperationID () {
|
||||
return TRI_NewTickServer();
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief destructor for SingleServerConnection class
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterComm::SingleServerConnection::~SingleServerConnection () {
|
||||
delete connection;
|
||||
delete endpoint;
|
||||
|
@ -112,6 +144,10 @@ ClusterComm::SingleServerConnection::~SingleServerConnection () {
|
|||
serverID = "";
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief destructor of ServerConnections class
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterComm::ServerConnections::~ServerConnections () {
|
||||
vector<SingleServerConnection*>::iterator i;
|
||||
WRITE_LOCKER(lock);
|
||||
|
@ -123,6 +159,10 @@ ClusterComm::ServerConnections::~ServerConnections () {
|
|||
connections.clear();
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief open or get a previously cached connection to a server
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterComm::SingleServerConnection*
|
||||
ClusterComm::getConnection(ServerID& serverID) {
|
||||
map<ServerID,ServerConnections*>::iterator i;
|
||||
|
@ -193,6 +233,9 @@ ClusterComm::getConnection(ServerID& serverID) {
|
|||
return c;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief return leased connection to a server
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void ClusterComm::returnConnection(SingleServerConnection* c) {
|
||||
map<ServerID,ServerConnections*>::iterator i;
|
||||
|
@ -222,6 +265,10 @@ void ClusterComm::returnConnection(SingleServerConnection* c) {
|
|||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief report a leased connection as being broken
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void ClusterComm::brokenConnection(SingleServerConnection* c) {
|
||||
map<ServerID,ServerConnections*>::iterator i;
|
||||
ServerConnections* s;
|
||||
|
@ -259,6 +306,11 @@ void ClusterComm::brokenConnection(SingleServerConnection* c) {
|
|||
delete c;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief closes all connections that have been unused for more than
|
||||
/// limit seconds
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void ClusterComm::closeUnusedConnections (double limit) {
|
||||
WRITE_LOCKER(allLock);
|
||||
map<ServerID,ServerConnections*>::iterator s;
|
||||
|
@ -304,6 +356,27 @@ void ClusterComm::closeUnusedConnections (double limit) {
|
|||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief submit an HTTP request to a shard asynchronously.
|
||||
///
|
||||
/// This function is only called when arangod is in coordinator mode. It
|
||||
/// queues a single HTTP request to one of the DBServers to be sent by
|
||||
/// ClusterComm in the background thread. This request actually orders
|
||||
/// an answer, which is an HTTP request sent from the target DBServer
|
||||
/// back to us. Therefore ClusterComm also creates an entry in a list of
|
||||
/// expected answers. One either has to use a callback for the answer,
|
||||
/// or poll for it, or drop it to prevent memory leaks. The result of
|
||||
/// this call is just a record that the initial HTTP request has been
|
||||
/// queued (`status` is CL_COMM_SUBMITTED). Use @ref enquire below to get
|
||||
/// information about the progress. The actual answer is then delivered
|
||||
/// either in the callback or via poll. The caller has to call delete on
|
||||
/// the resulting ClusterCommResult*. The library takes ownerships of
|
||||
/// the pointers `headerFields` and `callback` and releases
|
||||
/// the memory when the operation has been finished. It is the caller's
|
||||
/// responsibility to free the memory to which `body` points after the
|
||||
/// operation has finally terminated.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
ClusterCommResult* ClusterComm::asyncRequest (
|
||||
ClientTransactionID const clientTransactionID,
|
||||
CoordTransactionID const coordTransactionID,
|
||||
|
@ -363,6 +436,19 @@ ClusterCommResult* ClusterComm::asyncRequest (
|
|||
return res;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief submit a single HTTP request to a shard synchronously.
|
||||
///
|
||||
/// This function does an HTTP request synchronously, waiting for the
|
||||
/// result. Note that the result has `status` field set to `CL_COMM_SENT`
|
||||
/// and the field `result` is set to the HTTP response. The field `answer`
|
||||
/// is unused in this case. In case of a timeout the field `status` is
|
||||
/// `CL_COMM_TIMEOUT` and the field `result` points to an HTTP response
|
||||
/// object that only says "timeout". Note that the ClusterComm library
|
||||
/// does not keep a record of this operation, in particular, you cannot
|
||||
/// use @ref enquire to ask about it.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterCommResult* ClusterComm::syncRequest (
|
||||
ClientTransactionID const& clientTransactionID,
|
||||
CoordTransactionID const coordTransactionID,
|
||||
|
@ -440,6 +526,10 @@ ClusterCommResult* ClusterComm::syncRequest (
|
|||
return res;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief internal function to match an operation:
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool ClusterComm::match (
|
||||
ClientTransactionID const& clientTransactionID,
|
||||
CoordTransactionID const coordTransactionID,
|
||||
|
@ -454,6 +544,20 @@ bool ClusterComm::match (
|
|||
shardID == op->shardID) );
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief check on the status of an operation
|
||||
///
|
||||
/// This call never blocks and returns information about a specific operation
|
||||
/// given by `operationID`. Note that if the `status` is >= `CL_COMM_SENT`,
|
||||
/// then the `result` field in the returned object is set, if the `status`
|
||||
/// is `CL_COMM_RECEIVED`, then `answer` is set. However, in both cases
|
||||
/// the ClusterComm library retains the operation in its queues! Therefore,
|
||||
/// you have to use @ref wait or @ref drop to dequeue. Do not delete
|
||||
/// `result` and `answer` before doing this! However, you have to delete
|
||||
/// the ClusterCommResult pointer you get, it will automatically refrain
|
||||
/// from deleting `result` and `answer`.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterCommResult const* ClusterComm::enquire (OperationID const operationID) {
|
||||
IndexIterator i;
|
||||
ClusterCommOperation* op = 0;
|
||||
|
@ -506,6 +610,21 @@ ClusterCommResult const* ClusterComm::enquire (OperationID const operationID) {
|
|||
return res;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief wait for one answer matching the criteria
|
||||
///
|
||||
/// If clientTransactionID is empty, then any answer with any
|
||||
/// clientTransactionID matches. If coordTransactionID is 0, then any
|
||||
/// answer with any coordTransactionID matches. If shardID is empty,
|
||||
/// then any answer from any ShardID matches. If operationID is 0, then
|
||||
/// any answer with any operationID matches. This function returns
|
||||
/// a result structure with status CL_COMM_DROPPED if no operation
|
||||
/// matches. If `timeout` is given, the result can be a result structure
|
||||
/// with status CL_COMM_TIMEOUT indicating that no matching answer was
|
||||
/// available until the timeout was hit. The caller has to delete the
|
||||
/// result.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterCommResult* ClusterComm::wait (
|
||||
ClientTransactionID const& clientTransactionID,
|
||||
CoordTransactionID const coordTransactionID,
|
||||
|
@ -625,6 +744,19 @@ ClusterCommResult* ClusterComm::wait (
|
|||
return res;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief ignore and drop current and future answers matching
|
||||
///
|
||||
/// If clientTransactionID is empty, then any answer with any
|
||||
/// clientTransactionID matches. If coordTransactionID is 0, then
|
||||
/// any answer with any coordTransactionID matches. If shardID is
|
||||
/// empty, then any answer from any ShardID matches. If operationID
|
||||
/// is 0, then any answer with any operationID matches. If there
|
||||
/// is already an answer for a matching operation, it is dropped and
|
||||
/// freed. If not, any future answer coming in is automatically dropped.
|
||||
/// This function can be used to automatically delete all information about an
|
||||
/// operation, for which @ref enquire reported successful completion.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void ClusterComm::drop (
|
||||
ClientTransactionID const& clientTransactionID,
|
||||
|
@ -681,6 +813,13 @@ void ClusterComm::drop (
|
|||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief send an answer HTTP request to a coordinator
|
||||
///
|
||||
/// This is only called in a DBServer node and never in a coordinator
|
||||
/// node.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void ClusterComm::asyncAnswer (string& coordinatorHeader,
|
||||
rest::HttpResponse* responseToSend) {
|
||||
|
||||
|
@ -736,6 +875,15 @@ void ClusterComm::asyncAnswer (string& coordinatorHeader,
|
|||
returnConnection(connection);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief process an answer coming in on the HTTP socket
|
||||
///
|
||||
/// this is called for a request, which is actually an answer to one of
|
||||
/// our earlier requests, return value of "" means OK and nonempty is
|
||||
/// an error. This is only called in a coordinator node and not in a
|
||||
/// DBServer node.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
string ClusterComm::processAnswer(string& coordinatorHeader,
|
||||
rest::HttpRequest* answer) {
|
||||
// First take apart the header to get the operaitonID:
|
||||
|
@ -811,7 +959,10 @@ string ClusterComm::processAnswer(string& coordinatorHeader,
|
|||
return string("");
|
||||
}
|
||||
|
||||
// Move an operation from the send to the receive queue:
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief move an operation from the send to the receive queue
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool ClusterComm::moveFromSendToReceived (OperationID operationID) {
|
||||
QueueIterator q;
|
||||
IndexIterator i;
|
||||
|
@ -846,6 +997,10 @@ bool ClusterComm::moveFromSendToReceived (OperationID operationID) {
|
|||
return true;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief cleanup all queues
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void ClusterComm::cleanupAllQueues() {
|
||||
QueueIterator i;
|
||||
{
|
||||
|
@ -1031,7 +1186,7 @@ void ClusterCommThread::run () {
|
|||
// -----------------------------------------------------------------------------
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief initialises the heartbeat
|
||||
/// @brief initialises the cluster comm background thread
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool ClusterCommThread::init () {
|
||||
|
|
|
@ -56,9 +56,27 @@ namespace triagens {
|
|||
// --SECTION-- some types for ClusterComm
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
typedef string ClientTransactionID; // Transaction ID from client
|
||||
typedef TRI_voc_tick_t CoordTransactionID; // Coordinator transaction ID
|
||||
typedef TRI_voc_tick_t OperationID; // Coordinator operation ID
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief type of a client transaction ID
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
typedef string ClientTransactionID;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief type of a coordinator transaction ID
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
typedef TRI_voc_tick_t CoordTransactionID;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief trype of an operation ID
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
typedef TRI_voc_tick_t OperationID;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief status of an (a-)synchronous cluster operation
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
enum ClusterCommOpStatus {
|
||||
CL_COMM_SUBMITTED = 1, // initial request queued, but not yet sent
|
||||
|
@ -72,6 +90,11 @@ namespace triagens {
|
|||
// in the wait or enquire methods
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief used to report the status, progress and possibly result of
|
||||
/// an operation
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
struct ClusterCommResult {
|
||||
bool _deleteOnDestruction;
|
||||
ClientTransactionID clientTransactionID;
|
||||
|
@ -107,23 +130,42 @@ namespace triagens {
|
|||
}
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief type for a callback for a cluster operation
|
||||
///
|
||||
/// The idea is that one inherits from this class and implements
|
||||
/// the callback. Note however that the callback is called whilst
|
||||
/// holding the lock for the receiving (or indeed also the sending)
|
||||
/// queue! Therefore the operation should be quick.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
struct ClusterCommCallback {
|
||||
// The idea is that one inherits from this class and implements
|
||||
// the callback. Note however that the callback is called whilst
|
||||
// holding the lock for the receiving (or indeed also the sending)
|
||||
// queue! Therefore the operation should be quick.
|
||||
|
||||
ClusterCommCallback () {}
|
||||
virtual ~ClusterCommCallback () {};
|
||||
|
||||
// Result indicates whether or not the returned result is already
|
||||
// fully processed. If so, it is removed from all queues. In this
|
||||
// case the object is automatically destructed, so that the
|
||||
// callback must not call delete in any case.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief the actual callback function
|
||||
///
|
||||
/// Result indicates whether or not the returned result is already
|
||||
/// fully processed. If so, it is removed from all queues. In this
|
||||
/// case the object is automatically destructed, so that the
|
||||
/// callback must not call delete in any case.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
virtual bool operator() (ClusterCommResult*) = 0;
|
||||
};
|
||||
|
||||
typedef double ClusterCommTimeout; // in milliseconds
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief type of a timeout specification, is meant in seconds
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
typedef double ClusterCommTimeout;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief used to store the status, progress and possibly result of
|
||||
/// an operation
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
struct ClusterCommOperation : public ClusterCommResult {
|
||||
rest::HttpRequest::HttpRequestType reqtype;
|
||||
|
@ -146,6 +188,10 @@ namespace triagens {
|
|||
}
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief options for cluster operations
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
struct ClusterCommOptions {
|
||||
double _connectTimeout;
|
||||
double _requestTimeout;
|
||||
|
@ -165,6 +211,10 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
|
|||
// --SECTION-- ClusterComm
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief the class for the cluster communications library
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class ClusterComm {
|
||||
|
||||
friend class ClusterCommThread;
|
||||
|
@ -219,23 +269,6 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
|
|||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief submit an HTTP request to a shard asynchronously.
|
||||
///
|
||||
/// This function is only called when arangod is in coordinator mode. It
|
||||
/// queues a single HTTP request to one of the DBServers to be sent by
|
||||
/// ClusterComm in the background thread. This request actually orders
|
||||
/// an answer, which is an HTTP request sent from the target DBServer
|
||||
/// back to us. Therefore ClusterComm also creates an entry in a list of
|
||||
/// expected answers. One either has to use a callback for the answer,
|
||||
/// or poll for it, or drop it to prevent memory leaks. The result of
|
||||
/// this call is just a record that the initial HTTP request has been
|
||||
/// queued (`status` is CL_COMM_SUBMITTED). Use @ref enquire below to get
|
||||
/// information about the progress. The actual answer is then delivered
|
||||
/// either in the callback or via poll. The caller has to call delete on
|
||||
/// the resulting ClusterCommResult*. The library takes ownerships of
|
||||
/// the pointers `headerFields` and `callback` and releases
|
||||
/// the memory when the operation has been finished. It is the caller's
|
||||
/// responsibility to free the memory to which `body` points after the
|
||||
/// operation has finally terminated.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterCommResult* asyncRequest (
|
||||
|
@ -252,15 +285,6 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
|
|||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief submit a single HTTP request to a shard synchronously.
|
||||
///
|
||||
/// This function does an HTTP request synchronously, waiting for the
|
||||
/// result. Note that the result has `status` field set to `CL_COMM_SENT`
|
||||
/// and the field `result` is set to the HTTP response. The field `answer`
|
||||
/// is unused in this case. In case of a timeout the field `status` is
|
||||
/// `CL_COMM_TIMEOUT` and the field `result` points to an HTTP response
|
||||
/// object that only says "timeout". Note that the ClusterComm library
|
||||
/// does not keep a record of this operation, in particular, you cannot
|
||||
/// use @ref enquire to ask about it.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterCommResult* syncRequest (
|
||||
|
@ -276,32 +300,12 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
|
|||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief check on the status of an operation
|
||||
///
|
||||
/// This call never blocks and returns information about a specific operation
|
||||
/// given by `operationID`. Note that if the `status` is >= `CL_COMM_SENT`,
|
||||
/// then the `result` field in the returned object is set, if the `status`
|
||||
/// is `CL_COMM_RECEIVED`, then `answer` is set. However, in both cases
|
||||
/// the ClusterComm library retains the operation in its queues! Therefore,
|
||||
/// you have to use @ref wait or @ref drop to dequeue. Do not delete
|
||||
/// `result` and `answer` before doing this! However, you have to delete
|
||||
/// the ClusterCommResult pointer you get, it will automatically refrain
|
||||
/// from deleting `result` and `answer`.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterCommResult const* enquire (OperationID const operationID);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief wait for one answer matching the criteria
|
||||
///
|
||||
/// If clientTransactionID is empty, then any answer with any
|
||||
/// clientTransactionID matches. If coordTransactionID is 0, then
|
||||
/// any answer with any coordTransactionID matches. If shardID is
|
||||
/// empty, then any answer from any ShardID matches. If operationID
|
||||
/// is 0, then any answer with any operationID matches.
|
||||
/// This function returns 0 if noIf `timeout`
|
||||
/// is given, the result can be 0 indicating that no matching answer
|
||||
/// was available until the timeout was hit. The caller has to delete
|
||||
/// the result, if it is not 0.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterCommResult* wait (
|
||||
|
@ -313,16 +317,6 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
|
|||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief ignore and drop current and future answers matching
|
||||
///
|
||||
/// If clientTransactionID is empty, then any answer with any
|
||||
/// clientTransactionID matches. If coordTransactionID is 0, then
|
||||
/// any answer with any coordTransactionID matches. If shardID is
|
||||
/// empty, then any answer from any ShardID matches. If operationID
|
||||
/// is 0, then any answer with any operationID matches. If there
|
||||
/// is already an answer for a matching operation, it is dropped and
|
||||
/// freed. If not, any future answer coming in is automatically dropped.
|
||||
/// This function can be used to automatically delete all information about an
|
||||
/// operation, for which @ref enquire reported successful completion.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void drop (ClientTransactionID const& clientTransactionID,
|
||||
|
@ -331,19 +325,14 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
|
|||
ShardID const& shardID);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief process an answer coming in on the HTTP socket which is actually
|
||||
/// an answer to one of our earlier requests, return value of "" means OK
|
||||
/// and nonempty is an error. This is only called in a coordinator node
|
||||
/// and not in a DBServer node.
|
||||
/// @brief process an answer coming in on the HTTP socket
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
string processAnswer(string& coordinatorHeader,
|
||||
rest::HttpRequest* answer);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief send an answer HTTP request to a coordinator, which contains
|
||||
/// in its body a HttpResponse that we already have. This is only called in
|
||||
/// a DBServer node and never in a coordinator node.
|
||||
/// @brief send an answer HTTP request to a coordinator
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void asyncAnswer (string& coordinatorHeader,
|
||||
|
@ -387,7 +376,9 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
|
|||
return sec + usc / 1000000.0;
|
||||
}
|
||||
|
||||
static int const maxConnectionsPerServer = 2;
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief class to administrate one connection to a server
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
struct SingleServerConnection {
|
||||
httpclient::GeneralClientConnection* connection;
|
||||
|
@ -402,6 +393,10 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
|
|||
~SingleServerConnection ();
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief class to administrate all connections to a server
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
struct ServerConnections {
|
||||
vector<SingleServerConnection*> connections;
|
||||
list<SingleServerConnection*> unused;
|
||||
|
@ -411,25 +406,52 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
|
|||
~ServerConnections (); // closes all connections
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief map to store all connections to all servers with corresponding lock
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// We keep connections to servers open but do not care
|
||||
// if they are closed. The key is the server ID.
|
||||
map<ServerID,ServerConnections*> allConnections;
|
||||
triagens::basics::ReadWriteLock allLock;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief open or get a previously cached connection to a server
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
SingleServerConnection* getConnection(ServerID& serverID);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief return leased connection to a server
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void returnConnection(SingleServerConnection* singleConnection);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief report a leased connection as being broken
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void brokenConnection(SingleServerConnection* singleConnection);
|
||||
// The following closes all connections that have been unused for
|
||||
// more than limit seconds
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief closes all connections that have been unused for more than
|
||||
/// limit seconds
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void closeUnusedConnections(double limit);
|
||||
|
||||
// The data structures for our internal queues:
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief send queue with lock and index
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// Sending questions:
|
||||
list<ClusterCommOperation*> toSend;
|
||||
map<OperationID,list<ClusterCommOperation*>::iterator> toSendByOpID;
|
||||
triagens::basics::ConditionVariable somethingToSend;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief received queue with lock and index
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// Receiving answers:
|
||||
list<ClusterCommOperation*> received;
|
||||
map<OperationID,list<ClusterCommOperation*>::iterator> receivedByOpID;
|
||||
|
@ -440,23 +462,43 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
|
|||
// not have to!), then: first lock `somethingToReceive`, then
|
||||
// lock `somethingtoSend` in this order!
|
||||
|
||||
// We frequently need the following lengthy types:
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief iterator type which is frequently used
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
typedef list<ClusterCommOperation*>::iterator QueueIterator;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief iterator type which is frequently used
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
typedef map<OperationID, QueueIterator>::iterator IndexIterator;
|
||||
|
||||
// An internal function to match an operation:
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief internal function to match an operation:
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool match (ClientTransactionID const& clientTransactionID,
|
||||
CoordTransactionID const coordTransactionID,
|
||||
ShardID const& shardID,
|
||||
ClusterCommOperation* op);
|
||||
|
||||
// Move an operation from the send to the receive queue:
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief move an operation from the send to the receive queue
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool moveFromSendToReceived (OperationID operationID);
|
||||
|
||||
// Cleanup all queues:
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief cleanup all queues
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void cleanupAllQueues();
|
||||
|
||||
// Finally, our background communications thread:
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief our background communications thread
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ClusterCommThread *_backgroundThread;
|
||||
}; // end of class ClusterComm
|
||||
|
||||
|
@ -464,6 +506,10 @@ void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
|
|||
// --SECTION-- ClusterCommThread
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief our background communications thread
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class ClusterCommThread : public basics::Thread {
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
|
|
|
@ -29,9 +29,12 @@
|
|||
|
||||
#include "BasicsC/common.h"
|
||||
|
||||
#include "VocBase/server.h"
|
||||
|
||||
#include "Cluster/AgencyComm.h"
|
||||
#include "Cluster/ClusterInfo.h"
|
||||
#include "Cluster/ServerState.h"
|
||||
#include "Cluster/ClusterComm.h"
|
||||
#include "V8/v8-conv.h"
|
||||
#include "V8/v8-globals.h"
|
||||
#include "V8/v8-utils.h"
|
||||
|
@ -902,6 +905,310 @@ static v8::Handle<v8::Value> JS_StatusServerState (v8::Arguments const& argv) {
|
|||
return scope.Close(v8::String::New(state.c_str(), state.size()));
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief prepare to send a request
|
||||
///
|
||||
/// this is used for asynchronous as well as synchronous requests.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static void PrepareClusterCommRequest (
|
||||
v8::Arguments const& argv,
|
||||
triagens::rest::HttpRequest::HttpRequestType& reqType,
|
||||
ShardID& shardID,
|
||||
string& path,
|
||||
string& body,
|
||||
map<string, string>* headerFields,
|
||||
ClientTransactionID& clientTransactionID,
|
||||
CoordTransactionID& coordTransactionID,
|
||||
double& timeout) {
|
||||
|
||||
TRI_v8_global_t* v8g = (TRI_v8_global_t*)
|
||||
v8::Isolate::GetCurrent()->GetData();
|
||||
|
||||
reqType = triagens::rest::HttpRequest::HTTP_REQUEST_GET;
|
||||
if (argv.Length() > 0 && argv[0]->IsString()) {
|
||||
TRI_Utf8ValueNFC UTF8(TRI_UNKNOWN_MEM_ZONE, argv[0]);
|
||||
string methstring = *UTF8;
|
||||
reqType = triagens::rest::HttpRequest::translateMethod(methstring);
|
||||
if (reqType == triagens::rest::HttpRequest::HTTP_REQUEST_ILLEGAL) {
|
||||
reqType = triagens::rest::HttpRequest::HTTP_REQUEST_GET;
|
||||
}
|
||||
}
|
||||
|
||||
shardID.clear();
|
||||
if (argv.Length() > 1) {
|
||||
shardID = TRI_ObjectToString(argv[1]);
|
||||
}
|
||||
if (shardID == "") {
|
||||
shardID = "shardBlubb";
|
||||
}
|
||||
|
||||
string dbname;
|
||||
if (argv.Length() > 2) {
|
||||
dbname = TRI_ObjectToString(argv[2]);
|
||||
}
|
||||
if (dbname == "") {
|
||||
dbname = "_system";
|
||||
}
|
||||
path.clear();
|
||||
if (argv.Length() > 3) {
|
||||
path = TRI_ObjectToString(argv[3]);
|
||||
}
|
||||
if (path == "") {
|
||||
path = "/_admin/version";
|
||||
}
|
||||
path = "/_db/" + dbname + path;
|
||||
|
||||
body.clear();
|
||||
if (argv.Length() > 4) {
|
||||
body = TRI_ObjectToString(argv[4]);
|
||||
}
|
||||
|
||||
if (argv.Length() > 5 && argv[5]->IsObject()) {
|
||||
v8::Handle<v8::Object> obj = argv[5].As<v8::Object>();
|
||||
v8::Handle<v8::Array> props = obj->GetOwnPropertyNames();
|
||||
uint32_t i;
|
||||
for (i = 0; i < props->Length(); ++i) {
|
||||
v8::Handle<v8::Value> prop = props->Get(i);
|
||||
v8::Handle<v8::Value> val = obj->Get(prop);
|
||||
string propstring = TRI_ObjectToString(prop);
|
||||
string valstring = TRI_ObjectToString(val);
|
||||
if (propstring != "") {
|
||||
headerFields->insert(pair<string,string>(propstring, valstring));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
clientTransactionID = "";
|
||||
coordTransactionID = 0;
|
||||
timeout = 24*3600.0;
|
||||
|
||||
if (argv.Length() > 6 && argv[6]->IsObject()) {
|
||||
v8::Handle<v8::Object> opt = argv[6].As<v8::Object>();
|
||||
if (opt->Has(v8g->ClientTransactionIDKey)) {
|
||||
clientTransactionID
|
||||
= TRI_ObjectToString(opt->Get(v8g->ClientTransactionIDKey));
|
||||
}
|
||||
if (opt->Has(v8g->CoordTransactionIDKey)) {
|
||||
coordTransactionID
|
||||
= TRI_ObjectToUInt64(opt->Get(v8g->CoordTransactionIDKey), true);
|
||||
}
|
||||
if (opt->Has(v8g->TimeoutKey)) {
|
||||
timeout
|
||||
= TRI_ObjectToDouble(opt->Get(v8g->TimeoutKey));
|
||||
}
|
||||
}
|
||||
if (clientTransactionID == "") {
|
||||
clientTransactionID = StringUtils::itoa(TRI_NewTickServer());
|
||||
}
|
||||
if (coordTransactionID == 0) {
|
||||
coordTransactionID = TRI_NewTickServer();
|
||||
}
|
||||
if (timeout == 0) {
|
||||
timeout = 24*3600.0;
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief prepare a ClusterCommResult for JavaScript
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
v8::Handle<v8::Object> PrepareClusterCommResultForJS(
|
||||
ClusterCommResult const* res) {
|
||||
v8::HandleScope scope;
|
||||
TRI_v8_global_t* v8g = (TRI_v8_global_t*)
|
||||
v8::Isolate::GetCurrent()->GetData();
|
||||
|
||||
v8::Handle<v8::Object> r = v8::Object::New();
|
||||
if (0 == res) {
|
||||
r->Set(v8g->ErrorMessageKey, v8::String::New("out of memory"));
|
||||
} else if (res->dropped) {
|
||||
r->Set(v8g->ErrorMessageKey, v8::String::New("operation was dropped"));
|
||||
} else {
|
||||
r->Set(v8g->ClientTransactionIDKey,
|
||||
v8::String::New(res->clientTransactionID.c_str(),
|
||||
res->clientTransactionID.size()));
|
||||
r->Set(v8g->CoordTransactionIDKey,
|
||||
v8::Number::New(res->coordTransactionID));
|
||||
r->Set(v8g->OperationIDKey,
|
||||
v8::Number::New(res->operationID));
|
||||
r->Set(v8g->ShardIDKey,
|
||||
v8::String::New(res->shardID.c_str(), res->shardID.size()));
|
||||
if (res->status == CL_COMM_SUBMITTED) {
|
||||
r->Set(v8g->StatusKey, v8::String::New("SUBMITTED"));
|
||||
}
|
||||
else if (res->status == CL_COMM_SENT) {
|
||||
r->Set(v8g->StatusKey, v8::String::New("SENT"));
|
||||
// Maybe return the response of the initial request???
|
||||
}
|
||||
else if (res->status == CL_COMM_TIMEOUT) {
|
||||
r->Set(v8g->StatusKey, v8::String::New("TIMEOUT"));
|
||||
r->Set(v8g->TimeoutKey,v8::BooleanObject::New(true));
|
||||
}
|
||||
else if (res->status == CL_COMM_ERROR) {
|
||||
r->Set(v8g->StatusKey, v8::String::New("ERROR"));
|
||||
r->Set(v8g->ErrorMessageKey,
|
||||
v8::String::New("could not send request, DBServer gone"));
|
||||
}
|
||||
else if (res->status == CL_COMM_DROPPED) {
|
||||
r->Set(v8g->StatusKey, v8::String::New("DROPPED"));
|
||||
r->Set(v8g->ErrorMessageKey,
|
||||
v8::String::New("request dropped whilst waiting for answer"));
|
||||
}
|
||||
else { // Everything is OK
|
||||
// The headers:
|
||||
v8::Handle<v8::Object> h = v8::Object::New();
|
||||
map<string,string> headers = res->answer->headers();
|
||||
map<string,string>::iterator i;
|
||||
for (i = headers.begin(); i != headers.end(); ++i) {
|
||||
h->Set(v8::String::New(i->first.c_str()),
|
||||
v8::String::New(i->second.c_str()));
|
||||
}
|
||||
r->Set(v8::String::New("headers"), h);
|
||||
|
||||
// The body:
|
||||
if (0 != res->answer->body()) {
|
||||
r->Set(v8::String::New("body"), v8::String::New(res->answer->body(),
|
||||
res->answer->bodySize()));
|
||||
}
|
||||
}
|
||||
}
|
||||
return scope.Close(r);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief send an asynchronous request
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static v8::Handle<v8::Value> JS_AsyncRequest (v8::Arguments const& argv) {
|
||||
v8::HandleScope scope;
|
||||
|
||||
if (argv.Length() < 4 || argv.Length() > 7) {
|
||||
TRI_V8_EXCEPTION_USAGE(scope, "asyncRequest("
|
||||
"reqType, shardID, dbname, path, body, headers, options)");
|
||||
}
|
||||
// Possible options:
|
||||
// - clientTransactionID (string)
|
||||
// - coordTransactionID (number)
|
||||
// - timeout (number)
|
||||
|
||||
if (ServerState::instance()->getRole() != ServerState::ROLE_COORDINATOR) {
|
||||
TRI_V8_EXCEPTION_INTERNAL(scope,"request works only in coordinator role");
|
||||
}
|
||||
|
||||
ClusterComm* cc = ClusterComm::instance();
|
||||
|
||||
if (cc == 0) {
|
||||
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
|
||||
"clustercomm object not found");
|
||||
}
|
||||
|
||||
triagens::rest::HttpRequest::HttpRequestType reqType;
|
||||
ShardID shardID;
|
||||
string path;
|
||||
string body;
|
||||
map<string, string>* headerFields = new map<string, string>;
|
||||
ClientTransactionID clientTransactionID;
|
||||
CoordTransactionID coordTransactionID;
|
||||
double timeout;
|
||||
|
||||
PrepareClusterCommRequest(argv, reqType, shardID, path, body, headerFields,
|
||||
clientTransactionID, coordTransactionID, timeout);
|
||||
|
||||
ClusterCommResult const* res;
|
||||
|
||||
res = cc->asyncRequest(clientTransactionID, coordTransactionID, shardID,
|
||||
reqType, path, body.c_str(), body.size(),
|
||||
headerFields, 0, timeout);
|
||||
|
||||
if (res == 0) {
|
||||
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
|
||||
"couldn't queue async request");
|
||||
}
|
||||
|
||||
LOG_DEBUG("JS_AsyncRequest: request has been submitted");
|
||||
|
||||
v8::Handle<v8::Object> result = PrepareClusterCommResultForJS(res);
|
||||
delete res;
|
||||
|
||||
return scope.Close(result);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief wait for the result of an asynchronous request
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static v8::Handle<v8::Value> JS_Wait (v8::Arguments const& argv) {
|
||||
v8::HandleScope scope;
|
||||
|
||||
if (argv.Length() != 1) {
|
||||
TRI_V8_EXCEPTION_USAGE(scope, "wait(obj)");
|
||||
}
|
||||
// Possible options:
|
||||
// - clientTransactionID (string)
|
||||
// - coordTransactionID (number)
|
||||
// - operationID (number)
|
||||
// - shardID (string)
|
||||
// - timeout (number)
|
||||
|
||||
if (ServerState::instance()->getRole() != ServerState::ROLE_COORDINATOR) {
|
||||
TRI_V8_EXCEPTION_INTERNAL(scope,"request works only in coordinator role");
|
||||
}
|
||||
|
||||
ClusterComm* cc = ClusterComm::instance();
|
||||
|
||||
if (cc == 0) {
|
||||
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
|
||||
"clustercomm object not found");
|
||||
}
|
||||
|
||||
ClientTransactionID clientTransactionID = "";
|
||||
CoordTransactionID coordTransactionID = 0;
|
||||
OperationID operationID = 0;
|
||||
ShardID shardID = "";
|
||||
double timeout = 24*3600.0;
|
||||
|
||||
TRI_v8_global_t* v8g = (TRI_v8_global_t*)
|
||||
v8::Isolate::GetCurrent()->GetData();
|
||||
|
||||
if (argv[0]->IsObject()) {
|
||||
v8::Handle<v8::Object> obj = argv[0].As<v8::Object>();
|
||||
if (obj->Has(v8g->ClientTransactionIDKey)) {
|
||||
clientTransactionID
|
||||
= TRI_ObjectToString(obj->Get(v8g->ClientTransactionIDKey));
|
||||
}
|
||||
if (obj->Has(v8g->CoordTransactionIDKey)) {
|
||||
coordTransactionID
|
||||
= TRI_ObjectToUInt64(obj->Get(v8g->CoordTransactionIDKey), true);
|
||||
}
|
||||
if (obj->Has(v8g->OperationIDKey)) {
|
||||
operationID = TRI_ObjectToUInt64(obj->Get(v8g->OperationIDKey), true);
|
||||
}
|
||||
if (obj->Has(v8g->ShardIDKey)) {
|
||||
shardID = TRI_ObjectToString(obj->Get(v8g->ShardIDKey));
|
||||
}
|
||||
if (obj->Has(v8g->TimeoutKey)) {
|
||||
timeout = TRI_ObjectToDouble(obj->Get(v8g->TimeoutKey));
|
||||
if (timeout == 0.0) {
|
||||
timeout = 24*3600.0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ClusterCommResult const* res;
|
||||
|
||||
LOG_DEBUG("JS_wait: calling ClusterComm::wait()");
|
||||
|
||||
res = cc->wait(clientTransactionID, coordTransactionID, operationID,
|
||||
shardID, timeout);
|
||||
|
||||
v8::Handle<v8::Object> result = PrepareClusterCommResultForJS(res);
|
||||
delete res;
|
||||
|
||||
return scope.Close(result);
|
||||
}
|
||||
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- public functions
|
||||
// -----------------------------------------------------------------------------
|
||||
|
@ -921,9 +1228,9 @@ void TRI_InitV8Cluster (v8::Handle<v8::Context> context) {
|
|||
v8::Handle<v8::ObjectTemplate> rt;
|
||||
v8::Handle<v8::FunctionTemplate> ft;
|
||||
|
||||
// .............................................................................
|
||||
// ...........................................................................
|
||||
// generate the agency template
|
||||
// .............................................................................
|
||||
// ...........................................................................
|
||||
|
||||
ft = v8::FunctionTemplate::New();
|
||||
ft->SetClassName(TRI_V8_SYMBOL("ArangoAgency"));
|
||||
|
@ -986,7 +1293,7 @@ void TRI_InitV8Cluster (v8::Handle<v8::Context> context) {
|
|||
|
||||
// .............................................................................
|
||||
// generate the server state template
|
||||
// .............................................................................
|
||||
// ...........................................................................
|
||||
|
||||
ft = v8::FunctionTemplate::New();
|
||||
ft->SetClassName(TRI_V8_SYMBOL("ArangoServerState"));
|
||||
|
@ -1011,6 +1318,36 @@ void TRI_InitV8Cluster (v8::Handle<v8::Context> context) {
|
|||
if (! ss.IsEmpty()) {
|
||||
TRI_AddGlobalVariableVocbase(context, "ArangoServerState", ss);
|
||||
}
|
||||
|
||||
// ...........................................................................
|
||||
// generate the cluster comm template
|
||||
// ...........................................................................
|
||||
|
||||
ft = v8::FunctionTemplate::New();
|
||||
ft->SetClassName(TRI_V8_SYMBOL("ArangoClusterComm"));
|
||||
|
||||
rt = ft->InstanceTemplate();
|
||||
rt->SetInternalFieldCount(2);
|
||||
|
||||
TRI_AddMethodVocbase(rt, "asyncRequest", JS_AsyncRequest);
|
||||
#if 0
|
||||
TRI_AddMethodVocbase(rt, "syncRequest", JS_SyncRequest);
|
||||
TRI_AddMethodVocbase(rt, "enquire", JS_Enquire);
|
||||
#endif
|
||||
TRI_AddMethodVocbase(rt, "wait", JS_Wait);
|
||||
#if 0
|
||||
TRI_AddMethodVocbase(rt, "drop", JS_Drop);
|
||||
#endif
|
||||
|
||||
v8g->ClusterCommTempl = v8::Persistent<v8::ObjectTemplate>::New(isolate, rt);
|
||||
TRI_AddGlobalFunctionVocbase(context, "ArangoClusterCommCtor",
|
||||
ft->GetFunction());
|
||||
|
||||
// register the global object
|
||||
ss = v8g->ClusterCommTempl->NewInstance();
|
||||
if (! ss.IsEmpty()) {
|
||||
TRI_AddGlobalVariableVocbase(context, "ArangoClusterComm", ss);
|
||||
}
|
||||
}
|
||||
|
||||
// Local Variables:
|
||||
|
|
|
@ -1010,11 +1010,8 @@ static v8::Handle<v8::Value> JS_ShardingTest (v8::Arguments const& argv) {
|
|||
|
||||
}
|
||||
}
|
||||
delete headerFields;
|
||||
|
||||
if (0 != res) {
|
||||
delete res;
|
||||
}
|
||||
delete res;
|
||||
|
||||
return scope.Close(r);
|
||||
}
|
||||
|
|
|
@ -834,11 +834,17 @@ actions.defineHttp({
|
|||
/// @RESTHEADER{POST /_admin/execute,executes a program}
|
||||
///
|
||||
/// @RESTBODYPARAM{body,javascript,required}
|
||||
/// The body to be executed.
|
||||
/// The body to be executed.
|
||||
///
|
||||
/// @RESTDESCRIPTION
|
||||
///
|
||||
/// Executes the javascript code in the body on the server.
|
||||
/// Executes the javascript code in the body on the server as the body
|
||||
/// of a function with no arguments. If you have a `return` statement
|
||||
/// then the return value you produce will be returned as content type
|
||||
/// `application/json`. If the parameter `returnAsJSON` is set to
|
||||
/// `true`, the result will be a JSON object describing the return value
|
||||
/// directly, otherwise a string produced by JSON.stringify will be
|
||||
/// returned.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
actions.defineHttp({
|
||||
|
@ -856,7 +862,13 @@ actions.defineHttp({
|
|||
result = eval("(function() {" + body + "}());");
|
||||
}
|
||||
|
||||
actions.resultOk(req, res, actions.HTTP_OK, JSON.stringify(result));
|
||||
if (req.parameters.hasOwnProperty("returnAsJSON") &&
|
||||
req.parameters.returnAsJSON === "true") {
|
||||
actions.resultOk(req, res, actions.HTTP_OK, result);
|
||||
}
|
||||
else {
|
||||
actions.resultOk(req, res, actions.HTTP_OK, JSON.stringify(result));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -43,6 +43,7 @@ TRI_v8_global_s::TRI_v8_global_s (v8::Isolate* isolate)
|
|||
AgencyTempl(),
|
||||
ClusterInfoTempl(),
|
||||
ServerStateTempl(),
|
||||
ClusterCommTempl(),
|
||||
#endif
|
||||
ErrorTempl(),
|
||||
GeneralCursorTempl(),
|
||||
|
@ -66,9 +67,15 @@ TRI_v8_global_s::TRI_v8_global_s (v8::Isolate* isolate)
|
|||
BodyFromFileKey(),
|
||||
BodyKey(),
|
||||
ClientKey(),
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
ClientTransactionIDKey(),
|
||||
#endif
|
||||
CodeKey(),
|
||||
CompatibilityKey(),
|
||||
ContentTypeKey(),
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
CoordTransactionIDKey(),
|
||||
#endif
|
||||
DatabaseKey(),
|
||||
DoCompactKey(),
|
||||
DomainKey(),
|
||||
|
@ -85,6 +92,9 @@ TRI_v8_global_s::TRI_v8_global_s (v8::Isolate* isolate)
|
|||
LengthKey(),
|
||||
LifeTimeKey(),
|
||||
NameKey(),
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
OperationIDKey(),
|
||||
#endif
|
||||
ParametersKey(),
|
||||
PathKey(),
|
||||
PrefixKey(),
|
||||
|
@ -95,7 +105,14 @@ TRI_v8_global_s::TRI_v8_global_s (v8::Isolate* isolate)
|
|||
ResponseCodeKey(),
|
||||
SecureKey(),
|
||||
ServerKey(),
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
ShardIDKey(),
|
||||
StatusKey(),
|
||||
#endif
|
||||
SuffixKey(),
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
TimeoutKey(),
|
||||
#endif
|
||||
TransformationsKey(),
|
||||
UrlKey(),
|
||||
UserKey(),
|
||||
|
@ -130,10 +147,16 @@ TRI_v8_global_s::TRI_v8_global_s (v8::Isolate* isolate)
|
|||
BodyFromFileKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("bodyFromFile"));
|
||||
BodyKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("body"));
|
||||
ClientKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("client"));
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
ClientTransactionIDKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("clientTransactionID"));
|
||||
#endif
|
||||
CodeKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("code"));
|
||||
CompatibilityKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("compatibility"));
|
||||
ContentTypeKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("contentType"));
|
||||
CookiesKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("cookies"));
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
CoordTransactionIDKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("coordTransactionID"));
|
||||
#endif
|
||||
DatabaseKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("database"));
|
||||
DoCompactKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("doCompact"));
|
||||
DomainKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("domain"));
|
||||
|
@ -150,6 +173,9 @@ TRI_v8_global_s::TRI_v8_global_s (v8::Isolate* isolate)
|
|||
LengthKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("length"));
|
||||
LifeTimeKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("lifeTime"));
|
||||
NameKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("name"));
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
OperationIDKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("operationID"));
|
||||
#endif
|
||||
ParametersKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("parameters"));
|
||||
PathKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("path"));
|
||||
PrefixKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("prefix"));
|
||||
|
@ -160,7 +186,14 @@ TRI_v8_global_s::TRI_v8_global_s (v8::Isolate* isolate)
|
|||
ResponseCodeKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("responseCode"));
|
||||
SecureKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("secure"));
|
||||
ServerKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("server"));
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
ShardIDKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("shardID"));
|
||||
StatusKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("status"));
|
||||
#endif
|
||||
SuffixKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("suffix"));
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
TimeoutKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("timeout"));
|
||||
#endif
|
||||
TransformationsKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("transformations"));
|
||||
UrlKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("url"));
|
||||
UserKey = v8::Persistent<v8::String>::New(isolate, TRI_V8_SYMBOL("user"));
|
||||
|
|
|
@ -226,13 +226,21 @@ typedef struct TRI_v8_global_s {
|
|||
#endif
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief agency template
|
||||
/// @brief server state template
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
v8::Persistent<v8::ObjectTemplate> ServerStateTempl;
|
||||
#endif
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief cluster comm template
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
v8::Persistent<v8::ObjectTemplate> ClusterCommTempl;
|
||||
#endif
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief error template
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -359,6 +367,14 @@ typedef struct TRI_v8_global_s {
|
|||
|
||||
v8::Persistent<v8::String> ClientKey;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief "clientTransactionID" key name
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
v8::Persistent<v8::String> ClientTransactionIDKey;
|
||||
#endif
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief "code" key name
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -383,6 +399,14 @@ typedef struct TRI_v8_global_s {
|
|||
|
||||
v8::Persistent<v8::String> CookiesKey;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief "coordTransactionID" key name
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
v8::Persistent<v8::String> CoordTransactionIDKey;
|
||||
#endif
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief "database" key name
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -479,6 +503,14 @@ typedef struct TRI_v8_global_s {
|
|||
|
||||
v8::Persistent<v8::String> NameKey;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief "operationID" key
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
v8::Persistent<v8::String> OperationIDKey;
|
||||
#endif
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief "parameters" key name
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -539,12 +571,36 @@ typedef struct TRI_v8_global_s {
|
|||
|
||||
v8::Persistent<v8::String> ServerKey;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief "shardID" key name
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
v8::Persistent<v8::String> ShardIDKey;
|
||||
#endif
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief "status" key name
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
v8::Persistent<v8::String> StatusKey;
|
||||
#endif
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief "suffix" key name
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
v8::Persistent<v8::String> SuffixKey;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief "timeout" key name
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifdef TRI_ENABLE_CLUSTER
|
||||
v8::Persistent<v8::String> TimeoutKey;
|
||||
#endif
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief "transformations" key name
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -47,12 +47,18 @@ if [ "$1" == "init" ] ; then
|
|||
set Target/DBServers
|
||||
set Target/Coordinators
|
||||
set Target/Collections
|
||||
set Target/Collections/_system
|
||||
set Target/Collections/_system/Version 1
|
||||
set Target/Collections/_system/Lock UNLOCKED
|
||||
set Target/ShardLocation
|
||||
|
||||
set Plan/Version 1
|
||||
set Plan/DBServers
|
||||
set Plan/Coordinators
|
||||
set Plan/Collections
|
||||
set Plan/Collections/_system
|
||||
set Plan/Collections/_system/Version 1
|
||||
set Plan/Collections/_system/Lock UNLOCKED
|
||||
set Plan/ShardLocation
|
||||
|
||||
set Current/Version 1
|
||||
|
@ -60,6 +66,9 @@ if [ "$1" == "init" ] ; then
|
|||
set Current/DBServers
|
||||
set Current/Coordinators
|
||||
set Current/Collections
|
||||
set Current/Collections/_system
|
||||
set Current/Collections/_system/Version 1
|
||||
set Current/Collections/_system/Lock UNLOCKED
|
||||
set Current/ShardLocation
|
||||
|
||||
set Current/ShardsCopied
|
||||
|
|
Loading…
Reference in New Issue