mirror of https://gitee.com/bigwinds/arangodb
Improve estimator apply algo + test (#8871)
This commit is contained in:
parent
b110f5fc61
commit
1145c7c053
|
@ -336,8 +336,6 @@ class RocksDBCuckooIndexEstimator {
|
|||
_truncateBuffer.emplace(seq);
|
||||
_needToPersist.store(true, std::memory_order_release);
|
||||
});
|
||||
LOG_TOPIC("69002", TRACE, Logger::ENGINES)
|
||||
<< "buffered truncate with stamp " << seq;
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -494,8 +492,6 @@ class RocksDBCuckooIndexEstimator {
|
|||
}
|
||||
|
||||
_needToPersist.store(true, std::memory_order_release);
|
||||
LOG_TOPIC("69001", TRACE, Logger::ENGINES)
|
||||
<< "buffered updates with stamp " << seq;
|
||||
});
|
||||
return res;
|
||||
}
|
||||
|
@ -523,88 +519,83 @@ class RocksDBCuckooIndexEstimator {
|
|||
rocksdb::SequenceNumber applyUpdates(rocksdb::SequenceNumber commitSeq) {
|
||||
rocksdb::SequenceNumber appliedSeq = 0;
|
||||
Result res = basics::catchVoidToResult([&]() -> void {
|
||||
std::vector<std::vector<Key>> inserts;
|
||||
std::vector<std::vector<Key>> removals;
|
||||
|
||||
std::vector<Key> inserts;
|
||||
std::vector<Key> removals;
|
||||
|
||||
// truncate will increase this sequence
|
||||
rocksdb::SequenceNumber ignoreSeq = 0;
|
||||
bool foundTruncate = false;
|
||||
// find out if we have buffers to apply
|
||||
{
|
||||
WRITE_LOCKER(locker, _lock);
|
||||
|
||||
// check for a truncate marker
|
||||
auto it = _truncateBuffer.begin(); // sorted ASC
|
||||
while (it != _truncateBuffer.end() && *it <= commitSeq) {
|
||||
TRI_ASSERT(*it >= ignoreSeq && *it != 0);
|
||||
ignoreSeq = *it;
|
||||
foundTruncate = true;
|
||||
appliedSeq = std::max(appliedSeq, ignoreSeq);
|
||||
it = _truncateBuffer.erase(it);
|
||||
}
|
||||
|
||||
// check for inserts
|
||||
while (!_insertBuffers.empty()) {
|
||||
while (true) {
|
||||
bool foundTruncate = false;
|
||||
// find out if we have buffers to apply
|
||||
{
|
||||
WRITE_LOCKER(locker, _lock);
|
||||
|
||||
{
|
||||
// check for a truncate marker
|
||||
auto it = _truncateBuffer.begin(); // sorted ASC
|
||||
while (it != _truncateBuffer.end() && *it <= commitSeq) {
|
||||
ignoreSeq = *it;
|
||||
TRI_ASSERT(ignoreSeq != 0);
|
||||
foundTruncate = true;
|
||||
appliedSeq = std::max(appliedSeq, ignoreSeq);
|
||||
it = _truncateBuffer.erase(it);
|
||||
}
|
||||
}
|
||||
TRI_ASSERT(ignoreSeq <= commitSeq);
|
||||
|
||||
// check for inserts
|
||||
auto it = _insertBuffers.begin(); // sorted ASC
|
||||
if (it->first > commitSeq) {
|
||||
break;
|
||||
}
|
||||
if (it->first > ignoreSeq) {
|
||||
inserts.emplace_back(std::move(it->second));
|
||||
while (it != _insertBuffers.end() && it->first <= commitSeq) {
|
||||
if (it->first <= ignoreSeq) {
|
||||
TRI_ASSERT(it->first <= appliedSeq);
|
||||
it = _insertBuffers.erase(it);
|
||||
continue;
|
||||
}
|
||||
inserts = std::move(it->second);
|
||||
TRI_ASSERT(!inserts.empty());
|
||||
LOG_TOPIC("bf36a", TRACE, Logger::ENGINES)
|
||||
<< "will apply insertions with stamp " << it->first;
|
||||
} else {
|
||||
LOG_TOPIC("bf36d", TRACE, Logger::ENGINES)
|
||||
<< "ignoring buffered insertions with stamp " << it->first;
|
||||
}
|
||||
appliedSeq = std::max(appliedSeq, it->first);
|
||||
_insertBuffers.erase(it);
|
||||
}
|
||||
|
||||
// check for removals
|
||||
while (!_removalBuffers.empty()) {
|
||||
auto it = _removalBuffers.begin(); // sorted ASC
|
||||
if (it->first > commitSeq) {
|
||||
appliedSeq = std::max(appliedSeq, it->first);
|
||||
_insertBuffers.erase(it);
|
||||
|
||||
break;
|
||||
}
|
||||
if (it->first > ignoreSeq) {
|
||||
removals.emplace_back(std::move(it->second));
|
||||
|
||||
// check for removals
|
||||
it = _removalBuffers.begin(); // sorted ASC
|
||||
while (it != _removalBuffers.end() && it->first <= commitSeq) {
|
||||
if (it->first <= ignoreSeq) {
|
||||
TRI_ASSERT(it->first <= appliedSeq);
|
||||
it = _removalBuffers.erase(it);
|
||||
continue;
|
||||
}
|
||||
removals = std::move(it->second);
|
||||
TRI_ASSERT(!removals.empty());
|
||||
LOG_TOPIC("bf36b", TRACE, Logger::ENGINES)
|
||||
<< "will apply removals with stamp " << it->first;
|
||||
} else {
|
||||
LOG_TOPIC("bf36e", TRACE, Logger::ENGINES)
|
||||
<< "ignoring buffered removals with stamp " << it->first;
|
||||
appliedSeq = std::max(appliedSeq, it->first);
|
||||
_removalBuffers.erase(it);
|
||||
break;
|
||||
}
|
||||
appliedSeq = std::max(appliedSeq, it->first);
|
||||
_removalBuffers.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
if (foundTruncate) {
|
||||
LOG_TOPIC("bf36c", TRACE, Logger::ENGINES)
|
||||
<< "applying truncate with stamp " << ignoreSeq;
|
||||
clear(); // clear estimates
|
||||
}
|
||||
|
||||
while (!inserts.empty()) {
|
||||
auto batch = inserts.begin();
|
||||
|
||||
if (foundTruncate) {
|
||||
clear(); // clear estimates
|
||||
}
|
||||
|
||||
// no inserts or removals left to apply, drop out of loop
|
||||
if (inserts.empty() && removals.empty()) {
|
||||
break;
|
||||
}
|
||||
|
||||
// apply inserts
|
||||
for (auto const& key : *batch) {
|
||||
for (auto const& key : inserts) {
|
||||
insert(key);
|
||||
}
|
||||
inserts.erase(batch);
|
||||
}
|
||||
|
||||
while (!removals.empty()) {
|
||||
auto batch = removals.begin();
|
||||
inserts.clear();
|
||||
|
||||
// apply removals
|
||||
for (auto const& key : *batch) {
|
||||
for (auto const& key : removals) {
|
||||
remove(key);
|
||||
}
|
||||
removals.erase(batch);
|
||||
}
|
||||
removals.clear();
|
||||
} // </while(true)>
|
||||
});
|
||||
return appliedSeq;
|
||||
}
|
||||
|
|
|
@ -278,6 +278,83 @@ TEST_CASE("IndexEstimator", "[rocksdb][indexestimator]") {
|
|||
REQUIRE(est.appliedSeq() == expected);
|
||||
REQUIRE(0.1 == est.computeEstimate());
|
||||
}
|
||||
|
||||
SECTION("test_truncate_logic") {
|
||||
rocksdb::SequenceNumber currentSeq(0);
|
||||
rocksdb::SequenceNumber expected;
|
||||
RocksDBCuckooIndexEstimator<uint64_t> est(2048);
|
||||
RocksDBCollectionMeta meta;
|
||||
|
||||
// test buffering where we keep around one old blocker
|
||||
for (size_t iteration = 0; iteration < 10; iteration++) {
|
||||
uint64_t index = 0;
|
||||
std::vector<uint64_t> toInsert(10);
|
||||
std::vector<uint64_t> toRemove(0);
|
||||
std::generate(toInsert.begin(), toInsert.end(),
|
||||
[&index] { return ++index; });
|
||||
|
||||
est.bufferUpdates(++currentSeq, std::move(toInsert), std::move(toRemove));
|
||||
}
|
||||
|
||||
// now make sure we haven't applied anything
|
||||
std::string serialization;
|
||||
expected = currentSeq;
|
||||
est.serialize(serialization, ++currentSeq);
|
||||
serialization.clear();
|
||||
REQUIRE(est.appliedSeq() == expected);
|
||||
REQUIRE(0.1 == est.computeEstimate());
|
||||
|
||||
// multiple turncate
|
||||
est.bufferTruncate(currentSeq++);
|
||||
est.bufferTruncate(currentSeq++);
|
||||
est.bufferTruncate(currentSeq++);
|
||||
|
||||
uint64_t index = 0;
|
||||
std::vector<uint64_t> toInsert(10);
|
||||
std::vector<uint64_t> toRemove(0);
|
||||
std::generate(toInsert.begin(), toInsert.end(),
|
||||
[&index] { return ++index; });
|
||||
est.bufferUpdates(++currentSeq, std::move(toInsert), std::move(toRemove));
|
||||
|
||||
|
||||
expected = currentSeq;
|
||||
// now make sure we haven't applied anything
|
||||
est.serialize(serialization, currentSeq);
|
||||
serialization.clear();
|
||||
REQUIRE(est.appliedSeq() == expected);
|
||||
REQUIRE(1.0 == est.computeEstimate());
|
||||
}
|
||||
|
||||
SECTION("test_truncate_logic_2") {
|
||||
rocksdb::SequenceNumber currentSeq(0);
|
||||
RocksDBCuckooIndexEstimator<uint64_t> est(2048);
|
||||
RocksDBCollectionMeta meta;
|
||||
|
||||
// test buffering where we keep around one old blocker
|
||||
for (size_t iteration = 0; iteration < 10; iteration++) {
|
||||
uint64_t index = 0;
|
||||
std::vector<uint64_t> toInsert(10);
|
||||
std::vector<uint64_t> toRemove(0);
|
||||
std::generate(toInsert.begin(), toInsert.end(),
|
||||
[&index] { return ++index; });
|
||||
|
||||
est.bufferUpdates(++currentSeq, std::move(toInsert), std::move(toRemove));
|
||||
}
|
||||
|
||||
// truncate in the middle
|
||||
est.bufferTruncate(++currentSeq);
|
||||
|
||||
auto expected = currentSeq;
|
||||
std::string serialization;
|
||||
est.serialize(serialization, ++currentSeq);
|
||||
serialization.clear();
|
||||
REQUIRE(est.appliedSeq() == expected);
|
||||
REQUIRE(1.0 == est.computeEstimate());
|
||||
|
||||
est.serialize(serialization, ++currentSeq);
|
||||
REQUIRE(est.appliedSeq() == expected);
|
||||
REQUIRE(1.0 == est.computeEstimate());
|
||||
}
|
||||
|
||||
// @brief generate tests
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue