1
0
Fork 0

refactoring

This commit is contained in:
Jan Steemann 2015-08-20 18:25:47 +02:00
parent 82f2f07346
commit f0ef3d450f
12 changed files with 427 additions and 408 deletions

View File

@ -97,6 +97,15 @@ ContinuousSyncer::ContinuousSyncer (TRI_server_t* server,
////////////////////////////////////////////////////////////////////////////////
ContinuousSyncer::~ContinuousSyncer () {
// abort all running transactions
for (auto& it : _openInitialTransactions) {
auto trx = it.second;
if (trx != nullptr) {
trx->abort();
delete trx;
}
}
}
// -----------------------------------------------------------------------------
@ -217,15 +226,49 @@ int ContinuousSyncer::saveApplierState () {
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not a collection should be excluded
/// @brief whether or not a marker should be skipped
////////////////////////////////////////////////////////////////////////////////
bool ContinuousSyncer::excludeCollection (TRI_json_t const* json) const {
if (_restrictType == RESTRICT_NONE && _includeSystem) {
return false;
bool ContinuousSyncer::skipMarker (TRI_voc_tick_t firstRegularTick,
TRI_json_t const* json) const {
bool tooOld = false;
string const tick = JsonHelper::getStringValue(json, "tick", "");
if (! tick.empty()) {
tooOld = (static_cast<TRI_voc_tick_t>(StringUtils::uint64(tick.c_str(), tick.size())) < firstRegularTick);
if (tooOld) {
int typeValue = JsonHelper::getNumericValue<int>(json, "type", 0);
// handle marker type
TRI_replication_operation_e type = (TRI_replication_operation_e) typeValue;
if (type == REPLICATION_MARKER_DOCUMENT ||
type == REPLICATION_MARKER_EDGE ||
type == REPLICATION_MARKER_REMOVE ||
type == REPLICATION_TRANSACTION_START ||
type == REPLICATION_TRANSACTION_ABORT ||
type == REPLICATION_TRANSACTION_COMMIT) {
// read "tid" entry from marker
string const id = JsonHelper::getStringValue(json, "tid", "");
if (! id.empty()) {
TRI_voc_tid_t tid = static_cast<TRI_voc_tid_t>(StringUtils::uint64(id.c_str(), id.size()));
if (tid > 0 &&
_openInitialTransactions.find(tid) != _openInitialTransactions.end()) {
// must still use this marker as it belongs to a transaction we need to finish
tooOld = false;
}
}
}
}
}
if (! TRI_IsObjectJson(json)) {
if (tooOld) {
return true;
}
if (_restrictType == RESTRICT_NONE && _includeSystem) {
return false;
}
@ -369,14 +412,17 @@ int ContinuousSyncer::processDocument (TRI_replication_operation_e type,
}
if (tid > 0) {
auto it = _applier->_runningRemoteTransactions.find(tid);
auto it = _openInitialTransactions.find(tid);
if (it == _applier->_runningRemoteTransactions.end()) {
if (it == _openInitialTransactions.end()) {
return TRI_ERROR_REPLICATION_UNEXPECTED_TRANSACTION;
}
auto trx = (*it).second;
TRI_ASSERT(trx != nullptr);
if (trx == nullptr) {
return TRI_ERROR_REPLICATION_UNEXPECTED_TRANSACTION;
}
TRI_transaction_collection_t* trxCollection = trx->trxCollection(cid);
@ -443,35 +489,35 @@ int ContinuousSyncer::startTransaction (TRI_json_t const* json) {
// note: this is the remote transaction id!
TRI_voc_tid_t tid = static_cast<TRI_voc_tid_t>(StringUtils::uint64(id.c_str(), id.size()));
auto it = _applier->_runningRemoteTransactions.find(tid);
auto it = _openInitialTransactions.find(tid);
if (it != _applier->_runningRemoteTransactions.end()) {
if (it != _openInitialTransactions.end()) {
// found a previous version of the same transaction - should not happen...
auto trx = (*it).second;
_openInitialTransactions.erase(tid);
_applier->_runningRemoteTransactions.erase(tid);
if (trx != nullptr) {
// abort ongoing trx
delete trx;
}
// abort ongoing trx
delete trx;
}
TRI_ASSERT(tid > 0);
LOG_TRACE("starting replication transaction %llu", (unsigned long long) tid);
LOG_TRACE("starting transaction %llu", (unsigned long long) tid);
auto trx = new ReplicationTransaction(_server, _vocbase, tid);
if (trx == nullptr) {
return TRI_ERROR_OUT_OF_MEMORY;
}
std::unique_ptr<ReplicationTransaction> trx(new ReplicationTransaction(_server, _vocbase, tid));
int res = trx->begin();
if (res != TRI_ERROR_NO_ERROR) {
delete trx;
return res;
}
_applier->_runningRemoteTransactions.insert(it, std::make_pair(tid, trx));
_openInitialTransactions[tid] = trx.get();
trx.release();
return TRI_ERROR_NO_ERROR;
}
@ -489,12 +535,12 @@ int ContinuousSyncer::abortTransaction (TRI_json_t const* json) {
}
// transaction id
// note: this is the remote trasnaction id!
// note: this is the remote transaction id!
TRI_voc_tid_t const tid = static_cast<TRI_voc_tid_t>(StringUtils::uint64(id.c_str(), id.size()));
auto it = _applier->_runningRemoteTransactions.find(tid);
auto it = _openInitialTransactions.find(tid);
if (it == _applier->_runningRemoteTransactions.end()) {
if (it == _openInitialTransactions.end()) {
// invalid state, no transaction was started.
return TRI_ERROR_REPLICATION_UNEXPECTED_TRANSACTION;
}
@ -504,12 +550,16 @@ int ContinuousSyncer::abortTransaction (TRI_json_t const* json) {
LOG_TRACE("abort replication transaction %llu", (unsigned long long) tid);
auto trx = (*it).second;
_applier->_runningRemoteTransactions.erase(tid);
_openInitialTransactions.erase(tid);
int res = trx->abort();
delete trx;
if (trx != nullptr) {
int res = trx->abort();
delete trx;
return res;
return res;
}
return TRI_ERROR_REPLICATION_UNEXPECTED_TRANSACTION;
}
////////////////////////////////////////////////////////////////////////////////
@ -528,8 +578,9 @@ int ContinuousSyncer::commitTransaction (TRI_json_t const* json) {
// note: this is the remote trasnaction id!
TRI_voc_tid_t const tid = static_cast<TRI_voc_tid_t>(StringUtils::uint64(id.c_str(), id.size()));
auto it = _applier->_runningRemoteTransactions.find(tid);
if (it == _applier->_runningRemoteTransactions.end()) {
auto it = _openInitialTransactions.find(tid);
if (it == _openInitialTransactions.end()) {
// invalid state, no transaction was started.
return TRI_ERROR_REPLICATION_UNEXPECTED_TRANSACTION;
}
@ -539,12 +590,16 @@ int ContinuousSyncer::commitTransaction (TRI_json_t const* json) {
LOG_TRACE("committing replication transaction %llu", (unsigned long long) tid);
auto trx = (*it).second;
_applier->_runningRemoteTransactions.erase(tid);
_openInitialTransactions.erase(tid);
int res = trx->commit();
delete trx;
if (trx != nullptr) {
int res = trx->commit();
delete trx;
return res;
return res;
}
return TRI_ERROR_REPLICATION_UNEXPECTED_TRANSACTION;
}
////////////////////////////////////////////////////////////////////////////////
@ -621,15 +676,6 @@ int ContinuousSyncer::changeCollection (TRI_json_t const* json) {
int ContinuousSyncer::applyLogMarker (TRI_json_t const* json,
string& errorMsg) {
static const string invalidMsg = "received invalid JSON data";
// check data
if (! JsonHelper::isObject(json)) {
errorMsg = invalidMsg;
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
// fetch marker "type"
int typeValue = JsonHelper::getNumericValue<int>(json, "type", 0);
@ -708,6 +754,7 @@ int ContinuousSyncer::applyLogMarker (TRI_json_t const* json,
////////////////////////////////////////////////////////////////////////////////
int ContinuousSyncer::applyLog (SimpleHttpResult* response,
TRI_voc_tick_t firstRegularTick,
std::string& errorMsg,
uint64_t& processedMarkers,
uint64_t& ignoreCount) {
@ -746,10 +793,16 @@ int ContinuousSyncer::applyLog (SimpleHttpResult* response,
if (json == nullptr) {
return TRI_ERROR_OUT_OF_MEMORY;
}
if (! TRI_IsObjectJson(json.get())) {
errorMsg = "received invalid JSON data";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
int res;
bool skipped;
if (excludeCollection(json.get())) {
if (skipMarker(firstRegularTick, json.get())) {
// entry is skipped
res = TRI_ERROR_NO_ERROR;
skipped = true;
@ -808,7 +861,6 @@ int ContinuousSyncer::applyLog (SimpleHttpResult* response,
int ContinuousSyncer::runContinuousSync (string& errorMsg) {
uint64_t connectRetries = 0;
uint64_t inactiveCycles = 0;
int res = TRI_ERROR_INTERNAL;
// get start tick
// ---------------------------------------
@ -836,7 +888,19 @@ int ContinuousSyncer::runContinuousSync (string& errorMsg) {
return TRI_ERROR_REPLICATION_NO_START_TICK;
}
// TODO: get the applier into a sensible start state...
// get the applier into a sensible start state by fetching the list of
// open transactions from the master
TRI_voc_tick_t fetchTick = 0;
int res = fetchMasterState(errorMsg, 0, fromTick, fetchTick);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
if (fetchTick > fromTick) {
// must not happen
return TRI_ERROR_INTERNAL;
}
// run in a loop. the loop is terminated when the applier is stopped or an
// error occurs
@ -844,8 +908,8 @@ int ContinuousSyncer::runContinuousSync (string& errorMsg) {
bool worked;
bool masterActive = false;
// fromTick is passed by reference!
res = followMasterLog(errorMsg, fromTick, _configuration._ignoreErrors, worked, masterActive);
// fetchTick is passed by reference!
res = followMasterLog(errorMsg, fetchTick, fromTick, _configuration._ignoreErrors, worked, masterActive);
uint64_t sleepTime;
@ -928,12 +992,125 @@ int ContinuousSyncer::runContinuousSync (string& errorMsg) {
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief fetch the initial master state
////////////////////////////////////////////////////////////////////////////////
int ContinuousSyncer::fetchMasterState (string& errorMsg,
TRI_voc_tick_t fromTick,
TRI_voc_tick_t toTick,
TRI_voc_tick_t& startTick) {
string const baseUrl = BaseUrl + "/determine-open-transactions";
map<string, string> headers;
string const url = baseUrl +
"?serverId=" + _localServerIdString +
"&from=" + StringUtils::itoa(fromTick) +
"&to=" + StringUtils::itoa(toTick);
string const progress = "fetching initial master state with from tick " + StringUtils::itoa(fromTick) + ", toTick " + StringUtils::itoa(toTick);
LOG_TRACE("fetching initial master state with from tick %llu, to tick %llu, url %s",
(unsigned long long) fromTick,
(unsigned long long) toTick,
url.c_str());
// send request
setProgress(progress.c_str());
SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_GET,
url,
nullptr,
0,
headers);
if (response == nullptr || ! response->isComplete()) {
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": " + _client->getErrorMessage();
if (response != nullptr) {
delete response;
}
return TRI_ERROR_REPLICATION_NO_RESPONSE;
}
if (response->wasHttpError()) {
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": HTTP " + StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage();
delete response;
return TRI_ERROR_REPLICATION_MASTER_ERROR;
}
bool fromIncluded = false;
bool found;
string header = response->getHeaderField(TRI_REPLICATION_HEADER_FROMPRESENT, found);
if (found) {
fromIncluded = StringUtils::boolean(header);
}
if (! fromIncluded &&
_requireFromPresent) {
errorMsg = "required tick value '" + StringUtils::itoa(fromTick) + "' is not present on master at " + string(_masterInfo._endpoint);
delete response;
return TRI_ERROR_REPLICATION_START_TICK_NOT_PRESENT;
}
// fetch the tick from where we need to start scanning later
header = response->getHeaderField(TRI_REPLICATION_HEADER_LASTTICK, found);
if (! found) {
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": required header " + TRI_REPLICATION_HEADER_LASTTICK + " is missing";
delete response;
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
startTick = StringUtils::uint64(header);
StringBuffer& data = response->getBody();
std::unique_ptr<TRI_json_t> json(TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, data.begin()));
delete response;
if (! TRI_IsArrayJson(json.get())) {
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": invalid response type for initial data. expecting array";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
for (size_t i = 0; i < TRI_LengthArrayJson(json.get()); ++i) {
auto id = static_cast<TRI_json_t const*>(TRI_AtVector(&(json.get()->_value._objects), i));
if (! TRI_IsStringJson(id)) {
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": invalid response type for initial data. expecting array of ids";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
_openInitialTransactions.emplace(StringUtils::uint64(id->_value._string.data, id->_value._string.length - 1), nullptr);
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief run the continuous synchronisation
////////////////////////////////////////////////////////////////////////////////
int ContinuousSyncer::followMasterLog (string& errorMsg,
TRI_voc_tick_t& fromTick,
TRI_voc_tick_t& fetchTick,
TRI_voc_tick_t firstRegularTick,
uint64_t& ignoreCount,
bool& worked,
bool& masterActive) {
@ -942,24 +1119,48 @@ int ContinuousSyncer::followMasterLog (string& errorMsg,
map<string, string> headers;
worked = false;
string const tickString = StringUtils::itoa(fromTick);
string const tickString = StringUtils::itoa(fetchTick);
string const url = baseUrl +
"&from=" + tickString +
"&firstRegular=" + StringUtils::itoa(firstRegularTick) +
"&serverId=" + _localServerIdString +
"&includeSystem=" + (_includeSystem ? "true" : "false");
LOG_TRACE("running continuous replication request with tick %llu, url %s",
(unsigned long long) fromTick,
(unsigned long long) fetchTick,
url.c_str());
// send request
string const progress = "fetching master log from offset " + tickString;
setProgress(progress.c_str());
SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_GET,
std::string body;
if (! _openInitialTransactions.empty()) {
// stringify list of open transactions
body.append("[\"");
bool first = true;
for (auto& it : _openInitialTransactions) {
if (first) {
first = false;
}
else {
body.append("\",\"");
}
body.append(StringUtils::itoa(it.first));
}
body.append("\"]");
}
else {
body.append("[]");
}
SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_PUT,
url,
nullptr,
0,
body.c_str(),
body.size(),
headers);
if (response == nullptr || ! response->isComplete()) {
@ -1011,8 +1212,8 @@ int ContinuousSyncer::followMasterLog (string& errorMsg,
if (found) {
tick = StringUtils::uint64(header);
if (tick > fromTick) {
fromTick = tick;
if (tick > fetchTick) {
fetchTick = tick;
worked = true;
}
else {
@ -1052,7 +1253,7 @@ int ContinuousSyncer::followMasterLog (string& errorMsg,
}
uint64_t processedMarkers = 0;
res = applyLog(response, errorMsg, processedMarkers, ignoreCount);
res = applyLog(response, firstRegularTick, errorMsg, processedMarkers, ignoreCount);
if (processedMarkers > 0) {
worked = true;

View File

@ -51,6 +51,7 @@ namespace triagens {
}
namespace arango {
class ReplicationTransaction;
enum RestrictType : uint32_t {
RESTRICT_NONE,
@ -128,7 +129,8 @@ namespace triagens {
/// @brief whether or not a collection should be excluded
////////////////////////////////////////////////////////////////////////////////
bool excludeCollection (struct TRI_json_t const*) const;
bool skipMarker (TRI_voc_tick_t,
struct TRI_json_t const*) const;
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not a collection should be excluded
@ -192,6 +194,7 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
int applyLog (httpclient::SimpleHttpResult*,
TRI_voc_tick_t,
std::string&,
uint64_t&,
uint64_t&);
@ -202,12 +205,22 @@ namespace triagens {
int runContinuousSync (std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief fetch the initial master state
////////////////////////////////////////////////////////////////////////////////
int fetchMasterState (std::string&,
TRI_voc_tick_t,
TRI_voc_tick_t,
TRI_voc_tick_t&);
////////////////////////////////////////////////////////////////////////////////
/// @brief run the continuous synchronisation
////////////////////////////////////////////////////////////////////////////////
int followMasterLog (std::string&,
TRI_voc_tick_t&,
TRI_voc_tick_t,
uint64_t&,
bool&,
bool&);
@ -267,6 +280,12 @@ namespace triagens {
bool _requireFromPresent;
////////////////////////////////////////////////////////////////////////////////
/// @brief which transactions were open and need to be treated specially
////////////////////////////////////////////////////////////////////////////////
std::unordered_map<TRI_voc_tid_t, ReplicationTransaction*> _openInitialTransactions;
};
}

View File

@ -120,7 +120,8 @@ HttpHandler::status_t RestReplicationHandler::execute () {
handleCommandLoggerFirstTick();
}
else if (command == "logger-follow") {
if (type != HttpRequest::HTTP_REQUEST_GET) {
if (type != HttpRequest::HTTP_REQUEST_GET &&
type != HttpRequest::HTTP_REQUEST_PUT) {
goto BAD_CALL;
}
handleCommandLoggerFollow();
@ -1098,7 +1099,8 @@ void RestReplicationHandler::handleCommandLoggerFollow () {
// determine start and end tick
triagens::wal::LogfileManagerState state = triagens::wal::LogfileManager::instance()->state();
TRI_voc_tick_t tickStart = 0;
TRI_voc_tick_t tickEnd = state.lastDataTick;
TRI_voc_tick_t tickEnd = state.lastDataTick;
TRI_voc_tick_t firstRegularTick = 0;
bool found;
char const* value;
@ -1128,6 +1130,39 @@ void RestReplicationHandler::handleCommandLoggerFollow () {
includeSystem = StringUtils::boolean(value);
}
// grab list of transactions from the body value
std::unordered_set<TRI_voc_tid_t> transactionIds;
if (_request->requestType() == triagens::rest::HttpRequest::HTTP_REQUEST_PUT) {
value = _request->value("firstRegularTick", found);
if (found) {
firstRegularTick = static_cast<TRI_voc_tick_t>(StringUtils::uint64(value));
}
char const* ptr = _request->body();
std::unique_ptr<TRI_json_t> json(TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, ptr));
if (! TRI_IsArrayJson(json.get())) {
generateError(HttpResponse::BAD,
TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid body value. expecting array");
return;
}
for (size_t i = 0; i < TRI_LengthArrayJson(json.get()); ++i) {
auto id = static_cast<TRI_json_t const*>(TRI_AtVector(&(json.get()->_value._objects), i));
if (! TRI_IsStringJson(id)) {
generateError(HttpResponse::BAD,
TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid body value. expecting array of ids");
return;
}
transactionIds.emplace(StringUtils::uint64(id->_value._string.data, id->_value._string.length - 1));
}
}
int res = TRI_ERROR_NO_ERROR;
try {
@ -1135,7 +1170,7 @@ void RestReplicationHandler::handleCommandLoggerFollow () {
TRI_replication_dump_t dump(_vocbase, (size_t) determineChunkSize(), includeSystem);
// and dump
res = TRI_DumpLogReplication(&dump, tickStart, tickEnd, false);
res = TRI_DumpLogReplication(&dump, transactionIds, firstRegularTick, tickStart, tickEnd, false);
if (res == TRI_ERROR_NO_ERROR) {
bool const checkMore = (dump._lastFoundTick > 0 && dump._lastFoundTick != state.lastDataTick);
@ -1244,6 +1279,9 @@ void RestReplicationHandler::handleCommandDetermineOpenTransactions () {
}
_response->setContentType("application/x-arango-dump; charset=utf-8");
_response->setHeader(TRI_CHAR_LENGTH_PAIR(TRI_REPLICATION_HEADER_FROMPRESENT), dump._fromTickIncluded ? "true" : "false");
_response->setHeader(TRI_CHAR_LENGTH_PAIR(TRI_REPLICATION_HEADER_LASTTICK), StringUtils::itoa(dump._lastFoundTick));
if (length > 0) {
// transfer ownership of the buffer contents
@ -2201,7 +2239,8 @@ int RestReplicationHandler::processRestoreIndexesCoordinator (
return TRI_ERROR_HTTP_BAD_PARAMETER;
}
const size_t n = TRI_LengthArrayJson(indexes);
size_t const n = TRI_LengthArrayJson(indexes);
if (n == 0) {
// nothing to do
return TRI_ERROR_NO_ERROR;
@ -3661,6 +3700,7 @@ void RestReplicationHandler::handleCommandApplierSetConfig () {
if (TRI_IsArrayJson(value)) {
config._restrictCollections.clear();
size_t const n = TRI_LengthArrayJson(value);
for (size_t i = 0; i < n; ++i) {
TRI_json_t const* collection = TRI_LookupArrayJson(value, i);
if (TRI_IsStringJson(collection)) {

View File

@ -98,7 +98,7 @@ static void JS_LastLoggerReplication (const v8::FunctionCallbackInfo<v8::Value>&
TRI_voc_tick_t tickStart = TRI_ObjectToUInt64(args[0], true);
TRI_voc_tick_t tickEnd = TRI_ObjectToUInt64(args[1], true);
int res = TRI_DumpLogReplication(&dump, tickStart, tickEnd, true);
int res = TRI_DumpLogReplication(&dump, std::unordered_set<TRI_voc_tid_t>(), 0, tickStart, tickEnd, true);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);

View File

@ -1055,14 +1055,6 @@ TRI_replication_applier_t::~TRI_replication_applier_t () {
stop(true);
TRI_DestroyStateReplicationApplier(&_state);
TRI_DestroyConfigurationReplicationApplier(&_configuration);
for (auto it = _runningRemoteTransactions.begin(); it != _runningRemoteTransactions.end(); ++it) {
auto trx = (*it).second;
// do NOT write abort markers so we can resume running transactions later
trx->addHint(TRI_TRANSACTION_HINT_NO_ABORT_MARKER, true);
delete trx;
}
}
////////////////////////////////////////////////////////////////////////////////
@ -1233,12 +1225,6 @@ int TRI_replication_applier_t::shutdown () {
setTermination(false);
{
WRITE_LOCKER(_statusLock);
// really abort all ongoing transactions
abortRunningRemoteTransactions();
}
LOG_INFO("stopped replication applier for database '%s'", _databaseName.c_str());
return res;

View File

@ -136,10 +136,6 @@ class TRI_replication_applier_t {
_terminateThread.store(value);
}
void addRemoteTransaction (triagens::arango::ReplicationTransaction* trx) {
_runningRemoteTransactions.insert(std::make_pair(trx->externalId(), trx));
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the database name
////////////////////////////////////////////////////////////////////////////////
@ -173,21 +169,6 @@ class TRI_replication_applier_t {
int shutdown ();
void abortRunningRemoteTransactions () {
size_t const n = _runningRemoteTransactions.size();
triagens::arango::TransactionBase::increaseNumbers((int) n, (int) n);
for (auto it = _runningRemoteTransactions.begin(); it != _runningRemoteTransactions.end(); ++it) {
auto trx = (*it).second;
// do NOT write abort markers so we can resume running transactions later
trx->removeHint(TRI_TRANSACTION_HINT_NO_ABORT_MARKER, true);
delete trx;
}
_runningRemoteTransactions.clear();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief set the progress with or without a lock
////////////////////////////////////////////////////////////////////////////////
@ -228,7 +209,6 @@ class TRI_replication_applier_t {
TRI_replication_applier_state_t _state;
TRI_replication_applier_configuration_t _configuration;
TRI_thread_t _thread;
std::unordered_map<TRI_voc_tid_t, triagens::arango::ReplicationTransaction*> _runningRemoteTransactions;
};
// -----------------------------------------------------------------------------

View File

@ -31,7 +31,6 @@
#define ARANGODB_VOC_BASE_REPLICATION__COMMON_H 1
#include "Basics/Common.h"
#include "VocBase/voc-types.h"
// -----------------------------------------------------------------------------

View File

@ -1070,6 +1070,42 @@ static TRI_voc_tick_t GetCollectionFromWalMarker (TRI_df_marker_t const* marker)
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief helper function to extract a transaction id from a marker
////////////////////////////////////////////////////////////////////////////////
template<typename T>
static TRI_voc_tid_t GetTransactionId (TRI_df_marker_t const* marker) {
T const* m = reinterpret_cast<T const*>(marker);
return m->_transactionId;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the transaction id from a marker
////////////////////////////////////////////////////////////////////////////////
static TRI_voc_tid_t GetTransactionFromWalMarker (TRI_df_marker_t const* marker) {
TRI_ASSERT_EXPENSIVE(MustReplicateWalMarkerType(marker));
switch (marker->_type) {
case TRI_WAL_MARKER_DOCUMENT:
return GetTransactionId<triagens::wal::document_marker_t>(marker);
case TRI_WAL_MARKER_EDGE:
return GetTransactionId<triagens::wal::edge_marker_t>(marker);
case TRI_WAL_MARKER_REMOVE:
return GetTransactionId<triagens::wal::remove_marker_t>(marker);
case TRI_WAL_MARKER_BEGIN_TRANSACTION:
return GetTransactionId<triagens::wal::transaction_begin_marker_t>(marker);
case TRI_WAL_MARKER_COMMIT_TRANSACTION:
return GetTransactionId<triagens::wal::transaction_commit_marker_t>(marker);
case TRI_WAL_MARKER_ABORT_TRANSACTION:
return GetTransactionId<triagens::wal::transaction_abort_marker_t>(marker);
default: {
return 0;
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not a marker belongs to a transaction
////////////////////////////////////////////////////////////////////////////////
@ -1094,7 +1130,9 @@ static bool IsTransactionWalMarker (TRI_replication_dump_t* dump,
////////////////////////////////////////////////////////////////////////////////
static bool MustReplicateWalMarker (TRI_replication_dump_t* dump,
TRI_df_marker_t const* marker) {
TRI_df_marker_t const* marker,
TRI_voc_tick_t firstRegularTick,
std::unordered_set<TRI_voc_tid_t> const& transactionIds) {
// first check the marker type
if (! MustReplicateWalMarkerType(marker)) {
return false;
@ -1115,6 +1153,18 @@ static bool MustReplicateWalMarker (TRI_replication_dump_t* dump,
}
}
if (marker->_tick >= firstRegularTick) {
return true;
}
if (! transactionIds.empty()) {
TRI_voc_tid_t tid = GetTransactionFromWalMarker(marker);
if (tid == 0 ||
transactionIds.find(tid) == transactionIds.end()) {
return false;
}
}
return true;
}
@ -1384,6 +1434,8 @@ int TRI_DumpCollectionReplication (TRI_replication_dump_t* dump,
////////////////////////////////////////////////////////////////////////////////
int TRI_DumpLogReplication (TRI_replication_dump_t* dump,
std::unordered_set<TRI_voc_tid_t> const& transactionIds,
TRI_voc_tick_t firstRegularTick,
TRI_voc_tick_t tickMin,
TRI_voc_tick_t tickMax,
bool outputAsArray) {
@ -1438,14 +1490,14 @@ int TRI_DumpLogReplication (TRI_replication_dump_t* dump,
if (foundTick >= tickMax) {
hasMore = false;
}
if (foundTick > tickMax) {
// marker too new
break;
if (foundTick > tickMax) {
// marker too new
break;
}
}
if (! MustReplicateWalMarker(dump, marker)) {
if (! MustReplicateWalMarker(dump, marker, firstRegularTick, transactionIds)) {
continue;
}

View File

@ -128,6 +128,8 @@ int TRI_DumpCollectionReplication (TRI_replication_dump_t*,
////////////////////////////////////////////////////////////////////////////////
int TRI_DumpLogReplication (TRI_replication_dump_t*,
std::unordered_set<TRI_voc_tid_t> const&,
TRI_voc_tick_t,
TRI_voc_tick_t,
TRI_voc_tick_t,
bool);

View File

@ -72,7 +72,7 @@ static inline TRI_doc_datafile_info_t& createDfi (CollectorCache* cache,
TRI_doc_datafile_info_t dfi;
memset(&dfi, 0, sizeof(TRI_doc_datafile_info_t));
cache->dfi.emplace(std::make_pair(fid, dfi));
cache->dfi.emplace(fid, dfi);
return getDfi(cache, fid);
}
@ -1133,7 +1133,7 @@ int CollectorThread::queueOperations (triagens::wal::Logfile* logfile,
if (it == _operationsQueue.end()) {
std::vector<CollectorCache*> ops;
ops.push_back(cache);
_operationsQueue.emplace(std::make_pair(cid, ops));
_operationsQueue.emplace(cid, ops);
_logfileManager->increaseCollectQueueSize(logfile);
}
else {

View File

@ -179,14 +179,10 @@ RecoverState::RecoverState (TRI_server_t* server,
bool ignoreRecoveryErrors)
: server(server),
failedTransactions(),
remoteTransactions(),
remoteTransactionCollections(),
remoteTransactionDatabases(),
lastTick(0),
logfilesToProcess(),
openedCollections(),
openedDatabases(),
runningRemoteTransactions(),
emptyLogfiles(),
policy(TRI_DOC_UPDATE_ONLY_IF_NEWER, 0, nullptr),
ignoreRecoveryErrors(ignoreRecoveryErrors),
@ -199,15 +195,6 @@ RecoverState::RecoverState (TRI_server_t* server,
RecoverState::~RecoverState () {
releaseResources();
// free running remote transactions
for (auto it = runningRemoteTransactions.begin(); it != runningRemoteTransactions.end(); ++it) {
auto trx = (*it).second;
delete trx;
}
runningRemoteTransactions.clear();
}
// -----------------------------------------------------------------------------
@ -220,25 +207,6 @@ RecoverState::~RecoverState () {
////////////////////////////////////////////////////////////////////////////////
void RecoverState::releaseResources () {
// hand over running remote transactions to the applier
for (auto it = runningRemoteTransactions.begin(); it != runningRemoteTransactions.end(); ++it) {
auto* trx = (*it).second;
TRI_vocbase_t* vocbase = trx->vocbase();
TRI_ASSERT(vocbase != nullptr);
auto* applier = vocbase->_replicationApplier;
TRI_ASSERT(applier != nullptr);
applier->addRemoteTransaction(trx);
}
// reset trx counter as we're moving transactions from this thread to a potential other
triagens::arango::TransactionBase::setNumbers(0, 0);
runningRemoteTransactions.clear();
// release all collections
for (auto it = openedCollections.begin(); it != openedCollections.end(); ++it) {
TRI_vocbase_col_t* collection = (*it).second;
@ -411,61 +379,6 @@ TRI_document_collection_t* RecoverState::getCollection (TRI_voc_tick_t databaseI
return document;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief executes an operation in a remote transaction
////////////////////////////////////////////////////////////////////////////////
int RecoverState::executeRemoteOperation (TRI_voc_tick_t databaseId,
TRI_voc_cid_t collectionId,
TRI_voc_tid_t transactionId,
TRI_df_marker_t const* marker,
TRI_voc_fid_t fid,
std::function<int(RemoteTransactionType*, Marker*)> func) {
auto it = remoteTransactions.find(transactionId);
if (it == remoteTransactions.end()) {
LOG_WARNING("remote transaction not found: internal error");
return TRI_ERROR_INTERNAL;
}
TRI_voc_tid_t externalId = (*it).second.second;
auto it2 = runningRemoteTransactions.find(externalId);
if (it2 == runningRemoteTransactions.end()) {
LOG_WARNING("remote transaction not found: internal error");
return TRI_ERROR_INTERNAL;
}
auto trx = (*it2).second;
registerRemoteUsage(databaseId, collectionId);
EnvelopeMarker* envelope = nullptr;
int res = TRI_ERROR_INTERNAL;
try {
envelope = new EnvelopeMarker(marker, fid);
// execute the operation
res = func(trx, envelope);
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
}
}
catch (triagens::basics::Exception const& ex) {
res = ex.code();
}
catch (...) {
res = TRI_ERROR_INTERNAL;
}
if (envelope != nullptr) {
delete envelope;
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief executes a single operation inside a transaction
////////////////////////////////////////////////////////////////////////////////
@ -600,6 +513,7 @@ bool RecoverState::InitialScanMarker (TRI_df_marker_t const* marker,
transaction_abort_marker_t const* m = reinterpret_cast<transaction_abort_marker_t const*>(marker);
auto it = state->failedTransactions.find(m->_transactionId);
if (it != state->failedTransactions.end()) {
// delete previous element if present
state->failedTransactions.erase(m->_transactionId);
@ -612,15 +526,15 @@ bool RecoverState::InitialScanMarker (TRI_df_marker_t const* marker,
case TRI_WAL_MARKER_BEGIN_REMOTE_TRANSACTION: {
transaction_remote_begin_marker_t const* m = reinterpret_cast<transaction_remote_begin_marker_t const*>(marker);
// insert this transaction into the list of remote transactions
state->remoteTransactions.emplace(std::make_pair(m->_transactionId, std::make_pair(m->_databaseId, m->_externalId)));
// insert this transaction into the list of failed transactions
state->failedTransactions.emplace(std::make_pair(m->_transactionId, std::make_pair(m->_databaseId, false)));
break;
}
case TRI_WAL_MARKER_COMMIT_REMOTE_TRANSACTION: {
transaction_remote_commit_marker_t const* m = reinterpret_cast<transaction_remote_commit_marker_t const*>(marker);
// remove this transaction from the list of remote transactions
state->remoteTransactions.erase(m->_transactionId);
// remove this transaction from the list of failed transactions
state->failedTransactions.erase(m->_transactionId);
break;
}
@ -629,13 +543,13 @@ bool RecoverState::InitialScanMarker (TRI_df_marker_t const* marker,
// insert this transaction into the list of failed transactions
// the transaction is treated the same as a regular local transaction that is aborted
auto it = state->failedTransactions.find(m->_transactionId);
if (it == state->failedTransactions.end()) {
// insert the transaction into the list of failed transactions
state->failedTransactions.emplace(std::make_pair(m->_transactionId, std::make_pair(m->_databaseId, false)));
if (it != state->failedTransactions.end()) {
state->failedTransactions.erase(m->_transactionId);
}
// remove this transaction from the list of remote transactions
state->remoteTransactions.erase(m->_transactionId);
// and (re-)insert
state->failedTransactions.emplace(std::make_pair(m->_transactionId, std::make_pair(m->_databaseId, true)));
break;
}
/*
@ -795,48 +709,21 @@ bool RecoverState::ReplayMarker (TRI_df_marker_t const* marker,
TRI_shaped_json_t shaped;
TRI_EXTRACT_SHAPED_JSON_MARKER(shaped, m);
int res = TRI_ERROR_NO_ERROR;
int res = state->executeSingleOperation(databaseId, collectionId, marker, datafile->_fid, [&](SingleWriteTransactionType* trx, Marker* envelope) -> int {
if (IsVolatile(trx->trxCollection())) {
return TRI_ERROR_NO_ERROR;
}
if (state->isRemoteTransaction(transactionId)) {
// remote operation
res = state->executeRemoteOperation(databaseId, collectionId, transactionId, marker, datafile->_fid, [&](RemoteTransactionType* trx, Marker* envelope) -> int {
if (IsVolatile(trx->trxCollection(collectionId))) {
return TRI_ERROR_NO_ERROR;
}
TRI_doc_mptr_copy_t mptr;
int res = TRI_InsertShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, nullptr, false, false, true);
TRI_doc_mptr_copy_t mptr;
int res = TRI_InsertShapedJsonDocumentCollection(trx->trxCollection(collectionId), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, nullptr, false, false, true);
if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) {
state->policy.setExpectedRevision(m->_revisionId);
res = TRI_UpdateShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &state->policy, false, false);
}
if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) {
state->policy.setExpectedRevision(m->_revisionId);
res = TRI_UpdateShapedJsonDocumentCollection(trx->trxCollection(collectionId), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &state->policy, false, false);
}
return res;
});
}
else if (! state->isUsedByRemoteTransaction(collectionId)) {
// local operation
res = state->executeSingleOperation(databaseId, collectionId, marker, datafile->_fid, [&](SingleWriteTransactionType* trx, Marker* envelope) -> int {
if (IsVolatile(trx->trxCollection())) {
return TRI_ERROR_NO_ERROR;
}
TRI_doc_mptr_copy_t mptr;
int res = TRI_InsertShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, nullptr, false, false, true);
if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) {
state->policy.setExpectedRevision(m->_revisionId);
res = TRI_UpdateShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &state->policy, false, false);
}
return res;
});
}
else {
// ERROR - found a local action for a collection that has an ongoing remote transaction
res = TRI_ERROR_TRANSACTION_INTERNAL;
}
return res;
});
if (res != TRI_ERROR_NO_ERROR &&
res != TRI_ERROR_ARANGO_CONFLICT &&
@ -879,48 +766,21 @@ bool RecoverState::ReplayMarker (TRI_df_marker_t const* marker,
TRI_shaped_json_t shaped;
TRI_EXTRACT_SHAPED_JSON_MARKER(shaped, m);
int res = TRI_ERROR_NO_ERROR;
if (state->isRemoteTransaction(transactionId)) {
// remote operation
res = state->executeRemoteOperation(databaseId, collectionId, transactionId, marker, datafile->_fid, [&](RemoteTransactionType* trx, Marker* envelope) -> int {
if (IsVolatile(trx->trxCollection(collectionId))) {
return TRI_ERROR_NO_ERROR;
}
TRI_doc_mptr_copy_t mptr;
int res = TRI_InsertShapedJsonDocumentCollection(trx->trxCollection(collectionId), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &edge, false, false, true);
if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) {
state->policy.setExpectedRevision(m->_revisionId);
res = TRI_UpdateShapedJsonDocumentCollection(trx->trxCollection(collectionId), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &state->policy, false, false);
}
return res;
});
}
else if (! state->isUsedByRemoteTransaction(collectionId)) {
// local operation
res = state->executeSingleOperation(databaseId, collectionId, marker, datafile->_fid, [&](SingleWriteTransactionType* trx, Marker* envelope) -> int {
if (IsVolatile(trx->trxCollection())) {
return TRI_ERROR_NO_ERROR;
}
int res = state->executeSingleOperation(databaseId, collectionId, marker, datafile->_fid, [&](SingleWriteTransactionType* trx, Marker* envelope) -> int {
if (IsVolatile(trx->trxCollection())) {
return TRI_ERROR_NO_ERROR;
}
TRI_doc_mptr_copy_t mptr;
int res = TRI_InsertShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &edge, false, false, true);
TRI_doc_mptr_copy_t mptr;
int res = TRI_InsertShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &edge, false, false, true);
if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) {
state->policy.setExpectedRevision(m->_revisionId);
res = TRI_UpdateShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &state->policy, false, false);
}
if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) {
state->policy.setExpectedRevision(m->_revisionId);
res = TRI_UpdateShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &state->policy, false, false);
}
return res;
});
}
else {
// ERROR - found a local action for a collection that has an ongoing remote transaction
res = TRI_ERROR_TRANSACTION_INTERNAL;
}
return res;
});
if (res != TRI_ERROR_NO_ERROR &&
res != TRI_ERROR_ARANGO_CONFLICT &&
@ -955,40 +815,17 @@ bool RecoverState::ReplayMarker (TRI_df_marker_t const* marker,
char const* base = reinterpret_cast<char const*>(m);
char const* key = base + sizeof(remove_marker_t);
int res = TRI_ERROR_NO_ERROR;
if (state->isRemoteTransaction(transactionId)) {
// remote operation
res = state->executeRemoteOperation(databaseId, collectionId, transactionId, marker, datafile->_fid, [&](RemoteTransactionType* trx, Marker* envelope) -> int {
if (IsVolatile(trx->trxCollection(collectionId))) {
return TRI_ERROR_NO_ERROR;
}
// remove the document and ignore any potential errors
state->policy.setExpectedRevision(m->_revisionId);
TRI_RemoveShapedJsonDocumentCollection(trx->trxCollection(collectionId), (TRI_voc_key_t) key, m->_revisionId, envelope, &state->policy, false, false);
int res = state->executeSingleOperation(databaseId, collectionId, marker, datafile->_fid, [&](SingleWriteTransactionType* trx, Marker* envelope) -> int {
if (IsVolatile(trx->trxCollection())) {
return TRI_ERROR_NO_ERROR;
});
}
else if (! state->isUsedByRemoteTransaction(collectionId)) {
// local operation
res = state->executeSingleOperation(databaseId, collectionId, marker, datafile->_fid, [&](SingleWriteTransactionType* trx, Marker* envelope) -> int {
if (IsVolatile(trx->trxCollection())) {
return TRI_ERROR_NO_ERROR;
}
}
// remove the document and ignore any potential errors
state->policy.setExpectedRevision(m->_revisionId);
TRI_RemoveShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &state->policy, false, false);
// remove the document and ignore any potential errors
state->policy.setExpectedRevision(m->_revisionId);
TRI_RemoveShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &state->policy, false, false);
return TRI_ERROR_NO_ERROR;
});
}
else {
// ERROR - found a local action for a collection that has an ongoing remote transaction
res = TRI_ERROR_TRANSACTION_INTERNAL;
}
return TRI_ERROR_NO_ERROR;
});
if (res != TRI_ERROR_NO_ERROR &&
res != TRI_ERROR_ARANGO_CONFLICT &&
@ -1009,47 +846,6 @@ bool RecoverState::ReplayMarker (TRI_df_marker_t const* marker,
// transactions
// -----------------------------------------------------------------------------
case TRI_WAL_MARKER_BEGIN_REMOTE_TRANSACTION: {
transaction_remote_begin_marker_t const* m = reinterpret_cast<transaction_remote_begin_marker_t const*>(marker);
TRI_voc_tick_t databaseId = m->_databaseId;
TRI_voc_tid_t externalId = m->_externalId;
// start a remote transaction
if (state->isDropped(databaseId)) {
return true;
}
TRI_vocbase_t* vocbase = state->useDatabase(databaseId);
if (vocbase == nullptr) {
LOG_WARNING("cannot start remote transaction in database %llu: %s",
(unsigned long long) databaseId,
TRI_errno_string(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND));
}
auto trx = new RemoteTransactionType(state->server, vocbase, externalId);
if (trx == nullptr) {
LOG_WARNING("unable to start transaction: %s", TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY));
++state->errorCount;
return state->canContinue();
}
trx->addHint(TRI_TRANSACTION_HINT_NO_BEGIN_MARKER, true);
int res = trx->begin();
if (res != TRI_ERROR_NO_ERROR) {
LOG_WARNING("unable to start transaction: %s", TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY));
delete trx;
++state->errorCount;
return state->canContinue();
}
state->runningRemoteTransactions.emplace(std::make_pair(m->_externalId, trx));
break;
}
case TRI_WAL_MARKER_RENAME_COLLECTION: {
collection_rename_marker_t const* m = reinterpret_cast<collection_rename_marker_t const*>(marker);
TRI_voc_cid_t collectionId = m->_collectionId;

View File

@ -41,19 +41,12 @@
#include "Wal/Marker.h"
#include <functional>
////////////////////////////////////////////////////////////////////////////////
/// @brief shortcut for multi-operation remote transaction
////////////////////////////////////////////////////////////////////////////////
#define RemoteTransactionType triagens::arango::ReplicationTransaction
////////////////////////////////////////////////////////////////////////////////
/// @brief shortcut for single-operation write transaction
////////////////////////////////////////////////////////////////////////////////
#define SingleWriteTransactionType triagens::arango::SingleCollectionWriteTransaction<1>
namespace triagens {
namespace wal {
@ -134,14 +127,6 @@ namespace triagens {
return ignoreRecoveryErrors;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not there are remote transactions
////////////////////////////////////////////////////////////////////////////////
inline bool hasRunningRemoteTransactions () const {
return ! runningRemoteTransactions.empty();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not the recovery procedure must be run
////////////////////////////////////////////////////////////////////////////////
@ -158,32 +143,6 @@ namespace triagens {
return (transactionId > 0 && failedTransactions.find(transactionId) != failedTransactions.end());
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not a transaction was started remotely
////////////////////////////////////////////////////////////////////////////////
inline bool isRemoteTransaction (TRI_voc_tid_t transactionId) const {
return (transactionId > 0 && remoteTransactions.find(transactionId) != remoteTransactions.end());
}
////////////////////////////////////////////////////////////////////////////////
/// @brief register a collection for a remote transaction
////////////////////////////////////////////////////////////////////////////////
inline void registerRemoteUsage (TRI_voc_tick_t databaseId,
TRI_voc_cid_t collectionId) {
remoteTransactionDatabases.insert(databaseId);
remoteTransactionCollections.insert(collectionId);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not a transaction was started remotely
////////////////////////////////////////////////////////////////////////////////
inline bool isUsedByRemoteTransaction (TRI_voc_tid_t collectionId) const {
return (remoteTransactionCollections.find(collectionId) != remoteTransactionCollections.end());
}
////////////////////////////////////////////////////////////////////////////////
/// @brief release opened collections and databases so they can be shut down
/// etc.
@ -227,17 +186,6 @@ namespace triagens {
TRI_document_collection_t* getCollection (TRI_voc_tick_t,
TRI_voc_cid_t);
////////////////////////////////////////////////////////////////////////////////
/// @brief executes an operation in a remote transaction
////////////////////////////////////////////////////////////////////////////////
int executeRemoteOperation (TRI_voc_tick_t,
TRI_voc_cid_t,
TRI_voc_tid_t,
TRI_df_marker_t const*,
TRI_voc_fid_t,
std::function<int(RemoteTransactionType*, Marker*)>);
////////////////////////////////////////////////////////////////////////////////
/// @brief executes a single operation inside a transaction
////////////////////////////////////////////////////////////////////////////////
@ -302,9 +250,6 @@ namespace triagens {
TRI_server_t* server;
std::unordered_map<TRI_voc_tid_t, std::pair<TRI_voc_tick_t, bool>> failedTransactions;
std::unordered_map<TRI_voc_tid_t, std::pair<TRI_voc_tick_t, TRI_voc_tid_t>> remoteTransactions;
std::unordered_set<TRI_voc_cid_t> remoteTransactionCollections;
std::unordered_set<TRI_voc_tick_t> remoteTransactionDatabases;
std::unordered_set<TRI_voc_cid_t> droppedCollections;
std::unordered_set<TRI_voc_tick_t> droppedDatabases;
std::unordered_set<TRI_voc_cid_t> droppedIds;
@ -313,7 +258,6 @@ namespace triagens {
std::vector<Logfile*> logfilesToProcess;
std::unordered_map<TRI_voc_cid_t, TRI_vocbase_col_t*> openedCollections;
std::unordered_map<TRI_voc_tick_t, TRI_vocbase_t*> openedDatabases;
std::unordered_map<TRI_voc_tid_t, RemoteTransactionType*> runningRemoteTransactions;
std::vector<std::string> emptyLogfiles;
TRI_doc_update_policy_t policy;