1
0
Fork 0

added tests for intermediate commits

This commit is contained in:
jsteemann 2017-05-30 12:03:58 +02:00
parent 4ab6d4d4f9
commit 53b150b206
21 changed files with 514 additions and 191 deletions

View File

@ -335,6 +335,7 @@ SET(ARANGOD_SOURCES
Transaction/Context.cpp
Transaction/Helpers.cpp
Transaction/Methods.cpp
Transaction/Options.cpp
Transaction/StandaloneContext.cpp
Transaction/V8Context.cpp
Utils/CollectionKeys.cpp

View File

@ -263,8 +263,8 @@ transaction::ContextData* MMFilesEngine::createTransactionContextData() {
}
TransactionState* MMFilesEngine::createTransactionState(
TRI_vocbase_t* vocbase) {
return new MMFilesTransactionState(vocbase);
TRI_vocbase_t* vocbase, transaction::Options const& options) {
return new MMFilesTransactionState(vocbase, options);
}
TransactionCollection* MMFilesEngine::createTransactionCollection(

View File

@ -48,6 +48,7 @@ class RestHandlerFactory;
namespace transaction {
class ContextData;
struct Options;
}
/// @brief collection file structure
@ -106,7 +107,7 @@ class MMFilesEngine final : public StorageEngine {
TransactionManager* createTransactionManager() override;
transaction::ContextData* createTransactionContextData() override;
TransactionState* createTransactionState(TRI_vocbase_t*) override;
TransactionState* createTransactionState(TRI_vocbase_t*, transaction::Options const&) override;
TransactionCollection* createTransactionCollection(
TransactionState* state, TRI_voc_cid_t cid, AccessMode::Type accessType,
int nestingLevel) override;

View File

@ -51,8 +51,8 @@ static inline MMFilesLogfileManager* GetMMFilesLogfileManager() {
}
/// @brief transaction type
MMFilesTransactionState::MMFilesTransactionState(TRI_vocbase_t* vocbase)
: TransactionState(vocbase),
MMFilesTransactionState::MMFilesTransactionState(TRI_vocbase_t* vocbase, transaction::Options const& options)
: TransactionState(vocbase, options),
_rocksTransaction(nullptr),
_beginWritten(false),
_hasOperations(false) {}
@ -230,7 +230,7 @@ int MMFilesTransactionState::addOperation(TRI_voc_rid_t revisionId,
}
if (waitForSync) {
_waitForSync = true;
_options.waitForSync = true;
}
TRI_IF_FAILURE("TransactionOperationNoSlot") { return TRI_ERROR_DEBUG; }
@ -467,7 +467,7 @@ int MMFilesTransactionState::writeCommitMarker() {
try {
MMFilesTransactionMarker marker(TRI_DF_MARKER_VPACK_COMMIT_TRANSACTION, _vocbase->id(), _id);
res = GetMMFilesLogfileManager()->allocateAndWrite(marker, _waitForSync).errorCode;
res = GetMMFilesLogfileManager()->allocateAndWrite(marker, _options.waitForSync).errorCode;
TRI_IF_FAILURE("TransactionWriteCommitMarkerSegfault") {
TRI_SegfaultDebugging("crashing on commit");
@ -475,7 +475,7 @@ int MMFilesTransactionState::writeCommitMarker() {
TRI_IF_FAILURE("TransactionWriteCommitMarkerNoRocksSync") { return TRI_ERROR_NO_ERROR; }
if (_waitForSync) {
if (_options.waitForSync) {
// also sync RocksDB WAL
MMFilesPersistentIndexFeature::syncWal();
}

View File

@ -45,13 +45,14 @@ struct MMFilesDocumentOperation;
class MMFilesWalMarker;
namespace transaction {
class Methods;
struct Options;
}
class TransactionCollection;
/// @brief transaction type
class MMFilesTransactionState final : public TransactionState {
public:
explicit MMFilesTransactionState(TRI_vocbase_t* vocbase);
MMFilesTransactionState(TRI_vocbase_t* vocbase, transaction::Options const&);
~MMFilesTransactionState();
/// @brief begin a transaction

View File

@ -327,10 +327,12 @@ RangeIterator<Edge<E>> GraphStore<V, E>::edgeIterator(
template <typename V, typename E>
std::unique_ptr<transaction::Methods> GraphStore<V, E>::_createTransaction() {
double lockTimeout = transaction::Methods::DefaultLockTimeout;
transaction::Options transactionOptions;
transactionOptions.waitForSync = false;
transactionOptions.allowImplicitCollections = true;
auto ctx = transaction::StandaloneContext::Create(_vocbaseGuard.vocbase());
std::unique_ptr<transaction::Methods> trx(
new transaction::UserTransaction(ctx, {}, {}, {}, lockTimeout, false, true));
new transaction::UserTransaction(ctx, {}, {}, {}, transactionOptions));
Result res = trx->begin();
if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res);
@ -504,10 +506,12 @@ void GraphStore<V, E>::_storeVertices(std::vector<ShardID> const& globalShards,
}
currentShard = it->shard();
ShardID const& shard = globalShards[currentShard];
double timeout = transaction::Methods::DefaultLockTimeout;
transaction::Options transactionOptions;
transactionOptions.waitForSync = false;
transactionOptions.allowImplicitCollections = false;
trx.reset(new transaction::UserTransaction(
transaction::StandaloneContext::Create(_vocbaseGuard.vocbase()), {},
{shard}, {}, timeout, false, false));
{shard}, {}, transactionOptions));
res = trx->begin();
if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res);

View File

@ -67,6 +67,7 @@
#include "RocksDBEngine/RocksDBV8Functions.h"
#include "RocksDBEngine/RocksDBValue.h"
#include "RocksDBEngine/RocksDBView.h"
#include "Transaction/Options.h"
#include "VocBase/replication-applier.h"
#include "VocBase/ticks.h"
@ -108,13 +109,12 @@ RocksDBEngine::RocksDBEngine(application_features::ApplicationServer* server)
: StorageEngine(server, EngineName, FeatureName, new RocksDBIndexFactory()),
_db(nullptr),
_vpackCmp(new RocksDBComparator()),
_maxTransactionSize((std::numeric_limits<uint64_t>::max)()),
_intermediateTransactionCommitSize(32 * 1024 * 1024),
_intermediateTransactionCommitCount(100000),
_intermediateTransactionCommitEnabled(false),
_maxTransactionSize(transaction::Options::defaultMaxTransactionSize),
_intermediateCommitSize(transaction::Options::defaultIntermediateCommitSize),
_intermediateCommitCount(transaction::Options::defaultIntermediateCommitCount),
_pruneWaitTime(10.0) {
// inherits order from StorageEngine but requires RocksDBOption that are used
// to configure this Engine and the MMFiles PesistentIndexFeature
// inherits order from StorageEngine but requires "RocksDBOption" that is used
// to configure this engine and the MMFiles PersistentIndexFeature
startsAfter("RocksDBOption");
}
@ -134,21 +134,16 @@ void RocksDBEngine::collectOptions(
new UInt64Parameter(&_maxTransactionSize));
options->addHiddenOption(
"--rocksdb.intermediate-transaction-count",
"an intermediate commit will be tried when a transaction "
"--rocksdb.intermediate-commit-size",
"an intermediate commit will be performed automatically when a transaction "
"has accumulated operations of this size (in bytes)",
new UInt64Parameter(&_intermediateTransactionCommitSize));
new UInt64Parameter(&_intermediateCommitSize));
options->addHiddenOption(
"--rocksdb.intermediate-transaction-count",
"an intermediate commit will be tried when this number of "
"--rocksdb.intermediate-commit-count",
"an intermediate commit will be performed automatically when this number of "
"operations is reached in a transaction",
new UInt64Parameter(&_intermediateTransactionCommitCount));
_intermediateTransactionCommitCount = 100 * 1000;
options->addHiddenOption(
"--rocksdb.intermediate-transaction", "enable intermediate transactions",
new BooleanParameter(&_intermediateTransactionCommitEnabled));
new UInt64Parameter(&_intermediateCommitCount));
options->addOption("--rocksdb.wal-file-timeout",
"timeout after which unused WAL files are deleted",
@ -156,7 +151,9 @@ void RocksDBEngine::collectOptions(
}
// validate the storage engine's specific options
void RocksDBEngine::validateOptions(std::shared_ptr<options::ProgramOptions>) {}
void RocksDBEngine::validateOptions(std::shared_ptr<options::ProgramOptions>) {
transaction::Options::setLimits(_maxTransactionSize, _intermediateCommitSize, _intermediateCommitCount);
}
// preparation phase for storage engine. can be used for internal setup.
// the storage engine must not start any threads here or write any files
@ -457,10 +454,8 @@ transaction::ContextData* RocksDBEngine::createTransactionContextData() {
}
TransactionState* RocksDBEngine::createTransactionState(
TRI_vocbase_t* vocbase) {
return new RocksDBTransactionState(
vocbase, _maxTransactionSize, _intermediateTransactionCommitEnabled,
_intermediateTransactionCommitSize, _intermediateTransactionCommitCount);
TRI_vocbase_t* vocbase, transaction::Options const& options) {
return new RocksDBTransactionState(vocbase, options);
}
TransactionCollection* RocksDBEngine::createTransactionCollection(

View File

@ -53,6 +53,7 @@ class RestHandlerFactory;
namespace transaction {
class ContextData;
struct Options;
}
class RocksDBEngine final : public StorageEngine {
@ -80,7 +81,7 @@ class RocksDBEngine final : public StorageEngine {
TransactionManager* createTransactionManager() override;
transaction::ContextData* createTransactionContextData() override;
TransactionState* createTransactionState(TRI_vocbase_t*) override;
TransactionState* createTransactionState(TRI_vocbase_t*, transaction::Options const&) override;
TransactionCollection* createTransactionCollection(
TransactionState* state, TRI_voc_cid_t cid, AccessMode::Type accessType,
int nestingLevel) override;
@ -259,14 +260,11 @@ class RocksDBEngine final : public StorageEngine {
/// Background thread handling garbage collection etc
std::unique_ptr<RocksDBBackgroundThread> _backgroundThread;
uint64_t _maxTransactionSize; // maximum allowed size for a transaction
uint64_t _intermediateTransactionCommitSize; // maximum size for a
// transaction before a
// intermediate commit will be
// tried
uint64_t _intermediateTransactionCommitCount; // limit of transaction count
// for intermediate commit
bool _intermediateTransactionCommitEnabled; // allow usage of intermediate
// commits
uint64_t _intermediateCommitSize; // maximum size for a
// transaction before an
// intermediate commit is performed
uint64_t _intermediateCommitCount; // limit of transaction count
// for intermediate commit
mutable basics::ReadWriteLock _collectionMapLock;
std::unordered_map<uint64_t, std::pair<TRI_voc_tick_t, TRI_voc_cid_t>>

View File

@ -414,11 +414,14 @@ std::unique_ptr<transaction::Methods>
RocksDBReplicationContext::createTransaction(TRI_vocbase_t* vocbase) {
_guard.reset(new DatabaseGuard(vocbase));
double lockTimeout = transaction::Methods::DefaultLockTimeout;
transaction::Options transactionOptions;
transactionOptions.waitForSync = false;
transactionOptions.allowImplicitCollections = true;
std::shared_ptr<transaction::StandaloneContext> ctx =
transaction::StandaloneContext::Create(vocbase);
std::unique_ptr<transaction::Methods> trx(new transaction::UserTransaction(
ctx, {}, {}, {}, lockTimeout, false, true));
ctx, {}, {}, {}, transactionOptions));
Result res = trx->begin();
if (!res.ok()) {
_guard.reset();

View File

@ -59,21 +59,15 @@ struct RocksDBTransactionData final : public TransactionData {};
/// @brief transaction type
RocksDBTransactionState::RocksDBTransactionState(
TRI_vocbase_t* vocbase, uint64_t maxTransSize,
bool intermediateTransactionEnabled, uint64_t intermediateTransactionSize,
uint64_t intermediateTransactionNumber)
: TransactionState(vocbase),
TRI_vocbase_t* vocbase, transaction::Options const& options)
: TransactionState(vocbase, options),
_snapshot(nullptr),
_rocksWriteOptions(),
_rocksReadOptions(),
_cacheTx(nullptr),
_maxTransactionSize(maxTransSize),
_intermediateTransactionSize(intermediateTransactionSize),
_intermediateTransactionNumber(intermediateTransactionNumber),
_numInserts(0),
_numUpdates(0),
_numRemoves(0),
_intermediateTransactionEnabled(intermediateTransactionEnabled),
_lastUsedCollection(0) {}
/// @brief free a transaction container
@ -147,7 +141,7 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
} else {
createTransaction();
bool readWrites = hasHint(transaction::Hints::Hint::READ_WRITES);
if (_intermediateTransactionEnabled && !readWrites) {
if (!readWrites) {
_snapshot = db->GetSnapshot(); // we must call ReleaseSnapshot at some point
_rocksReadOptions.snapshot = _snapshot;
TRI_ASSERT(_snapshot != nullptr);
@ -396,11 +390,11 @@ RocksDBOperationResult RocksDBTransactionState::addOperation(
size_t currentSize = _rocksTransaction->GetWriteBatch()->GetWriteBatch()->GetDataSize();
uint64_t newSize = currentSize + operationSize + keySize;
if (_maxTransactionSize < newSize) {
if (newSize > _options.maxTransactionSize) {
// we hit the transaction size limit
std::string message =
"aborting transaction because maximal transaction size limit of " +
std::to_string(_maxTransactionSize) + " bytes is reached";
std::to_string(_options.maxTransactionSize) + " bytes is reached";
res.reset(TRI_ERROR_RESOURCE_LIMIT, message);
return res;
}
@ -439,14 +433,13 @@ RocksDBOperationResult RocksDBTransactionState::addOperation(
}
auto numOperations = _numInserts + _numUpdates + _numRemoves;
// signal if intermediate commit is required
// this will be done if intermediate transactions are enabled
// and either the "number of operations" or the "transaction size"
// has reached the limit
if (_intermediateTransactionEnabled &&
(_intermediateTransactionNumber <= numOperations ||
_intermediateTransactionSize <= newSize)) {
//res.commitRequired(true);
// perform an intermediate commit
// this will be done if either the "number of operations" or the
// "transaction size" counters have reached their limit
if (_options.intermediateCommitCount <= numOperations ||
_options.intermediateCommitSize <= newSize) {
LOG_TOPIC(ERR, Logger::FIXME) << "INTERMEDIATE COMMIT!";
internalCommit();
_numInserts = 0;
_numUpdates = 0;

View File

@ -52,6 +52,7 @@ class LogicalCollection;
struct RocksDBDocumentOperation;
namespace transaction {
class Methods;
struct Options;
}
class TransactionCollection;
class RocksDBMethods;
@ -65,11 +66,7 @@ class RocksDBTransactionState final : public TransactionState {
friend class RocksDBBatchedMethods;
public:
explicit RocksDBTransactionState(TRI_vocbase_t* vocbase,
uint64_t maxOperationSize,
bool intermediateTransactionEnabled,
uint64_t intermediateTransactionSize,
uint64_t intermediateTransactionNumber);
RocksDBTransactionState(TRI_vocbase_t* vocbase, transaction::Options const&);
~RocksDBTransactionState();
/// @brief begin a transaction
@ -129,16 +126,11 @@ private:
// wrapper to use outside this class to access rocksdb
std::unique_ptr<RocksDBMethods> _rocksMethods;
// a transaction may not become bigger than this value
uint64_t _maxTransactionSize;
// if a transaction gets bigger than this value and intermediate transactions
// are enabled then a commit will be done
uint64_t _intermediateTransactionSize;
uint64_t _intermediateTransactionNumber;
// if a transaction gets bigger than these values then an automatic
// intermediate commit will be done
uint64_t _numInserts;
uint64_t _numUpdates;
uint64_t _numRemoves;
bool _intermediateTransactionEnabled;
/// Last collection used for transaction
TRI_voc_cid_t _lastUsedCollection;

View File

@ -58,6 +58,7 @@ class RestHandlerFactory;
namespace transaction {
class Context;
class ContextData;
struct Options;
}
class StorageEngine : public application_features::ApplicationFeature {
@ -93,7 +94,7 @@ class StorageEngine : public application_features::ApplicationFeature {
virtual TransactionManager* createTransactionManager() = 0;
virtual transaction::ContextData* createTransactionContextData() = 0;
virtual TransactionState* createTransactionState(TRI_vocbase_t*) = 0;
virtual TransactionState* createTransactionState(TRI_vocbase_t*, transaction::Options const&) = 0;
virtual TransactionCollection* createTransactionCollection(TransactionState*, TRI_voc_cid_t, AccessMode::Type, int nestingLevel) = 0;
// when a new collection is created, this method is called to augment the collection

View File

@ -29,12 +29,13 @@
#include "StorageEngine/StorageEngine.h"
#include "StorageEngine/TransactionCollection.h"
#include "Transaction/Methods.h"
#include "Transaction/Options.h"
#include "VocBase/ticks.h"
using namespace arangodb;
/// @brief transaction type
TransactionState::TransactionState(TRI_vocbase_t* vocbase)
TransactionState::TransactionState(TRI_vocbase_t* vocbase, transaction::Options const& options)
: _vocbase(vocbase),
_id(0),
_type(AccessMode::Type::READ),
@ -43,10 +44,8 @@ TransactionState::TransactionState(TRI_vocbase_t* vocbase)
_collections{_arena}, // assign arena to vector
_serverRole(ServerState::instance()->getRole()),
_hints(),
_timeout(transaction::Methods::DefaultLockTimeout),
_nestingLevel(0),
_allowImplicitCollections(true),
_waitForSync(false) {}
_options(options) {}
/// @brief free a transaction container
TransactionState::~TransactionState() {
@ -99,7 +98,7 @@ int TransactionState::addCollection(TRI_voc_cid_t cid,
// << ", accessType: " << accessType
// << ", nestingLevel: " << nestingLevel
// << ", force: " << force
// << ", allowImplicitCollections: " << _allowImplicitCollections;
// << ", allowImplicitCollections: " << _options.allowImplicitCollections;
// upgrade transaction type if required
if (nestingLevel == 0) {
@ -130,7 +129,7 @@ int TransactionState::addCollection(TRI_voc_cid_t cid,
return TRI_ERROR_TRANSACTION_UNREGISTERED_COLLECTION;
}
if (!AccessMode::isWriteOrExclusive(accessType) && !_allowImplicitCollections) {
if (!AccessMode::isWriteOrExclusive(accessType) && !_options.allowImplicitCollections) {
return TRI_ERROR_TRANSACTION_UNREGISTERED_COLLECTION;
}

View File

@ -29,6 +29,7 @@
#include "Basics/SmallVector.h"
#include "Cluster/ServerState.h"
#include "Transaction/Hints.h"
#include "Transaction/Options.h"
#include "Transaction/Status.h"
#include "VocBase/AccessMode.h"
#include "VocBase/voc-types.h"
@ -49,6 +50,7 @@ struct TRI_vocbase_t;
namespace arangodb {
namespace transaction {
class Methods;
struct Options;
}
class TransactionCollection;
@ -59,7 +61,7 @@ class TransactionState {
TransactionState(TransactionState const&) = delete;
TransactionState& operator=(TransactionState const&) = delete;
explicit TransactionState(TRI_vocbase_t* vocbase);
TransactionState(TRI_vocbase_t* vocbase, transaction::Options const&);
virtual ~TransactionState();
bool isRunningInCluster() const { return ServerState::isRunningInCluster(_serverRole); }
@ -80,18 +82,18 @@ class TransactionState {
bool isTopLevelTransaction() const { return _nestingLevel == 0; }
bool isEmbeddedTransaction() const { return !isTopLevelTransaction(); }
double timeout() const { return _timeout; }
double timeout() const { return _options.lockTimeout; }
void timeout(double value) {
if (value > 0.0) {
_timeout = value;
_options.lockTimeout = value;
}
}
bool waitForSync() const { return _waitForSync; }
void waitForSync(bool value) { _waitForSync = value; }
bool waitForSync() const { return _options.waitForSync; }
void waitForSync(bool value) { _options.waitForSync = value; }
bool allowImplicitCollections() const { return _allowImplicitCollections; }
void allowImplicitCollections(bool value) { _allowImplicitCollections = value; }
bool allowImplicitCollections() const { return _options.allowImplicitCollections; }
void allowImplicitCollections(bool value) { _options.allowImplicitCollections = value; }
std::vector<std::string> collectionNames() const;
@ -169,10 +171,9 @@ class TransactionState {
ServerState::RoleEnum const _serverRole; // role of the server
transaction::Hints _hints; // hints;
double _timeout; // timeout for lock acquisition
int _nestingLevel;
bool _allowImplicitCollections;
bool _waitForSync; // whether or not the transaction had a synchronous op
transaction::Options _options;
};
}

View File

@ -47,6 +47,7 @@
#include "StorageEngine/TransactionState.h"
#include "Transaction/Context.h"
#include "Transaction/Helpers.h"
#include "Transaction/Options.h"
#include "Utils/CollectionNameResolver.h"
#include "Utils/Events.h"
#include "Utils/OperationCursor.h"
@ -541,7 +542,8 @@ bool transaction::Methods::findIndexHandleForAndNode(
}
transaction::Methods::Methods(
std::shared_ptr<transaction::Context> const& transactionContext)
std::shared_ptr<transaction::Context> const& transactionContext,
transaction::Options const& options)
: _state(nullptr),
_transactionContext(transactionContext),
_transactionContextPtr(transactionContext.get()) {
@ -560,7 +562,7 @@ transaction::Methods::Methods(
setupEmbedded(vocbase);
} else {
// non-embedded
setupToplevel(vocbase);
setupToplevel(vocbase, options);
}
TRI_ASSERT(_state != nullptr);
@ -2866,10 +2868,10 @@ void transaction::Methods::setupEmbedded(TRI_vocbase_t*) {
}
/// @brief set up a top-level transaction
void transaction::Methods::setupToplevel(TRI_vocbase_t* vocbase) {
void transaction::Methods::setupToplevel(TRI_vocbase_t* vocbase, transaction::Options const& options) {
// we are not embedded. now start our own transaction
StorageEngine* engine = EngineSelectorFeature::ENGINE;
_state = engine->createTransactionState(vocbase);
_state = engine->createTransactionState(vocbase, options);
TRI_ASSERT(_state != nullptr);

View File

@ -30,6 +30,7 @@
#include "Basics/Result.h"
#include "Utils/OperationResult.h"
#include "Transaction/Hints.h"
#include "Transaction/Options.h"
#include "Transaction/Status.h"
#include "VocBase/AccessMode.h"
#include "VocBase/vocbase.h"
@ -79,16 +80,13 @@ class TransactionState;
class TransactionCollection;
namespace transaction {
struct Options;
class Methods {
friend class traverser::BaseEngine;
friend class CallbackInvoker;
public:
/// @brief time (in seconds) that is spent waiting for a lock
static constexpr double DefaultLockTimeout = 900.0;
class IndexHandle {
friend class transaction::Methods;
@ -124,7 +122,7 @@ class Methods {
protected:
/// @brief create the transaction
explicit Methods(std::shared_ptr<transaction::Context> const& transactionContext);
Methods(std::shared_ptr<transaction::Context> const& transactionContext, transaction::Options const& options = transaction::Options());
public:
@ -547,7 +545,7 @@ class Methods {
void setupEmbedded(TRI_vocbase_t*);
/// @brief set up a top-level transaction
void setupToplevel(TRI_vocbase_t*);
void setupToplevel(TRI_vocbase_t*, transaction::Options const&);
protected:
/// @brief the state

View File

@ -0,0 +1,44 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#include "Options.h"
using namespace arangodb::transaction;
uint64_t Options::defaultMaxTransactionSize = UINT64_MAX;
uint64_t Options::defaultIntermediateCommitSize = 512 * 1024 * 1024;
uint64_t Options::defaultIntermediateCommitCount = 1 * 1000 * 1000;
Options::Options()
: lockTimeout(defaultLockTimeout),
maxTransactionSize(defaultMaxTransactionSize),
intermediateCommitSize(defaultIntermediateCommitSize),
intermediateCommitCount(defaultIntermediateCommitCount),
allowImplicitCollections(true),
waitForSync(false) {}
void Options::setLimits(uint64_t maxTransactionSize, uint64_t intermediateCommitSize, uint64_t intermediateCommitCount) {
defaultMaxTransactionSize = maxTransactionSize;
defaultIntermediateCommitSize = intermediateCommitSize;
defaultIntermediateCommitCount = intermediateCommitCount;
}

View File

@ -0,0 +1,53 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_TRANSACTION_OPTIONS_H
#define ARANGOD_TRANSACTION_OPTIONS_H 1
#include "Basics/Common.h"
namespace arangodb {
namespace transaction {
struct Options {
Options();
static void setLimits(uint64_t maxTransactionSize, uint64_t intermediateCommitSize, uint64_t intermediateCommitCount);
static constexpr double defaultLockTimeout = 900.0;
static uint64_t defaultMaxTransactionSize;
static uint64_t defaultIntermediateCommitSize;
static uint64_t defaultIntermediateCommitCount;
/// @brief time (in seconds) that is spent waiting for a lock
double lockTimeout;
uint64_t maxTransactionSize;
uint64_t intermediateCommitSize;
uint64_t intermediateCommitCount;
bool allowImplicitCollections;
bool waitForSync;
};
}
}
#endif

View File

@ -27,8 +27,9 @@
#include "Basics/Common.h"
#include "StorageEngine/TransactionState.h"
#include "Transaction/Methods.h"
#include "Transaction/Context.h"
#include "Transaction/Methods.h"
#include "Transaction/Options.h"
namespace arangodb {
namespace transaction {
@ -40,9 +41,8 @@ class UserTransaction final : public transaction::Methods {
std::vector<std::string> const& readCollections,
std::vector<std::string> const& writeCollections,
std::vector<std::string> const& exclusiveCollections,
double lockTimeout, bool waitForSync,
bool allowImplicitCollections)
: transaction::Methods(transactionContext) {
transaction::Options const& options)
: transaction::Methods(transactionContext, options) {
addHint(transaction::Hints::Hint::LOCK_ENTIRELY);
for (auto const& it : exclusiveCollections) {
@ -56,10 +56,6 @@ class UserTransaction final : public transaction::Methods {
for (auto const& it : readCollections) {
addCollection(it, AccessMode::Type::READ);
}
_state->timeout(lockTimeout);
_state->waitForSync(waitForSync);
_state->allowImplicitCollections(allowImplicitCollections);
}
};

View File

@ -134,9 +134,7 @@ static void JS_Transaction(v8::FunctionCallbackInfo<v8::Value> const& args) {
v8::Handle<v8::Object> object = v8::Handle<v8::Object>::Cast(args[0]);
// extract the properties from the object
// "lockTimeout"
double lockTimeout = transaction::Methods::DefaultLockTimeout;
transaction::Options trxOptions;
if (object->Has(TRI_V8_ASCII_STRING("lockTimeout"))) {
static std::string const timeoutError =
@ -146,17 +144,15 @@ static void JS_Transaction(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_THROW_EXCEPTION_PARAMETER(timeoutError);
}
lockTimeout =
trxOptions.lockTimeout =
TRI_ObjectToDouble(object->Get(TRI_V8_ASCII_STRING("lockTimeout")));
if (lockTimeout < 0.0) {
if (trxOptions.lockTimeout < 0.0) {
TRI_V8_THROW_EXCEPTION_PARAMETER(timeoutError);
}
}
// "waitForSync"
bool waitForSync = false;
TRI_GET_GLOBALS();
TRI_GET_GLOBAL_STRING(WaitForSyncKey);
if (object->Has(WaitForSyncKey)) {
@ -165,7 +161,7 @@ static void JS_Transaction(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_THROW_EXCEPTION_PARAMETER("<waitForSync> must be a boolean value");
}
waitForSync = TRI_ObjectToBoolean(WaitForSyncKey);
trxOptions.waitForSync = TRI_ObjectToBoolean(WaitForSyncKey);
}
// "collections"
@ -185,86 +181,61 @@ static void JS_Transaction(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_THROW_EXCEPTION_PARAMETER(collectionError);
}
bool isValid = true;
std::vector<std::string> readCollections;
std::vector<std::string> writeCollections;
std::vector<std::string> exclusiveCollections;
bool allowImplicitCollections = true;
if (collections->Has(TRI_V8_ASCII_STRING("allowImplicit"))) {
allowImplicitCollections = TRI_ObjectToBoolean(
trxOptions.allowImplicitCollections = TRI_ObjectToBoolean(
collections->Get(TRI_V8_ASCII_STRING("allowImplicit")));
}
if (object->Has(TRI_V8_ASCII_STRING("maxTransactionSize"))) {
trxOptions.maxTransactionSize = TRI_ObjectToUInt64(
object->Get(TRI_V8_ASCII_STRING("maxTransactionSize")), true);
}
if (object->Has(TRI_V8_ASCII_STRING("intermediateCommitSize"))) {
trxOptions.intermediateCommitSize = TRI_ObjectToUInt64(
object->Get(TRI_V8_ASCII_STRING("intermediateCommitSize")), true);
}
if (object->Has(TRI_V8_ASCII_STRING("intermediateCommitCount"))) {
trxOptions.intermediateCommitCount = TRI_ObjectToUInt64(
object->Get(TRI_V8_ASCII_STRING("intermediateCommitCount")), true);
}
auto getCollections = [&isolate](v8::Handle<v8::Object> obj,
std::vector<std::string>& collections,
char const* attributeName) -> bool {
if (obj->Has(TRI_V8_ASCII_STRING(attributeName))) {
if (obj->Get(TRI_V8_ASCII_STRING(attributeName))->IsArray()) {
v8::Handle<v8::Array> names = v8::Handle<v8::Array>::Cast(
obj->Get(TRI_V8_ASCII_STRING(attributeName)));
for (uint32_t i = 0; i < names->Length(); ++i) {
v8::Handle<v8::Value> collection = names->Get(i);
if (!collection->IsString()) {
return false;
}
collections.emplace_back(TRI_ObjectToString(collection));
}
} else if (obj->Get(TRI_V8_ASCII_STRING(attributeName))->IsString()) {
collections.emplace_back(
TRI_ObjectToString(obj->Get(TRI_V8_ASCII_STRING(attributeName))));
} else {
return false;
}
// fallthrough intentional
}
return true;
};
// collections.read
if (collections->Has(TRI_V8_ASCII_STRING("read"))) {
if (collections->Get(TRI_V8_ASCII_STRING("read"))->IsArray()) {
v8::Handle<v8::Array> names = v8::Handle<v8::Array>::Cast(
collections->Get(TRI_V8_ASCII_STRING("read")));
for (uint32_t i = 0; i < names->Length(); ++i) {
v8::Handle<v8::Value> collection = names->Get(i);
if (!collection->IsString()) {
isValid = false;
break;
}
readCollections.emplace_back(TRI_ObjectToString(collection));
}
} else if (collections->Get(TRI_V8_ASCII_STRING("read"))->IsString()) {
readCollections.emplace_back(
TRI_ObjectToString(collections->Get(TRI_V8_ASCII_STRING("read"))));
} else {
isValid = false;
}
}
// collections.write
if (collections->Has(TRI_V8_ASCII_STRING("write"))) {
if (collections->Get(TRI_V8_ASCII_STRING("write"))->IsArray()) {
v8::Handle<v8::Array> names = v8::Handle<v8::Array>::Cast(
collections->Get(TRI_V8_ASCII_STRING("write")));
for (uint32_t i = 0; i < names->Length(); ++i) {
v8::Handle<v8::Value> collection = names->Get(i);
if (!collection->IsString()) {
isValid = false;
break;
}
writeCollections.emplace_back(TRI_ObjectToString(collection));
}
} else if (collections->Get(TRI_V8_ASCII_STRING("write"))->IsString()) {
writeCollections.emplace_back(
TRI_ObjectToString(collections->Get(TRI_V8_ASCII_STRING("write"))));
} else {
isValid = false;
}
}
bool isValid =
(getCollections(collections, readCollections, "read") &&
getCollections(collections, writeCollections, "write") &&
getCollections(collections, exclusiveCollections, "exclusive"));
// collections.exclusive
if (collections->Has(TRI_V8_ASCII_STRING("exclusive"))) {
if (collections->Get(TRI_V8_ASCII_STRING("exclusive"))->IsArray()) {
v8::Handle<v8::Array> names = v8::Handle<v8::Array>::Cast(
collections->Get(TRI_V8_ASCII_STRING("exclusive")));
for (uint32_t i = 0; i < names->Length(); ++i) {
v8::Handle<v8::Value> collection = names->Get(i);
if (!collection->IsString()) {
isValid = false;
break;
}
exclusiveCollections.emplace_back(TRI_ObjectToString(collection));
}
} else if (collections->Get(TRI_V8_ASCII_STRING("exclusive"))->IsString()) {
exclusiveCollections.emplace_back(
TRI_ObjectToString(collections->Get(TRI_V8_ASCII_STRING("exclusive"))));
} else {
isValid = false;
}
}
if (!isValid) {
TRI_V8_THROW_EXCEPTION_PARAMETER(collectionError);
}
@ -346,7 +317,7 @@ static void JS_Transaction(v8::FunctionCallbackInfo<v8::Value> const& args) {
// start actual transaction
transaction::UserTransaction trx(transactionContext, readCollections, writeCollections, exclusiveCollections,
lockTimeout, waitForSync, allowImplicitCollections);
trxOptions);
Result res = trx.begin();

View File

@ -0,0 +1,270 @@
/*jshint globalstrict:false, strict:false */
/*global assertEqual, assertTrue, fail */
////////////////////////////////////////////////////////////////////////////////
/// @brief tests for client/server side transaction invocation, RocksDB version
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2012 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2012, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
var jsunity = require("jsunity");
var arangodb = require("@arangodb");
var ERRORS = arangodb.errors;
var db = arangodb.db;
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite
////////////////////////////////////////////////////////////////////////////////
function RocksDBTransactionsInvocationsSuite () {
'use strict';
var c = null;
var cn = "UnitTestsTransaction";
return {
////////////////////////////////////////////////////////////////////////////////
/// @brief set up
////////////////////////////////////////////////////////////////////////////////
setUp : function () {
db._drop(cn);
c = db._create(cn);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief tear down
////////////////////////////////////////////////////////////////////////////////
tearDown : function () {
db._drop(cn);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief max transaction size
////////////////////////////////////////////////////////////////////////////////
testSmallMaxTransactionSize : function () {
try {
db._executeTransaction({
collections: { write: cn },
action: "function (params) { " +
"var db = require('internal').db; " +
"var c = db._collection(params.cn); " +
"for (var i = 0; i < 10000; ++i) { c.insert({ someValue : i }); } " +
"}",
params: { cn },
maxTransactionSize: 100 * 1000, // 100 KB => not enough!
});
fail();
} catch (err) {
assertEqual(ERRORS.ERROR_RESOURCE_LIMIT.code, err.errorNum);
}
assertEqual(0, db._collection(cn).count());
},
testBigMaxTransactionSize : function () {
db._executeTransaction({
collections: { write: cn },
action: "function (params) { " +
"var db = require('internal').db; " +
"var c = db._collection(params.cn); " +
"for (var i = 0; i < 10000; ++i) { c.insert({ someValue : i }); } " +
"}",
params: { cn },
maxTransactionSize: 10 * 1000 * 1000, // 10 MB => enough!
});
assertEqual(10000, db._collection(cn).count());
},
testIntermediateCommitCountVerySmall : function () {
db._executeTransaction({
collections: { write: cn },
action: "function (params) { " +
"var db = require('internal').db; " +
"var c = db._collection(params.cn); " +
"for (var i = 0; i < 1000; ++i) { c.insert({ someValue : i }); } " +
"}",
params: { cn },
intermediateCommitCount: 1 // this should produce 1000 intermediate commits
});
assertEqual(1000, db._collection(cn).count());
},
testIntermediateCommitCountBigger : function () {
db._executeTransaction({
collections: { write: cn },
action: "function (params) { " +
"var db = require('internal').db; " +
"var c = db._collection(params.cn); " +
"for (var i = 0; i < 10000; ++i) { c.insert({ someValue : i }); } " +
"}",
params: { cn },
intermediateCommitCount: 1000 // this should produce 10 intermediate commits
});
assertEqual(10000, db._collection(cn).count());
},
testIntermediateCommitCountWithFail : function () {
var failed = false;
try {
db._executeTransaction({
collections: { write: cn },
action: "function (params) { " +
"var db = require('internal').db; " +
"var c = db._collection(params.cn); " +
"for (var i = 0; i < 10000; ++i) { c.insert({ someValue : i }); } " +
"throw 'peng!'; " +
"}",
params: { cn },
intermediateCommitCount: 1000 // this should produce 10 intermediate commits
});
} catch (err) {
failed = true;
}
assertTrue(failed);
assertEqual(10000, db._collection(cn).count());
},
testIntermediateCommitCountWithFailInTheMiddle : function () {
var failed = false;
try {
db._executeTransaction({
collections: { write: cn },
action: "function (params) { " +
"var db = require('internal').db; " +
"var c = db._collection(params.cn); " +
"for (var i = 0; i < 10000; ++i) { " +
"c.insert({ someValue : i }); " +
"if (i === 6532) { throw 'peng!'; } " +
"} " +
"}",
params: { cn },
intermediateCommitCount: 1000 // this should produce 6 intermediate commits
});
} catch (err) {
failed = true;
}
assertTrue(failed);
assertEqual(6000, db._collection(cn).count());
},
testIntermediateCommitSizeVerySmall : function () {
db._executeTransaction({
collections: { write: cn },
action: "function (params) { " +
"var db = require('internal').db; " +
"var c = db._collection(params.cn); " +
"for (var i = 0; i < 1000; ++i) { c.insert({ someValue : i }); } " +
"}",
params: { cn },
intermediateCommitSize: 1000 // this should produce a lot of intermediate commits
});
assertEqual(1000, db._collection(cn).count());
},
testIntermediateCommitSizeBigger : function () {
db._executeTransaction({
collections: { write: cn },
action: "function (params) { " +
"var db = require('internal').db; " +
"var c = db._collection(params.cn); " +
"for (var i = 0; i < 10000; ++i) { c.insert({ someValue : i }); } " +
"}",
params: { cn },
intermediateCommitSize: 10000 // this should produce a lot of intermediate commits
});
assertEqual(10000, db._collection(cn).count());
},
testIntermediateCommitSizeWithFail : function () {
var failed = false;
try {
db._executeTransaction({
collections: { write: cn },
action: "function (params) { " +
"var db = require('internal').db; " +
"var c = db._collection(params.cn); " +
"for (var i = 0; i < 10000; ++i) { c.insert({ someValue : i }); } " +
"throw 'peng!'; " +
"}",
params: { cn },
intermediateCommitSize: 10 // this should produce a lot of intermediate commits
});
} catch (err) {
failed = true;
}
assertTrue(failed);
assertEqual(10000, db._collection(cn).count());
},
testIntermediateCommitSizeWithFailInTheMiddle : function () {
var failed = false;
try {
db._executeTransaction({
collections: { write: cn },
action: "function (params) { " +
"var db = require('internal').db; " +
"var c = db._collection(params.cn); " +
"for (var i = 0; i < 10000; ++i) { " +
"c.insert({ someValue : i }); " +
"if (i === 6532) { throw 'peng!'; } " +
"} " +
"}",
params: { cn },
intermediateCommitSize: 10 // this should produce a lot of intermediate commits
});
} catch (err) {
failed = true;
}
assertTrue(failed);
assertEqual(6533, db._collection(cn).count());
}
};
}
////////////////////////////////////////////////////////////////////////////////
/// @brief executes the test suite
////////////////////////////////////////////////////////////////////////////////
jsunity.run(RocksDBTransactionsInvocationsSuite);
return jsunity.done();