1
0
Fork 0

no waitForSync during recovery

This commit is contained in:
jsteemann 2016-04-23 17:39:14 +02:00
parent 090068c3e1
commit 7739d0ed78
4 changed files with 20 additions and 14 deletions

View File

@ -3345,11 +3345,9 @@ int TRI_document_collection_t::insert(Transaction* trx, VPackSlice const slice,
return TRI_ERROR_DEBUG;
}
arangodb::CollectionWriteLocker collectionLocker(this, lock);
auto actualMarker = (options.recoveryMarker == nullptr ? marker.get() : options.recoveryMarker);
bool const freeMarker = (options.recoveryMarker == nullptr);
arangodb::wal::DocumentOperation operation(
trx, actualMarker, freeMarker, this, TRI_VOC_DOCUMENT_OPERATION_INSERT);
@ -3368,6 +3366,8 @@ int TRI_document_collection_t::insert(Transaction* trx, VPackSlice const slice,
// test what happens if no header can be acquired
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
arangodb::CollectionWriteLocker collectionLocker(this, lock);
// create a new header
TRI_doc_mptr_t* header = operation.header = _masterPointers.request();

View File

@ -22,11 +22,8 @@
////////////////////////////////////////////////////////////////////////////////
#include "transaction.h"
#include "Aql/QueryCache.h"
#include "Basics/conversions.h"
#include "Logger/Logger.h"
#include "Basics/tri-strings.h"
#include "Basics/Exceptions.h"
#include "VocBase/DatafileHelper.h"
#include "VocBase/collection.h"
@ -802,9 +799,10 @@ bool TRI_FreeTransaction(TRI_transaction_t* trx) {
////////////////////////////////////////////////////////////////////////////////
TRI_transaction_type_e TRI_GetTransactionTypeFromStr(char const* s) {
if (TRI_EqualString(s, "read")) {
if (strcmp(s, "read") == 0) {
return TRI_TRANSACTION_READ;
} else if (TRI_EqualString(s, "write")) {
}
if (strcmp(s, "write") == 0) {
return TRI_TRANSACTION_WRITE;
}
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
@ -1044,14 +1042,18 @@ int TRI_AddOperationTransaction(TRI_transaction_t* trx,
TRI_document_collection_t* document = operation.document;
bool const isSingleOperationTransaction = IsSingleOperationTransaction(trx);
// upgrade the info for the transaction
if (waitForSync || document->_info.waitForSync()) {
trx->_waitForSync = true;
if (HasHint(trx, TRI_TRANSACTION_HINT_RECOVERY)) {
// turn off all waitForSync operations during recovery
waitForSync = false;
}
if (isSingleOperationTransaction) {
// upgrade the info for the transaction
if (!waitForSync) {
waitForSync |= document->_info.waitForSync();
}
if (waitForSync) {
trx->_waitForSync = true;
}
TRI_IF_FAILURE("TransactionOperationNoSlot") { return TRI_ERROR_DEBUG; }
@ -1059,7 +1061,7 @@ int TRI_AddOperationTransaction(TRI_transaction_t* trx,
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
if (!trx->_beginWritten) {
if (!isSingleOperationTransaction && !trx->_beginWritten) {
int res = WriteBeginMarker(trx);
if (res != TRI_ERROR_NO_ERROR) {

View File

@ -99,7 +99,8 @@ enum TRI_transaction_hint_e {
TRI_TRANSACTION_HINT_NO_THROTTLING = 32,
TRI_TRANSACTION_HINT_TRY_LOCK = 64,
TRI_TRANSACTION_HINT_NO_COMPACTION_LOCK = 128,
TRI_TRANSACTION_HINT_NO_USAGE_LOCK = 256
TRI_TRANSACTION_HINT_NO_USAGE_LOCK = 256,
TRI_TRANSACTION_HINT_RECOVERY = 512
};
////////////////////////////////////////////////////////////////////////////////

View File

@ -320,6 +320,7 @@ int RecoverState::executeSingleOperation(
trx.addHint(TRI_TRANSACTION_HINT_NO_ABORT_MARKER, false);
trx.addHint(TRI_TRANSACTION_HINT_NO_THROTTLING, false);
trx.addHint(TRI_TRANSACTION_HINT_LOCK_NEVER, false);
trx.addHint(TRI_TRANSACTION_HINT_RECOVERY, false); // to turn off waitForSync!
res = trx.begin();
@ -506,6 +507,7 @@ bool RecoverState::ReplayMarker(TRI_df_marker_t const* marker, void* data,
options.silent = true;
options.recoveryMarker = envelope;
options.isRestore = true;
options.waitForSync = false;
// try an insert first
OperationResult opRes = trx->insert(collectionName, VPackSlice(ptr), options);
@ -565,6 +567,7 @@ bool RecoverState::ReplayMarker(TRI_df_marker_t const* marker, void* data,
OperationOptions options;
options.silent = true;
options.recoveryMarker = envelope;
options.waitForSync = false;
OperationResult opRes = trx->remove(collectionName, VPackSlice(ptr), options);
int res = opRes.code;