1
0
Fork 0

Fix synchronous replication: use shard name instead of ID.

Explanation: Different replicas of the same shard will have different
local collection IDs. Therefore we have to use the shard name
(collection name) to indentify collections in synchronous replication.
This commit is contained in:
Max Neunhoeffer 2016-06-13 13:54:32 +02:00
parent e287c4fdb3
commit 1f926fc24b
11 changed files with 74 additions and 22 deletions

View File

@ -490,8 +490,10 @@ int ContinuousSyncer::processDocument(TRI_replication_operation_e type,
isSystem = (!cnameString.empty() && cnameString[0] == '_');
if (!cnameString.empty()) {
TRI_vocbase_col_t* col =
TRI_LookupCollectionByNameVocBase(_vocbase, cnameString.c_str());
TRI_vocbase_col_t* col = nullptr;
if (_useCollectionId) {
col = TRI_LookupCollectionByNameVocBase(_vocbase, cnameString.c_str());
}
if (col != nullptr && col->_cid != cid) {
// cid change? this may happen for system collections or if we restored
@ -751,7 +753,10 @@ int ContinuousSyncer::renameCollection(VPackSlice const& slice) {
}
TRI_voc_cid_t const cid = getCid(slice);
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
TRI_vocbase_col_t* col = nullptr;
if (_useCollectionId) {
col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
}
if (col == nullptr && !cname.empty()) {
col = TRI_LookupCollectionByNameVocBase(_vocbase, cname.c_str());
@ -776,7 +781,11 @@ int ContinuousSyncer::changeCollection(VPackSlice const& slice) {
TRI_voc_cid_t cid = getCid(slice);
std::string const cname = getCName(slice);
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
TRI_vocbase_col_t* col = nullptr;
if (col == nullptr) {
TRI_LookupCollectionByIdVocBase(_vocbase, cid);
}
if (col == nullptr && !cname.empty()) {
col = TRI_LookupCollectionByNameVocBase(_vocbase, cname.c_str());

View File

@ -1705,7 +1705,10 @@ int InitialSyncer::handleCollection(VPackSlice const& parameters,
if (phase == PHASE_DROP_CREATE) {
if (!incremental) {
// first look up the collection by the cid
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
TRI_vocbase_col_t* col = nullptr;
if (_useCollectionId) {
col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
}
if (col == nullptr && !masterName.empty()) {
// not found, try name next
@ -1774,7 +1777,9 @@ int InitialSyncer::handleCollection(VPackSlice const& parameters,
TRI_vocbase_col_t* col = nullptr;
if (incremental) {
if (_useCollectionId) {
col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
}
if (col == nullptr && !masterName.empty()) {
// not found, try name next
@ -1812,7 +1817,10 @@ int InitialSyncer::handleCollection(VPackSlice const& parameters,
std::string const progress = "dumping data for " + collectionMsg;
setProgress(progress.c_str());
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
TRI_vocbase_col_t* col = nullptr;
if (_useCollectionId) {
col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
}
if (col == nullptr && !masterName.empty()) {
// not found, try name next

View File

@ -57,7 +57,7 @@ class InitialSyncer : public Syncer {
public:
InitialSyncer(TRI_vocbase_t*, TRI_replication_applier_configuration_t const*,
std::unordered_map<std::string, bool> const&,
std::string const&, bool);
std::string const&, bool verbose);
~InitialSyncer();

View File

@ -79,6 +79,7 @@ Syncer::Syncer(TRI_vocbase_t* vocbase,
_localServerIdString = StringUtils::itoa(_localServerId);
_configuration.update(configuration);
_useCollectionId = _configuration._useCollectionId;
_masterInfo._endpoint = configuration->_endpoint;
@ -439,7 +440,10 @@ int Syncer::createCollection(VPackSlice const& slice, TRI_vocbase_col_t** dst) {
TRI_col_type_e const type = static_cast<TRI_col_type_e>(VelocyPackHelper::getNumericValue<int>(
slice, "type", static_cast<int>(TRI_COL_TYPE_DOCUMENT)));
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
TRI_vocbase_col_t* col = nullptr;
if (_useCollectionId) {
col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
}
if (col == nullptr) {
// try looking up the collection by name then
@ -480,7 +484,11 @@ int Syncer::createCollection(VPackSlice const& slice, TRI_vocbase_col_t** dst) {
int Syncer::dropCollection(VPackSlice const& slice, bool reportError) {
TRI_voc_cid_t const cid = getCid(slice);
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
TRI_vocbase_col_t* col = nullptr;
if (_useCollectionId) {
col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
}
if (col == nullptr) {
std::string cname = getCName(slice);

View File

@ -248,6 +248,12 @@ class Syncer {
int _barrierTtl;
//////////////////////////////////////////////////////////////////////////////
/// @brief whether or not to use collection ids in replication
//////////////////////////////////////////////////////////////////////////////
bool _useCollectionId;
//////////////////////////////////////////////////////////////////////////////
/// @brief base url of the replication API
//////////////////////////////////////////////////////////////////////////////

View File

@ -3061,6 +3061,8 @@ void RestReplicationHandler::handleCommandMakeSlave() {
VelocyPackHelper::getBooleanValue(body, "verbose", defaults._verbose);
config._incremental = VelocyPackHelper::getBooleanValue(
body, "incremental", defaults._incremental);
config._useCollectionId = VelocyPackHelper::getBooleanValue(
body, "useCollectionId", defaults._useCollectionId);
config._requireFromPresent = VelocyPackHelper::getBooleanValue(
body, "requireFromPresent", defaults._requireFromPresent);
config._restrictType = VelocyPackHelper::getStringValue(
@ -3215,6 +3217,8 @@ void RestReplicationHandler::handleCommandSync() {
VelocyPackHelper::getBooleanValue(body, "incremental", false);
bool const keepBarrier =
VelocyPackHelper::getBooleanValue(body, "keepBarrier", false);
bool const useCollectionId =
VelocyPackHelper::getBooleanValue(body, "useCollectionId", true);
std::unordered_map<std::string, bool> restrictCollections;
VPackSlice const restriction = body.get("restrictCollections");
@ -3248,6 +3252,7 @@ void RestReplicationHandler::handleCommandSync() {
config._password = password;
config._includeSystem = includeSystem;
config._verbose = verbose;
config._useCollectionId = useCollectionId;
// wait until all data in current logfile got synced
arangodb::wal::LogfileManager::instance()->waitForSync(5.0);

View File

@ -310,6 +310,11 @@ static void JS_SynchronizeReplication(
TRI_ObjectToBoolean(object->Get(TRI_V8_ASCII_STRING("keepBarrier")));
}
if (object->Has(TRI_V8_ASCII_STRING("useCollectionId"))) {
config._useCollectionId =
TRI_ObjectToBoolean(object->Get(TRI_V8_ASCII_STRING("useCollectionId")));
}
std::string errorMsg = "";
InitialSyncer syncer(vocbase, &config, restrictCollections, restrictType,
verbose);

View File

@ -210,6 +210,12 @@ static int LoadConfiguration(TRI_vocbase_t* vocbase,
config->_incremental = value.getBoolean();
}
value = slice.get("useCollectionId");
if (value.isBoolean()) {
config->_useCollectionId = value.getBoolean();
}
value = slice.get("ignoreErrors");
if (value.isNumber()) {
@ -463,6 +469,7 @@ TRI_replication_applier_configuration_t::
_requireFromPresent(false),
_incremental(false),
_verbose(false),
_useCollectionId(true),
_restrictType(),
_restrictCollections() {}
@ -509,6 +516,7 @@ void TRI_replication_applier_configuration_t::toVelocyPack(
builder.add("requireFromPresent", VPackValue(_requireFromPresent));
builder.add("verbose", VPackValue(_verbose));
builder.add("incremental", VPackValue(_incremental));
builder.add("useCollectionId", VPackValue(_useCollectionId));
builder.add("restrictType", VPackValue(_restrictType));
builder.add("restrictCollections", VPackValue(VPackValueType::Array));
@ -800,6 +808,7 @@ void TRI_replication_applier_configuration_t::update(
_requireFromPresent = src->_requireFromPresent;
_verbose = src->_verbose;
_incremental = src->_incremental;
_useCollectionId = src->_useCollectionId;
_restrictType = src->_restrictType;
_restrictCollections = src->_restrictCollections;
_connectionRetryWaitTime = src->_connectionRetryWaitTime;

View File

@ -65,6 +65,7 @@ class TRI_replication_applier_configuration_t {
bool _requireFromPresent;
bool _incremental;
bool _verbose;
bool _useCollectionId;
std::string _restrictType;
std::unordered_map<std::string, bool> _restrictCollections;

View File

@ -1022,7 +1022,7 @@ function synchronizeOneShard(database, shard, planId, leader) {
try {
sy = rep.syncCollection(shard,
{ endpoint: ep, incremental: true,
keepBarrier: true });
keepBarrier: true, useCollectionId: false });
break;
}
catch (err) {
@ -1063,8 +1063,7 @@ function synchronizeOneShard(database, shard, planId, leader) {
if (lockJobId !== false) {
try {
var sy2 = rep.syncCollectionFinalize(
database, shard, sy.collections[0].id,
sy.lastLogTick, { endpoint: ep });
database, shard, sy.lastLogTick, { endpoint: ep });
if (sy2.error) {
console.error("synchronizeOneShard: Could not synchronize shard",
shard, sy2);

View File

@ -205,11 +205,11 @@ var mType = {
REPLICATION_MARKER_REMOVE: 2302
};
function syncCollectionFinalize(database, collname, collid, from, config) {
function syncCollectionFinalize(database, collname, from, config) {
var url = endpointToURL(config.endpoint) + "/_db/" + database +
"/_api/replication/logger-follow?collection=" + collid + "&from=";
"/_api/replication/logger-follow?collection=" + collname + "&from=";
var coll = require("internal").db[collid];
var coll = require("internal").db[collname];
var transactions = {};
@ -237,11 +237,12 @@ function syncCollectionFinalize(database, collname, collid, from, config) {
return;
}
catch (err) {
console.debug("syncCollectionFinalize: insert1", entry, JSON.stringify(err));
}
try {
coll.replace(entry.data._key, entry.data, {isRestore: true});
} catch (errx) {
console.error("syncCollectionFinalize: replace1", entry, errx);
console.error("syncCollectionFinalize: replace1", entry, JSON.stringify(errx));
throw errx;
}
} else if (entry.type === mType.REPLICATION_MARKER_EDGE) {
@ -253,11 +254,12 @@ function syncCollectionFinalize(database, collname, collid, from, config) {
return;
}
catch (err) {
console.debug("syncCollectionFinalize: insert2", entry, JSON.stringify(err));
}
try {
coll.replace(entry.key, entry.data, {isRestore: true});
} catch (errx) {
console.error("syncCollectionFinalize: replace2", entry, errx);
console.error("syncCollectionFinalize: replace2", entry, JSON.stringify(errx));
throw errx;
}
} else if (entry.type === mType.REPLICATION_MARKER_REMOVE) {
@ -267,7 +269,7 @@ function syncCollectionFinalize(database, collname, collid, from, config) {
try {
coll.remove(entry.key);
} catch (errx) {
console.error("syncCollectionFinalize: remove", entry, errx);
console.error("syncCollectionFinalize: remove", entry, JSON.stringify(errx));
throw errx;
}
} else if (entry.type === mType.REPLICATION_TRANSACTION_START) {
@ -286,21 +288,21 @@ function syncCollectionFinalize(database, collname, collid, from, config) {
try {
coll.ensureIndex(entry.index);
} catch(errx) {
console.error("syncCollectionFinalize: ensureIndex", entry, errx);
console.error("syncCollectionFinalize: ensureIndex", entry, JSON.stringify(errx));
throw errx;
}
} else if (entry.type === mType.REPLICATION_INDEX_DROP) {
try {
coll.dropIndex(entry.id);
} catch(errx) {
console.error("syncCollectionFinalize: dropIndex", entry, errx);
console.error("syncCollectionFinalize: dropIndex", entry, JSON.stringify(errx));
throw errx;
}
} else if (entry.type === mType.REPLICATION_COLLECTION_CHANGE) {
try {
coll.properties(entry.collection);
} catch(errx) {
console.error("syncCollectionFinalize: properties", entry, errx);
console.error("syncCollectionFinalize: properties", entry, JSON.stringify(errx));
throw errx;
}
} else {