1
0
Fork 0

optionally restrict logger-follow to a single collection

This commit is contained in:
Jan Steemann 2016-02-10 16:49:22 +01:00
parent b580a7df22
commit 23dfbeee46
6 changed files with 34 additions and 17 deletions

View File

@ -876,7 +876,7 @@ void RestReplicationHandler::handleCommandLoggerFollow() {
} }
for (auto const& id : VPackArrayIterator(slice)) { for (auto const& id : VPackArrayIterator(slice)) {
if (id.isString()) { if (!id.isString()) {
generateError(HttpResponse::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, generateError(HttpResponse::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid body value. expecting array of ids"); "invalid body value. expecting array of ids");
return; return;
@ -884,6 +884,23 @@ void RestReplicationHandler::handleCommandLoggerFollow() {
transactionIds.emplace(StringUtils::uint64(id.copyString())); transactionIds.emplace(StringUtils::uint64(id.copyString()));
} }
} }
// extract collection
TRI_voc_cid_t cid = 0;
value = _request->value("collection", found);
if (found) {
TRI_vocbase_col_t* c =
TRI_LookupCollectionByNameVocBase(_vocbase, value);
if (c == nullptr) {
generateError(HttpResponse::NOT_FOUND,
TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
return;
}
cid = c->_cid;
}
if (barrierId > 0) { if (barrierId > 0) {
// extend the WAL logfile barrier // extend the WAL logfile barrier
@ -895,7 +912,7 @@ void RestReplicationHandler::handleCommandLoggerFollow() {
try { try {
// initialize the dump container // initialize the dump container
TRI_replication_dump_t dump(_vocbase, (size_t)determineChunkSize(), TRI_replication_dump_t dump(_vocbase, (size_t)determineChunkSize(),
includeSystem); includeSystem, cid);
// and dump // and dump
res = TRI_DumpLogReplication(&dump, transactionIds, firstRegularTick, res = TRI_DumpLogReplication(&dump, transactionIds, firstRegularTick,
@ -995,7 +1012,7 @@ void RestReplicationHandler::handleCommandDetermineOpenTransactions() {
try { try {
// initialize the dump container // initialize the dump container
TRI_replication_dump_t dump(_vocbase, (size_t)determineChunkSize(), false); TRI_replication_dump_t dump(_vocbase, (size_t)determineChunkSize(), false, 0);
// and dump // and dump
res = TRI_DetermineOpenTransactionsReplication(&dump, tickStart, tickEnd); res = TRI_DetermineOpenTransactionsReplication(&dump, tickStart, tickEnd);
@ -1211,7 +1228,7 @@ int RestReplicationHandler::createCollection(VPackSlice const& slice,
} }
} }
const TRI_col_type_e type = static_cast<TRI_col_type_e>( TRI_col_type_e const type = static_cast<TRI_col_type_e>(
arangodb::basics::VelocyPackHelper::getNumericValue<int>( arangodb::basics::VelocyPackHelper::getNumericValue<int>(
slice, "type", (int)TRI_COL_TYPE_DOCUMENT)); slice, "type", (int)TRI_COL_TYPE_DOCUMENT));
@ -2949,7 +2966,7 @@ void RestReplicationHandler::handleCommandDump() {
// initialize the dump container // initialize the dump container
TRI_replication_dump_t dump(_vocbase, (size_t)determineChunkSize(), TRI_replication_dump_t dump(_vocbase, (size_t)determineChunkSize(),
includeSystem); includeSystem, 0);
res = res =
TRI_DumpCollectionReplication(&dump, col, tickStart, tickEnd, withTicks, TRI_DumpCollectionReplication(&dump, col, tickStart, tickEnd, withTicks,

View File

@ -150,7 +150,7 @@ static void JS_LastLoggerReplication(
"REPLICATION_LOGGER_LAST(<fromTick>, <toTick>)"); "REPLICATION_LOGGER_LAST(<fromTick>, <toTick>)");
} }
TRI_replication_dump_t dump(vocbase, 0, true); TRI_replication_dump_t dump(vocbase, 0, true, 0);
TRI_voc_tick_t tickStart = TRI_ObjectToUInt64(args[0], true); TRI_voc_tick_t tickStart = TRI_ObjectToUInt64(args[0], true);
TRI_voc_tick_t tickEnd = TRI_ObjectToUInt64(args[1], true); TRI_voc_tick_t tickEnd = TRI_ObjectToUInt64(args[1], true);

View File

@ -1124,9 +1124,10 @@ static bool MustReplicateWalMarker(
if (dump->_vocbase->_id != GetDatabaseFromWalMarker(marker)) { if (dump->_vocbase->_id != GetDatabaseFromWalMarker(marker)) {
return false; return false;
} }
// finally check if the marker is for a collection that we want to ignore // finally check if the marker is for a collection that we want to ignore
TRI_voc_cid_t cid = GetCollectionFromWalMarker(marker); TRI_voc_cid_t cid = GetCollectionFromWalMarker(marker);
if (cid != 0) { if (cid != 0) {
char const* name = NameFromCid(dump, cid); char const* name = NameFromCid(dump, cid);
@ -1135,7 +1136,13 @@ static bool MustReplicateWalMarker(
return false; return false;
} }
} }
if (dump->_restrictCollection > 0 && cid != dump->_restrictCollection) {
// restrict output to a single collection, but a different one
return false;
}
if (marker->_tick >= firstRegularTick) { if (marker->_tick >= firstRegularTick) {
return true; return true;
} }

View File

@ -41,13 +41,14 @@ class TRI_vocbase_col_t;
struct TRI_replication_dump_t { struct TRI_replication_dump_t {
TRI_replication_dump_t(TRI_vocbase_t* vocbase, size_t chunkSize, TRI_replication_dump_t(TRI_vocbase_t* vocbase, size_t chunkSize,
bool includeSystem) bool includeSystem, TRI_voc_cid_t restrictCollection)
: _vocbase(vocbase), : _vocbase(vocbase),
_buffer(nullptr), _buffer(nullptr),
_chunkSize(chunkSize), _chunkSize(chunkSize),
_lastFoundTick(0), _lastFoundTick(0),
_lastSid(0), _lastSid(0),
_lastShape(nullptr), _lastShape(nullptr),
_restrictCollection(restrictCollection),
_collectionNames(), _collectionNames(),
_failed(false), _failed(false),
_bufferFull(false), _bufferFull(false),
@ -79,6 +80,7 @@ struct TRI_replication_dump_t {
TRI_voc_tick_t _lastFoundTick; TRI_voc_tick_t _lastFoundTick;
TRI_shape_sid_t _lastSid; TRI_shape_sid_t _lastSid;
struct TRI_shape_s const* _lastShape; struct TRI_shape_s const* _lastShape;
TRI_voc_cid_t _restrictCollection;
std::unordered_map<TRI_voc_cid_t, std::string> _collectionNames; std::unordered_map<TRI_voc_cid_t, std::string> _collectionNames;
bool _failed; bool _failed;
bool _bufferFull; bool _bufferFull;

View File

@ -137,10 +137,6 @@ function ReplicationSuite() {
connectToMaster(); connectToMaster();
masterFunc2(state); masterFunc2(state);
if (typeof applierConfiguration === 'object') {
console.log("using special applier configuration: " + JSON.stringify(applierConfiguration));
}
applierConfiguration = applierConfiguration || {}; applierConfiguration = applierConfiguration || {};
applierConfiguration.endpoint = masterEndpoint; applierConfiguration.endpoint = masterEndpoint;
applierConfiguration.username = "root"; applierConfiguration.username = "root";

View File

@ -130,11 +130,6 @@ function ReplicationSuite() {
assertTrue(syncResult.hasOwnProperty('lastLogTick')); assertTrue(syncResult.hasOwnProperty('lastLogTick'));
if (typeof applierConfiguration === 'object') {
console.log("using special applier configuration: " +
JSON.stringify(applierConfiguration));
}
applierConfiguration = applierConfiguration || {}; applierConfiguration = applierConfiguration || {};
applierConfiguration.endpoint = masterEndpoint; applierConfiguration.endpoint = masterEndpoint;
applierConfiguration.username = replicatorUser; applierConfiguration.username = replicatorUser;