diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index e8586e0956..c9cba05aa1 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -37,6 +37,7 @@ #include "Utils/Exception.h" using namespace triagens::aql; +using namespace triagens::arango; using Json = triagens::basics::Json; //////////////////////////////////////////////////////////////////////////////// @@ -162,7 +163,9 @@ ExecutionEngine::ExecutionEngine (Query* query) _blocks(), _root(nullptr), _query(query), - _wasShutdown(false) { + _wasShutdown(false), + _previouslyLockedShards(nullptr), + _lockedShards(nullptr) { _blocks.reserve(8); } @@ -425,6 +428,7 @@ struct CoordinatorInstanciator : public WalkerWorker { // itoa(ID of RemoteNode in original plan) + "_" + shardId // and the value is the // queryId on DBserver + // with a * appended, if it is a PART_MAIN query. // The second case is a query, which lives on the coordinator but is not // the main query. For these, we store // itoa(ID of RemoteNode in original plan) @@ -550,6 +554,7 @@ struct CoordinatorInstanciator : public WalkerWorker { "/_api/aql/instanciate"); auto headers = new std::map; + (*headers)["X-Arango-Nolock"] = shardId; // Prevent locking auto res = cc->asyncRequest("", coordTransactionID, "shard:" + shardId, @@ -599,7 +604,12 @@ struct CoordinatorInstanciator : public WalkerWorker { std::string theID = triagens::basics::StringUtils::itoa(info.idOfRemoteNode) + "_" + res->shardID; - queryIds.emplace(theID, queryId); + if (info.part == triagens::aql::PART_MAIN) { + queryIds.emplace(theID, queryId+"*"); + } + else { + queryIds.emplace(theID, queryId); + } } else { error += "DB SERVER ANSWERED WITH ERROR: "; @@ -731,12 +741,15 @@ struct CoordinatorInstanciator : public WalkerWorker { if (it == queryIds.end()) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "could not find query id in list"); } - + std::string idThere = it->second; + if (idThere.back() == '*') { + idThere.pop_back(); + } ExecutionBlock* r = new RemoteBlock(engine.get(), remoteNode, "shard:" + shardId, // server "", // ownName - (*it).second); // queryId + idThere); // queryId try { engine.get()->addBlock(r); @@ -926,6 +939,56 @@ ExecutionEngine* ExecutionEngine::instanciateFromPlan (QueryRegistry* queryRegis try { engine = inst.get()->buildEngines(); root = engine->root(); + // Now find all shards that take part: + if (Transaction::_makeNolockHeaders != nullptr) { + engine->_lockedShards = new std::unordered_set(*Transaction::_makeNolockHeaders); + engine->_previouslyLockedShards = Transaction::_makeNolockHeaders; + } + else { + engine->_lockedShards = new std::unordered_set(); + engine->_previouslyLockedShards = nullptr; + } + std::map forLocking; + for (auto& q : inst.get()->queryIds) { + std::string theId = q.first; + std::string queryId = q.second; + auto pos = theId.find('_'); + if (pos != std::string::npos) { + // So this is a remote one on a DBserver: + if (queryId.back() == '*') { // only the PART_MAIN one! + queryId.pop_back(); + std::string shardId = theId.substr(pos+1); + engine->_lockedShards->insert(shardId); + forLocking.emplace(shardId, queryId); + } + } + } + // Now lock them all in the right order: + for (auto& p : forLocking) { + std::string const& shardId = p.first; + std::string const& queryId = p.second; + // Lock shard on DBserver: + triagens::arango::CoordTransactionID coordTransactionID + = TRI_NewTickServer(); + auto cc = triagens::arango::ClusterComm::instance(); + TRI_vocbase_t* vocbase = query->vocbase(); + std::string const url("/_db/" + + triagens::basics::StringUtils::urlEncode(vocbase->_name) + + "/_api/aql/lock/" + queryId); + std::map headers; + auto res = cc->syncRequest("", coordTransactionID, + "shard:" + shardId, + triagens::rest::HttpRequest::HTTP_REQUEST_PUT, url, "{}", + headers, 30.0); + if (res->status != CL_COMM_SENT) { + delete res; + THROW_ARANGO_EXCEPTION_MESSAGE( + TRI_ERROR_QUERY_COLLECTION_LOCK_FAILED, + "could not lock all shards"); + } + delete res; + } + Transaction::_makeNolockHeaders = engine->_lockedShards; } catch (...) { // We need to destroy all queries that we have built and stuffed @@ -943,14 +1006,17 @@ ExecutionEngine* ExecutionEngine::instanciateFromPlan (QueryRegistry* queryRegis triagens::arango::CoordTransactionID coordTransactionID = TRI_NewTickServer(); auto cc = triagens::arango::ClusterComm::instance(); + if (queryId.back() == '*') { + queryId.pop_back(); + } std::string const url("/_db/" + triagens::basics::StringUtils::urlEncode(vocbase->_name) + "/_api/aql/shutdown/" + queryId); std::map headers; auto res = cc->syncRequest("", coordTransactionID, "shard:" + shardId, - triagens::rest::HttpRequest::HTTP_REQUEST_POST, url, "", - headers, 30.0); + triagens::rest::HttpRequest::HTTP_REQUEST_PUT, url, + "{\"code\": 0}", headers, 30.0); // Ignore result delete res; } diff --git a/arangod/Aql/ExecutionEngine.h b/arangod/Aql/ExecutionEngine.h index 46d0d6aed3..bf11473991 100644 --- a/arangod/Aql/ExecutionEngine.h +++ b/arangod/Aql/ExecutionEngine.h @@ -133,6 +133,19 @@ namespace triagens { int shutdown (int errorCode) { if (_root != nullptr && ! _wasShutdown) { + + // Take care of locking prevention measures in the cluster: + if (_lockedShards != nullptr) { + if (triagens::arango::Transaction::_makeNolockHeaders == + _lockedShards) { + triagens::arango::Transaction::_makeNolockHeaders + = _previouslyLockedShards; + } + delete _lockedShards; + _lockedShards = nullptr; + _previouslyLockedShards = nullptr; + } + // prevent a duplicate shutdown int res = _root->shutdown(errorCode); _wasShutdown = true; @@ -218,6 +231,14 @@ namespace triagens { ExecutionStats _stats; +//////////////////////////////////////////////////////////////////////////////// +/// @brief _lockedShards +//////////////////////////////////////////////////////////////////////////////// + + std::unordered_set* lockedShards () { + return _lockedShards; + } + // ----------------------------------------------------------------------------- // --SECTION-- private variables // ----------------------------------------------------------------------------- @@ -247,6 +268,20 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// bool _wasShutdown; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief _previouslyLockedShards, this is read off at instanciating +/// time from a thread local variable +//////////////////////////////////////////////////////////////////////////////// + + std::unordered_set* _previouslyLockedShards; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief _lockedShards, these are the shards we have locked for our query +//////////////////////////////////////////////////////////////////////////////// + + std::unordered_set* _lockedShards; + }; } diff --git a/arangod/Aql/QueryRegistry.cpp b/arangod/Aql/QueryRegistry.cpp index 46031c2ee3..3d367f73de 100644 --- a/arangod/Aql/QueryRegistry.cpp +++ b/arangod/Aql/QueryRegistry.cpp @@ -28,8 +28,10 @@ #include "Aql/QueryRegistry.h" #include "Basics/WriteLocker.h" #include "Basics/ReadLocker.h" +#include "Aql/ExecutionEngine.h" using namespace triagens::aql; +using namespace triagens::arango; // ----------------------------------------------------------------------------- // --SECTION-- the QueryRegistry class @@ -112,6 +114,16 @@ void QueryRegistry::insert (QueryId id, // Also, we need to count down the debugging counters for transactions: triagens::arango::TransactionBase::increaseNumbers(-1, -1); + + // If we have set _makeNolockHeaders, we need to unset it: + if (Transaction::_makeNolockHeaders != nullptr) { + if (Transaction::_makeNolockHeaders == query->engine()->lockedShards()) { + Transaction::_makeNolockHeaders = nullptr; + } + else { + LOG_WARNING("Found strange lockedShards in thread!"); + } + } } else { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, @@ -146,6 +158,16 @@ Query* QueryRegistry::open (TRI_vocbase_t* vocbase, // We need to count up the debugging counters for transactions: triagens::arango::TransactionBase::increaseNumbers(1, 1); + // If we had set _makeNolockHeaders, we need to reset it: + if (qi->_query->engine()->lockedShards() != nullptr) { + if (Transaction::_makeNolockHeaders == nullptr) { + Transaction::_makeNolockHeaders = qi->_query->engine()->lockedShards(); + } + else { + LOG_WARNING("Found strange lockedShards in thread, not overwriting!"); + } + } + return qi->_query; } @@ -175,6 +197,16 @@ void QueryRegistry::close (TRI_vocbase_t* vocbase, QueryId id, double ttl) { // We need to count down the debugging counters for transactions: triagens::arango::TransactionBase::increaseNumbers(-1, -1); + // If we have set _makeNolockHeaders, we need to unset it: + if (Transaction::_makeNolockHeaders != nullptr) { + if (Transaction::_makeNolockHeaders == qi->_query->engine()->lockedShards()) { + Transaction::_makeNolockHeaders = nullptr; + } + else { + LOG_WARNING("Found strange lockedShards in thread!"); + } + } + qi->_isOpen = false; qi->_expires = TRI_microtime() + qi->_timeToLive; } @@ -205,6 +237,15 @@ void QueryRegistry::destroy (std::string const& vocbase, if (! qi->_isOpen) { // We need to count up the debugging counters for transactions: triagens::arango::TransactionBase::increaseNumbers(1, 1); + // If we had set _makeNolockHeaders, we need to reset it: + if (qi->_query->engine()->lockedShards() != nullptr) { + if (Transaction::_makeNolockHeaders == nullptr) { + Transaction::_makeNolockHeaders = qi->_query->engine()->lockedShards(); + } + else { + LOG_WARNING("Found strange lockedShards in thread, not overwriting!"); + } + } } if (errorCode == TRI_ERROR_NO_ERROR) { diff --git a/arangod/Aql/RestAqlHandler.cpp b/arangod/Aql/RestAqlHandler.cpp index d5f5d08100..d098e319b0 100644 --- a/arangod/Aql/RestAqlHandler.cpp +++ b/arangod/Aql/RestAqlHandler.cpp @@ -70,7 +70,7 @@ const std::string RestAqlHandler::QUEUE_NAME = "STANDARD"; RestAqlHandler::RestAqlHandler (triagens::rest::HttpRequest* request, std::pair* pair) - : RestBaseHandler(request), + : RestVocbaseBaseHandler(request), _applicationV8(pair->first), _context(static_cast(request->getRequestContext())), _vocbase(_context->getVocbase()), @@ -357,7 +357,7 @@ void RestAqlHandler::createQueryFromString () { //////////////////////////////////////////////////////////////////////////////// /// @brief PUT method for /_api/aql//, this is using /// the part of the cursor API with side effects. -/// : can be "getSome" or "skip" or "initializeCursor" or +/// : can be "lock" or "getSome" or "skip" or "initializeCursor" or /// "shutdown". /// The body must be a Json with the following attributes: /// For the "getSome" operation one has to give: @@ -397,8 +397,8 @@ void RestAqlHandler::createQueryFromString () { /// "items": This is a serialised AqlItemBlock with usually only one row /// and the correct number of columns. /// "pos": The number of the row in "items" to take, usually 0. -/// For the "shutdown" operation no additional arguments are required and -/// an empty JSON object in the body is OK. +/// For the "shutdown" and "lock" operations no additional arguments are +/// required and an empty JSON object in the body is OK. /// All operations allow to set the HTTP header "Shard-ID:". If this is /// set, then the root block of the stored query must be a ScatterBlock /// and the shard ID is given as an additional argument to the ScatterBlock's @@ -596,7 +596,6 @@ triagens::rest::HttpHandler::status_t RestAqlHandler::execute () { // extract the sub-request type HttpRequest::HttpRequestType type = _request->requestType(); - // execute one of the CRUD methods switch (type) { case HttpRequest::HTTP_REQUEST_POST: { @@ -720,7 +719,21 @@ void RestAqlHandler::handleUseQuery (std::string const& operation, Json answerBody(Json::Object, 3); - if (operation == "getSome") { + if (operation == "lock") { + int res = TRI_ERROR_INTERNAL; + try { + res = query->trx()->lockCollections(); + } + catch (...) { + LOG_ERROR("lock lead to an exception"); + generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, + "lock lead to an exception"); + return; + } + answerBody("error", res == TRI_ERROR_NO_ERROR ? Json(false) : Json(true)) + ("code", Json(static_cast(res))); + } + else if (operation == "getSome") { auto atLeast = JsonHelper::getNumericValue(queryJson.json(), "atLeast", 1); auto atMost = JsonHelper::getNumericValue(queryJson.json(), diff --git a/arangod/Aql/RestAqlHandler.h b/arangod/Aql/RestAqlHandler.h index fb1d71823b..4ba8cdcc44 100644 --- a/arangod/Aql/RestAqlHandler.h +++ b/arangod/Aql/RestAqlHandler.h @@ -35,6 +35,7 @@ #include "Admin/RestBaseHandler.h" #include "V8Server/ApplicationV8.h" #include "RestServer/VocbaseContext.h" +#include "RestHandler/RestVocbaseBaseHandler.h" #include "Aql/QueryRegistry.h" #include "Aql/types.h" @@ -55,7 +56,7 @@ namespace triagens { /// @brief shard control request handler //////////////////////////////////////////////////////////////////////////////// - class RestAqlHandler : public admin::RestBaseHandler { + class RestAqlHandler : public arango::RestVocbaseBaseHandler { // ----------------------------------------------------------------------------- // --SECTION-- constructors and destructors diff --git a/arangod/Cluster/ClusterComm.cpp b/arangod/Cluster/ClusterComm.cpp index 2621f7f0f8..0701b3fc67 100644 --- a/arangod/Cluster/ClusterComm.cpp +++ b/arangod/Cluster/ClusterComm.cpp @@ -35,6 +35,7 @@ #include "Basics/StringUtils.h" #include "SimpleHttpClient/ConnectionManager.h" #include "Dispatcher/DispatcherThread.h" +#include "Utils/Transaction.h" #include "VocBase/server.h" @@ -194,6 +195,12 @@ ClusterCommResult* ClusterComm::asyncRequest ( op->shardID = destination.substr(6); op->serverID = ClusterInfo::instance()->getResponsibleServer(op->shardID); LOG_DEBUG("Responsible server: %s", op->serverID.c_str()); + if (triagens::arango::Transaction::_makeNolockHeaders != nullptr) { + auto it = triagens::arango::Transaction::_makeNolockHeaders->find(op->shardID); + if (it != triagens::arango::Transaction::_makeNolockHeaders->end()) { + (*headerFields)["X-Arango-Nolock"] = op->shardID; + } + } } else if (destination.substr(0,7) == "server:") { op->shardID = ""; @@ -280,6 +287,8 @@ ClusterCommResult* ClusterComm::syncRequest ( map const& headerFields, ClusterCommTimeout timeout) { + map headersCopy(headerFields); + ClusterCommResult* res = new ClusterCommResult(); res->clientTransactionID = clientTransactionID; res->coordTransactionID = coordTransactionID; @@ -301,6 +310,12 @@ ClusterCommResult* ClusterComm::syncRequest ( res->status = CL_COMM_ERROR; return res; } + if (triagens::arango::Transaction::_makeNolockHeaders != nullptr) { + auto it = triagens::arango::Transaction::_makeNolockHeaders->find(res->shardID); + if (it != triagens::arango::Transaction::_makeNolockHeaders->end()) { + headersCopy["X-Arango-Nolock"] = res->shardID; + } + } } else if (destination.substr(0, 7) == "server:") { res->shardID = ""; @@ -346,8 +361,7 @@ ClusterCommResult* ClusterComm::syncRequest ( endTime - currentTime, false); client->keepConnectionOnDestruction(true); - map headersCopy(headerFields); - headersCopy.emplace(make_pair(string("Authorization"), ServerState::instance()->getAuthentication())); + headersCopy["Authorization"] = ServerState::instance()->getAuthentication(); #ifdef DEBUG_CLUSTER_COMM #ifdef TRI_ENABLE_MAINTAINER_MODE #if HAVE_BACKTRACE diff --git a/arangod/RestHandler/RestBatchHandler.cpp b/arangod/RestHandler/RestBatchHandler.cpp index 8af362c381..0f3273b999 100644 --- a/arangod/RestHandler/RestBatchHandler.cpp +++ b/arangod/RestHandler/RestBatchHandler.cpp @@ -184,7 +184,7 @@ RestBatchHandler::~RestBatchHandler () { /// @END_EXAMPLE_ARANGOSH_RUN //////////////////////////////////////////////////////////////////////////////// -Handler::status_t RestBatchHandler::execute() { +Handler::status_t RestBatchHandler::execute () { // extract the request type const HttpRequest::HttpRequestType type = _request->requestType(); @@ -304,6 +304,7 @@ Handler::status_t RestBatchHandler::execute() { Handler::status_t status(Handler::HANDLER_FAILED); do { + handler->prepareExecute(); try { status = handler->execute(); } @@ -319,6 +320,7 @@ Handler::status_t RestBatchHandler::execute() { triagens::basics::InternalError err("executeDirectHandler", __FILE__, __LINE__); handler->handleError(err); } + handler->finalizeExecute(); } while (status.status == Handler::HANDLER_REQUEUE); diff --git a/arangod/RestHandler/RestBatchHandler.h b/arangod/RestHandler/RestBatchHandler.h index 49cfebad57..618a2cf505 100644 --- a/arangod/RestHandler/RestBatchHandler.h +++ b/arangod/RestHandler/RestBatchHandler.h @@ -114,7 +114,7 @@ namespace triagens { /// {@inheritDoc} //////////////////////////////////////////////////////////////////////////////// - Handler::status_t execute(); + Handler::status_t execute (); // ----------------------------------------------------------------------------- // --SECTION-- private methods diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index eaf2f48eb5..a186861746 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -88,7 +88,7 @@ RestReplicationHandler::~RestReplicationHandler () { /// {@inheritDoc} //////////////////////////////////////////////////////////////////////////////// -Handler::status_t RestReplicationHandler::execute() { +Handler::status_t RestReplicationHandler::execute () { // extract the request type const HttpRequest::HttpRequestType type = _request->requestType(); diff --git a/arangod/RestHandler/RestReplicationHandler.h b/arangod/RestHandler/RestReplicationHandler.h index c98346ea5b..95d3f85c14 100644 --- a/arangod/RestHandler/RestReplicationHandler.h +++ b/arangod/RestHandler/RestReplicationHandler.h @@ -84,7 +84,7 @@ namespace triagens { /// {@inheritDoc} //////////////////////////////////////////////////////////////////////////////// - Handler::status_t execute(); + Handler::status_t execute (); // ----------------------------------------------------------------------------- // --SECTION-- public static methods diff --git a/arangod/RestHandler/RestUploadHandler.cpp b/arangod/RestHandler/RestUploadHandler.cpp index 5440bc6b2e..bd10a8e5bd 100644 --- a/arangod/RestHandler/RestUploadHandler.cpp +++ b/arangod/RestHandler/RestUploadHandler.cpp @@ -67,7 +67,7 @@ RestUploadHandler::~RestUploadHandler () { /// {@inheritDoc} //////////////////////////////////////////////////////////////////////////////// -Handler::status_t RestUploadHandler::execute() { +Handler::status_t RestUploadHandler::execute () { // extract the request type const HttpRequest::HttpRequestType type = _request->requestType(); diff --git a/arangod/RestHandler/RestUploadHandler.h b/arangod/RestHandler/RestUploadHandler.h index d9ebb03f74..c5092059a7 100644 --- a/arangod/RestHandler/RestUploadHandler.h +++ b/arangod/RestHandler/RestUploadHandler.h @@ -72,7 +72,7 @@ namespace triagens { /// {@inheritDoc} //////////////////////////////////////////////////////////////////////////////// - Handler::status_t execute(); + Handler::status_t execute (); }; } diff --git a/arangod/RestHandler/RestVocbaseBaseHandler.cpp b/arangod/RestHandler/RestVocbaseBaseHandler.cpp index 946ae8f7fb..8d10e7401c 100644 --- a/arangod/RestHandler/RestVocbaseBaseHandler.cpp +++ b/arangod/RestHandler/RestVocbaseBaseHandler.cpp @@ -105,7 +105,8 @@ const string RestVocbaseBaseHandler::QUEUE_NAME = "STANDARD"; RestVocbaseBaseHandler::RestVocbaseBaseHandler (HttpRequest* request) : RestBaseHandler(request), _context(static_cast(request->getRequestContext())), - _vocbase(_context->getVocbase()) { + _vocbase(_context->getVocbase()), + _nolockHeaderSet(nullptr) { } //////////////////////////////////////////////////////////////////////////////// @@ -656,6 +657,32 @@ int RestVocbaseBaseHandler::parseDocumentId (CollectionNameResolver const* resol return TRI_ERROR_NO_ERROR; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief prepareExecute, to react to X-Arango-Nolock header +//////////////////////////////////////////////////////////////////////////////// + +void RestVocbaseBaseHandler::prepareExecute () { + bool found; + char const* shardId = _request->header("x-arango-nolock", found); + if (found) { + _nolockHeaderSet = new std::unordered_set(); + _nolockHeaderSet->insert(std::string(shardId)); + triagens::arango::Transaction::_makeNolockHeaders = _nolockHeaderSet; + } +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief finalizeExecute, to react to X-Arango-Nolock header +//////////////////////////////////////////////////////////////////////////////// + +void RestVocbaseBaseHandler::finalizeExecute () { + if (_nolockHeaderSet != nullptr) { + triagens::arango::Transaction::_makeNolockHeaders = nullptr; + delete _nolockHeaderSet; + _nolockHeaderSet = nullptr; + } +} + // ----------------------------------------------------------------------------- // --SECTION-- END-OF-FILE // ----------------------------------------------------------------------------- diff --git a/arangod/RestHandler/RestVocbaseBaseHandler.h b/arangod/RestHandler/RestVocbaseBaseHandler.h index 1f2308d69d..20c2f6905e 100644 --- a/arangod/RestHandler/RestVocbaseBaseHandler.h +++ b/arangod/RestHandler/RestVocbaseBaseHandler.h @@ -403,6 +403,30 @@ namespace triagens { std::string const& queue () const; +//////////////////////////////////////////////////////////////////////////////// +/// @brief prepareExecute, to react to X-Arango-Nolock header +//////////////////////////////////////////////////////////////////////////////// + + virtual void prepareExecute (); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief finalizeExecute, to react to X-Arango-Nolock header +//////////////////////////////////////////////////////////////////////////////// + + virtual void finalizeExecute (); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief _nolockHeaderFound +//////////////////////////////////////////////////////////////////////////////// + + bool _nolockHeaderFound; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief _nolockHeaderFound +//////////////////////////////////////////////////////////////////////////////// + + std::unordered_set* _nolockHeaderSet; + }; } } diff --git a/arangod/Utils/AqlTransaction.h b/arangod/Utils/AqlTransaction.h index c590f220d3..d2d07d263a 100644 --- a/arangod/Utils/AqlTransaction.h +++ b/arangod/Utils/AqlTransaction.h @@ -179,6 +179,32 @@ namespace triagens { &_collections, false); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief lockCollections, this is needed in a corner case in AQL: we need +/// to lock all shards in a controlled way when we set up a distributed +/// execution engine. To this end, we prevent the standard mechanism to +/// lock collections on the DBservers when we instanciate the query. Then, +/// in a second round, we need to lock the shards in exactly the right +/// order via an HTTP call. This method is used to implement that HTTP action. +//////////////////////////////////////////////////////////////////////////////// + + int lockCollections () { + auto trx = getInternals(); + size_t i = trx->_collections._length; + + while (i-- > 0) { + TRI_transaction_collection_t* trxCollection + = static_cast + (TRI_AtVectorPointer(&trx->_collections, i)); + int res = TRI_UnlockCollectionTransaction(trxCollection, + trxCollection->_accessType, 0); + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + } + return TRI_ERROR_NO_ERROR; + } + //////////////////////////////////////////////////////////////////////////////// /// @brief keep a copy of the collections, this is needed for the clone /// operation diff --git a/arangod/Utils/Transaction.cpp b/arangod/Utils/Transaction.cpp index d5e64141c4..34d8b3225e 100644 --- a/arangod/Utils/Transaction.cpp +++ b/arangod/Utils/Transaction.cpp @@ -49,6 +49,14 @@ thread_local int TransactionBase::_numberTrxInScope = 0; thread_local int TransactionBase::_numberTrxActive = 0; #endif +//////////////////////////////////////////////////////////////////////////////// +/// @brief if this pointer is set to an actual set, then for each request +/// sent to a shardId using the ClusterComm library, an X-Arango-Nolock +/// header is generated. +//////////////////////////////////////////////////////////////////////////////// + +thread_local std::unordered_set* Transaction::_makeNolockHeaders = nullptr; + // ----------------------------------------------------------------------------- // --SECTION-- END-OF-FILE // ----------------------------------------------------------------------------- diff --git a/arangod/Utils/Transaction.h b/arangod/Utils/Transaction.h index e518b53f2a..e271c5dfc9 100644 --- a/arangod/Utils/Transaction.h +++ b/arangod/Utils/Transaction.h @@ -1504,6 +1504,14 @@ namespace triagens { TransactionContext* _transactionContext; +//////////////////////////////////////////////////////////////////////////////// +/// @brief makeNolockHeaders +//////////////////////////////////////////////////////////////////////////////// + + public: + + static thread_local std::unordered_set* _makeNolockHeaders; + }; } diff --git a/arangod/VocBase/document-collection.cpp b/arangod/VocBase/document-collection.cpp index 09b5e6a831..450829de10 100644 --- a/arangod/VocBase/document-collection.cpp +++ b/arangod/VocBase/document-collection.cpp @@ -856,6 +856,14 @@ static int CloneMarkerNoLegend (triagens::wal::Marker*& marker, //////////////////////////////////////////////////////////////////////////////// static int BeginRead (TRI_document_collection_t* document) { + if (triagens::arango::Transaction::_makeNolockHeaders != nullptr) { + std::string collName(document->_info._name); + auto it = triagens::arango::Transaction::_makeNolockHeaders->find(collName); + if (it != triagens::arango::Transaction::_makeNolockHeaders->end()) { + // do not lock by command + return TRI_ERROR_NO_ERROR; + } + } TRI_READ_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document); return TRI_ERROR_NO_ERROR; @@ -866,6 +874,14 @@ static int BeginRead (TRI_document_collection_t* document) { //////////////////////////////////////////////////////////////////////////////// static int EndRead (TRI_document_collection_t* document) { + if (triagens::arango::Transaction::_makeNolockHeaders != nullptr) { + std::string collName(document->_info._name); + auto it = triagens::arango::Transaction::_makeNolockHeaders->find(collName); + if (it != triagens::arango::Transaction::_makeNolockHeaders->end()) { + // do not lock by command + return TRI_ERROR_NO_ERROR; + } + } TRI_READ_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document); return TRI_ERROR_NO_ERROR; @@ -876,6 +892,14 @@ static int EndRead (TRI_document_collection_t* document) { //////////////////////////////////////////////////////////////////////////////// static int BeginWrite (TRI_document_collection_t* document) { + if (triagens::arango::Transaction::_makeNolockHeaders != nullptr) { + std::string collName(document->_info._name); + auto it = triagens::arango::Transaction::_makeNolockHeaders->find(collName); + if (it != triagens::arango::Transaction::_makeNolockHeaders->end()) { + // do not lock by command + return TRI_ERROR_NO_ERROR; + } + } TRI_WRITE_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document); return TRI_ERROR_NO_ERROR; @@ -886,6 +910,14 @@ static int BeginWrite (TRI_document_collection_t* document) { //////////////////////////////////////////////////////////////////////////////// static int EndWrite (TRI_document_collection_t* document) { + if (triagens::arango::Transaction::_makeNolockHeaders != nullptr) { + std::string collName(document->_info._name); + auto it = triagens::arango::Transaction::_makeNolockHeaders->find(collName); + if (it != triagens::arango::Transaction::_makeNolockHeaders->end()) { + // do not lock by command + return TRI_ERROR_NO_ERROR; + } + } TRI_WRITE_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document); return TRI_ERROR_NO_ERROR; diff --git a/arangod/VocBase/transaction.cpp b/arangod/VocBase/transaction.cpp index 5a32596453..f36d1aa133 100644 --- a/arangod/VocBase/transaction.cpp +++ b/arangod/VocBase/transaction.cpp @@ -40,6 +40,7 @@ #include "VocBase/vocbase.h" #include "Wal/DocumentOperation.h" #include "Wal/LogfileManager.h" +#include "Utils/Transaction.h" #ifdef TRI_ENABLE_MAINTAINER_MODE @@ -341,6 +342,16 @@ static int LockCollection (TRI_transaction_collection_t* trxCollection, } TRI_ASSERT(trxCollection->_collection != nullptr); + + if (triagens::arango::Transaction::_makeNolockHeaders != nullptr) { + std::string collName(trxCollection->_collection->_name); + auto it = triagens::arango::Transaction::_makeNolockHeaders->find(collName); + if (it != triagens::arango::Transaction::_makeNolockHeaders->end()) { + // do not lock by command + return TRI_ERROR_NO_ERROR; + } + } + TRI_ASSERT(trxCollection->_collection->_collection != nullptr); TRI_ASSERT(! IsLocked(trxCollection)); @@ -394,6 +405,16 @@ static int UnlockCollection (TRI_transaction_collection_t* trxCollection, } TRI_ASSERT(trxCollection->_collection != nullptr); + + if (triagens::arango::Transaction::_makeNolockHeaders != nullptr) { + std::string collName(trxCollection->_collection->_name); + auto it = triagens::arango::Transaction::_makeNolockHeaders->find(collName); + if (it != triagens::arango::Transaction::_makeNolockHeaders->end()) { + // do not lock by command + return TRI_ERROR_NO_ERROR; + } + } + TRI_ASSERT(trxCollection->_collection->_collection != nullptr); TRI_ASSERT(IsLocked(trxCollection)); diff --git a/lib/GeneralServer/GeneralServer.h b/lib/GeneralServer/GeneralServer.h index a76101fe80..52daad6e34 100644 --- a/lib/GeneralServer/GeneralServer.h +++ b/lib/GeneralServer/GeneralServer.h @@ -443,6 +443,7 @@ namespace triagens { RequestStatisticsAgentSetRequestStart(handler); try { + handler->prepareExecute(); try { status = handler->execute(); } @@ -463,6 +464,7 @@ namespace triagens { basics::InternalError err("handleRequestDirectly", __FILE__, __LINE__); handler->handleError(err); } + handler->finalizeExecute(); if (status.status == Handler::HANDLER_REQUEUE) { handler->RequestStatisticsAgent::transfer(task); diff --git a/lib/GeneralServer/GeneralServerJob.h b/lib/GeneralServer/GeneralServerJob.h index 6b00f59c58..55e7ef5219 100644 --- a/lib/GeneralServer/GeneralServerJob.h +++ b/lib/GeneralServer/GeneralServerJob.h @@ -163,7 +163,16 @@ namespace triagens { } RequestStatisticsAgentSetRequestStart(_handler); - Handler::status_t status = _handler->execute(); + _handler->prepareExecute(); + Handler::status_t status; + try { + status = _handler->execute(); + } + catch (...) { + _handler->finalizeExecute(); + throw; + } + _handler->finalizeExecute(); RequestStatisticsAgentSetRequestEnd(_handler); LOG_TRACE("finished job %p with status %d", (void*) this, (int) status.status); diff --git a/lib/Rest/Handler.cpp b/lib/Rest/Handler.cpp index 841dccef0a..a7b831dc72 100644 --- a/lib/Rest/Handler.cpp +++ b/lib/Rest/Handler.cpp @@ -80,6 +80,20 @@ void Handler::setDispatcherThread (DispatcherThread* dispatcherThread) { _dispatcherThread = dispatcherThread; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief prepares for execution +//////////////////////////////////////////////////////////////////////////////// + +void Handler::prepareExecute () { +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief tries to cancel an execution +//////////////////////////////////////////////////////////////////////////////// + +void Handler::finalizeExecute () { +} + //////////////////////////////////////////////////////////////////////////////// /// @brief tries to cancel an execution //////////////////////////////////////////////////////////////////////////////// diff --git a/lib/Rest/Handler.h b/lib/Rest/Handler.h index 98288f21b1..7c44eec609 100644 --- a/lib/Rest/Handler.h +++ b/lib/Rest/Handler.h @@ -159,12 +159,24 @@ namespace triagens { virtual void setDispatcherThread (DispatcherThread*); +//////////////////////////////////////////////////////////////////////////////// +/// @brief prepares execution of a handler, has to be called before execute +//////////////////////////////////////////////////////////////////////////////// + + virtual void prepareExecute (); + //////////////////////////////////////////////////////////////////////////////// /// @brief executes a handler //////////////////////////////////////////////////////////////////////////////// virtual status_t execute () = 0; +//////////////////////////////////////////////////////////////////////////////// +/// @brief finalizes execution of a handler, has to be called after execute +//////////////////////////////////////////////////////////////////////////////// + + virtual void finalizeExecute (); + //////////////////////////////////////////////////////////////////////////////// /// @brief tries to cancel an execution ////////////////////////////////////////////////////////////////////////////////