
Proper commit Sequence Numbers (#6958)

This commit is contained in:
Simon 2018-10-18 18:25:34 +02:00 committed by Jan
parent 18de63c7c8
commit c2b6fb99ba
18 changed files with 153 additions and 123 deletions

View File

@@ -437,6 +437,12 @@ class Transaction {
virtual void SetLogNumber(uint64_t log) { log_number_ = log; }
virtual uint64_t GetLogNumber() const { return log_number_; }
// Sequence number in WAL where operations start, only valid after
// a successful commit with the WRITE_COMMITTED db txn policy
virtual SequenceNumber GetCommitedSeqNumber() const {
return 0;
}
virtual Status SetName(const TransactionName& name) = 0;

View File

@@ -125,7 +125,8 @@ bool PessimisticTransaction::IsExpired() const {
WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db,
const WriteOptions& write_options,
const TransactionOptions& txn_options)
: PessimisticTransaction(txn_db, write_options, txn_options){};
: PessimisticTransaction(txn_db, write_options, txn_options),
_commited_seq_nr(0) {};
Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
TransactionKeyMap keys_to_unlock;
@@ -228,10 +229,15 @@ Status WriteCommittedTxn::PrepareInternal() {
WriteOptions write_options = write_options_;
write_options.disableWAL = false;
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
uint64_t seq_used = kMaxSequenceNumber;
Status s =
db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0,
/* disable_memtable*/ true);
/*disable_memtable*/ true, &seq_used);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (s.ok()) {
_commited_seq_nr = seq_used;
}
return s;
}
@@ -320,7 +326,14 @@ Status PessimisticTransaction::Commit() {
}
Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
Status s = db_->Write(write_options_, GetWriteBatch()->GetWriteBatch());
uint64_t seq_used = kMaxSequenceNumber;
auto s = db_impl_->WriteImpl(write_options_, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, /*log nr*/ nullptr,
/*log ref*/ 0, /*disable_memtable*/ false, &seq_used);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (s.ok()) {
_commited_seq_nr = seq_used;
}
return s;
}

View File

@@ -201,6 +201,11 @@ class WriteCommittedTxn : public PessimisticTransaction {
const TransactionOptions& txn_options);
virtual ~WriteCommittedTxn() {}
SequenceNumber GetCommitedSeqNumber() const override {
assert(txn_state_ == COMMITED);
return _commited_seq_nr;
}
private:
Status PrepareInternal() override;
@@ -216,6 +221,10 @@ class WriteCommittedTxn : public PessimisticTransaction {
// No copying allowed
WriteCommittedTxn(const WriteCommittedTxn&);
void operator=(const WriteCommittedTxn&);
protected:
// seq_nr of WriteBatch in WAL
SequenceNumber _commited_seq_nr;
};
} // namespace rocksdb
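From the caller's side, a minimal usage sketch of the new accessor, assuming a TransactionDB opened with the default WRITE_COMMITTED policy (variable names are illustrative, not part of the commit):

rocksdb::WriteOptions wo;
rocksdb::Transaction* txn = txn_db->BeginTransaction(wo);
txn->Put("key", "value");
rocksdb::Status s = txn->Commit();
if (s.ok()) {
  // only valid after a successful WRITE_COMMITTED commit
  rocksdb::SequenceNumber seq = txn->GetCommitedSeqNumber();
}
delete txn;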

View File

@@ -181,6 +181,7 @@ auth::TokenCache::Entry auth::TokenCache::checkAuthenticationJWT(
WRITE_LOCKER(writeLocker, _jwtLock);
// intentionally copy the entry from the cache
auth::TokenCache::Entry const& entry = _jwtCache.get(jwt);
// would have thrown if not found
if (entry.expired()) {
try {
_jwtCache.remove(jwt);
@@ -215,13 +216,6 @@ auth::TokenCache::Entry auth::TokenCache::checkAuthenticationJWT(
return auth::TokenCache::Entry::Unauthenticated();
}
auth::TokenCache::Entry entry = validateJwtBody(body);
if (!entry._authenticated) {
LOG_TOPIC(TRACE, arangodb::Logger::AUTHENTICATION)
<< "Couldn't validate jwt body " << body;
return auth::TokenCache::Entry::Unauthenticated();
}
std::string const message = header + "." + body;
if (!validateJwtHMAC256Signature(message, signature)) {
LOG_TOPIC(TRACE, arangodb::Logger::AUTHENTICATION)
@@ -229,6 +223,13 @@ auth::TokenCache::Entry auth::TokenCache::checkAuthenticationJWT(
<< _jwtSecret;
return auth::TokenCache::Entry::Unauthenticated();
}
auth::TokenCache::Entry entry = validateJwtBody(body);
if (!entry._authenticated) {
LOG_TOPIC(TRACE, arangodb::Logger::AUTHENTICATION)
<< "Couldn't validate jwt body " << body;
return auth::TokenCache::Entry::Unauthenticated();
}
WRITE_LOCKER(writeLocker, _jwtLock);
_jwtCache.put(jwt, entry);
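Taken together, the two hunks above move the body validation behind the signature check, so an unverified body is never parsed first. A condensed sketch of the resulting control flow:

std::string const message = header + "." + body;
if (!validateJwtHMAC256Signature(message, signature)) {
  return auth::TokenCache::Entry::Unauthenticated();  // bad signature: reject before parsing the body
}
auth::TokenCache::Entry entry = validateJwtBody(body);
if (!entry._authenticated) {
  return auth::TokenCache::Entry::Unauthenticated();
}
_jwtCache.put(jwt, entry);  // only verified, authenticated entries reach the cache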

View File

@@ -571,6 +571,9 @@ Result auth::UserManager::accessUser(std::string const& user,
}
bool auth::UserManager::userExists(std::string const& user) {
if (user.empty()) {
return false;
}
loadFromDB();
READ_LOCKER(readGuard, _userCacheLock);

View File

@@ -640,25 +640,20 @@ Result RocksDBCollection::truncate(transaction::Methods* trx,
// non-transactional truncate optimization. We perform a bunch of
// range deletes and circumvent the normal rocksdb::Transaction.
// no savepoint needed here
rocksdb::WriteBatch batch;
// add the assertion again here, so we are sure we can use RangeDeletes
TRI_ASSERT(static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE)->canUseRangeDeleteInWal());
auto log = RocksDBLogValue::CollectionTruncate(trx->vocbase().id(),
_logicalCollection.id(), _objectId);
rocksdb::Status s = batch.PutLogData(log.slice());
if (!s.ok()) {
return rocksutils::convertStatus(s);
}
TRI_ASSERT(!state->hasOperations()); // not allowed
TRI_IF_FAILURE("RocksDBRemoveLargeRangeOn") {
return Result(TRI_ERROR_DEBUG);
}
RocksDBEngine* engine = rocksutils::globalRocksEngine();
// add the assertion again here, so we are sure we can use RangeDeletes
TRI_ASSERT(engine->canUseRangeDeleteInWal());
rocksdb::WriteBatch batch;
// delete documents
RocksDBKeyBounds bounds = RocksDBKeyBounds::CollectionDocuments(_objectId);
s = batch.DeleteRange(bounds.columnFamily(), bounds.start(), bounds.end());
rocksdb::Status s = batch.DeleteRange(bounds.columnFamily(), bounds.start(), bounds.end());
if (!s.ok()) {
return rocksutils::convertStatus(s);
}
@@ -676,20 +671,31 @@ Result RocksDBCollection::truncate(transaction::Methods* trx,
idx->afterTruncate(); // clears caches / clears links (if applicable)
}
}
state->addTruncateOperation(_logicalCollection.id());
// now add the log entry so we can recover the correct count
auto log = RocksDBLogValue::CollectionTruncate(trx->vocbase().id(),
_logicalCollection.id(), _objectId);
s = batch.PutLogData(log.slice());
if (!s.ok()) {
return rocksutils::convertStatus(s);
}
rocksdb::WriteOptions wo;
s = rocksutils::globalRocksDB()->Write(wo, &batch);
if (!s.ok()) {
return rocksutils::convertStatus(s);
}
TRI_ASSERT(state->numRemoves() == _numberDocuments);
if (_numberDocuments > 64 * 1024) {
rocksdb::SequenceNumber seq = rocksutils::latestSequenceNumber();
uint64_t numDocs = _numberDocuments.exchange(0);
RocksDBSettingsManager::CounterAdjustment update(seq, /*numInserts*/0,
/*numRemoves*/numDocs, /*revision*/0);
engine->settingsManager()->updateCounter(_objectId, update);
if (numDocs > 64 * 1024) {
// also compact the ranges in order to speed up all further accesses
compact();
}
TRI_ASSERT(!state->hasOperations()); // not allowed
return Result{};
}

View File

@@ -518,10 +518,10 @@ class RocksDBCuckooIndexEstimator {
void removeBlocker(uint64_t trxId) {
WRITE_LOCKER(locker, _lock);
auto it = _blockers.find(trxId);
if (_blockers.end() != it) {
if (ADB_LIKELY(_blockers.end() != it)) {
auto cross = _blockersBySeq.find(std::make_pair(it->second, it->first));
TRI_ASSERT(_blockersBySeq.end() != cross);
if (_blockersBySeq.end() != cross) {
if (ADB_LIKELY(_blockersBySeq.end() != cross)) {
_blockersBySeq.erase(cross);
}
_blockers.erase(it);
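ADB_LIKELY only annotates the expected branch and does not change behavior. Its definition is not part of this diff; presumably it wraps the GCC/Clang builtin roughly like this:

#if defined(__GNUC__) || defined(__clang__)
#define ADB_LIKELY(v) __builtin_expect(!!(v), 1)    // hint: condition is usually true
#define ADB_UNLIKELY(v) __builtin_expect(!!(v), 0)  // hint: condition is usually false
#else
#define ADB_LIKELY(v) (v)
#define ADB_UNLIKELY(v) (v)
#endif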

View File

@@ -291,6 +291,12 @@ void RocksDBEngine::validateOptions(
<< "supported on this platform";
}
#endif
if (_pruneWaitTimeInitial < 10) {
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
<< "consider increasing the value for --rocksdb.wal-file-timeout-initial. "
<< "Replication clients might have trouble to get in sync";
}
}
// preparation phase for storage engine. can be used for internal setup.

View File

@@ -277,15 +277,6 @@ void RocksDBTransactionCollection::addOperation(
}
}
void RocksDBTransactionCollection::addTruncateOperation() {
TRI_ASSERT(_numInserts == 0 && _numUpdates == 0 && _numRemoves == 0);
if (!isLocked() || _accessType != AccessMode::Type::EXCLUSIVE) {
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "collection must be exclusively locked");
}
_numRemoves += _initialNumberDocuments + _numInserts;
}
void RocksDBTransactionCollection::prepareCommit(uint64_t trxId,
uint64_t preCommitSeq) {
TRI_ASSERT(_collection != nullptr);
@@ -347,8 +338,8 @@ void RocksDBTransactionCollection::commitCounts(uint64_t trxId,
auto ridx = static_cast<RocksDBIndex*>(idx.get());
auto estimator = ridx->estimator();
if (estimator) {
estimator->bufferUpdates(commitSeq, std::move(pair.second.first),
std::move(pair.second.second));
estimator->bufferUpdates(commitSeq, std::move(pair.second.inserts),
std::move(pair.second.removals));
estimator->removeBlocker(trxId);
}
}
@@ -363,13 +354,13 @@ void RocksDBTransactionCollection::commitCounts(uint64_t trxId,
void RocksDBTransactionCollection::trackIndexInsert(uint64_t idxObjectId,
uint64_t hash) {
// First list is Inserts
_trackedIndexOperations[idxObjectId].first.emplace_back(hash);
_trackedIndexOperations[idxObjectId].inserts.emplace_back(hash);
}
void RocksDBTransactionCollection::trackIndexRemove(uint64_t idxObjectId,
uint64_t hash) {
// Second list is Removes
_trackedIndexOperations[idxObjectId].second.emplace_back(hash);
_trackedIndexOperations[idxObjectId].removals.emplace_back(hash);
}
/// @brief lock a collection

View File

@@ -89,11 +89,6 @@ class RocksDBTransactionCollection final : public TransactionCollection {
void addOperation(TRI_voc_document_operation_e operationType,
TRI_voc_rid_t revisionId);
/// @brief will perform _numRemoves = _initialNumberDocuments
/// be aware that this is only a valid operation under an
/// exclusive collection lock
void addTruncateOperation();
/**
* @brief Prepare collection for commit by placing index blockers
* @param trxId Active transaction ID
@@ -142,12 +137,14 @@ class RocksDBTransactionCollection final : public TransactionCollection {
uint64_t _numRemoves;
bool _usageLocked;
struct IndexOperations {
std::vector<uint64_t> inserts;
std::vector<uint64_t> removals;
};
/// @brief A list where all indexes with estimates can store their operations
/// Will be applied to the inserter on commit and not applied on abort
std::unordered_map<uint64_t,
std::pair<std::vector<uint64_t>, std::vector<uint64_t>>>
_trackedIndexOperations;
std::unordered_map<uint64_t, IndexOperations> _trackedIndexOperations;
};
}
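Replacing the bare std::pair with a named struct makes every call site self-describing; a minimal before/after sketch (the identifiers ops, id, and hash are illustrative):

IndexOperations& ops = _trackedIndexOperations[id];
ops.inserts.emplace_back(hash);    // was: pair.first.emplace_back(hash)
ops.removals.emplace_back(hash);   // was: pair.second.emplace_back(hash)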

View File

@@ -67,7 +67,6 @@ RocksDBTransactionState::RocksDBTransactionState(
_readSnapshot(nullptr),
_rocksReadOptions(),
_cacheTx(nullptr),
_numCommits(0),
_numInserts(0),
_numUpdates(0),
_numRemoves(0),
@@ -286,6 +285,7 @@ arangodb::Result RocksDBTransactionState::internalCommit() {
// begin transaction + commit transaction + n doc removes
TRI_ASSERT(_numLogdata == (2 + _numRemoves));
}
++_numCommits;
#endif
// prepare for commit on each collection, e.g. place blockers for estimators
@@ -318,12 +318,20 @@ arangodb::Result RocksDBTransactionState::internalCommit() {
}
#endif
++_numCommits;
// total number of sequence ID consuming records
uint64_t numOps = _rocksTransaction->GetNumPuts() +
_rocksTransaction->GetNumDeletes() +
_rocksTransaction->GetNumMerges();
// will invalidate all counts
result = rocksutils::convertStatus(_rocksTransaction->Commit());
if (result.ok()) {
rocksdb::SequenceNumber latestSeq =
rocksutils::globalRocksDB()->GetLatestSequenceNumber();
TRI_ASSERT(numOps > 0); // simon: should hold unless we're being stupid
rocksdb::SequenceNumber postCommitSeq = _rocksTransaction->GetCommitedSeqNumber();
if (ADB_LIKELY(numOps > 0)) {
postCommitSeq += numOps - 1; // advance to the last sequence number used by this commit
}
TRI_ASSERT(postCommitSeq <= rocksutils::globalRocksDB()->GetLatestSequenceNumber());
for (auto& trxCollection : _collections) {
RocksDBTransactionCollection* collection =
@@ -331,7 +339,7 @@ arangodb::Result RocksDBTransactionState::internalCommit() {
// we need this in case of an intermediate commit. The number of
// initial documents is adjusted and numInserts / removes is set to 0
// index estimator updates are buffered
collection->commitCounts(id(), latestSeq);
collection->commitCounts(id(), postCommitSeq);
committed = true;
}
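The sequence arithmetic behind postCommitSeq, with illustrative numbers (not from the commit): if this transaction's batch starts at WAL sequence number 1000 and contains numOps = 5 sequence-consuming records, it occupies sequence numbers 1000 through 1004:

rocksdb::SequenceNumber first = 1000;  // GetCommitedSeqNumber(): start of this commit's batch
uint64_t numOps = 5;                   // puts + deletes + merges
rocksdb::SequenceNumber last = first + numOps - 1;  // 1004, last record of this commit

Stamping the counters with this value instead of GetLatestSequenceNumber() ties them to the transaction's own writes; the global latest sequence number may already include records from transactions that committed in the meantime.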
@@ -553,23 +561,6 @@ Result RocksDBTransactionState::addOperation(
return checkIntermediateCommit(currentSize, hasPerformedIntermediateCommit);
}
// only valid under an exclusive lock as the only operation
void RocksDBTransactionState::addTruncateOperation(TRI_voc_cid_t cid) {
auto tcoll = static_cast<RocksDBTransactionCollection*>(findCollection(cid));
if (tcoll == nullptr) {
std::string message = "collection '" + std::to_string(cid) +
"' not found in transaction state";
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, message);
}
tcoll->addTruncateOperation();
_numRemoves += tcoll->numRemoves();
TRI_ASSERT(_numInserts == 0 && _numUpdates == 0);
TRI_ASSERT(!hasHint(transaction::Hints::Hint::SINGLE_OPERATION));
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
_numLogdata += _numRemoves; // cheat our own sanity checks
#endif
}
RocksDBMethods* RocksDBTransactionState::rocksdbMethods() {
TRI_ASSERT(_rocksMethods);
return _rocksMethods.get();

View File

@@ -92,7 +92,9 @@ class RocksDBTransactionState final : public TransactionState {
/// @brief abort a transaction
Result abortTransaction(transaction::Methods* trx) override;
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
uint64_t numCommits() const { return _numCommits; }
#endif
uint64_t numInserts() const { return _numInserts; }
uint64_t numUpdates() const { return _numUpdates; }
uint64_t numRemoves() const { return _numRemoves; }
@@ -117,11 +119,7 @@ class RocksDBTransactionState final : public TransactionState {
TRI_voc_rid_t revisionId, TRI_voc_document_operation_e opType,
bool& hasPerformedIntermediateCommit);
/// @brief will perform _numRemoves = _initialNumberDocuments
/// be aware that this is only a valid operation under an
/// exclusive collection lock
void addTruncateOperation(TRI_voc_cid_t cid);
/// @brief return wrapper around rocksdb transaction
RocksDBMethods* rocksdbMethods();
/// @brief insert a snapshot into a (not yet started) transaction.
@@ -202,17 +200,17 @@ class RocksDBTransactionState final : public TransactionState {
/// @brief wrapper to use outside this class to access rocksdb
std::unique_ptr<RocksDBMethods> _rocksMethods;
uint64_t _numCommits;
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
/// store the number of log entries in WAL
uint64_t _numLogdata = 0;
uint64_t _numCommits = 0;
#endif
// if a transaction gets bigger than these values then an automatic
// intermediate commit will be done
uint64_t _numInserts;
uint64_t _numUpdates;
uint64_t _numRemoves;
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
/// store the number of log entries in WAL
uint64_t _numLogdata = 0;
#endif
SmallVector<RocksDBKey*, 32>::allocator_type::arena_type _arena;
SmallVector<RocksDBKey*, 32> _keys;
/// @brief if true the key buffers will no longer be shared

View File

@@ -167,27 +167,3 @@ std::string Exception::FillFormatExceptionString(char const* format, ...) {
return std::string(buffer);
}
Result basics::catchToResult(std::function<Result()> fn, int defaultError) {
// TODO check whether there are other specific exceptions we should catch
Result result{TRI_ERROR_NO_ERROR};
try {
result = fn();
} catch (arangodb::basics::Exception const& e) {
result.reset(e.code(), e.message());
} catch (std::bad_alloc const&) {
result.reset(TRI_ERROR_OUT_OF_MEMORY);
} catch (std::exception const& e) {
result.reset(defaultError, e.what());
} catch (...) {
result.reset(defaultError);
}
return result;
}
Result basics::catchVoidToResult(std::function<void()> fn, int defaultError) {
std::function<Result()> wrapped = [&fn]() -> Result {
fn();
return Result{TRI_ERROR_NO_ERROR};
};
return catchToResult(wrapped, defaultError);
}

View File

@@ -106,10 +106,32 @@ class Exception final : public virtual std::exception {
int const _code;
};
Result catchToResult(std::function<Result()> fn,
int defaultError = TRI_ERROR_INTERNAL);
Result catchVoidToResult(std::function<void()> fn,
int defaultError = TRI_ERROR_INTERNAL);
template<typename F>
Result catchToResult(F&& fn, int defaultError = TRI_ERROR_INTERNAL) {
// TODO check whether there are other specific exceptions we should catch
Result result{TRI_ERROR_NO_ERROR};
try {
result = std::forward<F>(fn)();
} catch (arangodb::basics::Exception const& e) {
result.reset(e.code(), e.message());
} catch (std::bad_alloc const&) {
result.reset(TRI_ERROR_OUT_OF_MEMORY);
} catch (std::exception const& e) {
result.reset(defaultError, e.what());
} catch (...) {
result.reset(defaultError);
}
return result;
}
template<typename F>
Result catchVoidToResult(F&& fn, int defaultError = TRI_ERROR_INTERNAL) {
auto wrapped = [&fn]() -> Result {
std::forward<F>(fn)();
return Result{TRI_ERROR_NO_ERROR};
};
return catchToResult(wrapped, defaultError);
}
}
}
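Because both helpers are now templates, any callable works without first constructing a std::function. A usage sketch (the lambdas are illustrative):

auto r = arangodb::basics::catchToResult([]() -> arangodb::Result {
  return arangodb::Result{TRI_ERROR_NO_ERROR};  // body that may throw
});
auto v = arangodb::basics::catchVoidToResult([]() {
  // may throw; any exception is converted into a Result carrying defaultError
});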

View File

@@ -92,7 +92,7 @@ function getClusterEndpoints() {
});
assertTrue(res instanceof request.Response);
assertTrue(res.hasOwnProperty('statusCode'), JSON.stringify(res));
assertTrue(res.statusCode === 200, JSON.stringify(res));
assertEqual(res.statusCode, 200, JSON.stringify(res));
assertTrue(res.hasOwnProperty('json'));
assertTrue(res.json.hasOwnProperty('endpoints'));
assertTrue(res.json.endpoints instanceof Array);

View File

@@ -94,7 +94,7 @@ function getClusterEndpoints() {
});
assertTrue(res instanceof request.Response);
assertTrue(res.hasOwnProperty('statusCode'), JSON.stringify(res));
assertTrue(res.statusCode === 200, JSON.stringify(res));
assertEqual(res.statusCode, 200, JSON.stringify(res));
assertTrue(res.hasOwnProperty('json'));
assertTrue(res.json.hasOwnProperty('endpoints'));
assertTrue(res.json.endpoints instanceof Array);
@@ -124,7 +124,8 @@ function getApplierState(endpoint) {
}
});
assertTrue(res instanceof request.Response);
assertTrue(res.hasOwnProperty('statusCode') && res.statusCode === 200);
assertTrue(res.hasOwnProperty('statusCode'));
assertEqual(res.statusCode, 200, JSON.stringify(res));
assertTrue(res.hasOwnProperty('json'));
return arangosh.checkRequestResult(res.json);
}

View File

@@ -1,5 +1,5 @@
/* jshint globalstrict:false, strict:false, unused: false */
/* global assertEqual, assertFalse, assertNull, assertNotNull */
/* global assertEqual, assertFalse, assertNull, assertNotNull, fail */
// //////////////////////////////////////////////////////////////////////////////
// / @brief tests for transactions
// /
@@ -85,6 +85,20 @@ function recoverySuite () {
assertEqual([], db._query(query, { "@collection": c.name(), value: i }).toArray());
assertEqual([], c.edges("test/" + i));
}
internal.waitForEstimatorSync(); // make sure estimates are consistent
let indexes = c.getIndexes(true);
for (let i of indexes) {
switch (i.type) {
case 'primary':
case 'hash':
case 'edge':
assertEqual(i.selectivityEstimate, 1, JSON.stringify(i));
break;
default:
fail();
}
}
}
};

View File

@@ -150,9 +150,7 @@ function CollectionTruncateFailuresSuite() {
assertEqual(e.errorNum, ERRORS.ERROR_DEBUG.code);
}
// All documents should be removed through intermediate commits.
// We have two packs that fill up those commits.
// Now validate that we end up with an empty collection.
// all commits failed, no documents removed
assertEqual(c.count(), 20000);
// Test Primary
@@ -226,9 +224,7 @@ function CollectionTruncateFailuresSuite() {
assertEqual(e.errorNum, ERRORS.ERROR_DEBUG.code);
}
// All documents should be removed through intermediate commits.
// We have two packs that fill up those commits.
// Now validate that we end up with an empty collection.
// At 10k removals an intermediate commit happens, then a failure
assertEqual(c.count(), 10000);
// Test Primary