1
0
Fork 0

Do not allow replication to create/drop collections (#2898)

In the cluster case the only one who is allowed to do this is the schmutz
This commit is contained in:
m0ppers 2017-07-28 14:24:02 +02:00 committed by Frank Celler
parent 634574ed9f
commit 9a0bc716d0
7 changed files with 32 additions and 11 deletions

View File

@ -2893,7 +2893,7 @@ void MMFilesRestReplicationHandler::handleCommandMakeSlave() {
std::string errorMsg = "";
{
InitialSyncer syncer(_vocbase, &config, config._restrictCollections,
restrictType, false);
restrictType, false, false);
res = TRI_ERROR_NO_ERROR;
@ -3014,7 +3014,7 @@ void MMFilesRestReplicationHandler::handleCommandSync() {
MMFilesLogfileManager::instance()->waitForSync(5.0);
InitialSyncer syncer(_vocbase, &config, restrictCollections, restrictType,
verbose);
verbose, false);
std::string errorMsg = "";

View File

@ -256,7 +256,7 @@ retry:
try {
InitialSyncer syncer(
_vocbase, &_configuration, _configuration._restrictCollections,
_configuration._restrictType, _configuration._verbose);
_configuration._restrictType, _configuration._verbose, false);
res = syncer.run(errorMsg, _configuration._incremental);

View File

@ -65,7 +65,7 @@ InitialSyncer::InitialSyncer(
TRI_vocbase_t* vocbase,
TRI_replication_applier_configuration_t const* configuration,
std::unordered_map<std::string, bool> const& restrictCollections,
std::string const& restrictType, bool verbose)
std::string const& restrictType, bool verbose, bool skipCreateDrop)
: Syncer(vocbase, configuration),
_progress("not started"),
_restrictCollections(restrictCollections),
@ -77,7 +77,8 @@ InitialSyncer::InitialSyncer(
_includeSystem(false),
_chunkSize(configuration->_chunkSize),
_verbose(verbose),
_hasFlushed(false) {
_hasFlushed(false),
_skipCreateDrop(skipCreateDrop) {
if (_chunkSize == 0) {
_chunkSize = (uint64_t)2 * 1024 * 1024; // 2 mb
} else if (_chunkSize < 128 * 1024) {
@ -1132,6 +1133,10 @@ int InitialSyncer::handleCollection(VPackSlice const& parameters,
}
} else {
// regular collection
if (_skipCreateDrop) {
setProgress("dropping " + collectionMsg + " skipped because of configuration");
return TRI_ERROR_NO_ERROR;
}
setProgress("dropping " + collectionMsg);
int res = _vocbase->dropCollection(col, true, -1.0);
@ -1160,7 +1165,13 @@ int InitialSyncer::handleCollection(VPackSlice const& parameters,
}
}
std::string const progress = "creating " + collectionMsg;
std::string progress = "creating " + collectionMsg;
if (_skipCreateDrop) {
progress += " skipped because of configuration";
setProgress(progress.c_str());
return TRI_ERROR_NO_ERROR;
}
setProgress(progress.c_str());
int res = createCollection(parameters, &col);

View File

@ -98,7 +98,7 @@ class InitialSyncer : public Syncer {
public:
InitialSyncer(TRI_vocbase_t*, TRI_replication_applier_configuration_t const*,
std::unordered_map<std::string, bool> const&,
std::string const&, bool verbose);
std::string const&, bool verbose, bool skipCreateDrop);
~InitialSyncer();
@ -331,6 +331,11 @@ class InitialSyncer : public Syncer {
static size_t const MaxChunkSize;
// in the cluster case it is a total NOGO to create or drop collections
// because this HAS to be handled in the schmutz. otherwise it forgets who
// the leader was etc.
bool _skipCreateDrop;
};
}

View File

@ -1489,7 +1489,7 @@ void RocksDBRestReplicationHandler::handleCommandMakeSlave() {
std::string errorMsg = "";
{
InitialSyncer syncer(_vocbase, &config, config._restrictCollections,
restrictType, false);
restrictType, false, false);
res = TRI_ERROR_NO_ERROR;
@ -1607,7 +1607,7 @@ void RocksDBRestReplicationHandler::handleCommandSync() {
config._useCollectionId = useCollectionId;
InitialSyncer syncer(_vocbase, &config, restrictCollections, restrictType,
verbose);
verbose, false);
std::string errorMsg = "";

View File

@ -254,6 +254,11 @@ static void JS_SynchronizeReplication(
verbose = TRI_ObjectToBoolean(object->Get(TRI_V8_ASCII_STRING("verbose")));
}
bool skipCreateDrop = false;
if (object->Has(TRI_V8_ASCII_STRING("skipCreateDrop"))) {
skipCreateDrop = TRI_ObjectToBoolean(object->Get(TRI_V8_ASCII_STRING("skipCreateDrop")));
}
if (endpoint.empty()) {
TRI_V8_THROW_EXCEPTION_PARAMETER("<endpoint> must be a valid endpoint");
}
@ -319,7 +324,7 @@ static void JS_SynchronizeReplication(
std::string errorMsg = "";
InitialSyncer syncer(vocbase, &config, restrictCollections, restrictType,
verbose);
verbose, skipCreateDrop);
if (!leaderId.empty()) {
syncer.setLeaderId(leaderId);
}

View File

@ -545,7 +545,7 @@ function synchronizeOneShard (database, shard, planId, leader) {
let startTime = new Date();
sy = rep.syncCollection(shard,
{ endpoint: ep, incremental: true, keepBarrier: true,
useCollectionId: false, leaderId: leader });
useCollectionId: false, leaderId: leader, skipCreateDrop: true });
let endTime = new Date();
let longSync = false;
if (endTime - startTime > 5000) {