1
0
Fork 0

moved compaction info into StorageEngine

This commit is contained in:
jsteemann 2016-08-15 18:35:22 +02:00
parent 9b0ba1ebb9
commit 680042b062
11 changed files with 391 additions and 292 deletions

View File

@ -40,13 +40,14 @@
#include "Rest/Version.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/ServerIdFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
#include "Utils/CollectionGuard.h"
#include "Utils/CollectionKeys.h"
#include "Utils/CollectionKeysRepository.h"
#include "Utils/CollectionNameResolver.h"
#include "Utils/StandaloneTransactionContext.h"
#include "Utils/TransactionContext.h"
#include "VocBase/CompactorThread.h"
#include "VocBase/replication-applier.h"
#include "VocBase/replication-dump.h"
#include "VocBase/ticks.h"
@ -561,7 +562,8 @@ void RestReplicationHandler::handleCommandBatch() {
VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", 0);
TRI_voc_tick_t id;
int res = TRI_InsertBlockerCompactorVocBase(_vocbase, expires, &id);
StorageEngine* engine = EngineSelectorFeature::ENGINE;
int res = engine->insertCompactionBlocker(_vocbase, expires, id);
if (res != TRI_ERROR_NO_ERROR) {
generateError(GeneralResponse::responseCode(res), res);
@ -599,7 +601,8 @@ void RestReplicationHandler::handleCommandBatch() {
VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", 0);
// now extend the blocker
int res = TRI_TouchBlockerCompactorVocBase(_vocbase, id, expires);
StorageEngine* engine = EngineSelectorFeature::ENGINE;
int res = engine->extendCompactionBlocker(_vocbase, id, expires);
if (res == TRI_ERROR_NO_ERROR) {
setResponseCode(GeneralResponse::ResponseCode::NO_CONTENT);
@ -614,7 +617,8 @@ void RestReplicationHandler::handleCommandBatch() {
TRI_voc_tick_t id =
static_cast<TRI_voc_tick_t>(StringUtils::uint64(suffix[1]));
int res = TRI_RemoveBlockerCompactorVocBase(_vocbase, id);
StorageEngine* engine = EngineSelectorFeature::ENGINE;
int res = engine->removeCompactionBlocker(_vocbase, id);
if (res == TRI_ERROR_NO_ERROR) {
setResponseCode(GeneralResponse::ResponseCode::NO_CONTENT);
@ -2772,8 +2776,9 @@ void RestReplicationHandler::handleCommandCreateKeys() {
TRI_ASSERT(col != nullptr);
// turn off the compaction for the collection
StorageEngine* engine = EngineSelectorFeature::ENGINE;
TRI_voc_tick_t id;
res = TRI_InsertBlockerCompactorVocBase(_vocbase, 1200.0, &id);
res = engine->insertCompactionBlocker(_vocbase, 1200.0, id);
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);

View File

@ -1298,3 +1298,159 @@ VocbaseCollectionInfo MMFilesEngine::loadCollectionInfo(TRI_vocbase_t* vocbase,
}
return info;
}
/// @brief remove data of expired compaction blockers
bool MMFilesEngine::cleanupCompactionBlockers(TRI_vocbase_t* vocbase) {
// check if we can instantly acquire the lock
TRY_WRITE_LOCKER(locker, _compactionBlockersLock);
if (!locker.isLocked()) {
// couldn't acquire lock
return false;
}
auto it = _compactionBlockers.find(vocbase->id());
if (it == _compactionBlockers.end()) {
// no entry for this database
return true;
}
// we are now holding the write lock
double now = TRI_microtime();
size_t n = (*it).second.size();
for (size_t i = 0; i < n; /* no hoisting */) {
auto& blocker = (*it).second[i];
if (blocker._expires < now) {
(*it).second.erase((*it).second.begin() + i);
n--;
} else {
i++;
}
}
if ((*it).second.empty()) {
// remove last element
_compactionBlockers.erase(it);
}
return true;
}
/// @brief insert a compaction blocker
int MMFilesEngine::insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl,
TRI_voc_tick_t& id) {
id = 0;
if (ttl <= 0.0) {
return TRI_ERROR_BAD_PARAMETER;
}
CompactionBlocker blocker(TRI_NewTickServer(), TRI_microtime() + ttl);
{
WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock, 1000);
auto it = _compactionBlockers.find(vocbase->id());
if (it == _compactionBlockers.end()) {
it = _compactionBlockers.emplace(vocbase->id(), std::vector<CompactionBlocker>()).first;
}
(*it).second.emplace_back(blocker);
}
id = blocker._id;
return TRI_ERROR_NO_ERROR;
}
/// @brief touch an existing compaction blocker
int MMFilesEngine::extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id,
double ttl) {
if (ttl <= 0.0) {
return TRI_ERROR_BAD_PARAMETER;
}
WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock, 1000);
auto it = _compactionBlockers.find(vocbase->id());
if (it == _compactionBlockers.end()) {
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}
for (auto& blocker : (*it).second) {
if (blocker._id == id) {
blocker._expires = TRI_microtime() + ttl;
return TRI_ERROR_NO_ERROR;
}
}
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}
/// @brief remove an existing compaction blocker
int MMFilesEngine::removeCompactionBlocker(TRI_vocbase_t* vocbase,
TRI_voc_tick_t id) {
WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock, 1000);
auto it = _compactionBlockers.find(vocbase->id());
if (it == _compactionBlockers.end()) {
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}
size_t const n = (*it).second.size();
for (size_t i = 0; i < n; ++i) {
auto& blocker = (*it).second[i];
if (blocker._id == id) {
(*it).second.erase((*it).second.begin() + i);
if ((*it).second.empty()) {
// remove last item
_compactionBlockers.erase(it);
}
return TRI_ERROR_NO_ERROR;
}
}
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}
void MMFilesEngine::preventCompaction(TRI_vocbase_t* vocbase,
std::function<void(TRI_vocbase_t*)> const& callback) {
WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock, 5000);
callback(vocbase);
}
bool MMFilesEngine::tryPreventCompaction(TRI_vocbase_t* vocbase,
std::function<void(TRI_vocbase_t*)> const& callback,
bool checkForActiveBlockers) {
TRY_WRITE_LOCKER(locker, _compactionBlockersLock);
if (locker.isLocked()) {
if (checkForActiveBlockers) {
double const now = TRI_microtime();
// check if we have a still-valid compaction blocker
auto it = _compactionBlockers.find(vocbase->id());
if (it != _compactionBlockers.end()) {
for (auto const& blocker : (*it).second) {
if (blocker._expires > now) {
// found a compaction blocker
return false;
}
}
}
}
callback(vocbase);
return true;
}
return false;
}

