diff --git a/arangod/RocksDBEngine/RocksDBCuckooIndexEstimator.h b/arangod/RocksDBEngine/RocksDBCuckooIndexEstimator.h index 0bd64c4174..016be1a154 100644 --- a/arangod/RocksDBEngine/RocksDBCuckooIndexEstimator.h +++ b/arangod/RocksDBEngine/RocksDBCuckooIndexEstimator.h @@ -336,8 +336,6 @@ class RocksDBCuckooIndexEstimator { _truncateBuffer.emplace(seq); _needToPersist.store(true, std::memory_order_release); }); - LOG_TOPIC("69002", TRACE, Logger::ENGINES) - << "buffered truncate with stamp " << seq; return res; } @@ -494,8 +492,6 @@ class RocksDBCuckooIndexEstimator { } _needToPersist.store(true, std::memory_order_release); - LOG_TOPIC("69001", TRACE, Logger::ENGINES) - << "buffered updates with stamp " << seq; }); return res; } @@ -523,88 +519,83 @@ class RocksDBCuckooIndexEstimator { rocksdb::SequenceNumber applyUpdates(rocksdb::SequenceNumber commitSeq) { rocksdb::SequenceNumber appliedSeq = 0; Result res = basics::catchVoidToResult([&]() -> void { - std::vector> inserts; - std::vector> removals; - + std::vector inserts; + std::vector removals; + // truncate will increase this sequence rocksdb::SequenceNumber ignoreSeq = 0; - bool foundTruncate = false; - // find out if we have buffers to apply - { - WRITE_LOCKER(locker, _lock); - - // check for a truncate marker - auto it = _truncateBuffer.begin(); // sorted ASC - while (it != _truncateBuffer.end() && *it <= commitSeq) { - TRI_ASSERT(*it >= ignoreSeq && *it != 0); - ignoreSeq = *it; - foundTruncate = true; - appliedSeq = std::max(appliedSeq, ignoreSeq); - it = _truncateBuffer.erase(it); - } - - // check for inserts - while (!_insertBuffers.empty()) { + while (true) { + bool foundTruncate = false; + // find out if we have buffers to apply + { + WRITE_LOCKER(locker, _lock); + + { + // check for a truncate marker + auto it = _truncateBuffer.begin(); // sorted ASC + while (it != _truncateBuffer.end() && *it <= commitSeq) { + ignoreSeq = *it; + TRI_ASSERT(ignoreSeq != 0); + foundTruncate = true; + appliedSeq = 
std::max(appliedSeq, ignoreSeq); + it = _truncateBuffer.erase(it); + } + } + TRI_ASSERT(ignoreSeq <= commitSeq); + + // check for inserts auto it = _insertBuffers.begin(); // sorted ASC - if (it->first > commitSeq) { - break; - } - if (it->first > ignoreSeq) { - inserts.emplace_back(std::move(it->second)); + while (it != _insertBuffers.end() && it->first <= commitSeq) { + if (it->first <= ignoreSeq) { + TRI_ASSERT(it->first <= appliedSeq); + it = _insertBuffers.erase(it); + continue; + } + inserts = std::move(it->second); TRI_ASSERT(!inserts.empty()); - LOG_TOPIC("bf36a", TRACE, Logger::ENGINES) - << "will apply insertions with stamp " << it->first; - } else { - LOG_TOPIC("bf36d", TRACE, Logger::ENGINES) - << "ignoring buffered insertions with stamp " << it->first; - } - appliedSeq = std::max(appliedSeq, it->first); - _insertBuffers.erase(it); - } - - // check for removals - while (!_removalBuffers.empty()) { - auto it = _removalBuffers.begin(); // sorted ASC - if (it->first > commitSeq) { + appliedSeq = std::max(appliedSeq, it->first); + _insertBuffers.erase(it); + break; } - if (it->first > ignoreSeq) { - removals.emplace_back(std::move(it->second)); + + // check for removals + it = _removalBuffers.begin(); // sorted ASC + while (it != _removalBuffers.end() && it->first <= commitSeq) { + if (it->first <= ignoreSeq) { + TRI_ASSERT(it->first <= appliedSeq); + it = _removalBuffers.erase(it); + continue; + } + removals = std::move(it->second); TRI_ASSERT(!removals.empty()); - LOG_TOPIC("bf36b", TRACE, Logger::ENGINES) - << "will apply removals with stamp " << it->first; - } else { - LOG_TOPIC("bf36e", TRACE, Logger::ENGINES) - << "ignoring buffered removals with stamp " << it->first; + appliedSeq = std::max(appliedSeq, it->first); + _removalBuffers.erase(it); + break; } - appliedSeq = std::max(appliedSeq, it->first); - _removalBuffers.erase(it); } - } - - if (foundTruncate) { - LOG_TOPIC("bf36c", TRACE, Logger::ENGINES) - << "applying truncate with stamp " << 
ignoreSeq; - clear(); // clear estimates - } - - while (!inserts.empty()) { - auto batch = inserts.begin(); + + if (foundTruncate) { + clear(); // clear estimates + } + + // no inserts or removals left to apply, drop out of loop + if (inserts.empty() && removals.empty()) { + break; + } + // apply inserts - for (auto const& key : *batch) { + for (auto const& key : inserts) { insert(key); } - inserts.erase(batch); - } - - while (!removals.empty()) { - auto batch = removals.begin(); + inserts.clear(); + // apply removals - for (auto const& key : *batch) { + for (auto const& key : removals) { remove(key); } - removals.erase(batch); - } + removals.clear(); + } // }); return appliedSeq; } diff --git a/tests/RocksDBEngine/IndexEstimatorTest.cpp b/tests/RocksDBEngine/IndexEstimatorTest.cpp index bbef6342a6..cd57d1b809 100644 --- a/tests/RocksDBEngine/IndexEstimatorTest.cpp +++ b/tests/RocksDBEngine/IndexEstimatorTest.cpp @@ -278,6 +278,83 @@ TEST_CASE("IndexEstimator", "[rocksdb][indexestimator]") { REQUIRE(est.appliedSeq() == expected); REQUIRE(0.1 == est.computeEstimate()); } + + SECTION("test_truncate_logic") { + rocksdb::SequenceNumber currentSeq(0); + rocksdb::SequenceNumber expected; + RocksDBCuckooIndexEstimator est(2048); + RocksDBCollectionMeta meta; + + // test buffering where we keep around one old blocker + for (size_t iteration = 0; iteration < 10; iteration++) { + uint64_t index = 0; + std::vector toInsert(10); + std::vector toRemove(0); + std::generate(toInsert.begin(), toInsert.end(), + [&index] { return ++index; }); + + est.bufferUpdates(++currentSeq, std::move(toInsert), std::move(toRemove)); + } + + // now make sure we haven't applied anything + std::string serialization; + expected = currentSeq; + est.serialize(serialization, ++currentSeq); + serialization.clear(); + REQUIRE(est.appliedSeq() == expected); + REQUIRE(0.1 == est.computeEstimate()); + + // multiple truncate + est.bufferTruncate(currentSeq++); + est.bufferTruncate(currentSeq++); + 
est.bufferTruncate(currentSeq++); + + uint64_t index = 0; + std::vector toInsert(10); + std::vector toRemove(0); + std::generate(toInsert.begin(), toInsert.end(), + [&index] { return ++index; }); + est.bufferUpdates(++currentSeq, std::move(toInsert), std::move(toRemove)); + + + expected = currentSeq; + // now make sure the applied sequence number has caught up with the last buffered update + est.serialize(serialization, currentSeq); + serialization.clear(); + REQUIRE(est.appliedSeq() == expected); + REQUIRE(1.0 == est.computeEstimate()); + } + + SECTION("test_truncate_logic_2") { + rocksdb::SequenceNumber currentSeq(0); + RocksDBCuckooIndexEstimator est(2048); + RocksDBCollectionMeta meta; + + // test buffering where we keep around one old blocker + for (size_t iteration = 0; iteration < 10; iteration++) { + uint64_t index = 0; + std::vector toInsert(10); + std::vector toRemove(0); + std::generate(toInsert.begin(), toInsert.end(), + [&index] { return ++index; }); + + est.bufferUpdates(++currentSeq, std::move(toInsert), std::move(toRemove)); + } + + // truncate in the middle + est.bufferTruncate(++currentSeq); + + auto expected = currentSeq; + std::string serialization; + est.serialize(serialization, ++currentSeq); + serialization.clear(); + REQUIRE(est.appliedSeq() == expected); + REQUIRE(1.0 == est.computeEstimate()); + + est.serialize(serialization, ++currentSeq); + REQUIRE(est.appliedSeq() == expected); + REQUIRE(1.0 == est.computeEstimate()); + } // @brief generate tests }