From bd5df4d49e3071d7de44996e4b8dc108af22d744 Mon Sep 17 00:00:00 2001 From: Simon Date: Wed, 21 Aug 2019 12:07:17 +0200 Subject: [PATCH] Enforce stricter transaction limits (#9740) (#9775) * Enforce stricter transaction limits (#9740) * changelog * make CHANGELOG entry more verbose --- CHANGELOG | 9 + .../ClusterTransactionCollection.cpp | 3 +- .../ClusterTransactionCollection.h | 3 - .../RestHandler/RestTransactionHandler.cpp | 4 +- arangod/Transaction/Manager.cpp | 240 ++++++++++-------- arangod/Transaction/Manager.h | 48 ++-- lib/Futures/Promise.h | 2 +- tests/js/client/shell/shell-transaction.js | 51 ++++ 8 files changed, 221 insertions(+), 139 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 2e1ca5d486..06e1e30fe8 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,15 @@ v3.5.1 (XXXX-XX-XX) ------------------- +* Changes the _idle_ timeout of stream transactions to 10 seconds and the total + per DB server size of stream transaction data to 128 MB. The idle timer is + restarted after every operation in a stream transaction, so it is not the + total timeout for the transaction. + + These limits were documented in the manual for stream transactions since 3.5.0, + but are enforced only as of 3.5.1. Enforcing the limits is useful to free up + resources from abandoned transactions. + * Consistently honor the return value of all attempts to queue tasks in the internal scheduler. diff --git a/arangod/ClusterEngine/ClusterTransactionCollection.cpp b/arangod/ClusterEngine/ClusterTransactionCollection.cpp index 93d60e8964..597404188d 100644 --- a/arangod/ClusterEngine/ClusterTransactionCollection.cpp +++ b/arangod/ClusterEngine/ClusterTransactionCollection.cpp @@ -35,8 +35,7 @@ ClusterTransactionCollection::ClusterTransactionCollection(TransactionState* trx TRI_voc_cid_t cid, AccessMode::Type accessType, int nestingLevel) - : TransactionCollection(trx, cid, accessType, nestingLevel), - _lockType(AccessMode::Type::NONE) {} + : TransactionCollection(trx, cid, accessType, nestingLevel) {} ClusterTransactionCollection::~ClusterTransactionCollection() {} diff --git a/arangod/ClusterEngine/ClusterTransactionCollection.h b/arangod/ClusterEngine/ClusterTransactionCollection.h index 8f96e31b41..f2a88a4a87 100644 --- a/arangod/ClusterEngine/ClusterTransactionCollection.h +++ b/arangod/ClusterEngine/ClusterTransactionCollection.h @@ -60,9 +60,6 @@ class ClusterTransactionCollection final : public TransactionCollection { /// @brief request an unlock for a collection int doUnlock(AccessMode::Type, int nestingLevel) override; - - private: - AccessMode::Type _lockType; // collection lock type, used for exclusive locks }; } // namespace arangodb diff --git a/arangod/RestHandler/RestTransactionHandler.cpp b/arangod/RestHandler/RestTransactionHandler.cpp index 2e1625d480..390dc3f06c 100644 --- a/arangod/RestHandler/RestTransactionHandler.cpp +++ b/arangod/RestHandler/RestTransactionHandler.cpp @@ -166,7 +166,7 @@ void RestTransactionHandler::executeBegin() { bool parseSuccess = false; - VPackSlice body = parseVPackBody(parseSuccess); + VPackSlice slice = parseVPackBody(parseSuccess); if (!parseSuccess) { // error message generated in parseVPackBody return; @@ -175,7 +175,7 @@ void RestTransactionHandler::executeBegin() { transaction::Manager* mgr = transaction::ManagerFeature::manager(); TRI_ASSERT(mgr != nullptr); - Result res = mgr->createManagedTrx(_vocbase, tid, body); + Result res = mgr->createManagedTrx(_vocbase, tid, slice); if (res.fail()) { generateError(res); } else { diff --git a/arangod/Transaction/Manager.cpp b/arangod/Transaction/Manager.cpp index b1934f4484..0cc68416f4 100644 --- a/arangod/Transaction/Manager.cpp +++ b/arangod/Transaction/Manager.cpp @@ -42,19 +42,20 @@ #include #include - namespace arangodb { namespace transaction { +const size_t Manager::maxTransactionSize; // 128 MiB + namespace { - struct MGMethods final : arangodb::transaction::Methods { - MGMethods(std::shared_ptr const& ctx, - arangodb::transaction::Options const& opts) - : Methods(ctx, opts) { - TRI_ASSERT(_state->isEmbeddedTransaction()); - } - }; -} +struct MGMethods final : arangodb::transaction::Methods { + MGMethods(std::shared_ptr const& ctx, + arangodb::transaction::Options const& opts) + : Methods(ctx, opts) { + TRI_ASSERT(_state->isEmbeddedTransaction()); + } +}; +} // namespace // register a list of failed transactions void Manager::registerFailedTransactions(std::unordered_set const& failedTransactions) { @@ -158,11 +159,29 @@ uint64_t Manager::getActiveTransactionCount() { return _nrRunning.load(std::memory_order_relaxed); } +Manager::ManagedTrx::ManagedTrx(MetaType t, TransactionState* st) + : type(t), + usedTimeSecs(TRI_microtime()), + state(st), + finalStatus(Status::UNDEFINED), + rwlock() {} + +bool Manager::ManagedTrx::expired() const { + double now = TRI_microtime(); + if (type == Manager::MetaType::Tombstone) { + return (now - usedTimeSecs) > tombstoneTTL; + } + + auto role = ServerState::instance()->getRole(); + if ((ServerState::isSingleServer(role) || ServerState::isCoordinator(role))) { + return (now - usedTimeSecs) > idleTTL; + } + return (now - usedTimeSecs) > idleTTLDBServer; +} + Manager::ManagedTrx::~ManagedTrx() { - if (type == MetaType::StandaloneAQL || - state == nullptr || - state->isEmbeddedTransaction()) { - return; + if (type == MetaType::StandaloneAQL || state == nullptr || state->isEmbeddedTransaction()) { + return; // not managed by us } if (!state->isRunning()) { delete state; @@ -171,8 +190,9 @@ Manager::ManagedTrx::~ManagedTrx() { try { transaction::Options opts; - auto ctx = std::make_shared(2, state, AccessMode::Type::NONE); - MGMethods trx(ctx, opts); // own state now + auto ctx = + std::make_shared(2, state, AccessMode::Type::NONE); + MGMethods trx(ctx, opts); // own state now trx.begin(); TRI_ASSERT(state->nestingLevel() == 1); state->decreaseNesting(); @@ -196,20 +216,20 @@ void Manager::registerAQLTrx(TransactionState* state) { TRI_ASSERT(state != nullptr); const size_t bucket = getBucket(state->id()); - READ_LOCKER(allTransactionsLocker, _allTransactionsLock); - WRITE_LOCKER(writeLocker, _transactions[bucket]._lock); - - auto& buck = _transactions[bucket]; - auto it = buck._managed.find(state->id()); - if (it != buck._managed.end()) { - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_TRANSACTION_INTERNAL, - "transaction ID already used"); + { + READ_LOCKER(allTransactionsLocker, _allTransactionsLock); + WRITE_LOCKER(writeLocker, _transactions[bucket]._lock); + + auto& buck = _transactions[bucket]; + auto it = buck._managed.find(state->id()); + if (it != buck._managed.end()) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_TRANSACTION_INTERNAL, + "transaction ID already used"); + } + + buck._managed.emplace(std::piecewise_construct, std::forward_as_tuple(state->id()), + std::forward_as_tuple(MetaType::StandaloneAQL, state)); } - - buck._managed.emplace(std::piecewise_construct, - std::forward_as_tuple(state->id()), - std::forward_as_tuple(MetaType::StandaloneAQL, state, - (defaultTTL + TRI_microtime()))); } void Manager::unregisterAQLTrx(TRI_voc_tid_t tid) noexcept { @@ -220,23 +240,26 @@ void Manager::unregisterAQLTrx(TRI_voc_tid_t tid) noexcept { auto& buck = _transactions[bucket]; auto it = buck._managed.find(tid); if (it == buck._managed.end()) { - LOG_TOPIC("92a49", ERR, Logger::TRANSACTIONS) << "a registered transaction was not found"; + LOG_TOPIC("92a49", ERR, Logger::TRANSACTIONS) + << "a registered transaction was not found"; TRI_ASSERT(false); return; } TRI_ASSERT(it->second.type == MetaType::StandaloneAQL); /// we need to make sure no-one else is still using the TransactionState - if (!it->second.rwlock.writeLock(/*maxAttempts*/256)) { - LOG_TOPIC("9f7d7", ERR, Logger::TRANSACTIONS) << "a transaction is still in use"; + if (!it->second.rwlock.writeLock(/*maxAttempts*/ 256)) { + LOG_TOPIC("9f7d7", ERR, Logger::TRANSACTIONS) + << "a transaction is still in use"; TRI_ASSERT(false); return; } - buck._managed.erase(it); // unlocking not necessary + + buck._managed.erase(it); // unlocking not necessary } -Result Manager::createManagedTrx(TRI_vocbase_t& vocbase, - TRI_voc_tid_t tid, VPackSlice const trxOpts) { +Result Manager::createManagedTrx(TRI_vocbase_t& vocbase, TRI_voc_tid_t tid, + VPackSlice const trxOpts) { Result res; if (_disallowInserts) { return res.reset(TRI_ERROR_SHUTTING_DOWN); @@ -280,18 +303,19 @@ Result Manager::createManagedTrx(TRI_vocbase_t& vocbase, fillColls(collections.get("write"), writes) && fillColls(collections.get("exclusive"), exclusives); if (!isValid) { - return res.reset(TRI_ERROR_BAD_PARAMETER, "invalid 'collections' attribute"); + return res.reset(TRI_ERROR_BAD_PARAMETER, + "invalid 'collections' attribute"); } - return createManagedTrx(vocbase, tid, reads, writes, exclusives, options); + return createManagedTrx(vocbase, tid, reads, writes, exclusives, std::move(options)); } - /// @brief create managed transaction +/// @brief create managed transaction Result Manager::createManagedTrx(TRI_vocbase_t& vocbase, TRI_voc_tid_t tid, std::vector const& readCollections, std::vector const& writeCollections, std::vector const& exclusiveCollections, - transaction::Options const& options) { + transaction::Options options) { Result res; if (_disallowInserts.load(std::memory_order_acquire)) { return res.reset(TRI_ERROR_SHUTTING_DOWN); @@ -299,16 +323,21 @@ Result Manager::createManagedTrx(TRI_vocbase_t& vocbase, TRI_voc_tid_t tid, const size_t bucket = getBucket(tid); - { // quick check whether ID exists + { // quick check whether ID exists READ_LOCKER(allTransactionsLocker, _allTransactionsLock); WRITE_LOCKER(writeLocker, _transactions[bucket]._lock); auto& buck = _transactions[bucket]; auto it = buck._managed.find(tid); if (it != buck._managed.end()) { - return res.reset(TRI_ERROR_TRANSACTION_INTERNAL, "transaction ID already used"); + return res.reset(TRI_ERROR_TRANSACTION_INTERNAL, + "transaction ID already used"); } } + // enforce size limit per DBServer + options.maxTransactionSize = + std::min(options.maxTransactionSize, Manager::maxTransactionSize); + std::unique_ptr state; try { // now start our own transaction @@ -334,9 +363,10 @@ Result Manager::createManagedTrx(TRI_vocbase_t& vocbase, TRI_voc_tid_t tid, if (cid == 0) { // not found res.reset(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND, - std::string(TRI_errno_string(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)) + ":" + cname); + std::string(TRI_errno_string(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)) + + ":" + cname); } else { - res.reset(state->addCollection(cid, cname, mode, /*nestingLevel*/0, false)); + res.reset(state->addCollection(cid, cname, mode, /*nestingLevel*/ 0, false)); } if (res.fail()) { @@ -366,20 +396,19 @@ Result Manager::createManagedTrx(TRI_vocbase_t& vocbase, TRI_voc_tid_t tid, return res; } - { // add transaction to bucket + { // add transaction to bucket READ_LOCKER(allTransactionsLocker, _allTransactionsLock); WRITE_LOCKER(writeLocker, _transactions[bucket]._lock); auto it = _transactions[bucket]._managed.find(tid); if (it != _transactions[bucket]._managed.end()) { - return res.reset(TRI_ERROR_TRANSACTION_INTERNAL, "transaction ID already used"); + return res.reset(TRI_ERROR_TRANSACTION_INTERNAL, + "transaction ID already used"); } - double expires = defaultTTL + TRI_microtime(); - TRI_ASSERT(expires > 0); TRI_ASSERT(state->id() == tid); _transactions[bucket]._managed.emplace(std::piecewise_construct, std::forward_as_tuple(tid), - std::forward_as_tuple(MetaType::Managed, state.release(), - expires)); + std::forward_as_tuple(MetaType::Managed, + state.release())); } LOG_TOPIC("d6806", DEBUG, Logger::TRANSACTIONS) << "created managed trx '" << tid << "'"; @@ -408,22 +437,21 @@ std::shared_ptr Manager::leaseManagedTrx(TRI_voc_tid_t tid ManagedTrx& mtrx = it->second; if (mtrx.type == MetaType::Tombstone) { - return nullptr; // already committet this trx + return nullptr; // already committed this trx } if (AccessMode::isWriteOrExclusive(mode)) { if (mtrx.type == MetaType::StandaloneAQL) { - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION, - "not allowed to write lock an AQL transaction"); + THROW_ARANGO_EXCEPTION_MESSAGE( + TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION, + "not allowed to write lock an AQL transaction"); } if (mtrx.rwlock.tryWriteLock()) { - mtrx.expires = defaultTTL + TRI_microtime(); state = mtrx.state; break; } } else { if (mtrx.rwlock.tryReadLock()) { - mtrx.expires = defaultTTL + TRI_microtime(); state = mtrx.state; break; } @@ -431,7 +459,7 @@ std::shared_ptr Manager::leaseManagedTrx(TRI_voc_tid_t tid "transaction is already in use"); } - writeLocker.unlock(); // failure; + writeLocker.unlock(); // failure; allTransactionsLocker.unlock(); std::this_thread::yield(); @@ -439,7 +467,7 @@ std::shared_ptr Manager::leaseManagedTrx(TRI_voc_tid_t tid LOG_TOPIC("9e972", DEBUG, Logger::TRANSACTIONS) << "waiting on trx lock " << tid; i = 0; if (application_features::ApplicationServer::isStopping()) { - return nullptr; // shutting down + return nullptr; // shutting down } } } while (true); @@ -449,7 +477,7 @@ std::shared_ptr Manager::leaseManagedTrx(TRI_voc_tid_t tid TRI_ASSERT(!AccessMode::isWriteOrExclusive(mode) || level == 1); return std::make_shared(tid, state, mode); } - TRI_ASSERT(false); // should be unreachable + TRI_ASSERT(false); // should be unreachable return nullptr; } @@ -460,7 +488,8 @@ void Manager::returnManagedTrx(TRI_voc_tid_t tid, AccessMode::Type mode) noexcep auto it = _transactions[bucket]._managed.find(tid); if (it == _transactions[bucket]._managed.end()) { - LOG_TOPIC("1d5b0", WARN, Logger::TRANSACTIONS) << "managed transaction was not found"; + LOG_TOPIC("1d5b0", WARN, Logger::TRANSACTIONS) + << "managed transaction was not found"; TRI_ASSERT(false); return; } @@ -471,13 +500,13 @@ void Manager::returnManagedTrx(TRI_voc_tid_t tid, AccessMode::Type mode) noexcep TRI_ASSERT(!AccessMode::isWriteOrExclusive(mode) || level == 0); // garbageCollection might soft abort used transactions - const bool isSoftAborted = it->second.expires == 0; + const bool isSoftAborted = it->second.usedTimeSecs == 0; if (!isSoftAborted) { - it->second.expires = defaultTTL + TRI_microtime(); + it->second.usedTimeSecs = TRI_microtime(); } if (AccessMode::isWriteOrExclusive(mode)) { it->second.rwlock.unlockWrite(); - } else if (mode == AccessMode::Type::READ){ + } else if (mode == AccessMode::Type::READ) { it->second.rwlock.unlockRead(); } else { TRI_ASSERT(false); @@ -503,7 +532,7 @@ transaction::Status Manager::getManagedTrxStatus(TRI_voc_tid_t tid) const { if (mtrx.type == MetaType::Tombstone) { return mtrx.finalStatus; - } else if (mtrx.expires > TRI_microtime() && mtrx.state != nullptr) { + } else if (!mtrx.expired() && mtrx.state != nullptr) { return transaction::Status::RUNNING; } else { return transaction::Status::ABORTED; @@ -518,14 +547,13 @@ Result Manager::abortManagedTrx(TRI_voc_tid_t tid) { return updateTransaction(tid, transaction::Status::ABORTED, false); } -Result Manager::updateTransaction(TRI_voc_tid_t tid, - transaction::Status status, +Result Manager::updateTransaction(TRI_voc_tid_t tid, transaction::Status status, bool clearServers) { TRI_ASSERT(status == transaction::Status::COMMITTED || status == transaction::Status::ABORTED); - LOG_TOPIC("7bd2f", DEBUG, Logger::TRANSACTIONS) << "managed trx '" << tid - << " updating to '" << status << "'"; + LOG_TOPIC("7bd2f", DEBUG, Logger::TRANSACTIONS) + << "managed trx '" << tid << " updating to '" << status << "'"; Result res; const size_t bucket = getBucket(tid); @@ -555,9 +583,9 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, } else if (mtrx.type == MetaType::Tombstone) { TRI_ASSERT(mtrx.state == nullptr); // make sure everyone who asks gets the updated timestamp - mtrx.expires = TRI_microtime() + tombstoneTTL; + mtrx.usedTimeSecs = TRI_microtime(); if (mtrx.finalStatus == status) { - return res; // all good + return res; // all good } else { std::string msg("transaction was already "); msg.append(statusString(mtrx.finalStatus)); @@ -565,8 +593,7 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, } } - double now = TRI_microtime(); - if (mtrx.expires < now) { + if (mtrx.expired()) { status = transaction::Status::ABORTED; wasExpired = true; } @@ -574,13 +601,13 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, state.reset(mtrx.state); mtrx.state = nullptr; mtrx.type = MetaType::Tombstone; - mtrx.expires = now + tombstoneTTL; + mtrx.usedTimeSecs = TRI_microtime(); mtrx.finalStatus = status; // it is sufficient to pretend that the operation already succeeded } TRI_ASSERT(state); - if (!state) { // this should never happen + if (!state) { // this should never happen return res.reset(TRI_ERROR_INTERNAL, "managed trx in an invalid state"); } @@ -593,13 +620,14 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, it->second.finalStatus = Status::ABORTED; } }; - if (!state->isRunning()) { // this also should not happen + if (!state->isRunning()) { // this also should not happen abortTombstone(); - return res.reset(TRI_ERROR_TRANSACTION_ABORTED, "transaction was not running"); + return res.reset(TRI_ERROR_TRANSACTION_ABORTED, + "transaction was not running"); } auto ctx = std::make_shared(tid, state.get(), AccessMode::Type::NONE); - state.release(); // now owned by ctx + state.release(); // now owned by ctx transaction::Options trxOpts; MGMethods trx(ctx, trxOpts); @@ -612,7 +640,7 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, } if (status == transaction::Status::COMMITTED) { res = trx.commit(); - if (res.fail()) { // set final status to aborted + if (res.fail()) { // set final status to aborted abortTombstone(); } } else { @@ -627,10 +655,9 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, } /// @brief calls the callback function for each managed transaction -void Manager::iterateManagedTrx( - std::function const& callback) const { +void Manager::iterateManagedTrx(std::function const& callback) const { READ_LOCKER(allTransactionsLocker, _allTransactionsLock); - + // iterate over all active transactions for (size_t bucket = 0; bucket < numBuckets; ++bucket) { READ_LOCKER(locker, _transactions[bucket]._lock); @@ -655,35 +682,33 @@ bool Manager::garbageCollect(bool abortAll) { for (size_t bucket = 0; bucket < numBuckets; ++bucket) { WRITE_LOCKER(locker, _transactions[bucket]._lock); - double now = TRI_microtime(); - auto it = _transactions[bucket]._managed.begin(); + auto it = _transactions[bucket]._managed.begin(); while (it != _transactions[bucket]._managed.end()) { ManagedTrx& mtrx = it->second; if (mtrx.type == MetaType::Managed) { TRI_ASSERT(mtrx.state != nullptr); - if (abortAll || mtrx.expires < now) { - TRY_READ_LOCKER(tryGuard, mtrx.rwlock); // needs lock to access state + if (abortAll || mtrx.expired()) { + TRY_READ_LOCKER(tryGuard, mtrx.rwlock); // needs lock to access state if (tryGuard.isLocked()) { TRI_ASSERT(mtrx.state->isRunning() && mtrx.state->isTopLevelTransaction()); TRI_ASSERT(it->first == mtrx.state->id()); toAbort.emplace_back(mtrx.state->id()); - } else if (abortAll) { // transaction is in - mtrx.expires = 0; // soft-abort transaction + } else if (abortAll) { // transaction is in + mtrx.usedTimeSecs = 0; // soft-abort transaction didWork = true; } } - } else if (mtrx.type == MetaType::StandaloneAQL && mtrx.expires < now) { + } else if (mtrx.type == MetaType::StandaloneAQL && mtrx.expired()) { LOG_TOPIC("7ad3f", INFO, Logger::TRANSACTIONS) - << "expired AQL query transaction '" << it->first << "'"; - } else if (mtrx.type == MetaType::Tombstone && mtrx.expires < now) { + << "expired AQL query transaction '" << it->first << "'"; + } else if (mtrx.type == MetaType::Tombstone && mtrx.expired()) { TRI_ASSERT(mtrx.state == nullptr); TRI_ASSERT(mtrx.finalStatus != transaction::Status::UNDEFINED); it = _transactions[bucket]._managed.erase(it); - continue; } @@ -693,19 +718,21 @@ bool Manager::garbageCollect(bool abortAll) { for (TRI_voc_tid_t tid : toAbort) { LOG_TOPIC("6fbaf", DEBUG, Logger::TRANSACTIONS) << "garbage collecting " - "transaction: '" << tid << "'"; - Result res = updateTransaction(tid, Status::ABORTED, /*clearSrvs*/true); + "transaction: '" + << tid << "'"; + Result res = updateTransaction(tid, Status::ABORTED, /*clearSrvs*/ true); if (res.fail()) { - LOG_TOPIC("0a07f", INFO, Logger::TRANSACTIONS) << "error while aborting " - "transaction: '" << res.errorMessage() << "'"; + "transaction: '" + << res.errorMessage() << "'"; } didWork = true; } if (didWork) { - LOG_TOPIC("e5b31", INFO, Logger::TRANSACTIONS) << "aborted expired transactions"; + LOG_TOPIC("e5b31", INFO, Logger::TRANSACTIONS) + << "aborted expired transactions"; } return didWork; @@ -713,7 +740,6 @@ bool Manager::garbageCollect(bool abortAll) { /// @brief abort all transactions matching bool Manager::abortManagedTrx(std::function cb) { - SmallVector::allocator_type::arena_type arena; SmallVector toAbort{arena}; @@ -723,11 +749,10 @@ bool Manager::abortManagedTrx(std::function cb) { auto it = _transactions[bucket]._managed.begin(); while (it != _transactions[bucket]._managed.end()) { - ManagedTrx& mtrx = it->second; if (mtrx.type == MetaType::Managed) { TRI_ASSERT(mtrx.state != nullptr); - TRY_READ_LOCKER(tryGuard, mtrx.rwlock); // needs lock to access state + TRY_READ_LOCKER(tryGuard, mtrx.rwlock); // needs lock to access state if (tryGuard.isLocked() && cb(*mtrx.state)) { toAbort.emplace_back(it->first); } @@ -738,19 +763,18 @@ bool Manager::abortManagedTrx(std::function cb) { } for (TRI_voc_tid_t tid : toAbort) { - Result res = updateTransaction(tid, Status::ABORTED, /*clearSrvs*/true); + Result res = updateTransaction(tid, Status::ABORTED, /*clearSrvs*/ true); if (res.fail()) { LOG_TOPIC("2bf48", INFO, Logger::TRANSACTIONS) << "error aborting " - "transaction: '" << res.errorMessage() << "'"; + "transaction: '" + << res.errorMessage() << "'"; } } return !toAbort.empty(); } -void Manager::toVelocyPack(VPackBuilder& builder, - std::string const& database, - std::string const& username, - bool fanout) const { +void Manager::toVelocyPack(VPackBuilder& builder, std::string const& database, + std::string const& username, bool fanout) const { TRI_ASSERT(!builder.isClosed()); if (fanout) { @@ -759,7 +783,7 @@ void Manager::toVelocyPack(VPackBuilder& builder, if (ci == nullptr) { THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN); } - + std::shared_ptr cc = ClusterComm::instance(); if (cc == nullptr) { THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN); @@ -767,13 +791,13 @@ void Manager::toVelocyPack(VPackBuilder& builder, std::vector requests; auto auth = AuthenticationFeature::instance(); - + for (auto const& coordinator : ci->getCurrentCoordinators()) { if (coordinator == ServerState::instance()->getId()) { // ourselves! continue; } - + auto headers = std::make_unique>(); if (auth != nullptr && auth->isActive()) { // when in superuser mode, username is empty @@ -786,18 +810,18 @@ void Manager::toVelocyPack(VPackBuilder& builder, } VPackSlice slice = builder.slice(); headers->emplace(StaticStrings::Authorization, - "bearer " + auth->tokenCache().generateJwt(slice)); + "bearer " + auth->tokenCache().generateJwt(slice)); } } requests.emplace_back("server:" + coordinator, rest::RequestType::GET, - "/_db/" + database + "/_api/transaction?local=true", + "/_db/" + database + "/_api/transaction?local=true", std::make_shared(), std::move(headers)); } if (!requests.empty()) { size_t nrGood = cc->performRequests(requests, 30.0, Logger::COMMUNICATION, false); - + if (nrGood != requests.size()) { THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE); } diff --git a/arangod/Transaction/Manager.h b/arangod/Transaction/Manager.h index 160ab00aa5..4feac9a46b 100644 --- a/arangod/Transaction/Manager.h +++ b/arangod/Transaction/Manager.h @@ -47,33 +47,36 @@ struct TransactionData { namespace velocypack { class Builder; class Slice; -} +} // namespace velocypack namespace transaction { class Context; struct Options; -/// @bried Tracks TransasctionState instances +/// @brief Tracks TransasctionState instances class Manager final { static constexpr size_t numBuckets = 16; - static constexpr double defaultTTL = 10.0 * 60.0; // 10 minutes - static constexpr double tombstoneTTL = 5.0 * 60.0; // 5 minutes - + static constexpr double idleTTL = 10.0; // 10 seconds + static constexpr double idleTTLDBServer = 3 * 60.0; // 3 minutes + static constexpr double tombstoneTTL = 10.0 * 60.0; // 10 minutes + static constexpr size_t maxTransactionSize = 128 * 1024 * 1024; // 128 MiB + enum class MetaType : uint8_t { - Managed = 1, /// global single shard db transaction + Managed = 1, /// global single shard db transaction StandaloneAQL = 2, /// used for a standalone transaction (AQL standalone) Tombstone = 3 /// used to ensure we can acknowledge double commits / aborts }; - + struct ManagedTrx { - ManagedTrx(MetaType t, TransactionState* st, double ex) - : type(t), expires(ex), state(st), finalStatus(Status::UNDEFINED), - rwlock() {} + ManagedTrx(MetaType t, TransactionState* st); ~ManagedTrx(); - - MetaType type; - double expires; /// expiration timestamp, if 0 it expires immediately - TransactionState* state; /// Transaction, may be nullptr + + bool expired() const; + + public: + MetaType type; /// managed, AQL or tombstone + double usedTimeSecs; /// last time used + TransactionState* state; /// Transaction, may be nullptr /// @brief final TRX state that is valid if this is a tombstone /// necessary to avoid getting error on a 'diamond' commit or accidantally /// repeated commit / abort messages @@ -81,7 +84,7 @@ class Manager final { /// cheap usage lock for *state mutable basics::ReadWriteSpinLock rwlock; }; - + public: typedef std::function TrxCallback; @@ -130,8 +133,8 @@ class Manager final { std::vector const& readCollections, std::vector const& writeCollections, std::vector const& exclusiveCollections, - transaction::Options const& options); - + transaction::Options options); + /// @brief lease the transaction, increases nesting std::shared_ptr leaseManagedTrx(TRI_voc_tid_t tid, AccessMode::Type mode); @@ -153,7 +156,7 @@ class Manager final { /// the array must be opened already. /// will use database and username to fan-out the request to the other /// coordinators in a cluster - void toVelocyPack(arangodb::velocypack::Builder& builder, + void toVelocyPack(arangodb::velocypack::Builder& builder, std::string const& database, std::string const& username, bool fanout) const; @@ -162,14 +165,13 @@ class Manager final { inline size_t getBucket(TRI_voc_tid_t tid) const { return std::hash()(tid) % numBuckets; } - - Result updateTransaction(TRI_voc_tid_t tid, transaction::Status status, - bool clearServers); + + Result updateTransaction(TRI_voc_tid_t tid, transaction::Status status, bool clearServers); /// @brief calls the callback function for each managed transaction void iterateManagedTrx(std::function const&) const; - - private: + + /// @brief will be true only for MMFiles bool const _keepTransactionData; // a lock protecting ALL buckets in _transactions diff --git a/lib/Futures/Promise.h b/lib/Futures/Promise.h index ec926d4afc..d1b5c98826 100644 --- a/lib/Futures/Promise.h +++ b/lib/Futures/Promise.h @@ -115,7 +115,7 @@ class Promise { arangodb::futures::Future getFuture(); private: - Promise(detail::SharedState* state) : _state(state), _retrieved(false) {} + explicit Promise(detail::SharedState* state) : _state(state), _retrieved(false) {} // convenience method that checks if _state is set inline detail::SharedState& getState() { diff --git a/tests/js/client/shell/shell-transaction.js b/tests/js/client/shell/shell-transaction.js index 6ee1df9f97..f45cb6a3e9 100644 --- a/tests/js/client/shell/shell-transaction.js +++ b/tests/js/client/shell/shell-transaction.js @@ -3804,6 +3804,56 @@ function transactionAQLStreamSuite () { }; } + +// ////////////////////////////////////////////////////////////////////////////// +// / @brief test suite +// ////////////////////////////////////////////////////////////////////////////// + +function transactionTTLStreamSuite () { + 'use strict'; + const cn = 'UnitTestsTransaction'; + let c; + + return { + + // ////////////////////////////////////////////////////////////////////////////// + // / @brief set up + // ////////////////////////////////////////////////////////////////////////////// + + setUp: function () { + db._drop(cn); + c = db._create(cn, {numberOfShards: 2, replicationFactor: 2}); + }, + + // ////////////////////////////////////////////////////////////////////////////// + // / @brief tear down + // ////////////////////////////////////////////////////////////////////////////// + + tearDown: function () { + db._drop(cn); + }, + + + // ////////////////////////////////////////////////////////////////////////////// + // / @brief test: abort idle transactions + // ////////////////////////////////////////////////////////////////////////////// + + testAbortIdleTrx: function () { + let trx = db._createTransaction({ + collections: { read: cn } + }); + + internal.sleep(12); + try { + trx.collection(cn).save({key:'val'}); + fail(); + } catch (err) { + assertEqual(internal.errors.ERROR_TRANSACTION_NOT_FOUND.code, err.errorNum); + } + } + }; +} + // ////////////////////////////////////////////////////////////////////////////// // / @brief executes the test suites // ////////////////////////////////////////////////////////////////////////////// @@ -3818,5 +3868,6 @@ jsunity.run(transactionCountSuite); jsunity.run(transactionCrossCollectionSuite); jsunity.run(transactionTraversalSuite); jsunity.run(transactionAQLStreamSuite); +jsunity.run(transactionTTLStreamSuite); return jsunity.done();