View File

@ -220,6 +220,27 @@ class MMFilesEngine final : public StorageEngine {
/// @brief scans a collection and locates all files
MMFilesEngineCollectionFiles scanCollectionDirectory(std::string const& path);
/// @brief remove data of expired compaction blockers
bool cleanupCompactionBlockers(TRI_vocbase_t* vocbase) override;
/// @brief insert a compaction blocker
int insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl, TRI_voc_tick_t& id) override;
/// @brief touch an existing compaction blocker
int extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id, double ttl) override;
/// @brief remove an existing compaction blocker
int removeCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id) override;
/// @brief a callback function that is run while it is guaranteed that there is no compaction ongoing
void preventCompaction(TRI_vocbase_t* vocbase,
std::function<void(TRI_vocbase_t*)> const& callback) override;
/// @brief a callback function that is run there is no compaction ongoing
bool tryPreventCompaction(TRI_vocbase_t* vocbase,
std::function<void(TRI_vocbase_t*)> const& callback,
bool checkForActiveBlockers) override;
private:
void verifyDirectories();
@ -285,6 +306,19 @@ class MMFilesEngine final : public StorageEngine {
std::vector<std::pair<std::string, std::string>> _deleted;
std::unordered_map<TRI_voc_tick_t, std::unordered_map<TRI_voc_cid_t, std::string>> _collectionPaths;
struct CompactionBlocker {
CompactionBlocker(TRI_voc_tick_t id, double expires) : _id(id), _expires(expires) {}
CompactionBlocker() = delete;
TRI_voc_tick_t _id;
double _expires;
};
// lock for compaction blockers
arangodb::basics::ReadWriteLock _compactionBlockersLock;
// cross-database map of compaction blockers
std::unordered_map<TRI_voc_tick_t, std::vector<CompactionBlocker>> _compactionBlockers;
};
}

