1
0
Fork 0

wait for sync thread outside of lock

This commit is contained in:
Jan Steemann 2014-12-17 14:21:24 +01:00
parent 363df745ea
commit db1ddb880b
4 changed files with 49 additions and 16 deletions

View File

@ -141,7 +141,7 @@ TRI_shaper_t* TRI_document_collection_t::getShaper () const {
/// @brief add a WAL operation for a transaction collection
////////////////////////////////////////////////////////////////////////////////
int TRI_AddOperationTransaction (triagens::wal::DocumentOperation&, bool);
int TRI_AddOperationTransaction (triagens::wal::DocumentOperation&, bool&);
// -----------------------------------------------------------------------------
// --SECTION-- forward declarations
@ -588,7 +588,7 @@ static int InsertDocument (TRI_transaction_collection_t* trxCollection,
TRI_doc_mptr_t* header,
triagens::wal::DocumentOperation& operation,
TRI_doc_mptr_copy_t* mptr,
bool syncRequested) {
bool& waitForSync) {
TRI_ASSERT(header != nullptr);
TRI_ASSERT(mptr != nullptr);
@ -627,7 +627,7 @@ static int InsertDocument (TRI_transaction_collection_t* trxCollection,
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
res = TRI_AddOperationTransaction(operation, syncRequested);
res = TRI_AddOperationTransaction(operation, waitForSync);
if (res != TRI_ERROR_NO_ERROR) {
return res;
@ -4896,6 +4896,7 @@ int TRI_RemoveShapedJsonDocumentCollection (TRI_transaction_collection_t* trxCol
TRI_doc_mptr_t* header;
int res;
TRI_voc_tick_t markerTick = 0;
{
TRI_IF_FAILURE("RemoveDocumentNoLock") {
// test what happens if no lock can be acquired
@ -4946,6 +4947,15 @@ int TRI_RemoveShapedJsonDocumentCollection (TRI_transaction_collection_t* trxCol
}
res = TRI_AddOperationTransaction(operation, forceSync);
if (res == TRI_ERROR_NO_ERROR && forceSync) {
markerTick = operation.tick;
}
}
if (markerTick > 0) {
// need to wait for tick, outside the lock
triagens::wal::LogfileManager::instance()->slots()->waitForTick(markerTick);
}
return res;
@ -5019,6 +5029,7 @@ int TRI_InsertShapedJsonDocumentCollection (TRI_transaction_collection_t* trxCol
TRI_ASSERT(marker != nullptr);
TRI_voc_tick_t markerTick = 0;
// now insert into indexes
{
TRI_IF_FAILURE("InsertDocumentNoLock") {
@ -5064,9 +5075,18 @@ int TRI_InsertShapedJsonDocumentCollection (TRI_transaction_collection_t* trxCol
if (res == TRI_ERROR_NO_ERROR) {
TRI_ASSERT(mptr->getDataPtr() != nullptr); // PROTECTED by trx in trxCollection
if (forceSync) {
markerTick = operation.tick;
}
}
}
if (markerTick > 0) {
// need to wait for tick, outside the lock
triagens::wal::LogfileManager::instance()->slots()->waitForTick(markerTick);
}
return res;
}
@ -5097,7 +5117,7 @@ int TRI_UpdateShapedJsonDocumentCollection (TRI_transaction_collection_t* trxCol
//TRI_ASSERT_EXPENSIVE(lock || TRI_IsLockedCollectionTransaction(trxCollection, TRI_TRANSACTION_WRITE, 0));
int res = TRI_ERROR_NO_ERROR;
TRI_voc_tick_t markerTick = 0;
{
TRI_IF_FAILURE("UpdateDocumentNoLock") {
return TRI_ERROR_DEBUG;
@ -5154,6 +5174,10 @@ int TRI_UpdateShapedJsonDocumentCollection (TRI_transaction_collection_t* trxCol
operation.init();
res = UpdateDocument(trxCollection, oldHeader, operation, mptr, forceSync);
if (res == TRI_ERROR_NO_ERROR && forceSync) {
markerTick = operation.tick;
}
}
if (res == TRI_ERROR_NO_ERROR) {
@ -5161,6 +5185,11 @@ int TRI_UpdateShapedJsonDocumentCollection (TRI_transaction_collection_t* trxCol
TRI_ASSERT(mptr->_rid > 0);
}
if (markerTick > 0) {
// need to wait for tick, outside the lock
triagens::wal::LogfileManager::instance()->slots()->waitForTick(markerTick);
}
return res;
}

View File

@ -2797,9 +2797,8 @@ bool TRI_MSync (int fd,
char* b = (char*)( (p / g) * g );
char* e = (char*)( ((q + g - 1) / g) * g );
int res;
res = TRI_FlushMMFile(fd, &mmHandle, b, e - b, MS_SYNC);
int res = TRI_FlushMMFile(fd, &mmHandle, b, e - b, MS_SYNC);
if (res != TRI_ERROR_NO_ERROR) {
TRI_set_errno(res);

View File

@ -1058,7 +1058,7 @@ bool TRI_IsLockedCollectionTransaction (TRI_transaction_collection_t const* trxC
////////////////////////////////////////////////////////////////////////////////
int TRI_AddOperationTransaction (triagens::wal::DocumentOperation& operation,
bool syncRequested) {
bool& waitForSync) {
TRI_transaction_collection_t* trxCollection = operation.trxCollection;
TRI_transaction_t* trx = trxCollection->_transaction;
@ -1067,13 +1067,13 @@ int TRI_AddOperationTransaction (triagens::wal::DocumentOperation& operation,
bool const isSingleOperationTransaction = IsSingleOperationTransaction(trx);
// default is false
bool waitForSync = false;
waitForSync = false;
if (isSingleOperationTransaction) {
waitForSync = syncRequested || trxCollection->_waitForSync;
waitForSync |= trxCollection->_waitForSync;
}
// upgrade the info for the transaction
if (syncRequested || trxCollection->_waitForSync) {
if (waitForSync) {
trx->_waitForSync = true;
}
@ -1105,7 +1105,7 @@ int TRI_AddOperationTransaction (triagens::wal::DocumentOperation& operation,
auto oldm = reinterpret_cast<triagens::wal::document_marker_t*>(oldmarker);
if ((oldm->_type == TRI_WAL_MARKER_DOCUMENT ||
oldm->_type == TRI_WAL_MARKER_EDGE) &&
!triagens::wal::LogfileManager::instance()->suppressShapeInformation()) {
! triagens::wal::LogfileManager::instance()->suppressShapeInformation()) {
// In this case we have to take care of the legend, we know that the
// marker does not have a legend so far, so first try to get away
// with this:
@ -1113,7 +1113,7 @@ int TRI_AddOperationTransaction (triagens::wal::DocumentOperation& operation,
TRI_voc_cid_t cid = oldm->_collectionId;
TRI_shape_sid_t sid = oldm->_shape;
void* oldLegend;
triagens::wal::SlotInfoCopy slotInfo = triagens::wal::LogfileManager::instance()->allocateAndWrite(oldmarker, operation.marker->size(), waitForSync, cid, sid, 0, oldLegend);
triagens::wal::SlotInfoCopy slotInfo = triagens::wal::LogfileManager::instance()->allocateAndWrite(oldmarker, operation.marker->size(), false, cid, sid, 0, oldLegend);
if (slotInfo.errorCode == TRI_ERROR_LEGEND_NOT_IN_WAL_FILE) {
// Oh dear, we have to build a legend and patch the marker:
triagens::basics::JsonLegend legend(document->getShaper()); // PROTECTED by trx in trxCollection
@ -1139,14 +1139,14 @@ int TRI_AddOperationTransaction (triagens::wal::DocumentOperation& operation,
auto newm = reinterpret_cast<triagens::wal::document_marker_t*>(newmarker);
newm->_size = newMarkerSize;
newm->_offsetJson = (uint32_t) (oldm->_offsetLegend + legend.getSize());
triagens::wal::SlotInfoCopy slotInfo2 = triagens::wal::LogfileManager::instance()->allocateAndWrite(newmarker, newMarkerSize, waitForSync, cid, sid, newm->_offsetLegend, oldLegend);
triagens::wal::SlotInfoCopy slotInfo2 = triagens::wal::LogfileManager::instance()->allocateAndWrite(newmarker, newMarkerSize, false, cid, sid, newm->_offsetLegend, oldLegend);
delete[] newmarker;
if (slotInfo2.errorCode != TRI_ERROR_NO_ERROR) {
return slotInfo2.errorCode;
}
fid = slotInfo2.logfileId;
position = slotInfo2.mem;
operation.tick = slotInfo2.tick;
}
}
else if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
@ -1159,6 +1159,7 @@ int TRI_AddOperationTransaction (triagens::wal::DocumentOperation& operation,
-reinterpret_cast<char*>(legendPtr);
// This means that we can find the old legend relative to
// the new position in the same WAL file.
operation.tick = slotInfo.tick;
fid = slotInfo.logfileId;
position = slotInfo.mem;
}
@ -1166,11 +1167,12 @@ int TRI_AddOperationTransaction (triagens::wal::DocumentOperation& operation,
}
else {
// No document or edge marker, just append it to the WAL:
triagens::wal::SlotInfoCopy slotInfo = triagens::wal::LogfileManager::instance()->allocateAndWrite(operation.marker->mem(), operation.marker->size(), waitForSync);
triagens::wal::SlotInfoCopy slotInfo = triagens::wal::LogfileManager::instance()->allocateAndWrite(operation.marker->mem(), operation.marker->size(), false);
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
// some error occurred
return slotInfo.errorCode;
}
operation.tick = slotInfo.tick;
fid = slotInfo.logfileId;
position = slotInfo.mem;
}
@ -1181,7 +1183,7 @@ int TRI_AddOperationTransaction (triagens::wal::DocumentOperation& operation,
fid = operation.marker->fid();
position = operation.marker->mem();
}
TRI_ASSERT(fid > 0);
TRI_ASSERT(position != nullptr);

View File

@ -30,6 +30,7 @@ namespace triagens {
trxCollection(trxCollection),
header(nullptr),
rid(rid),
tick(0),
type(type),
status(StatusType::CREATED),
freeMarker(freeMarker) {
@ -55,6 +56,7 @@ namespace triagens {
DocumentOperation* swap () {
DocumentOperation* copy = new DocumentOperation(marker, freeMarker, trxCollection, type, rid);
copy->tick = tick;
copy->header = header;
copy->oldHeader = oldHeader;
copy->status = status;
@ -131,6 +133,7 @@ namespace triagens {
TRI_doc_mptr_t* header;
TRI_doc_mptr_copy_t oldHeader;
TRI_voc_rid_t const rid;
TRI_voc_tick_t tick;
TRI_voc_document_operation_e type;
StatusType status;
bool freeMarker;