mirror of https://gitee.com/bigwinds/arangodb
a few simplifications and extensions (#3115)
This commit is contained in:
parent
ca1aeb4ad4
commit
3350164ec6
|
@ -36,13 +36,12 @@
|
|||
using namespace arangodb;
|
||||
|
||||
MMFilesTransactionCollection::MMFilesTransactionCollection(TransactionState* trx, TRI_voc_cid_t cid, AccessMode::Type accessType, int nestingLevel)
|
||||
: TransactionCollection(trx, cid),
|
||||
: TransactionCollection(trx, cid, accessType),
|
||||
_operations{_arena},
|
||||
_originalRevision(0),
|
||||
_nestingLevel(nestingLevel),
|
||||
_compactionLocked(false),
|
||||
_waitForSync(false),
|
||||
_accessType(accessType),
|
||||
_lockType(AccessMode::Type::NONE) {}
|
||||
|
||||
MMFilesTransactionCollection::~MMFilesTransactionCollection() {}
|
||||
|
@ -244,11 +243,6 @@ int MMFilesTransactionCollection::use(int nestingLevel) {
|
|||
}
|
||||
}
|
||||
|
||||
if (AccessMode::AccessMode::isWriteOrExclusive(_accessType) && _originalRevision == 0) {
|
||||
// store original revision at transaction start
|
||||
_originalRevision = physical->revision();
|
||||
}
|
||||
|
||||
bool shouldLock = _transaction->hasHint(transaction::Hints::Hint::LOCK_ENTIRELY);
|
||||
|
||||
if (!shouldLock) {
|
||||
|
@ -264,6 +258,11 @@ int MMFilesTransactionCollection::use(int nestingLevel) {
|
|||
}
|
||||
}
|
||||
|
||||
if (AccessMode::AccessMode::isWriteOrExclusive(_accessType) && _originalRevision == 0) {
|
||||
// store original revision at transaction start
|
||||
_originalRevision = physical->revision();
|
||||
}
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
|
|
|
@ -87,7 +87,6 @@ class MMFilesTransactionCollection final : public TransactionCollection {
|
|||
bool _compactionLocked; // was the compaction lock grabbed for the collection?
|
||||
bool _waitForSync; // whether or not the collection has waitForSync
|
||||
|
||||
AccessMode::Type _accessType; // access type (read|write)
|
||||
AccessMode::Type _lockType; // collection lock type
|
||||
};
|
||||
|
||||
|
|
|
@ -42,20 +42,20 @@ struct RocksDBColumnFamily {
|
|||
static constexpr size_t minNumberOfColumnFamilies = 7;
|
||||
static constexpr size_t numberOfColumnFamilies = 7;
|
||||
|
||||
static rocksdb::ColumnFamilyHandle* definitions() { return _definitions; }
|
||||
static inline rocksdb::ColumnFamilyHandle* definitions() { return _definitions; }
|
||||
|
||||
static rocksdb::ColumnFamilyHandle* documents() { return _documents; }
|
||||
static inline rocksdb::ColumnFamilyHandle* documents() { return _documents; }
|
||||
|
||||
static rocksdb::ColumnFamilyHandle* primary() { return _primary; }
|
||||
static inline rocksdb::ColumnFamilyHandle* primary() { return _primary; }
|
||||
|
||||
static rocksdb::ColumnFamilyHandle* edge() { return _edge; }
|
||||
static inline rocksdb::ColumnFamilyHandle* edge() { return _edge; }
|
||||
|
||||
/// unique and non-unique vpack indexes (skiplist, permanent indexes)
|
||||
static rocksdb::ColumnFamilyHandle* vpack() { return _vpack; }
|
||||
/// unique and non unique vpack indexes (skiplist, permanent indexes)
|
||||
static inline rocksdb::ColumnFamilyHandle* vpack() { return _vpack; }
|
||||
|
||||
static rocksdb::ColumnFamilyHandle* geo() { return _geo; }
|
||||
static inline rocksdb::ColumnFamilyHandle* geo() { return _geo; }
|
||||
|
||||
static rocksdb::ColumnFamilyHandle* fulltext() { return _fulltext; }
|
||||
static inline rocksdb::ColumnFamilyHandle* fulltext() { return _fulltext; }
|
||||
|
||||
static char const* columnFamilyName(rocksdb::ColumnFamilyHandle* cf) {
|
||||
if (cf == _definitions) {
|
||||
|
@ -84,6 +84,9 @@ struct RocksDBColumnFamily {
|
|||
}
|
||||
|
||||
private:
|
||||
// static variables for all existing column families
|
||||
// note that these are initialized in RocksDBEngine.cpp
|
||||
// as there is no RocksDBColumnFamily.cpp
|
||||
static rocksdb::ColumnFamilyHandle* _definitions;
|
||||
static rocksdb::ColumnFamilyHandle* _documents;
|
||||
static rocksdb::ColumnFamilyHandle* _primary;
|
||||
|
|
|
@ -98,6 +98,7 @@ namespace arangodb {
|
|||
std::string const RocksDBEngine::EngineName("rocksdb");
|
||||
std::string const RocksDBEngine::FeatureName("RocksDBEngine");
|
||||
|
||||
// static variables for all existing column families
|
||||
rocksdb::ColumnFamilyHandle* RocksDBColumnFamily::_definitions(nullptr);
|
||||
rocksdb::ColumnFamilyHandle* RocksDBColumnFamily::_documents(nullptr);
|
||||
rocksdb::ColumnFamilyHandle* RocksDBColumnFamily::_primary(nullptr);
|
||||
|
|
|
@ -155,6 +155,14 @@ std::unique_ptr<rocksdb::Iterator> RocksDBReadOnlyMethods::NewIterator(
|
|||
|
||||
// =================== RocksDBTrxMethods ====================
|
||||
|
||||
void RocksDBTrxMethods::DisableIndexing() {
|
||||
_state->_rocksTransaction->DisableIndexing();
|
||||
}
|
||||
|
||||
void RocksDBTrxMethods::EnableIndexing() {
|
||||
_state->_rocksTransaction->EnableIndexing();
|
||||
}
|
||||
|
||||
RocksDBTrxMethods::RocksDBTrxMethods(RocksDBTransactionState* state)
|
||||
: RocksDBMethods(state) {}
|
||||
|
||||
|
@ -209,6 +217,27 @@ arangodb::Result RocksDBTrxMethods::RollbackToSavePoint() {
|
|||
_state->_rocksTransaction->RollbackToSavePoint());
|
||||
}
|
||||
|
||||
// =================== RocksDBTrxUntrackedMethods ====================
|
||||
|
||||
RocksDBTrxUntrackedMethods::RocksDBTrxUntrackedMethods(RocksDBTransactionState* state)
|
||||
: RocksDBTrxMethods(state) {}
|
||||
|
||||
arangodb::Result RocksDBTrxUntrackedMethods::Put(rocksdb::ColumnFamilyHandle* cf,
|
||||
RocksDBKey const& key,
|
||||
rocksdb::Slice const& val,
|
||||
rocksutils::StatusHint hint) {
|
||||
TRI_ASSERT(cf != nullptr);
|
||||
rocksdb::Status s = _state->_rocksTransaction->PutUntracked(cf, key.string(), val);
|
||||
return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s, hint);
|
||||
}
|
||||
|
||||
arangodb::Result RocksDBTrxUntrackedMethods::Delete(rocksdb::ColumnFamilyHandle* cf,
|
||||
RocksDBKey const& key) {
|
||||
TRI_ASSERT(cf != nullptr);
|
||||
rocksdb::Status s = _state->_rocksTransaction->DeleteUntracked(cf, key.string());
|
||||
return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s);
|
||||
}
|
||||
|
||||
// =================== RocksDBBatchedMethods ====================
|
||||
|
||||
RocksDBBatchedMethods::RocksDBBatchedMethods(RocksDBTransactionState* state,
|
||||
|
|
|
@ -67,6 +67,12 @@ class RocksDBMethods {
|
|||
|
||||
rocksdb::ReadOptions const& readOptions();
|
||||
|
||||
// the default implementation is to do nothing
|
||||
virtual void DisableIndexing() {}
|
||||
|
||||
// the default implementation is to do nothing
|
||||
virtual void EnableIndexing() {}
|
||||
|
||||
virtual bool Exists(rocksdb::ColumnFamilyHandle*, RocksDBKey const&) = 0;
|
||||
virtual arangodb::Result Get(rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&,
|
||||
std::string*) = 0;
|
||||
|
@ -125,11 +131,15 @@ class RocksDBReadOnlyMethods final : public RocksDBMethods {
|
|||
rocksdb::TransactionDB* _db;
|
||||
};
|
||||
|
||||
/// transactio wrapper, uses the current rocksdb transaction
|
||||
class RocksDBTrxMethods final : public RocksDBMethods {
|
||||
/// transaction wrapper, uses the current rocksdb transaction
|
||||
class RocksDBTrxMethods : public RocksDBMethods {
|
||||
public:
|
||||
explicit RocksDBTrxMethods(RocksDBTransactionState* state);
|
||||
|
||||
void DisableIndexing() override;
|
||||
|
||||
void EnableIndexing() override;
|
||||
|
||||
bool Exists(rocksdb::ColumnFamilyHandle*, RocksDBKey const&) override;
|
||||
arangodb::Result Get(rocksdb::ColumnFamilyHandle*, rocksdb::Slice const& key,
|
||||
std::string* val) override;
|
||||
|
@ -148,6 +158,19 @@ class RocksDBTrxMethods final : public RocksDBMethods {
|
|||
arangodb::Result RollbackToSavePoint() override;
|
||||
};
|
||||
|
||||
/// transaction wrapper, uses the current rocksdb transaction and non-tracking methods
|
||||
class RocksDBTrxUntrackedMethods final : public RocksDBTrxMethods {
|
||||
public:
|
||||
explicit RocksDBTrxUntrackedMethods(RocksDBTransactionState* state);
|
||||
|
||||
arangodb::Result Put(
|
||||
rocksdb::ColumnFamilyHandle*, RocksDBKey const& key,
|
||||
rocksdb::Slice const& val,
|
||||
rocksutils::StatusHint hint = rocksutils::StatusHint::none) override;
|
||||
arangodb::Result Delete(rocksdb::ColumnFamilyHandle*,
|
||||
RocksDBKey const& key) override;
|
||||
};
|
||||
|
||||
/// wraps a writebatch - non transactional
|
||||
class RocksDBBatchedMethods final : public RocksDBMethods {
|
||||
public:
|
||||
|
|
|
@ -37,8 +37,7 @@ using namespace arangodb;
|
|||
RocksDBTransactionCollection::RocksDBTransactionCollection(
|
||||
TransactionState* trx, TRI_voc_cid_t cid, AccessMode::Type accessType,
|
||||
int nestingLevel)
|
||||
: TransactionCollection(trx, cid),
|
||||
_accessType(accessType),
|
||||
: TransactionCollection(trx, cid, accessType),
|
||||
_lockType(AccessMode::Type::NONE),
|
||||
_nestingLevel(nestingLevel),
|
||||
_initialNumberDocuments(0),
|
||||
|
|
|
@ -92,7 +92,6 @@ class RocksDBTransactionCollection final : public TransactionCollection {
|
|||
int doUnlock(AccessMode::Type, int nestingLevel);
|
||||
|
||||
private:
|
||||
AccessMode::Type _accessType; // access type (read|write)
|
||||
AccessMode::Type _lockType; // collection lock type, used for exclusive locks
|
||||
int _nestingLevel; // the transaction level that added this collection
|
||||
uint64_t _initialNumberDocuments;
|
||||
|
|
|
@ -150,8 +150,18 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
|
|||
} else {
|
||||
_rocksReadOptions.snapshot = _rocksTransaction->GetSnapshot();
|
||||
}
|
||||
|
||||
// under some circumstances we can use untracking Put/Delete methods,
|
||||
// but we need to be sure this does not cause any lost updates or other
|
||||
// inconsistencies.
|
||||
// TODO: enable this optimization once these circumstances are clear
|
||||
// and fully covered by tests
|
||||
if (false && isExclusiveTransactionOnSingleCollection()) {
|
||||
_rocksMethods.reset(new RocksDBTrxUntrackedMethods(this));
|
||||
} else {
|
||||
_rocksMethods.reset(new RocksDBTrxMethods(this));
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
TRI_ASSERT(_status == transaction::Status::RUNNING);
|
||||
|
@ -160,9 +170,10 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
|
|||
return result;
|
||||
}
|
||||
|
||||
// create a rocksdb transaction. will only be called for write transactions
|
||||
void RocksDBTransactionState::createTransaction() {
|
||||
TRI_ASSERT(!_rocksTransaction);
|
||||
// TODO intermediates
|
||||
TRI_ASSERT(!isReadOnlyTransaction());
|
||||
|
||||
// start rocks transaction
|
||||
rocksdb::TransactionDB* db = rocksutils::globalRocksDB();
|
||||
|
|
|
@ -61,8 +61,8 @@ class RocksDBMethods;
|
|||
class RocksDBTransactionState final : public TransactionState {
|
||||
friend class RocksDBMethods;
|
||||
friend class RocksDBReadOnlyMethods;
|
||||
friend class RocksDBGlobalMethods;
|
||||
friend class RocksDBTrxMethods;
|
||||
friend class RocksDBTrxUntrackedMethods;
|
||||
friend class RocksDBBatchedMethods;
|
||||
|
||||
public:
|
||||
|
|
|
@ -42,8 +42,8 @@ class TransactionCollection {
|
|||
TransactionCollection(TransactionCollection const&) = delete;
|
||||
TransactionCollection& operator=(TransactionCollection const&) = delete;
|
||||
|
||||
TransactionCollection(TransactionState* trx, TRI_voc_cid_t cid)
|
||||
: _transaction(trx), _cid(cid), _collection(nullptr) {}
|
||||
TransactionCollection(TransactionState* trx, TRI_voc_cid_t cid, AccessMode::Type accessType)
|
||||
: _transaction(trx), _cid(cid), _collection(nullptr), _accessType(accessType) {}
|
||||
|
||||
virtual ~TransactionCollection() {}
|
||||
|
||||
|
@ -55,6 +55,8 @@ class TransactionCollection {
|
|||
|
||||
std::string collectionName() const;
|
||||
|
||||
AccessMode::Type accessType() const { return _accessType; }
|
||||
|
||||
/// @brief request a main-level lock for a collection
|
||||
virtual int lock() = 0;
|
||||
|
||||
|
@ -76,6 +78,7 @@ class TransactionCollection {
|
|||
virtual void freeOperations(transaction::Methods* activeTrx, bool mustRollback) = 0;
|
||||
|
||||
virtual bool canAccess(AccessMode::Type accessType) const = 0;
|
||||
|
||||
virtual int updateUsage(AccessMode::Type accessType, int nestingLevel) = 0;
|
||||
virtual int use(int nestingLevel) = 0;
|
||||
virtual void unuse(int nestingLevel) = 0;
|
||||
|
@ -85,6 +88,7 @@ class TransactionCollection {
|
|||
TransactionState* _transaction; // the transaction state
|
||||
TRI_voc_cid_t const _cid; // collection id
|
||||
LogicalCollection* _collection; // vocbase collection pointer
|
||||
AccessMode::Type _accessType; // access type (read|write)
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -179,10 +179,6 @@ int TransactionState::addCollection(TRI_voc_cid_t cid,
|
|||
|
||||
TRI_ASSERT(trxCollection != nullptr);
|
||||
|
||||
// std::cout << "SingleCollectionTransaction::lockRead() database: " /*<<
|
||||
// documentCollection()->dbName()*/ << ", collection: " <<
|
||||
// trxCollection->collectionName() << "\n";
|
||||
|
||||
// insert collection at the correct position
|
||||
try {
|
||||
_collections.insert(_collections.begin() + position, trxCollection);
|
||||
|
@ -299,6 +295,10 @@ void TransactionState::setType(AccessMode::Type type) {
|
|||
_type = type;
|
||||
}
|
||||
|
||||
bool TransactionState::isExclusiveTransactionOnSingleCollection() const {
|
||||
return ((numCollections() == 1) && (_collections[0]->accessType() == AccessMode::Type::EXCLUSIVE));
|
||||
}
|
||||
|
||||
/// @brief release collection locks for a transaction
|
||||
int TransactionState::releaseCollections() {
|
||||
if (hasHint(transaction::Hints::Hint::LOCK_NEVER) ||
|
||||
|
|
|
@ -123,9 +123,12 @@ class TransactionState {
|
|||
/// @brief use all participating collections of a transaction
|
||||
Result useCollections(int nestingLevel);
|
||||
|
||||
/// @brief run a callback on all collections
|
||||
/// @brief run a callback on all collections of the transaction
|
||||
void allCollections(std::function<bool(TransactionCollection*)> const& cb);
|
||||
|
||||
/// @brief return the number of collections in the transaction
|
||||
size_t numCollections() const { return _collections.size(); }
|
||||
|
||||
/// @brief release collection locks for a transaction
|
||||
int unuseCollections(int nestingLevel);
|
||||
|
||||
|
@ -167,6 +170,9 @@ class TransactionState {
|
|||
return (_type == AccessMode::Type::READ);
|
||||
}
|
||||
|
||||
/// @brief whether or not a transaction is an exclusive transaction on a single collection
|
||||
bool isExclusiveTransactionOnSingleCollection() const;
|
||||
|
||||
/// @brief release collection locks for a transaction
|
||||
int releaseCollections();
|
||||
|
||||
|
|
|
@ -98,18 +98,3 @@ std::string SingleCollectionTransaction::name() {
|
|||
TRI_ASSERT(_trxCollection != nullptr);
|
||||
return _trxCollection->collectionName();
|
||||
}
|
||||
|
||||
/// @brief explicitly lock the underlying collection for read access
|
||||
Result SingleCollectionTransaction::lockRead() {
|
||||
return lock(trxCollection(), AccessMode::Type::READ);
|
||||
}
|
||||
|
||||
/// @brief explicitly unlock the underlying collection after read access
|
||||
Result SingleCollectionTransaction::unlockRead() {
|
||||
return unlock(trxCollection(), AccessMode::Type::READ);
|
||||
}
|
||||
|
||||
/// @brief explicitly lock the underlying collection for write access
|
||||
Result SingleCollectionTransaction::lockWrite() {
|
||||
return lock(trxCollection(), AccessMode::Type::WRITE);
|
||||
}
|
||||
|
|
|
@ -89,24 +89,6 @@ class SingleCollectionTransaction final : public transaction::Methods {
|
|||
|
||||
std::string name();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief explicitly lock the underlying collection for read access
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Result lockRead();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief explicitly unlock the underlying collection after read access
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Result unlockRead();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief explicitly lock the underlying collection for write access
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Result lockWrite();
|
||||
|
||||
private:
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief collection id
|
||||
|
|
|
@ -2357,11 +2357,8 @@ static int GetRevision(arangodb::LogicalCollection* collection, TRI_voc_rid_t& r
|
|||
return res.errorNumber();
|
||||
}
|
||||
|
||||
// READ-LOCK start
|
||||
trx.lockRead();
|
||||
rid = collection->revision(&trx);
|
||||
trx.finish(res);
|
||||
// READ-LOCK end
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
|
|
@ -154,9 +154,6 @@ arangodb::Result Indexes::getAll(arangodb::LogicalCollection const* collection,
|
|||
return res;
|
||||
}
|
||||
|
||||
// READ-LOCK start
|
||||
trx.lockRead();
|
||||
|
||||
// get list of indexes
|
||||
auto indexes = collection->getIndexes();
|
||||
tmp.openArray(true);
|
||||
|
@ -165,7 +162,6 @@ arangodb::Result Indexes::getAll(arangodb::LogicalCollection const* collection,
|
|||
}
|
||||
tmp.close();
|
||||
trx.finish(res);
|
||||
// READ-LOCK end
|
||||
}
|
||||
|
||||
double selectivity = 0, memory = 0, cacheSize = 0, cacheLifeTimeHitRate = 0,
|
||||
|
|
Loading…
Reference in New Issue