1
0
Fork 0

Full round trip implemented but not yet working.

This commit is contained in:
Max Neunhoeffer 2013-12-23 16:19:59 +01:00
parent 16e16f705c
commit c9b2884def
9 changed files with 280 additions and 73 deletions

View File

@ -30,6 +30,7 @@
#include "BasicsC/logging.h"
#include "Basics/WriteLocker.h"
#include "Basics/ConditionLocker.h"
#include "Basics/StringUtils.h"
#include "VocBase/server.h"
@ -47,6 +48,15 @@ ClusterCommOptions ClusterComm::_globalConnectionOptions = {
0 // sslProtocol
};
////////////////////////////////////////////////////////////////////////////////
/// @brief global callback for asynchronous REST handler
////////////////////////////////////////////////////////////////////////////////
void triagens::arango::ClusterCommRestCallback(string& coordinator,
triagens::rest::HttpResponse* response)
{
ClusterComm::instance()->asyncAnswer(coordinator, response);
}
// -----------------------------------------------------------------------------
// --SECTION-- ClusterComm class
@ -309,6 +319,13 @@ ClusterCommResult* ClusterComm::asyncRequest (
op->serverID = ClusterState::instance()->getResponsibleServer(
shardID);
// Add the header fields for asynchronous mode:
(*headerFields)["X-Arango-Async"] = "store";
(*headerFields)["X-Arango-Coordinator"] = ServerState::instance()->getId() +
":" + clientTransactionID + ":" +
basics::StringUtils::itoa(coordTransactionID) +
":" + basics::StringUtils::itoa(op->operationID);
op->status = CL_COMM_SUBMITTED;
op->reqtype = reqtype;
op->path = path;
@ -327,6 +344,7 @@ ClusterCommResult* ClusterComm::asyncRequest (
list<ClusterCommOperation*>::iterator i = toSend.end();
toSendByOpID[op->operationID] = --i;
}
cout << "In asyncRequest, put into queue " << op->operationID << endl;
somethingToSend.signal();
return res;
@ -389,7 +407,13 @@ ClusterCommResult const* ClusterComm::enquire (OperationID const operationID) {
}
}
return 0;
res = new ClusterCommResult();
if (0 == res) {
return 0;
}
res->operationID = operationID;
res->status = CL_COMM_DROPPED;
return res;
}
ClusterCommResult* ClusterComm::wait (
@ -568,10 +592,156 @@ void ClusterComm::drop (
}
}
int ClusterComm::processAnswer(rest::HttpRequest* answer) {
void ClusterComm::asyncAnswer (string& coordinatorHeader,
rest::HttpResponse* responseToSend) {
// find matching operation, report if found, otherwise drop
return TRI_ERROR_NO_ERROR;
// First take apart the header to get various IDs:
ServerID coordinatorID;
string clientTransactionID;
CoordTransactionID coordTransactionID;
OperationID operationID;
size_t start = 0;
size_t pos;
cout << "In asyncAnswer, seeing " << coordinatorHeader << endl;
pos = coordinatorHeader.find(":",start);
if (pos == string::npos) {
cout << "Hallo1" << endl;
LOG_ERROR("Could not find coordinator ID in X-Arango-Coordinator");
return;
}
coordinatorID = coordinatorHeader.substr(start,pos-start);
start = pos+1;
pos = coordinatorHeader.find(":",start);
if (pos == string::npos) {
cout << "Hallo2" << endl;
LOG_ERROR("Could not find clientTransactionID in X-Arango-Coordinator");
return;
}
clientTransactionID = coordinatorHeader.substr(start,pos-start);
start = pos+1;
pos = coordinatorHeader.find(":",start);
if (pos == string::npos) {
cout << "Hallo3" << endl;
LOG_ERROR("Could not find coordTransactionID in X-Arango-Coordinator");
return;
}
coordTransactionID = basics::StringUtils::uint64(
coordinatorHeader.substr(start,pos-start));
start = pos+1;
operationID = basics::StringUtils::uint64(coordinatorHeader.substr(start));
cout << "Hallo4" << endl;
// Now find the connection to which the request goes from the coordinatorID:
ClusterComm::SingleServerConnection* connection
= getConnection(coordinatorID);
if (0 == connection) {
cout << "asyncAnswer: did not get connection" << endl;
LOG_ERROR("asyncAnswer: cannot create connection to server '%s'",
coordinatorID.c_str());
return;
}
map<string, string> headers = responseToSend->headers();
headers["X-Arango-Coordinator"] = coordinatorHeader;
char const* body = responseToSend->body().c_str();
size_t len = responseToSend->body().length();
LOG_TRACE("asyncAnswer: sending PUT request to DB server '%s'",
coordinatorID.c_str());
cout << "asyncAnswer: initialising client" << endl;
triagens::httpclient::SimpleHttpClient client(
connection->connection,
_globalConnectionOptions._singleRequestTimeout,
false);
cout << "asyncAnswer: sending request" << endl;
// We add this result to the operation struct without acquiring
// a lock, since we know that only we do such a thing:
httpclient::SimpleHttpResult* result =
client.request(rest::HttpRequest::HTTP_REQUEST_PUT,
"/_api/shard-comm", body, len, headers);
cout << "In asyncAnswer, error msg: " << endl
<< client.getErrorMessage() << endl
<< result->getResultTypeMessage() << endl;
// FIXME: handle case that connection was no good and the request
// failed.
returnConnection(connection);
}
string ClusterComm::processAnswer(string& coordinatorHeader,
rest::HttpRequest* answer) {
// First take apart the header to get various IDs:
ServerID coordinatorID;
string clientTransactionID;
CoordTransactionID coordTransactionID;
OperationID operationID;
size_t start = 0;
size_t pos;
cout << "In processAnswer, seeing " << coordinatorHeader << endl;
pos = coordinatorHeader.find(":",start);
if (pos == string::npos) {
return string("could not find coordinator ID in 'X-Arango-Coordinator'");
}
coordinatorID = coordinatorHeader.substr(start,pos-start);
start = pos+1;
pos = coordinatorHeader.find(":",start);
if (pos == string::npos) {
return
string("could not find clientTransactionID in 'X-Arango-Coordinator'");
}
clientTransactionID = coordinatorHeader.substr(start,pos-start);
start = pos+1;
pos = coordinatorHeader.find(":",start);
if (pos == string::npos) {
return
string("could not find coordTransactionID in 'X-Arango-Coordinator'");
}
coordTransactionID = basics::StringUtils::uint64(
coordinatorHeader.substr(start,pos-start));
start = pos+1;
operationID = basics::StringUtils::uint64(coordinatorHeader.substr(start));
// Finally find the ClusterCommOperation record for this operation:
ClusterCommOperation* op;
{
basics::ConditionLocker locker(&somethingReceived);
ClusterComm::IndexIterator i;
i = receivedByOpID.find(operationID);
if (i != receivedByOpID.end()) {
op = *(i->second);
op->answer = answer;
op->status = CL_COMM_RECEIVED;
somethingReceived.broadcast();
}
else {
// We have to look in the send queue as well, as it might not yet
// have been moved to the received queue. Note however, that it must
// have been fully sent, so this is highly unlikely, but not impossible.
basics::ConditionLocker sendlocker(&somethingToSend);
i = toSendByOpID.find(operationID);
if (i != toSendByOpID.end()) {
op = *(i->second);
op->answer = answer;
op->status = CL_COMM_RECEIVED;
somethingReceived.broadcast();
}
else {
// Nothing known about the request, get rid of it:
delete answer;
return string("operation was already dropped by sender");
}
}
}
cout << "end of processAnswer" << endl;
return string("");
}
// Move an operation from the send to the receive queue:
@ -580,9 +750,18 @@ bool ClusterComm::moveFromSendToReceived (OperationID operationID) {
IndexIterator i;
ClusterCommOperation* op;
cout << "In moveFromSendToReceived " << operationID << endl;
basics::ConditionLocker locker(&somethingReceived);
basics::ConditionLocker sendlocker(&somethingToSend);
i = toSendByOpID.find(operationID); // cannot fail
if (i == toSendByOpID.end()) {
IndexIterator j;
cout << "Looking for operationID:" << operationID << endl;
for (j = toSendByOpID.begin(); j != toSendByOpID.end(); ++j) {
cout << "Have operationID:" << (*j->second)->operationID << endl;
}
}
assert(i != toSendByOpID.end());
q = i->second;
op = *q;
@ -593,12 +772,17 @@ bool ClusterComm::moveFromSendToReceived (OperationID operationID) {
return false;
}
if (op->status == CL_COMM_SENDING) {
// Note that in the meantime the status could have changed to
// CL_COMM_ERROR or indeed to CL_COMM_RECEIVED in these cases, we do
// not want to overwrite this result
op->status = CL_COMM_SENT;
}
received.push_back(op);
q = received.end();
q--;
receivedByOpID[operationID] = q;
somethingReceived.broadcast();
cout << "In moveFromSendToReceived moved " << operationID << endl;
return true;
}
@ -660,6 +844,7 @@ void ClusterCommThread::run () {
if (0 != _stop) {
break;
}
cout << "Noticed something to send" << endl;
op = cc->toSend.front();
assert(op->status == CL_COMM_SUBMITTED);
op->status = CL_COMM_SENDING;
@ -672,16 +857,20 @@ void ClusterCommThread::run () {
// First find the server to which the request goes from the shardID:
ServerID server = ClusterState::instance()->getResponsibleServer(
op->shardID);
cout << "Have responsible server " << server << endl;
if (server == "") {
op->status = CL_COMM_ERROR;
}
else {
// We need a connection to this server:
cout << "Urgh1" << endl;
ClusterComm::SingleServerConnection* connection
= cc->getConnection(server);
cout << "Urgh2" << endl;
if (0 == connection) {
op->status = CL_COMM_ERROR;
LOG_ERROR("cannot create connection to server '%s'", server.c_str());
cout << "did not get connection object" << endl;
}
else {
LOG_TRACE("sending %s request to DB server '%s': %s",
@ -689,6 +878,7 @@ void ClusterCommThread::run () {
server.c_str(), op->body);
{
cout << "initialising client" << endl;
triagens::httpclient::SimpleHttpClient client(
connection->connection,
op->timeout, false);
@ -697,6 +887,9 @@ void ClusterCommThread::run () {
// a lock, since we know that only we do such a thing:
op->result = client.request(op->reqtype, op->path, op->body,
op->bodyLength, *(op->headerFields));
cout << "Sending msg:" << endl
<< client.getErrorMessage() << endl
<< op->result->getResultTypeMessage() << endl;
// FIXME: handle case that connection was no good and the request
// failed.
}

View File

@ -41,10 +41,7 @@
#include "Cluster/AgencyComm.h"
#include "Cluster/ClusterState.h"
#ifdef __cplusplus
extern "C" {
#endif
#include "Cluster/ServerState.h"
namespace triagens {
namespace arango {
@ -69,9 +66,10 @@ namespace triagens {
CL_COMM_SENT = 3, // initial request sent, response available
CL_COMM_TIMEOUT = 4, // no answer received until timeout
CL_COMM_RECEIVED = 5, // answer received
CL_COMM_DROPPED = 6, // nothing known about operation, was dropped
// or actually already collected
CL_COMM_ERROR = 7, // original request could not be sent
CL_COMM_ERROR = 6, // original request could not be sent
CL_COMM_DROPPED = 7 // operation was dropped, not known
// this is only used to report an error
// in the wait or enquire methods
};
struct ClusterCommResult {
@ -155,6 +153,13 @@ namespace triagens {
uint32_t _sslProtocol;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief global callback for asynchronous REST handler
////////////////////////////////////////////////////////////////////////////////
void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response);
// -----------------------------------------------------------------------------
// --SECTION-- ClusterComm
// -----------------------------------------------------------------------------
@ -367,12 +372,13 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
/// @brief process an answer coming in on the HTTP socket which is actually
/// an answer to one of our earlier requests, return value of 0 means OK
/// and nonzero is an error. This is only called in a coordinator node
/// an answer to one of our earlier requests, return value of "" means OK
/// and nonempty is an error. This is only called in a coordinator node
/// and not in a DBServer node.
////////////////////////////////////////////////////////////////////////////////
int processAnswer(rest::HttpRequest* answer);
string processAnswer(string& coordinatorHeader,
rest::HttpRequest* answer);
////////////////////////////////////////////////////////////////////////////////
/// @brief send an answer HTTP request to a coordinator, which contains
@ -380,9 +386,8 @@ namespace triagens {
/// a DBServer node and never in a coordinator node.
////////////////////////////////////////////////////////////////////////////////
httpclient::SimpleHttpResult* asyncAnswer (
rest::HttpRequest& origRequest,
rest::HttpResponse& responseToSend);
void asyncAnswer (string& coordinatorHeader,
rest::HttpResponse* responseToSend);
// -----------------------------------------------------------------------------
// --SECTION-- private methods and data
@ -595,10 +600,6 @@ namespace triagens {
} // namespace arango
} // namespace triagens
#ifdef __cplusplus
}
#endif
#endif
// Local Variables:

View File

@ -124,6 +124,8 @@ void HeartbeatThread::run () {
else {
// value did not change, but we already blocked waiting for a change...
// nothing to do here
LOG_ERROR("HeartbeatThread suspended for 10 seconds");
sleep(10);
}
}

View File

@ -27,7 +27,9 @@
#include "RestShardHandler.h"
#include "Basics/ConditionLocker.h"
#include "Cluster/ServerState.h"
#include "Cluster/ClusterComm.h"
#include "Dispatcher/Dispatcher.h"
#include "Rest/HttpRequest.h"
#include "Rest/HttpResponse.h"
@ -94,46 +96,35 @@ string const& RestShardHandler::queue () const {
triagens::rest::HttpHandler::status_e RestShardHandler::execute () {
ServerState::RoleEnum role = ServerState::instance()->getRole();
if (role == ServerState::ROLE_COORDINATOR) {
if (role != ServerState::ROLE_COORDINATOR) {
generateError(triagens::rest::HttpResponse::BAD,
(int) triagens::rest::HttpResponse::BAD,
"this API is meant to be called on a coordinator node");
return HANDLER_DONE;
}
/*
bool found;
char const* coordinator = _request->header("x-arango-coordinator", found);
char const* _coordinator = _request->header("x-arango-coordinator", found);
if (! found) {
generateError(triagens::rest::HttpResponse::BAD,
(int) triagens::rest::HttpResponse::BAD,
"header 'x-arango-coordinator' is missing");
"header 'X-Arango-Coordinator' is missing");
return HANDLER_DONE;
}
char const* operation = _request->header("x-arango-operation", found);
if (! found) {
generateError(triagens::rest::HttpResponse::BAD,
(int) triagens::rest::HttpResponse::BAD,
"header 'x-arango-operation' is missing");
}
char const* url = _request->header("x-arango-url", found);
if (! found) {
generateError(triagens::rest::HttpResponse::BAD,
(int) triagens::rest::HttpResponse::BAD,
"header 'x-arango-url' is missing");
}
*/
string coordinatorHeader = _coordinator;
string result = ClusterComm::instance()->processAnswer(coordinatorHeader,
stealRequest());
/*
triagens::rest::HttpHandler* handler = this->_server->createHandler(_request);
triagens::rest::Job* job = new triagens::rest::GeneralServerJob<triagens::rest::HttpServer, triagens::rest::HttpHandlerFactory::GeneralHandler>(0, handler, true);
_dispatcher->addJob(job);
*/
// respond with a 202
_response = createResponse(triagens::rest::HttpResponse::ACCEPTED);
if (result == "") {
_response = createResponse(triagens::rest::HttpResponse::ACCEPTED);
}
else {
generateError(triagens::rest::HttpResponse::BAD,
(int) triagens::rest::HttpResponse::BAD,
result.c_str());
}
return HANDLER_DONE;
}

View File

@ -75,6 +75,7 @@
#ifdef TRI_ENABLE_CLUSTER
#include "Cluster/ApplicationCluster.h"
#include "Cluster/RestShardHandler.h"
#include "Cluster/ClusterComm.h"
#endif
#include "V8/V8LineEditor.h"
@ -555,7 +556,12 @@ void ArangoServer::buildApplicationServer () {
// endpoint server
// .............................................................................
_jobManager = new AsyncJobManager(&TRI_NewTickServer);
#ifdef TRI_ENABLE_CLUSTER
_jobManager = new AsyncJobManager(&TRI_NewTickServer,
ClusterCommRestCallback);
#else
_jobManager = new AsyncJobManager(&TRI_NewTickServer, 0);
#endif
_applicationEndpointServer = new ApplicationEndpointServer(_applicationServer,
_applicationScheduler,

View File

@ -934,9 +934,6 @@ static v8::Handle<v8::Value> JS_ShardingTest (v8::Arguments const& argv) {
}
map<string, string>* headerFields = new map<string, string>;
(*headerFields)["X-ClientTransactionID"] = clientTransactionId;
(*headerFields)["X-Arango-Async"] = "store";
(*headerFields)["X-Arango-Coordinator"] = ServerState::instance()->getAddress();
ClusterCommResult const* res =
cc->asyncRequest(clientTransactionId, TRI_NewTickServer(), "shardBlubb",
@ -968,7 +965,7 @@ static v8::Handle<v8::Value> JS_ShardingTest (v8::Arguments const& argv) {
}
LOG_DEBUG("JS_ShardingTest: request not yet sent");
usleep(500000);
usleep(50000);
}
LOG_DEBUG("JS_ShardingTest: request has been sent");

View File

@ -50,11 +50,11 @@ namespace triagens {
// -----------------------------------------------------------------------------
public:
AsyncCallbackContext (std::string const& url)
: _url(url),
AsyncCallbackContext (std::string const& coordHeader)
: _coordHeader(coordHeader),
_response(0) {
std::cout << "generated async context " << _url << std::endl;
std::cout << "generated async context " << _coordHeader << std::endl;
}
~AsyncCallbackContext () {
@ -69,16 +69,8 @@ namespace triagens {
public:
bool callback (HttpResponse* response) {
if (response == 0) {
return false;
}
_response = response;
std::cout << "callback called for async context " << _url << ", BODY: " << _response->body().c_str() << std::endl;
return true;
string& getCoordinatorHeader() {
return _coordHeader;
}
// -----------------------------------------------------------------------------
@ -87,7 +79,7 @@ namespace triagens {
private:
std::string _url;
std::string _coordHeader;
HttpResponse* _response;
};
@ -216,10 +208,12 @@ namespace triagens {
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
AsyncJobManager (uint64_t (*idFunc)())
AsyncJobManager (uint64_t (*idFunc)(),
void (*callbackFunc)(string&, HttpResponse*))
: _lock(),
_jobs(),
generate(idFunc) {
generate(idFunc),
callback(callbackFunc) {
}
////////////////////////////////////////////////////////////////////////////////
@ -412,11 +406,12 @@ namespace triagens {
char const* hdr = job->getHandler()->getRequest()->header("x-arango-coordinator", found);
if (found) {
std::cout << "FOUND COORDINATOR HEADER\n";
std::cout << "FOUND COORDINATOR HEADER\n";
ctx = new AsyncCallbackContext(std::string(hdr));
}
AsyncJobResult ajr(*jobId, 0, TRI_microtime(), AsyncJobResult::JOB_PENDING, ctx);
AsyncJobResult ajr(*jobId, 0, TRI_microtime(),
AsyncJobResult::JOB_PENDING, ctx);
WRITE_LOCKER(_lock);
@ -471,8 +466,8 @@ namespace triagens {
}
// if callback is set, execute it now (outside of the wr-lock)
if (ctx != 0 && response != 0) {
ctx->callback(response);
if (ctx != 0 && response != 0 && 0 != callback) {
callback(ctx->getCoordinatorHeader(), response);
}
}
@ -500,6 +495,12 @@ namespace triagens {
uint64_t (*generate)();
////////////////////////////////////////////////////////////////////////////////
/// @brief function pointer for callback registered at initialisation
////////////////////////////////////////////////////////////////////////////////
void (*callback)(string& coordinator, HttpResponse* response);
};
}

View File

@ -100,6 +100,16 @@ HttpResponse* HttpHandler::stealResponse () {
return tmp;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief steal the request
////////////////////////////////////////////////////////////////////////////////
HttpRequest* HttpHandler::stealRequest () {
HttpRequest* tmp = _request;
_request = 0;
return tmp;
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////

View File

@ -79,7 +79,7 @@ namespace triagens {
/// @brief constructs a new handler
///
/// Note that the handler owns the request and the response. It is its
/// responsibility to destroy them both.
/// responsibility to destroy them both. See also the two steal methods.
////////////////////////////////////////////////////////////////////////////////
explicit
@ -149,6 +149,12 @@ namespace triagens {
return _request;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief steal the pointer to the request
////////////////////////////////////////////////////////////////////////////////
HttpRequest* stealRequest ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////