From 96cbe699b9dc9611d889a32ea332fc73692f69db Mon Sep 17 00:00:00 2001 From: Jan Date: Thu, 28 Jun 2018 19:14:14 +0200 Subject: [PATCH] honor "restrictType" and "restrictCollections" when syncing (#5689) --- arangod/Replication/DatabaseInitialSyncer.cpp | 17 +++++ arangod/Replication/DatabaseInitialSyncer.h | 14 ++-- arangod/Replication/DatabaseTailingSyncer.cpp | 55 ++++++++++++++ arangod/Replication/DatabaseTailingSyncer.h | 7 +- arangod/Replication/GlobalInitialSyncer.cpp | 19 +++++ arangod/Replication/GlobalInitialSyncer.h | 3 + arangod/Replication/GlobalTailingSyncer.cpp | 73 ++++++++++++++++++ arangod/Replication/GlobalTailingSyncer.h | 9 ++- arangod/Replication/TailingSyncer.cpp | 29 +++---- arangod/Replication/TailingSyncer.h | 3 +- .../replication/replication-ongoing-global.js | 76 +++++++++++++++++-- 11 files changed, 275 insertions(+), 30 deletions(-) diff --git a/arangod/Replication/DatabaseInitialSyncer.cpp b/arangod/Replication/DatabaseInitialSyncer.cpp index 649fb30145..c5fbb60357 100644 --- a/arangod/Replication/DatabaseInitialSyncer.cpp +++ b/arangod/Replication/DatabaseInitialSyncer.cpp @@ -189,6 +189,23 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental, } } +/// @brief returns the inventory +Result DatabaseInitialSyncer::inventory(VPackBuilder& builder) { + if (_client == nullptr || _connection == nullptr || _endpoint == nullptr) { + return Result(TRI_ERROR_INTERNAL, "invalid endpoint"); + } + + auto r = sendStartBatch(); + if (r.fail()) { + return r; + } + + TRI_DEFER(sendFinishBatch()); + + // caller did not supply an inventory, we need to fetch it + return fetchInventory(builder); +} + /// @brief check whether the initial synchronization should be aborted bool DatabaseInitialSyncer::isAborted() const { if (application_features::ApplicationServer::isStopping() || diff --git a/arangod/Replication/DatabaseInitialSyncer.h b/arangod/Replication/DatabaseInitialSyncer.h index 7c7cbc7bc1..2c9673f74b 100644 --- 
a/arangod/Replication/DatabaseInitialSyncer.h +++ b/arangod/Replication/DatabaseInitialSyncer.h @@ -124,8 +124,13 @@ class DatabaseInitialSyncer final : public InitialSyncer { /// in rocksdb for a constant view of the data double batchUpdateTime() const { return _batchUpdateTime; } + /// @brief fetch the server's inventory, public method + Result inventory(arangodb::velocypack::Builder& builder); + private: - + /// @brief fetch the server's inventory + Result fetchInventory(arangodb::velocypack::Builder& builder); + /// @brief set a progress message void setProgress(std::string const& msg) override; @@ -161,10 +166,7 @@ class DatabaseInitialSyncer final : public InitialSyncer { Result handleCollection(arangodb::velocypack::Slice const&, arangodb::velocypack::Slice const&, bool incremental, sync_phase_e); - - /// @brief fetch the server's inventory - Result fetchInventory(arangodb::velocypack::Builder& builder); - + /// @brief handle the inventory response of the master Result handleLeaderCollections(arangodb::velocypack::Slice const&, bool); @@ -187,4 +189,4 @@ class DatabaseInitialSyncer final : public InitialSyncer { } // arangodb -#endif \ No newline at end of file +#endif diff --git a/arangod/Replication/DatabaseTailingSyncer.cpp b/arangod/Replication/DatabaseTailingSyncer.cpp index 339e2630f1..922df24edb 100644 --- a/arangod/Replication/DatabaseTailingSyncer.cpp +++ b/arangod/Replication/DatabaseTailingSyncer.cpp @@ -196,3 +196,58 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(std::string const& collecti LOG_TOPIC(DEBUG, Logger::REPLICATION) << "Fetching more data fromTick " << fromTick; } } + +bool DatabaseTailingSyncer::skipMarker(VPackSlice const& slice) { + // we do not have a "cname" attribute in the marker... 
+ // now check for a globally unique id attribute ("cuid") + // if its present, then we will use our local cuid -> collection name + // translation table + VPackSlice const name = slice.get("cuid"); + if (!name.isString()) { + return false; + } + + if (_translations.empty()) { + // no translations yet... query master inventory to find names of all + // collections + try { + DatabaseInitialSyncer init(*_vocbase, _configuration); + VPackBuilder inventoryResponse; + Result res = init.inventory(inventoryResponse); + if (res.fail()) { + LOG_TOPIC(ERR, Logger::REPLICATION) << "got error while fetching master inventory for collection name translations: " << res.errorMessage(); + return false; + } + VPackSlice invSlice = inventoryResponse.slice(); + if (!invSlice.isObject()) { + return false; + } + invSlice = invSlice.get("collections"); + if (!invSlice.isArray()) { + return false; + } + + for (auto const& it : VPackArrayIterator(invSlice)) { + if (!it.isObject()) { + continue; + } + VPackSlice c = it.get("parameters"); + if (c.hasKey("name") && c.hasKey("globallyUniqueId")) { + _translations[c.get("globallyUniqueId").copyString()] = c.get("name").copyString(); + } + } + } catch (std::exception const& ex) { + LOG_TOPIC(ERR, Logger::REPLICATION) << "got error while fetching inventory: " << ex.what(); + return false; + } + } + + // look up cuid in translations map + auto it = _translations.find(name.copyString()); + + if (it != _translations.end()) { + return isExcludedCollection((*it).second); + } + + return false; +} diff --git a/arangod/Replication/DatabaseTailingSyncer.h b/arangod/Replication/DatabaseTailingSyncer.h index 01f38335e6..3e3502d0e0 100644 --- a/arangod/Replication/DatabaseTailingSyncer.h +++ b/arangod/Replication/DatabaseTailingSyncer.h @@ -62,11 +62,16 @@ class DatabaseTailingSyncer final : public TailingSyncer { return &(vocbases().begin()->second.database()); } + /// @brief whether or not we should skip a specific marker + bool 
skipMarker(arangodb::velocypack::Slice const& slice) override; + private: /// @brief vocbase to use for this run TRI_vocbase_t* _vocbase; - + + /// @brief translation between globallyUniqueId and collection name + std::unordered_map<std::string, std::string> _translations; }; } diff --git a/arangod/Replication/GlobalInitialSyncer.cpp b/arangod/Replication/GlobalInitialSyncer.cpp index 9d73ddf5ee..b1fed7eb16 100644 --- a/arangod/Replication/GlobalInitialSyncer.cpp +++ b/arangod/Replication/GlobalInitialSyncer.cpp @@ -322,6 +322,25 @@ Result GlobalInitialSyncer::updateServerInventory(VPackSlice const& masterDataba return TRI_ERROR_NO_ERROR; } +/// @brief returns the inventory +Result GlobalInitialSyncer::inventory(VPackBuilder& builder) { + if (_client == nullptr || _connection == nullptr || _endpoint == nullptr) { + return Result(TRI_ERROR_INTERNAL, "invalid endpoint"); + } else if (application_features::ApplicationServer::isStopping()) { + return Result(TRI_ERROR_SHUTTING_DOWN); + } + + auto r = sendStartBatch(); + if (r.fail()) { + return r; + } + + TRI_DEFER(sendFinishBatch()); + + // caller did not supply an inventory, we need to fetch it + return fetchInventory(builder); +} + Result GlobalInitialSyncer::fetchInventory(VPackBuilder& builder) { std::string url = ReplicationUrl + "/inventory?serverId=" + _localServerIdString + "&batchId=" + std::to_string(_batchId) + "&global=true"; diff --git a/arangod/Replication/GlobalInitialSyncer.h b/arangod/Replication/GlobalInitialSyncer.h index 4b734e5182..87bcf0be7f 100644 --- a/arangod/Replication/GlobalInitialSyncer.h +++ b/arangod/Replication/GlobalInitialSyncer.h @@ -40,6 +40,9 @@ class GlobalInitialSyncer final : public InitialSyncer { /// public method, catches exceptions arangodb::Result run(bool incremental) override; + /// @brief fetch the server's inventory, public method + Result inventory(arangodb::velocypack::Builder& builder); + private: /// @brief run method, performs a full synchronization diff --git 
a/arangod/Replication/GlobalTailingSyncer.cpp b/arangod/Replication/GlobalTailingSyncer.cpp index 6757fb4e9f..2bf8ce1bb3 100644 --- a/arangod/Replication/GlobalTailingSyncer.cpp +++ b/arangod/Replication/GlobalTailingSyncer.cpp @@ -27,6 +27,9 @@ #include "Replication/GlobalInitialSyncer.h" #include "Replication/ReplicationFeature.h" +#include <velocypack/Iterator.h> +#include <velocypack/velocypack-aliases.h> + using namespace arangodb; using namespace arangodb::basics; using namespace arangodb::httpclient; @@ -78,3 +81,73 @@ Result GlobalTailingSyncer::saveApplierState() { } return TRI_ERROR_INTERNAL; } + +bool GlobalTailingSyncer::skipMarker(VPackSlice const& slice) { + // we do not have a "cname" attribute in the marker... + // now check for a globally unique id attribute ("cuid") + // if its present, then we will use our local cuid -> collection name + // translation table + VPackSlice const name = slice.get("cuid"); + if (!name.isString()) { + return false; + } + + if (_translations.empty()) { + // no translations yet... query master inventory to find names of all + // collections + try { + GlobalInitialSyncer init(_configuration); + VPackBuilder inventoryResponse; + Result res = init.inventory(inventoryResponse); + if (res.fail()) { + LOG_TOPIC(ERR, Logger::REPLICATION) << "got error while fetching master inventory for collection name translations: " << res.errorMessage(); + return false; + } + + VPackSlice invSlice = inventoryResponse.slice(); + if (!invSlice.isObject()) { + return false; + } + invSlice = invSlice.get("databases"); + if (!invSlice.isObject()) { + return false; + } + + for (auto const& it : VPackObjectIterator(invSlice)) { + VPackSlice dbObj = it.value; + if (!dbObj.isObject()) { + continue; + } + + dbObj = dbObj.get("collections"); + if (!dbObj.isArray()) { + return false; + } + + for (auto const& it : VPackArrayIterator(dbObj)) { + if (!it.isObject()) { + continue; + } + VPackSlice c = it.get("parameters"); + if (c.hasKey("name") && c.hasKey("globallyUniqueId")) { + // we'll store everything for 
all databases in a global hash table, + // as we expect the globally unique ids to be unique... + _translations[c.get("globallyUniqueId").copyString()] = c.get("name").copyString(); + } + } + } + } catch (std::exception const& ex) { + LOG_TOPIC(ERR, Logger::REPLICATION) << "got error while fetching inventory: " << ex.what(); + return false; + } + } + + // look up cuid in translations map + auto it = _translations.find(name.copyString()); + + if (it != _translations.end()) { + return isExcludedCollection((*it).second); + } + + return false; +} diff --git a/arangod/Replication/GlobalTailingSyncer.h b/arangod/Replication/GlobalTailingSyncer.h index 3e8a1c454b..426a1fc22b 100644 --- a/arangod/Replication/GlobalTailingSyncer.h +++ b/arangod/Replication/GlobalTailingSyncer.h @@ -22,7 +22,7 @@ //////////////////////////////////////////////////////////////////////////////// #ifndef ARANGOD_REPLICATION_GLOBAL_CONTINUOUS_SYNCER_H -#define ARANGOD_REPLICATION_DATABASE_CONTINUOUS_SYNCER_H 1 +#define ARANGOD_REPLICATION_GLOBAL_CONTINUOUS_SYNCER_H 1 #include "TailingSyncer.h" #include "Replication/GlobalReplicationApplier.h" @@ -45,12 +45,17 @@ class GlobalTailingSyncer : public TailingSyncer { } protected: - /// @brief resolve to proper base url std::string tailingBaseUrl(std::string const& command) override; /// @brief save the current applier state Result saveApplierState() override; + + bool skipMarker(arangodb::velocypack::Slice const& slice) override; + + private: + /// @brief translation between globallyUniqueId and collection name + std::unordered_map<std::string, std::string> _translations; }; } diff --git a/arangod/Replication/TailingSyncer.cpp b/arangod/Replication/TailingSyncer.cpp index c9523476f3..d00a2281ff 100644 --- a/arangod/Replication/TailingSyncer.cpp +++ b/arangod/Replication/TailingSyncer.cpp @@ -138,7 +138,7 @@ void TailingSyncer::abortOngoingTransactions() { /// @brief whether or not a marker should be skipped bool TailingSyncer::skipMarker(TRI_voc_tick_t firstRegularTick, - 
VPackSlice const& slice) const { + VPackSlice const& slice) { bool tooOld = false; std::string const tick = VelocyPackHelper::getStringValue(slice, "tick", ""); @@ -177,20 +177,23 @@ bool TailingSyncer::skipMarker(TRI_voc_tick_t firstRegularTick, if (tooOld) { return true; } - + // the transient applier state is just used for one shard / collection - if (!_configuration._restrictCollections.empty()) { - if (_configuration._restrictType.empty() && _configuration._includeSystem) { - return false; - } - - VPackSlice const name = slice.get("cname"); - if (name.isString()) { - return isExcludedCollection(name.copyString()); - } + if (_configuration._restrictCollections.empty()) { + return false; } - - return false; + + if (_configuration._restrictType.empty() && _configuration._includeSystem) { + return false; + } + + VPackSlice const name = slice.get("cname"); + if (name.isString()) { + return isExcludedCollection(name.copyString()); + } + + // call virtual method + return skipMarker(slice); } /// @brief whether or not a collection should be excluded diff --git a/arangod/Replication/TailingSyncer.h b/arangod/Replication/TailingSyncer.h index f34434ab05..224cd03621 100644 --- a/arangod/Replication/TailingSyncer.h +++ b/arangod/Replication/TailingSyncer.h @@ -73,7 +73,7 @@ class TailingSyncer : public Syncer { void abortOngoingTransactions(); /// @brief whether or not a collection should be excluded - bool skipMarker(TRI_voc_tick_t, arangodb::velocypack::Slice const&) const; + bool skipMarker(TRI_voc_tick_t, arangodb::velocypack::Slice const&); /// @brief whether or not a collection should be excluded bool isExcludedCollection(std::string const&) const; @@ -131,6 +131,7 @@ class TailingSyncer : public Syncer { arangodb::Result runInternal(); protected: + virtual bool skipMarker(arangodb::velocypack::Slice const& slice) = 0; /// @brief pointer to the applier ReplicationApplier* _applier; diff --git a/js/server/tests/replication/replication-ongoing-global.js 
b/js/server/tests/replication/replication-ongoing-global.js index cffc19cec3..cf13239ded 100644 --- a/js/server/tests/replication/replication-ongoing-global.js +++ b/js/server/tests/replication/replication-ongoing-global.js @@ -76,6 +76,12 @@ const compare = function(masterFunc, masterFunc2, slaveFuncOngoing, slaveFuncFin while (replication.globalApplier.state().state.running) { internal.wait(0.1, false); } + + applierConfiguration = applierConfiguration || {}; + applierConfiguration.endpoint = masterEndpoint; + applierConfiguration.username = "root"; + applierConfiguration.password = ""; + applierConfiguration.includeSystem = false; var syncResult = replication.syncGlobal({ endpoint: masterEndpoint, @@ -83,7 +89,9 @@ const compare = function(masterFunc, masterFunc2, slaveFuncOngoing, slaveFuncFin password: "", verbose: true, includeSystem: false, - keepBarrier: false + keepBarrier: false, + restrictType: applierConfiguration.restrictType, + restrictCollections: applierConfiguration.restrictCollections }); assertTrue(syncResult.hasOwnProperty('lastLogTick')); @@ -94,12 +102,6 @@ const compare = function(masterFunc, masterFunc2, slaveFuncOngoing, slaveFuncFin // use lastLogTick as of now state.lastLogTick = replication.logger.state().state.lastLogTick; - applierConfiguration = applierConfiguration || {}; - applierConfiguration.endpoint = masterEndpoint; - applierConfiguration.username = "root"; - applierConfiguration.password = ""; - applierConfiguration.includeSystem = false; - if (!applierConfiguration.hasOwnProperty('chunkSize')) { applierConfiguration.chunkSize = 16384; } @@ -160,6 +162,66 @@ function BaseTestConfig() { 'use strict'; return { + + testIncludeCollection: function() { + connectToMaster(); + + compare( + function(state) { + }, + + function(state) { + db._create(cn); + db._create(cn + "2"); + for (var i = 0; i < 100; ++i) { + db._collection(cn).save({ value: i }); + db._collection(cn + "2").save({ value: i }); + } + internal.wal.flush(true, true); + 
}, + + function(state) { + return true; + }, + + function(state) { + assertTrue(db._collection(cn).count() === 100); + assertNull(db._collection(cn + "2")); + }, + + { restrictType: "include", restrictCollections: [cn] } + ); + }, + + testExcludeCollection: function() { + connectToMaster(); + + compare( + function(state) { + }, + + function(state) { + db._create(cn); + db._create(cn + "2"); + for (var i = 0; i < 100; ++i) { + db._collection(cn).save({ value: i }); + db._collection(cn + "2").save({ value: i }); + } + internal.wal.flush(true, true); + }, + + function(state) { + return true; + }, + + function(state) { + assertTrue(db._collection(cn).count() === 100); + assertNull(db._collection(cn + "2")); + }, + + { restrictType: "exclude", restrictCollections: [cn + "2"] } + ); + }, //////////////////////////////////////////////////////////////////////////////// /// @brief test collection creation