mirror of https://gitee.com/bigwinds/arangodb
Fixing count
This commit is contained in:
parent
1d90520a9d
commit
d9510eee0b
|
@ -35,42 +35,45 @@
|
||||||
|
|
||||||
#include <rocksdb/utilities/transaction_db.h>
|
#include <rocksdb/utilities/transaction_db.h>
|
||||||
#include <rocksdb/write_batch.h>
|
#include <rocksdb/write_batch.h>
|
||||||
|
#include <rocksdb/utilities/write_batch_with_index.h>
|
||||||
|
|
||||||
#include <velocypack/Iterator.h>
|
#include <velocypack/Iterator.h>
|
||||||
|
#include <velocypack/velocypack-aliases.h>
|
||||||
|
|
||||||
using namespace arangodb;
|
using namespace arangodb;
|
||||||
|
|
||||||
RocksDBCounterManager::Counter::Counter(VPackSlice const& slice) {
|
RocksDBCounterManager::CMValue::CMValue(VPackSlice const& slice) {
|
||||||
TRI_ASSERT(slice.isArray());
|
TRI_ASSERT(slice.isArray());
|
||||||
|
|
||||||
velocypack::ArrayIterator array(slice);
|
velocypack::ArrayIterator array(slice);
|
||||||
if (array.valid()) {
|
if (array.valid()) {
|
||||||
this->_sequenceNumber = (*array).getUInt();
|
this->_sequenceNum = (*array).getUInt();
|
||||||
this->_count = (*(++array)).getUInt();
|
this->_count = (*(++array)).getUInt();
|
||||||
this->_revisionId = (*(++array)).getUInt();
|
this->_revisionId = (*(++array)).getUInt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
RocksDBCounterManager::Counter::Counter() : _sequenceNumber(0), _count(0), _revisionId(0) {}
|
void RocksDBCounterManager::CMValue::serialize(VPackBuilder& b) const {
|
||||||
|
|
||||||
void RocksDBCounterManager::Counter::serialize(VPackBuilder& b) const {
|
|
||||||
b.openArray();
|
b.openArray();
|
||||||
b.add(VPackValue(this->_sequenceNumber));
|
b.add(VPackValue(this->_sequenceNum));
|
||||||
b.add(VPackValue(this->_count));
|
b.add(VPackValue(this->_count));
|
||||||
b.add(VPackValue(this->_revisionId));
|
b.add(VPackValue(this->_revisionId));
|
||||||
b.close();
|
b.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
void RocksDBCounterManager::Counter::reset() {
|
|
||||||
_sequenceNumber = 0;
|
|
||||||
_count = 0;
|
|
||||||
_revisionId = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Constructor needs to be called synchronously,
|
/// Constructor needs to be called synchronously,
|
||||||
/// will load counts from the db and scan the WAL
|
/// will load counts from the db and scan the WAL
|
||||||
RocksDBCounterManager::RocksDBCounterManager(rocksdb::DB* db, double interval)
|
RocksDBCounterManager::RocksDBCounterManager(rocksdb::DB* db, double interval)
|
||||||
: Thread("RocksDBCounters"), _db(db), _interval(interval) {
|
: Thread("RocksDBCounters"), _db(db), _interval(interval) {
|
||||||
|
|
||||||
|
usleep(1000000);
|
||||||
|
usleep(1000000);
|
||||||
|
usleep(1000000);
|
||||||
|
usleep(1000000);
|
||||||
|
usleep(1000000);
|
||||||
|
usleep(1000000);
|
||||||
|
usleep(1000000);
|
||||||
|
|
||||||
readCounterValues();
|
readCounterValues();
|
||||||
if (_counters.size() > 0) {
|
if (_counters.size() > 0) {
|
||||||
if (parseRocksWAL()) {
|
if (parseRocksWAL()) {
|
||||||
|
@ -98,44 +101,52 @@ void RocksDBCounterManager::run() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
RocksDBCounterManager::Counter const RocksDBCounterManager::loadCounter(uint64_t objectId) {
|
RocksDBCounterManager::CounterUpdate
|
||||||
static const Counter empty{};
|
RocksDBCounterManager::loadCounter(uint64_t objectId) const {
|
||||||
{
|
TRI_ASSERT(objectId != 0);// TODO fix this
|
||||||
|
|
||||||
READ_LOCKER(guard, _rwLock);
|
READ_LOCKER(guard, _rwLock);
|
||||||
auto const& it = _counters.find(objectId);
|
auto const& it = _counters.find(objectId);
|
||||||
if (it != _counters.end()) {
|
if (it != _counters.end()) {
|
||||||
return it->second;
|
return CounterUpdate(it->second._sequenceNum,
|
||||||
|
it->second._count,
|
||||||
|
it->second._revisionId);
|
||||||
}
|
}
|
||||||
} return empty; // do not create
|
return CounterUpdate(0,0,0); // do not create
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/// collections / views / indexes can call this method to update
|
/// collections / views / indexes can call this method to update
|
||||||
/// their total counts. Thread-Safe needs the snapshot so we know
|
/// their total counts. Thread-Safe needs the snapshot so we know
|
||||||
/// the sequence number used
|
/// the sequence number used
|
||||||
void RocksDBCounterManager::updateCounter(uint64_t objectId,
|
void RocksDBCounterManager::updateCounter(uint64_t objectId,
|
||||||
rocksdb::Snapshot const* snapshot,
|
CounterUpdate const& update) {
|
||||||
int64_t delta, uint64_t revisionId) {
|
// From write_batch.cc in rocksdb: first 64 bits in the internal rep_
|
||||||
|
// buffer are the sequence number
|
||||||
|
/*TRI_ASSERT(trx->GetState() == rocksdb::Transaction::COMMITED);
|
||||||
|
rocksdb::WriteBatchWithIndex *batch = trx->GetWriteBatch();
|
||||||
|
rocksdb::SequenceNumber seq = DecodeFixed64(batch->GetWriteBatch()->Data().data());*/
|
||||||
|
|
||||||
bool needsSync = false;
|
bool needsSync = false;
|
||||||
{
|
{
|
||||||
WRITE_LOCKER(guard, _rwLock);
|
WRITE_LOCKER(guard, _rwLock);
|
||||||
|
|
||||||
auto const& it = _counters.find(objectId);
|
auto it = _counters.find(objectId);
|
||||||
if (it != _counters.end()) {
|
if (it != _counters.end()) {
|
||||||
// already have a counter for objectId. now adjust its value
|
it->second._count += update.count();
|
||||||
it->second._count += delta;
|
// just use the latest trx info
|
||||||
|
if (update.sequenceNumber() > it->second._sequenceNum) {
|
||||||
// just use the latest snapshot
|
it->second._sequenceNum = update.sequenceNumber();
|
||||||
if (snapshot->GetSequenceNumber() > it->second._sequenceNumber) {
|
it->second._revisionId = update.revisionId();
|
||||||
it->second._sequenceNumber = snapshot->GetSequenceNumber();
|
|
||||||
it->second._revisionId = revisionId;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// insert new counter
|
// insert new counter
|
||||||
TRI_ASSERT(delta >= 0);
|
TRI_ASSERT(update.count() != 0);
|
||||||
auto tmpCounter = Counter(snapshot->GetSequenceNumber(), delta, revisionId);
|
_counters.emplace(std::make_pair(objectId, CMValue(update.sequenceNumber(),
|
||||||
_counters.emplace(std::make_pair(objectId,std::move(tmpCounter)));
|
update.count(),
|
||||||
needsSync = true;
|
update.revisionId())));
|
||||||
|
needsSync = true;// only count values from WAL if they are in the DB
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (needsSync) {
|
if (needsSync) {
|
||||||
|
@ -164,7 +175,7 @@ Result RocksDBCounterManager::sync() {
|
||||||
return Result();
|
return Result();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unordered_map<uint64_t, Counter> copy;
|
std::unordered_map<uint64_t, CMValue> copy;
|
||||||
{ // block all updates
|
{ // block all updates
|
||||||
WRITE_LOCKER(guard, _rwLock);
|
WRITE_LOCKER(guard, _rwLock);
|
||||||
if (_syncing) {
|
if (_syncing) {
|
||||||
|
@ -180,10 +191,11 @@ Result RocksDBCounterManager::sync() {
|
||||||
db->BeginTransaction(writeOptions));
|
db->BeginTransaction(writeOptions));
|
||||||
|
|
||||||
VPackBuilder b;
|
VPackBuilder b;
|
||||||
for (std::pair<uint64_t, Counter> const& pair : copy) {
|
for (std::pair<uint64_t, CMValue> const& pair : copy) {
|
||||||
// Skip values which we did not change
|
// Skip values which we did not change
|
||||||
auto const& it = _syncedSeqNum.find(pair.first);
|
auto const& it = _syncedSeqNums.find(pair.first);
|
||||||
if (it != _syncedSeqNum.end() && it->second == pair.second._sequenceNumber) {
|
if (it != _syncedSeqNums.end()
|
||||||
|
&& it->second == pair.second._sequenceNum) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,37 +212,35 @@ Result RocksDBCounterManager::sync() {
|
||||||
return rocksutils::convertStatus(s);
|
return rocksutils::convertStatus(s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// we have to commit all counters in one batch otherwise
|
// we have to commit all counters in one batch otherwise
|
||||||
// there would be the possibility of
|
// there would be the possibility of
|
||||||
rocksdb::Status s = rtrx->Commit();
|
rocksdb::Status s = rtrx->Commit();
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
for (std::pair<uint64_t, Counter> const& pair : copy) {
|
for (std::pair<uint64_t, CMValue> const& pair : copy) {
|
||||||
_syncedSeqNum[pair.first] = pair.second._sequenceNumber;
|
_syncedSeqNums[pair.first] = pair.second._sequenceNum;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_syncing = false;
|
|
||||||
|
|
||||||
|
_syncing = false;
|
||||||
return rocksutils::convertStatus(s);
|
return rocksutils::convertStatus(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse counter values from rocksdb
|
/// Parse counter values from rocksdb
|
||||||
void RocksDBCounterManager::readCounterValues() {
|
void RocksDBCounterManager::readCounterValues() {
|
||||||
WRITE_LOCKER(guard, _rwLock);
|
WRITE_LOCKER(guard, _rwLock);
|
||||||
|
|
||||||
rocksdb::TransactionDB* db = rocksutils::globalRocksDB();
|
|
||||||
rocksdb::Comparator const* cmp = db->GetOptions().comparator;
|
|
||||||
RocksDBKeyBounds bounds = RocksDBKeyBounds::CounterValues();
|
RocksDBKeyBounds bounds = RocksDBKeyBounds::CounterValues();
|
||||||
|
|
||||||
|
rocksdb::Comparator const* cmp = _db->GetOptions().comparator;
|
||||||
rocksdb::ReadOptions readOptions;
|
rocksdb::ReadOptions readOptions;
|
||||||
std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(readOptions));
|
std::unique_ptr<rocksdb::Iterator> iter(_db->NewIterator(readOptions));
|
||||||
iter->Seek(bounds.start());
|
iter->Seek(bounds.start());
|
||||||
|
|
||||||
while (iter->Valid() && cmp->Compare(iter->key(), bounds.end()) < 0) {
|
while (iter->Valid() && cmp->Compare(iter->key(), bounds.end()) < 0) {
|
||||||
|
|
||||||
uint64_t objectId = RocksDBKey::counterObjectId(iter->key());
|
uint64_t objectId = RocksDBKey::counterObjectId(iter->key());
|
||||||
Counter counter(VPackSlice(iter->value().data()));
|
auto const& it = _counters.emplace(objectId, CMValue(VPackSlice(iter->value().data())));
|
||||||
_counters.emplace(objectId, counter);
|
_syncedSeqNums[objectId] = it.first->second._sequenceNum;
|
||||||
_syncedSeqNum[objectId] = counter._sequenceNumber;
|
|
||||||
|
|
||||||
iter->Next();
|
iter->Next();
|
||||||
}
|
}
|
||||||
|
@ -238,39 +248,22 @@ void RocksDBCounterManager::readCounterValues() {
|
||||||
|
|
||||||
/// WAL parser, no locking required here, because we have been locked from the outside
|
/// WAL parser, no locking required here, because we have been locked from the outside
|
||||||
struct WBReader : public rocksdb::WriteBatch::Handler {
|
struct WBReader : public rocksdb::WriteBatch::Handler {
|
||||||
std::unordered_map<uint64_t, rocksdb::SequenceNumber> prevCounters;
|
// must be set by the counter manager
|
||||||
std::unordered_map<uint64_t, RocksDBCounterManager::Counter> deltas;
|
std::unordered_map<uint64_t, rocksdb::SequenceNumber> seqStart;
|
||||||
rocksdb::SequenceNumber seqNum = UINT64_MAX;
|
std::unordered_map<uint64_t, RocksDBCounterManager::CounterUpdate> deltas;
|
||||||
|
rocksdb::SequenceNumber currentSeqNum;
|
||||||
|
|
||||||
explicit WBReader() {}
|
explicit WBReader() {}
|
||||||
|
|
||||||
bool prepKey(const rocksdb::Slice& key) {
|
bool prepKey(const rocksdb::Slice& key) {
|
||||||
|
if (RocksDBKey::type(key) == RocksDBEntryType::Document) {
|
||||||
if (RocksDBKey::type(key) == RocksDBEntryType::CounterValue) {
|
|
||||||
uint64_t objectId = RocksDBKey::counterObjectId(key);
|
uint64_t objectId = RocksDBKey::counterObjectId(key);
|
||||||
auto const& it = prevCounters.find(objectId);
|
auto const& it = seqStart.find(objectId);
|
||||||
if (it == prevCounters.end()) {
|
if (it != seqStart.end()) {
|
||||||
// We found a counter value, now we can track all writes from this
|
if (deltas.find(objectId) == deltas.end()) {
|
||||||
// point in the WAL
|
deltas.emplace(objectId, RocksDBCounterManager::CounterUpdate(0,0,0));
|
||||||
prevCounters.emplace(objectId, seqNum);
|
|
||||||
deltas.emplace(objectId, RocksDBCounterManager::Counter());
|
|
||||||
} else if (it->second < seqNum) {
|
|
||||||
// we found our counter again at a later point in the WAL
|
|
||||||
// which means we can forget about every write we have seen
|
|
||||||
// up to this point
|
|
||||||
TRI_ASSERT(deltas.find(objectId) != deltas.end());
|
|
||||||
it->second = seqNum;
|
|
||||||
deltas[objectId].reset();
|
|
||||||
}
|
}
|
||||||
return false;
|
return it->second < currentSeqNum;
|
||||||
} else if (RocksDBKey::type(key) == RocksDBEntryType::Document) {
|
|
||||||
uint64_t objectId = RocksDBKey::counterObjectId(key);
|
|
||||||
auto const& it = prevCounters.find(objectId);
|
|
||||||
if (it == prevCounters.end()) {
|
|
||||||
return false;
|
|
||||||
} else {
|
|
||||||
TRI_ASSERT(deltas.find(objectId) != deltas.end());
|
|
||||||
return it->second <= seqNum;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -282,11 +275,10 @@ struct WBReader : public rocksdb::WriteBatch::Handler {
|
||||||
uint64_t objectId = RocksDBKey::counterObjectId(key);
|
uint64_t objectId = RocksDBKey::counterObjectId(key);
|
||||||
uint64_t revisionId = RocksDBKey::revisionId(key);
|
uint64_t revisionId = RocksDBKey::revisionId(key);
|
||||||
|
|
||||||
RocksDBCounterManager::Counter& cc = deltas[objectId];
|
auto const& it = deltas.find(objectId);
|
||||||
if (cc._sequenceNumber <= seqNum) {
|
if (it != deltas.end()) {
|
||||||
cc._sequenceNumber = seqNum;
|
it->second._count++;
|
||||||
cc._count++;
|
it->second._revisionId = revisionId;
|
||||||
cc._revisionId = revisionId;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -296,11 +288,10 @@ struct WBReader : public rocksdb::WriteBatch::Handler {
|
||||||
uint64_t objectId = RocksDBKey::counterObjectId(key);
|
uint64_t objectId = RocksDBKey::counterObjectId(key);
|
||||||
uint64_t revisionId = RocksDBKey::revisionId(key);
|
uint64_t revisionId = RocksDBKey::revisionId(key);
|
||||||
|
|
||||||
RocksDBCounterManager::Counter& cc = deltas[objectId];
|
auto const& it = deltas.find(objectId);
|
||||||
if (cc._sequenceNumber <= seqNum) {
|
if (it != deltas.end()) {
|
||||||
cc._sequenceNumber = seqNum;
|
it->second._count--;
|
||||||
cc._count--;
|
it->second._revisionId = revisionId;
|
||||||
cc._revisionId = revisionId;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -310,11 +301,10 @@ struct WBReader : public rocksdb::WriteBatch::Handler {
|
||||||
uint64_t objectId = RocksDBKey::counterObjectId(key);
|
uint64_t objectId = RocksDBKey::counterObjectId(key);
|
||||||
uint64_t revisionId = RocksDBKey::revisionId(key);
|
uint64_t revisionId = RocksDBKey::revisionId(key);
|
||||||
|
|
||||||
RocksDBCounterManager::Counter& cc = deltas[objectId];
|
auto const& it = deltas.find(objectId);
|
||||||
if (cc._sequenceNumber <= seqNum) {
|
if (it != deltas.end()) {
|
||||||
cc._sequenceNumber = seqNum;
|
it->second._count--;
|
||||||
cc._count--;
|
it->second._revisionId = revisionId;
|
||||||
cc._revisionId = revisionId;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -325,27 +315,27 @@ bool RocksDBCounterManager::parseRocksWAL() {
|
||||||
WRITE_LOCKER(guard, _rwLock);
|
WRITE_LOCKER(guard, _rwLock);
|
||||||
TRI_ASSERT(_counters.size() > 0);
|
TRI_ASSERT(_counters.size() > 0);
|
||||||
|
|
||||||
// Determine which counter values might be outdated
|
rocksdb::SequenceNumber start = UINT64_MAX;
|
||||||
rocksdb::SequenceNumber lastWrittenSeq = UINT64_MAX;
|
// Tell the WriteBatch reader the transaction markers to look for
|
||||||
for (auto const& it : _syncedSeqNum) {
|
|
||||||
if (it.second < lastWrittenSeq) {
|
|
||||||
lastWrittenSeq = it.second;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
std::unique_ptr<WBReader> handler(new WBReader());
|
std::unique_ptr<WBReader> handler(new WBReader());
|
||||||
|
for (auto const& pair : _counters) {
|
||||||
|
handler->seqStart.emplace(pair.first, pair.second._sequenceNum);
|
||||||
|
start = std::min(start, pair.second._sequenceNum);
|
||||||
|
}
|
||||||
|
|
||||||
std::unique_ptr<rocksdb::TransactionLogIterator> iterator; // reader();
|
std::unique_ptr<rocksdb::TransactionLogIterator> iterator; // reader();
|
||||||
rocksdb::Status s = _db->GetUpdatesSince(lastWrittenSeq-1, &iterator);
|
rocksdb::Status s = _db->GetUpdatesSince(start, &iterator);
|
||||||
if (!s.ok()) { // TODO do something?
|
if (!s.ok()) { // TODO do something?
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (iterator->Valid()) {
|
while (iterator->Valid()) {
|
||||||
s = iterator->status();
|
s = iterator->status();
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
rocksdb::BatchResult result = iterator->GetBatch();
|
rocksdb::BatchResult batch = iterator->GetBatch();
|
||||||
handler->seqNum = result.sequence;
|
start = batch.sequence;
|
||||||
s = result.writeBatchPtr->Iterate(handler.get());
|
handler->currentSeqNum = start;
|
||||||
|
s = batch.writeBatchPtr->Iterate(handler.get());
|
||||||
}
|
}
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
LOG_TOPIC(ERR, Logger::ENGINES) << "Error during WAL scan";
|
LOG_TOPIC(ERR, Logger::ENGINES) << "Error during WAL scan";
|
||||||
|
@ -357,12 +347,10 @@ bool RocksDBCounterManager::parseRocksWAL() {
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_TOPIC(WARN, Logger::FIXME) << "Finished WAL scan";
|
LOG_TOPIC(WARN, Logger::FIXME) << "Finished WAL scan";
|
||||||
for (std::pair<uint64_t, RocksDBCounterManager::Counter> pair : handler->deltas) {
|
for (std::pair<uint64_t, RocksDBCounterManager::CounterUpdate> pair : handler->deltas) {
|
||||||
auto const& it = _counters.find(pair.first);
|
auto const& it = _counters.find(pair.first);
|
||||||
if (it != _counters.end()
|
if (it != _counters.end()) {
|
||||||
&& it->second._sequenceNumber < pair.second._sequenceNumber) {
|
it->second._sequenceNum = start;
|
||||||
|
|
||||||
it->second._sequenceNumber = pair.second._sequenceNumber;
|
|
||||||
it->second._count += pair.second._count;
|
it->second._count += pair.second._count;
|
||||||
it->second._revisionId = pair.second._revisionId;
|
it->second._revisionId = pair.second._revisionId;
|
||||||
LOG_TOPIC(WARN, Logger::FIXME) << "WAL recovered " << pair.second._count
|
LOG_TOPIC(WARN, Logger::FIXME) << "WAL recovered " << pair.second._count
|
||||||
|
@ -372,3 +360,4 @@ bool RocksDBCounterManager::parseRocksWAL() {
|
||||||
}
|
}
|
||||||
return handler->deltas.size() > 0;
|
return handler->deltas.size() > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,16 +30,15 @@
|
||||||
#include "Basics/ReadWriteLock.h"
|
#include "Basics/ReadWriteLock.h"
|
||||||
#include "Basics/Result.h"
|
#include "Basics/Result.h"
|
||||||
#include "Basics/Thread.h"
|
#include "Basics/Thread.h"
|
||||||
|
#include "VocBase/voc-types.h"
|
||||||
#include "RocksDBEngine/RocksDBTypes.h"
|
#include "RocksDBEngine/RocksDBTypes.h"
|
||||||
|
|
||||||
#include <velocypack/Builder.h>
|
#include <velocypack/Builder.h>
|
||||||
#include <velocypack/Slice.h>
|
#include <velocypack/Slice.h>
|
||||||
#include <velocypack/velocypack-aliases.h>
|
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
class DB;
|
class DB;
|
||||||
class Transaction;
|
class Transaction;
|
||||||
class Snapshot;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace arangodb {
|
namespace arangodb {
|
||||||
|
@ -47,36 +46,34 @@ namespace arangodb {
|
||||||
class RocksDBCounterManager : Thread {
|
class RocksDBCounterManager : Thread {
|
||||||
friend class RocksDBEngine;
|
friend class RocksDBEngine;
|
||||||
|
|
||||||
|
|
||||||
/// Constructor needs to be called synchronously,
|
/// Constructor needs to be called synchronously,
|
||||||
/// will load counts from the db and scan the WAL
|
/// will load counts from the db and scan the WAL
|
||||||
RocksDBCounterManager(rocksdb::DB* db, double interval);
|
RocksDBCounterManager(rocksdb::DB* db, double interval);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
struct Counter {
|
|
||||||
rocksdb::SequenceNumber _sequenceNumber;
|
struct CounterUpdate {
|
||||||
|
rocksdb::SequenceNumber _sequenceNum;
|
||||||
uint64_t _count; // used for number of documents
|
uint64_t _count; // used for number of documents
|
||||||
uint64_t _revisionId; // used for revision id
|
TRI_voc_rid_t _revisionId; // used for revision id
|
||||||
|
|
||||||
Counter();
|
CounterUpdate(rocksdb::SequenceNumber seq, uint64_t count, TRI_voc_rid_t revisionId)
|
||||||
explicit Counter(VPackSlice const&);
|
: _sequenceNum(seq), _count(count), _revisionId(revisionId) {}
|
||||||
Counter(rocksdb::SequenceNumber seq, uint64_t count, uint64_t revisionId)
|
|
||||||
: _sequenceNumber(seq), _count(count), _revisionId(revisionId) {}
|
|
||||||
|
|
||||||
void serialize(VPackBuilder&) const;
|
rocksdb::SequenceNumber sequenceNumber() const {return _sequenceNum;};
|
||||||
void reset();
|
uint64_t count() const { return _count; }
|
||||||
uint64_t count() { return _count; }
|
TRI_voc_rid_t revisionId() const { return _revisionId; }
|
||||||
uint64_t revisionId() { return _revisionId; }
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
/// Thread-Safe load a counter
|
/// Thread-Safe load a counter
|
||||||
Counter const loadCounter(uint64_t objectId);
|
CounterUpdate loadCounter(uint64_t objectId) const;
|
||||||
|
|
||||||
/// collections / views / indexes can call this method to update
|
/// collections / views / indexes can call this method to update
|
||||||
/// their total counts. Thread-Safe needs the snapshot so we know
|
/// their total counts. Thread-Safe needs the snapshot so we know
|
||||||
/// the sequence number used
|
/// the sequence number used
|
||||||
void updateCounter(uint64_t objectId, rocksdb::Snapshot const* snapshot,
|
void updateCounter(uint64_t objectId,
|
||||||
int64_t delta, uint64_t revisionId);
|
CounterUpdate const&);
|
||||||
|
|
||||||
/// Thread-Safe remove a counter
|
/// Thread-Safe remove a counter
|
||||||
void removeCounter(uint64_t objectId);
|
void removeCounter(uint64_t objectId);
|
||||||
|
@ -89,19 +86,34 @@ class RocksDBCounterManager : Thread {
|
||||||
protected:
|
protected:
|
||||||
void run() override;
|
void run() override;
|
||||||
|
|
||||||
|
struct CMValue {
|
||||||
|
/// ArangoDB transaction ID
|
||||||
|
rocksdb::SequenceNumber _sequenceNum;
|
||||||
|
/// used for number of documents
|
||||||
|
uint64_t _count;
|
||||||
|
/// used for revision id
|
||||||
|
TRI_voc_rid_t _revisionId;
|
||||||
|
|
||||||
|
CMValue(rocksdb::SequenceNumber sq, uint64_t cc, TRI_voc_rid_t rid)
|
||||||
|
: _sequenceNum(sq), _count(cc), _revisionId(rid) {}
|
||||||
|
explicit CMValue(arangodb::velocypack::Slice const&);
|
||||||
|
void serialize(arangodb::velocypack::Builder&) const;
|
||||||
|
};
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
void readCounterValues();
|
void readCounterValues();
|
||||||
bool parseRocksWAL();
|
bool parseRocksWAL();
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief counter values
|
/// @brief counter values
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
std::unordered_map<uint64_t, Counter> _counters;
|
std::unordered_map<uint64_t, CMValue> _counters;
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief synced sequence numbers
|
/// @brief synced sequence numbers
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
std::unordered_map<uint64_t, rocksdb::SequenceNumber> _syncedSeqNum;
|
std::unordered_map<uint64_t, rocksdb::SequenceNumber> _syncedSeqNums;
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief currently syncing
|
/// @brief currently syncing
|
||||||
|
@ -126,7 +138,7 @@ class RocksDBCounterManager : Thread {
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief protect _syncing and _counters
|
/// @brief protect _syncing and _counters
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
basics::ReadWriteLock _rwLock;
|
mutable basics::ReadWriteLock _rwLock;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -130,6 +130,10 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
|
||||||
_rocksTransaction->SetSnapshot();
|
_rocksTransaction->SetSnapshot();
|
||||||
_rocksReadOptions.snapshot = _rocksTransaction->GetSnapshot();
|
_rocksReadOptions.snapshot = _rocksTransaction->GetSnapshot();
|
||||||
|
|
||||||
|
// add WAL trx marker; Always in before all other operations!!
|
||||||
|
RocksDBValue tid = RocksDBValue::TransactionID(_id);
|
||||||
|
_rocksTransaction->PutLogData(tid.string());
|
||||||
|
|
||||||
/*LOG_TOPIC(ERR, Logger::FIXME)
|
/*LOG_TOPIC(ERR, Logger::FIXME)
|
||||||
<< "#" << _id << " BEGIN (read-only: " << isReadOnlyTransaction()
|
<< "#" << _id << " BEGIN (read-only: " << isReadOnlyTransaction()
|
||||||
<< ")";*/
|
<< ")";*/
|
||||||
|
@ -177,7 +181,9 @@ Result RocksDBTransactionState::commitTransaction(
|
||||||
_rocksWriteOptions.sync = true;
|
_rocksWriteOptions.sync = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO wait for response on github issue to see how we can use the sequence number
|
||||||
result = rocksutils::convertStatus(_rocksTransaction->Commit());
|
result = rocksutils::convertStatus(_rocksTransaction->Commit());
|
||||||
|
rocksdb::SequenceNumber latestSeq = rocksutils::globalRocksDB()->GetLatestSequenceNumber();
|
||||||
if (!result.ok()) {
|
if (!result.ok()) {
|
||||||
abortTransaction(activeTrx);
|
abortTransaction(activeTrx);
|
||||||
return result;
|
return result;
|
||||||
|
@ -209,8 +215,10 @@ Result RocksDBTransactionState::commitTransaction(
|
||||||
RocksDBEngine* engine =
|
RocksDBEngine* engine =
|
||||||
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
|
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
|
||||||
|
|
||||||
engine->counterManager()->updateCounter(
|
RocksDBCounterManager::CounterUpdate update(latestSeq,
|
||||||
coll->objectId(), snap, adjustment, collection->revision());
|
adjustment,
|
||||||
|
collection->revision());
|
||||||
|
engine->counterManager()->updateCounter(coll->objectId(), update);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1377,9 +1377,9 @@ static void JS_BinaryWrite(v8::FunctionCallbackInfo<v8::Value> const& args) {
|
||||||
|
|
||||||
max_length = MIN(length, MIN(buffer->_length - offset, max_length));
|
max_length = MIN(length, MIN(buffer->_length - offset, max_length));
|
||||||
|
|
||||||
int written = DecodeWrite(isolate, p, max_length, s, BINARY);
|
ssize_t written = DecodeWrite(isolate, p, max_length, s, BINARY);
|
||||||
|
|
||||||
TRI_V8_RETURN(v8::Integer::New(isolate, written));
|
TRI_V8_RETURN(v8::Integer::New(isolate, (int32_t)written));
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
Loading…
Reference in New Issue