diff --git a/arangod/Replication/ContinuousSyncer.cpp b/arangod/Replication/ContinuousSyncer.cpp index 124b0098af..b6bd9e34f6 100644 --- a/arangod/Replication/ContinuousSyncer.cpp +++ b/arangod/Replication/ContinuousSyncer.cpp @@ -97,6 +97,15 @@ ContinuousSyncer::ContinuousSyncer (TRI_server_t* server, //////////////////////////////////////////////////////////////////////////////// ContinuousSyncer::~ContinuousSyncer () { + // abort all running transactions + for (auto& it : _openInitialTransactions) { + auto trx = it.second; + + if (trx != nullptr) { + trx->abort(); + delete trx; + } + } } // ----------------------------------------------------------------------------- @@ -217,15 +226,49 @@ int ContinuousSyncer::saveApplierState () { return res; } //////////////////////////////////////////////////////////////////////////////// -/// @brief whether or not a collection should be excluded +/// @brief whether or not a marker should be skipped //////////////////////////////////////////////////////////////////////////////// -bool ContinuousSyncer::excludeCollection (TRI_json_t const* json) const { - if (_restrictType == RESTRICT_NONE && _includeSystem) { - return false; +bool ContinuousSyncer::skipMarker (TRI_voc_tick_t firstRegularTick, + TRI_json_t const* json) const { + bool tooOld = false; + string const tick = JsonHelper::getStringValue(json, "tick", ""); + + if (! tick.empty()) { + tooOld = (static_cast(StringUtils::uint64(tick.c_str(), tick.size())) < firstRegularTick); + + if (tooOld) { + int typeValue = JsonHelper::getNumericValue(json, "type", 0); + // handle marker type + TRI_replication_operation_e type = (TRI_replication_operation_e) typeValue; + + if (type == REPLICATION_MARKER_DOCUMENT || + type == REPLICATION_MARKER_EDGE || + type == REPLICATION_MARKER_REMOVE || + type == REPLICATION_TRANSACTION_START || + type == REPLICATION_TRANSACTION_ABORT || + type == REPLICATION_TRANSACTION_COMMIT) { + // read "tid" entry from marker + string const id = JsonHelper::getStringValue(json, "tid", ""); + + if (! id.empty()) { + TRI_voc_tid_t tid = static_cast(StringUtils::uint64(id.c_str(), id.size())); + + if (tid > 0 && + _openInitialTransactions.find(tid) != _openInitialTransactions.end()) { + // must still use this marker as it belongs to a transaction we need to finish + tooOld = false; + } + } + } + } } - if (! TRI_IsObjectJson(json)) { + if (tooOld) { + return true; + } + + if (_restrictType == RESTRICT_NONE && _includeSystem) { return false; } @@ -369,14 +412,17 @@ int ContinuousSyncer::processDocument (TRI_replication_operation_e type, } if (tid > 0) { - auto it = _applier->_runningRemoteTransactions.find(tid); + auto it = _openInitialTransactions.find(tid); - if (it == _applier->_runningRemoteTransactions.end()) { + if (it == _openInitialTransactions.end()) { return TRI_ERROR_REPLICATION_UNEXPECTED_TRANSACTION; } auto trx = (*it).second; - TRI_ASSERT(trx != nullptr); + + if (trx == nullptr) { + return TRI_ERROR_REPLICATION_UNEXPECTED_TRANSACTION; + } TRI_transaction_collection_t* trxCollection = trx->trxCollection(cid); @@ -443,35 +489,35 @@ int ContinuousSyncer::startTransaction (TRI_json_t const* json) { // note: this is the remote transaction id! TRI_voc_tid_t tid = static_cast(StringUtils::uint64(id.c_str(), id.size())); - auto it = _applier->_runningRemoteTransactions.find(tid); + auto it = _openInitialTransactions.find(tid); - if (it != _applier->_runningRemoteTransactions.end()) { + if (it != _openInitialTransactions.end()) { + // found a previous version of the same transaction - should not happen... auto trx = (*it).second; + + _openInitialTransactions.erase(tid); - _applier->_runningRemoteTransactions.erase(tid); + if (trx != nullptr) { + // abort ongoing trx + delete trx; + } - // abort ongoing trx - delete trx; } TRI_ASSERT(tid > 0); - LOG_TRACE("starting replication transaction %llu", (unsigned long long) tid); + LOG_TRACE("starting transaction %llu", (unsigned long long) tid); - auto trx = new ReplicationTransaction(_server, _vocbase, tid); - - if (trx == nullptr) { - return TRI_ERROR_OUT_OF_MEMORY; - } + std::unique_ptr trx(new ReplicationTransaction(_server, _vocbase, tid)); int res = trx->begin(); if (res != TRI_ERROR_NO_ERROR) { - delete trx; return res; } - _applier->_runningRemoteTransactions.insert(it, std::make_pair(tid, trx)); + _openInitialTransactions[tid] = trx.get(); + trx.release(); return TRI_ERROR_NO_ERROR; } @@ -489,12 +535,12 @@ int ContinuousSyncer::abortTransaction (TRI_json_t const* json) { } // transaction id - // note: this is the remote trasnaction id! + // note: this is the remote transaction id! TRI_voc_tid_t const tid = static_cast(StringUtils::uint64(id.c_str(), id.size())); - auto it = _applier->_runningRemoteTransactions.find(tid); + auto it = _openInitialTransactions.find(tid); - if (it == _applier->_runningRemoteTransactions.end()) { + if (it == _openInitialTransactions.end()) { // invalid state, no transaction was started. return TRI_ERROR_REPLICATION_UNEXPECTED_TRANSACTION; } @@ -504,12 +550,16 @@ int ContinuousSyncer::abortTransaction (TRI_json_t const* json) { LOG_TRACE("abort replication transaction %llu", (unsigned long long) tid); auto trx = (*it).second; - _applier->_runningRemoteTransactions.erase(tid); + _openInitialTransactions.erase(tid); - int res = trx->abort(); - delete trx; + if (trx != nullptr) { + int res = trx->abort(); + delete trx; - return res; + return res; + } + + return TRI_ERROR_REPLICATION_UNEXPECTED_TRANSACTION; } //////////////////////////////////////////////////////////////////////////////// @@ -528,8 +578,9 @@ int ContinuousSyncer::commitTransaction (TRI_json_t const* json) { // note: this is the remote trasnaction id! TRI_voc_tid_t const tid = static_cast(StringUtils::uint64(id.c_str(), id.size())); - auto it = _applier->_runningRemoteTransactions.find(tid); - if (it == _applier->_runningRemoteTransactions.end()) { + auto it = _openInitialTransactions.find(tid); + + if (it == _openInitialTransactions.end()) { // invalid state, no transaction was started. return TRI_ERROR_REPLICATION_UNEXPECTED_TRANSACTION; } @@ -539,12 +590,16 @@ int ContinuousSyncer::commitTransaction (TRI_json_t const* json) { LOG_TRACE("committing replication transaction %llu", (unsigned long long) tid); auto trx = (*it).second; - _applier->_runningRemoteTransactions.erase(tid); + _openInitialTransactions.erase(tid); - int res = trx->commit(); - delete trx; + if (trx != nullptr) { + int res = trx->commit(); + delete trx; - return res; + return res; + } + + return TRI_ERROR_REPLICATION_UNEXPECTED_TRANSACTION; } //////////////////////////////////////////////////////////////////////////////// @@ -621,15 +676,6 @@ int ContinuousSyncer::changeCollection (TRI_json_t const* json) { int ContinuousSyncer::applyLogMarker (TRI_json_t const* json, string& errorMsg) { - static const string invalidMsg = "received invalid JSON data"; - - // check data - if (! JsonHelper::isObject(json)) { - errorMsg = invalidMsg; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - // fetch marker "type" int typeValue = JsonHelper::getNumericValue(json, "type", 0); @@ -708,6 +754,7 @@ int ContinuousSyncer::applyLogMarker (TRI_json_t const* json, //////////////////////////////////////////////////////////////////////////////// int ContinuousSyncer::applyLog (SimpleHttpResult* response, + TRI_voc_tick_t firstRegularTick, std::string& errorMsg, uint64_t& processedMarkers, uint64_t& ignoreCount) { @@ -746,10 +793,16 @@ int ContinuousSyncer::applyLog (SimpleHttpResult* response, if (json == nullptr) { return TRI_ERROR_OUT_OF_MEMORY; } + + if (! TRI_IsObjectJson(json.get())) { + errorMsg = "received invalid JSON data"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } int res; bool skipped; - if (excludeCollection(json.get())) { + if (skipMarker(firstRegularTick, json.get())) { // entry is skipped res = TRI_ERROR_NO_ERROR; skipped = true; @@ -808,7 +861,6 @@ int ContinuousSyncer::applyLog (SimpleHttpResult* response, int ContinuousSyncer::runContinuousSync (string& errorMsg) { uint64_t connectRetries = 0; uint64_t inactiveCycles = 0; - int res = TRI_ERROR_INTERNAL; // get start tick // --------------------------------------- @@ -836,7 +888,19 @@ int ContinuousSyncer::runContinuousSync (string& errorMsg) { return TRI_ERROR_REPLICATION_NO_START_TICK; } - // TODO: get the applier into a sensible start state... + // get the applier into a sensible start state by fetching the list of + // open transactions from the master + TRI_voc_tick_t fetchTick = 0; + int res = fetchMasterState(errorMsg, 0, fromTick, fetchTick); + + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + + if (fetchTick > fromTick) { + // must not happen + return TRI_ERROR_INTERNAL; + } // run in a loop. the loop is terminated when the applier is stopped or an // error occurs @@ -844,8 +908,8 @@ int ContinuousSyncer::runContinuousSync (string& errorMsg) { bool worked; bool masterActive = false; - // fromTick is passed by reference! - res = followMasterLog(errorMsg, fromTick, _configuration._ignoreErrors, worked, masterActive); + // fetchTick is passed by reference! + res = followMasterLog(errorMsg, fetchTick, fromTick, _configuration._ignoreErrors, worked, masterActive); uint64_t sleepTime; @@ -928,12 +992,125 @@ int ContinuousSyncer::runContinuousSync (string& errorMsg) { return res; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief fetch the initial master state +//////////////////////////////////////////////////////////////////////////////// + +int ContinuousSyncer::fetchMasterState (string& errorMsg, + TRI_voc_tick_t fromTick, + TRI_voc_tick_t toTick, + TRI_voc_tick_t& startTick) { + string const baseUrl = BaseUrl + "/determine-open-transactions"; + + map headers; + + string const url = baseUrl + + "?serverId=" + _localServerIdString + + "&from=" + StringUtils::itoa(fromTick) + + "&to=" + StringUtils::itoa(toTick); + + string const progress = "fetching initial master state with from tick " + StringUtils::itoa(fromTick) + ", toTick " + StringUtils::itoa(toTick); + + LOG_TRACE("fetching initial master state with from tick %llu, to tick %llu, url %s", + (unsigned long long) fromTick, + (unsigned long long) toTick, + url.c_str()); + + // send request + setProgress(progress.c_str()); + + SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_GET, + url, + nullptr, + 0, + headers); + + if (response == nullptr || ! response->isComplete()) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": " + _client->getErrorMessage(); + + if (response != nullptr) { + delete response; + } + + return TRI_ERROR_REPLICATION_NO_RESPONSE; + } + + if (response->wasHttpError()) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": HTTP " + StringUtils::itoa(response->getHttpReturnCode()) + + ": " + response->getHttpReturnMessage(); + + delete response; + + return TRI_ERROR_REPLICATION_MASTER_ERROR; + } + + bool fromIncluded = false; + + bool found; + string header = response->getHeaderField(TRI_REPLICATION_HEADER_FROMPRESENT, found); + + if (found) { + fromIncluded = StringUtils::boolean(header); + } + + if (! fromIncluded && + _requireFromPresent) { + errorMsg = "required tick value '" + StringUtils::itoa(fromTick) + "' is not present on master at " + string(_masterInfo._endpoint); + delete response; + + return TRI_ERROR_REPLICATION_START_TICK_NOT_PRESENT; + } + + // fetch the tick from where we need to start scanning later + header = response->getHeaderField(TRI_REPLICATION_HEADER_LASTTICK, found); + + if (! found) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": required header " + TRI_REPLICATION_HEADER_LASTTICK + " is missing"; + + delete response; + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + startTick = StringUtils::uint64(header); + + StringBuffer& data = response->getBody(); + std::unique_ptr json(TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, data.begin())); + + delete response; + + if (! TRI_IsArrayJson(json.get())) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": invalid response type for initial data. expecting array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + for (size_t i = 0; i < TRI_LengthArrayJson(json.get()); ++i) { + auto id = static_cast(TRI_AtVector(&(json.get()->_value._objects), i)); + + if (! TRI_IsStringJson(id)) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": invalid response type for initial data. expecting array of ids"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + _openInitialTransactions.emplace(StringUtils::uint64(id->_value._string.data, id->_value._string.length - 1), nullptr); + } + + return TRI_ERROR_NO_ERROR; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief run the continuous synchronisation //////////////////////////////////////////////////////////////////////////////// int ContinuousSyncer::followMasterLog (string& errorMsg, - TRI_voc_tick_t& fromTick, + TRI_voc_tick_t& fetchTick, + TRI_voc_tick_t firstRegularTick, uint64_t& ignoreCount, bool& worked, bool& masterActive) { @@ -942,24 +1119,48 @@ int ContinuousSyncer::followMasterLog (string& errorMsg, map headers; worked = false; - string const tickString = StringUtils::itoa(fromTick); + string const tickString = StringUtils::itoa(fetchTick); string const url = baseUrl + "&from=" + tickString + + "&firstRegular=" + StringUtils::itoa(firstRegularTick) + "&serverId=" + _localServerIdString + "&includeSystem=" + (_includeSystem ? "true" : "false"); LOG_TRACE("running continuous replication request with tick %llu, url %s", - (unsigned long long) fromTick, + (unsigned long long) fetchTick, url.c_str()); // send request string const progress = "fetching master log from offset " + tickString; setProgress(progress.c_str()); - SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_GET, + std::string body; + + if (! _openInitialTransactions.empty()) { + // stringify list of open transactions + body.append("[\""); + bool first = true; + + for (auto& it : _openInitialTransactions) { + if (first) { + first = false; + } + else { + body.append("\",\""); + } + + body.append(StringUtils::itoa(it.first)); + } + body.append("\"]"); + } + else { + body.append("[]"); + } + + SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_PUT, url, - nullptr, - 0, + body.c_str(), + body.size(), headers); if (response == nullptr || ! response->isComplete()) { @@ -1011,8 +1212,8 @@ int ContinuousSyncer::followMasterLog (string& errorMsg, if (found) { tick = StringUtils::uint64(header); - if (tick > fromTick) { - fromTick = tick; + if (tick > fetchTick) { + fetchTick = tick; worked = true; } else { @@ -1052,7 +1253,7 @@ int ContinuousSyncer::followMasterLog (string& errorMsg, } uint64_t processedMarkers = 0; - res = applyLog(response, errorMsg, processedMarkers, ignoreCount); + res = applyLog(response, firstRegularTick, errorMsg, processedMarkers, ignoreCount); if (processedMarkers > 0) { worked = true; diff --git a/arangod/Replication/ContinuousSyncer.h b/arangod/Replication/ContinuousSyncer.h index 0d010d4268..6f4f3f9896 100644 --- a/arangod/Replication/ContinuousSyncer.h +++ b/arangod/Replication/ContinuousSyncer.h @@ -51,6 +51,7 @@ namespace triagens { } namespace arango { + class ReplicationTransaction; enum RestrictType : uint32_t { RESTRICT_NONE, @@ -128,7 +129,8 @@ namespace triagens { /// @brief whether or not a collection should be excluded //////////////////////////////////////////////////////////////////////////////// - bool excludeCollection (struct TRI_json_t const*) const; + bool skipMarker (TRI_voc_tick_t, + struct TRI_json_t const*) const; //////////////////////////////////////////////////////////////////////////////// /// @brief whether or not a collection should be excluded @@ -192,6 +194,7 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// int applyLog (httpclient::SimpleHttpResult*, + TRI_voc_tick_t, std::string&, uint64_t&, uint64_t&); @@ -202,12 +205,22 @@ namespace triagens { int runContinuousSync (std::string&); +//////////////////////////////////////////////////////////////////////////////// +/// @brief fetch the initial master state +//////////////////////////////////////////////////////////////////////////////// + + int fetchMasterState (std::string&, + TRI_voc_tick_t, + TRI_voc_tick_t, + TRI_voc_tick_t&); + //////////////////////////////////////////////////////////////////////////////// /// @brief run the continuous synchronisation //////////////////////////////////////////////////////////////////////////////// int followMasterLog (std::string&, TRI_voc_tick_t&, + TRI_voc_tick_t, uint64_t&, bool&, bool&); @@ -267,6 +280,12 @@ namespace triagens { bool _requireFromPresent; +//////////////////////////////////////////////////////////////////////////////// +/// @brief which transactions were open and need to be treated specially +//////////////////////////////////////////////////////////////////////////////// + + std::unordered_map _openInitialTransactions; + }; } diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index c3dc04630e..4470c638dc 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -120,7 +120,8 @@ HttpHandler::status_t RestReplicationHandler::execute () { handleCommandLoggerFirstTick(); } else if (command == "logger-follow") { - if (type != HttpRequest::HTTP_REQUEST_GET) { + if (type != HttpRequest::HTTP_REQUEST_GET && + type != HttpRequest::HTTP_REQUEST_PUT) { goto BAD_CALL; } handleCommandLoggerFollow(); @@ -1098,7 +1099,8 @@ void RestReplicationHandler::handleCommandLoggerFollow () { // determine start and end tick triagens::wal::LogfileManagerState state = triagens::wal::LogfileManager::instance()->state(); TRI_voc_tick_t tickStart = 0; - TRI_voc_tick_t tickEnd = state.lastDataTick; + TRI_voc_tick_t tickEnd = state.lastDataTick; + TRI_voc_tick_t firstRegularTick = 0; bool found; char const* value; @@ -1128,6 +1130,39 @@ void RestReplicationHandler::handleCommandLoggerFollow () { includeSystem = StringUtils::boolean(value); } + // grab list of transactions from the body value + std::unordered_set transactionIds; + + if (_request->requestType() == triagens::rest::HttpRequest::HTTP_REQUEST_PUT) { + value = _request->value("firstRegularTick", found); + if (found) { + firstRegularTick = static_cast(StringUtils::uint64(value)); + } + + char const* ptr = _request->body(); + + std::unique_ptr json(TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, ptr)); + + if (! TRI_IsArrayJson(json.get())) { + generateError(HttpResponse::BAD, + TRI_ERROR_HTTP_BAD_PARAMETER, + "invalid body value. expecting array"); + return; + } + + for (size_t i = 0; i < TRI_LengthArrayJson(json.get()); ++i) { + auto id = static_cast(TRI_AtVector(&(json.get()->_value._objects), i)); + if (! TRI_IsStringJson(id)) { + generateError(HttpResponse::BAD, + TRI_ERROR_HTTP_BAD_PARAMETER, + "invalid body value. expecting array of ids"); + return; + } + + transactionIds.emplace(StringUtils::uint64(id->_value._string.data, id->_value._string.length - 1)); + } + } + int res = TRI_ERROR_NO_ERROR; try { @@ -1135,7 +1170,7 @@ void RestReplicationHandler::handleCommandLoggerFollow () { TRI_replication_dump_t dump(_vocbase, (size_t) determineChunkSize(), includeSystem); // and dump - res = TRI_DumpLogReplication(&dump, tickStart, tickEnd, false); + res = TRI_DumpLogReplication(&dump, transactionIds, firstRegularTick, tickStart, tickEnd, false); if (res == TRI_ERROR_NO_ERROR) { bool const checkMore = (dump._lastFoundTick > 0 && dump._lastFoundTick != state.lastDataTick); @@ -1244,6 +1279,9 @@ void RestReplicationHandler::handleCommandDetermineOpenTransactions () { } _response->setContentType("application/x-arango-dump; charset=utf-8"); + + _response->setHeader(TRI_CHAR_LENGTH_PAIR(TRI_REPLICATION_HEADER_FROMPRESENT), dump._fromTickIncluded ? "true" : "false"); + _response->setHeader(TRI_CHAR_LENGTH_PAIR(TRI_REPLICATION_HEADER_LASTTICK), StringUtils::itoa(dump._lastFoundTick)); if (length > 0) { // transfer ownership of the buffer contents @@ -2201,7 +2239,8 @@ int RestReplicationHandler::processRestoreIndexesCoordinator ( return TRI_ERROR_HTTP_BAD_PARAMETER; } - const size_t n = TRI_LengthArrayJson(indexes); + size_t const n = TRI_LengthArrayJson(indexes); + if (n == 0) { // nothing to do return TRI_ERROR_NO_ERROR; @@ -3661,6 +3700,7 @@ void RestReplicationHandler::handleCommandApplierSetConfig () { if (TRI_IsArrayJson(value)) { config._restrictCollections.clear(); size_t const n = TRI_LengthArrayJson(value); + for (size_t i = 0; i < n; ++i) { TRI_json_t const* collection = TRI_LookupArrayJson(value, i); if (TRI_IsStringJson(collection)) { diff --git a/arangod/V8Server/v8-replication.cpp b/arangod/V8Server/v8-replication.cpp index 129789a81a..81a91f58b7 100644 --- a/arangod/V8Server/v8-replication.cpp +++ b/arangod/V8Server/v8-replication.cpp @@ -98,7 +98,7 @@ static void JS_LastLoggerReplication (const v8::FunctionCallbackInfo& TRI_voc_tick_t tickStart = TRI_ObjectToUInt64(args[0], true); TRI_voc_tick_t tickEnd = TRI_ObjectToUInt64(args[1], true); - int res = TRI_DumpLogReplication(&dump, tickStart, tickEnd, true); + int res = TRI_DumpLogReplication(&dump, std::unordered_set(), 0, tickStart, tickEnd, true); if (res != TRI_ERROR_NO_ERROR) { TRI_V8_THROW_EXCEPTION(res); diff --git a/arangod/VocBase/replication-applier.cpp b/arangod/VocBase/replication-applier.cpp index 4f43118c77..da4033d8a9 100644 --- a/arangod/VocBase/replication-applier.cpp +++ b/arangod/VocBase/replication-applier.cpp @@ -1055,14 +1055,6 @@ TRI_replication_applier_t::~TRI_replication_applier_t () { stop(true); TRI_DestroyStateReplicationApplier(&_state); TRI_DestroyConfigurationReplicationApplier(&_configuration); - - for (auto it = _runningRemoteTransactions.begin(); it != _runningRemoteTransactions.end(); ++it) { - auto trx = (*it).second; - - // do NOT write abort markers so we can resume running transactions later - trx->addHint(TRI_TRANSACTION_HINT_NO_ABORT_MARKER, true); - delete trx; - } } //////////////////////////////////////////////////////////////////////////////// @@ -1233,12 +1225,6 @@ int TRI_replication_applier_t::shutdown () { setTermination(false); - { - WRITE_LOCKER(_statusLock); - // really abort all ongoing transactions - abortRunningRemoteTransactions(); - } - LOG_INFO("stopped replication applier for database '%s'", _databaseName.c_str()); return res; diff --git a/arangod/VocBase/replication-applier.h b/arangod/VocBase/replication-applier.h index b4cfa2e548..c3ebdde345 100644 --- a/arangod/VocBase/replication-applier.h +++ b/arangod/VocBase/replication-applier.h @@ -136,10 +136,6 @@ class TRI_replication_applier_t { _terminateThread.store(value); } - void addRemoteTransaction (triagens::arango::ReplicationTransaction* trx) { - _runningRemoteTransactions.insert(std::make_pair(trx->externalId(), trx)); - } - //////////////////////////////////////////////////////////////////////////////// /// @brief return the database name //////////////////////////////////////////////////////////////////////////////// @@ -173,21 +169,6 @@ class TRI_replication_applier_t { int shutdown (); - void abortRunningRemoteTransactions () { - size_t const n = _runningRemoteTransactions.size(); - triagens::arango::TransactionBase::increaseNumbers((int) n, (int) n); - - for (auto it = _runningRemoteTransactions.begin(); it != _runningRemoteTransactions.end(); ++it) { - auto trx = (*it).second; - - // do NOT write abort markers so we can resume running transactions later - trx->removeHint(TRI_TRANSACTION_HINT_NO_ABORT_MARKER, true); - delete trx; - } - - _runningRemoteTransactions.clear(); - } - //////////////////////////////////////////////////////////////////////////////// /// @brief set the progress with or without a lock //////////////////////////////////////////////////////////////////////////////// @@ -228,7 +209,6 @@ class TRI_replication_applier_t { TRI_replication_applier_state_t _state; TRI_replication_applier_configuration_t _configuration; TRI_thread_t _thread; - std::unordered_map _runningRemoteTransactions; }; // ----------------------------------------------------------------------------- diff --git a/arangod/VocBase/replication-common.h b/arangod/VocBase/replication-common.h index 88b77d3233..aa590e80f5 100644 --- a/arangod/VocBase/replication-common.h +++ b/arangod/VocBase/replication-common.h @@ -31,7 +31,6 @@ #define ARANGODB_VOC_BASE_REPLICATION__COMMON_H 1 #include "Basics/Common.h" - #include "VocBase/voc-types.h" // ----------------------------------------------------------------------------- diff --git a/arangod/VocBase/replication-dump.cpp b/arangod/VocBase/replication-dump.cpp index 55dd5d8308..d2f9fc474a 100644 --- a/arangod/VocBase/replication-dump.cpp +++ b/arangod/VocBase/replication-dump.cpp @@ -1070,6 +1070,42 @@ static TRI_voc_tick_t GetCollectionFromWalMarker (TRI_df_marker_t const* marker) } } +//////////////////////////////////////////////////////////////////////////////// +/// @brief helper function to extract a transaction id from a marker +//////////////////////////////////////////////////////////////////////////////// + +template +static TRI_voc_tid_t GetTransactionId (TRI_df_marker_t const* marker) { + T const* m = reinterpret_cast(marker); + return m->_transactionId; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief get the transaction id from a marker +//////////////////////////////////////////////////////////////////////////////// + +static TRI_voc_tid_t GetTransactionFromWalMarker (TRI_df_marker_t const* marker) { + TRI_ASSERT_EXPENSIVE(MustReplicateWalMarkerType(marker)); + + switch (marker->_type) { + case TRI_WAL_MARKER_DOCUMENT: + return GetTransactionId(marker); + case TRI_WAL_MARKER_EDGE: + return GetTransactionId(marker); + case TRI_WAL_MARKER_REMOVE: + return GetTransactionId(marker); + case TRI_WAL_MARKER_BEGIN_TRANSACTION: + return GetTransactionId(marker); + case TRI_WAL_MARKER_COMMIT_TRANSACTION: + return GetTransactionId(marker); + case TRI_WAL_MARKER_ABORT_TRANSACTION: + return GetTransactionId(marker); + default: { + return 0; + } + } +} + //////////////////////////////////////////////////////////////////////////////// /// @brief whether or not a marker belongs to a transaction //////////////////////////////////////////////////////////////////////////////// @@ -1094,7 +1130,9 @@ static bool IsTransactionWalMarker (TRI_replication_dump_t* dump, //////////////////////////////////////////////////////////////////////////////// static bool MustReplicateWalMarker (TRI_replication_dump_t* dump, - TRI_df_marker_t const* marker) { + TRI_df_marker_t const* marker, + TRI_voc_tick_t firstRegularTick, + std::unordered_set const& transactionIds) { // first check the marker type if (! MustReplicateWalMarkerType(marker)) { return false; @@ -1115,6 +1153,18 @@ static bool MustReplicateWalMarker (TRI_replication_dump_t* dump, } } + if (marker->_tick >= firstRegularTick) { + return true; + } + + if (! transactionIds.empty()) { + TRI_voc_tid_t tid = GetTransactionFromWalMarker(marker); + if (tid == 0 || + transactionIds.find(tid) == transactionIds.end()) { + return false; + } + } + return true; } @@ -1384,6 +1434,8 @@ int TRI_DumpCollectionReplication (TRI_replication_dump_t* dump, //////////////////////////////////////////////////////////////////////////////// int TRI_DumpLogReplication (TRI_replication_dump_t* dump, + std::unordered_set const& transactionIds, + TRI_voc_tick_t firstRegularTick, TRI_voc_tick_t tickMin, TRI_voc_tick_t tickMax, bool outputAsArray) { @@ -1438,14 +1490,14 @@ int TRI_DumpLogReplication (TRI_replication_dump_t* dump, if (foundTick >= tickMax) { hasMore = false; - } - if (foundTick > tickMax) { - // marker too new - break; + if (foundTick > tickMax) { + // marker too new + break; + } } - if (! MustReplicateWalMarker(dump, marker)) { + if (! MustReplicateWalMarker(dump, marker, firstRegularTick, transactionIds)) { continue; } diff --git a/arangod/VocBase/replication-dump.h b/arangod/VocBase/replication-dump.h index 6e90caddcd..a0f5589fd7 100644 --- a/arangod/VocBase/replication-dump.h +++ b/arangod/VocBase/replication-dump.h @@ -128,6 +128,8 @@ int TRI_DumpCollectionReplication (TRI_replication_dump_t*, //////////////////////////////////////////////////////////////////////////////// int TRI_DumpLogReplication (TRI_replication_dump_t*, + std::unordered_set const&, + TRI_voc_tick_t, TRI_voc_tick_t, TRI_voc_tick_t, bool); diff --git a/arangod/Wal/CollectorThread.cpp b/arangod/Wal/CollectorThread.cpp index 034d1dbb4b..0bf8ad60b6 100644 --- a/arangod/Wal/CollectorThread.cpp +++ b/arangod/Wal/CollectorThread.cpp @@ -72,7 +72,7 @@ static inline TRI_doc_datafile_info_t& createDfi (CollectorCache* cache, TRI_doc_datafile_info_t dfi; memset(&dfi, 0, sizeof(TRI_doc_datafile_info_t)); - cache->dfi.emplace(std::make_pair(fid, dfi)); + cache->dfi.emplace(fid, dfi); return getDfi(cache, fid); } @@ -1133,7 +1133,7 @@ int CollectorThread::queueOperations (triagens::wal::Logfile* logfile, if (it == _operationsQueue.end()) { std::vector ops; ops.push_back(cache); - _operationsQueue.emplace(std::make_pair(cid, ops)); + _operationsQueue.emplace(cid, ops); _logfileManager->increaseCollectQueueSize(logfile); } else { diff --git a/arangod/Wal/RecoverState.cpp b/arangod/Wal/RecoverState.cpp index 8a20b8fcaf..3f3d212703 100644 --- a/arangod/Wal/RecoverState.cpp +++ b/arangod/Wal/RecoverState.cpp @@ -179,14 +179,10 @@ RecoverState::RecoverState (TRI_server_t* server, bool ignoreRecoveryErrors) : server(server), failedTransactions(), - remoteTransactions(), - remoteTransactionCollections(), - remoteTransactionDatabases(), lastTick(0), logfilesToProcess(), openedCollections(), openedDatabases(), - runningRemoteTransactions(), emptyLogfiles(), policy(TRI_DOC_UPDATE_ONLY_IF_NEWER, 0, nullptr), ignoreRecoveryErrors(ignoreRecoveryErrors), @@ -199,15 +195,6 @@ RecoverState::RecoverState (TRI_server_t* server, RecoverState::~RecoverState () { releaseResources(); - - // free running remote transactions - for (auto it = runningRemoteTransactions.begin(); it != runningRemoteTransactions.end(); ++it) { - auto trx = (*it).second; - - delete trx; - } - - runningRemoteTransactions.clear(); } // ----------------------------------------------------------------------------- @@ -220,25 +207,6 @@ RecoverState::~RecoverState () { //////////////////////////////////////////////////////////////////////////////// void RecoverState::releaseResources () { - // hand over running remote transactions to the applier - for (auto it = runningRemoteTransactions.begin(); it != runningRemoteTransactions.end(); ++it) { - auto* trx = (*it).second; - - TRI_vocbase_t* vocbase = trx->vocbase(); - TRI_ASSERT(vocbase != nullptr); - - auto* applier = vocbase->_replicationApplier; - TRI_ASSERT(applier != nullptr); - - applier->addRemoteTransaction(trx); - } - - // reset trx counter as we're moving transactions from this thread to a potential other - triagens::arango::TransactionBase::setNumbers(0, 0); - - runningRemoteTransactions.clear(); - - // release all collections for (auto it = openedCollections.begin(); it != openedCollections.end(); ++it) { TRI_vocbase_col_t* collection = (*it).second; @@ -411,61 +379,6 @@ TRI_document_collection_t* RecoverState::getCollection (TRI_voc_tick_t databaseI return document; } -//////////////////////////////////////////////////////////////////////////////// -/// @brief executes an operation in a remote transaction -//////////////////////////////////////////////////////////////////////////////// - -int RecoverState::executeRemoteOperation (TRI_voc_tick_t databaseId, - TRI_voc_cid_t collectionId, - TRI_voc_tid_t transactionId, - TRI_df_marker_t const* marker, - TRI_voc_fid_t fid, - std::function func) { - - auto it = remoteTransactions.find(transactionId); - if (it == remoteTransactions.end()) { - LOG_WARNING("remote transaction not found: internal error"); - return TRI_ERROR_INTERNAL; - } - - TRI_voc_tid_t externalId = (*it).second.second; - auto it2 = runningRemoteTransactions.find(externalId); - if (it2 == runningRemoteTransactions.end()) { - LOG_WARNING("remote transaction not found: internal error"); - return TRI_ERROR_INTERNAL; - } - - auto trx = (*it2).second; - - registerRemoteUsage(databaseId, collectionId); - - EnvelopeMarker* envelope = nullptr; - int res = TRI_ERROR_INTERNAL; - - try { - envelope = new EnvelopeMarker(marker, fid); - - // execute the operation - res = func(trx, envelope); - - if (res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION(res); - } - } - catch (triagens::basics::Exception const& ex) { - res = ex.code(); - } - catch (...) { - res = TRI_ERROR_INTERNAL; - } - - if (envelope != nullptr) { - delete envelope; - } - - return res; -} - //////////////////////////////////////////////////////////////////////////////// /// @brief executes a single operation inside a transaction //////////////////////////////////////////////////////////////////////////////// @@ -600,6 +513,7 @@ bool RecoverState::InitialScanMarker (TRI_df_marker_t const* marker, transaction_abort_marker_t const* m = reinterpret_cast(marker); auto it = state->failedTransactions.find(m->_transactionId); + if (it != state->failedTransactions.end()) { // delete previous element if present state->failedTransactions.erase(m->_transactionId); @@ -612,15 +526,15 @@ bool RecoverState::InitialScanMarker (TRI_df_marker_t const* marker, case TRI_WAL_MARKER_BEGIN_REMOTE_TRANSACTION: { transaction_remote_begin_marker_t const* m = reinterpret_cast(marker); - // insert this transaction into the list of remote transactions - state->remoteTransactions.emplace(std::make_pair(m->_transactionId, std::make_pair(m->_databaseId, m->_externalId))); + // insert this transaction into the list of failed transactions + state->failedTransactions.emplace(std::make_pair(m->_transactionId, std::make_pair(m->_databaseId, false))); break; } case TRI_WAL_MARKER_COMMIT_REMOTE_TRANSACTION: { transaction_remote_commit_marker_t const* m = reinterpret_cast(marker); - // remove this transaction from the list of remote transactions - state->remoteTransactions.erase(m->_transactionId); + // remove this transaction from the list of failed transactions + state->failedTransactions.erase(m->_transactionId); break; } @@ -629,13 +543,13 @@ bool RecoverState::InitialScanMarker (TRI_df_marker_t const* marker, // insert this transaction into the list of failed transactions // the transaction is treated the same as a regular local transaction that is aborted auto it = state->failedTransactions.find(m->_transactionId); - if (it == state->failedTransactions.end()) { - // insert the transaction into the list of failed transactions - state->failedTransactions.emplace(std::make_pair(m->_transactionId, std::make_pair(m->_databaseId, false))); + + if (it != state->failedTransactions.end()) { + state->failedTransactions.erase(m->_transactionId); } - // remove this transaction from the list of remote transactions - state->remoteTransactions.erase(m->_transactionId); + // and (re-)insert + state->failedTransactions.emplace(std::make_pair(m->_transactionId, std::make_pair(m->_databaseId, true))); break; } /* @@ -795,48 +709,21 @@ bool RecoverState::ReplayMarker (TRI_df_marker_t const* marker, TRI_shaped_json_t shaped; TRI_EXTRACT_SHAPED_JSON_MARKER(shaped, m); - int res = TRI_ERROR_NO_ERROR; + int res = state->executeSingleOperation(databaseId, collectionId, marker, datafile->_fid, [&](SingleWriteTransactionType* trx, Marker* envelope) -> int { + if (IsVolatile(trx->trxCollection())) { + return TRI_ERROR_NO_ERROR; + } - if (state->isRemoteTransaction(transactionId)) { - // remote operation - res = state->executeRemoteOperation(databaseId, collectionId, transactionId, marker, datafile->_fid, [&](RemoteTransactionType* trx, Marker* envelope) -> int { - if (IsVolatile(trx->trxCollection(collectionId))) { - return TRI_ERROR_NO_ERROR; - } + TRI_doc_mptr_copy_t mptr; + int res = TRI_InsertShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, nullptr, false, false, true); - TRI_doc_mptr_copy_t mptr; - int res = TRI_InsertShapedJsonDocumentCollection(trx->trxCollection(collectionId), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, nullptr, false, false, true); + if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) { + state->policy.setExpectedRevision(m->_revisionId); + res = TRI_UpdateShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &state->policy, false, false); + } - if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) { - state->policy.setExpectedRevision(m->_revisionId); - res = TRI_UpdateShapedJsonDocumentCollection(trx->trxCollection(collectionId), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &state->policy, false, false); - } - - return res; - }); - } - else if (! state->isUsedByRemoteTransaction(collectionId)) { - // local operation - res = state->executeSingleOperation(databaseId, collectionId, marker, datafile->_fid, [&](SingleWriteTransactionType* trx, Marker* envelope) -> int { - if (IsVolatile(trx->trxCollection())) { - return TRI_ERROR_NO_ERROR; - } - - TRI_doc_mptr_copy_t mptr; - int res = TRI_InsertShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, nullptr, false, false, true); - - if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) { - state->policy.setExpectedRevision(m->_revisionId); - res = TRI_UpdateShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &state->policy, false, false); - } - - return res; - }); - } - else { - // ERROR - found a local action for a collection that has an ongoing remote transaction - res = TRI_ERROR_TRANSACTION_INTERNAL; - } + return res; + }); if (res != TRI_ERROR_NO_ERROR && res != TRI_ERROR_ARANGO_CONFLICT && @@ -879,48 +766,21 @@ bool RecoverState::ReplayMarker (TRI_df_marker_t const* marker, TRI_shaped_json_t shaped; TRI_EXTRACT_SHAPED_JSON_MARKER(shaped, m); - int res = TRI_ERROR_NO_ERROR; - - if (state->isRemoteTransaction(transactionId)) { - // remote operation - res = state->executeRemoteOperation(databaseId, collectionId, transactionId, marker, datafile->_fid, [&](RemoteTransactionType* trx, Marker* envelope) -> int { - if (IsVolatile(trx->trxCollection(collectionId))) { - return TRI_ERROR_NO_ERROR; - } - - TRI_doc_mptr_copy_t mptr; - int res = TRI_InsertShapedJsonDocumentCollection(trx->trxCollection(collectionId), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &edge, false, false, true); - - if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) { - state->policy.setExpectedRevision(m->_revisionId); - res = TRI_UpdateShapedJsonDocumentCollection(trx->trxCollection(collectionId), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &state->policy, false, false); - } - - return res; - }); - } - else if (! state->isUsedByRemoteTransaction(collectionId)) { - // local operation - res = state->executeSingleOperation(databaseId, collectionId, marker, datafile->_fid, [&](SingleWriteTransactionType* trx, Marker* envelope) -> int { - if (IsVolatile(trx->trxCollection())) { - return TRI_ERROR_NO_ERROR; - } + int res = state->executeSingleOperation(databaseId, collectionId, marker, datafile->_fid, [&](SingleWriteTransactionType* trx, Marker* envelope) -> int { + if (IsVolatile(trx->trxCollection())) { + return TRI_ERROR_NO_ERROR; + } - TRI_doc_mptr_copy_t mptr; - int res = TRI_InsertShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &edge, false, false, true); + TRI_doc_mptr_copy_t mptr; + int res = TRI_InsertShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &edge, false, false, true); - if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) { - state->policy.setExpectedRevision(m->_revisionId); - res = TRI_UpdateShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &state->policy, false, false); - } + if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) { + state->policy.setExpectedRevision(m->_revisionId); + res = TRI_UpdateShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &mptr, &shaped, &state->policy, false, false); + } - return res; - }); - } - else { - // ERROR - found a local action for a collection that has an ongoing remote transaction - res = TRI_ERROR_TRANSACTION_INTERNAL; - } + return res; + }); if (res != TRI_ERROR_NO_ERROR && res != TRI_ERROR_ARANGO_CONFLICT && @@ -955,40 +815,17 @@ bool RecoverState::ReplayMarker (TRI_df_marker_t const* marker, char const* base = reinterpret_cast(m); char const* key = base + sizeof(remove_marker_t); - int res = TRI_ERROR_NO_ERROR; - - if (state->isRemoteTransaction(transactionId)) { - // remote operation - res = state->executeRemoteOperation(databaseId, collectionId, transactionId, marker, datafile->_fid, [&](RemoteTransactionType* trx, Marker* envelope) -> int { - if (IsVolatile(trx->trxCollection(collectionId))) { - return TRI_ERROR_NO_ERROR; - } - - // remove the document and ignore any potential errors - state->policy.setExpectedRevision(m->_revisionId); - TRI_RemoveShapedJsonDocumentCollection(trx->trxCollection(collectionId), (TRI_voc_key_t) key, m->_revisionId, envelope, &state->policy, false, false); - + int res = state->executeSingleOperation(databaseId, collectionId, marker, datafile->_fid, [&](SingleWriteTransactionType* trx, Marker* envelope) -> int { + if (IsVolatile(trx->trxCollection())) { return TRI_ERROR_NO_ERROR; - }); - } - else if (! state->isUsedByRemoteTransaction(collectionId)) { - // local operation - res = state->executeSingleOperation(databaseId, collectionId, marker, datafile->_fid, [&](SingleWriteTransactionType* trx, Marker* envelope) -> int { - if (IsVolatile(trx->trxCollection())) { - return TRI_ERROR_NO_ERROR; - } + } - // remove the document and ignore any potential errors - state->policy.setExpectedRevision(m->_revisionId); - TRI_RemoveShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &state->policy, false, false); + // remove the document and ignore any potential errors + state->policy.setExpectedRevision(m->_revisionId); + TRI_RemoveShapedJsonDocumentCollection(trx->trxCollection(), (TRI_voc_key_t) key, m->_revisionId, envelope, &state->policy, false, false); - return TRI_ERROR_NO_ERROR; - }); - } - else { - // ERROR - found a local action for a collection that has an ongoing remote transaction - res = TRI_ERROR_TRANSACTION_INTERNAL; - } + return TRI_ERROR_NO_ERROR; + }); if (res != TRI_ERROR_NO_ERROR && res != TRI_ERROR_ARANGO_CONFLICT && @@ -1009,47 +846,6 @@ bool RecoverState::ReplayMarker (TRI_df_marker_t const* marker, // transactions // ----------------------------------------------------------------------------- - case TRI_WAL_MARKER_BEGIN_REMOTE_TRANSACTION: { - transaction_remote_begin_marker_t const* m = reinterpret_cast(marker); - TRI_voc_tick_t databaseId = m->_databaseId; - TRI_voc_tid_t externalId = m->_externalId; - // start a remote transaction - - if (state->isDropped(databaseId)) { - return true; - } - - TRI_vocbase_t* vocbase = state->useDatabase(databaseId); - if (vocbase == nullptr) { - LOG_WARNING("cannot start remote transaction in database %llu: %s", - (unsigned long long) databaseId, - TRI_errno_string(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND)); - } - - auto trx = new RemoteTransactionType(state->server, vocbase, externalId); - - if (trx == nullptr) { - LOG_WARNING("unable to start transaction: %s", TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY)); - ++state->errorCount; - return state->canContinue(); - } - - trx->addHint(TRI_TRANSACTION_HINT_NO_BEGIN_MARKER, true); - - int res = trx->begin(); - - if (res != TRI_ERROR_NO_ERROR) { - LOG_WARNING("unable to start transaction: %s", TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY)); - delete trx; - ++state->errorCount; - return state->canContinue(); - } - - state->runningRemoteTransactions.emplace(std::make_pair(m->_externalId, trx)); - break; - } - - case TRI_WAL_MARKER_RENAME_COLLECTION: { collection_rename_marker_t const* m = reinterpret_cast(marker); TRI_voc_cid_t collectionId = m->_collectionId; diff --git a/arangod/Wal/RecoverState.h b/arangod/Wal/RecoverState.h index b02f643002..6fb81e54b4 100644 --- a/arangod/Wal/RecoverState.h +++ b/arangod/Wal/RecoverState.h @@ -41,19 +41,12 @@ #include "Wal/Marker.h" #include -//////////////////////////////////////////////////////////////////////////////// -/// @brief shortcut for multi-operation remote transaction -//////////////////////////////////////////////////////////////////////////////// - -#define RemoteTransactionType triagens::arango::ReplicationTransaction - //////////////////////////////////////////////////////////////////////////////// /// @brief shortcut for single-operation write transaction //////////////////////////////////////////////////////////////////////////////// #define SingleWriteTransactionType triagens::arango::SingleCollectionWriteTransaction<1> - namespace triagens { namespace wal { @@ -134,14 +127,6 @@ namespace triagens { return ignoreRecoveryErrors; } -//////////////////////////////////////////////////////////////////////////////// -/// @brief whether or not there are remote transactions -//////////////////////////////////////////////////////////////////////////////// - - inline bool hasRunningRemoteTransactions () const { - return ! runningRemoteTransactions.empty(); - } - //////////////////////////////////////////////////////////////////////////////// /// @brief whether or not the recovery procedure must be run //////////////////////////////////////////////////////////////////////////////// @@ -158,32 +143,6 @@ namespace triagens { return (transactionId > 0 && failedTransactions.find(transactionId) != failedTransactions.end()); } -//////////////////////////////////////////////////////////////////////////////// -/// @brief whether or not a transaction was started remotely -//////////////////////////////////////////////////////////////////////////////// - - inline bool isRemoteTransaction (TRI_voc_tid_t transactionId) const { - return (transactionId > 0 && remoteTransactions.find(transactionId) != remoteTransactions.end()); - } - -//////////////////////////////////////////////////////////////////////////////// -/// @brief register a collection for a remote transaction -//////////////////////////////////////////////////////////////////////////////// - - inline void registerRemoteUsage (TRI_voc_tick_t databaseId, - TRI_voc_cid_t collectionId) { - remoteTransactionDatabases.insert(databaseId); - remoteTransactionCollections.insert(collectionId); - } - -//////////////////////////////////////////////////////////////////////////////// -/// @brief whether or not a transaction was started remotely -//////////////////////////////////////////////////////////////////////////////// - - inline bool isUsedByRemoteTransaction (TRI_voc_tid_t collectionId) const { - return (remoteTransactionCollections.find(collectionId) != remoteTransactionCollections.end()); - } - //////////////////////////////////////////////////////////////////////////////// /// @brief release opened collections and databases so they can be shut down /// etc. @@ -227,17 +186,6 @@ namespace triagens { TRI_document_collection_t* getCollection (TRI_voc_tick_t, TRI_voc_cid_t); -//////////////////////////////////////////////////////////////////////////////// -/// @brief executes an operation in a remote transaction -//////////////////////////////////////////////////////////////////////////////// - - int executeRemoteOperation (TRI_voc_tick_t, - TRI_voc_cid_t, - TRI_voc_tid_t, - TRI_df_marker_t const*, - TRI_voc_fid_t, - std::function); - //////////////////////////////////////////////////////////////////////////////// /// @brief executes a single operation inside a transaction //////////////////////////////////////////////////////////////////////////////// @@ -302,9 +250,6 @@ namespace triagens { TRI_server_t* server; std::unordered_map> failedTransactions; - std::unordered_map> remoteTransactions; - std::unordered_set remoteTransactionCollections; - std::unordered_set remoteTransactionDatabases; std::unordered_set droppedCollections; std::unordered_set droppedDatabases; std::unordered_set droppedIds; @@ -313,7 +258,6 @@ namespace triagens { std::vector logfilesToProcess; std::unordered_map openedCollections; std::unordered_map openedDatabases; - std::unordered_map runningRemoteTransactions; std::vector emptyLogfiles; TRI_doc_update_policy_t policy;