From c55ce367eb1461c672dc75a3f7632df3692ad79f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Gra=CC=88tzer?= Date: Tue, 2 May 2017 13:55:19 +0200 Subject: [PATCH] WAL: honor tick end value --- arangod/RocksDBEngine/RocksDBReplicationTailing.cpp | 13 +++++++------ arangod/RocksDBEngine/RocksDBReplicationTailing.h | 4 +++- .../RocksDBEngine/RocksDBRestReplicationHandler.cpp | 3 ++- arangod/V8Server/v8-replication.cpp | 7 ++++--- 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp index 75c9f47d32..8cfb15708f 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp @@ -411,7 +411,9 @@ class WALParser : public rocksdb::WriteBatch::Handler { // iterates over WAL starting at 'from' and returns up to 'limit' documents // from the corresponding database RocksDBReplicationResult rocksutils::tailWal(TRI_vocbase_t* vocbase, - uint64_t tickStart, size_t chunkSize, + uint64_t tickStart, + uint64_t tickEnd, + size_t chunkSize, bool includeSystem, TRI_voc_cid_t collectionId, VPackBuilder& builder) { @@ -431,7 +433,8 @@ RocksDBReplicationResult rocksutils::tailWal(TRI_vocbase_t* vocbase, // we need to check if the builder is bigger than the chunksize, // only after we printed a full WriteBatch. Otherwise a client might // never read the full writebatch - while (iterator->Valid() && builder.buffer()->size() < chunkSize) { + while (iterator->Valid() && tickEnd <= lastTick && builder.buffer()->size() < chunkSize) { + s = iterator->status(); if (s.ok()) { rocksdb::BatchResult batch = iterator->GetBatch(); @@ -439,12 +442,10 @@ RocksDBReplicationResult rocksutils::tailWal(TRI_vocbase_t* vocbase, if (lastTick == tickStart) { fromTickIncluded = true; } - if (tickStart <= batch.sequence) { + if (tickStart <= lastTick && lastTick <= tickEnd) { handler->startNewBatch(batch.sequence); s = batch.writeBatchPtr->Iterate(handler.get()); - if (s.ok()) { - handler->endBatch(); - } + handler->endBatch(); } } else { LOG_TOPIC(ERR, Logger::ENGINES) << "error during WAL scan"; diff --git a/arangod/RocksDBEngine/RocksDBReplicationTailing.h b/arangod/RocksDBEngine/RocksDBReplicationTailing.h index b6c4a59cb7..461e9df4c0 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationTailing.h +++ b/arangod/RocksDBEngine/RocksDBReplicationTailing.h @@ -37,7 +37,9 @@ namespace rocksutils { // iterates over WAL starting at 'from' and returns up to 'limit' documents // from the corresponding database; releases dumping resources RocksDBReplicationResult tailWal(TRI_vocbase_t* vocbase, - uint64_t tickStart, size_t chunkSize, + uint64_t tickStart, + uint64_t tickEnd, + size_t chunkSize, bool includeSystem, TRI_voc_cid_t collectionId, VPackBuilder& builder) ; diff --git a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp index 4550a33814..e59c4450b8 100644 --- a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp +++ b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp @@ -638,7 +638,8 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() { VPackBuilder builder(transactionContext->getVPackOptions()); builder.openArray(); - auto result = tailWal(_vocbase, tickStart, chunkSize, includeSystem, cid, builder); + auto result = tailWal(_vocbase, tickStart, tickEnd, + chunkSize, includeSystem, cid, builder); builder.close(); auto data = builder.slice(); diff --git a/arangod/V8Server/v8-replication.cpp b/arangod/V8Server/v8-replication.cpp index 161de30afb..ada1122bc3 100644 --- a/arangod/V8Server/v8-replication.cpp +++ b/arangod/V8Server/v8-replication.cpp @@ -266,7 +266,7 @@ static void JS_LastLoggerReplication( } else if (engineName == "rocksdb") { bool includeSystem = true; - size_t limit = tickEnd - tickStart; // TODO: determine good default value? + size_t chunkSize = 32 * 1024 * 1024; // TODO: determine good default value? // construct vocbase with proper handler std::shared_ptr transactionContext = @@ -274,8 +274,9 @@ static void JS_LastLoggerReplication( VPackBuilder builder(transactionContext->getVPackOptions()); builder.openArray(); - RocksDBReplicationResult rep = rocksutils::tailWal(vocbase, tickStart, limit, - includeSystem, builder); + RocksDBReplicationResult rep = rocksutils::tailWal(vocbase, tickStart, + tickEnd, chunkSize, + includeSystem, 0, builder); builder.close(); if (rep.ok()) {