View File

@ -196,6 +196,37 @@ class OtherEngine final : public StorageEngine {
// from the storage engine's realm
void removeDocumentRevision(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
arangodb::velocypack::Slice const& document) override;
/// @brief remove data of expired compaction blockers
bool cleanupCompactionBlockers(TRI_vocbase_t* vocbase) override { return false; }
/// @brief insert a compaction blocker
int insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl, TRI_voc_tick_t& id) override {
id = 0;
return TRI_ERROR_NO_ERROR;
}
/// @brief touch an existing compaction blocker
int extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id, double ttl) override {
return TRI_ERROR_NO_ERROR;
}
/// @brief remove an existing compaction blocker
int removeCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id) override {
return TRI_ERROR_NO_ERROR;
}
/// @brief a callback function that is run while it is guaranteed that there is no compaction ongoing
void preventCompaction(TRI_vocbase_t* vocbase,
std::function<void(TRI_vocbase_t*)> const& callback) override {
}
/// @brief a callback function that is run there is no compaction ongoing
bool tryPreventCompaction(TRI_vocbase_t* vocbase,
std::function<void(TRI_vocbase_t*)> const& callback,
bool checkForActiveBlockers) override {
return true;
}
public:
static std::string const EngineName;

View File

@ -210,7 +210,28 @@ class StorageEngine : public application_features::ApplicationFeature {
// from the storage engine's realm
virtual void removeDocumentRevision(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
arangodb::velocypack::Slice const& document) = 0;
/// @brief remove data of expired compaction blockers
virtual bool cleanupCompactionBlockers(TRI_vocbase_t* vocbase) = 0;
/// @brief insert a compaction blocker
virtual int insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl, TRI_voc_tick_t& id) = 0;
/// @brief touch an existing compaction blocker
virtual int extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id, double ttl) = 0;
/// @brief remove an existing compaction blocker
virtual int removeCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id) = 0;
/// @brief a callback function that is run while it is guaranteed that there is no compaction ongoing
virtual void preventCompaction(TRI_vocbase_t* vocbase,
std::function<void(TRI_vocbase_t*)> const& callback) = 0;
/// @brief a callback function that is run there is no compaction ongoing
virtual bool tryPreventCompaction(TRI_vocbase_t* vocbase,
std::function<void(TRI_vocbase_t*)> const& callback,
bool checkForActiveBlockers) = 0;
protected:
TRI_vocbase_col_t* registerCollection(bool doLock, TRI_vocbase_t* vocbase, TRI_col_type_e type, TRI_voc_cid_t cid,
std::string const& name, TRI_voc_cid_t planId, std::string const& path) {

View File

@ -24,10 +24,11 @@
#include "CollectionExport.h"
#include "Basics/WriteLocker.h"
#include "Indexes/PrimaryIndex.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
#include "Utils/CollectionGuard.h"
#include "Utils/SingleCollectionTransaction.h"
#include "Utils/StandaloneTransactionContext.h"
#include "VocBase/CompactorThread.h"
#include "VocBase/Ditch.h"
#include "VocBase/collection.h"
#include "VocBase/vocbase.h"
@ -64,13 +65,13 @@ CollectionExport::~CollectionExport() {
}
void CollectionExport::run(uint64_t maxWaitTime, size_t limit) {
{
// try to acquire the exclusive lock on the compaction
WRITE_LOCKER_EVENTUAL(locker, _document->_vocbase->_compactionBlockers._lock, 5000);
StorageEngine* engine = EngineSelectorFeature::ENGINE;
// try to acquire the exclusive lock on the compaction
engine->preventCompaction(_document->_vocbase, [this](TRI_vocbase_t* vocbase) {
// create a ditch under the compaction lock
_ditch = _document->ditches()->createDocumentDitch(false, __FILE__, __LINE__);
}
});
// now we either have a ditch or not
if (_ditch == nullptr) {

View File

@ -24,10 +24,11 @@
#include "CollectionKeys.h"
#include "Basics/StaticStrings.h"
#include "Basics/WriteLocker.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
#include "Utils/CollectionGuard.h"
#include "Utils/SingleCollectionTransaction.h"
#include "Utils/StandaloneTransactionContext.h"
#include "VocBase/CompactorThread.h"
#include "VocBase/DatafileHelper.h"
#include "VocBase/Ditch.h"
#include "VocBase/collection.h"
@ -70,7 +71,8 @@ CollectionKeys::CollectionKeys(TRI_vocbase_t* vocbase, std::string const& name,
CollectionKeys::~CollectionKeys() {
// remove compaction blocker
TRI_RemoveBlockerCompactorVocBase(_vocbase, _blockerId);
StorageEngine* engine = EngineSelectorFeature::ENGINE;
engine->removeCompactionBlocker(_vocbase, _blockerId);
delete _markers;
@ -88,14 +90,12 @@ CollectionKeys::~CollectionKeys() {
void CollectionKeys::create(TRI_voc_tick_t maxTick) {
arangodb::wal::LogfileManager::instance()->waitForCollectorQueue(
_document->_info.id(), 30.0);
{
// try to acquire the exclusive lock on the compaction
WRITE_LOCKER_EVENTUAL(locker, _document->_vocbase->_compactionBlockers._lock, 5000);
StorageEngine* engine = EngineSelectorFeature::ENGINE;
engine->preventCompaction(_document->_vocbase, [this](TRI_vocbase_t* vocbase) {
// create a ditch under the compaction lock
_ditch = _document->ditches()->createDocumentDitch(false, __FILE__, __LINE__);
}
});
// now we either have a ditch or not
if (_ditch == nullptr) {

View File

@ -29,8 +29,9 @@
#include "Basics/WriteLocker.h"
#include "Basics/files.h"
#include "Logger/Logger.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
#include "Utils/CursorRepository.h"
#include "VocBase/CompactorThread.h"
#include "VocBase/Ditch.h"
#include "VocBase/collection.h"
#include "Wal/LogfileManager.h"
@ -49,6 +50,7 @@ void CleanupThread::signal() {
/// @brief cleanup event loop
void CleanupThread::run() {
StorageEngine* engine = EngineSelectorFeature::ENGINE;
uint64_t iterations = 0;
std::vector<TRI_vocbase_col_t*> collections;
@ -70,53 +72,49 @@ void CleanupThread::run() {
// check if we can get the compactor lock exclusively
// check if compaction is currently disallowed
{
TRY_WRITE_LOCKER(locker, _vocbase->_compactionBlockers._lock);
if (locker.isLocked()) {
try {
// copy all collections
collections = _vocbase->collections();
} catch (...) {
collections.clear();
}
for (auto& collection : collections) {
TRI_ASSERT(collection != nullptr);
TRI_collection_t* document;
{
READ_LOCKER(readLocker, collection->_lock);
document = collection->_collection;
}
if (document == nullptr) {
// collection currently not loaded
continue;
}
TRI_ASSERT(document != nullptr);
// we're the only ones that can unload the collection, so using
// the collection pointer outside the lock is ok
// maybe cleanup indexes, unload the collection or some datafiles
// clean indexes?
if (iterations % cleanupIndexIterations() == 0) {
document->cleanupIndexes();
}
cleanupCollection(collection, document);
}
engine->tryPreventCompaction(_vocbase, [this, &collections, &iterations](TRI_vocbase_t* vocbase) {
try {
// copy all collections
collections = vocbase->collections();
} catch (...) {
collections.clear();
}
}
for (auto& collection : collections) {
TRI_ASSERT(collection != nullptr);
TRI_collection_t* document;
{
READ_LOCKER(readLocker, collection->_lock);
document = collection->_collection;
}
if (document == nullptr) {
// collection currently not loaded
continue;
}
TRI_ASSERT(document != nullptr);
// we're the only ones that can unload the collection, so using
// the collection pointer outside the lock is ok
// maybe cleanup indexes, unload the collection or some datafiles
// clean indexes?
if (iterations % cleanupIndexIterations() == 0) {
document->cleanupIndexes();
}
cleanupCollection(collection, document);
}
}, false);
// server is still running, clean up unused cursors
if (iterations % cleanupCursorIterations() == 0) {
cleanupCursors(false);
// clean up expired compactor locks
TRI_CleanupCompactorVocBase(_vocbase);
engine->cleanupCompactionBlockers(_vocbase);
}
if (state == TRI_vocbase_t::State::NORMAL) {

View File

@ -31,6 +31,8 @@
#include "Basics/memory-map.h"
#include "Logger/Logger.h"
#include "Indexes/PrimaryIndex.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
#include "Utils/SingleCollectionTransaction.h"
#include "Utils/StandaloneTransactionContext.h"
#include "VocBase/DatafileHelper.h"
@ -808,6 +810,7 @@ void CompactorThread::signal() {
}
void CompactorThread::run() {
StorageEngine* engine = EngineSelectorFeature::ENGINE;
std::vector<TRI_vocbase_col_t*> collections;
int numCompacted = 0;
@ -816,100 +819,95 @@ void CompactorThread::run() {
// compaction loop
TRI_vocbase_t::State state = _vocbase->state();
{
// check if compaction is currently disallowed
TRY_WRITE_LOCKER(compactionLocker, _vocbase->_compactionBlockers._lock);
engine->tryPreventCompaction(_vocbase, [this, &numCompacted, &collections](TRI_vocbase_t* vocbase) {
// compaction is currently allowed
numCompacted = 0;
try {
// copy all collections
collections = _vocbase->collections();
} catch (...) {
collections.clear();
}
if (compactionLocker.isLocked() && !hasActiveBlockers()) {
// compaction is currently allowed
numCompacted = 0;
try {
// copy all collections
collections = _vocbase->collections();
} catch (...) {
collections.clear();
}
bool worked;
bool worked;
for (auto& collection : collections) {
{
TRY_READ_LOCKER(readLocker, collection->_lock);
for (auto& collection : collections) {
{
TRY_READ_LOCKER(readLocker, collection->_lock);
if (!readLocker.isLocked()) {
// if we can't acquire the read lock instantly, we continue directly
// we don't want to stall here for too long
continue;
}
TRI_collection_t* document = collection->_collection;
if (document == nullptr) {
continue;
}
worked = false;
bool doCompact = document->_info.doCompact();
// for document collection, compactify datafiles
if (collection->status() == TRI_VOC_COL_STATUS_LOADED && doCompact) {
// check whether someone else holds a read-lock on the compaction
// lock
TRY_WRITE_LOCKER(locker, document->_compactionLock);
if (!locker.isLocked()) {
// someone else is holding the compactor lock, we'll not compact
continue;
}
try {
double const now = TRI_microtime();
if (document->_lastCompaction + compactionCollectionInterval() <= now) {
auto ce = document->ditches()->createCompactionDitch(__FILE__,
__LINE__);
if (ce == nullptr) {
// out of memory
LOG_TOPIC(WARN, Logger::COMPACTOR) << "out of memory when trying to create compaction ditch";
} else {
try {
bool wasBlocked = false;
worked = compactCollection(document, wasBlocked);
if (!worked && !wasBlocked) {
// set compaction stamp
document->_lastCompaction = now;
}
// if we worked or were blocked, then we don't set the compaction stamp to
// force another round of compaction
} catch (...) {
LOG_TOPIC(ERR, Logger::COMPACTOR) << "an unknown exception occurred during compaction";
// in case an error occurs, we must still free this ditch
}
document->ditches()->freeDitch(ce);
}
}
} catch (...) {
// in case an error occurs, we must still relase the lock
LOG_TOPIC(ERR, Logger::COMPACTOR) << "an unknown exception occurred during compaction";
}
}
} // end of lock
if (worked) {
++numCompacted;
// signal the cleanup thread that we worked and that it can now wake
// up
CONDITION_LOCKER(locker, _condition);
locker.signal();
if (!readLocker.isLocked()) {
// if we can't acquire the read lock instantly, we continue directly
// we don't want to stall here for too long
continue;
}
TRI_collection_t* document = collection->_collection;
if (document == nullptr) {
continue;
}
worked = false;
bool doCompact = document->_info.doCompact();
// for document collection, compactify datafiles
if (collection->status() == TRI_VOC_COL_STATUS_LOADED && doCompact) {
// check whether someone else holds a read-lock on the compaction
// lock
TRY_WRITE_LOCKER(locker, document->_compactionLock);
if (!locker.isLocked()) {
// someone else is holding the compactor lock, we'll not compact
continue;
}
try {
double const now = TRI_microtime();
if (document->_lastCompaction + compactionCollectionInterval() <= now) {
auto ce = document->ditches()->createCompactionDitch(__FILE__,
__LINE__);
if (ce == nullptr) {
// out of memory
LOG_TOPIC(WARN, Logger::COMPACTOR) << "out of memory when trying to create compaction ditch";
} else {
try {
bool wasBlocked = false;
worked = compactCollection(document, wasBlocked);
if (!worked && !wasBlocked) {
// set compaction stamp
document->_lastCompaction = now;
}
// if we worked or were blocked, then we don't set the compaction stamp to
// force another round of compaction
} catch (...) {
LOG_TOPIC(ERR, Logger::COMPACTOR) << "an unknown exception occurred during compaction";
// in case an error occurs, we must still free this ditch
}
document->ditches()->freeDitch(ce);
}
}
} catch (...) {
// in case an error occurs, we must still relase the lock
LOG_TOPIC(ERR, Logger::COMPACTOR) << "an unknown exception occurred during compaction";
}
}
} // end of lock
if (worked) {
++numCompacted;
// signal the cleanup thread that we worked and that it can now wake
// up
CONDITION_LOCKER(locker, _condition);
locker.signal();
}
}
}
}, true);
if (numCompacted > 0) {
// no need to sleep long or go into wait state if we worked.
@ -930,21 +928,6 @@ void CompactorThread::run() {
LOG_TOPIC(DEBUG, Logger::COMPACTOR) << "shutting down compactor thread";
}
/// @brief check whether there is an active compaction blocker
/// note that this must be called while holding the compactionBlockers lock
bool CompactorThread::hasActiveBlockers() const {
double const now = TRI_microtime();
// check if we have a still-valid compaction blocker
for (auto const& blocker : _vocbase->_compactionBlockers._data) {
if (blocker._expires > now) {
// found a compaction blocker
return true;
}
}
return false;
}
/// @brief determine the number of documents in the collection
uint64_t CompactorThread::getNumberOfDocuments(TRI_collection_t* document) {
SingleCollectionTransaction trx(StandaloneTransactionContext::Create(_vocbase), document->_info.id(), TRI_TRANSACTION_READ);
@ -976,101 +959,3 @@ int CompactorThread::copyMarker(TRI_collection_t* document,
return TRI_WriteElementDatafile(compactor, *result, marker, false);
}
/// @brief remove data of expired compaction blockers
bool TRI_CleanupCompactorVocBase(TRI_vocbase_t* vocbase) {
// check if we can instantly acquire the lock
TRY_WRITE_LOCKER(locker, vocbase->_compactionBlockers._lock);
if (!locker.isLocked()) {
// couldn't acquire lock
return false;
}
// we are now holding the write lock
double now = TRI_microtime();
size_t n = vocbase->_compactionBlockers._data.size();
for (size_t i = 0; i < n; /* no hoisting */) {
auto& blocker = vocbase->_compactionBlockers._data[i];
if (blocker._expires < now) {
vocbase->_compactionBlockers._data.erase(vocbase->_compactionBlockers._data.begin() + i);
n--;
} else {
i++;
}
}
return true;
}
/// @brief insert a compaction blocker
int TRI_InsertBlockerCompactorVocBase(TRI_vocbase_t* vocbase, double lifetime,
TRI_voc_tick_t* id) {
if (lifetime <= 0.0) {
return TRI_ERROR_BAD_PARAMETER;
}
compaction_blocker_t blocker;
blocker._id = TRI_NewTickServer();
blocker._expires = TRI_microtime() + lifetime;
int res = TRI_ERROR_NO_ERROR;
{
WRITE_LOCKER_EVENTUAL(locker, vocbase->_compactionBlockers._lock, 1000);
try {
vocbase->_compactionBlockers._data.push_back(blocker);
} catch (...) {
res = TRI_ERROR_OUT_OF_MEMORY;
}
}
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
*id = blocker._id;
return TRI_ERROR_NO_ERROR;
}
/// @brief touch an existing compaction blocker
int TRI_TouchBlockerCompactorVocBase(TRI_vocbase_t* vocbase, TRI_voc_tick_t id,
double lifetime) {
if (lifetime <= 0.0) {
return TRI_ERROR_BAD_PARAMETER;
}
WRITE_LOCKER_EVENTUAL(locker, vocbase->_compactionBlockers._lock, 1000);
for (auto& blocker : vocbase->_compactionBlockers._data) {
if (blocker._id == id) {
blocker._expires = TRI_microtime() + lifetime;
return TRI_ERROR_NO_ERROR;
}
}
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}
/// @brief remove an existing compaction blocker
int TRI_RemoveBlockerCompactorVocBase(TRI_vocbase_t* vocbase,
TRI_voc_tick_t id) {
WRITE_LOCKER_EVENTUAL(locker, vocbase->_compactionBlockers._lock, 1000);
size_t const n = vocbase->_compactionBlockers._data.size();
for (size_t i = 0; i < n; ++i) {
auto& blocker = vocbase->_compactionBlockers._data[i];
if (blocker._id == id) {
vocbase->_compactionBlockers._data.erase(vocbase->_compactionBlockers._data.begin() + i);
return TRI_ERROR_NO_ERROR;
}
}
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}

View File

@ -101,10 +101,6 @@ class CompactorThread : public Thread {
/// @brief remove an empty datafile
int removeDatafile(TRI_collection_t* document, TRI_datafile_t* datafile);
/// @brief check whether there is an active compaction blocker
/// note that this must be called while holding the compactionBlockers lock
bool hasActiveBlockers() const;
/// @brief determine the number of documents in the collection
uint64_t getNumberOfDocuments(TRI_collection_t* document);
@ -157,16 +153,4 @@ class CompactorThread : public Thread {
}
/// @brief remove data of expired compaction blockers
bool TRI_CleanupCompactorVocBase(TRI_vocbase_t*);
/// @brief insert a compaction blocker
int TRI_InsertBlockerCompactorVocBase(TRI_vocbase_t*, double, TRI_voc_tick_t*);
/// @brief touch an existing compaction blocker
int TRI_TouchBlockerCompactorVocBase(TRI_vocbase_t*, TRI_voc_tick_t, double);
/// @brief remove an existing compaction blocker
int TRI_RemoveBlockerCompactorVocBase(TRI_vocbase_t*, TRI_voc_tick_t);
#endif

View File

@ -58,20 +58,6 @@ class StorageEngine;
class Thread;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief compaction blocker entry
////////////////////////////////////////////////////////////////////////////////
struct compaction_blocker_t {
TRI_voc_tick_t _id;
double _expires;
};
struct compaction_blockers_t {
arangodb::basics::ReadWriteLock _lock;
std::vector<compaction_blocker_t> _data;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief name of the system database
////////////////////////////////////////////////////////////////////////////////
@ -215,8 +201,6 @@ struct TRI_vocbase_t {
std::unique_ptr<arangodb::CompactorThread> _compactorThread;
std::unique_ptr<arangodb::CleanupThread> _cleanupThread;
compaction_blockers_t _compactionBlockers;
public:
/// @brief checks if a database name is allowed
/// returns true if the name is allowed and false otherwise