mirror of https://gitee.com/bigwinds/arangodb
do not allow MMFiles single document operations overtaking each other… (#7469)
This commit is contained in:
parent
d280205476
commit
1bef7e842c
|
@ -544,6 +544,7 @@ SET(ARANGOD_SOURCES
|
|||
VocBase/Methods/UpgradeTasks.cpp
|
||||
VocBase/Methods/Version.cpp
|
||||
VocBase/KeyGenerator.cpp
|
||||
VocBase/KeyLockInfo.cpp
|
||||
VocBase/LogicalCollection.cpp
|
||||
VocBase/LogicalDataSource.cpp
|
||||
VocBase/LogicalView.cpp
|
||||
|
|
|
@ -511,7 +511,9 @@ Result ClusterCollection::insert(arangodb::transaction::Methods*,
|
|||
arangodb::velocypack::Slice const,
|
||||
arangodb::ManagedDocumentResult&,
|
||||
OperationOptions&, TRI_voc_tick_t&, bool,
|
||||
TRI_voc_tick_t&, std::function<Result(void)>) {
|
||||
TRI_voc_tick_t&,
|
||||
KeyLockInfo* /*keyLock*/,
|
||||
std::function<Result(void)>) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
|
@ -538,7 +540,8 @@ Result ClusterCollection::remove(
|
|||
arangodb::transaction::Methods* trx, arangodb::velocypack::Slice slice,
|
||||
arangodb::ManagedDocumentResult& previous, OperationOptions& options,
|
||||
TRI_voc_tick_t& resultMarkerTick, bool, TRI_voc_rid_t& prevRev,
|
||||
TRI_voc_rid_t& revisionId, std::function<Result(void)> /*callbackDuringLock*/) {
|
||||
TRI_voc_rid_t& revisionId, KeyLockInfo* /*keyLock*/,
|
||||
std::function<Result(void)> /*callbackDuringLock*/) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
|
|
|
@ -157,6 +157,7 @@ class ClusterCollection final : public PhysicalCollection {
|
|||
arangodb::ManagedDocumentResult& result,
|
||||
OperationOptions& options, TRI_voc_tick_t& resultMarkerTick,
|
||||
bool lock, TRI_voc_tick_t& revisionId,
|
||||
KeyLockInfo* /*keyLockInfo*/,
|
||||
std::function<Result(void)> callbackDuringLock) override;
|
||||
|
||||
Result update(arangodb::transaction::Methods* trx,
|
||||
|
@ -179,6 +180,7 @@ class ClusterCollection final : public PhysicalCollection {
|
|||
arangodb::ManagedDocumentResult& previous,
|
||||
OperationOptions& options, TRI_voc_tick_t& resultMarkerTick,
|
||||
bool lock, TRI_voc_rid_t& prevRev, TRI_voc_rid_t& revisionId,
|
||||
KeyLockInfo* /*keyLockInfo*/,
|
||||
std::function<Result(void)> callbackDuringLock) override;
|
||||
|
||||
protected:
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
#include "Basics/ReadUnlocker.h"
|
||||
#include "Basics/Result.h"
|
||||
#include "Basics/StaticStrings.h"
|
||||
#include "Basics/StringRef.h"
|
||||
#include "Basics/VelocyPackHelper.h"
|
||||
#include "Basics/WriteLocker.h"
|
||||
#include "Basics/WriteUnlocker.h"
|
||||
|
@ -66,6 +67,7 @@
|
|||
#include "Utils/OperationOptions.h"
|
||||
#include "Utils/SingleCollectionTransaction.h"
|
||||
#include "VocBase/KeyGenerator.h"
|
||||
#include "VocBase/KeyLockInfo.h"
|
||||
#include "VocBase/LocalDocumentId.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
#include "VocBase/ManagedDocumentResult.h"
|
||||
|
@ -2125,7 +2127,8 @@ void MMFilesCollection::prepareIndexes(VPackSlice indexesSlice) {
|
|||
}
|
||||
}
|
||||
|
||||
{READ_LOCKER(guard, _indexesLock);
|
||||
{
|
||||
READ_LOCKER(guard, _indexesLock);
|
||||
for (auto const& idx : _indexes) {
|
||||
if (idx->type() == Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX) {
|
||||
foundPrimary = true;
|
||||
|
@ -2890,7 +2893,9 @@ Result MMFilesCollection::insert(
|
|||
arangodb::transaction::Methods* trx, VPackSlice const slice,
|
||||
arangodb::ManagedDocumentResult& result, OperationOptions& options,
|
||||
TRI_voc_tick_t& resultMarkerTick, bool lock, TRI_voc_tick_t& revisionId,
|
||||
KeyLockInfo* keyLockInfo,
|
||||
std::function<Result(void)> callbackDuringLock) {
|
||||
|
||||
LocalDocumentId const documentId = reuseOrCreateLocalDocumentId(options);
|
||||
auto isEdgeCollection = (TRI_COL_TYPE_EDGE == _logicalCollection.type());
|
||||
transaction::BuilderLeaser builder(trx);
|
||||
|
@ -2984,6 +2989,12 @@ Result MMFilesCollection::insert(
|
|||
return Result(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
TRI_ASSERT(keyLockInfo != nullptr);
|
||||
if (keyLockInfo->shouldLock) {
|
||||
TRI_ASSERT(lock);
|
||||
lockKey(*keyLockInfo, newSlice.get(StaticStrings::KeyString));
|
||||
}
|
||||
|
||||
res = Result();
|
||||
{
|
||||
// use lock?
|
||||
|
@ -3031,6 +3042,7 @@ Result MMFilesCollection::insert(
|
|||
// store the tick that was used for writing the document
|
||||
resultMarkerTick = operation.tick();
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -3715,7 +3727,9 @@ Result MMFilesCollection::remove(
|
|||
arangodb::transaction::Methods* trx, VPackSlice slice,
|
||||
arangodb::ManagedDocumentResult& previous, OperationOptions& options,
|
||||
TRI_voc_tick_t& resultMarkerTick, bool lock, TRI_voc_rid_t& prevRev,
|
||||
TRI_voc_rid_t& revisionId, std::function<Result(void)> callbackDuringLock) {
|
||||
TRI_voc_rid_t& revisionId, KeyLockInfo* keyLockInfo,
|
||||
std::function<Result(void)> callbackDuringLock) {
|
||||
|
||||
prevRev = 0;
|
||||
LocalDocumentId const documentId = LocalDocumentId::create();
|
||||
|
||||
|
@ -3761,6 +3775,11 @@ Result MMFilesCollection::remove(
|
|||
|
||||
TRI_ASSERT(!key.isNone());
|
||||
|
||||
TRI_ASSERT(keyLockInfo != nullptr);
|
||||
if (keyLockInfo->shouldLock) {
|
||||
lockKey(*keyLockInfo, key);
|
||||
}
|
||||
|
||||
MMFilesDocumentOperation operation(
|
||||
&_logicalCollection, TRI_VOC_DOCUMENT_OPERATION_REMOVE
|
||||
);
|
||||
|
@ -4116,3 +4135,55 @@ Result MMFilesCollection::updateDocument(
|
|||
return Result(static_cast<MMFilesTransactionState*>(trx->state())
|
||||
->addOperation(newDocumentId, revisionId, operation, marker, waitForSync));
|
||||
}
|
||||
|
||||
void MMFilesCollection::lockKey(KeyLockInfo& keyLockInfo, VPackSlice const& key) {
|
||||
TRI_ASSERT(keyLockInfo.key.empty());
|
||||
|
||||
// copy out the key we need to lock
|
||||
TRI_ASSERT(key.isString());
|
||||
std::string k = key.copyString();
|
||||
|
||||
MMFilesCollection::KeyLockShard& shard = getShardForKey(k);
|
||||
|
||||
// register key unlock function
|
||||
keyLockInfo.unlocker = [this](KeyLockInfo& keyLock) {
|
||||
unlockKey(keyLock);
|
||||
};
|
||||
|
||||
do {
|
||||
{
|
||||
MUTEX_LOCKER(locker, shard._mutex);
|
||||
// if the insert fails because the key is already in the set,
|
||||
// we carry on trying until the previous value is gone from the set.
|
||||
// if the insert fails because of an out-of-memory exception,
|
||||
// we can let the exception escape from here. no harm will be done
|
||||
if (shard._keys.insert(k).second) {
|
||||
// if insertion into the lock set succeeded, we can
|
||||
// go on without the lock. otherwise we just need to carry on trying
|
||||
locker.unlock();
|
||||
|
||||
// store key to unlock later. the move assignment will not fail
|
||||
keyLockInfo.key = std::move(k);
|
||||
return;
|
||||
}
|
||||
}
|
||||
std::this_thread::yield();
|
||||
} while (!application_features::ApplicationServer::isStopping());
|
||||
|
||||
// we can only get here on shutdown
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
|
||||
}
|
||||
|
||||
void MMFilesCollection::unlockKey(KeyLockInfo& keyLockInfo) noexcept {
|
||||
TRI_ASSERT(keyLockInfo.shouldLock);
|
||||
if (!keyLockInfo.key.empty()) {
|
||||
MMFilesCollection::KeyLockShard& shard = getShardForKey(keyLockInfo.key);
|
||||
MUTEX_LOCKER(locker, shard._mutex);
|
||||
shard._keys.erase(keyLockInfo.key);
|
||||
}
|
||||
}
|
||||
|
||||
MMFilesCollection::KeyLockShard& MMFilesCollection::getShardForKey(std::string const& key) noexcept {
|
||||
size_t hash = std::hash<std::string>()(key);
|
||||
return _keyLockShards[hash % numKeyLockShards];
|
||||
}
|
||||
|
|
|
@ -322,6 +322,7 @@ class MMFilesCollection final : public PhysicalCollection {
|
|||
arangodb::ManagedDocumentResult& result,
|
||||
OperationOptions& options, TRI_voc_tick_t& resultMarkerTick,
|
||||
bool lock, TRI_voc_tick_t& revisionId,
|
||||
KeyLockInfo* keyLockInfo,
|
||||
std::function<Result(void)> callbackDuringLock) override;
|
||||
|
||||
Result update(arangodb::transaction::Methods* trx,
|
||||
|
@ -344,6 +345,7 @@ class MMFilesCollection final : public PhysicalCollection {
|
|||
arangodb::ManagedDocumentResult& previous,
|
||||
OperationOptions& options, TRI_voc_tick_t& resultMarkerTick,
|
||||
bool lock, TRI_voc_rid_t& prevRev, TRI_voc_rid_t& revisionId,
|
||||
KeyLockInfo* keyLockInfo,
|
||||
std::function<Result(void)> callbackDuringLock) override;
|
||||
|
||||
Result rollbackOperation(transaction::Methods*, TRI_voc_document_operation_e,
|
||||
|
@ -501,6 +503,12 @@ class MMFilesCollection final : public PhysicalCollection {
|
|||
|
||||
void setCurrentVersion();
|
||||
|
||||
// key locking
|
||||
struct KeyLockShard;
|
||||
void lockKey(KeyLockInfo& keyLockInfo, arangodb::velocypack::Slice const& key);
|
||||
void unlockKey(KeyLockInfo& keyLockInfo) noexcept;
|
||||
KeyLockShard& getShardForKey(std::string const& key) noexcept;
|
||||
|
||||
private:
|
||||
mutable arangodb::MMFilesDitches _ditches;
|
||||
|
||||
|
@ -545,6 +553,18 @@ class MMFilesCollection final : public PhysicalCollection {
|
|||
bool _doCompact;
|
||||
TRI_voc_tick_t _maxTick;
|
||||
|
||||
// currently locked keys
|
||||
struct KeyLockShard {
|
||||
Mutex _mutex;
|
||||
std::unordered_set<std::string> _keys;
|
||||
// TODO: add padding here so we can avoid false sharing
|
||||
};
|
||||
|
||||
static constexpr size_t numKeyLockShards = 8;
|
||||
|
||||
|
||||
std::array<KeyLockShard, numKeyLockShards> _keyLockShards;
|
||||
|
||||
// whether or not all documents are stored with a persistent LocalDocumentId
|
||||
std::atomic<bool> _hasAllPersistentLocalIds{true};
|
||||
};
|
||||
|
|
|
@ -788,6 +788,7 @@ Result RocksDBCollection::insert(
|
|||
arangodb::velocypack::Slice const slice,
|
||||
arangodb::ManagedDocumentResult& mdr, OperationOptions& options,
|
||||
TRI_voc_tick_t& resultMarkerTick, bool, TRI_voc_tick_t& revisionId,
|
||||
KeyLockInfo* /*keyLockInfo*/,
|
||||
std::function<Result(void)> callbackDuringLock) {
|
||||
// store the tick that was used for writing the document
|
||||
// note that we don't need it for this engine
|
||||
|
@ -1101,7 +1102,8 @@ Result RocksDBCollection::remove(
|
|||
arangodb::transaction::Methods* trx, arangodb::velocypack::Slice slice,
|
||||
arangodb::ManagedDocumentResult& previous, OperationOptions& options,
|
||||
TRI_voc_tick_t& resultMarkerTick, bool, TRI_voc_rid_t& prevRev,
|
||||
TRI_voc_rid_t& revisionId, std::function<Result(void)> callbackDuringLock) {
|
||||
TRI_voc_rid_t& revisionId, KeyLockInfo* /*keyLockInfo*/,
|
||||
std::function<Result(void)> callbackDuringLock) {
|
||||
// store the tick that was used for writing the document
|
||||
// note that we don't need it for this engine
|
||||
resultMarkerTick = 0;
|
||||
|
|
|
@ -151,6 +151,7 @@ class RocksDBCollection final : public PhysicalCollection {
|
|||
arangodb::ManagedDocumentResult& result,
|
||||
OperationOptions& options, TRI_voc_tick_t& resultMarkerTick,
|
||||
bool lock, TRI_voc_tick_t& revisionId,
|
||||
KeyLockInfo* /*keyLockInfo*/,
|
||||
std::function<Result(void)> callbackDuringLock) override;
|
||||
|
||||
Result update(arangodb::transaction::Methods* trx,
|
||||
|
@ -173,6 +174,7 @@ class RocksDBCollection final : public PhysicalCollection {
|
|||
arangodb::ManagedDocumentResult& previous,
|
||||
OperationOptions& options, TRI_voc_tick_t& resultMarkerTick,
|
||||
bool lock, TRI_voc_rid_t& prevRev, TRI_voc_rid_t& revisionId,
|
||||
KeyLockInfo* /*keyLockInfo*/,
|
||||
std::function<Result(void)> callbackDuringLock) override;
|
||||
|
||||
/// adjust the current number of docs
|
||||
|
|
|
@ -113,7 +113,7 @@ Result removeKeysOutsideRange(VPackSlice chunkSlice,
|
|||
builder.add(velocypack::ValuePair(docKey.data(), docKey.size(),
|
||||
velocypack::ValueType::String));
|
||||
Result r = physical->remove(&trx, builder.slice(), mdr, options, tick,
|
||||
false, prevRev, revisionId, nullptr);
|
||||
false, prevRev, revisionId, nullptr, nullptr);
|
||||
if (r.fail() && r.isNot(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND)) {
|
||||
// ignore not found, we remove conflicting docs ahead of time
|
||||
THROW_ARANGO_EXCEPTION(r);
|
||||
|
@ -148,7 +148,7 @@ Result removeKeysOutsideRange(VPackSlice chunkSlice,
|
|||
builder.add(velocypack::ValuePair(docKey.data(), docKey.size(),
|
||||
velocypack::ValueType::String));
|
||||
Result r = physical->remove(&trx, builder.slice(), mdr, options, tick,
|
||||
false, prevRev, revisionId, nullptr);
|
||||
false, prevRev, revisionId, nullptr, nullptr);
|
||||
if (r.fail() && r.isNot(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND)) {
|
||||
// ignore not found, we remove conflicting docs ahead of time
|
||||
THROW_ARANGO_EXCEPTION(r);
|
||||
|
@ -303,7 +303,7 @@ Result syncChunkRocksDB(
|
|||
|
||||
Result r =
|
||||
physical->remove(trx, keyBuilder->slice(), mdr, options, resultTick,
|
||||
false, prevRev, revisionId, nullptr);
|
||||
false, prevRev, revisionId, nullptr, nullptr);
|
||||
if (r.fail() && r.isNot(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND)) {
|
||||
// ignore not found, we remove conflicting docs ahead of time
|
||||
return r;
|
||||
|
@ -357,7 +357,7 @@ Result syncChunkRocksDB(
|
|||
|
||||
Result r =
|
||||
physical->remove(trx, keyBuilder->slice(), mdr, options, resultTick,
|
||||
false, prevRev, revisionId, nullptr);
|
||||
false, prevRev, revisionId, nullptr, nullptr);
|
||||
if (r.fail() && r.isNot(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND)) {
|
||||
// ignore not found, we remove conflicting docs ahead of time
|
||||
return r;
|
||||
|
@ -483,7 +483,7 @@ Result syncChunkRocksDB(
|
|||
|
||||
Result res =
|
||||
physical->remove(trx, keyBuilder->slice(), mdr, options, resultTick,
|
||||
false, prevRev, revisionId, nullptr);
|
||||
false, prevRev, revisionId, nullptr, nullptr);
|
||||
if (res.ok()) {
|
||||
++stats.numDocsRemoved;
|
||||
}
|
||||
|
@ -496,7 +496,7 @@ Result syncChunkRocksDB(
|
|||
TRI_ASSERT(options.indexOperationMode == Index::OperationMode::internal);
|
||||
|
||||
Result res = physical->insert(trx, it, mdr, options, resultTick, false,
|
||||
revisionId, nullptr);
|
||||
revisionId, nullptr, nullptr);
|
||||
if (res.fail()) {
|
||||
if (res.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) &&
|
||||
res.errorMessage() > keySlice.copyString()) {
|
||||
|
@ -508,7 +508,7 @@ Result syncChunkRocksDB(
|
|||
}
|
||||
|
||||
res = physical->insert(trx, it, mdr, options, resultTick, false,
|
||||
revisionId, nullptr);
|
||||
revisionId, nullptr, nullptr);
|
||||
if (res.fail()) {
|
||||
return res;
|
||||
}
|
||||
|
@ -745,7 +745,7 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
|
|||
TRI_voc_rid_t prevRev, revisionId;
|
||||
Result r = physical->remove(&trx, tempBuilder.slice(), previous,
|
||||
options, resultMarkerTick, false,
|
||||
prevRev, revisionId, nullptr);
|
||||
prevRev, revisionId, nullptr, nullptr);
|
||||
if (r.fail() && r.isNot(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND)) {
|
||||
// ignore not found, we remove conflicting docs ahead of time
|
||||
THROW_ARANGO_EXCEPTION(r);
|
||||
|
|
|
@ -154,7 +154,7 @@ Result RocksDBSettingsManager::sync(bool force) {
|
|||
auto minSeqNr = maxSeqNr;
|
||||
|
||||
rocksdb::TransactionOptions opts;
|
||||
opts.lock_timeout = 0.05; // do not wait for locking keys
|
||||
opts.lock_timeout = 50; // do not wait for locking keys
|
||||
|
||||
rocksdb::WriteOptions wo;
|
||||
rocksdb::WriteBatch batch;
|
||||
|
|
|
@ -35,11 +35,10 @@
|
|||
namespace arangodb {
|
||||
|
||||
namespace transaction {
|
||||
|
||||
class Methods;
|
||||
|
||||
}
|
||||
|
||||
struct KeyLockInfo;
|
||||
class LocalDocumentId;
|
||||
class Index;
|
||||
class IndexIterator;
|
||||
|
@ -184,6 +183,7 @@ class PhysicalCollection {
|
|||
OperationOptions& options,
|
||||
TRI_voc_tick_t& resultMarkerTick, bool lock,
|
||||
TRI_voc_tick_t& revisionId,
|
||||
KeyLockInfo* keyLockInfo,
|
||||
std::function<Result(void)> callbackDuringLock) = 0;
|
||||
|
||||
Result insert(arangodb::transaction::Methods* trx,
|
||||
|
@ -193,7 +193,7 @@ class PhysicalCollection {
|
|||
bool lock) {
|
||||
TRI_voc_rid_t unused;
|
||||
return insert(trx, newSlice, result, options, resultMarkerTick, lock,
|
||||
unused, nullptr);
|
||||
unused, nullptr, nullptr);
|
||||
}
|
||||
|
||||
virtual Result update(arangodb::transaction::Methods* trx,
|
||||
|
@ -220,6 +220,7 @@ class PhysicalCollection {
|
|||
OperationOptions& options,
|
||||
TRI_voc_tick_t& resultMarkerTick, bool lock,
|
||||
TRI_voc_rid_t& prevRev, TRI_voc_rid_t& revisionId,
|
||||
KeyLockInfo* keyLockInfo,
|
||||
std::function<Result(void)> callbackDuringLock) = 0;
|
||||
|
||||
protected:
|
||||
|
|
|
@ -57,6 +57,7 @@
|
|||
#include "Utils/Events.h"
|
||||
#include "Utils/OperationCursor.h"
|
||||
#include "Utils/OperationOptions.h"
|
||||
#include "VocBase/KeyLockInfo.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
#include "VocBase/ManagedDocumentResult.h"
|
||||
#include "VocBase/Methods/Indexes.h"
|
||||
|
@ -1629,12 +1630,13 @@ OperationResult transaction::Methods::insertLocal(
|
|||
// options.overwrite => !needsLock
|
||||
TRI_ASSERT(!options.overwrite || !needsLock);
|
||||
|
||||
bool const isMMFiles = EngineSelectorFeature::isMMFiles();
|
||||
|
||||
// Assert my assumption that we don't have a lock only with mmfiles single
|
||||
// document operations.
|
||||
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
{
|
||||
bool const isMMFiles = EngineSelectorFeature::isMMFiles();
|
||||
bool const isMock = EngineSelectorFeature::ENGINE->typeName() == "Mock";
|
||||
if (!isMock) {
|
||||
// needsLock => isMMFiles
|
||||
|
@ -1686,6 +1688,11 @@ OperationResult transaction::Methods::insertLocal(
|
|||
followers = collection->followers()->get();
|
||||
}
|
||||
|
||||
// we may need to lock individual keys here so we can ensure that even with concurrent
|
||||
// operations on the same keys we have the same order of data application on leader
|
||||
// and followers
|
||||
KeyLockInfo keyLockInfo;
|
||||
|
||||
ReplicationType replicationType = ReplicationType::NONE;
|
||||
if (_state->isDBServer()) {
|
||||
// Block operation early if we are not supposed to perform it:
|
||||
|
@ -1697,13 +1704,16 @@ OperationResult transaction::Methods::insertLocal(
|
|||
}
|
||||
|
||||
replicationType = ReplicationType::LEADER;
|
||||
if (isMMFiles && needsLock) {
|
||||
keyLockInfo.shouldLock = true;
|
||||
}
|
||||
// We cannot be silent if we may have to replicate later.
|
||||
// If we need to get the followers under the single document operation's
|
||||
// lock, we don't know yet if we will have followers later and thus cannot
|
||||
// be silent.
|
||||
// Otherwise, if we already know the followers to replicate to, we can
|
||||
// just check if they're empty.
|
||||
if (needsToGetFollowersUnderLock || !followers->empty()) {
|
||||
if (needsToGetFollowersUnderLock || keyLockInfo.shouldLock || !followers->empty()) {
|
||||
options.silent = false;
|
||||
}
|
||||
} else { // we are a follower following theLeader
|
||||
|
@ -1741,7 +1751,7 @@ OperationResult transaction::Methods::insertLocal(
|
|||
TRI_ASSERT(needsLock == !isLocked(collection, AccessMode::Type::WRITE));
|
||||
Result res = collection->insert(this, value, documentResult, options,
|
||||
resultMarkerTick, needsLock, revisionId,
|
||||
updateFollowers);
|
||||
&keyLockInfo, updateFollowers);
|
||||
|
||||
TRI_voc_rid_t previousRevisionId = 0;
|
||||
ManagedDocumentResult previousDocumentResult; // return OLD
|
||||
|
@ -2254,14 +2264,13 @@ OperationResult transaction::Methods::removeLocal(
|
|||
LogicalCollection* collection = documentCollection(trxCollection(cid));
|
||||
|
||||
bool const needsLock = !isLocked(collection, AccessMode::Type::WRITE);
|
||||
bool const isMMFiles = EngineSelectorFeature::isMMFiles();
|
||||
|
||||
// Assert my assumption that we don't have a lock only with mmfiles single
|
||||
// document operations.
|
||||
|
||||
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
{
|
||||
bool const isMMFiles = EngineSelectorFeature::isMMFiles();
|
||||
bool const isMock = EngineSelectorFeature::ENGINE->typeName() == "Mock";
|
||||
if (!isMock) {
|
||||
// needsLock => isMMFiles
|
||||
|
@ -2306,6 +2315,11 @@ OperationResult transaction::Methods::removeLocal(
|
|||
followers = collection->followers()->get();
|
||||
}
|
||||
|
||||
// we may need to lock individual keys here so we can ensure that even with concurrent
|
||||
// operations on the same keys we have the same order of data application on leader
|
||||
// and followers
|
||||
KeyLockInfo keyLockInfo;
|
||||
|
||||
ReplicationType replicationType = ReplicationType::NONE;
|
||||
if (_state->isDBServer()) {
|
||||
// Block operation early if we are not supposed to perform it:
|
||||
|
@ -2317,6 +2331,9 @@ OperationResult transaction::Methods::removeLocal(
|
|||
}
|
||||
|
||||
replicationType = ReplicationType::LEADER;
|
||||
if (isMMFiles && needsLock) {
|
||||
keyLockInfo.shouldLock = true;
|
||||
}
|
||||
// We cannot be silent if we may have to replicate later.
|
||||
// If we need to get the followers under the single document operation's
|
||||
// lock, we don't know yet if we will have followers later and thus cannot
|
||||
|
@ -2375,7 +2392,7 @@ OperationResult transaction::Methods::removeLocal(
|
|||
|
||||
Result res =
|
||||
collection->remove(this, value, options, resultMarkerTick, needsLock,
|
||||
actualRevision, previous, updateFollowers);
|
||||
actualRevision, previous, &keyLockInfo, updateFollowers);
|
||||
|
||||
if (resultMarkerTick > 0 && resultMarkerTick > maxTick) {
|
||||
maxTick = resultMarkerTick;
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Jan Steemann
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "KeyLockInfo.h"
|
||||
|
||||
using namespace arangodb;
|
||||
|
||||
KeyLockInfo::~KeyLockInfo() {
|
||||
if (unlocker) {
|
||||
TRI_ASSERT(shouldLock);
|
||||
unlocker(*this);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Jan Steemann
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_VOC_BASE_KEY_LOCK_INFO_H
|
||||
#define ARANGOD_VOC_BASE_KEY_LOCK_INFO_H 1
|
||||
|
||||
#include "Basics/Common.h"
|
||||
|
||||
#include <functional>
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
struct KeyLockInfo {
|
||||
KeyLockInfo(KeyLockInfo const&) = delete;
|
||||
KeyLockInfo& operator=(KeyLockInfo const&) = delete;
|
||||
|
||||
KeyLockInfo() : shouldLock(false) {}
|
||||
~KeyLockInfo();
|
||||
|
||||
std::string key;
|
||||
std::function<void(KeyLockInfo&)> unlocker;
|
||||
bool shouldLock;
|
||||
};
|
||||
|
||||
}
|
||||
#endif
|
|
@ -926,13 +926,14 @@ Result LogicalCollection::insert(
|
|||
transaction::Methods* trx, VPackSlice const slice,
|
||||
ManagedDocumentResult& result, OperationOptions& options,
|
||||
TRI_voc_tick_t& resultMarkerTick, bool lock, TRI_voc_tick_t& revisionId,
|
||||
KeyLockInfo* keyLockInfo,
|
||||
std::function<Result(void)> callbackDuringLock) {
|
||||
TRI_IF_FAILURE("LogicalCollection::insert") {
|
||||
return Result(TRI_ERROR_DEBUG);
|
||||
}
|
||||
resultMarkerTick = 0;
|
||||
return getPhysical()->insert(trx, slice, result, options, resultMarkerTick,
|
||||
lock, revisionId, std::move(callbackDuringLock));
|
||||
lock, revisionId, keyLockInfo, std::move(callbackDuringLock));
|
||||
}
|
||||
|
||||
/// @brief updates a document or edge in a collection
|
||||
|
@ -988,6 +989,7 @@ Result LogicalCollection::remove(
|
|||
transaction::Methods* trx, VPackSlice const slice,
|
||||
OperationOptions& options, TRI_voc_tick_t& resultMarkerTick, bool lock,
|
||||
TRI_voc_rid_t& prevRev, ManagedDocumentResult& previous,
|
||||
KeyLockInfo* keyLockInfo,
|
||||
std::function<Result(void)> callbackDuringLock) {
|
||||
TRI_IF_FAILURE("LogicalCollection::remove") {
|
||||
return Result(TRI_ERROR_DEBUG);
|
||||
|
@ -995,7 +997,7 @@ Result LogicalCollection::remove(
|
|||
resultMarkerTick = 0;
|
||||
TRI_voc_rid_t revisionId = 0;
|
||||
return getPhysical()->remove(trx, slice, previous, options, resultMarkerTick,
|
||||
lock, prevRev, revisionId,
|
||||
lock, prevRev, revisionId, keyLockInfo,
|
||||
std::move(callbackDuringLock));
|
||||
}
|
||||
|
||||
|
|
|
@ -41,11 +41,12 @@ typedef std::string ServerID; // ID of a server
|
|||
typedef std::string ShardID; // ID of a shard
|
||||
typedef std::unordered_map<ShardID, std::vector<ServerID>> ShardMap;
|
||||
|
||||
class LocalDocumentId;
|
||||
class FollowerInfo;
|
||||
class Index;
|
||||
class IndexIterator;
|
||||
class KeyGenerator;
|
||||
struct KeyLockInfo;
|
||||
class LocalDocumentId;
|
||||
class ManagedDocumentResult;
|
||||
struct OperationOptions;
|
||||
class PhysicalCollection;
|
||||
|
@ -283,7 +284,7 @@ class LogicalCollection : public LogicalDataSource {
|
|||
TRI_voc_tick_t& resultMarkerTick, bool lock) {
|
||||
TRI_voc_tick_t unused;
|
||||
return insert(trx, slice, result, options, resultMarkerTick, lock, unused,
|
||||
nullptr);
|
||||
nullptr, nullptr);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -295,6 +296,7 @@ class LogicalCollection : public LogicalDataSource {
|
|||
ManagedDocumentResult& result, OperationOptions& options,
|
||||
TRI_voc_tick_t& resultMarkerTick, bool lock,
|
||||
TRI_voc_tick_t& revisionId,
|
||||
KeyLockInfo* keyLockInfo,
|
||||
std::function<Result(void)> callbackDuringLock);
|
||||
|
||||
Result update(transaction::Methods*, velocypack::Slice,
|
||||
|
@ -312,6 +314,7 @@ class LogicalCollection : public LogicalDataSource {
|
|||
Result remove(transaction::Methods*, velocypack::Slice,
|
||||
OperationOptions&, TRI_voc_tick_t&, bool lock,
|
||||
TRI_voc_rid_t& prevRev, ManagedDocumentResult& previous,
|
||||
KeyLockInfo* keyLockInfo,
|
||||
std::function<Result(void)> callbackDuringLock);
|
||||
|
||||
bool readDocument(transaction::Methods* trx,
|
||||
|
|
|
@ -651,6 +651,7 @@ arangodb::Result PhysicalCollectionMock::insert(
|
|||
arangodb::ManagedDocumentResult& result,
|
||||
arangodb::OperationOptions& options, TRI_voc_tick_t& resultMarkerTick,
|
||||
bool lock, TRI_voc_tick_t& revisionId,
|
||||
arangodb::KeyLockInfo* /*keyLockInfo*/,
|
||||
std::function<arangodb::Result(void)> callbackDuringLock) {
|
||||
TRI_ASSERT(callbackDuringLock == nullptr); // not implemented
|
||||
before();
|
||||
|
@ -856,6 +857,7 @@ arangodb::Result PhysicalCollectionMock::remove(
|
|||
arangodb::ManagedDocumentResult& previous,
|
||||
arangodb::OperationOptions& options, TRI_voc_tick_t& resultMarkerTick,
|
||||
bool lock, TRI_voc_rid_t& prevRev, TRI_voc_rid_t& revisionId,
|
||||
arangodb::KeyLockInfo* /*keyLockInfo*/,
|
||||
std::function<arangodb::Result(void)> callbackDuringLock) {
|
||||
TRI_ASSERT(callbackDuringLock == nullptr); // not implemented
|
||||
before();
|
||||
|
@ -949,7 +951,7 @@ arangodb::Result PhysicalCollectionMock::update(
|
|||
|
||||
TRI_voc_rid_t unused;
|
||||
return insert(trx, newSlice, result, options, resultMarkerTick, lock,
|
||||
unused, nullptr);
|
||||
unused, nullptr, nullptr);
|
||||
}
|
||||
|
||||
arangodb::velocypack::Builder builder;
|
||||
|
@ -975,7 +977,7 @@ arangodb::Result PhysicalCollectionMock::update(
|
|||
|
||||
TRI_voc_rid_t unused;
|
||||
return insert(trx, builder.slice(), result, options, resultMarkerTick,
|
||||
lock, unused, nullptr);
|
||||
lock, unused, nullptr, nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@
|
|||
|
||||
namespace arangodb {
|
||||
|
||||
class KeyLockInfo;
|
||||
class TransactionManager;
|
||||
class WalAccess;
|
||||
|
||||
|
@ -72,6 +73,7 @@ class PhysicalCollectionMock: public arangodb::PhysicalCollection {
|
|||
arangodb::ManagedDocumentResult& result,
|
||||
arangodb::OperationOptions& options, TRI_voc_tick_t& resultMarkerTick,
|
||||
bool lock, TRI_voc_tick_t& revisionId,
|
||||
arangodb::KeyLockInfo* /*keyLockInfo*/,
|
||||
std::function<arangodb::Result(void)> callbackDuringLock) override;
|
||||
virtual void invokeOnAllElements(arangodb::transaction::Methods* trx, std::function<bool(arangodb::LocalDocumentId const&)> callback) override;
|
||||
virtual std::shared_ptr<arangodb::Index> lookupIndex(arangodb::velocypack::Slice const&) const override;
|
||||
|
@ -93,6 +95,7 @@ class PhysicalCollectionMock: public arangodb::PhysicalCollection {
|
|||
arangodb::ManagedDocumentResult& previous,
|
||||
arangodb::OperationOptions& options, TRI_voc_tick_t& resultMarkerTick,
|
||||
bool lock, TRI_voc_rid_t& prevRev, TRI_voc_rid_t& revisionId,
|
||||
arangodb::KeyLockInfo* /*keyLockInfo*/,
|
||||
std::function<arangodb::Result(void)> callbackDuringLock) override;
|
||||
virtual arangodb::Result replace(
|
||||
arangodb::transaction::Methods* trx,
|
||||
|
|
Loading…
Reference in New Issue