mirror of https://gitee.com/bigwinds/arangodb
Fix background indexing (#8065)
This commit is contained in:
parent
0f889d61bc
commit
655d594949
|
@ -559,12 +559,12 @@ arangodb::Result RocksDBBuilderIndex::fillIndexBackground(Locker& locker) {
|
|||
rootDB->ReleaseSnapshot(snap);
|
||||
snap = nullptr;
|
||||
|
||||
int maxCatchups = 3;
|
||||
rocksdb::SequenceNumber lastScanned = 0;
|
||||
uint64_t numScanned = 0;
|
||||
|
||||
int maxCatchups = 4;
|
||||
while(true) {
|
||||
do {
|
||||
lastScanned = 0;
|
||||
numScanned = 0;
|
||||
if (internal->unique()) {
|
||||
const rocksdb::Comparator* cmp = internal->columnFamily()->GetComparator();
|
||||
// unique index. we need to keep track of all our changes because we need to
|
||||
|
@ -586,14 +586,10 @@ arangodb::Result RocksDBBuilderIndex::fillIndexBackground(Locker& locker) {
|
|||
return res;
|
||||
}
|
||||
|
||||
if (numScanned < 5000 || maxCatchups-- == 0) {
|
||||
TRI_ASSERT(lastScanned > scanFrom);
|
||||
std::this_thread::yield();
|
||||
break;
|
||||
}
|
||||
}
|
||||
scanFrom = lastScanned;
|
||||
} while (maxCatchups-- > 0 && numScanned > 5000);
|
||||
|
||||
if (!locker.lock()) {
|
||||
if (!locker.lock()) { // acquire exclusive collection lock
|
||||
return res.reset(TRI_ERROR_LOCK_TIMEOUT);
|
||||
}
|
||||
|
||||
|
|
|
@ -401,11 +401,12 @@ std::shared_ptr<Index> RocksDBCollection::createIndex(VPackSlice const& info,
|
|||
_indexes.emplace_back(buildIdx);
|
||||
res = buildIdx->fillIndexBackground(locker);
|
||||
} else {
|
||||
res = buildIdx->fillIndexForeground(); // will lock again internally
|
||||
res = buildIdx->fillIndexForeground();
|
||||
}
|
||||
}
|
||||
locker.lock(); // always lock to avoid inconsistencies
|
||||
|
||||
TRI_ASSERT(res.fail() || locker.isLocked()); // always lock to avoid inconsistencies
|
||||
locker.lock();
|
||||
|
||||
// Step 5. cleanup
|
||||
if (res.ok()) {
|
||||
{
|
||||
|
@ -1206,9 +1207,9 @@ Result RocksDBCollection::insertDocument(arangodb::transaction::Methods* trx,
|
|||
IndexingDisabler disabler(mthds, trx->isSingleOperationTransaction());
|
||||
|
||||
rocksdb::Status s =
|
||||
mthds->Put(RocksDBColumnFamily::documents(), key.ref(),
|
||||
rocksdb::Slice(reinterpret_cast<char const*>(doc.begin()),
|
||||
static_cast<size_t>(doc.byteSize())));
|
||||
mthds->PutUntracked(RocksDBColumnFamily::documents(), key.ref(),
|
||||
rocksdb::Slice(doc.startAs<char>(),
|
||||
static_cast<size_t>(doc.byteSize())));
|
||||
if (!s.ok()) {
|
||||
return res.reset(rocksutils::convertStatus(s, rocksutils::document));
|
||||
}
|
||||
|
@ -1297,9 +1298,9 @@ Result RocksDBCollection::updateDocument(transaction::Methods* trx,
|
|||
|
||||
key->constructDocument(_objectId, newDocumentId);
|
||||
// simon: we do not need to blacklist the new documentId
|
||||
s = mthd->Put(RocksDBColumnFamily::documents(), key.ref(),
|
||||
rocksdb::Slice(reinterpret_cast<char const*>(newDoc.begin()),
|
||||
static_cast<size_t>(newDoc.byteSize())));
|
||||
s = mthd->PutUntracked(RocksDBColumnFamily::documents(), key.ref(),
|
||||
rocksdb::Slice(newDoc.startAs<char>(),
|
||||
static_cast<size_t>(newDoc.byteSize())));
|
||||
if (!s.ok()) {
|
||||
return res.reset(rocksutils::convertStatus(s, rocksutils::document));
|
||||
}
|
||||
|
|
|
@ -165,6 +165,11 @@ rocksdb::Status RocksDBReadOnlyMethods::Put(rocksdb::ColumnFamilyHandle* cf,
|
|||
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_READ_ONLY);
|
||||
}
|
||||
|
||||
rocksdb::Status RocksDBReadOnlyMethods::PutUntracked(rocksdb::ColumnFamilyHandle* cf,
|
||||
RocksDBKey const&, rocksdb::Slice const&) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_READ_ONLY);
|
||||
}
|
||||
|
||||
rocksdb::Status RocksDBReadOnlyMethods::Delete(rocksdb::ColumnFamilyHandle* cf,
|
||||
RocksDBKey const& key) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_READ_ONLY);
|
||||
|
@ -229,6 +234,12 @@ rocksdb::Status RocksDBTrxMethods::Put(rocksdb::ColumnFamilyHandle* cf,
|
|||
return _state->_rocksTransaction->Put(cf, key.string(), val);
|
||||
}
|
||||
|
||||
rocksdb::Status RocksDBTrxMethods::PutUntracked(rocksdb::ColumnFamilyHandle* cf,
|
||||
RocksDBKey const& key, rocksdb::Slice const& val) {
|
||||
TRI_ASSERT(cf != nullptr);
|
||||
return _state->_rocksTransaction->PutUntracked(cf, key.string(), val);
|
||||
}
|
||||
|
||||
rocksdb::Status RocksDBTrxMethods::Delete(rocksdb::ColumnFamilyHandle* cf,
|
||||
RocksDBKey const& key) {
|
||||
TRI_ASSERT(cf != nullptr);
|
||||
|
@ -268,30 +279,6 @@ void RocksDBTrxMethods::PopSavePoint() {
|
|||
#endif
|
||||
}
|
||||
|
||||
// =================== RocksDBTrxUntrackedMethods ====================
|
||||
|
||||
RocksDBTrxUntrackedMethods::RocksDBTrxUntrackedMethods(RocksDBTransactionState* state)
|
||||
: RocksDBTrxMethods(state) {}
|
||||
|
||||
rocksdb::Status RocksDBTrxUntrackedMethods::Put(rocksdb::ColumnFamilyHandle* cf,
|
||||
RocksDBKey const& key,
|
||||
rocksdb::Slice const& val) {
|
||||
TRI_ASSERT(cf != nullptr);
|
||||
return _state->_rocksTransaction->PutUntracked(cf, key.string(), val);
|
||||
}
|
||||
|
||||
rocksdb::Status RocksDBTrxUntrackedMethods::Delete(rocksdb::ColumnFamilyHandle* cf,
|
||||
RocksDBKey const& key) {
|
||||
TRI_ASSERT(cf != nullptr);
|
||||
return _state->_rocksTransaction->DeleteUntracked(cf, key.string());
|
||||
}
|
||||
|
||||
rocksdb::Status RocksDBTrxUntrackedMethods::SingleDelete(rocksdb::ColumnFamilyHandle* cf,
|
||||
RocksDBKey const& key) {
|
||||
TRI_ASSERT(cf != nullptr);
|
||||
return _state->_rocksTransaction->SingleDeleteUntracked(cf, key.string());
|
||||
}
|
||||
|
||||
// =================== RocksDBBatchedMethods ====================
|
||||
|
||||
RocksDBBatchedMethods::RocksDBBatchedMethods(RocksDBTransactionState* state,
|
||||
|
@ -318,6 +305,12 @@ rocksdb::Status RocksDBBatchedMethods::Put(rocksdb::ColumnFamilyHandle* cf,
|
|||
return _wb->Put(cf, key.string(), val);
|
||||
}
|
||||
|
||||
rocksdb::Status RocksDBBatchedMethods::PutUntracked(rocksdb::ColumnFamilyHandle* cf,
|
||||
RocksDBKey const& key,
|
||||
rocksdb::Slice const& val) {
|
||||
return RocksDBBatchedMethods::Put(cf, key, val);
|
||||
}
|
||||
|
||||
rocksdb::Status RocksDBBatchedMethods::Delete(rocksdb::ColumnFamilyHandle* cf,
|
||||
RocksDBKey const& key) {
|
||||
TRI_ASSERT(cf != nullptr);
|
||||
|
@ -371,6 +364,12 @@ rocksdb::Status RocksDBBatchedWithIndexMethods::Put(rocksdb::ColumnFamilyHandle*
|
|||
return _wb->Put(cf, key.string(), val);
|
||||
}
|
||||
|
||||
rocksdb::Status RocksDBBatchedWithIndexMethods::PutUntracked(rocksdb::ColumnFamilyHandle* cf,
|
||||
RocksDBKey const& key,
|
||||
rocksdb::Slice const& val) {
|
||||
return RocksDBBatchedWithIndexMethods::Put(cf, key, val);
|
||||
}
|
||||
|
||||
rocksdb::Status RocksDBBatchedWithIndexMethods::Delete(rocksdb::ColumnFamilyHandle* cf,
|
||||
RocksDBKey const& key) {
|
||||
TRI_ASSERT(cf != nullptr);
|
||||
|
|
|
@ -88,6 +88,9 @@ class RocksDBMethods {
|
|||
rocksdb::Slice const&, rocksdb::PinnableSlice*) = 0;
|
||||
virtual rocksdb::Status Put(rocksdb::ColumnFamilyHandle*, RocksDBKey const&,
|
||||
rocksdb::Slice const&) = 0;
|
||||
/// Like Put, but will not perform any write-write conflict checks
|
||||
virtual rocksdb::Status PutUntracked(rocksdb::ColumnFamilyHandle*, RocksDBKey const&,
|
||||
rocksdb::Slice const&) = 0;
|
||||
|
||||
virtual rocksdb::Status Delete(rocksdb::ColumnFamilyHandle*, RocksDBKey const&) = 0;
|
||||
/// contrary to Delete, a SingleDelete may only be used
|
||||
|
@ -122,6 +125,8 @@ class RocksDBReadOnlyMethods final : public RocksDBMethods {
|
|||
rocksdb::PinnableSlice* val) override;
|
||||
rocksdb::Status Put(rocksdb::ColumnFamilyHandle*, RocksDBKey const& key,
|
||||
rocksdb::Slice const& val) override;
|
||||
rocksdb::Status PutUntracked(rocksdb::ColumnFamilyHandle*, RocksDBKey const& key,
|
||||
rocksdb::Slice const& val) override;
|
||||
rocksdb::Status Delete(rocksdb::ColumnFamilyHandle*, RocksDBKey const& key) override;
|
||||
rocksdb::Status SingleDelete(rocksdb::ColumnFamilyHandle*, RocksDBKey const&) override;
|
||||
void PutLogData(rocksdb::Slice const&) override;
|
||||
|
@ -155,6 +160,8 @@ class RocksDBTrxMethods : public RocksDBMethods {
|
|||
rocksdb::PinnableSlice* val) override;
|
||||
rocksdb::Status Put(rocksdb::ColumnFamilyHandle*, RocksDBKey const& key,
|
||||
rocksdb::Slice const& val) override;
|
||||
rocksdb::Status PutUntracked(rocksdb::ColumnFamilyHandle*, RocksDBKey const& key,
|
||||
rocksdb::Slice const& val) override;
|
||||
rocksdb::Status Delete(rocksdb::ColumnFamilyHandle*, RocksDBKey const& key) override;
|
||||
rocksdb::Status SingleDelete(rocksdb::ColumnFamilyHandle*, RocksDBKey const&) override;
|
||||
void PutLogData(rocksdb::Slice const&) override;
|
||||
|
@ -169,17 +176,6 @@ class RocksDBTrxMethods : public RocksDBMethods {
|
|||
bool _indexingDisabled;
|
||||
};
|
||||
|
||||
/// transaction wrapper, uses the current rocksdb transaction and non-tracking
|
||||
/// methods
|
||||
class RocksDBTrxUntrackedMethods final : public RocksDBTrxMethods {
|
||||
public:
|
||||
explicit RocksDBTrxUntrackedMethods(RocksDBTransactionState* state);
|
||||
|
||||
rocksdb::Status Put(rocksdb::ColumnFamilyHandle*, RocksDBKey const& key,
|
||||
rocksdb::Slice const& val) override;
|
||||
rocksdb::Status Delete(rocksdb::ColumnFamilyHandle*, RocksDBKey const& key) override;
|
||||
rocksdb::Status SingleDelete(rocksdb::ColumnFamilyHandle*, RocksDBKey const&) override;
|
||||
};
|
||||
|
||||
/// wraps a writebatch - non transactional
|
||||
class RocksDBBatchedMethods final : public RocksDBMethods {
|
||||
|
@ -192,6 +188,8 @@ class RocksDBBatchedMethods final : public RocksDBMethods {
|
|||
rocksdb::PinnableSlice* val) override;
|
||||
rocksdb::Status Put(rocksdb::ColumnFamilyHandle*, RocksDBKey const& key,
|
||||
rocksdb::Slice const& val) override;
|
||||
rocksdb::Status PutUntracked(rocksdb::ColumnFamilyHandle*, RocksDBKey const& key,
|
||||
rocksdb::Slice const& val) override;
|
||||
rocksdb::Status Delete(rocksdb::ColumnFamilyHandle*, RocksDBKey const& key) override;
|
||||
rocksdb::Status SingleDelete(rocksdb::ColumnFamilyHandle*, RocksDBKey const&) override;
|
||||
void PutLogData(rocksdb::Slice const&) override;
|
||||
|
@ -220,6 +218,8 @@ class RocksDBBatchedWithIndexMethods final : public RocksDBMethods {
|
|||
rocksdb::PinnableSlice* val) override;
|
||||
rocksdb::Status Put(rocksdb::ColumnFamilyHandle*, RocksDBKey const& key,
|
||||
rocksdb::Slice const& val) override;
|
||||
rocksdb::Status PutUntracked(rocksdb::ColumnFamilyHandle*, RocksDBKey const& key,
|
||||
rocksdb::Slice const& val) override;
|
||||
rocksdb::Status Delete(rocksdb::ColumnFamilyHandle*, RocksDBKey const& key) override;
|
||||
rocksdb::Status SingleDelete(rocksdb::ColumnFamilyHandle*, RocksDBKey const&) override;
|
||||
void PutLogData(rocksdb::Slice const&) override;
|
||||
|
|
|
@ -144,13 +144,7 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
|
|||
TRI_ASSERT(_readSnapshot != nullptr);
|
||||
}
|
||||
|
||||
// with exlusive locking there is no chance of conflict
|
||||
// with other transactions -> we can use untracked< Put/Delete methods
|
||||
if (isOnlyExclusiveTransaction()) {
|
||||
_rocksMethods.reset(new RocksDBTrxUntrackedMethods(this));
|
||||
} else {
|
||||
_rocksMethods.reset(new RocksDBTrxMethods(this));
|
||||
}
|
||||
_rocksMethods.reset(new RocksDBTrxMethods(this));
|
||||
|
||||
if (hasHint(transaction::Hints::Hint::NO_INDEXING)) {
|
||||
// do not track our own writes... we can only use this in very
|
||||
|
|
|
@ -70,7 +70,6 @@ class RocksDBTransactionState final : public TransactionState {
|
|||
friend class RocksDBMethods;
|
||||
friend class RocksDBReadOnlyMethods;
|
||||
friend class RocksDBTrxMethods;
|
||||
friend class RocksDBTrxUntrackedMethods;
|
||||
friend class RocksDBBatchedMethods;
|
||||
friend class RocksDBBatchedWithIndexMethods;
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*jshint globalstrict:false, strict:false */
|
||||
/*global fail, assertEqual, assertNotEqual, assertTrue, assertFalse */
|
||||
/*global fail, assertEqual, assertNotEqual, assertTrue, assertFalse, assertNotUndefined */
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief test the index
|
||||
|
@ -193,7 +193,7 @@ function backgroundIndexSuite() {
|
|||
let c = require("internal").db._collection(cn);
|
||||
// first lets add some initial documents
|
||||
let x = 0;
|
||||
while(x < 10000) {
|
||||
while(x < 5000) {
|
||||
let docs = [];
|
||||
for(let i = 0; i < 1000; i++) {
|
||||
docs.push({value: x++});
|
||||
|
@ -201,11 +201,18 @@ function backgroundIndexSuite() {
|
|||
c.save(docs);
|
||||
}
|
||||
|
||||
const idxDef = {type: 'skiplist', fields: ['value'], unique: true, inBackground: true};
|
||||
// lets insert the rest via tasks
|
||||
for (let i = 1; i < 5; ++i) {
|
||||
if (i === 2) { // create the index in a task
|
||||
let command = `const c = require("internal").db._collection("${cn}");
|
||||
let idx = c.ensureIndex(${JSON.stringify(idxDef)});
|
||||
c.save({_key: 'myindex', index: idx});`;
|
||||
tasks.register({ name: "UnitTestsIndexCreateIDX" + i, command: command });
|
||||
}
|
||||
let command = `const c = require("internal").db._collection("${cn}");
|
||||
let x = ${i} * 10000;
|
||||
while(x < ${i + 1} * 10000) {
|
||||
let x = ${i} * 5000;
|
||||
while(x < ${i + 1} * 5000) {
|
||||
let docs = [];
|
||||
for(let i = 0; i < 1000; i++) {
|
||||
docs.push({value: x++})
|
||||
|
@ -215,15 +222,20 @@ function backgroundIndexSuite() {
|
|||
tasks.register({ name: "UnitTestsIndexInsert" + i, command: command });
|
||||
}
|
||||
|
||||
// create the index on the main thread
|
||||
c.ensureIndex({type: 'hash', fields: ['value'], unique: true, inBackground: true });
|
||||
|
||||
// wait for insertion tasks to complete
|
||||
waitForTasks();
|
||||
|
||||
// sanity checks
|
||||
assertEqual(c.count(), 50000);
|
||||
for (let i = 0; i < 50000; i++) {
|
||||
assertEqual(c.count(), 25001);
|
||||
|
||||
// verify that the index was created
|
||||
let idx = c.document('myindex').index;
|
||||
assertNotUndefined(idx);
|
||||
idxDef.inBackground = false;
|
||||
let cmp = c.ensureIndex(idxDef);
|
||||
assertEqual(cmp.id, idx.id);
|
||||
|
||||
for (let i = 0; i < 25000; i++) {
|
||||
const cursor = db._query("FOR doc IN @@coll FILTER doc.value == @val RETURN 1",
|
||||
{'@coll': cn, 'val': i}, {count:true});
|
||||
assertEqual(cursor.count(), 1);
|
||||
|
@ -234,7 +246,7 @@ function backgroundIndexSuite() {
|
|||
switch (i.type) {
|
||||
case 'primary':
|
||||
break;
|
||||
case 'hash':
|
||||
case 'skiplist':
|
||||
assertEqual(i.selectivityEstimate, 1.0);
|
||||
break;
|
||||
default:
|
||||
|
|
Loading…
Reference in New Issue