mirror of https://gitee.com/bigwinds/arangodb
moved _compactionLock out of TRI_collection_t
This commit is contained in:
parent
8025ec0266
commit
900c28aec9
|
@ -769,4 +769,34 @@ int MMFilesCollection::applyForTickRange(TRI_voc_tick_t dataMin, TRI_voc_tick_t
|
|||
|
||||
return false; // hasMore = false
|
||||
}
|
||||
|
||||
/// @brief disallow compaction of the collection
|
||||
void MMFilesCollection::preventCompaction() {
|
||||
_compactionLock.readLock();
|
||||
}
|
||||
|
||||
/// @brief try disallowing compaction of the collection
|
||||
bool MMFilesCollection::tryPreventCompaction() {
|
||||
return _compactionLock.tryReadLock();
|
||||
}
|
||||
|
||||
/// @brief re-allow compaction of the collection
|
||||
void MMFilesCollection::allowCompaction() {
|
||||
_compactionLock.unlock();
|
||||
}
|
||||
|
||||
/// @brief exclusively lock the collection for compaction
|
||||
void MMFilesCollection::lockForCompaction() {
|
||||
_compactionLock.writeLock();
|
||||
}
|
||||
|
||||
/// @brief try to exclusively lock the collection for compaction
|
||||
bool MMFilesCollection::tryLockForCompaction() {
|
||||
return _compactionLock.tryWriteLock();
|
||||
}
|
||||
|
||||
/// @brief signal that compaction is finished
|
||||
void MMFilesCollection::finishCompaction() {
|
||||
_compactionLock.unlock();
|
||||
}
|
||||
|
||||
|
|
|
@ -94,6 +94,13 @@ class MMFilesCollection final : public PhysicalCollection {
|
|||
/// @brief iterates over a collection
|
||||
bool iterateDatafiles(std::function<bool(TRI_df_marker_t const*, TRI_datafile_t*)> const& cb) override;
|
||||
|
||||
void preventCompaction() override;
|
||||
bool tryPreventCompaction() override;
|
||||
void allowCompaction() override;
|
||||
void lockForCompaction() override;
|
||||
bool tryLockForCompaction() override;
|
||||
void finishCompaction() override;
|
||||
|
||||
private:
|
||||
/// @brief creates a datafile
|
||||
TRI_datafile_t* createDatafile(TRI_voc_fid_t fid,
|
||||
|
@ -115,6 +122,8 @@ class MMFilesCollection final : public PhysicalCollection {
|
|||
std::vector<TRI_datafile_t*> _datafiles; // all datafiles
|
||||
std::vector<TRI_datafile_t*> _journals; // all journals
|
||||
std::vector<TRI_datafile_t*> _compactors; // all compactor files
|
||||
|
||||
arangodb::basics::ReadWriteLock _compactionLock;
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -36,9 +36,10 @@
|
|||
#include "StorageEngine/StorageEngine.h"
|
||||
#include "Utils/SingleCollectionTransaction.h"
|
||||
#include "Utils/StandaloneTransactionContext.h"
|
||||
#include "VocBase/CompactionLocker.h"
|
||||
#include "VocBase/DatafileHelper.h"
|
||||
#include "VocBase/collection.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
#include "VocBase/collection.h"
|
||||
#include "VocBase/ticks.h"
|
||||
#include "VocBase/vocbase.h"
|
||||
|
||||
|
@ -875,9 +876,9 @@ void MMFilesCompactorThread::run() {
|
|||
// check whether someone else holds a read-lock on the compaction
|
||||
// lock
|
||||
|
||||
TRY_WRITE_LOCKER(locker, document->_compactionLock);
|
||||
TryCompactionLocker compactionLocker(collection);
|
||||
|
||||
if (!locker.isLocked()) {
|
||||
if (!compactionLocker.isLocked()) {
|
||||
// someone else is holding the compactor lock, we'll not compact
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Jan Steemann
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_VOCBASE_COMPACTION_LOCKER_H
|
||||
#define ARANGOD_VOCBASE_COMPACTION_LOCKER_H 1
|
||||
|
||||
#include "Basics/Common.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
class CompactionPreventer {
|
||||
public:
|
||||
explicit CompactionPreventer(LogicalCollection* collection)
|
||||
: _collection(collection) {
|
||||
_collection->preventCompaction();
|
||||
}
|
||||
|
||||
~CompactionPreventer() { _collection->allowCompaction(); }
|
||||
|
||||
private:
|
||||
LogicalCollection* _collection;
|
||||
};
|
||||
|
||||
class TryCompactionPreventer {
|
||||
public:
|
||||
explicit TryCompactionPreventer(LogicalCollection* collection)
|
||||
: _collection(collection), _isLocked(false) {
|
||||
_isLocked = _collection->tryPreventCompaction();
|
||||
}
|
||||
|
||||
~TryCompactionPreventer() {
|
||||
if (_isLocked) {
|
||||
_collection->allowCompaction();
|
||||
}
|
||||
}
|
||||
|
||||
bool isLocked() const { return _isLocked; }
|
||||
|
||||
private:
|
||||
LogicalCollection* _collection;
|
||||
bool _isLocked;
|
||||
};
|
||||
|
||||
class CompactionLocker {
|
||||
public:
|
||||
explicit CompactionLocker(LogicalCollection* collection)
|
||||
: _collection(collection) {
|
||||
_collection->lockForCompaction();
|
||||
}
|
||||
|
||||
~CompactionLocker() {
|
||||
_collection->finishCompaction();
|
||||
}
|
||||
|
||||
private:
|
||||
LogicalCollection* _collection;
|
||||
};
|
||||
|
||||
class TryCompactionLocker {
|
||||
public:
|
||||
explicit TryCompactionLocker(LogicalCollection* collection)
|
||||
: _collection(collection), _isLocked(false) {
|
||||
_isLocked = _collection->tryLockForCompaction();
|
||||
}
|
||||
|
||||
~TryCompactionLocker() {
|
||||
if (_isLocked) {
|
||||
_collection->finishCompaction();
|
||||
}
|
||||
}
|
||||
|
||||
bool isLocked() const { return _isLocked; }
|
||||
|
||||
private:
|
||||
LogicalCollection* _collection;
|
||||
bool _isLocked;
|
||||
};
|
||||
|
||||
} // namespace arangodb
|
||||
|
||||
#endif
|
|
@ -166,6 +166,17 @@ class LogicalCollection {
|
|||
return getPhysical()->applyForTickRange(dataMin, dataMax, callback);
|
||||
}
|
||||
|
||||
/// @brief disallow starting the compaction of the collection
|
||||
void preventCompaction() { getPhysical()->preventCompaction(); }
|
||||
bool tryPreventCompaction() { return getPhysical()->tryPreventCompaction(); }
|
||||
/// @brief re-allow starting the compaction of the collection
|
||||
void allowCompaction() { getPhysical()->allowCompaction(); }
|
||||
|
||||
/// @brief compaction finished
|
||||
void lockForCompaction() { getPhysical()->lockForCompaction(); }
|
||||
bool tryLockForCompaction() { return getPhysical()->tryLockForCompaction(); }
|
||||
void finishCompaction() { getPhysical()->finishCompaction(); }
|
||||
|
||||
|
||||
PhysicalCollection* getPhysical() const {
|
||||
TRI_ASSERT(_physical != nullptr);
|
||||
|
@ -245,6 +256,7 @@ class LogicalCollection {
|
|||
mutable arangodb::basics::ReadWriteLock _lock; // lock protecting the status and name
|
||||
|
||||
};
|
||||
|
||||
} // namespace arangodb
|
||||
|
||||
#endif
|
||||
|
|
|
@ -62,6 +62,27 @@ class PhysicalCollection {
|
|||
/// @brief iterates over a collection
|
||||
virtual bool iterateDatafiles(std::function<bool(TRI_df_marker_t const*, TRI_datafile_t*)> const& cb) = 0;
|
||||
|
||||
/// @brief disallow compaction of the collection
|
||||
/// after this call it is guaranteed that no compaction will be started until allowCompaction() is called
|
||||
virtual void preventCompaction() = 0;
|
||||
|
||||
/// @brief try disallowing compaction of the collection
|
||||
/// returns true if compaction is disallowed, and false if not
|
||||
virtual bool tryPreventCompaction() = 0;
|
||||
|
||||
/// @brief re-allow compaction of the collection
|
||||
virtual void allowCompaction() = 0;
|
||||
|
||||
/// @brief exclusively lock the collection for compaction
|
||||
virtual void lockForCompaction() = 0;
|
||||
|
||||
/// @brief try to exclusively lock the collection for compaction
|
||||
/// after this call it is guaranteed that no compaction will be started until allowCompaction() is called
|
||||
virtual bool tryLockForCompaction() = 0;
|
||||
|
||||
/// @brief signal that compaction is finished
|
||||
virtual void finishCompaction() = 0;
|
||||
|
||||
protected:
|
||||
LogicalCollection* _logicalCollection;
|
||||
};
|
||||
|
|
|
@ -475,7 +475,6 @@ struct TRI_collection_t {
|
|||
|
||||
std::atomic<int64_t> _uncollectedLogfileEntries;
|
||||
int64_t _numberDocuments;
|
||||
arangodb::basics::ReadWriteLock _compactionLock;
|
||||
|
||||
private:
|
||||
mutable arangodb::Ditches _ditches;
|
||||
|
|
|
@ -25,11 +25,12 @@
|
|||
#include "Basics/ReadLocker.h"
|
||||
#include "Basics/VPackStringBufferAdapter.h"
|
||||
#include "Logger/Logger.h"
|
||||
#include "VocBase/collection.h"
|
||||
#include "VocBase/CompactionLocker.h"
|
||||
#include "VocBase/DatafileHelper.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
#include "VocBase/collection.h"
|
||||
#include "VocBase/datafile.h"
|
||||
#include "VocBase/collection.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
#include "VocBase/vocbase.h"
|
||||
#include "Wal/Logfile.h"
|
||||
#include "Wal/LogfileManager.h"
|
||||
|
@ -447,17 +448,17 @@ static int DumpCollection(TRI_replication_dump_t* dump,
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int TRI_DumpCollectionReplication(TRI_replication_dump_t* dump,
|
||||
arangodb::LogicalCollection* col,
|
||||
arangodb::LogicalCollection* collection,
|
||||
TRI_voc_tick_t dataMin,
|
||||
TRI_voc_tick_t dataMax, bool withTicks) {
|
||||
TRI_ASSERT(col != nullptr);
|
||||
TRI_ASSERT(col->_collection != nullptr);
|
||||
TRI_ASSERT(collection != nullptr);
|
||||
TRI_ASSERT(collection->_collection != nullptr);
|
||||
|
||||
// get a custom type handler
|
||||
auto customTypeHandler = dump->_transactionContext->orderCustomTypeHandler();
|
||||
dump->_vpackOptions.customTypeHandler = customTypeHandler.get();
|
||||
|
||||
TRI_collection_t* document = col->_collection;
|
||||
TRI_collection_t* document = collection->_collection;
|
||||
TRI_ASSERT(document != nullptr);
|
||||
|
||||
// create a barrier so the underlying collection is not unloaded
|
||||
|
@ -470,10 +471,10 @@ int TRI_DumpCollectionReplication(TRI_replication_dump_t* dump,
|
|||
// block compaction
|
||||
int res;
|
||||
{
|
||||
READ_LOCKER(locker, document->_compactionLock);
|
||||
CompactionPreventer compactionPreventer(collection);
|
||||
|
||||
try {
|
||||
res = DumpCollection(dump, col, document->_vocbase->id(), document->_info.id(), dataMin, dataMax, withTicks);
|
||||
res = DumpCollection(dump, collection, collection->vocbase()->id(), collection->cid(), dataMin, dataMax, withTicks);
|
||||
} catch (...) {
|
||||
res = TRI_ERROR_INTERNAL;
|
||||
}
|
||||
|
|
|
@ -491,7 +491,7 @@ static int UseCollections(TRI_transaction_t* trx, int nestingLevel) {
|
|||
// read-lock the compaction lock
|
||||
if (!HasHint(trx, TRI_TRANSACTION_HINT_NO_COMPACTION_LOCK)) {
|
||||
if (!trxCollection->_compactionLocked) {
|
||||
trxCollection->_collection->_collection->_compactionLock.readLock();
|
||||
trxCollection->_collection->preventCompaction();
|
||||
trxCollection->_compactionLocked = true;
|
||||
}
|
||||
}
|
||||
|
@ -548,7 +548,7 @@ static int UnuseCollections(TRI_transaction_t* trx, int nestingLevel) {
|
|||
if (trxCollection->_accessType == TRI_TRANSACTION_WRITE &&
|
||||
trxCollection->_compactionLocked) {
|
||||
// read-unlock the compaction lock
|
||||
trxCollection->_collection->_collection->_compactionLock.unlock();
|
||||
trxCollection->_collection->allowCompaction();
|
||||
trxCollection->_compactionLocked = false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,10 +37,11 @@
|
|||
#include "Utils/DatabaseGuard.h"
|
||||
#include "Utils/SingleCollectionTransaction.h"
|
||||
#include "Utils/StandaloneTransactionContext.h"
|
||||
#include "VocBase/CompactionLocker.h"
|
||||
#include "VocBase/DatafileHelper.h"
|
||||
#include "VocBase/DatafileStatistics.h"
|
||||
#include "VocBase/collection.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
#include "VocBase/collection.h"
|
||||
#include "Wal/Logfile.h"
|
||||
#include "Wal/LogfileManager.h"
|
||||
|
||||
|
@ -626,10 +627,9 @@ int CollectorThread::processCollectionOperations(CollectorCache* cache) {
|
|||
// first try to read-lock the compactor-lock, afterwards try to write-lock the
|
||||
// collection
|
||||
// if any locking attempt fails, release and try again next time
|
||||
|
||||
TRY_READ_LOCKER(locker, document->_compactionLock);
|
||||
TryCompactionPreventer compactionPreventer(collection);
|
||||
|
||||
if (!locker.isLocked()) {
|
||||
if (!compactionPreventer.isLocked()) {
|
||||
return TRI_ERROR_LOCK_TIMEOUT;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue