diff --git a/arangod/Cluster/ClusterComm.cpp b/arangod/Cluster/ClusterComm.cpp index b482bc904b..a432d408a6 100644 --- a/arangod/Cluster/ClusterComm.cpp +++ b/arangod/Cluster/ClusterComm.cpp @@ -30,6 +30,7 @@ #include "BasicsC/logging.h" #include "Basics/WriteLocker.h" #include "Basics/ConditionLocker.h" +#include "Basics/StringUtils.h" #include "VocBase/server.h" @@ -47,6 +48,15 @@ ClusterCommOptions ClusterComm::_globalConnectionOptions = { 0 // sslProtocol }; +//////////////////////////////////////////////////////////////////////////////// +/// @brief global callback for asynchronous REST handler +//////////////////////////////////////////////////////////////////////////////// + +void triagens::arango::ClusterCommRestCallback(string& coordinator, + triagens::rest::HttpResponse* response) +{ + ClusterComm::instance()->asyncAnswer(coordinator, response); +} // ----------------------------------------------------------------------------- // --SECTION-- ClusterComm class @@ -309,6 +319,13 @@ ClusterCommResult* ClusterComm::asyncRequest ( op->serverID = ClusterState::instance()->getResponsibleServer( shardID); + // Add the header fields for asynchronous mode: + (*headerFields)["X-Arango-Async"] = "store"; + (*headerFields)["X-Arango-Coordinator"] = ServerState::instance()->getId() + + ":" + clientTransactionID + ":" + + basics::StringUtils::itoa(coordTransactionID) + + ":" + basics::StringUtils::itoa(op->operationID); + op->status = CL_COMM_SUBMITTED; op->reqtype = reqtype; op->path = path; @@ -327,6 +344,7 @@ ClusterCommResult* ClusterComm::asyncRequest ( list::iterator i = toSend.end(); toSendByOpID[op->operationID] = --i; } + cout << "In asyncRequest, put into queue " << op->operationID << endl; somethingToSend.signal(); return res; @@ -389,7 +407,13 @@ ClusterCommResult const* ClusterComm::enquire (OperationID const operationID) { } } - return 0; + res = new ClusterCommResult(); + if (0 == res) { + return 0; + } + res->operationID = operationID; + res->status = CL_COMM_DROPPED; + return res; } ClusterCommResult* ClusterComm::wait ( @@ -568,10 +592,156 @@ void ClusterComm::drop ( } } -int ClusterComm::processAnswer(rest::HttpRequest* answer) { +void ClusterComm::asyncAnswer (string& coordinatorHeader, + rest::HttpResponse* responseToSend) { - // find matching operation, report if found, otherwise drop - return TRI_ERROR_NO_ERROR; + // First take apart the header to get various IDs: + ServerID coordinatorID; + string clientTransactionID; + CoordTransactionID coordTransactionID; + OperationID operationID; + size_t start = 0; + size_t pos; + + cout << "In asyncAnswer, seeing " << coordinatorHeader << endl; + pos = coordinatorHeader.find(":",start); + if (pos == string::npos) { + cout << "Hallo1" << endl; + LOG_ERROR("Could not find coordinator ID in X-Arango-Coordinator"); + return; + } + coordinatorID = coordinatorHeader.substr(start,pos-start); + start = pos+1; + pos = coordinatorHeader.find(":",start); + if (pos == string::npos) { + cout << "Hallo2" << endl; + LOG_ERROR("Could not find clientTransactionID in X-Arango-Coordinator"); + return; + } + clientTransactionID = coordinatorHeader.substr(start,pos-start); + start = pos+1; + pos = coordinatorHeader.find(":",start); + if (pos == string::npos) { + cout << "Hallo3" << endl; + LOG_ERROR("Could not find coordTransactionID in X-Arango-Coordinator"); + return; + } + coordTransactionID = basics::StringUtils::uint64( + coordinatorHeader.substr(start,pos-start)); + start = pos+1; + operationID = basics::StringUtils::uint64(coordinatorHeader.substr(start)); + + cout << "Hallo4" << endl; + + // Now find the connection to which the request goes from the coordinatorID: + ClusterComm::SingleServerConnection* connection + = getConnection(coordinatorID); + if (0 == connection) { + cout << "asyncAnswer: did not get connection" << endl; + LOG_ERROR("asyncAnswer: cannot create connection to server '%s'", + coordinatorID.c_str()); + return; + } + + map headers = responseToSend->headers(); + headers["X-Arango-Coordinator"] = coordinatorHeader; + char const* body = responseToSend->body().c_str(); + size_t len = responseToSend->body().length(); + + LOG_TRACE("asyncAnswer: sending PUT request to DB server '%s'", + coordinatorID.c_str()); + + cout << "asyncAnswer: initialising client" << endl; + + triagens::httpclient::SimpleHttpClient client( + connection->connection, + _globalConnectionOptions._singleRequestTimeout, + false); + + cout << "asyncAnswer: sending request" << endl; + + // We add this result to the operation struct without acquiring + // a lock, since we know that only we do such a thing: + httpclient::SimpleHttpResult* result = + client.request(rest::HttpRequest::HTTP_REQUEST_PUT, + "/_api/shard-comm", body, len, headers); + cout << "In asyncAnswer, error msg: " << endl + << client.getErrorMessage() << endl + << result->getResultTypeMessage() << endl; + // FIXME: handle case that connection was no good and the request + // failed. + returnConnection(connection); +} + +string ClusterComm::processAnswer(string& coordinatorHeader, + rest::HttpRequest* answer) { + // First take apart the header to get various IDs: + ServerID coordinatorID; + string clientTransactionID; + CoordTransactionID coordTransactionID; + OperationID operationID; + size_t start = 0; + size_t pos; + + cout << "In processAnswer, seeing " << coordinatorHeader << endl; + + pos = coordinatorHeader.find(":",start); + if (pos == string::npos) { + return string("could not find coordinator ID in 'X-Arango-Coordinator'"); + } + coordinatorID = coordinatorHeader.substr(start,pos-start); + start = pos+1; + pos = coordinatorHeader.find(":",start); + if (pos == string::npos) { + return + string("could not find clientTransactionID in 'X-Arango-Coordinator'"); + } + clientTransactionID = coordinatorHeader.substr(start,pos-start); + start = pos+1; + pos = coordinatorHeader.find(":",start); + if (pos == string::npos) { + return + string("could not find coordTransactionID in 'X-Arango-Coordinator'"); + } + coordTransactionID = basics::StringUtils::uint64( + coordinatorHeader.substr(start,pos-start)); + start = pos+1; + operationID = basics::StringUtils::uint64(coordinatorHeader.substr(start)); + + // Finally find the ClusterCommOperation record for this operation: + ClusterCommOperation* op; + { + basics::ConditionLocker locker(&somethingReceived); + ClusterComm::IndexIterator i; + i = receivedByOpID.find(operationID); + if (i != receivedByOpID.end()) { + op = *(i->second); + op->answer = answer; + op->status = CL_COMM_RECEIVED; + somethingReceived.broadcast(); + } + else { + // We have to look in the send queue as well, as it might not yet + // have been moved to the received queue. Note however, that it must + // have been fully sent, so this is highly unlikely, but not impossible. + basics::ConditionLocker sendlocker(&somethingToSend); + i = toSendByOpID.find(operationID); + if (i != toSendByOpID.end()) { + op = *(i->second); + op->answer = answer; + op->status = CL_COMM_RECEIVED; + somethingReceived.broadcast(); + } + else { + // Nothing known about the request, get rid of it: + delete answer; + return string("operation was already dropped by sender"); + } + } + } + + cout << "end of processAnswer" << endl; + return string(""); } // Move an operation from the send to the receive queue: @@ -580,9 +750,18 @@ bool ClusterComm::moveFromSendToReceived (OperationID operationID) { IndexIterator i; ClusterCommOperation* op; + cout << "In moveFromSendToReceived " << operationID << endl; basics::ConditionLocker locker(&somethingReceived); basics::ConditionLocker sendlocker(&somethingToSend); i = toSendByOpID.find(operationID); // cannot fail + if (i == toSendByOpID.end()) { + IndexIterator j; + cout << "Looking for operationID:" << operationID << endl; + for (j = toSendByOpID.begin(); j != toSendByOpID.end(); ++j) { + cout << "Have operationID:" << (*j->second)->operationID << endl; + } + } + assert(i != toSendByOpID.end()); q = i->second; op = *q; @@ -593,12 +772,17 @@ bool ClusterComm::moveFromSendToReceived (OperationID operationID) { return false; } if (op->status == CL_COMM_SENDING) { + // Note that in the meantime the status could have changed to + // CL_COMM_ERROR or indeed to CL_COMM_RECEIVED in these cases, we do + // not want to overwrite this result op->status = CL_COMM_SENT; } received.push_back(op); q = received.end(); q--; receivedByOpID[operationID] = q; + somethingReceived.broadcast(); + cout << "In moveFromSendToReceived moved " << operationID << endl; return true; } @@ -660,6 +844,7 @@ void ClusterCommThread::run () { if (0 != _stop) { break; } + cout << "Noticed something to send" << endl; op = cc->toSend.front(); assert(op->status == CL_COMM_SUBMITTED); op->status = CL_COMM_SENDING; @@ -672,16 +857,20 @@ void ClusterCommThread::run () { // First find the server to which the request goes from the shardID: ServerID server = ClusterState::instance()->getResponsibleServer( op->shardID); + cout << "Have responsible server " << server << endl; if (server == "") { op->status = CL_COMM_ERROR; } else { // We need a connection to this server: + cout << "Urgh1" << endl; ClusterComm::SingleServerConnection* connection = cc->getConnection(server); + cout << "Urgh2" << endl; if (0 == connection) { op->status = CL_COMM_ERROR; LOG_ERROR("cannot create connection to server '%s'", server.c_str()); + cout << "did not get connection object" << endl; } else { LOG_TRACE("sending %s request to DB server '%s': %s", @@ -689,6 +878,7 @@ void ClusterCommThread::run () { server.c_str(), op->body); { + cout << "initialising client" << endl; triagens::httpclient::SimpleHttpClient client( connection->connection, op->timeout, false); @@ -697,6 +887,9 @@ void ClusterCommThread::run () { // a lock, since we know that only we do such a thing: op->result = client.request(op->reqtype, op->path, op->body, op->bodyLength, *(op->headerFields)); + cout << "Sending msg:" << endl + << client.getErrorMessage() << endl + << op->result->getResultTypeMessage() << endl; // FIXME: handle case that connection was no good and the request // failed. } diff --git a/arangod/Cluster/ClusterComm.h b/arangod/Cluster/ClusterComm.h index 6a7b1be638..630aa46372 100644 --- a/arangod/Cluster/ClusterComm.h +++ b/arangod/Cluster/ClusterComm.h @@ -41,10 +41,7 @@ #include "Cluster/AgencyComm.h" #include "Cluster/ClusterState.h" - -#ifdef __cplusplus -extern "C" { -#endif +#include "Cluster/ServerState.h" namespace triagens { namespace arango { @@ -69,9 +66,10 @@ namespace triagens { CL_COMM_SENT = 3, // initial request sent, response available CL_COMM_TIMEOUT = 4, // no answer received until timeout CL_COMM_RECEIVED = 5, // answer received - CL_COMM_DROPPED = 6, // nothing known about operation, was dropped - // or actually already collected - CL_COMM_ERROR = 7, // original request could not be sent + CL_COMM_ERROR = 6, // original request could not be sent + CL_COMM_DROPPED = 7 // operation was dropped, not known + // this is only used to report an error + // in the wait or enquire methods }; struct ClusterCommResult { @@ -155,6 +153,13 @@ namespace triagens { uint32_t _sslProtocol; }; +//////////////////////////////////////////////////////////////////////////////// +/// @brief global callback for asynchronous REST handler +//////////////////////////////////////////////////////////////////////////////// + +void ClusterCommRestCallback(string& coordinator, rest::HttpResponse* response); + + // ----------------------------------------------------------------------------- // --SECTION-- ClusterComm // ----------------------------------------------------------------------------- @@ -367,12 +372,13 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// /// @brief process an answer coming in on the HTTP socket which is actually -/// an answer to one of our earlier requests, return value of 0 means OK -/// and nonzero is an error. This is only called in a coordinator node +/// an answer to one of our earlier requests, return value of "" means OK +/// and nonempty is an error. This is only called in a coordinator node /// and not in a DBServer node. //////////////////////////////////////////////////////////////////////////////// - int processAnswer(rest::HttpRequest* answer); + string processAnswer(string& coordinatorHeader, + rest::HttpRequest* answer); //////////////////////////////////////////////////////////////////////////////// /// @brief send an answer HTTP request to a coordinator, which contains @@ -380,9 +386,8 @@ namespace triagens { /// a DBServer node and never in a coordinator node. //////////////////////////////////////////////////////////////////////////////// - httpclient::SimpleHttpResult* asyncAnswer ( - rest::HttpRequest& origRequest, - rest::HttpResponse& responseToSend); + void asyncAnswer (string& coordinatorHeader, + rest::HttpResponse* responseToSend); // ----------------------------------------------------------------------------- // --SECTION-- private methods and data @@ -595,10 +600,6 @@ namespace triagens { } // namespace arango } // namespace triagens -#ifdef __cplusplus -} -#endif - #endif // Local Variables: diff --git a/arangod/Cluster/HeartbeatThread.cpp b/arangod/Cluster/HeartbeatThread.cpp index c699a668bf..0be13f1810 100644 --- a/arangod/Cluster/HeartbeatThread.cpp +++ b/arangod/Cluster/HeartbeatThread.cpp @@ -124,6 +124,8 @@ void HeartbeatThread::run () { else { // value did not change, but we already blocked waiting for a change... // nothing to do here + LOG_ERROR("HeartbeatThread suspended for 10 seconds"); + sleep(10); } } diff --git a/arangod/Cluster/RestShardHandler.cpp b/arangod/Cluster/RestShardHandler.cpp index 522d6d90b2..4564558a02 100644 --- a/arangod/Cluster/RestShardHandler.cpp +++ b/arangod/Cluster/RestShardHandler.cpp @@ -27,7 +27,9 @@ #include "RestShardHandler.h" +#include "Basics/ConditionLocker.h" #include "Cluster/ServerState.h" +#include "Cluster/ClusterComm.h" #include "Dispatcher/Dispatcher.h" #include "Rest/HttpRequest.h" #include "Rest/HttpResponse.h" @@ -94,46 +96,35 @@ string const& RestShardHandler::queue () const { triagens::rest::HttpHandler::status_e RestShardHandler::execute () { ServerState::RoleEnum role = ServerState::instance()->getRole(); - if (role == ServerState::ROLE_COORDINATOR) { + if (role != ServerState::ROLE_COORDINATOR) { generateError(triagens::rest::HttpResponse::BAD, (int) triagens::rest::HttpResponse::BAD, "this API is meant to be called on a coordinator node"); return HANDLER_DONE; } -/* + bool found; - char const* coordinator = _request->header("x-arango-coordinator", found); + char const* _coordinator = _request->header("x-arango-coordinator", found); if (! found) { generateError(triagens::rest::HttpResponse::BAD, (int) triagens::rest::HttpResponse::BAD, - "header 'x-arango-coordinator' is missing"); + "header 'X-Arango-Coordinator' is missing"); return HANDLER_DONE; } - char const* operation = _request->header("x-arango-operation", found); - if (! found) { - generateError(triagens::rest::HttpResponse::BAD, - (int) triagens::rest::HttpResponse::BAD, - "header 'x-arango-operation' is missing"); - } - - char const* url = _request->header("x-arango-url", found); - if (! found) { - generateError(triagens::rest::HttpResponse::BAD, - (int) triagens::rest::HttpResponse::BAD, - "header 'x-arango-url' is missing"); - } -*/ + string coordinatorHeader = _coordinator; + string result = ClusterComm::instance()->processAnswer(coordinatorHeader, + stealRequest()); -/* - triagens::rest::HttpHandler* handler = this->_server->createHandler(_request); - triagens::rest::Job* job = new triagens::rest::GeneralServerJob(0, handler, true); - - _dispatcher->addJob(job); -*/ - // respond with a 202 - _response = createResponse(triagens::rest::HttpResponse::ACCEPTED); + if (result == "") { + _response = createResponse(triagens::rest::HttpResponse::ACCEPTED); + } + else { + generateError(triagens::rest::HttpResponse::BAD, + (int) triagens::rest::HttpResponse::BAD, + result.c_str()); + } return HANDLER_DONE; } diff --git a/arangod/RestServer/ArangoServer.cpp b/arangod/RestServer/ArangoServer.cpp index 261649b3df..83afd434f2 100644 --- a/arangod/RestServer/ArangoServer.cpp +++ b/arangod/RestServer/ArangoServer.cpp @@ -75,6 +75,7 @@ #ifdef TRI_ENABLE_CLUSTER #include "Cluster/ApplicationCluster.h" #include "Cluster/RestShardHandler.h" +#include "Cluster/ClusterComm.h" #endif #include "V8/V8LineEditor.h" @@ -555,7 +556,12 @@ void ArangoServer::buildApplicationServer () { // endpoint server // ............................................................................. - _jobManager = new AsyncJobManager(&TRI_NewTickServer); +#ifdef TRI_ENABLE_CLUSTER + _jobManager = new AsyncJobManager(&TRI_NewTickServer, + ClusterCommRestCallback); +#else + _jobManager = new AsyncJobManager(&TRI_NewTickServer, 0); +#endif _applicationEndpointServer = new ApplicationEndpointServer(_applicationServer, _applicationScheduler, diff --git a/arangod/V8Server/v8-actions.cpp b/arangod/V8Server/v8-actions.cpp index 29d13080ca..c901bd0a97 100644 --- a/arangod/V8Server/v8-actions.cpp +++ b/arangod/V8Server/v8-actions.cpp @@ -934,9 +934,6 @@ static v8::Handle JS_ShardingTest (v8::Arguments const& argv) { } map* headerFields = new map; - (*headerFields)["X-ClientTransactionID"] = clientTransactionId; - (*headerFields)["X-Arango-Async"] = "store"; - (*headerFields)["X-Arango-Coordinator"] = ServerState::instance()->getAddress(); ClusterCommResult const* res = cc->asyncRequest(clientTransactionId, TRI_NewTickServer(), "shardBlubb", @@ -968,7 +965,7 @@ static v8::Handle JS_ShardingTest (v8::Arguments const& argv) { } LOG_DEBUG("JS_ShardingTest: request not yet sent"); - usleep(500000); + usleep(50000); } LOG_DEBUG("JS_ShardingTest: request has been sent"); diff --git a/lib/HttpServer/AsyncJobManager.h b/lib/HttpServer/AsyncJobManager.h index bc30232e66..0885663ae4 100644 --- a/lib/HttpServer/AsyncJobManager.h +++ b/lib/HttpServer/AsyncJobManager.h @@ -50,11 +50,11 @@ namespace triagens { // ----------------------------------------------------------------------------- public: - AsyncCallbackContext (std::string const& url) - : _url(url), + AsyncCallbackContext (std::string const& coordHeader) + : _coordHeader(coordHeader), _response(0) { - std::cout << "generated async context " << _url << std::endl; + std::cout << "generated async context " << _coordHeader << std::endl; } ~AsyncCallbackContext () { @@ -69,16 +69,8 @@ namespace triagens { public: - bool callback (HttpResponse* response) { - if (response == 0) { - return false; - } - - _response = response; - - std::cout << "callback called for async context " << _url << ", BODY: " << _response->body().c_str() << std::endl; - - return true; + string& getCoordinatorHeader() { + return _coordHeader; } // ----------------------------------------------------------------------------- @@ -87,7 +79,7 @@ namespace triagens { private: - std::string _url; + std::string _coordHeader; HttpResponse* _response; }; @@ -216,10 +208,12 @@ namespace triagens { /// @brief constructor //////////////////////////////////////////////////////////////////////////////// - AsyncJobManager (uint64_t (*idFunc)()) + AsyncJobManager (uint64_t (*idFunc)(), + void (*callbackFunc)(string&, HttpResponse*)) : _lock(), _jobs(), - generate(idFunc) { + generate(idFunc), + callback(callbackFunc) { } //////////////////////////////////////////////////////////////////////////////// @@ -412,11 +406,12 @@ namespace triagens { char const* hdr = job->getHandler()->getRequest()->header("x-arango-coordinator", found); if (found) { - std::cout << "FOUND COORDINATOR HEADER\n"; + std::cout << "FOUND COORDINATOR HEADER\n"; ctx = new AsyncCallbackContext(std::string(hdr)); } - AsyncJobResult ajr(*jobId, 0, TRI_microtime(), AsyncJobResult::JOB_PENDING, ctx); + AsyncJobResult ajr(*jobId, 0, TRI_microtime(), + AsyncJobResult::JOB_PENDING, ctx); WRITE_LOCKER(_lock); @@ -471,8 +466,8 @@ namespace triagens { } // if callback is set, execute it now (outside of the wr-lock) - if (ctx != 0 && response != 0) { - ctx->callback(response); + if (ctx != 0 && response != 0 && 0 != callback) { + callback(ctx->getCoordinatorHeader(), response); } } @@ -500,6 +495,12 @@ namespace triagens { uint64_t (*generate)(); +//////////////////////////////////////////////////////////////////////////////// +/// @brief function pointer for callback registered at initialisation +//////////////////////////////////////////////////////////////////////////////// + + void (*callback)(string& coordinator, HttpResponse* response); + }; } diff --git a/lib/HttpServer/HttpHandler.cpp b/lib/HttpServer/HttpHandler.cpp index 6faf8a12df..f21ea860ec 100644 --- a/lib/HttpServer/HttpHandler.cpp +++ b/lib/HttpServer/HttpHandler.cpp @@ -100,6 +100,16 @@ HttpResponse* HttpHandler::stealResponse () { return tmp; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief steal the request +//////////////////////////////////////////////////////////////////////////////// + +HttpRequest* HttpHandler::stealRequest () { + HttpRequest* tmp = _request; + _request = 0; + return tmp; +} + //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// diff --git a/lib/HttpServer/HttpHandler.h b/lib/HttpServer/HttpHandler.h index 3d8b8c48c5..bef1c98909 100644 --- a/lib/HttpServer/HttpHandler.h +++ b/lib/HttpServer/HttpHandler.h @@ -79,7 +79,7 @@ namespace triagens { /// @brief constructs a new handler /// /// Note that the handler owns the request and the response. It is its -/// responsibility to destroy them both. +/// responsibility to destroy them both. See also the two steal methods. //////////////////////////////////////////////////////////////////////////////// explicit @@ -149,6 +149,12 @@ namespace triagens { return _request; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief steal the pointer to the request +//////////////////////////////////////////////////////////////////////////////// + + HttpRequest* stealRequest (); + //////////////////////////////////////////////////////////////////////////////// /// {@inheritDoc} ////////////////////////////////////////////////////////////////////////////////