1
0
Fork 0

Enforce stricter transaction limits (#9740) (#9775)

* Enforce stricter transaction limits (#9740)

* changelog

* make CHANGELOG entry more verbose
This commit is contained in:
Simon 2019-08-21 12:07:17 +02:00 committed by KVS85
parent 663212ba19
commit bd5df4d49e
8 changed files with 221 additions and 139 deletions

View File

@ -1,6 +1,15 @@
v3.5.1 (XXXX-XX-XX)
-------------------
* Changes the _idle_ timeout of stream transactions to 10 seconds and the total
per DB server size of stream transaction data to 128 MB. The idle timer is
restarted after every operation in a stream transaction, so it is not the
total timeout for the transaction.
These limits were documented in the manual for stream transactions since 3.5.0,
but are enforced only as of 3.5.1. Enforcing the limits is useful to free up
resources from abandoned transactions.
* Consistently honor the return value of all attempts to queue tasks in the
internal scheduler.

View File

@ -35,8 +35,7 @@ ClusterTransactionCollection::ClusterTransactionCollection(TransactionState* trx
TRI_voc_cid_t cid,
AccessMode::Type accessType,
int nestingLevel)
: TransactionCollection(trx, cid, accessType, nestingLevel),
_lockType(AccessMode::Type::NONE) {}
: TransactionCollection(trx, cid, accessType, nestingLevel) {}
ClusterTransactionCollection::~ClusterTransactionCollection() {}

View File

@ -60,9 +60,6 @@ class ClusterTransactionCollection final : public TransactionCollection {
/// @brief request an unlock for a collection
int doUnlock(AccessMode::Type, int nestingLevel) override;
private:
AccessMode::Type _lockType; // collection lock type, used for exclusive locks
};
} // namespace arangodb

View File

@ -166,7 +166,7 @@ void RestTransactionHandler::executeBegin() {
bool parseSuccess = false;
VPackSlice body = parseVPackBody(parseSuccess);
VPackSlice slice = parseVPackBody(parseSuccess);
if (!parseSuccess) {
// error message generated in parseVPackBody
return;
@ -175,7 +175,7 @@ void RestTransactionHandler::executeBegin() {
transaction::Manager* mgr = transaction::ManagerFeature::manager();
TRI_ASSERT(mgr != nullptr);
Result res = mgr->createManagedTrx(_vocbase, tid, body);
Result res = mgr->createManagedTrx(_vocbase, tid, slice);
if (res.fail()) {
generateError(res);
} else {

View File

@ -42,19 +42,20 @@
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>
namespace arangodb {
namespace transaction {
const size_t Manager::maxTransactionSize; // 128 MiB
namespace {
struct MGMethods final : arangodb::transaction::Methods {
MGMethods(std::shared_ptr<arangodb::transaction::Context> const& ctx,
arangodb::transaction::Options const& opts)
: Methods(ctx, opts) {
TRI_ASSERT(_state->isEmbeddedTransaction());
}
};
}
struct MGMethods final : arangodb::transaction::Methods {
MGMethods(std::shared_ptr<arangodb::transaction::Context> const& ctx,
arangodb::transaction::Options const& opts)
: Methods(ctx, opts) {
TRI_ASSERT(_state->isEmbeddedTransaction());
}
};
} // namespace
// register a list of failed transactions
void Manager::registerFailedTransactions(std::unordered_set<TRI_voc_tid_t> const& failedTransactions) {
@ -158,11 +159,29 @@ uint64_t Manager::getActiveTransactionCount() {
return _nrRunning.load(std::memory_order_relaxed);
}
Manager::ManagedTrx::ManagedTrx(MetaType t, TransactionState* st)
: type(t),
usedTimeSecs(TRI_microtime()),
state(st),
finalStatus(Status::UNDEFINED),
rwlock() {}
bool Manager::ManagedTrx::expired() const {
double now = TRI_microtime();
if (type == Manager::MetaType::Tombstone) {
return (now - usedTimeSecs) > tombstoneTTL;
}
auto role = ServerState::instance()->getRole();
if ((ServerState::isSingleServer(role) || ServerState::isCoordinator(role))) {
return (now - usedTimeSecs) > idleTTL;
}
return (now - usedTimeSecs) > idleTTLDBServer;
}
Manager::ManagedTrx::~ManagedTrx() {
if (type == MetaType::StandaloneAQL ||
state == nullptr ||
state->isEmbeddedTransaction()) {
return;
if (type == MetaType::StandaloneAQL || state == nullptr || state->isEmbeddedTransaction()) {
return; // not managed by us
}
if (!state->isRunning()) {
delete state;
@ -171,8 +190,9 @@ Manager::ManagedTrx::~ManagedTrx() {
try {
transaction::Options opts;
auto ctx = std::make_shared<transaction::ManagedContext>(2, state, AccessMode::Type::NONE);
MGMethods trx(ctx, opts); // own state now
auto ctx =
std::make_shared<transaction::ManagedContext>(2, state, AccessMode::Type::NONE);
MGMethods trx(ctx, opts); // own state now
trx.begin();
TRI_ASSERT(state->nestingLevel() == 1);
state->decreaseNesting();
@ -196,20 +216,20 @@ void Manager::registerAQLTrx(TransactionState* state) {
TRI_ASSERT(state != nullptr);
const size_t bucket = getBucket(state->id());
READ_LOCKER(allTransactionsLocker, _allTransactionsLock);
WRITE_LOCKER(writeLocker, _transactions[bucket]._lock);
auto& buck = _transactions[bucket];
auto it = buck._managed.find(state->id());
if (it != buck._managed.end()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_TRANSACTION_INTERNAL,
"transaction ID already used");
{
READ_LOCKER(allTransactionsLocker, _allTransactionsLock);
WRITE_LOCKER(writeLocker, _transactions[bucket]._lock);
auto& buck = _transactions[bucket];
auto it = buck._managed.find(state->id());
if (it != buck._managed.end()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_TRANSACTION_INTERNAL,
"transaction ID already used");
}
buck._managed.emplace(std::piecewise_construct, std::forward_as_tuple(state->id()),
std::forward_as_tuple(MetaType::StandaloneAQL, state));
}
buck._managed.emplace(std::piecewise_construct,
std::forward_as_tuple(state->id()),
std::forward_as_tuple(MetaType::StandaloneAQL, state,
(defaultTTL + TRI_microtime())));
}
void Manager::unregisterAQLTrx(TRI_voc_tid_t tid) noexcept {
@ -220,23 +240,26 @@ void Manager::unregisterAQLTrx(TRI_voc_tid_t tid) noexcept {
auto& buck = _transactions[bucket];
auto it = buck._managed.find(tid);
if (it == buck._managed.end()) {
LOG_TOPIC("92a49", ERR, Logger::TRANSACTIONS) << "a registered transaction was not found";
LOG_TOPIC("92a49", ERR, Logger::TRANSACTIONS)
<< "a registered transaction was not found";
TRI_ASSERT(false);
return;
}
TRI_ASSERT(it->second.type == MetaType::StandaloneAQL);
/// we need to make sure no-one else is still using the TransactionState
if (!it->second.rwlock.writeLock(/*maxAttempts*/256)) {
LOG_TOPIC("9f7d7", ERR, Logger::TRANSACTIONS) << "a transaction is still in use";
if (!it->second.rwlock.writeLock(/*maxAttempts*/ 256)) {
LOG_TOPIC("9f7d7", ERR, Logger::TRANSACTIONS)
<< "a transaction is still in use";
TRI_ASSERT(false);
return;
}
buck._managed.erase(it); // unlocking not necessary
buck._managed.erase(it); // unlocking not necessary
}
Result Manager::createManagedTrx(TRI_vocbase_t& vocbase,
TRI_voc_tid_t tid, VPackSlice const trxOpts) {
Result Manager::createManagedTrx(TRI_vocbase_t& vocbase, TRI_voc_tid_t tid,
VPackSlice const trxOpts) {
Result res;
if (_disallowInserts) {
return res.reset(TRI_ERROR_SHUTTING_DOWN);
@ -280,18 +303,19 @@ Result Manager::createManagedTrx(TRI_vocbase_t& vocbase,
fillColls(collections.get("write"), writes) &&
fillColls(collections.get("exclusive"), exclusives);
if (!isValid) {
return res.reset(TRI_ERROR_BAD_PARAMETER, "invalid 'collections' attribute");
return res.reset(TRI_ERROR_BAD_PARAMETER,
"invalid 'collections' attribute");
}
return createManagedTrx(vocbase, tid, reads, writes, exclusives, options);
return createManagedTrx(vocbase, tid, reads, writes, exclusives, std::move(options));
}
/// @brief create managed transaction
/// @brief create managed transaction
Result Manager::createManagedTrx(TRI_vocbase_t& vocbase, TRI_voc_tid_t tid,
std::vector<std::string> const& readCollections,
std::vector<std::string> const& writeCollections,
std::vector<std::string> const& exclusiveCollections,
transaction::Options const& options) {
transaction::Options options) {
Result res;
if (_disallowInserts.load(std::memory_order_acquire)) {
return res.reset(TRI_ERROR_SHUTTING_DOWN);
@ -299,16 +323,21 @@ Result Manager::createManagedTrx(TRI_vocbase_t& vocbase, TRI_voc_tid_t tid,
const size_t bucket = getBucket(tid);
{ // quick check whether ID exists
{ // quick check whether ID exists
READ_LOCKER(allTransactionsLocker, _allTransactionsLock);
WRITE_LOCKER(writeLocker, _transactions[bucket]._lock);
auto& buck = _transactions[bucket];
auto it = buck._managed.find(tid);
if (it != buck._managed.end()) {
return res.reset(TRI_ERROR_TRANSACTION_INTERNAL, "transaction ID already used");
return res.reset(TRI_ERROR_TRANSACTION_INTERNAL,
"transaction ID already used");
}
}
// enforce size limit per DBServer
options.maxTransactionSize =
std::min<size_t>(options.maxTransactionSize, Manager::maxTransactionSize);
std::unique_ptr<TransactionState> state;
try {
// now start our own transaction
@ -334,9 +363,10 @@ Result Manager::createManagedTrx(TRI_vocbase_t& vocbase, TRI_voc_tid_t tid,
if (cid == 0) {
// not found
res.reset(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND,
std::string(TRI_errno_string(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)) + ":" + cname);
std::string(TRI_errno_string(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)) +
":" + cname);
} else {
res.reset(state->addCollection(cid, cname, mode, /*nestingLevel*/0, false));
res.reset(state->addCollection(cid, cname, mode, /*nestingLevel*/ 0, false));
}
if (res.fail()) {
@ -366,20 +396,19 @@ Result Manager::createManagedTrx(TRI_vocbase_t& vocbase, TRI_voc_tid_t tid,
return res;
}
{ // add transaction to bucket
{ // add transaction to bucket
READ_LOCKER(allTransactionsLocker, _allTransactionsLock);
WRITE_LOCKER(writeLocker, _transactions[bucket]._lock);
auto it = _transactions[bucket]._managed.find(tid);
if (it != _transactions[bucket]._managed.end()) {
return res.reset(TRI_ERROR_TRANSACTION_INTERNAL, "transaction ID already used");
return res.reset(TRI_ERROR_TRANSACTION_INTERNAL,
"transaction ID already used");
}
double expires = defaultTTL + TRI_microtime();
TRI_ASSERT(expires > 0);
TRI_ASSERT(state->id() == tid);
_transactions[bucket]._managed.emplace(std::piecewise_construct,
std::forward_as_tuple(tid),
std::forward_as_tuple(MetaType::Managed, state.release(),
expires));
std::forward_as_tuple(MetaType::Managed,
state.release()));
}
LOG_TOPIC("d6806", DEBUG, Logger::TRANSACTIONS) << "created managed trx '" << tid << "'";
@ -408,22 +437,21 @@ std::shared_ptr<transaction::Context> Manager::leaseManagedTrx(TRI_voc_tid_t tid
ManagedTrx& mtrx = it->second;
if (mtrx.type == MetaType::Tombstone) {
return nullptr; // already committet this trx
return nullptr; // already committed this trx
}
if (AccessMode::isWriteOrExclusive(mode)) {
if (mtrx.type == MetaType::StandaloneAQL) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION,
"not allowed to write lock an AQL transaction");
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION,
"not allowed to write lock an AQL transaction");
}
if (mtrx.rwlock.tryWriteLock()) {
mtrx.expires = defaultTTL + TRI_microtime();
state = mtrx.state;
break;
}
} else {
if (mtrx.rwlock.tryReadLock()) {
mtrx.expires = defaultTTL + TRI_microtime();
state = mtrx.state;
break;
}
@ -431,7 +459,7 @@ std::shared_ptr<transaction::Context> Manager::leaseManagedTrx(TRI_voc_tid_t tid
"transaction is already in use");
}
writeLocker.unlock(); // failure;
writeLocker.unlock(); // failure;
allTransactionsLocker.unlock();
std::this_thread::yield();
@ -439,7 +467,7 @@ std::shared_ptr<transaction::Context> Manager::leaseManagedTrx(TRI_voc_tid_t tid
LOG_TOPIC("9e972", DEBUG, Logger::TRANSACTIONS) << "waiting on trx lock " << tid;
i = 0;
if (application_features::ApplicationServer::isStopping()) {
return nullptr; // shutting down
return nullptr; // shutting down
}
}
} while (true);
@ -449,7 +477,7 @@ std::shared_ptr<transaction::Context> Manager::leaseManagedTrx(TRI_voc_tid_t tid
TRI_ASSERT(!AccessMode::isWriteOrExclusive(mode) || level == 1);
return std::make_shared<ManagedContext>(tid, state, mode);
}
TRI_ASSERT(false); // should be unreachable
TRI_ASSERT(false); // should be unreachable
return nullptr;
}
@ -460,7 +488,8 @@ void Manager::returnManagedTrx(TRI_voc_tid_t tid, AccessMode::Type mode) noexcep
auto it = _transactions[bucket]._managed.find(tid);
if (it == _transactions[bucket]._managed.end()) {
LOG_TOPIC("1d5b0", WARN, Logger::TRANSACTIONS) << "managed transaction was not found";
LOG_TOPIC("1d5b0", WARN, Logger::TRANSACTIONS)
<< "managed transaction was not found";
TRI_ASSERT(false);
return;
}
@ -471,13 +500,13 @@ void Manager::returnManagedTrx(TRI_voc_tid_t tid, AccessMode::Type mode) noexcep
TRI_ASSERT(!AccessMode::isWriteOrExclusive(mode) || level == 0);
// garbageCollection might soft abort used transactions
const bool isSoftAborted = it->second.expires == 0;
const bool isSoftAborted = it->second.usedTimeSecs == 0;
if (!isSoftAborted) {
it->second.expires = defaultTTL + TRI_microtime();
it->second.usedTimeSecs = TRI_microtime();
}
if (AccessMode::isWriteOrExclusive(mode)) {
it->second.rwlock.unlockWrite();
} else if (mode == AccessMode::Type::READ){
} else if (mode == AccessMode::Type::READ) {
it->second.rwlock.unlockRead();
} else {
TRI_ASSERT(false);
@ -503,7 +532,7 @@ transaction::Status Manager::getManagedTrxStatus(TRI_voc_tid_t tid) const {
if (mtrx.type == MetaType::Tombstone) {
return mtrx.finalStatus;
} else if (mtrx.expires > TRI_microtime() && mtrx.state != nullptr) {
} else if (!mtrx.expired() && mtrx.state != nullptr) {
return transaction::Status::RUNNING;
} else {
return transaction::Status::ABORTED;
@ -518,14 +547,13 @@ Result Manager::abortManagedTrx(TRI_voc_tid_t tid) {
return updateTransaction(tid, transaction::Status::ABORTED, false);
}
Result Manager::updateTransaction(TRI_voc_tid_t tid,
transaction::Status status,
Result Manager::updateTransaction(TRI_voc_tid_t tid, transaction::Status status,
bool clearServers) {
TRI_ASSERT(status == transaction::Status::COMMITTED ||
status == transaction::Status::ABORTED);
LOG_TOPIC("7bd2f", DEBUG, Logger::TRANSACTIONS) << "managed trx '" << tid
<< " updating to '" << status << "'";
LOG_TOPIC("7bd2f", DEBUG, Logger::TRANSACTIONS)
<< "managed trx '" << tid << " updating to '" << status << "'";
Result res;
const size_t bucket = getBucket(tid);
@ -555,9 +583,9 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid,
} else if (mtrx.type == MetaType::Tombstone) {
TRI_ASSERT(mtrx.state == nullptr);
// make sure everyone who asks gets the updated timestamp
mtrx.expires = TRI_microtime() + tombstoneTTL;
mtrx.usedTimeSecs = TRI_microtime();
if (mtrx.finalStatus == status) {
return res; // all good
return res; // all good
} else {
std::string msg("transaction was already ");
msg.append(statusString(mtrx.finalStatus));
@ -565,8 +593,7 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid,
}
}
double now = TRI_microtime();
if (mtrx.expires < now) {
if (mtrx.expired()) {
status = transaction::Status::ABORTED;
wasExpired = true;
}
@ -574,13 +601,13 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid,
state.reset(mtrx.state);
mtrx.state = nullptr;
mtrx.type = MetaType::Tombstone;
mtrx.expires = now + tombstoneTTL;
mtrx.usedTimeSecs = TRI_microtime();
mtrx.finalStatus = status;
// it is sufficient to pretend that the operation already succeeded
}
TRI_ASSERT(state);
if (!state) { // this should never happen
if (!state) { // this should never happen
return res.reset(TRI_ERROR_INTERNAL, "managed trx in an invalid state");
}
@ -593,13 +620,14 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid,
it->second.finalStatus = Status::ABORTED;
}
};
if (!state->isRunning()) { // this also should not happen
if (!state->isRunning()) { // this also should not happen
abortTombstone();
return res.reset(TRI_ERROR_TRANSACTION_ABORTED, "transaction was not running");
return res.reset(TRI_ERROR_TRANSACTION_ABORTED,
"transaction was not running");
}
auto ctx = std::make_shared<ManagedContext>(tid, state.get(), AccessMode::Type::NONE);
state.release(); // now owned by ctx
state.release(); // now owned by ctx
transaction::Options trxOpts;
MGMethods trx(ctx, trxOpts);
@ -612,7 +640,7 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid,
}
if (status == transaction::Status::COMMITTED) {
res = trx.commit();
if (res.fail()) { // set final status to aborted
if (res.fail()) { // set final status to aborted
abortTombstone();
}
} else {
@ -627,10 +655,9 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid,
}
/// @brief calls the callback function for each managed transaction
void Manager::iterateManagedTrx(
std::function<void(TRI_voc_tid_t, ManagedTrx const&)> const& callback) const {
void Manager::iterateManagedTrx(std::function<void(TRI_voc_tid_t, ManagedTrx const&)> const& callback) const {
READ_LOCKER(allTransactionsLocker, _allTransactionsLock);
// iterate over all active transactions
for (size_t bucket = 0; bucket < numBuckets; ++bucket) {
READ_LOCKER(locker, _transactions[bucket]._lock);
@ -655,35 +682,33 @@ bool Manager::garbageCollect(bool abortAll) {
for (size_t bucket = 0; bucket < numBuckets; ++bucket) {
WRITE_LOCKER(locker, _transactions[bucket]._lock);
double now = TRI_microtime();
auto it = _transactions[bucket]._managed.begin();
auto it = _transactions[bucket]._managed.begin();
while (it != _transactions[bucket]._managed.end()) {
ManagedTrx& mtrx = it->second;
if (mtrx.type == MetaType::Managed) {
TRI_ASSERT(mtrx.state != nullptr);
if (abortAll || mtrx.expires < now) {
TRY_READ_LOCKER(tryGuard, mtrx.rwlock); // needs lock to access state
if (abortAll || mtrx.expired()) {
TRY_READ_LOCKER(tryGuard, mtrx.rwlock); // needs lock to access state
if (tryGuard.isLocked()) {
TRI_ASSERT(mtrx.state->isRunning() && mtrx.state->isTopLevelTransaction());
TRI_ASSERT(it->first == mtrx.state->id());
toAbort.emplace_back(mtrx.state->id());
} else if (abortAll) { // transaction is in
mtrx.expires = 0; // soft-abort transaction
} else if (abortAll) { // transaction is in
mtrx.usedTimeSecs = 0; // soft-abort transaction
didWork = true;
}
}
} else if (mtrx.type == MetaType::StandaloneAQL && mtrx.expires < now) {
} else if (mtrx.type == MetaType::StandaloneAQL && mtrx.expired()) {
LOG_TOPIC("7ad3f", INFO, Logger::TRANSACTIONS)
<< "expired AQL query transaction '" << it->first << "'";
} else if (mtrx.type == MetaType::Tombstone && mtrx.expires < now) {
<< "expired AQL query transaction '" << it->first << "'";
} else if (mtrx.type == MetaType::Tombstone && mtrx.expired()) {
TRI_ASSERT(mtrx.state == nullptr);
TRI_ASSERT(mtrx.finalStatus != transaction::Status::UNDEFINED);
it = _transactions[bucket]._managed.erase(it);
continue;
}
@ -693,19 +718,21 @@ bool Manager::garbageCollect(bool abortAll) {
for (TRI_voc_tid_t tid : toAbort) {
LOG_TOPIC("6fbaf", DEBUG, Logger::TRANSACTIONS) << "garbage collecting "
"transaction: '" << tid << "'";
Result res = updateTransaction(tid, Status::ABORTED, /*clearSrvs*/true);
"transaction: '"
<< tid << "'";
Result res = updateTransaction(tid, Status::ABORTED, /*clearSrvs*/ true);
if (res.fail()) {
LOG_TOPIC("0a07f", INFO, Logger::TRANSACTIONS) << "error while aborting "
"transaction: '" << res.errorMessage() << "'";
"transaction: '"
<< res.errorMessage() << "'";
}
didWork = true;
}
if (didWork) {
LOG_TOPIC("e5b31", INFO, Logger::TRANSACTIONS) << "aborted expired transactions";
LOG_TOPIC("e5b31", INFO, Logger::TRANSACTIONS)
<< "aborted expired transactions";
}
return didWork;
@ -713,7 +740,6 @@ bool Manager::garbageCollect(bool abortAll) {
/// @brief abort all transactions matching
bool Manager::abortManagedTrx(std::function<bool(TransactionState const&)> cb) {
SmallVector<TRI_voc_tid_t, 64>::allocator_type::arena_type arena;
SmallVector<TRI_voc_tid_t, 64> toAbort{arena};
@ -723,11 +749,10 @@ bool Manager::abortManagedTrx(std::function<bool(TransactionState const&)> cb) {
auto it = _transactions[bucket]._managed.begin();
while (it != _transactions[bucket]._managed.end()) {
ManagedTrx& mtrx = it->second;
if (mtrx.type == MetaType::Managed) {
TRI_ASSERT(mtrx.state != nullptr);
TRY_READ_LOCKER(tryGuard, mtrx.rwlock); // needs lock to access state
TRY_READ_LOCKER(tryGuard, mtrx.rwlock); // needs lock to access state
if (tryGuard.isLocked() && cb(*mtrx.state)) {
toAbort.emplace_back(it->first);
}
@ -738,19 +763,18 @@ bool Manager::abortManagedTrx(std::function<bool(TransactionState const&)> cb) {
}
for (TRI_voc_tid_t tid : toAbort) {
Result res = updateTransaction(tid, Status::ABORTED, /*clearSrvs*/true);
Result res = updateTransaction(tid, Status::ABORTED, /*clearSrvs*/ true);
if (res.fail()) {
LOG_TOPIC("2bf48", INFO, Logger::TRANSACTIONS) << "error aborting "
"transaction: '" << res.errorMessage() << "'";
"transaction: '"
<< res.errorMessage() << "'";
}
}
return !toAbort.empty();
}
void Manager::toVelocyPack(VPackBuilder& builder,
std::string const& database,
std::string const& username,
bool fanout) const {
void Manager::toVelocyPack(VPackBuilder& builder, std::string const& database,
std::string const& username, bool fanout) const {
TRI_ASSERT(!builder.isClosed());
if (fanout) {
@ -759,7 +783,7 @@ void Manager::toVelocyPack(VPackBuilder& builder,
if (ci == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}
std::shared_ptr<ClusterComm> cc = ClusterComm::instance();
if (cc == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
@ -767,13 +791,13 @@ void Manager::toVelocyPack(VPackBuilder& builder,
std::vector<ClusterCommRequest> requests;
auto auth = AuthenticationFeature::instance();
for (auto const& coordinator : ci->getCurrentCoordinators()) {
if (coordinator == ServerState::instance()->getId()) {
// ourselves!
continue;
}
auto headers = std::make_unique<std::unordered_map<std::string, std::string>>();
if (auth != nullptr && auth->isActive()) {
// when in superuser mode, username is empty
@ -786,18 +810,18 @@ void Manager::toVelocyPack(VPackBuilder& builder,
}
VPackSlice slice = builder.slice();
headers->emplace(StaticStrings::Authorization,
"bearer " + auth->tokenCache().generateJwt(slice));
"bearer " + auth->tokenCache().generateJwt(slice));
}
}
requests.emplace_back("server:" + coordinator, rest::RequestType::GET,
"/_db/" + database + "/_api/transaction?local=true",
"/_db/" + database + "/_api/transaction?local=true",
std::make_shared<std::string>(), std::move(headers));
}
if (!requests.empty()) {
size_t nrGood = cc->performRequests(requests, 30.0, Logger::COMMUNICATION, false);
if (nrGood != requests.size()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE);
}

View File

@ -47,33 +47,36 @@ struct TransactionData {
namespace velocypack {
class Builder;
class Slice;
}
} // namespace velocypack
namespace transaction {
class Context;
struct Options;
/// @bried Tracks TransasctionState instances
/// @brief Tracks TransasctionState instances
class Manager final {
static constexpr size_t numBuckets = 16;
static constexpr double defaultTTL = 10.0 * 60.0; // 10 minutes
static constexpr double tombstoneTTL = 5.0 * 60.0; // 5 minutes
static constexpr double idleTTL = 10.0; // 10 seconds
static constexpr double idleTTLDBServer = 3 * 60.0; // 3 minutes
static constexpr double tombstoneTTL = 10.0 * 60.0; // 10 minutes
static constexpr size_t maxTransactionSize = 128 * 1024 * 1024; // 128 MiB
enum class MetaType : uint8_t {
Managed = 1, /// global single shard db transaction
Managed = 1, /// global single shard db transaction
StandaloneAQL = 2, /// used for a standalone transaction (AQL standalone)
Tombstone = 3 /// used to ensure we can acknowledge double commits / aborts
};
struct ManagedTrx {
ManagedTrx(MetaType t, TransactionState* st, double ex)
: type(t), expires(ex), state(st), finalStatus(Status::UNDEFINED),
rwlock() {}
ManagedTrx(MetaType t, TransactionState* st);
~ManagedTrx();
MetaType type;
double expires; /// expiration timestamp, if 0 it expires immediately
TransactionState* state; /// Transaction, may be nullptr
bool expired() const;
public:
MetaType type; /// managed, AQL or tombstone
double usedTimeSecs; /// last time used
TransactionState* state; /// Transaction, may be nullptr
/// @brief final TRX state that is valid if this is a tombstone
/// necessary to avoid getting error on a 'diamond' commit or accidantally
/// repeated commit / abort messages
@ -81,7 +84,7 @@ class Manager final {
/// cheap usage lock for *state
mutable basics::ReadWriteSpinLock rwlock;
};
public:
typedef std::function<void(TRI_voc_tid_t, TransactionData const*)> TrxCallback;
@ -130,8 +133,8 @@ class Manager final {
std::vector<std::string> const& readCollections,
std::vector<std::string> const& writeCollections,
std::vector<std::string> const& exclusiveCollections,
transaction::Options const& options);
transaction::Options options);
/// @brief lease the transaction, increases nesting
std::shared_ptr<transaction::Context> leaseManagedTrx(TRI_voc_tid_t tid,
AccessMode::Type mode);
@ -153,7 +156,7 @@ class Manager final {
/// the array must be opened already.
/// will use database and username to fan-out the request to the other
/// coordinators in a cluster
void toVelocyPack(arangodb::velocypack::Builder& builder,
void toVelocyPack(arangodb::velocypack::Builder& builder,
std::string const& database,
std::string const& username, bool fanout) const;
@ -162,14 +165,13 @@ class Manager final {
inline size_t getBucket(TRI_voc_tid_t tid) const {
return std::hash<TRI_voc_cid_t>()(tid) % numBuckets;
}
Result updateTransaction(TRI_voc_tid_t tid, transaction::Status status,
bool clearServers);
Result updateTransaction(TRI_voc_tid_t tid, transaction::Status status, bool clearServers);
/// @brief calls the callback function for each managed transaction
void iterateManagedTrx(std::function<void(TRI_voc_tid_t, ManagedTrx const&)> const&) const;
private:
/// @brief will be true only for MMFiles
bool const _keepTransactionData;
// a lock protecting ALL buckets in _transactions

View File

@ -115,7 +115,7 @@ class Promise {
arangodb::futures::Future<T> getFuture();
private:
Promise(detail::SharedState<T>* state) : _state(state), _retrieved(false) {}
explicit Promise(detail::SharedState<T>* state) : _state(state), _retrieved(false) {}
// convenience method that checks if _state is set
inline detail::SharedState<T>& getState() {

View File

@ -3804,6 +3804,56 @@ function transactionAQLStreamSuite () {
};
}
// //////////////////////////////////////////////////////////////////////////////
// / @brief test suite
// //////////////////////////////////////////////////////////////////////////////
function transactionTTLStreamSuite () {
'use strict';
const cn = 'UnitTestsTransaction';
let c;
return {
// //////////////////////////////////////////////////////////////////////////////
// / @brief set up
// //////////////////////////////////////////////////////////////////////////////
setUp: function () {
db._drop(cn);
c = db._create(cn, {numberOfShards: 2, replicationFactor: 2});
},
// //////////////////////////////////////////////////////////////////////////////
// / @brief tear down
// //////////////////////////////////////////////////////////////////////////////
tearDown: function () {
db._drop(cn);
},
// //////////////////////////////////////////////////////////////////////////////
// / @brief test: abort idle transactions
// //////////////////////////////////////////////////////////////////////////////
testAbortIdleTrx: function () {
let trx = db._createTransaction({
collections: { read: cn }
});
internal.sleep(12);
try {
trx.collection(cn).save({key:'val'});
fail();
} catch (err) {
assertEqual(internal.errors.ERROR_TRANSACTION_NOT_FOUND.code, err.errorNum);
}
}
};
}
// //////////////////////////////////////////////////////////////////////////////
// / @brief executes the test suites
// //////////////////////////////////////////////////////////////////////////////
@ -3818,5 +3868,6 @@ jsunity.run(transactionCountSuite);
jsunity.run(transactionCrossCollectionSuite);
jsunity.run(transactionTraversalSuite);
jsunity.run(transactionAQLStreamSuite);
jsunity.run(transactionTTLStreamSuite);
return jsunity.done();