From 676012ca3aced736559fe63c5bd5d43d21af9dd0 Mon Sep 17 00:00:00 2001 From: Simon Date: Wed, 10 Apr 2019 19:14:34 +0200 Subject: [PATCH] snafu (#8728) --- arangod/RocksDBEngine/RocksDBCollection.cpp | 1 - .../RocksDBEngine/RocksDBCollectionMeta.cpp | 22 ++++++++++++++----- arangod/RocksDBEngine/RocksDBCollectionMeta.h | 2 +- arangod/RocksDBEngine/RocksDBEdgeIndex.cpp | 1 + .../RocksDBEngine/RocksDBRecoveryManager.cpp | 17 +++++++------- .../RocksDBEngine/RocksDBSettingsManager.cpp | 9 +++----- .../RocksDBEngine/RocksDBSettingsManager.h | 5 +---- arangod/RocksDBEngine/RocksDBVPackIndex.cpp | 1 + tests/RocksDBEngine/IndexEstimatorTest.cpp | 16 +++++++------- 9 files changed, 40 insertions(+), 34 deletions(-) diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index ecf4cbd41a..9c10e1e260 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -657,7 +657,6 @@ Result RocksDBCollection::truncate(transaction::Methods& trx, OperationOptions& seq = db->GetLatestSequenceNumber() - 1; // post commit sequence uint64_t numDocs = _numberDocuments.exchange(0); - _meta.adjustNumberDocuments(seq, /*revision*/ newRevisionId(), -static_cast(numDocs)); diff --git a/arangod/RocksDBEngine/RocksDBCollectionMeta.cpp b/arangod/RocksDBEngine/RocksDBCollectionMeta.cpp index 0bf4275589..b0dd561780 100644 --- a/arangod/RocksDBEngine/RocksDBCollectionMeta.cpp +++ b/arangod/RocksDBEngine/RocksDBCollectionMeta.cpp @@ -124,14 +124,14 @@ void RocksDBCollectionMeta::removeBlocker(TRI_voc_tid_t trxId) { } /// @brief returns the largest safe seq to squash updates against -rocksdb::SequenceNumber RocksDBCollectionMeta::committableSeq() const { +rocksdb::SequenceNumber RocksDBCollectionMeta::committableSeq(rocksdb::SequenceNumber maxCommitSeq) const { READ_LOCKER(locker, _blockerLock); // if we have a blocker use the lowest counter if (!_blockersBySeq.empty()) { auto it = _blockersBySeq.begin(); - return it->first; + return std::min(it->first, maxCommitSeq); } - return std::numeric_limits::max(); + return maxCommitSeq; } rocksdb::SequenceNumber RocksDBCollectionMeta::applyAdjustments(rocksdb::SequenceNumber commitSeq, @@ -172,8 +172,9 @@ rocksdb::SequenceNumber RocksDBCollectionMeta::applyAdjustments(rocksdb::Sequenc /// @brief get the current count RocksDBCollectionMeta::DocCount RocksDBCollectionMeta::loadCount() { + auto maxxSeq = std::numeric_limits::max(); bool didWork = false; - const rocksdb::SequenceNumber commitSeq = committableSeq(); + const rocksdb::SequenceNumber commitSeq = committableSeq(maxxSeq); applyAdjustments(commitSeq, didWork); return _count; } @@ -191,14 +192,23 @@ Result RocksDBCollectionMeta::serializeMeta(rocksdb::WriteBatch& batch, LogicalCollection& coll, bool force, VPackBuilder& tmp, rocksdb::SequenceNumber& appliedSeq) { + TRI_ASSERT(appliedSeq != UINT64_MAX); + Result res; bool didWork = false; // maxCommitSeq is == UINT64_MAX without any blockers - const rocksdb::SequenceNumber maxCommitSeq = std::min(appliedSeq, committableSeq()); + const rocksdb::SequenceNumber maxCommitSeq = committableSeq(appliedSeq); const rocksdb::SequenceNumber commitSeq = applyAdjustments(maxCommitSeq, didWork); TRI_ASSERT(commitSeq <= appliedSeq); - appliedSeq = commitSeq; + TRI_ASSERT(commitSeq <= maxCommitSeq); + TRI_ASSERT(maxCommitSeq <= appliedSeq); + TRI_ASSERT(maxCommitSeq != UINT64_MAX); + if (didWork) { + appliedSeq = commitSeq; + } else { + appliedSeq = maxCommitSeq; + } RocksDBKey key; rocksdb::ColumnFamilyHandle* const cf = RocksDBColumnFamily::definitions(); diff --git a/arangod/RocksDBEngine/RocksDBCollectionMeta.h b/arangod/RocksDBEngine/RocksDBCollectionMeta.h index 25357bd2f9..0f56b7919a 100644 --- a/arangod/RocksDBEngine/RocksDBCollectionMeta.h +++ b/arangod/RocksDBEngine/RocksDBCollectionMeta.h @@ -94,7 +94,7 @@ struct RocksDBCollectionMeta final { void removeBlocker(TRI_voc_tid_t trxId); /// @brief returns the largest safe seq to squash updates against - rocksdb::SequenceNumber committableSeq() const; + rocksdb::SequenceNumber committableSeq(rocksdb::SequenceNumber maxCommitSeq) const; /// @brief get the current count DocCount loadCount(); diff --git a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp index 8d3a60fc44..f7c2779366 100644 --- a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp @@ -1105,6 +1105,7 @@ RocksDBCuckooIndexEstimator* RocksDBEdgeIndex::estimator() { } void RocksDBEdgeIndex::setEstimator(std::unique_ptr> est) { + TRI_ASSERT(_estimator == nullptr || _estimator->appliedSeq() <= est->appliedSeq()); _estimator = std::move(est); } diff --git a/arangod/RocksDBEngine/RocksDBRecoveryManager.cpp b/arangod/RocksDBEngine/RocksDBRecoveryManager.cpp index c80790aca7..6883e2c436 100644 --- a/arangod/RocksDBEngine/RocksDBRecoveryManager.cpp +++ b/arangod/RocksDBEngine/RocksDBRecoveryManager.cpp @@ -423,14 +423,15 @@ class WBReader final : public rocksdb::WriteBatch::Handler { cc._committedSeq = _currentSequence; cc._added = 0; cc._removed = 0; - } - - for (std::shared_ptr const& idx : coll->getIndexes()) { - RocksDBIndex* ridx = static_cast(idx.get()); - RocksDBCuckooIndexEstimator* est = ridx->estimator(); - if (est && est->appliedSeq() <= _currentSequence) { - est->clear(); - est->setAppliedSeq(_currentSequence); + + for (std::shared_ptr const& idx : coll->getIndexes()) { + RocksDBIndex* ridx = static_cast(idx.get()); + RocksDBCuckooIndexEstimator* est = ridx->estimator(); + TRI_ASSERT(ridx->type() != Index::TRI_IDX_TYPE_EDGE_INDEX || est); + if (est) { + est->clear(); + est->setAppliedSeq(_currentSequence); + } } } } diff --git a/arangod/RocksDBEngine/RocksDBSettingsManager.cpp b/arangod/RocksDBEngine/RocksDBSettingsManager.cpp index af030287fa..7fde905bfa 100644 --- a/arangod/RocksDBEngine/RocksDBSettingsManager.cpp +++ b/arangod/RocksDBEngine/RocksDBSettingsManager.cpp @@ -202,14 +202,14 @@ Result RocksDBSettingsManager::sync(bool force) { batch.Clear(); } + TRI_ASSERT(_lastSync <= minSeqNr); if (!didWork) { - WRITE_LOCKER(guard, _rwLock); _lastSync = minSeqNr; return Result(); // nothing was written } _tmpBuilder.clear(); - Result res = writeSettings(batch, _tmpBuilder, std::max(_lastSync, minSeqNr)); + Result res = writeSettings(batch, _tmpBuilder, std::max(_lastSync.load(), minSeqNr)); if (res.fail()) { LOG_TOPIC("8a5e6", WARN, Logger::ENGINES) << "could not store metadata settings " << res.errorMessage(); @@ -219,8 +219,7 @@ Result RocksDBSettingsManager::sync(bool force) { // we have to commit all counters in one batch auto s = _db->Write(wo, &batch); if (s.ok()) { - WRITE_LOCKER(guard, _rwLock); - _lastSync = std::max(_lastSync, minSeqNr); + _lastSync = std::max(_lastSync.load(), minSeqNr); } return rocksutils::convertStatus(s); @@ -241,7 +240,6 @@ void RocksDBSettingsManager::loadSettings() { LOG_TOPIC("7458b", TRACE, Logger::ENGINES) << "read initial settings: " << slice.toJson(); if (!result.empty()) { - WRITE_LOCKER(guard, _rwLock); try { if (slice.hasKey("tick")) { uint64_t lastTick = @@ -279,7 +277,6 @@ void RocksDBSettingsManager::loadSettings() { /// earliest safe sequence number to throw away from wal rocksdb::SequenceNumber RocksDBSettingsManager::earliestSeqNeeded() const { - READ_LOCKER(guard, _rwLock); return _lastSync; } diff --git a/arangod/RocksDBEngine/RocksDBSettingsManager.h b/arangod/RocksDBEngine/RocksDBSettingsManager.h index 3ba5cb0c73..43b662d7dc 100644 --- a/arangod/RocksDBEngine/RocksDBSettingsManager.h +++ b/arangod/RocksDBEngine/RocksDBSettingsManager.h @@ -68,14 +68,11 @@ class RocksDBSettingsManager { bool lockForSync(bool force); private: - /// @brief protect _syncing and _counters - mutable basics::ReadWriteLock _rwLock; - /// @brief a reusable builder, used inside sync() to serialize objects arangodb::velocypack::Builder _tmpBuilder; /// @brief last sync sequence number - rocksdb::SequenceNumber _lastSync; + std::atomic _lastSync; /// @brief currently syncing std::atomic _syncing; diff --git a/arangod/RocksDBEngine/RocksDBVPackIndex.cpp b/arangod/RocksDBEngine/RocksDBVPackIndex.cpp index 3652289314..4964937169 100644 --- a/arangod/RocksDBEngine/RocksDBVPackIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBVPackIndex.cpp @@ -1274,6 +1274,7 @@ RocksDBCuckooIndexEstimator* RocksDBVPackIndex::estimator() { void RocksDBVPackIndex::setEstimator(std::unique_ptr> est) { TRI_ASSERT(!_unique); + TRI_ASSERT(_estimator == nullptr || _estimator->appliedSeq() <= est->appliedSeq()); _estimator = std::move(est); } diff --git a/tests/RocksDBEngine/IndexEstimatorTest.cpp b/tests/RocksDBEngine/IndexEstimatorTest.cpp index 42882a3e61..bbef6342a6 100644 --- a/tests/RocksDBEngine/IndexEstimatorTest.cpp +++ b/tests/RocksDBEngine/IndexEstimatorTest.cpp @@ -157,14 +157,14 @@ TEST_CASE("IndexEstimator", "[rocksdb][indexestimator]") { est.bufferUpdates(++currentSeq, std::move(toInsert), std::move(toRemove)); // make sure we don't apply yet - est.serialize(serialization, meta.committableSeq()); + est.serialize(serialization, meta.committableSeq(UINT64_MAX)); serialization.clear(); REQUIRE(est.appliedSeq() == expected); REQUIRE((1.0 / std::max(1.0, static_cast(iteration))) == est.computeEstimate()); meta.removeBlocker(iteration); - CHECK(meta.committableSeq() == UINT64_MAX); + CHECK(meta.committableSeq(UINT64_MAX) == UINT64_MAX); // now make sure we apply it est.serialize(serialization, currentSeq); @@ -188,8 +188,8 @@ TEST_CASE("IndexEstimator", "[rocksdb][indexestimator]") { est.bufferUpdates(++currentSeq, std::move(toInsert), std::move(toRemove)); // make sure we don't apply yet - REQUIRE(meta.committableSeq() == expected + 1); - est.serialize(serialization, meta.committableSeq()); + REQUIRE(meta.committableSeq(UINT64_MAX) == expected + 1); + est.serialize(serialization, meta.committableSeq(UINT64_MAX)); serialization.clear(); REQUIRE(est.appliedSeq() == expected); REQUIRE((1.0 / std::max(1.0, static_cast(10 - iteration))) == @@ -198,7 +198,7 @@ TEST_CASE("IndexEstimator", "[rocksdb][indexestimator]") { meta.removeBlocker(iteration); // now make sure we apply it - est.serialize(serialization, meta.committableSeq()); + est.serialize(serialization, meta.committableSeq(UINT64_MAX)); serialization.clear(); expected = currentSeq; REQUIRE(est.appliedSeq() == expected); @@ -232,7 +232,7 @@ TEST_CASE("IndexEstimator", "[rocksdb][indexestimator]") { meta.removeBlocker(iteration - 1); // now make sure we applied last batch, but not this one - est.serialize(serialization, meta.committableSeq()); + est.serialize(serialization, meta.committableSeq(UINT64_MAX)); serialization.clear(); REQUIRE(est.appliedSeq() == expected); REQUIRE((1.0 / std::max(1.0, static_cast(iteration))) == @@ -264,7 +264,7 @@ TEST_CASE("IndexEstimator", "[rocksdb][indexestimator]") { meta.removeBlocker(std::max(static_cast(1), iteration)); // now make sure we haven't applied anything - est.serialize(serialization, meta.committableSeq()); + est.serialize(serialization, meta.committableSeq(UINT64_MAX)); serialization.clear(); REQUIRE(est.appliedSeq() == expected); REQUIRE(1.0 == est.computeEstimate()); @@ -272,7 +272,7 @@ TEST_CASE("IndexEstimator", "[rocksdb][indexestimator]") { // now remove first blocker and make sure we apply everything meta.removeBlocker(0); - est.serialize(serialization, meta.committableSeq()); + est.serialize(serialization, meta.committableSeq(UINT64_MAX)); expected = currentSeq; serialization.clear(); REQUIRE(est.appliedSeq() == expected);