1
0
Fork 0

auto-flush more exotic column families from time to time

This commit is contained in:
jsteemann 2019-11-11 14:53:20 +01:00
parent 9117a2b14f
commit ed1651ce40
4 changed files with 48 additions and 9 deletions

View File

@ -22,6 +22,7 @@
#include "RocksDBBackgroundThread.h"
#include "Basics/ConditionLocker.h"
#include "Random/RandomGenerator.h"
#include "Replication/ReplicationClients.h"
#include "RestServer/DatabaseFeature.h"
#include "RocksDBEngine/RocksDBCommon.h"
@ -33,7 +34,17 @@
using namespace arangodb;
RocksDBBackgroundThread::RocksDBBackgroundThread(RocksDBEngine& eng, double interval)
: Thread(eng.server(), "RocksDBThread"), _engine(eng), _interval(interval) {}
: Thread(eng.server(), "RocksDBThread"),
_engine(eng),
_interval(interval),
_nextFlushTime(TRI_microtime() + 10.0 * 60.0 + RandomGenerator::interval(uint32_t(120))) {
// initial column family flush is around 10 minutes after startup, with a bit of
// random delay. The random delay is used to prevent all servers from flushing their
// data at the very same time. The 10 minute offset is used to prevent
// long startup delays, and also to prevent many repeated flushes in case there are
// startup errors and thus restarts. after the initial flush shortly after the
// server start, we will only flush every few hours to reduce ongoing I/O burden
}
RocksDBBackgroundThread::~RocksDBBackgroundThread() { shutdown(); }
@ -101,13 +112,22 @@ void RocksDBBackgroundThread::run() {
// will not have a chance to reconnect to a restarted master in
// time so the master may purge WAL files that replication slaves
// would still like to peek into
if (TRI_microtime() >= startTime + _engine.pruneWaitTimeInitial()) {
double const now = TRI_microtime();
if (now >= startTime + _engine.pruneWaitTimeInitial()) {
// determine which WAL files can be pruned
_engine.determinePrunableWalFiles(minTick);
// and then prune them when they expired
_engine.pruneWalFiles();
}
// flush column families every now and then, to prevent data from
// exotic column column families to reside only in memtables and thus
// block WAL file collection
if (now >= _nextFlushTime) {
_engine.flushColumnFamilies(false);
// flush at most every 12 hours
_nextFlushTime = now + 12.0 * 60.0 * 60.0;
}
} catch (std::exception const& ex) {
LOG_TOPIC("8236f", WARN, Logger::ENGINES)
<< "caught exception in rocksdb background thread: " << ex.what();

View File

@ -33,19 +33,13 @@ class RocksDBEngine;
class RocksDBBackgroundThread final : public Thread {
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief engine pointer
//////////////////////////////////////////////////////////////////////////////
RocksDBEngine& _engine;
//////////////////////////////////////////////////////////////////////////////
/// @brief interval in which we will run
//////////////////////////////////////////////////////////////////////////////
double const _interval;
//////////////////////////////////////////////////////////////////////////////
/// @brief condition variable for heartbeat
//////////////////////////////////////////////////////////////////////////////
arangodb::basics::ConditionVariable _condition;
RocksDBBackgroundThread(RocksDBEngine& eng, double interval);
@ -55,6 +49,13 @@ class RocksDBBackgroundThread final : public Thread {
protected:
void run() override;
private:
/// @brief next time point for flushing column families
/// note that we need to flush column families every now and then
/// to prevent data from exotic column families to only reside in
/// memtables and block WAL file collection
double _nextFlushTime;
};
} // namespace arangodb

View File

@ -2432,6 +2432,21 @@ void RocksDBEngine::releaseTick(TRI_voc_tick_t tick) {
}
}
void RocksDBEngine::flushColumnFamilies(bool wait) {
// we are intentionally not flushing cfs "document" and "primary", as these
// are expected to be written to every now and then by statistics etc.
LOG_TOPIC("9354c", DEBUG, Logger::ENGINES) << "flushing column families";
rocksdb::FlushOptions options;
options.wait = wait;
_db->Flush(options, RocksDBColumnFamily::definitions());
_db->Flush(options, RocksDBColumnFamily::edge());
_db->Flush(options, RocksDBColumnFamily::vpack());
_db->Flush(options, RocksDBColumnFamily::geo());
_db->Flush(options, RocksDBColumnFamily::fulltext());
}
} // namespace arangodb
// -----------------------------------------------------------------------------

View File

@ -395,6 +395,9 @@ public:
/// note: returns a nullptr if automatic syncing is turned off!
RocksDBSyncThread* syncThread() const { return _syncThread.get(); }
/// @brief flushes most of the column families
void flushColumnFamilies(bool wait);
static arangodb::Result registerRecoveryHelper(std::shared_ptr<RocksDBRecoveryHelper> helper);
static std::vector<std::shared_ptr<RocksDBRecoveryHelper>> const& recoveryHelpers();