1
0
Fork 0

Fix locking of shards in TraverserEngines.

This commit is contained in:
Max Neunhoeffer 2017-03-22 15:24:01 +01:00
parent 66a8ea1538
commit 467088b8af
3 changed files with 29 additions and 9 deletions

View File

@ -1007,7 +1007,14 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
if (!shardSet.empty()) {
arangodb::CoordTransactionID coordTransactionID = TRI_NewTickServer();
std::unordered_map<std::string, std::string> headers;
std::string shardList;
for (auto const& shard : shardSet) {
if (!shardList.empty()) {
shardList += ",";
}
shardList += shard;
}
headers["X-Arango-Nolock"] = shardList; // Prevent locking
auto res = cc->syncRequest("", coordTransactionID, "server:" + list.first,
RequestType::POST, url, engineInfo.toJson(),
headers, 30.0);
@ -1018,7 +1025,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
message += std::string(" : ") + res->errorMessage;
}
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_QUERY_COLLECTION_LOCK_FAILED, message);
TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE, message);
} else {
// Only if the result was successful we will get here
arangodb::basics::StringBuffer& body = res->result->getBody();
@ -1028,7 +1035,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
VPackSlice resultSlice = builder->slice();
if (!resultSlice.isNumber()) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_INTERNAL, "got unexpected response from engine lock request");
TRI_ERROR_INTERNAL, "got unexpected response from engine build request");
}
auto engineId = resultSlice.getNumericValue<traverser::TraverserEngineID>();
TRI_ASSERT(engineId != 0);
@ -1209,6 +1216,9 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan(
engine->_lockedShards = new std::unordered_set<std::string>();
engine->_previouslyLockedShards = nullptr;
}
// Note that it is crucial that this is a map and not an unordered_map,
// because we need to guarantee the order of locking by using
// alphabetical order on the shard names!
std::map<std::string, std::pair<std::string, bool>> forLocking;
for (auto& q : inst.get()->queryIds) {
std::string theId = q.first;

View File

@ -94,9 +94,12 @@ BaseTraverserEngine::BaseTraverserEngine(TRI_vocbase_t* vocbase,
auto params = std::make_shared<VPackBuilder>();
auto opts = std::make_shared<VPackBuilder>();
_trx = new aql::AqlTransaction(
arangodb::transaction::StandaloneContext::Create(vocbase),
_collections.collections(), false);
_trx = new arangodb::AqlTransaction(
arangodb::StandaloneTransactionContext::Create(vocbase),
_collections.collections(), true);
// true here as last argument is crucial: it leads to the fact that the
// created transaction is considered a "MAIN" part and will not switch
// off collection locking completely!
_query = new aql::Query(true, vocbase, "", 0, params, opts, aql::PART_DEPENDENT);
_query->injectTransaction(_trx);
@ -112,7 +115,6 @@ BaseTraverserEngine::BaseTraverserEngine(TRI_vocbase_t* vocbase,
}
}
_trx->begin(); // We begin the transaction before we lock.
// We also setup indexes before we lock.
}

View File

@ -627,8 +627,16 @@ void RestVocbaseBaseHandler::prepareExecute() {
std::string const& shardId = _request->header("x-arango-nolock", found);
if (found) {
_nolockHeaderSet =
new std::unordered_set<std::string>{std::string(shardId)};
_nolockHeaderSet = new std::unordered_set<std::string>();
// Split value at commas, if there are any, otherwise take full value:
size_t pos = shardId.find(',');
size_t oldpos = 0;
while (pos != std::string::npos) {
_nolockHeaderSet->emplace(shardId.substr(oldpos, pos - oldpos));
oldpos = pos + 1;
pos = shardId.find(',', oldpos);
}
_nolockHeaderSet->emplace(shardId.substr(oldpos));
CollectionLockState::_noLockHeaders = _nolockHeaderSet;
}
}