1
0
Fork 0

Backport fix for truncate selectivity (#8863)

This commit is contained in:
Simon 2019-04-29 17:10:03 +02:00 committed by Jan
parent dda5c3da88
commit 11aa075747
2 changed files with 130 additions and 48 deletions

View File

@ -536,6 +536,7 @@ class RocksDBCuckooIndexEstimator {
{ {
WRITE_LOCKER(locker, _lock); WRITE_LOCKER(locker, _lock);
{
// check for a truncate marker // check for a truncate marker
auto it = _truncateBuffer.begin(); // sorted ASC auto it = _truncateBuffer.begin(); // sorted ASC
while (it != _truncateBuffer.end() && *it <= commitSeq) { while (it != _truncateBuffer.end() && *it <= commitSeq) {
@ -545,31 +546,38 @@ class RocksDBCuckooIndexEstimator {
appliedSeq = std::max(appliedSeq, ignoreSeq); appliedSeq = std::max(appliedSeq, ignoreSeq);
it = _truncateBuffer.erase(it); it = _truncateBuffer.erase(it);
} }
}
TRI_ASSERT(ignoreSeq <= commitSeq);
// check for inserts // check for inserts
if (!_insertBuffers.empty()) {
auto it = _insertBuffers.begin(); // sorted ASC auto it = _insertBuffers.begin(); // sorted ASC
if (it->first <= commitSeq) { while (it != _insertBuffers.end() && it->first <= commitSeq) {
if (it->first >= ignoreSeq) { if (it->first <= ignoreSeq) {
TRI_ASSERT(it->first <= appliedSeq);
it = _insertBuffers.erase(it);
continue;
}
inserts = std::move(it->second); inserts = std::move(it->second);
TRI_ASSERT(!inserts.empty()); TRI_ASSERT(!inserts.empty());
}
appliedSeq = std::max(appliedSeq, it->first); appliedSeq = std::max(appliedSeq, it->first);
_insertBuffers.erase(it); _insertBuffers.erase(it);
}
break;
} }
// check for removals // check for removals
if (!_removalBuffers.empty()) { it = _removalBuffers.begin(); // sorted ASC
auto it = _removalBuffers.begin(); // sorted ASC while (it != _removalBuffers.end() && it->first <= commitSeq) {
if (it->first <= commitSeq) { if (it->first <= ignoreSeq) {
if (it->first >= ignoreSeq) { TRI_ASSERT(it->first <= appliedSeq);
it = _removalBuffers.erase(it);
continue;
}
removals = std::move(it->second); removals = std::move(it->second);
TRI_ASSERT(!removals.empty()); TRI_ASSERT(!removals.empty());
}
appliedSeq = std::max(appliedSeq, it->first); appliedSeq = std::max(appliedSeq, it->first);
_removalBuffers.erase(it); _removalBuffers.erase(it);
} break;
} }
} }
@ -582,21 +590,17 @@ class RocksDBCuckooIndexEstimator {
break; break;
} }
if (!inserts.empty()) {
// apply inserts // apply inserts
for (auto const& key : inserts) { for (auto const& key : inserts) {
insert(key); insert(key);
} }
inserts.clear(); inserts.clear();
}
if (!removals.empty()) {
// apply removals // apply removals
for (auto const& key : removals) { for (auto const& key : removals) {
remove(key); remove(key);
} }
removals.clear(); removals.clear();
}
} // </while(true)> } // </while(true)>
}); });
return appliedSeq; return appliedSeq;
@ -925,8 +929,8 @@ class RocksDBCuckooIndexEstimator {
std::atomic<rocksdb::SequenceNumber> _committedSeq; std::atomic<rocksdb::SequenceNumber> _committedSeq;
std::atomic<bool> _needToPersist; std::atomic<bool> _needToPersist;
std::map<rocksdb::SequenceNumber, std::vector<Key>> _insertBuffers; std::multimap<rocksdb::SequenceNumber, std::vector<Key>> _insertBuffers;
std::map<rocksdb::SequenceNumber, std::vector<Key>> _removalBuffers; std::multimap<rocksdb::SequenceNumber, std::vector<Key>> _removalBuffers;
std::set<rocksdb::SequenceNumber> _truncateBuffer; std::set<rocksdb::SequenceNumber> _truncateBuffer;
HashKey _hasherKey; // Instance to compute the first hash function HashKey _hasherKey; // Instance to compute the first hash function

View File

@ -279,5 +279,83 @@ TEST_CASE("IndexEstimator", "[rocksdb][indexestimator]") {
REQUIRE(0.1 == est.computeEstimate()); REQUIRE(0.1 == est.computeEstimate());
} }
SECTION("test_truncate_logic") {
rocksdb::SequenceNumber currentSeq(0);
rocksdb::SequenceNumber expected;
RocksDBCuckooIndexEstimator<uint64_t> est(2048);
RocksDBCollectionMeta meta;
// test buffering where we keep around one old blocker
for (size_t iteration = 0; iteration < 10; iteration++) {
uint64_t index = 0;
std::vector<uint64_t> toInsert(10);
std::vector<uint64_t> toRemove(0);
std::generate(toInsert.begin(), toInsert.end(),
[&index] { return ++index; });
est.bufferUpdates(++currentSeq, std::move(toInsert), std::move(toRemove));
}
// now make sure we haven't applied anything
std::string serialization;
expected = currentSeq;
auto appliedSeq = est.serialize(serialization, ++currentSeq);
serialization.clear();
REQUIRE(appliedSeq == expected);
REQUIRE(0.1 == est.computeEstimate());
// multiple turncate
est.bufferTruncate(currentSeq++);
est.bufferTruncate(currentSeq++);
est.bufferTruncate(currentSeq++);
uint64_t index = 0;
std::vector<uint64_t> toInsert(10);
std::vector<uint64_t> toRemove(0);
std::generate(toInsert.begin(), toInsert.end(),
[&index] { return ++index; });
est.bufferUpdates(++currentSeq, std::move(toInsert), std::move(toRemove));
expected = currentSeq;
// now make sure we haven't applied anything
appliedSeq = est.serialize(serialization, currentSeq);
serialization.clear();
REQUIRE(appliedSeq == expected);
REQUIRE(1.0 == est.computeEstimate());
}
SECTION("test_truncate_logic_2") {
rocksdb::SequenceNumber currentSeq(0);
RocksDBCuckooIndexEstimator<uint64_t> est(2048);
RocksDBCollectionMeta meta;
// test buffering where we keep around one old blocker
for (size_t iteration = 0; iteration < 10; iteration++) {
uint64_t index = 0;
std::vector<uint64_t> toInsert(10);
std::vector<uint64_t> toRemove(0);
std::generate(toInsert.begin(), toInsert.end(),
[&index] { return ++index; });
est.bufferUpdates(++currentSeq, std::move(toInsert), std::move(toRemove));
}
// truncate in the middle
est.bufferTruncate(++currentSeq);
auto expected = currentSeq;
std::string serialization;
auto appliedSeq = est.serialize(serialization, ++currentSeq);
serialization.clear();
REQUIRE(appliedSeq == expected);
REQUIRE(1.0 == est.computeEstimate());
appliedSeq = est.serialize(serialization, ++currentSeq);
REQUIRE(appliedSeq == expected);
REQUIRE(1.0 == est.computeEstimate());
}
// @brief generate tests // @brief generate tests
} }