From 8da61a987583d5786426d9adf9f9bf4468efa3fe Mon Sep 17 00:00:00 2001 From: Jan Date: Fri, 9 Feb 2018 13:06:47 +0100 Subject: [PATCH] Bug fix/more replication tests (#4500) --- CHANGELOG | 7 + .../api-replication-global-spec.rb | 270 ++++++-- arangod/MMFiles/MMFilesIncrementalSync.h | 8 +- .../MMFiles/MMFilesRestReplicationHandler.cpp | 3 +- arangod/Replication/DatabaseInitialSyncer.cpp | 41 +- arangod/Replication/InitialSyncer.cpp | 19 +- arangod/Replication/InitialSyncer.h | 2 + arangod/Replication/ReplicationApplier.cpp | 12 +- arangod/Replication/Syncer.cpp | 2 +- arangod/Replication/TailingSyncer.cpp | 13 +- arangod/RestHandler/RestWalAccessHandler.cpp | 10 +- .../RocksDBEngine/RocksDBBackgroundThread.cpp | 3 +- arangod/RocksDBEngine/RocksDBCollection.cpp | 13 +- arangod/RocksDBEngine/RocksDBCommon.cpp | 2 +- arangod/RocksDBEngine/RocksDBEngine.cpp | 2 +- .../RocksDBEngine/RocksDBIncrementalSync.cpp | 6 + .../RocksDBReplicationContext.cpp | 33 +- .../RocksDBEngine/RocksDBReplicationContext.h | 10 +- .../RocksDBReplicationManager.cpp | 6 +- .../RocksDBEngine/RocksDBReplicationManager.h | 5 +- .../RocksDBRestReplicationHandler.cpp | 43 +- arangod/RocksDBEngine/RocksDBWalAccess.cpp | 24 +- arangod/VocBase/vocbase.cpp | 53 +- arangod/VocBase/vocbase.h | 14 +- .../@arangodb/testsuites/replication.js | 122 ++++ .../recovery/collection-rename-with-data.js | 96 +++ .../tests/replication/replication-aql.js | 578 ++++++++++++++++++ .../tests/replication/replication-random.js | 335 ++++++++++ 28 files changed, 1553 insertions(+), 179 deletions(-) create mode 100644 js/server/tests/recovery/collection-rename-with-data.js create mode 100644 js/server/tests/replication/replication-aql.js create mode 100644 js/server/tests/replication/replication-random.js diff --git a/CHANGELOG b/CHANGELOG index c16aebd83e..659e13ea18 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -99,6 +99,13 @@ devel Health did not function for multiple servers at the same time, as agency transaction was 
malformed. + +v3.3.4 (XXXX-XX-XX) +------------------- + +* fix timeout issues in replication client expiration + + v3.3.3 (2018-01-16) ------------------- diff --git a/UnitTests/HttpInterface/api-replication-global-spec.rb b/UnitTests/HttpInterface/api-replication-global-spec.rb index 709a679724..6fc9ac6ae2 100644 --- a/UnitTests/HttpInterface/api-replication-global-spec.rb +++ b/UnitTests/HttpInterface/api-replication-global-spec.rb @@ -657,95 +657,233 @@ describe ArangoDB do doc = ArangoDB.log_delete("#{prefix}-follow-collection", cmd) doc.code.should eq(200) end + + it "validates chunkSize restriction" do + ArangoDB.drop_collection("UnitTestsReplication") + + sleep 1 + + cmd = api + "/lastTick" + doc = ArangoDB.log_get("#{prefix}-follow-chunksize", cmd, :body => "") + doc.code.should eq(200) + fromTick = doc.parsed_response["tick"] + originalTick = fromTick + + # create collection + cid = ArangoDB.create_collection("UnitTestsReplication") + cuid = ArangoDB.properties_collection(cid)["globallyUniqueId"] + + # create documents + (1..1500).each do |value| + cmd = "/_api/document?collection=UnitTestsReplication" + body = "{ \"value\" : \"thisIsALongerStringBecauseWeWantToTestTheChunkSizeLimitsLaterOnAndItGetsEvenLongerWithTimeForRealNow\" }" + doc = ArangoDB.log_post("#{prefix}-follow-chunksize", cmd, :body => body) + doc.code.should eq(201) + end + + + sleep 1 + + tickTypes = { 2000 => 0, 2001 => 0, 2300 => 0 } + + while 1 + cmd = api + "/tail?global=true&from=" + fromTick + "&chunkSize=16384" + doc = ArangoDB.log_get("#{prefix}-follow-chunksize", cmd, :body => "", :format => :plain) + [200, 204].should include(doc.code) + + break if doc.code == 204 + + doc.headers["x-arango-replication-lastincluded"].should match(/^\d+$/) + doc.headers["x-arango-replication-lastincluded"].should_not eq("0") + if fromTick == originalTick + # first batch + doc.headers["x-arango-replication-checkmore"].should eq("true") + end + doc.headers["content-type"].should 
eq("application/x-arango-dump; charset=utf-8") + + # we need to allow for some overhead here, as the chunkSize restriction is not honored precisely + doc.headers["content-length"].to_i.should be < (16 + 8) * 1024 + + body = doc.response.body + + i = 0 + while 1 + position = body.index("\n") + + break if position == nil + + part = body.slice(0, position) + marker = JSON.parse(part) + + # update last tick value + marker.should have_key("tick") + fromTick = marker["tick"] + + if marker["type"] >= 2000 and marker["cuid"] == cuid + # create collection + marker.should have_key("type") + marker.should have_key("cuid") + + if marker["type"] == 2300 + marker.should have_key("data") + end + + cc = tickTypes[marker["type"]] + tickTypes[marker["type"]] = cc + 1 + + break if tickTypes[2300] == 1500 + end + body = body.slice(position + 1, body.length) + end + end + + tickTypes[2000].should eq(1) # collection create + tickTypes[2001].should eq(0) # collection drop + tickTypes[2300].should eq(1500) # document inserts + + + # now try again with a single chunk + tickTypes = { 2000 => 0, 2001 => 0, 2300 => 0 } + + cmd = api + "/tail?global=true&from=" + originalTick + "&chunkSize=1048576" + doc = ArangoDB.log_get("#{prefix}-follow-chunksize", cmd, :body => "", :format => :plain) + doc.code.should eq(200) + + doc.headers["x-arango-replication-lastincluded"].should match(/^\d+$/) + doc.headers["x-arango-replication-lastincluded"].should_not eq("0") + doc.headers["content-type"].should eq("application/x-arango-dump; charset=utf-8") + + # we need to allow for some overhead here, as the chunkSize restriction is not honored precisely + doc.headers["content-length"].to_i.should be > (16 + 8) * 1024 + + body = doc.response.body + + i = 0 + while 1 + position = body.index("\n") + + break if position == nil + + part = body.slice(0, position) + marker = JSON.parse(part) + + marker.should have_key("tick") + + if marker["type"] >= 2000 and marker["cuid"] == cuid + # create collection + 
marker.should have_key("type") + marker.should have_key("cuid") + + if marker["type"] == 2300 + marker.should have_key("data") + end + + cc = tickTypes[marker["type"]] + tickTypes[marker["type"]] = cc + 1 + + break if tickTypes[2300] == 1500 + end + body = body.slice(position + 1, body.length) + end + + tickTypes[2000].should eq(1) # collection create + tickTypes[2001].should eq(0) # collection drop + tickTypes[2300].should eq(1500) # document inserts + + # drop collection + cmd = "/_api/collection/UnitTestsReplication" + doc = ArangoDB.log_delete("#{prefix}-follow-chunksize", cmd) + doc.code.should eq(200) + end end ################################################################################ ## inventory / dump ################################################################################ - context "dealing with the initial dump" do + context "dealing with the initial dump" do - api = "/_api/replication" - prefix = "api-replication" + api = "/_api/replication" + prefix = "api-replication" - before do - ArangoDB.drop_collection("UnitTestsReplication") - ArangoDB.drop_collection("UnitTestsReplication2") - doc = ArangoDB.post(api + "/batch", :body => "{}") - doc.code.should eq(200) - @batchId = doc.parsed_response['id'] - @batchId.should match(/^\d+$/) - end + before do + ArangoDB.drop_collection("UnitTestsReplication") + ArangoDB.drop_collection("UnitTestsReplication2") + doc = ArangoDB.post(api + "/batch", :body => "{}") + doc.code.should eq(200) + @batchId = doc.parsed_response['id'] + @batchId.should match(/^\d+$/) + end - after do - ArangoDB.delete(api + "/batch/#{@batchId}", :body => "") - ArangoDB.drop_collection("UnitTestsReplication") - ArangoDB.drop_collection("UnitTestsReplication2") - end + after do + ArangoDB.delete(api + "/batch/#{@batchId}", :body => "") + ArangoDB.drop_collection("UnitTestsReplication") + ArangoDB.drop_collection("UnitTestsReplication2") + end ################################################################################ ## 
inventory ################################################################################ - it "checks the initial inventory" do - cmd = api + "/inventory?includeSystem=true&global=true&batchId=#{@batchId}" - doc = ArangoDB.log_get("#{prefix}-inventory", cmd, :body => "") + it "checks the initial inventory" do + cmd = api + "/inventory?includeSystem=true&global=true&batchId=#{@batchId}" + doc = ArangoDB.log_get("#{prefix}-inventory", cmd, :body => "") - doc.code.should eq(200) - all = doc.parsed_response - all.should have_key('databases') - all.should have_key('state') - state = all['state'] - state['running'].should eq(true) - state['lastLogTick'].should match(/^\d+$/) - state['time'].should match(/^\d+-\d+-\d+T\d+:\d+:\d+Z$/) - - databases = all['databases'] - databases.each { |name, database| - database.should have_key('collections') + doc.code.should eq(200) + all = doc.parsed_response + all.should have_key('databases') + all.should have_key('state') + state = all['state'] + state['running'].should eq(true) + state['lastLogTick'].should match(/^\d+$/) + state['time'].should match(/^\d+-\d+-\d+T\d+:\d+:\d+Z$/) + + databases = all['databases'] + databases.each { |name, database| + database.should have_key('collections') - collections = database["collections"] - filtered = [ ] - collections.each { |collection| - if [ "UnitTestsReplication", "UnitTestsReplication2" ].include? collection["parameters"]["name"] - filtered.push collection - end - collection["parameters"].should have_key('globallyUniqueId') + collections = database["collections"] + filtered = [ ] + collections.each { |collection| + if [ "UnitTestsReplication", "UnitTestsReplication2" ].include? 
collection["parameters"]["name"] + filtered.push collection + end + collection["parameters"].should have_key('globallyUniqueId') + } + filtered.should eq([ ]) } - filtered.should eq([ ]) - } - end + end - it "checks the inventory after creating collections" do - cid = ArangoDB.create_collection("UnitTestsReplication", false) - cuid = ArangoDB.properties_collection(cid)["globallyUniqueId"] - cid2 = ArangoDB.create_collection("UnitTestsReplication2", true, 3) - cuid2 = ArangoDB.properties_collection(cid2)["globallyUniqueId"] + it "checks the inventory after creating collections" do + cid = ArangoDB.create_collection("UnitTestsReplication", false) + cuid = ArangoDB.properties_collection(cid)["globallyUniqueId"] + cid2 = ArangoDB.create_collection("UnitTestsReplication2", true, 3) + cuid2 = ArangoDB.properties_collection(cid2)["globallyUniqueId"] - cmd = api + "/inventory?includeSystem=true&global=true&batchId=#{@batchId}" - doc = ArangoDB.log_get("#{prefix}-inventory-create", cmd, :body => "") - doc.code.should eq(200) - all = doc.parsed_response + cmd = api + "/inventory?includeSystem=true&global=true&batchId=#{@batchId}" + doc = ArangoDB.log_get("#{prefix}-inventory-create", cmd, :body => "") + doc.code.should eq(200) + all = doc.parsed_response - all.should have_key('state') - state = all['state'] - state['running'].should eq(true) - state['lastLogTick'].should match(/^\d+$/) - state['time'].should match(/^\d+-\d+-\d+T\d+:\d+:\d+Z$/) + all.should have_key('state') + state = all['state'] + state['running'].should eq(true) + state['lastLogTick'].should match(/^\d+$/) + state['time'].should match(/^\d+-\d+-\d+T\d+:\d+:\d+Z$/) - all.should have_key('databases') - databases = all['databases'] + all.should have_key('databases') + databases = all['databases'] - filtered = [ ] - databases.each { |name, database| - database.should have_key('collections') + filtered = [ ] + databases.each { |name, database| + database.should have_key('collections') - collections = 
database["collections"] - filtered = [ ] - collections.each { |collection| - if [ "UnitTestsReplication", "UnitTestsReplication2" ].include? collection["parameters"]["name"] - filtered.push collection - end + collections = database["collections"] + filtered = [ ] + collections.each { |collection| + if [ "UnitTestsReplication", "UnitTestsReplication2" ].include? collection["parameters"]["name"] + filtered.push collection + end collection["parameters"].should have_key('globallyUniqueId') } } diff --git a/arangod/MMFiles/MMFilesIncrementalSync.h b/arangod/MMFiles/MMFilesIncrementalSync.h index f59dc41156..43fb0b5dda 100644 --- a/arangod/MMFiles/MMFilesIncrementalSync.h +++ b/arangod/MMFiles/MMFilesIncrementalSync.h @@ -287,6 +287,9 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer, SingleCollectionTransaction trx( transaction::StandaloneContext::Create(syncer.vocbase()), coll->cid(), AccessMode::Type::WRITE); + + trx.addHint( + transaction::Hints::Hint::RECOVERY); // to turn off waitForSync! Result res = trx.begin(); @@ -360,10 +363,13 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer, transaction::StandaloneContext::Create(syncer.vocbase()), coll->cid(), AccessMode::Type::WRITE); + trx.addHint( + transaction::Hints::Hint::RECOVERY); // to turn off waitForSync! 
+ Result res = trx.begin(); if (!res.ok()) { - return Result(res.errorNumber(), std::string("unable to start transaction : ") + res.errorMessage()); + return Result(res.errorNumber(), std::string("unable to start transaction: ") + res.errorMessage()); } trx.pinData(coll->cid()); // will throw when it fails diff --git a/arangod/MMFiles/MMFilesRestReplicationHandler.cpp b/arangod/MMFiles/MMFilesRestReplicationHandler.cpp index 4b89ded903..43ec7ef37a 100644 --- a/arangod/MMFiles/MMFilesRestReplicationHandler.cpp +++ b/arangod/MMFiles/MMFilesRestReplicationHandler.cpp @@ -29,6 +29,7 @@ #include "MMFiles/MMFilesEngine.h" #include "MMFiles/MMFilesLogfileManager.h" #include "MMFiles/mmfiles-replication-dump.h" +#include "Replication/InitialSyncer.h" #include "RestServer/DatabaseFeature.h" #include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/StorageEngine.h" @@ -66,7 +67,7 @@ void MMFilesRestReplicationHandler::insertClient( TRI_server_id_t serverId = static_cast(StringUtils::uint64(value)); if (serverId > 0) { - _vocbase->updateReplicationClient(serverId, lastServedTick); + _vocbase->updateReplicationClient(serverId, lastServedTick, InitialSyncer::defaultBatchTimeout); } } } diff --git a/arangod/Replication/DatabaseInitialSyncer.cpp b/arangod/Replication/DatabaseInitialSyncer.cpp index 230d3da769..4368fffdf1 100644 --- a/arangod/Replication/DatabaseInitialSyncer.cpp +++ b/arangod/Replication/DatabaseInitialSyncer.cpp @@ -90,6 +90,8 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental, setAborted(false); + double const startTime = TRI_microtime(); + try { setProgress("fetching master state"); @@ -151,6 +153,8 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental, // all done here sendFinishBatch(); + + LOG_TOPIC(DEBUG, Logger::REPLICATION) << "initial synchronization with master took: " << Logger::FIXED(TRI_microtime() - startTime, 6) << " s. 
status: " << r.errorMessage(); return r; } catch (arangodb::basics::Exception const& ex) { sendFinishBatch(); @@ -197,7 +201,7 @@ Result DatabaseInitialSyncer::sendFlush() { "\"waitForCollectorQueue\":true}"; // send request - std::string const progress = "send WAL flush command to url " + url; + std::string const progress = "sending WAL flush command to url " + url; setProgress(progress); std::unique_ptr response(_client->retryRequest( @@ -227,6 +231,8 @@ Result DatabaseInitialSyncer::applyCollectionDump(transaction::Methods& trx, TRI_ASSERT(*end == '\0'); auto builder = std::make_shared(); + std::string const typeString("type"); + std::string const dataString("data"); while (p < end) { char const* q = strchr(p, '\n'); @@ -260,51 +266,41 @@ Result DatabaseInitialSyncer::applyCollectionDump(transaction::Methods& trx, } TRI_replication_operation_e type = REPLICATION_INVALID; - std::string key; VPackSlice doc; - for (auto const& it : VPackObjectIterator(slice)) { - std::string const attributeName(it.key.copyString()); - - if (attributeName == "type") { + for (auto const& it : VPackObjectIterator(slice, true)) { + if (it.key.isEqualString(typeString)) { if (it.value.isNumber()) { type = static_cast( it.value.getNumber()); } - } - - else if (attributeName == "data") { + } else if (it.key.isEqualString(dataString)) { if (it.value.isObject()) { doc = it.value; } } } + char const* key = nullptr; + VPackValueLength keyLength = 0; + if (!doc.isNone()) { VPackSlice value = doc.get(StaticStrings::KeyString); if (value.isString()) { - key = value.copyString(); + key = value.getString(keyLength); } - - /* TODO: rev is currently not used - value = doc.get(StaticStrings::RevString); - - if (value.isString()) { - rev = value.copyString(); - } - */ } // key must not be empty, but doc can be empty - if (key.empty()) { + if (key == nullptr || keyLength == 0) { return Result(TRI_ERROR_REPLICATION_INVALID_RESPONSE, invalidMsg); } ++markersProcessed; transaction::BuilderLeaser 
oldBuilder(&trx); - oldBuilder->openObject(); - oldBuilder->add(StaticStrings::KeyString, VPackValue(key)); + oldBuilder->openObject(true); + oldBuilder->add(StaticStrings::KeyString, VPackValuePair(key, keyLength, VPackValueType::String)); oldBuilder->close(); VPackSlice const old = oldBuilder->slice(); @@ -488,6 +484,9 @@ Result DatabaseInitialSyncer::handleCollectionDump(arangodb::LogicalCollection* transaction::StandaloneContext::Create(vocbase()), coll->cid(), AccessMode::Type::EXCLUSIVE); + trx.addHint( + transaction::Hints::Hint::RECOVERY); // to turn off waitForSync! + Result res = trx.begin(); if (!res.ok()) { diff --git a/arangod/Replication/InitialSyncer.cpp b/arangod/Replication/InitialSyncer.cpp index 577f59c407..41a5ca6044 100644 --- a/arangod/Replication/InitialSyncer.cpp +++ b/arangod/Replication/InitialSyncer.cpp @@ -68,7 +68,7 @@ InitialSyncer::InitialSyncer( _processedCollections(), _batchId(0), _batchUpdateTime(0), - _batchTtl(300) {} + _batchTtl(defaultBatchTimeout) {} InitialSyncer::~InitialSyncer() { try { @@ -82,13 +82,14 @@ Result InitialSyncer::sendStartBatch() { return Result(); } + double const now = TRI_microtime(); _batchId = 0; std::string const url = ReplicationUrl + "/batch" + "?serverId=" + _localServerIdString; std::string const body = "{\"ttl\":" + StringUtils::itoa(_batchTtl) + "}"; // send request - std::string const progress = "send batch start command to url " + url; + std::string const progress = "sending batch start command to url " + url; setProgress(progress); std::unique_ptr response(_client->retryRequest( @@ -116,7 +117,7 @@ Result InitialSyncer::sendStartBatch() { } _batchId = StringUtils::uint64(id); - _batchUpdateTime = TRI_microtime(); + _batchUpdateTime = now; return Result(); } @@ -127,10 +128,10 @@ Result InitialSyncer::sendExtendBatch() { return Result(); } - double now = TRI_microtime(); + double const now = TRI_microtime(); - if (now <= _batchUpdateTime + _batchTtl - 60.0) { - // no need to extend the batch 
yet + if (now <= _batchUpdateTime + _batchTtl * 0.25) { + // no need to extend the batch yet - only extend it if a quarter of its ttl is already over return Result(); } @@ -139,7 +140,7 @@ Result InitialSyncer::sendExtendBatch() { std::string const body = "{\"ttl\":" + StringUtils::itoa(_batchTtl) + "}"; // send request - std::string const progress = "send batch extend command to url " + url; + std::string const progress = "sending batch extend command to url " + url; setProgress(progress); std::unique_ptr response( @@ -149,7 +150,7 @@ Result InitialSyncer::sendExtendBatch() { return buildHttpError(response.get(), url); } - _batchUpdateTime = TRI_microtime(); + _batchUpdateTime = now; return Result(); } @@ -165,7 +166,7 @@ Result InitialSyncer::sendFinishBatch() { "?serverId=" + _localServerIdString; // send request - std::string const progress = "send batch finish command to url " + url; + std::string const progress = "sending batch finish command to url " + url; setProgress(progress); std::unique_ptr response( diff --git a/arangod/Replication/InitialSyncer.h b/arangod/Replication/InitialSyncer.h index 1567509932..a58482a4b4 100644 --- a/arangod/Replication/InitialSyncer.h +++ b/arangod/Replication/InitialSyncer.h @@ -35,6 +35,8 @@ struct TRI_vocbase_t; namespace arangodb { class InitialSyncer : public Syncer { + public: + static constexpr double defaultBatchTimeout = 300.0; public: explicit InitialSyncer(ReplicationApplierConfiguration const&); diff --git a/arangod/Replication/ReplicationApplier.cpp b/arangod/Replication/ReplicationApplier.cpp index df2ef7ab66..e550ea3a28 100644 --- a/arangod/Replication/ReplicationApplier.cpp +++ b/arangod/Replication/ReplicationApplier.cpp @@ -211,7 +211,7 @@ void ReplicationApplier::start(TRI_voc_tick_t initialTick, bool useTick, TRI_voc LOG_TOPIC(DEBUG, Logger::REPLICATION) << "requesting replication applier start for " << _databaseName << ". 
initialTick: " << initialTick - << ", useTick: " << useTick; + << ", useTick: " << useTick << ", barrierId: " << barrierId; if (_configuration._endpoint.empty()) { Result r(TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION, "no endpoint configured"); @@ -225,6 +225,16 @@ void ReplicationApplier::start(TRI_voc_tick_t initialTick, bool useTick, TRI_voc THROW_ARANGO_EXCEPTION(r); } + { + VPackBuilder b; + b.openObject(); + _configuration.toVelocyPack(b, false, false); + b.close(); + + LOG_TOPIC(DEBUG, Logger::REPLICATION) + << "starting applier with configuration " << b.slice().toJson(); + } + // reset error _state._lastError.reset(); diff --git a/arangod/Replication/Syncer.cpp b/arangod/Replication/Syncer.cpp index 7f4477f42d..7aaf31610f 100644 --- a/arangod/Replication/Syncer.cpp +++ b/arangod/Replication/Syncer.cpp @@ -223,7 +223,7 @@ Result Syncer::sendExtendBarrier(TRI_voc_tick_t tick) { double now = TRI_microtime(); - if (now <= _barrierUpdateTime + _barrierTtl - 120.0) { + if (now <= _barrierUpdateTime + _barrierTtl * 0.25) { // no need to extend the barrier yet return Result(); } diff --git a/arangod/Replication/TailingSyncer.cpp b/arangod/Replication/TailingSyncer.cpp index c1d0cd45de..f0c5195a79 100644 --- a/arangod/Replication/TailingSyncer.cpp +++ b/arangod/Replication/TailingSyncer.cpp @@ -1341,8 +1341,8 @@ Result TailingSyncer::followMasterLog(TRI_voc_tick_t& fetchTick, uint64_t& ignoreCount, bool& worked, bool& masterActive) { std::string const baseUrl = tailingBaseUrl("tail") + "chunkSize=" + - StringUtils::itoa(_configuration._chunkSize) + "&barrier=" + - StringUtils::itoa(_barrierId); + StringUtils::itoa(_configuration._chunkSize) + "&barrier=" + + StringUtils::itoa(_barrierId); TRI_voc_tick_t const originalFetchTick = fetchTick; worked = false; @@ -1355,10 +1355,11 @@ Result TailingSyncer::followMasterLog(TRI_voc_tick_t& fetchTick, // send request std::string const progress = - "fetching master log from tick " + StringUtils::itoa(fetchTick) + - ", 
first regular tick " + StringUtils::itoa(firstRegularTick) + - ", barrier: " + StringUtils::itoa(_barrierId) + ", open transactions: " + - std::to_string(_ongoingTransactions.size()); + "fetching master log from tick " + StringUtils::itoa(fetchTick) + + ", first regular tick " + StringUtils::itoa(firstRegularTick) + + ", barrier: " + StringUtils::itoa(_barrierId) + ", open transactions: " + + std::to_string(_ongoingTransactions.size()) + ", chunk size " + std::to_string(_configuration._chunkSize); + setProgress(progress); std::string body; diff --git a/arangod/RestHandler/RestWalAccessHandler.cpp b/arangod/RestHandler/RestWalAccessHandler.cpp index 57359116b0..d5192b435e 100644 --- a/arangod/RestHandler/RestWalAccessHandler.cpp +++ b/arangod/RestHandler/RestWalAccessHandler.cpp @@ -26,6 +26,7 @@ #include "Basics/VPackStringBufferAdapter.h" #include "Basics/VelocyPackHelper.h" #include "Replication/common-defines.h" +#include "Replication/InitialSyncer.h" #include "Rest/HttpResponse.h" #include "Rest/Version.h" #include "RestServer/DatabaseFeature.h" @@ -346,12 +347,13 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) { if (found) { serverId = static_cast(StringUtils::uint64(value)); } + DatabaseFeature::DATABASE->enumerateDatabases([&](TRI_vocbase_t* vocbase) { - vocbase->updateReplicationClient(serverId, result.lastIncludedTick()); + vocbase->updateReplicationClient(serverId, result.lastIncludedTick(), InitialSyncer::defaultBatchTimeout); }); - LOG_TOPIC(DEBUG, Logger::REPLICATION) << "Wal tailing after " << tickStart - << ", lastIncludedTick " << result.lastIncludedTick() - << ", fromTickIncluded " << result.fromTickIncluded(); + LOG_TOPIC(DEBUG, Logger::REPLICATION) << "WAL tailing after " << tickStart + << ", lastIncludedTick " << result.lastIncludedTick() + << ", fromTickIncluded " << result.fromTickIncluded(); } else { LOG_TOPIC(DEBUG, Logger::REPLICATION) << "No more data in WAL after " << tickStart; 
_response->setResponseCode(rest::ResponseCode::NO_CONTENT); diff --git a/arangod/RocksDBEngine/RocksDBBackgroundThread.cpp b/arangod/RocksDBEngine/RocksDBBackgroundThread.cpp index 2522f3038c..7ccc3926a1 100644 --- a/arangod/RocksDBEngine/RocksDBBackgroundThread.cpp +++ b/arangod/RocksDBEngine/RocksDBBackgroundThread.cpp @@ -73,8 +73,7 @@ void RocksDBBackgroundThread::run() { DatabaseFeature::DATABASE->enumerateDatabases( [force, &minTick](TRI_vocbase_t* vocbase) { vocbase->cursorRepository()->garbageCollect(force); - // FIXME: configurable interval tied to follower timeout - vocbase->garbageCollectReplicationClients(120.0); + vocbase->garbageCollectReplicationClients(TRI_microtime()); auto clients = vocbase->getReplicationClients(); for (auto c : clients) { if (std::get<2>(c) < minTick) { diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index 02bc7a3b46..63c8c8a01d 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -740,6 +740,7 @@ void RocksDBCollection::truncate(transaction::Methods* trx, // This should never happen... 
THROW_ARANGO_EXCEPTION_MESSAGE(res.errorNumber(), res.errorMessage()); } + trackWaitForSync(trx, options); if (found % 10000 == 0) { state->triggerIntermediateCommit(); @@ -876,7 +877,6 @@ Result RocksDBCollection::insert(arangodb::transaction::Methods* trx, TRI_VOC_DOCUMENT_OPERATION_INSERT); res = insertDocument(trx, documentId, newSlice, options); - if (res.ok()) { trackWaitForSync(trx, options); mdr.setManaged(newSlice.begin(), documentId); @@ -944,10 +944,7 @@ Result RocksDBCollection::update(arangodb::transaction::Methods* trx, TRI_ASSERT(!mdr.empty()); - if (_logicalCollection->waitForSync()) { - trx->state()->waitForSync(true); - options.waitForSync = true; - } + trackWaitForSync(trx, options); return Result(); } @@ -1077,7 +1074,6 @@ Result RocksDBCollection::replace( RocksDBOperationResult opResult = updateDocument( trx, oldDocumentId, oldDoc, documentId, newDoc, options); - if (opResult.ok()) { trackWaitForSync(trx, options); @@ -1159,7 +1155,6 @@ Result RocksDBCollection::remove(arangodb::transaction::Methods* trx, state->prepareOperation(_logicalCollection->cid(), documentId.id(), StringRef(key),TRI_VOC_DOCUMENT_OPERATION_REMOVE); res = removeDocument(trx, oldDocumentId, oldDoc, options); - if (res.ok()) { trackWaitForSync(trx, options); @@ -1527,7 +1522,7 @@ RocksDBOperationResult RocksDBCollection::updateDocument( blackListKey(oldKey->string().data(), static_cast(oldKey->string().size())); - res = mthd->Delete(RocksDBColumnFamily::documents(), oldKey.ref()); + res = mthd->Delete(RocksDBColumnFamily::documents(), oldKey.ref()); if (!res.ok()) { return res; } @@ -2016,7 +2011,7 @@ void RocksDBCollection::blackListKey(char const* data, std::size_t len) const { void RocksDBCollection::trackWaitForSync(arangodb::transaction::Methods* trx, OperationOptions& options) { - if (_logicalCollection->waitForSync()) { + if (_logicalCollection->waitForSync() && !options.isRestore) { options.waitForSync = true; } diff --git a/arangod/RocksDBEngine/RocksDBCommon.cpp 
b/arangod/RocksDBEngine/RocksDBCommon.cpp index f8a4ba546a..79cbd9b762 100644 --- a/arangod/RocksDBEngine/RocksDBCommon.cpp +++ b/arangod/RocksDBEngine/RocksDBCommon.cpp @@ -212,7 +212,7 @@ Result removeLargeRange(rocksdb::TransactionDB* db, } } - LOG_TOPIC(DEBUG, Logger::ROCKSDB) << "removing large range deleted in total: " << total; + LOG_TOPIC(DEBUG, Logger::ROCKSDB) << "removing large range, deleted in total: " << total; if (counter > 0) { LOG_TOPIC(DEBUG, Logger::FIXME) << "intermediate delete write"; diff --git a/arangod/RocksDBEngine/RocksDBEngine.cpp b/arangod/RocksDBEngine/RocksDBEngine.cpp index 2dccfdb437..2000f2f637 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.cpp +++ b/arangod/RocksDBEngine/RocksDBEngine.cpp @@ -1411,7 +1411,7 @@ void RocksDBEngine::determinePrunableWalFiles(TRI_voc_tick_t minTickExternal) { auto const& f = files[current].get(); if (f->Type() == rocksdb::WalFileType::kArchivedLogFile) { if (_prunableWalFiles.find(f->PathName()) == _prunableWalFiles.end()) { - LOG_TOPIC(DEBUG, Logger::ROCKSDB) << "RocksDB WAL file '" << f->PathName() << "' added to prunable list"; + LOG_TOPIC(DEBUG, Logger::ROCKSDB) << "RocksDB WAL file '" << f->PathName() << "' with start sequence " << f->StartSequence() << " added to prunable list"; _prunableWalFiles.emplace(f->PathName(), TRI_microtime() + _pruneWaitTime); } diff --git a/arangod/RocksDBEngine/RocksDBIncrementalSync.cpp b/arangod/RocksDBEngine/RocksDBIncrementalSync.cpp index 2297d91c8b..1529aa0727 100644 --- a/arangod/RocksDBEngine/RocksDBIncrementalSync.cpp +++ b/arangod/RocksDBEngine/RocksDBIncrementalSync.cpp @@ -416,6 +416,9 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer, transaction::StandaloneContext::Create(syncer.vocbase()), col->cid(), AccessMode::Type::EXCLUSIVE); + trx.addHint( + transaction::Hints::Hint::RECOVERY); // to turn off waitForSync! 
+ Result res = trx.begin(); if (!res.ok()) { @@ -473,6 +476,9 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer, transaction::StandaloneContext::Create(syncer.vocbase()), col->cid(), AccessMode::Type::EXCLUSIVE); + trx.addHint( + transaction::Hints::Hint::RECOVERY); // to turn off waitForSync! + Result res = trx.begin(); if (!res.ok()) { diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp index 74a83d54fb..fcc842fc11 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp @@ -28,6 +28,7 @@ #include "Basics/VPackStringBufferAdapter.h" #include "Logger/Logger.h" #include "Replication/common-defines.h" +#include "Replication/InitialSyncer.h" #include "RestServer/DatabaseFeature.h" #include "RocksDBEngine/RocksDBCollection.h" #include "RocksDBEngine/RocksDBCommon.h" @@ -40,6 +41,7 @@ #include "Utils/DatabaseGuard.h" #include "Utils/ExecContext.h" #include "VocBase/ticks.h" +#include "VocBase/vocbase.h" #include #include @@ -48,10 +50,10 @@ using namespace arangodb; using namespace arangodb::rocksutils; using namespace arangodb::velocypack; -double const RocksDBReplicationContext::DefaultTTL = 300.0; // seconds - -RocksDBReplicationContext::RocksDBReplicationContext(double ttl) - : _id(TRI_NewTickServer()), +RocksDBReplicationContext::RocksDBReplicationContext(TRI_vocbase_t* vocbase, double ttl, TRI_server_id_t serverId) + : _vocbase(vocbase), + _serverId(serverId), + _id(TRI_NewTickServer()), _lastTick(0), _currentTick(0), _trx(), @@ -60,7 +62,8 @@ RocksDBReplicationContext::RocksDBReplicationContext(double ttl) _mdr(), _customTypeHandler(), _vpackOptions(Options::Defaults), - _expires(TRI_microtime() + ttl), + _ttl(ttl), + _expires(TRI_microtime() + _ttl), _isDeleted(false), _isUsed(true), _hasMore(true) {} @@ -521,15 +524,31 @@ void RocksDBReplicationContext::use(double ttl) { TRI_ASSERT(!_isUsed); _isUsed = true; - if (ttl 
<= 0.0) { - ttl = DefaultTTL; + if (_ttl > 0.0) { + ttl = _ttl; + } else { + ttl = InitialSyncer::defaultBatchTimeout; } _expires = TRI_microtime() + ttl; + if (_serverId != 0) { + _vocbase->updateReplicationClient(_serverId, ttl); + } } void RocksDBReplicationContext::release() { TRI_ASSERT(_isUsed); _isUsed = false; + if (_serverId != 0) { + double ttl; + if (_ttl > 0.0) { + // use TTL as configured + ttl = _ttl; + } else { + // none configuration. use default + ttl = InitialSyncer::defaultBatchTimeout; + } + _vocbase->updateReplicationClient(_serverId, ttl); + } } void RocksDBReplicationContext::releaseDumpingResources() { diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.h b/arangod/RocksDBEngine/RocksDBReplicationContext.h index c54a1344dd..bcca86786c 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.h +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.h @@ -40,10 +40,6 @@ namespace arangodb { class DatabaseGuard; class RocksDBReplicationContext { - public: - /// default time-to-live for contexts - static double const DefaultTTL; - private: typedef std::function LocalDocumentIdCallback; @@ -52,7 +48,7 @@ class RocksDBReplicationContext { RocksDBReplicationContext(RocksDBReplicationContext const&) = delete; RocksDBReplicationContext& operator=(RocksDBReplicationContext const&) = delete; - explicit RocksDBReplicationContext(double ttl); + RocksDBReplicationContext(TRI_vocbase_t* vocbase, double ttl, TRI_server_id_t server_id); ~RocksDBReplicationContext(); TRI_voc_tick_t id() const; //batchId @@ -107,6 +103,8 @@ class RocksDBReplicationContext { void releaseDumpingResources(); private: + TRI_vocbase_t* _vocbase; + TRI_server_id_t const _serverId; TRI_voc_tick_t _id; // batch id uint64_t _lastTick; // the time at which the snapshot was taken uint64_t _currentTick; // shows how often dump was called @@ -121,7 +119,6 @@ class RocksDBReplicationContext { /// @brief offset in the collection used with the incremental sync uint64_t 
_lastIteratorOffset; - /// @brief holds last document ManagedDocumentResult _mdr; @@ -129,6 +126,7 @@ class RocksDBReplicationContext { std::shared_ptr _customTypeHandler; arangodb::velocypack::Options _vpackOptions; + double const _ttl; double _expires; bool _isDeleted; bool _isUsed; diff --git a/arangod/RocksDBEngine/RocksDBReplicationManager.cpp b/arangod/RocksDBEngine/RocksDBReplicationManager.cpp index 1e6cde39b4..6d941e096d 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationManager.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationManager.cpp @@ -95,8 +95,10 @@ RocksDBReplicationManager::~RocksDBReplicationManager() { /// there are active contexts ////////////////////////////////////////////////////////////////////////////// -RocksDBReplicationContext* RocksDBReplicationManager::createContext(double ttl) { - auto context = std::make_unique(ttl); +RocksDBReplicationContext* RocksDBReplicationManager::createContext(TRI_vocbase_t* vocbase, + double ttl, + TRI_server_id_t serverId) { + auto context = std::make_unique(vocbase, ttl, serverId); TRI_ASSERT(context.get() != nullptr); TRI_ASSERT(context->isUsed()); diff --git a/arangod/RocksDBEngine/RocksDBReplicationManager.h b/arangod/RocksDBEngine/RocksDBReplicationManager.h index 193be9c565..e8815f24ea 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationManager.h +++ b/arangod/RocksDBEngine/RocksDBReplicationManager.h @@ -25,6 +25,7 @@ #include "Basics/Common.h" #include "Basics/Mutex.h" +#include "Replication/InitialSyncer.h" #include "RocksDBEngine/RocksDBReplicationContext.h" struct TRI_vocbase_t; @@ -54,7 +55,7 @@ class RocksDBReplicationManager { /// there are active contexts ////////////////////////////////////////////////////////////////////////////// - RocksDBReplicationContext* createContext(double ttl); + RocksDBReplicationContext* createContext(TRI_vocbase_t* vocbase, double ttl, TRI_server_id_t serverId); ////////////////////////////////////////////////////////////////////////////// /// @brief remove 
a context by id @@ -72,7 +73,7 @@ class RocksDBReplicationManager { RocksDBReplicationContext* find( RocksDBReplicationId, bool& isBusy, - double ttl = RocksDBReplicationContext::DefaultTTL); + double ttl = InitialSyncer::defaultBatchTimeout); ////////////////////////////////////////////////////////////////////////////// /// @brief return a context for later use diff --git a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp index a0817a7d43..e1d098dac7 100644 --- a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp +++ b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp @@ -27,6 +27,7 @@ #include "Basics/VPackStringBufferAdapter.h" #include "Basics/VelocyPackHelper.h" #include "Logger/Logger.h" +#include "Replication/InitialSyncer.h" #include "RestServer/DatabaseFeature.h" #include "RocksDBEngine/RocksDBCommon.h" #include "RocksDBEngine/RocksDBEngine.h" @@ -72,10 +73,20 @@ void RocksDBRestReplicationHandler::handleCommandBatch() { return; } - double ttl = VelocyPackHelper::getNumericValue(input->slice(), "ttl", - RocksDBReplicationContext::DefaultTTL); + double ttl = VelocyPackHelper::getNumericValue(input->slice(), "ttl", InitialSyncer::defaultBatchTimeout); + + bool found; + std::string const& value = _request->value("serverId", found); + TRI_server_id_t serverId = 0; + + if (!found || (!value.empty() && value != "none")) { + if (found) { + serverId = static_cast(StringUtils::uint64(value)); + } + } + // create transaction+snapshot - RocksDBReplicationContext* ctx = _manager->createContext(ttl); + RocksDBReplicationContext* ctx = _manager->createContext(_vocbase, ttl, serverId); RocksDBReplicationContextGuard guard(_manager, ctx); ctx->bind(_vocbase); @@ -85,24 +96,12 @@ void RocksDBRestReplicationHandler::handleCommandBatch() { b.add("lastTick", VPackValue(std::to_string(ctx->lastTick()))); b.close(); - // add client - bool found; - std::string const& value = _request->value("serverId", 
found); - if (!found) { - LOG_TOPIC(DEBUG, Logger::FIXME) << "no serverId parameter found in request to " << _request->fullUrl(); + if (serverId == 0) { + serverId = ctx->id(); } - - if (!found || (!value.empty() && value != "none")) { - TRI_server_id_t serverId = 0; - if (found) { - serverId = static_cast(StringUtils::uint64(value)); - } else { - serverId = ctx->id(); - } + _vocbase->updateReplicationClient(serverId, ctx->lastTick(), ttl); - _vocbase->updateReplicationClient(serverId, ctx->lastTick()); - } generateResult(rest::ResponseCode::OK, b.slice()); return; } @@ -121,11 +120,11 @@ void RocksDBRestReplicationHandler::handleCommandBatch() { } // extract ttl - double expires = VelocyPackHelper::getNumericValue(input->slice(), "ttl", RocksDBReplicationContext::DefaultTTL); + double ttl = VelocyPackHelper::getNumericValue(input->slice(), "ttl", 0); int res = TRI_ERROR_NO_ERROR; bool busy; - RocksDBReplicationContext* ctx = _manager->find(id, busy, expires); + RocksDBReplicationContext* ctx = _manager->find(id, busy, ttl); RocksDBReplicationContextGuard guard(_manager, ctx); if (busy) { res = TRI_ERROR_CURSOR_BUSY; @@ -153,7 +152,7 @@ void RocksDBRestReplicationHandler::handleCommandBatch() { serverId = ctx->id(); } - _vocbase->updateReplicationClient(serverId, ctx->lastTick()); + _vocbase->updateReplicationClient(serverId, ctx->lastTick(), ttl); } resetResponse(rest::ResponseCode::NO_CONTENT); return; @@ -328,7 +327,7 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() { if (!found || (!value.empty() && value != "none")) { TRI_server_id_t serverId = static_cast(StringUtils::uint64(value)); - _vocbase->updateReplicationClient(serverId, result.maxTick()); + _vocbase->updateReplicationClient(serverId, result.maxTick(), InitialSyncer::defaultBatchTimeout); } } } diff --git a/arangod/RocksDBEngine/RocksDBWalAccess.cpp b/arangod/RocksDBEngine/RocksDBWalAccess.cpp index 3e50ff12fb..cb18db11b0 100644 --- a/arangod/RocksDBEngine/RocksDBWalAccess.cpp +++ 
b/arangod/RocksDBEngine/RocksDBWalAccess.cpp @@ -151,6 +151,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, VPackValueType::String)); } _callback(vocbase, _builder.slice()); + _responseSize += _builder.size(); _builder.clear(); } break; @@ -174,6 +175,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, marker->add("data", RocksDBLogValue::indexSlice(blob)); } _callback(vocbase, _builder.slice()); + _responseSize += _builder.size(); _builder.clear(); } } @@ -200,6 +202,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, data->add("id", VPackValue(std::to_string(iid))); } _callback(vocbase, _builder.slice()); + _responseSize += _builder.size(); _builder.clear(); } } @@ -229,6 +232,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, marker->add("tid", VPackValue(std::to_string(_currentTrxId))); } _callback(vocbase, _builder.slice()); + _responseSize += _builder.size(); _builder.clear(); } break; @@ -319,6 +323,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, marker->add("data", data); } _callback(loadVocbase(_currentDbId), _builder.slice()); + _responseSize += _builder.size(); _builder.clear(); } } else if (_lastLogType == RocksDBLogType::DatabaseDrop) { @@ -332,6 +337,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, marker->add("db", name); } _callback(loadVocbase(_currentDbId), _builder.slice()); + _responseSize += _builder.size(); _builder.clear(); } else { TRI_ASSERT(false); // unexpected @@ -370,6 +376,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, } } _callback(loadVocbase(_currentDbId), _builder.slice()); + _responseSize += _builder.size(); _builder.clear(); } @@ -400,6 +407,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, marker->add("data", RocksDBValue::data(value)); } _callback(loadVocbase(_currentDbId), _builder.slice()); + _responseSize += _builder.size(); _builder.clear(); } // reset whether or not marker was printed @@ -499,6 +507,7 @@ class MyWALParser : 
public rocksdb::WriteBatch::Handler, data->add(StaticStrings::RevString, VPackValue(std::to_string(rid))); } _callback(loadVocbase(_currentDbId), _builder.slice()); + _responseSize += _builder.size(); _builder.clear(); } // reset whether or not marker was printed @@ -523,13 +532,14 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, TRI_ASSERT(_seenBeginTransaction && !_singleOp); TRI_vocbase_t* vocbase = loadVocbase(_currentDbId); if (vocbase != nullptr) { // we be in shutdown - _builder.openObject(); + _builder.openObject(true); _builder.add("tick", VPackValue(std::to_string(_currentSequence))); _builder.add("type", VPackValue(static_cast(REPLICATION_TRANSACTION_COMMIT))); _builder.add("db", VPackValue(vocbase->name())); _builder.add("tid", VPackValue(std::to_string(_currentTrxId))); _builder.close(); _callback(vocbase, _builder.slice()); + _responseSize += _builder.size(); _builder.clear(); _seenBeginTransaction = false; } @@ -676,18 +686,24 @@ WalAccessResult RocksDBWalAccess::tail(uint64_t tickStart, uint64_t tickEnd, 0, latestTick); } + if (chunkSize < 16384) { + // we need to have some sensible minimum + chunkSize = 16384; + } + // we need to check if the builder is bigger than the chunksize, // only after we printed a full WriteBatch. Otherwise a client might // never read the full writebatch + LOG_TOPIC(DEBUG, Logger::ROCKSDB) << "WAL tailing call. 
tick start: " << tickStart << ", tick end: " << tickEnd << ", chunk size: " << chunkSize; while (iterator->Valid() && lastTick <= tickEnd && handler->responseSize() < chunkSize) { s = iterator->status(); if (!s.ok()) { - LOG_TOPIC(ERR, Logger::ENGINES) << "error during WAL scan: " + LOG_TOPIC(ERR, Logger::ROCKSDB) << "error during WAL scan: " << s.ToString(); break; // s is considered in the end } - + rocksdb::BatchResult batch = iterator->GetBatch(); // record the first tick we are actually considering if (firstTick == UINT64_MAX) { @@ -707,7 +723,7 @@ WalAccessResult RocksDBWalAccess::tail(uint64_t tickStart, uint64_t tickEnd, s = batch.writeBatchPtr->Iterate(handler.get()); if (!s.ok()) { - LOG_TOPIC(ERR, Logger::ENGINES) << "error during WAL scan: " + LOG_TOPIC(ERR, Logger::ROCKSDB) << "error during WAL scan: " << s.ToString(); break; // s is considered in the end } diff --git a/arangod/VocBase/vocbase.cpp b/arangod/VocBase/vocbase.cpp index fe7af451d2..b1a0096313 100644 --- a/arangod/VocBase/vocbase.cpp +++ b/arangod/VocBase/vocbase.cpp @@ -50,6 +50,7 @@ #include "Cluster/ServerState.h" #include "Logger/Logger.h" #include "Replication/DatabaseReplicationApplier.h" +#include "Replication/InitialSyncer.h" #include "RestServer/DatabaseFeature.h" #include "RestServer/ViewTypesFeature.h" #include "StorageEngine/EngineSelectorFeature.h" @@ -1723,19 +1724,49 @@ void TRI_vocbase_t::addReplicationApplier() { _replicationApplier.reset(applier); } +/// @brief note the progress of a connected replication client +/// this only updates the ttl +void TRI_vocbase_t::updateReplicationClient(TRI_server_id_t serverId, double ttl) { + if (ttl <= 0.0) { + ttl = InitialSyncer::defaultBatchTimeout; + } + double const expires = TRI_microtime() + ttl; + + WRITE_LOCKER(writeLocker, _replicationClientsLock); + + try { + auto it = _replicationClients.find(serverId); + + if (it != _replicationClients.end()) { + (*it).second.first = expires; + } + } catch (...) { + // silently fail... 
+ // all we would be missing is the progress information of a slave + } +} + /// @brief note the progress of a connected replication client void TRI_vocbase_t::updateReplicationClient(TRI_server_id_t serverId, - TRI_voc_tick_t lastFetchedTick) { + TRI_voc_tick_t lastFetchedTick, + double ttl) { + if (ttl <= 0.0) { + ttl = InitialSyncer::defaultBatchTimeout; + } + double const expires = TRI_microtime() + ttl; + WRITE_LOCKER(writeLocker, _replicationClientsLock); try { auto it = _replicationClients.find(serverId); if (it == _replicationClients.end()) { + // insert new client entry _replicationClients.emplace( - serverId, std::make_pair(TRI_microtime(), lastFetchedTick)); + serverId, std::make_pair(expires, lastFetchedTick)); } else { - (*it).second.first = TRI_microtime(); + // update an existing client entry + (*it).second.first = expires; if (lastFetchedTick > 0) { (*it).second.second = lastFetchedTick; } @@ -1760,16 +1791,18 @@ TRI_vocbase_t::getReplicationClients() { return result; } -void TRI_vocbase_t::garbageCollectReplicationClients(double ttl) { +void TRI_vocbase_t::garbageCollectReplicationClients(double expireStamp) { + LOG_TOPIC(TRACE, Logger::REPLICATION) << "garbage collecting replication client entries"; + WRITE_LOCKER(writeLocker, _replicationClientsLock); try { - double now = TRI_microtime(); - auto it = _replicationClients.cbegin(); - while (it != _replicationClients.cend()) { - double lastUpdate = it->second.first; - double diff = now - lastUpdate; - if (diff > ttl) { + auto it = _replicationClients.begin(); + + while (it != _replicationClients.end()) { + double const expires = it->second.first; + if (expireStamp > expires) { + LOG_TOPIC(DEBUG, Logger::REPLICATION) << "removing expired replication client for server id " << it->first; it = _replicationClients.erase(it); } else { ++it; diff --git a/arangod/VocBase/vocbase.h b/arangod/VocBase/vocbase.h index 2da1367299..5787f1a33f 100644 --- a/arangod/VocBase/vocbase.h +++ b/arangod/VocBase/vocbase.h 
@@ -205,11 +205,19 @@ struct TRI_vocbase_t { TRI_vocbase_type_e type() const { return _type; } State state() const { return _state; } void setState(State state) { _state = state; } - void updateReplicationClient(TRI_server_id_t, TRI_voc_tick_t); + // return all replication clients registered std::vector> getReplicationClients(); - /// garbage collect replication clients - void garbageCollectReplicationClients(double ttl); + + // the ttl value is the number of seconds after which the client entry will + // expire and may be garbage-collected + void updateReplicationClient(TRI_server_id_t, double ttl); + // the ttl value is the number of seconds after which the client entry will + // expire and may be garbage-collected + void updateReplicationClient(TRI_server_id_t, TRI_voc_tick_t, double ttl); + // garbage collect replication clients that have an expire date earlier + // than the specified timestamp + void garbageCollectReplicationClients(double expireStamp); arangodb::DatabaseReplicationApplier* replicationApplier() const { return _replicationApplier.get(); diff --git a/js/client/modules/@arangodb/testsuites/replication.js b/js/client/modules/@arangodb/testsuites/replication.js index eab44adc9e..bf36c32052 100644 --- a/js/client/modules/@arangodb/testsuites/replication.js +++ b/js/client/modules/@arangodb/testsuites/replication.js @@ -26,6 +26,8 @@ // ////////////////////////////////////////////////////////////////////////////// const functionsDocumentation = { + 'replication_random': 'replication randomized tests', + 'replication_aql': 'replication AQL tests', 'replication_ongoing': 'replication ongoing tests', 'replication_static': 'replication static tests', 'replication_sync': 'replication sync tests', @@ -53,6 +55,124 @@ function shellReplication (options) { return tu.performTests(opts, testCases, 'shell_replication', tu.runThere); } +// ////////////////////////////////////////////////////////////////////////////// +// / @brief TEST: replication_random +//
////////////////////////////////////////////////////////////////////////////// + +function replicationRandom (options) { + let testCases = tu.scanTestPath('js/server/tests/replication/'); + + options.replication = true; + options.test = 'replication-random'; + let startStopHandlers = { + postStart: function (options, + serverOptions, + instanceInfo, + customInstanceInfos, + startStopHandlers) { + let message; + let slave = pu.startInstance('tcp', options, {}, 'slave_sync'); + let state = (typeof slave === 'object'); + + if (state) { + message = 'failed to start slave instance!'; + } + + return { + instanceInfo: slave, + message: message, + state: state, + env: { + 'flatCommands': slave.endpoint + } + }; + }, + + preStop: function (options, + serverOptions, + instanceInfo, + customInstanceInfos, + startStopHandlers) { + pu.shutdownInstance(customInstanceInfos.postStart.instanceInfo, options); + + return {}; + }, + + postStop: function (options, + serverOptions, + instanceInfo, + customInstanceInfos, + startStopHandlers) { + if (options.cleanup) { + pu.cleanupLastDirectory(options); + } + return { state: true }; + } + + }; + + return tu.performTests(options, testCases, 'replication_random', tu.runInArangosh, {}, startStopHandlers); +} + +// ////////////////////////////////////////////////////////////////////////////// +// / @brief TEST: replication_aql +// ////////////////////////////////////////////////////////////////////////////// + +function replicationAql (options) { + let testCases = tu.scanTestPath('js/server/tests/replication/'); + + options.replication = true; + options.test = 'replication-aql'; + let startStopHandlers = { + postStart: function (options, + serverOptions, + instanceInfo, + customInstanceInfos, + startStopHandlers) { + let message; + let slave = pu.startInstance('tcp', options, {}, 'slave_sync'); + let state = (typeof slave === 'object'); + + if (state) { + message = 'failed to start slave instance!'; + } + + return { + instanceInfo: slave, + 
message: message, + state: state, + env: { + 'flatCommands': slave.endpoint + } + }; + }, + + preStop: function (options, + serverOptions, + instanceInfo, + customInstanceInfos, + startStopHandlers) { + pu.shutdownInstance(customInstanceInfos.postStart.instanceInfo, options); + + return {}; + }, + + postStop: function (options, + serverOptions, + instanceInfo, + customInstanceInfos, + startStopHandlers) { + if (options.cleanup) { + pu.cleanupLastDirectory(options); + } + return { state: true }; + } + + }; + + return tu.performTests(options, testCases, 'replication_aql', tu.runInArangosh, {}, startStopHandlers); +} + // ////////////////////////////////////////////////////////////////////////////// // / @brief TEST: replication_ongoing // ////////////////////////////////////////////////////////////////////////////// @@ -280,6 +400,8 @@ function replicationSync (options) { function setup (testFns, defaultFns, opts, fnDocs, optionsDoc) { testFns['shell_replication'] = shellReplication; + testFns['replication_aql'] = replicationAql; + testFns['replication_random'] = replicationRandom; testFns['replication_ongoing'] = replicationOngoing; testFns['replication_static'] = replicationStatic; testFns['replication_sync'] = replicationSync; diff --git a/js/server/tests/recovery/collection-rename-with-data.js b/js/server/tests/recovery/collection-rename-with-data.js new file mode 100644 index 0000000000..21553f1d06 --- /dev/null +++ b/js/server/tests/recovery/collection-rename-with-data.js @@ -0,0 +1,96 @@ +/* jshint globalstrict:false, strict:false, unused : false */ +/* global assertEqual, assertNull */ + +// ////////////////////////////////////////////////////////////////////////////// +// / @brief recovery test for renaming a collection that contains data +// / +// / @file +// / +// / DISCLAIMER +// / +// / Copyright 2010-2012 triagens GmbH, Cologne, Germany +// / +// / Licensed under the Apache License, Version 2.0 (the "License") +// / you may not use this file except in compliance with the License.
+// / You may obtain a copy of the License at +// / +// / http://www.apache.org/licenses/LICENSE-2.0 +// / +// / Unless required by applicable law or agreed to in writing, software +// / distributed under the License is distributed on an "AS IS" BASIS, +// / WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// / See the License for the specific language governing permissions and +// / limitations under the License. +// / +// / Copyright holder is triAGENS GmbH, Cologne, Germany +// / +// / @author Jan Steemann +// / @author Copyright 2012, triAGENS GmbH, Cologne, Germany +// ////////////////////////////////////////////////////////////////////////////// + +var db = require('@arangodb').db; +var internal = require('internal'); +var jsunity = require('jsunity'); + +function runSetup () { + 'use strict'; + internal.debugClearFailAt(); + var i; + + db._drop('UnitTestsRecovery1'); + db._drop('UnitTestsRecovery2'); + db._create('UnitTestsRecovery1'); + + for (i = 0; i < 10000; ++i) { + db.UnitTestsRecovery1.save({ a: i }); + } + + db._create('UnitTestsRecovery2'); + db._query("FOR doc IN UnitTestsRecovery1 INSERT doc INTO UnitTestsRecovery2"); + + db._drop('UnitTestsRecovery1'); + db.UnitTestsRecovery2.rename('UnitTestsRecovery1'); + + internal.debugSegfault('crashing server'); +} + +// ////////////////////////////////////////////////////////////////////////////// +// / @brief test suite +// ////////////////////////////////////////////////////////////////////////////// + +function recoverySuite () { + 'use strict'; + jsunity.jsUnity.attachAssertions(); + + return { + setUp: function () {}, + tearDown: function () {}, + + // ////////////////////////////////////////////////////////////////////////////// + // / @brief test whether rename and recreate works + // ////////////////////////////////////////////////////////////////////////////// + + testCollectionRenameWithData: function () { + var c = db._collection('UnitTestsRecovery1'); + 
assertEqual(10000, c.count()); + + assertNull(db._collection('UnitTestsRecovery2')); + } + + }; +} + +// ////////////////////////////////////////////////////////////////////////////// +// / @brief executes the test suite +// ////////////////////////////////////////////////////////////////////////////// + +function main (argv) { + 'use strict'; + if (argv[1] === 'setup') { + runSetup(); + return 0; + } else { + jsunity.run(recoverySuite); + return jsunity.done().status ? 0 : 1; + } +} diff --git a/js/server/tests/replication/replication-aql.js b/js/server/tests/replication/replication-aql.js new file mode 100644 index 0000000000..846fe1f1e6 --- /dev/null +++ b/js/server/tests/replication/replication-aql.js @@ -0,0 +1,578 @@ +/*jshint globalstrict:false, strict:false, unused: false */ +/*global assertEqual, assertTrue, arango, ARGUMENTS */ + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test the replication +/// +/// @file +/// +/// DISCLAIMER +/// +/// Copyright 2010-2012 triagens GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. 
+/// +/// Copyright holder is triAGENS GmbH, Cologne, Germany +/// +/// @author Jan Steemann +/// @author Copyright 2017, triAGENS GmbH, Cologne, Germany +//////////////////////////////////////////////////////////////////////////////// + +var jsunity = require("jsunity"); +var arangodb = require("@arangodb"); +var db = arangodb.db; + +var replication = require("@arangodb/replication"); +var console = require("console"); +var internal = require("internal"); +var masterEndpoint = arango.getEndpoint(); +var slaveEndpoint = ARGUMENTS[0]; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test suite +//////////////////////////////////////////////////////////////////////////////// + +function ReplicationSuite() { + 'use strict'; + var cn = "UnitTestsReplication"; + + var connectToMaster = function() { + arango.reconnect(masterEndpoint, db._name(), "root", ""); + db._flushCache(); + }; + + var connectToSlave = function() { + arango.reconnect(slaveEndpoint, db._name(), "root", ""); + db._flushCache(); + }; + + var collectionChecksum = function(name) { + var c = db._collection(name).checksum(true, true); + return c.checksum; + }; + + var collectionCount = function(name) { + return db._collection(name).count(); + }; + + var compareTicks = function(l, r) { + var i; + if (l === null) { + l = "0"; + } + if (r === null) { + r = "0"; + } + if (l.length !== r.length) { + return l.length - r.length < 0 ? -1 : 1; + } + + // length is equal + for (i = 0; i < l.length; ++i) { + if (l[i] !== r[i]) { + return l[i] < r[i] ? 
-1 : 1; + } + } + + return 0; + }; + + var compare = function(masterFunc, masterFunc2, slaveFuncFinal) { + var state = {}; + + assertEqual(cn, db._name()); + db._flushCache(); + masterFunc(state); + + connectToSlave(); + assertEqual(cn, db._name()); + + var syncResult = replication.sync({ + endpoint: masterEndpoint, + username: "root", + password: "", + verbose: true, + includeSystem: false, + keepBarrier: true, + requireFromPresent: true, + }); + + assertTrue(syncResult.hasOwnProperty('lastLogTick')); + + connectToMaster(); + masterFunc2(state); + + // use lastLogTick as of now + state.lastLogTick = replication.logger.state().state.lastLogTick; + + let applierConfiguration = { + endpoint: masterEndpoint, + username: "root", + password: "" + }; + + connectToSlave(); + assertEqual(cn, db._name()); + + replication.applier.properties(applierConfiguration); + replication.applier.start(syncResult.lastLogTick, syncResult.barrierId); + + var printed = false; + + while (true) { + var slaveState = replication.applier.state(); + + if (slaveState.state.lastError.errorNum > 0) { + console.log("slave has errored:", JSON.stringify(slaveState.state.lastError)); + break; + } + + if (!slaveState.state.running) { + console.log("slave is not running"); + break; + } + + if (compareTicks(slaveState.state.lastAppliedContinuousTick, state.lastLogTick) >= 0 || + compareTicks(slaveState.state.lastProcessedContinuousTick, state.lastLogTick) >= 0) { // || + // compareTicks(slaveState.state.lastAvailableContinuousTick, syncResult.lastLogTick) > 0) { + console.log("slave has caught up. 
state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick); + break; + } + + if (!printed) { + console.log("waiting for slave to catch up"); + printed = true; + } + internal.wait(0.5, false); + } + + db._flushCache(); + slaveFuncFinal(state); + }; + + return { + + setUp: function() { + db._useDatabase("_system"); + connectToMaster(); + try { + db._dropDatabase(cn); + } catch (err) {} + + db._createDatabase(cn); + db._useDatabase(cn); + + db._useDatabase("_system"); + connectToSlave(); + + try { + db._dropDatabase(cn); + } catch (err) {} + + db._createDatabase(cn); + }, + + tearDown: function() { + db._useDatabase("_system"); + connectToMaster(); + + db._useDatabase(cn); + connectToSlave(); + replication.applier.stop(); + replication.applier.forget(); + + db._useDatabase("_system"); + db._dropDatabase(cn); + }, + + testAqlInsert: function() { + db._useDatabase(cn); + connectToMaster(); + + compare( + function(state) { + db._create(cn); + }, + + function(state) { + for (let i = 0; i < 2000; ++i) { + db._query("INSERT { _key: \"test" + i + "\", value1: " + i + ", value2: " + (i % 100) + " } IN " + cn); + } + + assertEqual(2000, collectionCount(cn)); + state.checksum = collectionChecksum(cn); + state.count = collectionCount(cn); + }, + + function(state) { + assertEqual(state.checksum, collectionChecksum(cn)); + assertEqual(state.count, collectionCount(cn)); + + for (let i = 0; i < 2000; ++i) { + let docs = db._query("FOR doc IN " + cn + " FILTER doc._key == \"test" + i + "\" RETURN doc").toArray(); + assertEqual(1, docs.length); + assertEqual("test" + i, docs[0]._key); + assertEqual(i, docs[0].value1); + assertEqual(i % 100, docs[0].value2); + } + } + ); + }, + + testAqlRemove: function() { + db._useDatabase(cn); + connectToMaster(); + + compare( + function(state) { + let c = db._create(cn); + for (let i = 0; i < 
100; ++i) {
+            c.insert({ _key: "test" + i, value1: i, value2: (i % 100) });
+          }
+        },
+
+        function(state) {
+          for (let i = 0; i < 100; ++i) {
+            db._query("FOR doc IN " + cn + " FILTER doc.value1 == " + i + " REMOVE doc IN " + cn);
+          }
+
+          assertEqual(0, collectionCount(cn));
+          state.checksum = collectionChecksum(cn);
+          state.count = collectionCount(cn);
+        },
+
+        function(state) {
+          assertEqual(0, collectionCount(cn));
+          assertEqual(state.checksum, collectionChecksum(cn));
+          assertEqual(state.count, collectionCount(cn));
+
+          for (let i = 0; i < 100; ++i) {
+            let docs = db._query("FOR doc IN " + cn + " FILTER doc.value1 == " + i + " RETURN doc").toArray();
+            assertEqual(0, docs.length);
+          }
+        }
+      );
+    },
+
+    testAqlRemoveMulti: function() { // AQL REMOVE where each query matches many documents (50 per value2)
+      db._useDatabase(cn);
+      connectToMaster();
+
+      compare( // NOTE(review): compare() is defined outside this excerpt; presumably setup / change on master / verify on slave - confirm
+        function(state) { // 1st callback: create the collection, 5000 documents and a hash index on value2
+          let c = db._create(cn);
+          for (let i = 0; i < 5000; ++i) {
+            c.insert({ _key: "test" + i, value1: i, value2: (i % 100) });
+          }
+          c.ensureIndex({ type: "hash", fields: ["value2"] });
+        },
+
+        function(state) { // 2nd callback: remove everything with 100 AQL queries, then record checksum and count
+          for (let i = 0; i < 100; ++i) {
+            db._query("FOR doc IN " + cn + " FILTER doc.value2 == " + i + " REMOVE doc IN " + cn);
+          }
+
+          assertEqual(0, collectionCount(cn));
+          state.checksum = collectionChecksum(cn);
+          state.count = collectionCount(cn);
+        },
+
+        function(state) { // 3rd callback: collection must be empty and match the recorded checksum and count
+          assertEqual(0, collectionCount(cn));
+          assertEqual(state.checksum, collectionChecksum(cn));
+          assertEqual(state.count, collectionCount(cn));
+        }
+      );
+    },
+
+    testAqlUpdate: function() { // AQL UPDATE where each query matches exactly one document (filter on value1)
+      db._useDatabase(cn);
+      connectToMaster();
+
+      compare(
+        function(state) { // 1st callback: create the collection with 100 documents
+          let c = db._create(cn);
+          for (let i = 0; i < 100; ++i) {
+            c.insert({ _key: "test" + i, value1: i, value2: (i % 100) });
+          }
+        },
+
+        function(state) { // 2nd callback: add value3 = value1 + 1 to every document, then record checksum and count
+          for (let i = 0; i < 100; ++i) {
+            db._query("FOR doc IN " + cn + " FILTER doc.value1 == " + i + " UPDATE doc WITH { value3: doc.value1 + 1 } IN " + cn);
+          }
+
+          state.checksum = collectionChecksum(cn);
+          state.count = collectionCount(cn);
+        },
+
+        function(state) { // 3rd callback: checksum and count must match, and every document must carry the new value3
+          assertEqual(state.checksum, collectionChecksum(cn));
+          assertEqual(state.count, collectionCount(cn));
+
+          for (let i = 0; i < 100; ++i) {
+            let docs = db._query("FOR doc IN " + cn + " FILTER doc.value1 == " + i + " RETURN doc").toArray();
+            assertEqual(1, docs.length);
+            assertEqual("test" + i, docs[0]._key);
+            assertEqual(i, docs[0].value1);
+            assertEqual(i % 100, docs[0].value2); // untouched attributes must survive the UPDATE
+            assertEqual(i + 1, docs[0].value3);
+          }
+        }
+      );
+    },
+
+    testAqlUpdateMulti: function() { // AQL UPDATE where each query matches many documents (50 per value2)
+      db._useDatabase(cn);
+      connectToMaster();
+
+      compare(
+        function(state) { // 1st callback: create the collection, 5000 documents and a hash index on value2
+          let c = db._create(cn);
+          for (let i = 0; i < 5000; ++i) {
+            c.insert({ _key: "test" + i, value1: i, value2: (i % 100) });
+          }
+          c.ensureIndex({ type: "hash", fields: ["value2"] });
+        },
+
+        function(state) { // 2nd callback: update all documents in 100 queries, then record checksum and count
+          for (let i = 0; i < 100; ++i) {
+            db._query("FOR doc IN " + cn + " FILTER doc.value2 == " + i + " UPDATE doc WITH { value3: doc.value1 + 1 } IN " + cn);
+          }
+
+          state.checksum = collectionChecksum(cn);
+          state.count = collectionCount(cn);
+        },
+
+        function(state) { // 3rd callback: only checksum and count are compared here
+          assertEqual(state.checksum, collectionChecksum(cn));
+          assertEqual(state.count, collectionCount(cn));
+        }
+      );
+    },
+
+    testAqlUpdateEdge: function() { // AQL UPDATE that changes _from of edges, one edge per query
+      db._useDatabase(cn);
+      connectToMaster();
+
+      compare(
+        function(state) { // 1st callback: create an edge collection with 100 edges test/v<i> -> test/y<i>
+          let c = db._createEdgeCollection(cn);
+          for (let i = 0; i < 100; ++i) {
+            c.insert({ _key: "test" + i, _from: "test/v" + i, _to: "test/y" + i });
+          }
+        },
+
+        function(state) { // 2nd callback: rewrite _from from test/v<i> to test/x<i>, then record checksum and count
+          for (let i = 0; i < 100; ++i) {
+            db._query("FOR doc IN " + cn + " FILTER doc._from == \"test/v" + i + "\" UPDATE doc WITH { _from: \"test/x" + i + "\" } IN " + cn);
+          }
+
+          state.checksum = collectionChecksum(cn);
+          state.count = collectionCount(cn);
+        },
+
+        function(state) { // 3rd callback: old _from values must be gone, new ones present, _key and _to unchanged
+          assertEqual(state.checksum, collectionChecksum(cn));
+          assertEqual(state.count, collectionCount(cn));
+
+          for (let i = 0; i < 100; ++i) {
+            let docs = db._query("FOR doc IN " + cn + " FILTER doc._from == \"test/v" + i + "\" RETURN doc").toArray();
+            assertEqual(0, docs.length);
+          }
+
+          for (let i = 0; i < 100; ++i) {
+            let docs = db._query("FOR doc IN " + cn + " FILTER doc._from == \"test/x" + i + "\" RETURN doc").toArray();
+            assertEqual(1, docs.length);
+            assertEqual("test" + i, docs[0]._key);
+            assertEqual("test/x" + i, docs[0]._from);
+            assertEqual("test/y" + i, docs[0]._to);
+          }
+        }
+      );
+    },
+
+    testAqlUpdateEdgeMulti: function() { // AQL UPDATE that changes _from of edges, 10 edges per query
+      db._useDatabase(cn);
+      connectToMaster();
+
+      compare(
+        function(state) { // 1st callback: 1000 edges; i % 100 makes every _from/_to pair occur 10 times
+          let c = db._createEdgeCollection(cn);
+          for (let i = 0; i < 1000; ++i) {
+            c.insert({ _key: "test" + i, _from: "test/v" + (i % 100), _to: "test/y" + (i % 100) });
+          }
+        },
+
+        function(state) { // 2nd callback: rewrite _from from test/v<i> to test/x<i>, then record checksum and count
+          for (let i = 0; i < 100; ++i) {
+            db._query("FOR doc IN " + cn + " FILTER doc._from == \"test/v" + i + "\" UPDATE doc WITH { _from: \"test/x" + i + "\" } IN " + cn);
+          }
+
+          state.checksum = collectionChecksum(cn);
+          state.count = collectionCount(cn);
+        },
+
+        function(state) { // 3rd callback: no edge keeps the old _from; each new _from value is found 10 times
+          assertEqual(state.checksum, collectionChecksum(cn));
+          assertEqual(state.count, collectionCount(cn));
+
+          for (let i = 0; i < 100; ++i) {
+            let docs = db._query("FOR doc IN " + cn + " FILTER doc._from == \"test/v" + i + "\" RETURN doc").toArray();
+            assertEqual(0, docs.length);
+          }
+
+          for (let i = 0; i < 100; ++i) {
+            let docs = db._query("FOR doc IN " + cn + " FILTER doc._from == \"test/x" + i + "\" RETURN doc").toArray();
+            assertEqual(10, docs.length);
+            assertEqual("test/x" + i, docs[0]._from); // only the first match is inspected in detail
+            assertEqual("test/y" + i, docs[0]._to);
+          }
+        }
+      );
+    },
+
+    testAqlUpdateEdgeExtraIndex: function() { // like testAqlUpdateEdge, with an additional unique hash index on [_from, _to]
+      db._useDatabase(cn);
+      connectToMaster();
+
+      compare(
+        function(state) { // 1st callback: 100 edges plus the unique hash index on _from and _to
+          let c = db._createEdgeCollection(cn);
+          for (let i = 0; i < 100; ++i) {
+            c.insert({ _key: "test" + i, _from: "test/v" + i, _to: "test/y" + i });
+          }
+          c.ensureIndex({ type: "hash", fields: ["_from", "_to"], unique: true });
+        },
+
+        function(state) { // 2nd callback: rewrite _from from test/v<i> to test/x<i>, then record checksum and count
+          for (let i = 0; i < 100; ++i) {
+            db._query("FOR doc IN " + cn + " FILTER doc._from == \"test/v" + i + "\" UPDATE doc WITH { _from: \"test/x" + i + "\" } IN " + cn);
+          }
+
+          state.checksum = collectionChecksum(cn);
+          state.count = collectionCount(cn);
+        },
+
+        function(state) { // 3rd callback: old _from values must be gone, new ones present, _key and _to unchanged
+          assertEqual(state.checksum, collectionChecksum(cn));
+          assertEqual(state.count, collectionCount(cn));
+
+          for (let i = 0; i < 100; ++i) {
+            let docs = db._query("FOR doc IN " + cn + " FILTER doc._from == \"test/v" + i + "\" RETURN doc").toArray();
+            assertEqual(0, docs.length);
+          }
+
+          for (let i = 0; i < 100; ++i) {
+            let docs = db._query("FOR doc IN " + cn + " FILTER doc._from == \"test/x" + i + "\" RETURN doc").toArray();
+            assertEqual(1, docs.length);
+            assertEqual("test" + i, docs[0]._key);
+            assertEqual("test/x" + i, docs[0]._from);
+            assertEqual("test/y" + i, docs[0]._to);
+          }
+        }
+      );
+    },
+
+    testAqlReplaceEdge: function() { // AQL REPLACE on edges, one edge per query; _from and _to are both given explicitly
+      db._useDatabase(cn);
+      connectToMaster();
+
+      compare(
+        function(state) { // 1st callback: create an edge collection with 100 edges test/v<i> -> test/y<i>
+          let c = db._createEdgeCollection(cn);
+          for (let i = 0; i < 100; ++i) {
+            c.insert({ _key: "test" + i, _from: "test/v" + i, _to: "test/y" + i });
+          }
+        },
+
+        function(state) { // 2nd callback: replace each edge with { _from: test/x<i>, _to: test/y<i> }, then record checksum and count
+          for (let i = 0; i < 100; ++i) {
+            db._query("FOR doc IN " + cn + " FILTER doc._from == \"test/v" + i + "\" REPLACE doc WITH { _from: \"test/x" + i + "\", _to: \"test/y" + i + "\" } IN " + cn);
+          }
+
+          state.checksum = collectionChecksum(cn);
+          state.count = collectionCount(cn);
+        },
+
+        function(state) { // 3rd callback: old _from values must be gone, new ones present, _key and _to unchanged
+          assertEqual(state.checksum, collectionChecksum(cn));
+          assertEqual(state.count, collectionCount(cn));
+
+          for (let i = 0; i < 100; ++i) {
+            let docs = db._query("FOR doc IN " + cn + " FILTER doc._from == \"test/v" + i + "\" RETURN doc").toArray();
+            assertEqual(0, docs.length);
+          }
+
+          for (let i = 0; i < 100; ++i) {
+            let docs = db._query("FOR doc IN " + cn + " FILTER doc._from == \"test/x" + i + "\" RETURN doc").toArray();
+            assertEqual(1, docs.length);
+            assertEqual("test" + i, docs[0]._key);
+            assertEqual("test/x" + i, docs[0]._from);
+            assertEqual("test/y" + i, docs[0]._to);
+          }
+        }
+      );
+    },
+
+
testAqlReplaceEdgeMulti: function() { // AQL REPLACE on edges, 10 edges per query; _to is carried over from the old document
+      db._useDatabase(cn);
+      connectToMaster();
+
+      compare( // NOTE(review): compare() is defined outside this excerpt; presumably setup / change on master / verify on slave - confirm
+        function(state) { // 1st callback: 1000 edges; i % 100 makes every _from/_to pair occur 10 times
+          let c = db._createEdgeCollection(cn);
+          for (let i = 0; i < 1000; ++i) {
+            c.insert({ _key: "test" + i, _from: "test/v" + (i % 100), _to: "test/y" + (i % 100) });
+          }
+        },
+
+        function(state) { // 2nd callback: replace each edge with { _from: test/x<i>, _to: doc._to }, then record checksum and count
+          for (let i = 0; i < 100; ++i) {
+            db._query("FOR doc IN " + cn + " FILTER doc._from == \"test/v" + i + "\" REPLACE doc WITH { _from: \"test/x" + i + "\", _to: doc._to } IN " + cn);
+          }
+
+          state.checksum = collectionChecksum(cn);
+          state.count = collectionCount(cn);
+        },
+
+        function(state) { // 3rd callback: no edge keeps the old _from; each new _from value is found 10 times
+          assertEqual(state.checksum, collectionChecksum(cn));
+          assertEqual(state.count, collectionCount(cn));
+
+          for (let i = 0; i < 100; ++i) {
+            let docs = db._query("FOR doc IN " + cn + " FILTER doc._from == \"test/v" + i + "\" RETURN doc").toArray();
+            assertEqual(0, docs.length);
+          }
+
+          for (let i = 0; i < 100; ++i) {
+            let docs = db._query("FOR doc IN " + cn + " FILTER doc._from == \"test/x" + i + "\" RETURN doc").toArray();
+            assertEqual(10, docs.length);
+            assertEqual("test/x" + i, docs[0]._from); // only the first match is inspected in detail
+            assertEqual("test/y" + i, docs[0]._to);
+          }
+        }
+      );
+    },
+
+  };
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief executes the test suite
+////////////////////////////////////////////////////////////////////////////////
+
+jsunity.run(ReplicationSuite);
+
+return jsunity.done();
diff --git a/js/server/tests/replication/replication-random.js b/js/server/tests/replication/replication-random.js
new file mode 100644
index 0000000000..d9f1ee6b09
--- /dev/null
+++ b/js/server/tests/replication/replication-random.js
@@ -0,0 +1,335 @@
+/*jshint globalstrict:false, strict:false, unused: false */
+/*global assertEqual, assertTrue, arango, ARGUMENTS */
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief test the replication (randomly generated transactions)
+///
+/// @file
+///
+/// DISCLAIMER
+///
+/// Copyright
2010-2012 triagens GmbH, Cologne, Germany
+///
+/// Licensed under the Apache License, Version 2.0 (the "License");
+/// you may not use this file except in compliance with the License.
+/// You may obtain a copy of the License at
+///
+///     http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing, software
+/// distributed under the License is distributed on an "AS IS" BASIS,
+/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+/// See the License for the specific language governing permissions and
+/// limitations under the License.
+///
+/// Copyright holder is triAGENS GmbH, Cologne, Germany
+///
+/// @author Jan Steemann
+/// @author Copyright 2017, triAGENS GmbH, Cologne, Germany
+////////////////////////////////////////////////////////////////////////////////
+
+var jsunity = require("jsunity");
+var arangodb = require("@arangodb");
+var db = arangodb.db;
+
+var replication = require("@arangodb/replication");
+var console = require("console");
+var internal = require("internal");
+var masterEndpoint = arango.getEndpoint(); // the endpoint the shell is connected to at load time is treated as the master
+var slaveEndpoint = ARGUMENTS[0]; // the slave endpoint is passed in as the first script argument
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief test suite: runs random transactions on the master and checks that
+/// the slave ends up with the same collection checksums and counts
+////////////////////////////////////////////////////////////////////////////////
+
+function ReplicationSuite() { // returns the jsunity suite object (setUp, tearDown and the test case)
+  'use strict';
+  var cn = "UnitTestsReplication"; // used both as the test database name and as the first collection name
+  var cn2 = "UnitTestsReplication2";
+  var cn3 = "UnitTestsReplication3";
+
+  var connectToMaster = function() { // reconnect the shell to the master, into the currently selected database (db._name())
+    arango.reconnect(masterEndpoint, db._name(), "root", "");
+    db._flushCache();
+  };
+
+  var connectToSlave = function() { // reconnect the shell to the slave, into the currently selected database (db._name())
+    arango.reconnect(slaveEndpoint, db._name(), "root", "");
+    db._flushCache();
+  };
+
+  var collectionChecksum = function(name) { // checksum of the named collection on the current connection
+    var c = db._collection(name).checksum(true, true); // NOTE(review): the two flags presumably include revisions and document data - confirm against the collection API
+    return c.checksum;
+  };
+
+  var collectionCount = function(name) { // number of documents in the named collection on the current connection
+    return db._collection(name).count();
+  };
+
+  var compareTicks = function(l, r) { // three-way comparison of two tick strings; returns -1, 0 or 1
+    var i;
+    if (l === null) { // a missing tick is treated as "0"
+      l = "0";
+    }
+    if (r === null) {
+      r = "0";
+    }
+    if (l.length !== r.length) { // the shorter string is considered smaller (numeric order, assuming decimal strings without leading zeros)
+      return l.length - r.length < 0 ? -1 : 1;
+    }
+
+    // length is equal: the first differing character decides
+    for (i = 0; i < l.length; ++i) {
+      if (l[i] !== r[i]) {
+        return l[i] < r[i] ? -1 : 1;
+      }
+    }
+
+    return 0;
+  };
+
+  var compare = function(masterFunc, masterFunc2, slaveFuncFinal) { // one replication round trip; all three callbacks receive the same state object
+    var state = {};
+
+    assertEqual(cn, db._name()); // the caller must have selected the test database
+    db._flushCache();
+    masterFunc(state); // runs on the connection set up by the caller (the test below connects to the master first)
+
+    connectToSlave();
+    assertEqual(cn, db._name());
+
+    var syncResult = replication.sync({ // initial sync from the master, issued while connected to the slave
+      endpoint: masterEndpoint,
+      username: "root",
+      password: "",
+      verbose: true,
+      includeSystem: false,
+      keepBarrier: true, // the barrierId of the result is handed to applier.start() below
+      requireFromPresent: true,
+    });
+
+    assertTrue(syncResult.hasOwnProperty('lastLogTick'));
+
+    connectToMaster();
+    masterFunc2(state); // changes made on the master after the initial sync
+
+    // use lastLogTick as of now: the slave counts as caught up once it has reached this tick
+    state.lastLogTick = replication.logger.state().state.lastLogTick;
+
+    let applierConfiguration = {
+      endpoint: masterEndpoint,
+      username: "root",
+      password: ""
+    };
+
+    connectToSlave();
+    assertEqual(cn, db._name());
+
+    replication.applier.properties(applierConfiguration);
+    replication.applier.start(syncResult.lastLogTick, syncResult.barrierId); // start the applier at the tick returned by the initial sync
+
+    var printed = false; // makes the "waiting" message appear only once
+
+    while (true) { // poll the applier state until it errors, stops, or reaches state.lastLogTick
+      var slaveState = replication.applier.state();
+
+      if (slaveState.state.lastError.errorNum > 0) {
+        console.log("slave has errored:", JSON.stringify(slaveState.state.lastError));
+        break; // the final callback below still runs afterwards
+      }
+
+      if (!slaveState.state.running) {
+        console.log("slave is not running");
+        break; // the final callback below still runs afterwards
+      }
+
+      if (compareTicks(slaveState.state.lastAppliedContinuousTick, state.lastLogTick) >= 0 ||
+          compareTicks(slaveState.state.lastProcessedContinuousTick, state.lastLogTick) >= 0) { // ||
+        // compareTicks(slaveState.state.lastAvailableContinuousTick, syncResult.lastLogTick) > 0) {
+        console.log("slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
+        break;
+      }
+
+      if (!printed) {
+        console.log("waiting for slave to catch up");
+        printed = true;
+      }
+      internal.wait(0.5, false); // pause before polling again
+    }
+
+    db._flushCache();
+    slaveFuncFinal(state); // verification step; the last reconnect above was to the slave
+  };
+
+  return {
+
+    setUp: function() { // recreate the test database on both servers; the three collections are created on the master only
+      db._useDatabase("_system"); // select the database first: connectToMaster() reconnects into db._name()
+      connectToMaster();
+      try {
+        db._dropDatabase(cn);
+      } catch (err) {} // deliberately ignored: the database may not exist yet
+
+      db._createDatabase(cn);
+      db._useDatabase(cn);
+
+      db._create(cn);
+      db._create(cn2);
+      db._create(cn3);
+
+      db._useDatabase("_system");
+      connectToSlave();
+
+      try {
+        db._dropDatabase(cn);
+      } catch (err) {} // deliberately ignored: the database may not exist yet
+
+      db._createDatabase(cn); // created empty on the slave; no collections are created here
+    },
+
+    tearDown: function() { // stop and reset the applier in the test database, then drop that database
+      db._useDatabase("_system");
+      connectToMaster();
+
+      db._useDatabase(cn); // select the test database so that connectToSlave() reconnects into it
+      connectToSlave();
+      replication.applier.stop();
+      replication.applier.forget();
+
+      db._useDatabase("_system");
+      db._dropDatabase(cn); // NOTE(review): appears to run against the slave only; the master's copy is dropped by the next setUp - confirm
+    },
+
+    testRandomTransactions: function() { // executes 10000 randomly generated write transactions and compares checksums and counts
+      let nextId = 0; // counter behind nextKey(); keys are "test1", "test2", ...
+      let keys = { [cn]: [], [cn2]: [], [cn3]: [] }; // generated keys per collection; arrays are used as plain key maps here (keys[c][key] = 1)
+
+      let nextKey = function() { // returns a key that has not been handed out before
+        return "test" + (++nextId);
+      };
+
+      let generateInsert = function(collections) { // JS source for an insert into a random collection; records the new key
+        let c = collections[Math.floor(Math.random() * collections.length)];
+        let key = nextKey();
+        keys[c][key] = 1;
+
+        return "db[" + JSON.stringify(c) + "].insert({ _key: " + JSON.stringify(key) + ", value: \"thisIsSomeStringValue\" });";
+      };
+
+      let generateUpdate = function(collections) { // JS source for an update of a random recorded key in a random collection
+        let c = collections[Math.floor(Math.random() * collections.length)];
+        let all = Object.keys(keys[c]);
+        if (all.length === 0) {
+          // still empty, turn into an insert first (generateInsert picks its own random collection)
+          return generateInsert(collections);
+        }
+        let key = all[Math.floor(Math.random() * all.length)];
+        return "db[" + JSON.stringify(c) + "].update(" + JSON.stringify(key) + ", { value: \"thisIsSomeUpdatedStringValue\" });";
+      };
+
+      let generateReplace = function(collections) { // JS source for a replace of a random recorded key in a random collection
+        let c = collections[Math.floor(Math.random() * collections.length)];
+        let all = Object.keys(keys[c]);
+        if (all.length === 0) {
+          // still empty, turn into an insert first (generateInsert picks its own random collection)
+          return generateInsert(collections);
+        }
+        let key = all[Math.floor(Math.random() * all.length)];
+        return "db[" + JSON.stringify(c) + "].replace(" + JSON.stringify(key) + ", { value: \"thisIsSomeReplacedStringValue\" });";
+      };
+
+      let generateRemove = function(collections) { // JS source for a remove of a random recorded key; forgets the key
+        let c = collections[Math.floor(Math.random() * collections.length)];
+        let all = Object.keys(keys[c]);
+        if (all.length === 0) {
+          // still empty, turn into an insert first (generateInsert picks its own random collection)
+          return generateInsert(collections);
+        }
+        let key = all[Math.floor(Math.random() * all.length)];
+        delete keys[c][key];
+        return "db[" + JSON.stringify(c) + "].remove(" + JSON.stringify(key) + ");";
+      };
+
+      let generateTruncate = function(collections) { // JS source for a truncate; currently unused, its allOps entry is commented out
+        let c = collections[Math.floor(Math.random() * collections.length)];
+        keys[c] = {};
+        return "db[" + JSON.stringify(c) + "].truncate();";
+      };
+
+      let allOps = [ // operation generators; createTransaction picks one uniformly at random per operation
+        { name: "insert", generate: generateInsert },
+        { name: "update", generate: generateUpdate },
+        { name: "replace", generate: generateReplace },
+        { name: "remove", generate: generateRemove },
+//        { name: "truncate", generate: generateTruncate }
+      ];
+
+      let createTransaction = function(state) { // builds one transaction description; the state parameter is not used
+        let trx = { collections: { read: [], write: [] } };
+
+        // determine collections: each one is included with probability 0.5; repeat until at least one is chosen
+        do {
+          if (Math.random() >= 0.5) {
+            trx.collections.write.push(cn);
+          }
+          if (Math.random() >= 0.5) {
+            trx.collections.write.push(cn2);
+          }
+          if (Math.random() >= 0.5) {
+            trx.collections.write.push(cn3);
+          }
+        } while (trx.collections.write.length === 0);
+
+        let n = Math.floor(Math.random() * 100) + 1; // between 1 and 100 operations per transaction
+        let ops = "function() { let db = require('internal').db;\n";
+        for (let i = 0; i < n; ++i) {
+          ops += allOps[Math.floor(Math.random() * allOps.length)].generate(trx.collections.write) + "\n";
+        }
+        trx.action = ops + " }"; // the action is handed over as a string of JavaScript source
+        return trx;
+      };
+
+      db._useDatabase(cn);
+      connectToMaster();
+
+      compare(
+        function(state) { // nothing to do before the initial sync; the collections were created in setUp
+        },
+
+        function(state) { // after the initial sync: run the random transactions, then record checksums and counts
+          for (let i = 0; i < 10000; ++i) {
+            let trx = createTransaction(state);
+            db._executeTransaction(trx);
+          }
+
+          state.checksum = collectionChecksum(cn);
+          state.checksum2 = collectionChecksum(cn2);
+          state.checksum3 = collectionChecksum(cn3);
+          state.count = collectionCount(cn);
+          state.count2 = collectionCount(cn2);
+          state.count3 = collectionCount(cn3);
+        },
+
+        function(state) { // final check: all three collections must match the recorded checksums and counts
+          assertEqual(state.checksum, collectionChecksum(cn));
+          assertEqual(state.checksum2, collectionChecksum(cn2));
+          assertEqual(state.checksum3, collectionChecksum(cn3));
+          assertEqual(state.count, collectionCount(cn));
+          assertEqual(state.count2, collectionCount(cn2));
+          assertEqual(state.count3, collectionCount(cn3));
+        }
+      );
+    }
+
+  };
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief executes the test suite
+////////////////////////////////////////////////////////////////////////////////
+
+jsunity.run(ReplicationSuite);
+
+return jsunity.done();