mirror of https://gitee.com/bigwinds/arangodb
WAL: honor tick end value
This commit is contained in:
parent
39d8186e16
commit
c55ce367eb
|
@ -411,7 +411,9 @@ class WALParser : public rocksdb::WriteBatch::Handler {
|
||||||
// iterates over WAL starting at 'from' and returns up to 'limit' documents
|
// iterates over WAL starting at 'from' and returns up to 'limit' documents
|
||||||
// from the corresponding database
|
// from the corresponding database
|
||||||
RocksDBReplicationResult rocksutils::tailWal(TRI_vocbase_t* vocbase,
|
RocksDBReplicationResult rocksutils::tailWal(TRI_vocbase_t* vocbase,
|
||||||
uint64_t tickStart, size_t chunkSize,
|
uint64_t tickStart,
|
||||||
|
uint64_t tickEnd,
|
||||||
|
size_t chunkSize,
|
||||||
bool includeSystem,
|
bool includeSystem,
|
||||||
TRI_voc_cid_t collectionId,
|
TRI_voc_cid_t collectionId,
|
||||||
VPackBuilder& builder) {
|
VPackBuilder& builder) {
|
||||||
|
@ -431,7 +433,8 @@ RocksDBReplicationResult rocksutils::tailWal(TRI_vocbase_t* vocbase,
|
||||||
// we need to check if the builder is bigger than the chunksize,
|
// we need to check if the builder is bigger than the chunksize,
|
||||||
// only after we printed a full WriteBatch. Otherwise a client might
|
// only after we printed a full WriteBatch. Otherwise a client might
|
||||||
// never read the full writebatch
|
// never read the full writebatch
|
||||||
while (iterator->Valid() && builder.buffer()->size() < chunkSize) {
|
while (iterator->Valid() && tickEnd <= lastTick && builder.buffer()->size() < chunkSize) {
|
||||||
|
|
||||||
s = iterator->status();
|
s = iterator->status();
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
rocksdb::BatchResult batch = iterator->GetBatch();
|
rocksdb::BatchResult batch = iterator->GetBatch();
|
||||||
|
@ -439,13 +442,11 @@ RocksDBReplicationResult rocksutils::tailWal(TRI_vocbase_t* vocbase,
|
||||||
if (lastTick == tickStart) {
|
if (lastTick == tickStart) {
|
||||||
fromTickIncluded = true;
|
fromTickIncluded = true;
|
||||||
}
|
}
|
||||||
if (tickStart <= batch.sequence) {
|
if (tickStart <= lastTick && lastTick <= tickEnd) {
|
||||||
handler->startNewBatch(batch.sequence);
|
handler->startNewBatch(batch.sequence);
|
||||||
s = batch.writeBatchPtr->Iterate(handler.get());
|
s = batch.writeBatchPtr->Iterate(handler.get());
|
||||||
if (s.ok()) {
|
|
||||||
handler->endBatch();
|
handler->endBatch();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
LOG_TOPIC(ERR, Logger::ENGINES) << "error during WAL scan";
|
LOG_TOPIC(ERR, Logger::ENGINES) << "error during WAL scan";
|
||||||
auto converted = convertStatus(s);
|
auto converted = convertStatus(s);
|
||||||
|
|
|
@ -37,7 +37,9 @@ namespace rocksutils {
|
||||||
// iterates over WAL starting at 'from' and returns up to 'limit' documents
|
// iterates over WAL starting at 'from' and returns up to 'limit' documents
|
||||||
// from the corresponding database; releases dumping resources
|
// from the corresponding database; releases dumping resources
|
||||||
RocksDBReplicationResult tailWal(TRI_vocbase_t* vocbase,
|
RocksDBReplicationResult tailWal(TRI_vocbase_t* vocbase,
|
||||||
uint64_t tickStart, size_t chunkSize,
|
uint64_t tickStart,
|
||||||
|
uint64_t tickEnd,
|
||||||
|
size_t chunkSize,
|
||||||
bool includeSystem,
|
bool includeSystem,
|
||||||
TRI_voc_cid_t collectionId,
|
TRI_voc_cid_t collectionId,
|
||||||
VPackBuilder& builder) ;
|
VPackBuilder& builder) ;
|
||||||
|
|
|
@ -638,7 +638,8 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
|
||||||
|
|
||||||
VPackBuilder builder(transactionContext->getVPackOptions());
|
VPackBuilder builder(transactionContext->getVPackOptions());
|
||||||
builder.openArray();
|
builder.openArray();
|
||||||
auto result = tailWal(_vocbase, tickStart, chunkSize, includeSystem, cid, builder);
|
auto result = tailWal(_vocbase, tickStart, tickEnd,
|
||||||
|
chunkSize, includeSystem, cid, builder);
|
||||||
builder.close();
|
builder.close();
|
||||||
auto data = builder.slice();
|
auto data = builder.slice();
|
||||||
|
|
||||||
|
|
|
@ -266,7 +266,7 @@ static void JS_LastLoggerReplication(
|
||||||
|
|
||||||
} else if (engineName == "rocksdb") {
|
} else if (engineName == "rocksdb") {
|
||||||
bool includeSystem = true;
|
bool includeSystem = true;
|
||||||
size_t limit = tickEnd - tickStart; // TODO: determine good default value?
|
size_t chunkSize = 32 * 1024 * 1024; // TODO: determine good default value?
|
||||||
|
|
||||||
// construct vocbase with proper handler
|
// construct vocbase with proper handler
|
||||||
std::shared_ptr<transaction::Context> transactionContext =
|
std::shared_ptr<transaction::Context> transactionContext =
|
||||||
|
@ -274,8 +274,9 @@ static void JS_LastLoggerReplication(
|
||||||
VPackBuilder builder(transactionContext->getVPackOptions());
|
VPackBuilder builder(transactionContext->getVPackOptions());
|
||||||
|
|
||||||
builder.openArray();
|
builder.openArray();
|
||||||
RocksDBReplicationResult rep = rocksutils::tailWal(vocbase, tickStart, limit,
|
RocksDBReplicationResult rep = rocksutils::tailWal(vocbase, tickStart,
|
||||||
includeSystem, builder);
|
tickEnd, chunkSize,
|
||||||
|
includeSystem, 0, builder);
|
||||||
builder.close();
|
builder.close();
|
||||||
|
|
||||||
if (rep.ok()) {
|
if (rep.ok()) {
|
||||||
|
|
Loading…
Reference in New Issue