1
0
Fork 0

Merge branch 'sharding' of https://github.com/triAGENS/ArangoDB into sharding

This commit is contained in:
Jan Steemann 2014-01-27 09:45:52 +01:00
commit f232becd2e
12 changed files with 343 additions and 5 deletions

View File

@ -141,6 +141,141 @@ int createDocumentOnCoordinator (
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief deletes a document in a coordinator
////////////////////////////////////////////////////////////////////////////////
int deleteDocumentOnCoordinator (
string const& dbname,
string const& collname,
string const& key,
TRI_voc_rid_t const rev,
TRI_doc_update_policy_e policy,
bool waitForSync,
triagens::rest::HttpResponse::HttpResponseCode& responseCode,
string& contentType,
string& resultBody) {
// Set a few variables needed for our work:
ClusterInfo* ci = ClusterInfo::instance();
ClusterComm* cc = ClusterComm::instance();
// First determine the collection ID from the name:
CollectionInfo collinfo = ci->getCollection(dbname, collname);
if (collinfo.empty()) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
string collid = StringUtils::itoa(collinfo.id());
// If _key is the one and only sharding attribute, we can do this quickly,
// because we can easily determine which shard is responsible for the
// document. Otherwise we have to contact all shards and ask them to
// delete the document. All but one will not know it.
// Now find the responsible shard:
TRI_json_t* json = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE);
if (0 == json) {
return TRI_ERROR_OUT_OF_MEMORY;
}
TRI_Insert2ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "_key",
TRI_CreateStringReference2Json(TRI_UNKNOWN_MEM_ZONE,
key.c_str(), key.size()));
bool usesDefaultShardingAttributes;
ShardID shardID = ci->getResponsibleShard( collid, json, true,
usesDefaultShardingAttributes );
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
// Some stuff to prepare cluster-intern requests:
ClusterCommResult* res;
string revstr;
if (rev != 0) {
revstr = "&rev="+StringUtils::itoa(rev);
}
string policystr;
if (policy == TRI_DOC_UPDATE_LAST_WRITE) {
policystr = "&policy=last";
}
if (usesDefaultShardingAttributes) {
// OK, this is the fast method, we only have to ask one shard:
if (shardID == "") {
return TRI_ERROR_SHARD_GONE;
}
// Send a synchronous request to that shard using ClusterComm:
map<string, string> headers;
res = cc->syncRequest("", TRI_NewTickServer(), "shard:"+shardID,
triagens::rest::HttpRequest::HTTP_REQUEST_DELETE,
"/_db/"+dbname+"/_api/document/"+
StringUtils::urlEncode(shardID)+"/"+key+
"?waitForSync="+(waitForSync ? "true" : "false")+
revstr+policystr, NULL, 0, headers, 60.0);
if (res->status == CL_COMM_TIMEOUT) {
// No reply, we give up:
delete res;
return TRI_ERROR_CLUSTER_TIMEOUT;
}
if (res->status == CL_COMM_ERROR) {
// This could be a broken connection or an Http error:
if (!res->result->isComplete()) {
delete res;
return TRI_ERROR_CLUSTER_CONNECTION_LOST;
}
// In this case a proper HTTP error was reported by the DBserver,
// this can be 400 or 404, we simply forward the result.
// We intentionally fall through here.
}
responseCode = static_cast<triagens::rest::HttpResponse::HttpResponseCode>
(res->result->getHttpReturnCode());
contentType = res->result->getContentType(false);
resultBody = res->result->getBody().str();
delete res;
return TRI_ERROR_NO_ERROR;
}
// If we get here, the sharding attributes are not only _key, therefore
// we have to contact everybody:
map<ShardID, ServerID> shards = collinfo.shardIds();
map<ShardID, ServerID>::iterator it;
CoordTransactionID coordTransactionID = TRI_NewTickServer();
for (it = shards.begin(); it != shards.end(); ++it) {
map<string, string>* headers = new map<string, string>;
res = cc->asyncRequest("", coordTransactionID, "shard:"+it->first,
triagens::rest::HttpRequest::HTTP_REQUEST_DELETE,
"/_db/"+dbname+"/_api/document/"+
StringUtils::urlEncode(it->first)+"/"+key+
"?waitForSync="+(waitForSync ? "true" : "false")+
revstr+policystr, NULL, 0, headers, NULL, 60.0);
delete res;
}
// Now listen to the results:
int count;
int nrok = 0;
for (count = shards.size(); count > 0; count--) {
res = cc->wait( "", coordTransactionID, 0, "", 0.0);
if (res->status == CL_COMM_RECEIVED) {
if (res->answer_code < triagens::rest::HttpResponse::BAD) {
nrok++;
responseCode = res->answer_code;
contentType = res->answer->header("content-type");
resultBody = string(res->answer->body(), res->answer->bodySize());
}
}
delete res;
}
if (nrok == 1) {
return TRI_ERROR_NO_ERROR;
}
if (nrok > 1) {
return TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS;
}
responseCode = triagens::rest::HttpResponse::NOT_FOUND;
contentType = "application/json; charset=utf-8";
resultBody = "{\"error\":true,\"errorMessage\":\"document not found\","
"\"errorNum\":404,\"code\":404}";
return TRI_ERROR_HTTP_NOT_FOUND;
}
} // namespace arango
} // namespace triagens

