mirror of https://gitee.com/bigwinds/arangodb
do not acquire a snapshot for a single read op (#8916)
This commit is contained in:
parent
8b245d25a4
commit
0a0cb41f04
|
@ -791,21 +791,25 @@ bool RocksDBCollection::lookupRevision(transaction::Methods* trx, VPackSlice con
|
||||||
Result RocksDBCollection::read(transaction::Methods* trx,
|
Result RocksDBCollection::read(transaction::Methods* trx,
|
||||||
arangodb::velocypack::StringRef const& key,
|
arangodb::velocypack::StringRef const& key,
|
||||||
ManagedDocumentResult& result, bool /*lock*/) {
|
ManagedDocumentResult& result, bool /*lock*/) {
|
||||||
|
Result res;
|
||||||
|
do {
|
||||||
LocalDocumentId const documentId = primaryIndex()->lookupKey(trx, key);
|
LocalDocumentId const documentId = primaryIndex()->lookupKey(trx, key);
|
||||||
if (!documentId.isSet()) {
|
if (!documentId.isSet()) {
|
||||||
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
|
res.reset(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
|
||||||
} // found
|
break;
|
||||||
|
} // else found
|
||||||
|
|
||||||
std::string* buffer = result.setManaged();
|
std::string* buffer = result.setManaged();
|
||||||
rocksdb::PinnableSlice ps(buffer);
|
rocksdb::PinnableSlice ps(buffer);
|
||||||
Result res = lookupDocumentVPack(trx, documentId, ps, /*readCache*/true, /*fillCache*/true);
|
res = lookupDocumentVPack(trx, documentId, ps, /*readCache*/true, /*fillCache*/true);
|
||||||
if (res.ok()) {
|
if (res.ok()) {
|
||||||
if (ps.IsPinned()) {
|
if (ps.IsPinned()) {
|
||||||
buffer->assign(ps.data(), ps.size());
|
buffer->assign(ps.data(), ps.size());
|
||||||
} // else value is already assigned
|
} // else value is already assigned
|
||||||
result.setRevisionId(); // extracts id from buffer
|
result.setRevisionId(); // extracts id from buffer
|
||||||
}
|
}
|
||||||
|
} while(res.is(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) &&
|
||||||
|
RocksDBTransactionState::toState(trx)->setSnapshotOnReadOnly());
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1416,7 +1420,7 @@ arangodb::Result RocksDBCollection::lookupDocumentVPack(transaction::Methods* tr
|
||||||
ps.PinSelf(rocksdb::Slice(reinterpret_cast<char const*>(f.value()->value()),
|
ps.PinSelf(rocksdb::Slice(reinterpret_cast<char const*>(f.value()->value()),
|
||||||
f.value()->valueSize()));
|
f.value()->valueSize()));
|
||||||
// TODO we could potentially use the PinSlice method ?!
|
// TODO we could potentially use the PinSlice method ?!
|
||||||
return res;
|
return res; // all good
|
||||||
}
|
}
|
||||||
if (f.result().errorNumber() == TRI_ERROR_LOCK_TIMEOUT) {
|
if (f.result().errorNumber() == TRI_ERROR_LOCK_TIMEOUT) {
|
||||||
// assuming someone is currently holding a write lock, which
|
// assuming someone is currently holding a write lock, which
|
||||||
|
|
|
@ -584,8 +584,9 @@ Result RocksDBPrimaryIndex::insert(transaction::Methods& trx, RocksDBMethods* mt
|
||||||
RocksDBKeyLeaser key(&trx);
|
RocksDBKeyLeaser key(&trx);
|
||||||
key->constructPrimaryIndexValue(_objectId, arangodb::velocypack::StringRef(keySlice));
|
key->constructPrimaryIndexValue(_objectId, arangodb::velocypack::StringRef(keySlice));
|
||||||
|
|
||||||
rocksdb::PinnableSlice val;
|
transaction::StringLeaser buffer(&trx);
|
||||||
rocksdb::Status s = mthd->Get(_cf, key->string(), &val);
|
rocksdb::PinnableSlice ps(buffer.get());
|
||||||
|
rocksdb::Status s = mthd->Get(_cf, key->string(), &ps);
|
||||||
|
|
||||||
Result res;
|
Result res;
|
||||||
if (s.ok()) { // detected conflicting primary key
|
if (s.ok()) { // detected conflicting primary key
|
||||||
|
@ -599,7 +600,7 @@ Result RocksDBPrimaryIndex::insert(transaction::Methods& trx, RocksDBMethods* mt
|
||||||
|
|
||||||
return addErrorMsg(res, existingId);
|
return addErrorMsg(res, existingId);
|
||||||
}
|
}
|
||||||
val.Reset(); // clear used memory
|
ps.Reset(); // clear used memory
|
||||||
|
|
||||||
if (trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED)) {
|
if (trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED)) {
|
||||||
// blacklist new index entry to avoid caching without committing first
|
// blacklist new index entry to avoid caching without committing first
|
||||||
|
|
|
@ -118,9 +118,12 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
|
||||||
|
|
||||||
TRI_ASSERT(_readSnapshot == nullptr);
|
TRI_ASSERT(_readSnapshot == nullptr);
|
||||||
if (isReadOnlyTransaction()) {
|
if (isReadOnlyTransaction()) {
|
||||||
|
// no need to acquire a snapshot for a single op
|
||||||
|
if (!isSingleOperation()) {
|
||||||
_readSnapshot = db->GetSnapshot(); // must call ReleaseSnapshot later
|
_readSnapshot = db->GetSnapshot(); // must call ReleaseSnapshot later
|
||||||
TRI_ASSERT(_readSnapshot != nullptr);
|
TRI_ASSERT(_readSnapshot != nullptr);
|
||||||
_rocksReadOptions.snapshot = _readSnapshot;
|
_rocksReadOptions.snapshot = _readSnapshot;
|
||||||
|
}
|
||||||
_rocksMethods.reset(new RocksDBReadOnlyMethods(this));
|
_rocksMethods.reset(new RocksDBReadOnlyMethods(this));
|
||||||
} else {
|
} else {
|
||||||
createTransaction();
|
createTransaction();
|
||||||
|
@ -133,7 +136,6 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_rocksMethods.reset(new RocksDBTrxMethods(this));
|
_rocksMethods.reset(new RocksDBTrxMethods(this));
|
||||||
|
|
||||||
if (hasHint(transaction::Hints::Hint::NO_INDEXING)) {
|
if (hasHint(transaction::Hints::Hint::NO_INDEXING)) {
|
||||||
// do not track our own writes... we can only use this in very
|
// do not track our own writes... we can only use this in very
|
||||||
// specific scenarios, i.e. when we are sure that we will have a
|
// specific scenarios, i.e. when we are sure that we will have a
|
||||||
|
@ -186,9 +188,9 @@ void RocksDBTransactionState::createTransaction() {
|
||||||
rocksdb::TransactionDB* db = rocksutils::globalRocksDB();
|
rocksdb::TransactionDB* db = rocksutils::globalRocksDB();
|
||||||
rocksdb::TransactionOptions trxOpts;
|
rocksdb::TransactionOptions trxOpts;
|
||||||
trxOpts.set_snapshot = true;
|
trxOpts.set_snapshot = true;
|
||||||
|
|
||||||
// unclear performance implications do not use for now
|
// unclear performance implications do not use for now
|
||||||
// trxOpts.deadlock_detect = !hasHint(transaction::Hints::Hint::NO_DLD);
|
// trxOpts.deadlock_detect = !hasHint(transaction::Hints::Hint::NO_DLD);
|
||||||
|
|
||||||
if (isOnlyExclusiveTransaction()) {
|
if (isOnlyExclusiveTransaction()) {
|
||||||
// we are exclusively modifying collection data here, so we can turn off
|
// we are exclusively modifying collection data here, so we can turn off
|
||||||
// all concurrency control checks to save time
|
// all concurrency control checks to save time
|
||||||
|
@ -550,11 +552,24 @@ uint64_t RocksDBTransactionState::sequenceNumber() const {
|
||||||
return static_cast<uint64_t>(_rocksTransaction->GetSnapshot()->GetSequenceNumber());
|
return static_cast<uint64_t>(_rocksTransaction->GetSnapshot()->GetSequenceNumber());
|
||||||
} else if (_readSnapshot != nullptr) {
|
} else if (_readSnapshot != nullptr) {
|
||||||
return static_cast<uint64_t>(_readSnapshot->GetSequenceNumber());
|
return static_cast<uint64_t>(_readSnapshot->GetSequenceNumber());
|
||||||
|
} else if (isReadOnlyTransaction() && isSingleOperation()) {
|
||||||
|
return rocksutils::latestSequenceNumber();
|
||||||
}
|
}
|
||||||
TRI_ASSERT(false);
|
TRI_ASSERT(false);
|
||||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "No snapshot set");
|
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "No snapshot set");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// @brief acquire a database snapshot
|
||||||
|
bool RocksDBTransactionState::setSnapshotOnReadOnly() {
|
||||||
|
if (_readSnapshot == nullptr && isReadOnlyTransaction()) {
|
||||||
|
TRI_ASSERT(_rocksTransaction == nullptr);
|
||||||
|
TRI_ASSERT(isSingleOperation());
|
||||||
|
_readSnapshot = rocksutils::globalRocksDB()->GetSnapshot();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
Result RocksDBTransactionState::triggerIntermediateCommit(bool& hasPerformedIntermediateCommit) {
|
Result RocksDBTransactionState::triggerIntermediateCommit(bool& hasPerformedIntermediateCommit) {
|
||||||
TRI_ASSERT(!hasPerformedIntermediateCommit);
|
TRI_ASSERT(!hasPerformedIntermediateCommit);
|
||||||
|
|
||||||
|
|
|
@ -116,6 +116,9 @@ class RocksDBTransactionState final : public TransactionState {
|
||||||
/// has either a snapshot or a transaction
|
/// has either a snapshot or a transaction
|
||||||
rocksdb::SequenceNumber sequenceNumber() const;
|
rocksdb::SequenceNumber sequenceNumber() const;
|
||||||
|
|
||||||
|
/// @brief acquire a database snapshot
|
||||||
|
bool setSnapshotOnReadOnly();
|
||||||
|
|
||||||
static RocksDBTransactionState* toState(transaction::Methods* trx) {
|
static RocksDBTransactionState* toState(transaction::Methods* trx) {
|
||||||
TRI_ASSERT(trx != nullptr);
|
TRI_ASSERT(trx != nullptr);
|
||||||
TransactionState* state = trx->state();
|
TransactionState* state = trx->state();
|
||||||
|
|
Loading…
Reference in New Issue