1
0
Fork 0

limit transaction size and work on intermediate commits

This commit is contained in:
Jan Christoph Uhde 2017-04-12 07:10:19 +02:00
parent 4c4d491eac
commit d67aa33fe3
7 changed files with 183 additions and 59 deletions

2
.gitignore vendored
View File

@ -108,3 +108,5 @@ npm-debug.log
log-*
data-*
cluster-init
datafile-*.db

View File

@ -408,6 +408,9 @@ void RocksDBCollection::invokeOnAllElements(
void RocksDBCollection::truncate(transaction::Methods* trx,
OperationOptions& options) {
// TODO FIXME -- improve transaction size
// TODO FIXME -- intermediate commit
rocksdb::Comparator const* cmp = globalRocksEngine()->cmp();
TRI_voc_cid_t cid = _logicalCollection->cid();
@ -428,11 +431,20 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
THROW_ARANGO_EXCEPTION(converted);
}
// transaction size limit reached -- fail
TRI_voc_rid_t revisionId = RocksDBKey::revisionId(iter->key());
auto result = state->addOperation(cid, revisionId, TRI_VOC_DOCUMENT_OPERATION_REMOVE, 0, iter->key().size());
// report size of key
RocksDBOperationResult result = state->addOperation(cid, revisionId, TRI_VOC_DOCUMENT_OPERATION_REMOVE, 0, iter->key().size());
if (result.fail()){
THROW_ARANGO_EXCEPTION(result);
}
// force intermediate commit
if(result.commitRequired()){
// force commit
}
iter->Next();
}
@ -475,14 +487,20 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
THROW_ARANGO_EXCEPTION(converted);
}
//update size
auto result = state->addOperation(cid, /*ignored revisionId*/ 0,
// report index key size
RocksDBOperationResult result = state->addOperation(cid, /*ignored revisionId*/ 0,
TRI_VOC_NOOP_OPERATION_UPDATE_SIZE, 0, iter->key().size()
);
// transaction size limit reached -- fail
if (result.fail()){
THROW_ARANGO_EXCEPTION(result);
}
// force intermediate commit
if(result.commitRequired()){
// force commit
}
iter->Next();
}
}
@ -537,6 +555,10 @@ int RocksDBCollection::insert(arangodb::transaction::Methods* trx,
arangodb::ManagedDocumentResult& mdr,
OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick, bool /*lock*/) {
// TODO FIXME -- limit transaction size
// TODO FIXME -- intermediate commit
// store the tick that was used for writing the document
// note that we don't need it for this engine
resultMarkerTick = 0;
@ -598,9 +620,21 @@ int RocksDBCollection::insert(arangodb::transaction::Methods* trx,
return lookupResult.errorNumber();
}
static_cast<RocksDBTransactionState*>(trx->state())
// report document and key size
RocksDBOperationResult result = static_cast<RocksDBTransactionState*>(trx->state())
->addOperation(_logicalCollection->cid(), revisionId,
TRI_VOC_DOCUMENT_OPERATION_INSERT, newSlice.byteSize(), res.keySize());
// transaction size limit reached -- fail
if(res.fail()){
THROW_ARANGO_EXCEPTION(res);
}
// force intermediate commit
if(result.commitRequired()){
// force commit
}
guard.commit();
}
@ -617,7 +651,7 @@ int RocksDBCollection::update(arangodb::transaction::Methods* trx,
TRI_voc_rid_t const& revisionId,
arangodb::velocypack::Slice const key) {
// TODO FIXME -- limit transaction size
// TODO FIXME -- intermediate commit
resultMarkerTick = 0;
RocksDBOperationResult res;
@ -683,22 +717,30 @@ int RocksDBCollection::update(arangodb::transaction::Methods* trx,
res = updateDocument(trx, oldRevisionId, oldDoc, revisionId, newDoc,
options.waitForSync);
uint64_t keySize = res.keySize();
if (res.ok()) {
Result result = lookupRevisionVPack(revisionId, trx, mdr);
RocksDBOperationResult result = lookupRevisionVPack(revisionId, trx, mdr);
if (result.fail()) {
return result.errorNumber();
}
TRI_ASSERT(!mdr.empty());
//update as combination of remove/insert?!
//check add operation result!!!!
// report document and key size
result = static_cast<RocksDBTransactionState*>(trx->state())
->addOperation(_logicalCollection->cid(), revisionId,
TRI_VOC_DOCUMENT_OPERATION_UPDATE, newDoc.byteSize(), keySize);
//result.fail -> throw
TRI_VOC_DOCUMENT_OPERATION_UPDATE, newDoc.byteSize(), res.keySize());
// transaction size limit reached -- fail
if(result.fail()){
THROW_ARANGO_EXCEPTION(result);
}
// force intermediate commit
if(result.commitRequired()){
// force commit
}
guard.commit();
}
@ -714,7 +756,8 @@ int RocksDBCollection::replace(
arangodb::velocypack::Slice const toSlice) {
resultMarkerTick = 0;
//TODO FIXME -- limit transaction size
// TODO FIXME -- improve transaction size
// TODO FIXME -- intermediate commit
Result res;
bool const isEdgeCollection =
@ -774,7 +817,7 @@ int RocksDBCollection::replace(
RocksDBOperationResult opResult = updateDocument(trx, oldRevisionId, oldDoc, revisionId,
VPackSlice(builder->slice()), options.waitForSync);
if (opResult.ok()) {
Result result = lookupRevisionVPack(revisionId, trx, mdr);
RocksDBOperationResult result = lookupRevisionVPack(revisionId, trx, mdr);
if (!result.ok()) {
return result.errorNumber();
@ -782,10 +825,22 @@ int RocksDBCollection::replace(
TRI_ASSERT(!mdr.empty());
static_cast<RocksDBTransactionState*>(trx->state())
// report document and key size
result = static_cast<RocksDBTransactionState*>(trx->state())
->addOperation(_logicalCollection->cid(), revisionId,
TRI_VOC_DOCUMENT_OPERATION_REPLACE,
VPackSlice(builder->slice()).byteSize(), opResult.keySize());
// transaction size limit reached -- fail
if(result.fail()){
THROW_ARANGO_EXCEPTION(result);
}
// force intermediate commit
if(result.commitRequired()){
// force commit
}
guard.commit();
}
@ -800,7 +855,8 @@ int RocksDBCollection::remove(arangodb::transaction::Methods* trx,
TRI_voc_rid_t const& revisionId,
TRI_voc_rid_t& prevRev) {
//TODO FIXME -- limit transaction size
// TODO FIXME -- improve transaction size
// TODO FIXME -- intermediate commit
// store the tick that was used for writing the document
// note that we don't need it for this engine
@ -846,10 +902,20 @@ int RocksDBCollection::remove(arangodb::transaction::Methods* trx,
res = removeDocument(trx, oldRevisionId, oldDoc, options.waitForSync);
if (res.ok()) {
static_cast<RocksDBTransactionState*>(trx->state())
// report key size
res = static_cast<RocksDBTransactionState*>(trx->state())
->addOperation(_logicalCollection->cid(), revisionId,
TRI_VOC_DOCUMENT_OPERATION_REMOVE, oldDoc.byteSize(),
res.keySize());
TRI_VOC_DOCUMENT_OPERATION_REMOVE, 0, res.keySize());
// transaction size limit reached -- fail
if(res.fail()){
THROW_ARANGO_EXCEPTION(res);
}
// force intermediate commit
if(res.commitRequired()){
// force commit
}
guard.commit();
}
@ -1012,6 +1078,8 @@ RocksDBOperationResult RocksDBCollection::insertDocument(arangodb::transaction::
<< " INSERT DOCUMENT FAILED. REVISIONID: " << revisionId;*/
Result converted = rocksutils::convertStatus(status, rocksutils::StatusHint::document);
res = converted;
// set keysize that is passed up to the crud operations
res.keySize(key.string().size());
return res;
}
@ -1151,6 +1219,8 @@ RocksDBOperationResult RocksDBCollection::updateDocument(transaction::Methods* t
VPackSlice const& newDoc,
bool& waitForSync) {
// keysize in return value is set by insertDocument
// Coordinator doesn't know index internals
TRI_ASSERT(trx->state()->isRunning());
TRI_ASSERT(!ServerState::instance()->isCoordinator());

View File

@ -52,24 +52,32 @@ public:
RocksDBOperationResult()
:Result()
,_keySize(0)
,_commitRequired(false)
{}
RocksDBOperationResult(Result const& other)
: _keySize(0)
,_commitRequired(false)
{
cloneData(other);
}
RocksDBOperationResult(Result&& other)
: _keySize(0)
,_commitRequired(false)
{
cloneData(std::move(other));
}
uint64_t keySize(){ return _keySize; }
uint64_t keySize(uint64_t s ) { _keySize = s; return _keySize; }
bool commitRequired(){ return _commitRequired; }
bool commitRequired(bool cr ) { _commitRequired = cr; return _commitRequired; }
protected:
uint64_t _keySize;
bool _commitRequired;
};
class TransactionState;

View File

@ -88,13 +88,20 @@ RocksDBEngine::~RocksDBEngine() { delete _db; }
// ---------------------------------
// add the storage engine's specifc options to the global list of options
void RocksDBEngine::collectOptions(std::shared_ptr<options::ProgramOptions> options) {
void RocksDBEngine::collectOptions(
std::shared_ptr<options::ProgramOptions> options) {
options->addSection("rocksdb", "RocksDB engine specific configuration");
_maxTransactionSize = std::numeric_limits<uint64_t>::max(); // set sensible default value here
options->addOption("--rocksdb.max-transaction-size"
,"transaction size limit"
,new UInt64Parameter(&_maxTransactionSize)
);
// control transaction size for RocksDB engine
_maxTransactionSize =
std::numeric_limits<uint64_t>::max(); // set sensible default value here
options->addOption("--rocksdb.max-transaction-size", "transaction size limit",
new UInt64Parameter(&_maxTransactionSize));
// control intermediate transactions in RocksDB
_intermediateTransactionSize = (_maxTransactionSize / 5) * 4; // transaction size that will trigger an intermediate commit
_intermediateTransactionNumber = 100 * 1000; // number operation after that a commit will be tried
_intermediateTransactionEnabled = false;
}
// validate the storage engine's specific options
@ -134,9 +141,9 @@ void RocksDBEngine::start() {
_options.comparator = _cmp.get();
// WAL_ttl_seconds needs to be bigger than the sync interval of the count
// manager
_options.WAL_ttl_seconds = 15; //(uint64_t)(counter_sync_seconds * 2.0);
_options.WAL_ttl_seconds = 15; //(uint64_t)(counter_sync_seconds * 2.0);
// TODO: prefix_extractior + memtable_insert_with_hint_prefix
rocksdb::Status status =
rocksdb::TransactionDB::Open(_options, transactionOptions, _path, &_db);
@ -185,7 +192,9 @@ transaction::ContextData* RocksDBEngine::createTransactionContextData() {
TransactionState* RocksDBEngine::createTransactionState(
TRI_vocbase_t* vocbase) {
return new RocksDBTransactionState(vocbase, _maxTransactionSize);
return new RocksDBTransactionState(
vocbase, _maxTransactionSize, _intermediateTransactionEnabled,
_intermediateTransactionSize, _intermediateTransactionNumber);
}
TransactionCollection* RocksDBEngine::createTransactionCollection(
@ -450,15 +459,15 @@ void RocksDBEngine::prepareDropDatabase(TRI_vocbase_t* vocbase,
bool useWriteMarker, int& status) {
// probably not required
// THROW_ARANGO_NOT_YET_IMPLEMENTED();
//status = saveDatabaseParameters(vocbase->id(), vocbase->name(), true);
// status = saveDatabaseParameters(vocbase->id(), vocbase->name(), true);
VPackBuilder builder;
builder.openObject();
builder.add("id", VPackValue(std::to_string(vocbase->id())));
builder.add("name", VPackValue(vocbase->name()));
builder.add("deleted", VPackValue(true));
builder.close();
status = writeCreateDatabaseMarker(vocbase->id(), builder.slice());
}
@ -515,11 +524,11 @@ arangodb::Result RocksDBEngine::persistCollection(
int res = writeCreateCollectionMarker(vocbase->id(), cid, slice);
result.reset(res);
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
if (result.ok()) {
RocksDBCollection *rcoll =
RocksDBCollection::toRocksDBCollection(collection->getPhysical());
RocksDBCollection* rcoll =
RocksDBCollection::toRocksDBCollection(collection->getPhysical());
TRI_ASSERT(rcoll->numberDocuments() == 0);
}
#endif
@ -564,7 +573,7 @@ arangodb::Result RocksDBEngine::dropCollection(
return res; // let collection exist so the remaining elements can still be
// accessed
}
// delete collection
_counterManager->removeCounter(coll->objectId());
auto key = RocksDBKey::Collection(vocbase->id(), collection->cid());
@ -671,7 +680,6 @@ void RocksDBEngine::signalCleanup(TRI_vocbase_t*) {
void RocksDBEngine::iterateDocuments(
TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
std::function<void(arangodb::velocypack::Slice const&)> const& cb) {
THROW_ARANGO_NOT_YET_IMPLEMENTED();
}
@ -696,14 +704,14 @@ bool RocksDBEngine::cleanupCompactionBlockers(TRI_vocbase_t* vocbase) {
/// @brief insert a compaction blocker
int RocksDBEngine::insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl,
TRI_voc_tick_t& id) {
//THROW_ARANGO_NOT_YET_IMPLEMENTED();
// THROW_ARANGO_NOT_YET_IMPLEMENTED();
return TRI_ERROR_NO_ERROR;
}
/// @brief touch an existing compaction blocker
int RocksDBEngine::extendCompactionBlocker(TRI_vocbase_t* vocbase,
TRI_voc_tick_t id, double ttl) {
//THROW_ARANGO_NOT_YET_IMPLEMENTED();
// THROW_ARANGO_NOT_YET_IMPLEMENTED();
return TRI_ERROR_NO_ERROR;
}
@ -784,8 +792,8 @@ Result RocksDBEngine::dropDatabase(TRI_voc_tick_t id) {
if (indexes.isArray()) {
for (auto const& it : VPackArrayIterator(indexes)) {
// delete index documents
uint64_t objectId = basics::VelocyPackHelper::stringUInt64(
it, "objectId");
uint64_t objectId =
basics::VelocyPackHelper::stringUInt64(it, "objectId");
RocksDBKeyBounds bounds = RocksDBKeyBounds::IndexEntries(objectId);
res = rocksutils::removeLargeRange(_db, bounds);
if (res.fail()) {

View File

@ -235,11 +235,11 @@ class RocksDBEngine final : public StorageEngine {
arangodb::velocypack::Slice info) override;
void addParametersForNewIndex(arangodb::velocypack::Builder& builder,
arangodb::velocypack::Slice info) override;
rocksdb::TransactionDB* db() const { return _db; }
RocksDBComparator* cmp() const { return _cmp.get(); }
int writeCreateCollectionMarker(TRI_voc_tick_t databaseId, TRI_voc_cid_t id,
VPackSlice const& slice);
@ -266,6 +266,9 @@ class RocksDBEngine final : public StorageEngine {
std::unique_ptr<RocksDBCounterManager> _counterManager;
uint64_t _maxTransactionSize;
uint64_t _intermediateTransactionSize;
uint64_t _intermediateTransactionNumber;
bool _intermediateTransactionEnabled;
};
}
#endif

View File

@ -74,16 +74,21 @@ void RocksDBSavePoint::rollback() {
}
/// @brief transaction type
RocksDBTransactionState::RocksDBTransactionState(TRI_vocbase_t* vocbase, uint64_t maxTransSize)
RocksDBTransactionState::RocksDBTransactionState(
TRI_vocbase_t* vocbase, uint64_t maxTransSize,
bool intermediateTransactionEnabled, uint64_t intermediateTransactionSize,
uint64_t intermediateTransactionNumber)
: TransactionState(vocbase),
_rocksReadOptions(),
_cacheTx(nullptr),
_transactionSize(0),
_maxTransactionSize(maxTransSize),
_intermediateTransactionSize(intermediateTransactionSize),
_intermediateTransactionNumber(intermediateTransactionNumber),
_numInserts(0),
_numUpdates(0),
_numRemoves(0)
{}
_numRemoves(0),
_intermediateTransactionEnabled(intermediateTransactionEnabled) {}
/// @brief free a transaction container
RocksDBTransactionState::~RocksDBTransactionState() {
@ -159,7 +164,9 @@ Result RocksDBTransactionState::commitTransaction(
<< " transaction";
TRI_ASSERT(_status == transaction::Status::RUNNING);
TRI_IF_FAILURE("TransactionWriteCommitMarker") { return Result(TRI_ERROR_DEBUG); }
TRI_IF_FAILURE("TransactionWriteCommitMarker") {
return Result(TRI_ERROR_DEBUG);
}
arangodb::Result result;
@ -262,15 +269,15 @@ Result RocksDBTransactionState::abortTransaction(
}
/// @brief add an operation for a transaction collection
Result RocksDBTransactionState::addOperation(
RocksDBOperationResult RocksDBTransactionState::addOperation(
TRI_voc_cid_t cid, TRI_voc_rid_t revisionId,
TRI_voc_document_operation_e operationType,
uint64_t operationSize, uint64_t keySize) {
Result res;
TRI_voc_document_operation_e operationType, uint64_t operationSize,
uint64_t keySize) {
RocksDBOperationResult res;
uint64_t newSize = _transactionSize + operationSize + keySize;
if(_maxTransactionSize < newSize){
//we hit the transaction size limit
if (_maxTransactionSize < newSize) {
// we hit the transaction size limit
res.reset(TRI_ERROR_RESOURCE_LIMIT, "maximal transaction limit reached");
return res;
}
@ -283,7 +290,7 @@ Result RocksDBTransactionState::addOperation(
"collection not found in transaction state");
}
//sould not fail or fail with exception
// sould not fail or fail with exception
collection->addOperation(operationType, operationSize, revisionId);
switch (operationType) {
@ -303,5 +310,17 @@ Result RocksDBTransactionState::addOperation(
}
_transactionSize = newSize;
auto numOperations = _numInserts + _numUpdates + _numRemoves;
// signal if intermediate commit is required
// this will be done if intermeadiate transactions are endabled
// and either the number of operations or the transaction size
// has reached the limit
if (_intermediateTransactionEnabled &&
(_intermediateTransactionNumber <= numOperations ||
_intermediateTransactionSize <= newSize)) {
res.commitRequired(true);
}
return res;
}

View File

@ -31,6 +31,7 @@
#include "Transaction/Methods.h"
#include "VocBase/AccessMode.h"
#include "VocBase/voc-types.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include <rocksdb/options.h>
#include <rocksdb/status.h>
@ -70,7 +71,12 @@ class RocksDBSavePoint {
/// @brief transaction type
class RocksDBTransactionState final : public TransactionState {
public:
explicit RocksDBTransactionState(TRI_vocbase_t* vocbase, uint64_t maxOperationSize);
explicit RocksDBTransactionState(TRI_vocbase_t* vocbase
, uint64_t maxOperationSize
, bool intermediateTransactionEnabled
, uint64_t intermediateTransactionSize
, uint64_t intermediateTransactionNumber
);
~RocksDBTransactionState();
/// @brief begin a transaction
@ -85,7 +91,7 @@ class RocksDBTransactionState final : public TransactionState {
uint64_t numInserts() const { return _numInserts; }
uint64_t numUpdates() const { return _numUpdates; }
uint64_t numRemoves() const { return _numRemoves; }
inline bool hasOperations() const {
return (_numInserts > 0 || _numRemoves > 0 || _numUpdates > 0);
}
@ -95,9 +101,10 @@ class RocksDBTransactionState final : public TransactionState {
}
/// @brief add an operation for a transaction collection
Result addOperation(TRI_voc_cid_t collectionId, TRI_voc_rid_t revisionId,
TRI_voc_document_operation_e operationType,
uint64_t operationSize, uint64_t keySize);
RocksDBOperationResult addOperation(
TRI_voc_cid_t collectionId, TRI_voc_rid_t revisionId,
TRI_voc_document_operation_e operationType, uint64_t operationSize,
uint64_t keySize);
rocksdb::Transaction* rocksTransaction() {
TRI_ASSERT(_rocksTransaction != nullptr);
@ -111,11 +118,18 @@ class RocksDBTransactionState final : public TransactionState {
rocksdb::WriteOptions _rocksWriteOptions;
rocksdb::ReadOptions _rocksReadOptions;
cache::Transaction* _cacheTx;
uint64_t _transactionSize; // current transaction size
uint64_t _maxTransactionSize; // a transaction may not become bigger than this value
// current transaction size
uint64_t _transactionSize;
// a transaction may not become bigger than this value
uint64_t _maxTransactionSize;
// if a transaction gets bigger than this value and intermediate trasnations
// are endabled then a commit will be done
uint64_t _intermediateTransactionSize;
uint64_t _intermediateTransactionNumber;
uint64_t _numInserts;
uint64_t _numUpdates;
uint64_t _numRemoves;
bool _intermediateTransactionEnabled;
};
}