diff --git a/arangod/RestHandler/RestCursorHandler.cpp b/arangod/RestHandler/RestCursorHandler.cpp index fdf343dbf4..6dbebb1117 100644 --- a/arangod/RestHandler/RestCursorHandler.cpp +++ b/arangod/RestHandler/RestCursorHandler.cpp @@ -107,17 +107,21 @@ RestStatus RestCursorHandler::continueExecute() { void RestCursorHandler::shutdownExecute(bool isFinalized) noexcept { TRI_DEFER(RestVocbaseBaseHandler::shutdownExecute(isFinalized)); - auto const type = _request->requestType(); // request not done yet if (_state == HandlerState::PAUSED) { return; } - + // only trace create cursor requests - if (type != rest::RequestType::POST) { + if (_request->requestType() != rest::RequestType::POST) { return; } + + // destroy the query context. + // this is needed because the context is managing resources (e.g. leases + // for a managed transaction) that we want to free as early as possible + _queryResult.context.reset(); if (!_isValidForFinalize || _auditLogged) { // set by RestCursorHandler before @@ -344,6 +348,11 @@ RestStatus RestCursorHandler::handleQueryResult() { THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY); } generateResult(rest::ResponseCode::CREATED, std::move(buffer), _queryResult.context); + // directly after returning from here, we will free the query's context and free the + // resources it uses (e.g. leases for a managed transaction). this way the server + // can send back the query result to the client and the client can make follow-up + // requests on the same transaction (e.g. trx.commit()) without the server code for + // freeing the resources and the client code racing for who's first return RestStatus::DONE; } else { // result is bigger than batchSize, and a cursor will be created diff --git a/arangod/RestHandler/RestIndexHandler.cpp b/arangod/RestHandler/RestIndexHandler.cpp index d33bcf5b32..dc7d475993 100644 --- a/arangod/RestHandler/RestIndexHandler.cpp +++ b/arangod/RestHandler/RestIndexHandler.cpp @@ -26,6 +26,7 @@ #include "Cluster/ClusterInfo.h" #include "Cluster/ServerState.h" #include "Rest/HttpRequest.h" +#include "Transaction/StandaloneContext.h" #include "Utils/Events.h" #include "Utils/SingleCollectionTransaction.h" #include "VocBase/LogicalCollection.h" @@ -172,14 +173,30 @@ RestStatus RestIndexHandler::getSelectivityEstimates() { // ............................................................................. bool found = false; - std::string cName = _request->value("collection", found); + std::string const& cName = _request->value("collection", found); if (cName.empty()) { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); return RestStatus::DONE; } // transaction protects access onto selectivity estimates - auto trx = createTransaction(cName, AccessMode::Type::READ); + std::unique_ptr trx; + + try { + trx = createTransaction(cName, AccessMode::Type::READ); + } catch (basics::Exception const& ex) { + if (ex.code() == TRI_ERROR_TRANSACTION_NOT_FOUND) { + // this will happen if the tid of a managed transaction is passed in, + // but the transaction hasn't yet started on the DB server. in + // this case, we create an ad-hoc transaction on the underlying + // collection + trx = std::make_unique(transaction::StandaloneContext::Create(_vocbase), cName, AccessMode::Type::READ); + } else { + throw; + } + } + + TRI_ASSERT(trx != nullptr); Result res = trx->begin(); if (res.fail()) { diff --git a/arangod/Transaction/Manager.cpp b/arangod/Transaction/Manager.cpp index d010d43068..0fbfdb4e7a 100644 --- a/arangod/Transaction/Manager.cpp +++ b/arangod/Transaction/Manager.cpp @@ -47,6 +47,8 @@ #include #include +#include + namespace { bool authorized(std::string const& user) { auto context = arangodb::ExecContext::CURRENT; @@ -476,7 +478,7 @@ std::shared_ptr Manager::leaseManagedTrx(TRI_voc_tid_t tid return nullptr; } - const size_t bucket = getBucket(tid); + size_t const bucket = getBucket(tid); int i = 0; TransactionState* state = nullptr; do { @@ -508,8 +510,10 @@ std::shared_ptr Manager::leaseManagedTrx(TRI_voc_tid_t tid state = mtrx.state; break; } - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION, - "transaction is already in use"); + + LOG_TOPIC("abd72", DEBUG, Logger::TRANSACTIONS) << "transaction '" << tid << "' is already in use"; + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_LOCKED, + std::string("transaction '") + std::to_string(tid) + "' is already in use"); } writeLocker.unlock(); // failure; @@ -591,13 +595,34 @@ transaction::Status Manager::getManagedTrxStatus(TRI_voc_tid_t tid) const { return transaction::Status::ABORTED; } } + + +Result Manager::statusChangeWithTimeout(TRI_voc_tid_t tid, transaction::Status status) { + double startTime = 0.0; + constexpr double maxWaitTime = 2.0; + Result res; + while (true) { + res = updateTransaction(tid, status, false); + if (res.ok() || !res.is(TRI_ERROR_LOCKED)) { + break; + } + if (startTime <= 0.0001) { // fp tolerance + startTime = TRI_microtime(); + } else if (TRI_microtime() - startTime > maxWaitTime) { + // timeout + break; + } + std::this_thread::yield(); + } + return res; +} Result Manager::commitManagedTrx(TRI_voc_tid_t tid) { - return updateTransaction(tid, transaction::Status::COMMITTED, false); + return statusChangeWithTimeout(tid, transaction::Status::COMMITTED); } Result Manager::abortManagedTrx(TRI_voc_tid_t tid) { - return updateTransaction(tid, transaction::Status::ABORTED, false); + return statusChangeWithTimeout(tid, transaction::Status::ABORTED); } Result Manager::updateTransaction(TRI_voc_tid_t tid, transaction::Status status, @@ -609,7 +634,7 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, transaction::Status status, << "managed trx '" << tid << " updating to '" << status << "'"; Result res; - const size_t bucket = getBucket(tid); + size_t const bucket = getBucket(tid); bool wasExpired = false; std::unique_ptr state; @@ -626,9 +651,11 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, transaction::Status status, ManagedTrx& mtrx = it->second; TRY_WRITE_LOCKER(tryGuard, mtrx.rwlock); if (!tryGuard.isLocked()) { - return res.reset(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION, + LOG_TOPIC("dfc30", DEBUG, Logger::TRANSACTIONS) << "transaction '" << tid << "' is in use"; + return res.reset(TRI_ERROR_LOCKED, std::string("transaction '") + std::to_string(tid) + "' is in use"); } + TRI_ASSERT(tryGuard.isLocked()); if (mtrx.type == MetaType::StandaloneAQL) { return res.reset(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION, @@ -646,7 +673,7 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, transaction::Status status, } } - if (mtrx.expired()) { + if (mtrx.expired() && status != transaction::Status::ABORTED) { status = transaction::Status::ABORTED; wasExpired = true; } @@ -739,10 +766,9 @@ bool Manager::garbageCollect(bool abortAll) { auto it = _transactions[bucket]._managed.begin(); while (it != _transactions[bucket]._managed.end()) { ManagedTrx& mtrx = it->second; - + if (mtrx.type == MetaType::Managed) { TRI_ASSERT(mtrx.state != nullptr); - if (abortAll || mtrx.expired()) { TRY_READ_LOCKER(tryGuard, mtrx.rwlock); // needs lock to access state @@ -774,12 +800,18 @@ bool Manager::garbageCollect(bool abortAll) { "transaction: '" << tid << "'"; Result res = updateTransaction(tid, Status::ABORTED, /*clearSrvs*/ true); - if (res.fail()) { + // updateTransaction can return TRI_ERROR_TRANSACTION_ABORTED when it + // successfully aborts, so ignore this error. + // we can also get the TRI_ERROR_LOCKED error in case we cannot + // immediately acquire the lock on the transaction. this _can_ happen + // infrequently, but is not an error + if (res.fail() && + !res.is(TRI_ERROR_TRANSACTION_ABORTED) && + !res.is(TRI_ERROR_LOCKED)) { LOG_TOPIC("0a07f", INFO, Logger::TRANSACTIONS) << "error while aborting " "transaction: '" << res.errorMessage() << "'"; } - didWork = true; } diff --git a/arangod/Transaction/Manager.h b/arangod/Transaction/Manager.h index c47bc5853f..3d77bea302 100644 --- a/arangod/Transaction/Manager.h +++ b/arangod/Transaction/Manager.h @@ -162,7 +162,10 @@ class Manager final { std::string const& username, bool fanout) const; private: - // hashes the transaction id into a bucket + /// @brief performs a status change on a transaction using a timeout + Result statusChangeWithTimeout(TRI_voc_tid_t tid, transaction::Status status); + + /// @brief hashes the transaction id into a bucket inline size_t getBucket(TRI_voc_tid_t tid) const { return std::hash()(tid) % numBuckets; } diff --git a/tests/Transaction/Manager-test.cpp b/tests/Transaction/Manager-test.cpp index 8c70579dde..cffa33c5b2 100644 --- a/tests/Transaction/Manager-test.cpp +++ b/tests/Transaction/Manager-test.cpp @@ -260,7 +260,7 @@ TEST_F(TransactionManagerTest, simple_transaction_and_commit_while_in_use) { OperationOptions opts; auto opRes = trx.insert(coll->name(), doc->slice(), opts); ASSERT_TRUE(opRes.ok()); - ASSERT_TRUE(mgr->commitManagedTrx(tid).is(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION)); + ASSERT_EQ(TRI_ERROR_LOCKED, mgr->commitManagedTrx(tid).errorNumber()); ASSERT_TRUE(trx.finish(opRes.result).ok()); } ASSERT_TRUE((mgr->getManagedTrxStatus(tid) == transaction::Status::RUNNING));