1
0
Fork 0

Fixing an issue with intermediate commits (#3975)

This commit is contained in:
Simon Grätzer 2017-12-12 09:17:18 +01:00 committed by Jan
parent 4ce91ac16d
commit dce677720d
10 changed files with 80 additions and 110 deletions

View File

@ -286,7 +286,7 @@ Transaction* Manager::beginTransaction(bool readOnly) {
return _transactions.begin(readOnly);
}
void Manager::endTransaction(Transaction* tx) { _transactions.end(tx); }
void Manager::endTransaction(Transaction* tx) noexcept { _transactions.end(tx); }
bool Manager::post(std::function<void()> fn) { return _schedulerPost(fn); }

View File

@ -157,7 +157,7 @@ class Manager {
//////////////////////////////////////////////////////////////////////////////
/// @brief Signal the end of a transaction. Deletes the passed Transaction.
//////////////////////////////////////////////////////////////////////////////
void endTransaction(Transaction* tx);
void endTransaction(Transaction* tx) noexcept;
//////////////////////////////////////////////////////////////////////////////
/// @brief Post a function to the scheduler

View File

@ -63,7 +63,7 @@ Transaction* TransactionManager::begin(bool readOnly) {
return tx;
}
void TransactionManager::end(Transaction* tx) {
void TransactionManager::end(Transaction* tx) noexcept {
TRI_ASSERT(tx != nullptr);
lock();

View File

@ -63,7 +63,7 @@ class TransactionManager {
//////////////////////////////////////////////////////////////////////////////
/// @brief Signal the end of a transaction. Deletes the passed Transaction.
//////////////////////////////////////////////////////////////////////////////
void end(Transaction* tx);
void end(Transaction* tx) noexcept;
//////////////////////////////////////////////////////////////////////////////
/// @brief Return the current window identifier.

View File

@ -535,13 +535,9 @@ void HeartbeatThread::runSingleServer() {
if (result.successful()) { // sucessfull leadership takeover
LOG_TOPIC(INFO, Logger::HEARTBEAT) << "All your base are belong to us";
if (applier->isRunning()) {
applier->stopAndJoin();
}
ServerState::instance()->setFoxxmaster(_myId);
ServerState::setServerMode(ServerState::Mode::DEFAULT);
continue; // nothing more to do here
leaderSlice = myIdBuilder.slice();
// intentionally falls through to case 2
} else if (result.httpCode() == TRI_ERROR_HTTP_PRECONDITION_FAILED) {
// we did not become leader, someone else is, response contains
// current value in agency
@ -611,12 +607,12 @@ void HeartbeatThread::runSingleServer() {
LOG_TOPIC(INFO, Logger::HEARTBEAT) << "Starting replication from " << endpoint;
ReplicationApplierConfiguration config = applier->configuration();
config._endpoint = endpoint;
config._autoResync = true;
config._autoResyncRetries = 3;
if (config._jwt.empty()) {
config._jwt = auth->jwtToken();
}
config._endpoint = endpoint;
config._autoResync = true;
config._autoResyncRetries = 3;
// TODO: how do we initially configure the applier
// start initial synchronization

View File

@ -331,12 +331,7 @@ void RestCollectionHandler::handleCommandPut() {
SingleCollectionTransaction trx(ctx, coll->cid(),
AccessMode::Type::EXCLUSIVE);
// we must read our own writes in this transaction for the deletion
// checks that are executed at the end of truncate in maintainer mode
trx.addHint(transaction::Hints::Hint::READ_OWN_WRITES);
res = trx.begin();
if (res.ok()) {
OperationResult result = trx.truncate(coll->name(), opts);
res = trx.finish(result.result);

View File

@ -914,8 +914,6 @@ Result RestReplicationHandler::processRestoreCollection(
AccessMode::Type::EXCLUSIVE);
// to turn off waitForSync!
trx.addHint(transaction::Hints::Hint::RECOVERY);
trx.addHint(transaction::Hints::Hint::READ_OWN_WRITES);
res = trx.begin();
if (!res.ok()) {
return res;

View File

@ -36,7 +36,6 @@
#include "RocksDBEngine/RocksDBLogValue.h"
#include "RocksDBEngine/RocksDBMethods.h"
#include "RocksDBEngine/RocksDBTransactionCollection.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
#include "StorageEngine/TransactionCollection.h"
#include "StorageEngine/TransactionManager.h"
@ -61,6 +60,7 @@ struct RocksDBTransactionData final : public TransactionData {};
RocksDBTransactionState::RocksDBTransactionState(
TRI_vocbase_t* vocbase, transaction::Options const& options)
: TransactionState(vocbase, options),
_rocksTransaction(nullptr),
_snapshot(nullptr),
_rocksWriteOptions(),
_rocksReadOptions(),
@ -76,17 +76,7 @@ RocksDBTransactionState::RocksDBTransactionState(
/// @brief free a transaction container
RocksDBTransactionState::~RocksDBTransactionState() {
if (_cacheTx != nullptr) {
// note: endTransaction() will delete _cacheTrx!
CacheManagerFeature::MANAGER->endTransaction(_cacheTx);
_cacheTx = nullptr;
}
if (_snapshot != nullptr) {
rocksdb::TransactionDB* db = rocksutils::globalRocksDB();
db->ReleaseSnapshot(_snapshot);
_snapshot = nullptr;
}
cleanupTransaction();
for (auto& it : _keys) {
delete it;
}
@ -124,9 +114,8 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
// get a new id
_id = TRI_NewTickServer();
// register a protector
auto data =
std::make_unique<RocksDBTransactionData>(); // intentionally empty
// register a protector (intentionally empty)
auto data = std::make_unique<RocksDBTransactionData>();
TransactionManagerFeature::manager()->registerTransaction(_id,
std::move(data));
@ -152,8 +141,7 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
} else {
TRI_ASSERT(_snapshot == nullptr);
createTransaction();
bool readWrites = hasHint(transaction::Hints::Hint::READ_OWN_WRITES);
if (readWrites) {
if (hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) {
_rocksReadOptions.snapshot = _rocksTransaction->GetSnapshot();
} else {
TRI_ASSERT(_options.intermediateCommitCount != UINT64_MAX ||
@ -185,18 +173,21 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
// create a rocksdb transaction. will only be called for write transactions
void RocksDBTransactionState::createTransaction() {
TRI_ASSERT(!_rocksTransaction);
TRI_ASSERT(!isReadOnlyTransaction());
// start rocks transaction
rocksdb::TransactionDB* db = rocksutils::globalRocksDB();
rocksdb::TransactionOptions trxOpts;
trxOpts.set_snapshot = true;
trxOpts.deadlock_detect = !hasHint(transaction::Hints::Hint::NO_DLD);
// unclear performance implications do not use for now
//trxOpts.deadlock_detect = !hasHint(transaction::Hints::Hint::NO_DLD);
_rocksTransaction.reset(db->BeginTransaction(_rocksWriteOptions, trxOpts));
TRI_ASSERT(_rocksTransaction == nullptr ||
_rocksTransaction->GetState() == rocksdb::Transaction::COMMITED ||
_rocksTransaction->GetState() == rocksdb::Transaction::ROLLEDBACK);
_rocksTransaction = db->BeginTransaction(_rocksWriteOptions, trxOpts, _rocksTransaction);
// set begin marker
// add transaction begin marker
if (!hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) {
RocksDBLogValue header =
RocksDBLogValue::BeginTransaction(_vocbase->id(), _id);
@ -206,6 +197,22 @@ void RocksDBTransactionState::createTransaction() {
++_numLogdata;
#endif
}
TRI_ASSERT(_lastUsedCollection == 0);
}
void RocksDBTransactionState::cleanupTransaction() noexcept {
delete _rocksTransaction;
_rocksTransaction = nullptr;
if (_cacheTx != nullptr) {
// note: endTransaction() will delete _cacheTrx!
CacheManagerFeature::MANAGER->endTransaction(_cacheTx);
_cacheTx = nullptr;
}
if (_snapshot != nullptr) {
rocksdb::TransactionDB* db = rocksutils::globalRocksDB();
db->ReleaseSnapshot(_snapshot);
_snapshot = nullptr;
}
}
arangodb::Result RocksDBTransactionState::internalCommit() {
@ -213,8 +220,7 @@ arangodb::Result RocksDBTransactionState::internalCommit() {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
uint64_t x = _numInserts + _numRemoves + _numUpdates;
if (hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) {
TRI_ASSERT(x <= 1);
TRI_ASSERT(_numLogdata == x);
TRI_ASSERT(x <= 1 && _numLogdata == x);
} else {
if (_numLogdata < 1 + (x > 0 ? 1 : 0) + _numRemoves) {
LOG_TOPIC(ERR, Logger::FIXME)
@ -246,54 +252,35 @@ arangodb::Result RocksDBTransactionState::internalCommit() {
++_numCommits;
result = rocksutils::convertStatus(_rocksTransaction->Commit());
rocksdb::SequenceNumber latestSeq =
rocksutils::globalRocksDB()->GetLatestSequenceNumber();
if (!result.ok()) {
return result;
}
if (_cacheTx != nullptr) {
// note: endTransaction() will delete _cacheTx!
CacheManagerFeature::MANAGER->endTransaction(_cacheTx);
_cacheTx = nullptr;
}
for (auto& trxCollection : _collections) {
RocksDBTransactionCollection* collection =
static_cast<RocksDBTransactionCollection*>(trxCollection);
int64_t adjustment = collection->numInserts() - collection->numRemoves();
if (collection->numInserts() != 0 || collection->numRemoves() != 0 ||
collection->revision() != 0) {
RocksDBCollection* coll = static_cast<RocksDBCollection*>(
trxCollection->collection()->getPhysical());
coll->adjustNumberDocuments(adjustment);
coll->setRevision(collection->revision());
RocksDBEngine* engine =
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
RocksDBCounterManager::CounterAdjustment update(
latestSeq, collection->numInserts(), collection->numRemoves(),
collection->revision());
engine->counterManager()->updateCounter(coll->objectId(), update);
rocksdb::SequenceNumber latestSeq = rocksutils::globalRocksDB()->GetLatestSequenceNumber();
if (result.ok()) {
for (auto& trxCollection : _collections) {
RocksDBTransactionCollection* collection =
static_cast<RocksDBTransactionCollection*>(trxCollection);
int64_t adjustment = collection->numInserts() - collection->numRemoves();
if (collection->numInserts() != 0 || collection->numRemoves() != 0 ||
collection->revision() != 0) {
RocksDBCollection* coll = static_cast<RocksDBCollection*>(trxCollection->collection()->getPhysical());
coll->adjustNumberDocuments(adjustment);
coll->setRevision(collection->revision());
RocksDBEngine* engine = rocksutils::globalRocksEngine();
RocksDBCounterManager::CounterAdjustment update(latestSeq, collection->numInserts(), collection->numRemoves(),
collection->revision());
engine->counterManager()->updateCounter(coll->objectId(), update);
}
// we need this in case of an intermediate commit. The number of
// initial documents is adjusted and numInserts / removes is set to 0
collection->commitCounts();
}
// we need this in case of an intermediate commit. The number of
// initial documents is adjusted and numInserts / removes is set to 0
collection->commitCounts();
}
} else {
// don't write anything if the transaction is empty
result = rocksutils::convertStatus(_rocksTransaction->Rollback());
if (_cacheTx != nullptr) {
// note: endTransaction() will delete _cacheTx!
CacheManagerFeature::MANAGER->endTransaction(_cacheTx);
_cacheTx = nullptr;
}
}
_rocksTransaction.reset();
return result;
}
@ -312,17 +299,18 @@ Result RocksDBTransactionState::commitTransaction(
if (_nestingLevel == 0) {
if (_rocksTransaction != nullptr) {
res = internalCommit();
if (!res.ok()) {
abortTransaction(activeTrx);
}
}
if (res.ok()) {
updateStatus(transaction::Status::COMMITTED);
cleanupTransaction(); // deletes trx
} else {
abortTransaction(activeTrx); // deletes trx
}
TRI_ASSERT(!_rocksTransaction && !_cacheTx && !_snapshot);
}
unuseCollections(_nestingLevel);
return res;
}
@ -333,32 +321,23 @@ Result RocksDBTransactionState::abortTransaction(
<< " transaction";
TRI_ASSERT(_status == transaction::Status::RUNNING);
Result result;
if (_nestingLevel == 0) {
if (_rocksTransaction != nullptr) {
rocksdb::Status status = _rocksTransaction->Rollback();
result = rocksutils::convertStatus(status);
if (_cacheTx != nullptr) {
// note: endTransaction() will delete _cacheTx!
CacheManagerFeature::MANAGER->endTransaction(_cacheTx);
_cacheTx = nullptr;
}
_rocksTransaction.reset();
}
cleanupTransaction(); // deletes trx
updateStatus(transaction::Status::ABORTED);
if (hasOperations()) {
// must clean up the query cache because the transaction
// may have queried something via AQL that is now rolled back
clearQueryCache();
}
TRI_ASSERT(!_rocksTransaction && !_cacheTx && !_snapshot);
}
unuseCollections(_nestingLevel);
return result;
}
@ -522,9 +501,6 @@ void RocksDBTransactionState::donateSnapshot(rocksdb::Snapshot const* snap) {
rocksdb::Snapshot const* RocksDBTransactionState::stealSnapshot() {
TRI_ASSERT(_snapshot != nullptr);
TRI_ASSERT(isReadOnlyTransaction());
if (_status != transaction::Status::COMMITTED && _status != transaction::Status::ABORTED) {
LOG_TOPIC(ERR, Logger::FIXME) << "OOOH: " << (int) _status;
}
TRI_ASSERT(_status == transaction::Status::COMMITTED || _status == transaction::Status::ABORTED);
rocksdb::Snapshot const* snap = _snapshot;
_snapshot = nullptr;
@ -549,7 +525,8 @@ void RocksDBTransactionState::checkIntermediateCommit(uint64_t newSize) {
// "transaction size" counters have reached their limit
if (_options.intermediateCommitCount <= numOperations ||
_options.intermediateCommitSize <= newSize) {
// LOG_TOPIC(ERR, Logger::FIXME) << "INTERMEDIATE COMMIT!";
TRI_ASSERT(!hasHint(transaction::Hints::Hint::SINGLE_OPERATION));
LOG_TOPIC(DEBUG, Logger::ROCKSDB) << "INTERMEDIATE COMMIT!";
internalCommit();
_lastUsedCollection = 0;
_numInternal = 0;

View File

@ -112,10 +112,10 @@ class RocksDBTransactionState final : public TransactionState {
RocksDBMethods* rocksdbMethods();
/// @brief insert a snapshot into a (not yet started) transaction.
/// Only ever valid on a read-only transaction
/// Only ever valid on a trx in CREATED state
void donateSnapshot(rocksdb::Snapshot const* snap);
/// @brief steal snapshot of this transaction. Only ever valid on a
/// read-only transaction
/// @brief steal snapshot of this transaction.
/// Does not work on a single operation
rocksdb::Snapshot const* stealSnapshot();
/// @brief Rocksdb sequence number of snapshot. Works while trx
@ -147,13 +147,18 @@ class RocksDBTransactionState final : public TransactionState {
void returnRocksDBKey(RocksDBKey* key);
private:
/// @brief create a new rocksdb transaction
void createTransaction();
/// @brief delete transaction, snapshot and cache trx
void cleanupTransaction() noexcept;
/// @brief internally commit a transaction
arangodb::Result internalCommit();
/// @brief check sizes and call internalCommit if too big
void checkIntermediateCommit(uint64_t newSize);
private:
/// @brief rocksdb transaction may be null for read only transactions
std::unique_ptr<rocksdb::Transaction> _rocksTransaction;
rocksdb::Transaction* _rocksTransaction;
/// @brief rocksdb snapshot, is null if _rocksTransaction is set
rocksdb::Snapshot const* _snapshot;
/// @brief write options used

View File

@ -46,8 +46,7 @@ class Hints {
NO_COMPACTION_LOCK = 128,
NO_USAGE_LOCK = 256,
RECOVERY = 512,
NO_DLD = 1024, // disable deadlock detection
READ_OWN_WRITES = 2048 // do not use snapshot
NO_DLD = 1024 // disable deadlock detection
};
Hints() : _value(0) {}