1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

* 'devel' of https://github.com/arangodb/arangodb:
  removed erroneous 0.25 s wait before each agency transaction
  added more assertions
  added assertions for transaction states
  fix reading of initial tick values from RocksDB
  moved indexBuckets checks in dump_authentication to the MMFiles engine only; they are not needed for RocksDB
  disable optimization
  optimizations for read-only transactions
  optimizations for empty transactions
  try to fix non-deterministic behavior in test
This commit is contained in:
Jan Christoph Uhde 2017-04-28 14:44:47 +02:00
commit 5874b4d997
10 changed files with 140 additions and 89 deletions

View File

@ -1316,7 +1316,7 @@ AgencyCommResult AgencyComm::sendWithFailover(
AgencyCommResult result;
std::string url = initialUrl;
std::chrono::duration<double> waitInterval (.25); // seconds
std::chrono::duration<double> waitInterval (.0); // seconds
auto started = std::chrono::steady_clock::now();
auto timeOut = std::chrono::steady_clock::now() +
std::chrono::duration<double>(timeout);

View File

@ -701,12 +701,14 @@ int RocksDBCollection::insert(arangodb::transaction::Methods* trx,
TRI_voc_rid_t revisionId =
transaction::helpers::extractRevFromDocument(newSlice);
RocksDBSavePoint guard(rocksTransaction(trx),
trx->isSingleOperationTransaction());
RocksDBTransactionState* state =
static_cast<RocksDBTransactionState*>(trx->state());
RocksDBSavePoint guard(rocksTransaction(trx),
trx->isSingleOperationTransaction(),
[&state]() { state->resetLogState(); });
state->prepareOperation(_logicalCollection->cid(), revisionId,
StringRef(),
TRI_VOC_DOCUMENT_OPERATION_INSERT);
@ -810,7 +812,8 @@ int RocksDBCollection::update(arangodb::transaction::Methods* trx,
}
RocksDBSavePoint guard(rocksTransaction(trx),
trx->isSingleOperationTransaction());
trx->isSingleOperationTransaction(),
[&state]() { state->resetLogState(); });
// add possible log statement under guard
state->prepareOperation(_logicalCollection->cid(), revisionId, StringRef(),
@ -912,7 +915,8 @@ int RocksDBCollection::replace(
}
RocksDBSavePoint guard(rocksTransaction(trx),
trx->isSingleOperationTransaction());
trx->isSingleOperationTransaction(),
[&state]() { state->resetLogState(); });
// add possible log statement under guard
state->prepareOperation(_logicalCollection->cid(), revisionId, StringRef(),
@ -998,13 +1002,15 @@ int RocksDBCollection::remove(arangodb::transaction::Methods* trx,
return res;
}
}
RocksDBSavePoint guard(rocksTransaction(trx),
trx->isSingleOperationTransaction());
// add possible log statement under guard
RocksDBTransactionState* state =
static_cast<RocksDBTransactionState*>(trx->state());
RocksDBSavePoint guard(rocksTransaction(trx),
trx->isSingleOperationTransaction(),
[&state]() { state->resetLogState(); });
// add possible log statement under guard
state->prepareOperation(_logicalCollection->cid(), revisionId,
StringRef(key),
TRI_VOC_DOCUMENT_OPERATION_REMOVE);

View File

@ -229,10 +229,8 @@ void RocksDBCounterManager::readSettings() {
if (!result.empty()) {
try {
std::shared_ptr<VPackBuilder> builder = VPackParser::fromJson(result);
VPackSlice s = builder->slice();
uint64_t lastTick = basics::VelocyPackHelper::stringUInt64(s.get("tick"));
uint64_t lastTick = basics::VelocyPackHelper::stringUInt64(slice.get("tick"));
LOG_TOPIC(TRACE, Logger::ENGINES) << "using last tick: " << lastTick;
TRI_UpdateTickServer(lastTick);
} catch (...) {
LOG_TOPIC(WARN, Logger::ENGINES) << "unable to read initial settings: invalid data";

View File

@ -66,6 +66,7 @@ RocksDBEdgeIndexIterator::RocksDBEdgeIndexIterator(
keys.release(); // now we have ownership for _keys
TRI_ASSERT(_keys->slice().isArray());
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(_trx);
TRI_ASSERT(state != nullptr);
rocksdb::Transaction* rtrx = state->rocksTransaction();
_iterator.reset(rtrx->GetIterator(state->readOptions()));
updateBounds();
@ -95,6 +96,8 @@ RocksDBEdgeIndexIterator::~RocksDBEdgeIndexIterator() {
}
bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
TRI_ASSERT(_trx->state()->isRunning());
if (limit == 0 || !_keysIterator.valid()) {
// No limit no data, or we are actually done. The last call should have
// returned false

View File

@ -121,8 +121,10 @@ RocksDBAllIndexIterator::RocksDBAllIndexIterator(
_bounds(RocksDBKeyBounds::PrimaryIndex(index->objectId())) {
// acquire rocksdb transaction
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
TRI_ASSERT(state != nullptr);
rocksdb::Transaction* rtrx = state->rocksTransaction();
auto& options = state->readOptions();
auto const& options = state->readOptions();
TRI_ASSERT(options.snapshot != nullptr);
_iterator.reset(rtrx->GetIterator(options));
@ -134,6 +136,8 @@ RocksDBAllIndexIterator::RocksDBAllIndexIterator(
}
bool RocksDBAllIndexIterator::next(TokenCallback const& cb, size_t limit) {
TRI_ASSERT(_trx->state()->isRunning());
if (limit == 0 || !_iterator->Valid() || outOfRange()) {
// No limit no data, or we are actually done. The last call should have
// returned false
@ -163,6 +167,8 @@ bool RocksDBAllIndexIterator::next(TokenCallback const& cb, size_t limit) {
/// special method to expose the document key for incremental replication
bool RocksDBAllIndexIterator::nextWithKey(TokenKeyCallback const& cb, size_t limit) {
TRI_ASSERT(_trx->state()->isRunning());
if (limit == 0 || !_iterator->Valid() || outOfRange()) {
// No limit no data, or we are actually done. The last call should have
// returned false
@ -189,6 +195,8 @@ bool RocksDBAllIndexIterator::nextWithKey(TokenKeyCallback const& cb, size_t lim
}
void RocksDBAllIndexIterator::reset() {
TRI_ASSERT(_trx->state()->isRunning());
if (_reverse) {
_iterator->SeekForPrev(_bounds.end());
} else {
@ -197,6 +205,8 @@ void RocksDBAllIndexIterator::reset() {
}
bool RocksDBAllIndexIterator::outOfRange() const {
TRI_ASSERT(_trx->state()->isRunning());
if (_reverse) {
return _cmp->Compare(_iterator->key(), _bounds.start()) < 0;
} else {
@ -216,8 +226,10 @@ RocksDBAnyIndexIterator::RocksDBAnyIndexIterator(
_returned(0) {
// acquire rocksdb transaction
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
TRI_ASSERT(state != nullptr);
rocksdb::Transaction* rtrx = state->rocksTransaction();
auto& options = state->readOptions();
auto const& options = state->readOptions();
TRI_ASSERT(options.snapshot != nullptr);
_iterator.reset(rtrx->GetIterator(options));
@ -244,6 +256,8 @@ RocksDBAnyIndexIterator::RocksDBAnyIndexIterator(
}
bool RocksDBAnyIndexIterator::next(TokenCallback const& cb, size_t limit) {
TRI_ASSERT(_trx->state()->isRunning());
if (limit == 0 || !_iterator->Valid() || outOfRange()) {
// No limit no data, or we are actually done. The last call should have
// returned false
@ -272,6 +286,7 @@ bool RocksDBAnyIndexIterator::next(TokenCallback const& cb, size_t limit) {
/// @brief Reset the cursor by seeking the underlying iterator to the start
/// of this iterator's key bounds
void RocksDBAnyIndexIterator::reset() { _iterator->Seek(_bounds.start()); }
/// @brief Report whether the iterator's current key compares greater than
/// the end of the key bounds
bool RocksDBAnyIndexIterator::outOfRange() const {
// the transaction must still be running when this is called
TRI_ASSERT(_trx->state()->isRunning());
return _cmp->Compare(_iterator->key(), _bounds.end()) > 0;
}

View File

@ -55,11 +55,8 @@ using namespace arangodb;
// for the RocksDB engine we do not need any additional data
struct RocksDBTransactionData final : public TransactionData {};
RocksDBSavePoint::RocksDBSavePoint(rocksdb::Transaction* trx)
: RocksDBSavePoint(trx, false) {}
RocksDBSavePoint::RocksDBSavePoint(rocksdb::Transaction* trx, bool handled)
: _trx(trx), _handled(handled) {
RocksDBSavePoint::RocksDBSavePoint(rocksdb::Transaction* trx, bool handled, std::function<void()> const& rollbackCallback)
: _trx(trx), _rollbackCallback(rollbackCallback), _handled(handled) {
TRI_ASSERT(trx != nullptr);
if (!_handled) {
_trx->SetSavePoint();
@ -81,6 +78,7 @@ void RocksDBSavePoint::rollback() {
TRI_ASSERT(!_handled);
_trx->RollbackToSavePoint();
_handled = true; // in order to not roll back again by accident
_rollbackCallback();
}
/// @brief transaction type
@ -159,7 +157,7 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
_rocksTransaction->SetSnapshot();
_rocksReadOptions.snapshot = _rocksTransaction->GetSnapshot();
if (!hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) {
if (!isReadOnlyTransaction() && !hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) {
RocksDBLogValue header =
RocksDBLogValue::BeginTransaction(_vocbase->id(), _id);
_rocksTransaction->PutLogData(header.slice());
@ -187,50 +185,60 @@ Result RocksDBTransactionState::commitTransaction(
if (_nestingLevel == 0) {
if (_rocksTransaction != nullptr) {
if (hasOperations()) {
// set wait for sync flag if required
if (waitForSync()) {
_rocksWriteOptions.sync = true;
_rocksTransaction->SetWriteOptions(_rocksWriteOptions);
}
if (waitForSync()) {
_rocksWriteOptions.sync = true;
_rocksTransaction->SetWriteOptions(_rocksWriteOptions);
}
// TODO wait for response on github issue to see how we can use the
// sequence number
result = rocksutils::convertStatus(_rocksTransaction->Commit());
rocksdb::SequenceNumber latestSeq =
rocksutils::globalRocksDB()->GetLatestSequenceNumber();
if (!result.ok()) {
abortTransaction(activeTrx);
return result;
}
// TODO wait for response on github issue to see how we can use the
// sequence number
result = rocksutils::convertStatus(_rocksTransaction->Commit());
rocksdb::SequenceNumber latestSeq =
rocksutils::globalRocksDB()->GetLatestSequenceNumber();
if (!result.ok()) {
abortTransaction(activeTrx);
return result;
}
if (_cacheTx != nullptr) {
// note: endTransaction() will delete _cacheTx!
CacheManagerFeature::MANAGER->endTransaction(_cacheTx);
_cacheTx = nullptr;
}
if (_cacheTx != nullptr) {
// note: endTransaction() will delete _cacheTx!
CacheManagerFeature::MANAGER->endTransaction(_cacheTx);
_cacheTx = nullptr;
}
for (auto& trxCollection : _collections) {
RocksDBTransactionCollection* collection =
static_cast<RocksDBTransactionCollection*>(trxCollection);
int64_t adjustment =
collection->numInserts() - collection->numRemoves();
rocksdb::Snapshot const* snap = this->_rocksReadOptions.snapshot;
TRI_ASSERT(snap != nullptr);
if (collection->numInserts() != 0 || collection->numRemoves() != 0 ||
collection->revision() != 0) {
RocksDBCollection* coll = static_cast<RocksDBCollection*>(
trxCollection->collection()->getPhysical());
coll->adjustNumberDocuments(adjustment);
coll->setRevision(collection->revision());
RocksDBEngine* engine =
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
for (auto& trxCollection : _collections) {
RocksDBTransactionCollection* collection =
static_cast<RocksDBTransactionCollection*>(trxCollection);
int64_t adjustment =
collection->numInserts() - collection->numRemoves();
if (collection->numInserts() != 0 || collection->numRemoves() != 0 ||
collection->revision() != 0) {
RocksDBCollection* coll = static_cast<RocksDBCollection*>(
trxCollection->collection()->getPhysical());
coll->adjustNumberDocuments(adjustment);
coll->setRevision(collection->revision());
RocksDBEngine* engine =
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
RocksDBCounterManager::CounterAdjustment update(
latestSeq, collection->numInserts(), collection->numRemoves(),
collection->revision());
engine->counterManager()->updateCounter(coll->objectId(), update);
RocksDBCounterManager::CounterAdjustment update(
latestSeq, collection->numInserts(), collection->numRemoves(),
collection->revision());
engine->counterManager()->updateCounter(coll->objectId(), update);
}
}
} else {
// don't write anything if the transaction is empty
// TODO: calling Rollback() here does not work for some reason but it should.
// must investigate further!!
result = rocksutils::convertStatus(_rocksTransaction->Commit());
if (_cacheTx != nullptr) {
// note: endTransaction() will delete _cacheTx!
CacheManagerFeature::MANAGER->endTransaction(_cacheTx);
_cacheTx = nullptr;
}
}
@ -257,13 +265,14 @@ Result RocksDBTransactionState::abortTransaction(
if (_rocksTransaction != nullptr) {
rocksdb::Status status = _rocksTransaction->Rollback();
result = rocksutils::convertStatus(status);
_rocksTransaction.reset();
}
if (_cacheTx != nullptr) {
// note: endTransaction() will delete _cacheTx!
CacheManagerFeature::MANAGER->endTransaction(_cacheTx);
_cacheTx = nullptr;
}
if (_cacheTx != nullptr) {
// note: endTransaction() will delete _cacheTx!
CacheManagerFeature::MANAGER->endTransaction(_cacheTx);
_cacheTx = nullptr;
_rocksTransaction.reset();
}
updateStatus(transaction::Status::ABORTED);
@ -283,7 +292,9 @@ Result RocksDBTransactionState::abortTransaction(
void RocksDBTransactionState::prepareOperation(
TRI_voc_cid_t collectionId, TRI_voc_rid_t revisionId,
StringRef const& key, TRI_voc_document_operation_e operationType) {
TRI_ASSERT(!isReadOnlyTransaction());
bool singleOp = hasHint(transaction::Hints::Hint::SINGLE_OPERATION);
// single operations should never call this method twice
TRI_ASSERT(!singleOp || _lastUsedCollection == 0);
@ -321,7 +332,7 @@ void RocksDBTransactionState::prepareOperation(
_lastUsedCollection = collectionId;
}
// we need to the remove log entry, if we don't have the single optimization
// we need to log the remove log entry, if we don't have the single optimization
if (!singleOp && operationType == TRI_VOC_DOCUMENT_OPERATION_REMOVE) {
RocksDBLogValue logValue =
RocksDBLogValue::DocumentRemove(key);

View File

@ -57,8 +57,7 @@ class TransactionCollection;
class RocksDBSavePoint {
public:
explicit RocksDBSavePoint(rocksdb::Transaction* trx);
RocksDBSavePoint(rocksdb::Transaction* trx, bool handled);
RocksDBSavePoint(rocksdb::Transaction* trx, bool handled, std::function<void()> const& rollbackCallback);
~RocksDBSavePoint();
void commit();
@ -68,6 +67,7 @@ class RocksDBSavePoint {
private:
rocksdb::Transaction* _trx;
std::function<void()> const _rollbackCallback;
bool _handled;
};
@ -94,6 +94,9 @@ class RocksDBTransactionState final : public TransactionState {
/// @brief return the value of the update counter (_numUpdates)
uint64_t numUpdates() const { return _numUpdates; }
/// @brief return the value of the remove counter (_numRemoves)
uint64_t numRemoves() const { return _numRemoves; }
/// @brief reset previous log state after a rollback to savepoint
/// (sets _lastUsedCollection back to 0)
void resetLogState() { _lastUsedCollection = 0; }
/// @brief whether at least one of the insert, remove or update counters
/// is greater than zero
inline bool hasOperations() const {
return (_numInserts > 0 || _numRemoves > 0 || _numUpdates > 0);
}

View File

@ -90,7 +90,8 @@ RocksDBVPackIndexIterator::RocksDBVPackIndexIterator(
left, right)) {
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
rocksdb::Transaction* rtrx = state->rocksTransaction();
auto options = state->readOptions();
TRI_ASSERT(state != nullptr);
auto const& options = state->readOptions();
_iterator.reset(rtrx->GetIterator(options));
if (reverse) {
@ -102,6 +103,8 @@ RocksDBVPackIndexIterator::RocksDBVPackIndexIterator(
/// @brief Reset the cursor
void RocksDBVPackIndexIterator::reset() {
TRI_ASSERT(_trx->state()->isRunning());
if (_reverse) {
_iterator->SeekForPrev(_bounds.end());
} else {
@ -110,6 +113,8 @@ void RocksDBVPackIndexIterator::reset() {
}
bool RocksDBVPackIndexIterator::outOfRange() const {
TRI_ASSERT(_trx->state()->isRunning());
if (_reverse) {
return (_cmp->Compare(_iterator->key(), _bounds.start()) < 0);
} else {
@ -118,6 +123,8 @@ bool RocksDBVPackIndexIterator::outOfRange() const {
}
bool RocksDBVPackIndexIterator::next(TokenCallback const& cb, size_t limit) {
TRI_ASSERT(_trx->state()->isRunning());
if (limit == 0 || !_iterator->Valid() || outOfRange()) {
// No limit no data, or we are actually done. The last call should have
// returned false

View File

@ -66,7 +66,9 @@ function dumpTestSuite () {
assertEqual(2, c.type()); // document
assertTrue(p.waitForSync);
assertFalse(p.isVolatile);
assertEqual(256, p.indexBuckets);
if (db._engine().name === "mmfiles") {
assertEqual(256, p.indexBuckets);
}
assertEqual(1, c.getIndexes().length); // just primary index
assertEqual("primary", c.getIndexes()[0].type);
@ -139,7 +141,9 @@ function dumpTestSuite () {
assertEqual(2, c.type()); // document
assertFalse(p.waitForSync);
assertFalse(p.isVolatile);
assertEqual(16, p.indexBuckets);
if (db._engine().name === "mmfiles") {
assertEqual(16, p.indexBuckets);
}
assertEqual(1, c.getIndexes().length); // just primary index
assertEqual("primary", c.getIndexes()[0].type);

View File

@ -164,8 +164,14 @@ function ReplicationSuite() {
var printed = false;
while (true) {
if (!slaveFuncOngoing(state)) {
return;
var r = slaveFuncOngoing(state);
if (r === "wait") {
// special return code that tells us to hang on
internal.wait(0.5, false);
continue;
}
if (!r) {
break;
}
var slaveState = replication.applier.state();
@ -666,6 +672,9 @@ function ReplicationSuite() {
try {
require("@arangodb/tasks").get(state.task);
// task exists
connectToSlave();
internal.wait(0.5, false);
return "wait";
} catch (err) {
// task does not exist. we're done
state.checksum = collectionChecksum(cn);
@ -674,10 +683,6 @@ function ReplicationSuite() {
connectToSlave();
return false;
}
connectToSlave();
internal.wait(0.5, false);
return true;
},
function(state) {
@ -745,6 +750,12 @@ function ReplicationSuite() {
try {
require("@arangodb/tasks").get(state.task);
// task exists
connectToSlave();
internal.wait(0.5, false);
replication.applier.start();
assertTrue(replication.applier.state().state.running);
return "wait";
} catch (err) {
// task does not exist. we're done
state.checksum = collectionChecksum(cn);
@ -753,13 +764,6 @@ function ReplicationSuite() {
connectToSlave();
return false;
}
connectToSlave();
internal.wait(0.5, false);
replication.applier.start();
assertTrue(replication.applier.state().state.running);
return true;
},
function(state) {