1
0
Fork 0

some bugfixes for asyncRequest etc

This commit is contained in:
Jan Steemann 2013-12-20 18:03:55 +01:00
parent 5a3b49a7bc
commit 16e16f705c
7 changed files with 82 additions and 14 deletions

View File

@ -201,6 +201,9 @@ bool ApplicationCluster::start () {
_myId.c_str());
}
// register our own address
ServerState::instance()->setAddress(_myAddress);
// now we can validate --cluster.my-address
const string unified = triagens::rest::Endpoint::getUnifiedForm(_myAddress);

View File

@ -125,6 +125,8 @@ ClusterComm::getConnection(ServerID& serverID) {
allConnections[serverID] = s;
}
}
assert(s != 0);
// Now get an unused one:
{
@ -138,6 +140,7 @@ ClusterComm::getConnection(ServerID& serverID) {
// We need to open a new one:
string a = ClusterState::instance()->getServerEndpoint(serverID);
if (a == "") {
// Unknown server address, probably not yet connected
return 0;
@ -305,6 +308,7 @@ ClusterCommResult* ClusterComm::asyncRequest (
op->shardID = shardID;
op->serverID = ClusterState::instance()->getResponsibleServer(
shardID);
op->status = CL_COMM_SUBMITTED;
op->reqtype = reqtype;
op->path = path;
@ -312,8 +316,8 @@ ClusterCommResult* ClusterComm::asyncRequest (
op->bodyLength = bodyLength;
op->headerFields = headerFields;
op->callback = callback;
op->timeout = timeout == 0.0 ? 1e50 : timeout;
op->timeout = timeout == 0.0 ? 3600.0 : timeout;
ClusterCommResult* res = new ClusterCommResult();
*res = *static_cast<ClusterCommResult*>(op);
@ -677,9 +681,9 @@ void ClusterCommThread::run () {
= cc->getConnection(server);
if (0 == connection) {
op->status = CL_COMM_ERROR;
LOG_ERROR("cannot create connection to server '%s'", server.c_str());
}
else {
LOG_TRACE("sending %s request to DB server '%s': %s",
triagens::rest::HttpRequest::translateMethod(op->reqtype).c_str(),
server.c_str(), op->body);

View File

@ -51,7 +51,9 @@ static ServerState* Instance = 0;
////////////////////////////////////////////////////////////////////////////////
ServerState::ServerState ()
: _lock(),
: _id(),
_address(),
_lock(),
_role(ROLE_UNDEFINED),
_state(STATE_UNDEFINED) {

View File

@ -173,6 +173,26 @@ namespace triagens {
_id = id;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the server address
////////////////////////////////////////////////////////////////////////////////
inline std::string getAddress () const {
return _address;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief set the server address
////////////////////////////////////////////////////////////////////////////////
void setAddress (std::string const& address) {
// address can be set just once
assert(_address.empty());
assert(! address.empty());
_address = address;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the current state
////////////////////////////////////////////////////////////////////////////////
@ -215,6 +235,12 @@ namespace triagens {
std::string _id;
////////////////////////////////////////////////////////////////////////////////
/// @brief the server's own address. can be set just once
////////////////////////////////////////////////////////////////////////////////
std::string _address;
////////////////////////////////////////////////////////////////////////////////
/// @brief r/w lock for state
////////////////////////////////////////////////////////////////////////////////

View File

@ -41,6 +41,14 @@
#include "V8/v8-utils.h"
#include "V8Server/ApplicationV8.h"
#include "V8Server/v8-vocbase.h"
#include "VocBase/server.h"
#ifdef TRI_ENABLE_CLUSTER
#include "Cluster/ClusterComm.h"
#include "Cluster/ServerState.h"
#endif
using namespace std;
using namespace triagens::basics;
@ -904,8 +912,6 @@ static v8::Handle<v8::Value> JS_ExecuteGlobalContextFunction (v8::Arguments cons
#ifdef TRI_ENABLE_CLUSTER
#include "Cluster/ClusterComm.h"
static v8::Handle<v8::Value> JS_ShardingTest (v8::Arguments const& argv) {
v8::Isolate* isolate;
@ -919,30 +925,52 @@ static v8::Handle<v8::Value> JS_ShardingTest (v8::Arguments const& argv) {
TRI_V8_EXCEPTION_USAGE(scope, "SYS_SHARDING_TEST(<req>, <res>)");
}
const string clientTransactionId = StringUtils::itoa(TRI_NewTickServer());
ClusterComm* cc = ClusterComm::instance();
if (cc == 0) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "clustercomm object not found");
}
map<string, string>* headerFields = new map<string, string>;
(*headerFields)["X-ClientTransactionID"] = "BlaBlubb";
(*headerFields)["X-ClientTransactionID"] = clientTransactionId;
(*headerFields)["X-Arango-Async"] = "store";
(*headerFields)["X-Arango-Coordinator"] = ServerState::instance()->getAddress();
ClusterCommResult const* res =
cc->asyncRequest("ClientBla", 12345, "shardBlubb",
cc->asyncRequest(clientTransactionId, TRI_NewTickServer(), "shardBlubb",
triagens::rest::HttpRequest::HTTP_REQUEST_GET,
"/_admin/time", NULL, 0, headerFields, 0, 0);
OperationID opID = res->operationID;
if (res == 0) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "couldn't queue async request");
}
LOG_DEBUG("JS_ShardingTest: request has been submitted");
OperationID opID = res->operationID;
delete res;
// Wait until the request has actually been sent:
while (true) {
res = cc->enquire(opID);
if (res->status >= CL_COMM_SENT) {
delete res;
if (res == 0) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "couldn't enquire operation");
}
ClusterCommOpStatus status = res->status;
delete res;
if (status >= CL_COMM_SENT) {
break;
}
delete res;
LOG_DEBUG("JS_ShardingTest: request not yet sent");
usleep(1000000);
usleep(500000);
}
LOG_DEBUG("JS_ShardingTest: request has been sent");
cc->drop("", 0, opID, "");

View File

@ -407,10 +407,12 @@ namespace triagens {
AsyncCallbackContext* ctx = 0;
std::cout << "ASYNC REQUEST\n";
bool found;
char const* hdr = job->getHandler()->getRequest()->header("x-arango-coordinator", found);
if (found) {
std::cout << "FOUND COORDINATOR HEADER\n";
ctx = new AsyncCallbackContext(std::string(hdr));
}

View File

@ -350,7 +350,10 @@ namespace triagens {
else {
_writeBuffer.appendText("\r\n");
}
_writeBuffer.appendText(body, bodyLength);
if (body != 0) {
_writeBuffer.appendText(body, bodyLength);
}
LOG_TRACE("Request: %s", _writeBuffer.c_str());