View File

@ -36,6 +36,7 @@
#include "SimpleHttpClient/SimpleHttpResult.h"
#include "SimpleHttpClient/SimpleHttpClient.h"
#include "VocBase/voc-types.h"
#include "VocBase/update-policy.h"
#include "Cluster/AgencyComm.h"
#include "Cluster/ClusterInfo.h"
@ -58,6 +59,20 @@ namespace triagens {
string& contentType,
string& resultBody);
////////////////////////////////////////////////////////////////////////////////
/// @brief delete a document in a coordinator
////////////////////////////////////////////////////////////////////////////////
int deleteDocumentOnCoordinator (
string const& dbname,
string const& collname,
string const& key,
TRI_voc_rid_t const rev,
TRI_doc_update_policy_e policy,
bool waitForSync,
triagens::rest::HttpResponse::HttpResponseCode& responseCode,
string& contentType,
string& resultBody);
// -----------------------------------------------------------------------------
// --SECTION-- public functions

View File

@ -1511,6 +1511,13 @@ bool RestDocumentHandler::deleteDocument () {
return false;
}
#ifdef TRI_ENABLE_CLUSTER
if (ServerState::instance()->isCoordinator()) {
return deleteDocumentCoordinator(collection, key, revision, policy,
waitForSync);
}
#endif
SingleCollectionWriteTransaction<StandaloneTransaction<RestTransactionContext>, 1> trx(_vocbase, _resolver, collection);
// .............................................................................
@ -1554,6 +1561,39 @@ bool RestDocumentHandler::deleteDocument () {
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief deletes a document, coordinator case in a cluster
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_CLUSTER
bool RestDocumentHandler::deleteDocumentCoordinator (
string const& collname,
string const& key,
TRI_voc_rid_t const rev,
TRI_doc_update_policy_e policy,
bool waitForSync) {
string const& dbname = _request->originalDatabaseName();
triagens::rest::HttpResponse::HttpResponseCode responseCode;
string contentType;
string resultBody;
int error = triagens::arango::deleteDocumentOnCoordinator(
dbname, collname, key, rev, policy, waitForSync,
responseCode, contentType, resultBody);
if (error != TRI_ERROR_NO_ERROR) {
generateTransactionError(collname, error);
return false;
}
// Essentially return the response we got from the DBserver, be it
// OK or an error:
_response = createResponse(responseCode);
_response->setContentType(contentType);
_response->body().appendText(resultBody.c_str(), resultBody.size());
return responseCode >= triagens::rest::HttpResponse::BAD;
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////

View File

@ -117,9 +117,9 @@ namespace triagens {
/// @brief creates a document, coordinator case in a cluster
////////////////////////////////////////////////////////////////////////////////
bool createDocumentCoordinator (char const* collection,
bool waitForSync,
TRI_json_t* json);
bool createDocumentCoordinator (char const* collection,
bool waitForSync,
TRI_json_t* json);
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a document
@ -175,6 +175,16 @@ bool createDocumentCoordinator (char const* collection,
virtual bool deleteDocument ();
////////////////////////////////////////////////////////////////////////////////
/// @brief delete a document, coordinator case in a cluster
////////////////////////////////////////////////////////////////////////////////
bool deleteDocumentCoordinator (string const& collname,
string const& key,
TRI_voc_rid_t const rev,
TRI_doc_update_policy_e policy,
bool waitForSync);
};
}
}

View File

@ -34,6 +34,12 @@
#include "VocBase/document-collection.h"
#include "VocBase/edge-collection.h"
#ifdef TRI_ENABLE_CLUSTER
#include "Cluster/ServerState.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ClusterMethods.h"
#endif
using namespace std;
using namespace triagens::basics;
using namespace triagens::rest;
@ -218,6 +224,14 @@ bool RestEdgeHandler::createDocument () {
return false;
}
#ifdef TRI_ENABLE_CLUSTER
if (ServerState::instance()->isCoordinator()) {
// json will be freed inside!
return createDocumentCoordinator(collection, waitForSync, json,
from, to);
}
#endif
if (! checkCreateCollection(collection, getCollectionType())) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return false;
@ -318,6 +332,43 @@ bool RestEdgeHandler::createDocument () {
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a document (an edge), coordinator case in a cluster
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_CLUSTER
bool RestEdgeHandler::createDocumentCoordinator (string const& collname,
bool waitForSync,
TRI_json_t* json,
char const* from,
char const* to) {
string const& dbname = _request->originalDatabaseName();
triagens::rest::HttpResponse::HttpResponseCode responseCode;
string contentType;
string resultBody;
// Not yet implemented:
generateTransactionError(collname.c_str(), TRI_ERROR_INTERNAL);
return false;
int error = triagens::arango::createDocumentOnCoordinator(
dbname, collname, waitForSync, json,
responseCode, contentType, resultBody);
if (error != TRI_ERROR_NO_ERROR) {
generateTransactionError(collname.c_str(), error);
return false;
}
// Essentially return the response we got from the DBserver, be it
// OK or an error:
_response = createResponse(responseCode);
_response->setContentType(contentType);
_response->body().appendText(resultBody.c_str(), resultBody.size());
return responseCode >= triagens::rest::HttpResponse::BAD;
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief reads a single edge
///

View File

@ -113,6 +113,17 @@ namespace triagens {
bool createDocument ();
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a document (an edge), coordinator case in a cluster
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_CLUSTER
bool createDocumentCoordinator (string const& collname,
bool waitForSync,
TRI_json_t* json,
char const* from,
char const* to);
#endif
};
}
}

View File

@ -1709,6 +1709,60 @@ static v8::Handle<v8::Value> UpdateVocbaseCol (const bool useCollection,
return scope.Close(result);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief deletes a document, coordinator case in a cluster
////////////////////////////////////////////////////////////////////////////////
#ifdef TRI_ENABLE_CLUSTER
static v8::Handle<v8::Value> RemoveVocbaseCol_Coordinator (
TRI_vocbase_col_t const* collection,
TRI_voc_key_t key,
TRI_voc_rid_t rid,
TRI_doc_update_policy_e policy,
bool waitForSync) {
v8::HandleScope scope;
// First get the initial data:
string const dbname(collection->_dbName);
string const collname(collection->_name);
triagens::rest::HttpResponse::HttpResponseCode responseCode;
string contentType;
string resultBody;
int error = triagens::arango::deleteDocumentOnCoordinator(
dbname, collname, key, rid, policy, waitForSync,
responseCode, contentType, resultBody);
if (error != TRI_ERROR_NO_ERROR) {
TRI_V8_EXCEPTION(scope, error);
}
// report what the DBserver told us: this could now be 200/202 or
// 404/412
TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, resultBody.c_str());
if (responseCode >= triagens::rest::HttpResponse::BAD) {
if (!TRI_IsArrayJson(json)) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL);
}
int errorNum = 0;
TRI_json_t* subjson = TRI_LookupArrayJson(json, "errorNum");
if (0 != subjson && TRI_IsNumberJson(subjson)) {
errorNum = static_cast<int>(subjson->_value._number);
}
string errorMessage;
subjson = TRI_LookupArrayJson(json, "errorMessage");
if (0 != subjson && TRI_IsStringJson(subjson)) {
errorMessage = string(subjson->_value._string.data,
subjson->_value._string.length-1);
}
TRI_V8_EXCEPTION_MESSAGE(scope, errorNum, errorMessage);
}
v8::Handle<v8::Value> ret = TRI_ObjectJson(json);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return scope.Close(ret);
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief deletes a document
////////////////////////////////////////////////////////////////////////////////
@ -1753,8 +1807,6 @@ static v8::Handle<v8::Value> RemoveVocbaseCol (const bool useCollection,
CollectionNameResolver resolver(vocbase);
v8::Handle<v8::Value> err = TRI_ParseDocumentOrDocumentHandle(resolver, col, key, rid, argv[0]);
TRI_SHARDING_COLLECTION_NOT_YET_IMPLEMENTED(scope, col);
if (key == 0) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD);
}
@ -1764,6 +1816,12 @@ static v8::Handle<v8::Value> RemoveVocbaseCol (const bool useCollection,
return scope.Close(v8::ThrowException(err));
}
#ifdef TRI_ENABLE_CLUSTER
if (ServerState::instance()->isCoordinator()) {
return RemoveVocbaseCol_Coordinator(col, key, rid, policy, forceSync);
}
#endif
assert(col != 0);
assert(key != 0);

