1
0
Fork 0

Can do one roundtrip.

Client -REST-> Coordinator -> sendQueue -REST-> DBServer
         -> recvQueue -REST-> Client
This commit is contained in:
Max Neunhoeffer 2013-12-19 16:15:54 +01:00
parent a79c5801d8
commit 5c80816352
5 changed files with 158 additions and 35 deletions

View File

@ -304,12 +304,13 @@ ClusterCommResult* ClusterComm::asyncRequest (
op->serverID = ClusterState::instance()->getResponsibleServer(
shardID);
op->status = CL_COMM_SUBMITTED;
op->reqtype = reqtype;
op->path = path;
op->body = body;
op->bodyLength = bodyLength;
op->headerFields = headerFields;
op->callback = callback;
op->timeout = timeout;
op->timeout = timeout == 0.0 ? 1e50 : timeout;
ClusterCommResult* res = new ClusterCommResult();
*res = *static_cast<ClusterCommResult*>(op);
@ -339,7 +340,7 @@ bool ClusterComm::match (
shardID == op->shardID) );
}
ClusterCommResult* ClusterComm::enquire (OperationID const operationID) {
ClusterCommResult const* ClusterComm::enquire (OperationID const operationID) {
IndexIterator i;
ClusterCommOperation* op = 0;
ClusterCommResult* res;
@ -355,6 +356,7 @@ ClusterCommResult* ClusterComm::enquire (OperationID const operationID) {
}
op = *(i->second);
*res = *static_cast<ClusterCommResult*>(op);
res->doNotDeleteOnDestruction();
return res;
}
}
@ -376,6 +378,7 @@ ClusterCommResult* ClusterComm::enquire (OperationID const operationID) {
}
op = *(i->second);
*res = *static_cast<ClusterCommResult*>(op);
res->doNotDeleteOnDestruction();
return res;
}
}
@ -438,7 +441,7 @@ ClusterCommResult* ClusterComm::wait (
// Here it could either be in the receive or the send queue, let's wait
timeleft = endtime - now();
if (timeleft <= 0) break;
somethingReceived.wait(uint64_t(timeleft * 1000.0));
somethingReceived.wait(uint64_t(timeleft * 1000000.0));
}
// This place is only reached on timeout
}
@ -489,7 +492,7 @@ ClusterCommResult* ClusterComm::wait (
// Here it could either be in the receive or the send queue, let's wait
timeleft = endtime - now();
if (timeleft <= 0) break;
somethingReceived.wait(uint64_t(timeleft * 1000.0));
somethingReceived.wait(uint64_t(timeleft * 1000000.0));
}
// This place is only reached on timeout
}
@ -580,10 +583,12 @@ bool ClusterComm::moveFromSendToReceived (OperationID operationID) {
assert(op->operationID == operationID);
toSendByOpID.erase(i);
toSend.erase(q);
if (CL_COMM_DROPPING == op->status) {
if (op->dropped) {
return false;
}
op->status = CL_COMM_SENT;
if (op->status == CL_COMM_SENDING) {
op->status = CL_COMM_SENT;
}
received.push_back(op);
q = received.end();
q--;
@ -650,12 +655,46 @@ void ClusterCommThread::run () {
assert(op->status == CL_COMM_SUBMITTED);
op->status = CL_COMM_SENDING;
}
// We release the lock, if it is dropped now, the status just goes
// to CL_COMM_DROPPING, we find out about this after we have sent
// the request.
LOG_DEBUG("ClusterComm faking a send");
// We release the lock, if the operation is dropped now, the
// `dropped` flag is set. We find out about this after we have
// sent the request (happens in moveFromSendToReceived).
// First find the server to which the request goes from the shardID:
ServerID server = ClusterState::instance()->getResponsibleServer(
op->shardID);
if (server == "") {
op->status = CL_COMM_ERROR;
}
else {
// We need a connection to this server:
ClusterComm::SingleServerConnection* connection
= cc->getConnection(server);
if (0 == connection) {
op->status = CL_COMM_ERROR;
}
else {
LOG_TRACE("sending %s request to DB server '%s': %s",
triagens::rest::HttpRequest::translateMethod(op->reqtype).c_str(),
server.c_str(), op->body);
{
triagens::httpclient::SimpleHttpClient client(
connection->connection,
op->timeout, false);
// We add this result to the operation struct without acquiring
// a lock, since we know that only we do such a thing:
op->result = client.request(op->reqtype, op->path, op->body,
op->bodyLength, *(op->headerFields));
// FIXME: handle case that connection was no good and the request
// failed.
}
cc->returnConnection(connection);
}
}
if (!cc->moveFromSendToReceived(op->operationID)) {
// It was dropped in the meantime, so forget about it:
delete op;
@ -666,7 +705,7 @@ void ClusterCommThread::run () {
// a request to terminate the thread:
{
basics::ConditionLocker locker(&cc->somethingToSend);
locker.wait(100);
locker.wait(10000000);
}
}

View File

@ -66,15 +66,16 @@ namespace triagens {
enum ClusterCommOpStatus {
CL_COMM_SUBMITTED = 1, // initial request queued, but not yet sent
CL_COMM_SENDING = 2, // in the process of sending
CL_COMM_DROPPING = 3, // was dropped during send, will be dropped
CL_COMM_SENT = 4, // initial request sent, response available
CL_COMM_TIMEOUT = 5, // no answer received until timeout
CL_COMM_RECEIVED = 6, // answer received
CL_COMM_DROPPED = 7 // nothing known about operation, was dropped
CL_COMM_SENT = 3, // initial request sent, response available
CL_COMM_TIMEOUT = 4, // no answer received until timeout
CL_COMM_RECEIVED = 5, // answer received
CL_COMM_DROPPED = 6, // nothing known about operation, was dropped
// or actually already collected
CL_COMM_ERROR = 7, // original request could not be sent
};
struct ClusterCommResult {
bool _deleteOnDestruction;
ClientTransactionID clientTransactionID;
CoordTransactionID coordTransactionID;
OperationID operationID;
@ -84,16 +85,23 @@ namespace triagens {
// The field result is != 0 ifs status is >= CL_COMM_SENT.
// Note that if status is CL_COMM_TIMEOUT, then the result
// field is a response object that only says "timeout"
bool dropped; // this is set to true, if the operation
// is dropped whilst in state CL_COMM_SENDING
// it is then actually dropped when sent
httpclient::SimpleHttpResult* result;
// the field answer is != 0 iff status is == CL_COMM_RECEIVED
rest::HttpRequest* answer;
ClusterCommResult () : result(0), answer(0) {}
ClusterCommResult ()
: _deleteOnDestruction(true), dropped(false), result(0), answer(0) {}
void doNotDeleteOnDestruction () {
_deleteOnDestruction = false;
}
virtual ~ClusterCommResult () {
if (0 != result) {
if (_deleteOnDestruction && 0 != result) {
delete result;
}
if (0 != answer) {
if (_deleteOnDestruction && 0 != answer) {
delete answer;
}
}
@ -115,6 +123,7 @@ namespace triagens {
typedef double ClusterCommTimeout; // in milliseconds
struct ClusterCommOperation : public ClusterCommResult {
rest::HttpRequest::HttpRequestType reqtype;
string path;
char const* body;
size_t bodyLength;
@ -124,14 +133,14 @@ namespace triagens {
ClusterCommOperation () {}
virtual ~ClusterCommOperation () {
if (0 != body) {
if (_deleteOnDestruction && 0 != body) {
TRI_Free(TRI_UNKNOWN_MEM_ZONE,
reinterpret_cast<void*>(const_cast<char*>(body)));
}
if (0 != headerFields) {
if (_deleteOnDestruction && 0 != headerFields) {
delete headerFields;
}
if (0 != callback) {
if (_deleteOnDestruction && 0 != callback) {
delete callback;
}
@ -304,14 +313,16 @@ namespace triagens {
///
/// This call never blocks and returns information about a specific operation
/// given by `operationID`. Note that if the `status` is >= `CL_COMM_SENT`,
/// then `result` field in the returned object is set, if the `status`
/// then the `result` field in the returned object is set, if the `status`
/// is `CL_COMM_RECEIVED`, then `answer` is set. However, in both cases
/// the ClusterComm library retains the operation in its queues! Therefore,
/// you have to use @ref wait or @ref drop to dequeue. Do not delete
/// `result` and `answer` before doing this!
/// `result` and `answer` before doing this! However, you have to delete
/// the ClusterCommResult pointer you get, it will automatically refrain
/// from deleting `result` and `answer`.
////////////////////////////////////////////////////////////////////////////////
ClusterCommResult* enquire (OperationID const operationID);
ClusterCommResult const* enquire (OperationID const operationID);
////////////////////////////////////////////////////////////////////////////////
/// @brief wait for one answer matching the criteria

View File

@ -28,6 +28,7 @@
#include "Cluster/ClusterState.h"
#include "BasicsC/logging.h"
#include "Basics/ReadLocker.h"
#include "Basics/WriteLocker.h"
using namespace triagens::arango;
@ -62,22 +63,64 @@ ClusterState::~ClusterState () {
}
void ClusterState::loadServerInformation () {
AgencyCommResult res;
while (true) {
// tue die schmutzige Arbeit, verlasse mit return, sobald OK
return; // FIXME ...
LOG_WARNING("ClusterState: Could not (re-)load agency data about servers");
{
WRITE_LOCKER(lock);
res = _agency.getValues("State/ServersRegistered", true);
if (res.successful()) {
if (res.flattenJson(serverAddresses,"State/ServersRegistered/", false)) {
LOG_DEBUG("State/ServersRegistered loaded successfully");
map<ServerID,string>::iterator i;
cout << "Servers registered:" << endl;
for (i = serverAddresses.begin(); i != serverAddresses.end(); ++i) {
cout << " " << i->first << " with address " << i->second << endl;
}
return;
}
else {
LOG_DEBUG("State/ServersRegistered not loaded successfully");
}
}
else {
LOG_DEBUG("Error whilst loading State/ServersRegistered");
}
}
usleep(100);
}
}
void ClusterState::loadShardInformation () {
AgencyCommResult res;
while (true) {
// tue die schmutzige Arbeit, verlasse mit return, sobald OK
return; // FIXME ...
LOG_WARNING("ClusterState: Could not (re-)load agency data about shards");
{
WRITE_LOCKER(lock);
res = _agency.getValues("State/Shards", true);
if (res.successful()) {
if (res.flattenJson(shards,"State/Shards/", false)) {
LOG_DEBUG("State/Shards loaded successfully");
map<ShardID,ServerID>::iterator i;
cout << "Shards:" << endl;
for (i = shards.begin(); i != shards.end(); ++i) {
cout << " " << i->first << " with responsible server "
<< i->second << endl;
}
return;
}
else {
LOG_DEBUG("State/ServersRegistered not loaded successfully");
}
}
else {
LOG_DEBUG("Error whilst loading State/ServersRegistered");
}
}
usleep(100);
}
}
std::string ClusterState::getServerEndpoint (ServerID const& serverID) {
READ_LOCKER(lock);
map<ServerID,string>::iterator i = serverAddresses.find(serverID);
if (i != serverAddresses.end()) {
return i->second;
@ -92,11 +135,12 @@ std::string ClusterState::getServerEndpoint (ServerID const& serverID) {
ServerID ClusterState::getResponsibleServer (ShardID const& shardID)
{
READ_LOCKER(lock);
map<ShardID,ServerID>::iterator i = shards.find(shardID);
if (i != shards.end()) {
return i->second;
}
loadServerInformation();
loadShardInformation();
i = shards.find(shardID);
if (i != shards.end()) {
return i->second;

View File

@ -154,6 +154,8 @@ namespace triagens {
map<ServerID,string> serverAddresses; // from State/ServersRegistered
map<ShardID,ServerID> shards; // from State/Shards
triagens::basics::ReadWriteLock lock;
};
} // end namespace arango

View File

@ -904,6 +904,8 @@ static v8::Handle<v8::Value> JS_ExecuteGlobalContextFunction (v8::Arguments cons
#ifdef TRI_ENABLE_CLUSTER
#include "Cluster/ClusterComm.h"
static v8::Handle<v8::Value> JS_ShardingTest (v8::Arguments const& argv) {
v8::Isolate* isolate;
@ -914,10 +916,35 @@ static v8::Handle<v8::Value> JS_ShardingTest (v8::Arguments const& argv) {
v8g = (TRI_v8_global_t*) isolate->GetData();
if (argv.Length() != 2) {
TRI_V8_EXCEPTION_USAGE(scope, "SYS_TEST_SHARDING(<req>, <res>)");
TRI_V8_EXCEPTION_USAGE(scope, "SYS_SHARDING_TEST(<req>, <res>)");
}
LOG_DEBUG("JS_ShardingTest: we are back in C++");
ClusterComm* cc = ClusterComm::instance();
map<string, string>* headerFields = new map<string, string>;
(*headerFields)["X-ClientTransactionID"] = "BlaBlubb";
ClusterCommResult const* res =
cc->asyncRequest("ClientBla", 12345, "shardBlubb",
triagens::rest::HttpRequest::HTTP_REQUEST_GET,
"/_admin/time", NULL, 0, headerFields, 0, 0);
OperationID opID = res->operationID;
LOG_DEBUG("JS_ShardingTest: request has been submitted");
delete res;
// Wait until the request has actually been sent:
while (true) {
res = cc->enquire(opID);
if (res->status >= CL_COMM_SENT) {
delete res;
break;
}
delete res;
LOG_DEBUG("JS_ShardingTest: request not yet sent");
usleep(1000000);
}
LOG_DEBUG("JS_ShardingTest: request has been sent");
cc->drop("", 0, opID, "");
return scope.Close(v8::Undefined());
}