diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp index 100a755f45..47d5c728ef 100644 --- a/arangod/Cluster/ClusterMethods.cpp +++ b/arangod/Cluster/ClusterMethods.cpp @@ -141,6 +141,141 @@ int createDocumentOnCoordinator ( return TRI_ERROR_NO_ERROR; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief deletes a document in a coordinator +//////////////////////////////////////////////////////////////////////////////// + +int deleteDocumentOnCoordinator ( + string const& dbname, + string const& collname, + string const& key, + TRI_voc_rid_t const rev, + TRI_doc_update_policy_e policy, + bool waitForSync, + triagens::rest::HttpResponse::HttpResponseCode& responseCode, + string& contentType, + string& resultBody) { + + // Set a few variables needed for our work: + ClusterInfo* ci = ClusterInfo::instance(); + ClusterComm* cc = ClusterComm::instance(); + + // First determine the collection ID from the name: + CollectionInfo collinfo = ci->getCollection(dbname, collname); + if (collinfo.empty()) { + return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND; + } + string collid = StringUtils::itoa(collinfo.id()); + + // If _key is the one and only sharding attribute, we can do this quickly, + // because we can easily determine which shard is responsible for the + // document. Otherwise we have to contact all shards and ask them to + // delete the document. All but one will not know it. + // Now find the responsible shard: + TRI_json_t* json = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE); + if (0 == json) { + return TRI_ERROR_OUT_OF_MEMORY; + } + TRI_Insert2ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "_key", + TRI_CreateStringReference2Json(TRI_UNKNOWN_MEM_ZONE, + key.c_str(), key.size())); + bool usesDefaultShardingAttributes; + ShardID shardID = ci->getResponsibleShard( collid, json, true, + usesDefaultShardingAttributes ); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + + // Some stuff to prepare cluster-intern requests: + ClusterCommResult* res; + string revstr; + if (rev != 0) { + revstr = "&rev="+StringUtils::itoa(rev); + } + string policystr; + if (policy == TRI_DOC_UPDATE_LAST_WRITE) { + policystr = "&policy=last"; + } + + if (usesDefaultShardingAttributes) { + // OK, this is the fast method, we only have to ask one shard: + if (shardID == "") { + return TRI_ERROR_SHARD_GONE; + } + + // Send a synchronous request to that shard using ClusterComm: + map headers; + res = cc->syncRequest("", TRI_NewTickServer(), "shard:"+shardID, + triagens::rest::HttpRequest::HTTP_REQUEST_DELETE, + "/_db/"+dbname+"/_api/document/"+ + StringUtils::urlEncode(shardID)+"/"+key+ + "?waitForSync="+(waitForSync ? "true" : "false")+ + revstr+policystr, NULL, 0, headers, 60.0); + + if (res->status == CL_COMM_TIMEOUT) { + // No reply, we give up: + delete res; + return TRI_ERROR_CLUSTER_TIMEOUT; + } + if (res->status == CL_COMM_ERROR) { + // This could be a broken connection or an Http error: + if (!res->result->isComplete()) { + delete res; + return TRI_ERROR_CLUSTER_CONNECTION_LOST; + } + // In this case a proper HTTP error was reported by the DBserver, + // this can be 400 or 404, we simply forward the result. + // We intentionally fall through here. + } + responseCode = static_cast + (res->result->getHttpReturnCode()); + contentType = res->result->getContentType(false); + resultBody = res->result->getBody().str(); + delete res; + return TRI_ERROR_NO_ERROR; + } + + // If we get here, the sharding attributes are not only _key, therefore + // we have to contact everybody: + map shards = collinfo.shardIds(); + map::iterator it; + CoordTransactionID coordTransactionID = TRI_NewTickServer(); + for (it = shards.begin(); it != shards.end(); ++it) { + map* headers = new map; + res = cc->asyncRequest("", coordTransactionID, "shard:"+it->first, + triagens::rest::HttpRequest::HTTP_REQUEST_DELETE, + "/_db/"+dbname+"/_api/document/"+ + StringUtils::urlEncode(it->first)+"/"+key+ + "?waitForSync="+(waitForSync ? "true" : "false")+ + revstr+policystr, NULL, 0, headers, NULL, 60.0); + delete res; + } + // Now listen to the results: + int count; + int nrok = 0; + for (count = shards.size(); count > 0; count--) { + res = cc->wait( "", coordTransactionID, 0, "", 0.0); + if (res->status == CL_COMM_RECEIVED) { + if (res->answer_code < triagens::rest::HttpResponse::BAD) { + nrok++; + responseCode = res->answer_code; + contentType = res->answer->header("content-type"); + resultBody = string(res->answer->body(), res->answer->bodySize()); + } + } + delete res; + } + if (nrok == 1) { + return TRI_ERROR_NO_ERROR; + } + if (nrok > 1) { + return TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS; + } + responseCode = triagens::rest::HttpResponse::NOT_FOUND; + contentType = "application/json; charset=utf-8"; + resultBody = "{\"error\":true,\"errorMessage\":\"document not found\"," + "\"errorNum\":404,\"code\":404}"; + return TRI_ERROR_HTTP_NOT_FOUND; +} + } // namespace arango } // namespace triagens diff --git a/arangod/Cluster/ClusterMethods.h b/arangod/Cluster/ClusterMethods.h index 75b7ae33b0..5fdaec7954 100644 --- a/arangod/Cluster/ClusterMethods.h +++ b/arangod/Cluster/ClusterMethods.h @@ -36,6 +36,7 @@ #include "SimpleHttpClient/SimpleHttpResult.h" #include "SimpleHttpClient/SimpleHttpClient.h" #include "VocBase/voc-types.h" +#include "VocBase/update-policy.h" #include "Cluster/AgencyComm.h" #include "Cluster/ClusterInfo.h" @@ -58,6 +59,20 @@ namespace triagens { string& contentType, string& resultBody); +//////////////////////////////////////////////////////////////////////////////// +/// @brief delete a document in a coordinator +//////////////////////////////////////////////////////////////////////////////// + + int deleteDocumentOnCoordinator ( + string const& dbname, + string const& collname, + string const& key, + TRI_voc_rid_t const rev, + TRI_doc_update_policy_e policy, + bool waitForSync, + triagens::rest::HttpResponse::HttpResponseCode& responseCode, + string& contentType, + string& resultBody); // ----------------------------------------------------------------------------- // --SECTION-- public functions diff --git a/arangod/RestHandler/RestDocumentHandler.cpp b/arangod/RestHandler/RestDocumentHandler.cpp index 14712e7223..1c94383f70 100644 --- a/arangod/RestHandler/RestDocumentHandler.cpp +++ b/arangod/RestHandler/RestDocumentHandler.cpp @@ -1511,6 +1511,13 @@ bool RestDocumentHandler::deleteDocument () { return false; } +#ifdef TRI_ENABLE_CLUSTER + if (ServerState::instance()->isCoordinator()) { + return deleteDocumentCoordinator(collection, key, revision, policy, + waitForSync); + } +#endif + SingleCollectionWriteTransaction, 1> trx(_vocbase, _resolver, collection); // ............................................................................. @@ -1554,6 +1561,39 @@ bool RestDocumentHandler::deleteDocument () { return true; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief deletes a document, coordinator case in a cluster +//////////////////////////////////////////////////////////////////////////////// + +#ifdef TRI_ENABLE_CLUSTER +bool RestDocumentHandler::deleteDocumentCoordinator ( + string const& collname, + string const& key, + TRI_voc_rid_t const rev, + TRI_doc_update_policy_e policy, + bool waitForSync) { + string const& dbname = _request->originalDatabaseName(); + triagens::rest::HttpResponse::HttpResponseCode responseCode; + string contentType; + string resultBody; + + int error = triagens::arango::deleteDocumentOnCoordinator( + dbname, collname, key, rev, policy, waitForSync, + responseCode, contentType, resultBody); + + if (error != TRI_ERROR_NO_ERROR) { + generateTransactionError(collname, error); + return false; + } + // Essentially return the response we got from the DBserver, be it + // OK or an error: + _response = createResponse(responseCode); + _response->setContentType(contentType); + _response->body().appendText(resultBody.c_str(), resultBody.size()); + return responseCode >= triagens::rest::HttpResponse::BAD; +} +#endif + //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RestHandler/RestDocumentHandler.h b/arangod/RestHandler/RestDocumentHandler.h index 909ad3668d..9c278fbd1f 100644 --- a/arangod/RestHandler/RestDocumentHandler.h +++ b/arangod/RestHandler/RestDocumentHandler.h @@ -117,9 +117,9 @@ namespace triagens { /// @brief creates a document, coordinator case in a cluster //////////////////////////////////////////////////////////////////////////////// -bool createDocumentCoordinator (char const* collection, - bool waitForSync, - TRI_json_t* json); + bool createDocumentCoordinator (char const* collection, + bool waitForSync, + TRI_json_t* json); //////////////////////////////////////////////////////////////////////////////// /// @brief creates a document @@ -175,6 +175,16 @@ bool createDocumentCoordinator (char const* collection, virtual bool deleteDocument (); +//////////////////////////////////////////////////////////////////////////////// +/// @brief delete a document, coordinator case in a cluster +//////////////////////////////////////////////////////////////////////////////// + + bool deleteDocumentCoordinator (string const& collname, + string const& key, + TRI_voc_rid_t const rev, + TRI_doc_update_policy_e policy, + bool waitForSync); + }; } } diff --git a/arangod/RestHandler/RestEdgeHandler.cpp b/arangod/RestHandler/RestEdgeHandler.cpp index 20c10ab215..621553501d 100644 --- a/arangod/RestHandler/RestEdgeHandler.cpp +++ b/arangod/RestHandler/RestEdgeHandler.cpp @@ -34,6 +34,12 @@ #include "VocBase/document-collection.h" #include "VocBase/edge-collection.h" +#ifdef TRI_ENABLE_CLUSTER +#include "Cluster/ServerState.h" +#include "Cluster/ClusterInfo.h" +#include "Cluster/ClusterMethods.h" +#endif + using namespace std; using namespace triagens::basics; using namespace triagens::rest; @@ -218,6 +224,14 @@ bool RestEdgeHandler::createDocument () { return false; } +#ifdef TRI_ENABLE_CLUSTER + if (ServerState::instance()->isCoordinator()) { + // json will be freed inside! + return createDocumentCoordinator(collection, waitForSync, json, + from, to); + } +#endif + if (! checkCreateCollection(collection, getCollectionType())) { TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); return false; @@ -318,6 +332,43 @@ bool RestEdgeHandler::createDocument () { return true; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates a document (an edge), coordinator case in a cluster +//////////////////////////////////////////////////////////////////////////////// + +#ifdef TRI_ENABLE_CLUSTER +bool RestEdgeHandler::createDocumentCoordinator (string const& collname, + bool waitForSync, + TRI_json_t* json, + char const* from, + char const* to) { + string const& dbname = _request->originalDatabaseName(); + triagens::rest::HttpResponse::HttpResponseCode responseCode; + string contentType; + string resultBody; + + // Not yet implemented: + generateTransactionError(collname.c_str(), TRI_ERROR_INTERNAL); + return false; + + int error = triagens::arango::createDocumentOnCoordinator( + dbname, collname, waitForSync, json, + responseCode, contentType, resultBody); + + if (error != TRI_ERROR_NO_ERROR) { + generateTransactionError(collname.c_str(), error); + return false; + } + // Essentially return the response we got from the DBserver, be it + // OK or an error: + _response = createResponse(responseCode); + _response->setContentType(contentType); + _response->body().appendText(resultBody.c_str(), resultBody.size()); + return responseCode >= triagens::rest::HttpResponse::BAD; +} +#endif + + //////////////////////////////////////////////////////////////////////////////// /// @brief reads a single edge /// diff --git a/arangod/RestHandler/RestEdgeHandler.h b/arangod/RestHandler/RestEdgeHandler.h index 20e7f8caa4..968c489e4f 100644 --- a/arangod/RestHandler/RestEdgeHandler.h +++ b/arangod/RestHandler/RestEdgeHandler.h @@ -113,6 +113,17 @@ namespace triagens { bool createDocument (); +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates a document (an edge), coordinator case in a cluster +//////////////////////////////////////////////////////////////////////////////// + +#ifdef TRI_ENABLE_CLUSTER + bool createDocumentCoordinator (string const& collname, + bool waitForSync, + TRI_json_t* json, + char const* from, + char const* to); +#endif }; } } diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index dfbef62c5d..1bf17ad148 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -1709,6 +1709,60 @@ static v8::Handle UpdateVocbaseCol (const bool useCollection, return scope.Close(result); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief deletes a document, coordinator case in a cluster +//////////////////////////////////////////////////////////////////////////////// + +#ifdef TRI_ENABLE_CLUSTER +static v8::Handle RemoveVocbaseCol_Coordinator ( + TRI_vocbase_col_t const* collection, + TRI_voc_key_t key, + TRI_voc_rid_t rid, + TRI_doc_update_policy_e policy, + bool waitForSync) { + v8::HandleScope scope; + + // First get the initial data: + string const dbname(collection->_dbName); + string const collname(collection->_name); + + triagens::rest::HttpResponse::HttpResponseCode responseCode; + string contentType; + string resultBody; + + int error = triagens::arango::deleteDocumentOnCoordinator( + dbname, collname, key, rid, policy, waitForSync, + responseCode, contentType, resultBody); + + if (error != TRI_ERROR_NO_ERROR) { + TRI_V8_EXCEPTION(scope, error); + } + // report what the DBserver told us: this could now be 200/202 or + // 404/412 + TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, resultBody.c_str()); + if (responseCode >= triagens::rest::HttpResponse::BAD) { + if (!TRI_IsArrayJson(json)) { + TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + } + int errorNum = 0; + TRI_json_t* subjson = TRI_LookupArrayJson(json, "errorNum"); + if (0 != subjson && TRI_IsNumberJson(subjson)) { + errorNum = static_cast(subjson->_value._number); + } + string errorMessage; + subjson = TRI_LookupArrayJson(json, "errorMessage"); + if (0 != subjson && TRI_IsStringJson(subjson)) { + errorMessage = string(subjson->_value._string.data, + subjson->_value._string.length-1); + } + TRI_V8_EXCEPTION_MESSAGE(scope, errorNum, errorMessage); + } + v8::Handle ret = TRI_ObjectJson(json); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + return scope.Close(ret); +} +#endif + //////////////////////////////////////////////////////////////////////////////// /// @brief deletes a document //////////////////////////////////////////////////////////////////////////////// @@ -1753,8 +1807,6 @@ static v8::Handle RemoveVocbaseCol (const bool useCollection, CollectionNameResolver resolver(vocbase); v8::Handle err = TRI_ParseDocumentOrDocumentHandle(resolver, col, key, rid, argv[0]); - TRI_SHARDING_COLLECTION_NOT_YET_IMPLEMENTED(scope, col); - if (key == 0) { TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD); } @@ -1764,6 +1816,12 @@ static v8::Handle RemoveVocbaseCol (const bool useCollection, return scope.Close(v8::ThrowException(err)); } +#ifdef TRI_ENABLE_CLUSTER + if (ServerState::instance()->isCoordinator()) { + return RemoveVocbaseCol_Coordinator(col, key, rid, policy, forceSync); + } +#endif + assert(col != 0); assert(key != 0); diff --git a/js/apps/system/aardvark/frontend/js/bootstrap/errors.js b/js/apps/system/aardvark/frontend/js/bootstrap/errors.js index 1aaed7dd6c..6ee4a2b20b 100644 --- a/js/apps/system/aardvark/frontend/js/bootstrap/errors.js +++ b/js/apps/system/aardvark/frontend/js/bootstrap/errors.js @@ -128,6 +128,7 @@ "ERROR_SHARD_GONE" : { "code" : 1465, "message" : "no responsible shard found" }, "ERROR_CLUSTER_CONNECTION_LOST" : { "code" : 1466, "message" : "cluster internal HTTP connection broken" }, "ERROR_CLUSTER_MUST_NOT_SPECIFY_KEY" : { "code" : 1467, "message" : "must not specify _key for this collection" }, + "ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS" : { "code" : 1468, "message" : "got contradicting answers from different shards" }, "ERROR_QUERY_KILLED" : { "code" : 1500, "message" : "query killed" }, "ERROR_QUERY_PARSE" : { "code" : 1501, "message" : "%s" }, "ERROR_QUERY_EMPTY" : { "code" : 1502, "message" : "query is empty" }, diff --git a/js/common/bootstrap/errors.js b/js/common/bootstrap/errors.js index 1aaed7dd6c..6ee4a2b20b 100644 --- a/js/common/bootstrap/errors.js +++ b/js/common/bootstrap/errors.js @@ -128,6 +128,7 @@ "ERROR_SHARD_GONE" : { "code" : 1465, "message" : "no responsible shard found" }, "ERROR_CLUSTER_CONNECTION_LOST" : { "code" : 1466, "message" : "cluster internal HTTP connection broken" }, "ERROR_CLUSTER_MUST_NOT_SPECIFY_KEY" : { "code" : 1467, "message" : "must not specify _key for this collection" }, + "ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS" : { "code" : 1468, "message" : "got contradicting answers from different shards" }, "ERROR_QUERY_KILLED" : { "code" : 1500, "message" : "query killed" }, "ERROR_QUERY_PARSE" : { "code" : 1501, "message" : "%s" }, "ERROR_QUERY_EMPTY" : { "code" : 1502, "message" : "query is empty" }, diff --git a/lib/BasicsC/errors.dat b/lib/BasicsC/errors.dat index 17300c23a7..8c3f08ff0a 100755 --- a/lib/BasicsC/errors.dat +++ b/lib/BasicsC/errors.dat @@ -163,6 +163,7 @@ ERROR_CLUSTER_COULD_NOT_REMOVE_DATABASE_IN_CURRENT,1464,"could not remove databa ERROR_SHARD_GONE,1465,"no responsible shard found","Will be raised when a coordinator in a cluster cannot determine the shard that is responsible for a given document." ERROR_CLUSTER_CONNECTION_LOST,1466,"cluster internal HTTP connection broken","Will be raised when a coordinator in a cluster loses an HTTP connection to a DBserver in the cluster whilst transferring data." ERROR_CLUSTER_MUST_NOT_SPECIFY_KEY,1467,"must not specify _key for this collection","Will be raised when a coordinator in a cluster finds that the _key attribute was specified in a sharded collection the uses not only _key as sharding attribute." +ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS,1468,"got contradicting answers from different shards","Will be raised if a coordinator in a cluster gets conflicting results from different shards, which should never happen." ################################################################################ ## ArangoDB query errors diff --git a/lib/BasicsC/voc-errors.c b/lib/BasicsC/voc-errors.c index 17247e26a9..3381dc366a 100644 --- a/lib/BasicsC/voc-errors.c +++ b/lib/BasicsC/voc-errors.c @@ -124,6 +124,7 @@ void TRI_InitialiseErrorMessages (void) { REG_ERROR(ERROR_SHARD_GONE, "no responsible shard found"); REG_ERROR(ERROR_CLUSTER_CONNECTION_LOST, "cluster internal HTTP connection broken"); REG_ERROR(ERROR_CLUSTER_MUST_NOT_SPECIFY_KEY, "must not specify _key for this collection"); + REG_ERROR(ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS, "got contradicting answers from different shards"); REG_ERROR(ERROR_QUERY_KILLED, "query killed"); REG_ERROR(ERROR_QUERY_PARSE, "%s"); REG_ERROR(ERROR_QUERY_EMPTY, "query is empty"); diff --git a/lib/BasicsC/voc-errors.h b/lib/BasicsC/voc-errors.h index f977217104..95bdf69820 100644 --- a/lib/BasicsC/voc-errors.h +++ b/lib/BasicsC/voc-errors.h @@ -290,6 +290,9 @@ extern "C" { /// Will be raised when a coordinator in a cluster finds that the _key /// attribute was specified in a sharded collection the uses not only _key as /// sharding attribute. +/// - 1468: @LIT{got contradicting answers from different shards} +/// Will be raised if a coordinator in a cluster gets conflicting results +/// from different shards, which should never happen. /// - 1500: @LIT{query killed} /// Will be raised when a running query is killed by an explicit admin /// command. @@ -1668,6 +1671,17 @@ void TRI_InitialiseErrorMessages (void); #define TRI_ERROR_CLUSTER_MUST_NOT_SPECIFY_KEY (1467) +//////////////////////////////////////////////////////////////////////////////// +/// @brief 1468: ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS +/// +/// got contradicting answers from different shards +/// +/// Will be raised if a coordinator in a cluster gets conflicting results from +/// different shards, which should never happen. +//////////////////////////////////////////////////////////////////////////////// + +#define TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS (1468) + //////////////////////////////////////////////////////////////////////////////// /// @brief 1500: ERROR_QUERY_KILLED ///