View File

@ -128,6 +128,7 @@
"ERROR_SHARD_GONE" : { "code" : 1465, "message" : "no responsible shard found" },
"ERROR_CLUSTER_CONNECTION_LOST" : { "code" : 1466, "message" : "cluster internal HTTP connection broken" },
"ERROR_CLUSTER_MUST_NOT_SPECIFY_KEY" : { "code" : 1467, "message" : "must not specify _key for this collection" },
"ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS" : { "code" : 1468, "message" : "got contradicting answers from different shards" },
"ERROR_QUERY_KILLED" : { "code" : 1500, "message" : "query killed" },
"ERROR_QUERY_PARSE" : { "code" : 1501, "message" : "%s" },
"ERROR_QUERY_EMPTY" : { "code" : 1502, "message" : "query is empty" },

View File

@ -128,6 +128,7 @@
"ERROR_SHARD_GONE" : { "code" : 1465, "message" : "no responsible shard found" },
"ERROR_CLUSTER_CONNECTION_LOST" : { "code" : 1466, "message" : "cluster internal HTTP connection broken" },
"ERROR_CLUSTER_MUST_NOT_SPECIFY_KEY" : { "code" : 1467, "message" : "must not specify _key for this collection" },
"ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS" : { "code" : 1468, "message" : "got contradicting answers from different shards" },
"ERROR_QUERY_KILLED" : { "code" : 1500, "message" : "query killed" },
"ERROR_QUERY_PARSE" : { "code" : 1501, "message" : "%s" },
"ERROR_QUERY_EMPTY" : { "code" : 1502, "message" : "query is empty" },

View File

@ -163,6 +163,7 @@ ERROR_CLUSTER_COULD_NOT_REMOVE_DATABASE_IN_CURRENT,1464,"could not remove databa
ERROR_SHARD_GONE,1465,"no responsible shard found","Will be raised when a coordinator in a cluster cannot determine the shard that is responsible for a given document."
ERROR_CLUSTER_CONNECTION_LOST,1466,"cluster internal HTTP connection broken","Will be raised when a coordinator in a cluster loses an HTTP connection to a DBserver in the cluster whilst transferring data."
ERROR_CLUSTER_MUST_NOT_SPECIFY_KEY,1467,"must not specify _key for this collection","Will be raised when a coordinator in a cluster finds that the _key attribute was specified in a sharded collection the uses not only _key as sharding attribute."
ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS,1468,"got contradicting answers from different shards","Will be raised if a coordinator in a cluster gets conflicting results from different shards, which should never happen."
################################################################################
## ArangoDB query errors

View File

@ -124,6 +124,7 @@ void TRI_InitialiseErrorMessages (void) {
REG_ERROR(ERROR_SHARD_GONE, "no responsible shard found");
REG_ERROR(ERROR_CLUSTER_CONNECTION_LOST, "cluster internal HTTP connection broken");
REG_ERROR(ERROR_CLUSTER_MUST_NOT_SPECIFY_KEY, "must not specify _key for this collection");
REG_ERROR(ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS, "got contradicting answers from different shards");
REG_ERROR(ERROR_QUERY_KILLED, "query killed");
REG_ERROR(ERROR_QUERY_PARSE, "%s");
REG_ERROR(ERROR_QUERY_EMPTY, "query is empty");

View File

@ -290,6 +290,9 @@ extern "C" {
/// Will be raised when a coordinator in a cluster finds that the _key
/// attribute was specified in a sharded collection the uses not only _key as
/// sharding attribute.
/// - 1468: @LIT{got contradicting answers from different shards}
/// Will be raised if a coordinator in a cluster gets conflicting results
/// from different shards, which should never happen.
/// - 1500: @LIT{query killed}
/// Will be raised when a running query is killed by an explicit admin
/// command.
@ -1668,6 +1671,17 @@ void TRI_InitialiseErrorMessages (void);
#define TRI_ERROR_CLUSTER_MUST_NOT_SPECIFY_KEY (1467)
////////////////////////////////////////////////////////////////////////////////
/// @brief 1468: ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS
///
/// got contradicting answers from different shards
///
/// Will be raised if a coordinator in a cluster gets conflicting results from
/// different shards, which should never happen.
////////////////////////////////////////////////////////////////////////////////
#define TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS (1468)
////////////////////////////////////////////////////////////////////////////////
/// @brief 1500: ERROR_QUERY_KILLED
///