mirror of https://gitee.com/bigwinds/arangodb
fixed tests
This commit is contained in:
parent
ac1e4c1ef7
commit
5d640b7144
|
@ -443,8 +443,6 @@ int DatabaseFeature::createDatabaseCoordinator(TRI_voc_tick_t id, std::string co
|
||||||
|
|
||||||
// name not yet in use, release the read lock
|
// name not yet in use, release the read lock
|
||||||
auto vocbase = std::make_unique<TRI_vocbase_t>(TRI_VOCBASE_TYPE_COORDINATOR, id, name);
|
auto vocbase = std::make_unique<TRI_vocbase_t>(TRI_VOCBASE_TYPE_COORDINATOR, id, name);
|
||||||
// vocbase is now active
|
|
||||||
vocbase->setState(TRI_vocbase_t::State::NORMAL);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
vocbase->addReplicationApplier(TRI_CreateReplicationApplier(vocbase.get()));
|
vocbase->addReplicationApplier(TRI_CreateReplicationApplier(vocbase.get()));
|
||||||
|
|
|
@ -949,9 +949,6 @@ TRI_vocbase_t* MMFilesEngine::openExistingDatabase(TRI_voc_tick_t id, std::strin
|
||||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
// vocbase is now active
|
|
||||||
vocbase->setState(TRI_vocbase_t::State::NORMAL);
|
|
||||||
|
|
||||||
return vocbase.release();
|
return vocbase.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1180,20 +1180,12 @@ static void JS_NameVocbaseCol(v8::FunctionCallbackInfo<v8::Value> const& args) {
|
||||||
TRI_V8_THROW_EXCEPTION_INTERNAL("cannot extract collection");
|
TRI_V8_THROW_EXCEPTION_INTERNAL("cannot extract collection");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!collection->_isLocal) {
|
|
||||||
std::string const collectionName(collection->name());
|
std::string const collectionName(collection->name());
|
||||||
v8::Handle<v8::Value> result = TRI_V8_STRING(collectionName.c_str());
|
|
||||||
TRI_V8_RETURN(result);
|
if (collectionName.empty()) {
|
||||||
|
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
|
||||||
}
|
}
|
||||||
|
v8::Handle<v8::Value> result = TRI_V8_STD_STRING(collectionName);
|
||||||
// this copies the name into a new place so we can safely access it later
|
|
||||||
// if we wouldn't do this, we would risk other threads modifying the name
|
|
||||||
// while
|
|
||||||
// we're reading it
|
|
||||||
std::string name(collection->_vocbase->collectionName(collection->_cid));
|
|
||||||
|
|
||||||
v8::Handle<v8::Value> result = TRI_V8_STD_STRING(name);
|
|
||||||
|
|
||||||
TRI_V8_RETURN(result);
|
TRI_V8_RETURN(result);
|
||||||
TRI_V8_TRY_CATCH_END
|
TRI_V8_TRY_CATCH_END
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,6 +42,11 @@ CleanupThread::CleanupThread(TRI_vocbase_t* vocbase)
|
||||||
|
|
||||||
CleanupThread::~CleanupThread() { shutdown(); }
|
CleanupThread::~CleanupThread() { shutdown(); }
|
||||||
|
|
||||||
|
void CleanupThread::signal() {
|
||||||
|
CONDITION_LOCKER(locker, _condition);
|
||||||
|
locker.signal();
|
||||||
|
}
|
||||||
|
|
||||||
/// @brief cleanup event loop
|
/// @brief cleanup event loop
|
||||||
void CleanupThread::run() {
|
void CleanupThread::run() {
|
||||||
uint64_t iterations = 0;
|
uint64_t iterations = 0;
|
||||||
|
@ -51,12 +56,17 @@ void CleanupThread::run() {
|
||||||
while (true) {
|
while (true) {
|
||||||
// keep initial _state value as vocbase->_state might change during cleanup
|
// keep initial _state value as vocbase->_state might change during cleanup
|
||||||
// loop
|
// loop
|
||||||
|
TRI_vocbase_t::State state = _vocbase->state();
|
||||||
|
|
||||||
++iterations;
|
++iterations;
|
||||||
|
|
||||||
// cursorss must be cleaned before collections are handled
|
if (state == TRI_vocbase_t::State::SHUTDOWN_COMPACTOR ||
|
||||||
|
state == TRI_vocbase_t::State::SHUTDOWN_CLEANUP) {
|
||||||
|
// cursors must be cleaned before collections are handled
|
||||||
// otherwise the cursors may still hold barriers on collections
|
// otherwise the cursors may still hold barriers on collections
|
||||||
// and collections cannot be closed properly
|
// and collections cannot be closed properly
|
||||||
cleanupCursors(true);
|
cleanupCursors(true);
|
||||||
|
}
|
||||||
|
|
||||||
// check if we can get the compactor lock exclusively
|
// check if we can get the compactor lock exclusively
|
||||||
// check if compaction is currently disallowed
|
// check if compaction is currently disallowed
|
||||||
|
@ -109,14 +119,17 @@ void CleanupThread::run() {
|
||||||
TRI_CleanupCompactorVocBase(_vocbase);
|
TRI_CleanupCompactorVocBase(_vocbase);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isStopping()) {
|
if (state == TRI_vocbase_t::State::NORMAL) {
|
||||||
// server shutdown
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
CONDITION_LOCKER(locker, _condition);
|
CONDITION_LOCKER(locker, _condition);
|
||||||
locker.wait(cleanupInterval());
|
locker.wait(cleanupInterval());
|
||||||
|
} else {
|
||||||
|
// prevent busy waiting
|
||||||
|
usleep(10000);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (state == TRI_vocbase_t::State::SHUTDOWN_CLEANUP) {
|
||||||
|
// server shutdown
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,7 @@ class CleanupThread : public Thread {
|
||||||
explicit CleanupThread(TRI_vocbase_t* vocbase);
|
explicit CleanupThread(TRI_vocbase_t* vocbase);
|
||||||
~CleanupThread();
|
~CleanupThread();
|
||||||
|
|
||||||
void signal() { _condition.signal(); }
|
void signal();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void run() override;
|
void run() override;
|
||||||
|
|
|
@ -63,72 +63,6 @@ static char const* ReasonDeadCount =
|
||||||
static char const* ReasonNothingToCompact =
|
static char const* ReasonNothingToCompact =
|
||||||
"checked datafiles, but no compaction opportunity found";
|
"checked datafiles, but no compaction opportunity found";
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief minimum size of dead data (in bytes) in a datafile that will make
|
|
||||||
/// the datafile eligible for compaction at all.
|
|
||||||
///
|
|
||||||
/// Any datafile with less dead data than the threshold will not become a
|
|
||||||
/// candidate for compaction.
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
#define COMPACTOR_DEAD_SIZE_THRESHOLD (1024 * 128)
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief percentage of dead documents in a datafile that will trigger the
|
|
||||||
/// compaction
|
|
||||||
///
|
|
||||||
/// for example, if the collection contains 800 bytes of alive and 400 bytes of
|
|
||||||
/// dead documents, the share of the dead documents is 400 / (400 + 800) = 33 %.
|
|
||||||
/// if this value if higher than the threshold, the datafile will be compacted
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
#define COMPACTOR_DEAD_SIZE_SHARE (0.1)
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief minimum number of deletion marker in file from which on we will
|
|
||||||
/// compact it if nothing else qualifies file for compaction
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
#define COMPACTOR_DEAD_THRESHOLD (16384)
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief maximum number of datafiles to join together in one compaction run
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
#define COMPACTOR_MAX_FILES 3
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief maximum multiple of journal filesize of a compacted file
|
|
||||||
/// a value of 3 means that the maximum filesize of the compacted file is
|
|
||||||
/// 3 x (collection->journalSize)
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
#define COMPACTOR_MAX_SIZE_FACTOR (3)
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief maximum filesize of resulting compacted file
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
#define COMPACTOR_MAX_RESULT_FILESIZE (128 * 1024 * 1024)
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief datafiles smaller than the following value will be merged with others
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
#define COMPACTOR_MIN_SIZE (128 * 1024)
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief re-try compaction of a specific collection in this interval (in s)
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
#define COMPACTOR_COLLECTION_INTERVAL (10.0)
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief compactify interval in microseconds
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
static int const COMPACTOR_INTERVAL = (1 * 1000 * 1000);
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief auxiliary struct used when initializing compaction
|
/// @brief auxiliary struct used when initializing compaction
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -154,15 +88,6 @@ struct compaction_context_t {
|
||||||
bool _keepDeletions;
|
bool _keepDeletions;
|
||||||
};
|
};
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief compaction instruction for a single datafile
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
struct compaction_info_t {
|
|
||||||
TRI_datafile_t* _datafile;
|
|
||||||
bool _keepDeletions;
|
|
||||||
};
|
|
||||||
|
|
||||||
/// @brief determine the number of documents in the collection
|
/// @brief determine the number of documents in the collection
|
||||||
static uint64_t GetNumberOfDocuments(TRI_collection_t* document) {
|
static uint64_t GetNumberOfDocuments(TRI_collection_t* document) {
|
||||||
TRI_vocbase_t* vocbase = document->_vocbase;
|
TRI_vocbase_t* vocbase = document->_vocbase;
|
||||||
|
@ -491,7 +416,7 @@ static bool CalculateSize(TRI_df_marker_t const* marker, void* data,
|
||||||
|
|
||||||
static compaction_initial_context_t InitCompaction(
|
static compaction_initial_context_t InitCompaction(
|
||||||
arangodb::Transaction* trx, TRI_collection_t* document,
|
arangodb::Transaction* trx, TRI_collection_t* document,
|
||||||
std::vector<compaction_info_t> const& toCompact) {
|
std::vector<CompactorThread::compaction_info_t> const& toCompact) {
|
||||||
compaction_initial_context_t context;
|
compaction_initial_context_t context;
|
||||||
|
|
||||||
memset(&context, 0, sizeof(compaction_initial_context_t));
|
memset(&context, 0, sizeof(compaction_initial_context_t));
|
||||||
|
@ -545,13 +470,10 @@ static compaction_initial_context_t InitCompaction(
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
/// @brief compact the specified datafiles
|
||||||
/// @brief compact a list of datafiles
|
void CompactorThread::compactDatafiles(TRI_collection_t* document,
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
static void CompactifyDatafiles(
|
|
||||||
TRI_collection_t* document,
|
|
||||||
std::vector<compaction_info_t> const& toCompact) {
|
std::vector<compaction_info_t> const& toCompact) {
|
||||||
|
|
||||||
TRI_datafile_t* compactor;
|
TRI_datafile_t* compactor;
|
||||||
compaction_context_t context;
|
compaction_context_t context;
|
||||||
size_t i;
|
size_t i;
|
||||||
|
@ -729,11 +651,8 @@ static void CompactifyDatafiles(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief checks all datafiles of a collection
|
/// @brief checks all datafiles of a collection
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
bool CompactorThread::compactCollection(TRI_collection_t* document) {
|
||||||
|
|
||||||
static bool CompactifyDocumentCollection(TRI_collection_t* document) {
|
|
||||||
// we can hopefully get away without the lock here...
|
// we can hopefully get away without the lock here...
|
||||||
// if (! document->isFullyCollected()) {
|
// if (! document->isFullyCollected()) {
|
||||||
// return false;
|
// return false;
|
||||||
|
@ -768,7 +687,7 @@ static bool CompactifyDocumentCollection(TRI_collection_t* document) {
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<compaction_info_t> toCompact;
|
std::vector<compaction_info_t> toCompact;
|
||||||
toCompact.reserve(COMPACTOR_MAX_FILES);
|
toCompact.reserve(maxFiles());
|
||||||
|
|
||||||
// now we have datafiles that we can process
|
// now we have datafiles that we can process
|
||||||
size_t const n = datafiles.size();
|
size_t const n = datafiles.size();
|
||||||
|
@ -780,13 +699,12 @@ static bool CompactifyDocumentCollection(TRI_collection_t* document) {
|
||||||
uint64_t const numDocuments = GetNumberOfDocuments(document);
|
uint64_t const numDocuments = GetNumberOfDocuments(document);
|
||||||
|
|
||||||
// get maximum size of result file
|
// get maximum size of result file
|
||||||
uint64_t maxSize = (uint64_t)COMPACTOR_MAX_SIZE_FACTOR *
|
uint64_t maxSize = maxSizeFactor() * (uint64_t)document->_info.maximalSize();
|
||||||
(uint64_t)document->_info.maximalSize();
|
|
||||||
if (maxSize < 8 * 1024 * 1024) {
|
if (maxSize < 8 * 1024 * 1024) {
|
||||||
maxSize = 8 * 1024 * 1024;
|
maxSize = 8 * 1024 * 1024;
|
||||||
}
|
}
|
||||||
if (maxSize >= COMPACTOR_MAX_RESULT_FILESIZE) {
|
if (maxSize >= maxResultFilesize()) {
|
||||||
maxSize = COMPACTOR_MAX_RESULT_FILESIZE;
|
maxSize = maxResultFilesize();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (start >= n || numDocuments == 0) {
|
if (start >= n || numDocuments == 0) {
|
||||||
|
@ -819,7 +737,7 @@ static bool CompactifyDocumentCollection(TRI_collection_t* document) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!doCompact && df->_maximalSize < COMPACTOR_MIN_SIZE && i < n - 1) {
|
if (!doCompact && df->_maximalSize < smallDatafileSize() && i < n - 1) {
|
||||||
// very small datafile and not the last one. let's compact it so it's
|
// very small datafile and not the last one. let's compact it so it's
|
||||||
// merged with others
|
// merged with others
|
||||||
doCompact = true;
|
doCompact = true;
|
||||||
|
@ -836,20 +754,18 @@ static bool CompactifyDocumentCollection(TRI_collection_t* document) {
|
||||||
// compact first datafile(s) if they contain only deletions
|
// compact first datafile(s) if they contain only deletions
|
||||||
doCompact = true;
|
doCompact = true;
|
||||||
reason = ReasonOnlyDeletions;
|
reason = ReasonOnlyDeletions;
|
||||||
} else if (dfi.sizeDead >= (int64_t)COMPACTOR_DEAD_SIZE_THRESHOLD) {
|
} else if (dfi.sizeDead >= deadSizeThreshold()) {
|
||||||
// the size of dead objects is above some threshold
|
// the size of dead objects is above some threshold
|
||||||
doCompact = true;
|
doCompact = true;
|
||||||
reason = ReasonDeadSize;
|
reason = ReasonDeadSize;
|
||||||
} else if (dfi.sizeDead > 0 &&
|
} else if (dfi.sizeDead > 0 &&
|
||||||
(((double)dfi.sizeDead /
|
(((double)dfi.sizeDead /
|
||||||
((double)dfi.sizeDead + (double)dfi.sizeAlive) >=
|
((double)dfi.sizeDead + (double)dfi.sizeAlive) >= deadShare()) ||
|
||||||
COMPACTOR_DEAD_SIZE_SHARE) ||
|
((double)dfi.sizeDead / (double)df->_maximalSize >= deadShare()))) {
|
||||||
((double)dfi.sizeDead / (double)df->_maximalSize >=
|
|
||||||
COMPACTOR_DEAD_SIZE_SHARE))) {
|
|
||||||
// the size of dead objects is above some share
|
// the size of dead objects is above some share
|
||||||
doCompact = true;
|
doCompact = true;
|
||||||
reason = ReasonDeadSizeShare;
|
reason = ReasonDeadSizeShare;
|
||||||
} else if (dfi.numberDead >= (int64_t)COMPACTOR_DEAD_THRESHOLD) {
|
} else if (dfi.numberDead >= deadNumberThreshold()) {
|
||||||
// the number of dead objects is above some threshold
|
// the number of dead objects is above some threshold
|
||||||
doCompact = true;
|
doCompact = true;
|
||||||
reason = ReasonDeadCount;
|
reason = ReasonDeadCount;
|
||||||
|
@ -923,8 +839,8 @@ static bool CompactifyDocumentCollection(TRI_collection_t* document) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (totalSize >= COMPACTOR_MIN_SIZE &&
|
if (totalSize >= smallDatafileSize() &&
|
||||||
toCompact.size() >= COMPACTOR_MAX_FILES) {
|
toCompact.size() >= maxFiles()) {
|
||||||
// found enough files to compact
|
// found enough files to compact
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -951,7 +867,7 @@ static bool CompactifyDocumentCollection(TRI_collection_t* document) {
|
||||||
document->setCompactionStatus(reason);
|
document->setCompactionStatus(reason);
|
||||||
|
|
||||||
document->setNextCompactionStartIndex(start);
|
document->setNextCompactionStartIndex(start);
|
||||||
CompactifyDatafiles(document, toCompact);
|
compactDatafiles(document, toCompact);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -1086,6 +1002,11 @@ CompactorThread::CompactorThread(TRI_vocbase_t* vocbase)
|
||||||
|
|
||||||
CompactorThread::~CompactorThread() { shutdown(); }
|
CompactorThread::~CompactorThread() { shutdown(); }
|
||||||
|
|
||||||
|
void CompactorThread::signal() {
|
||||||
|
CONDITION_LOCKER(locker, _condition);
|
||||||
|
locker.signal();
|
||||||
|
}
|
||||||
|
|
||||||
void CompactorThread::run() {
|
void CompactorThread::run() {
|
||||||
std::vector<TRI_vocbase_col_t*> collections;
|
std::vector<TRI_vocbase_col_t*> collections;
|
||||||
|
|
||||||
|
@ -1093,6 +1014,7 @@ void CompactorThread::run() {
|
||||||
int numCompacted = 0;
|
int numCompacted = 0;
|
||||||
// keep initial _state value as vocbase->_state might change during
|
// keep initial _state value as vocbase->_state might change during
|
||||||
// compaction loop
|
// compaction loop
|
||||||
|
TRI_vocbase_t::State state = _vocbase->state();
|
||||||
|
|
||||||
{
|
{
|
||||||
// check if compaction is currently disallowed
|
// check if compaction is currently disallowed
|
||||||
|
@ -1100,8 +1022,6 @@ void CompactorThread::run() {
|
||||||
|
|
||||||
if (compactionLocker.isLocked() && !HasActiveBlockers(_vocbase)) {
|
if (compactionLocker.isLocked() && !HasActiveBlockers(_vocbase)) {
|
||||||
// compaction is currently allowed
|
// compaction is currently allowed
|
||||||
double now = TRI_microtime();
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// copy all collections
|
// copy all collections
|
||||||
collections = _vocbase->collections();
|
collections = _vocbase->collections();
|
||||||
|
@ -1143,8 +1063,8 @@ void CompactorThread::run() {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (document->_lastCompaction + COMPACTOR_COLLECTION_INTERVAL <=
|
double const now = TRI_microtime();
|
||||||
now) {
|
if (document->_lastCompaction + compactionCollectionInterval() <= now) {
|
||||||
auto ce = document->ditches()->createCompactionDitch(__FILE__,
|
auto ce = document->ditches()->createCompactionDitch(__FILE__,
|
||||||
__LINE__);
|
__LINE__);
|
||||||
|
|
||||||
|
@ -1153,7 +1073,7 @@ void CompactorThread::run() {
|
||||||
LOG_TOPIC(WARN, Logger::COMPACTOR) << "out of memory when trying to create compaction ditch";
|
LOG_TOPIC(WARN, Logger::COMPACTOR) << "out of memory when trying to create compaction ditch";
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
worked = CompactifyDocumentCollection(document);
|
worked = compactCollection(document);
|
||||||
|
|
||||||
if (!worked) {
|
if (!worked) {
|
||||||
// set compaction stamp
|
// set compaction stamp
|
||||||
|
@ -1190,19 +1110,19 @@ void CompactorThread::run() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isStopping()) {
|
|
||||||
// server shutdown
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (numCompacted > 0) {
|
if (numCompacted > 0) {
|
||||||
// no need to sleep long or go into wait state if we worked.
|
// no need to sleep long or go into wait state if we worked.
|
||||||
// maybe there's still work left
|
// maybe there's still work left
|
||||||
usleep(1000);
|
usleep(1000);
|
||||||
} else {
|
} else if (state != TRI_vocbase_t::State::SHUTDOWN_COMPACTOR && _vocbase->state() == TRI_vocbase_t::State::NORMAL) {
|
||||||
// only sleep while server is still running
|
// only sleep while server is still running
|
||||||
CONDITION_LOCKER(locker, _condition);
|
CONDITION_LOCKER(locker, _condition);
|
||||||
_condition.wait(COMPACTOR_INTERVAL);
|
_condition.wait(compactionSleepTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (state == TRI_vocbase_t::State::SHUTDOWN_COMPACTOR) {
|
||||||
|
// server shutdown
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,20 +29,73 @@
|
||||||
#include "Basics/Thread.h"
|
#include "Basics/Thread.h"
|
||||||
#include "VocBase/voc-types.h"
|
#include "VocBase/voc-types.h"
|
||||||
|
|
||||||
|
struct TRI_collection_t;
|
||||||
|
struct TRI_datafile_t;
|
||||||
struct TRI_vocbase_t;
|
struct TRI_vocbase_t;
|
||||||
|
|
||||||
namespace arangodb {
|
namespace arangodb {
|
||||||
|
|
||||||
class CompactorThread : public Thread {
|
class CompactorThread : public Thread {
|
||||||
|
public:
|
||||||
|
/// @brief compaction instruction for a single datafile
|
||||||
|
struct compaction_info_t {
|
||||||
|
TRI_datafile_t* _datafile;
|
||||||
|
bool _keepDeletions;
|
||||||
|
};
|
||||||
|
|
||||||
public:
|
public:
|
||||||
explicit CompactorThread(TRI_vocbase_t* vocbase);
|
explicit CompactorThread(TRI_vocbase_t* vocbase);
|
||||||
~CompactorThread();
|
~CompactorThread();
|
||||||
|
|
||||||
void signal() { _condition.signal(); }
|
void signal();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void run() override;
|
void run() override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
/// @brief compact the specified datafiles
|
||||||
|
void compactDatafiles(TRI_collection_t*, std::vector<compaction_info_t> const&);
|
||||||
|
|
||||||
|
/// @brief checks all datafiles of a collection
|
||||||
|
bool compactCollection(TRI_collection_t*);
|
||||||
|
|
||||||
|
/// @brief wait time between compaction runs when idle
|
||||||
|
static constexpr unsigned compactionSleepTime() { return 1000 * 1000; }
|
||||||
|
|
||||||
|
/// @brief compaction interval in seconds
|
||||||
|
static constexpr double compactionCollectionInterval() { return 10.0; }
|
||||||
|
|
||||||
|
/// @brief maximum number of files to compact and concat
|
||||||
|
static constexpr unsigned maxFiles() { return 3; }
|
||||||
|
|
||||||
|
/// @brief maximum multiple of journal filesize of a compacted file
|
||||||
|
/// a value of 3 means that the maximum filesize of the compacted file is
|
||||||
|
/// 3 x (collection->journalSize)
|
||||||
|
static constexpr unsigned maxSizeFactor() { return 3; }
|
||||||
|
|
||||||
|
static constexpr unsigned smallDatafileSize() { return 128 * 1024; }
|
||||||
|
|
||||||
|
/// @brief maximum filesize of resulting compacted file
|
||||||
|
static constexpr uint64_t maxResultFilesize() { return 128 * 1024 * 1024; }
|
||||||
|
|
||||||
|
/// @brief minimum number of deletion marker in file from which on we will
|
||||||
|
/// compact it if nothing else qualifies file for compaction
|
||||||
|
static constexpr int64_t deadNumberThreshold() { return 16384; }
|
||||||
|
|
||||||
|
/// @brief minimum size of dead data (in bytes) in a datafile that will make
|
||||||
|
/// the datafile eligible for compaction at all.
|
||||||
|
/// Any datafile with less dead data than the threshold will not become a
|
||||||
|
/// candidate for compaction.
|
||||||
|
static constexpr int64_t deadSizeThreshold() { return 128 * 1024; }
|
||||||
|
|
||||||
|
/// @brief percentage of dead documents in a datafile that will trigger the
|
||||||
|
/// compaction
|
||||||
|
/// for example, if the collection contains 800 bytes of alive and 400 bytes of
|
||||||
|
/// dead documents, the share of the dead documents is 400 / (400 + 800) = 33 %.
|
||||||
|
/// if this value if higher than the threshold, the datafile will be compacted
|
||||||
|
static constexpr double deadShare() { return 0.1; }
|
||||||
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
TRI_vocbase_t* _vocbase;
|
TRI_vocbase_t* _vocbase;
|
||||||
|
|
||||||
|
|
|
@ -510,11 +510,10 @@ int TRI_vocbase_t::renameCollectionWorker(TRI_vocbase_col_t* collection,
|
||||||
return TRI_set_errno(TRI_ERROR_ARANGO_DUPLICATE_NAME);
|
return TRI_set_errno(TRI_ERROR_ARANGO_DUPLICATE_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
// .............................................................................
|
if (collection->_status == TRI_VOC_COL_STATUS_UNLOADED) {
|
||||||
// collection is unloaded
|
// collection is unloaded
|
||||||
// .............................................................................
|
collection->_name = newName;
|
||||||
|
|
||||||
else if (collection->_status == TRI_VOC_COL_STATUS_UNLOADED) {
|
|
||||||
try {
|
try {
|
||||||
arangodb::VocbaseCollectionInfo info =
|
arangodb::VocbaseCollectionInfo info =
|
||||||
arangodb::VocbaseCollectionInfo::fromFile(collection->path(),
|
arangodb::VocbaseCollectionInfo::fromFile(collection->path(),
|
||||||
|
@ -530,17 +529,12 @@ int TRI_vocbase_t::renameCollectionWorker(TRI_vocbase_col_t* collection,
|
||||||
} catch (arangodb::basics::Exception const& e) {
|
} catch (arangodb::basics::Exception const& e) {
|
||||||
return TRI_set_errno(e.code());
|
return TRI_set_errno(e.code());
|
||||||
}
|
}
|
||||||
|
|
||||||
// fall-through intentional
|
// fall-through intentional
|
||||||
}
|
}
|
||||||
|
|
||||||
// .............................................................................
|
|
||||||
// collection is loaded
|
|
||||||
// .............................................................................
|
|
||||||
|
|
||||||
else if (collection->_status == TRI_VOC_COL_STATUS_LOADED ||
|
else if (collection->_status == TRI_VOC_COL_STATUS_LOADED ||
|
||||||
collection->_status == TRI_VOC_COL_STATUS_UNLOADING ||
|
collection->_status == TRI_VOC_COL_STATUS_UNLOADING ||
|
||||||
collection->_status == TRI_VOC_COL_STATUS_LOADING) {
|
collection->_status == TRI_VOC_COL_STATUS_LOADING) {
|
||||||
|
// collection is loaded
|
||||||
int res = collection->_collection->rename(newName);
|
int res = collection->_collection->rename(newName);
|
||||||
|
|
||||||
if (res != TRI_ERROR_NO_ERROR) {
|
if (res != TRI_ERROR_NO_ERROR) {
|
||||||
|
@ -548,38 +542,27 @@ int TRI_vocbase_t::renameCollectionWorker(TRI_vocbase_col_t* collection,
|
||||||
}
|
}
|
||||||
// fall-through intentional
|
// fall-through intentional
|
||||||
}
|
}
|
||||||
|
|
||||||
// .............................................................................
|
|
||||||
// unknown status
|
|
||||||
// .............................................................................
|
|
||||||
|
|
||||||
else {
|
else {
|
||||||
|
// unknown status
|
||||||
return TRI_set_errno(TRI_ERROR_INTERNAL);
|
return TRI_set_errno(TRI_ERROR_INTERNAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
// .............................................................................
|
|
||||||
// rename and release locks
|
|
||||||
// .............................................................................
|
|
||||||
|
|
||||||
_collectionsByName.erase(oldName);
|
|
||||||
|
|
||||||
collection->_name = newName;
|
|
||||||
|
|
||||||
// this shouldn't fail, as we removed an element above so adding one should
|
|
||||||
// be ok
|
|
||||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||||
auto it2 =
|
auto it2 =
|
||||||
#endif
|
#endif
|
||||||
_collectionsByName.emplace(newName, collection);
|
_collectionsByName.emplace(newName, collection);
|
||||||
TRI_ASSERT(it2.second);
|
TRI_ASSERT(it2.second);
|
||||||
|
|
||||||
|
_collectionsByName.erase(oldName);
|
||||||
|
collection->_name = newName;
|
||||||
|
|
||||||
TRI_ASSERT(_collectionsByName.size() == _collectionsById.size());
|
TRI_ASSERT(_collectionsByName.size() == _collectionsById.size());
|
||||||
} // _colllectionsLock
|
} // _colllectionsLock
|
||||||
|
|
||||||
// to prevent caching returning now invalid old collection name in db's
|
// to prevent caching returning now invalid old collection name in db's
|
||||||
// NamedPropertyAccessor,
|
// NamedPropertyAccessor,
|
||||||
// i.e. db.<old-collection-name>
|
// i.e. db.<old-collection-name>
|
||||||
collection->_internalVersion++;
|
++collection->_internalVersion;
|
||||||
|
|
||||||
// invalidate all entries for the two collections
|
// invalidate all entries for the two collections
|
||||||
arangodb::aql::QueryCache::instance()->invalidate(
|
arangodb::aql::QueryCache::instance()->invalidate(
|
||||||
|
@ -997,9 +980,9 @@ void TRI_vocbase_t::shutdown() {
|
||||||
unloadCollection(collection, true);
|
unloadCollection(collection, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
setState(TRI_vocbase_t::State::SHUTDOWN);
|
// this will signal the compactor thread to do one last iteration
|
||||||
|
setState(TRI_vocbase_t::State::SHUTDOWN_COMPACTOR);
|
||||||
|
|
||||||
// signal the compactor thread to do one last iteration
|
|
||||||
if (_compactorThread != nullptr) {
|
if (_compactorThread != nullptr) {
|
||||||
_compactorThread->beginShutdown();
|
_compactorThread->beginShutdown();
|
||||||
_compactorThread->signal();
|
_compactorThread->signal();
|
||||||
|
@ -1011,7 +994,9 @@ void TRI_vocbase_t::shutdown() {
|
||||||
_compactorThread.reset();
|
_compactorThread.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
// signal the cleanup thread to do one last iteration
|
// this will signal the cleanup thread to do one last iteration
|
||||||
|
setState(TRI_vocbase_t::State::SHUTDOWN_CLEANUP);
|
||||||
|
|
||||||
if (_cleanupThread != nullptr) {
|
if (_cleanupThread != nullptr) {
|
||||||
_cleanupThread->beginShutdown();
|
_cleanupThread->beginShutdown();
|
||||||
_cleanupThread->signal();
|
_cleanupThread->signal();
|
||||||
|
@ -1524,6 +1509,7 @@ TRI_vocbase_t::TRI_vocbase_t(TRI_vocbase_type_e type, TRI_voc_tick_t id,
|
||||||
_name(name),
|
_name(name),
|
||||||
_type(type),
|
_type(type),
|
||||||
_refCount(0),
|
_refCount(0),
|
||||||
|
_state(TRI_vocbase_t::State::NORMAL),
|
||||||
_isOwnAppsDirectory(true),
|
_isOwnAppsDirectory(true),
|
||||||
_deadlockDetector(false),
|
_deadlockDetector(false),
|
||||||
_userStructures(nullptr) {
|
_userStructures(nullptr) {
|
||||||
|
|
|
@ -156,9 +156,9 @@ struct TRI_vocbase_t {
|
||||||
|
|
||||||
/// @brief database state
|
/// @brief database state
|
||||||
enum class State {
|
enum class State {
|
||||||
INACTIVE = 0,
|
NORMAL = 0,
|
||||||
NORMAL = 1,
|
SHUTDOWN_COMPACTOR = 1,
|
||||||
SHUTDOWN = 2,
|
SHUTDOWN_CLEANUP = 2,
|
||||||
FAILED_VERSION = 3
|
FAILED_VERSION = 3
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -732,7 +732,7 @@ function CompactionSuite () {
|
||||||
waited += 2;
|
waited += 2;
|
||||||
|
|
||||||
fig = c1.figures();
|
fig = c1.figures();
|
||||||
if (fig["dead"]["deletion"] === 0) {
|
if (fig["dead"]["deletion"] === 0 && fig["dead"]["count"] === 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue