mirror of https://gitee.com/bigwinds/arangodb
Bug fix/selectivity estimates on abort (#4314)
parent 6f9b38b974
commit d32f3b5fe9
@@ -1,6 +1,10 @@
 devel
 -----
 
+* Fixed an issue with the index estimates in RocksDB in case a transaction is aborted.
+  Formerly the index estimates were modified whether or not the transaction committed.
+  Now they will only be modified if the transaction committed successfully.
+
 * UI: optimized login view for very small screen sizes
 
 * UI: optimized error messages for invalid query bind parameter
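The shape of the fix, in one place: while a transaction runs, each index only records the hashes it would insert into or remove from its selectivity estimator, and the transaction replays them on a successful commit; an abort simply drops the buffer. Below is a minimal self-contained sketch of this record-and-replay pattern. Estimator, Transaction, and every name in it are illustrative stand-ins, not ArangoDB's actual types:

#include <cstdint>
#include <unordered_map>
#include <utility>
#include <vector>

// Illustrative stand-in for a per-index selectivity estimator.
struct Estimator {
  void insert(uint64_t /*hash*/) { /* update the estimate */ }
  void remove(uint64_t /*hash*/) { /* update the estimate */ }
};

// Per-transaction buffer: index id -> (inserted hashes, removed hashes),
// mirroring the pair-of-vectors map used in the patch.
using TrackedOps =
    std::unordered_map<uint64_t,
                       std::pair<std::vector<uint64_t>, std::vector<uint64_t>>>;

struct Transaction {
  TrackedOps tracked;

  // While the transaction runs, only record; never touch the estimator.
  void trackInsert(uint64_t indexId, uint64_t hash) {
    tracked[indexId].first.emplace_back(hash);
  }
  void trackRemove(uint64_t indexId, uint64_t hash) {
    tracked[indexId].second.emplace_back(hash);
  }

  // Successful commit: replay the buffered operations, then clear.
  void commit(std::unordered_map<uint64_t, Estimator>& estimators) {
    for (auto const& [indexId, ops] : tracked) {
      Estimator& est = estimators[indexId];
      for (uint64_t h : ops.first) { est.insert(h); }
      for (uint64_t h : ops.second) { est.remove(h); }
    }
    tracked.clear();
  }

  // Abort: drop the buffer; the estimator never sees the changes.
  void abort() { tracked.clear(); }
};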
@@ -470,7 +470,7 @@ Result RocksDBEdgeIndex::insertInternal(transaction::Methods* trx,
   if (r.ok()) {
     std::hash<StringRef> hasher;
     uint64_t hash = static_cast<uint64_t>(hasher(fromToRef));
-    _estimator->insert(hash);
+    RocksDBTransactionState::toState(trx)->trackIndexInsert(_collection->cid(), id(), hash);
     return IndexResult();
   } else {
     return IndexResult(r.errorNumber(), this);
@@ -501,7 +501,7 @@ Result RocksDBEdgeIndex::removeInternal(transaction::Methods* trx,
   if (res.ok()) {
     std::hash<StringRef> hasher;
     uint64_t hash = static_cast<uint64_t>(hasher(fromToRef));
-    _estimator->remove(hash);
+    RocksDBTransactionState::toState(trx)->trackIndexRemove(_collection->cid(), id(), hash);
     return IndexResult();
   } else {
     return IndexResult(res.errorNumber(), this);
@@ -999,3 +999,18 @@ void RocksDBEdgeIndex::recalculateEstimates() {
     _estimator->insert(hash);
   }
 }
+
+void RocksDBEdgeIndex::applyCommitedEstimates(
+    std::vector<uint64_t> const& inserts,
+    std::vector<uint64_t> const& removes) {
+  if (_estimator != nullptr) {
+    // If we have an estimator, apply the changes to it.
+    for (auto const& hash : inserts) {
+      _estimator->insert(hash);
+    }
+
+    for (auto const& hash : removes) {
+      _estimator->remove(hash);
+    }
+  }
+}
@@ -175,6 +175,9 @@ class RocksDBEdgeIndex final : public RocksDBIndex {
 
   virtual std::pair<RocksDBCuckooIndexEstimator<uint64_t>*, uint64_t> estimator() const override;
 
+  virtual void applyCommitedEstimates(std::vector<uint64_t> const& inserts,
+                                      std::vector<uint64_t> const& removes) override;
+
  private:
   /// @brief create the iterator
   IndexIterator* createEqIterator(transaction::Methods*, ManagedDocumentResult*,
@@ -324,3 +324,11 @@ RocksDBKeyBounds RocksDBIndex::getBounds(Index::IndexType type,
 std::pair<RocksDBCuckooIndexEstimator<uint64_t>*, uint64_t> RocksDBIndex::estimator() const {
   return std::make_pair(nullptr, 0);
 }
+
+void RocksDBIndex::applyCommitedEstimates(
+    std::vector<uint64_t> const& inserts,
+    std::vector<uint64_t> const& removes) {
+  // This function has to be overloaded by indexes with estimates; all others
+  // should not call it. In production this call is ignored, it is not critical.
+  TRI_ASSERT(false);
+}
@@ -148,7 +148,11 @@ class RocksDBIndex : public Index {
   static RocksDBKeyBounds getBounds(Index::IndexType type, uint64_t objectId,
                                     bool unique);
 
-  virtual std::pair<RocksDBCuckooIndexEstimator<uint64_t>*, uint64_t> estimator() const;
+  virtual std::pair<RocksDBCuckooIndexEstimator<uint64_t>*, uint64_t>
+  estimator() const;
+
+  virtual void applyCommitedEstimates(std::vector<uint64_t> const& inserts,
+                                      std::vector<uint64_t> const& removes);
 
  protected:
   inline bool useCache() const { return (_cacheEnabled && _cachePresent); }
@@ -26,6 +26,7 @@
 #include "Cluster/CollectionLockState.h"
 #include "Logger/Logger.h"
 #include "RocksDBEngine/RocksDBCollection.h"
+#include "RocksDBEngine/RocksDBIndex.h"
 #include "StorageEngine/TransactionState.h"
 #include "Transaction/Hints.h"
 #include "Transaction/Methods.h"
@@ -285,11 +286,36 @@ void RocksDBTransactionCollection::addOperation(
 }
 
 void RocksDBTransactionCollection::commitCounts() {
+  // Update the index estimates.
+  TRI_ASSERT(_collection != nullptr);
+  for (auto const& pair : _trackedIndexOperations) {
+    auto idx = _collection->lookupIndex(pair.first);
+    if (idx == nullptr) {
+      TRI_ASSERT(false);  // Index reported estimates, but does not exist
+      continue;
+    }
+    auto ridx = static_cast<RocksDBIndex*>(idx.get());
+    ridx->applyCommitedEstimates(pair.second.first, pair.second.second);
+  }
+
   _initialNumberDocuments = _numInserts - _numRemoves;
   _operationSize = 0;
   _numInserts = 0;
   _numUpdates = 0;
   _numRemoves = 0;
+  _trackedIndexOperations.clear();
 }
+
+void RocksDBTransactionCollection::trackIndexInsert(uint64_t idxObjectId,
+                                                    uint64_t hash) {
+  // First list is inserts.
+  _trackedIndexOperations[idxObjectId].first.emplace_back(hash);
+}
+
+void RocksDBTransactionCollection::trackIndexRemove(uint64_t idxObjectId,
+                                                    uint64_t hash) {
+  // Second list is removes.
+  _trackedIndexOperations[idxObjectId].second.emplace_back(hash);
+}
 
 /// @brief lock a collection
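A detail worth noting in commitCounts above: it both applies and clears _trackedIndexOperations, because it is also invoked for intermediate commits (see triggerIntermediateCommit further down), so an already-applied batch must never be replayed a second time. A toy illustration of this apply-then-clear invariant; the types are stand-ins, not ArangoDB's:

#include <cassert>
#include <cstdint>
#include <vector>

// Stand-in estimator that merely counts applied inserts.
struct CountingEstimator {
  int applied = 0;
  void insert(uint64_t /*hash*/) { ++applied; }
};

int main() {
  CountingEstimator est;
  std::vector<uint64_t> buffered = {1, 2, 3};

  // Intermediate commit: apply the buffered hashes, then clear the buffer.
  for (uint64_t h : buffered) { est.insert(h); }
  buffered.clear();

  // The transaction continues and buffers one more hash.
  buffered.push_back(4);

  // Final commit: only the new hash is applied; nothing is replayed twice.
  for (uint64_t h : buffered) { est.insert(h); }
  buffered.clear();

  assert(est.applied == 4);  // 3 from the intermediate commit + 1 final
  return 0;
}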
@@ -90,6 +90,16 @@ class RocksDBTransactionCollection final : public TransactionCollection {
                     uint64_t operationSize, TRI_voc_rid_t revisionId);
   void commitCounts();
 
+  /// @brief Every index can track hashes inserted into this index.
+  /// Used to update the estimate after the trx committed.
+  void trackIndexInsert(uint64_t idxObjectId, uint64_t hash);
+
+  /// @brief Every index can track hashes removed from this index.
+  /// Used to update the estimate after the trx committed.
+  void trackIndexRemove(uint64_t idxObjectId, uint64_t hash);
+
+
+
  private:
   /// @brief request a lock for a collection
   /// returns TRI_ERROR_LOCKED in case the lock was successfully acquired
@@ -110,6 +120,13 @@ class RocksDBTransactionCollection final : public TransactionCollection {
   uint64_t _numUpdates;
   uint64_t _numRemoves;
   bool _usageLocked;
+
+  /// @brief A map in which every index with estimates can store its
+  /// operations. Will be applied to the estimator on commit and not
+  /// applied on abort.
+  std::unordered_map<uint64_t,
+                     std::pair<std::vector<uint64_t>, std::vector<uint64_t>>>
+      _trackedIndexOperations;
 
 };
 }
@@ -271,13 +271,18 @@ arangodb::Result RocksDBTransactionState::internalCommit() {
                                  collection->revision());
         engine->settingsManager()->updateCounter(coll->objectId(), update);
       }
 
+      // We need this in case of an intermediate commit. The number of
+      // initial documents is adjusted and numInserts / removes is set to 0.
       collection->commitCounts();
     }
   }
 } else {
   for (auto& trxCollection : _collections) {
     RocksDBTransactionCollection* collection =
         static_cast<RocksDBTransactionCollection*>(trxCollection);
+    // We get here if we have filled indexes. So let us commit counts.
+    collection->commitCounts();
   }
   // don't write anything if the transaction is empty
   result = rocksutils::convertStatus(_rocksTransaction->Rollback());
 }
@@ -596,6 +601,25 @@ void RocksDBTransactionState::returnRocksDBKey(RocksDBKey* key) {
   }
 }
 
+
+void RocksDBTransactionState::trackIndexInsert(TRI_voc_cid_t cid, TRI_idx_iid_t idxId, uint64_t hash) {
+  auto col = findCollection(cid);
+  if (col != nullptr) {
+    static_cast<RocksDBTransactionCollection*>(col)->trackIndexInsert(idxId, hash);
+  } else {
+    TRI_ASSERT(false);
+  }
+}
+
+void RocksDBTransactionState::trackIndexRemove(TRI_voc_cid_t cid, TRI_idx_iid_t idxId, uint64_t hash) {
+  auto col = findCollection(cid);
+  if (col != nullptr) {
+    static_cast<RocksDBTransactionCollection*>(col)->trackIndexRemove(idxId, hash);
+  } else {
+    TRI_ASSERT(false);
+  }
+}
+
 /// @brief constructor, leases a builder
 RocksDBKeyLeaser::RocksDBKeyLeaser(transaction::Methods* trx)
     : _rtrx(RocksDBTransactionState::toState(trx)),
@@ -151,6 +151,14 @@ class RocksDBTransactionState final : public TransactionState {
   /// Not thread safe
   void triggerIntermediateCommit();
 
+  /// @brief Every index can track hashes inserted into this index.
+  /// Used to update the estimate after the trx committed.
+  void trackIndexInsert(TRI_voc_cid_t cid, TRI_idx_iid_t idxObjectId, uint64_t hash);
+
+  /// @brief Every index can track hashes removed from this index.
+  /// Used to update the estimate after the trx committed.
+  void trackIndexRemove(TRI_voc_cid_t cid, TRI_idx_iid_t idxObjectId, uint64_t hash);
+
  private:
   /// @brief create a new rocksdb transaction
   void createTransaction();
@@ -610,10 +610,11 @@ Result RocksDBVPackIndex::insertInternal(transaction::Methods* trx,
   }
 
   if (res == TRI_ERROR_NO_ERROR) {
+    auto state = RocksDBTransactionState::toState(trx);
     for (auto& it : hashes) {
       // The estimator is only useful for non-unique indexes.
       TRI_ASSERT(!_unique);
-      _estimator->insert(it);
+      state->trackIndexInsert(_collection->cid(), id(), it);
     }
   }
@@ -737,10 +738,11 @@ Result RocksDBVPackIndex::removeInternal(transaction::Methods* trx,
   }
 
   if (res == TRI_ERROR_NO_ERROR) {
+    auto state = RocksDBTransactionState::toState(trx);
     for (auto& it : hashes) {
       // The estimator is only useful for non-unique indexes.
       TRI_ASSERT(!_unique);
-      _estimator->remove(it);
+      state->trackIndexRemove(_collection->cid(), id(), it);
     }
   }
@@ -1584,3 +1586,18 @@ std::pair<RocksDBCuckooIndexEstimator<uint64_t>*, uint64_t>
 RocksDBVPackIndex::estimator() const {
   return std::make_pair(_estimator.get(), _estimatorSerializedSeq);
 }
+
+void RocksDBVPackIndex::applyCommitedEstimates(
+    std::vector<uint64_t> const& inserts,
+    std::vector<uint64_t> const& removes) {
+  if (_estimator != nullptr) {
+    // If we have an estimator, apply the changes to it.
+    for (auto const& hash : inserts) {
+      _estimator->insert(hash);
+    }
+
+    for (auto const& hash : removes) {
+      _estimator->remove(hash);
+    }
+  }
+}
@@ -214,6 +214,9 @@ class RocksDBVPackIndex : public RocksDBIndex {
 
   virtual std::pair<RocksDBCuckooIndexEstimator<uint64_t>*, uint64_t> estimator() const override;
 
+  virtual void applyCommitedEstimates(std::vector<uint64_t> const& inserts,
+                                      std::vector<uint64_t> const& removes) override;
+
  private:
   bool isDuplicateOperator(arangodb::aql::AstNode const*,
                            std::unordered_set<int> const&) const;
@@ -231,6 +231,13 @@ TransactionCollection* TransactionState::findCollection(
 }
 
 /// @brief find a collection in the transaction's list of collections
+/// The idea: if the collection is found, it will be returned;
+/// in this case the position is not used.
+/// If the collection is not found, a nullptr is returned
+/// and the position will be set. The position
+/// defines where the collection should be inserted,
+/// so whenever we want to insert the collection we
+/// have to use this position for the insert.
 TransactionCollection* TransactionState::findCollection(
     TRI_voc_cid_t cid, size_t& position) const {
   size_t const n = _collections.size();
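The contract documented above is the classic "find, or report the insertion point" lookup over a sorted sequence. A generic sketch of the same contract, assuming (as the position semantics imply) a list kept sorted by collection id; this is not ArangoDB's actual implementation:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

struct Collection {
  uint64_t cid;  // collection id
};

// Returns the collection with the given cid, or nullptr if absent.
// When absent, `position` is set to the slot where a new entry must be
// inserted to keep `collections` sorted by cid.
Collection* findCollection(std::vector<Collection*> const& collections,
                           uint64_t cid, size_t& position) {
  auto it = std::lower_bound(
      collections.begin(), collections.end(), cid,
      [](Collection const* c, uint64_t value) { return c->cid < value; });
  if (it != collections.end() && (*it)->cid == cid) {
    return *it;  // found: position is deliberately left untouched
  }
  position = static_cast<size_t>(it - collections.begin());
  return nullptr;
}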
@@ -1,5 +1,5 @@
 /* jshint globalstrict:false, strict:false */
-/* global assertEqual, assertTrue, assertFalse, assertNotNull */
+/* global assertEqual, assertTrue, assertFalse, assertNotNull, fail */
 
 // //////////////////////////////////////////////////////////////////////////////
 // / @brief test the document interface
@@ -399,7 +399,42 @@ function EdgeIndexSuite () {
           }
           edge.save(vn + '/from' + (i % 20), vn + '/to' + i, { });
         }
       }
     },
 
+    testIndexSelectivityAfterAbortion: function () {
+      let docs = [];
+      for (let i = 0; i < 1000; ++i) {
+        docs.push({_from: `${vn}/from${i % 32}`, _to: `${vn}/to${i % 47}`});
+      }
+      edge.save(docs);
+      let idx = edge.getIndexes()[1];
+      let estimateBefore = idx.selectivityEstimate;
+      try {
+        internal.db._executeTransaction({
+          collections: {write: en},
+          action: function () {
+            const vn = 'UnitTestsCollectionVertex';
+            const en = 'UnitTestsCollectionEdge';
+            let docs = [];
+            for (let i = 0; i < 1000; ++i) {
+              docs.push({_from: `${vn}/from${i % 32}`, _to: `${vn}/to${i % 47}`});
+            }
+            // This should significantly modify the estimate
+            // if successful.
+            require('@arangodb').db[en].save(docs);
+            throw "banana";
+          }
+        });
+        fail();
+      } catch (e) {
+        assertEqual(e.errorMessage, "banana");
+        // Insert failed.
+        // Validate that the estimate was not modified.
+        idx = edge.getIndexes()[1];
+        assertEqual(idx.selectivityEstimate, estimateBefore);
+      }
+    },
+
   };
 }
@@ -1,5 +1,5 @@
 /*jshint globalstrict:false, strict:false */
-/*global assertEqual, assertTrue, assertEqual */
+/*global assertEqual, assertTrue, fail */
 
 ////////////////////////////////////////////////////////////////////////////////
 /// @brief test the hash index, selectivity estimates
@@ -150,7 +150,47 @@ function HashIndexSuite() {
 
        idx = collection.ensureHashIndex("value");
        assertTrue(idx.selectivityEstimate <= (2 / 3000 + 0.0001));
      }
    },
 
+////////////////////////////////////////////////////////////////////////////////
+/// @brief Validate that the selectivity estimate is not modified if the
+/// transaction is aborted.
+////////////////////////////////////////////////////////////////////////////////
+
+    testSelectivityAfterAbortion : function () {
+      let idx = collection.ensureHashIndex("value");
+      let docs = [];
+      for (let i = 0; i < 1000; ++i) {
+        docs.push({value: i % 100});
+      }
+      collection.save(docs);
+      idx = collection.ensureHashIndex("value");
+
+      assertTrue(idx.selectivityEstimate === 100 / 1000);
+      try {
+        internal.db._executeTransaction({
+          collections: {write: cn},
+          action: function () {
+            const cn = "UnitTestsCollectionHash";
+            let docs = [];
+            for (let i = 0; i < 1000; ++i) {
+              docs.push({value: 1});
+            }
+            // This should significantly modify the estimate
+            // if successful.
+            require('@arangodb').db[cn].save(docs);
+            throw "banana";
+          }
+        });
+        fail();
+      } catch (e) {
+        assertEqual(e.errorMessage, "banana");
+        // Insert failed.
+        // Validate that the estimate was not modified.
+        idx = collection.ensureHashIndex("value");
+        assertTrue(idx.selectivityEstimate === 100 / 1000);
+      }
+    },
+
   };
 }
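For reference, the value asserted here follows directly from how the estimate is defined: 1000 documents with value: i % 100 produce 100 distinct values, so selectivity = distinct / total = 100 / 1000 = 0.1. The aborted transaction would have added 1000 more documents all sharing the single value 1, which would have dragged the estimate well below 0.1 had it been applied.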
@@ -0,0 +1,121 @@
+/*jshint globalstrict:false, strict:false */
+/*global assertEqual, assertTrue, fail */
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief test the skiplist index, selectivity estimates
+///
+/// @file
+///
+/// DISCLAIMER
+///
+/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
+///
+/// Licensed under the Apache License, Version 2.0 (the "License");
+/// you may not use this file except in compliance with the License.
+/// You may obtain a copy of the License at
+///
+///     http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing, software
+/// distributed under the License is distributed on an "AS IS" BASIS,
+/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+/// See the License for the specific language governing permissions and
+/// limitations under the License.
+///
+/// Copyright holder is ArangoDB GmbH, Cologne, Germany
+///
+/// @author Michael Hackstein
+/// @author Copyright 2018, ArangoDB GmbH, Cologne, Germany
+////////////////////////////////////////////////////////////////////////////////
+
+
+var jsunity = require("jsunity");
+var internal = require("internal");
+
+function SkiplistIndexSuite() {
+  'use strict';
+  var cn = "UnitTestsCollectionSkip";
+  var collection = null;
+
+  return {
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief set up
+////////////////////////////////////////////////////////////////////////////////
+
+    setUp : function () {
+      internal.db._drop(cn);
+      collection = internal.db._create(cn);
+    },
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief tear down
+////////////////////////////////////////////////////////////////////////////////
+
+    tearDown : function () {
+      // try...catch is necessary as some tests delete the collection itself!
+      try {
+        collection.unload();
+        collection.drop();
+      }
+      catch (err) {
+      }
+
+      collection = null;
+      internal.wait(0.0);
+    },
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief Validate that the selectivity estimate is not modified if the
+/// transaction is aborted.
+////////////////////////////////////////////////////////////////////////////////
+
+    testSelectivityAfterAbortion : function () {
+      let idx = collection.ensureSkiplist("value");
+      let docs = [];
+      for (let i = 0; i < 1000; ++i) {
+        docs.push({value: i % 100});
+      }
+      collection.save(docs);
+      idx = collection.ensureSkiplist("value");
+      let oldEstimate = idx.selectivityEstimate;
+
+      assertTrue(oldEstimate > 0);
+      assertTrue(oldEstimate < 1);
+      try {
+        internal.db._executeTransaction({
+          collections: {write: cn},
+          action: function () {
+            const cn = "UnitTestsCollectionSkip";
+            let docs = [];
+            for (let i = 0; i < 1000; ++i) {
+              docs.push({value: 1});
+            }
+            // This should significantly modify the estimate
+            // if successful.
+            require('@arangodb').db[cn].save(docs);
+            throw "banana";
+          }
+        });
+        fail();
+      } catch (e) {
+        assertEqual(e.errorMessage, "banana");
+        // Insert failed.
+        // Validate that the estimate was not modified.
+        idx = collection.ensureSkiplist("value");
+        assertEqual(idx.selectivityEstimate, oldEstimate);
+      }
+    },
+
+  };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief executes the test suites
+////////////////////////////////////////////////////////////////////////////////
+
+jsunity.run(SkiplistIndexSuite);
+
+return jsunity.done();
@@ -142,6 +142,7 @@ function optimizerIndexesSortTestSuite () {
         return node.type;
       });
 
+      require("internal").db._explain(query);
       assertNotEqual(-1, nodeTypes.indexOf("IndexNode"), query);
       if (!require("@arangodb/cluster").isCluster()) {
         assertEqual(-1, nodeTypes.indexOf("SortNode"), query);