1
0
Fork 0

Truncate Fix (Nr 6) (#7176)

This commit is contained in:
Simon 2018-10-31 16:32:00 +01:00 committed by Jan
parent 42fd0825ab
commit f2f4a0fdaf
5 changed files with 29 additions and 26 deletions

View File

@ -77,7 +77,7 @@ void CollectionAccessingNode::toVelocyPack(arangodb::velocypack::Builder& builde
builder.add("satellite", VPackValue(_collection->isSatellite())); builder.add("satellite", VPackValue(_collection->isSatellite()));
if (ServerState::instance()->isCoordinator()) { if (ServerState::instance()->isCoordinator()) {
builder.add("numberOfShards", VPackValue(_collection->numberOfShards())); builder.add(StaticStrings::NumberOfShards, VPackValue(_collection->numberOfShards()));
} }
if (!_restrictedTo.empty()) { if (!_restrictedTo.empty()) {

View File

@ -299,12 +299,12 @@ void RestCollectionHandler::handleCommandPost() {
} }
// for some "security" a white-list of allowed parameters // for some "security" a white-list of allowed parameters
VPackBuilder filtered = VPackCollection::keep( VPackBuilder filtered = VPackCollection::keep(body,
body,
std::unordered_set<std::string>{ std::unordered_set<std::string>{
"doCompact", "isSystem", "id", "isVolatile", "journalSize", "doCompact", "isSystem", "id", "isVolatile", "journalSize",
"indexBuckets", "keyOptions", "waitForSync", "cacheEnabled", "indexBuckets", "keyOptions", "waitForSync", "cacheEnabled",
"shardKeys", "numberOfShards", "distributeShardsLike", "avoidServers", StaticStrings::ShardKeys, StaticStrings::NumberOfShards,
StaticStrings::DistributeShardsLike, "avoidServers",
"isSmart", "shardingStrategy", "smartGraphAttribute", "replicationFactor", "isSmart", "shardingStrategy", "smartGraphAttribute", "replicationFactor",
"servers"}); "servers"});
VPackSlice const parameters = filtered.slice(); VPackSlice const parameters = filtered.slice();

View File

@ -1909,6 +1909,8 @@ RocksDBCollection::serializeIndexEstimates(
rocksdb::Transaction* rtrx, rocksdb::SequenceNumber inputSeq) const { rocksdb::Transaction* rtrx, rocksdb::SequenceNumber inputSeq) const {
auto outputSeq = inputSeq; auto outputSeq = inputSeq;
std::string output; std::string output;
RocksDBKey key;
for (auto index : getIndexes()) { for (auto index : getIndexes()) {
output.clear(); output.clear();
RocksDBIndex* cindex = static_cast<RocksDBIndex*>(index.get()); RocksDBIndex* cindex = static_cast<RocksDBIndex*>(index.get());
@ -1923,7 +1925,6 @@ RocksDBCollection::serializeIndexEstimates(
<< "serialized estimate for index '" << cindex->objectId() << "serialized estimate for index '" << cindex->objectId()
<< "' valid through seq " << outputSeq; << "' valid through seq " << outputSeq;
if (output.size() > sizeof(uint64_t)) { if (output.size() > sizeof(uint64_t)) {
RocksDBKey key;
key.constructIndexEstimateValue(cindex->objectId()); key.constructIndexEstimateValue(cindex->objectId());
rocksdb::Slice value(output); rocksdb::Slice value(output);
rocksdb::Status s = rocksdb::Status s =

View File

@ -580,28 +580,29 @@ class RocksDBCuckooIndexEstimator {
Result res = basics::catchVoidToResult([&]() -> void { Result res = basics::catchVoidToResult([&]() -> void {
std::vector<Key> inserts; std::vector<Key> inserts;
std::vector<Key> removals; std::vector<Key> removals;
bool foundTruncate = false;
// truncate will increase this sequence
rocksdb::SequenceNumber ignoreSeq = 0;
while (true) { while (true) {
bool foundTruncate = false;
// find out if we have buffers to apply // find out if we have buffers to apply
{ {
WRITE_LOCKER(locker, _lock); WRITE_LOCKER(locker, _lock);
rocksdb::SequenceNumber ignoreSeq = 0;
// check for a truncate marker // check for a truncate marker
if (!_truncateBuffer.empty()) { auto it = _truncateBuffer.begin(); // sorted ASC
auto it = _truncateBuffer.begin(); // sorted ASC while (it != _truncateBuffer.end() && *it <= commitSeq) {
while (*it <= commitSeq && *it >= ignoreSeq) { ignoreSeq = *it;
ignoreSeq = *it; TRI_ASSERT(ignoreSeq != 0);
foundTruncate = true; foundTruncate = true;
it = _truncateBuffer.erase(it); it = _truncateBuffer.erase(it);
}
} }
// check for inserts // check for inserts
if (!_insertBuffers.empty()) { if (!_insertBuffers.empty()) {
auto it = _insertBuffers.begin(); // sorted ASC auto it = _insertBuffers.begin(); // sorted ASC
if (it->first <= commitSeq) { if (it->first <= commitSeq) {
if (!foundTruncate || it->first > ignoreSeq) { if (it->first >= ignoreSeq) {
inserts = std::move(it->second); inserts = std::move(it->second);
TRI_ASSERT(!inserts.empty()); TRI_ASSERT(!inserts.empty());
} }
@ -613,7 +614,7 @@ class RocksDBCuckooIndexEstimator {
if (!_removalBuffers.empty()) { if (!_removalBuffers.empty()) {
auto it = _removalBuffers.begin(); // sorted ASC auto it = _removalBuffers.begin(); // sorted ASC
if (it->first <= commitSeq) { if (it->first <= commitSeq) {
if (!foundTruncate || it->first > ignoreSeq) { if (it->first >= ignoreSeq) {
removals = std::move(it->second); removals = std::move(it->second);
TRI_ASSERT(!removals.empty()); TRI_ASSERT(!removals.empty());
} }
@ -624,7 +625,6 @@ class RocksDBCuckooIndexEstimator {
if (foundTruncate) { if (foundTruncate) {
clear(); // clear estimates clear(); // clear estimates
foundTruncate = false;
} }
// no inserts or removals left to apply, drop out of loop // no inserts or removals left to apply, drop out of loop

View File

@ -282,7 +282,7 @@ class WBReader final : public rocksdb::WriteBatch::Handler {
} }
} }
/// Truncate indexes of collection /// Truncate indexes of collection with objectId
bool truncateIndexes(uint64_t objectId) { bool truncateIndexes(uint64_t objectId) {
RocksDBEngine* engine = RocksDBEngine* engine =
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE); static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
@ -304,9 +304,10 @@ class WBReader final : public rocksdb::WriteBatch::Handler {
} }
for (auto const& idx : coll->getIndexes()) { for (auto const& idx : coll->getIndexes()) {
LOG_DEVEL << "truncating index: " << idx->typeName();
RocksDBIndex* ridx = static_cast<RocksDBIndex*>(idx.get()); RocksDBIndex* ridx = static_cast<RocksDBIndex*>(idx.get());
RocksDBCuckooIndexEstimator<uint64_t>* est = ridx->estimator(); RocksDBCuckooIndexEstimator<uint64_t>* est = ridx->estimator();
if (est) { if (est && est->commitSeq() < currentSeqNum) {
est->bufferTruncate(currentSeqNum); est->bufferTruncate(currentSeqNum);
} }
} }
@ -314,6 +315,7 @@ class WBReader final : public rocksdb::WriteBatch::Handler {
return true; return true;
} }
// find estimator for index
RocksDBCuckooIndexEstimator<uint64_t>* findEstimator(uint64_t objectId) { RocksDBCuckooIndexEstimator<uint64_t>* findEstimator(uint64_t objectId) {
RocksDBEngine* engine = RocksDBEngine* engine =
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE); static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
@ -536,12 +538,12 @@ class WBReader final : public rocksdb::WriteBatch::Handler {
ops->removed = 0; ops->removed = 0;
ops->added = 0; ops->added = 0;
ops->mustTruncate = true; ops->mustTruncate = true;
}
if (!truncateIndexes(objectId)) { // index estimates have their own commitSeq
// unable to truncate indexes of the collection. if (!truncateIndexes(objectId)) {
// may be due to collection having been deleted etc. // unable to truncate indexes of the collection.
LOG_TOPIC(WARN, Logger::ENGINES) << "unable to truncate indexes for objectId " << objectId; // may be due to collection having been deleted etc.
} LOG_TOPIC(WARN, Logger::ENGINES) << "unable to truncate indexes for objectId " << objectId;
} }
_lastRemovedDocRid = 0; // reset in any other case _lastRemovedDocRid = 0; // reset in any other case