mirror of https://gitee.com/bigwinds/arangodb
do not buffer index estimates (#9338)
This commit is contained in:
parent
b97a62c0de
commit
17468f1dc9
|
@ -319,12 +319,31 @@ std::shared_ptr<Index> RocksDBCollection::lookupIndex(velocypack::Slice const& i
|
|||
return findIndex(info, _indexes);
|
||||
}
|
||||
|
||||
namespace {
|
||||
struct BuilderTrx : public arangodb::transaction::Methods {
|
||||
BuilderTrx(std::shared_ptr<transaction::Context> const& transactionContext,
|
||||
LogicalDataSource const& collection)
|
||||
: transaction::Methods(transactionContext), _cid(collection.id()) {
|
||||
// add the (sole) data-source
|
||||
addCollection(collection.id(), collection.name(), AccessMode::Type::EXCLUSIVE);
|
||||
addHint(transaction::Hints::Hint::NO_DLD);
|
||||
}
|
||||
|
||||
/// @brief get the underlying transaction collection
|
||||
RocksDBTransactionCollection* resolveTrxCollection() {
|
||||
return static_cast<RocksDBTransactionCollection*>(trxCollection(_cid));
|
||||
}
|
||||
|
||||
private:
|
||||
TRI_voc_cid_t _cid;
|
||||
};
|
||||
}
|
||||
|
||||
std::shared_ptr<Index> RocksDBCollection::createIndex(arangodb::velocypack::Slice const& info,
|
||||
bool restore, bool& created) {
|
||||
TRI_ASSERT(info.isObject());
|
||||
SingleCollectionTransaction trx( // prevent concurrent dropping
|
||||
transaction::StandaloneContext::Create(_logicalCollection.vocbase()),
|
||||
_logicalCollection, AccessMode::Type::EXCLUSIVE);
|
||||
::BuilderTrx trx( // prevent concurrent dropping
|
||||
transaction::StandaloneContext::Create(_logicalCollection.vocbase()), _logicalCollection);
|
||||
Result res = trx.begin();
|
||||
|
||||
if (!res.ok()) {
|
||||
|
@ -1178,6 +1197,8 @@ static arangodb::Result fillIndex(transaction::Methods* trx, RocksDBIndex* ridx,
|
|||
std::unique_ptr<IndexIterator> it,
|
||||
WriteBatchType& batch, RocksDBCollection* rcol) {
|
||||
auto state = RocksDBTransactionState::toState(trx);
|
||||
auto* btrx = static_cast<::BuilderTrx*>(trx);
|
||||
RocksDBTransactionCollection* tcoll = btrx->resolveTrxCollection();
|
||||
|
||||
// fillindex can be non transactional, we just need to clean up
|
||||
rocksdb::DB* db = rocksutils::globalRocksDB()->GetRootDB();
|
||||
|
@ -1217,6 +1238,19 @@ static arangodb::Result fillIndex(transaction::Methods* trx, RocksDBIndex* ridx,
|
|||
res = rocksutils::convertStatus(s, rocksutils::StatusHint::index);
|
||||
break;
|
||||
}
|
||||
|
||||
auto ops = tcoll->stealTrackedOperations();
|
||||
if (!ops.empty()) {
|
||||
TRI_ASSERT(ridx->hasSelectivityEstimate() && ops.size() == 1);
|
||||
auto it = ops.begin();
|
||||
TRI_ASSERT(ridx->id() == it->first);
|
||||
for (uint64_t hash : it->second.inserts) {
|
||||
ridx->estimator()->insert(hash);
|
||||
}
|
||||
for (uint64_t hash : it->second.removals) {
|
||||
ridx->estimator()->remove(hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
batch.Clear();
|
||||
|
|
|
@ -272,14 +272,14 @@ void RocksDBTransactionCollection::commitCounts(uint64_t trxId, uint64_t commitS
|
|||
_trackedIndexOperations.clear();
|
||||
}
|
||||
|
||||
void RocksDBTransactionCollection::trackIndexInsert(uint64_t idxObjectId, uint64_t hash) {
|
||||
void RocksDBTransactionCollection::trackIndexInsert(TRI_idx_iid_t iid, uint64_t hash) {
|
||||
// First list is Inserts
|
||||
_trackedIndexOperations[idxObjectId].inserts.emplace_back(hash);
|
||||
_trackedIndexOperations[iid].inserts.emplace_back(hash);
|
||||
}
|
||||
|
||||
void RocksDBTransactionCollection::trackIndexRemove(uint64_t idxObjectId, uint64_t hash) {
|
||||
void RocksDBTransactionCollection::trackIndexRemove(TRI_idx_iid_t iid, uint64_t hash) {
|
||||
// Second list is Removes
|
||||
_trackedIndexOperations[idxObjectId].removals.emplace_back(hash);
|
||||
_trackedIndexOperations[iid].removals.emplace_back(hash);
|
||||
}
|
||||
|
||||
/// @brief lock a collection
|
||||
|
|
|
@ -87,12 +87,26 @@ class RocksDBTransactionCollection final : public TransactionCollection {
|
|||
|
||||
/// @brief Every index can track hashes inserted into this index
|
||||
/// Used to update the estimate after the trx commited
|
||||
void trackIndexInsert(uint64_t idxObjectId, uint64_t hash);
|
||||
void trackIndexInsert(TRI_idx_iid_t iid, uint64_t hash);
|
||||
|
||||
/// @brief Every index can track hashes removed from this index
|
||||
/// Used to update the estimate after the trx commited
|
||||
void trackIndexRemove(uint64_t idxObjectId, uint64_t hash);
|
||||
void trackIndexRemove(TRI_idx_iid_t iid, uint64_t hash);
|
||||
|
||||
/// @brief tracked index operations
|
||||
struct IndexOperations {
|
||||
std::vector<uint64_t> inserts;
|
||||
std::vector<uint64_t> removals;
|
||||
};
|
||||
typedef std::unordered_map<TRI_idx_iid_t, IndexOperations> OperationsMap;
|
||||
|
||||
/// @brief steal the tracked operations from the map
|
||||
OperationsMap stealTrackedOperations() {
|
||||
OperationsMap empty;
|
||||
_trackedIndexOperations.swap(empty);
|
||||
return empty;
|
||||
}
|
||||
|
||||
private:
|
||||
/// @brief request a lock for a collection
|
||||
/// returns TRI_ERROR_LOCKED in case the lock was successfully acquired
|
||||
|
@ -111,15 +125,10 @@ class RocksDBTransactionCollection final : public TransactionCollection {
|
|||
uint64_t _numUpdates;
|
||||
uint64_t _numRemoves;
|
||||
bool _usageLocked;
|
||||
|
||||
struct IndexOperations {
|
||||
std::vector<uint64_t> inserts;
|
||||
std::vector<uint64_t> removals;
|
||||
};
|
||||
|
||||
|
||||
/// @brief A list where all indexes with estimates can store their operations
|
||||
/// Will be applied to the inserter on commit and not applied on abort
|
||||
std::unordered_map<uint64_t, IndexOperations> _trackedIndexOperations;
|
||||
OperationsMap _trackedIndexOperations;
|
||||
};
|
||||
} // namespace arangodb
|
||||
|
||||
|
|
Loading…
Reference in New Issue