1
0
Fork 0

recovery and replication

This commit is contained in:
Jan Steemann 2014-06-30 13:07:48 +02:00
parent abf273d453
commit bda2db16da
27 changed files with 757 additions and 331 deletions

View File

@ -117,10 +117,10 @@ static int InitialiseCap (TRI_cap_constraint_t* cap,
TRI_voc_cid_t cid = document->_info._cid;
triagens::arango::SingleCollectionWriteTransaction<triagens::arango::RestTransactionContext, UINT64_MAX> trx(vocbase, cid);
trx.addHint(TRI_TRANSACTION_HINT_LOCK_NEVER);
trx.addHint(TRI_TRANSACTION_HINT_NO_BEGIN_MARKER);
trx.addHint(TRI_TRANSACTION_HINT_NO_ABORT_MARKER);
trx.addHint(TRI_TRANSACTION_HINT_SINGLE_OPERATION); // this is actually not true, but necessary to create trx id 0
trx.addHint(TRI_TRANSACTION_HINT_LOCK_NEVER, false);
trx.addHint(TRI_TRANSACTION_HINT_NO_BEGIN_MARKER, false);
trx.addHint(TRI_TRANSACTION_HINT_NO_ABORT_MARKER, false);
trx.addHint(TRI_TRANSACTION_HINT_SINGLE_OPERATION, false); // this is actually not true, but necessary to create trx id 0
int res = trx.begin();

View File

@ -98,11 +98,13 @@ static inline void LocalGetline (char const*& p,
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
ContinuousSyncer::ContinuousSyncer (TRI_vocbase_t* vocbase,
ContinuousSyncer::ContinuousSyncer (TRI_server_t* server,
TRI_vocbase_t* vocbase,
TRI_replication_applier_configuration_t const* configuration,
TRI_voc_tick_t initialTick,
bool useTick)
: Syncer(vocbase, configuration),
_server(server),
_applier(vocbase->_replicationApplier),
_chunkSize(),
_initialTick(initialTick),
@ -116,6 +118,11 @@ ContinuousSyncer::ContinuousSyncer (TRI_vocbase_t* vocbase,
TRI_ASSERT(c > 0);
_chunkSize = StringUtils::itoa(c);
// get number of running remote transactions so we can forge the transaction
// statistics
int const n = static_cast<int>(_applier->_runningRemoteTransactions.size());
triagens::arango::TransactionBase::setNumbers(n, n);
}
////////////////////////////////////////////////////////////////////////////////
@ -123,7 +130,6 @@ ContinuousSyncer::ContinuousSyncer (TRI_vocbase_t* vocbase,
////////////////////////////////////////////////////////////////////////////////
ContinuousSyncer::~ContinuousSyncer () {
abortOngoingTransactions();
}
// -----------------------------------------------------------------------------
@ -287,20 +293,6 @@ int ContinuousSyncer::getLocalState (string& errorMsg) {
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief abort any ongoing transactions
////////////////////////////////////////////////////////////////////////////////
void ContinuousSyncer::abortOngoingTransactions () {
  // walk over every transaction still registered in _transactions and
  // destroy the transaction object it points to.
  // NOTE(review): presumably the transaction destructor aborts a transaction
  // that is still running - confirm against the Transaction class
  auto cur = _transactions.begin();
  auto const last = _transactions.end();

  while (cur != last) {
    delete cur->second;
    ++cur;
  }

  // all stored pointers are dangling now, so drop the entries as well
  _transactions.clear();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief inserts a document, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
@ -351,9 +343,9 @@ int ContinuousSyncer::processDocument (TRI_replication_operation_e type,
}
if (tid > 0) {
auto it = _transactions.find(tid);
auto it = _applier->_runningRemoteTransactions.find(tid);
if (it == _transactions.end()) {
if (it == _applier->_runningRemoteTransactions.end()) {
return TRI_ERROR_REPLICATION_UNEXPECTED_TRANSACTION;
}
@ -422,26 +414,38 @@ int ContinuousSyncer::startTransaction (TRI_json_t const* json) {
}
// transaction id
// note: this is the remote transaction id!
TRI_voc_tid_t tid = static_cast<TRI_voc_tid_t>(StringUtils::uint64(id.c_str(), id.size()));
auto it = _transactions.find(tid);
if (it != _transactions.end()) {
auto it = _applier->_runningRemoteTransactions.find(tid);
if (it != _applier->_runningRemoteTransactions.end()) {
_applier->_runningRemoteTransactions.erase(tid);
auto trx = (*it).second;
// abort ongoing trx
delete trx;
_transactions.erase(tid);
}
TRI_ASSERT(tid > 0);
LOG_TRACE("starting replication transaction %llu", (unsigned long long) tid);
auto trx = new ReplicationTransaction(_vocbase);
auto trx = new ReplicationTransaction(_server, _vocbase, tid);
if (trx == nullptr) {
return TRI_ERROR_OUT_OF_MEMORY;
}
_transactions.insert(it, std::make_pair(tid, trx));
int res = trx->begin();
if (res != TRI_ERROR_NO_ERROR) {
delete trx;
return res;
}
_applier->_runningRemoteTransactions.insert(it, std::make_pair(tid, trx));
return TRI_ERROR_NO_ERROR;
}
@ -458,23 +462,26 @@ int ContinuousSyncer::abortTransaction (TRI_json_t const* json) {
}
// transaction id
// note: this is the remote transaction id!
TRI_voc_tid_t const tid = static_cast<TRI_voc_tid_t>(StringUtils::uint64(id.c_str(), id.size()));
auto it = _transactions.find(tid);
if (it == _transactions.end()) {
auto it = _applier->_runningRemoteTransactions.find(tid);
if (it == _applier->_runningRemoteTransactions.end()) {
// invalid state, no transaction was started.
return TRI_ERROR_REPLICATION_UNEXPECTED_TRANSACTION;
}
TRI_ASSERT(tid > 0);
LOG_TRACE("abort replication transaction %llu", (unsigned long long) tid);
_applier->_runningRemoteTransactions.erase(tid);
auto trx = (*it).second;
int res = trx->abort();
delete trx;
_transactions.erase(tid);
return res;
}
@ -491,23 +498,26 @@ int ContinuousSyncer::commitTransaction (TRI_json_t const* json) {
}
// transaction id
// note: this is the remote transaction id!
TRI_voc_tid_t const tid = static_cast<TRI_voc_tid_t>(StringUtils::uint64(id.c_str(), id.size()));
auto it = _transactions.find(tid);
if (it == _transactions.end()) {
auto it = _applier->_runningRemoteTransactions.find(tid);
if (it == _applier->_runningRemoteTransactions.end()) {
// invalid state, no transaction was started.
return TRI_ERROR_REPLICATION_UNEXPECTED_TRANSACTION;
}
TRI_ASSERT(tid > 0);
LOG_TRACE("committing replication transaction %llu", (unsigned long long) tid);
_applier->_runningRemoteTransactions.erase(tid);
auto trx = (*it).second;
int res = trx->commit();
delete trx;
_transactions.erase(tid);
return res;
}

View File

@ -34,14 +34,14 @@
#include "Replication/Syncer.h"
#include "Utils/ReplicationTransaction.h"
#include "VocBase/replication-applier.h"
// -----------------------------------------------------------------------------
// --SECTION-- forward declarations
// -----------------------------------------------------------------------------
struct TRI_json_s;
struct TRI_replication_applier_s;
struct TRI_replication_applier_configuration_s;
struct TRI_server_s;
struct TRI_vocbase_s;
namespace triagens {
@ -68,7 +68,8 @@ namespace triagens {
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
ContinuousSyncer (struct TRI_vocbase_s*,
ContinuousSyncer (struct TRI_server_s*,
struct TRI_vocbase_s*,
struct TRI_replication_applier_configuration_s const*,
TRI_voc_tick_t,
bool);
@ -115,12 +116,6 @@ namespace triagens {
int getLocalState (std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief abort any ongoing transactions
////////////////////////////////////////////////////////////////////////////////
void abortOngoingTransactions ();
////////////////////////////////////////////////////////////////////////////////
/// @brief starts a transaction, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
@ -198,17 +193,17 @@ namespace triagens {
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief server
////////////////////////////////////////////////////////////////////////////////
struct TRI_server_s* _server;
////////////////////////////////////////////////////////////////////////////////
/// @brief pointer to the applier state
////////////////////////////////////////////////////////////////////////////////
struct TRI_replication_applier_s* _applier;
////////////////////////////////////////////////////////////////////////////////
/// @brief currently running transactions
////////////////////////////////////////////////////////////////////////////////
std::unordered_map<TRI_voc_tid_t, ReplicationTransaction*> _transactions;
TRI_replication_applier_t* _applier;
////////////////////////////////////////////////////////////////////////////////
/// @brief stringified chunk size

View File

@ -830,7 +830,6 @@ int ArangoServer::startupServer () {
_applicationV8->runVersionCheck(skipUpgrade, performUpgrade);
// finally flush the write-ahead log so all data in the WAL goes into the collections
wal::LogfileManager::instance()->flush(true, true, true);
// WAL recovery done after here

View File

@ -64,10 +64,10 @@ namespace triagens {
AhuacatlTransaction (struct TRI_vocbase_s* vocbase,
TRI_aql_context_t* context)
: Transaction<T>(vocbase),
: Transaction<T>(vocbase, 0),
_context(context) {
this->addHint(TRI_TRANSACTION_HINT_LOCK_ENTIRELY);
this->addHint(TRI_TRANSACTION_HINT_LOCK_ENTIRELY, false);
TRI_vector_pointer_t* collections = &context->_collections;

View File

@ -63,9 +63,9 @@ namespace triagens {
std::vector<std::string> const& writeCollections,
double lockTimeout,
bool waitForSync)
: Transaction<T>(vocbase) {
: Transaction<T>(vocbase, 0) {
this->addHint(TRI_TRANSACTION_HINT_LOCK_ENTIRELY);
this->addHint(TRI_TRANSACTION_HINT_LOCK_ENTIRELY, false);
if (lockTimeout >= 0.0) {
this->setTimeout(lockTimeout);
@ -75,12 +75,12 @@ namespace triagens {
this->setWaitForSync();
}
for (size_t i = 0; i < readCollections.size(); ++i) {
this->addCollection(readCollections[i], TRI_TRANSACTION_READ);
for (auto it = readCollections.begin(); it != readCollections.end(); ++it) {
this->addCollection((*it), TRI_TRANSACTION_READ);
}
for (size_t i = 0; i < writeCollections.size(); ++i) {
this->addCollection(writeCollections[i], TRI_TRANSACTION_WRITE);
for (auto it = writeCollections.begin(); it != writeCollections.end(); ++it) {
this->addCollection((*it), TRI_TRANSACTION_WRITE);
}
}

View File

@ -58,8 +58,14 @@ namespace triagens {
/// @brief create the transaction
////////////////////////////////////////////////////////////////////////////////
ReplicationTransaction (struct TRI_vocbase_s* vocbase)
: Transaction<RestTransactionContext>(vocbase) {
ReplicationTransaction (TRI_server_t* server,
struct TRI_vocbase_s* vocbase,
TRI_voc_tid_t externalId)
: Transaction<RestTransactionContext>(vocbase, externalId),
_server(server),
_externalId(externalId) {
TRI_UseDatabaseServer(_server, vocbase->_name);
}
////////////////////////////////////////////////////////////////////////////////
@ -67,6 +73,21 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
~ReplicationTransaction () {
TRI_ReleaseDatabaseServer(_server, vocbase());
}
// -----------------------------------------------------------------------------
// --SECTION-- public functions
// -----------------------------------------------------------------------------
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief return the remote (external) id of the transaction
////////////////////////////////////////////////////////////////////////////////
inline TRI_voc_tid_t externalId () const {
  // this is the id that was handed to the constructor as `externalId`
  return this->_externalId;
}
////////////////////////////////////////////////////////////////////////////////
@ -83,7 +104,7 @@ namespace triagens {
int res = TRI_AddCollectionTransaction(this->_trx, cid, TRI_TRANSACTION_WRITE, 0, true);
if (res == TRI_ERROR_NO_ERROR) {
res = TRI_LockCollectionTransaction(trxCollection, TRI_TRANSACTION_WRITE, 0);
res = TRI_EnsureCollectionsTransaction(this->_trx);
}
if (res != TRI_ERROR_NO_ERROR) {
@ -93,9 +114,18 @@ namespace triagens {
trxCollection = TRI_GetCollectionTransaction(this->_trx, cid, TRI_TRANSACTION_WRITE);
}
return nullptr;
return trxCollection;
}
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------
private:
TRI_server_t* _server;
TRI_voc_tid_t _externalId;
};
}

View File

@ -67,7 +67,7 @@ namespace triagens {
SingleCollectionTransaction (TRI_vocbase_t* vocbase,
TRI_voc_cid_t cid,
TRI_transaction_type_e accessType)
: Transaction<T>(vocbase),
: Transaction<T>(vocbase, 0),
_cid(cid),
_trxCollection(nullptr),
_documentCollection(nullptr),
@ -84,7 +84,7 @@ namespace triagens {
SingleCollectionTransaction (TRI_vocbase_t* vocbase,
std::string const& name,
TRI_transaction_type_e accessType)
: Transaction<T>(vocbase),
: Transaction<T>(vocbase, 0),
_cid(this->resolver()->getCollectionId(name)),
_trxCollection(nullptr),
_documentCollection(nullptr),

View File

@ -74,7 +74,7 @@ namespace triagens {
_numWrites(0) {
if (N == 1) {
this->addHint(TRI_TRANSACTION_HINT_SINGLE_OPERATION);
this->addHint(TRI_TRANSACTION_HINT_SINGLE_OPERATION, false);
}
}
@ -88,7 +88,7 @@ namespace triagens {
_numWrites(0) {
if (N == 1) {
this->addHint(TRI_TRANSACTION_HINT_SINGLE_OPERATION);
this->addHint(TRI_TRANSACTION_HINT_SINGLE_OPERATION, false);
}
}

View File

@ -80,8 +80,10 @@ namespace triagens {
/// @brief create the transaction
////////////////////////////////////////////////////////////////////////////////
Transaction (TRI_vocbase_t* vocbase)
Transaction (TRI_vocbase_t* vocbase,
TRI_voc_tid_t externalId)
: T(),
_externalId(externalId),
_setupState(TRI_ERROR_NO_ERROR),
_nestingLevel(0),
_errorData(),
@ -130,13 +132,39 @@ namespace triagens {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief return database of transaction
////////////////////////////////////////////////////////////////////////////////
inline TRI_vocbase_t* vocbase () const {
  // plain accessor: hands out the stored database pointer, no ownership
  // is transferred
  return this->_vocbase;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief add a transaction hint
////////////////////////////////////////////////////////////////////////////////
inline void addHint (TRI_transaction_hint_e hint) {
  // convert the enum value to the hint mask type and OR it into the
  // hints kept by this object
  TRI_transaction_hint_t const mask = static_cast<TRI_transaction_hint_t>(hint);

  _hints |= mask;
}
inline void addHint (TRI_transaction_hint_e hint,
                     bool passthrough) {
  TRI_transaction_hint_t const mask = static_cast<TRI_transaction_hint_t>(hint);

  // always record the hint in the hints kept by this object
  _hints |= mask;

  // only touch the underlying transaction when the caller asked for it
  // and such a transaction exists
  if (! passthrough || _trx == nullptr) {
    return;
  }

  _trx->_hints |= mask;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief remove a transaction hint
////////////////////////////////////////////////////////////////////////////////
inline void removeHint (TRI_transaction_hint_e hint,
                        bool passthrough) {
  TRI_transaction_hint_t const mask = static_cast<TRI_transaction_hint_t>(hint);

  // always clear the hint in the hints kept by this object
  _hints &= ~ mask;

  // only touch the underlying transaction when the caller asked for it
  // and such a transaction exists
  if (! passthrough || _trx == nullptr) {
    return;
  }

  _trx->_hints &= ~ mask;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the registered error data
@ -252,6 +280,7 @@ namespace triagens {
if (_nestingLevel == 0) {
_trx->_status = TRI_TRANSACTION_ABORTED;
}
#ifdef TRI_ENABLE_MAINTAINER_MODE
TRI_ASSERT(_numberTrxActive == _numberTrxInScope);
_numberTrxActive--; // Every transaction gets here at most once
@ -1161,6 +1190,7 @@ namespace triagens {
// we are not embedded. now start our own transaction
_trx = TRI_CreateTransaction(_vocbase,
_externalId,
_timeout,
_waitForSync);
@ -1195,6 +1225,12 @@ namespace triagens {
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief external transaction id. used in replication only
////////////////////////////////////////////////////////////////////////////////
TRI_voc_tid_t _externalId;
////////////////////////////////////////////////////////////////////////////////
/// @brief error that occurred on transaction initialisation (before begin())
////////////////////////////////////////////////////////////////////////////////

View File

@ -5054,14 +5054,14 @@ static v8::Handle<v8::Value> JS_StartApplierReplication (v8::Arguments const& ar
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stop the replication applier manually
/// @brief shuts down the replication applier manually
////////////////////////////////////////////////////////////////////////////////
static v8::Handle<v8::Value> JS_StopApplierReplication (v8::Arguments const& argv) {
static v8::Handle<v8::Value> JS_ShutdownApplierReplication (v8::Arguments const& argv) {
v8::HandleScope scope;
if (argv.Length() != 0) {
TRI_V8_EXCEPTION_USAGE(scope, "REPLICATION_APPLIER_STOP()");
TRI_V8_EXCEPTION_USAGE(scope, "REPLICATION_APPLIER_SHUTDOWN()");
}
TRI_vocbase_t* vocbase = GetContextVocBase();
@ -5074,10 +5074,10 @@ static v8::Handle<v8::Value> JS_StopApplierReplication (v8::Arguments const& arg
TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL);
}
int res = TRI_StopReplicationApplier(vocbase->_replicationApplier, true);
int res = TRI_ShutdownReplicationApplier(vocbase->_replicationApplier);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_EXCEPTION_MESSAGE(scope, res, "cannot stop replication applier");
TRI_V8_EXCEPTION_MESSAGE(scope, res, "cannot shut down replication applier");
}
return scope.Close(v8::True());
@ -10244,7 +10244,7 @@ void TRI_InitV8VocBridge (v8::Handle<v8::Context> context,
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_SERVER_ID", JS_ServerIdReplication, true);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_APPLIER_CONFIGURE", JS_ConfigureApplierReplication, true);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_APPLIER_START", JS_StartApplierReplication, true);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_APPLIER_STOP", JS_StopApplierReplication, true);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_APPLIER_SHUTDOWN", JS_ShutdownApplierReplication, true);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_APPLIER_STATE", JS_StateApplierReplication, true);
TRI_AddGlobalFunctionVocbase(context, "REPLICATION_APPLIER_FORGET", JS_ForgetApplierReplication, true);

View File

@ -434,37 +434,36 @@ void ApplyThread (void* data) {
static int StartApplier (TRI_replication_applier_t* applier,
TRI_voc_tick_t initialTick,
bool useTick) {
TRI_replication_applier_state_t* state;
void* fetcher;
state = &applier->_state;
TRI_replication_applier_state_t* state = &applier->_state;
if (state->_active) {
return TRI_ERROR_INTERNAL;
}
if (applier->_configuration._endpoint == NULL) {
if (applier->_configuration._endpoint == nullptr) {
return SetError(applier, TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION, "no endpoint configured");
}
if (applier->_configuration._database == NULL) {
if (applier->_configuration._database == nullptr) {
return SetError(applier, TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION, "no database configured");
}
// TODO: prevent restart of the applier with a tick after a shutdown
fetcher = (void*) new triagens::arango::ContinuousSyncer(applier->_vocbase,
&applier->_configuration,
initialTick,
useTick);
auto fetcher = new triagens::arango::ContinuousSyncer(applier->_server,
applier->_vocbase,
&applier->_configuration,
initialTick,
useTick);
if (fetcher == NULL) {
if (fetcher == nullptr) {
return TRI_ERROR_OUT_OF_MEMORY;
}
// reset error
if (state->_lastError._msg != NULL) {
if (state->_lastError._msg != nullptr) {
TRI_FreeString(TRI_CORE_MEM_ZONE, state->_lastError._msg);
state->_lastError._msg = NULL;
state->_lastError._msg = nullptr;
}
state->_lastError._code = TRI_ERROR_NO_ERROR;
@ -477,9 +476,8 @@ static int StartApplier (TRI_replication_applier_t* applier,
TRI_InitThread(&applier->_thread);
if (! TRI_StartThread(&applier->_thread, NULL, "[applier]", ApplyThread, fetcher)) {
triagens::arango::ContinuousSyncer* s = static_cast<triagens::arango::ContinuousSyncer*>(fetcher);
delete s;
if (! TRI_StartThread(&applier->_thread, nullptr, "[applier]", ApplyThread, static_cast<void*>(fetcher))) {
delete fetcher;
return TRI_ERROR_INTERNAL;
}
@ -510,9 +508,9 @@ static int StopApplier (TRI_replication_applier_t* applier,
TRI_SetProgressReplicationApplier(applier, "applier stopped", false);
if (resetError) {
if (state->_lastError._msg != NULL) {
if (state->_lastError._msg != nullptr) {
TRI_FreeString(TRI_CORE_MEM_ZONE, state->_lastError._msg);
state->_lastError._msg = NULL;
state->_lastError._msg = nullptr;
}
state->_lastError._code = TRI_ERROR_NO_ERROR;
@ -527,6 +525,40 @@ static int StopApplier (TRI_replication_applier_t* applier,
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief shut down the replication applier
/// note: must hold the lock when calling this
////////////////////////////////////////////////////////////////////////////////
static int ShutdownApplier (TRI_replication_applier_t* applier) {
  TRI_replication_applier_state_t& current = applier->_state;

  // an applier that is not active cannot be shut down
  if (! current._active) {
    return TRI_ERROR_INTERNAL;
  }

  // flag the applier as inactive and set the terminate flag
  current._active = false;
  SetTerminateFlag(applier, true);

  TRI_SetProgressReplicationApplier(applier, "applier shut down", false);

  // unconditionally reset the last recorded error: free the message (if any),
  // clear the code and refresh the error timestamp
  auto& lastError = current._lastError;

  if (lastError._msg != nullptr) {
    TRI_FreeString(TRI_CORE_MEM_ZONE, lastError._msg);
    lastError._msg = nullptr;
  }

  lastError._code = TRI_ERROR_NO_ERROR;
  TRI_GetTimeStampReplication(lastError._time, sizeof(lastError._time) - 1);

  // signal the run state change condition while holding its lock
  // NOTE(review): presumably this wakes up the applier thread - confirm
  TRI_LockCondition(&applier->_runStateChangeCondition);
  TRI_SignalCondition(&applier->_runStateChangeCondition);
  TRI_UnlockCondition(&applier->_runStateChangeCondition);

  return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get a JSON representation of an applier state
////////////////////////////////////////////////////////////////////////////////
@ -621,28 +653,28 @@ static TRI_json_t* JsonState (TRI_replication_applier_state_t const* state) {
/// @brief create a replication applier
////////////////////////////////////////////////////////////////////////////////
TRI_replication_applier_t* TRI_CreateReplicationApplier (TRI_vocbase_t* vocbase) {
TRI_replication_applier_t* applier = static_cast<TRI_replication_applier_t*>(TRI_Allocate(TRI_CORE_MEM_ZONE, sizeof(TRI_replication_applier_t), false));
TRI_replication_applier_t* TRI_CreateReplicationApplier (TRI_server_t* server,
TRI_vocbase_t* vocbase) {
TRI_replication_applier_t* applier = new TRI_replication_applier_t(server, vocbase);
if (applier == NULL) {
return NULL;
if (applier == nullptr) {
return nullptr;
}
TRI_InitConfigurationReplicationApplier(&applier->_configuration);
TRI_InitStateReplicationApplier(&applier->_state);
if (vocbase->_type == TRI_VOCBASE_TYPE_NORMAL) {
int res;
res = LoadConfiguration(vocbase, &applier->_configuration);
int res = LoadConfiguration(vocbase, &applier->_configuration);
if (res != TRI_ERROR_NO_ERROR &&
res != TRI_ERROR_FILE_NOT_FOUND) {
TRI_set_errno(res);
TRI_DestroyStateReplicationApplier(&applier->_state);
TRI_DestroyConfigurationReplicationApplier(&applier->_configuration);
TRI_Free(TRI_CORE_MEM_ZONE, applier);
delete applier;
return NULL;
return nullptr;
}
res = TRI_LoadStateReplicationApplier(vocbase, &applier->_state);
@ -652,22 +684,15 @@ TRI_replication_applier_t* TRI_CreateReplicationApplier (TRI_vocbase_t* vocbase)
TRI_set_errno(res);
TRI_DestroyStateReplicationApplier(&applier->_state);
TRI_DestroyConfigurationReplicationApplier(&applier->_configuration);
TRI_Free(TRI_CORE_MEM_ZONE, applier);
delete applier;
return NULL;
return nullptr;
}
}
TRI_InitReadWriteLock(&applier->_statusLock);
TRI_InitSpin(&applier->_threadLock);
TRI_InitCondition(&applier->_runStateChangeCondition);
applier->_vocbase = vocbase;
applier->_databaseName = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, vocbase->_name);
SetTerminateFlag(applier, false);
TRI_ASSERT(applier->_databaseName != NULL);
TRI_ASSERT(applier->_databaseName != nullptr);
TRI_SetProgressReplicationApplier(applier, "applier created", false);
@ -683,10 +708,6 @@ void TRI_DestroyReplicationApplier (TRI_replication_applier_t* applier) {
TRI_DestroyStateReplicationApplier(&applier->_state);
TRI_DestroyConfigurationReplicationApplier(&applier->_configuration);
TRI_FreeString(TRI_CORE_MEM_ZONE, applier->_databaseName);
TRI_DestroyCondition(&applier->_runStateChangeCondition);
TRI_DestroySpin(&applier->_threadLock);
TRI_DestroyReadWriteLock(&applier->_statusLock);
}
////////////////////////////////////////////////////////////////////////////////
@ -695,7 +716,7 @@ void TRI_DestroyReplicationApplier (TRI_replication_applier_t* applier) {
void TRI_FreeReplicationApplier (TRI_replication_applier_t* applier) {
TRI_DestroyReplicationApplier(applier);
TRI_Free(TRI_CORE_MEM_ZONE, applier);
delete applier;
}
// -----------------------------------------------------------------------------
@ -789,7 +810,7 @@ int TRI_StopReplicationApplier (TRI_replication_applier_t* applier,
return TRI_ERROR_NO_ERROR;
}
int res = StopApplier(applier, resetError);
int res = ShutdownApplier(applier);
TRI_WriteUnlockReadWriteLock(&applier->_statusLock);
// join the thread without the status lock (otherwise it would probably not join)
@ -815,6 +836,62 @@ int TRI_StopReplicationApplier (TRI_replication_applier_t* applier,
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief shut down the replication applier
////////////////////////////////////////////////////////////////////////////////
int TRI_ShutdownReplicationApplier (TRI_replication_applier_t* applier) {
// Stops the applier under the status lock, joins the applier thread and then
// deletes all remote transactions still registered in the applier.
// Returns TRI_ERROR_NO_ERROR for a null or inactive applier,
// TRI_ERROR_CLUSTER_UNSUPPORTED for a coordinator database, and otherwise the
// error from StopApplier or, if that succeeded, the result of TRI_JoinThread.
if (applier == nullptr) {
return TRI_ERROR_NO_ERROR;
}
LOG_TRACE("requesting replication applier shutdown");
// not available for databases of type TRI_VOCBASE_TYPE_COORDINATOR
if (applier->_vocbase->_type == TRI_VOCBASE_TYPE_COORDINATOR) {
return TRI_ERROR_CLUSTER_UNSUPPORTED;
}
TRI_WriteLockReadWriteLock(&applier->_statusLock);
// an inactive applier is not an error: release the lock and report success
if (! applier->_state._active) {
TRI_WriteUnlockReadWriteLock(&applier->_statusLock);
return TRI_ERROR_NO_ERROR;
}
// NOTE(review): this calls StopApplier (with resetError = true), whereas
// TRI_StopReplicationApplier appears to call ShutdownApplier in this change -
// confirm that the two calls are not swapped
int res = StopApplier(applier, true);
TRI_WriteUnlockReadWriteLock(&applier->_statusLock);
// join the thread without the status lock (otherwise it would probably not join)
if (res == TRI_ERROR_NO_ERROR) {
res = TRI_JoinThread(&applier->_thread);
}
else {
// stop the thread but keep original error code
int res2 = TRI_JoinThread(&applier->_thread);
if (res2 != TRI_ERROR_NO_ERROR) {
LOG_ERROR("could not join replication applier for database '%s': %s",
applier->_databaseName,
TRI_errno_string(res2));
}
}
// the thread has been joined, so the terminate flag is reset again
SetTerminateFlag(applier, false);
// the status lock is re-acquired only for the transaction cleanup below
TRI_WriteLockReadWriteLock(&applier->_statusLock);
// really abort all ongoing transactions
applier->abortRunningRemoteTransactions();
TRI_WriteUnlockReadWriteLock(&applier->_statusLock);
LOG_INFO("stopped replication applier for database '%s'",
applier->_databaseName);
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief configure the replication applier
////////////////////////////////////////////////////////////////////////////////

View File

@ -33,6 +33,7 @@
#include "Basics/Common.h"
#include "BasicsC/locks.h"
#include "BasicsC/threads.h"
#include "Utils/ReplicationTransaction.h"
#include "VocBase/replication-common.h"
#include "VocBase/voc-types.h"
@ -41,7 +42,7 @@
// -----------------------------------------------------------------------------
struct TRI_json_s;
struct TRI_transaction_s;
struct TRI_server_s;
struct TRI_vocbase_s;
// -----------------------------------------------------------------------------
@ -87,7 +88,7 @@ TRI_replication_applier_error_t;
/// @brief state information about replication application
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_replication_applier_state_s {
struct TRI_replication_applier_state_t {
TRI_voc_tick_t _lastProcessedContinuousTick;
TRI_voc_tick_t _lastAppliedContinuousTick;
TRI_voc_tick_t _lastAvailableContinuousTick;
@ -100,14 +101,57 @@ typedef struct TRI_replication_applier_state_s {
uint64_t _totalRequests;
uint64_t _totalFailedConnects;
uint64_t _totalEvents;
}
TRI_replication_applier_state_t;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief replication applier
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_replication_applier_s {
struct TRI_replication_applier_t {
TRI_replication_applier_t (TRI_server_t* server,
                           TRI_vocbase_t* vocbase)
  : _server(server),
    _vocbase(vocbase),
    // private copy of the database name; it is released again with
    // TRI_FreeString in the destructor
    _databaseName(TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, vocbase->_name)) {

  // set up the synchronisation primitives owned by the applier. all other
  // members are left untouched by this constructor
  TRI_InitCondition(&this->_runStateChangeCondition);
  TRI_InitReadWriteLock(&this->_statusLock);
  TRI_InitSpin(&this->_threadLock);
}
~TRI_replication_applier_t () {
  // destroy all remote transactions that are still registered
  auto cur = _runningRemoteTransactions.begin();
  auto const last = _runningRemoteTransactions.end();

  while (cur != last) {
    triagens::arango::ReplicationTransaction* running = cur->second;

    // set the no-abort-marker hint (also on the underlying transaction)
    // before deleting: no abort markers are to be written here, so that
    // running transactions can be resumed later
    running->addHint(TRI_TRANSACTION_HINT_NO_ABORT_MARKER, true);
    delete running;

    ++cur;
  }

  // tear down the synchronisation primitives in reverse order of their
  // initialisation, then release the copied database name
  TRI_DestroySpin(&_threadLock);
  TRI_DestroyReadWriteLock(&_statusLock);
  TRI_DestroyCondition(&_runStateChangeCondition);

  TRI_FreeString(TRI_CORE_MEM_ZONE, _databaseName);
}
void addRemoteTransaction (triagens::arango::ReplicationTransaction* trx) {
  // register the transaction under its remote (external) id. an entry that
  // already exists for the same id is left untouched, as
  // std::unordered_map::insert does not overwrite
  TRI_voc_tid_t const remoteId = trx->externalId();

  _runningRemoteTransactions.insert(std::make_pair(remoteId, trx));
}
void abortRunningRemoteTransactions () {
for (auto it = _runningRemoteTransactions.begin(); it != _runningRemoteTransactions.end(); ++it) {
auto trx = (*it).second;
// do NOT write abort markers so we can resume running transactions later
trx->removeHint(TRI_TRANSACTION_HINT_NO_ABORT_MARKER, true);
delete trx;
}
_runningRemoteTransactions.clear();
}
struct TRI_server_s* _server;
struct TRI_vocbase_s* _vocbase;
TRI_read_write_lock_t _statusLock;
TRI_spin_t _threadLock;
@ -117,8 +161,8 @@ typedef struct TRI_replication_applier_s {
TRI_replication_applier_configuration_t _configuration;
char* _databaseName;
TRI_thread_t _thread;
}
TRI_replication_applier_t;
std::unordered_map<TRI_voc_tid_t, triagens::arango::ReplicationTransaction*> _runningRemoteTransactions;
};
// -----------------------------------------------------------------------------
// --SECTION-- constructors / destructors
@ -128,7 +172,8 @@ TRI_replication_applier_t;
/// @brief create a replication applier
////////////////////////////////////////////////////////////////////////////////
TRI_replication_applier_t* TRI_CreateReplicationApplier (struct TRI_vocbase_s*);
TRI_replication_applier_t* TRI_CreateReplicationApplier (struct TRI_server_s*,
struct TRI_vocbase_s*);
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy a replication applier
@ -174,6 +219,12 @@ int TRI_StartReplicationApplier (TRI_replication_applier_t*,
int TRI_StopReplicationApplier (TRI_replication_applier_t*,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief shuts down the replication applier
////////////////////////////////////////////////////////////////////////////////
int TRI_ShutdownReplicationApplier (TRI_replication_applier_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief configure the replication applier
////////////////////////////////////////////////////////////////////////////////

View File

@ -1931,12 +1931,7 @@ int TRI_InitDatabasesServer (TRI_server_t* server) {
TRI_StartCompactorVocBase(vocbase);
// start the replication applier
vocbase->_replicationApplier = TRI_CreateReplicationApplier(vocbase);
if (vocbase->_replicationApplier == nullptr) {
// TODO
LOG_FATAL_AND_EXIT("initialising replication applier for database '%s' failed", vocbase->_name);
}
TRI_ASSERT(vocbase->_replicationApplier != nullptr);
if (vocbase->_replicationApplier->_configuration._autoStart) {
if (server->_disableReplicationAppliers) {
@ -2032,7 +2027,7 @@ int TRI_CreateCoordinatorDatabaseServer (TRI_server_t* server,
TRI_ASSERT(vocbase != nullptr);
vocbase->_replicationApplier = TRI_CreateReplicationApplier(vocbase);
vocbase->_replicationApplier = TRI_CreateReplicationApplier(server, vocbase);
if (vocbase->_replicationApplier == nullptr) {
TRI_DestroyInitialVocBase(vocbase);
@ -2153,13 +2148,6 @@ int TRI_CreateDatabaseServer (TRI_server_t* server,
TRI_StartCompactorVocBase(vocbase);
// start the replication applier
vocbase->_replicationApplier = TRI_CreateReplicationApplier(vocbase);
if (vocbase->_replicationApplier == nullptr) {
// TODO
LOG_FATAL_AND_EXIT("initialising replication applier for database '%s' failed", name);
}
if (vocbase->_replicationApplier->_configuration._autoStart) {
if (server->_disableReplicationAppliers) {
LOG_INFO("replication applier explicitly deactivated for database '%s'", name);

View File

@ -550,8 +550,16 @@ static int WriteBeginMarker (TRI_transaction_t* trx) {
int res;
try {
triagens::wal::BeginTransactionMarker marker(trx->_vocbase->_id, trx->_id);
res = GetLogfileManager()->allocateAndWrite(marker, false).errorCode;
if (trx->_externalId > 0) {
// remotely started trx
triagens::wal::BeginRemoteTransactionMarker marker(trx->_vocbase->_id, trx->_id, trx->_externalId);
res = GetLogfileManager()->allocateAndWrite(marker, false).errorCode;
}
else {
// local trx
triagens::wal::BeginTransactionMarker marker(trx->_vocbase->_id, trx->_id);
res = GetLogfileManager()->allocateAndWrite(marker, false).errorCode;
}
}
catch (triagens::arango::Exception const& ex) {
res = ex.code();
@ -587,8 +595,16 @@ static int WriteAbortMarker (TRI_transaction_t* trx) {
int res;
try {
triagens::wal::AbortTransactionMarker marker(trx->_vocbase->_id, trx->_id);
res = GetLogfileManager()->allocateAndWrite(marker, false).errorCode;
if (trx->_externalId > 0) {
// remotely started trx
triagens::wal::AbortRemoteTransactionMarker marker(trx->_vocbase->_id, trx->_id, trx->_externalId);
res = GetLogfileManager()->allocateAndWrite(marker, false).errorCode;
}
else {
// local trx
triagens::wal::AbortTransactionMarker marker(trx->_vocbase->_id, trx->_id);
res = GetLogfileManager()->allocateAndWrite(marker, false).errorCode;
}
}
catch (triagens::arango::Exception const& ex) {
res = ex.code();
@ -620,8 +636,16 @@ static int WriteCommitMarker (TRI_transaction_t* trx) {
int res;
try {
triagens::wal::CommitTransactionMarker marker(trx->_vocbase->_id, trx->_id);
res = GetLogfileManager()->allocateAndWrite(marker, false).errorCode;
if (trx->_externalId > 0) {
// remotely started trx
triagens::wal::CommitRemoteTransactionMarker marker(trx->_vocbase->_id, trx->_id, trx->_externalId);
res = GetLogfileManager()->allocateAndWrite(marker, false).errorCode;
}
else {
// local trx
triagens::wal::CommitTransactionMarker marker(trx->_vocbase->_id, trx->_id);
res = GetLogfileManager()->allocateAndWrite(marker, false).errorCode;
}
}
catch (triagens::arango::Exception const& ex) {
res = ex.code();
@ -664,6 +688,7 @@ static void UpdateTransactionStatus (TRI_transaction_t* const trx,
////////////////////////////////////////////////////////////////////////////////
TRI_transaction_t* TRI_CreateTransaction (TRI_vocbase_t* vocbase,
TRI_voc_tid_t externalId,
double timeout,
bool waitForSync) {
TRI_transaction_t* trx = static_cast<TRI_transaction_t*>(TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_transaction_t), false));
@ -676,8 +701,8 @@ TRI_transaction_t* TRI_CreateTransaction (TRI_vocbase_t* vocbase,
trx->_vocbase = vocbase;
// note: the real transaction id will be acquired on transaction start
trx->_id = 0;
trx->_id = 0; // local trx id
trx->_externalId = externalId; // remote trx id (used in replication)
trx->_status = TRI_TRANSACTION_CREATED;
trx->_type = TRI_TRANSACTION_READ;
trx->_hints = 0;
@ -693,6 +718,11 @@ TRI_transaction_t* TRI_CreateTransaction (TRI_vocbase_t* vocbase,
trx->_timeout = (uint64_t) 0;
}
if (trx->_externalId != 0) {
// replication transaction is always a write transaction
trx->_type = TRI_TRANSACTION_WRITE;
}
TRI_InitVectorPointer2(&trx->_collections, TRI_UNKNOWN_MEM_ZONE, 2);
return trx;
@ -860,6 +890,14 @@ int TRI_AddCollectionTransaction (TRI_transaction_t* trx,
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief make sure all declared collections are used & locked
////////////////////////////////////////////////////////////////////////////////
int TRI_EnsureCollectionsTransaction (TRI_transaction_t* trx) {
  // delegate to the internal helper and hand its status code back unchanged
  // NOTE(review): the literal 0 is presumably the nesting level to start
  // from - confirm against the definition of UseCollections
  int const res = UseCollections(trx, 0);

  return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief request a lock for a collection
////////////////////////////////////////////////////////////////////////////////

View File

@ -142,15 +142,16 @@ TRI_transaction_hint_e;
typedef struct TRI_transaction_s {
struct TRI_vocbase_s* _vocbase; // vocbase
TRI_voc_tid_t _id; // trx id
TRI_voc_tid_t _id; // local trx id
TRI_voc_tid_t _externalId; // external trx id (used in replication)
TRI_transaction_type_e _type; // access type (read|write)
TRI_transaction_status_e _status; // current status
TRI_vector_pointer_t _collections; // list of participating collections
TRI_transaction_hint_t _hints; // hints;
int _nestingLevel;
uint64_t _timeout; // timeout for lock acquisition
bool _hasOperations;
bool _waitForSync; // whether or not the collection had a synchronous op
uint64_t _timeout; // timeout for lock acquisition
}
TRI_transaction_t;
@ -182,6 +183,7 @@ TRI_transaction_collection_t;
////////////////////////////////////////////////////////////////////////////////
TRI_transaction_t* TRI_CreateTransaction (struct TRI_vocbase_s*,
TRI_voc_tid_t,
double,
bool);
@ -239,6 +241,12 @@ int TRI_AddCollectionTransaction (TRI_transaction_t*,
int,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief make sure all declared collections are used & locked
////////////////////////////////////////////////////////////////////////////////
int TRI_EnsureCollectionsTransaction (TRI_transaction_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief request a lock for a collection
////////////////////////////////////////////////////////////////////////////////

View File

@ -206,6 +206,17 @@ namespace triagens {
#endif
}
////////////////////////////////////////////////////////////////////////////////
/// @brief set counters, used in replication client to transfer transactions
/// between threads.
////////////////////////////////////////////////////////////////////////////////
static void setNumbers (int numberInScope, int numberActive) {
  // overwrite both transaction counters with the caller-supplied values
  _numberTrxActive  = numberActive;
  _numberTrxInScope = numberInScope;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief assert that a transaction object is in scope in the current thread
////////////////////////////////////////////////////////////////////////////////

View File

@ -164,6 +164,36 @@ static bool EqualKeyCollectionName (TRI_associative_pointer_t* array, void const
// --SECTION-- private functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief write a drop collection marker into the log
////////////////////////////////////////////////////////////////////////////////
static int WriteDropCollectionMarker (TRI_vocbase_t* vocbase,
                                      TRI_voc_cid_t collectionId) {
  int errorCode = TRI_ERROR_NO_ERROR;

  try {
    // build the marker for the (database, collection) pair and hand it to
    // the logfile manager
    triagens::wal::DropCollectionMarker dropMarker(vocbase->_id, collectionId);
    triagens::wal::SlotInfoCopy const slot = triagens::wal::LogfileManager::instance()->allocateAndWrite(dropMarker, false);

    // a failed write is funnelled through the same exception path as any
    // other error raised while creating or writing the marker
    if (slot.errorCode != TRI_ERROR_NO_ERROR) {
      THROW_ARANGO_EXCEPTION(slot.errorCode);
    }
  }
  catch (triagens::arango::Exception const& ex) {
    errorCode = ex.code();
  }
  catch (...) {
    // anything that is not an arango exception is reported as internal error
    errorCode = TRI_ERROR_INTERNAL;
  }

  if (errorCode == TRI_ERROR_NO_ERROR) {
    return errorCode;
  }

  // the failure is logged here; the error code is still returned to the caller
  LOG_WARNING("could not save collection drop marker in log: %s", TRI_errno_string(errorCode));

  return errorCode;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief removes a collection name from the global list of collections
///
@ -172,8 +202,7 @@ static bool EqualKeyCollectionName (TRI_associative_pointer_t* array, void const
////////////////////////////////////////////////////////////////////////////////
static bool UnregisterCollection (TRI_vocbase_t* vocbase,
TRI_vocbase_col_t* collection,
bool writeMarker) {
TRI_vocbase_col_t* collection) {
TRI_ASSERT(collection != nullptr);
TRI_ASSERT(collection->_name != nullptr);
@ -194,32 +223,6 @@ static bool UnregisterCollection (TRI_vocbase_t* vocbase,
TRI_WRITE_UNLOCK_COLLECTIONS_VOCBASE(vocbase);
if (! writeMarker) {
return true;
}
int res = TRI_ERROR_NO_ERROR;
try {
triagens::wal::DropCollectionMarker marker(vocbase->_id, collection->_cid);
triagens::wal::SlotInfoCopy slotInfo = triagens::wal::LogfileManager::instance()->allocateAndWrite(marker, false);
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(slotInfo.errorCode);
}
return true;
}
catch (triagens::arango::Exception const& ex) {
res = ex.code();
}
catch (...) {
res = TRI_ERROR_INTERNAL;
}
LOG_WARNING("could not save collection drop marker in log: %s", TRI_errno_string(res));
// TODO: what to do here?
return true;
}
@ -1479,6 +1482,14 @@ TRI_vocbase_t* TRI_OpenVocBase (TRI_server_t* server,
// start cleanup thread
TRI_InitThread(&vocbase->_cleanup);
TRI_StartThread(&vocbase->_cleanup, nullptr, "[cleanup]", TRI_CleanupVocBase, vocbase);
vocbase->_replicationApplier = TRI_CreateReplicationApplier(server, vocbase);
if (vocbase->_replicationApplier == nullptr) {
// TODO
LOG_FATAL_AND_EXIT("initialising replication applier for database '%s' failed", vocbase->_name);
}
// we are done
return vocbase;
@ -1990,7 +2001,7 @@ int TRI_DropCollectionVocBase (TRI_vocbase_t* vocbase,
if (collection->_status == TRI_VOC_COL_STATUS_DELETED) {
// mark collection as deleted
UnregisterCollection(vocbase, collection, writeMarker);
UnregisterCollection(vocbase, collection);
TRI_WRITE_UNLOCK_STATUS_VOCBASE_COL(collection);
@ -2044,7 +2055,10 @@ int TRI_DropCollectionVocBase (TRI_vocbase_t* vocbase,
}
collection->_status = TRI_VOC_COL_STATUS_DELETED;
UnregisterCollection(vocbase, collection, writeMarker);
UnregisterCollection(vocbase, collection);
if (writeMarker) {
WriteDropCollectionMarker(vocbase, collection->_cid);
}
TRI_WRITE_UNLOCK_STATUS_VOCBASE_COL(collection);
@ -2099,7 +2113,10 @@ int TRI_DropCollectionVocBase (TRI_vocbase_t* vocbase,
collection->_status = TRI_VOC_COL_STATUS_DELETED;
UnregisterCollection(vocbase, collection, writeMarker);
UnregisterCollection(vocbase, collection);
if (writeMarker) {
WriteDropCollectionMarker(vocbase, collection->_cid);
}
TRI_WRITE_UNLOCK_STATUS_VOCBASE_COL(collection);

View File

@ -47,12 +47,11 @@ struct TRI_document_collection_t;
struct TRI_col_info_s;
struct TRI_general_cursor_store_s;
struct TRI_json_s;
struct TRI_replication_applier_s;
struct TRI_replication_logger_s;
struct TRI_server_s;
struct TRI_vector_pointer_s;
struct TRI_vector_string_s;
struct TRI_vocbase_defaults_s;
struct TRI_replication_applier_t;
// -----------------------------------------------------------------------------
// --SECTION-- public macros
@ -310,7 +309,7 @@ typedef struct TRI_vocbase_s {
std::set<TRI_voc_tid_t>* _oldTransactions;
struct TRI_replication_applier_s* _replicationApplier;
struct TRI_replication_applier_t* _replicationApplier;
// state of the database
// 0 = inactive

View File

@ -30,6 +30,7 @@
#include "RecoverState.h"
#include "Basics/FileUtils.h"
#include "VocBase/collection.h"
#include "VocBase/replication-applier.h"
#include "VocBase/voc-shaper.h"
#include "Wal/LogfileManager.h"
#include "Wal/Slots.h"
@ -62,6 +63,7 @@ RecoverState::RecoverState (TRI_server_t* server,
remoteTransactionDatabases(),
droppedCollections(),
droppedDatabases(),
droppedIndexes(),
lastTick(0),
logfilesToProcess(),
openedCollections(),
@ -99,6 +101,25 @@ RecoverState::~RecoverState () {
////////////////////////////////////////////////////////////////////////////////
void RecoverState::releaseResources () {
// hand over running remote transactions to the applier
for (auto it = runningRemoteTransactions.begin(); it != runningRemoteTransactions.end(); ++it) {
auto* trx = (*it).second;
TRI_vocbase_t* vocbase = trx->vocbase();
TRI_ASSERT(vocbase != nullptr);
auto* applier = vocbase->_replicationApplier;
TRI_ASSERT(applier != nullptr);
applier->addRemoteTransaction(trx);
}
// reset trx counter as we're moving transactions from this thread to a potential other
triagens::arango::TransactionBase::setNumbers(0, 0);
runningRemoteTransactions.clear();
// release all collections
for (auto it = openedCollections.begin(); it != openedCollections.end(); ++it) {
TRI_vocbase_col_t* collection = (*it).second;
@ -311,10 +332,10 @@ int RecoverState::executeSingleOperation (TRI_voc_tick_t databaseId,
THROW_ARANGO_EXCEPTION(res);
}
trx->addHint(TRI_TRANSACTION_HINT_NO_BEGIN_MARKER);
trx->addHint(TRI_TRANSACTION_HINT_NO_ABORT_MARKER);
trx->addHint(TRI_TRANSACTION_HINT_NO_THROTTLING);
trx->addHint(TRI_TRANSACTION_HINT_LOCK_NEVER);
trx->addHint(TRI_TRANSACTION_HINT_NO_BEGIN_MARKER, false);
trx->addHint(TRI_TRANSACTION_HINT_NO_ABORT_MARKER, false);
trx->addHint(TRI_TRANSACTION_HINT_NO_THROTTLING, false);
trx->addHint(TRI_TRANSACTION_HINT_LOCK_NEVER, false);
res = trx->begin();
@ -433,6 +454,33 @@ bool RecoverState::InitialScanMarker (TRI_df_marker_t const* marker,
state->remoteTransactions.erase(m->_transactionId);
break;
}
// -----------------------------------------------------------------------------
// create markers
// -----------------------------------------------------------------------------
case TRI_WAL_MARKER_CREATE_COLLECTION: {
collection_create_marker_t const* m = reinterpret_cast<collection_create_marker_t const*>(marker);
// undo a potential drop marker discovered before for the same collection
state->droppedCollections.erase(m->_collectionId);
break;
}
case TRI_WAL_MARKER_CREATE_DATABASE: {
database_create_marker_t const* m = reinterpret_cast<database_create_marker_t const*>(marker);
// undo a potential drop marker discovered before for the same database
state->droppedDatabases.erase(m->_databaseId);
break;
}
case TRI_WAL_MARKER_CREATE_INDEX: {
index_create_marker_t const* m = reinterpret_cast<index_create_marker_t const*>(marker);
// undo a potential drop marker discovered before for the same index
state->droppedIndexes.erase(m->_indexId);
break;
}
// -----------------------------------------------------------------------------
// drop markers
@ -451,6 +499,13 @@ bool RecoverState::InitialScanMarker (TRI_df_marker_t const* marker,
state->droppedDatabases.insert(m->_databaseId);
break;
}
case TRI_WAL_MARKER_DROP_INDEX: {
index_drop_marker_t const* m = reinterpret_cast<index_drop_marker_t const*>(marker);
// note that the index was dropped and doesn't need to be recovered
state->droppedIndexes.insert(m->_indexId);
break;
}
}
return true;
@ -755,6 +810,7 @@ bool RecoverState::ReplayMarker (TRI_df_marker_t const* marker,
case TRI_WAL_MARKER_BEGIN_REMOTE_TRANSACTION: {
transaction_remote_begin_marker_t const* m = reinterpret_cast<transaction_remote_begin_marker_t const*>(marker);
TRI_voc_tick_t databaseId = m->_databaseId;
TRI_voc_tid_t externalId = m->_externalId;
// start a remote transaction
TRI_vocbase_t* vocbase = state->useDatabase(databaseId);
@ -764,14 +820,17 @@ bool RecoverState::ReplayMarker (TRI_df_marker_t const* marker,
TRI_errno_string(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND));
}
auto trx = new RemoteTransactionType(vocbase);
auto trx = new RemoteTransactionType(state->server, vocbase, externalId);
if (trx == nullptr) {
LOG_WARNING("unable to start transaction: %s", TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY));
return state->shouldAbort();
}
trx->addHint(TRI_TRANSACTION_HINT_NO_BEGIN_MARKER, true);
int res = trx->begin();
if (res != TRI_ERROR_NO_ERROR) {
LOG_WARNING("unable to start transaction: %s", TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY));
delete trx;
@ -956,16 +1015,18 @@ bool RecoverState::ReplayMarker (TRI_df_marker_t const* marker,
return true;
}
TRI_document_collection_t* document = state->getCollection(databaseId, collectionId);
if (document == nullptr) {
return state->shouldAbort();
}
if (state->droppedIndexes.find(indexId) == state->droppedIndexes.end()) {
TRI_document_collection_t* document = state->getCollection(databaseId, collectionId);
if (document == nullptr) {
return state->shouldAbort();
}
// fake transaction to satisfy assertions
triagens::arango::TransactionBase trx(true);
// fake transaction to satisfy assertions
triagens::arango::TransactionBase trx(true);
// ignore any potential error returned by this call
TRI_DropIndexDocumentCollection(document, indexId, false);
// ignore any potential error returned by this call
TRI_DropIndexDocumentCollection(document, indexId, false);
}
break;
}
@ -974,26 +1035,27 @@ bool RecoverState::ReplayMarker (TRI_df_marker_t const* marker,
collection_drop_marker_t const* m = reinterpret_cast<collection_drop_marker_t const*>(marker);
TRI_voc_cid_t collectionId = m->_collectionId;
TRI_voc_tick_t databaseId = m->_databaseId;
if (state->isDropped(databaseId, collectionId)) {
TRI_vocbase_t* vocbase = state->useDatabase(databaseId);
if (vocbase == nullptr) {
// database already deleted - do nothing
return true;
}
TRI_vocbase_col_t* collection = TRI_LookupCollectionByIdVocBase(vocbase, collectionId);
if (collection == nullptr) {
// collection already deleted - do nothing
return true;
}
// fake transaction to satisfy assertions
triagens::arango::TransactionBase trx(true);
TRI_vocbase_t* vocbase = state->useDatabase(databaseId);
if (vocbase == nullptr) {
// database already deleted - do nothing
return true;
// ignore any potential error returned by this call
TRI_DropCollectionVocBase(vocbase, collection, false);
}
TRI_vocbase_col_t* collection = TRI_LookupCollectionByIdVocBase(vocbase, collectionId);
if (collection == nullptr) {
// collection already deleted - do nothing
return true;
}
// fake transaction to satisfy assertions
triagens::arango::TransactionBase trx(true);
// ignore any potential error returned by this call
TRI_DropCollectionVocBase(vocbase, collection, false);
break;
}
@ -1003,14 +1065,12 @@ bool RecoverState::ReplayMarker (TRI_df_marker_t const* marker,
TRI_voc_tick_t databaseId = m->_databaseId;
if (state->isDropped(databaseId)) {
return true;
}
// fake transaction to satisfy assertions
triagens::arango::TransactionBase trx(true);
// fake transaction to satisfy assertions
triagens::arango::TransactionBase trx(true);
// ignore any potential error returned by this call
TRI_DropByIdDatabaseServer(state->server, databaseId, false);
// ignore any potential error returned by this call
TRI_DropByIdDatabaseServer(state->server, databaseId, false);
}
break;
}

View File

@ -274,6 +274,7 @@ namespace triagens {
std::unordered_set<TRI_voc_tick_t> remoteTransactionDatabases;
std::unordered_set<TRI_voc_cid_t> droppedCollections;
std::unordered_set<TRI_voc_tick_t> droppedDatabases;
std::unordered_set<TRI_idx_iid_t> droppedIndexes;
TRI_voc_tick_t lastTick;
std::vector<Logfile*> logfilesToProcess;
std::unordered_map<TRI_voc_cid_t, TRI_vocbase_col_t*> openedCollections;

View File

@ -1,9 +1,7 @@
/*jslint indent: 2, nomen: true, maxlen: 120, vars: true, white: true, plusplus: true, nonpropdel: true, proto: true */
/*jslint sloppy: true, regexp: true */
/*global require, module, Module, ArangoError, SleepAndRequeue,
REPLICATION_LOGGER_STATE, REPLICATION_LOGGER_CONFIGURE, REPLICATION_APPLIER_CONFIGURE, REPLICATION_APPLIER_START,
REPLICATION_APPLIER_STOP, REPLICATION_APPLIER_FORGET, REPLICATION_APPLIER_STATE,
REPLICATION_SYNCHRONISE, REPLICATION_SERVER_ID, CONFIGURE_ENDPOINT, REMOVE_ENDPOINT, LIST_ENDPOINTS,
CONFIGURE_ENDPOINT, REMOVE_ENDPOINT, LIST_ENDPOINTS,
SYS_BASE64DECODE, SYS_BASE64ENCODE, SYS_DEBUG_SEGFAULT,
SYS_DEBUG_CAN_USE_FAILAT, SYS_DEBUG_SET_FAILAT, SYS_DEBUG_REMOVE_FAILAT, SYS_DEBUG_CLEAR_FAILAT,
SYS_DOWNLOAD, SYS_EXECUTE, SYS_GET_CURRENT_REQUEST, SYS_GET_CURRENT_RESPONSE,
@ -264,87 +262,6 @@
// --SECTION-- public functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief getStateReplicationLogger
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_LOGGER_STATE !== "undefined") {
exports.getStateReplicationLogger = REPLICATION_LOGGER_STATE;
delete REPLICATION_LOGGER_STATE;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief configureReplicationLogger
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_LOGGER_CONFIGURE !== "undefined") {
exports.configureReplicationLogger = REPLICATION_LOGGER_CONFIGURE;
delete REPLICATION_LOGGER_CONFIGURE;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief configureReplicationApplier
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_APPLIER_CONFIGURE !== "undefined") {
exports.configureReplicationApplier = REPLICATION_APPLIER_CONFIGURE;
delete REPLICATION_APPLIER_CONFIGURE;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief startReplicationApplier
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_APPLIER_START !== "undefined") {
exports.startReplicationApplier = REPLICATION_APPLIER_START;
delete REPLICATION_APPLIER_START;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stopReplicationApplier
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_APPLIER_STOP !== "undefined") {
exports.stopReplicationApplier = REPLICATION_APPLIER_STOP;
delete REPLICATION_APPLIER_STOP;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief getStateReplicationApplier
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_APPLIER_STATE !== "undefined") {
exports.getStateReplicationApplier = REPLICATION_APPLIER_STATE;
delete REPLICATION_APPLIER_STATE;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief forgetStateReplicationApplier
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_APPLIER_FORGET !== "undefined") {
exports.forgetStateReplicationApplier = REPLICATION_APPLIER_FORGET;
delete REPLICATION_APPLIER_FORGET;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief sychroniseReplication
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_SYNCHRONISE !== "undefined") {
exports.synchroniseReplication = REPLICATION_SYNCHRONISE;
delete REPLICATION_SYNCHRONISE;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief serverId
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_SERVER_ID !== "undefined") {
exports.serverId = REPLICATION_SERVER_ID;
delete REPLICATION_SERVER_ID;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief configureEndpoint
////////////////////////////////////////////////////////////////////////////////

View File

@ -49,7 +49,7 @@ function ReplicationLoggerSuite () {
var cn2 = "UnitTestsReplication2";
var waitForSync = function () {
internal.wait(0.75, false);
internal.wait(1, false);
};
var getLogEntries = function (tick, type) {
@ -122,7 +122,6 @@ function ReplicationLoggerSuite () {
////////////////////////////////////////////////////////////////////////////////
tearDown : function () {
replication.logger.stop();
replication.logger.properties({ maxEvents: 1048576 });
db._drop(cn);
db._drop(cn2);
@ -1701,7 +1700,7 @@ function ReplicationApplierSuite () {
////////////////////////////////////////////////////////////////////////////////
setUp : function () {
replication.applier.stop();
replication.applier.shutdown();
replication.applier.forget();
},
@ -1710,7 +1709,7 @@ function ReplicationApplierSuite () {
////////////////////////////////////////////////////////////////////////////////
tearDown : function () {
replication.applier.stop();
replication.applier.shutdown();
},
////////////////////////////////////////////////////////////////////////////////
@ -1856,14 +1855,14 @@ function ReplicationApplierSuite () {
assertTrue(state.state.running);
// stop
replication.applier.stop();
replication.applier.shutdown();
state = replication.applier.state();
assertFalse(state.state.running);
// stop again
replication.applier.stop();
replication.applier.shutdown();
state = replication.applier.state();
assertFalse(state.state.running);
},
@ -1987,9 +1986,8 @@ function ReplicationSyncSuite () {
////////////////////////////////////////////////////////////////////////////////
setUp : function () {
replication.applier.stop();
replication.applier.shutdown();
replication.applier.forget();
replication.logger.stop();
},
////////////////////////////////////////////////////////////////////////////////

View File

@ -1,7 +1,10 @@
/*jslint indent: 2, nomen: true, maxlen: 120, sloppy: true, vars: true, white: true, plusplus: true, nonpropdel: true */
/*global require, db, ArangoCollection, ArangoDatabase, ArangoCursor, module,
ShapedJson, RELOAD_AUTH, SYS_DEFINE_ACTION, SYS_EXECUTE_GLOBAL_CONTEXT_FUNCTION,
AHUACATL_RUN, AHUACATL_PARSE, AHUACATL_EXPLAIN, WAL_FLUSH, WAL_PROPERTIES */
AHUACATL_RUN, AHUACATL_PARSE, AHUACATL_EXPLAIN, WAL_FLUSH, WAL_PROPERTIES,
REPLICATION_LOGGER_STATE, REPLICATION_LOGGER_CONFIGURE, REPLICATION_SERVER_ID,
REPLICATION_APPLIER_CONFIGURE, REPLICATION_APPLIER_START, REPLICATION_APPLIER_SHUTDOWN,
REPLICATION_APPLIER_FORGET, REPLICATION_APPLIER_STATE, REPLICATION_SYNCHRONISE */
////////////////////////////////////////////////////////////////////////////////
/// @brief module "internal"
@ -216,6 +219,87 @@
delete SYS_EXECUTE_GLOBAL_CONTEXT_FUNCTION;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief getStateReplicationLogger
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_LOGGER_STATE !== "undefined") {
internal.getStateReplicationLogger = REPLICATION_LOGGER_STATE;
delete REPLICATION_LOGGER_STATE;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief configureReplicationLogger
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_LOGGER_CONFIGURE !== "undefined") {
internal.configureReplicationLogger = REPLICATION_LOGGER_CONFIGURE;
delete REPLICATION_LOGGER_CONFIGURE;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief configureReplicationApplier
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_APPLIER_CONFIGURE !== "undefined") {
internal.configureReplicationApplier = REPLICATION_APPLIER_CONFIGURE;
delete REPLICATION_APPLIER_CONFIGURE;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief startReplicationApplier
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_APPLIER_START !== "undefined") {
internal.startReplicationApplier = REPLICATION_APPLIER_START;
delete REPLICATION_APPLIER_START;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief shutdownReplicationApplier
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_APPLIER_SHUTDOWN !== "undefined") {
internal.shutdownReplicationApplier = REPLICATION_APPLIER_SHUTDOWN;
delete REPLICATION_APPLIER_SHUTODWN;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief getStateReplicationApplier
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_APPLIER_STATE !== "undefined") {
internal.getStateReplicationApplier = REPLICATION_APPLIER_STATE;
delete REPLICATION_APPLIER_STATE;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief forgetStateReplicationApplier
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_APPLIER_FORGET !== "undefined") {
internal.forgetStateReplicationApplier = REPLICATION_APPLIER_FORGET;
delete REPLICATION_APPLIER_FORGET;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief synchroniseReplication
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_SYNCHRONISE !== "undefined") {
internal.synchroniseReplication = REPLICATION_SYNCHRONISE;
delete REPLICATION_SYNCHRONISE;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief serverId
////////////////////////////////////////////////////////////////////////////////
if (typeof REPLICATION_SERVER_ID !== "undefined") {
internal.serverId = REPLICATION_SERVER_ID;
delete REPLICATION_SERVER_ID;
}
}());
// -----------------------------------------------------------------------------

View File

@ -108,7 +108,17 @@ applier.start = function (initialTick) {
applier.stop = function () {
'use strict';
return internal.stopReplicationApplier();
throw "stop is not supported anymore - please use shutdown";
};
////////////////////////////////////////////////////////////////////////////////
/// @brief shuts down the replication applier
////////////////////////////////////////////////////////////////////////////////
applier.shutdown = function () {
  'use strict';

  // forward to the internal shutdown function and pass its result through
  var result = internal.shutdownReplicationApplier();

  return result;
};
////////////////////////////////////////////////////////////////////////////////

View File

@ -46,16 +46,6 @@
db._useDatabase("UnitTestsDumpSrc");
// clean up first
db._drop("UnitTestsDumpEmpty");
db._drop("UnitTestsDumpMany");
db._drop("UnitTestsDumpEdges");
db._drop("UnitTestsDumpOrder");
db._drop("UnitTestsDumpRemoved");
db._drop("UnitTestsDumpIndexes");
db._drop("UnitTestsDumpTruncated");
db._drop("UnitTestsDumpKeygen");
db._drop("UnitTestsDumpStrings");
// this remains empty
db._create("UnitTestsDumpEmpty", { waitForSync: true });
@ -140,6 +130,60 @@
c.save({ _key: "text" + i, value: t });
});
// collection that is filled by a single, successfully committed transaction
c = db._create("UnitTestsDumpTransactionCommit");

db._executeTransaction({
  collections: { write: "UnitTestsDumpTransactionCommit" },
  action: function (params) {
    // the collection is looked up again inside the action instead of using
    // the variable from the surrounding script
    var target = require("internal").db.UnitTestsDumpTransactionCommit;
    var n = 0;

    while (n < 1000) {
      target.save({ _key: "test" + n, value1: n, value2: "this is a test", value3: "test" + n });
      ++n;
    }
  }
});
// collection that is filled outside of a transaction first; afterwards every
// second document gets its "value3" attribute replaced inside a transaction
c = db._create("UnitTestsDumpTransactionUpdate");

i = 0;
while (i < 1000) {
  c.save({ _key: "test" + i, value1: i, value2: "this is a test", value3: "test" + i });
  ++i;
}

db._executeTransaction({
  collections: { write: "UnitTestsDumpTransactionUpdate" },
  action: function (params) {
    var target = require("internal").db.UnitTestsDumpTransactionUpdate;
    var n = 0;

    // only the even-numbered documents are touched
    while (n < 1000) {
      target.update("test" + n, { value3 : n });
      n += 2;
    }
  }
});
// collection with a single document "foo"; the transaction below removes it
// and inserts 1000 documents, but then throws so that everything it did is
// expected to be rolled back
c = db._create("UnitTestsDumpTransactionAbort");
c.save({ _key: "foo" });

var aborted = false;

try {
  db._executeTransaction({
    collections: {
      write: "UnitTestsDumpTransactionAbort"
    },
    action: function (params) {
      var c = require("internal").db.UnitTestsDumpTransactionAbort;
      // declare the counter locally, like the other transaction actions do,
      // instead of relying on a variable from outside the action
      var i;

      c.remove("foo");
      for (i = 0; i < 1000; ++i) {
        c.save({ _key: "test" + i, value1: i, value2: "this is a test", value3: "test" + i });
      }

      throw "rollback!";
    }
  });
}
catch (err) {
  // the transaction is supposed to fail
  aborted = true;
}

// this check must live outside of the try block: when thrown inside of it,
// the catch clause above would swallow it and a missing rollback would go
// unnoticed
if (! aborted) {
  throw "unexpected!";
}
})();
return true;

View File

@ -328,6 +328,59 @@ function dumpTestSuite () {
assertEqual(t, doc.value);
});
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test committed trx
////////////////////////////////////////////////////////////////////////////////
testTransactionCommit : function () {
var c = db._collection("UnitTestsDumpTransactionCommit");
assertEqual(1000, c.count());
for (var i = 0; i < 1000; ++i) {
var doc = c.document("test" + i);
assertEqual(i, doc.value1);
assertEqual("this is a test", doc.value2);
assertEqual("test" + i, doc.value3);
}
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test committed trx containing updates
////////////////////////////////////////////////////////////////////////////////
testTransactionUpdate : function () {
var c = db._collection("UnitTestsDumpTransactionUpdate");
assertEqual(1000, c.count());
for (var i = 0; i < 1000; ++i) {
var doc = c.document("test" + i);
assertEqual(i, doc.value1);
assertEqual("this is a test", doc.value2);
if (i % 2 == 0) {
assertEqual(i, doc.value3);
}
else {
assertEqual("test" + i, doc.value3);
}
}
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test aborted trx
////////////////////////////////////////////////////////////////////////////////
testTransactionAbort : function () {
var c = db._collection("UnitTestsDumpTransactionAbort");
assertEqual(1, c.count());
assertTrue(c.exists("foo"));
}
};