1
0
Fork 0

Bug fix/cleanup 31032019 (#8632)

This commit is contained in:
Jan 2019-04-01 17:14:11 +02:00 committed by GitHub
parent 02281d3be4
commit 616ea94f24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 268 additions and 235 deletions

View File

@ -76,6 +76,11 @@ void QueryRegistry::insert(QueryId id, Query* query, double ttl,
<< "Register query with id " << id << " : " << query->queryString();
auto& vocbase = query->vocbase();
if (vocbase.isDropped()) {
// don't register any queries for dropped databases
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
// create the query info object outside of the lock
auto p = std::make_unique<QueryInfo>(id, query, ttl, isPrepared);
p->_isOpen = keepLease;
@ -210,6 +215,11 @@ void QueryRegistry::destroy(std::string const& vocbase, QueryId id, int errorCod
// remove query from the table of running queries
m->second.erase(q);
if (m->second.empty()) {
// clear empty entries in database-to-queries map
_queries.erase(m);
}
}
TRI_ASSERT(queryInfo != nullptr);
@ -230,8 +240,30 @@ void QueryRegistry::destroy(std::string const& vocbase, QueryId id, int errorCod
LOG_TOPIC("6756c", DEBUG, arangodb::Logger::AQL) << "query with id " << id << " is now destroyed";
}
void QueryRegistry::destroy(std::string const& vocbase) {
{
WRITE_LOCKER(writeLocker, _lock);
auto m = _queries.find(vocbase);
if (m == _queries.end()) {
return;
}
for (auto& it : (*m).second) {
it.second->_expires = 0.0;
if (it.second->_isOpen) {
// query in use by another thread/request
it.second->_query->kill();
}
}
}
expireQueries();
}
ResultT<bool> QueryRegistry::isQueryInUse(TRI_vocbase_t* vocbase, QueryId id) {
LOG_TOPIC("d9870", DEBUG, arangodb::Logger::AQL) << "Test if query with id " << id << "is in use.";
LOG_TOPIC("d9870", DEBUG, arangodb::Logger::AQL) << "Test if query with id " << id << " is in use.";
READ_LOCKER(readLocker, _lock);
@ -274,7 +306,7 @@ void QueryRegistry::expireQueries() {
}
if (!queriesLeft.empty()) {
LOG_TOPIC("4f142", TRACE, Logger::QUERIES) << "queries left in QueryRegistry: " << queriesLeft;
LOG_TOPIC("4f142", TRACE, arangodb::Logger::AQL) << "queries left in QueryRegistry: " << queriesLeft;
}
for (auto& p : toDelete) {

View File

@ -80,6 +80,10 @@ class QueryRegistry {
/// safe to call if the current thread is currently using the query itself
TEST_VIRTUAL void destroy(std::string const& vocbase, QueryId id, int errorCode, bool ignoreOpened);
/// @brief destroy all queries for the specified database. this can be used
/// when the database gets dropped
void destroy(std::string const& vocbase);
ResultT<bool> isQueryInUse(TRI_vocbase_t* vocbase, QueryId id);
/// @brief expireQueries, this deletes all expired queries from the registry

View File

@ -286,7 +286,7 @@ static arangodb::Result addShardFollower(std::string const& endpoint,
return arangodb::Result(TRI_ERROR_INTERNAL, errorMessage);
}
LOG_TOPIC("79935", DEBUG, Logger::MAINTENANCE) << "cancelReadLockOnLeader: success";
LOG_TOPIC("79935", DEBUG, Logger::MAINTENANCE) << "addShardFollower: success";
return arangodb::Result();
} catch (std::exception const& e) {
std::string errorMsg(
@ -322,6 +322,21 @@ static arangodb::Result cancelReadLockOnLeader(std::string const& endpoint,
auto result = comres->result;
if (result != nullptr && result->getHttpReturnCode() == 404) {
auto const vp = result->getBodyVelocyPack();
auto const& slice = vp->slice();
if (slice.isObject()) {
VPackSlice s = slice.get(StaticStrings::ErrorNum);
if (s.isNumber()) {
int errorNum = s.getNumber<int>();
if (errorNum == TRI_ERROR_ARANGO_DATABASE_NOT_FOUND) {
// database is gone. that means our lock is also gone
return arangodb::Result();
}
}
}
}
if (result == nullptr || result->getHttpReturnCode() != 200) {
auto errorMessage = comres->stringifyErrorMessage();
LOG_TOPIC("52924", ERR, Logger::MAINTENANCE)
@ -443,6 +458,22 @@ arangodb::Result SynchronizeShard::getReadLock(std::string const& endpoint,
LOG_TOPIC("b681f", DEBUG, Logger::MAINTENANCE)
<< "startReadLockOnLeader: Lock not yet acquired...";
} else {
if (result != nullptr && result->getHttpReturnCode() == 404) {
auto const vp = result->getBodyVelocyPack();
auto const& slice = vp->slice();
if (slice.isObject()) {
VPackSlice s = slice.get(StaticStrings::ErrorNum);
if (s.isNumber()) {
int errorNum = s.getNumber<int>();
if (errorNum == TRI_ERROR_ARANGO_DATABASE_NOT_FOUND) {
// database is gone. we can now give up
break;
}
}
}
// fall-through to other cases intentional here
}
LOG_TOPIC("a82bc", DEBUG, Logger::MAINTENANCE)
<< "startReadLockOnLeader: Do not see read lock yet:"
<< putres->stringifyErrorMessage();

View File

@ -32,9 +32,7 @@
#include "Transaction/Manager.h"
#include "Transaction/ManagerFeature.h"
#include "Transaction/Methods.h"
#include "Utils/ExecContext.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/ticks.h"
using namespace arangodb;

View File

@ -595,18 +595,9 @@ OperationResult GraphOperations::createDocument(transaction::Methods* trx,
options.waitForSync = waitForSync;
options.returnNew = returnNew;
OperationResult result;
result = trx->insert(collectionName, document, options);
OperationResult result = trx->insert(collectionName, document, options);
result.result = trx->finish(result.result);
if (!result.ok()) {
trx->finish(result.result);
return OperationResult(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
}
Result res = trx->finish(result.result);
if (result.ok() && res.fail()) {
return OperationResult(res);
}
return result;
}
@ -695,25 +686,21 @@ OperationResult GraphOperations::createEdge(const std::string& definitionName,
transaction::Options trxOptions;
trxOptions.waitForSync = waitForSync;
std::unique_ptr<transaction::Methods> trx(
new UserTransaction(ctx(), readCollections, writeCollections, {}, trxOptions));
UserTransaction trx(ctx(), readCollections, writeCollections, {}, trxOptions);
Result res = trx->begin();
Result res = trx.begin();
if (!res.ok()) {
return OperationResult(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
}
OperationResult resultFrom = trx->document(fromCollectionName, bF.slice(), options);
OperationResult resultTo = trx->document(toCollectionName, bT.slice(), options);
if (!resultFrom.ok()) {
trx->finish(resultFrom.result);
OperationResult resultFrom = trx.document(fromCollectionName, bF.slice(), options);
OperationResult resultTo = trx.document(toCollectionName, bT.slice(), options);
if (!resultFrom.ok() || !resultTo.ok()) {
// actual result doesn't matter here
trx.finish(Result());
return OperationResult(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
}
if (!resultTo.ok()) {
trx->finish(resultTo.result);
return OperationResult(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
}
return createDocument(trx.get(), definitionName, document, waitForSync, returnNew);
return createDocument(&trx, definitionName, document, waitForSync, returnNew);
}
OperationResult GraphOperations::updateVertex(const std::string& collectionName,
@ -741,16 +728,15 @@ OperationResult GraphOperations::createVertex(const std::string& collectionName,
std::vector<std::string> writeCollections;
writeCollections.emplace_back(collectionName);
std::unique_ptr<transaction::Methods> trx(
new UserTransaction(ctx(), {}, writeCollections, {}, trxOptions));
UserTransaction trx(ctx(), {}, writeCollections, {}, trxOptions);
Result res = trx->begin();
Result res = trx.begin();
if (!res.ok()) {
return OperationResult(res);
}
return createDocument(trx.get(), collectionName, document, waitForSync, returnNew);
return createDocument(&trx, collectionName, document, waitForSync, returnNew);
}
OperationResult GraphOperations::removeEdgeOrVertex(const std::string& collectionName,

View File

@ -2582,6 +2582,10 @@ int MMFilesCollection::lockRead(bool useDeadlockDetector,
if (now - startTime < 0.001) {
std::this_thread::yield();
} else {
if (_logicalCollection.vocbase().isDropped()) {
return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND;
}
std::this_thread::sleep_for(std::chrono::microseconds(waitTime));
if (waitTime < 32) {
waitTime *= 2;
@ -2695,6 +2699,9 @@ int MMFilesCollection::lockWrite(bool useDeadlockDetector,
if (now - startTime < 0.001) {
std::this_thread::yield();
} else {
if (_logicalCollection.vocbase().isDropped()) {
return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND;
}
std::this_thread::sleep_for(std::chrono::microseconds(waitTime));
if (waitTime < 32) {
waitTime *= 2;

View File

@ -92,53 +92,53 @@ void MMFilesCollectionExport::run(uint64_t maxWaitTime, size_t limit) {
}
}
{
auto ctx = transaction::StandaloneContext::Create(_collection->vocbase());
SingleCollectionTransaction trx(ctx, _name, AccessMode::Type::READ);
auto guard = scopeGuard([this]() {
// delete guard right now as we're about to return
// if we would continue holding the guard's collection lock and return,
// and the export object gets later freed in a different thread, then all
// would be lost. so we'll release the lock here and rely on the cleanup
// thread not unloading the collection (as we've acquired a document ditch
// for the collection already - this will prevent unloading of the
// collection's datafiles etc.)
_guard.reset();
});
// already locked by guard above
trx.addHint(transaction::Hints::Hint::NO_USAGE_LOCK);
auto ctx = transaction::StandaloneContext::Create(_collection->vocbase());
SingleCollectionTransaction trx(ctx, _name, AccessMode::Type::READ);
Result res = trx.begin();
// already locked by _guard instance variable
trx.addHint(transaction::Hints::Hint::NO_USAGE_LOCK);
if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res);
}
Result res = trx.begin();
size_t maxDocuments =
_collection->numberDocuments(&trx, transaction::CountType::Normal);
if (limit > 0 && limit < maxDocuments) {
maxDocuments = limit;
} else {
limit = maxDocuments;
}
_vpack.reserve(maxDocuments);
MMFilesCollection* mmColl = MMFilesCollection::toMMFilesCollection(_collection);
ManagedDocumentResult mmdr;
trx.invokeOnAllElements(_collection->name(), [this, &limit, &trx, &mmdr,
mmColl](LocalDocumentId const& token) {
if (limit == 0) {
return false;
}
if (mmColl->readDocumentConditional(&trx, token, 0, mmdr)) {
_vpack.emplace_back(mmdr.vpack());
--limit;
}
return true;
});
trx.finish(res.errorNumber());
if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res);
}
// delete guard right now as we're about to return
// if we would continue holding the guard's collection lock and return,
// and the export object gets later freed in a different thread, then all
// would be lost. so we'll release the lock here and rely on the cleanup
// thread not unloading the collection (as we've acquired a document ditch
// for the collection already - this will prevent unloading of the
// collection's datafiles etc.)
_guard.reset();
size_t maxDocuments =
_collection->numberDocuments(&trx, transaction::CountType::Normal);
if (limit > 0 && limit < maxDocuments) {
maxDocuments = limit;
} else {
limit = maxDocuments;
}
_vpack.reserve(maxDocuments);
MMFilesCollection* mmColl = MMFilesCollection::toMMFilesCollection(_collection);
ManagedDocumentResult mmdr;
trx.invokeOnAllElements(_collection->name(), [this, &limit, &trx, &mmdr,
mmColl](LocalDocumentId const& token) {
if (limit == 0) {
return false;
}
if (mmColl->readDocumentConditional(&trx, token, 0, mmdr)) {
_vpack.emplace_back(mmdr.vpack());
--limit;
}
return true;
});
trx.finish(res);
}

View File

@ -289,7 +289,7 @@ void MMFilesCollectorThread::run() {
if (!isStopping() && !engine->isCompactionDisabled()) {
// don't collect additional logfiles in case we want to shut down
bool worked;
int res = this->collectLogfiles(worked);
int res = collectLogfiles(worked);
if (res == TRI_ERROR_NO_ERROR) {
hasWorked |= worked;
@ -300,17 +300,11 @@ void MMFilesCollectorThread::run() {
// step 2: update master pointers
bool worked;
int res = this->processQueuedOperations(worked);
if (res == TRI_ERROR_NO_ERROR) {
hasWorked |= worked;
} else if (res == TRI_ERROR_ARANGO_FILESYSTEM_FULL) {
doDelay = true;
}
} catch (arangodb::basics::Exception const& ex) {
int res = ex.code();
processQueuedOperations(worked);
hasWorked |= worked;
} catch (std::exception const& ex) {
LOG_TOPIC("943ec", ERR, Logger::COLLECTOR)
<< "got unexpected error in collectorThread::run: " << TRI_errno_string(res);
<< "got unexpected error in collectorThread::run: " << ex.what();
} catch (...) {
LOG_TOPIC("f9ec6", ERR, Logger::COLLECTOR)
<< "got unspecific error in collectorThread::run";
@ -401,7 +395,7 @@ int MMFilesCollectorThread::collectLogfiles(bool& worked) {
try {
int res = collect(logfile);
LOG_TOPIC("917e7", TRACE, Logger::COLLECTOR)
<< "collected logfile: " << logfile->id() << ". result: " << res;
<< "collected logfile: " << logfile->id() << ". result: " << TRI_errno_string(res);
if (res == TRI_ERROR_NO_ERROR) {
// reset collector status
@ -425,7 +419,7 @@ int MMFilesCollectorThread::collectLogfiles(bool& worked) {
int res = ex.code();
LOG_TOPIC("9d55c", DEBUG, Logger::COLLECTOR) << "collecting logfile " << logfile->id()
<< " failed: " << TRI_errno_string(res);
<< " failed: " << ex.what();
return res;
} catch (...) {
@ -438,25 +432,25 @@ int MMFilesCollectorThread::collectLogfiles(bool& worked) {
}
/// @brief step 2: process all still-queued collection operations
int MMFilesCollectorThread::processQueuedOperations(bool& worked) {
void MMFilesCollectorThread::processQueuedOperations(bool& worked) {
MMFilesEngine* engine = static_cast<MMFilesEngine*>(EngineSelectorFeature::ENGINE);
// always init result variable
worked = false;
TRI_IF_FAILURE("CollectorThreadProcessQueuedOperations") {
return TRI_ERROR_NO_ERROR;
return;
}
if (engine->isCompactionDisabled()) {
return TRI_ERROR_NO_ERROR;
return;
}
{
MUTEX_LOCKER(mutexLocker, _operationsQueueLock);
if (_operationsQueueInUse || _operationsQueue.empty()) {
// nothing to do
return TRI_ERROR_NO_ERROR;
return;
}
// this flag indicates that no one else must write to the queue
@ -483,7 +477,7 @@ int MMFilesCollectorThread::processQueuedOperations(bool& worked) {
int res = TRI_ERROR_INTERNAL;
try {
res = processCollectionOperations((*it2));
res = processCollectionOperations((*it2).get());
} catch (arangodb::basics::Exception const& ex) {
res = ex.code();
LOG_TOPIC("00a20", TRACE, Logger::COLLECTOR)
@ -537,9 +531,6 @@ int MMFilesCollectorThread::processQueuedOperations(bool& worked) {
_numPendingOperations -= numOperations;
// delete the object
delete (*it2);
// delete the element from the vector while iterating over the vector
it2 = operations.erase(it2);
@ -552,8 +543,6 @@ int MMFilesCollectorThread::processQueuedOperations(bool& worked) {
// next collection
}
return TRI_ERROR_NO_ERROR;
}
void MMFilesCollectorThread::clearQueuedOperations() {
@ -595,8 +584,7 @@ void MMFilesCollectorThread::clearQueuedOperations() {
_numPendingOperations -= cache->operations->size();
_logfileManager->decreaseCollectQueueSize(cache->logfile);
delete cache;
cache = nullptr;
cache.reset();
} catch (...) {
// ignore things like collection not found, database not found etc.
// on shutdown
@ -604,7 +592,7 @@ void MMFilesCollectorThread::clearQueuedOperations() {
// finally remove all the nullptrs from the vector
operations.erase(std::remove_if(operations.begin(), operations.end(),
[](MMFilesCollectorCache* cache) {
[](std::unique_ptr<MMFilesCollectorCache> const& cache) {
return cache == nullptr;
}),
operations.end());
@ -1008,7 +996,7 @@ int MMFilesCollectorThread::transferMarkers(MMFilesWalLogfile* logfile,
<< ", number of bytes transferred: " << numBytesTransferred;
if (res == TRI_ERROR_NO_ERROR && !cache->operations->empty()) {
queueOperations(logfile, cache);
queueOperations(logfile, std::move(cache));
}
} catch (arangodb::basics::Exception const& ex) {
res = ex.code();
@ -1028,8 +1016,8 @@ int MMFilesCollectorThread::transferMarkers(MMFilesWalLogfile* logfile,
}
/// @brief insert the collect operations into a per-collection queue
int MMFilesCollectorThread::queueOperations(arangodb::MMFilesWalLogfile* logfile,
std::unique_ptr<MMFilesCollectorCache>& cache) {
void MMFilesCollectorThread::queueOperations(arangodb::MMFilesWalLogfile* logfile,
std::unique_ptr<MMFilesCollectorCache> cache) {
TRI_ASSERT(cache != nullptr);
TRI_voc_cid_t cid = cache->collectionId;
@ -1042,19 +1030,11 @@ int MMFilesCollectorThread::queueOperations(arangodb::MMFilesWalLogfile* logfile
{
MUTEX_LOCKER(mutexLocker, _operationsQueueLock);
// it is only safe to access the queue if this flag is not set
if (!_operationsQueueInUse) {
// it is only safe to access the queue if this flag is not set
auto it = _operationsQueue.find(cid);
if (it == _operationsQueue.end()) {
_operationsQueue.emplace(cid,
std::vector<MMFilesCollectorCache*>({cache.get()}));
_logfileManager->increaseCollectQueueSize(logfile);
} else {
(*it).second.push_back(cache.get());
_logfileManager->increaseCollectQueueSize(logfile);
}
_operationsQueue[cid].emplace_back(std::move(cache));
// now _operationsQueue is responsible for managing the cache entry
cache.release();
_logfileManager->increaseCollectQueueSize(logfile);
// exit the loop
break;
@ -1077,8 +1057,6 @@ int MMFilesCollectorThread::queueOperations(arangodb::MMFilesWalLogfile* logfile
}
_numPendingOperations += numOperations;
return TRI_ERROR_NO_ERROR;
}
/// @brief update a collection's datafile information

View File

@ -90,23 +90,23 @@ class MMFilesCollectorThread final : public Thread {
size_t numQueuedOperations();
/// @brief step 1: perform collection of a logfile (if any)
int collectLogfiles(bool&);
int collectLogfiles(bool& worked);
/// @brief step 2: process all still-queued collection operations
int processQueuedOperations(bool&);
void processQueuedOperations(bool& worked);
/// @brief process all operations for a single collection
int processCollectionOperations(MMFilesCollectorCache*);
/// @brief collect one logfile
int collect(MMFilesWalLogfile*);
int collect(MMFilesWalLogfile* logfile);
/// @brief transfer markers into a collection
int transferMarkers(MMFilesWalLogfile*, TRI_voc_cid_t, TRI_voc_tick_t,
int64_t, MMFilesOperationsType const&);
/// @brief insert the collect operations into a per-collection queue
int queueOperations(MMFilesWalLogfile*, std::unique_ptr<MMFilesCollectorCache>&);
void queueOperations(MMFilesWalLogfile*, std::unique_ptr<MMFilesCollectorCache>);
/// @brief update a collection's datafile information
int updateDatafileStatistics(LogicalCollection*, MMFilesCollectorCache*);
@ -132,7 +132,7 @@ class MMFilesCollectorThread final : public Thread {
arangodb::Mutex _operationsQueueLock;
/// @brief operations to collect later
std::unordered_map<TRI_voc_cid_t, std::vector<MMFilesCollectorCache*>> _operationsQueue;
std::unordered_map<TRI_voc_cid_t, std::vector<std::unique_ptr<MMFilesCollectorCache>>> _operationsQueue;
/// @brief whether or not the queue is currently in use
bool _operationsQueueInUse;

View File

@ -1462,7 +1462,7 @@ MMFilesWalLogfile* MMFilesLogfileManager::getCollectableLogfile() {
}
LOG_TOPIC("dbee0", DEBUG, Logger::ENGINES) << "getCollectableLogfile: "
<< "found no logfile to collect, minId:" << minId;
<< "found no logfile to collect, minId: " << minId;
return nullptr;
}

View File

@ -713,7 +713,6 @@ void MMFilesRestReplicationHandler::handleCommandGetKeys() {
auto collectionKeysId =
static_cast<CollectionKeysId>(arangodb::basics::StringUtils::uint64(id));
auto collectionKeys = keysRepository->find(collectionKeysId);
if (collectionKeys == nullptr) {
@ -721,38 +720,34 @@ void MMFilesRestReplicationHandler::handleCommandGetKeys() {
TRI_ERROR_CURSOR_NOT_FOUND);
return;
}
TRI_DEFER(collectionKeys->release());
try {
VPackBuilder b;
b.add(VPackValue(VPackValueType::Array));
VPackBuilder b;
b.add(VPackValue(VPackValueType::Array));
TRI_voc_tick_t max = static_cast<TRI_voc_tick_t>(collectionKeys->count());
TRI_voc_tick_t max = static_cast<TRI_voc_tick_t>(collectionKeys->count());
for (TRI_voc_tick_t from = 0; from < max; from += chunkSize) {
TRI_voc_tick_t to = from + chunkSize;
for (TRI_voc_tick_t from = 0; from < max; from += chunkSize) {
TRI_voc_tick_t to = from + chunkSize;
if (to > max) {
to = max;
}
auto result = collectionKeys->hashChunk(static_cast<size_t>(from),
static_cast<size_t>(to));
// Add a chunk
b.add(VPackValue(VPackValueType::Object));
b.add("low", VPackValue(std::get<0>(result)));
b.add("high", VPackValue(std::get<1>(result)));
b.add("hash", VPackValue(std::to_string(std::get<2>(result))));
b.close();
if (to > max) {
to = max;
}
b.close();
collectionKeys->release();
generateResult(rest::ResponseCode::OK, b.slice());
} catch (...) {
collectionKeys->release();
throw;
auto result = collectionKeys->hashChunk(static_cast<size_t>(from),
static_cast<size_t>(to));
// Add a chunk
b.add(VPackValue(VPackValueType::Object));
b.add("low", VPackValue(std::get<0>(result)));
b.add("high", VPackValue(std::get<1>(result)));
b.add("hash", VPackValue(std::to_string(std::get<2>(result))));
b.close();
}
b.close();
generateResult(rest::ResponseCode::OK, b.slice());
}
/// @brief returns date for a key range
@ -829,39 +824,32 @@ void MMFilesRestReplicationHandler::handleCommandFetchKeys() {
TRI_ERROR_CURSOR_NOT_FOUND);
return;
}
TRI_DEFER(collectionKeys->release());
try {
auto ctx = transaction::StandaloneContext::Create(_vocbase);
VPackBuilder resultBuilder(ctx->getVPackOptions());
auto ctx = transaction::StandaloneContext::Create(_vocbase);
VPackBuilder resultBuilder(ctx->getVPackOptions());
resultBuilder.openArray();
resultBuilder.openArray();
if (keys) {
collectionKeys->dumpKeys(resultBuilder, chunk, static_cast<size_t>(chunkSize));
} else {
bool success = false;
VPackSlice parsedIds = this->parseVPackBody(success);
if (keys) {
collectionKeys->dumpKeys(resultBuilder, chunk, static_cast<size_t>(chunkSize));
} else {
bool success = false;
VPackSlice parsedIds = this->parseVPackBody(success);
if (!success) {
// error already created
collectionKeys->release();
return;
}
collectionKeys->dumpDocs(resultBuilder, chunk, static_cast<size_t>(chunkSize),
offsetInChunk, maxChunkSize, parsedIds);
if (!success) {
// error already created
return;
}
resultBuilder.close();
collectionKeys->release();
generateResult(rest::ResponseCode::OK, resultBuilder.slice(), ctx);
return;
} catch (...) {
collectionKeys->release();
throw;
collectionKeys->dumpDocs(resultBuilder, chunk, static_cast<size_t>(chunkSize),
offsetInChunk, maxChunkSize, parsedIds);
}
resultBuilder.close();
generateResult(rest::ResponseCode::OK, resultBuilder.slice(), ctx);
}
void MMFilesRestReplicationHandler::handleCommandRemoveKeys() {

View File

@ -2727,12 +2727,16 @@ Result RestReplicationHandler::createBlockingTransaction(aql::QueryId id,
// is not responsible anymore, it has been handed over to the
// registry.
auto q = query.release();
// Make sure to return the query after we are done
TRI_DEFER(queryRegistry->close(&_vocbase, id));
auto guard = scopeGuard([this, id, &queryRegistry]() {
queryRegistry->close(&_vocbase, id);
});
if (isTombstoned(id)) {
try {
// Code does not matter, read only access, so we can roll back.
guard.fire();
// error code does not matter, read only access, so we can roll back.
// we can ignore the openness here, as it was our thread that had
// inserted the query just a couple of instructions before
queryRegistry->destroy(_vocbase.name(), id, TRI_ERROR_QUERY_KILLED, true /*ignoreOpened*/);
@ -2757,6 +2761,9 @@ ResultT<bool> RestReplicationHandler::isLockHeld(aql::QueryId id) const {
if (queryRegistry == nullptr) {
return ResultT<bool>::error(TRI_ERROR_SHUTTING_DOWN);
}
if (_vocbase.isDropped()) {
return ResultT<bool>::error(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
auto res = queryRegistry->isQueryInUse(&_vocbase, id);
if (!res.ok()) {
// API compatibility otherwise just return res...

View File

@ -159,6 +159,12 @@ void DatabaseManagerThread::run() {
TRI_RemoveDirectory(path.c_str());
}
}
auto queryRegistry = QueryRegistryFeature::registry();
if (queryRegistry != nullptr) {
// destroy all items in the QueryRegistry for this database
queryRegistry->destroy(database->name());
}
try {
engine->dropDatabase(*database);

View File

@ -163,6 +163,14 @@ void RocksDBTransactionCollection::unuse(int nestingLevel) {
}
void RocksDBTransactionCollection::release() {
// questionable, but seems to work
if (_transaction->hasHint(transaction::Hints::Hint::LOCK_NEVER) ||
_transaction->hasHint(transaction::Hints::Hint::NO_USAGE_LOCK)) {
TRI_ASSERT(!_usageLocked);
_collection = nullptr;
return;
}
if (isLocked()) {
// unlock our own r/w locks
doUnlock(_accessType, 0);
@ -178,7 +186,6 @@ void RocksDBTransactionCollection::release() {
_transaction->vocbase().releaseCollection(_collection.get());
_usageLocked = false;
}
_collection = nullptr;
} else {
TRI_ASSERT(!_usageLocked);

View File

@ -28,6 +28,10 @@
using namespace arangodb;
TransactionCollection::~TransactionCollection() {
TRI_ASSERT(_collection == nullptr);
}
std::string TransactionCollection::collectionName() const {
TRI_ASSERT(_collection != nullptr);
return _collection->name();

View File

@ -45,11 +45,10 @@ class TransactionCollection {
TransactionCollection(TransactionState* trx, TRI_voc_cid_t cid, AccessMode::Type accessType)
: _transaction(trx),
_cid(cid),
_collection(nullptr),
_accessType(accessType),
_lockType(AccessMode::Type::NONE) {}
virtual ~TransactionCollection() = default;
virtual ~TransactionCollection();
inline TRI_voc_cid_t id() const { return _cid; }
@ -109,4 +108,4 @@ class TransactionCollection {
} // namespace arangodb
#endif
#endif

View File

@ -341,18 +341,11 @@ int TransactionState::checkCollectionPermission(std::string const& cname,
}
/// @brief release collection locks for a transaction
int TransactionState::releaseCollections() {
if (hasHint(transaction::Hints::Hint::LOCK_NEVER) ||
hasHint(transaction::Hints::Hint::NO_USAGE_LOCK)) {
return TRI_ERROR_NO_ERROR;
}
void TransactionState::releaseCollections() {
// process collections in reverse order
for (auto it = _collections.rbegin(); it != _collections.rend(); ++it) {
(*it)->release();
}
return TRI_ERROR_NO_ERROR;
}
/// @brief clear the query cache for all collections that were modified by

View File

@ -224,14 +224,12 @@ class TransactionState {
int checkCollectionPermission(std::string const& cname, AccessMode::Type) const;
/// @brief release collection locks for a transaction
int releaseCollections();
void releaseCollections();
/// @brief clear the query cache for all collections that were modified by
/// the transaction
void clearQueryCache();
protected:
protected:
TRI_vocbase_t& _vocbase; /// @brief vocbase for this transaction
TRI_voc_tid_t const _id; /// @brief local trx id

View File

@ -163,14 +163,20 @@ Manager::ManagedTrx::~ManagedTrx() {
delete state;
return;
}
transaction::Options opts;
auto ctx = std::make_shared<transaction::ManagedContext>(2, state, AccessMode::Type::NONE);
MGMethods trx(ctx, opts); // own state now
trx.begin();
TRI_ASSERT(state->nestingLevel() == 1);
state->decreaseNesting();
TRI_ASSERT(state->isTopLevelTransaction());
trx.abort();
try {
transaction::Options opts;
auto ctx = std::make_shared<transaction::ManagedContext>(2, state, AccessMode::Type::NONE);
MGMethods trx(ctx, opts); // own state now
trx.begin();
TRI_ASSERT(state->nestingLevel() == 1);
state->decreaseNesting();
TRI_ASSERT(state->isTopLevelTransaction());
trx.abort();
} catch (...) {
// obviously it is not good to consume all exceptions here,
// but we are in a destructor and must never throw from here
}
}
using namespace arangodb;
@ -605,7 +611,7 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid,
trx.state()->decreaseNesting();
TRI_ASSERT(trx.state()->isTopLevelTransaction());
if (clearServers) {
state->clearKnownServers();
trx.state()->clearKnownServers();
}
if (status == transaction::Status::COMMITTED) {
res = trx.commit();

View File

@ -32,6 +32,10 @@
#include "VocBase/LogicalCollection.h"
#include "VocBase/LogicalView.h"
#include "VocBase/vocbase.h"
namespace {
std::string const UNKNOWN("_unknown");
}
namespace arangodb {
@ -202,7 +206,7 @@ std::string CollectionNameResolver::getCollectionName(TRI_voc_cid_t cid) const {
}
}
std::string name = localNameLookup(cid);
std::string name = lookupName(cid);
{
WRITE_LOCKER(locker, _idLock);
_resolvedIds.emplace(cid, name);
@ -236,8 +240,8 @@ std::string CollectionNameResolver::getCollectionNameCluster(TRI_voc_cid_t cid)
if (ServerState::isDBServer(_serverRole)) {
// This might be a local system collection:
name = localNameLookup(cid);
if (name != "_unknown") {
name = lookupName(cid);
if (name != ::UNKNOWN) {
WRITE_LOCKER(locker, _idLock);
_resolvedIds.emplace(cid, name);
return name;
@ -284,13 +288,12 @@ std::string CollectionNameResolver::getCollectionName(std::string const& nameOrI
nameOrId.data() + nameOrId.size()));
}
std::string CollectionNameResolver::localNameLookup(TRI_voc_cid_t cid) const {
static const std::string UNKNOWN("_unknown");
std::string CollectionNameResolver::lookupName(TRI_voc_cid_t cid) const {
auto collection = _vocbase.lookupCollection(cid);
// exactly as in the non-cluster case
if (!ServerState::isDBServer(_serverRole)) {
return collection ? collection->name() : UNKNOWN;
return collection ? collection->name() : ::UNKNOWN;
}
// DBserver case of a shard:
@ -301,11 +304,11 @@ std::string CollectionNameResolver::localNameLookup(TRI_voc_cid_t cid) const {
}
// can be empty, if collection unknown
if ((collection == nullptr) || (collection->name().empty())) {
return UNKNOWN;
} else {
if (collection != nullptr && !collection->name().empty()) {
return collection->name();
}
return ::UNKNOWN;
}
std::shared_ptr<LogicalDataSource> CollectionNameResolver::getDataSource(TRI_voc_cid_t id) const {

View File

@ -93,14 +93,7 @@ class CollectionNameResolver {
/// coordinator it will use the cluster wide lookup.
//////////////////////////////////////////////////////////////////////////////
TRI_voc_cid_t getCollectionId(std::string const& name) const;
private:
//////////////////////////////////////////////////////////////////////////////
/// @brief look up a collection struct for a collection name
//////////////////////////////////////////////////////////////////////////////
std::shared_ptr<arangodb::LogicalCollection> getCollectionStruct(std::string const& name) const;
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief look up a collection name for a collection id, this implements
/// some magic in the cluster case: a DBserver in a cluster will automatically
@ -164,10 +157,15 @@ class CollectionNameResolver {
TRI_voc_cid_t id) const;
private:
//////////////////////////////////////////////////////////////////////////////
/// @brief look up a collection struct for a collection name
//////////////////////////////////////////////////////////////////////////////
std::shared_ptr<arangodb::LogicalCollection> getCollectionStruct(std::string const& name) const;
mutable std::unordered_map<TRI_voc_cid_t, std::shared_ptr<LogicalDataSource>> _dataSourceById; // cached data-source by id
mutable std::unordered_map<std::string, std::shared_ptr<LogicalDataSource>> _dataSourceByName; // cached data-source by name
std::string localNameLookup(TRI_voc_cid_t cid) const;
std::string lookupName(TRI_voc_cid_t cid) const;
//////////////////////////////////////////////////////////////////////////////
/// @brief vocbase base pointer

View File

@ -1462,12 +1462,6 @@ std::shared_ptr<arangodb::LogicalCollection> TRI_vocbase_t::useCollection(
return useCollectionInternal(std::move(collection), status);
}
/// @brief locks a collection for usage by name
std::shared_ptr<arangodb::LogicalCollection> TRI_vocbase_t::useCollectionByUuid(
std::string const& uuid, TRI_vocbase_col_status_e& status) {
return useCollectionInternal(lookupCollectionByUuid(uuid), status);
}
std::shared_ptr<arangodb::LogicalCollection> TRI_vocbase_t::useCollectionInternal(
std::shared_ptr<arangodb::LogicalCollection> coll, TRI_vocbase_col_status_e& status) {
if (!coll) {
@ -2099,4 +2093,4 @@ TRI_voc_rid_t TRI_StringToRid(char const* p, size_t len, bool& isOld, bool warn)
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -346,13 +346,6 @@ struct TRI_vocbase_t {
std::shared_ptr<arangodb::LogicalCollection> useCollection(std::string const& name,
TRI_vocbase_col_status_e&);
/// @brief locks a collection for usage by uuid
/// Note that this will READ lock the collection you have to release the
/// collection lock by yourself and call @ref TRI_ReleaseCollectionVocBase
/// when you are done with the collection.
std::shared_ptr<arangodb::LogicalCollection> useCollectionByUuid(std::string const& uuid,
TRI_vocbase_col_status_e&);
/// @brief releases a collection from usage
void releaseCollection(arangodb::LogicalCollection* collection);
@ -411,4 +404,4 @@ void TRI_SanitizeObject(arangodb::velocypack::Slice const slice,
void TRI_SanitizeObjectWithEdges(arangodb::velocypack::Slice const slice,
arangodb::velocypack::Builder& builder);
#endif
#endif

View File

@ -363,6 +363,7 @@ rest::ResponseCode GeneralResponse::responseCode(int code) {
case TRI_ERROR_FORBIDDEN:
return ResponseCode::FORBIDDEN;
case TRI_ERROR_HTTP_NOT_FOUND:
case TRI_ERROR_ARANGO_DATABASE_NOT_FOUND:
case TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND:
case TRI_ERROR_ARANGO_COLLECTION_NOT_LOADED: