mirror of https://gitee.com/bigwinds/arangodb
Squashed commit of the following: organise locking in distributed AQL
Locking is now done in an extra round after the query is fully instanciated in the cluster. All participating shards are locked in alphabetical order of their shard ID (local collection name). For this to work there is a new action in the RestAqlHandler plus a mechanism to prevent the usual locking from happening: Each thread has a thread local static class variable of triagens::arango::Transaction::_makeNolockHeaders which is of type std::unordered_set<std::string>*. Whenever this is not equal to nullptr and a local collection name is stored in there, no locking or unlocking takes place. This information is forwarded by the X-Arango-Nolock HTTP header, whenever an HTTP request is sent via ClusterComm to a shard.
This commit is contained in:
parent
67aa5b67e5
commit
e50a705d9a
|
@ -37,6 +37,7 @@
|
|||
#include "Utils/Exception.h"
|
||||
|
||||
using namespace triagens::aql;
|
||||
using namespace triagens::arango;
|
||||
using Json = triagens::basics::Json;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -162,7 +163,9 @@ ExecutionEngine::ExecutionEngine (Query* query)
|
|||
_blocks(),
|
||||
_root(nullptr),
|
||||
_query(query),
|
||||
_wasShutdown(false) {
|
||||
_wasShutdown(false),
|
||||
_previouslyLockedShards(nullptr),
|
||||
_lockedShards(nullptr) {
|
||||
|
||||
_blocks.reserve(8);
|
||||
}
|
||||
|
@ -425,6 +428,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
|
|||
// itoa(ID of RemoteNode in original plan) + "_" + shardId
|
||||
// and the value is the
|
||||
// queryId on DBserver
|
||||
// with a * appended, if it is a PART_MAIN query.
|
||||
// The second case is a query, which lives on the coordinator but is not
|
||||
// the main query. For these, we store
|
||||
// itoa(ID of RemoteNode in original plan)
|
||||
|
@ -550,6 +554,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
|
|||
"/_api/aql/instanciate");
|
||||
|
||||
auto headers = new std::map<std::string, std::string>;
|
||||
(*headers)["X-Arango-Nolock"] = shardId; // Prevent locking
|
||||
auto res = cc->asyncRequest("",
|
||||
coordTransactionID,
|
||||
"shard:" + shardId,
|
||||
|
@ -599,7 +604,12 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
|
|||
std::string theID
|
||||
= triagens::basics::StringUtils::itoa(info.idOfRemoteNode)
|
||||
+ "_" + res->shardID;
|
||||
queryIds.emplace(theID, queryId);
|
||||
if (info.part == triagens::aql::PART_MAIN) {
|
||||
queryIds.emplace(theID, queryId+"*");
|
||||
}
|
||||
else {
|
||||
queryIds.emplace(theID, queryId);
|
||||
}
|
||||
}
|
||||
else {
|
||||
error += "DB SERVER ANSWERED WITH ERROR: ";
|
||||
|
@ -731,12 +741,15 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
|
|||
if (it == queryIds.end()) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "could not find query id in list");
|
||||
}
|
||||
|
||||
std::string idThere = it->second;
|
||||
if (idThere.back() == '*') {
|
||||
idThere.pop_back();
|
||||
}
|
||||
ExecutionBlock* r = new RemoteBlock(engine.get(),
|
||||
remoteNode,
|
||||
"shard:" + shardId, // server
|
||||
"", // ownName
|
||||
(*it).second); // queryId
|
||||
idThere); // queryId
|
||||
|
||||
try {
|
||||
engine.get()->addBlock(r);
|
||||
|
@ -926,6 +939,56 @@ ExecutionEngine* ExecutionEngine::instanciateFromPlan (QueryRegistry* queryRegis
|
|||
try {
|
||||
engine = inst.get()->buildEngines();
|
||||
root = engine->root();
|
||||
// Now find all shards that take part:
|
||||
if (Transaction::_makeNolockHeaders != nullptr) {
|
||||
engine->_lockedShards = new std::unordered_set<std::string>(*Transaction::_makeNolockHeaders);
|
||||
engine->_previouslyLockedShards = Transaction::_makeNolockHeaders;
|
||||
}
|
||||
else {
|
||||
engine->_lockedShards = new std::unordered_set<std::string>();
|
||||
engine->_previouslyLockedShards = nullptr;
|
||||
}
|
||||
std::map<std::string, std::string> forLocking;
|
||||
for (auto& q : inst.get()->queryIds) {
|
||||
std::string theId = q.first;
|
||||
std::string queryId = q.second;
|
||||
auto pos = theId.find('_');
|
||||
if (pos != std::string::npos) {
|
||||
// So this is a remote one on a DBserver:
|
||||
if (queryId.back() == '*') { // only the PART_MAIN one!
|
||||
queryId.pop_back();
|
||||
std::string shardId = theId.substr(pos+1);
|
||||
engine->_lockedShards->insert(shardId);
|
||||
forLocking.emplace(shardId, queryId);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Now lock them all in the right order:
|
||||
for (auto& p : forLocking) {
|
||||
std::string const& shardId = p.first;
|
||||
std::string const& queryId = p.second;
|
||||
// Lock shard on DBserver:
|
||||
triagens::arango::CoordTransactionID coordTransactionID
|
||||
= TRI_NewTickServer();
|
||||
auto cc = triagens::arango::ClusterComm::instance();
|
||||
TRI_vocbase_t* vocbase = query->vocbase();
|
||||
std::string const url("/_db/"
|
||||
+ triagens::basics::StringUtils::urlEncode(vocbase->_name)
|
||||
+ "/_api/aql/lock/" + queryId);
|
||||
std::map<std::string, std::string> headers;
|
||||
auto res = cc->syncRequest("", coordTransactionID,
|
||||
"shard:" + shardId,
|
||||
triagens::rest::HttpRequest::HTTP_REQUEST_PUT, url, "{}",
|
||||
headers, 30.0);
|
||||
if (res->status != CL_COMM_SENT) {
|
||||
delete res;
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(
|
||||
TRI_ERROR_QUERY_COLLECTION_LOCK_FAILED,
|
||||
"could not lock all shards");
|
||||
}
|
||||
delete res;
|
||||
}
|
||||
Transaction::_makeNolockHeaders = engine->_lockedShards;
|
||||
}
|
||||
catch (...) {
|
||||
// We need to destroy all queries that we have built and stuffed
|
||||
|
@ -943,14 +1006,17 @@ ExecutionEngine* ExecutionEngine::instanciateFromPlan (QueryRegistry* queryRegis
|
|||
triagens::arango::CoordTransactionID coordTransactionID
|
||||
= TRI_NewTickServer();
|
||||
auto cc = triagens::arango::ClusterComm::instance();
|
||||
if (queryId.back() == '*') {
|
||||
queryId.pop_back();
|
||||
}
|
||||
std::string const url("/_db/"
|
||||
+ triagens::basics::StringUtils::urlEncode(vocbase->_name)
|
||||
+ "/_api/aql/shutdown/" + queryId);
|
||||
std::map<std::string, std::string> headers;
|
||||
auto res = cc->syncRequest("", coordTransactionID,
|
||||
"shard:" + shardId,
|
||||
triagens::rest::HttpRequest::HTTP_REQUEST_POST, url, "",
|
||||
headers, 30.0);
|
||||
triagens::rest::HttpRequest::HTTP_REQUEST_PUT, url,
|
||||
"{\"code\": 0}", headers, 30.0);
|
||||
// Ignore result
|
||||
delete res;
|
||||
}
|
||||
|
|
|
@ -133,6 +133,19 @@ namespace triagens {
|
|||
|
||||
int shutdown (int errorCode) {
|
||||
if (_root != nullptr && ! _wasShutdown) {
|
||||
|
||||
// Take care of locking prevention measures in the cluster:
|
||||
if (_lockedShards != nullptr) {
|
||||
if (triagens::arango::Transaction::_makeNolockHeaders ==
|
||||
_lockedShards) {
|
||||
triagens::arango::Transaction::_makeNolockHeaders
|
||||
= _previouslyLockedShards;
|
||||
}
|
||||
delete _lockedShards;
|
||||
_lockedShards = nullptr;
|
||||
_previouslyLockedShards = nullptr;
|
||||
}
|
||||
|
||||
// prevent a duplicate shutdown
|
||||
int res = _root->shutdown(errorCode);
|
||||
_wasShutdown = true;
|
||||
|
@ -218,6 +231,14 @@ namespace triagens {
|
|||
|
||||
ExecutionStats _stats;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief _lockedShards
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::unordered_set<std::string>* lockedShards () {
|
||||
return _lockedShards;
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- private variables
|
||||
// -----------------------------------------------------------------------------
|
||||
|
@ -247,6 +268,20 @@ namespace triagens {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool _wasShutdown;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief _previouslyLockedShards, this is read off at instanciating
|
||||
/// time from a thread local variable
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::unordered_set<std::string>* _previouslyLockedShards;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief _lockedShards, these are the shards we have locked for our query
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::unordered_set<std::string>* _lockedShards;
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -28,8 +28,10 @@
|
|||
#include "Aql/QueryRegistry.h"
|
||||
#include "Basics/WriteLocker.h"
|
||||
#include "Basics/ReadLocker.h"
|
||||
#include "Aql/ExecutionEngine.h"
|
||||
|
||||
using namespace triagens::aql;
|
||||
using namespace triagens::arango;
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- the QueryRegistry class
|
||||
|
@ -112,6 +114,16 @@ void QueryRegistry::insert (QueryId id,
|
|||
|
||||
// Also, we need to count down the debugging counters for transactions:
|
||||
triagens::arango::TransactionBase::increaseNumbers(-1, -1);
|
||||
|
||||
// If we have set _makeNolockHeaders, we need to unset it:
|
||||
if (Transaction::_makeNolockHeaders != nullptr) {
|
||||
if (Transaction::_makeNolockHeaders == query->engine()->lockedShards()) {
|
||||
Transaction::_makeNolockHeaders = nullptr;
|
||||
}
|
||||
else {
|
||||
LOG_WARNING("Found strange lockedShards in thread!");
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
|
||||
|
@ -146,6 +158,16 @@ Query* QueryRegistry::open (TRI_vocbase_t* vocbase,
|
|||
// We need to count up the debugging counters for transactions:
|
||||
triagens::arango::TransactionBase::increaseNumbers(1, 1);
|
||||
|
||||
// If we had set _makeNolockHeaders, we need to reset it:
|
||||
if (qi->_query->engine()->lockedShards() != nullptr) {
|
||||
if (Transaction::_makeNolockHeaders == nullptr) {
|
||||
Transaction::_makeNolockHeaders = qi->_query->engine()->lockedShards();
|
||||
}
|
||||
else {
|
||||
LOG_WARNING("Found strange lockedShards in thread, not overwriting!");
|
||||
}
|
||||
}
|
||||
|
||||
return qi->_query;
|
||||
}
|
||||
|
||||
|
@ -175,6 +197,16 @@ void QueryRegistry::close (TRI_vocbase_t* vocbase, QueryId id, double ttl) {
|
|||
// We need to count down the debugging counters for transactions:
|
||||
triagens::arango::TransactionBase::increaseNumbers(-1, -1);
|
||||
|
||||
// If we have set _makeNolockHeaders, we need to unset it:
|
||||
if (Transaction::_makeNolockHeaders != nullptr) {
|
||||
if (Transaction::_makeNolockHeaders == qi->_query->engine()->lockedShards()) {
|
||||
Transaction::_makeNolockHeaders = nullptr;
|
||||
}
|
||||
else {
|
||||
LOG_WARNING("Found strange lockedShards in thread!");
|
||||
}
|
||||
}
|
||||
|
||||
qi->_isOpen = false;
|
||||
qi->_expires = TRI_microtime() + qi->_timeToLive;
|
||||
}
|
||||
|
@ -205,6 +237,15 @@ void QueryRegistry::destroy (std::string const& vocbase,
|
|||
if (! qi->_isOpen) {
|
||||
// We need to count up the debugging counters for transactions:
|
||||
triagens::arango::TransactionBase::increaseNumbers(1, 1);
|
||||
// If we had set _makeNolockHeaders, we need to reset it:
|
||||
if (qi->_query->engine()->lockedShards() != nullptr) {
|
||||
if (Transaction::_makeNolockHeaders == nullptr) {
|
||||
Transaction::_makeNolockHeaders = qi->_query->engine()->lockedShards();
|
||||
}
|
||||
else {
|
||||
LOG_WARNING("Found strange lockedShards in thread, not overwriting!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (errorCode == TRI_ERROR_NO_ERROR) {
|
||||
|
|
|
@ -70,7 +70,7 @@ const std::string RestAqlHandler::QUEUE_NAME = "STANDARD";
|
|||
RestAqlHandler::RestAqlHandler (triagens::rest::HttpRequest* request,
|
||||
std::pair<ApplicationV8*,
|
||||
QueryRegistry*>* pair)
|
||||
: RestBaseHandler(request),
|
||||
: RestVocbaseBaseHandler(request),
|
||||
_applicationV8(pair->first),
|
||||
_context(static_cast<VocbaseContext*>(request->getRequestContext())),
|
||||
_vocbase(_context->getVocbase()),
|
||||
|
@ -357,7 +357,7 @@ void RestAqlHandler::createQueryFromString () {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief PUT method for /_api/aql/<operation>/<queryId>, this is using
|
||||
/// the part of the cursor API with side effects.
|
||||
/// <operation>: can be "getSome" or "skip" or "initializeCursor" or
|
||||
/// <operation>: can be "lock" or "getSome" or "skip" or "initializeCursor" or
|
||||
/// "shutdown".
|
||||
/// The body must be a Json with the following attributes:
|
||||
/// For the "getSome" operation one has to give:
|
||||
|
@ -397,8 +397,8 @@ void RestAqlHandler::createQueryFromString () {
|
|||
/// "items": This is a serialised AqlItemBlock with usually only one row
|
||||
/// and the correct number of columns.
|
||||
/// "pos": The number of the row in "items" to take, usually 0.
|
||||
/// For the "shutdown" operation no additional arguments are required and
|
||||
/// an empty JSON object in the body is OK.
|
||||
/// For the "shutdown" and "lock" operations no additional arguments are
|
||||
/// required and an empty JSON object in the body is OK.
|
||||
/// All operations allow to set the HTTP header "Shard-ID:". If this is
|
||||
/// set, then the root block of the stored query must be a ScatterBlock
|
||||
/// and the shard ID is given as an additional argument to the ScatterBlock's
|
||||
|
@ -596,7 +596,6 @@ triagens::rest::HttpHandler::status_t RestAqlHandler::execute () {
|
|||
// extract the sub-request type
|
||||
HttpRequest::HttpRequestType type = _request->requestType();
|
||||
|
||||
|
||||
// execute one of the CRUD methods
|
||||
switch (type) {
|
||||
case HttpRequest::HTTP_REQUEST_POST: {
|
||||
|
@ -720,7 +719,21 @@ void RestAqlHandler::handleUseQuery (std::string const& operation,
|
|||
|
||||
Json answerBody(Json::Object, 3);
|
||||
|
||||
if (operation == "getSome") {
|
||||
if (operation == "lock") {
|
||||
int res = TRI_ERROR_INTERNAL;
|
||||
try {
|
||||
res = query->trx()->lockCollections();
|
||||
}
|
||||
catch (...) {
|
||||
LOG_ERROR("lock lead to an exception");
|
||||
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR,
|
||||
"lock lead to an exception");
|
||||
return;
|
||||
}
|
||||
answerBody("error", res == TRI_ERROR_NO_ERROR ? Json(false) : Json(true))
|
||||
("code", Json(static_cast<double>(res)));
|
||||
}
|
||||
else if (operation == "getSome") {
|
||||
auto atLeast = JsonHelper::getNumericValue<uint64_t>(queryJson.json(),
|
||||
"atLeast", 1);
|
||||
auto atMost = JsonHelper::getNumericValue<uint64_t>(queryJson.json(),
|
||||
|
|
|
@ -35,6 +35,7 @@
|
|||
#include "Admin/RestBaseHandler.h"
|
||||
#include "V8Server/ApplicationV8.h"
|
||||
#include "RestServer/VocbaseContext.h"
|
||||
#include "RestHandler/RestVocbaseBaseHandler.h"
|
||||
#include "Aql/QueryRegistry.h"
|
||||
#include "Aql/types.h"
|
||||
|
||||
|
@ -55,7 +56,7 @@ namespace triagens {
|
|||
/// @brief shard control request handler
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class RestAqlHandler : public admin::RestBaseHandler {
|
||||
class RestAqlHandler : public arango::RestVocbaseBaseHandler {
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- constructors and destructors
|
||||
|
|
|
@ -35,6 +35,7 @@
|
|||
#include "Basics/StringUtils.h"
|
||||
#include "SimpleHttpClient/ConnectionManager.h"
|
||||
#include "Dispatcher/DispatcherThread.h"
|
||||
#include "Utils/Transaction.h"
|
||||
|
||||
#include "VocBase/server.h"
|
||||
|
||||
|
@ -194,6 +195,12 @@ ClusterCommResult* ClusterComm::asyncRequest (
|
|||
op->shardID = destination.substr(6);
|
||||
op->serverID = ClusterInfo::instance()->getResponsibleServer(op->shardID);
|
||||
LOG_DEBUG("Responsible server: %s", op->serverID.c_str());
|
||||
if (triagens::arango::Transaction::_makeNolockHeaders != nullptr) {
|
||||
auto it = triagens::arango::Transaction::_makeNolockHeaders->find(op->shardID);
|
||||
if (it != triagens::arango::Transaction::_makeNolockHeaders->end()) {
|
||||
(*headerFields)["X-Arango-Nolock"] = op->shardID;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (destination.substr(0,7) == "server:") {
|
||||
op->shardID = "";
|
||||
|
@ -280,6 +287,8 @@ ClusterCommResult* ClusterComm::syncRequest (
|
|||
map<string, string> const& headerFields,
|
||||
ClusterCommTimeout timeout) {
|
||||
|
||||
map<string, string> headersCopy(headerFields);
|
||||
|
||||
ClusterCommResult* res = new ClusterCommResult();
|
||||
res->clientTransactionID = clientTransactionID;
|
||||
res->coordTransactionID = coordTransactionID;
|
||||
|
@ -301,6 +310,12 @@ ClusterCommResult* ClusterComm::syncRequest (
|
|||
res->status = CL_COMM_ERROR;
|
||||
return res;
|
||||
}
|
||||
if (triagens::arango::Transaction::_makeNolockHeaders != nullptr) {
|
||||
auto it = triagens::arango::Transaction::_makeNolockHeaders->find(res->shardID);
|
||||
if (it != triagens::arango::Transaction::_makeNolockHeaders->end()) {
|
||||
headersCopy["X-Arango-Nolock"] = res->shardID;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (destination.substr(0, 7) == "server:") {
|
||||
res->shardID = "";
|
||||
|
@ -346,8 +361,7 @@ ClusterCommResult* ClusterComm::syncRequest (
|
|||
endTime - currentTime, false);
|
||||
client->keepConnectionOnDestruction(true);
|
||||
|
||||
map<string, string> headersCopy(headerFields);
|
||||
headersCopy.emplace(make_pair(string("Authorization"), ServerState::instance()->getAuthentication()));
|
||||
headersCopy["Authorization"] = ServerState::instance()->getAuthentication();
|
||||
#ifdef DEBUG_CLUSTER_COMM
|
||||
#ifdef TRI_ENABLE_MAINTAINER_MODE
|
||||
#if HAVE_BACKTRACE
|
||||
|
|
|
@ -184,7 +184,7 @@ RestBatchHandler::~RestBatchHandler () {
|
|||
/// @END_EXAMPLE_ARANGOSH_RUN
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Handler::status_t RestBatchHandler::execute() {
|
||||
Handler::status_t RestBatchHandler::execute () {
|
||||
// extract the request type
|
||||
const HttpRequest::HttpRequestType type = _request->requestType();
|
||||
|
||||
|
@ -304,6 +304,7 @@ Handler::status_t RestBatchHandler::execute() {
|
|||
Handler::status_t status(Handler::HANDLER_FAILED);
|
||||
|
||||
do {
|
||||
handler->prepareExecute();
|
||||
try {
|
||||
status = handler->execute();
|
||||
}
|
||||
|
@ -319,6 +320,7 @@ Handler::status_t RestBatchHandler::execute() {
|
|||
triagens::basics::InternalError err("executeDirectHandler", __FILE__, __LINE__);
|
||||
handler->handleError(err);
|
||||
}
|
||||
handler->finalizeExecute();
|
||||
}
|
||||
while (status.status == Handler::HANDLER_REQUEUE);
|
||||
|
||||
|
|
|
@ -114,7 +114,7 @@ namespace triagens {
|
|||
/// {@inheritDoc}
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Handler::status_t execute();
|
||||
Handler::status_t execute ();
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- private methods
|
||||
|
|
|
@ -88,7 +88,7 @@ RestReplicationHandler::~RestReplicationHandler () {
|
|||
/// {@inheritDoc}
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Handler::status_t RestReplicationHandler::execute() {
|
||||
Handler::status_t RestReplicationHandler::execute () {
|
||||
// extract the request type
|
||||
const HttpRequest::HttpRequestType type = _request->requestType();
|
||||
|
||||
|
|
|
@ -84,7 +84,7 @@ namespace triagens {
|
|||
/// {@inheritDoc}
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Handler::status_t execute();
|
||||
Handler::status_t execute ();
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- public static methods
|
||||
|
|
|
@ -67,7 +67,7 @@ RestUploadHandler::~RestUploadHandler () {
|
|||
/// {@inheritDoc}
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Handler::status_t RestUploadHandler::execute() {
|
||||
Handler::status_t RestUploadHandler::execute () {
|
||||
// extract the request type
|
||||
const HttpRequest::HttpRequestType type = _request->requestType();
|
||||
|
||||
|
|
|
@ -72,7 +72,7 @@ namespace triagens {
|
|||
/// {@inheritDoc}
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Handler::status_t execute();
|
||||
Handler::status_t execute ();
|
||||
|
||||
};
|
||||
}
|
||||
|
|
|
@ -105,7 +105,8 @@ const string RestVocbaseBaseHandler::QUEUE_NAME = "STANDARD";
|
|||
RestVocbaseBaseHandler::RestVocbaseBaseHandler (HttpRequest* request)
|
||||
: RestBaseHandler(request),
|
||||
_context(static_cast<VocbaseContext*>(request->getRequestContext())),
|
||||
_vocbase(_context->getVocbase()) {
|
||||
_vocbase(_context->getVocbase()),
|
||||
_nolockHeaderSet(nullptr) {
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -656,6 +657,32 @@ int RestVocbaseBaseHandler::parseDocumentId (CollectionNameResolver const* resol
|
|||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief prepareExecute, to react to X-Arango-Nolock header
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void RestVocbaseBaseHandler::prepareExecute () {
|
||||
bool found;
|
||||
char const* shardId = _request->header("x-arango-nolock", found);
|
||||
if (found) {
|
||||
_nolockHeaderSet = new std::unordered_set<std::string>();
|
||||
_nolockHeaderSet->insert(std::string(shardId));
|
||||
triagens::arango::Transaction::_makeNolockHeaders = _nolockHeaderSet;
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief finalizeExecute, to react to X-Arango-Nolock header
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void RestVocbaseBaseHandler::finalizeExecute () {
|
||||
if (_nolockHeaderSet != nullptr) {
|
||||
triagens::arango::Transaction::_makeNolockHeaders = nullptr;
|
||||
delete _nolockHeaderSet;
|
||||
_nolockHeaderSet = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- END-OF-FILE
|
||||
// -----------------------------------------------------------------------------
|
||||
|
|
|
@ -403,6 +403,30 @@ namespace triagens {
|
|||
|
||||
std::string const& queue () const;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief prepareExecute, to react to X-Arango-Nolock header
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
virtual void prepareExecute ();
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief finalizeExecute, to react to X-Arango-Nolock header
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
virtual void finalizeExecute ();
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief _nolockHeaderFound
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool _nolockHeaderFound;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief _nolockHeaderFound
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::unordered_set<std::string>* _nolockHeaderSet;
|
||||
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -179,6 +179,32 @@ namespace triagens {
|
|||
&_collections, false);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief lockCollections, this is needed in a corner case in AQL: we need
|
||||
/// to lock all shards in a controlled way when we set up a distributed
|
||||
/// execution engine. To this end, we prevent the standard mechanism to
|
||||
/// lock collections on the DBservers when we instanciate the query. Then,
|
||||
/// in a second round, we need to lock the shards in exactly the right
|
||||
/// order via an HTTP call. This method is used to implement that HTTP action.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int lockCollections () {
|
||||
auto trx = getInternals();
|
||||
size_t i = trx->_collections._length;
|
||||
|
||||
while (i-- > 0) {
|
||||
TRI_transaction_collection_t* trxCollection
|
||||
= static_cast<TRI_transaction_collection_t*>
|
||||
(TRI_AtVectorPointer(&trx->_collections, i));
|
||||
int res = TRI_UnlockCollectionTransaction(trxCollection,
|
||||
trxCollection->_accessType, 0);
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief keep a copy of the collections, this is needed for the clone
|
||||
/// operation
|
||||
|
|
|
@ -49,6 +49,14 @@ thread_local int TransactionBase::_numberTrxInScope = 0;
|
|||
thread_local int TransactionBase::_numberTrxActive = 0;
|
||||
#endif
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief if this pointer is set to an actual set, then for each request
|
||||
/// sent to a shardId using the ClusterComm library, an X-Arango-Nolock
|
||||
/// header is generated.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
thread_local std::unordered_set<std::string>* Transaction::_makeNolockHeaders = nullptr;
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- END-OF-FILE
|
||||
// -----------------------------------------------------------------------------
|
||||
|
|
|
@ -1504,6 +1504,14 @@ namespace triagens {
|
|||
|
||||
TransactionContext* _transactionContext;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief makeNolockHeaders
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public:
|
||||
|
||||
static thread_local std::unordered_set<std::string>* _makeNolockHeaders;
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -856,6 +856,14 @@ static int CloneMarkerNoLegend (triagens::wal::Marker*& marker,
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static int BeginRead (TRI_document_collection_t* document) {
|
||||
if (triagens::arango::Transaction::_makeNolockHeaders != nullptr) {
|
||||
std::string collName(document->_info._name);
|
||||
auto it = triagens::arango::Transaction::_makeNolockHeaders->find(collName);
|
||||
if (it != triagens::arango::Transaction::_makeNolockHeaders->end()) {
|
||||
// do not lock by command
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
TRI_READ_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document);
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
|
@ -866,6 +874,14 @@ static int BeginRead (TRI_document_collection_t* document) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static int EndRead (TRI_document_collection_t* document) {
|
||||
if (triagens::arango::Transaction::_makeNolockHeaders != nullptr) {
|
||||
std::string collName(document->_info._name);
|
||||
auto it = triagens::arango::Transaction::_makeNolockHeaders->find(collName);
|
||||
if (it != triagens::arango::Transaction::_makeNolockHeaders->end()) {
|
||||
// do not lock by command
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
TRI_READ_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document);
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
|
@ -876,6 +892,14 @@ static int EndRead (TRI_document_collection_t* document) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static int BeginWrite (TRI_document_collection_t* document) {
|
||||
if (triagens::arango::Transaction::_makeNolockHeaders != nullptr) {
|
||||
std::string collName(document->_info._name);
|
||||
auto it = triagens::arango::Transaction::_makeNolockHeaders->find(collName);
|
||||
if (it != triagens::arango::Transaction::_makeNolockHeaders->end()) {
|
||||
// do not lock by command
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
TRI_WRITE_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document);
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
|
@ -886,6 +910,14 @@ static int BeginWrite (TRI_document_collection_t* document) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static int EndWrite (TRI_document_collection_t* document) {
|
||||
if (triagens::arango::Transaction::_makeNolockHeaders != nullptr) {
|
||||
std::string collName(document->_info._name);
|
||||
auto it = triagens::arango::Transaction::_makeNolockHeaders->find(collName);
|
||||
if (it != triagens::arango::Transaction::_makeNolockHeaders->end()) {
|
||||
// do not lock by command
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
TRI_WRITE_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document);
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
|
|
|
@ -40,6 +40,7 @@
|
|||
#include "VocBase/vocbase.h"
|
||||
#include "Wal/DocumentOperation.h"
|
||||
#include "Wal/LogfileManager.h"
|
||||
#include "Utils/Transaction.h"
|
||||
|
||||
#ifdef TRI_ENABLE_MAINTAINER_MODE
|
||||
|
||||
|
@ -341,6 +342,16 @@ static int LockCollection (TRI_transaction_collection_t* trxCollection,
|
|||
}
|
||||
|
||||
TRI_ASSERT(trxCollection->_collection != nullptr);
|
||||
|
||||
if (triagens::arango::Transaction::_makeNolockHeaders != nullptr) {
|
||||
std::string collName(trxCollection->_collection->_name);
|
||||
auto it = triagens::arango::Transaction::_makeNolockHeaders->find(collName);
|
||||
if (it != triagens::arango::Transaction::_makeNolockHeaders->end()) {
|
||||
// do not lock by command
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
TRI_ASSERT(trxCollection->_collection->_collection != nullptr);
|
||||
TRI_ASSERT(! IsLocked(trxCollection));
|
||||
|
||||
|
@ -394,6 +405,16 @@ static int UnlockCollection (TRI_transaction_collection_t* trxCollection,
|
|||
}
|
||||
|
||||
TRI_ASSERT(trxCollection->_collection != nullptr);
|
||||
|
||||
if (triagens::arango::Transaction::_makeNolockHeaders != nullptr) {
|
||||
std::string collName(trxCollection->_collection->_name);
|
||||
auto it = triagens::arango::Transaction::_makeNolockHeaders->find(collName);
|
||||
if (it != triagens::arango::Transaction::_makeNolockHeaders->end()) {
|
||||
// do not lock by command
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
TRI_ASSERT(trxCollection->_collection->_collection != nullptr);
|
||||
TRI_ASSERT(IsLocked(trxCollection));
|
||||
|
||||
|
|
|
@ -443,6 +443,7 @@ namespace triagens {
|
|||
RequestStatisticsAgentSetRequestStart(handler);
|
||||
|
||||
try {
|
||||
handler->prepareExecute();
|
||||
try {
|
||||
status = handler->execute();
|
||||
}
|
||||
|
@ -463,6 +464,7 @@ namespace triagens {
|
|||
basics::InternalError err("handleRequestDirectly", __FILE__, __LINE__);
|
||||
handler->handleError(err);
|
||||
}
|
||||
handler->finalizeExecute();
|
||||
|
||||
if (status.status == Handler::HANDLER_REQUEUE) {
|
||||
handler->RequestStatisticsAgent::transfer(task);
|
||||
|
|
|
@ -163,7 +163,16 @@ namespace triagens {
|
|||
}
|
||||
|
||||
RequestStatisticsAgentSetRequestStart(_handler);
|
||||
Handler::status_t status = _handler->execute();
|
||||
_handler->prepareExecute();
|
||||
Handler::status_t status;
|
||||
try {
|
||||
status = _handler->execute();
|
||||
}
|
||||
catch (...) {
|
||||
_handler->finalizeExecute();
|
||||
throw;
|
||||
}
|
||||
_handler->finalizeExecute();
|
||||
RequestStatisticsAgentSetRequestEnd(_handler);
|
||||
|
||||
LOG_TRACE("finished job %p with status %d", (void*) this, (int) status.status);
|
||||
|
|
|
@ -80,6 +80,20 @@ void Handler::setDispatcherThread (DispatcherThread* dispatcherThread) {
|
|||
_dispatcherThread = dispatcherThread;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief prepares for execution
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void Handler::prepareExecute () {
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief tries to cancel an execution
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void Handler::finalizeExecute () {
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief tries to cancel an execution
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -159,12 +159,24 @@ namespace triagens {
|
|||
|
||||
virtual void setDispatcherThread (DispatcherThread*);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief prepares execution of a handler, has to be called before execute
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
virtual void prepareExecute ();
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief executes a handler
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
virtual status_t execute () = 0;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief finalizes execution of a handler, has to be called after execute
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
virtual void finalizeExecute ();
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief tries to cancel an execution
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
Loading…
Reference in New Issue