diff --git a/arangod/Utils/SingleCollectionTransaction.h b/arangod/Utils/SingleCollectionTransaction.h index 0f78aa9dd0..9316efa554 100644 --- a/arangod/Utils/SingleCollectionTransaction.h +++ b/arangod/Utils/SingleCollectionTransaction.h @@ -168,6 +168,7 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// inline int readRandom (TRI_doc_mptr_t* mptr, TRI_barrier_t** barrier) { + assert(mptr != nullptr); return this->readAny(this->trxCollection(), mptr, barrier); } @@ -176,6 +177,7 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// inline int read (TRI_doc_mptr_t* mptr, const string& key) { + assert(mptr != nullptr); return this->readSingle(this->trxCollection(), mptr, key); } diff --git a/arangod/Utils/SingleCollectionWriteTransaction.h b/arangod/Utils/SingleCollectionWriteTransaction.h index ce9a9b282e..a5b58c7f26 100644 --- a/arangod/Utils/SingleCollectionWriteTransaction.h +++ b/arangod/Utils/SingleCollectionWriteTransaction.h @@ -145,6 +145,8 @@ namespace triagens { if (_numWrites++ > N) { return TRI_ERROR_TRANSACTION_INTERNAL; } + + assert(mptr != nullptr); return this->create(this->trxCollection(), TRI_DOC_MARKER_KEY_DOCUMENT, @@ -166,6 +168,8 @@ namespace triagens { if (_numWrites++ > N) { return TRI_ERROR_TRANSACTION_INTERNAL; } + + assert(mptr != nullptr); return this->create(this->trxCollection(), TRI_DOC_MARKER_KEY_EDGE, @@ -186,6 +190,8 @@ namespace triagens { if (_numWrites++ > N) { return TRI_ERROR_TRANSACTION_INTERNAL; } + + assert(mptr != nullptr); return this->create(this->trxCollection(), key, @@ -210,6 +216,8 @@ namespace triagens { return TRI_ERROR_TRANSACTION_INTERNAL; } + assert(mptr != nullptr); + return this->create(this->trxCollection(), key, 0, @@ -235,6 +243,8 @@ namespace triagens { if (_numWrites++ > N) { return TRI_ERROR_TRANSACTION_INTERNAL; } + + assert(mptr != nullptr); return this->update(this->trxCollection(), key, @@ -262,6 +272,8 @@ namespace triagens { if (_numWrites++ > N) { return TRI_ERROR_TRANSACTION_INTERNAL; } + + assert(mptr != nullptr); return this->update(this->trxCollection(), key, diff --git a/arangod/Utils/Transaction.h b/arangod/Utils/Transaction.h index 8db7131d57..2ece3c33c7 100644 --- a/arangod/Utils/Transaction.h +++ b/arangod/Utils/Transaction.h @@ -535,9 +535,10 @@ namespace triagens { int readSingle (TRI_transaction_collection_t* trxCollection, TRI_doc_mptr_t* mptr, const string& key) { + + assert(mptr != nullptr); TRI_primary_collection_t* primary = primaryCollection(trxCollection); - memset(&mptr, 0, sizeof(TRI_doc_mptr_t)); int res = primary->readDocument(trxCollection, (TRI_voc_key_t) key.c_str(), diff --git a/arangod/VocBase/document-collection.cpp b/arangod/VocBase/document-collection.cpp index 40ebd963bf..bb6559f683 100644 --- a/arangod/VocBase/document-collection.cpp +++ b/arangod/VocBase/document-collection.cpp @@ -47,10 +47,17 @@ #include "VocBase/server.h" #include "VocBase/update-policy.h" #include "VocBase/voc-shaper.h" +#include "Wal/DocumentOperation.h" #include "Wal/LogfileManager.h" #include "Wal/Marker.h" #include "Wal/Slots.h" +//////////////////////////////////////////////////////////////////////////////// +/// @brief add a WAL operation for a transaction collection +//////////////////////////////////////////////////////////////////////////////// + +int TRI_AddOperationTransaction (triagens::wal::DocumentOperation&, bool); + // ----------------------------------------------------------------------------- // --SECTION-- forward declarations // ----------------------------------------------------------------------------- @@ -153,9 +160,9 @@ static int InsertPrimaryIndex (TRI_document_collection_t* document, TRI_doc_mptr_t* found; int res; - TRI_ASSERT_MAINTAINER(document != NULL); - TRI_ASSERT_MAINTAINER(header != NULL); - TRI_ASSERT_MAINTAINER(header->_key != NULL); + TRI_ASSERT_MAINTAINER(document != nullptr); + TRI_ASSERT_MAINTAINER(header != nullptr); + TRI_ASSERT_MAINTAINER(header->_key != nullptr); primary = &document->base; @@ -166,7 +173,7 @@ static int InsertPrimaryIndex (TRI_document_collection_t* document, return res; } - if (found == NULL) { + if (found == nullptr) { // success IncreaseDocumentCount(primary); @@ -190,18 +197,12 @@ static int InsertPrimaryIndex (TRI_document_collection_t* document, static int InsertSecondaryIndexes (TRI_document_collection_t* document, TRI_doc_mptr_t const* header, const bool isRollback) { - size_t i, n; - int result; + int result = TRI_ERROR_NO_ERROR; + size_t const n = document->_allIndexes._length; - result = TRI_ERROR_NO_ERROR; - n = document->_allIndexes._length; - - for (i = 0; i < n; ++i) { - TRI_index_t* idx; - int res; - - idx = static_cast(document->_allIndexes._buffer[i]); - res = idx->insert(idx, header, isRollback); + for (size_t i = 0; i < n; ++i) { + TRI_index_t* idx = static_cast(document->_allIndexes._buffer[i]); + int res = idx->insert(idx, header, isRollback); // in case of no-memory, return immediately if (res == TRI_ERROR_OUT_OF_MEMORY) { @@ -233,7 +234,7 @@ static int DeletePrimaryIndex (TRI_document_collection_t* document, TRI_primary_collection_t* primary = &document->base; TRI_doc_mptr_t* found = static_cast(TRI_RemoveKeyAssociativePointer(&primary->_primaryIndex, header->_key)); - if (found == NULL) { + if (found == nullptr) { return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND; } @@ -249,13 +250,10 @@ static int DeletePrimaryIndex (TRI_document_collection_t* document, static int DeleteSecondaryIndexes (TRI_document_collection_t* document, TRI_doc_mptr_t const* header, const bool isRollback) { - size_t i, n; - int result; + size_t const n = document->_allIndexes._length; + int result = TRI_ERROR_NO_ERROR; - n = document->_allIndexes._length; - result = TRI_ERROR_NO_ERROR; - - for (i = 0; i < n; ++i) { + for (size_t i = 0; i < n; ++i) { TRI_index_t* idx = static_cast(document->_allIndexes._buffer[i]); int res = idx->remove(idx, header, isRollback); @@ -358,23 +356,13 @@ static int RotateJournal (TRI_document_collection_t* document) { //////////////////////////////////////////////////////////////////////////////// static int RollbackInsert (TRI_document_collection_t* document, - TRI_doc_mptr_t* newHeader, - TRI_doc_mptr_t* oldHeader) { - int res; - - // there is no old header - assert(oldHeader == NULL); + TRI_doc_mptr_t* header) { // ignore any errors we're getting from this - DeletePrimaryIndex(document, newHeader, true); - DeleteSecondaryIndexes(document, newHeader, true); + DeletePrimaryIndex(document, header, true); + DeleteSecondaryIndexes(document, header, true); - // release the header. nobody else should point to it now - document->_headers->release(document->_headers, newHeader, true); - - res = TRI_ERROR_NO_ERROR; - - return res; + return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// @@ -414,18 +402,12 @@ static int RollbackUpdate (TRI_document_collection_t* document, //////////////////////////////////////////////////////////////////////////////// static int RollbackRemove (TRI_document_collection_t* document, - TRI_doc_mptr_t* newHeader, - TRI_doc_mptr_t* oldHeader, + TRI_doc_mptr_t* header, bool adjustHeader) { - int res; - - // there is no new header - assert(newHeader == NULL); - - res = InsertPrimaryIndex(document, oldHeader, true); + int res = InsertPrimaryIndex(document, header, true); if (res == TRI_ERROR_NO_ERROR) { - res = InsertSecondaryIndexes(document, oldHeader, true); + res = InsertSecondaryIndexes(document, header, true); } else { LOG_ERROR("error rolling back remove operation"); @@ -433,7 +415,7 @@ static int RollbackRemove (TRI_document_collection_t* document, if (adjustHeader) { // put back the header into its old position - document->_headers->relink(document->_headers, oldHeader, oldHeader); + document->_headers->relink(document->_headers, header, header); } return res; @@ -596,35 +578,6 @@ static void DebugHeadersDocumentCollection (TRI_document_collection_t* collectio } } -//////////////////////////////////////////////////////////////////////////////// -/// @brief insert a WAL marker into indexes -//////////////////////////////////////////////////////////////////////////////// - -static int InsertIndexes (TRI_transaction_collection_t* trxCollection, - TRI_doc_mptr_t* header) { - - TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection; - - // insert into primary index first - int res = InsertPrimaryIndex(document, header, false); - - if (res != TRI_ERROR_NO_ERROR) { - // insert has failed - return res; - } - - // insert into secondary indexes - res = InsertSecondaryIndexes(document, header, false); - - if (res != TRI_ERROR_NO_ERROR) { - // insertion into secondary indexes failed - DeleteSecondaryIndexes(document, header, true); - DeletePrimaryIndex(document, header, true); - } - - return res; -} - //////////////////////////////////////////////////////////////////////////////// /// @brief post-insert operation //////////////////////////////////////////////////////////////////////////////// @@ -647,6 +600,63 @@ static int PostInsertIndexes (TRI_transaction_collection_t* trxCollection, return TRI_ERROR_NO_ERROR; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief generates a new revision id if not yet set +//////////////////////////////////////////////////////////////////////////////// + +static inline TRI_voc_rid_t GetRevisionId (TRI_voc_rid_t previous) { + if (previous != 0) { + return previous; + } + + // generate new revision id + return static_cast(TRI_NewTickServer()); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief insert a document +//////////////////////////////////////////////////////////////////////////////// + +static int InsertDocument (TRI_transaction_collection_t* trxCollection, + TRI_doc_mptr_t* header, + triagens::wal::DocumentOperation& operation, + TRI_doc_mptr_t* mptr, + bool syncRequested) { + + assert(header != nullptr); + assert(mptr != nullptr); + TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection; + + // ............................................................................. + // insert into indexes + // ............................................................................. + + // insert into primary index first + int res = InsertPrimaryIndex(document, header, false); + + if (res != TRI_ERROR_NO_ERROR) { + // insert has failed + return res; + } + + // insert into secondary indexes + res = InsertSecondaryIndexes(document, header, false); + + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + + res = TRI_AddOperationTransaction(operation, syncRequested); + + if (res == TRI_ERROR_NO_ERROR) { + *mptr = *header; + + res = PostInsertIndexes(trxCollection, header); + } + + return res; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief insert a shaped-json document (or edge) /// note: key might be NULL. in this case, a key is auto-generated @@ -664,16 +674,12 @@ static int InsertDocumentShapedJson (TRI_transaction_collection_t* trxCollection bool isRestore) { // TODO: isRestore is not used yet! - TRI_voc_tick_t tick; + assert(mptr != nullptr); + mptr->_data = nullptr; + mptr->_key = nullptr; - if (rid == 0) { - // generate new revision id - tick = TRI_NewTickServer(); - rid = static_cast(tick); - } - else { - tick = static_cast(rid); - } + rid = GetRevisionId(rid); + TRI_voc_tick_t tick = static_cast(rid); TRI_primary_collection_t* primary = trxCollection->_collection->_collection; TRI_key_generator_t* keyGenerator = static_cast(primary->_keyGenerator); @@ -707,37 +713,32 @@ static int InsertDocumentShapedJson (TRI_transaction_collection_t* trxCollection return res; } - - triagens::wal::SlotInfo slotInfo; + triagens::wal::Marker* marker = nullptr; if (markerType == TRI_DOC_MARKER_KEY_DOCUMENT) { // document assert(edge == nullptr); - triagens::wal::DocumentMarker marker(primary->base._vocbase->_id, - primary->base._info._cid, - rid, - trxCollection->_transaction->_id, - keyString, - legend, - shaped); - - slotInfo = triagens::wal::LogfileManager::instance()->writeMarker(marker, forceSync); + marker = new triagens::wal::DocumentMarker(primary->base._vocbase->_id, + primary->base._info._cid, + rid, + trxCollection->_transaction->_id, + keyString, + legend, + shaped); } else if (markerType == TRI_DOC_MARKER_KEY_EDGE) { // edge assert(edge != nullptr); - triagens::wal::EdgeMarker marker(primary->base._vocbase->_id, - primary->base._info._cid, - rid, - trxCollection->_transaction->_id, - keyString, - edge, - legend, - shaped); - - slotInfo = triagens::wal::LogfileManager::instance()->writeMarker(marker, forceSync); + marker = new triagens::wal::EdgeMarker(primary->base._vocbase->_id, + primary->base._info._cid, + rid, + trxCollection->_transaction->_id, + keyString, + edge, + legend, + shaped); } else { // invalid marker type @@ -745,71 +746,72 @@ static int InsertDocumentShapedJson (TRI_transaction_collection_t* trxCollection } - if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) { - // some error occurred - return slotInfo.errorCode; - } - + assert(marker != nullptr); // now insert into indexes { triagens::arango::CollectionWriteLocker collectionLocker(primary, lock); + + triagens::wal::DocumentOperation operation(marker, trxCollection, TRI_VOC_DOCUMENT_OPERATION_INSERT, rid); + // create a new header TRI_document_collection_t* document = (TRI_document_collection_t*) primary; - TRI_doc_mptr_t* header = document->_headers->request(document->_headers, slotInfo.size); + TRI_doc_mptr_t* header = operation.header = document->_headers->request(document->_headers, marker->size()); if (header == nullptr) { + // out of memory. no harm done here. just return the error return TRI_ERROR_OUT_OF_MEMORY; } - + // update the header we got - triagens::wal::document_marker_t const* m = static_cast(slotInfo.mem); + void* mem = operation.marker->mem(); + triagens::wal::document_marker_t const* m = static_cast(mem); header->_rid = rid; - header->_fid = 0; // TODO: use WAL fid - // let header point to WAL location - header->_data = slotInfo.mem; - header->_key = static_cast(const_cast(slotInfo.mem)) + m->_offsetKey; + header->_data = mem; + header->_key = static_cast(const_cast(mem)) + m->_offsetKey; // insert into indexes - res = InsertIndexes(trxCollection, header); - - if (res != TRI_ERROR_NO_ERROR) { - // release the header - document->_headers->release(document->_headers, header, true); - - return res; - } - - // add the operation to the running transaction - res = TRI_AddOperationTransaction(trxCollection, - TRI_VOC_DOCUMENT_OPERATION_INSERT, - header, - nullptr, - nullptr, - forceSync); - + res = InsertDocument(trxCollection, header, operation, mptr, forceSync); + if (res != TRI_ERROR_NO_ERROR) { + // release the header. nobody else should point to it now + assert(mptr->_data == nullptr); + assert(mptr->_key == nullptr); + // something has failed.... now delete from the indexes again - RollbackInsert(document, header, nullptr); - // TODO: do we need to release the header here? - return res; + RollbackInsert(document, header); } - - // execute a post-insert - res = PostInsertIndexes(trxCollection, header); - - // TODO: do we need to release the header if something goes wrong? - // TODO: post-insert will never return an error - - if (res == TRI_ERROR_NO_ERROR) { - TRI_SetRevisionDocumentCollection(document, header->_rid, false); - *mptr = *header; + else { + assert(mptr->_data != nullptr); + assert(mptr->_key != nullptr); } } return res; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief looks up a document by key +/// the caller must make sure the read lock on the collection is helt +//////////////////////////////////////////////////////////////////////////////// + +static int LookupDocument (TRI_primary_collection_t* primary, + TRI_voc_key_t key, + TRI_doc_update_policy_t const* policy, + TRI_doc_mptr_t*& header) { + header = static_cast(TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, key)); + + if (! IsVisible(header)) { + return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND; + } + + if (policy != nullptr) { + return TRI_CheckUpdatePolicy(policy, header->_rid); + } + + return TRI_ERROR_NO_ERROR; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief reads an element from the document collection //////////////////////////////////////////////////////////////////////////////// @@ -818,15 +820,19 @@ static int ReadDocumentShapedJson (TRI_transaction_collection_t* trxCollection, const TRI_voc_key_t key, TRI_doc_mptr_t* mptr, bool lock) { - TRI_primary_collection_t* primary = trxCollection->_collection->_collection; + assert(mptr != nullptr); + mptr->_data = nullptr; + mptr->_key = nullptr; { + TRI_primary_collection_t* primary = trxCollection->_collection->_collection; triagens::arango::CollectionReadLocker collectionLocker(primary, lock); - TRI_doc_mptr_t const* header = static_cast(TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, key)); + TRI_doc_mptr_t* header; + int res = LookupDocument(primary, key, nullptr, header); - if (! IsVisible(header)) { - return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND; + if (res != TRI_ERROR_NO_ERROR) { + return res; } // we found a document, now copy it over @@ -841,15 +847,26 @@ static int ReadDocumentShapedJson (TRI_transaction_collection_t* trxCollection, } //////////////////////////////////////////////////////////////////////////////// -/// @brief updates a document in the indexes +/// @brief updates an existing document //////////////////////////////////////////////////////////////////////////////// -static int UpdateIndexes (TRI_transaction_collection_t* trxCollection, - TRI_doc_mptr_t* oldHeader, - TRI_doc_mptr_t* newHeader) { +static int UpdateDocument (TRI_transaction_collection_t* trxCollection, + TRI_doc_mptr_t* oldHeader, + triagens::wal::DocumentOperation& operation, + TRI_doc_mptr_t* mptr, + bool syncRequested) { + TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection; + + // save the old data, remember + TRI_doc_mptr_t oldData = *oldHeader; + + // ............................................................................. + // update indexes + // ............................................................................. + // remove old document from secondary indexes // (it will stay in the primary index as the key won't change) - TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection; + int res = DeleteSecondaryIndexes(document, oldHeader, false); if (res != TRI_ERROR_NO_ERROR) { @@ -858,6 +875,16 @@ static int UpdateIndexes (TRI_transaction_collection_t* trxCollection, return res; } + + // ............................................................................. + // update header + // ............................................................................. + + TRI_doc_mptr_t* newHeader = oldHeader; + + // update the header. this will modify oldHeader, too !!! + newHeader->_rid = operation.rid; + newHeader->_data = operation.marker->mem(); // insert new document into secondary indexes res = InsertSecondaryIndexes(document, newHeader, false); @@ -865,7 +892,23 @@ static int UpdateIndexes (TRI_transaction_collection_t* trxCollection, if (res != TRI_ERROR_NO_ERROR) { // rollback DeleteSecondaryIndexes(document, newHeader, true); + + // copy back old header data + *oldHeader = oldData; + InsertSecondaryIndexes(document, oldHeader, true); + + return res; + } + + res = TRI_AddOperationTransaction(operation, syncRequested); + + if (res == TRI_ERROR_NO_ERROR) { + // write new header into result + *mptr = *((TRI_doc_mptr_t*) newHeader); + } + else { + RollbackUpdate(document, newHeader, &oldData, false); // TODO: check whether "false" is correct! } return res; @@ -884,184 +927,89 @@ static int UpdateDocumentShapedJson (TRI_transaction_collection_t* trxCollection bool lock, bool forceSync) { - if (rid == 0) { - // generate new revision id - rid = static_cast(TRI_NewTickServer()); - } + rid = GetRevisionId(rid); + + assert(key != nullptr); // initialise the result assert(mptr != nullptr); mptr->_key = nullptr; mptr->_data = nullptr; - + TRI_primary_collection_t* primary = trxCollection->_collection->_collection; - TRI_document_collection_t* document = (TRI_document_collection_t*) primary; + + // create legend + triagens::basics::JsonLegend legend(primary->_shaper); + int res = legend.addShape(shaped->_sid, &shaped->_data); - int res; + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + { triagens::arango::CollectionWriteLocker collectionLocker(primary, lock); // get the header pointer of the previous revision - assert(key != nullptr); - TRI_doc_mptr_t* oldHeader = static_cast(TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, key)); - - if (IsVisible(oldHeader)) { - // document found, now check revision - res = TRI_CheckUpdatePolicy(policy, oldHeader->_rid); - } - else { - // document not found - res = TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND; - } + TRI_doc_mptr_t* oldHeader; + res = LookupDocument(primary, key, policy, oldHeader); if (res != TRI_ERROR_NO_ERROR) { return res; } - triagens::basics::JsonLegend legend(primary->_shaper); - res = legend.addShape(shaped->_sid, &shaped->_data); - - if (res != TRI_ERROR_NO_ERROR) { - return res; - } - - triagens::wal::SlotInfo slotInfo; + triagens::wal::Marker* marker = nullptr; TRI_df_marker_t const* original = static_cast(oldHeader->_data); if (original->_type == TRI_WAL_MARKER_DOCUMENT || original->_type == TRI_DOC_MARKER_KEY_DOCUMENT) { - - triagens::wal::DocumentMarker marker = triagens::wal::DocumentMarker::clone(original, - primary->base._vocbase->_id, - primary->base._info._cid, - rid, - trxCollection->_transaction->_id, - legend, - shaped); - - slotInfo = triagens::wal::LogfileManager::instance()->writeMarker(marker, forceSync); + // create a WAL document marker + + marker = triagens::wal::DocumentMarker::clone(original, + primary->base._vocbase->_id, + primary->base._info._cid, + rid, + trxCollection->_transaction->_id, + legend, + shaped); } else if (original->_type == TRI_WAL_MARKER_EDGE || - original->_type == TRI_DOC_MARKER_KEY_EDGE) { + original->_type == TRI_DOC_MARKER_KEY_EDGE) { + // create a WAL edge marker - triagens::wal::EdgeMarker marker = triagens::wal::EdgeMarker::clone(original, - primary->base._vocbase->_id, - primary->base._info._cid, - rid, - trxCollection->_transaction->_id, - legend, - shaped); - - slotInfo = triagens::wal::LogfileManager::instance()->writeMarker(marker, forceSync); + marker = triagens::wal::EdgeMarker::clone(original, + primary->base._vocbase->_id, + primary->base._info._cid, + rid, + trxCollection->_transaction->_id, + legend, + shaped); } else { // invalid marker type - assert(false); return TRI_ERROR_INTERNAL; } + + triagens::wal::DocumentOperation operation(marker, trxCollection, TRI_VOC_DOCUMENT_OPERATION_UPDATE, rid); + operation.header = oldHeader; + operation.init(); - - if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) { - // some error occurred - return slotInfo.errorCode; - } - - triagens::wal::document_marker_t const* m = static_cast(slotInfo.mem); - - TRI_doc_mptr_t newHeader; - newHeader._rid = rid; - newHeader._fid = 0; // TODO - newHeader._data = slotInfo.mem; - newHeader._key = static_cast(const_cast(slotInfo.mem)) + m->_offsetKey; - - res = UpdateIndexes(trxCollection, oldHeader, &newHeader); - - if (res != TRI_ERROR_NO_ERROR) { - return res; - } - - res = TRI_AddOperationTransaction(trxCollection, - TRI_VOC_DOCUMENT_OPERATION_UPDATE, - &newHeader, - oldHeader, - oldHeader, - forceSync); - - if (res != TRI_ERROR_NO_ERROR) { - RollbackUpdate(document, &newHeader, oldHeader, false); - return res; - } - - TRI_SetRevisionDocumentCollection(document, rid, false); - - // update the old header in place - oldHeader->_rid = newHeader._rid; - oldHeader->_fid = newHeader._fid; - oldHeader->_data = newHeader._data; - oldHeader->_key = newHeader._key; - - // return value - *mptr = newHeader; + res = UpdateDocument(trxCollection, oldHeader, operation, mptr, forceSync); + } + + if (res == TRI_ERROR_NO_ERROR) { + assert(mptr->_key != nullptr); + assert(mptr->_data != nullptr); + assert(mptr->_rid > 0); + } + else { + assert(mptr->_key == nullptr); + assert(mptr->_data == nullptr); + assert(mptr->_rid == 0); } - - assert(mptr->_key != nullptr); - assert(mptr->_data != nullptr); - assert(mptr->_rid > 0); return res; } -//////////////////////////////////////////////////////////////////////////////// -/// @brief removes a document from the indexes -//////////////////////////////////////////////////////////////////////////////// - -static int RemoveIndexes (TRI_transaction_collection_t* trxCollection, - TRI_doc_update_policy_t const* policy, - TRI_voc_key_t key, - TRI_doc_mptr_t** mptr) { - - TRI_primary_collection_t* primary = trxCollection->_collection->_collection; - - // get the existing header pointer - TRI_doc_mptr_t* header = static_cast(TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, key)); - - if (! IsVisible(header)) { - return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND; - } - - *mptr = header; - - // ............................................................................. - // check the revision - // ............................................................................. - - int res = TRI_CheckUpdatePolicy(policy, header->_rid); - - if (res != TRI_ERROR_NO_ERROR) { - return res; - } - - // delete from indexes - TRI_document_collection_t* document = (TRI_document_collection_t*) primary; - res = DeleteSecondaryIndexes(document, header, false); - - if (res != TRI_ERROR_NO_ERROR) { - // deletion failed. roll back - InsertSecondaryIndexes(document, header, true); - - return res; - } - - res = DeletePrimaryIndex(document, header, false); - - if (res != TRI_ERROR_NO_ERROR) { - // deletion failed. roll back - InsertSecondaryIndexes(document, header, true); - } - - return res; -} - //////////////////////////////////////////////////////////////////////////////// /// @brief removes a shaped-json document (or edge) //////////////////////////////////////////////////////////////////////////////// @@ -1073,55 +1021,50 @@ static int RemoveDocumentShapedJson (TRI_transaction_collection_t* trxCollection bool lock, bool forceSync) { assert(key != nullptr); - - if (rid == 0) { - // generate new revision id - rid = static_cast(TRI_NewTickServer()); - } + + rid = GetRevisionId(rid); TRI_primary_collection_t* primary = trxCollection->_collection->_collection; - TRI_document_collection_t* document = (TRI_document_collection_t*) primary; - triagens::wal::RemoveMarker marker(primary->base._vocbase->_id, - primary->base._info._cid, - rid, - trxCollection->_transaction->_id, - std::string(key)); - - triagens::wal::SlotInfo slotInfo = triagens::wal::LogfileManager::instance()->writeMarker(marker, forceSync); - - if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) { - // some error occurred - return slotInfo.errorCode; - } + triagens::wal::Marker* marker = new triagens::wal::RemoveMarker(primary->base._vocbase->_id, + primary->base._info._cid, + rid, + trxCollection->_transaction->_id, + std::string(key)); TRI_doc_mptr_t* header; int res; { triagens::arango::CollectionWriteLocker collectionLocker(primary, lock); + + triagens::wal::DocumentOperation operation(marker, trxCollection, TRI_VOC_DOCUMENT_OPERATION_REMOVE, rid); - res = RemoveIndexes(trxCollection, policy, key, &header); + res = LookupDocument(primary, key, policy, header); if (res != TRI_ERROR_NO_ERROR) { return res; } - + + // we found a document to remove assert(header != nullptr); + operation.header = header; - res = TRI_AddOperationTransaction(trxCollection, - TRI_VOC_DOCUMENT_OPERATION_REMOVE, - nullptr, - header, - header, - forceSync); + TRI_document_collection_t* document = (TRI_document_collection_t*) primary; + + // delete from indexes + res = DeleteSecondaryIndexes(document, header, false); + + if (res != TRI_ERROR_NO_ERROR) { + InsertSecondaryIndexes(document, header, true); + + return res; + } + + res = TRI_AddOperationTransaction(operation, forceSync); if (res != TRI_ERROR_NO_ERROR) { // deletion failed. roll back - RollbackRemove(document, nullptr, header, true); // TODO: check if "true" is correct! - } - else { - TRI_SetRevisionDocumentCollection(document, rid, false); - document->_headers->unlink(document->_headers, header); + RollbackRemove(document, header, false); } } @@ -2575,13 +2518,13 @@ int TRI_RollbackOperationDocumentCollection (TRI_document_collection_t* document int res; if (type == TRI_VOC_DOCUMENT_OPERATION_INSERT) { - res = RollbackInsert(document, newHeader, NULL); + res = RollbackInsert(document, newHeader); } else if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) { res = RollbackUpdate(document, newHeader, oldData, true); } else if (type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) { - res = RollbackRemove(document, NULL, oldHeader, true); + res = RollbackRemove(document, oldHeader, true); } else { res = TRI_ERROR_INTERNAL; diff --git a/arangod/VocBase/headers.cpp b/arangod/VocBase/headers.cpp index 75cbec2e58..f6342f7565 100644 --- a/arangod/VocBase/headers.cpp +++ b/arangod/VocBase/headers.cpp @@ -103,7 +103,7 @@ static void ClearHeader (TRI_headers_t* h, TRI_doc_mptr_t* header) { simple_headers_t* headers = (simple_headers_t*) h; - TRI_ASSERT_MAINTAINER(header != NULL); + TRI_ASSERT_MAINTAINER(header != nullptr); memset(header, 0, sizeof(TRI_doc_mptr_t)); @@ -123,7 +123,7 @@ static void MoveBackHeader (TRI_headers_t* h, int64_t oldSize; int64_t newSize; - if (header == NULL) { + if (header == nullptr) { return; } @@ -132,13 +132,13 @@ static void MoveBackHeader (TRI_headers_t* h, TRI_ASSERT_MAINTAINER(headers->_totalSize > 0); // we have at least one element in the list - TRI_ASSERT_MAINTAINER(headers->_begin != NULL); - TRI_ASSERT_MAINTAINER(headers->_end != NULL); + TRI_ASSERT_MAINTAINER(headers->_begin != nullptr); + TRI_ASSERT_MAINTAINER(headers->_end != nullptr); TRI_ASSERT_MAINTAINER(header->_prev != header); TRI_ASSERT_MAINTAINER(header->_next != header); - TRI_ASSERT_MAINTAINER(old != NULL); - TRI_ASSERT_MAINTAINER(old->_data != NULL); + TRI_ASSERT_MAINTAINER(old != nullptr); + TRI_ASSERT_MAINTAINER(old->_data != nullptr); newSize = (int64_t) (((TRI_df_marker_t*) header->_data)->_size); oldSize = (int64_t) (((TRI_df_marker_t*) old->_data)->_size); @@ -149,32 +149,32 @@ static void MoveBackHeader (TRI_headers_t* h, if (headers->_end == header) { // header is already at the end - TRI_ASSERT_MAINTAINER(header->_next == NULL); + TRI_ASSERT_MAINTAINER(header->_next == nullptr); return; } TRI_ASSERT_MAINTAINER(headers->_begin != headers->_end); // unlink the element - if (header->_prev != NULL) { + if (header->_prev != nullptr) { header->_prev->_next = header->_next; } - if (header->_next != NULL) { + if (header->_next != nullptr) { header->_next->_prev = header->_prev; } if (headers->_begin == header) { - TRI_ASSERT_MAINTAINER(header->_next != NULL); + TRI_ASSERT_MAINTAINER(header->_next != nullptr); headers->_begin = header->_next; } header->_prev = headers->_end; - header->_next = NULL; + header->_next = nullptr; headers->_end = header; header->_prev->_next = header; - TRI_ASSERT_MAINTAINER(headers->_begin != NULL); - TRI_ASSERT_MAINTAINER(headers->_end != NULL); + TRI_ASSERT_MAINTAINER(headers->_begin != nullptr); + TRI_ASSERT_MAINTAINER(headers->_end != nullptr); TRI_ASSERT_MAINTAINER(header->_prev != header); TRI_ASSERT_MAINTAINER(header->_next != header); @@ -190,8 +190,8 @@ static void UnlinkHeader (TRI_headers_t* h, simple_headers_t* headers = (simple_headers_t*) h; int64_t size; - TRI_ASSERT_MAINTAINER(header != NULL); - TRI_ASSERT_MAINTAINER(header->_data != NULL); + TRI_ASSERT_MAINTAINER(header != nullptr); + TRI_ASSERT_MAINTAINER(header->_data != nullptr); TRI_ASSERT_MAINTAINER(header->_prev != header); TRI_ASSERT_MAINTAINER(header->_next != header); @@ -199,11 +199,11 @@ static void UnlinkHeader (TRI_headers_t* h, TRI_ASSERT_MAINTAINER(size > 0); // unlink the header - if (header->_prev != NULL) { + if (header->_prev != nullptr) { header->_prev->_next = header->_next; } - if (header->_next != NULL) { + if (header->_next != nullptr) { header->_next->_prev = header->_prev; } @@ -224,13 +224,13 @@ static void UnlinkHeader (TRI_headers_t* h, headers->_totalSize -= TRI_DF_ALIGN_BLOCK(size); if (headers->_nrLinked == 0) { - TRI_ASSERT_MAINTAINER(headers->_begin == NULL); - TRI_ASSERT_MAINTAINER(headers->_end == NULL); + TRI_ASSERT_MAINTAINER(headers->_begin == nullptr); + TRI_ASSERT_MAINTAINER(headers->_end == nullptr); TRI_ASSERT_MAINTAINER(headers->_totalSize == 0); } else { - TRI_ASSERT_MAINTAINER(headers->_begin != NULL); - TRI_ASSERT_MAINTAINER(headers->_end != NULL); + TRI_ASSERT_MAINTAINER(headers->_begin != nullptr); + TRI_ASSERT_MAINTAINER(headers->_end != nullptr); TRI_ASSERT_MAINTAINER(headers->_totalSize > 0); } @@ -250,17 +250,17 @@ static void MoveHeader (TRI_headers_t* h, int64_t newSize; int64_t oldSize; - if (header == NULL) { + if (header == nullptr) { return; } TRI_ASSERT_MAINTAINER(headers->_nrAllocated > 0); TRI_ASSERT_MAINTAINER(header->_prev != header); TRI_ASSERT_MAINTAINER(header->_next != header); - TRI_ASSERT_MAINTAINER(header->_data != NULL); + TRI_ASSERT_MAINTAINER(header->_data != nullptr); TRI_ASSERT_MAINTAINER(((TRI_df_marker_t*) header->_data)->_size > 0); - TRI_ASSERT_MAINTAINER(old != NULL); - TRI_ASSERT_MAINTAINER(old->_data != NULL); + TRI_ASSERT_MAINTAINER(old != nullptr); + TRI_ASSERT_MAINTAINER(old->_data != nullptr); newSize = (int64_t) (((TRI_df_marker_t*) header->_data)->_size); oldSize = (int64_t) (((TRI_df_marker_t*) old->_data)->_size); @@ -269,56 +269,56 @@ static void MoveHeader (TRI_headers_t* h, headers->_totalSize += TRI_DF_ALIGN_BLOCK(oldSize); // adjust list start and end pointers - if (old->_prev == NULL) { + if (old->_prev == nullptr) { headers->_begin = header; } else if (headers->_begin == header) { headers->_begin = header->_next; } - if (old->_next == NULL) { + if (old->_next == nullptr) { headers->_end = header; } else if (headers->_end == header) { headers->_end = header->_prev; } - if (header->_prev != NULL) { + if (header->_prev != nullptr) { if (header->_prev == old->_next) { - header->_prev->_next = NULL; + header->_prev->_next = nullptr; } else { header->_prev->_next = header->_next; } } - if (header->_next != NULL) { + if (header->_next != nullptr) { if (header->_next == old->_prev) { - header->_next->_prev = NULL; + header->_next->_prev = nullptr; } else { header->_next->_prev = header->_prev; } } - if (old->_prev != NULL) { + if (old->_prev != nullptr) { old->_prev->_next = header; header->_prev = old->_prev; } else { - header->_prev = NULL; + header->_prev = nullptr; } - if (old->_next != NULL) { + if (old->_next != nullptr) { old->_next->_prev = header; header->_next = old->_next; } else { - header->_next = NULL; + header->_next = nullptr; } - TRI_ASSERT_MAINTAINER(headers->_begin != NULL); - TRI_ASSERT_MAINTAINER(headers->_end != NULL); + TRI_ASSERT_MAINTAINER(headers->_begin != nullptr); + TRI_ASSERT_MAINTAINER(headers->_end != nullptr); TRI_ASSERT_MAINTAINER(header->_prev != header); TRI_ASSERT_MAINTAINER(header->_next != header); } @@ -334,11 +334,11 @@ static void RelinkHeader (TRI_headers_t* h, simple_headers_t* headers = (simple_headers_t*) h; int64_t size; - if (header == NULL) { + if (header == nullptr) { return; } - TRI_ASSERT_MAINTAINER(header->_data != NULL); + TRI_ASSERT_MAINTAINER(header->_data != nullptr); size = (int64_t) ((TRI_df_marker_t*) header->_data)->_size; TRI_ASSERT_MAINTAINER(size > 0); @@ -366,7 +366,7 @@ static TRI_doc_mptr_t* RequestHeader (TRI_headers_t* h, assert(size > 0); - if (headers->_freelist == NULL) { + if (headers->_freelist == nullptr) { size_t blockSize = GetBlockSize(headers->_blocks._length); TRI_ASSERT_MAINTAINER(blockSize > 0); @@ -374,36 +374,36 @@ static TRI_doc_mptr_t* RequestHeader (TRI_headers_t* h, char* begin = static_cast(TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, blockSize * sizeof(TRI_doc_mptr_t), true)); // out of memory - if (begin == NULL) { + if (begin == nullptr) { TRI_set_errno(TRI_ERROR_OUT_OF_MEMORY); - return NULL; + return nullptr; } char* ptr = begin + sizeof(TRI_doc_mptr_t) * (blockSize - 1); - header = NULL; + header = nullptr; for (; begin <= ptr; ptr -= sizeof(TRI_doc_mptr_t)) { ((TRI_doc_mptr_t*) ptr)->_data = header; header = ptr; } - TRI_ASSERT_MAINTAINER(headers != NULL); + TRI_ASSERT_MAINTAINER(headers != nullptr); headers->_freelist = (TRI_doc_mptr_t*) header; TRI_PushBackVectorPointer(&headers->_blocks, begin); } - TRI_ASSERT_MAINTAINER(headers->_freelist != NULL); + TRI_ASSERT_MAINTAINER(headers->_freelist != nullptr); TRI_doc_mptr_t* result = const_cast(headers->_freelist); - TRI_ASSERT_MAINTAINER(result != NULL); + TRI_ASSERT_MAINTAINER(result != nullptr); headers->_freelist = static_cast(result->_data); - result->_data = NULL; + result->_data = nullptr; // put new header at the end of the list - if (headers->_begin == NULL) { + if (headers->_begin == nullptr) { // list of headers is empty TRI_ASSERT_MAINTAINER(headers->_nrLinked == 0); TRI_ASSERT_MAINTAINER(headers->_totalSize == 0); @@ -411,20 +411,20 @@ static TRI_doc_mptr_t* RequestHeader (TRI_headers_t* h, headers->_begin = result; headers->_end = result; - result->_prev = NULL; - result->_next = NULL; + result->_prev = nullptr; + result->_next = nullptr; } else { // list is not empty TRI_ASSERT_MAINTAINER(headers->_nrLinked > 0); TRI_ASSERT_MAINTAINER(headers->_totalSize > 0); TRI_ASSERT_MAINTAINER(headers->_nrAllocated > 0); - TRI_ASSERT_MAINTAINER(headers->_begin != NULL); - TRI_ASSERT_MAINTAINER(headers->_end != NULL); + TRI_ASSERT_MAINTAINER(headers->_begin != nullptr); + TRI_ASSERT_MAINTAINER(headers->_end != nullptr); headers->_end->_next = result; result->_prev = headers->_end; - result->_next = NULL; + result->_next = nullptr; headers->_end = result; } @@ -444,7 +444,7 @@ static void ReleaseHeader (TRI_headers_t* h, bool unlinkHeader) { simple_headers_t* headers = (simple_headers_t*) h; - if (header == NULL) { + if (header == nullptr) { return; } @@ -461,7 +461,7 @@ static void ReleaseHeader (TRI_headers_t* h, //////////////////////////////////////////////////////////////////////////////// /// @brief return the element at the head of the list /// -/// note: the element returned might be NULL +/// note: the element returned might be nullptr //////////////////////////////////////////////////////////////////////////////// static TRI_doc_mptr_t* FrontHeaders (TRI_headers_t const* h) { @@ -473,7 +473,7 @@ static TRI_doc_mptr_t* FrontHeaders (TRI_headers_t const* h) { //////////////////////////////////////////////////////////////////////////////// /// @brief return the element at the tail of the list /// -/// note: the element returned might be NULL +/// note: the element returned might be nullptr //////////////////////////////////////////////////////////////////////////////// static TRI_doc_mptr_t* BackHeaders (TRI_headers_t const* h) { @@ -520,7 +520,7 @@ static void DumpHeaders (TRI_headers_t const* h) { printf("begin ptr: %p\n", headers->_begin); printf("end ptr: %p\n", headers->_end); - while (next != NULL) { + while (next != nullptr) { printf("- header #%lu: ptr: %p, prev: %p, next: %p, key: %s\n", (unsigned long) i, next, @@ -529,7 +529,7 @@ static void DumpHeaders (TRI_headers_t const* h) { next->_key); i++; - if (next->_next == NULL) { + if (next->_next == nullptr) { TRI_ASSERT_MAINTAINER(next == headers->_end); } @@ -561,15 +561,14 @@ static void DumpHeaders (TRI_headers_t const* h) { TRI_headers_t* TRI_CreateSimpleHeaders () { simple_headers_t* headers = static_cast(TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(simple_headers_t), false)); - if (headers == NULL) { + if (headers == nullptr) { TRI_set_errno(TRI_ERROR_OUT_OF_MEMORY); - return NULL; + return nullptr; } - headers->_freelist = NULL; - - headers->_begin = NULL; - headers->_end = NULL; + headers->_freelist = nullptr; + headers->_begin = nullptr; + headers->_end = nullptr; headers->_nrAllocated = 0; headers->_nrLinked = 0; headers->_totalSize = 0; diff --git a/arangod/VocBase/transaction.cpp b/arangod/VocBase/transaction.cpp index 1b958f1878..45f8f40153 100644 --- a/arangod/VocBase/transaction.cpp +++ b/arangod/VocBase/transaction.cpp @@ -37,6 +37,7 @@ #include "VocBase/replication-logger.h" #include "VocBase/server.h" #include "VocBase/vocbase.h" +#include "Wal/DocumentOperation.h" #include "Wal/LogfileManager.h" #define LOG_TRX(trx, level, format, ...) \ @@ -125,45 +126,6 @@ static int InitCollectionOperations (TRI_transaction_collection_t* trxCollection return res; } -//////////////////////////////////////////////////////////////////////////////// -/// @brief add a WAL-based operation for a collection -//////////////////////////////////////////////////////////////////////////////// - -static int AddOperation (TRI_transaction_collection_t* trxCollection, - TRI_voc_document_operation_e type, - TRI_doc_mptr_t* newHeader, - TRI_doc_mptr_t* oldHeader, - TRI_doc_mptr_t* oldData) { - - TRI_DEBUG_INTENTIONAL_FAIL_IF("AddOperation-OOM") { - return TRI_ERROR_DEBUG; - } - - int res; - - if (trxCollection->_operations == nullptr) { - res = InitCollectionOperations(trxCollection); - - if (res != TRI_ERROR_NO_ERROR) { - return TRI_ERROR_OUT_OF_MEMORY; - } - } - - TRI_transaction_operation_t trxOperation; - trxOperation._type = type; - trxOperation._newHeader = newHeader; - trxOperation._oldHeader = oldHeader; - - if (oldData != nullptr) { - trxOperation._oldData = *oldData; - } - else { - memset(&trxOperation._oldData, 0, sizeof(TRI_doc_mptr_t)); - } - - return TRI_PushBackVector(trxCollection->_operations, &trxOperation); -} - //////////////////////////////////////////////////////////////////////////////// /// @brief rollback all operations for a collection //////////////////////////////////////////////////////////////////////////////// @@ -969,49 +931,56 @@ int TRI_AddIdFailedTransaction (TRI_vector_t* vector, return TRI_ERROR_NO_ERROR; } -//////////////////////////////////////////////////////////////////////////////// -/// @brief reserve room in the transactions operations vector -//////////////////////////////////////////////////////////////////////////////// - -int TRI_ReserveOperationTransaction (TRI_transaction_collection_t* trxCollection) { - if (trxCollection->_operations == nullptr) { - return InitCollectionOperations(trxCollection); - } - - return TRI_ReserveVector(trxCollection->_operations, 1); -} - //////////////////////////////////////////////////////////////////////////////// /// @brief add a WAL operation for a transaction collection //////////////////////////////////////////////////////////////////////////////// -int TRI_AddOperationTransaction (TRI_transaction_collection_t* trxCollection, - TRI_voc_document_operation_e type, - TRI_doc_mptr_t* newHeader, - TRI_doc_mptr_t* oldHeader, - TRI_doc_mptr_t* oldData, +int TRI_AddOperationTransaction (triagens::wal::DocumentOperation& operation, bool syncRequested) { - + TRI_transaction_collection_t* trxCollection = operation.trxCollection; TRI_transaction_t* trx = trxCollection->_transaction; - // TODO: re-add replication!! - - int res = AddOperation(trxCollection, type, newHeader, oldHeader, oldData); + assert(operation.header != nullptr); - if (res == TRI_ERROR_NO_ERROR) { - // if everything went well, this will ensure we don't double free etc. headers - trx->_hasOperations = true; - - // update waitForSync for the transaction + bool const isSingleOperationTransaction = IsSingleOperationTransaction(trx); + + // default is false + bool waitForSync = false; + if (isSingleOperationTransaction) { + waitForSync = syncRequested || trxCollection->_waitForSync; + } + else { + // upgrade the info for the transaction if (syncRequested || trxCollection->_waitForSync) { trx->_waitForSync = true; } } - else { - TRI_ASSERT_MAINTAINER(res == TRI_ERROR_OUT_OF_MEMORY || res == TRI_ERROR_DEBUG); - } - return res; + triagens::wal::SlotInfo slotInfo = triagens::wal::LogfileManager::instance()->allocateAndWrite(operation.marker->mem(), operation.marker->size(), waitForSync); + + if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) { + // some error occurred + operation.revert(isSingleOperationTransaction); + + return slotInfo.errorCode; + } + + if (operation.type == TRI_VOC_DOCUMENT_OPERATION_INSERT || + operation.type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) { + // adjust the data position in the header + operation.header->_data = slotInfo.mem; + triagens::wal::document_marker_t const* m = static_cast(operation.header->_data); + operation.header->_key = static_cast(const_cast(slotInfo.mem)) + m->_offsetKey; + } + + operation.handled(isSingleOperationTransaction); + + if (operation.rid > 0) { + TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection; + TRI_SetRevisionDocumentCollection(document, operation.rid, false); + } + + return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/VocBase/transaction.h b/arangod/VocBase/transaction.h index 543601c67e..840f6bc622 100644 --- a/arangod/VocBase/transaction.h +++ b/arangod/VocBase/transaction.h @@ -250,23 +250,6 @@ bool TRI_IsLockedCollectionTransaction (TRI_transaction_collection_t*, int TRI_AddIdFailedTransaction (TRI_vector_t*, TRI_voc_tid_t); -//////////////////////////////////////////////////////////////////////////////// -/// @brief reserve room in the transactions operations vector -//////////////////////////////////////////////////////////////////////////////// - -int TRI_ReserveOperationTransaction (TRI_transaction_collection_t*); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief add a WAL operation fora transaction collection -//////////////////////////////////////////////////////////////////////////////// - -int TRI_AddOperationTransaction (TRI_transaction_collection_t*, - TRI_voc_document_operation_e, - struct TRI_doc_mptr_s*, - struct TRI_doc_mptr_s*, - struct TRI_doc_mptr_s*, - bool); - //////////////////////////////////////////////////////////////////////////////// /// @brief get a transaction's id //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Wal/DocumentOperation.h b/arangod/Wal/DocumentOperation.h new file mode 100644 index 0000000000..3d891668fa --- /dev/null +++ b/arangod/Wal/DocumentOperation.h @@ -0,0 +1,98 @@ + +#ifndef TRIAGENS_VOC_BASE_DOCUMENT_OPERATION_H +#define TRIAGENS_VOC_BASE_DOCUMENT_OPERATION_H 1 + +#include "Basics/Common.h" +#include "VocBase/voc-types.h" +#include "VocBase/document-collection.h" +#include "Wal/Marker.h" + +struct TRI_transaction_collection_s; + +namespace triagens { + namespace wal { + class Marker; + + struct DocumentOperation { + DocumentOperation (Marker* marker, + struct TRI_transaction_collection_s* trxCollection, + TRI_voc_document_operation_e type, + TRI_voc_rid_t rid) + : marker(marker), + trxCollection(trxCollection), + header(nullptr), + type(type), + rid(rid) { + + assert(marker != nullptr); + } + + ~DocumentOperation () { + if (marker != nullptr) { + delete marker; + } + } + + void init () { + if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) { + // copy the old header into a safe area + assert(header != nullptr); + + oldHeader = *header; + } + } + + void handled (bool isSingleOperationTransaction) { + assert(header != nullptr); + + TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection; + + if (type == TRI_VOC_DOCUMENT_OPERATION_INSERT) { + // nothing to do for insert + } + else if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) { + // move header to the end of the list + document->_headers->moveBack(document->_headers, header, &oldHeader); + } + else if (type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) { + // free or unlink the header + if (isSingleOperationTransaction) { + document->_headers->release(document->_headers, header, true); + } + else { + document->_headers->unlink(document->_headers, header); + } + } + + // free the local marker buffer + delete[] marker->steal(); + } + + void revert (bool isSingleOperationTransaction) { + assert(header != nullptr); + + TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection; + + if (type == TRI_VOC_DOCUMENT_OPERATION_INSERT) { + document->_headers->release(document->_headers, header, true); + } + else if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) { + document->_headers->move(document->_headers, header, &oldHeader); + } + else if (type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) { + document->_headers->relink(document->_headers, header, header); + } + } + + Marker* marker; + struct TRI_transaction_collection_s* trxCollection; + TRI_doc_mptr_t* header; + TRI_doc_mptr_t oldHeader; + TRI_voc_document_operation_e const type; + TRI_voc_rid_t const rid; + }; + } +} + +#endif + diff --git a/arangod/Wal/Marker.cpp b/arangod/Wal/Marker.cpp index 3718949df4..55a2de5778 100644 --- a/arangod/Wal/Marker.cpp +++ b/arangod/Wal/Marker.cpp @@ -38,28 +38,6 @@ using namespace triagens::wal; // --SECTION-- constructors and destructors // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @brief create a copy of a marker -//////////////////////////////////////////////////////////////////////////////// - -Marker::Marker (Marker&& other) - : _buffer(other._buffer), - _size(other._size) { - - other._buffer = nullptr; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief create a copy of a marker -//////////////////////////////////////////////////////////////////////////////// - -Marker::Marker (Marker const& other) - : _buffer(new char[other._size]), - _size(other._size) { - - memcpy(_buffer, other._buffer, other._size); -} - //////////////////////////////////////////////////////////////////////////////// /// @brief create marker with a sized buffer //////////////////////////////////////////////////////////////////////////////// @@ -70,10 +48,14 @@ Marker::Marker (TRI_df_marker_type_e type, _size(static_cast(size)) { TRI_df_marker_t* m = reinterpret_cast(begin()); + memset(m, 0, size); // to shut up valgrind + m->_type = type; m->_size = static_cast(size); m->_crc = 0; m->_tick = 0; + + std::cout << "CREATED A MARKER OF SIZE: " << size << ", buffer: " << (void*) _buffer << "\n"; } //////////////////////////////////////////////////////////////////////////////// @@ -82,7 +64,7 @@ Marker::Marker (TRI_df_marker_type_e type, Marker::~Marker () { if (_buffer != nullptr) { - delete _buffer; + delete[] _buffer; } } @@ -351,25 +333,25 @@ void DocumentMarker::dump () const { /// @brief clone a marker from another marker //////////////////////////////////////////////////////////////////////////////// -DocumentMarker DocumentMarker::clone (TRI_df_marker_t const* other, - TRI_voc_tick_t databaseId, - TRI_voc_cid_t collectionId, - TRI_voc_rid_t revisionId, - TRI_voc_tid_t transactionId, - triagens::basics::JsonLegend& legend, - TRI_shaped_json_t const* shapedJson) { +DocumentMarker* DocumentMarker::clone (TRI_df_marker_t const* other, + TRI_voc_tick_t databaseId, + TRI_voc_cid_t collectionId, + TRI_voc_rid_t revisionId, + TRI_voc_tid_t transactionId, + triagens::basics::JsonLegend& legend, + TRI_shaped_json_t const* shapedJson) { char const* base = reinterpret_cast(other); if (other->_type == TRI_DOC_MARKER_KEY_DOCUMENT) { TRI_doc_document_key_marker_t const* original = reinterpret_cast(other); - return DocumentMarker(databaseId, - collectionId, - revisionId, - transactionId, - std::string(base + original->_offsetKey), - legend, - shapedJson); + return new DocumentMarker(databaseId, + collectionId, + revisionId, + transactionId, + std::string(base + original->_offsetKey), + legend, + shapedJson); } else { assert(other->_type == TRI_WAL_MARKER_DOCUMENT); @@ -379,13 +361,13 @@ DocumentMarker DocumentMarker::clone (TRI_df_marker_t const* other, assert(original->_databaseId == databaseId); assert(original->_collectionId == collectionId); - return DocumentMarker(original->_databaseId, - original->_collectionId, - revisionId, - transactionId, - std::string(base + original->_offsetKey), - legend, - shapedJson); + return new DocumentMarker(original->_databaseId, + original->_collectionId, + revisionId, + transactionId, + std::string(base + original->_offsetKey), + legend, + shapedJson); } } @@ -489,13 +471,13 @@ void EdgeMarker::dump () const { /// @brief clone a marker from another marker //////////////////////////////////////////////////////////////////////////////// -EdgeMarker EdgeMarker::clone (TRI_df_marker_t const* other, - TRI_voc_tick_t databaseId, - TRI_voc_cid_t collectionId, - TRI_voc_rid_t revisionId, - TRI_voc_tid_t transactionId, - triagens::basics::JsonLegend& legend, - TRI_shaped_json_t const* shapedJson) { +EdgeMarker* EdgeMarker::clone (TRI_df_marker_t const* other, + TRI_voc_tick_t databaseId, + TRI_voc_cid_t collectionId, + TRI_voc_rid_t revisionId, + TRI_voc_tid_t transactionId, + triagens::basics::JsonLegend& legend, + TRI_shaped_json_t const* shapedJson) { char const* base = reinterpret_cast(other); if (other->_type == TRI_DOC_MARKER_KEY_EDGE) { @@ -507,14 +489,14 @@ EdgeMarker EdgeMarker::clone (TRI_df_marker_t const* other, edge._toKey = (TRI_voc_key_t) base + original->_offsetToKey; edge._fromKey = (TRI_voc_key_t) base + original->_offsetFromKey; - return EdgeMarker(databaseId, - collectionId, - revisionId, - transactionId, - std::string(base + original->base._offsetKey), - &edge, - legend, - shapedJson); + return new EdgeMarker(databaseId, + collectionId, + revisionId, + transactionId, + std::string(base + original->base._offsetKey), + &edge, + legend, + shapedJson); } else { assert(other->_type == TRI_WAL_MARKER_EDGE); @@ -530,14 +512,14 @@ EdgeMarker EdgeMarker::clone (TRI_df_marker_t const* other, edge._toKey = (TRI_voc_key_t) base + original->_offsetToKey; edge._fromKey = (TRI_voc_key_t) base + original->_offsetFromKey; - return EdgeMarker(original->_databaseId, - original->_collectionId, - revisionId, - transactionId, - std::string(base + original->_offsetKey), - &edge, - legend, - shapedJson); + return new EdgeMarker(original->_databaseId, + original->_collectionId, + revisionId, + transactionId, + std::string(base + original->_offsetKey), + &edge, + legend, + shapedJson); } } @@ -591,6 +573,8 @@ void RemoveMarker::dump () const { << ", KEY: " << key() << "\n"; + std::cout << "BEGIN: " << begin() << ", SIZE: " << size() << "\n"; + std::cout << "BINARY: '" << stringifyPart(begin(), size()) << "'\n"; } diff --git a/arangod/Wal/Marker.h b/arangod/Wal/Marker.h index 26043a7433..f46c7255fe 100644 --- a/arangod/Wal/Marker.h +++ b/arangod/Wal/Marker.h @@ -147,20 +147,26 @@ namespace triagens { Marker& operator= (Marker const&) = delete; - Marker (Marker&&); + Marker (Marker&&) = delete; - Marker (Marker const&); + Marker (Marker const&) = delete; Marker (TRI_df_marker_type_e, size_t); - virtual ~Marker (); - // ----------------------------------------------------------------------------- // --SECTION-- public methods // ----------------------------------------------------------------------------- public: + + virtual ~Marker (); + + inline char* steal () { + char* buffer = _buffer; + _buffer = nullptr; + return buffer; + } static inline size_t alignedSize (size_t size) { return TRI_DF_ALIGN_BLOCK(size); @@ -379,13 +385,13 @@ namespace triagens { void dump () const; - static DocumentMarker clone (TRI_df_marker_t const*, - TRI_voc_tick_t, - TRI_voc_cid_t, - TRI_voc_rid_t, - TRI_voc_tid_t, - triagens::basics::JsonLegend&, - TRI_shaped_json_t const*); + static DocumentMarker* clone (TRI_df_marker_t const*, + TRI_voc_tick_t, + TRI_voc_cid_t, + TRI_voc_rid_t, + TRI_voc_tid_t, + triagens::basics::JsonLegend&, + TRI_shaped_json_t const*); }; // ----------------------------------------------------------------------------- @@ -458,13 +464,13 @@ namespace triagens { void dump () const; - static EdgeMarker clone (TRI_df_marker_t const*, - TRI_voc_tick_t, - TRI_voc_cid_t, - TRI_voc_rid_t, - TRI_voc_tid_t, - triagens::basics::JsonLegend&, - TRI_shaped_json_t const*); + static EdgeMarker* clone (TRI_df_marker_t const*, + TRI_voc_tick_t, + TRI_voc_cid_t, + TRI_voc_rid_t, + TRI_voc_tid_t, + triagens::basics::JsonLegend&, + TRI_shaped_json_t const*); }; // ----------------------------------------------------------------------------- diff --git a/arangod/Wal/Slot.cpp b/arangod/Wal/Slot.cpp index 326ab8be80..a8bed28dda 100644 --- a/arangod/Wal/Slot.cpp +++ b/arangod/Wal/Slot.cpp @@ -89,7 +89,8 @@ std::string Slot::statusText () const { void Slot::fill (void* src, size_t size) { - assert(size == _size); + assert(size == _size); + assert(src != nullptr); TRI_df_marker_t* marker = static_cast(src);