1
0
Fork 0

Basic queueing logic ready.

Actual sending still todo.
This commit is contained in:
Max Neunhoeffer 2013-12-19 09:06:45 +01:00
parent 32f49d3c09
commit b55c7e22f9
4 changed files with 432 additions and 101 deletions

View File

@ -283,71 +283,314 @@ void ClusterComm::closeUnusedConnections () {
}
ClusterCommResult* ClusterComm::asyncRequest (
ClientTransactionID const& clientTransactionID,
TransactionID const coordTransactionID,
ShardID const& shardID,
ClientTransactionID const clientTransactionID,
CoordTransactionID const coordTransactionID,
ShardID const shardID,
rest::HttpRequest::HttpRequestType reqtype,
string const& path,
char const * body,
string const path,
char const* body,
size_t const bodyLength,
map<string, string> const& headerFields,
map<string, string>* headerFields,
ClusterCommCallback* callback,
ClusterCommTimeout timeout) {
OperationID opID = getOperationID();
// Build HTTPRequest
// Build ClusterCommOperation object
// Put into queue
// signal on condition variable
// Build ClusterCommResult object
// return
return 0;
ClusterCommOperation* op = new ClusterCommOperation();
op->clientTransactionID = clientTransactionID;
op->coordTransactionID = coordTransactionID;
do {
op->operationID = getOperationID();
} while (op->operationID == 0); // just to make sure
op->shardID = shardID;
op->serverID = ClusterState::instance()->getResponsibleServer(
shardID);
op->status = CL_COMM_SUBMITTED;
op->path = path;
op->body = body;
op->bodyLength = bodyLength;
op->headerFields = headerFields;
op->callback = callback;
op->timeout = timeout;
ClusterCommResult* res = new ClusterCommResult();
*res = *static_cast<ClusterCommResult*>(op);
{
basics::ConditionLocker locker(&somethingToSend);
toSend.push_back(op);
list<ClusterCommOperation*>::iterator i = toSend.end();
toSendByOpID[op->operationID] = --i;
}
somethingToSend.signal();
return res;
}
bool ClusterComm::match (ClientTransactionID const& clientTransactionID,
TransactionID const coordTransactionID,
OperationID const operationID,
bool ClusterComm::match (
ClientTransactionID const& clientTransactionID,
CoordTransactionID const coordTransactionID,
ShardID const& shardID,
ClusterCommOperation* op) {
// First check operationID, if given, can return false already
// then check other IDs.
return true;
return ( (clientTransactionID == "" ||
clientTransactionID == op->clientTransactionID) &&
(0 == coordTransactionID ||
coordTransactionID == op->coordTransactionID) &&
(shardID == "" ||
shardID == op->shardID) );
}
ClusterCommResult* enquire (OperationID const operationID) {
// Find operation by its ID (fast)
// build ClusterCommResult object and return it.
ClusterCommResult* ClusterComm::enquire (OperationID const operationID) {
IndexIterator i;
ClusterCommOperation* op = 0;
ClusterCommResult* res;
// First look into the send queue:
{
basics::ConditionLocker locker(&somethingToSend);
i = toSendByOpID.find(operationID);
if (i != toSendByOpID.end()) {
res = new ClusterCommResult();
if (0 == res) {
return 0;
}
op = *(i->second);
*res = *static_cast<ClusterCommResult*>(op);
return res;
}
}
// Note that operations only ever move from the send queue to the
// receive queue and never in the other direction. Therefore it is
// OK to use two different locks here, since we look first in the
// send queue and then in the receive queue; we can never miss
// an operation that is actually there.
// If the above did not give anything, look into the receive queue:
{
basics::ConditionLocker locker(&somethingReceived);
i = receivedByOpID.find(operationID);
if (i != toSendByOpID.end()) {
res = new ClusterCommResult();
if (0 == res) {
return 0;
}
op = *(i->second);
*res = *static_cast<ClusterCommResult*>(op);
return res;
}
}
return 0;
}
ClusterCommResult* ClusterComm::wait (
ClientTransactionID const& clientTransactionID,
TransactionID const coordTransactionID,
CoordTransactionID const coordTransactionID,
OperationID const operationID,
ShardID const& shardID,
ClusterCommTimeout timeout) {
// Only look at received queue, match, return the first with CL_COMM_RECEIVED
// dequeue it
// Initialise remaining time
// If nothing found, use condition variable and wait to get more with
// possible timeout, if timeout, return empty
// otherwise check again, if ...
return 0;
IndexIterator i;
QueueIterator q;
ClusterCommOperation* op = 0;
ClusterCommResult* res = 0;
double endtime;
double timeleft;
bool found;
if (0.0 == timeout) {
endtime = 1.0e50; // this is the Sankt Nimmerleinstag
}
else {
endtime = now() + timeout;
}
if (0 != operationID) {
// In this case we only have to look into at most one operation.
basics::ConditionLocker locker(&somethingReceived);
while (true) { // will be left by return or break on timeout
i = receivedByOpID.find(operationID);
if (i == receivedByOpID.end()) {
// It could be that the operation is still in the send queue:
basics::ConditionLocker sendlocker(&somethingToSend);
i = toSendByOpID.find(operationID);
if (i == toSendByOpID.end()) {
// Nothing known about this operation, return with failure:
res = new ClusterCommResult();
res->operationID = operationID;
res->status = CL_COMM_DROPPED;
return res;
}
}
else {
// It is in the receive queue, now look at the status:
q = i->second;
op = *q;
if (op->status >= CL_COMM_TIMEOUT) {
// It is done, let's remove it from the queue and return it:
receivedByOpID.erase(i);
received.erase(q);
res = static_cast<ClusterCommResult*>(op);
return res;
}
// It is in the receive queue but still waiting, now wait actually
}
// Here it could either be in the receive or the send queue, let's wait
timeleft = endtime - now();
if (timeleft <= 0) break;
somethingReceived.wait(uint64_t(timeleft * 1000.0));
}
// This place is only reached on timeout
}
else {
// here, operationID == 0, so we have to do matching, we are only
// interested, if at least one operation matches, if it is ready,
// we return it immediately, otherwise, we report an error or wait.
basics::ConditionLocker locker(&somethingReceived);
while (true) { // will be left by return or break on timeout
found = false;
for (q = received.begin(); q != received.end(); q++) {
op = *q;
if (match(clientTransactionID, coordTransactionID, shardID, op)) {
found = true;
if (op->status >= CL_COMM_TIMEOUT) {
// It is done, let's remove it from the queue and return it:
i = receivedByOpID.find(op->operationID); // cannot fail!
assert(i != receivedByOpID.end());
assert(i->second == q);
receivedByOpID.erase(i);
received.erase(q);
res = static_cast<ClusterCommResult*>(op);
return res;
}
}
}
// If we found nothing, we have to look through the send queue:
if (!found) {
basics::ConditionLocker sendlocker(&somethingToSend);
for (q = toSend.begin(); q != toSend.end(); q++) {
op = *q;
if (match(clientTransactionID, coordTransactionID, shardID, op)) {
found = true;
break;
}
}
}
if (!found) {
// Nothing known about this operation, return with failure:
res = new ClusterCommResult();
res->clientTransactionID = clientTransactionID;
res->coordTransactionID = coordTransactionID;
res->operationID = operationID;
res->shardID = shardID;
res->status = CL_COMM_DROPPED;
return res;
}
// Here it could either be in the receive or the send queue, let's wait
timeleft = endtime - now();
if (timeleft <= 0) break;
somethingReceived.wait(uint64_t(timeleft * 1000.0));
}
// This place is only reached on timeout
}
// Now we have to react on timeout:
res = new ClusterCommResult();
res->clientTransactionID = clientTransactionID;
res->coordTransactionID = coordTransactionID;
res->operationID = operationID;
res->shardID = shardID;
res->status = CL_COMM_TIMEOUT;
return res;
}
void ClusterComm::drop (ClientTransactionID const& clientTransactionID,
TransactionID const coordTransactionID,
void ClusterComm::drop (
ClientTransactionID const& clientTransactionID,
CoordTransactionID const coordTransactionID,
OperationID const operationID,
ShardID const& shardID) {
// Look at both send queue and recv queue, delete everything found
QueueIterator q;
QueueIterator nextq;
IndexIterator i;
ClusterCommOperation* op;
// First look through the send queue:
{
basics::ConditionLocker sendlocker(&somethingToSend);
for (q = toSend.begin(); q != toSend.end(); ) {
op = *q;
if ((0 != operationID && operationID == op->operationID) ||
match(clientTransactionID, coordTransactionID, shardID, op)) {
nextq = q;
nextq++;
i = toSendByOpID.find(op->operationID); // cannot fail
assert(i != toSendByOpID.end());
assert(q == i->second);
receivedByOpID.erase(i);
toSend.erase(q);
q = nextq;
}
else {
q++;
}
}
}
// Now look through the receive queue:
{
basics::ConditionLocker locker(&somethingReceived);
for (q = received.begin(); q != received.end(); ) {
op = *q;
if ((0 != operationID && operationID == op->operationID) ||
match(clientTransactionID, coordTransactionID, shardID, op)) {
nextq = q;
nextq++;
i = receivedByOpID.find(op->operationID); // cannot fail
assert(i != receivedByOpID.end());
assert(q == i->second);
receivedByOpID.erase(i);
toSend.erase(q);
q = nextq;
}
else {
q++;
}
}
}
}
int ClusterComm::processAnswer(rest::HttpRequest* answer) {
// find matching operation, report if found, otherwise drop
return TRI_ERROR_NO_ERROR;
}
// Move an operation from the send to the receive queue:
bool ClusterComm::moveFromSendToReceived (OperationID operationID) {
QueueIterator q;
IndexIterator i;
ClusterCommOperation* op;
basics::ConditionLocker locker(&somethingReceived);
basics::ConditionLocker sendlocker(&somethingToSend);
i = toSendByOpID.find(operationID); // cannot fail
assert(i != toSendByOpID.end());
q = i->second;
op = *q;
assert(op->operationID == operationID);
toSendByOpID.erase(i);
toSend.erase(q);
if (CL_COMM_DROPPING == op->status) {
return false;
}
op->status = CL_COMM_SENT;
received.push_back(op);
q = received.end();
q--;
receivedByOpID[operationID] = q;
return true;
}
// -----------------------------------------------------------------------------
// --SECTION-- ClusterCommThread
// -----------------------------------------------------------------------------
@ -385,12 +628,46 @@ ClusterCommThread::~ClusterCommThread () {
////////////////////////////////////////////////////////////////////////////////
void ClusterCommThread::run () {
ClusterComm::QueueIterator q;
ClusterComm::IndexIterator i;
ClusterCommOperation* op;
ClusterComm* cc = ClusterComm::instance();
LOG_TRACE("starting ClusterComm thread");
while (! _stop) {
usleep(2000000);
// FIXME: ...
LOG_DEBUG("ClusterComm alive");
// First check the sending queue, as long as it is not empty, we send
// a request via SimpleHttpClient:
while (true) { // will be left by break when queue is empty
{
basics::ConditionLocker locker(&cc->somethingToSend);
if (cc->toSend.empty()) {
break;
}
op = cc->toSend.front();
assert(op->status == CL_COMM_SUBMITTED);
op->status = CL_COMM_SENDING;
}
// We release the lock, if it is dropped now, the status just goes
// to CL_COMM_DROPPING, we find out about this after we have sent
// the request.
LOG_DEBUG("ClusterComm faking a send");
if (!cc->moveFromSendToReceived(op->operationID)) {
// It was dropped in the meantime, so forget about it:
delete op;
}
}
// Now wait for the condition variable, but use a timeout to notice
// a request to terminate the thread:
{
basics::ConditionLocker locker(&cc->somethingToSend);
locker.wait(100);
}
}
// another thread is waiting for this value to shut down properly

View File

@ -28,6 +28,7 @@
#ifndef TRIAGENS_CLUSTER_COMM_H
#define TRIAGENS_CLUSTER_COMM_H 1
#include "BasicsC/common.h"
#include "Basics/Common.h"
#include "Basics/ReadWriteLock.h"
#include "Basics/ConditionVariable.h"
@ -59,21 +60,23 @@ namespace triagens {
// -----------------------------------------------------------------------------
typedef string ClientTransactionID; // Transaction ID from client
typedef TRI_voc_tick_t TransactionID; // Coordinator transaction ID
typedef TRI_voc_tick_t CoordTransactionID; // Coordinator transaction ID
typedef TRI_voc_tick_t OperationID; // Coordinator operation ID
enum ClusterCommOpStatus {
CL_COMM_SUBMITTED = 1, // initial request queued, but not yet sent
CL_COMM_SENT = 2, // initial request sent, response available
CL_COMM_TIMEOUT = 3, // no answer received until timeout
CL_COMM_RECEIVED = 4, // answer received
CL_COMM_DROPPED = 5 // nothing known about operation, was dropped
CL_COMM_SENDING = 2, // in the process of sending
CL_COMM_DROPPING = 3, // was dropped during send, will be dropped
CL_COMM_SENT = 4, // initial request sent, response available
CL_COMM_TIMEOUT = 5, // no answer received until timeout
CL_COMM_RECEIVED = 6, // answer received
CL_COMM_DROPPED = 7 // nothing known about operation, was dropped
// or actually already collected
};
struct ClusterCommResult {
ClientTransactionID clientTransactionID;
TransactionID transactionID;
CoordTransactionID coordTransactionID;
OperationID operationID;
ShardID shardID;
ServerID serverID; // the actual server ID of the sender
@ -96,21 +99,10 @@ namespace triagens {
}
};
struct ClusterCommOperation : public ClusterCommResult {
rest::HttpRequest* question;
ClusterCommOperation () {}
virtual ~ClusterCommOperation () {
if (0 != question) {
delete question;
}
}
};
class ClusterCommCallback {
struct ClusterCommCallback {
// The idea is that one inherits from this class and implements
// the callback.
ClusterCommCallback () {}
virtual ~ClusterCommCallback ();
@ -122,6 +114,30 @@ namespace triagens {
typedef double ClusterCommTimeout; // in milliseconds
struct ClusterCommOperation : public ClusterCommResult {
string path;
char const* body;
size_t bodyLength;
map<string, string>* headerFields;
ClusterCommCallback* callback;
ClusterCommTimeout timeout;
ClusterCommOperation () {}
virtual ~ClusterCommOperation () {
if (0 != body) {
TRI_Free(TRI_UNKNOWN_MEM_ZONE,
reinterpret_cast<void*>(const_cast<char*>(body)));
}
if (0 != headerFields) {
delete headerFields;
}
if (0 != callback) {
delete callback;
}
}
};
struct ClusterCommOptions {
double _connectTimeout;
double _requestTimeout;
@ -135,6 +151,8 @@ namespace triagens {
class ClusterComm {
friend class ClusterCommThread;
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
@ -197,18 +215,22 @@ namespace triagens {
/// queued (`status` is CL_COMM_SUBMITTED). Use @ref enquire below to get
/// information about the progress. The actual answer is then delivered
/// either in the callback or via poll. The caller has to call delete on
/// the resulting ClusterCommResult*.
/// the resulting ClusterCommResult*. The library takes ownerships of
/// the pointers `body`, `headerFields` and `callback` and releases
/// the memory when the operation has been finished. One has to use
/// TRI_Allocate with memory zone TRI_UNKNOWN_MEM_ZONE to allocate the
/// memory to which `body` points.
////////////////////////////////////////////////////////////////////////////////
ClusterCommResult* asyncRequest (
ClientTransactionID const& clientTransactionID,
TransactionID const coordTransactionID,
ShardID const& shardID,
ClientTransactionID const clientTransactionID,
CoordTransactionID const coordTransactionID,
ShardID const shardID,
rest::HttpRequest::HttpRequestType reqtype,
string const& path,
char const * body,
string const path,
char const* body,
size_t const bodyLength,
map<string, string> const& headerFields,
map<string, string> * headerFields,
ClusterCommCallback* callback,
ClusterCommTimeout timeout);
@ -227,11 +249,11 @@ namespace triagens {
ClusterCommResult* syncRequest (
ClientTransactionID const& clientTransactionID,
TransactionID const coordTransactionID,
CoordTransactionID const coordTransactionID,
ShardID const& shardID,
rest::HttpRequest::HttpRequestType reqtype,
string const& path,
char const * body,
char const* body,
size_t const bodyLength,
map<string, string> const& headerFields,
ClusterCommTimeout timeout);
@ -242,15 +264,21 @@ namespace triagens {
///
/// This behaves as @ref asyncRequest except that the actual request is
/// taken from `req`. We have to add a few headers and can use callback
/// and timeout. The caller has to delete the result.
/// and timeout. The caller has to delete the result. The library takes
/// ownerships of the pointers `headerFields` and `callback` and
/// releases the memory when the operation has been finished. Note that
/// ClusterComm creates copy of relevant parts of the HTTP request
/// object `req`, simply because it can neither delete nor not delete
/// `req` and its children itself.
////////////////////////////////////////////////////////////////////////////////
ClusterCommResult* asyncDelegate (
rest::HttpRequest const& req,
TransactionID const coordTransactionID,
ShardID const& shardID,
const string& path,
const map<string, string>& headerFields,
CoordTransactionID const coordTransactionID,
ShardID const shardID,
string const path,
map<string, string> const* headerFields,
ClusterCommCallback* callback,
ClusterCommTimeout timeout);
@ -265,32 +293,12 @@ namespace triagens {
ClusterCommResult* syncDelegate (
rest::HttpRequest const& req,
TransactionID const coordTransactionID,
CoordTransactionID const coordTransactionID,
ShardID const& shardID,
const string& path,
const map<string, string>& headerFields,
ClusterCommTimeout timeout);
////////////////////////////////////////////////////////////////////////////////
/// @brief wait for one answer matching the criteria
///
/// If clientTransactionID is empty, then any answer with any
/// clientTransactionID matches. If coordTransactionID is 0, then
/// any answer with any coordTransactionID matches. If shardID is
/// empty, then any answer from any ShardID matches. If operationID
/// is 0, then any answer with any operationID matches. If `timeout`
/// is given, the result can be 0 indicating that no matching answer
/// was available until the timeout was hit. The caller has to delete
/// the result, if it is not 0.
////////////////////////////////////////////////////////////////////////////////
ClusterCommResult* wait (
ClientTransactionID const& clientTransactionID,
TransactionID const coordTransactionID,
OperationID const operationID,
ShardID const& shardID,
ClusterCommTimeout timeout = 0.0);
////////////////////////////////////////////////////////////////////////////////
/// @brief check on the status of an operation
///
@ -305,6 +313,27 @@ namespace triagens {
ClusterCommResult* enquire (OperationID const operationID);
////////////////////////////////////////////////////////////////////////////////
/// @brief wait for one answer matching the criteria
///
/// If clientTransactionID is empty, then any answer with any
/// clientTransactionID matches. If coordTransactionID is 0, then
/// any answer with any coordTransactionID matches. If shardID is
/// empty, then any answer from any ShardID matches. If operationID
/// is 0, then any answer with any operationID matches.
/// This function returns 0 if noIf `timeout`
/// is given, the result can be 0 indicating that no matching answer
/// was available until the timeout was hit. The caller has to delete
/// the result, if it is not 0.
////////////////////////////////////////////////////////////////////////////////
ClusterCommResult* wait (
ClientTransactionID const& clientTransactionID,
CoordTransactionID const coordTransactionID,
OperationID const operationID,
ShardID const& shardID,
ClusterCommTimeout timeout = 0.0);
////////////////////////////////////////////////////////////////////////////////
/// @brief ignore and drop current and future answers matching
///
@ -320,7 +349,7 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
void drop (ClientTransactionID const& clientTransactionID,
TransactionID const coordTransactionID,
CoordTransactionID const coordTransactionID,
OperationID const operationID,
ShardID const& shardID);
@ -367,7 +396,21 @@ namespace triagens {
static OperationID getOperationID ();
static int const maxConnectionsPerServer = 10;
////////////////////////////////////////////////////////////////////////////////
/// @brief get timestamp
////////////////////////////////////////////////////////////////////////////////
static double now () {
struct timeval tv;
gettimeofday(&tv, 0);
double sec = (double) tv.tv_sec; // seconds
double usc = (double) tv.tv_usec; // microseconds
return sec + usc / 1000000.0;
}
static int const maxConnectionsPerServer = 2;
struct SingleServerConnection {
httpclient::GeneralClientConnection* connection;
@ -413,13 +456,24 @@ namespace triagens {
map<OperationID,list<ClusterCommOperation*>::iterator> receivedByOpID;
triagens::basics::ConditionVariable somethingReceived;
// Note: If you really have to lock both `somethingToSend`
// and `somethingReceived` at the same time (usually you should
// not have to!), then: first lock `somethingToReceive`, then
// lock `somethingtoSend` in this order!
// We frequently need the following lengthy types:
typedef list<ClusterCommOperation*>::iterator QueueIterator;
typedef map<OperationID, QueueIterator>::iterator IndexIterator;
// An internal function to match an operation:
bool match (ClientTransactionID const& clientTransactionID,
TransactionID const coordTransactionID,
OperationID const operationID,
CoordTransactionID const coordTransactionID,
ShardID const& shardID,
ClusterCommOperation* op);
// Move an operation from the send to the receive queue:
bool moveFromSendToReceived (OperationID operationID);
// Finally, our background communications thread:
ClusterCommThread *_backgroundThread;
}; // end of class ClusterComm

View File

@ -77,7 +77,7 @@ void ClusterState::loadShardInformation () {
}
}
std::string ClusterState::getServerEndpoint (ServerID& serverID) {
std::string ClusterState::getServerEndpoint (ServerID const& serverID) {
map<ServerID,string>::iterator i = serverAddresses.find(serverID);
if (i != serverAddresses.end()) {
return i->second;
@ -90,7 +90,7 @@ std::string ClusterState::getServerEndpoint (ServerID& serverID) {
return string("");
}
ServerID ClusterState::getResponsibleServer (ShardID& shardID)
ServerID ClusterState::getResponsibleServer (ShardID const& shardID)
{
map<ShardID,ServerID>::iterator i = shards.find(shardID);
if (i != shards.end()) {

View File

@ -118,26 +118,26 @@ namespace triagens {
/// @brief find the endpoint of a server from its ID
////////////////////////////////////////////////////////////////////////////////
string getServerEndpoint(ServerID& serverID);
string getServerEndpoint(ServerID const& serverID);
////////////////////////////////////////////////////////////////////////////////
/// @brief ask about a collection
////////////////////////////////////////////////////////////////////////////////
string getCollectionInfo(CollectionID& collectionID);
string getCollectionInfo(CollectionID const& collectionID);
////////////////////////////////////////////////////////////////////////////////
/// @brief get all shards in a collection
////////////////////////////////////////////////////////////////////////////////
void getShardsCollection(CollectionID& collectionID,
void getShardsCollection(CollectionID const& collectionID,
vector<ShardID> &shards);
////////////////////////////////////////////////////////////////////////////////
/// @brief find the server who is responsible for a shard
////////////////////////////////////////////////////////////////////////////////
ServerID getResponsibleServer(ShardID& shardID);
ServerID getResponsibleServer(ShardID const& shardID);
////////////////////////////////////////////////////////////////////////////////
/// @brief get a number of cluster-wide unique IDs, returns the first