1
0
Fork 0

Merge branch 'sharding' of https://github.com/triAGENS/ArangoDB into sharding

This commit is contained in:
Jan Steemann 2014-01-03 13:53:08 +01:00
commit a4efa9fe1b
7 changed files with 266 additions and 89 deletions

View File

@ -25,3 +25,28 @@ monitoring of the server.
@CLEARPAGE
@anchor HttpSystemAdminStatisticsDescription
@copydetails JSF_get_admin_statistics_description
@CLEARPAGE
@anchor HttpSystemAdminDescriptionShardingGET
@copydetails JSF_sharding_test_GET
@CLEARPAGE
@anchor HttpSystemAdminDescriptionShardingPUT
@copydetails JSF_sharding_test_PUT
@CLEARPAGE
@anchor HttpSystemAdminDescriptionShardingPOST
@copydetails JSF_sharding_test_POST
@CLEARPAGE
@anchor HttpSystemAdminDescriptionShardingDELETE
@copydetails JSF_sharding_test_DELETE
@CLEARPAGE
@anchor HttpSystemAdminDescriptionShardingPATCH
@copydetails JSF_sharding_test_PATCH
@CLEARPAGE
@anchor HttpSystemAdminDescriptionShardingHEAD
@copydetails JSF_sharding_test_HEAD

View File

