1
0
Fork 0

Allow WAL logger to split up transactions (#6800)

This commit is contained in:
Simon 2018-10-12 15:15:55 +02:00 committed by Jan
parent fb051ca0bf
commit 010b54c81e
30 changed files with 560 additions and 360 deletions

View File

@ -414,8 +414,12 @@ Status WriteBatch::Iterate(Handler* handler) const {
char tag = 0; char tag = 0;
uint32_t column_family = 0; // default uint32_t column_family = 0; // default
bool last_was_try_again = false; bool last_was_try_again = false;
while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain())) && bool handlerContinue = true;
handler->Continue()) { while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain()))) {
if (!(handlerContinue = handler->Continue())) {
break;
}
if (LIKELY(!s.IsTryAgain())) { if (LIKELY(!s.IsTryAgain())) {
last_was_try_again = false; last_was_try_again = false;
tag = 0; tag = 0;
@ -583,7 +587,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
if (found != WriteBatchInternal::Count(this)) { if (handlerContinue && found != WriteBatchInternal::Count(this)) {
return Status::Corruption("WriteBatch has wrong count"); return Status::Corruption("WriteBatch has wrong count");
} else { } else {
return Status::OK(); return Status::OK();

View File

@ -7,10 +7,10 @@
@RESTQUERYPARAMETERS @RESTQUERYPARAMETERS
@RESTQUERYPARAM{from,number,optional} @RESTQUERYPARAM{from,number,optional}
Lower bound tick value for results. Exclusive lower bound tick value for results.
@RESTQUERYPARAM{to,number,optional} @RESTQUERYPARAM{to,number,optional}
Upper bound tick value for results. Inclusive upper bound tick value for results.
@RESTQUERYPARAM{chunkSize,number,optional} @RESTQUERYPARAM{chunkSize,number,optional}
Approximate maximum size of the returned result. Approximate maximum size of the returned result.

View File

@ -7,10 +7,18 @@
@RESTQUERYPARAMETERS @RESTQUERYPARAMETERS
@RESTQUERYPARAM{from,number,optional} @RESTQUERYPARAM{from,number,optional}
Lower bound tick value for results. Exclusive lower bound tick value for results. On successive calls
to this API you should set this to the value returned
with the *x-arango-replication-lastincluded* header (unless that header
contains 0).
@RESTQUERYPARAM{to,number,optional} @RESTQUERYPARAM{to,number,optional}
Upper bound tick value for results. Inclusive upper bound tick value for results.
@RESTQUERYPARAM{lastScanned,number,optional}
Should be set to the value of the *x-arango-replication-lastscanned* header
or alternatively 0 on first try. This allows the RocksDB engine to break up
large transactions over multiple responses.
@RESTQUERYPARAM{global,bool,optional} @RESTQUERYPARAM{global,bool,optional}
Whether operations for all databases should be included. When set to *false* Whether operations for all databases should be included. When set to *false*
@ -107,6 +115,8 @@ The response will also contain the following HTTP headers:
- *x-arango-replication-lastscanned*: the last tick the server scanned while - *x-arango-replication-lastscanned*: the last tick the server scanned while
computing the operation log. This might include operations the server did not computing the operation log. This might include operations the server did not
return to you due to various reasons (i.e. the value was filtered or skipped). return to you due to various reasons (i.e. the value was filtered or skipped).
You may use this value in the *lastScanned* query parameter to allow the RocksDB engine
to break up requests over multiple responses.
- *x-arango-replication-lasttick*: the last tick value the server has - *x-arango-replication-lasttick*: the last tick value the server has
logged in its write ahead log (not necessarily included in the result). By comparing the last logged in its write ahead log (not necessarily included in the result). By comparing the last

View File

@ -438,7 +438,7 @@ void MMFilesRestReplicationHandler::handleCommandLoggerFollow() {
StringUtils::itoa(state.lastCommittedTick)); StringUtils::itoa(state.lastCommittedTick));
_response->setHeaderNC(StaticStrings::ReplicationHeaderLastScanned, _response->setHeaderNC(StaticStrings::ReplicationHeaderLastScanned,
StringUtils::itoa(dump._lastScannedTick)); StringUtils::itoa(dump._lastScannedTick));
_response->setHeaderNC(StaticStrings::ReplicationHeaderActive, "true"); _response->setHeaderNC(StaticStrings::ReplicationHeaderActive, "true"); // TODO remove
_response->setHeaderNC(StaticStrings::ReplicationHeaderFromPresent, _response->setHeaderNC(StaticStrings::ReplicationHeaderFromPresent,
dump._fromTickIncluded ? "true" : "false"); dump._fromTickIncluded ? "true" : "false");

View File

@ -83,22 +83,20 @@ TRI_voc_tick_t MMFilesWalAccess::lastTick() const {
/// should return the list of transactions started, but not committed in that /// should return the list of transactions started, but not committed in that
/// range (range can be adjusted) /// range (range can be adjusted)
WalAccessResult MMFilesWalAccess::openTransactions( WalAccessResult MMFilesWalAccess::openTransactions(WalAccess::Filter const& filter,
uint64_t tickStart, uint64_t tickEnd, WalAccess::Filter const& filter, TransactionCallback const& cb) const {
TransactionCallback const& cb) const {
LOG_TOPIC(TRACE, arangodb::Logger::REPLICATION) LOG_TOPIC(TRACE, arangodb::Logger::REPLICATION)
<< "determining transactions, tick range " << tickStart << " - " << "determining transactions, tick range " << filter.tickStart << " - "
<< tickEnd; << filter.tickEnd;
std::unordered_map<TRI_voc_tid_t, TRI_voc_tick_t> transactions; std::unordered_map<TRI_voc_tid_t, TRI_voc_tick_t> transactions;
MMFilesLogfileManager* mgr = MMFilesLogfileManager::instance();
// ask the logfile manager which datafiles qualify // ask the logfile manager which datafiles qualify
bool fromTickIncluded = false; bool fromTickIncluded = false;
std::vector<arangodb::MMFilesWalLogfile*> logfiles = auto logfiles = mgr->getLogfilesForTickRange(filter.tickStart, filter.tickEnd, fromTickIncluded);
MMFilesLogfileManager::instance()->getLogfilesForTickRange(
tickStart, tickEnd, fromTickIncluded);
// always return the logfiles we have used // always return the logfiles we have used
TRI_DEFER(MMFilesLogfileManager::instance()->returnLogfiles(logfiles)); TRI_DEFER(mgr->returnLogfiles(logfiles));
// setup some iteration state // setup some iteration state
TRI_voc_tick_t lastFoundTick = 0; TRI_voc_tick_t lastFoundTick = 0;
@ -139,12 +137,12 @@ WalAccessResult MMFilesWalAccess::openTransactions(
// get the marker's tick and check whether we should include it // get the marker's tick and check whether we should include it
TRI_voc_tick_t const foundTick = marker->getTick(); TRI_voc_tick_t const foundTick = marker->getTick();
if (foundTick <= tickStart) { if (foundTick <= filter.tickStart) {
// marker too old // marker too old
continue; continue;
} }
if (foundTick > tickEnd) { if (foundTick > filter.tickEnd) {
// marker too new // marker too new
break; break;
} }
@ -408,7 +406,7 @@ struct MMFilesWalAccessContext : WalAccessContext {
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
WalAccessResult tail(uint64_t tickStart, uint64_t tickEnd, size_t chunkSize) { WalAccessResult tail(size_t chunkSize) {
MMFilesLogfileManagerState const state = MMFilesLogfileManagerState const state =
MMFilesLogfileManager::instance()->state(); MMFilesLogfileManager::instance()->state();
@ -416,7 +414,7 @@ struct MMFilesWalAccessContext : WalAccessContext {
bool fromTickIncluded = false; bool fromTickIncluded = false;
std::vector<arangodb::MMFilesWalLogfile*> logfiles = std::vector<arangodb::MMFilesWalLogfile*> logfiles =
MMFilesLogfileManager::instance()->getLogfilesForTickRange( MMFilesLogfileManager::instance()->getLogfilesForTickRange(
tickStart, tickEnd, fromTickIncluded); _filter.tickStart, _filter.tickEnd, fromTickIncluded);
// always return the logfiles we have used // always return the logfiles we have used
TRI_DEFER(MMFilesLogfileManager::instance()->returnLogfiles(logfiles)); TRI_DEFER(MMFilesLogfileManager::instance()->returnLogfiles(logfiles));
@ -482,19 +480,19 @@ struct MMFilesWalAccessContext : WalAccessContext {
// get the marker's tick and check whether we should include it // get the marker's tick and check whether we should include it
TRI_voc_tick_t foundTick = marker->getTick(); TRI_voc_tick_t foundTick = marker->getTick();
if (foundTick <= tickEnd) { if (foundTick <= _filter.tickEnd) {
lastScannedTick = foundTick; lastScannedTick = foundTick;
} }
if (foundTick <= tickStart) { if (foundTick <= _filter.tickStart) {
// marker too old // marker too old
continue; continue;
} }
if (foundTick >= tickEnd) { if (foundTick >= _filter.tickEnd) {
hasMore = false; hasMore = false;
if (foundTick > tickEnd) { if (foundTick > _filter.tickEnd) {
// marker too new // marker too new
break; break;
} }
@ -559,10 +557,9 @@ struct MMFilesWalAccessContext : WalAccessContext {
}; };
/// Tails the wall, this will already sanitize the /// Tails the wall, this will already sanitize the
WalAccessResult MMFilesWalAccess::tail(uint64_t tickStart, uint64_t tickEnd, WalAccessResult MMFilesWalAccess::tail(WalAccess::Filter const& filter,
size_t chunkSize, size_t chunkSize,
TRI_voc_tid_t barrierId, TRI_voc_tid_t barrierId,
WalAccess::Filter const& filter,
MarkerCallback const& callback) const { MarkerCallback const& callback) const {
/*OG_TOPIC(WARN, Logger::REPLICATION) /*OG_TOPIC(WARN, Logger::REPLICATION)
<< "1. Starting tailing: tickStart " << tickStart << " tickEnd " << "1. Starting tailing: tickStart " << tickStart << " tickEnd "
@ -571,13 +568,12 @@ WalAccessResult MMFilesWalAccess::tail(uint64_t tickStart, uint64_t tickEnd,
filter.firstRegularTick;*/ filter.firstRegularTick;*/
LOG_TOPIC(TRACE, arangodb::Logger::REPLICATION) LOG_TOPIC(TRACE, arangodb::Logger::REPLICATION)
<< "dumping log, tick range " << tickStart << " - " << tickEnd; << "dumping log, tick range " << filter.tickStart << " - " << filter.tickEnd;
if (barrierId > 0) { if (barrierId > 0) {
// extend the WAL logfile barrier // extend the WAL logfile barrier
MMFilesLogfileManager::instance()->extendLogfileBarrier(barrierId, 180, MMFilesLogfileManager::instance()->extendLogfileBarrier(barrierId, 180, filter.tickStart);
tickStart);
} }
MMFilesWalAccessContext ctx(filter, callback); MMFilesWalAccessContext ctx(filter, callback);
return ctx.tail(tickStart, tickEnd, chunkSize); return ctx.tail(chunkSize);
} }

View File

@ -49,14 +49,12 @@ class MMFilesWalAccess final : public WalAccess {
/// should return the list of transactions started, but not committed in that /// should return the list of transactions started, but not committed in that
/// range (range can be adjusted) /// range (range can be adjusted)
WalAccessResult openTransactions(uint64_t tickStart, uint64_t tickEnd, WalAccessResult openTransactions(WalAccess::Filter const& filter,
WalAccess::Filter const& filter,
TransactionCallback const&) const override; TransactionCallback const&) const override;
/// Tails the wall, this will already sanitize the /// Tails the wall, this will already sanitize the
WalAccessResult tail(uint64_t tickStart, uint64_t tickEnd, size_t chunkSize, WalAccessResult tail(WalAccess::Filter const& filter, size_t chunkSize,
TRI_voc_tid_t barrierId, WalAccess::Filter const& filter, TRI_voc_tid_t barrierId, MarkerCallback const&) const override;
MarkerCallback const&) const override;
}; };
} }

View File

@ -117,7 +117,9 @@ Result GlobalInitialSyncer::runInternal(bool incremental) {
} }
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "created logfile barrier"; LOG_TOPIC(DEBUG, Logger::REPLICATION) << "created logfile barrier";
TRI_DEFER(sendRemoveBarrier()); TRI_DEFER(if (!_state.isChildSyncer) {
_state.barrier.remove(_state.connection);
});
if (!_state.isChildSyncer) { if (!_state.isChildSyncer) {
// start batch is required for the inventory request // start batch is required for the inventory request

View File

@ -382,9 +382,8 @@ Syncer::Syncer(ReplicationApplierConfiguration const& configuration)
} }
Syncer::~Syncer() { Syncer::~Syncer() {
try { if (!_state.isChildSyncer) {
sendRemoveBarrier(); _state.barrier.remove(_state.connection);
} catch (...) {
} }
} }
@ -411,32 +410,6 @@ TRI_voc_tick_t Syncer::stealBarrier() {
return id; return id;
} }
/// @brief send a "remove barrier" command
Result Syncer::sendRemoveBarrier() {
if (_state.isChildSyncer || _state.barrier.id == 0) {
return Result();
}
try {
std::string const url = replutils::ReplicationUrl + "/barrier/" +
basics::StringUtils::itoa(_state.barrier.id);
// send request
std::unique_ptr<httpclient::SimpleHttpResult> response(
_state.connection.client->retryRequest(rest::RequestType::DELETE_REQ,
url, nullptr, 0));
if (replutils::hasFailed(response.get())) {
return replutils::buildHttpError(response.get(), url, _state.connection);
}
_state.barrier.id = 0;
_state.barrier.updateTime = 0;
return Result();
} catch (...) {
return Result(TRI_ERROR_INTERNAL);
}
}
void Syncer::setAborted(bool value) { _state.connection.setAborted(value); } void Syncer::setAborted(bool value) { _state.connection.setAborted(value); }
bool Syncer::isAborted() const { return _state.connection.isAborted(); } bool Syncer::isAborted() const { return _state.connection.isAborted(); }

View File

@ -166,9 +166,6 @@ class Syncer : public std::enable_shared_from_this<Syncer> {
void setLeaderId(std::string const& leaderId) { _state.leaderId = leaderId; } void setLeaderId(std::string const& leaderId) { _state.leaderId = leaderId; }
/// @brief send a "remove barrier" command
Result sendRemoveBarrier();
// TODO worker-safety // TODO worker-safety
void setAborted(bool value); void setAborted(bool value);

View File

@ -159,32 +159,33 @@ void TailingSyncer::abortOngoingTransactions() noexcept {
/// @brief whether or not a marker should be skipped /// @brief whether or not a marker should be skipped
bool TailingSyncer::skipMarker(TRI_voc_tick_t firstRegularTick, bool TailingSyncer::skipMarker(TRI_voc_tick_t firstRegularTick,
VPackSlice const& slice) { VPackSlice const& slice) {
TRI_ASSERT(slice.isObject());
bool tooOld = false; bool tooOld = false;
std::string const tick = VelocyPackHelper::getStringValue(slice, "tick", ""); VPackSlice tickSlice = slice.get("tick");
if (!tick.empty()) { if (tickSlice.isString() && tickSlice.getStringLength() > 0) {
tooOld = (NumberUtils::atoi_zero<TRI_voc_tick_t>( VPackValueLength len = 0;
tick.data(), tick.data() + tick.size()) < firstRegularTick); char const* str = tickSlice.getStringUnchecked(len);
tooOld = (NumberUtils::atoi_zero<TRI_voc_tick_t>(str, str + len) < firstRegularTick);
if (tooOld) { if (tooOld) {
int typeValue = VelocyPackHelper::getNumericValue<int>(slice, "type", 0); int typeValue = VelocyPackHelper::getNumericValue<int>(slice, "type", 0);
// handle marker type // handle marker type
TRI_replication_operation_e type = TRI_replication_operation_e type =
static_cast<TRI_replication_operation_e>(typeValue); static_cast<TRI_replication_operation_e>(typeValue);
if (type == REPLICATION_MARKER_DOCUMENT || if (type == REPLICATION_MARKER_DOCUMENT ||
type == REPLICATION_MARKER_REMOVE || type == REPLICATION_MARKER_REMOVE ||
type == REPLICATION_TRANSACTION_START || type == REPLICATION_TRANSACTION_START ||
type == REPLICATION_TRANSACTION_ABORT || type == REPLICATION_TRANSACTION_ABORT ||
type == REPLICATION_TRANSACTION_COMMIT) { type == REPLICATION_TRANSACTION_COMMIT) {
// read "tid" entry from marker // read "tid" entry from marker
std::string const id = VPackSlice tidSlice = slice.get("tid");
VelocyPackHelper::getStringValue(slice, "tid", ""); if (tidSlice.isString() && tidSlice.getStringLength() > 0) {
str = tidSlice.getStringUnchecked(len);
if (!id.empty()) { TRI_voc_tid_t tid = NumberUtils::atoi_zero<TRI_voc_tid_t>(str, str + len);
TRI_voc_tid_t tid = NumberUtils::atoi_zero<TRI_voc_tid_t>(
id.data(), id.data() + id.size());
if (tid > 0 && if (tid > 0 &&
_ongoingTransactions.find(tid) != _ongoingTransactions.end()) { _ongoingTransactions.find(tid) != _ongoingTransactions.end()) {
// must still use this marker as it belongs to a transaction we need // must still use this marker as it belongs to a transaction we need
@ -1074,7 +1075,9 @@ Result TailingSyncer::runInternal() {
setAborted(false); setAborted(false);
TRI_DEFER(sendRemoveBarrier()); TRI_DEFER(if (!_state.isChildSyncer) {
_state.barrier.remove(_state.connection);
});
uint64_t shortTermFailsInRow = 0; uint64_t shortTermFailsInRow = 0;
retry: retry:
@ -1371,10 +1374,8 @@ Result TailingSyncer::runContinuousSync() {
// get the applier into a sensible start state by fetching the list of // get the applier into a sensible start state by fetching the list of
// open transactions from the master // open transactions from the master
TRI_voc_tick_t fetchTick = safeResumeTick; TRI_voc_tick_t fetchTick = safeResumeTick;
if (safeResumeTick > 0 && safeResumeTick == fromTick) { TRI_voc_tick_t lastScannedTick = safeResumeTick; // hint where server MAY scan from
// special case in which from and to are equal if (safeResumeTick <= 0 || safeResumeTick != fromTick) {
fetchTick = safeResumeTick;
} else {
// adjust fetchTick so we can tail starting from the tick containing // adjust fetchTick so we can tail starting from the tick containing
// the open transactions we did not commit locally // the open transactions we did not commit locally
Result res = fetchOpenTransactions(safeResumeTick, fromTick, fetchTick); Result res = fetchOpenTransactions(safeResumeTick, fromTick, fetchTick);
@ -1416,7 +1417,8 @@ Result TailingSyncer::runContinuousSync() {
Result res = processMasterLog( Result res = processMasterLog(
sharedStatus, sharedStatus,
fetchTick, fetchTick,
lastScannedTick,
fromTick, fromTick,
_state.applier._ignoreErrors, _state.applier._ignoreErrors,
worked, worked,
@ -1628,21 +1630,30 @@ Result TailingSyncer::fetchOpenTransactions(TRI_voc_tick_t fromTick,
} }
/// @brief fetch data for the continuous synchronization /// @brief fetch data for the continuous synchronization
/// @param fetchTick tick from which we want results
/// @param lastScannedTick tick which the server MAY start scanning from
/// @param firstRegularTick if we got openTransactions server will return the
/// only operations belonging to these for ticks < firstRegularTick
void TailingSyncer::fetchMasterLog(std::shared_ptr<Syncer::JobSynchronizer> sharedStatus, void TailingSyncer::fetchMasterLog(std::shared_ptr<Syncer::JobSynchronizer> sharedStatus,
TRI_voc_tick_t fetchTick, TRI_voc_tick_t fetchTick,
TRI_voc_tick_t lastScannedTick,
TRI_voc_tick_t firstRegularTick) { TRI_voc_tick_t firstRegularTick) {
try { try {
std::string const url = std::string const url =
tailingBaseUrl("tail") + tailingBaseUrl("tail") +
"chunkSize=" + StringUtils::itoa(_state.applier._chunkSize) + "chunkSize=" + StringUtils::itoa(_state.applier._chunkSize) +
"&barrier=" + StringUtils::itoa(_state.barrier.id) + "&barrier=" + StringUtils::itoa(_state.barrier.id) +
"&from=" + StringUtils::itoa(fetchTick) + "&from=" + StringUtils::itoa(fetchTick) +
"&firstRegular=" + StringUtils::itoa(firstRegularTick) + "&lastScanned=" + StringUtils::itoa(lastScannedTick) +
(firstRegularTick > fetchTick
? "&firstRegular=" + StringUtils::itoa(firstRegularTick) : "") +
"&serverId=" + _state.localServerIdString + "&serverId=" + _state.localServerIdString +
"&includeSystem=" + (_state.applier._includeSystem ? "true" : "false"); "&includeSystem=" + (_state.applier._includeSystem ? "true" : "false");
// send request // send request
setProgress(std::string("fetching master log from tick ") + StringUtils::itoa(fetchTick) + setProgress(std::string("fetching master log from tick ") + StringUtils::itoa(fetchTick) +
", last scanned tick " + StringUtils::itoa(lastScannedTick) +
", first regular tick " + StringUtils::itoa(firstRegularTick) + ", first regular tick " + StringUtils::itoa(firstRegularTick) +
", barrier: " + StringUtils::itoa(_state.barrier.id) + ", barrier: " + StringUtils::itoa(_state.barrier.id) +
", open transactions: " + std::to_string(_ongoingTransactions.size()) + ", open transactions: " + std::to_string(_ongoingTransactions.size()) +
@ -1686,7 +1697,7 @@ void TailingSyncer::fetchMasterLog(std::shared_ptr<Syncer::JobSynchronizer> shar
/// @brief apply continuous synchronization data from a batch /// @brief apply continuous synchronization data from a batch
Result TailingSyncer::processMasterLog(std::shared_ptr<Syncer::JobSynchronizer> sharedStatus, Result TailingSyncer::processMasterLog(std::shared_ptr<Syncer::JobSynchronizer> sharedStatus,
TRI_voc_tick_t& fetchTick, TRI_voc_tick_t& fetchTick, TRI_voc_tick_t& lastScannedTick,
TRI_voc_tick_t firstRegularTick, TRI_voc_tick_t firstRegularTick,
uint64_t& ignoreCount, bool& worked, bool& mustFetchBatch) { uint64_t& ignoreCount, bool& worked, bool& mustFetchBatch) {
LOG_TOPIC(TRACE, Logger::REPLICATION) LOG_TOPIC(TRACE, Logger::REPLICATION)
@ -1699,7 +1710,7 @@ Result TailingSyncer::processMasterLog(std::shared_ptr<Syncer::JobSynchronizer>
TRI_ASSERT(mustFetchBatch || _workInParallel); TRI_ASSERT(mustFetchBatch || _workInParallel);
if (mustFetchBatch) { if (mustFetchBatch) {
fetchMasterLog(sharedStatus, fetchTick, firstRegularTick); fetchMasterLog(sharedStatus, fetchTick, lastScannedTick, firstRegularTick);
} }
// make sure that on the next invocation we will fetch a new batch // make sure that on the next invocation we will fetch a new batch
@ -1735,7 +1746,7 @@ Result TailingSyncer::processMasterLog(std::shared_ptr<Syncer::JobSynchronizer>
// was the specified from value included the result? // was the specified from value included the result?
bool const fromIncluded = getBoolHeader(response, StaticStrings::ReplicationHeaderFromPresent); bool const fromIncluded = getBoolHeader(response, StaticStrings::ReplicationHeaderFromPresent);
TRI_voc_tick_t const lastScannedTick = getUIntHeader(response, StaticStrings::ReplicationHeaderLastScanned); lastScannedTick = getUIntHeader(response, StaticStrings::ReplicationHeaderLastScanned);
if (!hasHeader(response, StaticStrings::ReplicationHeaderLastIncluded)) { if (!hasHeader(response, StaticStrings::ReplicationHeaderLastIncluded)) {
return Result(TRI_ERROR_REPLICATION_INVALID_RESPONSE, return Result(TRI_ERROR_REPLICATION_INVALID_RESPONSE,
@ -1808,8 +1819,9 @@ Result TailingSyncer::processMasterLog(std::shared_ptr<Syncer::JobSynchronizer>
// (that would be duplicate work) // (that would be duplicate work)
mustFetchBatch = false; mustFetchBatch = false;
auto self = shared_from_this(); auto self = shared_from_this();
sharedStatus->request([this, self, sharedStatus, fetchTick, firstRegularTick]() { sharedStatus->request([this, self, sharedStatus, fetchTick,
fetchMasterLog(sharedStatus, fetchTick, firstRegularTick); lastScannedTick, firstRegularTick]() {
fetchMasterLog(sharedStatus, fetchTick, lastScannedTick, firstRegularTick);
}); });
} }

View File

@ -134,13 +134,19 @@ class TailingSyncer : public Syncer {
arangodb::Result runInternal(); arangodb::Result runInternal();
/// @brief fetch data for the continuous synchronization /// @brief fetch data for the continuous synchronization
/// @param fetchTick tick from which we want results
/// @param lastScannedTick tick which the server MAY start scanning from
/// @param firstRegularTick if we got openTransactions server will return the
/// only operations belonging to these for smaller ticks
void fetchMasterLog(std::shared_ptr<Syncer::JobSynchronizer> sharedStatus, void fetchMasterLog(std::shared_ptr<Syncer::JobSynchronizer> sharedStatus,
TRI_voc_tick_t fetchTick, TRI_voc_tick_t fetchTick,
TRI_voc_tick_t lastScannedTick,
TRI_voc_tick_t firstRegularTick); TRI_voc_tick_t firstRegularTick);
/// @brief apply continuous synchronization data from a batch /// @brief apply continuous synchronization data from a batch
arangodb::Result processMasterLog(std::shared_ptr<Syncer::JobSynchronizer> sharedStatus, arangodb::Result processMasterLog(std::shared_ptr<Syncer::JobSynchronizer> sharedStatus,
TRI_voc_tick_t& fetchTick, TRI_voc_tick_t& fetchTick,
TRI_voc_tick_t& lastScannedTick,
TRI_voc_tick_t firstRegularTick, TRI_voc_tick_t firstRegularTick,
uint64_t& ignoreCount, bool& worked, bool& mustFetchBatch); uint64_t& ignoreCount, bool& worked, bool& mustFetchBatch);

View File

@ -335,6 +335,32 @@ Result BarrierInfo::extend(Connection& connection, TRI_voc_tick_t tick) {
return Result(); return Result();
} }
/// @brief send a "remove barrier" command
Result BarrierInfo::remove(Connection& connection) noexcept {
using basics::StringUtils::itoa;
if (id == 0) {
return Result();
}
try {
std::string const url = replutils::ReplicationUrl + "/barrier/" + itoa(id);
// send request
std::unique_ptr<httpclient::SimpleHttpResult> response(
connection.client->retryRequest(rest::RequestType::DELETE_REQ, url, nullptr, 0));
if (replutils::hasFailed(response.get())) {
return replutils::buildHttpError(response.get(), url, connection);
}
id = 0;
updateTime = 0;
} catch (...) {
return Result(TRI_ERROR_INTERNAL);
}
return Result();
}
constexpr double BatchInfo::DefaultTimeout; constexpr double BatchInfo::DefaultTimeout;
Result BatchInfo::start(replutils::Connection& connection, Result BatchInfo::start(replutils::Connection& connection,

View File

@ -111,10 +111,12 @@ struct BarrierInfo {
/// @brief send a "create barrier" command /// @brief send a "create barrier" command
Result create(Connection&, TRI_voc_tick_t); Result create(Connection&, TRI_voc_tick_t);
/// @brief send an "extend barrier" command /// @brief send an "extend barrier" command
// TODO worker-safety
Result extend(Connection&, TRI_voc_tick_t = 0); // TODO worker safety Result extend(Connection&, TRI_voc_tick_t = 0); // TODO worker safety
/// @brief send remove barrier command
Result remove(Connection&) noexcept;
}; };
struct BatchInfo { struct BatchInfo {
@ -131,9 +133,7 @@ struct BatchInfo {
Result start(Connection& connection, ProgressInfo& progress); Result start(Connection& connection, ProgressInfo& progress);
/// @brief send an "extend batch" command /// @brief send an "extend batch" command
// TODO worker-safety Result extend(Connection& connection, ProgressInfo& progress);
Result extend(Connection& connection,
ProgressInfo& progress); // TODO worker safety
/// @brief send a "finish batch" command /// @brief send a "finish batch" command
// TODO worker-safety // TODO worker-safety

View File

@ -69,6 +69,19 @@ RestWalAccessHandler::RestWalAccessHandler(GeneralRequest* request,
: RestVocbaseBaseHandler(request, response) {} : RestVocbaseBaseHandler(request, response) {}
bool RestWalAccessHandler::parseFilter(WalAccess::Filter& filter) { bool RestWalAccessHandler::parseFilter(WalAccess::Filter& filter) {
// determine start and end tick
filter.tickStart = _request->parsedValue<uint64_t>("from", filter.tickStart);
filter.tickLastScanned = _request->parsedValue<uint64_t>("lastScanned", filter.tickLastScanned);
// determine end tick for dump
filter.tickEnd = _request->parsedValue("to", filter.tickEnd);
if (filter.tickStart > filter.tickEnd || filter.tickEnd == 0) {
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid from/to values");
return false;
}
bool found = false; bool found = false;
std::string const& value1 = _request->value("global", found); std::string const& value1 = _request->value("global", found);
if (found && StringUtils::boolean(value1)) { if (found && StringUtils::boolean(value1)) {
@ -104,11 +117,7 @@ bool RestWalAccessHandler::parseFilter(WalAccess::Filter& filter) {
// grab list of transactions from the body value // grab list of transactions from the body value
if (_request->requestType() == arangodb::rest::RequestType::PUT) { if (_request->requestType() == arangodb::rest::RequestType::PUT) {
std::string const& value4 = _request->value("firstRegularTick", found); filter.firstRegularTick = _request->parsedValue<uint64_t>("firstRegularTick", 0);
if (found) {
filter.firstRegularTick =
static_cast<TRI_voc_tick_t>(StringUtils::uint64(value4));
}
// copy default options // copy default options
VPackOptions options = VPackOptions::Defaults; VPackOptions options = VPackOptions::Defaults;
@ -232,27 +241,9 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) {
if (_request->transportType() == Endpoint::TransportType::VST) { if (_request->transportType() == Endpoint::TransportType::VST) {
useVst = true; useVst = true;
} }
// determine start and end tick WalAccess::Filter filter;
TRI_voc_tick_t tickStart = 0; if (!parseFilter(filter)) {
TRI_voc_tick_t tickEnd = UINT64_MAX;
bool found;
std::string const& value1 = _request->value("from", found);
if (found) {
tickStart = static_cast<TRI_voc_tick_t>(StringUtils::uint64(value1));
}
// determine end tick for dump
std::string const& value2 = _request->value("to", found);
if (found) {
tickEnd = static_cast<TRI_voc_tick_t>(StringUtils::uint64(value2));
}
if (found && (tickStart > tickEnd || tickEnd == 0)) {
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid from/to values");
return; return;
} }
@ -261,13 +252,9 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) {
// check if a barrier id was specified in request // check if a barrier id was specified in request
TRI_voc_tid_t barrierId = _request->parsedValue("barrier", static_cast<TRI_voc_tid_t>(0)); TRI_voc_tid_t barrierId = _request->parsedValue("barrier", static_cast<TRI_voc_tid_t>(0));
WalAccess::Filter filter;
if (!parseFilter(filter)) {
return;
}
grantTemporaryRights(); grantTemporaryRights();
bool found = false;
size_t chunkSize = 1024 * 1024; size_t chunkSize = 1024 * 1024;
std::string const& value5 = _request->value("chunkSize", found); std::string const& value5 = _request->value("chunkSize", found);
if (found) { if (found) {
@ -296,7 +283,7 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) {
if (useVst) { if (useVst) {
result = result =
wal->tail(tickStart, tickEnd, chunkSize, barrierId, filter, wal->tail(filter, chunkSize, barrierId,
[&](TRI_vocbase_t* vocbase, VPackSlice const& marker) { [&](TRI_vocbase_t* vocbase, VPackSlice const& marker) {
length++; length++;
@ -318,7 +305,7 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) {
// note: we need the CustomTypeHandler here // note: we need the CustomTypeHandler here
VPackDumper dumper(&adapter, &opts); VPackDumper dumper(&adapter, &opts);
result = result =
wal->tail(tickStart, tickEnd, chunkSize, barrierId, filter, wal->tail(filter, chunkSize, barrierId,
[&](TRI_vocbase_t* vocbase, VPackSlice const& marker) { [&](TRI_vocbase_t* vocbase, VPackSlice const& marker) {
length++; length++;
@ -352,24 +339,23 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) {
StringUtils::itoa(result.lastScannedTick())); StringUtils::itoa(result.lastScannedTick()));
_response->setHeaderNC(StaticStrings::ReplicationHeaderLastTick, _response->setHeaderNC(StaticStrings::ReplicationHeaderLastTick,
StringUtils::itoa(result.latestTick())); StringUtils::itoa(result.latestTick()));
_response->setHeaderNC(StaticStrings::ReplicationHeaderActive, "true");
_response->setHeaderNC(StaticStrings::ReplicationHeaderFromPresent, _response->setHeaderNC(StaticStrings::ReplicationHeaderFromPresent,
result.fromTickIncluded() ? "true" : "false"); result.fromTickIncluded() ? "true" : "false");
if (length > 0) { if (length > 0) {
_response->setResponseCode(rest::ResponseCode::OK); _response->setResponseCode(rest::ResponseCode::OK);
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "WAL tailing after " << tickStart LOG_TOPIC(DEBUG, Logger::REPLICATION) << "WAL tailing after " << filter.tickStart
<< ", lastIncludedTick " << result.lastIncludedTick() << ", lastIncludedTick " << result.lastIncludedTick()
<< ", fromTickIncluded " << result.fromTickIncluded(); << ", fromTickIncluded " << result.fromTickIncluded();
} else { } else {
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "No more data in WAL after " << tickStart; LOG_TOPIC(DEBUG, Logger::REPLICATION) << "No more data in WAL after " << filter.tickStart;
_response->setResponseCode(rest::ResponseCode::NO_CONTENT); _response->setResponseCode(rest::ResponseCode::NO_CONTENT);
} }
DatabaseFeature::DATABASE->enumerateDatabases( DatabaseFeature::DATABASE->enumerateDatabases(
[&](TRI_vocbase_t& vocbase)->void { [&](TRI_vocbase_t& vocbase)->void {
vocbase.updateReplicationClient( vocbase.updateReplicationClient(
serverId, tickStart, replutils::BatchInfo::DefaultTimeout serverId, filter.tickStart, replutils::BatchInfo::DefaultTimeout
); );
} }
); );
@ -404,6 +390,8 @@ void RestWalAccessHandler::handleCommandDetermineOpenTransactions(
// check whether a database was specified // check whether a database was specified
WalAccess::Filter filter; WalAccess::Filter filter;
filter.tickStart = minMax.first;
filter.tickEnd = minMax.second;
if (!parseFilter(filter)) { if (!parseFilter(filter)) {
return; return;
} }
@ -412,8 +400,7 @@ void RestWalAccessHandler::handleCommandDetermineOpenTransactions(
VPackBuilder builder(buffer); VPackBuilder builder(buffer);
builder.openArray(); builder.openArray();
WalAccessResult r = WalAccessResult r =
wal->openTransactions(minMax.first, minMax.second, filter, wal->openTransactions(filter, [&](TRI_voc_tick_t tick, TRI_voc_tid_t tid) {
[&](TRI_voc_tick_t tick, TRI_voc_tid_t tid) {
builder.add(VPackValue(std::to_string(tid))); builder.add(VPackValue(std::to_string(tid)));
}); });
builder.close(); builder.close();

View File

@ -166,6 +166,13 @@ void RocksDBReplicationContext::internalBind(
_lastTick = state->sequenceNumber(); _lastTick = state->sequenceNumber();
} }
// we are inserting the current tick (WAL sequence number) here.
// this is ok because the batch creation is the first operation done
// for initial synchronization. the inventory request and collection
// dump requests will all happen after the batch creation, so the
// current tick value here is good
_vocbase->updateReplicationClient(replicationClientId(), _lastTick, _ttl);
} }
/// Bind collection for incremental sync /// Bind collection for incremental sync
@ -696,7 +703,7 @@ bool RocksDBReplicationContext::isDeleted() const {
return _isDeleted; return _isDeleted;
} }
void RocksDBReplicationContext::deleted() { void RocksDBReplicationContext::setDeleted() {
MUTEX_LOCKER(locker, _contextLock); MUTEX_LOCKER(locker, _contextLock);
_isDeleted = true; _isDeleted = true;
} }
@ -734,9 +741,9 @@ bool RocksDBReplicationContext::use(double ttl, bool exclusive) {
ttl = std::max(std::max(_ttl, ttl), replutils::BatchInfo::DefaultTimeout); ttl = std::max(std::max(_ttl, ttl), replutils::BatchInfo::DefaultTimeout);
_expires = TRI_microtime() + ttl; _expires = TRI_microtime() + ttl;
if (_serverId != 0) { // make sure the WAL files are not deleted
_vocbase->updateReplicationClient(_serverId, ttl); _vocbase->updateReplicationClient(replicationClientId(), _lastTick, ttl);
}
return true; return true;
} }
@ -749,17 +756,17 @@ void RocksDBReplicationContext::release() {
if (0 == _users) { if (0 == _users) {
_exclusive = false; _exclusive = false;
} }
if (_serverId != 0) {
double ttl; TRI_ASSERT(_ttl > 0);
if (_ttl > 0.0) { // make sure the WAL files are not deleted immediately
// use TTL as configured _vocbase->updateReplicationClient(replicationClientId(), _lastTick, ttl);
ttl = _ttl; }
} else {
// none configuration. use default /// extend without using the context
ttl = replutils::BatchInfo::DefaultTimeout; void RocksDBReplicationContext::extendLifetime(double ttl) {
} MUTEX_LOCKER(locker, _contextLock);
_vocbase->updateReplicationClient(_serverId, ttl); ttl = std::max(std::max(_ttl, ttl), replutils::BatchInfo::DefaultTimeout);
} _expires = TRI_microtime() + ttl;
} }
void RocksDBReplicationContext::releaseDumpingResources() { void RocksDBReplicationContext::releaseDumpingResources() {

View File

@ -128,15 +128,25 @@ class RocksDBReplicationContext {
size_t chunkSize, size_t offsetInChunk, size_t maxChunkSize, size_t chunkSize, size_t offsetInChunk, size_t maxChunkSize,
std::string const& lowKey, velocypack::Slice const& ids); std::string const& lowKey, velocypack::Slice const& ids);
// lifetime in seconds
double expires() const; double expires() const;
bool isDeleted() const; bool isDeleted() const;
void deleted(); void setDeleted();
bool isUsed() const; bool isUsed() const;
/// set use flag and extend lifetime
bool use(double ttl, bool exclusive); bool use(double ttl, bool exclusive);
/// remove use flag /// remove use flag
void release(); void release();
/// extend lifetime without using the context
void extendLifetime(double ttl);
// buggy clients may not send the serverId
TRI_server_id_t replicationClientId() const {
return _serverId != 0 ? _serverId : _id;
}
private: private:
void releaseDumpingResources(); void releaseDumpingResources();
CollectionIterator* getCollectionIterator(TRI_voc_cid_t cid, bool sorted); CollectionIterator* getCollectionIterator(TRI_voc_cid_t cid, bool sorted);
void internalBind(TRI_vocbase_t& vocbase, bool allowChange = true); void internalBind(TRI_vocbase_t& vocbase, bool allowChange = true);
@ -144,8 +154,8 @@ class RocksDBReplicationContext {
mutable Mutex _contextLock; mutable Mutex _contextLock;
TRI_vocbase_t* _vocbase; TRI_vocbase_t* _vocbase;
TRI_server_id_t const _serverId; TRI_server_id_t const _serverId;
TRI_voc_tick_t const _id; // batch id
TRI_voc_tick_t _id; // batch id
uint64_t _lastTick; // the time at which the snapshot was taken uint64_t _lastTick; // the time at which the snapshot was taken
std::unique_ptr<DatabaseGuard> _guard; std::unique_ptr<DatabaseGuard> _guard;
std::unique_ptr<transaction::Methods> _trx; std::unique_ptr<transaction::Methods> _trx;
@ -159,6 +169,7 @@ class RocksDBReplicationContext {
double const _ttl; double const _ttl;
/// @brief expiration time, updated under lock by ReplicationManager /// @brief expiration time, updated under lock by ReplicationManager
double _expires; double _expires;
/// @brief true if context is deleted, updated under lock by ReplicationManager /// @brief true if context is deleted, updated under lock by ReplicationManager
bool _isDeleted; bool _isDeleted;
/// @brief true if context cannot be used concurrently, updated under lock by ReplicationManager /// @brief true if context cannot be used concurrently, updated under lock by ReplicationManager

View File

@ -147,7 +147,7 @@ bool RocksDBReplicationManager::remove(RocksDBReplicationId id) {
if (context->isUsed()) { if (context->isUsed()) {
// context is in use by someone else. now mark as deleted // context is in use by someone else. now mark as deleted
context->deleted(); context->setDeleted();
return true; return true;
} }
@ -199,6 +199,34 @@ RocksDBReplicationContext* RocksDBReplicationManager::find(
return context; return context;
} }
//////////////////////////////////////////////////////////////////////////////
/// @brief find an existing context by id and extend lifetime
/// may be used concurrently on used contexts
//////////////////////////////////////////////////////////////////////////////
int RocksDBReplicationManager::extendLifetime(RocksDBReplicationId id,
double ttl) {
MUTEX_LOCKER(mutexLocker, _lock);
auto it = _contexts.find(id);
if (it == _contexts.end()) {
// not found
return TRI_ERROR_CURSOR_NOT_FOUND;
}
RocksDBReplicationContext* context = it->second;
TRI_ASSERT(context != nullptr);
if (context->isDeleted()) {
// already deleted
return TRI_ERROR_CURSOR_NOT_FOUND;
}
context->extendLifetime(ttl);
return TRI_ERROR_NO_ERROR;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief return a context for later use /// @brief return a context for later use
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -258,7 +286,7 @@ void RocksDBReplicationManager::drop(TRI_vocbase_t* vocbase) {
for (auto& context : _contexts) { for (auto& context : _contexts) {
if (context.second->vocbase() == vocbase) { if (context.second->vocbase() == vocbase) {
context.second->deleted(); context.second->setDeleted();
} }
} }
} }
@ -275,7 +303,7 @@ void RocksDBReplicationManager::dropAll() {
MUTEX_LOCKER(mutexLocker, _lock); MUTEX_LOCKER(mutexLocker, _lock);
for (auto& context : _contexts) { for (auto& context : _contexts) {
context.second->deleted(); context.second->setDeleted();
} }
} }
@ -307,7 +335,7 @@ bool RocksDBReplicationManager::garbageCollect(bool force) {
if (force || context->expires() < now) { if (force || context->expires() < now) {
// expire contexts // expire contexts
context->deleted(); context->setDeleted();
} }
if (context->isDeleted()) { if (context->isDeleted()) {

View File

@ -74,6 +74,13 @@ class RocksDBReplicationManager {
RocksDBReplicationContext* find( RocksDBReplicationContext* find(
RocksDBReplicationId, bool& isBusy, bool exclusive = true, RocksDBReplicationId, bool& isBusy, bool exclusive = true,
double ttl = replutils::BatchInfo::DefaultTimeout); double ttl = replutils::BatchInfo::DefaultTimeout);
//////////////////////////////////////////////////////////////////////////////
/// @brief find an existing context by id and extend lifetime
/// may be used concurrently on used contexts
//////////////////////////////////////////////////////////////////////////////
int extendLifetime(RocksDBReplicationId,
double ttl = replutils::BatchInfo::DefaultTimeout);
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/// @brief return a context for later use /// @brief return a context for later use
@ -81,18 +88,6 @@ class RocksDBReplicationManager {
void release(RocksDBReplicationContext*); void release(RocksDBReplicationContext*);
//////////////////////////////////////////////////////////////////////////////
/// @brief return a context for garbage collection
//////////////////////////////////////////////////////////////////////////////
void destroy(RocksDBReplicationContext*);
//////////////////////////////////////////////////////////////////////////////
/// @brief whether or not the repository contains a used context
//////////////////////////////////////////////////////////////////////////////
bool containsUsedContext();
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/// @brief drop contexts by database (at least mark them as deleted) /// @brief drop contexts by database (at least mark them as deleted)
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -117,6 +112,20 @@ class RocksDBReplicationManager {
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
void beginShutdown(); void beginShutdown();
private:
//////////////////////////////////////////////////////////////////////////////
/// @brief return a context for garbage collection
//////////////////////////////////////////////////////////////////////////////
void destroy(RocksDBReplicationContext*);
//////////////////////////////////////////////////////////////////////////////
/// @brief whether or not the repository contains a used context
//////////////////////////////////////////////////////////////////////////////
bool containsUsedContext();
private: private:
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////

View File

@ -519,9 +519,6 @@ class WALParser final : public rocksdb::WriteBatch::Handler {
// observing a specific log entry and a sequence of immediately // observing a specific log entry and a sequence of immediately
// following PUT / DELETE / Log entries // following PUT / DELETE / Log entries
void resetTransientState() { void resetTransientState() {
if (_state == TRANSACTION) {
writeCommitMarker();
}
// reset all states // reset all states
_state = INVALID; _state = INVALID;
_currentTrxId = 0; _currentTrxId = 0;

View File

@ -94,17 +94,6 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
b.add("lastTick", VPackValue(std::to_string(ctx->lastTick()))); b.add("lastTick", VPackValue(std::to_string(ctx->lastTick())));
b.close(); b.close();
if (serverId == 0) {
serverId = ctx->id();
}
// we are inserting the current tick (WAL sequence number) here.
// this is ok because the batch creation is the first operation done
// for initial synchronization. the inventory request and collection
// dump requests will all happen after the batch creation, so the
// current tick value here is good
_vocbase.updateReplicationClient(serverId, ctx->lastTick(), ttl);
generateResult(rest::ResponseCode::OK, b.slice()); generateResult(rest::ResponseCode::OK, b.slice());
return; return;
} }
@ -125,16 +114,8 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
// extract ttl. Context uses initial ttl from batch creation, if `ttl == 0` // extract ttl. Context uses initial ttl from batch creation, if `ttl == 0`
double ttl = VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", 0); double ttl = VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", 0);
int res = TRI_ERROR_NO_ERROR; int res = _manager->extendLifetime(id, ttl);
bool busy; if (res != TRI_ERROR_NO_ERROR) {
RocksDBReplicationContext* ctx = _manager->find(id, busy, false, ttl);
RocksDBReplicationContextGuard guard(_manager, ctx);
if (busy) {
res = TRI_ERROR_CURSOR_BUSY;
generateError(GeneralResponse::responseCode(res), res);
return;
} else if (ctx == nullptr) {
res = TRI_ERROR_CURSOR_NOT_FOUND;
generateError(GeneralResponse::responseCode(res), res); generateError(GeneralResponse::responseCode(res), res);
return; return;
} }
@ -146,7 +127,7 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
LOG_TOPIC(DEBUG, Logger::ENGINES) << "no serverId parameter found in request to " << _request->fullUrl(); LOG_TOPIC(DEBUG, Logger::ENGINES) << "no serverId parameter found in request to " << _request->fullUrl();
} }
TRI_server_id_t serverId = ctx->id(); TRI_server_id_t serverId = id; // just use context id as fallback
if (!value.empty() && value != "none") { if (!value.empty() && value != "none") {
serverId = static_cast<TRI_server_id_t>(StringUtils::uint64(value)); serverId = static_cast<TRI_server_id_t>(StringUtils::uint64(value));
} }
@ -154,7 +135,7 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
// last tick value in context should not have changed compared to the // last tick value in context should not have changed compared to the
// initial tick value used in the context (it's only updated on bind() // initial tick value used in the context (it's only updated on bind()
// call, which is only executed when a batch is initially created) // call, which is only executed when a batch is initially created)
_vocbase.updateReplicationClient(serverId, ctx->lastTick(), ttl); _vocbase.updateReplicationClient(serverId, ttl);
resetResponse(rest::ResponseCode::NO_CONTENT); resetResponse(rest::ResponseCode::NO_CONTENT);
return; return;
@ -180,6 +161,7 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
TRI_ERROR_HTTP_METHOD_NOT_ALLOWED); TRI_ERROR_HTTP_METHOD_NOT_ALLOWED);
} }
// handled by the batch for rocksdb
void RocksDBRestReplicationHandler::handleCommandBarrier() { void RocksDBRestReplicationHandler::handleCommandBarrier() {
auto const type = _request->requestType(); auto const type = _request->requestType();
if (type == rest::RequestType::POST) { if (type == rest::RequestType::POST) {
@ -218,7 +200,6 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
// determine end tick for dump // determine end tick for dump
std::string const& value2 = _request->value("to", found); std::string const& value2 = _request->value("to", found);
if (found) { if (found) {
tickEnd = static_cast<TRI_voc_tick_t>(StringUtils::uint64(value2)); tickEnd = static_cast<TRI_voc_tick_t>(StringUtils::uint64(value2));
} }
@ -237,18 +218,9 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
serverId = static_cast<TRI_server_id_t>(StringUtils::uint64(value3)); serverId = static_cast<TRI_server_id_t>(StringUtils::uint64(value3));
} }
bool includeSystem = true; bool includeSystem = _request->parsedValue("includeSystem", true);
std::string const& value4 = _request->value("includeSystem", found); // TODO: determine good default value?
uint64_t chunkSize = _request->parsedValue<uint64_t>("chunkSize", 1024 * 1024);
if (found) {
includeSystem = StringUtils::boolean(value4);
}
size_t chunkSize = 1024 * 1024; // TODO: determine good default value?
std::string const& value5 = _request->value("chunkSize", found);
if (found) {
chunkSize = static_cast<size_t>(StringUtils::uint64(value5));
}
grantTemporaryRights(); grantTemporaryRights();
@ -272,8 +244,8 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
builder.openArray(); builder.openArray();
auto result = tailWal( auto result = tailWal(&_vocbase, tickStart, tickEnd, static_cast<size_t>(chunkSize),
&_vocbase, tickStart, tickEnd, chunkSize, includeSystem, cid, builder includeSystem, cid, builder
); );
builder.close(); builder.close();
@ -311,7 +283,7 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
StringUtils::itoa((length == 0) ? 0 : result.maxTick())); StringUtils::itoa((length == 0) ? 0 : result.maxTick()));
_response->setHeaderNC(StaticStrings::ReplicationHeaderLastTick, StringUtils::itoa(latest)); _response->setHeaderNC(StaticStrings::ReplicationHeaderLastTick, StringUtils::itoa(latest));
_response->setHeaderNC(StaticStrings::ReplicationHeaderLastScanned, StringUtils::itoa(result.lastScannedTick())); _response->setHeaderNC(StaticStrings::ReplicationHeaderLastScanned, StringUtils::itoa(result.lastScannedTick()));
_response->setHeaderNC(StaticStrings::ReplicationHeaderActive, "true"); _response->setHeaderNC(StaticStrings::ReplicationHeaderActive, "true"); // TODO remove
_response->setHeaderNC(StaticStrings::ReplicationHeaderFromPresent, _response->setHeaderNC(StaticStrings::ReplicationHeaderFromPresent,
result.minTickIncluded() ? "true" : "false"); result.minTickIncluded() ? "true" : "false");

View File

@ -302,6 +302,7 @@ arangodb::Result RocksDBSettingsManager::setAbsoluteCounter(uint64_t objectId,
WRITE_LOCKER(guard, _rwLock); WRITE_LOCKER(guard, _rwLock);
auto it = _counters.find(objectId); auto it = _counters.find(objectId);
if (it != _counters.end()) { if (it != _counters.end()) {
LOG_TOPIC(DEBUG, Logger::ROCKSDB) << "resetting counter value to " << value;
it->second._sequenceNum = std::max(seq, it->second._sequenceNum); it->second._sequenceNum = std::max(seq, it->second._sequenceNum);
it->second._added = value; it->second._added = value;
it->second._removed = 0; it->second._removed = 0;

View File

@ -72,8 +72,7 @@ TRI_voc_tick_t RocksDBWalAccess::lastTick() const {
/// should return the list of transactions started, but not committed in that /// should return the list of transactions started, but not committed in that
/// range (range can be adjusted) /// range (range can be adjusted)
WalAccessResult RocksDBWalAccess::openTransactions( WalAccessResult RocksDBWalAccess::openTransactions(
uint64_t tickStart, uint64_t tickEnd, WalAccess::Filter const& filter, WalAccess::Filter const& filter, TransactionCallback const&) const {
TransactionCallback const&) const {
return WalAccessResult(TRI_ERROR_NO_ERROR, true, 0, 0, 0); return WalAccessResult(TRI_ERROR_NO_ERROR, true, 0, 0, 0);
} }
@ -81,7 +80,7 @@ WalAccessResult RocksDBWalAccess::openTransactions(
/// can potentially be batched into the same rocksdb write batch /// can potentially be batched into the same rocksdb write batch
/// but transactions can never be interleaved with operations /// but transactions can never be interleaved with operations
/// outside of the transaction /// outside of the transaction
class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext { class MyWALDumper final : public rocksdb::WriteBatch::Handler, public WalAccessContext {
// internal WAL parser states // internal WAL parser states
enum State : char { enum State : char {
@ -101,15 +100,30 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
}; };
public: public:
MyWALParser(WalAccess::Filter const& filter, MyWALDumper(WalAccess::Filter const& filter,
WalAccess::MarkerCallback const& f) WalAccess::MarkerCallback const& f,
size_t maxResponseSize)
: WalAccessContext(filter, f), : WalAccessContext(filter, f),
_definitionsCF(RocksDBColumnFamily::definitions()->GetID()), _definitionsCF(RocksDBColumnFamily::definitions()->GetID()),
_documentsCF(RocksDBColumnFamily::documents()->GetID()), _documentsCF(RocksDBColumnFamily::documents()->GetID()),
_primaryCF(RocksDBColumnFamily::primary()->GetID()), _primaryCF(RocksDBColumnFamily::primary()->GetID()),
_maxResponseSize(maxResponseSize),
_startSequence(0), _startSequence(0),
_currentSequence(0) {} _currentSequence(0),
_lastWrittenSequence(0) {}
bool Continue() override {
if (_responseSize > _maxResponseSize) {
// it should only be possible to be in the middle of a huge batch
// if we are inside one big transaction. We may only stop while no
// remove operation is pending (_removedDocRid == 0)
if (_state == TRANSACTION && _removedDocRid == 0) {
return false;
}
}
return true;
}
void LogData(rocksdb::Slice const& blob) override { void LogData(rocksdb::Slice const& blob) override {
// rocksdb does not count LogData towards sequence-number // rocksdb does not count LogData towards sequence-number
RocksDBLogType type = RocksDBLogValue::type(blob); RocksDBLogType type = RocksDBLogValue::type(blob);
@ -173,9 +187,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
marker->add("cuid", VPackValuePair(uuid.data(), uuid.size(), marker->add("cuid", VPackValuePair(uuid.data(), uuid.size(),
VPackValueType::String)); VPackValueType::String));
} }
_callback(vocbase, _builder.slice()); printMarker(vocbase);
_responseSize += _builder.size();
_builder.clear();
} }
} }
break; break;
@ -196,9 +208,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
marker->add("db", VPackValue(vocbase->name())); marker->add("db", VPackValue(vocbase->name()));
marker->add("cuid", VPackValue(coll->guid())); marker->add("cuid", VPackValue(coll->guid()));
} }
_callback(vocbase, _builder.slice()); printMarker(vocbase);
_responseSize += _builder.size();
_builder.clear();
} }
break; break;
} }
@ -218,7 +228,6 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
{ {
uint64_t tick = _currentSequence + (_startOfBatch ? 0 : 1); uint64_t tick = _currentSequence + (_startOfBatch ? 0 : 1);
VPackObjectBuilder marker(&_builder, true); VPackObjectBuilder marker(&_builder, true);
marker->add("tick", VPackValue(std::to_string(tick))); marker->add("tick", VPackValue(std::to_string(tick)));
marker->add("type", VPackValue(rocksutils::convertLogType(type))); marker->add("type", VPackValue(rocksutils::convertLogType(type)));
marker->add("db", VPackValue(vocbase->name())); marker->add("db", VPackValue(vocbase->name()));
@ -226,9 +235,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
marker->add("data", stripped.first); marker->add("data", stripped.first);
} }
_callback(vocbase, _builder.slice()); printMarker(vocbase);
_responseSize += _builder.size();
_builder.clear();
} }
break; break;
@ -249,7 +256,6 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
{ {
uint64_t tick = _currentSequence + (_startOfBatch ? 0 : 1); uint64_t tick = _currentSequence + (_startOfBatch ? 0 : 1);
VPackObjectBuilder marker(&_builder, true); VPackObjectBuilder marker(&_builder, true);
marker->add("tick", VPackValue(std::to_string(tick))); marker->add("tick", VPackValue(std::to_string(tick)));
marker->add("type", VPackValue(rocksutils::convertLogType(type))); marker->add("type", VPackValue(rocksutils::convertLogType(type)));
marker->add("db", VPackValue(vocbase->name())); marker->add("db", VPackValue(vocbase->name()));
@ -260,9 +266,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
data->add("id", VPackValue(std::to_string(iid))); data->add("id", VPackValue(std::to_string(iid)));
} }
_callback(vocbase, _builder.slice()); printMarker(vocbase);
_responseSize += _builder.size();
_builder.clear();
} }
break; break;
} }
@ -293,9 +297,8 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
marker->add("cuid", VPackValuePair(uuid.data(), uuid.size(), marker->add("cuid", VPackValuePair(uuid.data(), uuid.size(),
VPackValueType::String)); VPackValueType::String));
} }
_callback(vocbase, _builder.slice());
_responseSize += _builder.size(); printMarker(vocbase);
_builder.clear();
} }
} }
// wait for marker data in Put entry // wait for marker data in Put entry
@ -328,9 +331,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
marker->add("db", VPackValue(vocbase->name())); marker->add("db", VPackValue(vocbase->name()));
marker->add("tid", VPackValue(std::to_string(_currentTrxId))); marker->add("tid", VPackValue(std::to_string(_currentTrxId)));
} }
_callback(vocbase, _builder.slice()); printMarker(vocbase);
_responseSize += _builder.size();
_builder.clear();
} }
} }
break; break;
@ -400,7 +401,8 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
rocksdb::Status PutCF(uint32_t column_family_id, rocksdb::Slice const& key, rocksdb::Status PutCF(uint32_t column_family_id, rocksdb::Slice const& key,
rocksdb::Slice const& value) override { rocksdb::Slice const& value) override {
tick(); incTick();
//LOG_TOPIC(ERR, Logger::ENGINES) << "[PUT] cf: " << column_family_id //LOG_TOPIC(ERR, Logger::ENGINES) << "[PUT] cf: " << column_family_id
// << ", key:" << key.ToString() << " value: " << value.ToString(); // << ", key:" << key.ToString() << " value: " << value.ToString();
@ -424,9 +426,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
marker->add("db", name); marker->add("db", name);
marker->add("data", data); marker->add("data", data);
} }
_callback(vocbase, _builder.slice()); printMarker(vocbase);
_responseSize += _builder.size();
_builder.clear();
} }
} else if (_state == DB_DROP) { } else if (_state == DB_DROP) {
// prepareDropDatabase should always write entry // prepareDropDatabase should always write entry
@ -438,9 +438,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
marker->add("type", VPackValue(REPLICATION_DATABASE_DROP)); marker->add("type", VPackValue(REPLICATION_DATABASE_DROP));
marker->add("db", name); marker->add("db", name);
} }
_callback(loadVocbase(dbid), _builder.slice()); printMarker(loadVocbase(dbid));
_responseSize += _builder.size();
_builder.clear();
} // ignore Put in any other case } // ignore Put in any other case
} else if (RocksDBKey::type(key) == RocksDBEntryType::Collection) { } else if (RocksDBKey::type(key) == RocksDBEntryType::Collection) {
TRI_voc_tick_t dbid = RocksDBKey::databaseId(key); TRI_voc_tick_t dbid = RocksDBKey::databaseId(key);
@ -456,33 +454,25 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
{ {
VPackSlice collectionDef = RocksDBValue::data(value); VPackSlice collectionDef = RocksDBValue::data(value);
VPackObjectBuilder marker(&_builder, true); VPackObjectBuilder marker(&_builder, true);
marker->add("tick", VPackValue(std::to_string(_currentSequence))); marker->add("tick", VPackValue(std::to_string(_currentSequence)));
marker->add("db", VPackValue(vocbase->name())); marker->add("db", VPackValue(vocbase->name()));
marker->add("cuid", VPackValue(col->guid())); marker->add("cuid", VPackValue(col->guid()));
if (_state == COLLECTION_CREATE) { if (_state == COLLECTION_CREATE) {
auto stripped = rocksutils::stripObjectIds(collectionDef); auto stripped = rocksutils::stripObjectIds(collectionDef);
marker->add("type", VPackValue(REPLICATION_COLLECTION_CREATE)); marker->add("type", VPackValue(REPLICATION_COLLECTION_CREATE));
marker->add("data", stripped.first); marker->add("data", stripped.first);
} else if (_state == COLLECTION_RENAME) { } else if (_state == COLLECTION_RENAME) {
marker->add("type", VPackValue(REPLICATION_COLLECTION_RENAME)); marker->add("type", VPackValue(REPLICATION_COLLECTION_RENAME));
VPackObjectBuilder data(&_builder, "data", true); VPackObjectBuilder data(&_builder, "data", true);
data->add("name", VPackValue(col->name())); data->add("name", VPackValue(col->name()));
} else if (_state == COLLECTION_CHANGE) { } else if (_state == COLLECTION_CHANGE) {
auto stripped = rocksutils::stripObjectIds(collectionDef); auto stripped = rocksutils::stripObjectIds(collectionDef);
marker->add("type", VPackValue(REPLICATION_COLLECTION_CHANGE)); marker->add("type", VPackValue(REPLICATION_COLLECTION_CHANGE));
marker->add("data", stripped.first); marker->add("data", stripped.first);
} }
} }
printMarker(vocbase);
_callback(vocbase, _builder.slice());
_responseSize += _builder.size();
_builder.clear();
} }
} else if (RocksDBKey::type(key) == RocksDBEntryType::View) { } else if (RocksDBKey::type(key) == RocksDBEntryType::View) {
@ -509,10 +499,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
marker->add("type", VPackValue(REPLICATION_VIEW_CHANGE)); marker->add("type", VPackValue(REPLICATION_VIEW_CHANGE));
} }
} }
printMarker(vocbase);
_callback(vocbase, _builder.slice());
_responseSize += _builder.size();
_builder.clear();
} }
} }
} }
@ -548,7 +535,6 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
{ {
VPackObjectBuilder marker(&_builder, true); VPackObjectBuilder marker(&_builder, true);
marker->add("tick", VPackValue(std::to_string(_currentSequence))); marker->add("tick", VPackValue(std::to_string(_currentSequence)));
marker->add("type", VPackValue(REPLICATION_MARKER_DOCUMENT)); marker->add("type", VPackValue(REPLICATION_MARKER_DOCUMENT));
marker->add("db", VPackValue(vocbase->name())); marker->add("db", VPackValue(vocbase->name()));
@ -557,10 +543,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
marker->add("data", RocksDBValue::data(value)); marker->add("data", RocksDBValue::data(value));
} }
_callback(vocbase, _builder.slice()); printMarker(vocbase);
_responseSize += _builder.size();
_builder.clear();
if (_state == SINGLE_PUT) { if (_state == SINGLE_PUT) {
resetTransientState(); // always reset after single op resetTransientState(); // always reset after single op
} }
@ -571,7 +554,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
rocksdb::Status DeleteCF(uint32_t column_family_id, rocksdb::Status DeleteCF(uint32_t column_family_id,
rocksdb::Slice const& key) override { rocksdb::Slice const& key) override {
tick(); incTick();
if (column_family_id != _primaryCF) { if (column_family_id != _primaryCF) {
return rocksdb::Status(); // ignore all document operations return rocksdb::Status(); // ignore all document operations
@ -589,7 +572,6 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
if (!shouldHandleCollection(dbid, cid)) { if (!shouldHandleCollection(dbid, cid)) {
_removedDocRid = 0; // ignore rid too _removedDocRid = 0; // ignore rid too
return rocksdb::Status(); // no reset here return rocksdb::Status(); // no reset here
} }
@ -602,7 +584,6 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
{ {
VPackObjectBuilder marker(&_builder, true); VPackObjectBuilder marker(&_builder, true);
marker->add("tick", VPackValue(std::to_string(_currentSequence))); marker->add("tick", VPackValue(std::to_string(_currentSequence)));
marker->add("type", VPackValue(REPLICATION_MARKER_REMOVE)); marker->add("type", VPackValue(REPLICATION_MARKER_REMOVE));
marker->add("db", VPackValue(vocbase->name())); marker->add("db", VPackValue(vocbase->name()));
@ -610,15 +591,12 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
marker->add("tid", VPackValue(std::to_string(_currentTrxId))); marker->add("tid", VPackValue(std::to_string(_currentTrxId)));
VPackObjectBuilder data(&_builder, "data", true); VPackObjectBuilder data(&_builder, "data", true);
data->add(StaticStrings::KeyString, VPackValuePair(docKey.data(), docKey.size(), data->add(StaticStrings::KeyString, VPackValuePair(docKey.data(), docKey.size(),
VPackValueType::String)); VPackValueType::String));
data->add(StaticStrings::RevString, VPackValue(TRI_RidToString(_removedDocRid))); data->add(StaticStrings::RevString, VPackValue(TRI_RidToString(_removedDocRid)));
} }
_callback(vocbase, _builder.slice()); printMarker(vocbase);
_responseSize += _builder.size();
_builder.clear();
_removedDocRid = 0; // always reset _removedDocRid = 0; // always reset
if (_state == SINGLE_REMOVE) { if (_state == SINGLE_REMOVE) {
@ -631,9 +609,29 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
rocksdb::Status DeleteRangeCF(uint32_t /*column_family_id*/, rocksdb::Status DeleteRangeCF(uint32_t /*column_family_id*/,
const rocksdb::Slice& /*begin_key*/, const rocksdb::Slice& /*begin_key*/,
const rocksdb::Slice& /*end_key*/) override { const rocksdb::Slice& /*end_key*/) override {
// drop and truncate may use this, but we do not look at these incTick();
// drop and truncate may use this, but we do not print anything
return rocksdb::Status(); // make WAL iterator happy return rocksdb::Status(); // make WAL iterator happy
} }
rocksdb::Status MergeCF(uint32_t, const rocksdb::Slice&,
const rocksdb::Slice&) override {
incTick();
// not used for anything in ArangoDB currently
return rocksdb::Status(); // make WAL iterator happy
}
public:
/// figures out from which sequence number we need to start scanning
/// if we just use tickStart rocksdb will skip over batches we might
/// not have completely evaluated
uint64_t safeBeginTick() const {
if (_filter.tickLastScanned > 0 && _filter.tickLastScanned < _filter.tickStart) {
return _filter.tickLastScanned;
}
return _filter.tickStart;
}
void startNewBatch(rocksdb::SequenceNumber startSequence) { void startNewBatch(rocksdb::SequenceNumber startSequence) {
// starting new write batch // starting new write batch
@ -645,6 +643,18 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
_trxDbId = 0; _trxDbId = 0;
_removedDocRid = 0; _removedDocRid = 0;
} }
uint64_t endBatch() {
TRI_ASSERT(_removedDocRid == 0);
resetTransientState();
return _currentSequence;
}
size_t responseSize() const { return _responseSize; }
uint64_t lastWrittenSequence() const { return _lastWrittenSequence; }
private:
void writeCommitMarker(TRI_voc_tick_t dbid) { void writeCommitMarker(TRI_voc_tick_t dbid) {
TRI_ASSERT(_state == TRANSACTION); TRI_ASSERT(_state == TRANSACTION);
@ -656,39 +666,35 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
_builder.add("db", VPackValue(vocbase->name())); _builder.add("db", VPackValue(vocbase->name()));
_builder.add("tid", VPackValue(std::to_string(_currentTrxId))); _builder.add("tid", VPackValue(std::to_string(_currentTrxId)));
_builder.close(); _builder.close();
_callback(vocbase, _builder.slice()); printMarker(vocbase);
_responseSize += _builder.size();
_builder.clear();
} }
_state = INVALID; _state = INVALID;
} }
/// print maker in builder and clear it
void printMarker(TRI_vocbase_t* vocbase) {
TRI_ASSERT(!_builder.isEmpty());
if (_currentSequence > _filter.tickStart) {
_callback(vocbase, _builder.slice());
_responseSize += _builder.size();
_lastWrittenSequence = _currentSequence;
}
_builder.clear();
}
// should reset state flags which are only valid between // should reset state flags which are only valid between
// observing a specific log entry and a sequence of immediately // observing a specific log entry and a sequence of immediately
// following PUT / DELETE / Log entries // following PUT / DELETE / Log entries
void resetTransientState() { void resetTransientState() {
if (_state == TRANSACTION) {
writeCommitMarker(_trxDbId);
}
// reset all states // reset all states
_state = INVALID; _state = INVALID;
_currentTrxId = 0; _currentTrxId = 0;
_trxDbId = 0; _trxDbId = 0;
_removedDocRid = 0; _removedDocRid = 0;
} }
uint64_t endBatch() {
TRI_ASSERT(_removedDocRid == 0);
resetTransientState();
return _currentSequence;
}
size_t responseSize() const { return _responseSize; }
private:
// tick function that is called before each new WAL entry // tick function that is called before each new WAL entry
void tick() { void incTick() {
if (_startOfBatch) { if (_startOfBatch) {
// we are at the start of a batch. do NOT increase sequence number // we are at the start of a batch. do NOT increase sequence number
_startOfBatch = false; _startOfBatch = false;
@ -702,9 +708,11 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
uint32_t const _definitionsCF; uint32_t const _definitionsCF;
uint32_t const _documentsCF; uint32_t const _documentsCF;
uint32_t const _primaryCF; uint32_t const _primaryCF;
size_t const _maxResponseSize;
rocksdb::SequenceNumber _startSequence; rocksdb::SequenceNumber _startSequence;
rocksdb::SequenceNumber _currentSequence; rocksdb::SequenceNumber _currentSequence;
rocksdb::SequenceNumber _lastWrittenSequence;
bool _startOfBatch = false; bool _startOfBatch = false;
// Various state machine flags // Various state machine flags
@ -716,46 +724,47 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
// iterates over WAL starting at 'from' and returns up to 'chunkSize' documents // iterates over WAL starting at 'from' and returns up to 'chunkSize' documents
// from the corresponding database // from the corresponding database
WalAccessResult RocksDBWalAccess::tail(uint64_t tickStart, uint64_t tickEnd, WalAccessResult RocksDBWalAccess::tail(Filter const& filter, size_t chunkSize,
size_t chunkSize, TRI_voc_tick_t, MarkerCallback const& func) const {
TRI_voc_tick_t,
Filter const& filter,
MarkerCallback const& func) const {
TRI_ASSERT(filter.transactionIds.empty()); // not supported in any way TRI_ASSERT(filter.transactionIds.empty()); // not supported in any way
/*LOG_TOPIC(WARN, Logger::ENGINES) << "1. Starting tailing: tickStart " << /*LOG_TOPIC(WARN, Logger::ENGINES) << "1. Starting tailing: tickStart " <<
tickStart << " tickEnd " << tickEnd << " chunkSize " << chunkSize;//*/ tickStart << " tickEnd " << tickEnd << " chunkSize " << chunkSize;//*/
rocksdb::TransactionDB* db = rocksutils::globalRocksDB(); rocksdb::TransactionDB* db = rocksutils::globalRocksDB();
uint64_t firstTick = UINT64_MAX; // first tick actually read
uint64_t lastTick = tickStart; // lastTick at start of a write batch if (chunkSize < 16384) { // we need to have some sensible minimum
uint64_t lastScannedTick = tickStart; // last tick we looked at chunkSize = 16384;
uint64_t lastWrittenTick = 0; // lastTick at the end of a write batch }
// pre 3.4 breaking up write batches is not supported
size_t maxTrxChunkSize = filter.tickLastScanned > 0 ? chunkSize : SIZE_MAX;
MyWALDumper dumper(filter, func, maxTrxChunkSize);
const uint64_t since = dumper.safeBeginTick();
TRI_ASSERT(since <= filter.tickStart);
TRI_ASSERT(since <= filter.tickEnd);
uint64_t firstTick = UINT64_MAX; // first tick to actually print (exclusive)
uint64_t lastScannedTick = since; // last (begin) tick of batch we looked at
uint64_t lastWrittenTick = 0; // lastTick at the end of a write batch
uint64_t latestTick = db->GetLatestSequenceNumber(); uint64_t latestTick = db->GetLatestSequenceNumber();
MyWALParser handler(filter, func);
std::unique_ptr<rocksdb::TransactionLogIterator> iterator; // reader(); std::unique_ptr<rocksdb::TransactionLogIterator> iterator; // reader();
rocksdb::Status s;
// no need verifying the WAL contents // no need verifying the WAL contents
rocksdb::TransactionLogIterator::ReadOptions ro(false); rocksdb::TransactionLogIterator::ReadOptions ro(false);
s = db->GetUpdatesSince(tickStart, &iterator, ro); rocksdb::Status s = db->GetUpdatesSince(since, &iterator, ro);
if (!s.ok()) { if (!s.ok()) {
Result r = convertStatus(s, rocksutils::StatusHint::wal); Result r = convertStatus(s, rocksutils::StatusHint::wal);
return WalAccessResult(r.errorNumber(), tickStart == latestTick, return WalAccessResult(r.errorNumber(), filter.tickStart == latestTick,
0, 0, latestTick); 0, 0, latestTick);
} }
if (chunkSize < 16384) {
// we need to have some sensible minimum
chunkSize = 16384;
}
// we need to check if the builder is bigger than the chunksize, // we need to check if the builder is bigger than the chunksize,
// only after we printed a full WriteBatch. Otherwise a client might // only after we printed a full WriteBatch. Otherwise a client might
// never read the full writebatch // never read the full writebatch
LOG_TOPIC(DEBUG, Logger::ENGINES) << "WAL tailing call. tick start: " << tickStart << ", tick end: " << tickEnd << ", chunk size: " << chunkSize; LOG_TOPIC(DEBUG, Logger::ENGINES) << "WAL tailing call. Scan since: " << since
while (iterator->Valid() && lastTick <= tickEnd && << ", tick start: " << filter.tickStart
handler.responseSize() < chunkSize) { << ", tick end: " << filter.tickEnd << ", chunk size: " << chunkSize;
while (iterator->Valid() && lastScannedTick <= filter.tickEnd) {
s = iterator->status(); s = iterator->status();
if (!s.ok()) { if (!s.ok()) {
LOG_TOPIC(ERR, Logger::ENGINES) << "error during WAL scan: " LOG_TOPIC(ERR, Logger::ENGINES) << "error during WAL scan: "
@ -768,34 +777,40 @@ WalAccessResult RocksDBWalAccess::tail(uint64_t tickStart, uint64_t tickEnd,
if (firstTick == UINT64_MAX) { if (firstTick == UINT64_MAX) {
firstTick = batch.sequence; firstTick = batch.sequence;
} }
if (batch.sequence <= tickEnd) {
lastScannedTick = batch.sequence; if (batch.sequence > filter.tickEnd) {
}
//LOG_TOPIC(INFO, Logger::ENGINES) << "found batch-seq: " << batch.sequence;
lastTick = batch.sequence; // start of the batch
if (batch.sequence <= tickStart) {
iterator->Next(); // skip
continue;
} else if (batch.sequence > tickEnd) {
break; // cancel out break; // cancel out
} }
handler.startNewBatch(batch.sequence); //LOG_TOPIC(INFO, Logger::ENGINES) << "found batch-seq: " << batch.sequence;
s = batch.writeBatchPtr->Iterate(&handler); lastScannedTick = batch.sequence; // start of the batch
if (batch.sequence < since) {
iterator->Next(); // skip
continue;
}
dumper.startNewBatch(batch.sequence);
s = batch.writeBatchPtr->Iterate(&dumper);
if (!s.ok()) { if (!s.ok()) {
LOG_TOPIC(ERR, Logger::ENGINES) << "error during WAL scan: " LOG_TOPIC(ERR, Logger::ENGINES) << "error during WAL scan: "
<< s.ToString(); << s.ToString();
break; // s is considered in the end break; // s is considered in the end
} }
lastWrittenTick = handler.endBatch(); // end of the batch
uint64_t batchEndSeq = dumper.endBatch(); // end tick of the batch
lastWrittenTick = dumper.lastWrittenSequence(); // 0 if no marker was written
TRI_ASSERT(batchEndSeq >= lastScannedTick);
if (dumper.responseSize() >= chunkSize) { // break if response gets big
break;
}
// we need to set this here again, to avoid re-scanning WriteBatches
lastScannedTick = batchEndSeq; // do not remove, tailing take forever
TRI_ASSERT(lastWrittenTick >= lastTick);
iterator->Next(); iterator->Next();
} }
WalAccessResult result(TRI_ERROR_NO_ERROR, firstTick <= tickStart, WalAccessResult result(TRI_ERROR_NO_ERROR, firstTick <= filter.tickStart,
lastWrittenTick, lastScannedTick, latestTick); lastWrittenTick, lastScannedTick, latestTick);
if (!s.ok()) { if (!s.ok()) {
result.Result::reset(convertStatus(s, rocksutils::StatusHint::wal)); result.Result::reset(convertStatus(s, rocksutils::StatusHint::wal));

View File

@ -50,14 +50,12 @@ class RocksDBWalAccess final : public WalAccess {
/// should return the list of transactions started, but not committed in that /// should return the list of transactions started, but not committed in that
/// range (range can be adjusted) /// range (range can be adjusted)
WalAccessResult openTransactions(uint64_t tickStart, uint64_t tickEnd, WalAccessResult openTransactions(WalAccess::Filter const& filter,
WalAccess::Filter const& filter,
TransactionCallback const&) const override; TransactionCallback const&) const override;
/// Tails the wall, this will already sanitize the /// Tails the wall, this will already sanitize the
WalAccessResult tail(uint64_t tickStart, uint64_t tickEnd, size_t chunkSize, WalAccessResult tail(WalAccess::Filter const& filter, size_t chunkSize,
TRI_voc_tick_t barrierId, TRI_voc_tick_t barrierId,
WalAccess::Filter const& filter,
MarkerCallback const&) const override; MarkerCallback const&) const override;
}; };
} }

View File

@ -84,6 +84,16 @@ class WalAccess {
public: public:
struct Filter { struct Filter {
Filter() {} Filter() {}
/// tick last scanned by the last iteration
/// is used to find batches in rocksdb
uint64_t tickLastScanned = 0;
/// first tick to use
uint64_t tickStart = 0;
/// last tick to include
uint64_t tickEnd = UINT64_MAX;
/// In case collection is == 0, /// In case collection is == 0,
bool includeSystem = false; bool includeSystem = false;
@ -121,13 +131,11 @@ class WalAccess {
/// should return the list of transactions started, but not committed in that /// should return the list of transactions started, but not committed in that
/// range (range can be adjusted) /// range (range can be adjusted)
virtual WalAccessResult openTransactions( virtual WalAccessResult openTransactions(Filter const& filter,
uint64_t tickStart, uint64_t tickEnd, Filter const& filter,
TransactionCallback const&) const = 0; TransactionCallback const&) const = 0;
virtual WalAccessResult tail(uint64_t tickStart, uint64_t tickEnd, virtual WalAccessResult tail(Filter const& filter,
size_t chunkSize, TRI_voc_tid_t barrierId, size_t chunkSize, TRI_voc_tid_t barrierId,
Filter const& filter,
MarkerCallback const&) const = 0; MarkerCallback const&) const = 0;
}; };
@ -165,7 +173,7 @@ struct WalAccessContext {
public: public:
/// @brief arbitrary collection filter (inclusive) /// @brief arbitrary collection filter (inclusive)
WalAccess::Filter _filter; const WalAccess::Filter _filter;
/// @brief callback for marker output /// @brief callback for marker output
WalAccess::MarkerCallback _callback; WalAccess::MarkerCallback _callback;

View File

@ -248,7 +248,7 @@ std::string const& GeneralRequest::header(std::string const& key) const {
} }
std::string const& GeneralRequest::value(std::string const& key, std::string const& GeneralRequest::value(std::string const& key,
bool& found) const { bool& found) const {
if (!_values.empty()) { if (!_values.empty()) {
auto it = _values.find(key); auto it = _values.find(key);

View File

@ -236,7 +236,10 @@ SimpleHttpResult* SimpleHttpClient::doRequest(
std::unordered_map<std::string, std::string> const& headers) { std::unordered_map<std::string, std::string> const& headers) {
// ensure connection has not yet been invalidated // ensure connection has not yet been invalidated
TRI_ASSERT(_connection != nullptr); TRI_ASSERT(_connection != nullptr);
if (_aborted.load(std::memory_order_acquire)) {
return nullptr;
}
// ensure that result is empty // ensure that result is empty
TRI_ASSERT(_result == nullptr); TRI_ASSERT(_result == nullptr);
@ -406,6 +409,8 @@ SimpleHttpResult* SimpleHttpClient::doRequest(
if ( application_features::ApplicationServer::isStopping()) { if ( application_features::ApplicationServer::isStopping()) {
setErrorMessage("Command locally aborted"); setErrorMessage("Command locally aborted");
delete _result;
_result = nullptr;
return nullptr; return nullptr;
} }

View File

@ -108,7 +108,8 @@ function getLoggerState(endpoint) {
} }
}); });
assertTrue(res instanceof request.Response); assertTrue(res instanceof request.Response);
assertTrue(res.hasOwnProperty('statusCode') && res.statusCode === 200); assertTrue(res.hasOwnProperty('statusCode'));
assertEqual(res.statusCode, 200);
assertTrue(res.hasOwnProperty('json')); assertTrue(res.hasOwnProperty('json'));
return arangosh.checkRequestResult(res.json); return arangosh.checkRequestResult(res.json);
} }
@ -312,7 +313,7 @@ function ActiveFailoverSuite() {
assertTrue(currentLead !== oldLead); assertTrue(currentLead !== oldLead);
print("Failover to new leader : ", currentLead); print("Failover to new leader : ", currentLead);
internal.wait(2.5); // settle down, heartbeat interval is 1s internal.wait(5); // settle down, heartbeat interval is 1s
assertEqual(checkData(currentLead), 10000); assertEqual(checkData(currentLead), 10000);
print("New leader has correct data"); print("New leader has correct data");
@ -406,7 +407,7 @@ function ActiveFailoverSuite() {
currentLead = checkForFailover(currentLead); currentLead = checkForFailover(currentLead);
assertTrue(currentLead === nextLead, "Did not fail to best in-sync follower"); assertTrue(currentLead === nextLead, "Did not fail to best in-sync follower");
internal.wait(2.5); // settle down, heartbeat interval is 1s internal.wait(5); // settle down, heartbeat interval is 1s
let cc = checkData(currentLead); let cc = checkData(currentLead);
// we expect to find documents within an acceptable range // we expect to find documents within an acceptable range
assertTrue(10000 <= cc && cc <= upper + 500, "Leader has too little or too many documents"); assertTrue(10000 <= cc && cc <= upper + 500, "Leader has too little or too many documents");

View File

@ -110,7 +110,8 @@ function getLoggerState(endpoint) {
} }
}); });
assertTrue(res instanceof request.Response); assertTrue(res instanceof request.Response);
assertTrue(res.hasOwnProperty('statusCode') && res.statusCode === 200); assertTrue(res.hasOwnProperty('statusCode'));
assertEqual(res.statusCode, 200);
assertTrue(res.hasOwnProperty('json')); assertTrue(res.hasOwnProperty('json'));
return arangosh.checkRequestResult(res.json); return arangosh.checkRequestResult(res.json);
} }
@ -366,7 +367,7 @@ function ActiveFailoverSuite() {
assertTrue(currentLead !== oldLead); assertTrue(currentLead !== oldLead);
print("Failover to new leader : ", currentLead); print("Failover to new leader : ", currentLead);
internal.wait(2.5); // settle down, heartbeat interval is 1s internal.wait(5); // settle down, heartbeat interval is 1s
assertEqual(checkData(currentLead), 10000); assertEqual(checkData(currentLead), 10000);
print("New leader has correct data"); print("New leader has correct data");

View File

@ -1144,6 +1144,97 @@ function ReplicationOtherDBSuite() {
assertTrue(replication.globalApplier.state().state.running); assertTrue(replication.globalApplier.state().state.running);
}; };
suite.testSplitUpLargeTransactions = function() {
// Section - Master
connectToMaster();
// Create the collection
db._flushCache();
db._create(cn);
// Section - Follower
connectToSlave();
// Setup Replication
replication.globalApplier.stop();
replication.globalApplier.forget();
while (replication.globalApplier.state().state.running) {
internal.wait(0.1, false);
}
let config = {
endpoint: masterEndpoint,
username: "root",
password: "",
verbose: true,
includeSystem: false,
restrictType: "",
restrictCollections: [],
keepBarrier: false,
chunkSize: 16384 // small chunksize should split up trxs
};
replication.setupReplicationGlobal(config);
connectToMaster();
let coll = db._collection(cn);
const count = 100000;
let docs = [];
for(let i = 0; i < count; i++) {
if (docs.length > 10000) {
coll.save(docs);
docs = [];
}
docs.push({ value:i });
}
coll.save(docs);
// try to perform another operation afterwards
const cn2 = cn + "Test";
db._create(cn2);
let lastLogTick = replication.logger.state().state.lastLogTick;
// Section - Follower
connectToSlave();
let printed = false;
while (true) {
let slaveState = replication.globalApplier.state();
if (slaveState.state.lastError.errorNum > 0) {
console.log("slave has errored:", JSON.stringify(slaveState.state.lastError));
break;
}
if (!slaveState.state.running) {
console.log("slave is not running");
break;
}
if (compareTicks(slaveState.state.lastAppliedContinuousTick, lastLogTick) >= 0 ||
compareTicks(slaveState.state.lastProcessedContinuousTick, lastLogTick) >= 0) {
console.log("slave has caught up. state.lastLogTick:",
slaveState.state.lastLogTick, "slaveState.lastAppliedContinuousTick:",
slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:",
slaveState.state.lastProcessedContinuousTick);
break;
}
if (!printed) {
console.log("waiting for slave to catch up");
printed = true;
}
internal.wait(0.5, false);
}
// Now we should have the same amount of documents
assertEqual(count, collectionCount(cn));
assertNotNull(db._collection(cn2));
assertTrue(replication.globalApplier.state().state.running);
};
return suite; return suite;
} }

View File

@ -224,7 +224,7 @@ describe ArangoDB do
## wal access ## wal access
################################################################################ ################################################################################
context "dealing with the wal access" do context "dealing with wal access api" do
api = "/_api/wal" api = "/_api/wal"
prefix = "api-wal" prefix = "api-wal"
@ -233,7 +233,7 @@ describe ArangoDB do
## state ## state
################################################################################ ################################################################################
it "checks the state" do it "check the state" do
# fetch state # fetch state
cmd = "/_api/replication/logger-state" cmd = "/_api/replication/logger-state"
doc = ArangoDB.log_get("api-replication-logger-state", cmd, :body => "") doc = ArangoDB.log_get("api-replication-logger-state", cmd, :body => "")
@ -660,7 +660,7 @@ describe ArangoDB do
doc.code.should eq(200) doc.code.should eq(200)
end end
it "validates chunkSize restriction" do it "validates chunkSize restrictions" do
ArangoDB.drop_collection("UnitTestsReplication") ArangoDB.drop_collection("UnitTestsReplication")
sleep 1 sleep 1
@ -670,26 +670,44 @@ describe ArangoDB do
doc.code.should eq(200) doc.code.should eq(200)
fromTick = doc.parsed_response["tick"] fromTick = doc.parsed_response["tick"]
originalTick = fromTick originalTick = fromTick
lastScanned = fromTick
# create collection # create collection
cid = ArangoDB.create_collection("UnitTestsReplication") cid = ArangoDB.create_collection("UnitTestsReplication")
cuid = ArangoDB.properties_collection(cid)["globallyUniqueId"] cuid = ArangoDB.properties_collection(cid)["globallyUniqueId"]
# create documents # create documents
(1..1500).each do |value| (1..250).each do |value|
cmd = "/_api/document?collection=UnitTestsReplication" cmd = "/_api/document?collection=UnitTestsReplication"
body = "{ \"value\" : \"thisIsALongerStringBecauseWeWantToTestTheChunkSizeLimitsLaterOnAndItGetsEvenLongerWithTimeForRealNow\" }" body = "{ \"value\" : \"thisIsALongerStringBecauseWeWantToTestTheChunkSizeLimitsLaterOnAndItGetsEvenLongerWithTimeForRealNow\" }"
doc = ArangoDB.log_post("#{prefix}-follow-chunksize", cmd, :body => body) doc = ArangoDB.log_post("#{prefix}-follow-chunksize", cmd, :body => body)
doc.code.should eq(201) doc.code.should eq(201)
end end
# create one big transaction
docsBody = "["
(1..749).each do |value|
docsBody << "{ \"value\" : \"%d\" }," % [value]
end
docsBody << "{ \"value\" : \"500\" }]"
cmd = "/_api/document?collection=UnitTestsReplication"
doc = ArangoDB.log_post("#{prefix}-follow-chunksize", cmd, :body => docsBody)
doc.code.should eq(201)
# create more documents
(1..500).each do |value|
cmd = "/_api/document?collection=UnitTestsReplication"
body = "{ \"value\" : \"thisIsALongerStringBecauseWeWantToTestTheChunkSizeLimitsLaterOnAndItGetsEvenLongerWithTimeForRealNow\" }"
doc = ArangoDB.log_post("#{prefix}-follow-chunksize", cmd, :body => body)
doc.code.should eq(201)
end
sleep 1 sleep 1
tickTypes = { 2000 => 0, 2001 => 0, 2300 => 0 } tickTypes = { 2000 => 0, 2001 => 0, 2200 => 0, 2201 => 0, 2300 => 0 }
while 1 while 1
cmd = api + "/tail?global=true&from=" + fromTick + "&chunkSize=16384" cmd = api + "/tail?global=true&from=" + fromTick + "&lastScanned=" + lastScanned + "&chunkSize=16384"
doc = ArangoDB.log_get("#{prefix}-follow-chunksize", cmd, :body => "", :format => :plain) doc = ArangoDB.log_get("#{prefix}-follow-chunksize", cmd, :body => "", :format => :plain)
[200, 204].should include(doc.code) [200, 204].should include(doc.code)
@ -697,16 +715,19 @@ describe ArangoDB do
doc.headers["x-arango-replication-lastincluded"].should match(/^\d+$/) doc.headers["x-arango-replication-lastincluded"].should match(/^\d+$/)
doc.headers["x-arango-replication-lastincluded"].should_not eq("0") doc.headers["x-arango-replication-lastincluded"].should_not eq("0")
doc.headers["x-arango-replication-lastscanned"].should match(/^\d+$/)
doc.headers["x-arango-replication-lastscanned"].should_not eq("0")
if fromTick == originalTick if fromTick == originalTick
# first batch # first batch
doc.headers["x-arango-replication-checkmore"].should eq("true") doc.headers["x-arango-replication-checkmore"].should eq("true")
end end
doc.headers["content-type"].should eq("application/x-arango-dump; charset=utf-8") doc.headers["content-type"].should eq("application/x-arango-dump; charset=utf-8")
# we need to allow for some overhead here, as the chunkSize restriction is not honored precisely # we need to allow for some overhead here, as the chunkSize restriction is not honored precisely
doc.headers["content-length"].to_i.should be < (16 + 8) * 1024 doc.headers["content-length"].to_i.should be < (16 + 9) * 1024
body = doc.response.body # update lastScanned for next request
lastScanned = doc.headers["x-arango-replication-lastscanned"]
body = doc.response.body
i = 0 i = 0
while 1 while 1
@ -721,13 +742,23 @@ describe ArangoDB do
marker.should have_key("tick") marker.should have_key("tick")
fromTick = marker["tick"] fromTick = marker["tick"]
if marker["type"] >= 2000 and marker["cuid"] == cuid if marker["type"] == 2200
# create collection marker.should have_key("tid")
marker.should have_key("db")
tickTypes[2200] = tickTypes[2200] + 1
elsif marker["type"] == 2201
marker.should have_key("tid")
tickTypes[2201] = tickTypes[2201] + 1
elsif marker["type"] >= 2000 and marker["cuid"] == cuid
# collection markings
marker.should have_key("type") marker.should have_key("type")
marker.should have_key("cuid") marker.should have_key("cuid")
if marker["type"] == 2300 if marker["type"] == 2300
marker.should have_key("data") marker.should have_key("data")
marker.should have_key("tid")
end end
cc = tickTypes[marker["type"]] cc = tickTypes[marker["type"]]
@ -741,13 +772,15 @@ describe ArangoDB do
tickTypes[2000].should eq(1) # collection create tickTypes[2000].should eq(1) # collection create
tickTypes[2001].should eq(0) # collection drop tickTypes[2001].should eq(0) # collection drop
tickTypes[2200].should be >= 1 # begin transaction
tickTypes[2201].should be >= 1 # commit transaction
tickTypes[2300].should eq(1500) # document inserts tickTypes[2300].should eq(1500) # document inserts
# now try again with a single chunk # now try again with a single chunk
tickTypes = { 2000 => 0, 2001 => 0, 2300 => 0 } tickTypes = { 2000 => 0, 2001 => 0, 2200 => 0, 2201 => 0, 2300 => 0 }
cmd = api + "/tail?global=true&from=" + originalTick + "&chunkSize=1048576" cmd = api + "/tail?global=true&from=" + originalTick + "&lastScanned=" + originalTick + "&chunkSize=1048576"
doc = ArangoDB.log_get("#{prefix}-follow-chunksize", cmd, :body => "", :format => :plain) doc = ArangoDB.log_get("#{prefix}-follow-chunksize", cmd, :body => "", :format => :plain)
doc.code.should eq(200) doc.code.should eq(200)
@ -771,13 +804,23 @@ describe ArangoDB do
marker.should have_key("tick") marker.should have_key("tick")
if marker["type"] >= 2000 and marker["cuid"] == cuid if marker["type"] == 2200
marker.should have_key("tid")
marker.should have_key("db")
tickTypes[2200] = tickTypes[2200] + 1
elsif marker["type"] == 2201
marker.should have_key("tid")
tickTypes[2201] = tickTypes[2201] + 1
elsif marker["type"] >= 2000 and marker["cuid"] == cuid
# create collection # create collection
marker.should have_key("type") marker.should have_key("type")
marker.should have_key("cuid") marker.should have_key("cuid")
if marker["type"] == 2300 if marker["type"] == 2300
marker.should have_key("data") marker.should have_key("data")
marker.should have_key("tid")
end end
cc = tickTypes[marker["type"]] cc = tickTypes[marker["type"]]
@ -790,6 +833,8 @@ describe ArangoDB do
tickTypes[2000].should eq(1) # collection create tickTypes[2000].should eq(1) # collection create
tickTypes[2001].should eq(0) # collection drop tickTypes[2001].should eq(0) # collection drop
tickTypes[2200].should be >= 1 # begin transaction
tickTypes[2201].should be >= 1 # commit transaction
tickTypes[2300].should eq(1500) # document inserts tickTypes[2300].should eq(1500) # document inserts
# drop collection # drop collection