1
0
Fork 0

Integrated counters with collections

This commit is contained in:
Simon Grätzer 2017-04-03 13:24:22 +02:00
parent 805314274e
commit cf4294c9ca
12 changed files with 273 additions and 25 deletions

View File

@ -31,6 +31,7 @@
#include "Indexes/IndexIterator.h"
#include "RestServer/DatabaseFeature.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBCounterManager.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBPrimaryIndex.h"
@ -167,9 +168,11 @@ size_t RocksDBCollection::memory() const {
void RocksDBCollection::open(bool ignoreErrors) {
// set the initial number of documents
rocksdb::ReadOptions readOptions;
rocksdb::TransactionDB* db = static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE)->db();
_numberDocuments = countKeyRange(db, readOptions, RocksDBKeyBounds::CollectionDocuments(_objectId));
//rocksdb::ReadOptions readOptions;
//rocksdb::TransactionDB* db = static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE)->db();
RocksDBEngine* engine = static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
_numberDocuments = engine->counterManager()->loadCounter(this->objectId());
//_numberDocuments = countKeyRange(db, readOptions, RocksDBKeyBounds::CollectionDocuments(_objectId));
}
/// @brief iterate all markers of a collection on load

View File

@ -25,34 +25,51 @@
#include "Basics/ReadLocker.h"
#include "Basics/WriteLocker.h"
#include "Basics/ConditionLocker.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBKeyBounds.h"
#include "RocksDBEngine/RocksDBValue.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include <rocksdb/utilities/transaction_db.h>
#include <rocksdb/write_batch.h>
#include <velocypack/Iterator.h>
using namespace arangodb;
struct WBReader : public rocksdb::WriteBatch::Handler {
void Put(const rocksdb::Slice& /*key*/, const rocksdb::Slice& /*value*/) override {
}
void Delete(const rocksdb::Slice& /*key*/) override {
}
void SingleDelete(const rocksdb::Slice& /*key*/) override {
}
};
/// Constructor needs to be called synchrunously,
/// will load counts from the db and scan the WAL
RocksDBCounterManager::RocksDBCounterManager(uint64_t interval)
RocksDBCounterManager::RocksDBCounterManager(rocksdb::DB *db, double interval)
: Thread("RocksDBCounters"),
_db(db),
_interval(interval) {
WRITE_LOCKER(guard, _rwLock);
readCounterValues();
if (_counters.size() > 0) {
if (parseRocksWAL()) {
sync();
}
}
}
void RocksDBCounterManager::beginShutdown() {
Thread::beginShutdown();
_condition.broadcast();
//CONDITION_LOCKER(locker, _condition);
//locker.signal();
}
void RocksDBCounterManager::run() {
while (!isStopping()) {
CONDITION_LOCKER(locker, _condition);
locker.wait(static_cast<uint64_t>(_interval * 1000000.0));
if (!isStopping()) {
this->sync();
}
}
}
uint64_t RocksDBCounterManager::loadCounter(uint64_t objectId) {
@ -95,6 +112,18 @@ void RocksDBCounterManager::updateCounter(uint64_t objectId,
}
}
void RocksDBCounterManager::removeCounter(uint64_t objectId) {
WRITE_LOCKER(guard, _rwLock);
auto const& it = _counters.find(objectId);
if (it != _counters.end()) {
RocksDBKey key = RocksDBKey::CounterValue(it->first);
rocksdb::WriteOptions options;
_db->Delete(options, key.string());
_counters.erase(it);
}
}
/// Thread-Safe force sync
Result RocksDBCounterManager::sync() {
if (_syncing) {
@ -117,8 +146,15 @@ Result RocksDBCounterManager::sync() {
VPackBuilder b;
for (std::pair<uint64_t, Counter> const& pair : tmp) {
// Skip values which we did not change
auto const& it = _syncedCounters.find(pair.first);
if (it != _syncedCounters.end() && it->second.sequenceNumber == pair.second.sequenceNumber) {
continue;
}
b.clear();
b.openArray();
b.add(VPackValue(pair.first));
b.add(VPackValue(pair.second.sequenceNumber));
b.add(VPackValue(pair.second.count));
b.close();
@ -132,11 +168,105 @@ Result RocksDBCounterManager::sync() {
}
}
rocksdb::Status s = rtrx->Commit();
if (s.ok()) {
_syncedCounters = tmp;
}
_syncing = false;
return rocksutils::convertStatus(s);
}
void RocksDBCounterManager::loadCounterValues() {
void RocksDBCounterManager::readCounterValues() {
rocksdb::TransactionDB* db = rocksutils::globalRocksDB();
rocksdb::Comparator const* cmp = db->GetOptions().comparator;
RocksDBKeyBounds bounds = RocksDBKeyBounds::CounterValues();
rocksdb::ReadOptions readOptions;
std::unique_ptr<rocksdb::Iterator> iter (db->NewIterator(readOptions));
iter->Seek(bounds.start());
while (iter->Valid() && cmp->Compare(iter->key(), bounds.end()) == -1) {
rocksdb::Slice sl = iter->value();
velocypack::ArrayIterator array(VPackSlice(sl.data()));
if (array.valid()) {
uint64_t objectId = (*array).getUInt();
uint64_t sequenceNumber = (*(++array)).getUInt();
uint64_t count = (*(++array)).getUInt();
_counters.emplace(objectId,
Counter{.sequenceNumber = sequenceNumber,
.count = count});
}
iter->Next();
}
// update synced counters
_syncedCounters = _counters;
}
struct WBReader : public rocksdb::WriteBatch::Handler {
std::unordered_map<uint64_t, RocksDBCounterManager::Counter> &counters;
rocksdb::SequenceNumber seqNum = UINT64_MAX;
bool recovered = false;
WBReader(std::unordered_map<uint64_t, RocksDBCounterManager::Counter> &cnts)
: counters(cnts) {}
void Put(const rocksdb::Slice& key, const rocksdb::Slice& /*value*/) override {
uint64_t obj = RocksDBKey::extractObjectId(key);
auto it = counters.find(obj);
if (it != counters.end() && it->second.sequenceNumber < seqNum) {
it->second.count++;
recovered = true;
}
}
void Delete(const rocksdb::Slice& key) override {
uint64_t obj = RocksDBKey::extractObjectId(key);
auto it = counters.find(obj);
if (it != counters.end() && it->second.sequenceNumber < seqNum) {
it->second.count--;
recovered = true;
}
}
void SingleDelete(const rocksdb::Slice& key) override {
uint64_t obj = RocksDBKey::extractObjectId(key);
auto it = counters.find(obj);
if (it != counters.end() && it->second.sequenceNumber < seqNum) {
it->second.count--;
recovered = true;
}
}
};
bool RocksDBCounterManager::parseRocksWAL() {
TRI_ASSERT(_counters.size() != 0);
rocksdb::SequenceNumber minSeq = UINT64_MAX;
for (auto const& it : _syncedCounters) {
if (it.second.sequenceNumber < minSeq) {
minSeq = it.second.sequenceNumber;
}
}
std::unique_ptr<WBReader> handler(new WBReader(_counters));
std::unique_ptr<rocksdb::TransactionLogIterator> iterator;//reader();
rocksdb::Status s = _db->GetUpdatesSince(minSeq, &iterator);
if (!s.ok()) {// TODO do something?
return false;
}
while (iterator->Valid()) {
rocksdb::BatchResult result = iterator->GetBatch();
if (result.sequence <= minSeq) {
continue;
}
handler->seqNum = result.sequence;
s = result.writeBatchPtr->Iterate(handler.get());
if (!s.ok()) {
break;
}
iterator->Next();
}
return handler->recovered;
}

View File

@ -25,19 +25,23 @@
#define ARANGOD_ROCKSDB_ENGINE_ROCKSDB_COUNTMANAGER_H 1
#include "Basics/Common.h"
#include "Basics/ConditionVariable.h"
#include "Basics/ReadWriteLock.h"
#include "RocksDBEngine/RocksDBTypes.h"
#include "Basics/Thread.h"
#include "Basics/Result.h"
#include <rocksdb/types.h>
namespace rocksdb {class Transaction; class Snapshot;}
namespace rocksdb {class DB; class Transaction; class Snapshot;}
namespace arangodb {
class RocksDBCounterManager : Thread {
public:
friend class RocksDBEngine;
RocksDBCounterManager(rocksdb::DB* db, double interval);
public:
struct Counter {
rocksdb::SequenceNumber sequenceNumber;
uint64_t count;
@ -45,7 +49,6 @@ public:
/// Constructor needs to be called synchrunously,
/// will load counts from the db and scan the WAL
RocksDBCounterManager(uint64_t interval);
/// Thread-Safe load a counter
uint64_t loadCounter(uint64_t objectId);
@ -55,18 +58,58 @@ public:
/// the sequence number used
void updateCounter(uint64_t objectId, rocksdb::Snapshot const* snapshot,
uint64_t counter);
/// Thread-Safe remove a counter
void removeCounter(uint64_t objectId);
/// Thread-Safe force sync
arangodb::Result sync();
void beginShutdown() override;
protected:
void run() override;
private:
void loadCounterValues();
void readCounterValues();
bool parseRocksWAL();
basics::ReadWriteLock _rwLock;
//////////////////////////////////////////////////////////////////////////////
/// @brief counter values
//////////////////////////////////////////////////////////////////////////////
std::unordered_map<uint64_t, Counter> _counters;
bool _syncing;
uint64_t const _interval;
//////////////////////////////////////////////////////////////////////////////
/// @brief counter values
//////////////////////////////////////////////////////////////////////////////
std::unordered_map<uint64_t, Counter> _syncedCounters;
//////////////////////////////////////////////////////////////////////////////
/// @brief currently syncing
//////////////////////////////////////////////////////////////////////////////
bool _syncing = false;
//////////////////////////////////////////////////////////////////////////////
/// @brief rocsdb instance
//////////////////////////////////////////////////////////////////////////////
rocksdb::DB* _db;
//////////////////////////////////////////////////////////////////////////////
/// @brief interval i which we will sync
//////////////////////////////////////////////////////////////////////////////
double const _interval;
//////////////////////////////////////////////////////////////////////////////
/// @brief condition variable for heartbeat
//////////////////////////////////////////////////////////////////////////////
arangodb::basics::ConditionVariable _condition;
//////////////////////////////////////////////////////////////////////////////
/// @brief protect _syncing and _counters
//////////////////////////////////////////////////////////////////////////////
basics::ReadWriteLock _rwLock;
};
}

View File

@ -36,6 +36,7 @@
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBComparator.h"
#include "RocksDBEngine/RocksDBCounterManager.h"
#include "RocksDBEngine/RocksDBIndexFactory.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBTransactionCollection.h"
@ -106,10 +107,13 @@ void RocksDBEngine::start() {
<< _path;
rocksdb::TransactionDBOptions transactionOptions;
double counter_sync_seconds = 2.5;
_options.create_if_missing = true;
_options.max_open_files = -1;
_options.comparator = _cmp.get();
_options.WAL_ttl_seconds = counter_sync_seconds*2;
rocksdb::Status status =
rocksdb::TransactionDB::Open(_options, transactionOptions, _path, &_db);
@ -121,6 +125,11 @@ void RocksDBEngine::start() {
}
TRI_ASSERT(_db != nullptr);
_counterManager.reset(new RocksDBCounterManager(_db, counter_sync_seconds));
if (!_counterManager->start()) {
LOG_TOPIC(ERR, Logger::ENGINES) << "Could not start rocksdb counter manager";
TRI_ASSERT(false);
}
if (!systemDatabaseExists()) {
addSystemDatabase();
@ -135,6 +144,11 @@ void RocksDBEngine::unprepare() {
}
if (_db) {
if (_counterManager) {
_counterManager->sync();
_counterManager.reset();
}
delete _db;
_db = nullptr;
}
@ -802,5 +816,9 @@ TRI_vocbase_t* RocksDBEngine::openExistingDatabase(TRI_voc_tick_t id,
throw;
}
}
RocksDBCounterManager* RocksDBEngine::counterManager() {
return _counterManager.get();
}
} // namespace

View File

@ -38,6 +38,7 @@
namespace arangodb {
class RocksDBComparator;
class RocksDBCounterManager;
class PhysicalCollection;
class PhysicalView;
class TransactionCollection;
@ -252,12 +253,15 @@ class RocksDBEngine final : public StorageEngine {
public:
static std::string const EngineName;
static std::string const FeatureName;
RocksDBCounterManager* counterManager();
private:
rocksdb::TransactionDB* _db;
rocksdb::Options _options;
std::unique_ptr<RocksDBComparator> _cmp;
std::string _path;
std::unique_ptr<RocksDBCounterManager> _counterManager;
};
}
#endif

View File

@ -94,6 +94,12 @@ RocksDBEntryType RocksDBKey::type(rocksdb::Slice const& slice) {
return type(slice.data(), slice.size());
}
uint64_t RocksDBKey::extractObjectId(rocksdb::Slice const& s) {
TRI_ASSERT(s.size() >= (sizeof(char) + sizeof(uint64_t)));
return uint64FromPersistent(s.data() + sizeof(char));
}
TRI_voc_tick_t RocksDBKey::databaseId(RocksDBKey const& key) {
return databaseId(key._buffer.data(), key._buffer.size());
}

View File

@ -122,6 +122,13 @@ class RocksDBKey {
//////////////////////////////////////////////////////////////////////////////
static RocksDBEntryType type(RocksDBKey const&);
static RocksDBEntryType type(rocksdb::Slice const&);
//////////////////////////////////////////////////////////////////////////////
/// @brief Extracts the object id
///
/// May be called on any valid key (in our keyspace)
//////////////////////////////////////////////////////////////////////////////
static uint64_t extractObjectId(rocksdb::Slice const&);
//////////////////////////////////////////////////////////////////////////////
/// @brief Extracts the databaseId from a key

View File

@ -89,6 +89,10 @@ RocksDBKeyBounds RocksDBKeyBounds::DatabaseViews(TRI_voc_tick_t databaseId) {
return RocksDBKeyBounds(RocksDBEntryType::View, databaseId);
}
RocksDBKeyBounds RocksDBKeyBounds::CounterValues() {
return RocksDBKeyBounds(RocksDBEntryType::CounterValue);
}
rocksdb::Slice const RocksDBKeyBounds::start() const {
return rocksdb::Slice(_startBuffer);
}
@ -111,6 +115,17 @@ RocksDBKeyBounds::RocksDBKeyBounds(RocksDBEntryType type)
break;
}
case RocksDBEntryType::CounterValue: {
size_t length = sizeof(char);
_startBuffer.reserve(length);
_startBuffer.push_back(static_cast<char>(_type));
_endBuffer.clear();
_endBuffer.append(_startBuffer);
uint64ToPersistent(_startBuffer, UINT64_MAX);
//nextPrefix(_endBuffer);
break;
}
default:
THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);

View File

@ -107,6 +107,11 @@ class RocksDBKeyBounds {
/// @brief Bounds for all views belonging to a specified database
//////////////////////////////////////////////////////////////////////////////
static RocksDBKeyBounds DatabaseViews(TRI_voc_tick_t databaseId);
//////////////////////////////////////////////////////////////////////////////
/// @brief Bounds for all counter values
//////////////////////////////////////////////////////////////////////////////
static RocksDBKeyBounds CounterValues();
public:
//////////////////////////////////////////////////////////////////////////////

View File

@ -31,6 +31,7 @@
#include "RestServer/TransactionManagerFeature.h"
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBCounterManager.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBTransactionCollection.h"
#include "StorageEngine/EngineSelectorFeature.h"
@ -162,10 +163,15 @@ Result RocksDBTransactionState::commitTransaction(
return result;
}
rocksdb::Snapshot const* snap = this->_rocksReadOptions.snapshot;
for (auto& trxCollection : _collections) {
RocksDBTransactionCollection* collection = static_cast<RocksDBTransactionCollection*>(trxCollection);
int64_t adjustment = collection->numInserts() - collection->numRemoves();
static_cast<RocksDBCollection*>(trxCollection->collection()->getPhysical())->adjustNumberDocuments(adjustment);
RocksDBCollection *coll = static_cast<RocksDBCollection*>(trxCollection->collection()->getPhysical());
coll->adjustNumberDocuments(adjustment);
RocksDBEngine* engine = static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
engine->counterManager()->updateCounter(coll->objectId(), snap, coll->numberDocuments());
}
_rocksTransaction.reset();

View File

@ -40,6 +40,10 @@ const ArangoError = arangodb.ArangoError;
// converts a user document to the legacy format
const convertToLegacyFormat = function (doc) {
// TODO remove this
if (!doc)
return {user:"root", active:true, extra:{},changePassword:null};
let ad = doc.authData || {};
return {
user: doc.user,

View File

@ -154,6 +154,13 @@ bool ApplicationServer::isRequired(std::string const& name) const {
// signal. after that, it will shutdown all features
void ApplicationServer::run(int argc, char* argv[]) {
LOG_TOPIC(TRACE, Logger::STARTUP) << "ApplicationServer::run";
usleep(1000000);
usleep(1000000);
usleep(1000000);
usleep(1000000);
usleep(1000000);
usleep(1000000);
// collect options from all features
// in this phase, all features are order-independent