@ -7,3 +7,9 @@ TOC {#HttpSystemTOC}
- @ref HttpSystemFlushServerModules "POST /_admin/modules/flush"
- @ref HttpSystemAdminStatistics "GET /_admin/statistics"
- @ref HttpSystemAdminStatisticsDescription "GET /_admin/statistics-descriptions"
- @ref HttpSystemAdminDescriptionShardingGET "GET /_admin/sharding-test"
- @ref HttpSystemAdminDescriptionShardingPUT "PUT /_admin/sharding-test"
- @ref HttpSystemAdminDescriptionShardingPOST "POST /_admin/sharding-test"
- @ref HttpSystemAdminDescriptionShardingDELETE "DELETE /_admin/sharding-test"
- @ref HttpSystemAdminDescriptionShardingPATCH "PATCH /_admin/sharding-test"
- @ref HttpSystemAdminDescriptionShardingHEAD "HEAD /_admin/sharding-test"

View File

@ -357,7 +357,7 @@ ClusterCommResult* ClusterComm::asyncRequest (
list<ClusterCommOperation*>::iterator i = toSend.end();
toSendByOpID[op->operationID] = --i;
}
LOG_TRACE("In asyncRequest, put into queue %ld", op->operationID);
LOG_DEBUG("In asyncRequest, put into queue %ld", op->operationID);
somethingToSend.signal();
return res;
@ -392,7 +392,7 @@ ClusterCommResult* ClusterComm::syncRequest (
: currentTime+timeout;
res->serverID = ClusterInfo::instance()->getResponsibleServer(shardID);
LOG_TRACE("Responsible server: %s", res->serverID.c_str());
LOG_DEBUG("Responsible server: %s", res->serverID.c_str());
if (res->serverID == "") {
res->status = CL_COMM_ERROR;
@ -407,12 +407,12 @@ ClusterCommResult* ClusterComm::syncRequest (
}
else {
if (0 != body) {
LOG_TRACE("sending %s request to DB server '%s': %s",
LOG_DEBUG("sending %s request to DB server '%s': %s",
triagens::rest::HttpRequest::translateMethod(reqtype).c_str(),
res->serverID.c_str(), body);
}
else {
LOG_TRACE("sending %s request to DB server '%s'",
LOG_DEBUG("sending %s request to DB server '%s'",
triagens::rest::HttpRequest::translateMethod(reqtype).c_str(),
res->serverID.c_str());
}
@ -690,7 +690,7 @@ void ClusterComm::asyncAnswer (string& coordinatorHeader,
size_t start = 0;
size_t pos;
LOG_TRACE("In asyncAnswer, seeing %s", coordinatorHeader.c_str());
LOG_DEBUG("In asyncAnswer, seeing %s", coordinatorHeader.c_str());
pos = coordinatorHeader.find(":",start);
if (pos == string::npos) {
LOG_ERROR("Could not find coordinator ID in X-Arango-Coordinator");
@ -712,7 +712,7 @@ void ClusterComm::asyncAnswer (string& coordinatorHeader,
char const* body = responseToSend->body().c_str();
size_t len = responseToSend->body().length();
LOG_TRACE("asyncAnswer: sending PUT request to DB server '%s'",
LOG_DEBUG("asyncAnswer: sending PUT request to DB server '%s'",
coordinatorID.c_str());
triagens::httpclient::SimpleHttpClient* client
@ -744,7 +744,7 @@ string ClusterComm::processAnswer(string& coordinatorHeader,
size_t start = 0;
size_t pos;
LOG_TRACE("In processAnswer, seeing %s", coordinatorHeader.c_str());
LOG_DEBUG("In processAnswer, seeing %s", coordinatorHeader.c_str());
pos = coordinatorHeader.find(":",start);
if (pos == string::npos) {
@ -819,7 +819,7 @@ bool ClusterComm::moveFromSendToReceived (OperationID operationID) {
IndexIterator i;
ClusterCommOperation* op;
LOG_TRACE("In moveFromSendToReceived %ld", operationID);
LOG_DEBUG("In moveFromSendToReceived %ld", operationID);
basics::ConditionLocker locker(&somethingReceived);
basics::ConditionLocker sendlocker(&somethingToSend);
i = toSendByOpID.find(operationID); // cannot fail
@ -910,7 +910,7 @@ void ClusterCommThread::run () {
ClusterCommOperation* op;
ClusterComm* cc = ClusterComm::instance();
LOG_TRACE("starting ClusterComm thread");
LOG_DEBUG("starting ClusterComm thread");
while (0 == _stop) {
// First check the sending queue, as long as it is not empty, we send
@ -925,7 +925,7 @@ void ClusterCommThread::run () {
break;
}
else {
LOG_TRACE("Noticed something to send");
LOG_DEBUG("Noticed something to send");
op = cc->toSend.front();
assert(op->status == CL_COMM_SUBMITTED);
op->status = CL_COMM_SENDING;
@ -945,7 +945,7 @@ void ClusterCommThread::run () {
// First find the server to which the request goes from the shardID:
ServerID server = ClusterInfo::instance()->getResponsibleServer(
op->shardID);
LOG_TRACE("Responsible server: %s", server.c_str());
LOG_DEBUG("Responsible server: %s", server.c_str());
if (server == "") {
op->status = CL_COMM_ERROR;
}
@ -960,12 +960,12 @@ void ClusterCommThread::run () {
}
else {
if (0 != op->body) {
LOG_TRACE("sending %s request to DB server '%s': %s",
LOG_DEBUG("sending %s request to DB server '%s': %s",
triagens::rest::HttpRequest::translateMethod(op->reqtype)
.c_str(), server.c_str(), op->body);
}
else {
LOG_TRACE("sending %s request to DB server '%s'",
LOG_DEBUG("sending %s request to DB server '%s'",
triagens::rest::HttpRequest::translateMethod(op->reqtype)
.c_str(), server.c_str());
}
@ -1014,8 +1014,6 @@ void ClusterCommThread::run () {
}
}
LOG_TRACE("ClusterComm alive");
// Finally, wait for some time or until something happens using
// the condition variable:
{
@ -1027,7 +1025,7 @@ void ClusterCommThread::run () {
// another thread is waiting for this value to shut down properly
_stop = 2;
LOG_TRACE("stopped ClusterComm thread");
LOG_DEBUG("stopped ClusterComm thread");
}
// -----------------------------------------------------------------------------

View File

@ -919,9 +919,9 @@ class CallbackTest : public ClusterCommCallback {
CallbackTest(string msg) : _msg(msg) {}
virtual ~CallbackTest() {}
virtual bool operator() (ClusterCommResult* res) {
LOG_TRACE("ClusterCommCallback called on operation %ld",
LOG_DEBUG("ClusterCommCallback called on operation %ld",
res->operationID);
LOG_TRACE("Message: %s", _msg.c_str());
LOG_DEBUG("Message: %s", _msg.c_str());
return false; // Keep it in the queue
}
};
@ -1001,6 +1001,8 @@ static v8::Handle<v8::Value> JS_ShardingTest (v8::Arguments const& argv) {
ClusterCommResult const* res;
v8::Handle<v8::Object> r = v8::Object::New();
if (asyncMode) {
res = cc->asyncRequest(clientTransactionId, TRI_NewTickServer(), shard,
reqType, path, body.c_str(), body.size(), headerFields,
@ -1011,7 +1013,7 @@ static v8::Handle<v8::Value> JS_ShardingTest (v8::Arguments const& argv) {
"couldn't queue async request");
}
LOG_TRACE("JS_ShardingTest: request has been submitted");
LOG_DEBUG("JS_ShardingTest: request has been submitted");
OperationID opID = res->operationID;
delete res;
@ -1030,55 +1032,99 @@ static v8::Handle<v8::Value> JS_ShardingTest (v8::Arguments const& argv) {
if (status >= CL_COMM_SENT) {
break;
}
LOG_TRACE("JS_ShardingTest: request not yet sent");
LOG_DEBUG("JS_ShardingTest: request not yet sent");
usleep(50000);
}
LOG_TRACE("JS_ShardingTest: request has been sent, status: %d",status);
LOG_DEBUG("JS_ShardingTest: request has been sent, status: %d",status);
res = cc->wait("", 0, opID, "");
if (0 == res) {
r->Set(v8::String::New("errorMsg"),v8::String::New("out of memory"));
LOG_DEBUG("JS_ShardingTest: out of memory");
}
else if (res->status == CL_COMM_TIMEOUT) {
r->Set(v8::String::New("timeout"),v8::BooleanObject::New(true));
LOG_DEBUG("JS_ShardingTest: timeout");
}
else if (res->status == CL_COMM_ERROR) {
r->Set(v8::String::New("errorMessage"),
v8::String::New("could not send request, DBServer gone"));
LOG_DEBUG("JS_ShardingTest: communications error");
}
else if (res->status == CL_COMM_DROPPED) {
// Note that this can basically not happen
r->Set(v8::String::New("errorMessage"),
v8::String::New("request dropped whilst waiting for answer"));
LOG_DEBUG("JS_ShardingTest: dropped");
}
else { // Everything is OK
// The headers:
v8::Handle<v8::Object> h = v8::Object::New();
map<string,string> headers = res->answer->headers();
map<string,string>::iterator i;
for (i = headers.begin(); i != headers.end(); ++i) {
h->Set(v8::String::New(i->first.c_str()),
v8::String::New(i->second.c_str()));
}
r->Set(v8::String::New("headers"), h);
// The body:
if (0 != res->answer->body()) {
r->Set(v8::String::New("body"), v8::String::New(res->answer->body(),
res->answer->bodySize()));
}
LOG_DEBUG("JS_ShardingTest: success");
}
}
else { // synchronous mode
res = cc->syncRequest(clientTransactionId, TRI_NewTickServer(), shard,
reqType, path, body.c_str(), body.size(),
*headerFields, timeout);
delete headerFields;
}
LOG_DEBUG("JS_ShardingTest: request has been sent synchronously, "
"status: %d",res->status);
v8::Handle<v8::Object> r = v8::Object::New();
if (0 == res) {
r->Set(v8::String::New("errorMsg"),v8::String::New("out of memory"));
}
else if (res->status == CL_COMM_TIMEOUT) {
r->Set(v8::String::New("timeout"),v8::BooleanObject::New(true));
}
else if (res->status == CL_COMM_ERROR) {
r->Set(v8::String::New("errorMessage"),
v8::String::New("could not send request, DBServer gone"));
}
else if (res->status == CL_COMM_DROPPED) {
// Note that this can basically not happen
r->Set(v8::String::New("errorMessage"),
v8::String::New("request dropped whilst waiting for answer"));
}
else { // Everything is OK
// The headers:
v8::Handle<v8::Object> h = v8::Object::New();
map<string,string> headers = res->answer->headers();
map<string,string>::iterator i;
for (i = headers.begin(); i != headers.end(); ++i) {
h->Set(v8::String::New(i->first.c_str()),
v8::String::New(i->second.c_str()));
if (0 == res) {
r->Set(v8::String::New("errorMsg"),v8::String::New("out of memory"));
LOG_DEBUG("JS_ShardingTest: out of memory");
}
r->Set(v8::String::New("headers"), h);
else if (res->status == CL_COMM_TIMEOUT) {
r->Set(v8::String::New("timeout"),v8::BooleanObject::New(true));
LOG_DEBUG("JS_ShardingTest: timeout");
}
else if (res->status == CL_COMM_ERROR) {
r->Set(v8::String::New("errorMessage"),
v8::String::New("could not send request, DBServer gone"));
LOG_DEBUG("JS_ShardingTest: communications error");
}
else if (res->status == CL_COMM_DROPPED) {
// Note that this can basically not happen
r->Set(v8::String::New("errorMessage"),
v8::String::New("request dropped whilst waiting for answer"));
LOG_DEBUG("JS_ShardingTest: dropped");
}
else { // Everything is OK
// The headers:
v8::Handle<v8::Object> h = v8::Object::New();
map<string,string> headers = res->result->getHeaderFields();
map<string,string>::iterator i;
for (i = headers.begin(); i != headers.end(); ++i) {
h->Set(v8::String::New(i->first.c_str()),
v8::String::New(i->second.c_str()));
}
r->Set(v8::String::New("headers"), h);
// The body:
string theBody = res->result->getBody().str();
r->Set(v8::String::New("body"), v8::String::New(theBody.c_str(),
theBody.size()));
LOG_DEBUG("JS_ShardingTest: success");
// The body:
if (0 != res->answer->body()) {
r->Set(v8::String::New("body"), v8::String::New(res->answer->body(),
res->answer->bodySize()));
}
}
delete headerFields;
if (0 != res) {
delete res;

View File

@ -803,14 +803,122 @@ actions.defineHttp({
});
////////////////////////////////////////////////////////////////////////////////
/// @fn JSF_sharding_test
/// @brief executes a test function for sharding
/// @fn JSF_sharding_test_GET
/// @brief executes a cluster roundtrip for sharding
///
/// @RESTHEADER{POST /_admin/sharding-test,executes a test function}
/// @RESTHEADER{GET /_admin/sharding-test,executes a cluster roundtrip}
///
/// @RESTDESCRIPTION
///
/// Executes a test function
/// Executes a cluster roundtrip from a coordinator to a DB server and
/// back. This call only works in a coordinator node in a cluster.
/// One can and should append an arbitrary path to the URL and the
/// part after `/_admin/sharding-test` is used as the path of the HTTP
/// request which is sent from the coordinator to a DB node. Likewise,
/// any form data appended to the URL is forwarded in the request to the
/// DB node. This handler takes care of all request types (see below)
/// and uses the same request type in its request to the DB node.
///
/// The following HTTP headers are interpreted in a special way:
///
/// - `X-Shard-ID`: This specifies the ID of the shard to which the
/// cluster request is sent and thus tells the system to which DB server
/// to send the cluster request. Note that the mapping from the
/// shard ID to the responsible server has to be defined in the
/// agency under `Current/ShardLocation/<shardID>`. One has to give
/// this header, otherwise the system does not know where to send
/// the request.
/// - `X-Client-Transaction-ID`: the value of this header is taken
/// as the client transaction ID for the request
/// - `X-Timeout`: specifies a timeout in seconds for the cluster
/// operation. If the answer does not arrive within the specified
/// timeout, an corresponding error is returned and any subsequent
/// real answer is ignored. The default if not given is 24 hours.
/// - `X-Synchronous-Mode`: If set to `true` the test function uses
/// synchronous mode, otherwise the default asynchronous operation
/// mode is used. This is mainly for debugging purposes.
/// - `Host`: This header is ignored and not forwarded to the DB server.
/// - `User-Agent`: This header is ignored and not forwarded to the DB
/// server.
///
/// All other HTTP headers and the body of the request (if present, see
/// other HTTP methods below) are forwarded as given in the original request.
///
/// In asynchronous mode the DB server answers with an HTTP request of its
/// own, in synchronous mode it sends a HTTP response. In both cases the
/// headers and the body are used to produce the HTTP response of this
/// API call.
///
/// @RESTRETURNCODES
///
/// The return code can be anything the cluster request returns, as well as:
///
/// @RESTRETURNCODE{200}
/// is returned when everything went well, or if a timeout occurred. In the
/// latter case a body of type application/json indicating the timeout
/// is returned.
///
/// @RESTRETURNCODE{403}
/// is returned if ArangoDB is not running in cluster mode.
///
/// @RESTRETURNCODE{404}
/// is returned if ArangoDB was not compiled for cluster operation.
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @fn JSF_sharding_test_POST
/// @brief executes a cluster roundtrip for sharding
///
/// @RESTHEADER{POST /_admin/sharding-test,executes a cluster roundtrip}
///
/// @RESTBODYPARAM{body,anything,required}
///
/// @RESTDESCRIPTION
/// See GET method. The body can be any type and is simply forwarded.
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @fn JSF_sharding_test_PUT
/// @brief executes a cluster roundtrip for sharding
///
/// @RESTHEADER{PUT /_admin/sharding-test,executes a cluster roundtrip}
///
/// @RESTBODYPARAM{body,anything,required}
///
/// @RESTDESCRIPTION
/// See GET method. The body can be any type and is simply forwarded.
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @fn JSF_sharding_test_DELETE
/// @brief executes a cluster roundtrip for sharding
///
/// @RESTHEADER{DELETE /_admin/sharding-test,executes a cluster roundtrip}
///
/// @RESTDESCRIPTION
/// See GET method. The body can be any type and is simply forwarded.
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @fn JSF_sharding_test_PATCH
/// @brief executes a cluster roundtrip for sharding
///
/// @RESTHEADER{PATCH/_admin/sharding-test,executes a cluster roundtrip}
///
/// @RESTBODYPARAM{body,anything,required}
///
/// @RESTDESCRIPTION
/// See GET method. The body can be any type and is simply forwarded.
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @fn JSF_sharding_test_HEAD
/// @brief executes a cluster roundtrip for sharding
///
/// @RESTHEADER{HEAD/_admin/sharding-test,executes a cluster roundtrip}
///
/// @RESTDESCRIPTION
/// See GET method. The body can be any type and is simply forwarded.
////////////////////////////////////////////////////////////////////////////////
actions.defineHttp({
@ -830,18 +938,13 @@ actions.defineHttp({
var shard = "";
for (var p in req.parameters) {
if (req.parameters.hasOwnProperty(p)) {
if (p === "shardID") {
shard = req.parameters[p];
if (params === "") {
params = "?";
}
else {
if (params === "") {
params = "?";
}
else {
params += "&";
}
params += p+"="+String(req.parameters[p])
params += "&";
}
params += p+"="+String(req.parameters[p])
}
}
if (params !== "") {
@ -853,21 +956,21 @@ actions.defineHttp({
var asyncMode = true;
for (var p in req.headers) {
if (req.headers.hasOwnProperty(p)) {
if (p === "host" || p === "user-agent") {
// We ignore these
}
else if (p === "clientTransactionID") {
if (p === "x-client-transaction-id") {
transID = req.headers[p];
}
else if (p === "timeout") {
else if (p === "x-timeout") {
timeout = parseFloat(req.headers[p]);
if (isNaN(timeout)) {
timeout = 24*3600.0;
}
}
else if (p === "synchronousMode") {
else if (p === "x-synchronous-mode") {
asyncMode = false;
}
else if (p === "x-shard-id") {
shard = req.headers[p];
}
else {
headers[p] = req.headers[p];
}

View File

@ -403,7 +403,7 @@ namespace triagens {
char const* hdr = job->getHandler()->getRequest()->header("x-arango-coordinator", found);
if (found) {
LOG_TRACE("Found header X-Arango-Coordinator in async request");
LOG_DEBUG("Found header X-Arango-Coordinator in async request");
ctx = new AsyncCallbackContext(std::string(hdr));
}

View File

@ -39,41 +39,40 @@ function set() {
if [ "$1" == "init" ] ; then
$CURL -X DELETE "$URL$PREFIX?recursive=true" > /dev/null
set Target/Version 1
set Plan/Version 1
set Current/Version 1
set Target/Lock UNLOCKED
set Plan/Lock UNLOCKED
set Current/Lock UNLOCKED
set Target/MapLocalToID
set Target/MapIDToEndpoint
set Target/Version 1
set Target/DBServers
set Target/Coordinators
set Target/Databases
set Target/Collections
set Target/ShardLocation
set Plan/Version 1
set Plan/DBServers
set Plan/Coordinators
set Plan/Databases
set Plan/Collections
set Plan/ShardLocation
set Target/Databases
set Plan/Databases
set Current/Databases
set Target/Collections
set Plan/Collections
set Current/Collections
set Current/Version 1
set Current/ServersRegistered
set Current/DBServers
set Current/Coordinators
set Current/Databases
set Current/Collections
set Current/ShardLocation
set Current/ShardsCopied
set Sync/ServerStates
set Sync/Problems
set Sync/ClusterManager none
set Sync/LatestID 0
set Sync/Commands
echo Initialisation complete.
fi