1
0
Fork 0

transaction manager tests (#8759)

This commit is contained in:
Simon 2019-04-16 21:26:40 +02:00 committed by Jan
parent 1a22d1360c
commit 5bb9d2dc17
16 changed files with 518 additions and 112 deletions

View File

@ -5,6 +5,9 @@ devel
* added collection.documentId method to derive document id from key
* Indexes created with the 'inBackground', will not hold an
exclusive collection lock for the entire index creation period (rocksdb only)
* fixed internal issue #536: ArangoSearch may crash server during term lookup
* fixed internal issue #2946: Create graph autocomplete was not working under

View File

@ -71,6 +71,14 @@ struct QueryResult {
extra.reset();
context.reset();
}
// Result-like interface
bool ok() const { return result.ok(); }
bool fail() const { return result.fail(); }
int errorNumber() const { return result.errorNumber(); }
bool is(int errorNumber) const { return result.errorNumber() == errorNumber; }
bool isNot(int errorNumber) const { return !is(errorNumber); }
std::string errorMessage() const { return result.errorMessage(); }
public:
Result result;

View File

@ -57,7 +57,8 @@ void buildTransactionBody(TransactionState& state, ServerID const& server,
builder.openObject();
state.options().toVelocyPack(builder);
builder.add("collections", VPackValue(VPackValueType::Object));
auto addCollections = [&](std::string const& key, AccessMode::Type t) {
auto addCollections = [&](const char* key, AccessMode::Type t) {
size_t numCollections = 0;
builder.add(key, VPackValue(VPackValueType::Array));
state.allCollections([&](TransactionCollection& col) {
if (col.accessType() != t) {
@ -66,8 +67,9 @@ void buildTransactionBody(TransactionState& state, ServerID const& server,
if (!state.isCoordinator()) {
if (col.collection()->followers()->contains(server)) {
builder.add(VPackValue(col.collectionName()));
numCollections++;
}
return true;
return true; // continue
}
// coordinator starts transaction on shard leaders
@ -85,10 +87,11 @@ void buildTransactionBody(TransactionState& state, ServerID const& server,
auto sss = ci->getResponsibleServer(shard);
if (server == sss->at(0)) {
builder.add(VPackValue(shard));
numCollections++;
}
}
}
return true;
return true; // continue
}
#endif
std::shared_ptr<ShardMap> shardIds = col.collection()->shardIds();
@ -97,11 +100,15 @@ void buildTransactionBody(TransactionState& state, ServerID const& server,
// only add shard where server is leader
if (!pair.second.empty() && pair.second[0] == server) {
builder.add(VPackValue(pair.first));
numCollections++;
}
}
return true;
});
builder.close();
if (numCollections == 0) {
builder.removeLast(); // no need to keep empty vals
}
};
addCollections("read", AccessMode::Type::READ);
addCollections("write", AccessMode::Type::WRITE);
@ -180,39 +187,6 @@ Result checkTransactionResult(TRI_voc_tid_t desiredTid,
return res.reset(TRI_ERROR_TRANSACTION_INTERNAL); // unspecified error
}
/*struct TrxCommitMethodsCb final : public ClusterCommCallback {
TRI_voc_tid_t tid;
transaction::Status status;
bool operator()(ClusterCommResult* result) override {
TRI_ASSERT(result != nullptr);
Result res = ::checkTransactionResult(state, status, *result);
if (res.fail()) { // remove follower from all collections
ServerID const& follower = requests[i].result.serverID;
state.allCollections([&](TransactionCollection& tc) {
auto cc = tc.collection();
if (cc) {
if (cc->followers()->remove(follower)) {
// TODO: what happens if a server is re-added during a transaction ?
LOG_TOPIC("ea508", WARN, Logger::REPLICATION)
<< "synchronous replication: dropping follower " << follower
<< " for shard " << tc.collectionName();
} else {
LOG_TOPIC("1b077", ERR, Logger::REPLICATION)
<< "synchronous replication: could not drop follower "
<< follower << " for shard " << tc.collectionName();
res.reset(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER);
return false; // cancel transaction
}
}
return true;
});
}
return true;
}
};*/
Result commitAbortTransaction(transaction::Methods& trx, transaction::Status status) {
Result res;

View File

@ -41,7 +41,7 @@ class ReplicationTransaction : public transaction::Methods {
: transaction::Methods(transaction::StandaloneContext::Create(vocbase)),
_guard(vocbase) {
TRI_ASSERT(_state != nullptr);
_state->setType(AccessMode::Type::EXCLUSIVE);
_state->setExclusiveAccessType();
}
private:

View File

@ -569,7 +569,7 @@ std::unique_ptr<SingleCollectionTransaction> RestVocbaseBaseHandler::createTrans
value = _request->header(StaticStrings::TransactionBody, found);
if (found) {
auto trxOpts = VPackParser::fromJson(value);
Result res = mgr->createManagedTrx(_vocbase, tid, trxOpts->slice());;
Result res = mgr->createManagedTrx(_vocbase, tid, trxOpts->slice());
if (res.fail()) {
THROW_ARANGO_EXCEPTION(res);
}
@ -621,7 +621,7 @@ std::shared_ptr<transaction::Context> RestVocbaseBaseHandler::createAQLTransacti
value = _request->header(StaticStrings::TransactionBody, found);
if (found) {
auto trxOpts = VPackParser::fromJson(value);
Result res = mgr->createManagedTrx(_vocbase, tid, trxOpts->slice());;
Result res = mgr->createManagedTrx(_vocbase, tid, trxOpts->slice());
if (res.fail()) {
THROW_ARANGO_EXCEPTION(res);
}

View File

@ -286,24 +286,13 @@ TransactionCollection* TransactionState::findCollection(TRI_voc_cid_t cid,
return nullptr;
}
void TransactionState::setType(AccessMode::Type type) {
if (AccessMode::isWriteOrExclusive(type) && AccessMode::isWriteOrExclusive(_type)) {
// type already correct. do nothing
return;
}
if (AccessMode::isRead(type) && AccessMode::isWriteOrExclusive(_type)) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"cannot make a write transaction read-only");
}
if (AccessMode::isWriteOrExclusive(type) && AccessMode::isRead(_type) &&
_status != transaction::Status::CREATED) {
void TransactionState::setExclusiveAccessType() {
if (_status != transaction::Status::CREATED) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_INTERNAL,
"cannot make a running read transaction a write transaction");
"cannot change the type of a running transaction");
}
// all right
_type = type;
_type = AccessMode::Type::EXCLUSIVE;
}
bool TransactionState::isOnlyExclusiveTransaction() const {

View File

@ -183,8 +183,9 @@ class TransactionState {
TransactionCollection* findCollection(TRI_voc_cid_t cid) const;
void setType(AccessMode::Type type);
/// @brief make a exclusive transaction, only valid before begin
void setExclusiveAccessType();
/// @brief whether or not a transaction is read-only
bool isReadOnlyTransaction() const {
return (_type == AccessMode::Type::READ);

View File

@ -208,6 +208,7 @@ bool Manager::garbageCollect(bool abortAll) {
}
} else {
TRI_ASSERT(mtrx.state->isRunning() && mtrx.state->isTopLevelTransaction());
TRI_ASSERT(it->first == mtrx.state->id());
if (abortAll || mtrx.expires < now) {
gcBuffer.emplace_back(mtrx.state->id());
}
@ -286,20 +287,13 @@ void Manager::unregisterAQLTrx(TRI_voc_tid_t tid) noexcept {
Result Manager::createManagedTrx(TRI_vocbase_t& vocbase,
TRI_voc_tid_t tid, VPackSlice const trxOpts) {
const size_t bucket = getBucket(tid);
Result res;
{ // quick check whether ID exists
READ_LOCKER(allTransactionsLocker, _allTransactionsLock);
WRITE_LOCKER(writeLocker, _transactions[bucket]._lock);
auto& buck = _transactions[bucket];
auto it = buck._managed.find(tid);
if (it != buck._managed.end()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_TRANSACTION_INTERNAL,
"transaction ID already used");
}
// parse the collections to register
if (!trxOpts.isObject() || !trxOpts.get("collections").isObject()) {
return res.reset(TRI_ERROR_BAD_PARAMETER, "missing 'collections'");
}
Result res;
// extract the properties from the object
transaction::Options options;
options.fromVelocyPack(trxOpts);
@ -308,44 +302,68 @@ Result Manager::createManagedTrx(TRI_vocbase_t& vocbase,
"<lockTimeout> needs to be positive");
}
// parse the collections to register
if (!trxOpts.isObject() || !trxOpts.get("collections").isObject()) {
return res.reset(TRI_ERROR_BAD_PARAMETER, "missing 'collections'");
}
auto fillColls = [](VPackSlice const& slice, std::vector<std::string>& cols) {
if (slice.isNone()) { // ignore nonexistant keys
return true;
} else if (!slice.isArray()) {
return false;
} else if (slice.isString()) {
cols.emplace_back(slice.copyString());
return true;
}
for (VPackSlice val : VPackArrayIterator(slice)) {
if (!val.isString() || val.getStringLength() == 0) {
return false;
if (slice.isArray()) {
for (VPackSlice val : VPackArrayIterator(slice)) {
if (!val.isString() || val.getStringLength() == 0) {
return false;
}
cols.emplace_back(val.copyString());
}
cols.emplace_back(val.copyString());
return true;
}
return true;
return false;
};
std::vector<std::string> reads, writes, exclusives;
VPackSlice collections = trxOpts.get("collections");
bool isValid = fillColls(collections.get("read"), reads) &&
fillColls(collections.get("write"), writes) &&
fillColls(collections.get("exclusive"), exclusives);
fillColls(collections.get("write"), writes) &&
fillColls(collections.get("exclusive"), exclusives);
if (!isValid) {
return res.reset(TRI_ERROR_BAD_PARAMETER, "invalid 'collections' attribute");
}
return createManagedTrx(vocbase, tid, reads, writes, exclusives, options);
}
/// @brief create managed transaction
Result Manager::createManagedTrx(TRI_vocbase_t& vocbase, TRI_voc_tid_t tid,
std::vector<std::string> const& readCollections,
std::vector<std::string> const& writeCollections,
std::vector<std::string> const& exclusiveCollections,
transaction::Options const& options) {
Result res;
const size_t bucket = getBucket(tid);
{ // quick check whether ID exists
READ_LOCKER(allTransactionsLocker, _allTransactionsLock);
WRITE_LOCKER(writeLocker, _transactions[bucket]._lock);
auto& buck = _transactions[bucket];
auto it = buck._managed.find(tid);
if (it != buck._managed.end()) {
return res.reset(TRI_ERROR_TRANSACTION_INTERNAL, "transaction ID already used");
}
}
std::unique_ptr<TransactionState> state;
try {
// now start our own transaction
StorageEngine* engine = EngineSelectorFeature::ENGINE;
state = engine->createTransactionState(vocbase, tid, options);
TRI_ASSERT(state != nullptr);
TRI_ASSERT(state->id() == tid);
} catch (basics::Exception const& e) {
return res.reset(e.code(), e.message());
}
TRI_ASSERT(state != nullptr);
TRI_ASSERT(state->id() == tid);
// lock collections
CollectionNameResolver resolver(vocbase);
auto lockCols = [&](std::vector<std::string> cols, AccessMode::Type mode) {
@ -371,9 +389,9 @@ Result Manager::createManagedTrx(TRI_vocbase_t& vocbase,
}
return true;
};
if (!lockCols(exclusives, AccessMode::Type::EXCLUSIVE) ||
!lockCols(writes, AccessMode::Type::WRITE) ||
!lockCols(reads, AccessMode::Type::READ)) {
if (!lockCols(exclusiveCollections, AccessMode::Type::EXCLUSIVE) ||
!lockCols(writeCollections, AccessMode::Type::WRITE) ||
!lockCols(readCollections, AccessMode::Type::READ)) {
if (res.fail()) {
// error already set by callback function
return res;
@ -400,6 +418,7 @@ Result Manager::createManagedTrx(TRI_vocbase_t& vocbase,
}
double expires = defaultTTL + TRI_microtime();
TRI_ASSERT(expires > 0);
TRI_ASSERT(state->id() == tid);
_transactions[bucket]._managed.emplace(std::piecewise_construct,
std::forward_as_tuple(tid),
std::forward_as_tuple(MetaType::Managed, state.release(),

View File

@ -1,7 +1,7 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
/// Copyright 2014-2019 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
@ -24,7 +24,6 @@
#ifndef ARANGOD_TRANSACTION_MANAGER_H
#define ARANGOD_TRANSACTION_MANAGER_H 1
#include "Basics/Common.h"
#include "Basics/ReadWriteLock.h"
#include "Basics/ReadWriteSpinLock.h"
#include "Basics/Result.h"
@ -33,6 +32,8 @@
#include "VocBase/voc-types.h"
#include <atomic>
#include <map>
#include <set>
#include <vector>
namespace arangodb {
@ -44,6 +45,7 @@ struct TransactionData {
namespace transaction {
class Context;
struct Options;
class Manager final {
static constexpr size_t numBuckets = 16;
@ -85,11 +87,18 @@ class Manager final {
/// @brief register a AQL transaction
void registerAQLTrx(TransactionState*);
void unregisterAQLTrx(TRI_voc_tid_t tid) noexcept;
/// @brief create managed transaction
Result createManagedTrx(TRI_vocbase_t& vocbase, TRI_voc_tid_t tid,
velocypack::Slice const trxOpts);
/// @brief create managed transaction
Result createManagedTrx(TRI_vocbase_t& vocbase, TRI_voc_tid_t tid,
std::vector<std::string> const& readCollections,
std::vector<std::string> const& writeCollections,
std::vector<std::string> const& exclusiveCollections,
transaction::Options const& options);
/// @brief lease the transaction, increases nesting
std::shared_ptr<transaction::Context> leaseManagedTrx(TRI_voc_tid_t tid,
AccessMode::Type mode);

View File

@ -55,7 +55,7 @@ ManagerFeature::ManagerFeature(application_features::ApplicationServer& server)
auto off = std::chrono::seconds(1);
MUTEX_LOCKER(guard, _workItemMutex);
std::lock_guard<std::mutex> guard(_workItemMutex);
if (!ApplicationServer::isStopping() && !canceled) {
_workItem = SchedulerFeature::SCHEDULER->queueDelay(RequestLane::INTERNAL_LOW, off, _gcfunc);
}
@ -71,13 +71,16 @@ void ManagerFeature::prepare() {
void ManagerFeature::start() {
auto off = std::chrono::seconds(1);
MUTEX_LOCKER(guard, _workItemMutex);
_workItem = SchedulerFeature::SCHEDULER->queueDelay(RequestLane::INTERNAL_LOW, off, _gcfunc);
Scheduler* scheduler = SchedulerFeature::SCHEDULER;
if (scheduler != nullptr) { // is nullptr in catch tests
std::lock_guard<std::mutex> guard(_workItemMutex);
_workItem = scheduler->queueDelay(RequestLane::INTERNAL_LOW, off, _gcfunc);
}
}
void ManagerFeature::beginShutdown() {
{
MUTEX_LOCKER(guard, _workItemMutex);
std::lock_guard<std::mutex> guard(_workItemMutex);
_workItem.reset();
}
// at this point all cursors should have been aborted already
@ -93,7 +96,7 @@ void ManagerFeature::stop() {
// reset again, as there may be a race between beginShutdown and
// the execution of the deferred _workItem
{
MUTEX_LOCKER(guard, _workItemMutex);
std::lock_guard<std::mutex> guard(_workItemMutex);
_workItem.reset();
}
// at this point all cursors should have been aborted already

View File

@ -24,9 +24,10 @@
#define ARANGODB_TRANSACTION_MANAGER_FEATURE_H 1
#include "ApplicationFeatures/ApplicationFeature.h"
#include "Basics/Mutex.h"
#include "Scheduler/Scheduler.h"
#include <mutex>
namespace arangodb {
namespace transaction {
@ -51,7 +52,7 @@ class ManagerFeature final : public application_features::ApplicationFeature {
static std::unique_ptr<transaction::Manager> MANAGER;
private:
arangodb::Mutex _workItemMutex;
std::mutex _workItemMutex;
Scheduler::WorkHandle _workItem;
/// @brief where rhythm is life, and life is rhythm :)

View File

@ -117,7 +117,10 @@ class ReadWriteSpinLock {
}
void readUnlock() { unlockRead(); }
void unlockRead() { _state.fetch_sub(READER_INC, std::memory_order_release); }
void unlockRead() {
TRI_ASSERT(isReadLocked());
_state.fetch_sub(READER_INC, std::memory_order_release);
}
void writeUnlock() { unlockWrite(); }
void unlockWrite() {
@ -128,6 +131,9 @@ class ReadWriteSpinLock {
bool isLocked() const {
return (_state.load(std::memory_order_relaxed) & ~QUEUED_WRITER_MASK) != 0;
}
bool isReadLocked() const {
return (_state.load(std::memory_order_relaxed) & READER_MASK) > 0;
}
bool isWriteLocked() const {
return _state.load(std::memory_order_relaxed) & WRITE_LOCK;
}

View File

@ -184,6 +184,7 @@ set(ARANGODB_TESTS_SOURCES
RocksDBEngine/IndexEstimatorTest.cpp
Sharding/ShardDistributionReporterTest.cpp
SimpleHttpClient/CommunicatorTest.cpp
Transaction/Manager.cpp
VocBase/VersionTest.cpp
${IRESEARCH_TESTS_SOURCES}
)

View File

@ -1045,8 +1045,7 @@ std::unique_ptr<arangodb::transaction::ContextData> StorageEngineMock::createTra
}
std::unique_ptr<arangodb::transaction::Manager> StorageEngineMock::createTransactionManager() {
TRI_ASSERT(false);
return nullptr;
return std::make_unique<arangodb::transaction::Manager>(/*keepData*/ false);
}
std::unique_ptr<arangodb::TransactionState> StorageEngineMock::createTransactionState(
@ -1055,7 +1054,7 @@ std::unique_ptr<arangodb::TransactionState> StorageEngineMock::createTransaction
arangodb::transaction::Options const& options
) {
return std::unique_ptr<arangodb::TransactionState>(
new TransactionStateMock(vocbase, options)
new TransactionStateMock(vocbase, tid, options)
);
}
@ -1388,7 +1387,7 @@ int TransactionCollectionMock::use(int nestingLevel) {
}
}
return _collection ? TRI_ERROR_NO_ERROR : TRI_ERROR_INTERNAL;
return _collection ? TRI_ERROR_NO_ERROR : TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND;
}
int TransactionCollectionMock::doLock(arangodb::AccessMode::Type type, int nestingLevel) {
@ -1417,8 +1416,9 @@ size_t TransactionStateMock::commitTransactionCount;
// ensure each transaction state has a unique ID
TransactionStateMock::TransactionStateMock(TRI_vocbase_t& vocbase,
TRI_voc_tid_t tid,
arangodb::transaction::Options const& options)
: TransactionState(vocbase, 0, options) {}
: TransactionState(vocbase, tid, options) {}
arangodb::Result TransactionStateMock::abortTransaction(arangodb::transaction::Methods* trx) {
++abortTransactionCount;
@ -1431,7 +1431,6 @@ arangodb::Result TransactionStateMock::abortTransaction(arangodb::transaction::M
}
arangodb::Result TransactionStateMock::beginTransaction(arangodb::transaction::Hints hints) {
static std::atomic<TRI_voc_tid_t> lastId(0);
++beginTransactionCount;
_hints = hints;
@ -1439,24 +1438,26 @@ arangodb::Result TransactionStateMock::beginTransaction(arangodb::transaction::H
if (!res.ok()) {
updateStatus(arangodb::transaction::Status::ABORTED);
const_cast<TRI_voc_tid_t&>(_id) =
0; // avoid use of TransactionManagerFeature::manager()->unregisterTransaction(...)
// avoid use of TransactionManagerFeature::manager()->unregisterTransaction(...)
const_cast<TRI_voc_tid_t&>(_id) = 0;
return res;
}
const_cast<TRI_voc_tid_t&>(_id) = ++lastId; // ensure each transaction state has a unique ID
updateStatus(arangodb::transaction::Status::RUNNING);
if (nestingLevel() == 0) {
updateStatus(arangodb::transaction::Status::RUNNING);
}
return arangodb::Result();
}
arangodb::Result TransactionStateMock::commitTransaction(arangodb::transaction::Methods* trx) {
++commitTransactionCount;
updateStatus(arangodb::transaction::Status::COMMITTED);
if (nestingLevel() == 0) {
updateStatus(arangodb::transaction::Status::COMMITTED);
// avoid use of TransactionManagerFeature::manager()->unregisterTransaction(...)
const_cast<TRI_voc_tid_t&>(_id) = 0;
}
unuseCollections(nestingLevel());
// avoid use of TransactionManagerFeature::manager()->unregisterTransaction(...)
const_cast<TRI_voc_tid_t&>(_id) = 0;
return arangodb::Result();
}

View File

@ -136,7 +136,7 @@ class TransactionStateMock: public arangodb::TransactionState {
static size_t beginTransactionCount;
static size_t commitTransactionCount;
TransactionStateMock(TRI_vocbase_t& vocbase, arangodb::transaction::Options const& options);
TransactionStateMock(TRI_vocbase_t& vocbase, TRI_voc_tid_t tid, arangodb::transaction::Options const& options);
virtual arangodb::Result abortTransaction(arangodb::transaction::Methods* trx) override;
virtual arangodb::Result beginTransaction(arangodb::transaction::Hints hints) override;
virtual arangodb::Result commitTransaction(arangodb::transaction::Methods* trx) override;

View File

@ -0,0 +1,391 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite for transaction Manager
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include "Aql/Query.h"
#include "Aql/OptimizerRulesFeature.h"
#include "RestServer/AqlFeature.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/TraverserEngineRegistryFeature.h"
#include "RestServer/QueryRegistryFeature.h"
#include "RestServer/ViewTypesFeature.h"
#include "Sharding/ShardingFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "Transaction/Manager.h"
#include "Transaction/ManagerFeature.h"
#include "Transaction/StandaloneContext.h"
#include "Transaction/SmartContext.h"
#include "Transaction/Status.h"
#include "Utils/ExecContext.h"
#include "Utils/SingleCollectionTransaction.h"
#include "VocBase/LogicalCollection.h"
#include "../Mocks/StorageEngineMock.h"
#include <velocypack/Parser.h>
#include <velocypack/velocypack-aliases.h>
#include "catch.hpp"
using namespace arangodb;
// -----------------------------------------------------------------------------
// --SECTION-- setup / tear-down
// -----------------------------------------------------------------------------
struct TransactionManagerSetup {
StorageEngineMock engine;
arangodb::application_features::ApplicationServer server;
std::vector<std::pair<arangodb::application_features::ApplicationFeature*, bool>> features;
TransactionManagerSetup(): engine(server), server(nullptr, nullptr) {
arangodb::EngineSelectorFeature::ENGINE = &engine;
// setup required application features
features.emplace_back(new arangodb::DatabaseFeature(server), false); // required for TRI_vocbase_t::dropCollection(...)
features.emplace_back(new arangodb::ShardingFeature(server), false);
features.emplace_back(new transaction::ManagerFeature(server), true);
features.emplace_back(new arangodb::QueryRegistryFeature(server), false); // must be first
arangodb::application_features::ApplicationServer::server->addFeature(features.back().first); // need QueryRegistryFeature feature to be added now in order to create the system database
features.emplace_back(new arangodb::TraverserEngineRegistryFeature(server), false); // must be before AqlFeature
features.emplace_back(new arangodb::AqlFeature(server), true);
features.emplace_back(new arangodb::aql::OptimizerRulesFeature(server), true);
for (auto& f: features) {
arangodb::application_features::ApplicationServer::server->addFeature(f.first);
}
for (auto& f: features) {
f.first->prepare();
}
for (auto& f: features) {
if (f.second) {
f.first->start();
}
}
}
~TransactionManagerSetup() {
arangodb::application_features::ApplicationServer::server = nullptr;
arangodb::EngineSelectorFeature::ENGINE = nullptr;
// destroy application features
for (auto& f: features) {
if (f.second) {
f.first->stop();
}
}
for (auto& f: features) {
f.first->unprepare();
}
}
};
arangodb::aql::QueryResult executeQuery(TRI_vocbase_t& vocbase,
std::string const& queryString,
std::shared_ptr<transaction::Context> ctx) {
auto options = std::make_shared<VPackBuilder>();
options->openObject();
options->close();
std::shared_ptr<arangodb::velocypack::Builder> bindVars;
arangodb::aql::Query query(false,
vocbase,
arangodb::aql::QueryString(queryString),
bindVars,
options,
arangodb::aql::PART_MAIN);
query.setTransactionContext(std::move(ctx));
std::shared_ptr<arangodb::aql::SharedQueryState> ss = query.sharedState();
arangodb::aql::QueryResult result;
while (true) {
auto state = query.execute(arangodb::QueryRegistryFeature::registry(), result);
if (state == arangodb::aql::ExecutionState::WAITING) {
ss->waitForAsyncResponse();
} else {
break;
}
}
return result;
}
// -----------------------------------------------------------------------------
// --SECTION-- test suite
// -----------------------------------------------------------------------------
/// @brief test transaction::Manager
TEST_CASE("TransactionManagerTest", "[transaction]") {
TransactionManagerSetup setup;
TRI_ASSERT(transaction::ManagerFeature::manager());
TRI_vocbase_t vocbase(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL, 1, "testVocbase");
auto* mgr = transaction::ManagerFeature::manager();
REQUIRE(mgr != nullptr);
scopeGuard([&] {
mgr->garbageCollect(true);
});
TRI_voc_tid_t tid = TRI_NewTickServer();
SECTION("Parsing errors") {
auto json = arangodb::velocypack::Parser::fromJson("{ \"write\": [33] }");
Result res = mgr->createManagedTrx(vocbase, tid, json->slice());
CHECK(res.is(TRI_ERROR_BAD_PARAMETER));
json = arangodb::velocypack::Parser::fromJson("{ \"collections\":{\"write\": \"33\"}, \"lockTimeout\": -1 }");
res = mgr->createManagedTrx(vocbase, tid, json->slice());
CHECK(res.is(TRI_ERROR_BAD_PARAMETER));
}
SECTION("Collection Not found") {
auto json = arangodb::velocypack::Parser::fromJson("{ \"collections\":{\"read\": [\"33\"]}}");
Result res = mgr->createManagedTrx(vocbase, tid, json->slice());
CHECK(res.errorNumber() == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
json = arangodb::velocypack::Parser::fromJson("{ \"collections\":{\"write\": [\"33\"]}}");
res = mgr->createManagedTrx(vocbase, tid, json->slice());
CHECK(res.errorNumber() == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
json = arangodb::velocypack::Parser::fromJson("{ \"collections\":{\"exclusive\": [\"33\"]}}");
res = mgr->createManagedTrx(vocbase, tid, json->slice());
CHECK(res.errorNumber() == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
}
std::shared_ptr<LogicalCollection> coll;
{
auto json = VPackParser::fromJson("{ \"name\": \"testCollection\", \"id\": 42 }");
coll = vocbase.createCollection(json->slice());
}
REQUIRE(coll != nullptr);
SECTION("Transaction ID reuse") {
auto json = arangodb::velocypack::Parser::fromJson("{ \"collections\":{\"read\": [\"42\"]}}");
Result res = mgr->createManagedTrx(vocbase, tid, json->slice());
REQUIRE(res.ok());
json = arangodb::velocypack::Parser::fromJson("{ \"collections\":{\"write\": [\"33\"]}}");
res = mgr->createManagedTrx(vocbase, tid, json->slice());
CHECK(res.errorNumber() == TRI_ERROR_TRANSACTION_INTERNAL);
res = mgr->abortManagedTrx(tid);
REQUIRE(res.ok());
}
SECTION("Simple Transaction & Abort") {
auto json = arangodb::velocypack::Parser::fromJson("{ \"collections\":{\"write\": [\"42\"]}}");
Result res = mgr->createManagedTrx(vocbase, tid, json->slice());
REQUIRE(res.ok());
auto doc = arangodb::velocypack::Parser::fromJson("{ \"_key\": \"1\"}");
{
auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE);
REQUIRE(ctx.get() != nullptr);
SingleCollectionTransaction trx(ctx, "testCollection", AccessMode::Type::WRITE);
REQUIRE(trx.state()->isEmbeddedTransaction());
OperationOptions opts;
auto opRes = trx.insert(coll->name(), doc->slice(), opts);
REQUIRE(opRes.ok());
REQUIRE(trx.finish(opRes.result).ok());
}
REQUIRE((mgr->getManagedTrxStatus(tid) == transaction::Status::RUNNING));
{ // lease again
auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE);
REQUIRE(ctx.get() != nullptr);
SingleCollectionTransaction trx(ctx, "testCollection", AccessMode::Type::READ);
REQUIRE(trx.state()->isEmbeddedTransaction());
OperationOptions opts;
auto opRes = trx.document(coll->name(), doc->slice(), opts);
REQUIRE(opRes.ok());
REQUIRE(trx.finish(opRes.result).ok());
}
REQUIRE((mgr->getManagedTrxStatus(tid) == transaction::Status::RUNNING));
REQUIRE(mgr->abortManagedTrx(tid).ok());
// perform same operation
REQUIRE(mgr->abortManagedTrx(tid).ok());
// cannot commit aborted transaction
REQUIRE(mgr->commitManagedTrx(tid).is(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION));
REQUIRE((mgr->getManagedTrxStatus(tid) == transaction::Status::ABORTED));
}
SECTION("Simple Transaction & Commit") {
auto json = arangodb::velocypack::Parser::fromJson("{ \"collections\":{\"write\": [\"42\"]}}");
Result res = mgr->createManagedTrx(vocbase, tid, json->slice());
REQUIRE(res.ok());
{
auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE);
REQUIRE(ctx.get() != nullptr);
SingleCollectionTransaction trx(ctx, "testCollection", AccessMode::Type::WRITE);
REQUIRE(trx.state()->isEmbeddedTransaction());
auto doc = arangodb::velocypack::Parser::fromJson("{ \"abc\": 1}");
OperationOptions opts;
auto opRes = trx.insert(coll->name(), doc->slice(), opts);
REQUIRE(opRes.ok());
REQUIRE(trx.finish(opRes.result).ok());
}
REQUIRE((mgr->getManagedTrxStatus(tid) == transaction::Status::RUNNING));
REQUIRE(mgr->commitManagedTrx(tid).ok());
// perform same operation
REQUIRE(mgr->commitManagedTrx(tid).ok());
// cannot commit aborted transaction
REQUIRE(mgr->abortManagedTrx(tid).is(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION));
REQUIRE((mgr->getManagedTrxStatus(tid) == transaction::Status::COMMITTED));
}
SECTION("Simple Transaction & Commit while in use") {
auto json = arangodb::velocypack::Parser::fromJson("{ \"collections\":{\"write\": [\"42\"]}}");
Result res = mgr->createManagedTrx(vocbase, tid, json->slice());
REQUIRE(res.ok());
{
auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE);
REQUIRE(ctx.get() != nullptr);
SingleCollectionTransaction trx(ctx, "testCollection", AccessMode::Type::WRITE);
REQUIRE(trx.state()->isEmbeddedTransaction());
auto doc = arangodb::velocypack::Parser::fromJson("{ \"abc\": 1}");
OperationOptions opts;
auto opRes = trx.insert(coll->name(), doc->slice(), opts);
REQUIRE(opRes.ok());
REQUIRE(mgr->commitManagedTrx(tid).is(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION));
REQUIRE(trx.finish(opRes.result).ok());
}
REQUIRE((mgr->getManagedTrxStatus(tid) == transaction::Status::RUNNING));
REQUIRE(mgr->commitManagedTrx(tid).ok());
// perform same operation
REQUIRE(mgr->commitManagedTrx(tid).ok());
// cannot abort committed transaction
REQUIRE(mgr->abortManagedTrx(tid).is(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION));
REQUIRE((mgr->getManagedTrxStatus(tid) == transaction::Status::COMMITTED));
}
SECTION("Leasing multiple read-only transactions") {
auto json = arangodb::velocypack::Parser::fromJson("{ \"collections\":{\"read\": [\"42\"]}}");
Result res = mgr->createManagedTrx(vocbase, tid, json->slice());
REQUIRE(res.ok());
{
auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::READ);
REQUIRE(ctx.get() != nullptr);
REQUIRE(ctx->getParentTransaction() != nullptr);
auto ctx2 = mgr->leaseManagedTrx(tid, AccessMode::Type::READ);
REQUIRE(ctx2.get() != nullptr);
CHECK(ctx->getParentTransaction() == ctx2->getParentTransaction());
auto ctx3 = mgr->leaseManagedTrx(tid, AccessMode::Type::READ);
REQUIRE(ctx3.get() != nullptr);
CHECK(ctx->getParentTransaction() == ctx3->getParentTransaction());
}
REQUIRE(mgr->abortManagedTrx(tid).ok());
REQUIRE((mgr->getManagedTrxStatus(tid) == transaction::Status::ABORTED));
}
SECTION("Lock conflict") {
auto json = arangodb::velocypack::Parser::fromJson("{ \"collections\":{\"write\": [\"42\"]}}");
Result res = mgr->createManagedTrx(vocbase, tid, json->slice());
REQUIRE(res.ok());
{
auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE);
REQUIRE(ctx.get() != nullptr);
REQUIRE(ctx->getParentTransaction() != nullptr);
REQUIRE_THROWS(mgr->leaseManagedTrx(tid, AccessMode::Type::READ));
}
REQUIRE(mgr->abortManagedTrx(tid).ok());
REQUIRE((mgr->getManagedTrxStatus(tid) == transaction::Status::ABORTED));
}
SECTION("Garbage Collection shutdown") {
auto json = arangodb::velocypack::Parser::fromJson("{ \"collections\":{\"write\": [\"42\"]}}");
Result res = mgr->createManagedTrx(vocbase, tid, json->slice());
REQUIRE(res.ok());
{
auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE);
REQUIRE(ctx.get() != nullptr);
REQUIRE(ctx->getParentTransaction() != nullptr);
}
REQUIRE((mgr->getManagedTrxStatus(tid) == transaction::Status::RUNNING));
REQUIRE(mgr->garbageCollect(/*abortAll*/true));
REQUIRE((mgr->getManagedTrxStatus(tid) == transaction::Status::ABORTED));
}
SECTION("AQL standalone transaction") {
{
auto ctx = transaction::StandaloneContext::Create(vocbase);
SingleCollectionTransaction trx(ctx, "testCollection", AccessMode::Type::WRITE);
REQUIRE(trx.begin().ok());
auto doc = arangodb::velocypack::Parser::fromJson("{ \"abc\": 1}");
OperationOptions opts;
auto opRes = trx.insert(coll->name(), doc->slice(), opts);
REQUIRE(opRes.ok());
REQUIRE(trx.finish(opRes.result).ok());
}
auto ctx = std::make_shared<transaction::AQLStandaloneContext>(vocbase, tid);
auto qq = "FOR doc IN testCollection RETURN doc";
arangodb::aql::QueryResult qres = executeQuery(vocbase, qq, ctx);
REQUIRE(qres.ok());
}
// SECTION("Permission denied") {
// ExecContext exe(ExecContext::Type, "dummy",
// vocbase.name(), auth::Level::NONE, auth::Level::NONE);
// ExecContextScope scope();
//
// auto json = arangodb::velocypack::Parser::fromJson("{ \"collections\":{\"read\": [\"42\"]}}");
// Result res = mgr->createManagedTrx(vocbase, tid, json->slice());
// REQUIRE(res.ok());
//
// json = arangodb::velocypack::Parser::fromJson("{ \"collections\":{\"write\": [\"33\"]}}");
// res = mgr->createManagedTrx(vocbase, tid, json->slice());
// REQUIRE(res.errorNumber() == TRI_ERROR_TRANSACTION_INTERNAL);
// }
//
// SECTION("Acquire transaction") {
// transaction::ManagedContext ctx();
//
// }
}