mirror of https://gitee.com/bigwinds/arangodb
keep replication batches alive for longer (#5931)
* keeping replication batches alive for longer * fix timeouts, fix revisions * honor force in gc
This commit is contained in:
parent
3a6737e37a
commit
25ceed2c40
|
@ -39,6 +39,7 @@
|
||||||
#include "RocksDBEngine/RocksDBKeyBounds.h"
|
#include "RocksDBEngine/RocksDBKeyBounds.h"
|
||||||
#include "RocksDBEngine/RocksDBVPackIndex.h"
|
#include "RocksDBEngine/RocksDBVPackIndex.h"
|
||||||
#include "RocksDBEngine/RocksDBValue.h"
|
#include "RocksDBEngine/RocksDBValue.h"
|
||||||
|
#include "Transaction/Helpers.h"
|
||||||
#include "StorageEngine/EngineSelectorFeature.h"
|
#include "StorageEngine/EngineSelectorFeature.h"
|
||||||
#include "VocBase/ticks.h"
|
#include "VocBase/ticks.h"
|
||||||
|
|
||||||
|
@ -641,14 +642,12 @@ class WBReader final : public rocksdb::WriteBatch::Handler {
|
||||||
updateMaxTick(column_family_id, key, value);
|
updateMaxTick(column_family_id, key, value);
|
||||||
if (shouldHandleDocument(column_family_id, key)) {
|
if (shouldHandleDocument(column_family_id, key)) {
|
||||||
uint64_t objectId = RocksDBKey::objectId(key);
|
uint64_t objectId = RocksDBKey::objectId(key);
|
||||||
uint64_t revisionId =
|
|
||||||
RocksDBKey::revisionId(RocksDBEntryType::Document, key);
|
|
||||||
|
|
||||||
auto const& it = deltas.find(objectId);
|
auto const& it = deltas.find(objectId);
|
||||||
if (it != deltas.end()) {
|
if (it != deltas.end()) {
|
||||||
it->second._sequenceNum = currentSeqNum;
|
it->second._sequenceNum = currentSeqNum;
|
||||||
it->second._added++;
|
it->second._added++;
|
||||||
it->second._revisionId = revisionId;
|
it->second._revisionId = transaction::helpers::extractRevFromDocument(RocksDBValue::data(value));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// We have to adjust the estimate with an insert
|
// We have to adjust the estimate with an insert
|
||||||
|
@ -677,14 +676,11 @@ class WBReader final : public rocksdb::WriteBatch::Handler {
|
||||||
const rocksdb::Slice& key) override {
|
const rocksdb::Slice& key) override {
|
||||||
if (shouldHandleDocument(column_family_id, key)) {
|
if (shouldHandleDocument(column_family_id, key)) {
|
||||||
uint64_t objectId = RocksDBKey::objectId(key);
|
uint64_t objectId = RocksDBKey::objectId(key);
|
||||||
uint64_t revisionId =
|
|
||||||
RocksDBKey::revisionId(RocksDBEntryType::Document, key);
|
|
||||||
|
|
||||||
auto const& it = deltas.find(objectId);
|
auto const& it = deltas.find(objectId);
|
||||||
if (it != deltas.end()) {
|
if (it != deltas.end()) {
|
||||||
it->second._sequenceNum = currentSeqNum;
|
it->second._sequenceNum = currentSeqNum;
|
||||||
it->second._removed++;
|
it->second._removed++;
|
||||||
it->second._revisionId = revisionId;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// We have to adjust the estimate with an remove
|
// We have to adjust the estimate with an remove
|
||||||
|
|
|
@ -525,11 +525,7 @@ void RocksDBReplicationContext::use(double ttl) {
|
||||||
TRI_ASSERT(!_isUsed);
|
TRI_ASSERT(!_isUsed);
|
||||||
|
|
||||||
_isUsed = true;
|
_isUsed = true;
|
||||||
if (_ttl > 0.0) {
|
ttl = std::max(std::max(_ttl, ttl), 300.0);
|
||||||
ttl = _ttl;
|
|
||||||
} else {
|
|
||||||
ttl = InitialSyncer::defaultBatchTimeout;
|
|
||||||
}
|
|
||||||
_expires = TRI_microtime() + ttl;
|
_expires = TRI_microtime() + ttl;
|
||||||
if (_serverId != 0) {
|
if (_serverId != 0) {
|
||||||
_vocbase->updateReplicationClient(_serverId, ttl);
|
_vocbase->updateReplicationClient(_serverId, ttl);
|
||||||
|
@ -538,6 +534,8 @@ void RocksDBReplicationContext::use(double ttl) {
|
||||||
|
|
||||||
void RocksDBReplicationContext::release() {
|
void RocksDBReplicationContext::release() {
|
||||||
TRI_ASSERT(_isUsed);
|
TRI_ASSERT(_isUsed);
|
||||||
|
double ttl = std::max(_ttl, 300.0);
|
||||||
|
_expires = TRI_microtime() + ttl;
|
||||||
_isUsed = false;
|
_isUsed = false;
|
||||||
if (_serverId != 0) {
|
if (_serverId != 0) {
|
||||||
double ttl;
|
double ttl;
|
||||||
|
|
|
@ -299,18 +299,18 @@ bool RocksDBReplicationManager::garbageCollect(bool force) {
|
||||||
for (auto it = _contexts.begin(); it != _contexts.end();
|
for (auto it = _contexts.begin(); it != _contexts.end();
|
||||||
/* no hoisting */) {
|
/* no hoisting */) {
|
||||||
auto context = it->second;
|
auto context = it->second;
|
||||||
|
|
||||||
|
if (!force && context->isUsed()) {
|
||||||
|
// must not physically destroy contexts that are currently used
|
||||||
|
++it;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (force || context->expires() < now) {
|
if (force || context->expires() < now) {
|
||||||
// expire contexts
|
// expire contexts
|
||||||
context->deleted();
|
context->deleted();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (context->isUsed()) {
|
|
||||||
// must not physically destroy contexts that are currently used
|
|
||||||
++it;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (context->isDeleted()) {
|
if (context->isDeleted()) {
|
||||||
try {
|
try {
|
||||||
found.emplace_back(context);
|
found.emplace_back(context);
|
||||||
|
|
Loading…
Reference in New Issue