mirror of https://gitee.com/bigwinds/arangodb
honor "restrictType" and "restrictCollections" when syncing (#5689)
This commit is contained in:
parent
b1a7316df4
commit
96cbe699b9
|
@ -189,6 +189,23 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental,
|
|||
}
|
||||
}
|
||||
|
||||
/// @brief returns the inventory
|
||||
Result DatabaseInitialSyncer::inventory(VPackBuilder& builder) {
|
||||
if (_client == nullptr || _connection == nullptr || _endpoint == nullptr) {
|
||||
return Result(TRI_ERROR_INTERNAL, "invalid endpoint");
|
||||
}
|
||||
|
||||
auto r = sendStartBatch();
|
||||
if (r.fail()) {
|
||||
return r;
|
||||
}
|
||||
|
||||
TRI_DEFER(sendFinishBatch());
|
||||
|
||||
// caller did not supply an inventory, we need to fetch it
|
||||
return fetchInventory(builder);
|
||||
}
|
||||
|
||||
/// @brief check whether the initial synchronization should be aborted
|
||||
bool DatabaseInitialSyncer::isAborted() const {
|
||||
if (application_features::ApplicationServer::isStopping() ||
|
||||
|
|
|
@ -124,8 +124,13 @@ class DatabaseInitialSyncer final : public InitialSyncer {
|
|||
/// in rocksdb for a constant view of the data
|
||||
double batchUpdateTime() const { return _batchUpdateTime; }
|
||||
|
||||
/// @brief fetch the server's inventory, public method
|
||||
Result inventory(arangodb::velocypack::Builder& builder);
|
||||
|
||||
private:
|
||||
|
||||
/// @brief fetch the server's inventory
|
||||
Result fetchInventory(arangodb::velocypack::Builder& builder);
|
||||
|
||||
/// @brief set a progress message
|
||||
void setProgress(std::string const& msg) override;
|
||||
|
||||
|
@ -161,10 +166,7 @@ class DatabaseInitialSyncer final : public InitialSyncer {
|
|||
Result handleCollection(arangodb::velocypack::Slice const&,
|
||||
arangodb::velocypack::Slice const&, bool incremental,
|
||||
sync_phase_e);
|
||||
|
||||
/// @brief fetch the server's inventory
|
||||
Result fetchInventory(arangodb::velocypack::Builder& builder);
|
||||
|
||||
|
||||
/// @brief handle the inventory response of the master
|
||||
Result handleLeaderCollections(arangodb::velocypack::Slice const&, bool);
|
||||
|
||||
|
@ -187,4 +189,4 @@ class DatabaseInitialSyncer final : public InitialSyncer {
|
|||
|
||||
} // arangodb
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
|
|
@ -196,3 +196,58 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(std::string const& collecti
|
|||
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "Fetching more data fromTick " << fromTick;
|
||||
}
|
||||
}
|
||||
|
||||
bool DatabaseTailingSyncer::skipMarker(VPackSlice const& slice) {
|
||||
// we do not have a "cname" attribute in the marker...
|
||||
// now check for a globally unique id attribute ("cuid")
|
||||
// if its present, then we will use our local cuid -> collection name
|
||||
// translation table
|
||||
VPackSlice const name = slice.get("cuid");
|
||||
if (!name.isString()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (_translations.empty()) {
|
||||
// no translations yet... query master inventory to find names of all
|
||||
// collections
|
||||
try {
|
||||
DatabaseInitialSyncer init(*_vocbase, _configuration);
|
||||
VPackBuilder inventoryResponse;
|
||||
Result res = init.inventory(inventoryResponse);
|
||||
if (res.fail()) {
|
||||
LOG_TOPIC(ERR, Logger::REPLICATION) << "got error while fetching master inventory for collection name translations: " << res.errorMessage();
|
||||
return false;
|
||||
}
|
||||
VPackSlice invSlice = inventoryResponse.slice();
|
||||
if (!invSlice.isObject()) {
|
||||
return false;
|
||||
}
|
||||
invSlice = invSlice.get("collections");
|
||||
if (!invSlice.isArray()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (auto const& it : VPackArrayIterator(invSlice)) {
|
||||
if (!it.isObject()) {
|
||||
continue;
|
||||
}
|
||||
VPackSlice c = it.get("parameters");
|
||||
if (c.hasKey("name") && c.hasKey("globallyUniqueId")) {
|
||||
_translations[c.get("globallyUniqueId").copyString()] = c.get("name").copyString();
|
||||
}
|
||||
}
|
||||
} catch (std::exception const& ex) {
|
||||
LOG_TOPIC(ERR, Logger::REPLICATION) << "got error while fetching inventory: " << ex.what();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// look up cuid in translations map
|
||||
auto it = _translations.find(name.copyString());
|
||||
|
||||
if (it != _translations.end()) {
|
||||
return isExcludedCollection((*it).second);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -62,11 +62,16 @@ class DatabaseTailingSyncer final : public TailingSyncer {
|
|||
return &(vocbases().begin()->second.database());
|
||||
}
|
||||
|
||||
/// @brief whether or not we should skip a specific marker
|
||||
bool skipMarker(arangodb::velocypack::Slice const& slice) override;
|
||||
|
||||
private:
|
||||
|
||||
/// @brief vocbase to use for this run
|
||||
TRI_vocbase_t* _vocbase;
|
||||
|
||||
|
||||
/// @brief translation between globallyUniqueId and collection name
|
||||
std::unordered_map<std::string, std::string> _translations;
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -322,6 +322,25 @@ Result GlobalInitialSyncer::updateServerInventory(VPackSlice const& masterDataba
|
|||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
/// @brief returns the inventory
|
||||
Result GlobalInitialSyncer::inventory(VPackBuilder& builder) {
|
||||
if (_client == nullptr || _connection == nullptr || _endpoint == nullptr) {
|
||||
return Result(TRI_ERROR_INTERNAL, "invalid endpoint");
|
||||
} else if (application_features::ApplicationServer::isStopping()) {
|
||||
return Result(TRI_ERROR_SHUTTING_DOWN);
|
||||
}
|
||||
|
||||
auto r = sendStartBatch();
|
||||
if (r.fail()) {
|
||||
return r;
|
||||
}
|
||||
|
||||
TRI_DEFER(sendFinishBatch());
|
||||
|
||||
// caller did not supply an inventory, we need to fetch it
|
||||
return fetchInventory(builder);
|
||||
}
|
||||
|
||||
Result GlobalInitialSyncer::fetchInventory(VPackBuilder& builder) {
|
||||
std::string url = ReplicationUrl + "/inventory?serverId=" + _localServerIdString +
|
||||
"&batchId=" + std::to_string(_batchId) + "&global=true";
|
||||
|
|
|
@ -40,6 +40,9 @@ class GlobalInitialSyncer final : public InitialSyncer {
|
|||
/// public method, catches exceptions
|
||||
arangodb::Result run(bool incremental) override;
|
||||
|
||||
/// @brief fetch the server's inventory, public method
|
||||
Result inventory(arangodb::velocypack::Builder& builder);
|
||||
|
||||
private:
|
||||
|
||||
/// @brief run method, performs a full synchronization
|
||||
|
|
|
@ -27,6 +27,9 @@
|
|||
#include "Replication/GlobalInitialSyncer.h"
|
||||
#include "Replication/ReplicationFeature.h"
|
||||
|
||||
#include <velocypack/Iterator.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
|
||||
using namespace arangodb;
|
||||
using namespace arangodb::basics;
|
||||
using namespace arangodb::httpclient;
|
||||
|
@ -78,3 +81,73 @@ Result GlobalTailingSyncer::saveApplierState() {
|
|||
}
|
||||
return TRI_ERROR_INTERNAL;
|
||||
}
|
||||
|
||||
bool GlobalTailingSyncer::skipMarker(VPackSlice const& slice) {
|
||||
// we do not have a "cname" attribute in the marker...
|
||||
// now check for a globally unique id attribute ("cuid")
|
||||
// if its present, then we will use our local cuid -> collection name
|
||||
// translation table
|
||||
VPackSlice const name = slice.get("cuid");
|
||||
if (!name.isString()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (_translations.empty()) {
|
||||
// no translations yet... query master inventory to find names of all
|
||||
// collections
|
||||
try {
|
||||
GlobalInitialSyncer init(_configuration);
|
||||
VPackBuilder inventoryResponse;
|
||||
Result res = init.inventory(inventoryResponse);
|
||||
if (res.fail()) {
|
||||
LOG_TOPIC(ERR, Logger::REPLICATION) << "got error while fetching master inventory for collection name translations: " << res.errorMessage();
|
||||
return false;
|
||||
}
|
||||
|
||||
VPackSlice invSlice = inventoryResponse.slice();
|
||||
if (!invSlice.isObject()) {
|
||||
return false;
|
||||
}
|
||||
invSlice = invSlice.get("databases");
|
||||
if (!invSlice.isObject()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (auto const& it : VPackObjectIterator(invSlice)) {
|
||||
VPackSlice dbObj = it.value;
|
||||
if (!dbObj.isObject()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
dbObj = dbObj.get("collections");
|
||||
if (!dbObj.isArray()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (auto const& it : VPackArrayIterator(dbObj)) {
|
||||
if (!it.isObject()) {
|
||||
continue;
|
||||
}
|
||||
VPackSlice c = it.get("parameters");
|
||||
if (c.hasKey("name") && c.hasKey("globallyUniqueId")) {
|
||||
// we'll store everything for all databases in a global hash table,
|
||||
// as we expect the globally unique ids to be unique...
|
||||
_translations[c.get("globallyUniqueId").copyString()] = c.get("name").copyString();
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (std::exception const& ex) {
|
||||
LOG_TOPIC(ERR, Logger::REPLICATION) << "got error while fetching inventory: " << ex.what();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// look up cuid in translations map
|
||||
auto it = _translations.find(name.copyString());
|
||||
|
||||
if (it != _translations.end()) {
|
||||
return isExcludedCollection((*it).second);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_REPLICATION_GLOBAL_CONTINUOUS_SYNCER_H
|
||||
#define ARANGOD_REPLICATION_DATABASE_CONTINUOUS_SYNCER_H 1
|
||||
#define ARANGOD_REPLICATION_GLOBAL_CONTINUOUS_SYNCER_H 1
|
||||
|
||||
#include "TailingSyncer.h"
|
||||
#include "Replication/GlobalReplicationApplier.h"
|
||||
|
@ -45,12 +45,17 @@ class GlobalTailingSyncer : public TailingSyncer {
|
|||
}
|
||||
|
||||
protected:
|
||||
|
||||
/// @brief resolve to proper base url
|
||||
std::string tailingBaseUrl(std::string const& command) override;
|
||||
|
||||
/// @brief save the current applier state
|
||||
Result saveApplierState() override;
|
||||
|
||||
bool skipMarker(arangodb::velocypack::Slice const& slice) override;
|
||||
|
||||
private:
|
||||
/// @brief translation between globallyUniqueId and collection name
|
||||
std::unordered_map<std::string, std::string> _translations;
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -138,7 +138,7 @@ void TailingSyncer::abortOngoingTransactions() {
|
|||
|
||||
/// @brief whether or not a marker should be skipped
|
||||
bool TailingSyncer::skipMarker(TRI_voc_tick_t firstRegularTick,
|
||||
VPackSlice const& slice) const {
|
||||
VPackSlice const& slice) {
|
||||
bool tooOld = false;
|
||||
std::string const tick = VelocyPackHelper::getStringValue(slice, "tick", "");
|
||||
|
||||
|
@ -177,20 +177,23 @@ bool TailingSyncer::skipMarker(TRI_voc_tick_t firstRegularTick,
|
|||
if (tooOld) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
// the transient applier state is just used for one shard / collection
|
||||
if (!_configuration._restrictCollections.empty()) {
|
||||
if (_configuration._restrictType.empty() && _configuration._includeSystem) {
|
||||
return false;
|
||||
}
|
||||
|
||||
VPackSlice const name = slice.get("cname");
|
||||
if (name.isString()) {
|
||||
return isExcludedCollection(name.copyString());
|
||||
}
|
||||
if (_configuration._restrictCollections.empty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return false;
|
||||
|
||||
if (_configuration._restrictType.empty() && _configuration._includeSystem) {
|
||||
return false;
|
||||
}
|
||||
|
||||
VPackSlice const name = slice.get("cname");
|
||||
if (name.isString()) {
|
||||
return isExcludedCollection(name.copyString());
|
||||
}
|
||||
|
||||
// call virtual method
|
||||
return skipMarker(slice);
|
||||
}
|
||||
|
||||
/// @brief whether or not a collection should be excluded
|
||||
|
|
|
@ -73,7 +73,7 @@ class TailingSyncer : public Syncer {
|
|||
void abortOngoingTransactions();
|
||||
|
||||
/// @brief whether or not a collection should be excluded
|
||||
bool skipMarker(TRI_voc_tick_t, arangodb::velocypack::Slice const&) const;
|
||||
bool skipMarker(TRI_voc_tick_t, arangodb::velocypack::Slice const&);
|
||||
|
||||
/// @brief whether or not a collection should be excluded
|
||||
bool isExcludedCollection(std::string const&) const;
|
||||
|
@ -131,6 +131,7 @@ class TailingSyncer : public Syncer {
|
|||
arangodb::Result runInternal();
|
||||
|
||||
protected:
|
||||
virtual bool skipMarker(arangodb::velocypack::Slice const& slice) = 0;
|
||||
|
||||
/// @brief pointer to the applier
|
||||
ReplicationApplier* _applier;
|
||||
|
|
|
@ -76,6 +76,12 @@ const compare = function(masterFunc, masterFunc2, slaveFuncOngoing, slaveFuncFin
|
|||
while (replication.globalApplier.state().state.running) {
|
||||
internal.wait(0.1, false);
|
||||
}
|
||||
|
||||
applierConfiguration = applierConfiguration || {};
|
||||
applierConfiguration.endpoint = masterEndpoint;
|
||||
applierConfiguration.username = "root";
|
||||
applierConfiguration.password = "";
|
||||
applierConfiguration.includeSystem = false;
|
||||
|
||||
var syncResult = replication.syncGlobal({
|
||||
endpoint: masterEndpoint,
|
||||
|
@ -83,7 +89,9 @@ const compare = function(masterFunc, masterFunc2, slaveFuncOngoing, slaveFuncFin
|
|||
password: "",
|
||||
verbose: true,
|
||||
includeSystem: false,
|
||||
keepBarrier: false
|
||||
keepBarrier: false,
|
||||
restrictType: applierConfiguration.restrictType,
|
||||
restrictCollections: applierConfiguration.restrictCollections
|
||||
});
|
||||
|
||||
assertTrue(syncResult.hasOwnProperty('lastLogTick'));
|
||||
|
@ -94,12 +102,6 @@ const compare = function(masterFunc, masterFunc2, slaveFuncOngoing, slaveFuncFin
|
|||
// use lastLogTick as of now
|
||||
state.lastLogTick = replication.logger.state().state.lastLogTick;
|
||||
|
||||
applierConfiguration = applierConfiguration || {};
|
||||
applierConfiguration.endpoint = masterEndpoint;
|
||||
applierConfiguration.username = "root";
|
||||
applierConfiguration.password = "";
|
||||
applierConfiguration.includeSystem = false;
|
||||
|
||||
if (!applierConfiguration.hasOwnProperty('chunkSize')) {
|
||||
applierConfiguration.chunkSize = 16384;
|
||||
}
|
||||
|
@ -160,6 +162,66 @@ function BaseTestConfig() {
|
|||
'use strict';
|
||||
|
||||
return {
|
||||
|
||||
testIncludeCollection: function() {
|
||||
connectToMaster();
|
||||
|
||||
compare(
|
||||
function(state) {
|
||||
},
|
||||
|
||||
function(state) {
|
||||
db._create(cn);
|
||||
db._create(cn + "2");
|
||||
for (var i = 0; i < 100; ++i) {
|
||||
db._collection(cn).save({ value: i });
|
||||
db._collection(cn + "2").save({ value: i });
|
||||
}
|
||||
internal.wal.flush(true, true);
|
||||
},
|
||||
|
||||
function(state) {
|
||||
return true;
|
||||
},
|
||||
|
||||
function(state) {
|
||||
assertTrue(db._collection(cn).count() === 100);
|
||||
assertNull(db._collection(cn + "2"));
|
||||
},
|
||||
|
||||
{ restrictType: "include", restrictCollections: [cn] }
|
||||
);
|
||||
},
|
||||
|
||||
testExcludeCollection: function() {
|
||||
connectToMaster();
|
||||
|
||||
compare(
|
||||
function(state) {
|
||||
},
|
||||
|
||||
function(state) {
|
||||
db._create(cn);
|
||||
db._create(cn + "2");
|
||||
for (var i = 0; i < 100; ++i) {
|
||||
db._collection(cn).save({ value: i });
|
||||
db._collection(cn + "2").save({ value: i });
|
||||
}
|
||||
internal.wal.flush(true, true);
|
||||
},
|
||||
|
||||
function(state) {
|
||||
return true;
|
||||
},
|
||||
|
||||
function(state) {
|
||||
assertTrue(db._collection(cn).count() === 100);
|
||||
assertNull(db._collection(cn + "2"));
|
||||
},
|
||||
|
||||
{ restrictType: "exclude", restrictCollections: [cn + "2"] }
|
||||
);
|
||||
},
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief test collection creation
|
||||
|
|
Loading…
Reference in New Issue