1
0
Fork 0
This commit is contained in:
Simon 2019-04-10 19:14:34 +02:00 committed by Jan
parent 56336e74f8
commit 676012ca3a
9 changed files with 40 additions and 34 deletions

View File

@ -657,7 +657,6 @@ Result RocksDBCollection::truncate(transaction::Methods& trx, OperationOptions&
seq = db->GetLatestSequenceNumber() - 1; // post commit sequence
uint64_t numDocs = _numberDocuments.exchange(0);
_meta.adjustNumberDocuments(seq, /*revision*/ newRevisionId(),
-static_cast<int64_t>(numDocs));

View File

@ -124,14 +124,14 @@ void RocksDBCollectionMeta::removeBlocker(TRI_voc_tid_t trxId) {
}
/// @brief returns the largest safe seq to squash updates against
rocksdb::SequenceNumber RocksDBCollectionMeta::committableSeq() const {
rocksdb::SequenceNumber RocksDBCollectionMeta::committableSeq(rocksdb::SequenceNumber maxCommitSeq) const {
READ_LOCKER(locker, _blockerLock);
// if we have a blocker use the lowest counter
if (!_blockersBySeq.empty()) {
auto it = _blockersBySeq.begin();
return it->first;
return std::min(it->first, maxCommitSeq);
}
return std::numeric_limits<rocksdb::SequenceNumber>::max();
return maxCommitSeq;
}
rocksdb::SequenceNumber RocksDBCollectionMeta::applyAdjustments(rocksdb::SequenceNumber commitSeq,
@ -172,8 +172,9 @@ rocksdb::SequenceNumber RocksDBCollectionMeta::applyAdjustments(rocksdb::Sequenc
/// @brief get the current count
RocksDBCollectionMeta::DocCount RocksDBCollectionMeta::loadCount() {
auto maxxSeq = std::numeric_limits<rocksdb::SequenceNumber>::max();
bool didWork = false;
const rocksdb::SequenceNumber commitSeq = committableSeq();
const rocksdb::SequenceNumber commitSeq = committableSeq(maxxSeq);
applyAdjustments(commitSeq, didWork);
return _count;
}
@ -191,14 +192,23 @@ Result RocksDBCollectionMeta::serializeMeta(rocksdb::WriteBatch& batch,
LogicalCollection& coll, bool force,
VPackBuilder& tmp,
rocksdb::SequenceNumber& appliedSeq) {
TRI_ASSERT(appliedSeq != UINT64_MAX);
Result res;
bool didWork = false;
// maxCommitSeq is == UINT64_MAX without any blockers
const rocksdb::SequenceNumber maxCommitSeq = std::min(appliedSeq, committableSeq());
const rocksdb::SequenceNumber maxCommitSeq = committableSeq(appliedSeq);
const rocksdb::SequenceNumber commitSeq = applyAdjustments(maxCommitSeq, didWork);
TRI_ASSERT(commitSeq <= appliedSeq);
appliedSeq = commitSeq;
TRI_ASSERT(commitSeq <= maxCommitSeq);
TRI_ASSERT(maxCommitSeq <= appliedSeq);
TRI_ASSERT(maxCommitSeq != UINT64_MAX);
if (didWork) {
appliedSeq = commitSeq;
} else {
appliedSeq = maxCommitSeq;
}
RocksDBKey key;
rocksdb::ColumnFamilyHandle* const cf = RocksDBColumnFamily::definitions();

View File

@ -94,7 +94,7 @@ struct RocksDBCollectionMeta final {
void removeBlocker(TRI_voc_tid_t trxId);
/// @brief returns the largest safe seq to squash updates against
rocksdb::SequenceNumber committableSeq() const;
rocksdb::SequenceNumber committableSeq(rocksdb::SequenceNumber maxCommitSeq) const;
/// @brief get the current count
DocCount loadCount();

View File

@ -1105,6 +1105,7 @@ RocksDBCuckooIndexEstimator<uint64_t>* RocksDBEdgeIndex::estimator() {
}
void RocksDBEdgeIndex::setEstimator(std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>> est) {
TRI_ASSERT(_estimator == nullptr || _estimator->appliedSeq() <= est->appliedSeq());
_estimator = std::move(est);
}

View File

@ -423,14 +423,15 @@ class WBReader final : public rocksdb::WriteBatch::Handler {
cc._committedSeq = _currentSequence;
cc._added = 0;
cc._removed = 0;
}
for (std::shared_ptr<arangodb::Index> const& idx : coll->getIndexes()) {
RocksDBIndex* ridx = static_cast<RocksDBIndex*>(idx.get());
RocksDBCuckooIndexEstimator<uint64_t>* est = ridx->estimator();
if (est && est->appliedSeq() <= _currentSequence) {
est->clear();
est->setAppliedSeq(_currentSequence);
for (std::shared_ptr<arangodb::Index> const& idx : coll->getIndexes()) {
RocksDBIndex* ridx = static_cast<RocksDBIndex*>(idx.get());
RocksDBCuckooIndexEstimator<uint64_t>* est = ridx->estimator();
TRI_ASSERT(ridx->type() != Index::TRI_IDX_TYPE_EDGE_INDEX || est);
if (est) {
est->clear();
est->setAppliedSeq(_currentSequence);
}
}
}
}

View File

@ -202,14 +202,14 @@ Result RocksDBSettingsManager::sync(bool force) {
batch.Clear();
}
TRI_ASSERT(_lastSync <= minSeqNr);
if (!didWork) {
WRITE_LOCKER(guard, _rwLock);
_lastSync = minSeqNr;
return Result(); // nothing was written
}
_tmpBuilder.clear();
Result res = writeSettings(batch, _tmpBuilder, std::max(_lastSync, minSeqNr));
Result res = writeSettings(batch, _tmpBuilder, std::max(_lastSync.load(), minSeqNr));
if (res.fail()) {
LOG_TOPIC("8a5e6", WARN, Logger::ENGINES)
<< "could not store metadata settings " << res.errorMessage();
@ -219,8 +219,7 @@ Result RocksDBSettingsManager::sync(bool force) {
// we have to commit all counters in one batch
auto s = _db->Write(wo, &batch);
if (s.ok()) {
WRITE_LOCKER(guard, _rwLock);
_lastSync = std::max(_lastSync, minSeqNr);
_lastSync = std::max(_lastSync.load(), minSeqNr);
}
return rocksutils::convertStatus(s);
@ -241,7 +240,6 @@ void RocksDBSettingsManager::loadSettings() {
LOG_TOPIC("7458b", TRACE, Logger::ENGINES) << "read initial settings: " << slice.toJson();
if (!result.empty()) {
WRITE_LOCKER(guard, _rwLock);
try {
if (slice.hasKey("tick")) {
uint64_t lastTick =
@ -279,7 +277,6 @@ void RocksDBSettingsManager::loadSettings() {
/// earliest safe sequence number to throw away from wal
rocksdb::SequenceNumber RocksDBSettingsManager::earliestSeqNeeded() const {
READ_LOCKER(guard, _rwLock);
return _lastSync;
}

View File

@ -68,14 +68,11 @@ class RocksDBSettingsManager {
bool lockForSync(bool force);
private:
/// @brief protect _syncing and _counters
mutable basics::ReadWriteLock _rwLock;
/// @brief a reusable builder, used inside sync() to serialize objects
arangodb::velocypack::Builder _tmpBuilder;
/// @brief last sync sequence number
rocksdb::SequenceNumber _lastSync;
std::atomic<rocksdb::SequenceNumber> _lastSync;
/// @brief currently syncing
std::atomic<bool> _syncing;

View File

@ -1274,6 +1274,7 @@ RocksDBCuckooIndexEstimator<uint64_t>* RocksDBVPackIndex::estimator() {
void RocksDBVPackIndex::setEstimator(std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>> est) {
TRI_ASSERT(!_unique);
TRI_ASSERT(_estimator == nullptr || _estimator->appliedSeq() <= est->appliedSeq());
_estimator = std::move(est);
}

View File

@ -157,14 +157,14 @@ TEST_CASE("IndexEstimator", "[rocksdb][indexestimator]") {
est.bufferUpdates(++currentSeq, std::move(toInsert), std::move(toRemove));
// make sure we don't apply yet
est.serialize(serialization, meta.committableSeq());
est.serialize(serialization, meta.committableSeq(UINT64_MAX));
serialization.clear();
REQUIRE(est.appliedSeq() == expected);
REQUIRE((1.0 / std::max(1.0, static_cast<double>(iteration))) ==
est.computeEstimate());
meta.removeBlocker(iteration);
CHECK(meta.committableSeq() == UINT64_MAX);
CHECK(meta.committableSeq(UINT64_MAX) == UINT64_MAX);
// now make sure we apply it
est.serialize(serialization, currentSeq);
@ -188,8 +188,8 @@ TEST_CASE("IndexEstimator", "[rocksdb][indexestimator]") {
est.bufferUpdates(++currentSeq, std::move(toInsert), std::move(toRemove));
// make sure we don't apply yet
REQUIRE(meta.committableSeq() == expected + 1);
est.serialize(serialization, meta.committableSeq());
REQUIRE(meta.committableSeq(UINT64_MAX) == expected + 1);
est.serialize(serialization, meta.committableSeq(UINT64_MAX));
serialization.clear();
REQUIRE(est.appliedSeq() == expected);
REQUIRE((1.0 / std::max(1.0, static_cast<double>(10 - iteration))) ==
@ -198,7 +198,7 @@ TEST_CASE("IndexEstimator", "[rocksdb][indexestimator]") {
meta.removeBlocker(iteration);
// now make sure we apply it
est.serialize(serialization, meta.committableSeq());
est.serialize(serialization, meta.committableSeq(UINT64_MAX));
serialization.clear();
expected = currentSeq;
REQUIRE(est.appliedSeq() == expected);
@ -232,7 +232,7 @@ TEST_CASE("IndexEstimator", "[rocksdb][indexestimator]") {
meta.removeBlocker(iteration - 1);
// now make sure we applied last batch, but not this one
est.serialize(serialization, meta.committableSeq());
est.serialize(serialization, meta.committableSeq(UINT64_MAX));
serialization.clear();
REQUIRE(est.appliedSeq() == expected);
REQUIRE((1.0 / std::max(1.0, static_cast<double>(iteration))) ==
@ -264,7 +264,7 @@ TEST_CASE("IndexEstimator", "[rocksdb][indexestimator]") {
meta.removeBlocker(std::max(static_cast<size_t>(1), iteration));
// now make sure we haven't applied anything
est.serialize(serialization, meta.committableSeq());
est.serialize(serialization, meta.committableSeq(UINT64_MAX));
serialization.clear();
REQUIRE(est.appliedSeq() == expected);
REQUIRE(1.0 == est.computeEstimate());
@ -272,7 +272,7 @@ TEST_CASE("IndexEstimator", "[rocksdb][indexestimator]") {
// now remove first blocker and make sure we apply everything
meta.removeBlocker(0);
est.serialize(serialization, meta.committableSeq());
est.serialize(serialization, meta.committableSeq(UINT64_MAX));
expected = currentSeq;
serialization.clear();
REQUIRE(est.appliedSeq() == expected);