mirror of https://gitee.com/bigwinds/arangodb
finalize `--rocksdb.sync-interval` feature (#5856)
This commit is contained in:
parent
ffad59db86
commit
1a128799f2
11
CHANGELOG
11
CHANGELOG
|
@ -6,6 +6,17 @@ devel
|
|||
This change will be visible only on systems which allow assigning names to
|
||||
threads.
|
||||
|
||||
* added configuration option `--rocksdb.sync-interval`
|
||||
|
||||
This option specifies the interval (in milliseconds) that ArangoDB will use to
|
||||
automatically synchronize data in RocksDB's write-ahead log (WAL) files to
|
||||
disk. Automatic syncs will only be performed for not-yet synchronized data,
|
||||
and only for operations that have been executed without the *waitForSync*
|
||||
attribute.
|
||||
|
||||
Automatic synchronization is performed by a background thread. The default
|
||||
sync interval is 100 milliseconds.
|
||||
|
||||
* added AQL functions `TO_BASE64`, `TO_HEX`, `ENCODE_URI_COMPONENT` and `SOUNDEX`
|
||||
|
||||
* PR #5857: RocksDB engine would frequently request a new DelayToken. This caused
|
||||
|
|
|
@ -371,7 +371,7 @@ in a specific state on startup. the options for this value are:
|
|||
- any: any directory state allowed
|
||||
|
||||
|
||||
### Journal size
|
||||
### Journal size (MMFiles only)
|
||||
@startDocuBlock databaseMaximalJournalSize
|
||||
|
||||
|
||||
|
@ -613,4 +613,4 @@ an **highly experimental** feature and it is to be expected that certain functio
|
|||
some AQL functions etc) will be missing or severely broken. Nevertheless you may wish to reduce the footprint of ArangoDB by disabling V8.
|
||||
|
||||
This option is expected to **only** work reliably on a _Single-Server_, _Agency_ or _Active-Failover_ setup. Do not try to use
|
||||
this feature on a _Coordinator_, or _DBServer_
|
||||
this feature on a _Coordinator_, or _DBServer_
|
||||
|
|
|
@ -215,3 +215,10 @@ is committed automatically and a new transaction is started.
|
|||
If enabled, throttles the ingest rate of writes if necessary to reduce chances
|
||||
of compactions getting too far behind and blocking incoming writes. This option
|
||||
is `true` by default.
|
||||
|
||||
`--rocksdb.sync-interval`
|
||||
|
||||
The interval (in milliseconds) that ArangoDB will use to automatically
|
||||
synchronize data in RocksDB's write-ahead logs to disk. Automatic syncs will
|
||||
only be performed for not-yet synchronized data, and only for operations that
|
||||
have been executed without the *waitForSync* attribute.
|
||||
|
|
|
@ -15,6 +15,13 @@ specifying the following configuration options:
|
|||
|
||||
@startDocuBlock WalLogfileSyncInterval
|
||||
|
||||
`--rocksdb.sync-interval`
|
||||
|
||||
The interval (in milliseconds) that ArangoDB will use to automatically
|
||||
synchronize data in RocksDB's write-ahead logs to disk. Automatic syncs will
|
||||
only be performed for not-yet synchronized data, and only for operations that
|
||||
have been executed without the *waitForSync* attribute.
|
||||
|
||||
|
||||
Per-collection configuration
|
||||
----------------------------
|
||||
|
@ -33,16 +40,16 @@ Many data-modification operations and also ArangoDB's transactions allow to spec
|
|||
a *waitForSync* attribute, which when set ensures the operation data has been
|
||||
synchronized to disk when the operation returns.
|
||||
|
||||
Disk-Usage Configuration
|
||||
------------------------
|
||||
Disk-Usage Configuration (MMFiles engine)
|
||||
-----------------------------------------
|
||||
|
||||
The amount of disk space used by ArangoDB is determined by a few configuration
|
||||
The amount of disk space used by the MMFiles engine is determined by a few configuration
|
||||
options.
|
||||
|
||||
Global Configuration
|
||||
--------------------
|
||||
|
||||
The total amount of disk storage required by ArangoDB is determined by the size of
|
||||
The total amount of disk storage required by the MMFiles engine is determined by the size of
|
||||
the write-ahead logfiles plus the sizes of the collection journals and datafiles.
|
||||
|
||||
There are the following options for configuring the number and sizes of the write-ahead
|
||||
|
|
|
@ -5,8 +5,7 @@
|
|||
|
||||
The interval (in milliseconds) that ArangoDB will use to automatically
|
||||
synchronize data in its write-ahead logs to disk. Automatic syncs will
|
||||
only
|
||||
be performed for not-yet synchronized data, and only for operations that
|
||||
have been executed without the *waitForSync* attribute.
|
||||
only be performed for not-yet synchronized data, and only for operations
|
||||
that have been executed without the *waitForSync* attribute.
|
||||
@endDocuBlock
|
||||
|
||||
|
|
|
@ -215,7 +215,7 @@ Result MMFilesEngine::dropDatabase(TRI_vocbase_t& database) {
|
|||
return dropDatabaseDirectory(databaseDirectory(database.id()));
|
||||
}
|
||||
|
||||
// add the storage engine's specifc options to the global list of options
|
||||
// add the storage engine's specific options to the global list of options
|
||||
void MMFilesEngine::collectOptions(std::shared_ptr<options::ProgramOptions>) {}
|
||||
|
||||
// validate the storage engine's specific options
|
||||
|
|
|
@ -75,7 +75,7 @@ class MMFilesEngine final : public StorageEngine {
|
|||
// inherited from ApplicationFeature
|
||||
// ---------------------------------
|
||||
|
||||
// add the storage engine's specifc options to the global list of options
|
||||
// add the storage engine's specific options to the global list of options
|
||||
void collectOptions(std::shared_ptr<options::ProgramOptions>) override;
|
||||
|
||||
// validate the storage engine's specific options
|
||||
|
|
|
@ -69,6 +69,7 @@ set(ROCKSDB_SOURCES
|
|||
RocksDBEngine/RocksDBRestReplicationHandler.cpp
|
||||
RocksDBEngine/RocksDBRestWalHandler.cpp
|
||||
RocksDBEngine/RocksDBSettingsManager.cpp
|
||||
RocksDBEngine/RocksDBSyncThread.cpp
|
||||
RocksDBEngine/RocksDBTransactionCollection.cpp
|
||||
RocksDBEngine/RocksDBTransactionState.cpp
|
||||
RocksDBEngine/RocksDBThrottle.cpp
|
||||
|
|
|
@ -61,6 +61,7 @@
|
|||
#include "RocksDBEngine/RocksDBReplicationTailing.h"
|
||||
#include "RocksDBEngine/RocksDBRestHandlers.h"
|
||||
#include "RocksDBEngine/RocksDBSettingsManager.h"
|
||||
#include "RocksDBEngine/RocksDBSyncThread.h"
|
||||
#include "RocksDBEngine/RocksDBThrottle.h"
|
||||
#include "RocksDBEngine/RocksDBTransactionCollection.h"
|
||||
#include "RocksDBEngine/RocksDBTransactionContextData.h"
|
||||
|
@ -111,6 +112,9 @@ rocksdb::ColumnFamilyHandle* RocksDBColumnFamily::_geo(nullptr);
|
|||
rocksdb::ColumnFamilyHandle* RocksDBColumnFamily::_fulltext(nullptr);
|
||||
std::vector<rocksdb::ColumnFamilyHandle*> RocksDBColumnFamily::_allHandles;
|
||||
|
||||
// minimum value for --rocksdb.sync-interval (in ms)
|
||||
static constexpr uint64_t minSyncInterval = 5;
|
||||
|
||||
static constexpr uint64_t databaseIdForGlobalApplier = 0;
|
||||
|
||||
// handles for recovery helpers
|
||||
|
@ -136,6 +140,7 @@ RocksDBEngine::RocksDBEngine(application_features::ApplicationServer* server)
|
|||
_pruneWaitTime(10.0),
|
||||
_pruneWaitTimeInitial(180.0),
|
||||
_releasedTick(0),
|
||||
_syncInterval(100),
|
||||
_useThrottle(true) {
|
||||
startsAfter("BasicsPhase");
|
||||
|
||||
|
@ -151,34 +156,54 @@ RocksDBEngine::~RocksDBEngine() { shutdownRocksDBInstance(); }
|
|||
/// shuts down the RocksDB instance. this is called from unprepare
|
||||
/// and the dtor
|
||||
void RocksDBEngine::shutdownRocksDBInstance() noexcept {
|
||||
if (_db) {
|
||||
// turn off RocksDBThrottle, and release our pointers to it
|
||||
if (nullptr != _listener.get()) {
|
||||
_listener->StopThread();
|
||||
} // if
|
||||
|
||||
for (rocksdb::ColumnFamilyHandle* h : RocksDBColumnFamily::_allHandles) {
|
||||
_db->DestroyColumnFamilyHandle(h);
|
||||
}
|
||||
|
||||
// now prune all obsolete WAL files
|
||||
try {
|
||||
determinePrunableWalFiles(0);
|
||||
pruneWalFiles();
|
||||
} catch (...) {
|
||||
// this is allowed to go wrong on shutdown
|
||||
// we must not throw an exception from here
|
||||
}
|
||||
|
||||
delete _db;
|
||||
_db = nullptr;
|
||||
if (_db == nullptr) {
|
||||
return;
|
||||
}
|
||||
|
||||
// turn off RocksDBThrottle, and release our pointers to it
|
||||
if (nullptr != _listener.get()) {
|
||||
_listener->StopThread();
|
||||
} // if
|
||||
|
||||
for (rocksdb::ColumnFamilyHandle* h : RocksDBColumnFamily::_allHandles) {
|
||||
_db->DestroyColumnFamilyHandle(h);
|
||||
}
|
||||
|
||||
// now prune all obsolete WAL files
|
||||
try {
|
||||
determinePrunableWalFiles(0);
|
||||
pruneWalFiles();
|
||||
} catch (...) {
|
||||
// this is allowed to go wrong on shutdown
|
||||
// we must not throw an exception from here
|
||||
}
|
||||
|
||||
try {
|
||||
// do a final WAL sync here before shutting down
|
||||
Result res = RocksDBSyncThread::sync(_db->GetBaseDB());
|
||||
if (res.fail()) {
|
||||
LOG_TOPIC(WARN, Logger::ROCKSDB) << "could not sync RocksDB WAL: " << res.errorMessage();
|
||||
}
|
||||
|
||||
rocksdb::Status status = _db->Close();
|
||||
|
||||
if (!status.ok()) {
|
||||
Result res = rocksutils::convertStatus(status);
|
||||
LOG_TOPIC(ERR, Logger::ROCKSDB) << "could not shutdown RocksDB: " << res.errorMessage();
|
||||
}
|
||||
} catch (...) {
|
||||
// this is allowed to go wrong on shutdown
|
||||
// we must not throw an exception from here
|
||||
}
|
||||
|
||||
delete _db;
|
||||
_db = nullptr;
|
||||
}
|
||||
|
||||
// inherited from ApplicationFeature
|
||||
// ---------------------------------
|
||||
|
||||
// add the storage engine's specifc options to the global list of options
|
||||
// add the storage engine's specific options to the global list of options
|
||||
void RocksDBEngine::collectOptions(
|
||||
std::shared_ptr<options::ProgramOptions> options) {
|
||||
options->addSection("rocksdb", "RocksDB engine specific configuration");
|
||||
|
@ -199,6 +224,10 @@ void RocksDBEngine::collectOptions(
|
|||
"when this number of "
|
||||
"operations is reached in a transaction",
|
||||
new UInt64Parameter(&_intermediateCommitCount));
|
||||
|
||||
options->addOption("--rocksdb.sync-interval",
|
||||
"interval for automatic, non-requested disk syncs (in milliseconds)",
|
||||
new UInt64Parameter(&_syncInterval));
|
||||
|
||||
options->addOption("--rocksdb.wal-file-timeout",
|
||||
"timeout after which unused WAL files are deleted",
|
||||
|
@ -225,6 +254,12 @@ void RocksDBEngine::validateOptions(
|
|||
#ifdef USE_ENTERPRISE
|
||||
validateEnterpriseOptions(options);
|
||||
#endif
|
||||
|
||||
if (_syncInterval < minSyncInterval) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "invalid value for --rocksdb.sync-interval. Please use a value "
|
||||
"of at least " << minSyncInterval;
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
}
|
||||
|
||||
// preparation phase for storage engine. can be used for internal setup.
|
||||
|
@ -565,6 +600,13 @@ void RocksDBEngine::start() {
|
|||
|
||||
// only enable logger after RocksDB start
|
||||
logger->enable();
|
||||
|
||||
_syncThread.reset(
|
||||
new RocksDBSyncThread(this, std::chrono::milliseconds(_syncInterval)));
|
||||
if (!_syncThread->start()) {
|
||||
LOG_TOPIC(FATAL, Logger::ENGINES) << "could not start rocksdb sync thread";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
||||
TRI_ASSERT(_db != nullptr);
|
||||
_settingsManager.reset(new RocksDBSettingsManager(_db));
|
||||
|
@ -617,10 +659,20 @@ void RocksDBEngine::stop() {
|
|||
|
||||
// wait until background thread stops
|
||||
while (_backgroundThread->isRunning()) {
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(10000));
|
||||
std::this_thread::yield();
|
||||
}
|
||||
_backgroundThread.reset();
|
||||
}
|
||||
|
||||
if (_syncThread) {
|
||||
_syncThread->beginShutdown();
|
||||
|
||||
// wait until sync thread stops
|
||||
while (_syncThread->isRunning()) {
|
||||
std::this_thread::yield();
|
||||
}
|
||||
_syncThread.reset();
|
||||
}
|
||||
}
|
||||
|
||||
void RocksDBEngine::unprepare() {
|
||||
|
@ -1475,26 +1527,22 @@ RocksDBEngine::IndexTriple RocksDBEngine::mapObjectToIndex(
|
|||
|
||||
Result RocksDBEngine::flushWal(bool waitForSync, bool waitForCollector,
|
||||
bool /*writeShutdownFile*/) {
|
||||
rocksdb::Status status;
|
||||
#ifndef _WIN32
|
||||
// SyncWAL always reports "not implemented" on Windows
|
||||
status = _db->GetBaseDB()->SyncWAL();
|
||||
if (!status.ok()) {
|
||||
return rocksutils::convertStatus(status);
|
||||
if (_syncThread) {
|
||||
_syncThread->syncWal();
|
||||
}
|
||||
#endif
|
||||
|
||||
if (waitForCollector) {
|
||||
rocksdb::FlushOptions flushOptions;
|
||||
flushOptions.wait = waitForSync;
|
||||
|
||||
for (auto cf : RocksDBColumnFamily::_allHandles) {
|
||||
status = _db->GetBaseDB()->Flush(flushOptions, cf);
|
||||
rocksdb::Status status = _db->GetBaseDB()->Flush(flushOptions, cf);
|
||||
if (!status.ok()) {
|
||||
return rocksutils::convertStatus(status);
|
||||
}
|
||||
}
|
||||
}
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
return Result();
|
||||
}
|
||||
|
||||
void RocksDBEngine::waitForEstimatorSync(
|
||||
|
|
|
@ -55,6 +55,7 @@ class RocksDBLogValue;
|
|||
class RocksDBRecoveryHelper;
|
||||
class RocksDBReplicationManager;
|
||||
class RocksDBSettingsManager;
|
||||
class RocksDBSyncThread;
|
||||
class RocksDBThrottle; // breaks tons if RocksDBThrottle.h included here
|
||||
class RocksDBVPackComparator;
|
||||
class RocksDBWalAccess;
|
||||
|
@ -83,7 +84,7 @@ class RocksDBEngine final : public StorageEngine {
|
|||
// inherited from ApplicationFeature
|
||||
// ---------------------------------
|
||||
|
||||
// add the storage engine's specifc options to the global list of options
|
||||
// add the storage engine's specific options to the global list of options
|
||||
void collectOptions(std::shared_ptr<options::ProgramOptions>) override;
|
||||
// validate the storage engine's specific options
|
||||
void validateOptions(std::shared_ptr<options::ProgramOptions>) override;
|
||||
|
@ -391,6 +392,10 @@ class RocksDBEngine final : public StorageEngine {
|
|||
public:
|
||||
static std::string const EngineName;
|
||||
static std::string const FeatureName;
|
||||
|
||||
/// @brief returns a read-only reference to the engine's `_options` member
rocksdb::Options const& rocksDBOptions() const {
|
||||
return _options;
|
||||
}
|
||||
|
||||
/// @brief recovery manager
|
||||
RocksDBSettingsManager* settingsManager() const {
|
||||
|
@ -403,6 +408,12 @@ class RocksDBEngine final : public StorageEngine {
|
|||
TRI_ASSERT(_replicationManager);
|
||||
return _replicationManager.get();
|
||||
}
|
||||
|
||||
/// @brief returns a non-owning pointer to the sync thread
/// ownership stays with the engine's `_syncThread` member
|
||||
RocksDBSyncThread* syncThread() const {
|
||||
// NOTE(review): `_syncThread` is created in start() and reset in stop();
// callers outside that window would trip this assertion - confirm call sites
TRI_ASSERT(_syncThread);
|
||||
return _syncThread.get();
|
||||
}
|
||||
|
||||
static arangodb::Result registerRecoveryHelper(
|
||||
std::shared_ptr<RocksDBRecoveryHelper> helper);
|
||||
|
@ -457,6 +468,12 @@ class RocksDBEngine final : public StorageEngine {
|
|||
|
||||
// do not release walfiles containing writes later than this
|
||||
TRI_voc_tick_t _releasedTick;
|
||||
|
||||
/// Background thread handling WAL syncing
|
||||
std::unique_ptr<RocksDBSyncThread> _syncThread;
|
||||
|
||||
// WAL sync interval, specified in milliseconds by the end user; converted to std::chrono::milliseconds when the sync thread is created
|
||||
uint64_t _syncInterval;
|
||||
|
||||
// use write-throttling
|
||||
bool _useThrottle;
|
||||
|
|
|
@ -0,0 +1,140 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2017 ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Jan Steemann
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "RocksDBSyncThread.h"
|
||||
#include "Basics/ConditionLocker.h"
|
||||
#include "Basics/RocksDBUtils.h"
|
||||
#include "Logger/Logger.h"
|
||||
#include "RocksDBEngine/RocksDBEngine.h"
|
||||
|
||||
#include <rocksdb/status.h>
|
||||
#include <rocksdb/utilities/transaction_db.h>
|
||||
|
||||
using namespace arangodb;
|
||||
|
||||
/// @brief sets up the sync thread object for the given engine
/// The thread itself is not started by the constructor. The last sync time
/// starts out as the current steady-clock time, and the last sequence number
/// starts out as 0.
/// @param engine   engine whose RocksDB instance will be synced
/// @param interval time between two automatic syncs
RocksDBSyncThread::RocksDBSyncThread(RocksDBEngine* engine,
                                     std::chrono::milliseconds interval)
    : Thread("RocksDBSync"),
      _engine(engine),
      _interval(interval),
      _lastSyncTime(std::chrono::steady_clock::now()),
      _lastSequenceNumber(0) {}
|
||||
|
||||
/// @brief calls shutdown() before the object goes away
RocksDBSyncThread::~RocksDBSyncThread() {
  shutdown();
}
|
||||
|
||||
/// @brief records the current time and the latest sequence number, then
/// syncs the WAL
/// The bookkeeping is done while holding `_condition`, so that run() can
/// detect that a sync was triggered from outside the background thread.
/// The sync itself runs without the lock.
/// @return the result of sync()
Result RocksDBSyncThread::syncWal() {
  // RocksDB's own documentation (rocksdb/db.h) says about SyncWAL():
  // > Currently only works if allow_mmap_writes = false in Options.
  TRI_ASSERT(!_engine->rocksDBOptions().allow_mmap_writes);

  auto baseDb = _engine->db()->GetBaseDB();

  // timestamp is taken before the lock is acquired
  auto const syncStart = std::chrono::steady_clock::now();
  {
    CONDITION_LOCKER(locker, _condition);

    // the recorded sync time only ever moves forward
    if (_lastSyncTime < syncStart) {
      _lastSyncTime = syncStart;
    }

    // the recorded sequence number only ever moves forward, too
    auto latestSeq = baseDb->GetLatestSequenceNumber();
    if (_lastSequenceNumber < latestSeq) {
      _lastSequenceNumber = latestSeq;
    }
  }

  // lock has been released here; now do the actual work
  return sync(baseDb);
}
|
||||
|
||||
/// @brief unconditionally syncs the RocksDB WAL of the given database
/// @param db the RocksDB base database whose WAL should be synced
/// @return an ok Result on success, otherwise the converted RocksDB status.
/// On Windows this is a no-op that always returns an ok Result.
Result RocksDBSyncThread::sync(rocksdb::DB* db) {
#ifndef _WIN32
  LOG_TOPIC(TRACE, Logger::ROCKSDB) << "syncing RocksDB WAL";

  rocksdb::Status status = db->SyncWAL();
  if (!status.ok()) {
    return rocksutils::convertStatus(status);
  }
#else
  // SyncWAL always reports "not implemented" on Windows. Calling it there
  // would turn every waitForSync commit into an error and make the
  // background thread log a warning on each interval, so it is skipped.
  static_cast<void>(db);
#endif
  return Result();
}
|
||||
|
||||
void RocksDBSyncThread::beginShutdown() {
|
||||
Thread::beginShutdown();
|
||||
|
||||
// wake up the thread that may be waiting in run()
|
||||
CONDITION_LOCKER(guard, _condition);
|
||||
guard.broadcast();
|
||||
}
|
||||
|
||||
void RocksDBSyncThread::run() {
|
||||
TRI_ASSERT(_engine != nullptr);
|
||||
auto db = _engine->db()->GetBaseDB();
|
||||
|
||||
LOG_TOPIC(TRACE, Logger::ROCKSDB) << "starting RocksDB sync thread with interval " << _interval.count() << " milliseconds";
|
||||
|
||||
while (!isStopping()) {
|
||||
try {
|
||||
auto const now = std::chrono::steady_clock::now();
|
||||
|
||||
{
|
||||
// wait for time to elapse, and after that update last sync time
|
||||
CONDITION_LOCKER(guard, _condition);
|
||||
|
||||
auto const previousLastSequenceNumber = _lastSequenceNumber;
|
||||
auto const previousLastSyncTime = _lastSyncTime;
|
||||
auto const end = _lastSyncTime + _interval;
|
||||
if (end > now) {
|
||||
guard.wait(std::chrono::microseconds(std::chrono::duration_cast<std::chrono::microseconds>(end - now)));
|
||||
}
|
||||
|
||||
if (_lastSyncTime > previousLastSyncTime) {
|
||||
// somebody else outside this thread has called sync...
|
||||
continue;
|
||||
}
|
||||
|
||||
_lastSyncTime = std::chrono::steady_clock::now();
|
||||
|
||||
auto lastSequenceNumber = db->GetLatestSequenceNumber();
|
||||
|
||||
if (lastSequenceNumber == previousLastSequenceNumber) {
|
||||
// nothing to sync, so don't cause unnecessary load
|
||||
continue;
|
||||
}
|
||||
|
||||
_lastSequenceNumber = lastSequenceNumber;
|
||||
}
|
||||
|
||||
// will update last sync time, and do the actual sync
|
||||
Result res = sync(db);
|
||||
|
||||
if (res.fail()) {
|
||||
LOG_TOPIC(WARN, Logger::ROCKSDB) << "could not sync RocksDB WAL: " << res.errorMessage();
|
||||
}
|
||||
} catch (std::exception const& ex) {
|
||||
LOG_TOPIC(ERR, Logger::ROCKSDB) << "caught exception in RocksDBSyncThread: " << ex.what();
|
||||
} catch (...) {
|
||||
LOG_TOPIC(ERR, Logger::ROCKSDB) << "caught unknown exception in RocksDBSyncThread";
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2017 ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Jan Steemann
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_ROCKSDB_ENGINE_SYNC_THREAD_H
|
||||
#define ARANGOD_ROCKSDB_ENGINE_SYNC_THREAD_H 1
|
||||
|
||||
#include "Basics/Common.h"
|
||||
#include "Basics/ConditionVariable.h"
|
||||
#include "Basics/Result.h"
|
||||
#include "Basics/Thread.h"
|
||||
|
||||
#include <rocksdb/types.h>
|
||||
|
||||
#include <chrono>
|
||||
|
||||
namespace rocksdb {
|
||||
class DB;
|
||||
}
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
class RocksDBEngine;
|
||||
|
||||
class RocksDBSyncThread final : public Thread {
|
||||
public:
|
||||
RocksDBSyncThread(RocksDBEngine* engine, std::chrono::milliseconds interval);
|
||||
|
||||
~RocksDBSyncThread();
|
||||
|
||||
void beginShutdown() override;
|
||||
|
||||
/// @brief updates last sync time and calls the synchronization
|
||||
/// this is the preferred method to call when trying to avoid redundant
|
||||
/// syncs by foreground work and the background sync thread
|
||||
Result syncWal();
|
||||
|
||||
/// @brief unconditionally syncs the RocksDB WAL, static variant
|
||||
static Result sync(rocksdb::DB* db);
|
||||
|
||||
protected:
|
||||
void run() override;
|
||||
|
||||
private:
|
||||
RocksDBEngine* _engine;
|
||||
|
||||
/// @brief the sync interval
|
||||
std::chrono::milliseconds const _interval;
|
||||
|
||||
/// @brief last time we synced the RocksDB WAL
|
||||
std::chrono::time_point<std::chrono::steady_clock> _lastSyncTime;
|
||||
|
||||
/// @brief the last definitely synced RocksDB WAL sequence number
|
||||
rocksdb::SequenceNumber _lastSequenceNumber;
|
||||
|
||||
/// @brief protected _lastSyncTime and _lastSequenceNumber
|
||||
arangodb::basics::ConditionVariable _condition;
|
||||
};
|
||||
} // namespace arangodb
|
||||
|
||||
#endif
|
|
@ -34,7 +34,9 @@
|
|||
#include "RocksDBEngine/RocksDBEngine.h"
|
||||
#include "RocksDBEngine/RocksDBLogValue.h"
|
||||
#include "RocksDBEngine/RocksDBMethods.h"
|
||||
#include "RocksDBEngine/RocksDBSyncThread.h"
|
||||
#include "RocksDBEngine/RocksDBTransactionCollection.h"
|
||||
#include "StorageEngine/EngineSelectorFeature.h"
|
||||
#include "StorageEngine/StorageEngine.h"
|
||||
#include "StorageEngine/TransactionCollection.h"
|
||||
#include "StorageEngine/TransactionManager.h"
|
||||
|
@ -63,7 +65,6 @@ RocksDBTransactionState::RocksDBTransactionState(
|
|||
): TransactionState(vocbase, tid, options),
|
||||
_rocksTransaction(nullptr),
|
||||
_snapshot(nullptr),
|
||||
_rocksWriteOptions(),
|
||||
_rocksReadOptions(),
|
||||
_cacheTx(nullptr),
|
||||
_numCommits(0),
|
||||
|
@ -228,7 +229,7 @@ arangodb::Result RocksDBTransactionState::internalCommit() {
|
|||
return Result(TRI_ERROR_ARANGO_READ_ONLY, "server is in read-only mode");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Result result;
|
||||
if (hasOperations()) {
|
||||
// we are actually going to attempt a commit
|
||||
|
@ -261,12 +262,6 @@ arangodb::Result RocksDBTransactionState::internalCommit() {
|
|||
}
|
||||
#endif
|
||||
|
||||
// set wait for sync flag if required
|
||||
if (waitForSync()) {
|
||||
_rocksWriteOptions.sync = true;
|
||||
_rocksTransaction->SetWriteOptions(_rocksWriteOptions);
|
||||
}
|
||||
|
||||
// prepare for commit on each collection, e.g. place blockers for estimators
|
||||
rocksdb::SequenceNumber preCommitSeq =
|
||||
rocksutils::globalRocksDB()->GetLatestSequenceNumber();
|
||||
|
@ -302,6 +297,13 @@ arangodb::Result RocksDBTransactionState::internalCommit() {
|
|||
collection->commitCounts(id(), latestSeq);
|
||||
committed = true;
|
||||
}
|
||||
|
||||
// wait for sync if required
|
||||
if (waitForSync()) {
|
||||
RocksDBEngine* engine = static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
|
||||
TRI_ASSERT(engine != nullptr);
|
||||
result = engine->syncThread()->syncWal();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
TRI_ASSERT(_rocksTransaction->GetNumKeys() == 0 &&
|
||||
|
|
|
@ -41,7 +41,6 @@
|
|||
namespace arangodb {
|
||||
namespace rocksutils {
|
||||
|
||||
|
||||
enum StatusHint { none, document, collection, view, index, database, wal };
|
||||
|
||||
arangodb::Result convertStatus(rocksdb::Status const&,
|
||||
|
|
Loading…
Reference in New Issue