1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
Mark 2016-06-13 14:37:16 +02:00
commit 822fb3282f
13 changed files with 89 additions and 38 deletions

View File

@ -490,8 +490,10 @@ int ContinuousSyncer::processDocument(TRI_replication_operation_e type,
isSystem = (!cnameString.empty() && cnameString[0] == '_');
if (!cnameString.empty()) {
TRI_vocbase_col_t* col =
TRI_LookupCollectionByNameVocBase(_vocbase, cnameString.c_str());
TRI_vocbase_col_t* col = nullptr;
if (_useCollectionId) {
col = TRI_LookupCollectionByNameVocBase(_vocbase, cnameString.c_str());
}
if (col != nullptr && col->_cid != cid) {
// cid change? this may happen for system collections or if we restored
@ -751,7 +753,10 @@ int ContinuousSyncer::renameCollection(VPackSlice const& slice) {
}
TRI_voc_cid_t const cid = getCid(slice);
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
TRI_vocbase_col_t* col = nullptr;
if (_useCollectionId) {
col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
}
if (col == nullptr && !cname.empty()) {
col = TRI_LookupCollectionByNameVocBase(_vocbase, cname.c_str());
@ -776,7 +781,11 @@ int ContinuousSyncer::changeCollection(VPackSlice const& slice) {
TRI_voc_cid_t cid = getCid(slice);
std::string const cname = getCName(slice);
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
TRI_vocbase_col_t* col = nullptr;
if (col == nullptr) {
TRI_LookupCollectionByIdVocBase(_vocbase, cid);
}
if (col == nullptr && !cname.empty()) {
col = TRI_LookupCollectionByNameVocBase(_vocbase, cname.c_str());

View File

@ -1705,7 +1705,10 @@ int InitialSyncer::handleCollection(VPackSlice const& parameters,
if (phase == PHASE_DROP_CREATE) {
if (!incremental) {
// first look up the collection by the cid
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
TRI_vocbase_col_t* col = nullptr;
if (_useCollectionId) {
col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
}
if (col == nullptr && !masterName.empty()) {
// not found, try name next
@ -1774,7 +1777,9 @@ int InitialSyncer::handleCollection(VPackSlice const& parameters,
TRI_vocbase_col_t* col = nullptr;
if (incremental) {
col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
if (_useCollectionId) {
col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
}
if (col == nullptr && !masterName.empty()) {
// not found, try name next
@ -1812,7 +1817,10 @@ int InitialSyncer::handleCollection(VPackSlice const& parameters,
std::string const progress = "dumping data for " + collectionMsg;
setProgress(progress.c_str());
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
TRI_vocbase_col_t* col = nullptr;
if (_useCollectionId) {
col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
}
if (col == nullptr && !masterName.empty()) {
// not found, try name next

View File

@ -57,7 +57,7 @@ class InitialSyncer : public Syncer {
public:
InitialSyncer(TRI_vocbase_t*, TRI_replication_applier_configuration_t const*,
std::unordered_map<std::string, bool> const&,
std::string const&, bool);
std::string const&, bool verbose);
~InitialSyncer();

View File

@ -79,6 +79,7 @@ Syncer::Syncer(TRI_vocbase_t* vocbase,
_localServerIdString = StringUtils::itoa(_localServerId);
_configuration.update(configuration);
_useCollectionId = _configuration._useCollectionId;
_masterInfo._endpoint = configuration->_endpoint;
@ -439,7 +440,10 @@ int Syncer::createCollection(VPackSlice const& slice, TRI_vocbase_col_t** dst) {
TRI_col_type_e const type = static_cast<TRI_col_type_e>(VelocyPackHelper::getNumericValue<int>(
slice, "type", static_cast<int>(TRI_COL_TYPE_DOCUMENT)));
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
TRI_vocbase_col_t* col = nullptr;
if (_useCollectionId) {
col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
}
if (col == nullptr) {
// try looking up the collection by name then
@ -480,7 +484,11 @@ int Syncer::createCollection(VPackSlice const& slice, TRI_vocbase_col_t** dst) {
int Syncer::dropCollection(VPackSlice const& slice, bool reportError) {
TRI_voc_cid_t const cid = getCid(slice);
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
TRI_vocbase_col_t* col = nullptr;
if (_useCollectionId) {
col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
}
if (col == nullptr) {
std::string cname = getCName(slice);

View File

@ -247,6 +247,12 @@ class Syncer {
//////////////////////////////////////////////////////////////////////////////
int _barrierTtl;
//////////////////////////////////////////////////////////////////////////////
/// @brief whether or not to use collection ids in replication
//////////////////////////////////////////////////////////////////////////////
bool _useCollectionId;
//////////////////////////////////////////////////////////////////////////////
/// @brief base url of the replication API

View File

@ -3061,6 +3061,8 @@ void RestReplicationHandler::handleCommandMakeSlave() {
VelocyPackHelper::getBooleanValue(body, "verbose", defaults._verbose);
config._incremental = VelocyPackHelper::getBooleanValue(
body, "incremental", defaults._incremental);
config._useCollectionId = VelocyPackHelper::getBooleanValue(
body, "useCollectionId", defaults._useCollectionId);
config._requireFromPresent = VelocyPackHelper::getBooleanValue(
body, "requireFromPresent", defaults._requireFromPresent);
config._restrictType = VelocyPackHelper::getStringValue(
@ -3215,6 +3217,8 @@ void RestReplicationHandler::handleCommandSync() {
VelocyPackHelper::getBooleanValue(body, "incremental", false);
bool const keepBarrier =
VelocyPackHelper::getBooleanValue(body, "keepBarrier", false);
bool const useCollectionId =
VelocyPackHelper::getBooleanValue(body, "useCollectionId", true);
std::unordered_map<std::string, bool> restrictCollections;
VPackSlice const restriction = body.get("restrictCollections");
@ -3248,6 +3252,7 @@ void RestReplicationHandler::handleCommandSync() {
config._password = password;
config._includeSystem = includeSystem;
config._verbose = verbose;
config._useCollectionId = useCollectionId;
// wait until all data in current logfile got synced
arangodb::wal::LogfileManager::instance()->waitForSync(5.0);

View File

@ -310,6 +310,11 @@ static void JS_SynchronizeReplication(
TRI_ObjectToBoolean(object->Get(TRI_V8_ASCII_STRING("keepBarrier")));
}
if (object->Has(TRI_V8_ASCII_STRING("useCollectionId"))) {
config._useCollectionId =
TRI_ObjectToBoolean(object->Get(TRI_V8_ASCII_STRING("useCollectionId")));
}
std::string errorMsg = "";
InitialSyncer syncer(vocbase, &config, restrictCollections, restrictType,
verbose);

View File

@ -210,6 +210,12 @@ static int LoadConfiguration(TRI_vocbase_t* vocbase,
config->_incremental = value.getBoolean();
}
value = slice.get("useCollectionId");
if (value.isBoolean()) {
config->_useCollectionId = value.getBoolean();
}
value = slice.get("ignoreErrors");
if (value.isNumber()) {
@ -463,6 +469,7 @@ TRI_replication_applier_configuration_t::
_requireFromPresent(false),
_incremental(false),
_verbose(false),
_useCollectionId(true),
_restrictType(),
_restrictCollections() {}
@ -509,6 +516,7 @@ void TRI_replication_applier_configuration_t::toVelocyPack(
builder.add("requireFromPresent", VPackValue(_requireFromPresent));
builder.add("verbose", VPackValue(_verbose));
builder.add("incremental", VPackValue(_incremental));
builder.add("useCollectionId", VPackValue(_useCollectionId));
builder.add("restrictType", VPackValue(_restrictType));
builder.add("restrictCollections", VPackValue(VPackValueType::Array));
@ -800,6 +808,7 @@ void TRI_replication_applier_configuration_t::update(
_requireFromPresent = src->_requireFromPresent;
_verbose = src->_verbose;
_incremental = src->_incremental;
_useCollectionId = src->_useCollectionId;
_restrictType = src->_restrictType;
_restrictCollections = src->_restrictCollections;
_connectionRetryWaitTime = src->_connectionRetryWaitTime;

View File

@ -65,6 +65,7 @@ class TRI_replication_applier_configuration_t {
bool _requireFromPresent;
bool _incremental;
bool _verbose;
bool _useCollectionId;
std::string _restrictType;
std::unordered_map<std::string, bool> _restrictCollections;

View File

@ -1022,7 +1022,7 @@ function synchronizeOneShard(database, shard, planId, leader) {
try {
sy = rep.syncCollection(shard,
{ endpoint: ep, incremental: true,
keepBarrier: true });
keepBarrier: true, useCollectionId: false });
break;
}
catch (err) {
@ -1063,8 +1063,7 @@ function synchronizeOneShard(database, shard, planId, leader) {
if (lockJobId !== false) {
try {
var sy2 = rep.syncCollectionFinalize(
database, shard, sy.collections[0].id,
sy.lastLogTick, { endpoint: ep });
database, shard, sy.lastLogTick, { endpoint: ep });
if (sy2.error) {
console.error("synchronizeOneShard: Could not synchronize shard",
shard, sy2);

View File

@ -334,7 +334,6 @@ function mountController(service, mount, filename) {
}
exports.routeService = function (service, throwOnErrors) {
const defaultDocument = service.manifest.defaultDocument;
let error = null;
service.routes = {
@ -353,21 +352,6 @@ exports.routeService = function (service, throwOnErrors) {
}
};
if (defaultDocument) {
// only add redirection if src and target are not the same
service.routes.routes.push({
url: {match: '/'},
action: {
do: '@arangodb/actions/redirectRequest',
options: {
permanently: false,
destination: defaultDocument,
relative: true
}
}
});
}
try {
// mount all controllers
let controllerFiles = service.manifest.controllers;

View File

@ -141,6 +141,21 @@ exports.routeService = function (service, throwOnErrors) {
error = routeLegacyService(service, throwOnErrors);
}
const defaultDocument = service.manifest.defaultDocument;
if (defaultDocument) {
service.routes.routes.push({
url: {match: '/'},
action: {
do: '@arangodb/actions/redirectRequest',
options: {
permanently: false,
destination: defaultDocument,
relative: true
}
}
});
}
if (service.manifest.files) {
const files = service.manifest.files;
_.each(files, function (file, path) {

View File

@ -205,11 +205,11 @@ var mType = {
REPLICATION_MARKER_REMOVE: 2302
};
function syncCollectionFinalize(database, collname, collid, from, config) {
function syncCollectionFinalize(database, collname, from, config) {
var url = endpointToURL(config.endpoint) + "/_db/" + database +
"/_api/replication/logger-follow?collection=" + collid + "&from=";
"/_api/replication/logger-follow?collection=" + collname + "&from=";
var coll = require("internal").db[collid];
var coll = require("internal").db[collname];
var transactions = {};
@ -237,11 +237,12 @@ function syncCollectionFinalize(database, collname, collid, from, config) {
return;
}
catch (err) {
console.debug("syncCollectionFinalize: insert1", entry, JSON.stringify(err));
}
try {
coll.replace(entry.data._key, entry.data, {isRestore: true});
} catch (errx) {
console.error("syncCollectionFinalize: replace1", entry, errx);
console.error("syncCollectionFinalize: replace1", entry, JSON.stringify(errx));
throw errx;
}
} else if (entry.type === mType.REPLICATION_MARKER_EDGE) {
@ -253,11 +254,12 @@ function syncCollectionFinalize(database, collname, collid, from, config) {
return;
}
catch (err) {
console.debug("syncCollectionFinalize: insert2", entry, JSON.stringify(err));
}
try {
coll.replace(entry.key, entry.data, {isRestore: true});
} catch (errx) {
console.error("syncCollectionFinalize: replace2", entry, errx);
console.error("syncCollectionFinalize: replace2", entry, JSON.stringify(errx));
throw errx;
}
} else if (entry.type === mType.REPLICATION_MARKER_REMOVE) {
@ -267,7 +269,7 @@ function syncCollectionFinalize(database, collname, collid, from, config) {
try {
coll.remove(entry.key);
} catch (errx) {
console.error("syncCollectionFinalize: remove", entry, errx);
console.error("syncCollectionFinalize: remove", entry, JSON.stringify(errx));
throw errx;
}
} else if (entry.type === mType.REPLICATION_TRANSACTION_START) {
@ -286,21 +288,21 @@ function syncCollectionFinalize(database, collname, collid, from, config) {
try {
coll.ensureIndex(entry.index);
} catch(errx) {
console.error("syncCollectionFinalize: ensureIndex", entry, errx);
console.error("syncCollectionFinalize: ensureIndex", entry, JSON.stringify(errx));
throw errx;
}
} else if (entry.type === mType.REPLICATION_INDEX_DROP) {
try {
coll.dropIndex(entry.id);
} catch(errx) {
console.error("syncCollectionFinalize: dropIndex", entry, errx);
console.error("syncCollectionFinalize: dropIndex", entry, JSON.stringify(errx));
throw errx;
}
} else if (entry.type === mType.REPLICATION_COLLECTION_CHANGE) {
try {
coll.properties(entry.collection);
} catch(errx) {
console.error("syncCollectionFinalize: properties", entry, errx);
console.error("syncCollectionFinalize: properties", entry, JSON.stringify(errx));
throw errx;
}
} else {